source: icGREP/icgrep-devel/icgrep/toolchain/pipeline.cpp @ 5544

Last change on this file since 5544 was 5486, checked in by nmedfort, 2 years ago

Initial attempt to improve debugging capabilities with compilation stack traces on error.

File size: 28.9 KB
RevLine 
[4929]1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
[5227]6#include "pipeline.h"
[5425]7#include <toolchain/toolchain.h>
[5086]8#include <kernels/kernel.h>
[5276]9#include <kernels/streamset.h>
[5267]10#include <llvm/IR/Module.h>
[5403]11#include <boost/container/flat_set.hpp>
[5390]12#include <boost/container/flat_map.hpp>
[5486]13#include <llvm/Support/CommandLine.h>
[5436]14#include <kernels/kernel_builder.h>
[4929]15
[5440]16#include <llvm/Support/raw_ostream.h>
17
[4974]18using namespace kernel;
[5260]19using namespace parabix;
20using namespace llvm;
[4929]21
[5486]22// static cl::opt<bool> UseYield("yield", cl::desc("yield after waiting"), cl::init(false));
[5436]23
[5390]24template <typename Value>
25using StreamSetBufferMap = boost::container::flat_map<const StreamSetBuffer *, Value>;
26
[5403]27template <typename Value>
28using FlatSet = boost::container::flat_set<Value>;
[5390]29
[5436]30Function * makeThreadFunction(const std::unique_ptr<kernel::KernelBuilder> & b, const std::string & name) {
[5411]31    Function * const f = Function::Create(FunctionType::get(b->getVoidTy(), {b->getVoidPtrTy()}, false), Function::InternalLinkage, name, b->getModule());
[5402]32    f->setCallingConv(CallingConv::C);
33    f->arg_begin()->setName("input");
34    return f;
35}
36
[5403]37/** ------------------------------------------------------------------------------------------------------------- *
38 * @brief generateSegmentParallelPipeline
39 *
40 * Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
41 * operating over an input stream set S, a segment-parallel implementation divides the input
42 * into segments and coordinates a set of T <= K threads to each process one segment at a time.
43 * Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
44 * fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
45 ** ------------------------------------------------------------------------------------------------------------- */
[5436]46void generateSegmentParallelPipeline(const std::unique_ptr<KernelBuilder> & iBuilder, const std::vector<Kernel *> & kernels) {
[5403]47
[5408]48    const unsigned n = kernels.size();
[5402]49    Module * const m = iBuilder->getModule();
[5403]50    IntegerType * const sizeTy = iBuilder->getSizeTy();
[5267]51    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
[5403]52    Constant * nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
53    std::vector<Type *> structTypes;
[5408]54
55    Value * instance[n];
56    for (unsigned i = 0; i < n; ++i) {
57        instance[i] = kernels[i]->getInstance();
58        structTypes.push_back(instance[i]->getType());
[5403]59    }
60    StructType * const sharedStructType = StructType::get(m->getContext(), structTypes);
61    StructType * const threadStructType = StructType::get(sharedStructType->getPointerTo(), sizeTy, nullptr);
[5165]62
[5411]63    Function * const threadFunc = makeThreadFunction(iBuilder, "segment");
[5408]64
[5403]65    // -------------------------------------------------------------------------------------------------------------------------
66    // MAKE SEGMENT PARALLEL PIPELINE THREAD
67    // -------------------------------------------------------------------------------------------------------------------------
68    const auto ip = iBuilder->saveIP();
69
[5165]70     // Create the basic blocks for the thread function.
[5402]71    BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc);
[5403]72    iBuilder->SetInsertPoint(entryBlock);
73    Value * const input = &threadFunc->getArgumentList().front();
74    Value * const threadStruct = iBuilder->CreatePointerCast(input, threadStructType->getPointerTo());
75    Value * const sharedStatePtr = iBuilder->CreateLoad(iBuilder->CreateGEP(threadStruct, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
[5408]76    for (unsigned k = 0; k < n; ++k) {
77        Value * ptr = iBuilder->CreateLoad(iBuilder->CreateGEP(sharedStatePtr, {iBuilder->getInt32(0), iBuilder->getInt32(k)}));
78        kernels[k]->setInstance(ptr);
[5165]79    }
[5403]80    Value * const segOffset = iBuilder->CreateLoad(iBuilder->CreateGEP(threadStruct, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
[5165]81
[5403]82    BasicBlock * segmentLoop = BasicBlock::Create(iBuilder->getContext(), "segmentLoop", threadFunc);
[5165]83    iBuilder->CreateBr(segmentLoop);
84
85    iBuilder->SetInsertPoint(segmentLoop);
[5403]86    PHINode * const segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 2, "segNo");
87    segNo->addIncoming(segOffset, entryBlock);
[5274]88
[5418]89    Value * terminated = iBuilder->getFalse();
[5403]90    Value * const nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
91
92    BasicBlock * segmentLoopBody = nullptr;
93    BasicBlock * const exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc);
94
95    StreamSetBufferMap<Value *> producedPos;
[5418]96    StreamSetBufferMap<Value *> consumedPos;
[5403]97
[5456]98    Value * cycleCountStart = nullptr;
99    Value * cycleCountEnd = nullptr;
100    if (codegen::EnableCycleCounter) {
101        cycleCountStart = iBuilder->CreateReadCycleCounter();
102    }
103
[5408]104    for (unsigned k = 0; k < n; ++k) {
105
[5407]106        const auto & kernel = kernels[k];
[5390]107
[5408]108        BasicBlock * const segmentWait = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Wait", threadFunc);
[5486]109
110        BasicBlock * segmentYield = segmentWait;
111//        if (UseYield) {
112//            segmentYield = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Yield", threadFunc);
113//        }
114
[5408]115        iBuilder->CreateBr(segmentWait);
116
[5403]117        segmentLoopBody = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Do", threadFunc);
118
119        iBuilder->SetInsertPoint(segmentWait);
[5408]120        const unsigned waitIdx = codegen::DebugOptionIsSet(codegen::SerializeThreads) ? (n - 1) : k;
[5435]121
[5440]122        iBuilder->setKernel(kernels[waitIdx]);
123        Value * const processedSegmentCount = iBuilder->acquireLogicalSegmentNo();
124        iBuilder->setKernel(kernel);
[5435]125
[5403]126        assert (processedSegmentCount->getType() == segNo->getType());
127        Value * const ready = iBuilder->CreateICmpEQ(segNo, processedSegmentCount);
128
[5402]129        if (kernel->hasNoTerminateAttribute()) {
[5486]130            iBuilder->CreateCondBr(ready, segmentLoopBody, segmentYield);
[5292]131        } else { // If the kernel was terminated in a previous segment then the pipeline is done.
[5402]132            BasicBlock * completionTest = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Completed", threadFunc, 0);
133            BasicBlock * exitBlock = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Exit", threadFunc, 0);
[5486]134            iBuilder->CreateCondBr(ready, completionTest, segmentYield);
[5408]135
[5274]136            iBuilder->SetInsertPoint(completionTest);
[5440]137            Value * terminationSignal = iBuilder->getTerminationSignal();
[5435]138            iBuilder->CreateCondBr(terminationSignal, exitBlock, segmentLoopBody);
[5305]139            iBuilder->SetInsertPoint(exitBlock);
140            // Ensure that the next thread will also exit.
[5440]141            iBuilder->releaseLogicalSegmentNo(nextSegNo);
[5305]142            iBuilder->CreateBr(exitThreadBlock);
[5274]143        }
[5403]144
[5486]145//        if (UseYield) {
146//            // Yield the thread after waiting
147//            iBuilder->SetInsertPoint(segmentYield);
148//            iBuilder->CreatePThreadYield();
149//            iBuilder->CreateBr(segmentWait);
150//        }
151
[5403]152        // Execute the kernel segment
153        iBuilder->SetInsertPoint(segmentLoopBody);
154        const auto & inputs = kernel->getStreamInputs();
[5418]155        std::vector<Value *> args = {kernel->getInstance(), terminated};
[5403]156        for (unsigned i = 0; i < inputs.size(); ++i) {
157            const auto f = producedPos.find(kernel->getStreamSetInputBuffer(i));
[5418]158            assert (f != producedPos.end());
[5403]159            args.push_back(f->second);
[5253]160        }
[5408]161
[5440]162        iBuilder->setKernel(kernel);
163        iBuilder->createDoSegmentCall(args);
[5403]164        if (!kernel->hasNoTerminateAttribute()) {
[5440]165            terminated = iBuilder->CreateOr(terminated, iBuilder->getTerminationSignal());
[5370]166        }
[5411]167
168        const auto & outputs = kernel->getStreamOutputs();
[5440]169        for (unsigned i = 0; i < outputs.size(); ++i) {           
170            Value * const produced = iBuilder->getProducedItemCount(outputs[i].name, terminated);
[5403]171            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
172            assert (producedPos.count(buf) == 0);
173            producedPos.emplace(buf, produced);
[5263]174        }
[5418]175        for (unsigned i = 0; i < inputs.size(); ++i) {
[5440]176            Value * const processedItemCount = iBuilder->getProcessedItemCount(inputs[i].name);
[5435]177            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);           
[5418]178            auto f = consumedPos.find(buf);
179            if (f == consumedPos.end()) {
180                consumedPos.emplace(buf, processedItemCount);
181            } else {
182                Value * lesser = iBuilder->CreateICmpULT(processedItemCount, f->second);
183                f->second = iBuilder->CreateSelect(lesser, processedItemCount, f->second);
184            }
185        }
[5456]186        if (codegen::EnableCycleCounter) {
187            cycleCountEnd = iBuilder->CreateReadCycleCounter();
188            Value * counterPtr = iBuilder->getScalarFieldPtr(Kernel::CYCLECOUNT_SCALAR);
189            iBuilder->CreateStore(iBuilder->CreateAdd(iBuilder->CreateLoad(counterPtr), iBuilder->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
190            cycleCountStart = cycleCountEnd;
191        }
192       
[5440]193        iBuilder->releaseLogicalSegmentNo(nextSegNo);
[5165]194    }
[5266]195
[5408]196    assert (segmentLoopBody);
197    exitThreadBlock->moveAfter(segmentLoopBody);
198
[5418]199    for (const auto consumed : consumedPos) {
200        const StreamSetBuffer * const buf = consumed.first;
[5435]201        Kernel * kernel = buf->getProducer();
202        const auto & outputs = kernel->getStreamSetOutputBuffers();
[5418]203        for (unsigned i = 0; i < outputs.size(); ++i) {
204            if (outputs[i] == buf) {
[5440]205                iBuilder->setKernel(kernel);
206                iBuilder->setConsumedItemCount(kernel->getStreamOutput(i).name, consumed.second);
[5418]207                break;
208            }
209        }
210    }
211
212    segNo->addIncoming(iBuilder->CreateAdd(segNo, iBuilder->getSize(codegen::ThreadNum)), segmentLoopBody);
213    iBuilder->CreateCondBr(terminated, exitThreadBlock, segmentLoop);
214
[5408]215    iBuilder->SetInsertPoint(exitThreadBlock);
[5418]216
217    // only call pthread_exit() within spawned threads; otherwise it'll be equivalent to calling exit() within the process
218    BasicBlock * const exitThread = BasicBlock::Create(iBuilder->getContext(), "ExitThread", threadFunc);
219    BasicBlock * const exitFunction = BasicBlock::Create(iBuilder->getContext(), "ExitProcessFunction", threadFunc);
220
221    Value * const exitCond = iBuilder->CreateICmpEQ(segOffset, ConstantInt::getNullValue(segOffset->getType()));
222    iBuilder->CreateCondBr(exitCond, exitFunction, exitThread);
223    iBuilder->SetInsertPoint(exitThread);
[5408]224    iBuilder->CreatePThreadExitCall(nullVoidPtrVal);
[5418]225    iBuilder->CreateBr(exitFunction);
226    iBuilder->SetInsertPoint(exitFunction);
[5408]227    iBuilder->CreateRetVoid();
228
[5403]229    // -------------------------------------------------------------------------------------------------------------------------
[5263]230    iBuilder->restoreIP(ip);
[5403]231
[5408]232    for (unsigned i = 0; i < n; ++i) {
233        kernels[i]->setInstance(instance[i]);
234    }
235
[5403]236    // -------------------------------------------------------------------------------------------------------------------------
237    // MAKE SEGMENT PARALLEL PIPELINE DRIVER
238    // -------------------------------------------------------------------------------------------------------------------------
[5418]239    const unsigned threads = codegen::ThreadNum - 1;
240    assert (codegen::ThreadNum > 1);
[5403]241    Type * const pthreadsTy = ArrayType::get(sizeTy, threads);
242    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
243    Value * threadIdPtr[threads];
[5408]244
245    for (unsigned i = 0; i < threads; ++i) {
[5403]246        threadIdPtr[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
247    }
248
[5408]249    for (unsigned i = 0; i < n; ++i) {
[5440]250        iBuilder->setKernel(kernels[i]);
251        iBuilder->releaseLogicalSegmentNo(iBuilder->getSize(0));
[5403]252    }
253
254    AllocaInst * const sharedStruct = iBuilder->CreateCacheAlignedAlloca(sharedStructType);
[5408]255    for (unsigned i = 0; i < n; ++i) {
[5403]256        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
257        iBuilder->CreateStore(kernels[i]->getInstance(), ptr);
258    }
259
[5418]260    // use the process thread to handle the initial segment function after spawning (n - 1) threads to handle the subsequent offsets
[5408]261    for (unsigned i = 0; i < threads; ++i) {
[5418]262        AllocaInst * const threadState = iBuilder->CreateAlloca(threadStructType);
263        iBuilder->CreateStore(sharedStruct, iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
264        iBuilder->CreateStore(iBuilder->getSize(i + 1), iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
[5403]265        iBuilder->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, threadFunc, threadState);
266    }
267
[5418]268    AllocaInst * const threadState = iBuilder->CreateAlloca(threadStructType);
269    iBuilder->CreateStore(sharedStruct, iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
270    iBuilder->CreateStore(iBuilder->getSize(0), iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
271    iBuilder->CreateCall(threadFunc, iBuilder->CreatePointerCast(threadState, voidPtrTy));
272
[5411]273    AllocaInst * const status = iBuilder->CreateAlloca(voidPtrTy);
[5408]274    for (unsigned i = 0; i < threads; ++i) {
[5403]275        Value * threadId = iBuilder->CreateLoad(threadIdPtr[i]);
276        iBuilder->CreatePThreadJoinCall(threadId, status);
277    }
[5456]278   
279    if (codegen::EnableCycleCounter) {
280        for (unsigned k = 0; k < kernels.size(); k++) {
281            auto & kernel = kernels[k];
282            iBuilder->setKernel(kernel);
283            const auto & inputs = kernel->getStreamInputs();
284            const auto & outputs = kernel->getStreamOutputs();
285            Value * items = nullptr;
286            if (inputs.empty()) {
287                items = iBuilder->getProducedItemCount(outputs[0].name);
288            } else {
289                items = iBuilder->getProcessedItemCount(inputs[0].name);
290            }
291            Value * fItems = iBuilder->CreateUIToFP(items, iBuilder->getDoubleTy());
292            Value * cycles = iBuilder->CreateLoad(iBuilder->getScalarFieldPtr(Kernel::CYCLECOUNT_SCALAR));
293            Value * fCycles = iBuilder->CreateUIToFP(cycles, iBuilder->getDoubleTy());
294            std::string formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
295            Value * stringPtr = iBuilder->CreatePointerCast(iBuilder->GetString(formatString), iBuilder->getInt8PtrTy());
296            iBuilder->CreateCall(iBuilder->GetDprintf(), {iBuilder->getInt32(2), stringPtr, fItems, fCycles, iBuilder->CreateFDiv(fCycles, fItems)});
297        }
298    }
299   
[5165]300}
301
[5251]302
[5403]303/** ------------------------------------------------------------------------------------------------------------- *
304 * @brief generateParallelPipeline
305 ** ------------------------------------------------------------------------------------------------------------- */
[5436]306void generateParallelPipeline(const std::unique_ptr<KernelBuilder> & iBuilder, const std::vector<Kernel *> &kernels) {
[5251]307
[5403]308    Module * const m = iBuilder->getModule();
309    IntegerType * const sizeTy = iBuilder->getSizeTy();
[5267]310    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
[5403]311    ConstantInt * bufferSegments = ConstantInt::get(sizeTy, codegen::BufferSegments - 1);
312    ConstantInt * segmentItems = ConstantInt::get(sizeTy, codegen::SegmentSize * iBuilder->getBitBlockWidth());
313    Constant * const nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
314
315    const unsigned n = kernels.size();
316
317    Type * const pthreadsTy = ArrayType::get(sizeTy, n);
[5165]318    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
[5403]319    Value * threadIdPtr[n];
[5408]320    for (unsigned i = 0; i < n; ++i) {
[5403]321        threadIdPtr[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
[5165]322    }
[5403]323
[5408]324    Value * instance[n];
[5403]325    Type * structTypes[n];
[5408]326    for (unsigned i = 0; i < n; ++i) {
327        instance[i] = kernels[i]->getInstance();
328        structTypes[i] = instance[i]->getType();
[5165]329    }
[5408]330
[5403]331    Type * const sharedStructType = StructType::get(m->getContext(), ArrayRef<Type *>{structTypes, n});
[5408]332
333
[5202]334    AllocaInst * sharedStruct = iBuilder->CreateAlloca(sharedStructType);
[5408]335    for (unsigned i = 0; i < n; ++i) {
[5221]336        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
[5408]337        iBuilder->CreateStore(instance[i], ptr);
[5165]338    }
[5408]339
[5407]340    for (auto & kernel : kernels) {
[5440]341        iBuilder->setKernel(kernel);
342        iBuilder->releaseLogicalSegmentNo(iBuilder->getSize(0));
[5273]343    }
344
[5403]345    // GENERATE THE PRODUCING AND CONSUMING KERNEL MAPS
346    StreamSetBufferMap<unsigned> producingKernel;
347    StreamSetBufferMap<std::vector<unsigned>> consumingKernels;
348    for (unsigned id = 0; id < n; ++id) {
[5407]349        const auto & kernel = kernels[id];
[5403]350        const auto & inputs = kernel->getStreamInputs();
351        const auto & outputs = kernel->getStreamOutputs();
352        // add any outputs from this kernel to the producing kernel map
353        for (unsigned j = 0; j < outputs.size(); ++j) {
354            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(j);
355            if (LLVM_UNLIKELY(producingKernel.count(buf) != 0)) {
356                report_fatal_error(kernel->getName() + " redefines stream set " + outputs[j].name);
357            }
358            producingKernel.emplace(buf, id);
359        }
360        // and any inputs to the consuming kernels list
361        for (unsigned j = 0; j < inputs.size(); ++j) {
362            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(j);
363            auto f = consumingKernels.find(buf);
364            if (f == consumingKernels.end()) {
365                if (LLVM_UNLIKELY(producingKernel.count(buf) == 0)) {
366                    report_fatal_error(kernel->getName() + " uses stream set " + inputs[j].name + " prior to its definition");
367                }
368                consumingKernels.emplace(buf, std::vector<unsigned>{ id });
369            } else {
370                f->second.push_back(id);
371            }
372        }
[5165]373    }
374
[5363]375    const auto ip = iBuilder->saveIP();
376
[5403]377    // GENERATE UNIQUE PIPELINE PARALLEL THREAD FUNCTION FOR EACH KERNEL
378    FlatSet<unsigned> kernelSet;
379    kernelSet.reserve(n);
[5363]380
[5403]381    Function * thread_functions[n];
[5408]382    Value * producerSegNo[n];
[5403]383    for (unsigned id = 0; id < n; id++) {
[5407]384        const auto & kernel = kernels[id];
[5440]385
386        iBuilder->setKernel(kernel);
387
[5403]388        const auto & inputs = kernel->getStreamInputs();
[5363]389
[5411]390        Function * const threadFunc = makeThreadFunction(iBuilder, "ppt:" + kernel->getName());
[5390]391
[5403]392         // Create the basic blocks for the thread function.
393        BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc);
394        BasicBlock * outputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "outputCheck", threadFunc);
395        BasicBlock * inputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "inputCheck", threadFunc);
396        BasicBlock * doSegmentBlock = BasicBlock::Create(iBuilder->getContext(), "doSegment", threadFunc);
397        BasicBlock * exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc);
[5363]398
[5403]399        iBuilder->SetInsertPoint(entryBlock);
[5363]400
[5403]401        Value * sharedStruct = iBuilder->CreateBitCast(&threadFunc->getArgumentList().front(), sharedStructType->getPointerTo());
[5363]402
[5403]403        for (unsigned k = 0; k < n; k++) {
404            Value * const ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
[5408]405            kernels[k]->setInstance(iBuilder->CreateLoad(ptr));
[5402]406        }
[5363]407
[5403]408        iBuilder->CreateBr(outputCheckBlock);
[5363]409
[5403]410        // Check whether the output buffers are ready for more data
411        iBuilder->SetInsertPoint(outputCheckBlock);
412        PHINode * segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 3, "segNo");
413        segNo->addIncoming(iBuilder->getSize(0), entryBlock);
414        segNo->addIncoming(segNo, outputCheckBlock);
[5363]415
[5403]416        Value * outputWaitCond = iBuilder->getTrue();
417        for (const StreamSetBuffer * buf : kernel->getStreamSetOutputBuffers()) {
418            const auto & list = consumingKernels[buf];
419            assert(std::is_sorted(list.begin(), list.end()));
420            kernelSet.insert(list.begin(), list.end());
421        }
422        for (unsigned k : kernelSet) {
[5440]423            iBuilder->setKernel(kernels[k]);
424            Value * consumerSegNo = iBuilder->acquireLogicalSegmentNo();
[5403]425            assert (consumerSegNo->getType() == segNo->getType());
426            Value * consumedSegNo = iBuilder->CreateAdd(consumerSegNo, bufferSegments);
427            outputWaitCond = iBuilder->CreateAnd(outputWaitCond, iBuilder->CreateICmpULE(segNo, consumedSegNo));
428        }
429        kernelSet.clear();
[5440]430        iBuilder->setKernel(kernel);
[5403]431        iBuilder->CreateCondBr(outputWaitCond, inputCheckBlock, outputCheckBlock);
[5363]432
[5403]433        // Check whether the input buffers have enough data for this kernel to begin
434        iBuilder->SetInsertPoint(inputCheckBlock);
435        for (const StreamSetBuffer * buf : kernel->getStreamSetInputBuffers()) {
436            kernelSet.insert(producingKernel[buf]);
[5402]437        }
[5363]438
[5402]439        Value * inputWaitCond = iBuilder->getTrue();
[5408]440        for (unsigned k : kernelSet) {
[5440]441            iBuilder->setKernel(kernels[k]);
442            producerSegNo[k] = iBuilder->acquireLogicalSegmentNo();
[5408]443            assert (producerSegNo[k]->getType() == segNo->getType());
444            inputWaitCond = iBuilder->CreateAnd(inputWaitCond, iBuilder->CreateICmpULT(segNo, producerSegNo[k]));
[5363]445        }
[5440]446        iBuilder->setKernel(kernel);
[5402]447        iBuilder->CreateCondBr(inputWaitCond, doSegmentBlock, inputCheckBlock);
[5363]448
[5403]449        // Process the segment
[5363]450        iBuilder->SetInsertPoint(doSegmentBlock);
451
[5403]452        Value * const nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
453        Value * terminated = nullptr;
[5408]454        if (kernelSet.empty()) {
[5403]455            // if this kernel has no input streams, the kernel itself must decide when it terminates.
[5440]456            terminated = iBuilder->getTerminationSignal();
[5403]457        } else {
458            // ... otherwise the kernel terminates only when it exhausts all of its input streams
459            terminated = iBuilder->getTrue();
460            for (unsigned k : kernelSet) {
[5440]461                iBuilder->setKernel(kernels[k]);
462                terminated = iBuilder->CreateAnd(terminated, iBuilder->getTerminationSignal());
[5408]463                terminated = iBuilder->CreateAnd(terminated, iBuilder->CreateICmpEQ(nextSegNo, producerSegNo[k]));
[5403]464            }
465            kernelSet.clear();
[5440]466            iBuilder->setKernel(kernel);
[5363]467        }
[5403]468
[5408]469        std::vector<Value *> args = {kernel->getInstance(), terminated};
[5403]470        args.insert(args.end(), inputs.size(), iBuilder->CreateMul(segmentItems, segNo));
471
[5440]472        iBuilder->createDoSegmentCall(args);
[5363]473        segNo->addIncoming(nextSegNo, doSegmentBlock);
[5440]474        iBuilder->releaseLogicalSegmentNo(nextSegNo);
[5363]475
476        iBuilder->CreateCondBr(terminated, exitThreadBlock, outputCheckBlock);
477
[5403]478        iBuilder->SetInsertPoint(exitThreadBlock);
[5418]479
[5403]480        iBuilder->CreatePThreadExitCall(nullVoidPtrVal);
[5418]481
[5403]482        iBuilder->CreateRetVoid();
[5135]483
[5403]484        thread_functions[id] = threadFunc;
[5390]485    }
[5402]486
[5403]487    iBuilder->restoreIP(ip);
[5402]488
[5408]489    for (unsigned i = 0; i < n; ++i) {
490        kernels[i]->setInstance(instance[i]);
491    }
492
493    for (unsigned i = 0; i < n; ++i) {
[5403]494        iBuilder->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, thread_functions[i], sharedStruct);
[5363]495    }
[5135]496
[5411]497    AllocaInst * const status = iBuilder->CreateAlloca(voidPtrTy);
[5408]498    for (unsigned i = 0; i < n; ++i) {
[5403]499        Value * threadId = iBuilder->CreateLoad(threadIdPtr[i]);
[5402]500        iBuilder->CreatePThreadJoinCall(threadId, status);
[5363]501    }
502}
503
[5403]504/** ------------------------------------------------------------------------------------------------------------- *
505 * @brief generatePipelineLoop
506 ** ------------------------------------------------------------------------------------------------------------- */
[5436]507void generatePipelineLoop(const std::unique_ptr<KernelBuilder> & iBuilder, const std::vector<Kernel *> & kernels) {
[5402]508
[5086]509    BasicBlock * entryBlock = iBuilder->GetInsertBlock();
510    Function * main = entryBlock->getParent();
[5273]511
[5263]512    // Create the basic blocks for the loop.
[5402]513    BasicBlock * pipelineLoop = BasicBlock::Create(iBuilder->getContext(), "pipelineLoop", main);
514    BasicBlock * pipelineExit = BasicBlock::Create(iBuilder->getContext(), "pipelineExit", main);
[5390]515
[5402]516    StreamSetBufferMap<Value *> producedPos;
[5418]517    StreamSetBufferMap<Value *> consumedPos;
[5263]518
[5402]519    iBuilder->CreateBr(pipelineLoop);
520    iBuilder->SetInsertPoint(pipelineLoop);
[5424]521   
522    Value * cycleCountStart = nullptr;
523    Value * cycleCountEnd = nullptr;
524    if (codegen::EnableCycleCounter) {
525        cycleCountStart = iBuilder->CreateReadCycleCounter();
526    }
[5402]527    Value * terminated = iBuilder->getFalse();
[5424]528    for (unsigned k = 0; k < kernels.size(); k++) {
[5440]529
[5424]530        auto & kernel = kernels[k];
[5418]531
[5440]532        iBuilder->setKernel(kernel);
[5402]533        const auto & inputs = kernel->getStreamInputs();
[5418]534        const auto & outputs = kernel->getStreamOutputs();
535
[5408]536        std::vector<Value *> args = {kernel->getInstance(), terminated};
[5402]537        for (unsigned i = 0; i < inputs.size(); ++i) {
538            const auto f = producedPos.find(kernel->getStreamSetInputBuffer(i));
539            if (LLVM_UNLIKELY(f == producedPos.end())) {
540                report_fatal_error(kernel->getName() + " uses stream set " + inputs[i].name + " prior to its definition");
541            }
542            args.push_back(f->second);
[5252]543        }
[5418]544
[5440]545        iBuilder->createDoSegmentCall(args);
[5398]546        if (!kernel->hasNoTerminateAttribute()) {
[5440]547            Value * terminatedSignal = iBuilder->getTerminationSignal();
[5435]548            terminated = iBuilder->CreateOr(terminated, terminatedSignal);
[5252]549        }
[5408]550        for (unsigned i = 0; i < outputs.size(); ++i) {
[5440]551            Value * const produced = iBuilder->getProducedItemCount(outputs[i].name, terminated);
[5402]552            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
553            assert (producedPos.count(buf) == 0);
554            producedPos.emplace(buf, produced);
[5252]555        }
[5408]556
[5418]557        for (unsigned i = 0; i < inputs.size(); ++i) {
[5440]558            Value * const processedItemCount = iBuilder->getProcessedItemCount(inputs[i].name);
[5418]559            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);
560            auto f = consumedPos.find(buf);
561            if (f == consumedPos.end()) {
562                consumedPos.emplace(buf, processedItemCount);
563            } else {
564                Value * lesser = iBuilder->CreateICmpULT(processedItemCount, f->second);
565                f->second = iBuilder->CreateSelect(lesser, processedItemCount, f->second);
566            }
567        }
[5424]568        if (codegen::EnableCycleCounter) {
569            cycleCountEnd = iBuilder->CreateReadCycleCounter();
[5456]570            //Value * counterPtr = iBuilder->CreateGEP(mCycleCounts, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
571            Value * counterPtr = iBuilder->getScalarFieldPtr(Kernel::CYCLECOUNT_SCALAR);
[5424]572            iBuilder->CreateStore(iBuilder->CreateAdd(iBuilder->CreateLoad(counterPtr), iBuilder->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
573            cycleCountStart = cycleCountEnd;
574        }
[5435]575
[5440]576        Value * const segNo = iBuilder->acquireLogicalSegmentNo();
[5435]577        Value * nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
[5440]578        iBuilder->releaseLogicalSegmentNo(nextSegNo);
[5025]579    }
[5408]580
[5418]581    for (const auto consumed : consumedPos) {
582        const StreamSetBuffer * const buf = consumed.first;
[5435]583        Kernel * k = buf->getProducer();
[5418]584        const auto & outputs = k->getStreamSetOutputBuffers();
585        for (unsigned i = 0; i < outputs.size(); ++i) {
586            if (outputs[i] == buf) {
[5440]587                iBuilder->setKernel(k);
588                iBuilder->setConsumedItemCount(k->getStreamOutput(i).name, consumed.second);
[5418]589                break;
590            }
591        }
592    }
593
[5402]594    iBuilder->CreateCondBr(terminated, pipelineExit, pipelineLoop);
595    iBuilder->SetInsertPoint(pipelineExit);
[5424]596    if (codegen::EnableCycleCounter) {
597        for (unsigned k = 0; k < kernels.size(); k++) {
598            auto & kernel = kernels[k];
[5440]599            iBuilder->setKernel(kernel);
[5424]600            const auto & inputs = kernel->getStreamInputs();
601            const auto & outputs = kernel->getStreamOutputs();
[5440]602            Value * items = nullptr;
603            if (inputs.empty()) {
604                items = iBuilder->getProducedItemCount(outputs[0].name);
605            } else {
[5446]606                items = iBuilder->getProcessedItemCount(inputs[0].name);
[5440]607            }
[5424]608            Value * fItems = iBuilder->CreateUIToFP(items, iBuilder->getDoubleTy());
[5456]609            Value * cycles = iBuilder->CreateLoad(iBuilder->getScalarFieldPtr(Kernel::CYCLECOUNT_SCALAR));
[5424]610            Value * fCycles = iBuilder->CreateUIToFP(cycles, iBuilder->getDoubleTy());
611            std::string formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
[5435]612            Value * stringPtr = iBuilder->CreatePointerCast(iBuilder->GetString(formatString), iBuilder->getInt8PtrTy());
[5424]613            iBuilder->CreateCall(iBuilder->GetDprintf(), {iBuilder->getInt32(2), stringPtr, fItems, fCycles, iBuilder->CreateFDiv(fCycles, fItems)});
614        }
615    }
[5252]616}
Note: See TracBrowser for help on using the repository browser.