source: icGREP/icgrep-devel/icgrep/toolchain/pipeline.cpp @ 5994

Last change on this file since 5994 was 5985, checked in by nmedfort, 18 months ago

Restructured MultiBlock? kernel. Removal of Swizzled buffers. Inclusion of PopCount? rates / non-linear access. Modifications to several kernels to better align them with the kernel and pipeline changes.

File size: 49.1 KB
RevLine 
[4929]1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
[5227]6#include "pipeline.h"
[5425]7#include <toolchain/toolchain.h>
[5086]8#include <kernels/kernel.h>
[5276]9#include <kernels/streamset.h>
[5267]10#include <llvm/IR/Module.h>
[5403]11#include <boost/container/flat_set.hpp>
[5390]12#include <boost/container/flat_map.hpp>
[5856]13#include <boost/graph/adjacency_list.hpp>
[5985]14#include <boost/range/adaptor/reversed.hpp>
[5436]15#include <kernels/kernel_builder.h>
[5856]16#include <llvm/Support/raw_ostream.h>
17
[4974]18using namespace kernel;
[5260]19using namespace parabix;
20using namespace llvm;
[5856]21using namespace boost;
22using namespace boost::container;
[4929]23
[5985]24#define DISABLE_COPY_TO_OVERFLOW
25
[5706]26using Port = Kernel::Port;
[5436]27
28Function * makeThreadFunction(const std::unique_ptr<kernel::KernelBuilder> & b, const std::string & name) {
[5411]29    Function * const f = Function::Create(FunctionType::get(b->getVoidTy(), {b->getVoidPtrTy()}, false), Function::InternalLinkage, name, b->getModule());
[5402]30    f->setCallingConv(CallingConv::C);
[5748]31    f->arg_begin()->setName("state");
[5402]32    return f;
33}
34
[5985]35class PipelineGenerator {
36public:
[5706]37
[5856]38    template <typename Value>
39    using StreamSetBufferMap = flat_map<const StreamSetBuffer *, Value>;
[5782]40
[5856]41    using RateValue = ProcessingRate::RateValue;
[5833]42
[5985]43    PipelineGenerator(const std::vector<Kernel *> & kernels)
44    : kernels(kernels)
45    , terminated(nullptr)
46    , noMore(nullptr)
47    , deadLockCounter(nullptr)
48    , anyProgress(nullptr)
49    , madeProgress(nullptr) {
50
51    }
52
[5856]53    struct Channel {
54        Channel() = default;
[5985]55        Channel(const RateValue & ratio, const StreamSetBuffer * const buffer = nullptr, const unsigned operand = 0)
56        : ratio(ratio), buffer(buffer), operand(operand) { }
[5856]57
[5985]58        RateValue               ratio;
[5856]59        const StreamSetBuffer * buffer;
[5985]60        unsigned                operand;
[5856]61    };
62
[5985]63    using ChannelGraph = adjacency_list<vecS, vecS, bidirectionalS, const Kernel *, Channel>;
[5856]64
[5985]65    using DependencyGraph = adjacency_list<hash_setS, vecS, bidirectionalS, Value *>;
[5856]66
[5985]67    using KernelMap = flat_map<const Kernel *, unsigned>;
[5856]68
[5985]69    void initialize(const std::unique_ptr<KernelBuilder> & b, BasicBlock * const entryBlock);
[5856]70
[5985]71    void execute(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
72
73    Value * finalize(const std::unique_ptr<KernelBuilder> & b);
74
[5865]75protected:
76
[5985]77    ChannelGraph makeInputGraph() const;
[5865]78
[5985]79    ChannelGraph makeOutputGraph() const;
[5865]80
[5985]81    DependencyGraph makeDependencyGraph() const;
[5865]82
[5985]83    template<class VertexList>
84    ChannelGraph pruneGraph(ChannelGraph && G, VertexList && V) const;
[5865]85
[5985]86    void checkIfAllInputKernelsAreTerminated(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
[5865]87
[5985]88    void checkAvailableInputData(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
[5856]89
[5985]90    void checkAvailableOutputSpace(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
91
92    Value * getStrideLength(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel, const Binding & binding);
93
94    Value * callKernel(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
95
96    void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
97
98    Value * getFullyProcessedItemCount(const std::unique_ptr<KernelBuilder> & b, const Binding & binding, Value * const final) const;
99
100    void printGraph(const ChannelGraph & G) const;
101
102    void printGraph(const DependencyGraph & G) const;
103
[5856]104private:
105
[5985]106    const std::vector<Kernel *> &       kernels;
107    PHINode *                           terminated;
[5856]108
[5985]109    Value *                             noMore;
110
111    DependencyGraph                     dependencyGraph;
112    ChannelGraph                        inputGraph;
113    ChannelGraph                        outputGraph;
114
115    BasicBlock *                        kernelFinished;
116
117    PHINode *                           deadLockCounter;
118    Value *                             anyProgress;
119    PHINode *                           madeProgress;
120
[5856]121    StreamSetBufferMap<Value *>         producedItemCount;
122    StreamSetBufferMap<Value *>         consumedItemCount;
123    StreamSetBufferMap<const Kernel *>  lastConsumer;
[5985]124    flat_set<const StreamSetBuffer *>   isConsumedAtNonFixedRate;
[5856]125};
126
[5403]127/** ------------------------------------------------------------------------------------------------------------- *
128 * @brief generateSegmentParallelPipeline
129 *
130 * Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
131 * operating over an input stream set S, a segment-parallel implementation divides the input
132 * into segments and coordinates a set of T <= K threads to each process one segment at a time.
133 * Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
134 * fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
135 ** ------------------------------------------------------------------------------------------------------------- */
[5782]136void generateSegmentParallelPipeline(const std::unique_ptr<KernelBuilder> & b, const std::vector<Kernel *> & kernels) {
[5403]137
[5408]138    const unsigned n = kernels.size();
[5782]139    Module * const m = b->getModule();
140    IntegerType * const sizeTy = b->getSizeTy();
141    PointerType * const voidPtrTy = b->getVoidPtrTy();
[5403]142    Constant * nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
143    std::vector<Type *> structTypes;
[5597]144    codegen::BufferSegments = std::max(codegen::BufferSegments, codegen::ThreadNum);
145
[5408]146    Value * instance[n];
147    for (unsigned i = 0; i < n; ++i) {
148        instance[i] = kernels[i]->getInstance();
149        structTypes.push_back(instance[i]->getType());
[5403]150    }
151    StructType * const sharedStructType = StructType::get(m->getContext(), structTypes);
[5733]152    StructType * const threadStructType = StructType::get(m->getContext(), {sharedStructType->getPointerTo(), sizeTy});
[5165]153
[5782]154    const auto ip = b->saveIP();
[5748]155
[5782]156    Function * const threadFunc = makeThreadFunction(b, "segment");
[5748]157    auto args = threadFunc->arg_begin();
[5408]158
[5403]159    // -------------------------------------------------------------------------------------------------------------------------
160    // MAKE SEGMENT PARALLEL PIPELINE THREAD
161    // -------------------------------------------------------------------------------------------------------------------------
162
[5165]163     // Create the basic blocks for the thread function.
[5782]164    BasicBlock * entryBlock = BasicBlock::Create(b->getContext(), "entry", threadFunc);
165    b->SetInsertPoint(entryBlock);
[5748]166
[5782]167    Value * const threadStruct = b->CreateBitCast(&*(args), threadStructType->getPointerTo());
[5748]168
[5782]169    Value * const sharedStatePtr = b->CreateLoad(b->CreateGEP(threadStruct, {b->getInt32(0), b->getInt32(0)}));
[5408]170    for (unsigned k = 0; k < n; ++k) {
[5782]171        Value * ptr = b->CreateLoad(b->CreateGEP(sharedStatePtr, {b->getInt32(0), b->getInt32(k)}));
[5408]172        kernels[k]->setInstance(ptr);
[5165]173    }
[5782]174    Value * const segOffset = b->CreateLoad(b->CreateGEP(threadStruct, {b->getInt32(0), b->getInt32(1)}));
[5165]175
[5985]176    PipelineGenerator G(kernels);
[5856]177
178    BasicBlock * const segmentLoop = b->CreateBasicBlock("segmentLoop");
[5782]179    b->CreateBr(segmentLoop);
[5165]180
[5985]181    b->SetInsertPoint(segmentLoop);   
[5782]182    PHINode * const segNo = b->CreatePHI(b->getSizeTy(), 2, "segNo");
[5403]183    segNo->addIncoming(segOffset, entryBlock);
[5985]184    G.initialize(b, entryBlock);
[5274]185
[5456]186    Value * cycleCountStart = nullptr;
187    Value * cycleCountEnd = nullptr;
[5721]188    if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
[5782]189        cycleCountStart = b->CreateReadCycleCounter();
[5456]190    }
191
[5793]192    const bool serialize = codegen::DebugOptionIsSet(codegen::SerializeThreads);
193
[5408]194    for (unsigned k = 0; k < n; ++k) {
195
[5856]196        const Kernel * const kernel = kernels[k];
[5390]197
[5856]198        BasicBlock * const kernelWait = b->CreateBasicBlock(kernel->getName() + "Wait");
[5782]199        b->CreateBr(kernelWait);
[5408]200
[5782]201        b->SetInsertPoint(kernelWait);
[5793]202        b->setKernel(kernels[serialize ? (n - 1) : k]);
[5782]203        Value * const processedSegmentCount = b->acquireLogicalSegmentNo();
[5856]204        assert (processedSegmentCount->getType() == segNo->getType());
205        Value * const ready = b->CreateICmpEQ(segNo, processedSegmentCount);
[5435]206
[5856]207        BasicBlock * const kernelCheck = b->CreateBasicBlock(kernel->getName() + "Check");
[5793]208        b->CreateCondBr(ready, kernelCheck, kernelWait);
[5403]209
[5793]210        b->SetInsertPoint(kernelCheck);
[5408]211
[5985]212        G.execute(b, k);
[5408]213
[5985]214        b->releaseLogicalSegmentNo(b->CreateAdd(segNo, b->getSize(1)));
215
[5721]216        if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
[5782]217            cycleCountEnd = b->CreateReadCycleCounter();
218            Value * counterPtr = b->getCycleCountPtr();
219            b->CreateStore(b->CreateAdd(b->CreateLoad(counterPtr), b->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
[5456]220            cycleCountStart = cycleCountEnd;
[5793]221        }
222
[5165]223    }
[5266]224
[5782]225    segNo->addIncoming(b->CreateAdd(segNo, b->getSize(codegen::ThreadNum)), b->GetInsertBlock());
[5985]226    Value * const finished = G.finalize(b);
[5856]227    BasicBlock * const segmentExit = b->CreateBasicBlock("segmentExit");
228    b->CreateUnlikelyCondBr(finished, segmentExit, segmentLoop);
[5418]229
[5856]230    b->SetInsertPoint(segmentExit);
[5418]231    // only call pthread_exit() within spawned threads; otherwise it'll be equivalent to calling exit() within the process
[5856]232    BasicBlock * const exitThread = b->CreateBasicBlock("ExitThread");
[5985]233    BasicBlock * const exitFunction = b->CreateBasicBlock("ExitProcessFunction");   
[5856]234    b->CreateCondBr(b->CreateIsNull(segOffset), exitFunction, exitThread);
[5782]235    b->SetInsertPoint(exitThread);
236    b->CreatePThreadExitCall(nullVoidPtrVal);
[5985]237    b->CreateBr(exitFunction);   
[5782]238    b->SetInsertPoint(exitFunction);
239    b->CreateRetVoid();
[5408]240
[5403]241    // -------------------------------------------------------------------------------------------------------------------------
[5782]242    b->restoreIP(ip);
[5403]243
[5408]244    for (unsigned i = 0; i < n; ++i) {
245        kernels[i]->setInstance(instance[i]);
246    }
247
[5403]248    // -------------------------------------------------------------------------------------------------------------------------
249    // MAKE SEGMENT PARALLEL PIPELINE DRIVER
250    // -------------------------------------------------------------------------------------------------------------------------
[5418]251    const unsigned threads = codegen::ThreadNum - 1;
[5793]252    assert (codegen::ThreadNum > 0);
[5403]253    Type * const pthreadsTy = ArrayType::get(sizeTy, threads);
[5782]254    AllocaInst * const pthreads = b->CreateAlloca(pthreadsTy);
[5403]255    Value * threadIdPtr[threads];
[5408]256
257    for (unsigned i = 0; i < threads; ++i) {
[5782]258        threadIdPtr[i] = b->CreateGEP(pthreads, {b->getInt32(0), b->getInt32(i)});
[5403]259    }
260
[5408]261    for (unsigned i = 0; i < n; ++i) {
[5782]262        b->setKernel(kernels[i]);
263        b->releaseLogicalSegmentNo(b->getSize(0));
[5403]264    }
265
[5782]266    AllocaInst * const sharedStruct = b->CreateCacheAlignedAlloca(sharedStructType);
[5408]267    for (unsigned i = 0; i < n; ++i) {
[5782]268        Value * ptr = b->CreateGEP(sharedStruct, {b->getInt32(0), b->getInt32(i)});
269        b->CreateStore(kernels[i]->getInstance(), ptr);
[5403]270    }
271
[5418]272    // use the process thread to handle the initial segment function after spawning (n - 1) threads to handle the subsequent offsets
[5408]273    for (unsigned i = 0; i < threads; ++i) {
[5782]274        AllocaInst * const threadState = b->CreateAlloca(threadStructType);
275        b->CreateStore(sharedStruct, b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(0)}));
276        b->CreateStore(b->getSize(i + 1), b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(1)}));
277        b->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, threadFunc, threadState);
[5403]278    }
279
[5782]280    AllocaInst * const threadState = b->CreateAlloca(threadStructType);
281    b->CreateStore(sharedStruct, b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(0)}));
282    b->CreateStore(b->getSize(0), b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(1)}));
283    b->CreateCall(threadFunc, b->CreatePointerCast(threadState, voidPtrTy));
[5418]284
[5782]285    AllocaInst * const status = b->CreateAlloca(voidPtrTy);
[5408]286    for (unsigned i = 0; i < threads; ++i) {
[5782]287        Value * threadId = b->CreateLoad(threadIdPtr[i]);
288        b->CreatePThreadJoinCall(threadId, status);
[5403]289    }
[5456]290   
[5793]291    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
292        for (const Kernel * kernel : kernels) {
[5782]293            b->setKernel(kernel);
[5456]294            const auto & inputs = kernel->getStreamInputs();
295            const auto & outputs = kernel->getStreamOutputs();
296            Value * items = nullptr;
297            if (inputs.empty()) {
[5782]298                items = b->getProducedItemCount(outputs[0].getName());
[5456]299            } else {
[5782]300                items = b->getProcessedItemCount(inputs[0].getName());
[5456]301            }
[5782]302            Value * fItems = b->CreateUIToFP(items, b->getDoubleTy());
303            Value * cycles = b->CreateLoad(b->getCycleCountPtr());
304            Value * fCycles = b->CreateUIToFP(cycles, b->getDoubleTy());
[5761]305            const auto formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
[5782]306            Value * stringPtr = b->CreatePointerCast(b->GetString(formatString), b->getInt8PtrTy());
307            b->CreateCall(b->GetDprintf(), {b->getInt32(2), stringPtr, fItems, fCycles, b->CreateFDiv(fCycles, fItems)});
[5456]308        }
309    }
310   
[5165]311}
312
[5856]313
[5403]314/** ------------------------------------------------------------------------------------------------------------- *
315 * @brief generatePipelineLoop
316 ** ------------------------------------------------------------------------------------------------------------- */
[5782]317void generatePipelineLoop(const std::unique_ptr<KernelBuilder> & b, const std::vector<Kernel *> & kernels) {
[5402]318
[5263]319    // Create the basic blocks for the loop.
[5856]320    BasicBlock * const entryBlock = b->GetInsertBlock();
321    BasicBlock * const pipelineLoop = b->CreateBasicBlock("pipelineLoop");
[5390]322
[5985]323    PipelineGenerator G(kernels);
[5263]324
[5782]325    b->CreateBr(pipelineLoop);
[5856]326
[5985]327    b->SetInsertPoint(pipelineLoop);   
[5856]328    PHINode * const segNo = b->CreatePHI(b->getSizeTy(), 2, "segNo");
329    segNo->addIncoming(b->getSize(0), entryBlock);
[5985]330    G.initialize(b, entryBlock);
[5856]331
[5985]332    Value * const nextSegNo = b->CreateAdd(segNo, b->getSize(1));
333
[5424]334    Value * cycleCountStart = nullptr;
335    Value * cycleCountEnd = nullptr;
[5793]336    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
[5782]337        cycleCountStart = b->CreateReadCycleCounter();
[5424]338    }
[5440]339
[5985]340    for (unsigned i = 0; i < kernels.size(); ++i) {
[5418]341
[5985]342        G.execute(b, i);
[5833]343
[5985]344        b->releaseLogicalSegmentNo(nextSegNo);
[5833]345
[5793]346        if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
[5782]347            cycleCountEnd = b->CreateReadCycleCounter();
348            Value * counterPtr = b->getCycleCountPtr();
349            b->CreateStore(b->CreateAdd(b->CreateLoad(counterPtr), b->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
[5424]350            cycleCountStart = cycleCountEnd;
351        }
[5025]352    }
[5408]353
[5985]354    segNo->addIncoming(nextSegNo, b->GetInsertBlock());
355    BasicBlock * const pipelineExit = b->CreateBasicBlock("pipelineExit");
356    Value * const finished = G.finalize(b);
[5856]357    b->CreateCondBr(finished, pipelineExit, pipelineLoop);
[5706]358
[5782]359    b->SetInsertPoint(pipelineExit);
[5793]360    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
[5424]361        for (unsigned k = 0; k < kernels.size(); k++) {
362            auto & kernel = kernels[k];
[5782]363            b->setKernel(kernel);
[5424]364            const auto & inputs = kernel->getStreamInputs();
365            const auto & outputs = kernel->getStreamOutputs();
[5440]366            Value * items = nullptr;
367            if (inputs.empty()) {
[5782]368                items = b->getProducedItemCount(outputs[0].getName());
[5440]369            } else {
[5782]370                items = b->getProcessedItemCount(inputs[0].getName());
[5440]371            }
[5782]372            Value * fItems = b->CreateUIToFP(items, b->getDoubleTy());
373            Value * cycles = b->CreateLoad(b->getCycleCountPtr());
374            Value * fCycles = b->CreateUIToFP(cycles, b->getDoubleTy());
[5761]375            const auto formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
[5782]376            Value * stringPtr = b->CreatePointerCast(b->GetString(formatString), b->getInt8PtrTy());
377            b->CreateCall(b->GetDprintf(), {b->getInt32(2), stringPtr, fItems, fCycles, b->CreateFDiv(fCycles, fItems)});
[5424]378        }
379    }
[5793]380
[5252]381}
[5706]382
[5782]383/** ------------------------------------------------------------------------------------------------------------- *
[5856]384 * @brief initialize
[5782]385 ** ------------------------------------------------------------------------------------------------------------- */
[5985]386void PipelineGenerator::initialize(const std::unique_ptr<KernelBuilder> & b, BasicBlock * const entryBlock) {
[5706]387
[5985]388    dependencyGraph = makeDependencyGraph();
389    inputGraph = makeInputGraph();
390    outputGraph = makeOutputGraph();
[5706]391
[5865]392    for (Kernel * const kernel : kernels) {
393        const auto & inputs = kernel->getStreamInputs();
394        for (unsigned i = 0; i < inputs.size(); ++i) {
[5985]395            const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
396            if (kernel->requiresCopyBack(inputs[i]) && !buffer->isUnbounded()) {
397                if (LLVM_LIKELY(buffer->supportsCopyBack())) {
398                    isConsumedAtNonFixedRate.insert(buffer);
399                } else {
400    //                std::string tmp;
401    //                raw_string_ostream out(tmp);
402    //                out << kernel->getName() << " : " << name << " must have an overflow";
403    //                report_fatal_error(out.str());
404                }
405            }
[5865]406        }
[5985]407        // if this kernel consumes this buffer, update the last consumer
408        for (const StreamSetBuffer * const buffer : kernel->getStreamSetInputBuffers()) {
409            auto f = lastConsumer.find(buffer);
410            assert (f != lastConsumer.end());
411            f->second = kernel;
412        }
413        // incase some output is never consumed, make the kernel that produced it the initial "last consumer"
414        for (const StreamSetBuffer * const buffer : kernel->getStreamSetOutputBuffers()) {
415            assert (buffer->getProducer() == kernel);
416            assert (lastConsumer.count(buffer) == 0);
417            lastConsumer.emplace(buffer, kernel);
418        }
[5865]419    }
420
[5985]421    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
422        deadLockCounter = b->CreatePHI(b->getSizeTy(), 2, "deadLockCounter");
423        deadLockCounter->addIncoming(b->getSize(0), entryBlock);
424        anyProgress = b->getFalse();
425    }
426
[5865]427}
428
429/** ------------------------------------------------------------------------------------------------------------- *
[5985]430 * @brief makeTerminationGraph
431 *
432 * The input graph models whether a kernel could *consume* more data than may be produced by a preceeding kernel.
433 ** ------------------------------------------------------------------------------------------------------------- */
434PipelineGenerator::DependencyGraph PipelineGenerator::makeDependencyGraph() const {
435    const auto n = kernels.size();
436    DependencyGraph G(n);
437    KernelMap M;
438    // construct a kernel dependency graph
439    for (unsigned v = 0; v < n; ++v) {
440        const Kernel * const kernel = kernels[v];       
441        for (const StreamSetBuffer * buf : kernel->getStreamSetInputBuffers()) {
442            const auto f = M.find(buf->getProducer()); assert (f != M.end());
443            add_edge(f->second, v, G);
444        }
445        M.emplace(kernel, v);
446    }
447    // generate a transitive closure
448    for (unsigned u = 0; u < n; ++u) {
449        for (auto e : make_iterator_range(in_edges(u, G))) {
450            const auto s = source(e, G);
451            for (auto f : make_iterator_range(out_edges(u, G))) {
452                add_edge(s, target(f, G), G);
453            }
454        }
455    }
456    // then take the transitive reduction
457    std::vector<unsigned> sources;
458    for (unsigned u = n; u-- > 0; ) {
459        if (in_degree(u, G) > 0 && out_degree(u, G) > 0) {
460            for (auto e : make_iterator_range(in_edges(u, G))) {
461                sources.push_back(source(e, G));
462            }
463            std::sort(sources.begin(), sources.end());
464            for (auto e : make_iterator_range(out_edges(u, G))) {
465                remove_in_edge_if(target(e, G), [&G, &sources](const DependencyGraph::edge_descriptor f) {
466                    return std::binary_search(sources.begin(), sources.end(), source(f, G));
467                }, G);
468            }
469            sources.clear();
470        }
471    }
472    return G;
473}
474
475/** ------------------------------------------------------------------------------------------------------------- *
[5865]476 * @brief makeInputGraph
477 *
478 * The input graph models whether a kernel could *consume* more data than may be produced by a preceeding kernel.
479 ** ------------------------------------------------------------------------------------------------------------- */
[5985]480PipelineGenerator::ChannelGraph PipelineGenerator::makeInputGraph() const {
[5865]481    const auto n = kernels.size();
[5985]482    ChannelGraph G(n);
483    KernelMap M;
484    for (unsigned v = 0; v < n; ++v) {
[5865]485
486        const Kernel * const consumer = kernels[v];
[5985]487        G[v] = consumer;
[5856]488        M.emplace(consumer, v);
[5865]489
[5856]490        const auto & inputs = consumer->getStreamInputs();
491        for (unsigned i = 0; i < inputs.size(); ++i) {
[5706]492
[5856]493            const Binding & input = inputs[i];
[5985]494            auto ub_in = consumer->getUpperBound(input.getRate()) * consumer->getStride() * codegen::SegmentSize; assert (ub_in > 0);
[5856]495            if (input.hasLookahead()) {
496                ub_in += input.getLookahead();
497            }
498
[5865]499            const auto buffer = consumer->getStreamSetInputBuffer(i);
500            const Kernel * const producer = buffer->getProducer();
501            const Binding & output = producer->getStreamOutput(buffer);
[5985]502            const auto lb_out = producer->getLowerBound(output.getRate()) * producer->getStride() * codegen::SegmentSize;
[5856]503
[5985]504            const auto min_oi_ratio = lb_out / ub_in;
[5856]505            const auto f = M.find(producer); assert (f != M.end());
506            const auto u = f->second;
507            // If we have multiple inputs from the same kernel, we only need to consider the "slowest" one
508            bool slowest = true;
[5985]509            for (const auto e : make_iterator_range(in_edges(v, G))) {
510                if (source(e, G) == u) {
511                    const Channel & p = G[e];
512                    if (min_oi_ratio > p.ratio) {
[5856]513                        slowest = false;
[5985]514                    } else if (min_oi_ratio < p.ratio) {
515                        clear_in_edges(v, G);
[5856]516                    }
[5985]517                    break;
[5856]518                }
519            }
520            if (slowest) {
[5985]521                add_edge(u, v, Channel{min_oi_ratio, buffer, i}, G);
[5856]522            }
523        }
524    }
[5985]525
526    return pruneGraph(std::move(G), std::move(make_iterator_range(vertices(G))));
[5865]527}
[5856]528
[5865]529/** ------------------------------------------------------------------------------------------------------------- *
530 * @brief makeOutputGraph
531 *
532 * The output graph models whether a kernel could *produce* more data than may be consumed by a subsequent kernel.
533 ** ------------------------------------------------------------------------------------------------------------- */
[5985]534PipelineGenerator::ChannelGraph PipelineGenerator::makeOutputGraph() const {
[5865]535    const auto n = kernels.size();
[5985]536    ChannelGraph G(n);
537    KernelMap M;
538    for (unsigned v = 0; v < n; ++v) {
539        const Kernel * const consumer = kernels[v];
540        G[v] = consumer;
[5865]541        M.emplace(consumer, v);
542
543        const auto & inputs = consumer->getStreamInputs();
544        for (unsigned i = 0; i < inputs.size(); ++i) {
545            const auto buffer = consumer->getStreamSetInputBuffer(i);
546            if (isa<SourceBuffer>(buffer)) continue;
547            const Kernel * const producer = buffer->getProducer();
[5985]548            assert (consumer != producer);
[5865]549            const Binding & output = producer->getStreamOutput(buffer);
[5985]550            auto ub_out = producer->getUpperBound(output.getRate()) * producer->getStride() * codegen::SegmentSize;
551            if (ub_out > 0) { // unknown output rates are handled by reallocating their buffers
[5865]552                const Binding & input = inputs[i];
553                if (input.hasLookahead()) {
[5985]554                    const auto la = input.getLookahead();
555                    if (LLVM_UNLIKELY(ub_out <= la)) {
556                        llvm::report_fatal_error("lookahead exceeds segment size");
557                    }
558                    ub_out += la;
[5865]559                }
[5985]560                const auto lb_in = consumer->getLowerBound(input.getRate()) * consumer->getStride() * codegen::SegmentSize;
561                const auto min_io_ratio = lb_in / ub_out;
[5865]562                const auto f = M.find(producer); assert (f != M.end());
563                const auto u = f->second;
[5985]564                assert (v != u);
565                assert (G[u] == producer);
[5865]566                // If we have multiple inputs from the same kernel, we only need to consider the "fastest" one
567                bool fastest = true;
[5985]568                for (const auto e : make_iterator_range(in_edges(v, G))) {
569                    if (source(e, G) == u) {
570                        const Channel & p = G[e];
571                        if (min_io_ratio > p.ratio) {
[5865]572                            fastest = false;
[5985]573                        } else if (min_io_ratio < p.ratio) {
574                            clear_in_edges(v, G);
[5865]575                        }
[5985]576                        break;
[5865]577                    }
578                }
579                if (fastest) {
[5985]580                    add_edge(v, u, Channel{min_io_ratio, buffer, i}, G);
[5865]581                }
582            }
583        }
584    }
585
[5985]586    return pruneGraph(std::move(G), std::move(boost::adaptors::reverse(make_iterator_range(vertices(G)))));
[5865]587}
588
589/** ------------------------------------------------------------------------------------------------------------- *
590 * @brief pruneGraph
591 ** ------------------------------------------------------------------------------------------------------------- */
[5985]592template<class VertexList>
593inline PipelineGenerator::ChannelGraph PipelineGenerator::pruneGraph(ChannelGraph && G, VertexList && V) const {
[5856]594    // Take a transitive closure of G but whenever we attempt to insert an edge into the closure
595    // that already exists, check instead whether the rate of our proposed edge is <= the existing
[5865]596    // edge's rate. If so, the data availability/consumption is transitively guaranteed.
[5985]597    for (const auto u : V) {
[5856]598        for (auto ei : make_iterator_range(in_edges(u, G))) {
599            const auto v = source(ei, G);
[5985]600            const Channel & ci = G[ei];
[5865]601            for (auto ej : make_iterator_range(out_edges(u, G))) {
[5856]602                const auto w = target(ej, G);
[5985]603                // ci.rate = BOUND_RATIO(u, v) * (STRIDE(u) / STRIDE(v))
604                const auto scaling = RateValue(G[v]->getStride(), G[w]->getStride());
605                const auto rate = ci.ratio * scaling;
[5856]606                bool insert = true;
607                for (auto ek : make_iterator_range(in_edges(w, G))) {
[5985]608                    // do we already have a vw edge?
[5856]609                    if (source(ek, G) == v) {
[5985]610                        Channel & ck = G[ek];
611                        if (rate <= ck.ratio) {
612                            ck.buffer = nullptr;
[5856]613                        }
614                        insert = false;
615                    }
616                }
617                if (insert) {
[5985]618                    add_edge(v, w, Channel{rate}, G);
[5856]619                }
620            }
621        }
622    }
623
624    // remove any closure edges from G
[5985]625    remove_edge_if([&G](const ChannelGraph::edge_descriptor e) { return G[e].buffer == nullptr; }, G);
[5856]626
[5985]627    for (const auto u : V) {
[5856]628        if (in_degree(u, G) == 0) {
[5985]629            // If we do not need to check any of its inputs, we can avoid testing any output of a kernel
630            // with a rate ratio of at least 1
631            remove_out_edge_if(u, [&G](const ChannelGraph::edge_descriptor e) {
632                return G[e].ratio >= RateValue{1, 1};
633            }, G);
[5856]634        }
635    }
636
[5865]637    return G;
638}
639
640/** ------------------------------------------------------------------------------------------------------------- *
[5856]641 * @brief executeKernel
642 ** ------------------------------------------------------------------------------------------------------------- */
[5985]643void PipelineGenerator::execute(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
[5856]644
[5985]645    const Kernel * const kernel = kernels[index];
646    b->setKernel(kernel);
647
[5856]648    const auto & inputs = kernel->getStreamInputs();
[5985]649    const auto & outputs = kernel->getStreamOutputs();
[5856]650
651    BasicBlock * const kernelEntry = b->GetInsertBlock();
652    BasicBlock * const kernelCode = b->CreateBasicBlock(kernel->getName());
[5985]653    kernelFinished = b->CreateBasicBlock(kernel->getName() + "Finished");
[5865]654    BasicBlock * const kernelExit = b->CreateBasicBlock(kernel->getName() + "Exit");
[5856]655
656    b->CreateUnlikelyCondBr(b->getTerminationSignal(), kernelExit, kernelCode);
657
[5865]658    b->SetInsertPoint(kernelFinished);
[5985]659    terminated = b->CreatePHI(b->getInt1Ty(), 2);
660    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
661        madeProgress = b->CreatePHI(b->getInt1Ty(), 3);
662    }
[5856]663    b->SetInsertPoint(kernelExit);
[5985]664    PHINode * const isFinal = b->CreatePHI(b->getInt1Ty(), 2, kernel->getName() + "_isFinal");
665    isFinal->addIncoming(b->getTrue(), kernelEntry);
666    isFinal->addIncoming(terminated, kernelFinished);
[5856]667
[5985]668    PHINode * progress = nullptr;
669    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
670        progress = b->CreatePHI(b->getInt1Ty(), 2, kernel->getName() + "_anyProgress");
671        progress->addIncoming(anyProgress, kernelEntry);
672        progress->addIncoming(madeProgress, kernelFinished);
673    }
674
[5865]675    // Since it is possible that a sole consumer of some stream could terminate early, set the
676    // initial consumed amount to the amount produced in this iteration.
677    std::vector<PHINode *> consumedItemCountPhi(inputs.size());
678    std::vector<Value *> priorConsumedItemCount(inputs.size());
[5985]679
[5865]680    for (unsigned i = 0; i < inputs.size(); ++i) {
681        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
682        auto c = consumedItemCount.find(buffer);
683        PHINode * const consumedPhi = b->CreatePHI(b->getSizeTy(), 2);
684        Value * consumed = nullptr;
685        if (c == consumedItemCount.end()) {
686            const auto p = producedItemCount.find(buffer);
687            assert (p != producedItemCount.end());
688            consumed = p->second;
689            consumedItemCount.emplace(buffer, consumedPhi);
690        } else {
691            consumed = c->second;
692            c->second = consumedPhi;
693        }
694        consumedPhi->addIncoming(consumed, kernelEntry);
695        consumedItemCountPhi[i] = consumedPhi;
696        priorConsumedItemCount[i] = consumed;
697    }
698
[5856]699    b->SetInsertPoint(kernelCode);
[5865]700
[5985]701    checkIfAllInputKernelsAreTerminated(b, index);
[5865]702
[5985]703    checkAvailableInputData(b, index);
[5865]704
[5985]705    checkAvailableOutputSpace(b, index);
[5941]706
[5985]707    applyOutputBufferExpansions(b, index);
[5941]708
[5985]709    Value * const finalStride = callKernel(b, index);
[5865]710
[5985]711    // TODO: add in some checks to verify the kernel actually adheres to its rate
[5865]712
[5985]713    BasicBlock * const kernelCodeExit = b->GetInsertBlock();
714    terminated->addIncoming(finalStride, kernelCodeExit);
715    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
716        madeProgress->addIncoming(b->getTrue(), kernelCodeExit);
717        anyProgress = progress;
718    }
719    b->CreateBr(kernelFinished);
[5865]720
[5985]721    b->SetInsertPoint(kernelFinished);
[5865]722
[5985]723    // update the consumed item counts
[5856]724    for (unsigned i = 0; i < inputs.size(); ++i) {
725        const Binding & input = inputs[i];
[5985]726        Value * const fullyProcessed = getFullyProcessedItemCount(b, input, terminated);
727        Value * const consumed = b->CreateUMin(priorConsumedItemCount[i], fullyProcessed);
728        consumedItemCountPhi[i]->addIncoming(consumed, kernelFinished);
729    }
730    b->CreateBr(kernelExit);
[5856]731
[5985]732    kernelExit->moveAfter(kernelFinished);
[5865]733
[5985]734    b->SetInsertPoint(kernelExit);
[5865]735
[5985]736    for (unsigned i = 0; i < outputs.size(); ++i) {
737        const Binding & output = outputs[i];
738        Value * const produced = b->getProducedItemCount(output.getName());
739        const StreamSetBuffer * const buffer = kernel->getStreamSetOutputBuffer(i);
740        // if some stream has no consumer, set the consumed item count to the produced item count
741        const auto c = lastConsumer.find(buffer);
742        assert (c != lastConsumer.end());
743        if (LLVM_UNLIKELY(c->second == kernel)) {
744            assert (buffer->getProducer() == kernel);
745            if (LLVM_UNLIKELY(output.getRate().isRelative())) {
[5941]746                continue;
747            }
[5985]748            b->setConsumedItemCount(output.getName(), produced);
749        } else { // otherwise record how many items were produced
750            assert (producedItemCount.count(buffer) == 0);
751            producedItemCount.emplace(buffer, produced);
752        }
[5941]753
[5985]754    }
[5882]755
756
[5985]757    // TODO: if all consumers process the data at a fixed rate, we can just set the consumed item count
758    // by the strideNo rather than tracking it.
[5856]759
760
[5985]761    // If this kernel is the last consumer of a input buffer, update the consumed count for that buffer.
762    // NOTE: unless we can prove that this kernel cannot terminate before any prior consumer, we cannot
763    // put this code into the kernelFinished block.
764    for (unsigned i = 0; i < inputs.size(); ++i) {
765        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
766        const auto c = lastConsumer.find(buffer);
767        assert (c != lastConsumer.end());
768        if (LLVM_LIKELY(c->second == kernel)) {
769            Kernel * const producer = buffer->getProducer();
770            assert (producer != kernel);
771            const auto & output = producer->getStreamOutput(buffer);
772            if (output.getRate().isRelative()) continue;
773            b->setKernel(producer);
774            if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
775                Value * const alreadyConsumed = b->getConsumedItemCount(output.getName());
776                b->CreateAssert(b->CreateICmpULE(alreadyConsumed, consumedItemCountPhi[i]),
777                                producer->getName() + ": " + output.getName() + " consumed item count is not monotonically non-decreasing!");
778            }
779            b->setConsumedItemCount(output.getName(), consumedItemCountPhi[i]);
780            b->setKernel(kernel);
781        }
782    }
[5856]783
[5985]784    dependencyGraph[index] = isFinal;
785}
[5865]786
[5985]787/** ------------------------------------------------------------------------------------------------------------- *
788 * @brief checkAvailableInputData
789 ** ------------------------------------------------------------------------------------------------------------- */
790void PipelineGenerator::checkIfAllInputKernelsAreTerminated(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
791    const auto n = in_degree(index, dependencyGraph);
792    if (LLVM_UNLIKELY(n == 0)) {
793        noMore = b->getFalse();
794    } else {
795        noMore = b->getTrue();
796        for (auto e : make_iterator_range(in_edges(index, dependencyGraph))) {
797            const auto u = source(e, dependencyGraph);
798            Value * const finished = dependencyGraph[u];
799            //b->CallPrintInt("* " + kernels[u]->getName() + "_hasFinished", finished);
800            noMore = b->CreateAnd(noMore, finished);
[5856]801        }
802    }
[5985]803}
[5856]804
[5985]805/** ------------------------------------------------------------------------------------------------------------- *
806 * @brief checkAvailableInputData
807 ** ------------------------------------------------------------------------------------------------------------- */
808void PipelineGenerator::checkAvailableInputData(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
809    const Kernel * const kernel = kernels[index];
810    b->setKernel(kernel);
811    for (auto e : make_iterator_range(in_edges(index, inputGraph))) {
812        const Channel & c = inputGraph[e];
813        const Binding & input = kernel->getStreamInput(c.operand);
[5856]814
[5985]815        Value * requiredInput = getStrideLength(b, kernel, input);
816        if (LLVM_UNLIKELY(input.hasLookahead())) {
817            Constant * const lookahead = b->getSize(input.getLookahead());
818            requiredInput = b->CreateAdd(requiredInput, lookahead);
819        }
820        const auto p = producedItemCount.find(c.buffer);
821        assert (p != producedItemCount.end());
822        Value * const produced = p->second;
823        Value * const processed = b->getNonDeferredProcessedItemCount(input);
824        Value * const unprocessed = b->CreateSub(produced, processed);
825        Value * const hasEnough = b->CreateICmpUGE(unprocessed, requiredInput);
826        Value * const check = b->CreateOr(hasEnough, noMore);
827        terminated->addIncoming(b->getFalse(), b->GetInsertBlock());
828        if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
829            madeProgress->addIncoming(anyProgress, b->GetInsertBlock());
830        }
831        BasicBlock * const hasSufficientInput = b->CreateBasicBlock(kernel->getName() + "_" + input.getName() + "_hasSufficientInput");
832        b->CreateLikelyCondBr(check, hasSufficientInput, kernelFinished);
833        b->SetInsertPoint(hasSufficientInput);
834    }
835}
[5856]836
[5985]837/** ------------------------------------------------------------------------------------------------------------- *
838 * @brief checkAvailableOutputSpace
839 ** ------------------------------------------------------------------------------------------------------------- */
840void PipelineGenerator::checkAvailableOutputSpace(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
841    const Kernel * const kernel = kernels[index];
842    b->setKernel(kernel);
843    for (auto e : make_iterator_range(in_edges(index, outputGraph))) {
844        const Channel & c = outputGraph[e];
845        assert (c.buffer->getProducer() == kernel);
846        const Binding & output = kernel->getStreamOutput(c.buffer);
[5856]847
[5985]848        Value * requiredSpace = getStrideLength(b, kernel, output);
849        const auto & name = output.getName();
850        Value * const produced = b->getNonDeferredProducedItemCount(output);
851        Value * const consumed = b->getConsumedItemCount(name);
852        Value * const unconsumed = b->CreateSub(produced, consumed);
853        requiredSpace = b->CreateAdd(requiredSpace, unconsumed);
854        Value * const capacity = b->getBufferedSize(name);
855        Value * const check = b->CreateICmpULE(requiredSpace, capacity);
856        terminated->addIncoming(b->getFalse(), b->GetInsertBlock());
857        if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
858            madeProgress->addIncoming(anyProgress, b->GetInsertBlock());
[5948]859        }
[5985]860        BasicBlock * const hasOutputSpace = b->CreateBasicBlock(kernel->getName() + "_" + name + "_hasOutputSpace");
861        b->CreateLikelyCondBr(check, hasOutputSpace, kernelFinished);
862        b->SetInsertPoint(hasOutputSpace);
[5948]863    }
[5985]864}
[5948]865
[5985]866/** ------------------------------------------------------------------------------------------------------------- *
867 * @brief getStrideLength
868 ** ------------------------------------------------------------------------------------------------------------- */
869Value * PipelineGenerator::getStrideLength(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel, const Binding & binding) {
870    Value * strideLength = nullptr;
871    const ProcessingRate & rate = binding.getRate();
872    if (rate.isPopCount() || rate.isNegatedPopCount()) {
873        Port refPort; unsigned refIndex;
874        std::tie(refPort, refIndex) = kernel->getStreamPort(rate.getReference());
875        const Binding & ref = kernel->getStreamInput(refIndex);
876        Value * markers = b->loadInputStreamBlock(ref.getName(), b->getSize(0));
877        if (rate.isNegatedPopCount()) {
878            markers = b->CreateNot(markers);
879        }
880        strideLength = b->bitblock_popcount(markers);
881    } else if (binding.hasAttribute(kernel::Attribute::KindId::AlwaysConsume)) {
882        const auto lb = kernel->getLowerBound(rate);
883        strideLength = b->getSize(ceiling(lb * kernel->getStride()));
[5883]884    } else {
[5985]885        const auto ub = kernel->getUpperBound(rate); assert (ub > 0);
886        strideLength = b->getSize(ceiling(ub * kernel->getStride()));
[5856]887    }
[5985]888    return strideLength;
889}
[5883]890
[5985]891/** ------------------------------------------------------------------------------------------------------------- *
892 * @brief callKernel
893 ** ------------------------------------------------------------------------------------------------------------- */
894Value * PipelineGenerator::callKernel(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
[5865]895
[5985]896    const Kernel * const kernel = kernels[index];
897    b->setKernel(kernel);
[5865]898
[5985]899    #ifndef DISABLE_COPY_TO_OVERFLOW
900    // Store how many items we produced by this kernel in the prior iteration. We'll use this to determine when
901    // to mirror the first K segments
902    const auto & outputs = kernel->getStreamOutputs();
903    const auto m = outputs.size();
904
905    std::vector<Value *> initiallyProducedItemCount(m, nullptr);
906    for (unsigned i = 0; i < m; ++i) {
907        const Binding & output = outputs[i];
908        const auto & name = output.getName();
909        const StreamSetBuffer * const buffer = kernel->getOutputStreamSetBuffer(name);
910        if (isConsumedAtNonFixedRate.count(buffer)) {
911            initiallyProducedItemCount[i] = b->getProducedItemCount(name);
912        }
[5865]913    }
[5985]914    #endif
[5856]915
[5985]916    const auto & inputs = kernel->getStreamInputs();
917    const auto n = inputs.size();
918    std::vector<Value *> arguments(n + 2);
[5865]919
[5985]920    Value * isFinal = noMore;
921    for (unsigned i = 0; i < n; ++i) {
922        const Binding & input = inputs[i];
[5865]923        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
924
[5985]925        const auto p = producedItemCount.find(buffer);
926        assert (p != producedItemCount.end());
927        Value * const produced = p->second;
[5865]928
[5985]929        const ProcessingRate & rate = input.getRate();
930        if (rate.isPopCount()) {
931            arguments[i + 2] = produced;
932        } else {
933            const unsigned strideSize = ceiling(kernel->getUpperBound(rate) * kernel->getStride());
934            Value * const processed = b->getNonDeferredProcessedItemCount(input);
935            Value * const limit = b->CreateAdd(processed, b->getSize(strideSize * codegen::SegmentSize));
936            Value * const partial = b->CreateICmpULT(produced, limit);
937            arguments[i + 2] = b->CreateSelect(partial, produced, limit);
938            isFinal = b->CreateAnd(isFinal, partial);
[5865]939        }
940    }
941
[5985]942    // TODO: pass in a strideNo for fixed rate streams to allow the kernel to calculate the current avail,
943    // processed, and produced counts
[5865]944
[5985]945    arguments[0] = kernel->getInstance();
946    arguments[1] = isFinal;
[5865]947
[5985]948    b->createDoSegmentCall(arguments);
949
950    #ifndef DISABLE_COPY_TO_OVERFLOW
951    // For each buffer with an overflow region of K blocks, overwrite the overflow with the first K blocks of
952    // data to ensure that even if this stream is produced at a fixed rate but consumed at a bounded rate,
953    // every kernel has a consistent view of the stream data.
954    for (unsigned i = 0; i < m; ++i) {
955        const Binding & output = outputs[i];
956        const auto & name = output.getName();
957        if (initiallyProducedItemCount[i]) {
958            Value * const bufferSize = b->getBufferedSize(name);
959            Value * const prior = initiallyProducedItemCount[i];
960            Value * const offset = b->CreateURem(prior, bufferSize);
961            Value * const produced = b->getNonDeferredProducedItemCount(output);
962            Value * const buffered = b->CreateAdd(offset, b->CreateSub(produced, prior));
963            BasicBlock * const copyBack = b->CreateBasicBlock(name + "MirrorOverflow");
964            BasicBlock * const done = b->CreateBasicBlock(name + "MirrorOverflowDone");
965            b->CreateCondBr(b->CreateICmpUGT(buffered, bufferSize), copyBack, done);
966            b->SetInsertPoint(copyBack);
967            b->CreateCopyToOverflow(name);
968            b->CreateBr(done);
969            b->SetInsertPoint(done);
970        }
[5865]971    }
[5985]972    #endif
[5865]973
[5985]974    return b->getTerminationSignal();
[5856]975}
976
977/** ------------------------------------------------------------------------------------------------------------- *
978 * @brief applyOutputBufferExpansions
979 ** ------------------------------------------------------------------------------------------------------------- */
[5985]980void PipelineGenerator::applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
981    const Kernel * const kernel = kernels[index];
982    const auto & outputs = kernel->getStreamSetOutputBuffers();
[5706]983    for (unsigned i = 0; i < outputs.size(); i++) {
984        if (isa<DynamicBuffer>(outputs[i])) {
[5985]985
986
987            const auto baseSize = ceiling(kernel->getUpperBound(kernel->getStreamOutput(i).getRate()) * kernel->getStride() * codegen::SegmentSize);
[5755]988            if (LLVM_LIKELY(baseSize > 0)) {
[5856]989
[5985]990                const auto & name = kernel->getStreamOutput(i).getName();
[5856]991
992                BasicBlock * const doExpand = b->CreateBasicBlock(name + "Expand");
993                BasicBlock * const nextBlock = b->GetInsertBlock()->getNextNode();
994                doExpand->moveAfter(b->GetInsertBlock());
995                BasicBlock * const bufferReady = b->CreateBasicBlock(name + "Ready");
996                bufferReady->moveAfter(doExpand);
997                if (nextBlock) nextBlock->moveAfter(bufferReady);
998
999                Value * const produced = b->getProducedItemCount(name);
1000                Value * const consumed = b->getConsumedItemCount(name);
1001                Value * const required = b->CreateAdd(b->CreateSub(produced, consumed), b->getSize(2 * baseSize));
1002
1003                b->CreateCondBr(b->CreateICmpUGT(required, b->getBufferedSize(name)), doExpand, bufferReady);
1004                b->SetInsertPoint(doExpand);
1005
1006                b->doubleCapacity(name);
1007                // Ensure that capacity is sufficient by successive doubling, if necessary.
1008                b->CreateCondBr(b->CreateICmpUGT(required, b->getBufferedSize(name)), doExpand, bufferReady);
1009
1010                b->SetInsertPoint(bufferReady);
1011
[5706]1012            }
1013        }
1014    }
1015}
[5782]1016
[5985]1017
1018/** ------------------------------------------------------------------------------------------------------------- *
1019 * @brief getFullyProcessedItemCount
1020 ** ------------------------------------------------------------------------------------------------------------- */
1021inline Value * PipelineGenerator::getFullyProcessedItemCount(const std::unique_ptr<KernelBuilder> & b, const Binding & input, Value * const isFinal) const {
1022    Value * const processed = b->getProcessedItemCount(input.getName());
1023    if (LLVM_UNLIKELY(input.hasAttribute(kernel::Attribute::KindId::BlockSize))) {
1024        // If the input rate has a block size attribute then --- for the purpose of determining how many
1025        // items have been consumed --- we consider a stream set to be fully processed when an entire
1026        // block (stride?) has been processed.
1027        Constant * const BLOCK_WIDTH = b->getSize(b->getBitBlockWidth());
1028        Value * const partial = b->CreateAnd(processed, ConstantExpr::getNeg(BLOCK_WIDTH));
1029        return b->CreateSelect(isFinal, processed, partial);
1030    }
1031    return processed;
1032}
1033
1034/** ------------------------------------------------------------------------------------------------------------- *
1035 * @brief finalize
1036 ** ------------------------------------------------------------------------------------------------------------- */
1037Value * PipelineGenerator::finalize(const std::unique_ptr<KernelBuilder> & b) {
1038    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
1039        ConstantInt * const ZERO = b->getSize(0);
1040        ConstantInt * const ONE = b->getSize(1);
1041        ConstantInt * const TWO = b->getSize(2);
1042        Value * const count = b->CreateSelect(anyProgress, ZERO, b->CreateAdd(deadLockCounter, ONE));
1043        b->CreateAssert(b->CreateICmpNE(count, TWO), "Dead lock detected: pipeline could not progress after two iterations");
1044        deadLockCounter->addIncoming(count, b->GetInsertBlock());
1045    }
1046    // return whether each sink has terminated
1047    Value * final = b->getTrue();
1048    for (const auto u : make_iterator_range(vertices(dependencyGraph))) {
1049        if (out_degree(u, dependencyGraph) == 0) {
1050            final = b->CreateAnd(final, dependencyGraph[u]);
1051        }
1052    }
1053    return final;
1054
1055}
1056
1057/** ------------------------------------------------------------------------------------------------------------- *
1058 * @brief printGraph
1059 ** ------------------------------------------------------------------------------------------------------------- */
1060void PipelineGenerator::printGraph(const ChannelGraph & G) const {
1061
1062    auto & out = errs();
1063
1064    out << "digraph G {\n";
1065    for (auto u : make_iterator_range(vertices(G))) {
1066        assert (G[u] == kernels[u]);
1067        if (in_degree(u, G) > 0 || out_degree(u, G) > 0) {
1068            out << "v" << u << " [label=\"" << u << ": "
1069                << G[u]->getName() << "\"];\n";
1070        }
1071    }
1072
1073    for (auto e : make_iterator_range(edges(G))) {
1074        const Channel & c = G[e];
1075        const auto s = source(e, G);
1076        const auto t = target(e, G);
1077
1078        out << "v" << s << " -> v" << t
1079            << " [label=\""
1080
1081
1082
1083            << c.ratio.numerator() << " / " << c.ratio.denominator()
1084            << "\"];\n";
1085    }
1086
1087    out << "}\n\n";
1088    out.flush();
1089
1090}
1091
1092/** ------------------------------------------------------------------------------------------------------------- *
1093 * @brief printGraph
1094 ** ------------------------------------------------------------------------------------------------------------- */
1095void PipelineGenerator::printGraph(const DependencyGraph & G) const {
1096
1097    auto & out = errs();
1098
1099    out << "digraph G {\n";
1100    for (auto u : make_iterator_range(vertices(G))) {
1101            out << "v" << u << " [label=\"" << u << ": "
1102                << kernels[u]->getName() << "\"];\n";
1103    }
1104
1105    for (auto e : make_iterator_range(edges(G))) {
1106        const auto s = source(e, G);
1107        const auto t = target(e, G);
1108        out << "v" << s << " -> v" << t << ";\n";
1109    }
1110
1111    out << "}\n\n";
1112    out.flush();
1113
1114}
Note: See TracBrowser for help on using the repository browser.