source: icGREP/icgrep-devel/icgrep/toolchain/pipeline.cpp @ 6165

Last change on this file since 6165 was 6165, checked in by xwa163, 12 months ago
  1. Small optimization for U8NonFinal Stream in UTF-8 LZ4 Grep
  2. Print total CPU cycles in CPU Counter
File size: 50.2 KB
Line 
1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
6#include "pipeline.h"
7#include <toolchain/toolchain.h>
8#include <kernels/kernel.h>
9#include <kernels/streamset.h>
10#include <llvm/IR/Module.h>
11#include <boost/container/flat_set.hpp>
12#include <boost/container/flat_map.hpp>
13#include <boost/graph/adjacency_list.hpp>
14#include <boost/range/adaptor/reversed.hpp>
15#include <kernels/kernel_builder.h>
16#include <llvm/Support/raw_ostream.h>
17
18using namespace kernel;
19using namespace parabix;
20using namespace llvm;
21using namespace boost;
22using namespace boost::container;
23
24#define DISABLE_COPY_TO_OVERFLOW
25
26using Port = Kernel::Port;
27
28Function * makeThreadFunction(const std::unique_ptr<kernel::KernelBuilder> & b, const std::string & name) {
29    Function * const f = Function::Create(FunctionType::get(b->getVoidTy(), {b->getVoidPtrTy()}, false), Function::InternalLinkage, name, b->getModule());
30    f->setCallingConv(CallingConv::C);
31    f->arg_begin()->setName("state");
32    return f;
33}
34
35class PipelineGenerator {
36public:
37
38    template <typename Value>
39    using StreamSetBufferMap = flat_map<const StreamSetBuffer *, Value>;
40
41    using RateValue = ProcessingRate::RateValue;
42
43    PipelineGenerator(const std::vector<Kernel *> & kernels)
44    : kernels(kernels)
45    , terminated(nullptr)
46    , noMore(nullptr)
47    , deadLockCounter(nullptr)
48    , anyProgress(nullptr)
49    , madeProgress(nullptr) {
50
51    }
52
53    struct Channel {
54        Channel() = default;
55        Channel(const RateValue & ratio, const StreamSetBuffer * const buffer = nullptr, const unsigned operand = 0)
56        : ratio(ratio), buffer(buffer), operand(operand) { }
57
58        RateValue               ratio;
59        const StreamSetBuffer * buffer;
60        unsigned                operand;
61    };
62
63    using ChannelGraph = adjacency_list<vecS, vecS, bidirectionalS, const Kernel *, Channel>;
64
65    using DependencyGraph = adjacency_list<hash_setS, vecS, bidirectionalS, Value *>;
66
67    using KernelMap = flat_map<const Kernel *, unsigned>;
68
69    void initialize(const std::unique_ptr<KernelBuilder> & b, BasicBlock * const entryBlock);
70
71    void execute(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
72
73    Value * finalize(const std::unique_ptr<KernelBuilder> & b);
74
75protected:
76
77    ChannelGraph makeInputGraph() const;
78
79    ChannelGraph makeOutputGraph() const;
80
81    DependencyGraph makeDependencyGraph() const;
82
83    template<class VertexList>
84    ChannelGraph pruneGraph(ChannelGraph && G, VertexList && V) const;
85
86    void checkIfAllInputKernelsHaveFinished(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
87
88    void checkAvailableInputData(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
89
90    void checkAvailableOutputSpace(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
91
92    Value * getStrideLength(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel, const Binding & binding);
93
94    Value * callKernel(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
95
96    void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel *kernel);
97
98    void runKernel(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel);
99
100    Value * getFullyProcessedItemCount(const std::unique_ptr<KernelBuilder> & b, const Binding & binding, Value * const final) const;
101
102    void printGraph(const ChannelGraph & G) const;
103
104    void printGraph(const DependencyGraph & G) const;
105
106private:
107
108    const std::vector<Kernel *> &       kernels;
109    PHINode *                           terminated;
110
111    Value *                             noMore;
112
113    DependencyGraph                     dependencyGraph;
114    ChannelGraph                        inputGraph;
115    ChannelGraph                        outputGraph;
116
117    BasicBlock *                        kernelFinished;
118
119    PHINode *                           deadLockCounter;
120    Value *                             anyProgress;
121    PHINode *                           madeProgress;
122
123    StreamSetBufferMap<Value *>         producedItemCount;
124    StreamSetBufferMap<Value *>         consumedItemCount;
125    StreamSetBufferMap<const Kernel *>  lastConsumer;
126    flat_set<const StreamSetBuffer *>   isConsumedAtNonFixedRate;
127};
128
129/** ------------------------------------------------------------------------------------------------------------- *
130 * @brief generateSegmentParallelPipeline
131 *
132 * Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
133 * operating over an input stream set S, a segment-parallel implementation divides the input
134 * into segments and coordinates a set of T <= K threads to each process one segment at a time.
135 * Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
136 * fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
137 ** ------------------------------------------------------------------------------------------------------------- */
138void generateSegmentParallelPipeline(const std::unique_ptr<KernelBuilder> & b, const std::vector<Kernel *> & kernels) {
139
140    const unsigned n = kernels.size();
141    Module * const m = b->getModule();
142    IntegerType * const sizeTy = b->getSizeTy();
143    PointerType * const voidPtrTy = b->getVoidPtrTy();
144    Constant * nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
145    std::vector<Type *> structTypes;
146    codegen::BufferSegments = std::max(codegen::BufferSegments, codegen::ThreadNum);
147
148    Value * instance[n];
149    for (unsigned i = 0; i < n; ++i) {
150        instance[i] = kernels[i]->getInstance();
151        structTypes.push_back(instance[i]->getType());
152    }
153    StructType * const sharedStructType = StructType::get(m->getContext(), structTypes);
154    StructType * const threadStructType = StructType::get(m->getContext(), {sharedStructType->getPointerTo(), sizeTy});
155
156    const auto ip = b->saveIP();
157
158    Function * const threadFunc = makeThreadFunction(b, "segment");
159    auto args = threadFunc->arg_begin();
160
161    // -------------------------------------------------------------------------------------------------------------------------
162    // MAKE SEGMENT PARALLEL PIPELINE THREAD
163    // -------------------------------------------------------------------------------------------------------------------------
164
165     // Create the basic blocks for the thread function.
166    BasicBlock * entryBlock = BasicBlock::Create(b->getContext(), "entry", threadFunc);
167    b->SetInsertPoint(entryBlock);
168
169    Value * const threadStruct = b->CreateBitCast(&*(args), threadStructType->getPointerTo());
170
171    Value * const sharedStatePtr = b->CreateLoad(b->CreateGEP(threadStruct, {b->getInt32(0), b->getInt32(0)}));
172    for (unsigned k = 0; k < n; ++k) {
173        Value * ptr = b->CreateLoad(b->CreateGEP(sharedStatePtr, {b->getInt32(0), b->getInt32(k)}));
174        kernels[k]->setInstance(ptr);
175    }
176    Value * const segOffset = b->CreateLoad(b->CreateGEP(threadStruct, {b->getInt32(0), b->getInt32(1)}));
177
178    PipelineGenerator G(kernels);
179
180    BasicBlock * const segmentLoop = b->CreateBasicBlock("segmentLoop");
181    b->CreateBr(segmentLoop);
182
183    b->SetInsertPoint(segmentLoop);   
184    PHINode * const segNo = b->CreatePHI(b->getSizeTy(), 2, "segNo");
185    segNo->addIncoming(segOffset, entryBlock);
186    G.initialize(b, entryBlock);
187
188    Value * cycleCountStart = nullptr;
189    Value * cycleCountEnd = nullptr;
190    if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
191        cycleCountStart = b->CreateReadCycleCounter();
192    }
193
194    const bool serialize = codegen::DebugOptionIsSet(codegen::SerializeThreads);
195
196    for (unsigned k = 0; k < n; ++k) {
197
198        const Kernel * const kernel = kernels[k];
199
200        BasicBlock * const kernelWait = b->CreateBasicBlock(kernel->getName() + "Wait");
201        b->CreateBr(kernelWait);
202
203        b->SetInsertPoint(kernelWait);
204        b->setKernel(kernels[serialize ? (n - 1) : k]);
205        Value * const processedSegmentCount = b->acquireLogicalSegmentNo();
206        assert (processedSegmentCount->getType() == segNo->getType());
207        Value * const ready = b->CreateICmpEQ(segNo, processedSegmentCount);
208
209        BasicBlock * const kernelCheck = b->CreateBasicBlock(kernel->getName() + "Check");
210        b->CreateCondBr(ready, kernelCheck, kernelWait);
211
212        b->SetInsertPoint(kernelCheck);
213
214        G.execute(b, k);
215
216        b->releaseLogicalSegmentNo(b->CreateAdd(segNo, b->getSize(1)));
217
218        if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
219            cycleCountEnd = b->CreateReadCycleCounter();
220            Value * counterPtr = b->getCycleCountPtr();
221            b->CreateStore(b->CreateAdd(b->CreateLoad(counterPtr), b->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
222            cycleCountStart = cycleCountEnd;
223        }
224
225    }
226
227    segNo->addIncoming(b->CreateAdd(segNo, b->getSize(codegen::ThreadNum)), b->GetInsertBlock());
228    Value * const finished = G.finalize(b);
229    BasicBlock * const segmentExit = b->CreateBasicBlock("segmentExit");
230    b->CreateUnlikelyCondBr(finished, segmentExit, segmentLoop);
231
232    b->SetInsertPoint(segmentExit);
233    // only call pthread_exit() within spawned threads; otherwise it'll be equivalent to calling exit() within the process
234    BasicBlock * const exitThread = b->CreateBasicBlock("ExitThread");
235    BasicBlock * const exitFunction = b->CreateBasicBlock("ExitProcessFunction");   
236    b->CreateCondBr(b->CreateIsNull(segOffset), exitFunction, exitThread);
237    b->SetInsertPoint(exitThread);
238    b->CreatePThreadExitCall(nullVoidPtrVal);
239    b->CreateBr(exitFunction);   
240    b->SetInsertPoint(exitFunction);
241    b->CreateRetVoid();
242
243    // -------------------------------------------------------------------------------------------------------------------------
244    b->restoreIP(ip);
245
246    for (unsigned i = 0; i < n; ++i) {
247        kernels[i]->setInstance(instance[i]);
248    }
249
250    // -------------------------------------------------------------------------------------------------------------------------
251    // MAKE SEGMENT PARALLEL PIPELINE DRIVER
252    // -------------------------------------------------------------------------------------------------------------------------
253    const unsigned threads = codegen::ThreadNum - 1;
254    assert (codegen::ThreadNum > 0);
255    Type * const pthreadsTy = ArrayType::get(sizeTy, threads);
256    AllocaInst * const pthreads = b->CreateAlloca(pthreadsTy);
257    Value * threadIdPtr[threads];
258
259    for (unsigned i = 0; i < threads; ++i) {
260        threadIdPtr[i] = b->CreateGEP(pthreads, {b->getInt32(0), b->getInt32(i)});
261    }
262
263    for (unsigned i = 0; i < n; ++i) {
264        b->setKernel(kernels[i]);
265        b->releaseLogicalSegmentNo(b->getSize(0));
266    }
267
268    AllocaInst * const sharedStruct = b->CreateCacheAlignedAlloca(sharedStructType);
269    for (unsigned i = 0; i < n; ++i) {
270        Value * ptr = b->CreateGEP(sharedStruct, {b->getInt32(0), b->getInt32(i)});
271        b->CreateStore(kernels[i]->getInstance(), ptr);
272    }
273
274    // use the process thread to handle the initial segment function after spawning (n - 1) threads to handle the subsequent offsets
275    for (unsigned i = 0; i < threads; ++i) {
276        AllocaInst * const threadState = b->CreateAlloca(threadStructType);
277        b->CreateStore(sharedStruct, b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(0)}));
278        b->CreateStore(b->getSize(i + 1), b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(1)}));
279        b->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, threadFunc, threadState);
280    }
281
282    AllocaInst * const threadState = b->CreateAlloca(threadStructType);
283    b->CreateStore(sharedStruct, b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(0)}));
284    b->CreateStore(b->getSize(0), b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(1)}));
285    b->CreateCall(threadFunc, b->CreatePointerCast(threadState, voidPtrTy));
286
287    AllocaInst * const status = b->CreateAlloca(voidPtrTy);
288    for (unsigned i = 0; i < threads; ++i) {
289        Value * threadId = b->CreateLoad(threadIdPtr[i]);
290        b->CreatePThreadJoinCall(threadId, status);
291    }
292
293    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
294        Value* FP_100 = ConstantFP::get(b->getDoubleTy(), 100.0);
295        Value* totalCycles = b->getSize(0);
296        for (const Kernel * kernel : kernels) {
297            b->setKernel(kernel);
298            Value * cycles = b->CreateLoad(b->getCycleCountPtr());
299            totalCycles = b->CreateAdd(totalCycles, cycles);
300        }
301        Value* fTotalCycle = b->CreateUIToFP(totalCycles, b->getDoubleTy());
302
303        for (const Kernel * kernel : kernels) {
304            b->setKernel(kernel);
305            const auto & inputs = kernel->getStreamInputs();
306            const auto & outputs = kernel->getStreamOutputs();
307            Value * items = nullptr;
308            if (inputs.empty()) {
309                items = b->getProducedItemCount(outputs[0].getName());
310            } else {
311                items = b->getProcessedItemCount(inputs[0].getName());
312            }
313            Value * fItems = b->CreateUIToFP(items, b->getDoubleTy());
314            Value * cycles = b->CreateLoad(b->getCycleCountPtr());
315            Value * fCycles = b->CreateUIToFP(cycles, b->getDoubleTy());
316            Value * percentage = b->CreateFDiv(b->CreateFMul(fCycles, FP_100), fTotalCycle);
317
318            const auto formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item,  %2.2f%% of Total CPU Cycles. \n ";
319            Value * stringPtr = b->CreatePointerCast(b->GetString(formatString), b->getInt8PtrTy());
320            b->CreateCall(b->GetDprintf(), {b->getInt32(2), stringPtr, fItems, fCycles, b->CreateFDiv(fCycles, fItems), percentage});
321        }
322        const auto formatString = "Total CPU Cycles: %7.2e\n ";
323        Value * stringPtr = b->CreatePointerCast(b->GetString(formatString), b->getInt8PtrTy());
324        b->CreateCall(b->GetDprintf(), {b->getInt32(2), stringPtr, fTotalCycle});
325    }
326   
327}
328
329
330/** ------------------------------------------------------------------------------------------------------------- *
331 * @brief generatePipelineLoop
332 ** ------------------------------------------------------------------------------------------------------------- */
333void generatePipelineLoop(const std::unique_ptr<KernelBuilder> & b, const std::vector<Kernel *> & kernels) {
334
335    // Create the basic blocks for the loop.
336    BasicBlock * const entryBlock = b->GetInsertBlock();
337    BasicBlock * const pipelineLoop = b->CreateBasicBlock("pipelineLoop");
338
339    PipelineGenerator G(kernels);
340
341    b->CreateBr(pipelineLoop);
342
343    b->SetInsertPoint(pipelineLoop);   
344    PHINode * const segNo = b->CreatePHI(b->getSizeTy(), 2, "segNo");
345    segNo->addIncoming(b->getSize(0), entryBlock);
346    G.initialize(b, entryBlock);
347
348    Value * const nextSegNo = b->CreateAdd(segNo, b->getSize(1));
349
350    Value * cycleCountStart = nullptr;
351    Value * cycleCountEnd = nullptr;
352    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
353        cycleCountStart = b->CreateReadCycleCounter();
354    }
355
356    for (unsigned i = 0; i < kernels.size(); ++i) {
357
358        G.execute(b, i);
359
360        b->releaseLogicalSegmentNo(nextSegNo);
361
362        if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
363            cycleCountEnd = b->CreateReadCycleCounter();
364            Value * counterPtr = b->getCycleCountPtr();
365            b->CreateStore(b->CreateAdd(b->CreateLoad(counterPtr), b->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
366            cycleCountStart = cycleCountEnd;
367        }
368    }
369
370    segNo->addIncoming(nextSegNo, b->GetInsertBlock());
371    BasicBlock * const pipelineExit = b->CreateBasicBlock("pipelineExit");
372    Value * const finished = G.finalize(b);
373    b->CreateCondBr(finished, pipelineExit, pipelineLoop);
374
375    b->SetInsertPoint(pipelineExit);
376
377    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
378        Value* FP_100 = ConstantFP::get(b->getDoubleTy(), 100.0);
379        Value* totalCycles = b->getSize(0);
380        for (const Kernel * kernel : kernels) {
381            b->setKernel(kernel);
382            Value * cycles = b->CreateLoad(b->getCycleCountPtr());
383            totalCycles = b->CreateAdd(totalCycles, cycles);
384        }
385        Value* fTotalCycle = b->CreateUIToFP(totalCycles, b->getDoubleTy());
386
387        for (const Kernel * kernel : kernels) {
388            b->setKernel(kernel);
389            const auto & inputs = kernel->getStreamInputs();
390            const auto & outputs = kernel->getStreamOutputs();
391            Value * items = nullptr;
392            if (inputs.empty()) {
393                items = b->getProducedItemCount(outputs[0].getName());
394            } else {
395                items = b->getProcessedItemCount(inputs[0].getName());
396            }
397            Value * fItems = b->CreateUIToFP(items, b->getDoubleTy());
398            Value * cycles = b->CreateLoad(b->getCycleCountPtr());
399            Value * fCycles = b->CreateUIToFP(cycles, b->getDoubleTy());
400            Value * percentage = b->CreateFDiv(b->CreateFMul(fCycles, FP_100), fTotalCycle);
401
402            const auto formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item,  %2.2f%% of Total CPU Cycles. \n ";
403            Value * stringPtr = b->CreatePointerCast(b->GetString(formatString), b->getInt8PtrTy());
404            b->CreateCall(b->GetDprintf(), {b->getInt32(2), stringPtr, fItems, fCycles, b->CreateFDiv(fCycles, fItems), percentage});
405        }
406
407        const auto formatString = "Total CPU Cycles: %7.2e\n ";
408        Value * stringPtr = b->CreatePointerCast(b->GetString(formatString), b->getInt8PtrTy());
409        b->CreateCall(b->GetDprintf(), {b->getInt32(2), stringPtr, fTotalCycle});
410    }
411
412}
413
414/** ------------------------------------------------------------------------------------------------------------- *
415 * @brief initialize
416 ** ------------------------------------------------------------------------------------------------------------- */
417void PipelineGenerator::initialize(const std::unique_ptr<KernelBuilder> & b, BasicBlock * const entryBlock) {
418
419    dependencyGraph = makeDependencyGraph();
420    inputGraph = makeInputGraph();
421    outputGraph = makeOutputGraph();
422
423    for (Kernel * const kernel : kernels) {
424        const auto & inputs = kernel->getStreamInputs();
425        for (unsigned i = 0; i < inputs.size(); ++i) {
426            const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
427            if (kernel->requiresCopyBack(inputs[i]) && !buffer->isUnbounded()) {
428                if (LLVM_LIKELY(buffer->supportsCopyBack())) {
429                    isConsumedAtNonFixedRate.insert(buffer);
430                } else {
431    //                std::string tmp;
432    //                raw_string_ostream out(tmp);
433    //                out << kernel->getName() << " : " << name << " must have an overflow";
434    //                report_fatal_error(out.str());
435                }
436            }
437        }
438        // if this kernel consumes this buffer, update the last consumer
439        for (const StreamSetBuffer * const buffer : kernel->getStreamSetInputBuffers()) {
440            auto f = lastConsumer.find(buffer);
441            assert (f != lastConsumer.end());
442            f->second = kernel;
443        }
444        // incase some output is never consumed, make the kernel that produced it the initial "last consumer"
445        for (const StreamSetBuffer * const buffer : kernel->getStreamSetOutputBuffers()) {
446            assert (buffer->getProducer() == kernel);
447            assert (lastConsumer.count(buffer) == 0);
448            lastConsumer.emplace(buffer, kernel);
449        }
450    }
451
452    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
453        deadLockCounter = b->CreatePHI(b->getSizeTy(), 2, "deadLockCounter");
454        deadLockCounter->addIncoming(b->getSize(0), entryBlock);
455        anyProgress = b->getFalse();
456    }
457
458}
459
460/** ------------------------------------------------------------------------------------------------------------- *
461 * @brief makeTerminationGraph
462 *
463 * The input graph models whether a kernel could *consume* more data than may be produced by a preceeding kernel.
464 ** ------------------------------------------------------------------------------------------------------------- */
465PipelineGenerator::DependencyGraph PipelineGenerator::makeDependencyGraph() const {
466    const auto n = kernels.size();
467    DependencyGraph G(n);
468    KernelMap M;
469    // construct a kernel dependency graph
470    for (unsigned v = 0; v < n; ++v) {
471        const Kernel * const kernel = kernels[v];       
472        for (const StreamSetBuffer * buf : kernel->getStreamSetInputBuffers()) {
473            const auto f = M.find(buf->getProducer()); assert (f != M.end());
474            add_edge(f->second, v, G);
475        }
476        M.emplace(kernel, v);
477    }
478    // generate a transitive closure
479    for (unsigned u = 0; u < n; ++u) {
480        for (auto e : make_iterator_range(in_edges(u, G))) {
481            const auto s = source(e, G);
482            for (auto f : make_iterator_range(out_edges(u, G))) {
483                add_edge(s, target(f, G), G);
484            }
485        }
486    }
487    // then take the transitive reduction
488    std::vector<unsigned> sources;
489    for (unsigned u = n; u-- > 0; ) {
490        if (in_degree(u, G) > 0 && out_degree(u, G) > 0) {
491            for (auto e : make_iterator_range(in_edges(u, G))) {
492                sources.push_back(source(e, G));
493            }
494            std::sort(sources.begin(), sources.end());
495            for (auto e : make_iterator_range(out_edges(u, G))) {
496                remove_in_edge_if(target(e, G), [&G, &sources](const DependencyGraph::edge_descriptor f) {
497                    return std::binary_search(sources.begin(), sources.end(), source(f, G));
498                }, G);
499            }
500            sources.clear();
501        }
502    }
503    return G;
504}
505
506/** ------------------------------------------------------------------------------------------------------------- *
507 * @brief makeInputGraph
508 *
509 * The input graph models whether a kernel could *consume* more data than may be produced by a preceeding kernel.
510 ** ------------------------------------------------------------------------------------------------------------- */
511PipelineGenerator::ChannelGraph PipelineGenerator::makeInputGraph() const {
512    const auto n = kernels.size();
513    ChannelGraph G(n);
514    KernelMap M;
515    for (unsigned v = 0; v < n; ++v) {
516
517        const Kernel * const consumer = kernels[v];
518        G[v] = consumer;
519        M.emplace(consumer, v);
520
521        const auto & inputs = consumer->getStreamInputs();
522        for (unsigned i = 0; i < inputs.size(); ++i) {
523
524            const Binding & input = inputs[i];
525            auto ub_in = consumer->getUpperBound(input.getRate()) * consumer->getStride() * codegen::SegmentSize; assert (ub_in > 0);
526            if (input.hasLookahead()) {
527                ub_in += input.getLookahead();
528            }
529
530            const auto buffer = consumer->getStreamSetInputBuffer(i);
531            const Kernel * const producer = buffer->getProducer();
532            const Binding & output = producer->getStreamOutput(buffer);
533            const auto lb_out = producer->getLowerBound(output.getRate()) * producer->getStride() * codegen::SegmentSize;
534
535            const auto min_oi_ratio = lb_out / ub_in;
536            const auto f = M.find(producer); assert (f != M.end());
537            const auto u = f->second;
538            // If we have multiple inputs from the same kernel, we only need to consider the "slowest" one
539            bool slowest = true;
540            for (const auto e : make_iterator_range(in_edges(v, G))) {
541                if (source(e, G) == u) {
542                    const Channel & p = G[e];
543                    if (min_oi_ratio > p.ratio) {
544                        slowest = false;
545                    } else if (min_oi_ratio < p.ratio) {
546                        clear_in_edges(v, G);
547                    }
548                    break;
549                }
550            }
551            if (slowest) {
552                add_edge(u, v, Channel{min_oi_ratio, buffer, i}, G);
553            }
554        }
555    }
556
557    return pruneGraph(std::move(G), make_iterator_range(vertices(G)));
558}
559
560/** ------------------------------------------------------------------------------------------------------------- *
561 * @brief makeOutputGraph
562 *
563 * The output graph models whether a kernel could *produce* more data than may be consumed by a subsequent kernel.
564 ** ------------------------------------------------------------------------------------------------------------- */
565PipelineGenerator::ChannelGraph PipelineGenerator::makeOutputGraph() const {
566    const auto n = kernels.size();
567    ChannelGraph G(n);
568    KernelMap M;
569    for (unsigned v = 0; v < n; ++v) {
570        const Kernel * const consumer = kernels[v];
571        G[v] = consumer;
572        M.emplace(consumer, v);
573
574        const auto & inputs = consumer->getStreamInputs();
575        for (unsigned i = 0; i < inputs.size(); ++i) {
576            const auto buffer = consumer->getStreamSetInputBuffer(i);
577            if (isa<ExternalBuffer>(buffer)) continue;
578            const Kernel * const producer = buffer->getProducer();
579            assert (consumer != producer);
580            const Binding & output = producer->getStreamOutput(buffer);
581            auto ub_out = producer->getUpperBound(output.getRate()) * producer->getStride() * codegen::SegmentSize;
582            if (ub_out > 0) { // unknown output rates are handled by reallocating their buffers
583                const Binding & input = inputs[i];
584                if (input.hasLookahead()) {
585                    const auto la = input.getLookahead();
586                    if (LLVM_UNLIKELY(ub_out <= la)) {
587                        llvm::report_fatal_error("lookahead exceeds segment size");
588                    }
589                    ub_out += la;
590                }
591                const auto lb_in = consumer->getLowerBound(input.getRate()) * consumer->getStride() * codegen::SegmentSize;
592                const auto min_io_ratio = lb_in / ub_out;
593                const auto f = M.find(producer); assert (f != M.end());
594                const auto u = f->second;
595                assert (v != u);
596                assert (G[u] == producer);
597                // If we have multiple inputs from the same kernel, we only need to consider the "fastest" one
598                bool fastest = true;
599                for (const auto e : make_iterator_range(in_edges(v, G))) {
600                    if (source(e, G) == u) {
601                        const Channel & p = G[e];
602                        if (min_io_ratio > p.ratio) {
603                            fastest = false;
604                        } else if (min_io_ratio < p.ratio) {
605                            clear_in_edges(v, G);
606                        }
607                        break;
608                    }
609                }
610                if (fastest) {
611                    add_edge(v, u, Channel{min_io_ratio, buffer, i}, G);
612                }
613            }
614        }
615    }
616
617    return pruneGraph(std::move(G), boost::adaptors::reverse(make_iterator_range(vertices(G))));
618}
619
620/** ------------------------------------------------------------------------------------------------------------- *
621 * @brief pruneGraph
622 ** ------------------------------------------------------------------------------------------------------------- */
623template<class VertexList>
624inline PipelineGenerator::ChannelGraph PipelineGenerator::pruneGraph(ChannelGraph && G, VertexList && V) const {
625    // Take a transitive closure of G but whenever we attempt to insert an edge into the closure
626    // that already exists, check instead whether the rate of our proposed edge is <= the existing
627    // edge's rate. If so, the data availability/consumption is transitively guaranteed.
628    for (const auto u : V) {
629        for (auto ei : make_iterator_range(in_edges(u, G))) {
630            const auto v = source(ei, G);
631            const Channel & ci = G[ei];
632            for (auto ej : make_iterator_range(out_edges(u, G))) {
633                const auto w = target(ej, G);
634                // ci.rate = BOUND_RATIO(u, v) * (STRIDE(u) / STRIDE(v))
635                const auto scaling = RateValue(G[v]->getStride(), G[w]->getStride());
636                const auto rate = ci.ratio * scaling;
637                bool insert = true;
638                for (auto ek : make_iterator_range(in_edges(w, G))) {
639                    // do we already have a vw edge?
640                    if (source(ek, G) == v) {
641                        Channel & ck = G[ek];
642                        if (rate <= ck.ratio) {
643                            ck.buffer = nullptr;
644                        }
645                        insert = false;
646                    }
647                }
648                if (insert) {
649                    add_edge(v, w, Channel{rate}, G);
650                }
651            }
652        }
653    }
654
655    // remove any closure edges from G
656    remove_edge_if([&G](const ChannelGraph::edge_descriptor e) { return G[e].buffer == nullptr; }, G);
657
658    for (const auto u : V) {
659        if (in_degree(u, G) == 0) {
660            // If we do not need to check any of its inputs, we can avoid testing any output of a kernel
661            // with a rate ratio of at least 1
662            remove_out_edge_if(u, [&G](const ChannelGraph::edge_descriptor e) {
663                return G[e].ratio >= RateValue{1, 1};
664            }, G);
665        }
666    }
667
668    return G;
669}
670
671/** ------------------------------------------------------------------------------------------------------------- *
672 * @brief executeKernel
673 ** ------------------------------------------------------------------------------------------------------------- */
674void PipelineGenerator::execute(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
675
676    const Kernel * const kernel = kernels[index];
677    b->setKernel(kernel);
678
679    const auto & inputs = kernel->getStreamInputs();
680    const auto & outputs = kernel->getStreamOutputs();
681
682    BasicBlock * const kernelEntry = b->GetInsertBlock();
683    BasicBlock * const kernelCode = b->CreateBasicBlock(kernel->getName());
684    kernelFinished = b->CreateBasicBlock(kernel->getName() + "Finished");
685    BasicBlock * const kernelExit = b->CreateBasicBlock(kernel->getName() + "Exit");
686
687    b->CreateUnlikelyCondBr(b->getTerminationSignal(), kernelExit, kernelCode);
688
689    b->SetInsertPoint(kernelFinished);
690    terminated = b->CreatePHI(b->getInt1Ty(), 2);
691    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
692        madeProgress = b->CreatePHI(b->getInt1Ty(), 3);
693    }
694    b->SetInsertPoint(kernelExit);
695    PHINode * const isFinal = b->CreatePHI(b->getInt1Ty(), 2, kernel->getName() + "_isFinal");
696    isFinal->addIncoming(b->getTrue(), kernelEntry);
697    isFinal->addIncoming(terminated, kernelFinished);
698
699    PHINode * progress = nullptr;
700    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
701        progress = b->CreatePHI(b->getInt1Ty(), 2, kernel->getName() + "_anyProgress");
702        progress->addIncoming(anyProgress, kernelEntry);
703        progress->addIncoming(madeProgress, kernelFinished);
704    }
705
706    // Since it is possible that a sole consumer of some stream could terminate early, set the
707    // initial consumed amount to the amount produced in this iteration.
708
709    // First determine the priorConsumedItemCounts, making sure
710    // that none of them are previous input buffers of this kernel!
711    std::vector<Value *> priorConsumedItemCount(inputs.size());
712
713    for (unsigned i = 0; i < inputs.size(); ++i) {
714        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
715        auto c = consumedItemCount.find(buffer);
716        if (c == consumedItemCount.end()) {
717            const auto p = producedItemCount.find(buffer);
718            assert (p != producedItemCount.end());
719            priorConsumedItemCount[i] = p->second;
720        } else {
721            priorConsumedItemCount[i] = c->second;
722        }
723    }
724
725    std::vector<PHINode *> consumedItemCountPhi(inputs.size());
726
727    for (unsigned i = 0; i < inputs.size(); ++i) {
728        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
729        PHINode * const consumedPhi = b->CreatePHI(b->getSizeTy(), 2);
730        auto c = consumedItemCount.find(buffer);
731        if (c == consumedItemCount.end()) {
732            consumedItemCount.emplace(buffer, consumedPhi);
733        } else {
734            c->second = consumedPhi;
735        }
736        consumedPhi->addIncoming(priorConsumedItemCount[i], kernelEntry);
737        consumedItemCountPhi[i] = consumedPhi;
738    }
739
740    b->SetInsertPoint(kernelCode);
741
742    Value * const finalStride = callKernel(b, index);
743
744    // TODO: add in some checks to verify the kernel actually adheres to its rate
745
746
747    BasicBlock * const kernelCodeExit = b->GetInsertBlock();
748    terminated->addIncoming(finalStride, kernelCodeExit);
749    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
750        madeProgress->addIncoming(b->getTrue(), kernelCodeExit);
751        anyProgress = progress;
752    }
753    b->CreateBr(kernelFinished);
754
755    b->SetInsertPoint(kernelFinished);
756
757    // update the consumed item counts
758    for (unsigned i = 0; i < inputs.size(); ++i) {
759        const Binding & input = inputs[i];
760        Value * const fullyProcessed = getFullyProcessedItemCount(b, input, terminated);
761        Value * const consumed = b->CreateUMin(priorConsumedItemCount[i], fullyProcessed);
762        consumedItemCountPhi[i]->addIncoming(consumed, kernelFinished);
763    }
764    b->CreateBr(kernelExit);
765
766    kernelExit->moveAfter(kernelFinished);
767
768    b->SetInsertPoint(kernelExit);
769
770    for (unsigned i = 0; i < outputs.size(); ++i) {
771        const Binding & output = outputs[i];
772        Value * const produced = b->getProducedItemCount(output.getName());
773        const StreamSetBuffer * const buffer = kernel->getStreamSetOutputBuffer(i);
774        // if some stream has no consumer, set the consumed item count to the produced item count
775        const auto c = lastConsumer.find(buffer);
776        assert (c != lastConsumer.end());
777        if (LLVM_UNLIKELY(c->second == kernel)) {
778            assert (buffer->getProducer() == kernel);
779            if (LLVM_UNLIKELY(output.getRate().isRelative())) {
780                continue;
781            }
782            b->setConsumedItemCount(output.getName(), produced);
783        } else { // otherwise record how many items were produced
784            assert (producedItemCount.count(buffer) == 0);
785            producedItemCount.emplace(buffer, produced);
786        }
787    }
788
789    // If this kernel is the last consumer of a input buffer, update the consumed count for that buffer.
790
791    // TODO: if all consumers process the data at a fixed rate, we can just set the consumed item count
792    // by the strideNo rather than tracking it.
793
794    // TODO: a kernel could take the same stream set for multiple arguments.
795
796    // TODO: if we can prove that this kernel cannot terminate before any prior consumer, this code
797    // could be executed in kernelFinished block.
798    for (unsigned i = 0; i < inputs.size(); ++i) {
799        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
800        const auto c = lastConsumer.find(buffer);
801        assert (c != lastConsumer.end());
802        if (LLVM_LIKELY(c->second == kernel)) {
803            Kernel * const producer = buffer->getProducer();
804            assert (producer != kernel);
805            const auto & output = producer->getStreamOutput(buffer);
806            if (output.getRate().isRelative()) continue;
807            b->setKernel(producer);
808            b->setConsumedItemCount(output.getName(), consumedItemCountPhi[i]);
809            b->setKernel(kernel);
810        }
811    }
812
813    dependencyGraph[index] = isFinal;
814}
815
816/** ------------------------------------------------------------------------------------------------------------- *
817 * @brief checkIfAllInputKernelsHaveFinished
818 ** ------------------------------------------------------------------------------------------------------------- */
819void PipelineGenerator::checkIfAllInputKernelsHaveFinished(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
820    const auto n = in_degree(index, dependencyGraph);
821    if (LLVM_UNLIKELY(n == 0)) {
822        noMore = b->getFalse();
823    } else {
824        noMore = b->getTrue();
825        for (auto e : make_iterator_range(in_edges(index, dependencyGraph))) {
826            const auto u = source(e, dependencyGraph);
827            Value * const finished = dependencyGraph[u];
828            noMore = b->CreateAnd(noMore, finished);
829        }
830    }
831}
832
833/** ------------------------------------------------------------------------------------------------------------- *
834 * @brief checkAvailableInputData
835 ** ------------------------------------------------------------------------------------------------------------- */
836void PipelineGenerator::checkAvailableInputData(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
837    const Kernel * const kernel = kernels[index];
838    b->setKernel(kernel);
839    for (auto e : make_iterator_range(in_edges(index, inputGraph))) {
840        const Channel & c = inputGraph[e];
841        const Binding & input = kernel->getStreamInput(c.operand);
842        Value * requiredInput = getStrideLength(b, kernel, input);
843        if (LLVM_UNLIKELY(input.hasLookahead())) {
844            Constant * const lookahead = b->getSize(input.getLookahead());
845            requiredInput = b->CreateAdd(requiredInput, lookahead);
846        }
847        const auto p = producedItemCount.find(c.buffer);
848        assert (p != producedItemCount.end());
849        Value * const produced = p->second;
850        Value * const processed = b->getNonDeferredProcessedItemCount(input);
851        Value * const unprocessed = b->CreateSub(produced, processed);
852        Value * const hasEnough = b->CreateICmpUGE(unprocessed, requiredInput);
853        terminated->addIncoming(b->getFalse(), b->GetInsertBlock());
854        if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
855            madeProgress->addIncoming(anyProgress, b->GetInsertBlock());
856        }
857        const auto prefix = kernel->getName() + "_" + input.getName();
858        BasicBlock * const hasSufficientInput = b->CreateBasicBlock(prefix + "_hasSufficientInput");
859        Value * const check = b->CreateOr(hasEnough, noMore);
860        b->CreateLikelyCondBr(check, hasSufficientInput, kernelFinished);
861        b->SetInsertPoint(hasSufficientInput);
862    }
863}
864
865/** ------------------------------------------------------------------------------------------------------------- *
866 * @brief checkAvailableOutputSpace
867 ** ------------------------------------------------------------------------------------------------------------- */
868void PipelineGenerator::checkAvailableOutputSpace(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
869    const Kernel * const kernel = kernels[index];
870    b->setKernel(kernel);
871    for (auto e : make_iterator_range(in_edges(index, outputGraph))) {
872        const Channel & c = outputGraph[e];
873        assert (c.buffer->getProducer() == kernel);
874        const Binding & output = kernel->getStreamOutput(c.buffer);
875        Value * requiredSpace = getStrideLength(b, kernel, output);
876        const auto & name = output.getName();
877        Value * const produced = b->getNonDeferredProducedItemCount(output);
878        Value * const consumed = b->getConsumedItemCount(name);
879        Value * const unconsumed = b->CreateSub(produced, consumed);
880        requiredSpace = b->CreateAdd(requiredSpace, unconsumed);
881        Value * const capacity = b->getCapacity(name);
882        Value * const check = b->CreateICmpULE(requiredSpace, capacity);
883        terminated->addIncoming(b->getFalse(), b->GetInsertBlock());
884        if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
885            madeProgress->addIncoming(anyProgress, b->GetInsertBlock());
886        }
887        const auto prefix = kernel->getName() + "_" + name;
888        BasicBlock * const hasOutputSpace = b->CreateBasicBlock(prefix + "_hasOutputSpace");
889        b->CreateLikelyCondBr(check, hasOutputSpace, kernelFinished);
890        b->SetInsertPoint(hasOutputSpace);
891    }
892}
893
894/** ------------------------------------------------------------------------------------------------------------- *
895 * @brief getStrideLength
896 ** ------------------------------------------------------------------------------------------------------------- */
897Value * PipelineGenerator::getStrideLength(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel, const Binding & binding) {
898    Value * strideLength = nullptr;
899    const ProcessingRate & rate = binding.getRate();
900    if (rate.isPopCount() || rate.isNegatedPopCount()) {
901        Port refPort; unsigned refIndex;
902        std::tie(refPort, refIndex) = kernel->getStreamPort(rate.getReference());
903        const Binding & ref = kernel->getStreamInput(refIndex);
904        Value * markers = b->loadInputStreamBlock(ref.getName(), b->getSize(0));
905        if (rate.isNegatedPopCount()) {
906            markers = b->CreateNot(markers);
907        }
908        strideLength = b->bitblock_popcount(markers);
909    } else if (binding.hasAttribute(kernel::Attribute::KindId::AlwaysConsume)) {
910        const auto lb = kernel->getLowerBound(rate);
911        strideLength = b->getSize(std::max(ceiling(lb * kernel->getStride()), 1U));
912    } else {
913        const auto ub = kernel->getUpperBound(rate); assert (ub > 0);
914        strideLength = b->getSize(ceiling(ub * kernel->getStride()));
915    }
916    return strideLength;
917}
918
919/** ------------------------------------------------------------------------------------------------------------- *
920 * @brief callKernel
921 ** ------------------------------------------------------------------------------------------------------------- */
922Value * PipelineGenerator::callKernel(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
923
924    const Kernel * const kernel = kernels[index];
925    b->setKernel(kernel);
926
927    checkIfAllInputKernelsHaveFinished(b, index);
928
929    checkAvailableInputData(b, index);
930
931    checkAvailableOutputSpace(b, index);
932
933    applyOutputBufferExpansions(b, kernel);
934
935    #ifndef DISABLE_COPY_TO_OVERFLOW
936    // Store how many items we produced by this kernel in the prior iteration. We'll use this to determine when
937    // to mirror the first K segments
938    const auto & outputs = kernel->getStreamOutputs();
939    const auto m = outputs.size();
940
941    std::vector<Value *> initiallyProducedItemCount(m, nullptr);
942    for (unsigned i = 0; i < m; ++i) {
943        const Binding & output = outputs[i];
944        const auto & name = output.getName();
945        const StreamSetBuffer * const buffer = kernel->getOutputStreamSetBuffer(name);
946        if (isConsumedAtNonFixedRate.count(buffer)) {
947            initiallyProducedItemCount[i] = b->getProducedItemCount(name);
948        }
949    }
950    #endif
951
952    runKernel(b, kernel);
953
954    #ifndef DISABLE_COPY_TO_OVERFLOW
955    // For each buffer with an overflow region of K blocks, overwrite the overflow with the first K blocks of
956    // data to ensure that even if this stream is produced at a fixed rate but consumed at a bounded rate,
957    // every kernel has a consistent view of the stream data.
958    for (unsigned i = 0; i < m; ++i) {
959        const Binding & output = outputs[i];
960        const auto & name = output.getName();
961        if (initiallyProducedItemCount[i]) {
962            Value * const bufferSize = b->getCapacity(name);
963            Value * const prior = initiallyProducedItemCount[i];
964            Value * const offset = b->CreateURem(prior, bufferSize);
965            Value * const produced = b->getNonDeferredProducedItemCount(output);
966            Value * const buffered = b->CreateAdd(offset, b->CreateSub(produced, prior));
967            BasicBlock * const copyBack = b->CreateBasicBlock(name + "MirrorOverflow");
968            BasicBlock * const done = b->CreateBasicBlock(name + "MirrorOverflowDone");
969            b->CreateCondBr(b->CreateICmpUGT(buffered, bufferSize), copyBack, done);
970            b->SetInsertPoint(copyBack);
971            b->CreateCopyToOverflow(name);
972            b->CreateBr(done);
973            b->SetInsertPoint(done);
974        }
975    }
976    #endif
977
978    return b->getTerminationSignal();
979}
980
981/** ------------------------------------------------------------------------------------------------------------- *
982 * @brief runKernel
983 ** ------------------------------------------------------------------------------------------------------------- */
984void PipelineGenerator::runKernel(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel) {
985
986    const auto & inputs = kernel->getStreamInputs();
987    const auto n = inputs.size();
988    std::vector<Value *> arguments(n + 2);
989
990    Value * isFinal = noMore;
991
992    for (unsigned i = 0; i < n; ++i) {
993        const Binding & input = inputs[i];
994        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
995
996        const auto p = producedItemCount.find(buffer);
997        assert (p != producedItemCount.end());
998        Value * const produced = p->second;
999
1000        const ProcessingRate & rate = input.getRate();
1001        if (rate.isPopCount()) {
1002            arguments[i + 2] = produced;
1003        } else {
1004            const unsigned strideSize = ceiling(kernel->getUpperBound(rate) * kernel->getStride());
1005            Value * const processed = b->getNonDeferredProcessedItemCount(input);
1006            Value * const limit = b->CreateAdd(processed, b->getSize(strideSize * codegen::SegmentSize));
1007            Value * const hasPartial = b->CreateICmpULT(produced, limit);
1008            arguments[i + 2] = b->CreateSelect(hasPartial, produced, limit);
1009            isFinal = b->CreateAnd(isFinal, hasPartial);
1010        }
1011    }
1012
1013    // TODO: pass in a strideNo for fixed rate streams to allow the kernel to calculate the current avail,
1014    // processed, and produced counts
1015
1016    arguments[0] = kernel->getInstance();
1017    arguments[1] = isFinal;
1018
1019    b->createDoSegmentCall(arguments);
1020}
1021
1022
1023/** ------------------------------------------------------------------------------------------------------------- *
1024 * @brief applyOutputBufferExpansions
1025 ** ------------------------------------------------------------------------------------------------------------- */
1026void PipelineGenerator::applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel) {
1027    const auto & outputs = kernel->getStreamSetOutputBuffers();
1028    for (unsigned i = 0; i < outputs.size(); i++) {
1029        if (isa<DynamicBuffer>(outputs[i])) {
1030            const auto & output = kernel->getStreamOutput(i);
1031            const auto baseSize = ceiling(kernel->getUpperBound(output.getRate()) * kernel->getStride() * codegen::SegmentSize);
1032            if (LLVM_LIKELY(baseSize > 0)) {
1033                const auto & name = output.getName();
1034                Value * const produced = b->getProducedItemCount(name);
1035                Value * const consumed = b->getConsumedItemCount(name);
1036                Value * const unconsumed = b->CreateSub(produced, consumed);
1037                Value * const required = b->CreateAdd(unconsumed, b->getSize(2 * baseSize));
1038                b->setCapacity(name, required);
1039            }
1040        }
1041    }
1042}
1043
1044/** ------------------------------------------------------------------------------------------------------------- *
1045 * @brief getFullyProcessedItemCount
1046 ** ------------------------------------------------------------------------------------------------------------- */
1047inline Value * PipelineGenerator::getFullyProcessedItemCount(const std::unique_ptr<KernelBuilder> & b, const Binding & input, Value * const isFinal) const {
1048    Value * const processed = b->getProcessedItemCount(input.getName());
1049    if (LLVM_UNLIKELY(input.hasAttribute(kernel::Attribute::KindId::BlockSize))) {
1050        // If the input rate has a block size attribute then --- for the purpose of determining how many
1051        // items have been consumed --- we consider a stream set to be fully processed when an entire
1052        // block (stride?) has been processed.
1053        Constant * const BLOCK_WIDTH = b->getSize(b->getBitBlockWidth());
1054        Value * const partial = b->CreateAnd(processed, ConstantExpr::getNeg(BLOCK_WIDTH));
1055        return b->CreateSelect(isFinal, processed, partial);
1056    }
1057    return processed;
1058}
1059
1060/** ------------------------------------------------------------------------------------------------------------- *
1061 * @brief finalize
1062 ** ------------------------------------------------------------------------------------------------------------- */
1063Value * PipelineGenerator::finalize(const std::unique_ptr<KernelBuilder> & b) {
1064    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
1065        ConstantInt * const ZERO = b->getSize(0);
1066        ConstantInt * const ONE = b->getSize(1);
1067        ConstantInt * const TWO = b->getSize(2);
1068        Value * const count = b->CreateSelect(anyProgress, ZERO, b->CreateAdd(deadLockCounter, ONE));
1069        b->CreateAssert(b->CreateICmpNE(count, TWO), "Dead lock detected: pipeline could not progress after two iterations");
1070        deadLockCounter->addIncoming(count, b->GetInsertBlock());
1071    }
1072    // return whether each sink has terminated
1073    Value * final = b->getTrue();
1074    for (const auto u : make_iterator_range(vertices(dependencyGraph))) {
1075        if (out_degree(u, dependencyGraph) == 0) {
1076            final = b->CreateAnd(final, dependencyGraph[u]);
1077        }
1078    }
1079    return final;
1080}
1081
1082/** ------------------------------------------------------------------------------------------------------------- *
1083 * @brief printGraph
1084 ** ------------------------------------------------------------------------------------------------------------- */
1085void PipelineGenerator::printGraph(const ChannelGraph & G) const {
1086
1087    auto & out = errs();
1088
1089    out << "digraph G {\n";
1090    for (auto u : make_iterator_range(vertices(G))) {
1091        assert (G[u] == kernels[u]);
1092        if (in_degree(u, G) > 0 || out_degree(u, G) > 0) {
1093            out << "v" << u << " [label=\"" << u << ": "
1094                << G[u]->getName() << "\"];\n";
1095        }
1096    }
1097
1098    for (auto e : make_iterator_range(edges(G))) {
1099        const Channel & c = G[e];
1100        const auto s = source(e, G);
1101        const auto t = target(e, G);
1102
1103        out << "v" << s << " -> v" << t
1104            << " [label=\""
1105
1106
1107
1108            << c.ratio.numerator() << " / " << c.ratio.denominator()
1109            << "\"];\n";
1110    }
1111
1112    out << "}\n\n";
1113    out.flush();
1114
1115}
1116
1117/** ------------------------------------------------------------------------------------------------------------- *
1118 * @brief printGraph
1119 ** ------------------------------------------------------------------------------------------------------------- */
1120void PipelineGenerator::printGraph(const DependencyGraph & G) const {
1121
1122    auto & out = errs();
1123
1124    out << "digraph G {\n";
1125    for (auto u : make_iterator_range(vertices(G))) {
1126            out << "v" << u << " [label=\"" << u << ": "
1127                << kernels[u]->getName() << "\"];\n";
1128    }
1129
1130    for (auto e : make_iterator_range(edges(G))) {
1131        const auto s = source(e, G);
1132        const auto t = target(e, G);
1133        out << "v" << s << " -> v" << t << ";\n";
1134    }
1135
1136    out << "}\n\n";
1137    out.flush();
1138
1139}
Note: See TracBrowser for help on using the repository browser.