source: icGREP/icgrep-devel/icgrep/toolchain/pipeline.cpp @ 6005

Last change on this file since 6005 was 6005, checked in by nmedfort, 16 months ago

Fix for overly aggressive temporary buffer allocation.

File size: 55.8 KB
Line 
1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
6#include "pipeline.h"
7#include <toolchain/toolchain.h>
8#include <kernels/kernel.h>
9#include <kernels/streamset.h>
10#include <llvm/IR/Module.h>
11#include <boost/container/flat_set.hpp>
12#include <boost/container/flat_map.hpp>
13#include <boost/graph/adjacency_list.hpp>
14#include <boost/range/adaptor/reversed.hpp>
15#include <kernels/kernel_builder.h>
16#include <llvm/Support/raw_ostream.h>
17
18using namespace kernel;
19using namespace parabix;
20using namespace llvm;
21using namespace boost;
22using namespace boost::container;
23
24#define DISABLE_COPY_TO_OVERFLOW
25
26using Port = Kernel::Port;
27
28Function * makeThreadFunction(const std::unique_ptr<kernel::KernelBuilder> & b, const std::string & name) {
29    Function * const f = Function::Create(FunctionType::get(b->getVoidTy(), {b->getVoidPtrTy()}, false), Function::InternalLinkage, name, b->getModule());
30    f->setCallingConv(CallingConv::C);
31    f->arg_begin()->setName("state");
32    return f;
33}
34
35class PipelineGenerator {
36public:
37
38    template <typename Value>
39    using StreamSetBufferMap = flat_map<const StreamSetBuffer *, Value>;
40
41    using RateValue = ProcessingRate::RateValue;
42
43    PipelineGenerator(const std::vector<Kernel *> & kernels)
44    : kernels(kernels)
45    , terminated(nullptr)
46    , noMore(nullptr)
47    , requiresTemporaryBuffers(nullptr)
48    , deadLockCounter(nullptr)
49    , anyProgress(nullptr)
50    , madeProgress(nullptr) {
51
52    }
53
54    struct Channel {
55        Channel() = default;
56        Channel(const RateValue & ratio, const StreamSetBuffer * const buffer = nullptr, const unsigned operand = 0)
57        : ratio(ratio), buffer(buffer), operand(operand) { }
58
59        RateValue               ratio;
60        const StreamSetBuffer * buffer;
61        unsigned                operand;
62    };
63
64    using ChannelGraph = adjacency_list<vecS, vecS, bidirectionalS, const Kernel *, Channel>;
65
66    using DependencyGraph = adjacency_list<hash_setS, vecS, bidirectionalS, Value *>;
67
68    using KernelMap = flat_map<const Kernel *, unsigned>;
69
70    void initialize(const std::unique_ptr<KernelBuilder> & b, BasicBlock * const entryBlock);
71
72    void execute(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
73
74    Value * finalize(const std::unique_ptr<KernelBuilder> & b);
75
76protected:
77
78    ChannelGraph makeInputGraph() const;
79
80    ChannelGraph makeOutputGraph() const;
81
82    DependencyGraph makeDependencyGraph() const;
83
84    template<class VertexList>
85    ChannelGraph pruneGraph(ChannelGraph && G, VertexList && V) const;
86
87    bool isPotentiallyUnsafeInputBuffer(const StreamSetBuffer * const buffer);
88
89    void allocateTemporaryBufferPointerArray(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel);
90
91    void checkIfAllInputKernelsHaveFinished(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
92
93    void checkAvailableInputData(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
94
95    void checkAvailableOutputSpace(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
96
97    Value * getStrideLength(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel, const Binding & binding);
98
99    Value * callKernel(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
100
101    void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel *kernel);
102
103    void runKernel(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel);
104
105    void allocateTemporaryBuffers(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel);
106
107    void freeTemporaryBuffers(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel);
108
109    Value * getFullyProcessedItemCount(const std::unique_ptr<KernelBuilder> & b, const Binding & binding, Value * const final) const;
110
111    void printGraph(const ChannelGraph & G) const;
112
113    void printGraph(const DependencyGraph & G) const;
114
115private:
116
117    const std::vector<Kernel *> &       kernels;
118    PHINode *                           terminated;
119
120    Value *                             noMore;
121    Value *                             requiresTemporaryBuffers;
122
123    DependencyGraph                     dependencyGraph;
124    ChannelGraph                        inputGraph;
125    ChannelGraph                        outputGraph;
126
127    std::vector<Value *>                temporaryBufferPtrs;
128
129    BasicBlock *                        kernelFinished;
130
131    PHINode *                           deadLockCounter;
132    Value *                             anyProgress;
133    PHINode *                           madeProgress;
134
135    StreamSetBufferMap<Value *>         producedItemCount;
136    StreamSetBufferMap<Value *>         consumedItemCount;
137    StreamSetBufferMap<const Kernel *>  lastConsumer;
138    flat_set<const StreamSetBuffer *>   isConsumedAtNonFixedRate;
139};
140
141/** ------------------------------------------------------------------------------------------------------------- *
142 * @brief generateSegmentParallelPipeline
143 *
144 * Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
145 * operating over an input stream set S, a segment-parallel implementation divides the input
146 * into segments and coordinates a set of T <= K threads to each process one segment at a time.
147 * Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
148 * fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
149 ** ------------------------------------------------------------------------------------------------------------- */
150void generateSegmentParallelPipeline(const std::unique_ptr<KernelBuilder> & b, const std::vector<Kernel *> & kernels) {
151
152    const unsigned n = kernels.size();
153    Module * const m = b->getModule();
154    IntegerType * const sizeTy = b->getSizeTy();
155    PointerType * const voidPtrTy = b->getVoidPtrTy();
156    Constant * nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
157    std::vector<Type *> structTypes;
158    codegen::BufferSegments = std::max(codegen::BufferSegments, codegen::ThreadNum);
159
160    Value * instance[n];
161    for (unsigned i = 0; i < n; ++i) {
162        instance[i] = kernels[i]->getInstance();
163        structTypes.push_back(instance[i]->getType());
164    }
165    StructType * const sharedStructType = StructType::get(m->getContext(), structTypes);
166    StructType * const threadStructType = StructType::get(m->getContext(), {sharedStructType->getPointerTo(), sizeTy});
167
168    const auto ip = b->saveIP();
169
170    Function * const threadFunc = makeThreadFunction(b, "segment");
171    auto args = threadFunc->arg_begin();
172
173    // -------------------------------------------------------------------------------------------------------------------------
174    // MAKE SEGMENT PARALLEL PIPELINE THREAD
175    // -------------------------------------------------------------------------------------------------------------------------
176
177     // Create the basic blocks for the thread function.
178    BasicBlock * entryBlock = BasicBlock::Create(b->getContext(), "entry", threadFunc);
179    b->SetInsertPoint(entryBlock);
180
181    Value * const threadStruct = b->CreateBitCast(&*(args), threadStructType->getPointerTo());
182
183    Value * const sharedStatePtr = b->CreateLoad(b->CreateGEP(threadStruct, {b->getInt32(0), b->getInt32(0)}));
184    for (unsigned k = 0; k < n; ++k) {
185        Value * ptr = b->CreateLoad(b->CreateGEP(sharedStatePtr, {b->getInt32(0), b->getInt32(k)}));
186        kernels[k]->setInstance(ptr);
187    }
188    Value * const segOffset = b->CreateLoad(b->CreateGEP(threadStruct, {b->getInt32(0), b->getInt32(1)}));
189
190    PipelineGenerator G(kernels);
191
192    BasicBlock * const segmentLoop = b->CreateBasicBlock("segmentLoop");
193    b->CreateBr(segmentLoop);
194
195    b->SetInsertPoint(segmentLoop);   
196    PHINode * const segNo = b->CreatePHI(b->getSizeTy(), 2, "segNo");
197    segNo->addIncoming(segOffset, entryBlock);
198    G.initialize(b, entryBlock);
199
200    Value * cycleCountStart = nullptr;
201    Value * cycleCountEnd = nullptr;
202    if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
203        cycleCountStart = b->CreateReadCycleCounter();
204    }
205
206    const bool serialize = codegen::DebugOptionIsSet(codegen::SerializeThreads);
207
208    for (unsigned k = 0; k < n; ++k) {
209
210        const Kernel * const kernel = kernels[k];
211
212        BasicBlock * const kernelWait = b->CreateBasicBlock(kernel->getName() + "Wait");
213        b->CreateBr(kernelWait);
214
215        b->SetInsertPoint(kernelWait);
216        b->setKernel(kernels[serialize ? (n - 1) : k]);
217        Value * const processedSegmentCount = b->acquireLogicalSegmentNo();
218        assert (processedSegmentCount->getType() == segNo->getType());
219        Value * const ready = b->CreateICmpEQ(segNo, processedSegmentCount);
220
221        BasicBlock * const kernelCheck = b->CreateBasicBlock(kernel->getName() + "Check");
222        b->CreateCondBr(ready, kernelCheck, kernelWait);
223
224        b->SetInsertPoint(kernelCheck);
225
226        G.execute(b, k);
227
228        b->releaseLogicalSegmentNo(b->CreateAdd(segNo, b->getSize(1)));
229
230        if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
231            cycleCountEnd = b->CreateReadCycleCounter();
232            Value * counterPtr = b->getCycleCountPtr();
233            b->CreateStore(b->CreateAdd(b->CreateLoad(counterPtr), b->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
234            cycleCountStart = cycleCountEnd;
235        }
236
237    }
238
239    segNo->addIncoming(b->CreateAdd(segNo, b->getSize(codegen::ThreadNum)), b->GetInsertBlock());
240    Value * const finished = G.finalize(b);
241    BasicBlock * const segmentExit = b->CreateBasicBlock("segmentExit");
242    b->CreateUnlikelyCondBr(finished, segmentExit, segmentLoop);
243
244    b->SetInsertPoint(segmentExit);
245    // only call pthread_exit() within spawned threads; otherwise it'll be equivalent to calling exit() within the process
246    BasicBlock * const exitThread = b->CreateBasicBlock("ExitThread");
247    BasicBlock * const exitFunction = b->CreateBasicBlock("ExitProcessFunction");   
248    b->CreateCondBr(b->CreateIsNull(segOffset), exitFunction, exitThread);
249    b->SetInsertPoint(exitThread);
250    b->CreatePThreadExitCall(nullVoidPtrVal);
251    b->CreateBr(exitFunction);   
252    b->SetInsertPoint(exitFunction);
253    b->CreateRetVoid();
254
255    // -------------------------------------------------------------------------------------------------------------------------
256    b->restoreIP(ip);
257
258    for (unsigned i = 0; i < n; ++i) {
259        kernels[i]->setInstance(instance[i]);
260    }
261
262    // -------------------------------------------------------------------------------------------------------------------------
263    // MAKE SEGMENT PARALLEL PIPELINE DRIVER
264    // -------------------------------------------------------------------------------------------------------------------------
265    const unsigned threads = codegen::ThreadNum - 1;
266    assert (codegen::ThreadNum > 0);
267    Type * const pthreadsTy = ArrayType::get(sizeTy, threads);
268    AllocaInst * const pthreads = b->CreateAlloca(pthreadsTy);
269    Value * threadIdPtr[threads];
270
271    for (unsigned i = 0; i < threads; ++i) {
272        threadIdPtr[i] = b->CreateGEP(pthreads, {b->getInt32(0), b->getInt32(i)});
273    }
274
275    for (unsigned i = 0; i < n; ++i) {
276        b->setKernel(kernels[i]);
277        b->releaseLogicalSegmentNo(b->getSize(0));
278    }
279
280    AllocaInst * const sharedStruct = b->CreateCacheAlignedAlloca(sharedStructType);
281    for (unsigned i = 0; i < n; ++i) {
282        Value * ptr = b->CreateGEP(sharedStruct, {b->getInt32(0), b->getInt32(i)});
283        b->CreateStore(kernels[i]->getInstance(), ptr);
284    }
285
286    // use the process thread to handle the initial segment function after spawning (n - 1) threads to handle the subsequent offsets
287    for (unsigned i = 0; i < threads; ++i) {
288        AllocaInst * const threadState = b->CreateAlloca(threadStructType);
289        b->CreateStore(sharedStruct, b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(0)}));
290        b->CreateStore(b->getSize(i + 1), b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(1)}));
291        b->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, threadFunc, threadState);
292    }
293
294    AllocaInst * const threadState = b->CreateAlloca(threadStructType);
295    b->CreateStore(sharedStruct, b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(0)}));
296    b->CreateStore(b->getSize(0), b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(1)}));
297    b->CreateCall(threadFunc, b->CreatePointerCast(threadState, voidPtrTy));
298
299    AllocaInst * const status = b->CreateAlloca(voidPtrTy);
300    for (unsigned i = 0; i < threads; ++i) {
301        Value * threadId = b->CreateLoad(threadIdPtr[i]);
302        b->CreatePThreadJoinCall(threadId, status);
303    }
304   
305    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
306        for (const Kernel * kernel : kernels) {
307            b->setKernel(kernel);
308            const auto & inputs = kernel->getStreamInputs();
309            const auto & outputs = kernel->getStreamOutputs();
310            Value * items = nullptr;
311            if (inputs.empty()) {
312                items = b->getProducedItemCount(outputs[0].getName());
313            } else {
314                items = b->getProcessedItemCount(inputs[0].getName());
315            }
316            Value * fItems = b->CreateUIToFP(items, b->getDoubleTy());
317            Value * cycles = b->CreateLoad(b->getCycleCountPtr());
318            Value * fCycles = b->CreateUIToFP(cycles, b->getDoubleTy());
319            const auto formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
320            Value * stringPtr = b->CreatePointerCast(b->GetString(formatString), b->getInt8PtrTy());
321            b->CreateCall(b->GetDprintf(), {b->getInt32(2), stringPtr, fItems, fCycles, b->CreateFDiv(fCycles, fItems)});
322        }
323    }
324   
325}
326
327
328/** ------------------------------------------------------------------------------------------------------------- *
329 * @brief generatePipelineLoop
330 ** ------------------------------------------------------------------------------------------------------------- */
331void generatePipelineLoop(const std::unique_ptr<KernelBuilder> & b, const std::vector<Kernel *> & kernels) {
332
333    // Create the basic blocks for the loop.
334    BasicBlock * const entryBlock = b->GetInsertBlock();
335    BasicBlock * const pipelineLoop = b->CreateBasicBlock("pipelineLoop");
336
337    PipelineGenerator G(kernels);
338
339    b->CreateBr(pipelineLoop);
340
341    b->SetInsertPoint(pipelineLoop);   
342    PHINode * const segNo = b->CreatePHI(b->getSizeTy(), 2, "segNo");
343    segNo->addIncoming(b->getSize(0), entryBlock);
344    G.initialize(b, entryBlock);
345
346    Value * const nextSegNo = b->CreateAdd(segNo, b->getSize(1));
347
348    Value * cycleCountStart = nullptr;
349    Value * cycleCountEnd = nullptr;
350    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
351        cycleCountStart = b->CreateReadCycleCounter();
352    }
353
354    for (unsigned i = 0; i < kernels.size(); ++i) {
355
356        G.execute(b, i);
357
358        b->releaseLogicalSegmentNo(nextSegNo);
359
360        if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
361            cycleCountEnd = b->CreateReadCycleCounter();
362            Value * counterPtr = b->getCycleCountPtr();
363            b->CreateStore(b->CreateAdd(b->CreateLoad(counterPtr), b->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
364            cycleCountStart = cycleCountEnd;
365        }
366    }
367
368    segNo->addIncoming(nextSegNo, b->GetInsertBlock());
369    BasicBlock * const pipelineExit = b->CreateBasicBlock("pipelineExit");
370    Value * const finished = G.finalize(b);
371    b->CreateCondBr(finished, pipelineExit, pipelineLoop);
372
373    b->SetInsertPoint(pipelineExit);
374    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
375        for (unsigned k = 0; k < kernels.size(); k++) {
376            auto & kernel = kernels[k];
377            b->setKernel(kernel);
378            const auto & inputs = kernel->getStreamInputs();
379            const auto & outputs = kernel->getStreamOutputs();
380            Value * items = nullptr;
381            if (inputs.empty()) {
382                items = b->getProducedItemCount(outputs[0].getName());
383            } else {
384                items = b->getProcessedItemCount(inputs[0].getName());
385            }
386            Value * fItems = b->CreateUIToFP(items, b->getDoubleTy());
387            Value * cycles = b->CreateLoad(b->getCycleCountPtr());
388            Value * fCycles = b->CreateUIToFP(cycles, b->getDoubleTy());
389            const auto formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
390            Value * stringPtr = b->CreatePointerCast(b->GetString(formatString), b->getInt8PtrTy());
391            b->CreateCall(b->GetDprintf(), {b->getInt32(2), stringPtr, fItems, fCycles, b->CreateFDiv(fCycles, fItems)});
392        }
393    }
394
395}
396
397/** ------------------------------------------------------------------------------------------------------------- *
398 * @brief initialize
399 ** ------------------------------------------------------------------------------------------------------------- */
400void PipelineGenerator::initialize(const std::unique_ptr<KernelBuilder> & b, BasicBlock * const entryBlock) {
401
402    dependencyGraph = makeDependencyGraph();
403    inputGraph = makeInputGraph();
404    outputGraph = makeOutputGraph();
405
406    for (Kernel * const kernel : kernels) {
407        const auto & inputs = kernel->getStreamInputs();
408        for (unsigned i = 0; i < inputs.size(); ++i) {
409            const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
410            if (kernel->requiresCopyBack(inputs[i]) && !buffer->isUnbounded()) {
411                if (LLVM_LIKELY(buffer->supportsCopyBack())) {
412                    isConsumedAtNonFixedRate.insert(buffer);
413                } else {
414    //                std::string tmp;
415    //                raw_string_ostream out(tmp);
416    //                out << kernel->getName() << " : " << name << " must have an overflow";
417    //                report_fatal_error(out.str());
418                }
419            }
420        }
421        // if this kernel consumes this buffer, update the last consumer
422        for (const StreamSetBuffer * const buffer : kernel->getStreamSetInputBuffers()) {
423            auto f = lastConsumer.find(buffer);
424            assert (f != lastConsumer.end());
425            f->second = kernel;
426        }
427        // incase some output is never consumed, make the kernel that produced it the initial "last consumer"
428        for (const StreamSetBuffer * const buffer : kernel->getStreamSetOutputBuffers()) {
429            assert (buffer->getProducer() == kernel);
430            assert (lastConsumer.count(buffer) == 0);
431            lastConsumer.emplace(buffer, kernel);
432        }
433    }
434
435    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
436        deadLockCounter = b->CreatePHI(b->getSizeTy(), 2, "deadLockCounter");
437        deadLockCounter->addIncoming(b->getSize(0), entryBlock);
438        anyProgress = b->getFalse();
439    }
440
441}
442
443/** ------------------------------------------------------------------------------------------------------------- *
444 * @brief makeTerminationGraph
445 *
446 * The input graph models whether a kernel could *consume* more data than may be produced by a preceeding kernel.
447 ** ------------------------------------------------------------------------------------------------------------- */
448PipelineGenerator::DependencyGraph PipelineGenerator::makeDependencyGraph() const {
449    const auto n = kernels.size();
450    DependencyGraph G(n);
451    KernelMap M;
452    // construct a kernel dependency graph
453    for (unsigned v = 0; v < n; ++v) {
454        const Kernel * const kernel = kernels[v];       
455        for (const StreamSetBuffer * buf : kernel->getStreamSetInputBuffers()) {
456            const auto f = M.find(buf->getProducer()); assert (f != M.end());
457            add_edge(f->second, v, G);
458        }
459        M.emplace(kernel, v);
460    }
461    // generate a transitive closure
462    for (unsigned u = 0; u < n; ++u) {
463        for (auto e : make_iterator_range(in_edges(u, G))) {
464            const auto s = source(e, G);
465            for (auto f : make_iterator_range(out_edges(u, G))) {
466                add_edge(s, target(f, G), G);
467            }
468        }
469    }
470    // then take the transitive reduction
471    std::vector<unsigned> sources;
472    for (unsigned u = n; u-- > 0; ) {
473        if (in_degree(u, G) > 0 && out_degree(u, G) > 0) {
474            for (auto e : make_iterator_range(in_edges(u, G))) {
475                sources.push_back(source(e, G));
476            }
477            std::sort(sources.begin(), sources.end());
478            for (auto e : make_iterator_range(out_edges(u, G))) {
479                remove_in_edge_if(target(e, G), [&G, &sources](const DependencyGraph::edge_descriptor f) {
480                    return std::binary_search(sources.begin(), sources.end(), source(f, G));
481                }, G);
482            }
483            sources.clear();
484        }
485    }
486    return G;
487}
488
489/** ------------------------------------------------------------------------------------------------------------- *
490 * @brief makeInputGraph
491 *
492 * The input graph models whether a kernel could *consume* more data than may be produced by a preceeding kernel.
493 ** ------------------------------------------------------------------------------------------------------------- */
494PipelineGenerator::ChannelGraph PipelineGenerator::makeInputGraph() const {
495    const auto n = kernels.size();
496    ChannelGraph G(n);
497    KernelMap M;
498    for (unsigned v = 0; v < n; ++v) {
499
500        const Kernel * const consumer = kernels[v];
501        G[v] = consumer;
502        M.emplace(consumer, v);
503
504        const auto & inputs = consumer->getStreamInputs();
505        for (unsigned i = 0; i < inputs.size(); ++i) {
506
507            const Binding & input = inputs[i];
508            auto ub_in = consumer->getUpperBound(input.getRate()) * consumer->getStride() * codegen::SegmentSize; assert (ub_in > 0);
509            if (input.hasLookahead()) {
510                ub_in += input.getLookahead();
511            }
512
513            const auto buffer = consumer->getStreamSetInputBuffer(i);
514            const Kernel * const producer = buffer->getProducer();
515            const Binding & output = producer->getStreamOutput(buffer);
516            const auto lb_out = producer->getLowerBound(output.getRate()) * producer->getStride() * codegen::SegmentSize;
517
518            const auto min_oi_ratio = lb_out / ub_in;
519            const auto f = M.find(producer); assert (f != M.end());
520            const auto u = f->second;
521            // If we have multiple inputs from the same kernel, we only need to consider the "slowest" one
522            bool slowest = true;
523            for (const auto e : make_iterator_range(in_edges(v, G))) {
524                if (source(e, G) == u) {
525                    const Channel & p = G[e];
526                    if (min_oi_ratio > p.ratio) {
527                        slowest = false;
528                    } else if (min_oi_ratio < p.ratio) {
529                        clear_in_edges(v, G);
530                    }
531                    break;
532                }
533            }
534            if (slowest) {
535                add_edge(u, v, Channel{min_oi_ratio, buffer, i}, G);
536            }
537        }
538    }
539
540    return pruneGraph(std::move(G), make_iterator_range(vertices(G)));
541}
542
543/** ------------------------------------------------------------------------------------------------------------- *
544 * @brief makeOutputGraph
545 *
546 * The output graph models whether a kernel could *produce* more data than may be consumed by a subsequent kernel.
547 ** ------------------------------------------------------------------------------------------------------------- */
548PipelineGenerator::ChannelGraph PipelineGenerator::makeOutputGraph() const {
549    const auto n = kernels.size();
550    ChannelGraph G(n);
551    KernelMap M;
552    for (unsigned v = 0; v < n; ++v) {
553        const Kernel * const consumer = kernels[v];
554        G[v] = consumer;
555        M.emplace(consumer, v);
556
557        const auto & inputs = consumer->getStreamInputs();
558        for (unsigned i = 0; i < inputs.size(); ++i) {
559            const auto buffer = consumer->getStreamSetInputBuffer(i);
560            if (isa<SourceBuffer>(buffer)) continue;
561            const Kernel * const producer = buffer->getProducer();
562            assert (consumer != producer);
563            const Binding & output = producer->getStreamOutput(buffer);
564            auto ub_out = producer->getUpperBound(output.getRate()) * producer->getStride() * codegen::SegmentSize;
565            if (ub_out > 0) { // unknown output rates are handled by reallocating their buffers
566                const Binding & input = inputs[i];
567                if (input.hasLookahead()) {
568                    const auto la = input.getLookahead();
569                    if (LLVM_UNLIKELY(ub_out <= la)) {
570                        llvm::report_fatal_error("lookahead exceeds segment size");
571                    }
572                    ub_out += la;
573                }
574                const auto lb_in = consumer->getLowerBound(input.getRate()) * consumer->getStride() * codegen::SegmentSize;
575                const auto min_io_ratio = lb_in / ub_out;
576                const auto f = M.find(producer); assert (f != M.end());
577                const auto u = f->second;
578                assert (v != u);
579                assert (G[u] == producer);
580                // If we have multiple inputs from the same kernel, we only need to consider the "fastest" one
581                bool fastest = true;
582                for (const auto e : make_iterator_range(in_edges(v, G))) {
583                    if (source(e, G) == u) {
584                        const Channel & p = G[e];
585                        if (min_io_ratio > p.ratio) {
586                            fastest = false;
587                        } else if (min_io_ratio < p.ratio) {
588                            clear_in_edges(v, G);
589                        }
590                        break;
591                    }
592                }
593                if (fastest) {
594                    add_edge(v, u, Channel{min_io_ratio, buffer, i}, G);
595                }
596            }
597        }
598    }
599
600    return pruneGraph(std::move(G), boost::adaptors::reverse(make_iterator_range(vertices(G))));
601}
602
603/** ------------------------------------------------------------------------------------------------------------- *
604 * @brief pruneGraph
605 ** ------------------------------------------------------------------------------------------------------------- */
606template<class VertexList>
607inline PipelineGenerator::ChannelGraph PipelineGenerator::pruneGraph(ChannelGraph && G, VertexList && V) const {
608    // Take a transitive closure of G but whenever we attempt to insert an edge into the closure
609    // that already exists, check instead whether the rate of our proposed edge is <= the existing
610    // edge's rate. If so, the data availability/consumption is transitively guaranteed.
611    for (const auto u : V) {
612        for (auto ei : make_iterator_range(in_edges(u, G))) {
613            const auto v = source(ei, G);
614            const Channel & ci = G[ei];
615            for (auto ej : make_iterator_range(out_edges(u, G))) {
616                const auto w = target(ej, G);
617                // ci.rate = BOUND_RATIO(u, v) * (STRIDE(u) / STRIDE(v))
618                const auto scaling = RateValue(G[v]->getStride(), G[w]->getStride());
619                const auto rate = ci.ratio * scaling;
620                bool insert = true;
621                for (auto ek : make_iterator_range(in_edges(w, G))) {
622                    // do we already have a vw edge?
623                    if (source(ek, G) == v) {
624                        Channel & ck = G[ek];
625                        if (rate <= ck.ratio) {
626                            ck.buffer = nullptr;
627                        }
628                        insert = false;
629                    }
630                }
631                if (insert) {
632                    add_edge(v, w, Channel{rate}, G);
633                }
634            }
635        }
636    }
637
638    // remove any closure edges from G
639    remove_edge_if([&G](const ChannelGraph::edge_descriptor e) { return G[e].buffer == nullptr; }, G);
640
641    for (const auto u : V) {
642        if (in_degree(u, G) == 0) {
643            // If we do not need to check any of its inputs, we can avoid testing any output of a kernel
644            // with a rate ratio of at least 1
645            remove_out_edge_if(u, [&G](const ChannelGraph::edge_descriptor e) {
646                return G[e].ratio >= RateValue{1, 1};
647            }, G);
648        }
649    }
650
651    return G;
652}
653
654/** ------------------------------------------------------------------------------------------------------------- *
655 * @brief executeKernel
656 ** ------------------------------------------------------------------------------------------------------------- */
657void PipelineGenerator::execute(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
658
659    const Kernel * const kernel = kernels[index];
660    b->setKernel(kernel);
661
662    const auto & inputs = kernel->getStreamInputs();
663    const auto & outputs = kernel->getStreamOutputs();
664
665    BasicBlock * const kernelEntry = b->GetInsertBlock();
666    BasicBlock * const kernelCode = b->CreateBasicBlock(kernel->getName());
667    kernelFinished = b->CreateBasicBlock(kernel->getName() + "Finished");
668    BasicBlock * const kernelExit = b->CreateBasicBlock(kernel->getName() + "Exit");
669
670    b->CreateUnlikelyCondBr(b->getTerminationSignal(), kernelExit, kernelCode);
671
672    b->SetInsertPoint(kernelFinished);
673    terminated = b->CreatePHI(b->getInt1Ty(), 2);
674    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
675        madeProgress = b->CreatePHI(b->getInt1Ty(), 3);
676    }
677    b->SetInsertPoint(kernelExit);
678    PHINode * const isFinal = b->CreatePHI(b->getInt1Ty(), 2, kernel->getName() + "_isFinal");
679    isFinal->addIncoming(b->getTrue(), kernelEntry);
680    isFinal->addIncoming(terminated, kernelFinished);
681
682    PHINode * progress = nullptr;
683    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
684        progress = b->CreatePHI(b->getInt1Ty(), 2, kernel->getName() + "_anyProgress");
685        progress->addIncoming(anyProgress, kernelEntry);
686        progress->addIncoming(madeProgress, kernelFinished);
687    }
688
689    // Since it is possible that a sole consumer of some stream could terminate early, set the
690    // initial consumed amount to the amount produced in this iteration.
691
692    // First determine the priorConsumedItemCounts, making sure
693    // that none of them are previous input buffers of this kernel!
694    std::vector<Value *> priorConsumedItemCount(inputs.size());
695
696    for (unsigned i = 0; i < inputs.size(); ++i) {
697        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
698        auto c = consumedItemCount.find(buffer);
699        if (c == consumedItemCount.end()) {
700            const auto p = producedItemCount.find(buffer);
701            assert (p != producedItemCount.end());
702            priorConsumedItemCount[i] = p->second;
703        } else {
704            priorConsumedItemCount[i] = c->second;
705        }
706    }
707
708    std::vector<PHINode *> consumedItemCountPhi(inputs.size());
709
710    for (unsigned i = 0; i < inputs.size(); ++i) {
711        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
712        PHINode * const consumedPhi = b->CreatePHI(b->getSizeTy(), 2);
713        auto c = consumedItemCount.find(buffer);
714        if (c == consumedItemCount.end()) {
715            consumedItemCount.emplace(buffer, consumedPhi);
716        } else {
717            c->second = consumedPhi;
718        }
719        consumedPhi->addIncoming(priorConsumedItemCount[i], kernelEntry);
720        consumedItemCountPhi[i] = consumedPhi;
721    }
722
723    b->SetInsertPoint(kernelCode);
724
725    Value * const finalStride = callKernel(b, index);
726
727    // TODO: add in some checks to verify the kernel actually adheres to its rate
728
729    BasicBlock * const kernelCodeExit = b->GetInsertBlock();
730    terminated->addIncoming(finalStride, kernelCodeExit);
731    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
732        madeProgress->addIncoming(b->getTrue(), kernelCodeExit);
733        anyProgress = progress;
734    }
735    b->CreateBr(kernelFinished);
736
737    b->SetInsertPoint(kernelFinished);
738
739    // update the consumed item counts
740    for (unsigned i = 0; i < inputs.size(); ++i) {
741        const Binding & input = inputs[i];
742        Value * const fullyProcessed = getFullyProcessedItemCount(b, input, terminated);
743        Value * const consumed = b->CreateUMin(priorConsumedItemCount[i], fullyProcessed);
744        consumedItemCountPhi[i]->addIncoming(consumed, kernelFinished);
745    }
746    b->CreateBr(kernelExit);
747
748    kernelExit->moveAfter(kernelFinished);
749
750    b->SetInsertPoint(kernelExit);
751
752    for (unsigned i = 0; i < outputs.size(); ++i) {
753        const Binding & output = outputs[i];
754        Value * const produced = b->getProducedItemCount(output.getName());
755        const StreamSetBuffer * const buffer = kernel->getStreamSetOutputBuffer(i);
756        // if some stream has no consumer, set the consumed item count to the produced item count
757        const auto c = lastConsumer.find(buffer);
758        assert (c != lastConsumer.end());
759        if (LLVM_UNLIKELY(c->second == kernel)) {
760            assert (buffer->getProducer() == kernel);
761            if (LLVM_UNLIKELY(output.getRate().isRelative())) {
762                continue;
763            }
764            b->setConsumedItemCount(output.getName(), produced);
765        } else { // otherwise record how many items were produced
766            assert (producedItemCount.count(buffer) == 0);
767            producedItemCount.emplace(buffer, produced);
768        }
769
770    }
771
772
773    // If this kernel is the last consumer of a input buffer, update the consumed count for that buffer.
774
775    // TODO: if all consumers process the data at a fixed rate, we can just set the consumed item count
776    // by the strideNo rather than tracking it.
777
778    // TODO: a kernel could take the same stream set for multiple arguments.
779
780    // TODO: if we can prove that this kernel cannot terminate before any prior consumer, this code
781    // could be executed in kernelFinished block.
782    for (unsigned i = 0; i < inputs.size(); ++i) {
783        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
784        const auto c = lastConsumer.find(buffer);
785        assert (c != lastConsumer.end());
786        if (LLVM_LIKELY(c->second == kernel)) {
787            Kernel * const producer = buffer->getProducer();
788            assert (producer != kernel);
789            const auto & output = producer->getStreamOutput(buffer);
790            if (output.getRate().isRelative()) continue;
791            b->setKernel(producer);
792            b->setConsumedItemCount(output.getName(), consumedItemCountPhi[i]);
793            b->setKernel(kernel);
794        }
795    }
796
797    dependencyGraph[index] = isFinal;
798}
799
800/** ------------------------------------------------------------------------------------------------------------- *
801 * @brief checkIfAllInputKernelsHaveFinished
802 ** ------------------------------------------------------------------------------------------------------------- */
803void PipelineGenerator::checkIfAllInputKernelsHaveFinished(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
804    const auto n = in_degree(index, dependencyGraph);
805    if (LLVM_UNLIKELY(n == 0)) {
806        noMore = b->getFalse();
807    } else {
808        noMore = b->getTrue();
809        for (auto e : make_iterator_range(in_edges(index, dependencyGraph))) {
810            const auto u = source(e, dependencyGraph);
811            Value * const finished = dependencyGraph[u];
812            noMore = b->CreateAnd(noMore, finished);
813        }
814    }
815}
816
817/** ------------------------------------------------------------------------------------------------------------- *
818 * @brief checkAvailableInputData
819 ** ------------------------------------------------------------------------------------------------------------- */
820void PipelineGenerator::checkAvailableInputData(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
821    const Kernel * const kernel = kernels[index];
822    b->setKernel(kernel);
823    requiresTemporaryBuffers = nullptr;
824    for (auto e : make_iterator_range(in_edges(index, inputGraph))) {
825        const Channel & c = inputGraph[e];
826        const Binding & input = kernel->getStreamInput(c.operand);
827
828        Value * requiredInput = getStrideLength(b, kernel, input);
829        if (LLVM_UNLIKELY(input.hasLookahead())) {
830            Constant * const lookahead = b->getSize(input.getLookahead());
831            requiredInput = b->CreateAdd(requiredInput, lookahead);
832        }
833        const auto p = producedItemCount.find(c.buffer);
834        assert (p != producedItemCount.end());
835        Value * const produced = p->second;
836        Value * const processed = b->getNonDeferredProcessedItemCount(input);
837        Value * const unprocessed = b->CreateSub(produced, processed);
838        Value * const hasEnough = b->CreateICmpUGE(unprocessed, requiredInput);
839        terminated->addIncoming(b->getFalse(), b->GetInsertBlock());
840        if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
841            madeProgress->addIncoming(anyProgress, b->GetInsertBlock());
842        }
843        const auto prefix = kernel->getName() + "_" + input.getName();
844        BasicBlock * const hasSufficientInput = b->CreateBasicBlock(prefix + "_hasSufficientInput");
845        Value * const check = b->CreateOr(hasEnough, noMore);
846        b->CreateLikelyCondBr(check, hasSufficientInput, kernelFinished);
847        b->SetInsertPoint(hasSufficientInput);
848        if (isPotentiallyUnsafeInputBuffer(c.buffer)) {
849            Value * const notEnough = b->CreateNot(hasEnough);
850            if (requiresTemporaryBuffers) {
851                requiresTemporaryBuffers = b->CreateOr(requiresTemporaryBuffers, notEnough);
852            } else {
853                requiresTemporaryBuffers = notEnough;
854            }
855        }
856    }
857}
858
859/** ------------------------------------------------------------------------------------------------------------- *
860 * @brief checkAvailableOutputSpace
861 ** ------------------------------------------------------------------------------------------------------------- */
862void PipelineGenerator::checkAvailableOutputSpace(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
863    const Kernel * const kernel = kernels[index];
864    b->setKernel(kernel);
865    for (auto e : make_iterator_range(in_edges(index, outputGraph))) {
866        const Channel & c = outputGraph[e];
867        assert (c.buffer->getProducer() == kernel);
868        const Binding & output = kernel->getStreamOutput(c.buffer);
869
870        Value * requiredSpace = getStrideLength(b, kernel, output);
871        const auto & name = output.getName();
872        Value * const produced = b->getNonDeferredProducedItemCount(output);
873        Value * const consumed = b->getConsumedItemCount(name);
874        Value * const unconsumed = b->CreateSub(produced, consumed);
875        requiredSpace = b->CreateAdd(requiredSpace, unconsumed);
876        Value * const capacity = b->getBufferedSize(name);
877        Value * const check = b->CreateICmpULE(requiredSpace, capacity);
878        terminated->addIncoming(b->getFalse(), b->GetInsertBlock());
879        if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
880            madeProgress->addIncoming(anyProgress, b->GetInsertBlock());
881        }
882        const auto prefix = kernel->getName() + "_" + name;
883        BasicBlock * const hasOutputSpace = b->CreateBasicBlock(prefix + "_hasOutputSpace");
884        b->CreateLikelyCondBr(check, hasOutputSpace, kernelFinished);
885        b->SetInsertPoint(hasOutputSpace);
886    }
887}
888
889/** ------------------------------------------------------------------------------------------------------------- *
890 * @brief getStrideLength
891 ** ------------------------------------------------------------------------------------------------------------- */
892Value * PipelineGenerator::getStrideLength(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel, const Binding & binding) {
893    Value * strideLength = nullptr;
894    const ProcessingRate & rate = binding.getRate();
895    if (rate.isPopCount() || rate.isNegatedPopCount()) {
896        Port refPort; unsigned refIndex;
897        std::tie(refPort, refIndex) = kernel->getStreamPort(rate.getReference());
898        const Binding & ref = kernel->getStreamInput(refIndex);
899        Value * markers = b->loadInputStreamBlock(ref.getName(), b->getSize(0));
900        if (rate.isNegatedPopCount()) {
901            markers = b->CreateNot(markers);
902        }
903        strideLength = b->bitblock_popcount(markers);
904    } else if (binding.hasAttribute(kernel::Attribute::KindId::AlwaysConsume)) {
905        const auto lb = kernel->getLowerBound(rate);
906        strideLength = b->getSize(std::max(ceiling(lb * kernel->getStride()), 1U));
907    } else {
908        const auto ub = kernel->getUpperBound(rate); assert (ub > 0);
909        strideLength = b->getSize(ceiling(ub * kernel->getStride()));
910    }
911    return strideLength;
912}
913
914/** ------------------------------------------------------------------------------------------------------------- *
915 * @brief callKernel
916 ** ------------------------------------------------------------------------------------------------------------- */
917Value * PipelineGenerator::callKernel(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
918
919    const Kernel * const kernel = kernels[index];
920    b->setKernel(kernel);
921
922    checkIfAllInputKernelsHaveFinished(b, index);
923
924    checkAvailableInputData(b, index);
925
926    checkAvailableOutputSpace(b, index);
927
928    applyOutputBufferExpansions(b, kernel);
929
930    #ifndef DISABLE_COPY_TO_OVERFLOW
931    // Store how many items we produced by this kernel in the prior iteration. We'll use this to determine when
932    // to mirror the first K segments
933    const auto & outputs = kernel->getStreamOutputs();
934    const auto m = outputs.size();
935
936    std::vector<Value *> initiallyProducedItemCount(m, nullptr);
937    for (unsigned i = 0; i < m; ++i) {
938        const Binding & output = outputs[i];
939        const auto & name = output.getName();
940        const StreamSetBuffer * const buffer = kernel->getOutputStreamSetBuffer(name);
941        if (isConsumedAtNonFixedRate.count(buffer)) {
942            initiallyProducedItemCount[i] = b->getProducedItemCount(name);
943        }
944    }
945    #endif
946
947    allocateTemporaryBufferPointerArray(b, kernel);
948
949    runKernel(b, kernel);
950
951    #ifndef DISABLE_COPY_TO_OVERFLOW
952    // For each buffer with an overflow region of K blocks, overwrite the overflow with the first K blocks of
953    // data to ensure that even if this stream is produced at a fixed rate but consumed at a bounded rate,
954    // every kernel has a consistent view of the stream data.
955    for (unsigned i = 0; i < m; ++i) {
956        const Binding & output = outputs[i];
957        const auto & name = output.getName();
958        if (initiallyProducedItemCount[i]) {
959            Value * const bufferSize = b->getBufferedSize(name);
960            Value * const prior = initiallyProducedItemCount[i];
961            Value * const offset = b->CreateURem(prior, bufferSize);
962            Value * const produced = b->getNonDeferredProducedItemCount(output);
963            Value * const buffered = b->CreateAdd(offset, b->CreateSub(produced, prior));
964            BasicBlock * const copyBack = b->CreateBasicBlock(name + "MirrorOverflow");
965            BasicBlock * const done = b->CreateBasicBlock(name + "MirrorOverflowDone");
966            b->CreateCondBr(b->CreateICmpUGT(buffered, bufferSize), copyBack, done);
967            b->SetInsertPoint(copyBack);
968            b->CreateCopyToOverflow(name);
969            b->CreateBr(done);
970            b->SetInsertPoint(done);
971        }
972    }
973    #endif
974
975    return b->getTerminationSignal();
976}
977
978/** ------------------------------------------------------------------------------------------------------------- *
979 * @brief runKernel
980 ** ------------------------------------------------------------------------------------------------------------- */
981void PipelineGenerator::runKernel(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel) {
982
983    const auto & inputs = kernel->getStreamInputs();
984    const auto n = inputs.size();
985    std::vector<Value *> arguments(n + 2);
986
987    Value * isFinal = noMore;
988
989    for (unsigned i = 0; i < n; ++i) {
990        const Binding & input = inputs[i];
991        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
992
993        const auto p = producedItemCount.find(buffer);
994        assert (p != producedItemCount.end());
995        Value * const produced = p->second;
996
997        const ProcessingRate & rate = input.getRate();
998        if (rate.isPopCount()) {
999            arguments[i + 2] = produced;
1000        } else {
1001            const unsigned strideSize = ceiling(kernel->getUpperBound(rate) * kernel->getStride());
1002            Value * const processed = b->getNonDeferredProcessedItemCount(input);
1003            Value * const limit = b->CreateAdd(processed, b->getSize(strideSize * codegen::SegmentSize));
1004            Value * const hasPartial = b->CreateICmpULT(produced, limit);
1005            arguments[i + 2] = b->CreateSelect(hasPartial, produced, limit);
1006            isFinal = b->CreateAnd(isFinal, hasPartial);
1007        }
1008    }
1009
1010    // TODO: pass in a strideNo for fixed rate streams to allow the kernel to calculate the current avail,
1011    // processed, and produced counts
1012
1013    arguments[0] = kernel->getInstance();
1014    arguments[1] = isFinal;
1015
1016    if (requiresTemporaryBuffers) {
1017        allocateTemporaryBuffers(b, kernel);
1018    }
1019
1020    b->createDoSegmentCall(arguments);
1021
1022    if (requiresTemporaryBuffers) {
1023        freeTemporaryBuffers(b, kernel);
1024    }
1025}
1026
1027
1028/** ------------------------------------------------------------------------------------------------------------- *
1029 * @brief applyOutputBufferExpansions
1030 ** ------------------------------------------------------------------------------------------------------------- */
1031void PipelineGenerator::applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel) {
1032    const auto & outputs = kernel->getStreamSetOutputBuffers();
1033    for (unsigned i = 0; i < outputs.size(); i++) {
1034        if (isa<DynamicBuffer>(outputs[i])) {
1035
1036            const auto baseSize = ceiling(kernel->getUpperBound(kernel->getStreamOutput(i).getRate()) * kernel->getStride() * codegen::SegmentSize);
1037            if (LLVM_LIKELY(baseSize > 0)) {
1038
1039                const auto & name = kernel->getStreamOutput(i).getName();
1040
1041                BasicBlock * const doExpand = b->CreateBasicBlock(name + "Expand");
1042                BasicBlock * const nextBlock = b->GetInsertBlock()->getNextNode();
1043                doExpand->moveAfter(b->GetInsertBlock());
1044                BasicBlock * const bufferReady = b->CreateBasicBlock(name + "Ready");
1045                bufferReady->moveAfter(doExpand);
1046                if (nextBlock) nextBlock->moveAfter(bufferReady);
1047
1048                Value * const produced = b->getProducedItemCount(name);
1049                Value * const consumed = b->getConsumedItemCount(name);
1050                Value * const required = b->CreateAdd(b->CreateSub(produced, consumed), b->getSize(2 * baseSize));
1051
1052                b->CreateCondBr(b->CreateICmpUGT(required, b->getBufferedSize(name)), doExpand, bufferReady);
1053                b->SetInsertPoint(doExpand);
1054
1055                b->doubleCapacity(name);
1056                // Ensure that capacity is sufficient by successive doubling, if necessary.
1057                b->CreateCondBr(b->CreateICmpUGT(required, b->getBufferedSize(name)), doExpand, bufferReady);
1058
1059                b->SetInsertPoint(bufferReady);
1060
1061            }
1062        }
1063    }
1064}
1065
1066/** ------------------------------------------------------------------------------------------------------------- *
1067 * @brief allocateTemporaryBufferPointerArray
1068 ** ------------------------------------------------------------------------------------------------------------- */
1069void PipelineGenerator::allocateTemporaryBufferPointerArray(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel) {
1070    // TODO: whenever two kernels are using the same "unsafe" buffer, they'll both create and destroy their own
1071    // temporary copies of it. This could be optimized to have it done at production and deleted after the last
1072    // consuming kernel utilizes it.
1073    if (requiresTemporaryBuffers) {
1074        const auto & inputs = kernel->getStreamInputs();
1075        temporaryBufferPtrs.resize(inputs.size());
1076        std::fill(temporaryBufferPtrs.begin(), temporaryBufferPtrs.end(), nullptr);
1077        for (unsigned i = 0; i < inputs.size(); ++i) {
1078            const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
1079            if (LLVM_UNLIKELY(isPotentiallyUnsafeInputBuffer(buffer))) {
1080                assert (temporaryBufferPtrs[i] == nullptr);
1081                PointerType * const ptrTy = buffer->getStreamSetPointerType();
1082                StructType * const structTy = StructType::create(b->getContext(), {ptrTy, ptrTy});
1083                AllocaInst * const tempBuffer = b->CreateAlloca(structTy);
1084                b->CreateStore(Constant::getNullValue(structTy), tempBuffer);
1085                temporaryBufferPtrs[i] = tempBuffer;
1086            }
1087        }
1088    }
1089}
1090
1091/** ------------------------------------------------------------------------------------------------------------- *
1092 * @brief isPotentiallyUnsafeInputBuffer
1093 *
1094 * We cannot trust that the final block of any single stream source or external buffer can be safely read past its
1095 * final item since kernels may attempt to load aligned blocks of data, leading to potentially-intermittent
1096 * segmentation faults, depending on whether the access crosses a page boundary.
1097 ** ------------------------------------------------------------------------------------------------------------- */
1098inline bool PipelineGenerator::isPotentiallyUnsafeInputBuffer(const StreamSetBuffer * const buffer) {
1099    return isa<SourceBuffer>(buffer) && buffer->getNumOfStreams() == 1;
1100}
1101
1102/** ------------------------------------------------------------------------------------------------------------- *
1103 * @brief allocateTemporaryBuffers
1104 ** ------------------------------------------------------------------------------------------------------------- */
1105void PipelineGenerator::allocateTemporaryBuffers(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel) {
1106    ConstantInt * const ZERO = b->getInt32(0);
1107    ConstantInt * const ONE = b->getInt32(1);
1108    BasicBlock * const allocateBuffers = b->CreateBasicBlock();
1109    BasicBlock * const runKernel = b->CreateBasicBlock();
1110    b->CreateUnlikelyCondBr(requiresTemporaryBuffers, allocateBuffers, runKernel);
1111
1112    b->SetInsertPoint(allocateBuffers);
1113    for (unsigned i = 0; i < temporaryBufferPtrs.size(); ++i) {
1114        if (temporaryBufferPtrs[i]) {
1115            const Binding & input = kernel->getStreamInput(i);
1116            const auto p = producedItemCount.find(kernel->getStreamSetInputBuffer(i));
1117            assert (p != producedItemCount.end());
1118            Value * const produced = p->second;
1119            Value * const processed = b->getProcessedItemCount(input.getName());
1120            Value * const unprocessed = b->CreateSub(produced, processed);
1121            const auto temp = b->AcquireTemporaryBuffer(input.getName(), processed, unprocessed);
1122            b->CreateStore(temp.first, b->CreateGEP(temporaryBufferPtrs[i], { ZERO, ZERO }));
1123            b->CreateStore(temp.second, b->CreateGEP(temporaryBufferPtrs[i], { ZERO, ONE }));
1124        }
1125    }
1126    b->CreateBr(runKernel);
1127
1128    b->SetInsertPoint(runKernel);
1129}
1130
1131/** ------------------------------------------------------------------------------------------------------------- *
1132 * @brief freeTemporaryBuffers
1133 ** ------------------------------------------------------------------------------------------------------------- */
1134void PipelineGenerator::freeTemporaryBuffers(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel) {
1135    ConstantInt * const ZERO = b->getInt32(0);
1136    ConstantInt * const ONE = b->getInt32(1);
1137
1138    BasicBlock * const freeBuffers = b->CreateBasicBlock();
1139    BasicBlock * const finishedKernel = b->CreateBasicBlock();
1140    b->CreateUnlikelyCondBr(requiresTemporaryBuffers, freeBuffers, finishedKernel);
1141    b->SetInsertPoint(freeBuffers);
1142    for (unsigned i = 0; i < temporaryBufferPtrs.size(); ++i) {
1143        if (temporaryBufferPtrs[i]) {
1144            Value * const originalBuffer = b->CreateLoad(b->CreateGEP(temporaryBufferPtrs[i], { ZERO, ZERO }));
1145            const Binding & input = kernel->getStreamInput(i);
1146            b->setBaseAddress(input.getName(), originalBuffer);
1147            Value * const temporaryBuffer = b->CreateLoad(b->CreateGEP(temporaryBufferPtrs[i], { ZERO, ONE }));
1148            b->CreateFree(temporaryBuffer);
1149        }
1150    }
1151    b->CreateBr(finishedKernel);
1152
1153    b->SetInsertPoint(finishedKernel);
1154}
1155
1156/** ------------------------------------------------------------------------------------------------------------- *
1157 * @brief getFullyProcessedItemCount
1158 ** ------------------------------------------------------------------------------------------------------------- */
1159inline Value * PipelineGenerator::getFullyProcessedItemCount(const std::unique_ptr<KernelBuilder> & b, const Binding & input, Value * const isFinal) const {
1160    Value * const processed = b->getProcessedItemCount(input.getName());
1161    if (LLVM_UNLIKELY(input.hasAttribute(kernel::Attribute::KindId::BlockSize))) {
1162        // If the input rate has a block size attribute then --- for the purpose of determining how many
1163        // items have been consumed --- we consider a stream set to be fully processed when an entire
1164        // block (stride?) has been processed.
1165        Constant * const BLOCK_WIDTH = b->getSize(b->getBitBlockWidth());
1166        Value * const partial = b->CreateAnd(processed, ConstantExpr::getNeg(BLOCK_WIDTH));
1167        return b->CreateSelect(isFinal, processed, partial);
1168    }
1169    return processed;
1170}
1171
1172/** ------------------------------------------------------------------------------------------------------------- *
1173 * @brief finalize
1174 ** ------------------------------------------------------------------------------------------------------------- */
1175Value * PipelineGenerator::finalize(const std::unique_ptr<KernelBuilder> & b) {
1176    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
1177        ConstantInt * const ZERO = b->getSize(0);
1178        ConstantInt * const ONE = b->getSize(1);
1179        ConstantInt * const TWO = b->getSize(2);
1180        Value * const count = b->CreateSelect(anyProgress, ZERO, b->CreateAdd(deadLockCounter, ONE));
1181        b->CreateAssert(b->CreateICmpNE(count, TWO), "Dead lock detected: pipeline could not progress after two iterations");
1182        deadLockCounter->addIncoming(count, b->GetInsertBlock());
1183    }
1184    // return whether each sink has terminated
1185    Value * final = b->getTrue();
1186    for (const auto u : make_iterator_range(vertices(dependencyGraph))) {
1187        if (out_degree(u, dependencyGraph) == 0) {
1188            final = b->CreateAnd(final, dependencyGraph[u]);
1189        }
1190    }
1191    return final;
1192}
1193
1194/** ------------------------------------------------------------------------------------------------------------- *
1195 * @brief printGraph
1196 ** ------------------------------------------------------------------------------------------------------------- */
1197void PipelineGenerator::printGraph(const ChannelGraph & G) const {
1198
1199    auto & out = errs();
1200
1201    out << "digraph G {\n";
1202    for (auto u : make_iterator_range(vertices(G))) {
1203        assert (G[u] == kernels[u]);
1204        if (in_degree(u, G) > 0 || out_degree(u, G) > 0) {
1205            out << "v" << u << " [label=\"" << u << ": "
1206                << G[u]->getName() << "\"];\n";
1207        }
1208    }
1209
1210    for (auto e : make_iterator_range(edges(G))) {
1211        const Channel & c = G[e];
1212        const auto s = source(e, G);
1213        const auto t = target(e, G);
1214
1215        out << "v" << s << " -> v" << t
1216            << " [label=\""
1217
1218
1219
1220            << c.ratio.numerator() << " / " << c.ratio.denominator()
1221            << "\"];\n";
1222    }
1223
1224    out << "}\n\n";
1225    out.flush();
1226
1227}
1228
1229/** ------------------------------------------------------------------------------------------------------------- *
1230 * @brief printGraph
1231 ** ------------------------------------------------------------------------------------------------------------- */
1232void PipelineGenerator::printGraph(const DependencyGraph & G) const {
1233
1234    auto & out = errs();
1235
1236    out << "digraph G {\n";
1237    for (auto u : make_iterator_range(vertices(G))) {
1238            out << "v" << u << " [label=\"" << u << ": "
1239                << kernels[u]->getName() << "\"];\n";
1240    }
1241
1242    for (auto e : make_iterator_range(edges(G))) {
1243        const auto s = source(e, G);
1244        const auto t = target(e, G);
1245        out << "v" << s << " -> v" << t << ";\n";
1246    }
1247
1248    out << "}\n\n";
1249    out.flush();
1250
1251}
Note: See TracBrowser for help on using the repository browser.