source: icGREP/icgrep-devel/icgrep/toolchain/pipeline.cpp @ 5597

Last change on this file since 5597 was 5597, checked in by nmedfort, 23 months ago

Modified stream set buffers to use heap memory.

File size: 29.0 KB
RevLine 
[4929]1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
[5227]6#include "pipeline.h"
[5425]7#include <toolchain/toolchain.h>
[5086]8#include <kernels/kernel.h>
[5276]9#include <kernels/streamset.h>
[5267]10#include <llvm/IR/Module.h>
[5403]11#include <boost/container/flat_set.hpp>
[5390]12#include <boost/container/flat_map.hpp>
[5486]13#include <llvm/Support/CommandLine.h>
[5436]14#include <kernels/kernel_builder.h>
[4929]15
[5440]16#include <llvm/Support/raw_ostream.h>
17
[4974]18using namespace kernel;
[5260]19using namespace parabix;
20using namespace llvm;
[4929]21
[5486]22// static cl::opt<bool> UseYield("yield", cl::desc("yield after waiting"), cl::init(false));
[5436]23
[5390]24template <typename Value>
25using StreamSetBufferMap = boost::container::flat_map<const StreamSetBuffer *, Value>;
26
[5403]27template <typename Value>
28using FlatSet = boost::container::flat_set<Value>;
[5390]29
[5436]30Function * makeThreadFunction(const std::unique_ptr<kernel::KernelBuilder> & b, const std::string & name) {
[5411]31    Function * const f = Function::Create(FunctionType::get(b->getVoidTy(), {b->getVoidPtrTy()}, false), Function::InternalLinkage, name, b->getModule());
[5402]32    f->setCallingConv(CallingConv::C);
33    f->arg_begin()->setName("input");
34    return f;
35}
36
[5403]37/** ------------------------------------------------------------------------------------------------------------- *
38 * @brief generateSegmentParallelPipeline
39 *
40 * Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
41 * operating over an input stream set S, a segment-parallel implementation divides the input
42 * into segments and coordinates a set of T <= K threads to each process one segment at a time.
43 * Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
44 * fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
45 ** ------------------------------------------------------------------------------------------------------------- */
[5436]46void generateSegmentParallelPipeline(const std::unique_ptr<KernelBuilder> & iBuilder, const std::vector<Kernel *> & kernels) {
[5403]47
[5408]48    const unsigned n = kernels.size();
[5402]49    Module * const m = iBuilder->getModule();
[5403]50    IntegerType * const sizeTy = iBuilder->getSizeTy();
[5267]51    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
[5403]52    Constant * nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
53    std::vector<Type *> structTypes;
[5408]54
[5597]55    codegen::BufferSegments = std::max(codegen::BufferSegments, codegen::ThreadNum);
56
[5408]57    Value * instance[n];
58    for (unsigned i = 0; i < n; ++i) {
59        instance[i] = kernels[i]->getInstance();
60        structTypes.push_back(instance[i]->getType());
[5403]61    }
62    StructType * const sharedStructType = StructType::get(m->getContext(), structTypes);
63    StructType * const threadStructType = StructType::get(sharedStructType->getPointerTo(), sizeTy, nullptr);
[5165]64
[5411]65    Function * const threadFunc = makeThreadFunction(iBuilder, "segment");
[5408]66
[5403]67    // -------------------------------------------------------------------------------------------------------------------------
68    // MAKE SEGMENT PARALLEL PIPELINE THREAD
69    // -------------------------------------------------------------------------------------------------------------------------
70    const auto ip = iBuilder->saveIP();
71
[5165]72     // Create the basic blocks for the thread function.
[5402]73    BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc);
[5403]74    iBuilder->SetInsertPoint(entryBlock);
75    Value * const input = &threadFunc->getArgumentList().front();
76    Value * const threadStruct = iBuilder->CreatePointerCast(input, threadStructType->getPointerTo());
77    Value * const sharedStatePtr = iBuilder->CreateLoad(iBuilder->CreateGEP(threadStruct, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
[5408]78    for (unsigned k = 0; k < n; ++k) {
79        Value * ptr = iBuilder->CreateLoad(iBuilder->CreateGEP(sharedStatePtr, {iBuilder->getInt32(0), iBuilder->getInt32(k)}));
80        kernels[k]->setInstance(ptr);
[5165]81    }
[5403]82    Value * const segOffset = iBuilder->CreateLoad(iBuilder->CreateGEP(threadStruct, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
[5165]83
[5403]84    BasicBlock * segmentLoop = BasicBlock::Create(iBuilder->getContext(), "segmentLoop", threadFunc);
[5165]85    iBuilder->CreateBr(segmentLoop);
86
87    iBuilder->SetInsertPoint(segmentLoop);
[5403]88    PHINode * const segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 2, "segNo");
89    segNo->addIncoming(segOffset, entryBlock);
[5274]90
[5418]91    Value * terminated = iBuilder->getFalse();
[5403]92    Value * const nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
93
94    BasicBlock * segmentLoopBody = nullptr;
95    BasicBlock * const exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc);
96
97    StreamSetBufferMap<Value *> producedPos;
[5418]98    StreamSetBufferMap<Value *> consumedPos;
[5403]99
[5456]100    Value * cycleCountStart = nullptr;
101    Value * cycleCountEnd = nullptr;
102    if (codegen::EnableCycleCounter) {
103        cycleCountStart = iBuilder->CreateReadCycleCounter();
104    }
105
[5408]106    for (unsigned k = 0; k < n; ++k) {
107
[5407]108        const auto & kernel = kernels[k];
[5390]109
[5408]110        BasicBlock * const segmentWait = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Wait", threadFunc);
[5486]111
112        BasicBlock * segmentYield = segmentWait;
113//        if (UseYield) {
114//            segmentYield = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Yield", threadFunc);
115//        }
116
[5408]117        iBuilder->CreateBr(segmentWait);
118
[5403]119        segmentLoopBody = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Do", threadFunc);
120
121        iBuilder->SetInsertPoint(segmentWait);
[5408]122        const unsigned waitIdx = codegen::DebugOptionIsSet(codegen::SerializeThreads) ? (n - 1) : k;
[5435]123
[5440]124        iBuilder->setKernel(kernels[waitIdx]);
125        Value * const processedSegmentCount = iBuilder->acquireLogicalSegmentNo();
126        iBuilder->setKernel(kernel);
[5435]127
[5403]128        assert (processedSegmentCount->getType() == segNo->getType());
129        Value * const ready = iBuilder->CreateICmpEQ(segNo, processedSegmentCount);
130
[5402]131        if (kernel->hasNoTerminateAttribute()) {
[5486]132            iBuilder->CreateCondBr(ready, segmentLoopBody, segmentYield);
[5292]133        } else { // If the kernel was terminated in a previous segment then the pipeline is done.
[5402]134            BasicBlock * completionTest = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Completed", threadFunc, 0);
135            BasicBlock * exitBlock = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Exit", threadFunc, 0);
[5486]136            iBuilder->CreateCondBr(ready, completionTest, segmentYield);
[5408]137
[5274]138            iBuilder->SetInsertPoint(completionTest);
[5440]139            Value * terminationSignal = iBuilder->getTerminationSignal();
[5435]140            iBuilder->CreateCondBr(terminationSignal, exitBlock, segmentLoopBody);
[5305]141            iBuilder->SetInsertPoint(exitBlock);
142            // Ensure that the next thread will also exit.
[5440]143            iBuilder->releaseLogicalSegmentNo(nextSegNo);
[5305]144            iBuilder->CreateBr(exitThreadBlock);
[5274]145        }
[5403]146
[5486]147//        if (UseYield) {
148//            // Yield the thread after waiting
149//            iBuilder->SetInsertPoint(segmentYield);
150//            iBuilder->CreatePThreadYield();
151//            iBuilder->CreateBr(segmentWait);
152//        }
153
[5403]154        // Execute the kernel segment
155        iBuilder->SetInsertPoint(segmentLoopBody);
156        const auto & inputs = kernel->getStreamInputs();
[5418]157        std::vector<Value *> args = {kernel->getInstance(), terminated};
[5403]158        for (unsigned i = 0; i < inputs.size(); ++i) {
159            const auto f = producedPos.find(kernel->getStreamSetInputBuffer(i));
[5418]160            assert (f != producedPos.end());
[5403]161            args.push_back(f->second);
[5253]162        }
[5408]163
[5440]164        iBuilder->setKernel(kernel);
165        iBuilder->createDoSegmentCall(args);
[5403]166        if (!kernel->hasNoTerminateAttribute()) {
[5440]167            terminated = iBuilder->CreateOr(terminated, iBuilder->getTerminationSignal());
[5370]168        }
[5411]169
170        const auto & outputs = kernel->getStreamOutputs();
[5440]171        for (unsigned i = 0; i < outputs.size(); ++i) {           
172            Value * const produced = iBuilder->getProducedItemCount(outputs[i].name, terminated);
[5403]173            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
174            assert (producedPos.count(buf) == 0);
175            producedPos.emplace(buf, produced);
[5263]176        }
[5418]177        for (unsigned i = 0; i < inputs.size(); ++i) {
[5440]178            Value * const processedItemCount = iBuilder->getProcessedItemCount(inputs[i].name);
[5435]179            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);           
[5418]180            auto f = consumedPos.find(buf);
181            if (f == consumedPos.end()) {
182                consumedPos.emplace(buf, processedItemCount);
183            } else {
184                Value * lesser = iBuilder->CreateICmpULT(processedItemCount, f->second);
185                f->second = iBuilder->CreateSelect(lesser, processedItemCount, f->second);
186            }
187        }
[5456]188        if (codegen::EnableCycleCounter) {
189            cycleCountEnd = iBuilder->CreateReadCycleCounter();
190            Value * counterPtr = iBuilder->getScalarFieldPtr(Kernel::CYCLECOUNT_SCALAR);
191            iBuilder->CreateStore(iBuilder->CreateAdd(iBuilder->CreateLoad(counterPtr), iBuilder->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
192            cycleCountStart = cycleCountEnd;
193        }
194       
[5440]195        iBuilder->releaseLogicalSegmentNo(nextSegNo);
[5165]196    }
[5266]197
[5408]198    assert (segmentLoopBody);
199    exitThreadBlock->moveAfter(segmentLoopBody);
200
[5418]201    for (const auto consumed : consumedPos) {
202        const StreamSetBuffer * const buf = consumed.first;
[5435]203        Kernel * kernel = buf->getProducer();
204        const auto & outputs = kernel->getStreamSetOutputBuffers();
[5418]205        for (unsigned i = 0; i < outputs.size(); ++i) {
206            if (outputs[i] == buf) {
[5440]207                iBuilder->setKernel(kernel);
208                iBuilder->setConsumedItemCount(kernel->getStreamOutput(i).name, consumed.second);
[5418]209                break;
210            }
211        }
212    }
213
214    segNo->addIncoming(iBuilder->CreateAdd(segNo, iBuilder->getSize(codegen::ThreadNum)), segmentLoopBody);
215    iBuilder->CreateCondBr(terminated, exitThreadBlock, segmentLoop);
216
[5408]217    iBuilder->SetInsertPoint(exitThreadBlock);
[5418]218
219    // only call pthread_exit() within spawned threads; otherwise it'll be equivalent to calling exit() within the process
220    BasicBlock * const exitThread = BasicBlock::Create(iBuilder->getContext(), "ExitThread", threadFunc);
221    BasicBlock * const exitFunction = BasicBlock::Create(iBuilder->getContext(), "ExitProcessFunction", threadFunc);
222
223    Value * const exitCond = iBuilder->CreateICmpEQ(segOffset, ConstantInt::getNullValue(segOffset->getType()));
224    iBuilder->CreateCondBr(exitCond, exitFunction, exitThread);
225    iBuilder->SetInsertPoint(exitThread);
[5408]226    iBuilder->CreatePThreadExitCall(nullVoidPtrVal);
[5418]227    iBuilder->CreateBr(exitFunction);
228    iBuilder->SetInsertPoint(exitFunction);
[5408]229    iBuilder->CreateRetVoid();
230
[5403]231    // -------------------------------------------------------------------------------------------------------------------------
[5263]232    iBuilder->restoreIP(ip);
[5403]233
[5408]234    for (unsigned i = 0; i < n; ++i) {
235        kernels[i]->setInstance(instance[i]);
236    }
237
[5403]238    // -------------------------------------------------------------------------------------------------------------------------
239    // MAKE SEGMENT PARALLEL PIPELINE DRIVER
240    // -------------------------------------------------------------------------------------------------------------------------
[5418]241    const unsigned threads = codegen::ThreadNum - 1;
242    assert (codegen::ThreadNum > 1);
[5403]243    Type * const pthreadsTy = ArrayType::get(sizeTy, threads);
244    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
245    Value * threadIdPtr[threads];
[5408]246
247    for (unsigned i = 0; i < threads; ++i) {
[5403]248        threadIdPtr[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
249    }
250
[5408]251    for (unsigned i = 0; i < n; ++i) {
[5440]252        iBuilder->setKernel(kernels[i]);
253        iBuilder->releaseLogicalSegmentNo(iBuilder->getSize(0));
[5403]254    }
255
256    AllocaInst * const sharedStruct = iBuilder->CreateCacheAlignedAlloca(sharedStructType);
[5408]257    for (unsigned i = 0; i < n; ++i) {
[5403]258        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
259        iBuilder->CreateStore(kernels[i]->getInstance(), ptr);
260    }
261
[5418]262    // use the process thread to handle the initial segment function after spawning (n - 1) threads to handle the subsequent offsets
[5408]263    for (unsigned i = 0; i < threads; ++i) {
[5418]264        AllocaInst * const threadState = iBuilder->CreateAlloca(threadStructType);
265        iBuilder->CreateStore(sharedStruct, iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
266        iBuilder->CreateStore(iBuilder->getSize(i + 1), iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
[5403]267        iBuilder->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, threadFunc, threadState);
268    }
269
[5418]270    AllocaInst * const threadState = iBuilder->CreateAlloca(threadStructType);
271    iBuilder->CreateStore(sharedStruct, iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
272    iBuilder->CreateStore(iBuilder->getSize(0), iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
273    iBuilder->CreateCall(threadFunc, iBuilder->CreatePointerCast(threadState, voidPtrTy));
274
[5411]275    AllocaInst * const status = iBuilder->CreateAlloca(voidPtrTy);
[5408]276    for (unsigned i = 0; i < threads; ++i) {
[5403]277        Value * threadId = iBuilder->CreateLoad(threadIdPtr[i]);
278        iBuilder->CreatePThreadJoinCall(threadId, status);
279    }
[5456]280   
281    if (codegen::EnableCycleCounter) {
282        for (unsigned k = 0; k < kernels.size(); k++) {
283            auto & kernel = kernels[k];
284            iBuilder->setKernel(kernel);
285            const auto & inputs = kernel->getStreamInputs();
286            const auto & outputs = kernel->getStreamOutputs();
287            Value * items = nullptr;
288            if (inputs.empty()) {
289                items = iBuilder->getProducedItemCount(outputs[0].name);
290            } else {
291                items = iBuilder->getProcessedItemCount(inputs[0].name);
292            }
293            Value * fItems = iBuilder->CreateUIToFP(items, iBuilder->getDoubleTy());
294            Value * cycles = iBuilder->CreateLoad(iBuilder->getScalarFieldPtr(Kernel::CYCLECOUNT_SCALAR));
295            Value * fCycles = iBuilder->CreateUIToFP(cycles, iBuilder->getDoubleTy());
296            std::string formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
297            Value * stringPtr = iBuilder->CreatePointerCast(iBuilder->GetString(formatString), iBuilder->getInt8PtrTy());
298            iBuilder->CreateCall(iBuilder->GetDprintf(), {iBuilder->getInt32(2), stringPtr, fItems, fCycles, iBuilder->CreateFDiv(fCycles, fItems)});
299        }
300    }
301   
[5165]302}
303
[5251]304
[5403]305/** ------------------------------------------------------------------------------------------------------------- *
306 * @brief generateParallelPipeline
307 ** ------------------------------------------------------------------------------------------------------------- */
[5436]308void generateParallelPipeline(const std::unique_ptr<KernelBuilder> & iBuilder, const std::vector<Kernel *> &kernels) {
[5251]309
[5403]310    Module * const m = iBuilder->getModule();
311    IntegerType * const sizeTy = iBuilder->getSizeTy();
[5267]312    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
[5403]313    ConstantInt * bufferSegments = ConstantInt::get(sizeTy, codegen::BufferSegments - 1);
314    ConstantInt * segmentItems = ConstantInt::get(sizeTy, codegen::SegmentSize * iBuilder->getBitBlockWidth());
315    Constant * const nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
316
317    const unsigned n = kernels.size();
318
319    Type * const pthreadsTy = ArrayType::get(sizeTy, n);
[5165]320    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
[5403]321    Value * threadIdPtr[n];
[5408]322    for (unsigned i = 0; i < n; ++i) {
[5403]323        threadIdPtr[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
[5165]324    }
[5403]325
[5408]326    Value * instance[n];
[5403]327    Type * structTypes[n];
[5408]328    for (unsigned i = 0; i < n; ++i) {
329        instance[i] = kernels[i]->getInstance();
330        structTypes[i] = instance[i]->getType();
[5165]331    }
[5408]332
[5403]333    Type * const sharedStructType = StructType::get(m->getContext(), ArrayRef<Type *>{structTypes, n});
[5408]334
335
[5202]336    AllocaInst * sharedStruct = iBuilder->CreateAlloca(sharedStructType);
[5408]337    for (unsigned i = 0; i < n; ++i) {
[5221]338        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
[5408]339        iBuilder->CreateStore(instance[i], ptr);
[5165]340    }
[5408]341
[5407]342    for (auto & kernel : kernels) {
[5440]343        iBuilder->setKernel(kernel);
344        iBuilder->releaseLogicalSegmentNo(iBuilder->getSize(0));
[5273]345    }
346
[5403]347    // GENERATE THE PRODUCING AND CONSUMING KERNEL MAPS
348    StreamSetBufferMap<unsigned> producingKernel;
349    StreamSetBufferMap<std::vector<unsigned>> consumingKernels;
350    for (unsigned id = 0; id < n; ++id) {
[5407]351        const auto & kernel = kernels[id];
[5403]352        const auto & inputs = kernel->getStreamInputs();
353        const auto & outputs = kernel->getStreamOutputs();
354        // add any outputs from this kernel to the producing kernel map
355        for (unsigned j = 0; j < outputs.size(); ++j) {
356            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(j);
357            if (LLVM_UNLIKELY(producingKernel.count(buf) != 0)) {
358                report_fatal_error(kernel->getName() + " redefines stream set " + outputs[j].name);
359            }
360            producingKernel.emplace(buf, id);
361        }
362        // and any inputs to the consuming kernels list
363        for (unsigned j = 0; j < inputs.size(); ++j) {
364            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(j);
365            auto f = consumingKernels.find(buf);
366            if (f == consumingKernels.end()) {
367                if (LLVM_UNLIKELY(producingKernel.count(buf) == 0)) {
368                    report_fatal_error(kernel->getName() + " uses stream set " + inputs[j].name + " prior to its definition");
369                }
370                consumingKernels.emplace(buf, std::vector<unsigned>{ id });
371            } else {
372                f->second.push_back(id);
373            }
374        }
[5165]375    }
376
[5363]377    const auto ip = iBuilder->saveIP();
378
[5403]379    // GENERATE UNIQUE PIPELINE PARALLEL THREAD FUNCTION FOR EACH KERNEL
380    FlatSet<unsigned> kernelSet;
381    kernelSet.reserve(n);
[5363]382
[5403]383    Function * thread_functions[n];
[5408]384    Value * producerSegNo[n];
[5403]385    for (unsigned id = 0; id < n; id++) {
[5407]386        const auto & kernel = kernels[id];
[5440]387
388        iBuilder->setKernel(kernel);
389
[5403]390        const auto & inputs = kernel->getStreamInputs();
[5363]391
[5411]392        Function * const threadFunc = makeThreadFunction(iBuilder, "ppt:" + kernel->getName());
[5390]393
[5403]394         // Create the basic blocks for the thread function.
395        BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc);
396        BasicBlock * outputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "outputCheck", threadFunc);
397        BasicBlock * inputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "inputCheck", threadFunc);
398        BasicBlock * doSegmentBlock = BasicBlock::Create(iBuilder->getContext(), "doSegment", threadFunc);
399        BasicBlock * exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc);
[5363]400
[5403]401        iBuilder->SetInsertPoint(entryBlock);
[5363]402
[5403]403        Value * sharedStruct = iBuilder->CreateBitCast(&threadFunc->getArgumentList().front(), sharedStructType->getPointerTo());
[5363]404
[5403]405        for (unsigned k = 0; k < n; k++) {
406            Value * const ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
[5408]407            kernels[k]->setInstance(iBuilder->CreateLoad(ptr));
[5402]408        }
[5363]409
[5403]410        iBuilder->CreateBr(outputCheckBlock);
[5363]411
[5403]412        // Check whether the output buffers are ready for more data
413        iBuilder->SetInsertPoint(outputCheckBlock);
414        PHINode * segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 3, "segNo");
415        segNo->addIncoming(iBuilder->getSize(0), entryBlock);
416        segNo->addIncoming(segNo, outputCheckBlock);
[5363]417
[5403]418        Value * outputWaitCond = iBuilder->getTrue();
419        for (const StreamSetBuffer * buf : kernel->getStreamSetOutputBuffers()) {
420            const auto & list = consumingKernels[buf];
421            assert(std::is_sorted(list.begin(), list.end()));
422            kernelSet.insert(list.begin(), list.end());
423        }
424        for (unsigned k : kernelSet) {
[5440]425            iBuilder->setKernel(kernels[k]);
426            Value * consumerSegNo = iBuilder->acquireLogicalSegmentNo();
[5403]427            assert (consumerSegNo->getType() == segNo->getType());
428            Value * consumedSegNo = iBuilder->CreateAdd(consumerSegNo, bufferSegments);
429            outputWaitCond = iBuilder->CreateAnd(outputWaitCond, iBuilder->CreateICmpULE(segNo, consumedSegNo));
430        }
431        kernelSet.clear();
[5440]432        iBuilder->setKernel(kernel);
[5403]433        iBuilder->CreateCondBr(outputWaitCond, inputCheckBlock, outputCheckBlock);
[5363]434
[5403]435        // Check whether the input buffers have enough data for this kernel to begin
436        iBuilder->SetInsertPoint(inputCheckBlock);
437        for (const StreamSetBuffer * buf : kernel->getStreamSetInputBuffers()) {
438            kernelSet.insert(producingKernel[buf]);
[5402]439        }
[5363]440
[5402]441        Value * inputWaitCond = iBuilder->getTrue();
[5408]442        for (unsigned k : kernelSet) {
[5440]443            iBuilder->setKernel(kernels[k]);
444            producerSegNo[k] = iBuilder->acquireLogicalSegmentNo();
[5408]445            assert (producerSegNo[k]->getType() == segNo->getType());
446            inputWaitCond = iBuilder->CreateAnd(inputWaitCond, iBuilder->CreateICmpULT(segNo, producerSegNo[k]));
[5363]447        }
[5440]448        iBuilder->setKernel(kernel);
[5402]449        iBuilder->CreateCondBr(inputWaitCond, doSegmentBlock, inputCheckBlock);
[5363]450
[5403]451        // Process the segment
[5363]452        iBuilder->SetInsertPoint(doSegmentBlock);
453
[5403]454        Value * const nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
455        Value * terminated = nullptr;
[5408]456        if (kernelSet.empty()) {
[5403]457            // if this kernel has no input streams, the kernel itself must decide when it terminates.
[5440]458            terminated = iBuilder->getTerminationSignal();
[5403]459        } else {
460            // ... otherwise the kernel terminates only when it exhausts all of its input streams
461            terminated = iBuilder->getTrue();
462            for (unsigned k : kernelSet) {
[5440]463                iBuilder->setKernel(kernels[k]);
464                terminated = iBuilder->CreateAnd(terminated, iBuilder->getTerminationSignal());
[5408]465                terminated = iBuilder->CreateAnd(terminated, iBuilder->CreateICmpEQ(nextSegNo, producerSegNo[k]));
[5403]466            }
467            kernelSet.clear();
[5440]468            iBuilder->setKernel(kernel);
[5363]469        }
[5403]470
[5408]471        std::vector<Value *> args = {kernel->getInstance(), terminated};
[5403]472        args.insert(args.end(), inputs.size(), iBuilder->CreateMul(segmentItems, segNo));
473
[5440]474        iBuilder->createDoSegmentCall(args);
[5363]475        segNo->addIncoming(nextSegNo, doSegmentBlock);
[5440]476        iBuilder->releaseLogicalSegmentNo(nextSegNo);
[5363]477
478        iBuilder->CreateCondBr(terminated, exitThreadBlock, outputCheckBlock);
479
[5403]480        iBuilder->SetInsertPoint(exitThreadBlock);
[5418]481
[5403]482        iBuilder->CreatePThreadExitCall(nullVoidPtrVal);
[5418]483
[5403]484        iBuilder->CreateRetVoid();
[5135]485
[5403]486        thread_functions[id] = threadFunc;
[5390]487    }
[5402]488
[5403]489    iBuilder->restoreIP(ip);
[5402]490
[5408]491    for (unsigned i = 0; i < n; ++i) {
492        kernels[i]->setInstance(instance[i]);
493    }
494
495    for (unsigned i = 0; i < n; ++i) {
[5403]496        iBuilder->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, thread_functions[i], sharedStruct);
[5363]497    }
[5135]498
[5411]499    AllocaInst * const status = iBuilder->CreateAlloca(voidPtrTy);
[5408]500    for (unsigned i = 0; i < n; ++i) {
[5403]501        Value * threadId = iBuilder->CreateLoad(threadIdPtr[i]);
[5402]502        iBuilder->CreatePThreadJoinCall(threadId, status);
[5363]503    }
504}
505
[5403]506/** ------------------------------------------------------------------------------------------------------------- *
507 * @brief generatePipelineLoop
508 ** ------------------------------------------------------------------------------------------------------------- */
[5436]509void generatePipelineLoop(const std::unique_ptr<KernelBuilder> & iBuilder, const std::vector<Kernel *> & kernels) {
[5402]510
[5086]511    BasicBlock * entryBlock = iBuilder->GetInsertBlock();
512    Function * main = entryBlock->getParent();
[5273]513
[5263]514    // Create the basic blocks for the loop.
[5402]515    BasicBlock * pipelineLoop = BasicBlock::Create(iBuilder->getContext(), "pipelineLoop", main);
516    BasicBlock * pipelineExit = BasicBlock::Create(iBuilder->getContext(), "pipelineExit", main);
[5390]517
[5402]518    StreamSetBufferMap<Value *> producedPos;
[5418]519    StreamSetBufferMap<Value *> consumedPos;
[5263]520
[5402]521    iBuilder->CreateBr(pipelineLoop);
522    iBuilder->SetInsertPoint(pipelineLoop);
[5424]523   
524    Value * cycleCountStart = nullptr;
525    Value * cycleCountEnd = nullptr;
526    if (codegen::EnableCycleCounter) {
527        cycleCountStart = iBuilder->CreateReadCycleCounter();
528    }
[5402]529    Value * terminated = iBuilder->getFalse();
[5424]530    for (unsigned k = 0; k < kernels.size(); k++) {
[5440]531
[5424]532        auto & kernel = kernels[k];
[5418]533
[5440]534        iBuilder->setKernel(kernel);
[5402]535        const auto & inputs = kernel->getStreamInputs();
[5418]536        const auto & outputs = kernel->getStreamOutputs();
537
[5408]538        std::vector<Value *> args = {kernel->getInstance(), terminated};
[5402]539        for (unsigned i = 0; i < inputs.size(); ++i) {
540            const auto f = producedPos.find(kernel->getStreamSetInputBuffer(i));
541            if (LLVM_UNLIKELY(f == producedPos.end())) {
542                report_fatal_error(kernel->getName() + " uses stream set " + inputs[i].name + " prior to its definition");
543            }
544            args.push_back(f->second);
[5252]545        }
[5418]546
[5440]547        iBuilder->createDoSegmentCall(args);
[5398]548        if (!kernel->hasNoTerminateAttribute()) {
[5440]549            Value * terminatedSignal = iBuilder->getTerminationSignal();
[5435]550            terminated = iBuilder->CreateOr(terminated, terminatedSignal);
[5252]551        }
[5408]552        for (unsigned i = 0; i < outputs.size(); ++i) {
[5440]553            Value * const produced = iBuilder->getProducedItemCount(outputs[i].name, terminated);
[5402]554            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
555            assert (producedPos.count(buf) == 0);
556            producedPos.emplace(buf, produced);
[5252]557        }
[5408]558
[5418]559        for (unsigned i = 0; i < inputs.size(); ++i) {
[5440]560            Value * const processedItemCount = iBuilder->getProcessedItemCount(inputs[i].name);
[5418]561            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);
562            auto f = consumedPos.find(buf);
563            if (f == consumedPos.end()) {
564                consumedPos.emplace(buf, processedItemCount);
565            } else {
566                Value * lesser = iBuilder->CreateICmpULT(processedItemCount, f->second);
567                f->second = iBuilder->CreateSelect(lesser, processedItemCount, f->second);
568            }
569        }
[5424]570        if (codegen::EnableCycleCounter) {
571            cycleCountEnd = iBuilder->CreateReadCycleCounter();
[5456]572            //Value * counterPtr = iBuilder->CreateGEP(mCycleCounts, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
573            Value * counterPtr = iBuilder->getScalarFieldPtr(Kernel::CYCLECOUNT_SCALAR);
[5424]574            iBuilder->CreateStore(iBuilder->CreateAdd(iBuilder->CreateLoad(counterPtr), iBuilder->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
575            cycleCountStart = cycleCountEnd;
576        }
[5435]577
[5440]578        Value * const segNo = iBuilder->acquireLogicalSegmentNo();
[5435]579        Value * nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
[5440]580        iBuilder->releaseLogicalSegmentNo(nextSegNo);
[5025]581    }
[5408]582
[5418]583    for (const auto consumed : consumedPos) {
584        const StreamSetBuffer * const buf = consumed.first;
[5435]585        Kernel * k = buf->getProducer();
[5418]586        const auto & outputs = k->getStreamSetOutputBuffers();
587        for (unsigned i = 0; i < outputs.size(); ++i) {
588            if (outputs[i] == buf) {
[5440]589                iBuilder->setKernel(k);
590                iBuilder->setConsumedItemCount(k->getStreamOutput(i).name, consumed.second);
[5418]591                break;
592            }
593        }
594    }
595
[5402]596    iBuilder->CreateCondBr(terminated, pipelineExit, pipelineLoop);
597    iBuilder->SetInsertPoint(pipelineExit);
[5424]598    if (codegen::EnableCycleCounter) {
599        for (unsigned k = 0; k < kernels.size(); k++) {
600            auto & kernel = kernels[k];
[5440]601            iBuilder->setKernel(kernel);
[5424]602            const auto & inputs = kernel->getStreamInputs();
603            const auto & outputs = kernel->getStreamOutputs();
[5440]604            Value * items = nullptr;
605            if (inputs.empty()) {
606                items = iBuilder->getProducedItemCount(outputs[0].name);
607            } else {
[5446]608                items = iBuilder->getProcessedItemCount(inputs[0].name);
[5440]609            }
[5424]610            Value * fItems = iBuilder->CreateUIToFP(items, iBuilder->getDoubleTy());
[5456]611            Value * cycles = iBuilder->CreateLoad(iBuilder->getScalarFieldPtr(Kernel::CYCLECOUNT_SCALAR));
[5424]612            Value * fCycles = iBuilder->CreateUIToFP(cycles, iBuilder->getDoubleTy());
613            std::string formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
[5435]614            Value * stringPtr = iBuilder->CreatePointerCast(iBuilder->GetString(formatString), iBuilder->getInt8PtrTy());
[5424]615            iBuilder->CreateCall(iBuilder->GetDprintf(), {iBuilder->getInt32(2), stringPtr, fItems, fCycles, iBuilder->CreateFDiv(fCycles, fItems)});
616        }
617    }
[5252]618}
Note: See TracBrowser for help on using the repository browser.