source: icGREP/icgrep-devel/icgrep/toolchain/pipeline.cpp @ 6047

Last change on this file since 6047 was 6047, checked in by nmedfort, 16 months ago

Major refactoring of buffer types. Static buffers replace Circular and CircularCopyback?. External buffers unify Source/External?.

File size: 49.8 KB
Line 
1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
6#include "pipeline.h"
7#include <toolchain/toolchain.h>
8#include <kernels/kernel.h>
9#include <kernels/streamset.h>
10#include <llvm/IR/Module.h>
11#include <boost/container/flat_set.hpp>
12#include <boost/container/flat_map.hpp>
13#include <boost/graph/adjacency_list.hpp>
14#include <boost/range/adaptor/reversed.hpp>
15#include <kernels/kernel_builder.h>
16#include <llvm/Support/raw_ostream.h>
17
18using namespace kernel;
19using namespace parabix;
20using namespace llvm;
21using namespace boost;
22using namespace boost::container;
23
24#define DISABLE_COPY_TO_OVERFLOW
25
26using Port = Kernel::Port;
27
28Function * makeThreadFunction(const std::unique_ptr<kernel::KernelBuilder> & b, const std::string & name) {
29    Function * const f = Function::Create(FunctionType::get(b->getVoidTy(), {b->getVoidPtrTy()}, false), Function::InternalLinkage, name, b->getModule());
30    f->setCallingConv(CallingConv::C);
31    f->arg_begin()->setName("state");
32    return f;
33}
34
35class PipelineGenerator {
36public:
37
38    template <typename Value>
39    using StreamSetBufferMap = flat_map<const StreamSetBuffer *, Value>;
40
41    using RateValue = ProcessingRate::RateValue;
42
43    PipelineGenerator(const std::vector<Kernel *> & kernels)
44    : kernels(kernels)
45    , terminated(nullptr)
46    , noMore(nullptr)
47    , deadLockCounter(nullptr)
48    , anyProgress(nullptr)
49    , madeProgress(nullptr) {
50
51    }
52
53    struct Channel {
54        Channel() = default;
55        Channel(const RateValue & ratio, const StreamSetBuffer * const buffer = nullptr, const unsigned operand = 0)
56        : ratio(ratio), buffer(buffer), operand(operand) { }
57
58        RateValue               ratio;
59        const StreamSetBuffer * buffer;
60        unsigned                operand;
61    };
62
63    using ChannelGraph = adjacency_list<vecS, vecS, bidirectionalS, const Kernel *, Channel>;
64
65    using DependencyGraph = adjacency_list<hash_setS, vecS, bidirectionalS, Value *>;
66
67    using KernelMap = flat_map<const Kernel *, unsigned>;
68
69    void initialize(const std::unique_ptr<KernelBuilder> & b, BasicBlock * const entryBlock);
70
71    void execute(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
72
73    Value * finalize(const std::unique_ptr<KernelBuilder> & b);
74
75protected:
76
77    ChannelGraph makeInputGraph() const;
78
79    ChannelGraph makeOutputGraph() const;
80
81    DependencyGraph makeDependencyGraph() const;
82
83    template<class VertexList>
84    ChannelGraph pruneGraph(ChannelGraph && G, VertexList && V) const;
85
86    void checkIfAllInputKernelsHaveFinished(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
87
88    void checkAvailableInputData(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
89
90    void checkAvailableOutputSpace(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
91
92    Value * getStrideLength(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel, const Binding & binding);
93
94    Value * callKernel(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
95
96    void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel *kernel);
97
98    void runKernel(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel);
99
100    Value * getFullyProcessedItemCount(const std::unique_ptr<KernelBuilder> & b, const Binding & binding, Value * const final) const;
101
102    void printGraph(const ChannelGraph & G) const;
103
104    void printGraph(const DependencyGraph & G) const;
105
106private:
107
108    const std::vector<Kernel *> &       kernels;
109    PHINode *                           terminated;
110
111    Value *                             noMore;
112
113    DependencyGraph                     dependencyGraph;
114    ChannelGraph                        inputGraph;
115    ChannelGraph                        outputGraph;
116
117    BasicBlock *                        kernelFinished;
118
119    PHINode *                           deadLockCounter;
120    Value *                             anyProgress;
121    PHINode *                           madeProgress;
122
123    StreamSetBufferMap<Value *>         producedItemCount;
124    StreamSetBufferMap<Value *>         consumedItemCount;
125    StreamSetBufferMap<const Kernel *>  lastConsumer;
126    flat_set<const StreamSetBuffer *>   isConsumedAtNonFixedRate;
127};
128
129/** ------------------------------------------------------------------------------------------------------------- *
130 * @brief generateSegmentParallelPipeline
131 *
132 * Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
133 * operating over an input stream set S, a segment-parallel implementation divides the input
134 * into segments and coordinates a set of T <= K threads to each process one segment at a time.
135 * Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
136 * fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
137 ** ------------------------------------------------------------------------------------------------------------- */
138void generateSegmentParallelPipeline(const std::unique_ptr<KernelBuilder> & b, const std::vector<Kernel *> & kernels) {
139
140    const unsigned n = kernels.size();
141    Module * const m = b->getModule();
142    IntegerType * const sizeTy = b->getSizeTy();
143    PointerType * const voidPtrTy = b->getVoidPtrTy();
144    Constant * nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
145    std::vector<Type *> structTypes;
146    codegen::BufferSegments = std::max(codegen::BufferSegments, codegen::ThreadNum);
147
148    Value * instance[n];
149    for (unsigned i = 0; i < n; ++i) {
150        instance[i] = kernels[i]->getInstance();
151        structTypes.push_back(instance[i]->getType());
152    }
153    StructType * const sharedStructType = StructType::get(m->getContext(), structTypes);
154    StructType * const threadStructType = StructType::get(m->getContext(), {sharedStructType->getPointerTo(), sizeTy});
155
156    const auto ip = b->saveIP();
157
158    Function * const threadFunc = makeThreadFunction(b, "segment");
159    auto args = threadFunc->arg_begin();
160
161    // -------------------------------------------------------------------------------------------------------------------------
162    // MAKE SEGMENT PARALLEL PIPELINE THREAD
163    // -------------------------------------------------------------------------------------------------------------------------
164
165     // Create the basic blocks for the thread function.
166    BasicBlock * entryBlock = BasicBlock::Create(b->getContext(), "entry", threadFunc);
167    b->SetInsertPoint(entryBlock);
168
169    Value * const threadStruct = b->CreateBitCast(&*(args), threadStructType->getPointerTo());
170
171    Value * const sharedStatePtr = b->CreateLoad(b->CreateGEP(threadStruct, {b->getInt32(0), b->getInt32(0)}));
172    for (unsigned k = 0; k < n; ++k) {
173        Value * ptr = b->CreateLoad(b->CreateGEP(sharedStatePtr, {b->getInt32(0), b->getInt32(k)}));
174        kernels[k]->setInstance(ptr);
175    }
176    Value * const segOffset = b->CreateLoad(b->CreateGEP(threadStruct, {b->getInt32(0), b->getInt32(1)}));
177
178    PipelineGenerator G(kernels);
179
180    BasicBlock * const segmentLoop = b->CreateBasicBlock("segmentLoop");
181    b->CreateBr(segmentLoop);
182
183    b->SetInsertPoint(segmentLoop);   
184    PHINode * const segNo = b->CreatePHI(b->getSizeTy(), 2, "segNo");
185    segNo->addIncoming(segOffset, entryBlock);
186    G.initialize(b, entryBlock);
187
188    Value * cycleCountStart = nullptr;
189    Value * cycleCountEnd = nullptr;
190    if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
191        cycleCountStart = b->CreateReadCycleCounter();
192    }
193
194    const bool serialize = codegen::DebugOptionIsSet(codegen::SerializeThreads);
195
196    for (unsigned k = 0; k < n; ++k) {
197
198        const Kernel * const kernel = kernels[k];
199
200        BasicBlock * const kernelWait = b->CreateBasicBlock(kernel->getName() + "Wait");
201        b->CreateBr(kernelWait);
202
203        b->SetInsertPoint(kernelWait);
204        b->setKernel(kernels[serialize ? (n - 1) : k]);
205        Value * const processedSegmentCount = b->acquireLogicalSegmentNo();
206        assert (processedSegmentCount->getType() == segNo->getType());
207        Value * const ready = b->CreateICmpEQ(segNo, processedSegmentCount);
208
209        BasicBlock * const kernelCheck = b->CreateBasicBlock(kernel->getName() + "Check");
210        b->CreateCondBr(ready, kernelCheck, kernelWait);
211
212        b->SetInsertPoint(kernelCheck);
213
214        G.execute(b, k);
215
216        b->releaseLogicalSegmentNo(b->CreateAdd(segNo, b->getSize(1)));
217
218        if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
219            cycleCountEnd = b->CreateReadCycleCounter();
220            Value * counterPtr = b->getCycleCountPtr();
221            b->CreateStore(b->CreateAdd(b->CreateLoad(counterPtr), b->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
222            cycleCountStart = cycleCountEnd;
223        }
224
225    }
226
227    segNo->addIncoming(b->CreateAdd(segNo, b->getSize(codegen::ThreadNum)), b->GetInsertBlock());
228    Value * const finished = G.finalize(b);
229    BasicBlock * const segmentExit = b->CreateBasicBlock("segmentExit");
230    b->CreateUnlikelyCondBr(finished, segmentExit, segmentLoop);
231
232    b->SetInsertPoint(segmentExit);
233    // only call pthread_exit() within spawned threads; otherwise it'll be equivalent to calling exit() within the process
234    BasicBlock * const exitThread = b->CreateBasicBlock("ExitThread");
235    BasicBlock * const exitFunction = b->CreateBasicBlock("ExitProcessFunction");   
236    b->CreateCondBr(b->CreateIsNull(segOffset), exitFunction, exitThread);
237    b->SetInsertPoint(exitThread);
238    b->CreatePThreadExitCall(nullVoidPtrVal);
239    b->CreateBr(exitFunction);   
240    b->SetInsertPoint(exitFunction);
241    b->CreateRetVoid();
242
243    // -------------------------------------------------------------------------------------------------------------------------
244    b->restoreIP(ip);
245
246    for (unsigned i = 0; i < n; ++i) {
247        kernels[i]->setInstance(instance[i]);
248    }
249
250    // -------------------------------------------------------------------------------------------------------------------------
251    // MAKE SEGMENT PARALLEL PIPELINE DRIVER
252    // -------------------------------------------------------------------------------------------------------------------------
253    const unsigned threads = codegen::ThreadNum - 1;
254    assert (codegen::ThreadNum > 0);
255    Type * const pthreadsTy = ArrayType::get(sizeTy, threads);
256    AllocaInst * const pthreads = b->CreateAlloca(pthreadsTy);
257    Value * threadIdPtr[threads];
258
259    for (unsigned i = 0; i < threads; ++i) {
260        threadIdPtr[i] = b->CreateGEP(pthreads, {b->getInt32(0), b->getInt32(i)});
261    }
262
263    for (unsigned i = 0; i < n; ++i) {
264        b->setKernel(kernels[i]);
265        b->releaseLogicalSegmentNo(b->getSize(0));
266    }
267
268    AllocaInst * const sharedStruct = b->CreateCacheAlignedAlloca(sharedStructType);
269    for (unsigned i = 0; i < n; ++i) {
270        Value * ptr = b->CreateGEP(sharedStruct, {b->getInt32(0), b->getInt32(i)});
271        b->CreateStore(kernels[i]->getInstance(), ptr);
272    }
273
274    // use the process thread to handle the initial segment function after spawning (n - 1) threads to handle the subsequent offsets
275    for (unsigned i = 0; i < threads; ++i) {
276        AllocaInst * const threadState = b->CreateAlloca(threadStructType);
277        b->CreateStore(sharedStruct, b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(0)}));
278        b->CreateStore(b->getSize(i + 1), b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(1)}));
279        b->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, threadFunc, threadState);
280    }
281
282    AllocaInst * const threadState = b->CreateAlloca(threadStructType);
283    b->CreateStore(sharedStruct, b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(0)}));
284    b->CreateStore(b->getSize(0), b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(1)}));
285    b->CreateCall(threadFunc, b->CreatePointerCast(threadState, voidPtrTy));
286
287    AllocaInst * const status = b->CreateAlloca(voidPtrTy);
288    for (unsigned i = 0; i < threads; ++i) {
289        Value * threadId = b->CreateLoad(threadIdPtr[i]);
290        b->CreatePThreadJoinCall(threadId, status);
291    }
292
293    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
294        Value* FP_100 = ConstantFP::get(b->getDoubleTy(), 100.0);
295        Value* totalCycles = b->getSize(0);
296        for (const Kernel * kernel : kernels) {
297            b->setKernel(kernel);
298            Value * cycles = b->CreateLoad(b->getCycleCountPtr());
299            totalCycles = b->CreateAdd(totalCycles, cycles);
300        }
301        Value* fTotalCycle = b->CreateUIToFP(totalCycles, b->getDoubleTy());
302
303        for (const Kernel * kernel : kernels) {
304            b->setKernel(kernel);
305            const auto & inputs = kernel->getStreamInputs();
306            const auto & outputs = kernel->getStreamOutputs();
307            Value * items = nullptr;
308            if (inputs.empty()) {
309                items = b->getProducedItemCount(outputs[0].getName());
310            } else {
311                items = b->getProcessedItemCount(inputs[0].getName());
312            }
313            Value * fItems = b->CreateUIToFP(items, b->getDoubleTy());
314            Value * cycles = b->CreateLoad(b->getCycleCountPtr());
315            Value * fCycles = b->CreateUIToFP(cycles, b->getDoubleTy());
316            Value * percentage = b->CreateFDiv(b->CreateFMul(fCycles, FP_100), fTotalCycle);
317
318            const auto formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item,  %2.2f%% of Total CPU Cycles. \n ";
319            Value * stringPtr = b->CreatePointerCast(b->GetString(formatString), b->getInt8PtrTy());
320            b->CreateCall(b->GetDprintf(), {b->getInt32(2), stringPtr, fItems, fCycles, b->CreateFDiv(fCycles, fItems), percentage});
321        }
322    }
323   
324}
325
326
327/** ------------------------------------------------------------------------------------------------------------- *
328 * @brief generatePipelineLoop
329 ** ------------------------------------------------------------------------------------------------------------- */
330void generatePipelineLoop(const std::unique_ptr<KernelBuilder> & b, const std::vector<Kernel *> & kernels) {
331
332    // Create the basic blocks for the loop.
333    BasicBlock * const entryBlock = b->GetInsertBlock();
334    BasicBlock * const pipelineLoop = b->CreateBasicBlock("pipelineLoop");
335
336    PipelineGenerator G(kernels);
337
338    b->CreateBr(pipelineLoop);
339
340    b->SetInsertPoint(pipelineLoop);   
341    PHINode * const segNo = b->CreatePHI(b->getSizeTy(), 2, "segNo");
342    segNo->addIncoming(b->getSize(0), entryBlock);
343    G.initialize(b, entryBlock);
344
345    Value * const nextSegNo = b->CreateAdd(segNo, b->getSize(1));
346
347    Value * cycleCountStart = nullptr;
348    Value * cycleCountEnd = nullptr;
349    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
350        cycleCountStart = b->CreateReadCycleCounter();
351    }
352
353    for (unsigned i = 0; i < kernels.size(); ++i) {
354
355        G.execute(b, i);
356
357        b->releaseLogicalSegmentNo(nextSegNo);
358
359        if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
360            cycleCountEnd = b->CreateReadCycleCounter();
361            Value * counterPtr = b->getCycleCountPtr();
362            b->CreateStore(b->CreateAdd(b->CreateLoad(counterPtr), b->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
363            cycleCountStart = cycleCountEnd;
364        }
365    }
366
367    segNo->addIncoming(nextSegNo, b->GetInsertBlock());
368    BasicBlock * const pipelineExit = b->CreateBasicBlock("pipelineExit");
369    Value * const finished = G.finalize(b);
370    b->CreateCondBr(finished, pipelineExit, pipelineLoop);
371
372    b->SetInsertPoint(pipelineExit);
373
374    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
375        Value* FP_100 = ConstantFP::get(b->getDoubleTy(), 100.0);
376        Value* totalCycles = b->getSize(0);
377        for (const Kernel * kernel : kernels) {
378            b->setKernel(kernel);
379            Value * cycles = b->CreateLoad(b->getCycleCountPtr());
380            totalCycles = b->CreateAdd(totalCycles, cycles);
381        }
382        Value* fTotalCycle = b->CreateUIToFP(totalCycles, b->getDoubleTy());
383
384        for (const Kernel * kernel : kernels) {
385            b->setKernel(kernel);
386            const auto & inputs = kernel->getStreamInputs();
387            const auto & outputs = kernel->getStreamOutputs();
388            Value * items = nullptr;
389            if (inputs.empty()) {
390                items = b->getProducedItemCount(outputs[0].getName());
391            } else {
392                items = b->getProcessedItemCount(inputs[0].getName());
393            }
394            Value * fItems = b->CreateUIToFP(items, b->getDoubleTy());
395            Value * cycles = b->CreateLoad(b->getCycleCountPtr());
396            Value * fCycles = b->CreateUIToFP(cycles, b->getDoubleTy());
397            Value * percentage = b->CreateFDiv(b->CreateFMul(fCycles, FP_100), fTotalCycle);
398
399            const auto formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item,  %2.2f%% of Total CPU Cycles. \n ";
400            Value * stringPtr = b->CreatePointerCast(b->GetString(formatString), b->getInt8PtrTy());
401            b->CreateCall(b->GetDprintf(), {b->getInt32(2), stringPtr, fItems, fCycles, b->CreateFDiv(fCycles, fItems), percentage});
402        }
403    }
404
405}
406
407/** ------------------------------------------------------------------------------------------------------------- *
408 * @brief initialize
409 ** ------------------------------------------------------------------------------------------------------------- */
410void PipelineGenerator::initialize(const std::unique_ptr<KernelBuilder> & b, BasicBlock * const entryBlock) {
411
412    dependencyGraph = makeDependencyGraph();
413    inputGraph = makeInputGraph();
414    outputGraph = makeOutputGraph();
415
416    for (Kernel * const kernel : kernels) {
417        const auto & inputs = kernel->getStreamInputs();
418        for (unsigned i = 0; i < inputs.size(); ++i) {
419            const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
420            if (kernel->requiresCopyBack(inputs[i]) && !buffer->isUnbounded()) {
421                if (LLVM_LIKELY(buffer->supportsCopyBack())) {
422                    isConsumedAtNonFixedRate.insert(buffer);
423                } else {
424    //                std::string tmp;
425    //                raw_string_ostream out(tmp);
426    //                out << kernel->getName() << " : " << name << " must have an overflow";
427    //                report_fatal_error(out.str());
428                }
429            }
430        }
431        // if this kernel consumes this buffer, update the last consumer
432        for (const StreamSetBuffer * const buffer : kernel->getStreamSetInputBuffers()) {
433            auto f = lastConsumer.find(buffer);
434            assert (f != lastConsumer.end());
435            f->second = kernel;
436        }
437        // incase some output is never consumed, make the kernel that produced it the initial "last consumer"
438        for (const StreamSetBuffer * const buffer : kernel->getStreamSetOutputBuffers()) {
439            assert (buffer->getProducer() == kernel);
440            assert (lastConsumer.count(buffer) == 0);
441            lastConsumer.emplace(buffer, kernel);
442        }
443    }
444
445    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
446        deadLockCounter = b->CreatePHI(b->getSizeTy(), 2, "deadLockCounter");
447        deadLockCounter->addIncoming(b->getSize(0), entryBlock);
448        anyProgress = b->getFalse();
449    }
450
451}
452
453/** ------------------------------------------------------------------------------------------------------------- *
454 * @brief makeTerminationGraph
455 *
456 * The input graph models whether a kernel could *consume* more data than may be produced by a preceeding kernel.
457 ** ------------------------------------------------------------------------------------------------------------- */
458PipelineGenerator::DependencyGraph PipelineGenerator::makeDependencyGraph() const {
459    const auto n = kernels.size();
460    DependencyGraph G(n);
461    KernelMap M;
462    // construct a kernel dependency graph
463    for (unsigned v = 0; v < n; ++v) {
464        const Kernel * const kernel = kernels[v];       
465        for (const StreamSetBuffer * buf : kernel->getStreamSetInputBuffers()) {
466            const auto f = M.find(buf->getProducer()); assert (f != M.end());
467            add_edge(f->second, v, G);
468        }
469        M.emplace(kernel, v);
470    }
471    // generate a transitive closure
472    for (unsigned u = 0; u < n; ++u) {
473        for (auto e : make_iterator_range(in_edges(u, G))) {
474            const auto s = source(e, G);
475            for (auto f : make_iterator_range(out_edges(u, G))) {
476                add_edge(s, target(f, G), G);
477            }
478        }
479    }
480    // then take the transitive reduction
481    std::vector<unsigned> sources;
482    for (unsigned u = n; u-- > 0; ) {
483        if (in_degree(u, G) > 0 && out_degree(u, G) > 0) {
484            for (auto e : make_iterator_range(in_edges(u, G))) {
485                sources.push_back(source(e, G));
486            }
487            std::sort(sources.begin(), sources.end());
488            for (auto e : make_iterator_range(out_edges(u, G))) {
489                remove_in_edge_if(target(e, G), [&G, &sources](const DependencyGraph::edge_descriptor f) {
490                    return std::binary_search(sources.begin(), sources.end(), source(f, G));
491                }, G);
492            }
493            sources.clear();
494        }
495    }
496    return G;
497}
498
499/** ------------------------------------------------------------------------------------------------------------- *
500 * @brief makeInputGraph
501 *
502 * The input graph models whether a kernel could *consume* more data than may be produced by a preceeding kernel.
503 ** ------------------------------------------------------------------------------------------------------------- */
504PipelineGenerator::ChannelGraph PipelineGenerator::makeInputGraph() const {
505    const auto n = kernels.size();
506    ChannelGraph G(n);
507    KernelMap M;
508    for (unsigned v = 0; v < n; ++v) {
509
510        const Kernel * const consumer = kernels[v];
511        G[v] = consumer;
512        M.emplace(consumer, v);
513
514        const auto & inputs = consumer->getStreamInputs();
515        for (unsigned i = 0; i < inputs.size(); ++i) {
516
517            const Binding & input = inputs[i];
518            auto ub_in = consumer->getUpperBound(input.getRate()) * consumer->getStride() * codegen::SegmentSize; assert (ub_in > 0);
519            if (input.hasLookahead()) {
520                ub_in += input.getLookahead();
521            }
522
523            const auto buffer = consumer->getStreamSetInputBuffer(i);
524            const Kernel * const producer = buffer->getProducer();
525            const Binding & output = producer->getStreamOutput(buffer);
526            const auto lb_out = producer->getLowerBound(output.getRate()) * producer->getStride() * codegen::SegmentSize;
527
528            const auto min_oi_ratio = lb_out / ub_in;
529            const auto f = M.find(producer); assert (f != M.end());
530            const auto u = f->second;
531            // If we have multiple inputs from the same kernel, we only need to consider the "slowest" one
532            bool slowest = true;
533            for (const auto e : make_iterator_range(in_edges(v, G))) {
534                if (source(e, G) == u) {
535                    const Channel & p = G[e];
536                    if (min_oi_ratio > p.ratio) {
537                        slowest = false;
538                    } else if (min_oi_ratio < p.ratio) {
539                        clear_in_edges(v, G);
540                    }
541                    break;
542                }
543            }
544            if (slowest) {
545                add_edge(u, v, Channel{min_oi_ratio, buffer, i}, G);
546            }
547        }
548    }
549
550    return pruneGraph(std::move(G), make_iterator_range(vertices(G)));
551}
552
553/** ------------------------------------------------------------------------------------------------------------- *
554 * @brief makeOutputGraph
555 *
556 * The output graph models whether a kernel could *produce* more data than may be consumed by a subsequent kernel.
557 ** ------------------------------------------------------------------------------------------------------------- */
558PipelineGenerator::ChannelGraph PipelineGenerator::makeOutputGraph() const {
559    const auto n = kernels.size();
560    ChannelGraph G(n);
561    KernelMap M;
562    for (unsigned v = 0; v < n; ++v) {
563        const Kernel * const consumer = kernels[v];
564        G[v] = consumer;
565        M.emplace(consumer, v);
566
567        const auto & inputs = consumer->getStreamInputs();
568        for (unsigned i = 0; i < inputs.size(); ++i) {
569            const auto buffer = consumer->getStreamSetInputBuffer(i);
570            if (isa<ExternalBuffer>(buffer)) continue;
571            const Kernel * const producer = buffer->getProducer();
572            assert (consumer != producer);
573            const Binding & output = producer->getStreamOutput(buffer);
574            auto ub_out = producer->getUpperBound(output.getRate()) * producer->getStride() * codegen::SegmentSize;
575            if (ub_out > 0) { // unknown output rates are handled by reallocating their buffers
576                const Binding & input = inputs[i];
577                if (input.hasLookahead()) {
578                    const auto la = input.getLookahead();
579                    if (LLVM_UNLIKELY(ub_out <= la)) {
580                        llvm::report_fatal_error("lookahead exceeds segment size");
581                    }
582                    ub_out += la;
583                }
584                const auto lb_in = consumer->getLowerBound(input.getRate()) * consumer->getStride() * codegen::SegmentSize;
585                const auto min_io_ratio = lb_in / ub_out;
586                const auto f = M.find(producer); assert (f != M.end());
587                const auto u = f->second;
588                assert (v != u);
589                assert (G[u] == producer);
590                // If we have multiple inputs from the same kernel, we only need to consider the "fastest" one
591                bool fastest = true;
592                for (const auto e : make_iterator_range(in_edges(v, G))) {
593                    if (source(e, G) == u) {
594                        const Channel & p = G[e];
595                        if (min_io_ratio > p.ratio) {
596                            fastest = false;
597                        } else if (min_io_ratio < p.ratio) {
598                            clear_in_edges(v, G);
599                        }
600                        break;
601                    }
602                }
603                if (fastest) {
604                    add_edge(v, u, Channel{min_io_ratio, buffer, i}, G);
605                }
606            }
607        }
608    }
609
610    return pruneGraph(std::move(G), boost::adaptors::reverse(make_iterator_range(vertices(G))));
611}
612
613/** ------------------------------------------------------------------------------------------------------------- *
614 * @brief pruneGraph
615 ** ------------------------------------------------------------------------------------------------------------- */
616template<class VertexList>
617inline PipelineGenerator::ChannelGraph PipelineGenerator::pruneGraph(ChannelGraph && G, VertexList && V) const {
618    // Take a transitive closure of G but whenever we attempt to insert an edge into the closure
619    // that already exists, check instead whether the rate of our proposed edge is <= the existing
620    // edge's rate. If so, the data availability/consumption is transitively guaranteed.
621    for (const auto u : V) {
622        for (auto ei : make_iterator_range(in_edges(u, G))) {
623            const auto v = source(ei, G);
624            const Channel & ci = G[ei];
625            for (auto ej : make_iterator_range(out_edges(u, G))) {
626                const auto w = target(ej, G);
627                // ci.rate = BOUND_RATIO(u, v) * (STRIDE(u) / STRIDE(v))
628                const auto scaling = RateValue(G[v]->getStride(), G[w]->getStride());
629                const auto rate = ci.ratio * scaling;
630                bool insert = true;
631                for (auto ek : make_iterator_range(in_edges(w, G))) {
632                    // do we already have a vw edge?
633                    if (source(ek, G) == v) {
634                        Channel & ck = G[ek];
635                        if (rate <= ck.ratio) {
636                            ck.buffer = nullptr;
637                        }
638                        insert = false;
639                    }
640                }
641                if (insert) {
642                    add_edge(v, w, Channel{rate}, G);
643                }
644            }
645        }
646    }
647
648    // remove any closure edges from G
649    remove_edge_if([&G](const ChannelGraph::edge_descriptor e) { return G[e].buffer == nullptr; }, G);
650
651    for (const auto u : V) {
652        if (in_degree(u, G) == 0) {
653            // If we do not need to check any of its inputs, we can avoid testing any output of a kernel
654            // with a rate ratio of at least 1
655            remove_out_edge_if(u, [&G](const ChannelGraph::edge_descriptor e) {
656                return G[e].ratio >= RateValue{1, 1};
657            }, G);
658        }
659    }
660
661    return G;
662}
663
664/** ------------------------------------------------------------------------------------------------------------- *
665 * @brief executeKernel
666 ** ------------------------------------------------------------------------------------------------------------- */
667void PipelineGenerator::execute(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
668
669    const Kernel * const kernel = kernels[index];
670    b->setKernel(kernel);
671
672    const auto & inputs = kernel->getStreamInputs();
673    const auto & outputs = kernel->getStreamOutputs();
674
675    BasicBlock * const kernelEntry = b->GetInsertBlock();
676    BasicBlock * const kernelCode = b->CreateBasicBlock(kernel->getName());
677    kernelFinished = b->CreateBasicBlock(kernel->getName() + "Finished");
678    BasicBlock * const kernelExit = b->CreateBasicBlock(kernel->getName() + "Exit");
679
680    b->CreateUnlikelyCondBr(b->getTerminationSignal(), kernelExit, kernelCode);
681
682    b->SetInsertPoint(kernelFinished);
683    terminated = b->CreatePHI(b->getInt1Ty(), 2);
684    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
685        madeProgress = b->CreatePHI(b->getInt1Ty(), 3);
686    }
687    b->SetInsertPoint(kernelExit);
688    PHINode * const isFinal = b->CreatePHI(b->getInt1Ty(), 2, kernel->getName() + "_isFinal");
689    isFinal->addIncoming(b->getTrue(), kernelEntry);
690    isFinal->addIncoming(terminated, kernelFinished);
691
692    PHINode * progress = nullptr;
693    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
694        progress = b->CreatePHI(b->getInt1Ty(), 2, kernel->getName() + "_anyProgress");
695        progress->addIncoming(anyProgress, kernelEntry);
696        progress->addIncoming(madeProgress, kernelFinished);
697    }
698
699    // Since it is possible that a sole consumer of some stream could terminate early, set the
700    // initial consumed amount to the amount produced in this iteration.
701
702    // First determine the priorConsumedItemCounts, making sure
703    // that none of them are previous input buffers of this kernel!
704    std::vector<Value *> priorConsumedItemCount(inputs.size());
705
706    for (unsigned i = 0; i < inputs.size(); ++i) {
707        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
708        auto c = consumedItemCount.find(buffer);
709        if (c == consumedItemCount.end()) {
710            const auto p = producedItemCount.find(buffer);
711            assert (p != producedItemCount.end());
712            priorConsumedItemCount[i] = p->second;
713        } else {
714            priorConsumedItemCount[i] = c->second;
715        }
716    }
717
718    std::vector<PHINode *> consumedItemCountPhi(inputs.size());
719
720    for (unsigned i = 0; i < inputs.size(); ++i) {
721        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
722        PHINode * const consumedPhi = b->CreatePHI(b->getSizeTy(), 2);
723        auto c = consumedItemCount.find(buffer);
724        if (c == consumedItemCount.end()) {
725            consumedItemCount.emplace(buffer, consumedPhi);
726        } else {
727            c->second = consumedPhi;
728        }
729        consumedPhi->addIncoming(priorConsumedItemCount[i], kernelEntry);
730        consumedItemCountPhi[i] = consumedPhi;
731    }
732
733    b->SetInsertPoint(kernelCode);
734
735    Value * const finalStride = callKernel(b, index);
736
737    // TODO: add in some checks to verify the kernel actually adheres to its rate
738
739
740    BasicBlock * const kernelCodeExit = b->GetInsertBlock();
741    terminated->addIncoming(finalStride, kernelCodeExit);
742    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
743        madeProgress->addIncoming(b->getTrue(), kernelCodeExit);
744        anyProgress = progress;
745    }
746    b->CreateBr(kernelFinished);
747
748    b->SetInsertPoint(kernelFinished);
749
750    // update the consumed item counts
751    for (unsigned i = 0; i < inputs.size(); ++i) {
752        const Binding & input = inputs[i];
753        Value * const fullyProcessed = getFullyProcessedItemCount(b, input, terminated);
754        Value * const consumed = b->CreateUMin(priorConsumedItemCount[i], fullyProcessed);
755        consumedItemCountPhi[i]->addIncoming(consumed, kernelFinished);
756    }
757    b->CreateBr(kernelExit);
758
759    kernelExit->moveAfter(kernelFinished);
760
761    b->SetInsertPoint(kernelExit);
762
763    for (unsigned i = 0; i < outputs.size(); ++i) {
764        const Binding & output = outputs[i];
765        Value * const produced = b->getProducedItemCount(output.getName());
766        const StreamSetBuffer * const buffer = kernel->getStreamSetOutputBuffer(i);
767        // if some stream has no consumer, set the consumed item count to the produced item count
768        const auto c = lastConsumer.find(buffer);
769        assert (c != lastConsumer.end());
770        if (LLVM_UNLIKELY(c->second == kernel)) {
771            assert (buffer->getProducer() == kernel);
772            if (LLVM_UNLIKELY(output.getRate().isRelative())) {
773                continue;
774            }
775            b->setConsumedItemCount(output.getName(), produced);
776        } else { // otherwise record how many items were produced
777            assert (producedItemCount.count(buffer) == 0);
778            producedItemCount.emplace(buffer, produced);
779        }
780    }
781
782    // If this kernel is the last consumer of a input buffer, update the consumed count for that buffer.
783
784    // TODO: if all consumers process the data at a fixed rate, we can just set the consumed item count
785    // by the strideNo rather than tracking it.
786
787    // TODO: a kernel could take the same stream set for multiple arguments.
788
789    // TODO: if we can prove that this kernel cannot terminate before any prior consumer, this code
790    // could be executed in kernelFinished block.
791    for (unsigned i = 0; i < inputs.size(); ++i) {
792        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
793        const auto c = lastConsumer.find(buffer);
794        assert (c != lastConsumer.end());
795        if (LLVM_LIKELY(c->second == kernel)) {
796            Kernel * const producer = buffer->getProducer();
797            assert (producer != kernel);
798            const auto & output = producer->getStreamOutput(buffer);
799            if (output.getRate().isRelative()) continue;
800            b->setKernel(producer);
801            b->setConsumedItemCount(output.getName(), consumedItemCountPhi[i]);
802            b->setKernel(kernel);
803        }
804    }
805
806    dependencyGraph[index] = isFinal;
807}
808
809/** ------------------------------------------------------------------------------------------------------------- *
810 * @brief checkIfAllInputKernelsHaveFinished
811 ** ------------------------------------------------------------------------------------------------------------- */
812void PipelineGenerator::checkIfAllInputKernelsHaveFinished(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
813    const auto n = in_degree(index, dependencyGraph);
814    if (LLVM_UNLIKELY(n == 0)) {
815        noMore = b->getFalse();
816    } else {
817        noMore = b->getTrue();
818        for (auto e : make_iterator_range(in_edges(index, dependencyGraph))) {
819            const auto u = source(e, dependencyGraph);
820            Value * const finished = dependencyGraph[u];
821            noMore = b->CreateAnd(noMore, finished);
822        }
823    }
824}
825
826/** ------------------------------------------------------------------------------------------------------------- *
827 * @brief checkAvailableInputData
828 ** ------------------------------------------------------------------------------------------------------------- */
829void PipelineGenerator::checkAvailableInputData(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
830    const Kernel * const kernel = kernels[index];
831    b->setKernel(kernel);
832    for (auto e : make_iterator_range(in_edges(index, inputGraph))) {
833        const Channel & c = inputGraph[e];
834        const Binding & input = kernel->getStreamInput(c.operand);
835        Value * requiredInput = getStrideLength(b, kernel, input);
836        if (LLVM_UNLIKELY(input.hasLookahead())) {
837            Constant * const lookahead = b->getSize(input.getLookahead());
838            requiredInput = b->CreateAdd(requiredInput, lookahead);
839        }
840        const auto p = producedItemCount.find(c.buffer);
841        assert (p != producedItemCount.end());
842        Value * const produced = p->second;
843        Value * const processed = b->getNonDeferredProcessedItemCount(input);
844        Value * const unprocessed = b->CreateSub(produced, processed);
845        Value * const hasEnough = b->CreateICmpUGE(unprocessed, requiredInput);
846        terminated->addIncoming(b->getFalse(), b->GetInsertBlock());
847        if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
848            madeProgress->addIncoming(anyProgress, b->GetInsertBlock());
849        }
850        const auto prefix = kernel->getName() + "_" + input.getName();
851        BasicBlock * const hasSufficientInput = b->CreateBasicBlock(prefix + "_hasSufficientInput");
852        Value * const check = b->CreateOr(hasEnough, noMore);
853        b->CreateLikelyCondBr(check, hasSufficientInput, kernelFinished);
854        b->SetInsertPoint(hasSufficientInput);
855    }
856}
857
858/** ------------------------------------------------------------------------------------------------------------- *
859 * @brief checkAvailableOutputSpace
860 ** ------------------------------------------------------------------------------------------------------------- */
861void PipelineGenerator::checkAvailableOutputSpace(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
862    const Kernel * const kernel = kernels[index];
863    b->setKernel(kernel);
864    for (auto e : make_iterator_range(in_edges(index, outputGraph))) {
865        const Channel & c = outputGraph[e];
866        assert (c.buffer->getProducer() == kernel);
867        const Binding & output = kernel->getStreamOutput(c.buffer);
868        Value * requiredSpace = getStrideLength(b, kernel, output);
869        const auto & name = output.getName();
870        Value * const produced = b->getNonDeferredProducedItemCount(output);
871        Value * const consumed = b->getConsumedItemCount(name);
872        Value * const unconsumed = b->CreateSub(produced, consumed);
873        requiredSpace = b->CreateAdd(requiredSpace, unconsumed);
874        Value * const capacity = b->getCapacity(name);
875        Value * const check = b->CreateICmpULE(requiredSpace, capacity);
876        terminated->addIncoming(b->getFalse(), b->GetInsertBlock());
877        if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
878            madeProgress->addIncoming(anyProgress, b->GetInsertBlock());
879        }
880        const auto prefix = kernel->getName() + "_" + name;
881        BasicBlock * const hasOutputSpace = b->CreateBasicBlock(prefix + "_hasOutputSpace");
882        b->CreateLikelyCondBr(check, hasOutputSpace, kernelFinished);
883        b->SetInsertPoint(hasOutputSpace);
884    }
885}
886
887/** ------------------------------------------------------------------------------------------------------------- *
888 * @brief getStrideLength
889 ** ------------------------------------------------------------------------------------------------------------- */
890Value * PipelineGenerator::getStrideLength(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel, const Binding & binding) {
891    Value * strideLength = nullptr;
892    const ProcessingRate & rate = binding.getRate();
893    if (rate.isPopCount() || rate.isNegatedPopCount()) {
894        Port refPort; unsigned refIndex;
895        std::tie(refPort, refIndex) = kernel->getStreamPort(rate.getReference());
896        const Binding & ref = kernel->getStreamInput(refIndex);
897        Value * markers = b->loadInputStreamBlock(ref.getName(), b->getSize(0));
898        if (rate.isNegatedPopCount()) {
899            markers = b->CreateNot(markers);
900        }
901        strideLength = b->bitblock_popcount(markers);
902    } else if (binding.hasAttribute(kernel::Attribute::KindId::AlwaysConsume)) {
903        const auto lb = kernel->getLowerBound(rate);
904        strideLength = b->getSize(std::max(ceiling(lb * kernel->getStride()), 1U));
905    } else {
906        const auto ub = kernel->getUpperBound(rate); assert (ub > 0);
907        strideLength = b->getSize(ceiling(ub * kernel->getStride()));
908    }
909    return strideLength;
910}
911
912/** ------------------------------------------------------------------------------------------------------------- *
913 * @brief callKernel
914 ** ------------------------------------------------------------------------------------------------------------- */
915Value * PipelineGenerator::callKernel(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
916
917    const Kernel * const kernel = kernels[index];
918    b->setKernel(kernel);
919
920    checkIfAllInputKernelsHaveFinished(b, index);
921
922    checkAvailableInputData(b, index);
923
924    checkAvailableOutputSpace(b, index);
925
926    applyOutputBufferExpansions(b, kernel);
927
928    #ifndef DISABLE_COPY_TO_OVERFLOW
929    // Store how many items we produced by this kernel in the prior iteration. We'll use this to determine when
930    // to mirror the first K segments
931    const auto & outputs = kernel->getStreamOutputs();
932    const auto m = outputs.size();
933
934    std::vector<Value *> initiallyProducedItemCount(m, nullptr);
935    for (unsigned i = 0; i < m; ++i) {
936        const Binding & output = outputs[i];
937        const auto & name = output.getName();
938        const StreamSetBuffer * const buffer = kernel->getOutputStreamSetBuffer(name);
939        if (isConsumedAtNonFixedRate.count(buffer)) {
940            initiallyProducedItemCount[i] = b->getProducedItemCount(name);
941        }
942    }
943    #endif
944
945    runKernel(b, kernel);
946
947    #ifndef DISABLE_COPY_TO_OVERFLOW
948    // For each buffer with an overflow region of K blocks, overwrite the overflow with the first K blocks of
949    // data to ensure that even if this stream is produced at a fixed rate but consumed at a bounded rate,
950    // every kernel has a consistent view of the stream data.
951    for (unsigned i = 0; i < m; ++i) {
952        const Binding & output = outputs[i];
953        const auto & name = output.getName();
954        if (initiallyProducedItemCount[i]) {
955            Value * const bufferSize = b->getCapacity(name);
956            Value * const prior = initiallyProducedItemCount[i];
957            Value * const offset = b->CreateURem(prior, bufferSize);
958            Value * const produced = b->getNonDeferredProducedItemCount(output);
959            Value * const buffered = b->CreateAdd(offset, b->CreateSub(produced, prior));
960            BasicBlock * const copyBack = b->CreateBasicBlock(name + "MirrorOverflow");
961            BasicBlock * const done = b->CreateBasicBlock(name + "MirrorOverflowDone");
962            b->CreateCondBr(b->CreateICmpUGT(buffered, bufferSize), copyBack, done);
963            b->SetInsertPoint(copyBack);
964            b->CreateCopyToOverflow(name);
965            b->CreateBr(done);
966            b->SetInsertPoint(done);
967        }
968    }
969    #endif
970
971    return b->getTerminationSignal();
972}
973
974/** ------------------------------------------------------------------------------------------------------------- *
975 * @brief runKernel
976 ** ------------------------------------------------------------------------------------------------------------- */
977void PipelineGenerator::runKernel(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel) {
978
979    const auto & inputs = kernel->getStreamInputs();
980    const auto n = inputs.size();
981    std::vector<Value *> arguments(n + 2);
982
983    Value * isFinal = noMore;
984
985    for (unsigned i = 0; i < n; ++i) {
986        const Binding & input = inputs[i];
987        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
988
989        const auto p = producedItemCount.find(buffer);
990        assert (p != producedItemCount.end());
991        Value * const produced = p->second;
992
993        const ProcessingRate & rate = input.getRate();
994        if (rate.isPopCount()) {
995            arguments[i + 2] = produced;
996        } else {
997            const unsigned strideSize = ceiling(kernel->getUpperBound(rate) * kernel->getStride());
998            Value * const processed = b->getNonDeferredProcessedItemCount(input);
999            Value * const limit = b->CreateAdd(processed, b->getSize(strideSize * codegen::SegmentSize));
1000            Value * const hasPartial = b->CreateICmpULT(produced, limit);
1001            arguments[i + 2] = b->CreateSelect(hasPartial, produced, limit);
1002            isFinal = b->CreateAnd(isFinal, hasPartial);
1003        }
1004    }
1005
1006    // TODO: pass in a strideNo for fixed rate streams to allow the kernel to calculate the current avail,
1007    // processed, and produced counts
1008
1009    arguments[0] = kernel->getInstance();
1010    arguments[1] = isFinal;
1011
1012    b->createDoSegmentCall(arguments);
1013}
1014
1015
1016/** ------------------------------------------------------------------------------------------------------------- *
1017 * @brief applyOutputBufferExpansions
1018 ** ------------------------------------------------------------------------------------------------------------- */
1019void PipelineGenerator::applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel) {
1020    const auto & outputs = kernel->getStreamSetOutputBuffers();
1021    for (unsigned i = 0; i < outputs.size(); i++) {
1022        if (isa<DynamicBuffer>(outputs[i])) {
1023            const auto & output = kernel->getStreamOutput(i);
1024            const auto baseSize = ceiling(kernel->getUpperBound(output.getRate()) * kernel->getStride() * codegen::SegmentSize);
1025            if (LLVM_LIKELY(baseSize > 0)) {
1026                const auto & name = output.getName();
1027                Value * const produced = b->getProducedItemCount(name);
1028                Value * const consumed = b->getConsumedItemCount(name);
1029                Value * const unconsumed = b->CreateSub(produced, consumed);
1030                Value * const required = b->CreateAdd(unconsumed, b->getSize(2 * baseSize));
1031                b->setCapacity(name, required);
1032            }
1033        }
1034    }
1035}
1036
1037/** ------------------------------------------------------------------------------------------------------------- *
1038 * @brief getFullyProcessedItemCount
1039 ** ------------------------------------------------------------------------------------------------------------- */
1040inline Value * PipelineGenerator::getFullyProcessedItemCount(const std::unique_ptr<KernelBuilder> & b, const Binding & input, Value * const isFinal) const {
1041    Value * const processed = b->getProcessedItemCount(input.getName());
1042    if (LLVM_UNLIKELY(input.hasAttribute(kernel::Attribute::KindId::BlockSize))) {
1043        // If the input rate has a block size attribute then --- for the purpose of determining how many
1044        // items have been consumed --- we consider a stream set to be fully processed when an entire
1045        // block (stride?) has been processed.
1046        Constant * const BLOCK_WIDTH = b->getSize(b->getBitBlockWidth());
1047        Value * const partial = b->CreateAnd(processed, ConstantExpr::getNeg(BLOCK_WIDTH));
1048        return b->CreateSelect(isFinal, processed, partial);
1049    }
1050    return processed;
1051}
1052
1053/** ------------------------------------------------------------------------------------------------------------- *
1054 * @brief finalize
1055 ** ------------------------------------------------------------------------------------------------------------- */
1056Value * PipelineGenerator::finalize(const std::unique_ptr<KernelBuilder> & b) {
1057    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
1058        ConstantInt * const ZERO = b->getSize(0);
1059        ConstantInt * const ONE = b->getSize(1);
1060        ConstantInt * const TWO = b->getSize(2);
1061        Value * const count = b->CreateSelect(anyProgress, ZERO, b->CreateAdd(deadLockCounter, ONE));
1062        b->CreateAssert(b->CreateICmpNE(count, TWO), "Dead lock detected: pipeline could not progress after two iterations");
1063        deadLockCounter->addIncoming(count, b->GetInsertBlock());
1064    }
1065    // return whether each sink has terminated
1066    Value * final = b->getTrue();
1067    for (const auto u : make_iterator_range(vertices(dependencyGraph))) {
1068        if (out_degree(u, dependencyGraph) == 0) {
1069            final = b->CreateAnd(final, dependencyGraph[u]);
1070        }
1071    }
1072    return final;
1073}
1074
1075/** ------------------------------------------------------------------------------------------------------------- *
1076 * @brief printGraph
1077 ** ------------------------------------------------------------------------------------------------------------- */
1078void PipelineGenerator::printGraph(const ChannelGraph & G) const {
1079
1080    auto & out = errs();
1081
1082    out << "digraph G {\n";
1083    for (auto u : make_iterator_range(vertices(G))) {
1084        assert (G[u] == kernels[u]);
1085        if (in_degree(u, G) > 0 || out_degree(u, G) > 0) {
1086            out << "v" << u << " [label=\"" << u << ": "
1087                << G[u]->getName() << "\"];\n";
1088        }
1089    }
1090
1091    for (auto e : make_iterator_range(edges(G))) {
1092        const Channel & c = G[e];
1093        const auto s = source(e, G);
1094        const auto t = target(e, G);
1095
1096        out << "v" << s << " -> v" << t
1097            << " [label=\""
1098
1099
1100
1101            << c.ratio.numerator() << " / " << c.ratio.denominator()
1102            << "\"];\n";
1103    }
1104
1105    out << "}\n\n";
1106    out.flush();
1107
1108}
1109
1110/** ------------------------------------------------------------------------------------------------------------- *
1111 * @brief printGraph
1112 ** ------------------------------------------------------------------------------------------------------------- */
1113void PipelineGenerator::printGraph(const DependencyGraph & G) const {
1114
1115    auto & out = errs();
1116
1117    out << "digraph G {\n";
1118    for (auto u : make_iterator_range(vertices(G))) {
1119            out << "v" << u << " [label=\"" << u << ": "
1120                << kernels[u]->getName() << "\"];\n";
1121    }
1122
1123    for (auto e : make_iterator_range(edges(G))) {
1124        const auto s = source(e, G);
1125        const auto t = target(e, G);
1126        out << "v" << s << " -> v" << t << ";\n";
1127    }
1128
1129    out << "}\n\n";
1130    out.flush();
1131
1132}
Note: See TracBrowser for help on using the repository browser.