source: icGREP/icgrep-devel/icgrep/toolchain/pipeline.cpp @ 5998

Last change on this file since 5998 was 5998, checked in by nmedfort, 12 months ago

Added temporary buffer functionality to the pipeline for single stream source buffers. Fixed memory leak from UCD::UnicodeBreakRE()

File size: 55.8 KB
Line 
1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
6#include "pipeline.h"
7#include <toolchain/toolchain.h>
8#include <kernels/kernel.h>
9#include <kernels/streamset.h>
10#include <llvm/IR/Module.h>
11#include <boost/container/flat_set.hpp>
12#include <boost/container/flat_map.hpp>
13#include <boost/graph/adjacency_list.hpp>
14#include <boost/range/adaptor/reversed.hpp>
15#include <kernels/kernel_builder.h>
16#include <llvm/Support/raw_ostream.h>
17
18using namespace kernel;
19using namespace parabix;
20using namespace llvm;
21using namespace boost;
22using namespace boost::container;
23
24#define DISABLE_COPY_TO_OVERFLOW
25
26using Port = Kernel::Port;
27
28Function * makeThreadFunction(const std::unique_ptr<kernel::KernelBuilder> & b, const std::string & name) {
29    Function * const f = Function::Create(FunctionType::get(b->getVoidTy(), {b->getVoidPtrTy()}, false), Function::InternalLinkage, name, b->getModule());
30    f->setCallingConv(CallingConv::C);
31    f->arg_begin()->setName("state");
32    return f;
33}
34
35class PipelineGenerator {
36public:
37
38    template <typename Value>
39    using StreamSetBufferMap = flat_map<const StreamSetBuffer *, Value>;
40
41    using RateValue = ProcessingRate::RateValue;
42
43    PipelineGenerator(const std::vector<Kernel *> & kernels)
44    : kernels(kernels)
45    , terminated(nullptr)
46    , noMore(nullptr)
47    , deadLockCounter(nullptr)
48    , anyProgress(nullptr)
49    , madeProgress(nullptr) {
50
51    }
52
53    struct Channel {
54        Channel() = default;
55        Channel(const RateValue & ratio, const StreamSetBuffer * const buffer = nullptr, const unsigned operand = 0)
56        : ratio(ratio), buffer(buffer), operand(operand) { }
57
58        RateValue               ratio;
59        const StreamSetBuffer * buffer;
60        unsigned                operand;
61    };
62
63    using ChannelGraph = adjacency_list<vecS, vecS, bidirectionalS, const Kernel *, Channel>;
64
65    using DependencyGraph = adjacency_list<hash_setS, vecS, bidirectionalS, Value *>;
66
67    using KernelMap = flat_map<const Kernel *, unsigned>;
68
69    void initialize(const std::unique_ptr<KernelBuilder> & b, BasicBlock * const entryBlock);
70
71    void execute(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
72
73    Value * finalize(const std::unique_ptr<KernelBuilder> & b);
74
75protected:
76
77    ChannelGraph makeInputGraph() const;
78
79    ChannelGraph makeOutputGraph() const;
80
81    DependencyGraph makeDependencyGraph() const;
82
83    template<class VertexList>
84    ChannelGraph pruneGraph(ChannelGraph && G, VertexList && V) const;
85
86    bool isPotentiallyUnsafeInputBuffer(const StreamSetBuffer * const buffer);
87
88    void allocateTemporaryBufferPointerArray(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel);
89
90    void checkIfAllInputKernelsHaveFinished(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
91
92    void checkAvailableInputData(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
93
94    void checkAvailableOutputSpace(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
95
96    Value * getStrideLength(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel, const Binding & binding);
97
98    Value * callKernel(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
99
100    void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel *kernel);
101
102    void runKernel(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel);
103
104    void allocateTemporaryBuffers(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel, Value * const requiresTemporaryBuffers);
105
106    void freeTemporaryBuffers(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel, Value * const requiresTemporaryBuffers);
107
108    Value * getFullyProcessedItemCount(const std::unique_ptr<KernelBuilder> & b, const Binding & binding, Value * const final) const;
109
110    void printGraph(const ChannelGraph & G) const;
111
112    void printGraph(const DependencyGraph & G) const;
113
114private:
115
116    const std::vector<Kernel *> &       kernels;
117    PHINode *                           terminated;
118
119    Value *                             noMore;
120
121    DependencyGraph                     dependencyGraph;
122    ChannelGraph                        inputGraph;
123    ChannelGraph                        outputGraph;
124
125    std::vector<Value *>                temporaryBufferPtrs;
126
127    BasicBlock *                        kernelFinished;
128
129    PHINode *                           deadLockCounter;
130    Value *                             anyProgress;
131    PHINode *                           madeProgress;
132
133    StreamSetBufferMap<Value *>         producedItemCount;
134    StreamSetBufferMap<Value *>         consumedItemCount;
135    StreamSetBufferMap<const Kernel *>  lastConsumer;
136    flat_set<const StreamSetBuffer *>   isConsumedAtNonFixedRate;
137};
138
139/** ------------------------------------------------------------------------------------------------------------- *
140 * @brief generateSegmentParallelPipeline
141 *
142 * Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
143 * operating over an input stream set S, a segment-parallel implementation divides the input
144 * into segments and coordinates a set of T <= K threads to each process one segment at a time.
145 * Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
146 * fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
147 ** ------------------------------------------------------------------------------------------------------------- */
148void generateSegmentParallelPipeline(const std::unique_ptr<KernelBuilder> & b, const std::vector<Kernel *> & kernels) {
149
150    const unsigned n = kernels.size();
151    Module * const m = b->getModule();
152    IntegerType * const sizeTy = b->getSizeTy();
153    PointerType * const voidPtrTy = b->getVoidPtrTy();
154    Constant * nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
155    std::vector<Type *> structTypes;
156    codegen::BufferSegments = std::max(codegen::BufferSegments, codegen::ThreadNum);
157
158    Value * instance[n];
159    for (unsigned i = 0; i < n; ++i) {
160        instance[i] = kernels[i]->getInstance();
161        structTypes.push_back(instance[i]->getType());
162    }
163    StructType * const sharedStructType = StructType::get(m->getContext(), structTypes);
164    StructType * const threadStructType = StructType::get(m->getContext(), {sharedStructType->getPointerTo(), sizeTy});
165
166    const auto ip = b->saveIP();
167
168    Function * const threadFunc = makeThreadFunction(b, "segment");
169    auto args = threadFunc->arg_begin();
170
171    // -------------------------------------------------------------------------------------------------------------------------
172    // MAKE SEGMENT PARALLEL PIPELINE THREAD
173    // -------------------------------------------------------------------------------------------------------------------------
174
175     // Create the basic blocks for the thread function.
176    BasicBlock * entryBlock = BasicBlock::Create(b->getContext(), "entry", threadFunc);
177    b->SetInsertPoint(entryBlock);
178
179    Value * const threadStruct = b->CreateBitCast(&*(args), threadStructType->getPointerTo());
180
181    Value * const sharedStatePtr = b->CreateLoad(b->CreateGEP(threadStruct, {b->getInt32(0), b->getInt32(0)}));
182    for (unsigned k = 0; k < n; ++k) {
183        Value * ptr = b->CreateLoad(b->CreateGEP(sharedStatePtr, {b->getInt32(0), b->getInt32(k)}));
184        kernels[k]->setInstance(ptr);
185    }
186    Value * const segOffset = b->CreateLoad(b->CreateGEP(threadStruct, {b->getInt32(0), b->getInt32(1)}));
187
188    PipelineGenerator G(kernels);
189
190    BasicBlock * const segmentLoop = b->CreateBasicBlock("segmentLoop");
191    b->CreateBr(segmentLoop);
192
193    b->SetInsertPoint(segmentLoop);   
194    PHINode * const segNo = b->CreatePHI(b->getSizeTy(), 2, "segNo");
195    segNo->addIncoming(segOffset, entryBlock);
196    G.initialize(b, entryBlock);
197
198    Value * cycleCountStart = nullptr;
199    Value * cycleCountEnd = nullptr;
200    if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
201        cycleCountStart = b->CreateReadCycleCounter();
202    }
203
204    const bool serialize = codegen::DebugOptionIsSet(codegen::SerializeThreads);
205
206    for (unsigned k = 0; k < n; ++k) {
207
208        const Kernel * const kernel = kernels[k];
209
210        BasicBlock * const kernelWait = b->CreateBasicBlock(kernel->getName() + "Wait");
211        b->CreateBr(kernelWait);
212
213        b->SetInsertPoint(kernelWait);
214        b->setKernel(kernels[serialize ? (n - 1) : k]);
215        Value * const processedSegmentCount = b->acquireLogicalSegmentNo();
216        assert (processedSegmentCount->getType() == segNo->getType());
217        Value * const ready = b->CreateICmpEQ(segNo, processedSegmentCount);
218
219        BasicBlock * const kernelCheck = b->CreateBasicBlock(kernel->getName() + "Check");
220        b->CreateCondBr(ready, kernelCheck, kernelWait);
221
222        b->SetInsertPoint(kernelCheck);
223
224        G.execute(b, k);
225
226        b->releaseLogicalSegmentNo(b->CreateAdd(segNo, b->getSize(1)));
227
228        if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
229            cycleCountEnd = b->CreateReadCycleCounter();
230            Value * counterPtr = b->getCycleCountPtr();
231            b->CreateStore(b->CreateAdd(b->CreateLoad(counterPtr), b->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
232            cycleCountStart = cycleCountEnd;
233        }
234
235    }
236
237    segNo->addIncoming(b->CreateAdd(segNo, b->getSize(codegen::ThreadNum)), b->GetInsertBlock());
238    Value * const finished = G.finalize(b);
239    BasicBlock * const segmentExit = b->CreateBasicBlock("segmentExit");
240    b->CreateUnlikelyCondBr(finished, segmentExit, segmentLoop);
241
242    b->SetInsertPoint(segmentExit);
243    // only call pthread_exit() within spawned threads; otherwise it'll be equivalent to calling exit() within the process
244    BasicBlock * const exitThread = b->CreateBasicBlock("ExitThread");
245    BasicBlock * const exitFunction = b->CreateBasicBlock("ExitProcessFunction");   
246    b->CreateCondBr(b->CreateIsNull(segOffset), exitFunction, exitThread);
247    b->SetInsertPoint(exitThread);
248    b->CreatePThreadExitCall(nullVoidPtrVal);
249    b->CreateBr(exitFunction);   
250    b->SetInsertPoint(exitFunction);
251    b->CreateRetVoid();
252
253    // -------------------------------------------------------------------------------------------------------------------------
254    b->restoreIP(ip);
255
256    for (unsigned i = 0; i < n; ++i) {
257        kernels[i]->setInstance(instance[i]);
258    }
259
260    // -------------------------------------------------------------------------------------------------------------------------
261    // MAKE SEGMENT PARALLEL PIPELINE DRIVER
262    // -------------------------------------------------------------------------------------------------------------------------
263    const unsigned threads = codegen::ThreadNum - 1;
264    assert (codegen::ThreadNum > 0);
265    Type * const pthreadsTy = ArrayType::get(sizeTy, threads);
266    AllocaInst * const pthreads = b->CreateAlloca(pthreadsTy);
267    Value * threadIdPtr[threads];
268
269    for (unsigned i = 0; i < threads; ++i) {
270        threadIdPtr[i] = b->CreateGEP(pthreads, {b->getInt32(0), b->getInt32(i)});
271    }
272
273    for (unsigned i = 0; i < n; ++i) {
274        b->setKernel(kernels[i]);
275        b->releaseLogicalSegmentNo(b->getSize(0));
276    }
277
278    AllocaInst * const sharedStruct = b->CreateCacheAlignedAlloca(sharedStructType);
279    for (unsigned i = 0; i < n; ++i) {
280        Value * ptr = b->CreateGEP(sharedStruct, {b->getInt32(0), b->getInt32(i)});
281        b->CreateStore(kernels[i]->getInstance(), ptr);
282    }
283
284    // use the process thread to handle the initial segment function after spawning (n - 1) threads to handle the subsequent offsets
285    for (unsigned i = 0; i < threads; ++i) {
286        AllocaInst * const threadState = b->CreateAlloca(threadStructType);
287        b->CreateStore(sharedStruct, b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(0)}));
288        b->CreateStore(b->getSize(i + 1), b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(1)}));
289        b->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, threadFunc, threadState);
290    }
291
292    AllocaInst * const threadState = b->CreateAlloca(threadStructType);
293    b->CreateStore(sharedStruct, b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(0)}));
294    b->CreateStore(b->getSize(0), b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(1)}));
295    b->CreateCall(threadFunc, b->CreatePointerCast(threadState, voidPtrTy));
296
297    AllocaInst * const status = b->CreateAlloca(voidPtrTy);
298    for (unsigned i = 0; i < threads; ++i) {
299        Value * threadId = b->CreateLoad(threadIdPtr[i]);
300        b->CreatePThreadJoinCall(threadId, status);
301    }
302   
303    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
304        for (const Kernel * kernel : kernels) {
305            b->setKernel(kernel);
306            const auto & inputs = kernel->getStreamInputs();
307            const auto & outputs = kernel->getStreamOutputs();
308            Value * items = nullptr;
309            if (inputs.empty()) {
310                items = b->getProducedItemCount(outputs[0].getName());
311            } else {
312                items = b->getProcessedItemCount(inputs[0].getName());
313            }
314            Value * fItems = b->CreateUIToFP(items, b->getDoubleTy());
315            Value * cycles = b->CreateLoad(b->getCycleCountPtr());
316            Value * fCycles = b->CreateUIToFP(cycles, b->getDoubleTy());
317            const auto formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
318            Value * stringPtr = b->CreatePointerCast(b->GetString(formatString), b->getInt8PtrTy());
319            b->CreateCall(b->GetDprintf(), {b->getInt32(2), stringPtr, fItems, fCycles, b->CreateFDiv(fCycles, fItems)});
320        }
321    }
322   
323}
324
325
326/** ------------------------------------------------------------------------------------------------------------- *
327 * @brief generatePipelineLoop
328 ** ------------------------------------------------------------------------------------------------------------- */
329void generatePipelineLoop(const std::unique_ptr<KernelBuilder> & b, const std::vector<Kernel *> & kernels) {
330
331    // Create the basic blocks for the loop.
332    BasicBlock * const entryBlock = b->GetInsertBlock();
333    BasicBlock * const pipelineLoop = b->CreateBasicBlock("pipelineLoop");
334
335    PipelineGenerator G(kernels);
336
337    b->CreateBr(pipelineLoop);
338
339    b->SetInsertPoint(pipelineLoop);   
340    PHINode * const segNo = b->CreatePHI(b->getSizeTy(), 2, "segNo");
341    segNo->addIncoming(b->getSize(0), entryBlock);
342    G.initialize(b, entryBlock);
343
344    Value * const nextSegNo = b->CreateAdd(segNo, b->getSize(1));
345
346    Value * cycleCountStart = nullptr;
347    Value * cycleCountEnd = nullptr;
348    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
349        cycleCountStart = b->CreateReadCycleCounter();
350    }
351
352    for (unsigned i = 0; i < kernels.size(); ++i) {
353
354        G.execute(b, i);
355
356        b->releaseLogicalSegmentNo(nextSegNo);
357
358        if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
359            cycleCountEnd = b->CreateReadCycleCounter();
360            Value * counterPtr = b->getCycleCountPtr();
361            b->CreateStore(b->CreateAdd(b->CreateLoad(counterPtr), b->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
362            cycleCountStart = cycleCountEnd;
363        }
364    }
365
366    segNo->addIncoming(nextSegNo, b->GetInsertBlock());
367    BasicBlock * const pipelineExit = b->CreateBasicBlock("pipelineExit");
368    Value * const finished = G.finalize(b);
369    b->CreateCondBr(finished, pipelineExit, pipelineLoop);
370
371    b->SetInsertPoint(pipelineExit);
372    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
373        for (unsigned k = 0; k < kernels.size(); k++) {
374            auto & kernel = kernels[k];
375            b->setKernel(kernel);
376            const auto & inputs = kernel->getStreamInputs();
377            const auto & outputs = kernel->getStreamOutputs();
378            Value * items = nullptr;
379            if (inputs.empty()) {
380                items = b->getProducedItemCount(outputs[0].getName());
381            } else {
382                items = b->getProcessedItemCount(inputs[0].getName());
383            }
384            Value * fItems = b->CreateUIToFP(items, b->getDoubleTy());
385            Value * cycles = b->CreateLoad(b->getCycleCountPtr());
386            Value * fCycles = b->CreateUIToFP(cycles, b->getDoubleTy());
387            const auto formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
388            Value * stringPtr = b->CreatePointerCast(b->GetString(formatString), b->getInt8PtrTy());
389            b->CreateCall(b->GetDprintf(), {b->getInt32(2), stringPtr, fItems, fCycles, b->CreateFDiv(fCycles, fItems)});
390        }
391    }
392
393}
394
395/** ------------------------------------------------------------------------------------------------------------- *
396 * @brief initialize
397 ** ------------------------------------------------------------------------------------------------------------- */
398void PipelineGenerator::initialize(const std::unique_ptr<KernelBuilder> & b, BasicBlock * const entryBlock) {
399
400    dependencyGraph = makeDependencyGraph();
401    inputGraph = makeInputGraph();
402    outputGraph = makeOutputGraph();
403
404    for (Kernel * const kernel : kernels) {
405        const auto & inputs = kernel->getStreamInputs();
406        for (unsigned i = 0; i < inputs.size(); ++i) {
407            const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
408            if (kernel->requiresCopyBack(inputs[i]) && !buffer->isUnbounded()) {
409                if (LLVM_LIKELY(buffer->supportsCopyBack())) {
410                    isConsumedAtNonFixedRate.insert(buffer);
411                } else {
412    //                std::string tmp;
413    //                raw_string_ostream out(tmp);
414    //                out << kernel->getName() << " : " << name << " must have an overflow";
415    //                report_fatal_error(out.str());
416                }
417            }
418        }
419        // if this kernel consumes this buffer, update the last consumer
420        for (const StreamSetBuffer * const buffer : kernel->getStreamSetInputBuffers()) {
421            auto f = lastConsumer.find(buffer);
422            assert (f != lastConsumer.end());
423            f->second = kernel;
424        }
425        // incase some output is never consumed, make the kernel that produced it the initial "last consumer"
426        for (const StreamSetBuffer * const buffer : kernel->getStreamSetOutputBuffers()) {
427            assert (buffer->getProducer() == kernel);
428            assert (lastConsumer.count(buffer) == 0);
429            lastConsumer.emplace(buffer, kernel);
430        }
431    }
432
433    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
434        deadLockCounter = b->CreatePHI(b->getSizeTy(), 2, "deadLockCounter");
435        deadLockCounter->addIncoming(b->getSize(0), entryBlock);
436        anyProgress = b->getFalse();
437    }
438
439}
440
441/** ------------------------------------------------------------------------------------------------------------- *
442 * @brief makeTerminationGraph
443 *
444 * The input graph models whether a kernel could *consume* more data than may be produced by a preceeding kernel.
445 ** ------------------------------------------------------------------------------------------------------------- */
446PipelineGenerator::DependencyGraph PipelineGenerator::makeDependencyGraph() const {
447    const auto n = kernels.size();
448    DependencyGraph G(n);
449    KernelMap M;
450    // construct a kernel dependency graph
451    for (unsigned v = 0; v < n; ++v) {
452        const Kernel * const kernel = kernels[v];       
453        for (const StreamSetBuffer * buf : kernel->getStreamSetInputBuffers()) {
454            const auto f = M.find(buf->getProducer()); assert (f != M.end());
455            add_edge(f->second, v, G);
456        }
457        M.emplace(kernel, v);
458    }
459    // generate a transitive closure
460    for (unsigned u = 0; u < n; ++u) {
461        for (auto e : make_iterator_range(in_edges(u, G))) {
462            const auto s = source(e, G);
463            for (auto f : make_iterator_range(out_edges(u, G))) {
464                add_edge(s, target(f, G), G);
465            }
466        }
467    }
468    // then take the transitive reduction
469    std::vector<unsigned> sources;
470    for (unsigned u = n; u-- > 0; ) {
471        if (in_degree(u, G) > 0 && out_degree(u, G) > 0) {
472            for (auto e : make_iterator_range(in_edges(u, G))) {
473                sources.push_back(source(e, G));
474            }
475            std::sort(sources.begin(), sources.end());
476            for (auto e : make_iterator_range(out_edges(u, G))) {
477                remove_in_edge_if(target(e, G), [&G, &sources](const DependencyGraph::edge_descriptor f) {
478                    return std::binary_search(sources.begin(), sources.end(), source(f, G));
479                }, G);
480            }
481            sources.clear();
482        }
483    }
484    return G;
485}
486
487/** ------------------------------------------------------------------------------------------------------------- *
488 * @brief makeInputGraph
489 *
490 * The input graph models whether a kernel could *consume* more data than may be produced by a preceeding kernel.
491 ** ------------------------------------------------------------------------------------------------------------- */
492PipelineGenerator::ChannelGraph PipelineGenerator::makeInputGraph() const {
493    const auto n = kernels.size();
494    ChannelGraph G(n);
495    KernelMap M;
496    for (unsigned v = 0; v < n; ++v) {
497
498        const Kernel * const consumer = kernels[v];
499        G[v] = consumer;
500        M.emplace(consumer, v);
501
502        const auto & inputs = consumer->getStreamInputs();
503        for (unsigned i = 0; i < inputs.size(); ++i) {
504
505            const Binding & input = inputs[i];
506            auto ub_in = consumer->getUpperBound(input.getRate()) * consumer->getStride() * codegen::SegmentSize; assert (ub_in > 0);
507            if (input.hasLookahead()) {
508                ub_in += input.getLookahead();
509            }
510
511            const auto buffer = consumer->getStreamSetInputBuffer(i);
512            const Kernel * const producer = buffer->getProducer();
513            const Binding & output = producer->getStreamOutput(buffer);
514            const auto lb_out = producer->getLowerBound(output.getRate()) * producer->getStride() * codegen::SegmentSize;
515
516            const auto min_oi_ratio = lb_out / ub_in;
517            const auto f = M.find(producer); assert (f != M.end());
518            const auto u = f->second;
519            // If we have multiple inputs from the same kernel, we only need to consider the "slowest" one
520            bool slowest = true;
521            for (const auto e : make_iterator_range(in_edges(v, G))) {
522                if (source(e, G) == u) {
523                    const Channel & p = G[e];
524                    if (min_oi_ratio > p.ratio) {
525                        slowest = false;
526                    } else if (min_oi_ratio < p.ratio) {
527                        clear_in_edges(v, G);
528                    }
529                    break;
530                }
531            }
532            if (slowest) {
533                add_edge(u, v, Channel{min_oi_ratio, buffer, i}, G);
534            }
535        }
536    }
537
538    return pruneGraph(std::move(G), make_iterator_range(vertices(G)));
539}
540
541/** ------------------------------------------------------------------------------------------------------------- *
542 * @brief makeOutputGraph
543 *
544 * The output graph models whether a kernel could *produce* more data than may be consumed by a subsequent kernel.
545 ** ------------------------------------------------------------------------------------------------------------- */
546PipelineGenerator::ChannelGraph PipelineGenerator::makeOutputGraph() const {
547    const auto n = kernels.size();
548    ChannelGraph G(n);
549    KernelMap M;
550    for (unsigned v = 0; v < n; ++v) {
551        const Kernel * const consumer = kernels[v];
552        G[v] = consumer;
553        M.emplace(consumer, v);
554
555        const auto & inputs = consumer->getStreamInputs();
556        for (unsigned i = 0; i < inputs.size(); ++i) {
557            const auto buffer = consumer->getStreamSetInputBuffer(i);
558            if (isa<SourceBuffer>(buffer)) continue;
559            const Kernel * const producer = buffer->getProducer();
560            assert (consumer != producer);
561            const Binding & output = producer->getStreamOutput(buffer);
562            auto ub_out = producer->getUpperBound(output.getRate()) * producer->getStride() * codegen::SegmentSize;
563            if (ub_out > 0) { // unknown output rates are handled by reallocating their buffers
564                const Binding & input = inputs[i];
565                if (input.hasLookahead()) {
566                    const auto la = input.getLookahead();
567                    if (LLVM_UNLIKELY(ub_out <= la)) {
568                        llvm::report_fatal_error("lookahead exceeds segment size");
569                    }
570                    ub_out += la;
571                }
572                const auto lb_in = consumer->getLowerBound(input.getRate()) * consumer->getStride() * codegen::SegmentSize;
573                const auto min_io_ratio = lb_in / ub_out;
574                const auto f = M.find(producer); assert (f != M.end());
575                const auto u = f->second;
576                assert (v != u);
577                assert (G[u] == producer);
578                // If we have multiple inputs from the same kernel, we only need to consider the "fastest" one
579                bool fastest = true;
580                for (const auto e : make_iterator_range(in_edges(v, G))) {
581                    if (source(e, G) == u) {
582                        const Channel & p = G[e];
583                        if (min_io_ratio > p.ratio) {
584                            fastest = false;
585                        } else if (min_io_ratio < p.ratio) {
586                            clear_in_edges(v, G);
587                        }
588                        break;
589                    }
590                }
591                if (fastest) {
592                    add_edge(v, u, Channel{min_io_ratio, buffer, i}, G);
593                }
594            }
595        }
596    }
597
598    return pruneGraph(std::move(G), boost::adaptors::reverse(make_iterator_range(vertices(G))));
599}
600
601/** ------------------------------------------------------------------------------------------------------------- *
602 * @brief pruneGraph
603 ** ------------------------------------------------------------------------------------------------------------- */
604template<class VertexList>
605inline PipelineGenerator::ChannelGraph PipelineGenerator::pruneGraph(ChannelGraph && G, VertexList && V) const {
606    // Take a transitive closure of G but whenever we attempt to insert an edge into the closure
607    // that already exists, check instead whether the rate of our proposed edge is <= the existing
608    // edge's rate. If so, the data availability/consumption is transitively guaranteed.
609    for (const auto u : V) {
610        for (auto ei : make_iterator_range(in_edges(u, G))) {
611            const auto v = source(ei, G);
612            const Channel & ci = G[ei];
613            for (auto ej : make_iterator_range(out_edges(u, G))) {
614                const auto w = target(ej, G);
615                // ci.rate = BOUND_RATIO(u, v) * (STRIDE(u) / STRIDE(v))
616                const auto scaling = RateValue(G[v]->getStride(), G[w]->getStride());
617                const auto rate = ci.ratio * scaling;
618                bool insert = true;
619                for (auto ek : make_iterator_range(in_edges(w, G))) {
620                    // do we already have a vw edge?
621                    if (source(ek, G) == v) {
622                        Channel & ck = G[ek];
623                        if (rate <= ck.ratio) {
624                            ck.buffer = nullptr;
625                        }
626                        insert = false;
627                    }
628                }
629                if (insert) {
630                    add_edge(v, w, Channel{rate}, G);
631                }
632            }
633        }
634    }
635
636    // remove any closure edges from G
637    remove_edge_if([&G](const ChannelGraph::edge_descriptor e) { return G[e].buffer == nullptr; }, G);
638
639    for (const auto u : V) {
640        if (in_degree(u, G) == 0) {
641            // If we do not need to check any of its inputs, we can avoid testing any output of a kernel
642            // with a rate ratio of at least 1
643            remove_out_edge_if(u, [&G](const ChannelGraph::edge_descriptor e) {
644                return G[e].ratio >= RateValue{1, 1};
645            }, G);
646        }
647    }
648
649    return G;
650}
651
652/** ------------------------------------------------------------------------------------------------------------- *
653 * @brief executeKernel
654 ** ------------------------------------------------------------------------------------------------------------- */
655void PipelineGenerator::execute(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
656
657    const Kernel * const kernel = kernels[index];
658    b->setKernel(kernel);
659
660    const auto & inputs = kernel->getStreamInputs();
661    const auto & outputs = kernel->getStreamOutputs();
662
663    BasicBlock * const kernelEntry = b->GetInsertBlock();
664    BasicBlock * const kernelCode = b->CreateBasicBlock(kernel->getName());
665    kernelFinished = b->CreateBasicBlock(kernel->getName() + "Finished");
666    BasicBlock * const kernelExit = b->CreateBasicBlock(kernel->getName() + "Exit");
667
668    b->CreateUnlikelyCondBr(b->getTerminationSignal(), kernelExit, kernelCode);
669
670    b->SetInsertPoint(kernelFinished);
671    terminated = b->CreatePHI(b->getInt1Ty(), 2);
672    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
673        madeProgress = b->CreatePHI(b->getInt1Ty(), 3);
674    }
675    b->SetInsertPoint(kernelExit);
676    PHINode * const isFinal = b->CreatePHI(b->getInt1Ty(), 2, kernel->getName() + "_isFinal");
677    isFinal->addIncoming(b->getTrue(), kernelEntry);
678    isFinal->addIncoming(terminated, kernelFinished);
679
680    PHINode * progress = nullptr;
681    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
682        progress = b->CreatePHI(b->getInt1Ty(), 2, kernel->getName() + "_anyProgress");
683        progress->addIncoming(anyProgress, kernelEntry);
684        progress->addIncoming(madeProgress, kernelFinished);
685    }
686
687    // Since it is possible that a sole consumer of some stream could terminate early, set the
688    // initial consumed amount to the amount produced in this iteration.
689
690    // First determine the priorConsumedItemCounts, making sure
691    // that none of them are previous input buffers of this kernel!
692    std::vector<Value *> priorConsumedItemCount(inputs.size());
693
694    for (unsigned i = 0; i < inputs.size(); ++i) {
695        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
696        auto c = consumedItemCount.find(buffer);
697        if (c == consumedItemCount.end()) {
698            const auto p = producedItemCount.find(buffer);
699            assert (p != producedItemCount.end());
700            priorConsumedItemCount[i] = p->second;
701        } else {
702            priorConsumedItemCount[i] = c->second;
703        }
704    }
705
706    std::vector<PHINode *> consumedItemCountPhi(inputs.size());
707
708    for (unsigned i = 0; i < inputs.size(); ++i) {
709        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
710        PHINode * const consumedPhi = b->CreatePHI(b->getSizeTy(), 2);
711        auto c = consumedItemCount.find(buffer);
712        if (c == consumedItemCount.end()) {
713            consumedItemCount.emplace(buffer, consumedPhi);
714        } else {
715            c->second = consumedPhi;
716        }
717        consumedPhi->addIncoming(priorConsumedItemCount[i], kernelEntry);
718        consumedItemCountPhi[i] = consumedPhi;
719    }
720
721    b->SetInsertPoint(kernelCode);
722
723    Value * const finalStride = callKernel(b, index);
724
725    // TODO: add in some checks to verify the kernel actually adheres to its rate
726
727    BasicBlock * const kernelCodeExit = b->GetInsertBlock();
728    terminated->addIncoming(finalStride, kernelCodeExit);
729    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
730        madeProgress->addIncoming(b->getTrue(), kernelCodeExit);
731        anyProgress = progress;
732    }
733    b->CreateBr(kernelFinished);
734
735    b->SetInsertPoint(kernelFinished);
736
737    // update the consumed item counts
738    for (unsigned i = 0; i < inputs.size(); ++i) {
739        const Binding & input = inputs[i];
740        Value * const fullyProcessed = getFullyProcessedItemCount(b, input, terminated);
741        Value * const consumed = b->CreateUMin(priorConsumedItemCount[i], fullyProcessed);
742        consumedItemCountPhi[i]->addIncoming(consumed, kernelFinished);
743    }
744    b->CreateBr(kernelExit);
745
746    kernelExit->moveAfter(kernelFinished);
747
748    b->SetInsertPoint(kernelExit);
749
750    for (unsigned i = 0; i < outputs.size(); ++i) {
751        const Binding & output = outputs[i];
752        Value * const produced = b->getProducedItemCount(output.getName());
753        const StreamSetBuffer * const buffer = kernel->getStreamSetOutputBuffer(i);
754        // if some stream has no consumer, set the consumed item count to the produced item count
755        const auto c = lastConsumer.find(buffer);
756        assert (c != lastConsumer.end());
757        if (LLVM_UNLIKELY(c->second == kernel)) {
758            assert (buffer->getProducer() == kernel);
759            if (LLVM_UNLIKELY(output.getRate().isRelative())) {
760                continue;
761            }
762            b->setConsumedItemCount(output.getName(), produced);
763        } else { // otherwise record how many items were produced
764            assert (producedItemCount.count(buffer) == 0);
765            producedItemCount.emplace(buffer, produced);
766        }
767
768    }
769
770
771    // If this kernel is the last consumer of a input buffer, update the consumed count for that buffer.
772
773    // TODO: if all consumers process the data at a fixed rate, we can just set the consumed item count
774    // by the strideNo rather than tracking it.
775
776    // TODO: a kernel could take the same stream set for multiple arguments.
777
778    // TODO: if we can prove that this kernel cannot terminate before any prior consumer, this code
779    // could be executed in kernelFinished block.
780    for (unsigned i = 0; i < inputs.size(); ++i) {
781        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
782        const auto c = lastConsumer.find(buffer);
783        assert (c != lastConsumer.end());
784        if (LLVM_LIKELY(c->second == kernel)) {
785            Kernel * const producer = buffer->getProducer();
786            assert (producer != kernel);
787            const auto & output = producer->getStreamOutput(buffer);
788            if (output.getRate().isRelative()) continue;
789            b->setKernel(producer);
790            b->setConsumedItemCount(output.getName(), consumedItemCountPhi[i]);
791            b->setKernel(kernel);
792        }
793    }
794
795    dependencyGraph[index] = isFinal;
796}
797
798/** ------------------------------------------------------------------------------------------------------------- *
799 * @brief checkIfAllInputKernelsHaveFinished
800 ** ------------------------------------------------------------------------------------------------------------- */
801void PipelineGenerator::checkIfAllInputKernelsHaveFinished(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
802    const auto n = in_degree(index, dependencyGraph);
803    if (LLVM_UNLIKELY(n == 0)) {
804        noMore = b->getFalse();
805    } else {
806        noMore = b->getTrue();
807        for (auto e : make_iterator_range(in_edges(index, dependencyGraph))) {
808            const auto u = source(e, dependencyGraph);
809            Value * const finished = dependencyGraph[u];
810            noMore = b->CreateAnd(noMore, finished);
811        }
812    }
813}
814
815/** ------------------------------------------------------------------------------------------------------------- *
816 * @brief checkAvailableInputData
817 ** ------------------------------------------------------------------------------------------------------------- */
818void PipelineGenerator::checkAvailableInputData(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
819    const Kernel * const kernel = kernels[index];
820    b->setKernel(kernel);   
821    for (auto e : make_iterator_range(in_edges(index, inputGraph))) {
822        const Channel & c = inputGraph[e];
823        const Binding & input = kernel->getStreamInput(c.operand);
824
825        Value * requiredInput = getStrideLength(b, kernel, input);
826        if (LLVM_UNLIKELY(input.hasLookahead())) {
827            Constant * const lookahead = b->getSize(input.getLookahead());
828            requiredInput = b->CreateAdd(requiredInput, lookahead);
829        }
830        const auto p = producedItemCount.find(c.buffer);
831        assert (p != producedItemCount.end());
832        Value * const produced = p->second;
833        Value * const processed = b->getNonDeferredProcessedItemCount(input);
834        Value * const unprocessed = b->CreateSub(produced, processed);
835        Value * const hasEnough = b->CreateICmpUGE(unprocessed, requiredInput);
836        terminated->addIncoming(b->getFalse(), b->GetInsertBlock());
837        if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
838            madeProgress->addIncoming(anyProgress, b->GetInsertBlock());
839        }
840        const auto prefix = kernel->getName() + "_" + input.getName();
841        BasicBlock * const hasSufficientInput = b->CreateBasicBlock(prefix + "_hasSufficientInput");
842        Value * const check = b->CreateOr(hasEnough, noMore);
843        b->CreateLikelyCondBr(check, hasSufficientInput, kernelFinished);
844        b->SetInsertPoint(hasSufficientInput);
845    }
846}
847
848/** ------------------------------------------------------------------------------------------------------------- *
849 * @brief checkAvailableOutputSpace
850 ** ------------------------------------------------------------------------------------------------------------- */
851void PipelineGenerator::checkAvailableOutputSpace(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
852    const Kernel * const kernel = kernels[index];
853    b->setKernel(kernel);
854    for (auto e : make_iterator_range(in_edges(index, outputGraph))) {
855        const Channel & c = outputGraph[e];
856        assert (c.buffer->getProducer() == kernel);
857        const Binding & output = kernel->getStreamOutput(c.buffer);
858
859        Value * requiredSpace = getStrideLength(b, kernel, output);
860        const auto & name = output.getName();
861        Value * const produced = b->getNonDeferredProducedItemCount(output);
862        Value * const consumed = b->getConsumedItemCount(name);
863        Value * const unconsumed = b->CreateSub(produced, consumed);
864        requiredSpace = b->CreateAdd(requiredSpace, unconsumed);
865        Value * const capacity = b->getBufferedSize(name);
866        Value * const check = b->CreateICmpULE(requiredSpace, capacity);
867        terminated->addIncoming(b->getFalse(), b->GetInsertBlock());
868        if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
869            madeProgress->addIncoming(anyProgress, b->GetInsertBlock());
870        }
871        const auto prefix = kernel->getName() + "_" + name;
872        BasicBlock * const hasOutputSpace = b->CreateBasicBlock(prefix + "_hasOutputSpace");
873        b->CreateLikelyCondBr(check, hasOutputSpace, kernelFinished);
874        b->SetInsertPoint(hasOutputSpace);
875    }
876}
877
878/** ------------------------------------------------------------------------------------------------------------- *
879 * @brief getStrideLength
880 ** ------------------------------------------------------------------------------------------------------------- */
881Value * PipelineGenerator::getStrideLength(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel, const Binding & binding) {
882    Value * strideLength = nullptr;
883    const ProcessingRate & rate = binding.getRate();
884    if (rate.isPopCount() || rate.isNegatedPopCount()) {
885        Port refPort; unsigned refIndex;
886        std::tie(refPort, refIndex) = kernel->getStreamPort(rate.getReference());
887        const Binding & ref = kernel->getStreamInput(refIndex);
888        Value * markers = b->loadInputStreamBlock(ref.getName(), b->getSize(0));
889        if (rate.isNegatedPopCount()) {
890            markers = b->CreateNot(markers);
891        }
892        strideLength = b->bitblock_popcount(markers);
893    } else if (binding.hasAttribute(kernel::Attribute::KindId::AlwaysConsume)) {
894        const auto lb = kernel->getLowerBound(rate);
895        strideLength = b->getSize(ceiling(lb * kernel->getStride()));
896    } else {
897        const auto ub = kernel->getUpperBound(rate); assert (ub > 0);
898        strideLength = b->getSize(ceiling(ub * kernel->getStride()));
899    }
900    return strideLength;
901}
902
903/** ------------------------------------------------------------------------------------------------------------- *
904 * @brief callKernel
905 ** ------------------------------------------------------------------------------------------------------------- */
906Value * PipelineGenerator::callKernel(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
907
908    const Kernel * const kernel = kernels[index];
909    b->setKernel(kernel);
910
911    checkIfAllInputKernelsHaveFinished(b, index);
912
913    checkAvailableInputData(b, index);
914
915    checkAvailableOutputSpace(b, index);
916
917    applyOutputBufferExpansions(b, kernel);
918
919    #ifndef DISABLE_COPY_TO_OVERFLOW
920    // Store how many items we produced by this kernel in the prior iteration. We'll use this to determine when
921    // to mirror the first K segments
922    const auto & outputs = kernel->getStreamOutputs();
923    const auto m = outputs.size();
924
925    std::vector<Value *> initiallyProducedItemCount(m, nullptr);
926    for (unsigned i = 0; i < m; ++i) {
927        const Binding & output = outputs[i];
928        const auto & name = output.getName();
929        const StreamSetBuffer * const buffer = kernel->getOutputStreamSetBuffer(name);
930        if (isConsumedAtNonFixedRate.count(buffer)) {
931            initiallyProducedItemCount[i] = b->getProducedItemCount(name);
932        }
933    }
934    #endif
935
936    allocateTemporaryBufferPointerArray(b, kernel);
937
938    runKernel(b, kernel);
939
940    #ifndef DISABLE_COPY_TO_OVERFLOW
941    // For each buffer with an overflow region of K blocks, overwrite the overflow with the first K blocks of
942    // data to ensure that even if this stream is produced at a fixed rate but consumed at a bounded rate,
943    // every kernel has a consistent view of the stream data.
944    for (unsigned i = 0; i < m; ++i) {
945        const Binding & output = outputs[i];
946        const auto & name = output.getName();
947        if (initiallyProducedItemCount[i]) {
948            Value * const bufferSize = b->getBufferedSize(name);
949            Value * const prior = initiallyProducedItemCount[i];
950            Value * const offset = b->CreateURem(prior, bufferSize);
951            Value * const produced = b->getNonDeferredProducedItemCount(output);
952            Value * const buffered = b->CreateAdd(offset, b->CreateSub(produced, prior));
953            BasicBlock * const copyBack = b->CreateBasicBlock(name + "MirrorOverflow");
954            BasicBlock * const done = b->CreateBasicBlock(name + "MirrorOverflowDone");
955            b->CreateCondBr(b->CreateICmpUGT(buffered, bufferSize), copyBack, done);
956            b->SetInsertPoint(copyBack);
957            b->CreateCopyToOverflow(name);
958            b->CreateBr(done);
959            b->SetInsertPoint(done);
960        }
961    }
962    #endif
963
964    return b->getTerminationSignal();
965}
966
967/** ------------------------------------------------------------------------------------------------------------- *
968 * @brief runKernel
969 ** ------------------------------------------------------------------------------------------------------------- */
970void PipelineGenerator::runKernel(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel) {
971
972    const auto & inputs = kernel->getStreamInputs();
973    const auto n = inputs.size();
974    std::vector<Value *> arguments(n + 2);
975
976    Value * isFinal = noMore;
977
978    Value * requiresTemporaryBuffers = nullptr;
979
980    for (unsigned i = 0; i < n; ++i) {
981        const Binding & input = inputs[i];
982        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
983
984        const auto p = producedItemCount.find(buffer);
985        assert (p != producedItemCount.end());
986        Value * const produced = p->second;
987
988        const ProcessingRate & rate = input.getRate();
989        if (rate.isPopCount()) {
990            arguments[i + 2] = produced;
991        } else {
992            const unsigned strideSize = ceiling(kernel->getUpperBound(rate) * kernel->getStride());
993            Value * const processed = b->getNonDeferredProcessedItemCount(input);
994            Value * const limit = b->CreateAdd(processed, b->getSize(strideSize * codegen::SegmentSize));
995            Value * const hasPartial = b->CreateICmpULT(produced, limit);
996            arguments[i + 2] = b->CreateSelect(hasPartial, produced, limit);
997            isFinal = b->CreateAnd(isFinal, hasPartial);
998            if (!temporaryBufferPtrs.empty() && temporaryBufferPtrs[i]) {
999                if (requiresTemporaryBuffers) {
1000                    requiresTemporaryBuffers = b->CreateOr(requiresTemporaryBuffers, hasPartial);
1001                } else {
1002                    requiresTemporaryBuffers = hasPartial;
1003                }
1004            }
1005        }
1006    }
1007
1008    // TODO: pass in a strideNo for fixed rate streams to allow the kernel to calculate the current avail,
1009    // processed, and produced counts
1010
1011    arguments[0] = kernel->getInstance();
1012    arguments[1] = isFinal;
1013
1014    if (requiresTemporaryBuffers) {
1015        allocateTemporaryBuffers(b, kernel, requiresTemporaryBuffers);
1016    }
1017
1018    b->createDoSegmentCall(arguments);
1019
1020    if (requiresTemporaryBuffers) {
1021        freeTemporaryBuffers(b, kernel, requiresTemporaryBuffers);
1022    }
1023}
1024
1025
1026/** ------------------------------------------------------------------------------------------------------------- *
1027 * @brief applyOutputBufferExpansions
1028 ** ------------------------------------------------------------------------------------------------------------- */
1029void PipelineGenerator::applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel) {
1030    const auto & outputs = kernel->getStreamSetOutputBuffers();
1031    for (unsigned i = 0; i < outputs.size(); i++) {
1032        if (isa<DynamicBuffer>(outputs[i])) {
1033
1034
1035            const auto baseSize = ceiling(kernel->getUpperBound(kernel->getStreamOutput(i).getRate()) * kernel->getStride() * codegen::SegmentSize);
1036            if (LLVM_LIKELY(baseSize > 0)) {
1037
1038                const auto & name = kernel->getStreamOutput(i).getName();
1039
1040                BasicBlock * const doExpand = b->CreateBasicBlock(name + "Expand");
1041                BasicBlock * const nextBlock = b->GetInsertBlock()->getNextNode();
1042                doExpand->moveAfter(b->GetInsertBlock());
1043                BasicBlock * const bufferReady = b->CreateBasicBlock(name + "Ready");
1044                bufferReady->moveAfter(doExpand);
1045                if (nextBlock) nextBlock->moveAfter(bufferReady);
1046
1047                Value * const produced = b->getProducedItemCount(name);
1048                Value * const consumed = b->getConsumedItemCount(name);
1049                Value * const required = b->CreateAdd(b->CreateSub(produced, consumed), b->getSize(2 * baseSize));
1050
1051                b->CreateCondBr(b->CreateICmpUGT(required, b->getBufferedSize(name)), doExpand, bufferReady);
1052                b->SetInsertPoint(doExpand);
1053
1054                b->doubleCapacity(name);
1055                // Ensure that capacity is sufficient by successive doubling, if necessary.
1056                b->CreateCondBr(b->CreateICmpUGT(required, b->getBufferedSize(name)), doExpand, bufferReady);
1057
1058                b->SetInsertPoint(bufferReady);
1059
1060            }
1061        }
1062    }
1063}
1064
1065/** ------------------------------------------------------------------------------------------------------------- *
1066 * @brief allocateTemporaryBufferPointerArray
1067 ** ------------------------------------------------------------------------------------------------------------- */
1068void PipelineGenerator::allocateTemporaryBufferPointerArray(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel) {
1069    // TODO: whenever two kernels are using the same "unsafe" buffer, they'll both create and destroy their own
1070    // temporary copies of it. This could be optimized to have it done at production and deleted after the last
1071    // consuming kernel utilizes it.
1072    temporaryBufferPtrs.clear();
1073
1074    const auto & inputs = kernel->getStreamInputs();
1075    for (unsigned i = 0; i < inputs.size(); ++i) {
1076        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
1077        if (LLVM_UNLIKELY(isPotentiallyUnsafeInputBuffer(buffer))) {
1078            if (temporaryBufferPtrs.empty()) {
1079                temporaryBufferPtrs.resize(inputs.size(), nullptr);
1080            }
1081            assert (temporaryBufferPtrs[i] == nullptr);
1082            PointerType * const ptrTy = buffer->getStreamSetPointerType();
1083            StructType * const structTy = StructType::create(b->getContext(), {ptrTy, ptrTy});
1084            AllocaInst * const tempBuffer = b->CreateAlloca(structTy);
1085            b->CreateStore(Constant::getNullValue(structTy), tempBuffer);
1086            temporaryBufferPtrs[i] = tempBuffer;
1087        }
1088    }
1089
1090}
1091
1092/** ------------------------------------------------------------------------------------------------------------- *
1093 * @brief isPotentiallyUnsafeInputBuffer
1094 *
1095 * We cannot trust that the final block of any single stream source or external buffer can be safely read past its
1096 * final item since kernels may attempt to load aligned blocks of data, leading to potentially-intermittent
1097 * segmentation faults, depending on whether the access crosses a page boundary.
1098 ** ------------------------------------------------------------------------------------------------------------- */
1099inline bool PipelineGenerator::isPotentiallyUnsafeInputBuffer(const StreamSetBuffer * const buffer) {
1100    return isa<SourceBuffer>(buffer) && buffer->getNumOfStreams() == 1;
1101}
1102
1103/** ------------------------------------------------------------------------------------------------------------- *
1104 * @brief allocateTemporaryBuffers
1105 ** ------------------------------------------------------------------------------------------------------------- */
1106void PipelineGenerator::allocateTemporaryBuffers(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel, Value * const requiresTemporaryBuffers) {
1107    ConstantInt * const ZERO = b->getInt32(0);
1108    ConstantInt * const ONE = b->getInt32(1);
1109    BasicBlock * const allocateBuffers = b->CreateBasicBlock();
1110    BasicBlock * const runKernel = b->CreateBasicBlock();
1111    b->CreateUnlikelyCondBr(requiresTemporaryBuffers, allocateBuffers, runKernel);
1112
1113    b->SetInsertPoint(allocateBuffers);
1114    for (unsigned i = 0; i < temporaryBufferPtrs.size(); ++i) {
1115        if (temporaryBufferPtrs[i]) {
1116            const Binding & input = kernel->getStreamInput(i);
1117            const auto p = producedItemCount.find(kernel->getStreamSetInputBuffer(i));
1118            assert (p != producedItemCount.end());
1119            Value * const produced = p->second;
1120            Value * const processed = b->getProcessedItemCount(input.getName());
1121            Value * const unprocessed = b->CreateSub(produced, processed);
1122            const auto temp = b->AcquireTemporaryBuffer(input.getName(), processed, unprocessed);
1123            b->CreateStore(temp.first, b->CreateGEP(temporaryBufferPtrs[i], { ZERO, ZERO }));
1124            b->CreateStore(temp.second, b->CreateGEP(temporaryBufferPtrs[i], { ZERO, ONE }));
1125        }
1126    }
1127    b->CreateBr(runKernel);
1128
1129    b->SetInsertPoint(runKernel);
1130}
1131
1132/** ------------------------------------------------------------------------------------------------------------- *
1133 * @brief freeTemporaryBuffers
1134 ** ------------------------------------------------------------------------------------------------------------- */
1135void PipelineGenerator::freeTemporaryBuffers(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel, Value * const requiresTemporaryBuffers) {
1136    ConstantInt * const ZERO = b->getInt32(0);
1137    ConstantInt * const ONE = b->getInt32(1);
1138
1139    BasicBlock * const freeBuffers = b->CreateBasicBlock();
1140    BasicBlock * const finishedKernel = b->CreateBasicBlock();
1141    b->CreateUnlikelyCondBr(requiresTemporaryBuffers, freeBuffers, finishedKernel);
1142    b->SetInsertPoint(freeBuffers);
1143    for (unsigned i = 0; i < temporaryBufferPtrs.size(); ++i) {
1144        if (temporaryBufferPtrs[i]) {
1145            Value * const originalBuffer = b->CreateLoad(b->CreateGEP(temporaryBufferPtrs[i], { ZERO, ZERO }));
1146            const Binding & input = kernel->getStreamInput(i);
1147            b->setBaseAddress(input.getName(), originalBuffer);
1148            Value * const temporaryBuffer = b->CreateLoad(b->CreateGEP(temporaryBufferPtrs[i], { ZERO, ONE }));
1149            b->CreateFree(temporaryBuffer);
1150        }
1151    }
1152    b->CreateBr(finishedKernel);
1153
1154    b->SetInsertPoint(finishedKernel);
1155}
1156
1157/** ------------------------------------------------------------------------------------------------------------- *
1158 * @brief getFullyProcessedItemCount
1159 ** ------------------------------------------------------------------------------------------------------------- */
1160inline Value * PipelineGenerator::getFullyProcessedItemCount(const std::unique_ptr<KernelBuilder> & b, const Binding & input, Value * const isFinal) const {
1161    Value * const processed = b->getProcessedItemCount(input.getName());
1162    if (LLVM_UNLIKELY(input.hasAttribute(kernel::Attribute::KindId::BlockSize))) {
1163        // If the input rate has a block size attribute then --- for the purpose of determining how many
1164        // items have been consumed --- we consider a stream set to be fully processed when an entire
1165        // block (stride?) has been processed.
1166        Constant * const BLOCK_WIDTH = b->getSize(b->getBitBlockWidth());
1167        Value * const partial = b->CreateAnd(processed, ConstantExpr::getNeg(BLOCK_WIDTH));
1168        return b->CreateSelect(isFinal, processed, partial);
1169    }
1170    return processed;
1171}
1172
1173/** ------------------------------------------------------------------------------------------------------------- *
1174 * @brief finalize
1175 ** ------------------------------------------------------------------------------------------------------------- */
1176Value * PipelineGenerator::finalize(const std::unique_ptr<KernelBuilder> & b) {
1177    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
1178        ConstantInt * const ZERO = b->getSize(0);
1179        ConstantInt * const ONE = b->getSize(1);
1180        ConstantInt * const TWO = b->getSize(2);
1181        Value * const count = b->CreateSelect(anyProgress, ZERO, b->CreateAdd(deadLockCounter, ONE));
1182        b->CreateAssert(b->CreateICmpNE(count, TWO), "Dead lock detected: pipeline could not progress after two iterations");
1183        deadLockCounter->addIncoming(count, b->GetInsertBlock());
1184    }
1185    // return whether each sink has terminated
1186    Value * final = b->getTrue();
1187    for (const auto u : make_iterator_range(vertices(dependencyGraph))) {
1188        if (out_degree(u, dependencyGraph) == 0) {
1189            final = b->CreateAnd(final, dependencyGraph[u]);
1190        }
1191    }
1192    return final;
1193}
1194
1195/** ------------------------------------------------------------------------------------------------------------- *
1196 * @brief printGraph
1197 ** ------------------------------------------------------------------------------------------------------------- */
1198void PipelineGenerator::printGraph(const ChannelGraph & G) const {
1199
1200    auto & out = errs();
1201
1202    out << "digraph G {\n";
1203    for (auto u : make_iterator_range(vertices(G))) {
1204        assert (G[u] == kernels[u]);
1205        if (in_degree(u, G) > 0 || out_degree(u, G) > 0) {
1206            out << "v" << u << " [label=\"" << u << ": "
1207                << G[u]->getName() << "\"];\n";
1208        }
1209    }
1210
1211    for (auto e : make_iterator_range(edges(G))) {
1212        const Channel & c = G[e];
1213        const auto s = source(e, G);
1214        const auto t = target(e, G);
1215
1216        out << "v" << s << " -> v" << t
1217            << " [label=\""
1218
1219
1220
1221            << c.ratio.numerator() << " / " << c.ratio.denominator()
1222            << "\"];\n";
1223    }
1224
1225    out << "}\n\n";
1226    out.flush();
1227
1228}
1229
1230/** ------------------------------------------------------------------------------------------------------------- *
1231 * @brief printGraph
1232 ** ------------------------------------------------------------------------------------------------------------- */
1233void PipelineGenerator::printGraph(const DependencyGraph & G) const {
1234
1235    auto & out = errs();
1236
1237    out << "digraph G {\n";
1238    for (auto u : make_iterator_range(vertices(G))) {
1239            out << "v" << u << " [label=\"" << u << ": "
1240                << kernels[u]->getName() << "\"];\n";
1241    }
1242
1243    for (auto e : make_iterator_range(edges(G))) {
1244        const auto s = source(e, G);
1245        const auto t = target(e, G);
1246        out << "v" << s << " -> v" << t << ";\n";
1247    }
1248
1249    out << "}\n\n";
1250    out.flush();
1251
1252}
Note: See TracBrowser for help on using the repository browser.