source: icGREP/icgrep-devel/icgrep/toolchain/pipeline.cpp @ 5856

Last change on this file since 5856 was 5856, checked in by nmedfort, 15 months ago

Revised pipeline structure to better control I/O rates

File size: 27.6 KB
Line 
1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
6#include "pipeline.h"
7#include <toolchain/toolchain.h>
8#include <kernels/kernel.h>
9#include <kernels/streamset.h>
10#include <llvm/IR/Module.h>
11#include <boost/container/flat_set.hpp>
12#include <boost/container/flat_map.hpp>
13#include <boost/graph/adjacency_list.hpp>
14#include <kernels/kernel_builder.h>
15
16#include <llvm/Support/raw_ostream.h>
17
18using namespace kernel;
19using namespace parabix;
20using namespace llvm;
21using namespace boost;
22using namespace boost::container;
23
24using Port = Kernel::Port;
25
26Function * makeThreadFunction(const std::unique_ptr<kernel::KernelBuilder> & b, const std::string & name) {
27    Function * const f = Function::Create(FunctionType::get(b->getVoidTy(), {b->getVoidPtrTy()}, false), Function::InternalLinkage, name, b->getModule());
28    f->setCallingConv(CallingConv::C);
29    f->arg_begin()->setName("state");
30    return f;
31}
32
33struct PipelineGenerator {
34
35    template <typename Value>
36    using StreamSetBufferMap = flat_map<const StreamSetBuffer *, Value>;
37
38    using RateValue = ProcessingRate::RateValue;
39
40    struct Channel {
41        Channel() = default;
42        Channel(const RateValue & rate, const StreamSetBuffer * const buffer)
43        : rate(rate), buffer(buffer) { }
44
45        RateValue               rate;
46        const StreamSetBuffer * buffer;
47    };
48
49    using Graph = adjacency_list<vecS, vecS, bidirectionalS, const Kernel *, Channel, vecS>;
50
51    using Map = flat_map<const Kernel *, Graph::vertex_descriptor>;
52
53    void initialize(const std::vector<Kernel *> & kernels);
54
55    Value * executeKernel(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel, PHINode * const segNo, Value * const finished);
56
57    void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel);
58
59    void updateProducedAndConsumedCounts(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel);
60
61private:
62
63    Graph   G;
64    Map     M;
65
66    StreamSetBufferMap<Value *>         producedItemCount;
67    StreamSetBufferMap<Value *>         consumedItemCount;
68    StreamSetBufferMap<const Kernel *>  lastConsumer;
69};
70
71/** ------------------------------------------------------------------------------------------------------------- *
72 * @brief generateSegmentParallelPipeline
73 *
74 * Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
75 * operating over an input stream set S, a segment-parallel implementation divides the input
76 * into segments and coordinates a set of T <= K threads to each process one segment at a time.
77 * Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
78 * fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
79 ** ------------------------------------------------------------------------------------------------------------- */
80void generateSegmentParallelPipeline(const std::unique_ptr<KernelBuilder> & b, const std::vector<Kernel *> & kernels) {
81
82    const unsigned n = kernels.size();
83    Module * const m = b->getModule();
84    IntegerType * const sizeTy = b->getSizeTy();
85    PointerType * const voidPtrTy = b->getVoidPtrTy();
86    Constant * nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
87    std::vector<Type *> structTypes;
88    codegen::BufferSegments = std::max(codegen::BufferSegments, codegen::ThreadNum);
89
90    Value * instance[n];
91    for (unsigned i = 0; i < n; ++i) {
92        instance[i] = kernels[i]->getInstance();
93        structTypes.push_back(instance[i]->getType());
94    }
95    StructType * const sharedStructType = StructType::get(m->getContext(), structTypes);
96    StructType * const threadStructType = StructType::get(m->getContext(), {sharedStructType->getPointerTo(), sizeTy});
97
98    const auto ip = b->saveIP();
99
100    Function * const threadFunc = makeThreadFunction(b, "segment");
101    auto args = threadFunc->arg_begin();
102
103    // -------------------------------------------------------------------------------------------------------------------------
104    // MAKE SEGMENT PARALLEL PIPELINE THREAD
105    // -------------------------------------------------------------------------------------------------------------------------
106
107     // Create the basic blocks for the thread function.
108    BasicBlock * entryBlock = BasicBlock::Create(b->getContext(), "entry", threadFunc);
109    b->SetInsertPoint(entryBlock);
110
111    Value * const threadStruct = b->CreateBitCast(&*(args), threadStructType->getPointerTo());
112
113    Value * const sharedStatePtr = b->CreateLoad(b->CreateGEP(threadStruct, {b->getInt32(0), b->getInt32(0)}));
114    for (unsigned k = 0; k < n; ++k) {
115        Value * ptr = b->CreateLoad(b->CreateGEP(sharedStatePtr, {b->getInt32(0), b->getInt32(k)}));
116        kernels[k]->setInstance(ptr);
117    }
118    Value * const segOffset = b->CreateLoad(b->CreateGEP(threadStruct, {b->getInt32(0), b->getInt32(1)}));
119
120    PipelineGenerator G;
121
122    BasicBlock * const segmentLoop = b->CreateBasicBlock("segmentLoop");
123    b->CreateBr(segmentLoop);
124
125    b->SetInsertPoint(segmentLoop);
126    G.initialize(kernels);
127    PHINode * const segNo = b->CreatePHI(b->getSizeTy(), 2, "segNo");
128    segNo->addIncoming(segOffset, entryBlock);
129    Value * finished = nullptr;
130
131    Value * cycleCountStart = nullptr;
132    Value * cycleCountEnd = nullptr;
133    if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
134        cycleCountStart = b->CreateReadCycleCounter();
135    }
136
137    const bool serialize = codegen::DebugOptionIsSet(codegen::SerializeThreads);
138
139    for (unsigned k = 0; k < n; ++k) {
140
141        const Kernel * const kernel = kernels[k];
142
143        BasicBlock * const kernelWait = b->CreateBasicBlock(kernel->getName() + "Wait");
144        b->CreateBr(kernelWait);
145
146        b->SetInsertPoint(kernelWait);
147        b->setKernel(kernels[serialize ? (n - 1) : k]);
148        Value * const processedSegmentCount = b->acquireLogicalSegmentNo();
149        b->setKernel(kernel);
150        assert (processedSegmentCount->getType() == segNo->getType());
151        Value * const ready = b->CreateICmpEQ(segNo, processedSegmentCount);
152
153        BasicBlock * const kernelCheck = b->CreateBasicBlock(kernel->getName() + "Check");
154        b->CreateCondBr(ready, kernelCheck, kernelWait);
155
156        b->SetInsertPoint(kernelCheck);
157
158        finished = G.executeKernel(b, kernel, segNo, finished);
159
160        if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
161            cycleCountEnd = b->CreateReadCycleCounter();
162            Value * counterPtr = b->getCycleCountPtr();
163            b->CreateStore(b->CreateAdd(b->CreateLoad(counterPtr), b->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
164            cycleCountStart = cycleCountEnd;
165        }
166
167        b->releaseLogicalSegmentNo(b->CreateAdd(segNo, b->getSize(1)));
168    }
169
170    segNo->addIncoming(b->CreateAdd(segNo, b->getSize(codegen::ThreadNum)), b->GetInsertBlock());
171
172    BasicBlock * const segmentExit = b->CreateBasicBlock("segmentExit");
173    b->CreateUnlikelyCondBr(finished, segmentExit, segmentLoop);
174
175    b->SetInsertPoint(segmentExit);
176
177    // only call pthread_exit() within spawned threads; otherwise it'll be equivalent to calling exit() within the process
178    BasicBlock * const exitThread = b->CreateBasicBlock("ExitThread");
179    BasicBlock * const exitFunction = b->CreateBasicBlock("ExitProcessFunction");
180
181    b->CreateCondBr(b->CreateIsNull(segOffset), exitFunction, exitThread);
182    b->SetInsertPoint(exitThread);
183    b->CreatePThreadExitCall(nullVoidPtrVal);
184    b->CreateBr(exitFunction);
185    b->SetInsertPoint(exitFunction);
186    b->CreateRetVoid();
187
188    // -------------------------------------------------------------------------------------------------------------------------
189    b->restoreIP(ip);
190
191    for (unsigned i = 0; i < n; ++i) {
192        kernels[i]->setInstance(instance[i]);
193    }
194
195    // -------------------------------------------------------------------------------------------------------------------------
196    // MAKE SEGMENT PARALLEL PIPELINE DRIVER
197    // -------------------------------------------------------------------------------------------------------------------------
198    const unsigned threads = codegen::ThreadNum - 1;
199    assert (codegen::ThreadNum > 0);
200    Type * const pthreadsTy = ArrayType::get(sizeTy, threads);
201    AllocaInst * const pthreads = b->CreateAlloca(pthreadsTy);
202    Value * threadIdPtr[threads];
203
204    for (unsigned i = 0; i < threads; ++i) {
205        threadIdPtr[i] = b->CreateGEP(pthreads, {b->getInt32(0), b->getInt32(i)});
206    }
207
208    for (unsigned i = 0; i < n; ++i) {
209        b->setKernel(kernels[i]);
210        b->releaseLogicalSegmentNo(b->getSize(0));
211    }
212
213    AllocaInst * const sharedStruct = b->CreateCacheAlignedAlloca(sharedStructType);
214    for (unsigned i = 0; i < n; ++i) {
215        Value * ptr = b->CreateGEP(sharedStruct, {b->getInt32(0), b->getInt32(i)});
216        b->CreateStore(kernels[i]->getInstance(), ptr);
217    }
218
219    // use the process thread to handle the initial segment function after spawning (n - 1) threads to handle the subsequent offsets
220    for (unsigned i = 0; i < threads; ++i) {
221        AllocaInst * const threadState = b->CreateAlloca(threadStructType);
222        b->CreateStore(sharedStruct, b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(0)}));
223        b->CreateStore(b->getSize(i + 1), b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(1)}));
224        b->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, threadFunc, threadState);
225    }
226
227    AllocaInst * const threadState = b->CreateAlloca(threadStructType);
228    b->CreateStore(sharedStruct, b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(0)}));
229    b->CreateStore(b->getSize(0), b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(1)}));
230    b->CreateCall(threadFunc, b->CreatePointerCast(threadState, voidPtrTy));
231
232    AllocaInst * const status = b->CreateAlloca(voidPtrTy);
233    for (unsigned i = 0; i < threads; ++i) {
234        Value * threadId = b->CreateLoad(threadIdPtr[i]);
235        b->CreatePThreadJoinCall(threadId, status);
236    }
237   
238    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
239        for (const Kernel * kernel : kernels) {
240            b->setKernel(kernel);
241            const auto & inputs = kernel->getStreamInputs();
242            const auto & outputs = kernel->getStreamOutputs();
243            Value * items = nullptr;
244            if (inputs.empty()) {
245                items = b->getProducedItemCount(outputs[0].getName());
246            } else {
247                items = b->getProcessedItemCount(inputs[0].getName());
248            }
249            Value * fItems = b->CreateUIToFP(items, b->getDoubleTy());
250            Value * cycles = b->CreateLoad(b->getCycleCountPtr());
251            Value * fCycles = b->CreateUIToFP(cycles, b->getDoubleTy());
252            const auto formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
253            Value * stringPtr = b->CreatePointerCast(b->GetString(formatString), b->getInt8PtrTy());
254            b->CreateCall(b->GetDprintf(), {b->getInt32(2), stringPtr, fItems, fCycles, b->CreateFDiv(fCycles, fItems)});
255        }
256    }
257   
258}
259
260
261/** ------------------------------------------------------------------------------------------------------------- *
262 * @brief generatePipelineLoop
263 ** ------------------------------------------------------------------------------------------------------------- */
264void generatePipelineLoop(const std::unique_ptr<KernelBuilder> & b, const std::vector<Kernel *> & kernels) {
265
266    // Create the basic blocks for the loop.
267    BasicBlock * const entryBlock = b->GetInsertBlock();
268    BasicBlock * const pipelineLoop = b->CreateBasicBlock("pipelineLoop");
269    BasicBlock * const pipelineExit = b->CreateBasicBlock("pipelineExit");
270
271    PipelineGenerator G;
272
273    b->CreateBr(pipelineLoop);
274
275    b->SetInsertPoint(pipelineLoop);
276    G.initialize(kernels);
277    PHINode * const segNo = b->CreatePHI(b->getSizeTy(), 2, "segNo");
278    segNo->addIncoming(b->getSize(0), entryBlock);
279    Value * finished = nullptr;
280
281    Value * cycleCountStart = nullptr;
282    Value * cycleCountEnd = nullptr;
283    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
284        cycleCountStart = b->CreateReadCycleCounter();
285    }
286
287    for (Kernel * const kernel : kernels) {
288
289        b->setKernel(kernel);
290
291        finished = G.executeKernel(b, kernel, segNo, finished);
292
293        if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
294            cycleCountEnd = b->CreateReadCycleCounter();
295            Value * counterPtr = b->getCycleCountPtr();
296            b->CreateStore(b->CreateAdd(b->CreateLoad(counterPtr), b->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
297            cycleCountStart = cycleCountEnd;
298        }
299    }
300
301    segNo->addIncoming(b->CreateAdd(segNo, b->getSize(1)), b->GetInsertBlock());
302    b->CreateCondBr(finished, pipelineExit, pipelineLoop);
303
304    pipelineExit->moveAfter(b->GetInsertBlock());
305
306    b->SetInsertPoint(pipelineExit);
307
308    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
309        for (unsigned k = 0; k < kernels.size(); k++) {
310            auto & kernel = kernels[k];
311            b->setKernel(kernel);
312            const auto & inputs = kernel->getStreamInputs();
313            const auto & outputs = kernel->getStreamOutputs();
314            Value * items = nullptr;
315            if (inputs.empty()) {
316                items = b->getProducedItemCount(outputs[0].getName());
317            } else {
318                items = b->getProcessedItemCount(inputs[0].getName());
319            }
320            Value * fItems = b->CreateUIToFP(items, b->getDoubleTy());
321            Value * cycles = b->CreateLoad(b->getCycleCountPtr());
322            Value * fCycles = b->CreateUIToFP(cycles, b->getDoubleTy());
323            const auto formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
324            Value * stringPtr = b->CreatePointerCast(b->GetString(formatString), b->getInt8PtrTy());
325            b->CreateCall(b->GetDprintf(), {b->getInt32(2), stringPtr, fItems, fCycles, b->CreateFDiv(fCycles, fItems)});
326        }
327    }
328
329}
330
331/** ------------------------------------------------------------------------------------------------------------- *
332 * @brief initialize
333 ** ------------------------------------------------------------------------------------------------------------- */
334void PipelineGenerator::initialize(const std::vector<Kernel *> & kernels) {
335
336    // Our goal when building G is *not* to model the dataflow of our program but instead to
337    // detetermine the minimum number of sufficient data tests needed to ensure each kernel has
338    // enough data to progress.
339
340    // For example, suppose we have kernels A, B and C, and that B has a fixed input and fixed
341    // output rate. C also has a fixed input rate but A does *not* have a fixed output rate.
342    // C must test whether it has enough input from B as B is not guaranteed to have enough
343    // input from A. Moreover if C is depedent on B, C could be skipped entirely.
344
345    // Note: we cannot simply test the output of A for both B and C. In a our data-parallel
346    // pipeline A's state may change by the time we process C.
347
348    for (const Kernel * const consumer : kernels) {
349        const auto v = add_vertex(consumer, G);
350        M.emplace(consumer, v);
351        const auto & inputs = consumer->getStreamInputs();
352        for (unsigned i = 0; i < inputs.size(); ++i) {
353
354            const auto buffer = consumer->getStreamSetInputBuffer(i);
355            const Kernel * const producer = buffer->getProducer();
356            const Binding & output = producer->getStreamOutput(buffer);
357            if (output.getRate().isRelative()) continue;
358
359            const Binding & input = inputs[i];
360            auto ub_in = consumer->getUpperBound(input.getRate()) * consumer->getStride();
361            if (input.hasLookahead()) {
362                ub_in += input.getLookahead();
363            }
364
365            const auto lb_out = producer->getLowerBound(output.getRate()) * producer->getStride();
366
367            const auto rate = lb_out / ub_in;
368            const auto f = M.find(producer); assert (f != M.end());
369            const auto u = f->second;
370            // If we have multiple inputs from the same kernel, we only need to consider the "slowest" one
371            bool slowest = true;
372            if (lb_out > 0) {
373                for (const auto e : make_iterator_range(in_edges(v, G))) {
374                    if (source(e, G) == u) {
375                        Channel & p = G[e];
376                        slowest = false;
377                        if (rate < p.rate) {
378                            p.rate = rate;
379                            p.buffer = buffer;
380                        }
381                        break;
382                    }
383                }
384            }
385            if (slowest) {
386                add_edge(u, v, Channel{rate, buffer}, G);
387            }
388        }
389    }
390
391    // Take a transitive closure of G but whenever we attempt to insert an edge into the closure
392    // that already exists, check instead whether the rate of our proposed edge is <= the existing
393    // edge's rate. If so, the data availability is transitively guaranteed.
394    for (const auto u : make_iterator_range(vertices(G))) {
395        for (auto ei : make_iterator_range(in_edges(u, G))) {
396            const auto v = source(ei, G);
397            const Channel & pu = G[ei];           
398            for (auto ej : make_iterator_range(out_edges(u, G))) {               
399                const auto w = target(ej, G);
400                const auto ratio = RateValue(G[u]->getStride(), G[w]->getStride());
401                const auto rate = pu.rate * ratio;
402                bool insert = true;
403                for (auto ek : make_iterator_range(in_edges(w, G))) {
404                    if (source(ek, G) == v) {
405                        Channel & pw = G[ek];
406                        if (rate <= pw.rate && pw.rate > 0) {
407                            pw.buffer = nullptr;
408                        }
409                        insert = false;
410                        break;
411                    }
412                }
413                if (insert) {
414                    add_edge(v, w, Channel{rate, nullptr}, G);
415                }
416            }
417        }
418    }
419
420    // remove any closure edges from G
421    remove_edge_if([&](const Graph::edge_descriptor e) { return G[e].buffer == nullptr; }, G);
422
423    // If a kernel has no 'necessary to check' inputs then we can remove every output with a rate >= 1 from G
424    for (const auto u : make_iterator_range(vertices(G))) {
425        if (in_degree(u, G) == 0) {
426            remove_out_edge_if(u, [&](const Graph::edge_descriptor e) { return G[e].rate >= RateValue{1, 1}; }, G);
427        }
428    }
429
430    // iterate through each kernel in order and determine which kernel last used a particular buffer
431    for (Kernel * const kernel : kernels) {
432        const auto & inputs = kernel->getStreamInputs();
433        for (unsigned i = 0; i < inputs.size(); ++i) {
434            lastConsumer[kernel->getStreamSetInputBuffer(i)] = kernel;
435        }
436    }
437
438}
439
440/** ------------------------------------------------------------------------------------------------------------- *
441 * @brief executeKernel
442 ** ------------------------------------------------------------------------------------------------------------- */
443Value *PipelineGenerator::executeKernel(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel, PHINode * const segNo, Value * const finished) {
444
445    const auto & inputs = kernel->getStreamInputs();
446
447    std::vector<Value *> args(2 + inputs.size());
448
449    const auto f = M.find(kernel); assert (f != M.end());
450    const auto u = f->second;
451
452    BasicBlock * const kernelEntry = b->GetInsertBlock();
453    BasicBlock * const kernelCode = b->CreateBasicBlock(kernel->getName());
454    BasicBlock * const kernelExit = b->CreateBasicBlock(kernel->getName() + "_exit");
455
456    b->CreateUnlikelyCondBr(b->getTerminationSignal(), kernelExit, kernelCode);
457
458    b->SetInsertPoint(kernelExit);
459    PHINode * const terminated = b->CreatePHI(b->getInt1Ty(), 2);
460    // Since our initial "isFinal" state is equal to what the first kernel's termination signal state
461    terminated->addIncoming(finished ? finished : b->getTrue(), kernelEntry);
462    Value * isFinal = finished ? finished : b->getFalse();
463
464    b->SetInsertPoint(kernelCode);
465    for (unsigned i = 0; i < inputs.size(); ++i) {
466
467        const Binding & input = inputs[i];
468
469        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
470
471        const auto name = input.getName();
472
473        const auto p = producedItemCount.find(buffer);
474        if (LLVM_UNLIKELY(p == producedItemCount.end())) {
475            report_fatal_error(kernel->getName() + " uses stream set " + name + " prior to its definition");
476        }
477        Value * const produced = p->second;
478        const auto ub = kernel->getUpperBound(input.getRate()); assert (ub > 0);
479        const auto strideLength = ceiling(ub * kernel->getStride()) ;
480        Constant * const segmentLength = b->getSize(strideLength * codegen::SegmentSize);
481
482        if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
483            b->CreateAssert(b->CreateICmpULE(segmentLength, b->getCapacity(name)),
484                            kernel->getName() + ": " + name + " upper bound of segment length exceeds buffer capacity");
485        }
486
487        Value * limit = nullptr;
488        if (input.getRate().isFixed()) {
489            // if the input is deferred, simply adding length to the processed item count may result in setting a limit
490            // that is too low for. instead just calculate the limit of all fixed rates from the segment no.
491            limit = b->CreateMul(b->CreateAdd(segNo, b->getSize(1)), segmentLength);
492        } else {
493            Value * const processed = b->getProcessedItemCount(name);
494            limit = b->CreateAdd(processed, segmentLength);
495        }
496
497        // TODO: currently, if we produce the exact amount as our limit states, we will have to process one additional segment
498        // before we can consider this kernel finished. We ought to be able to avoid doing in some cases but need to prove its
499        // always safe to do so.
500
501        Value * const consumingAll = b->CreateICmpULT(produced, limit);
502        args[i + 2] = b->CreateSelect(consumingAll, produced, limit);
503        isFinal = b->CreateAnd(isFinal, consumingAll);
504
505        // Check for available input (if it's both computable and not guaranteed to be sufficient by the processing rates)
506        for (auto e : make_iterator_range(in_edges(u, G))) {
507            const auto p = G[e];
508            if (p.buffer == buffer) {
509                BasicBlock * const sufficient = b->CreateBasicBlock(name + "_hasSufficientData");
510
511                Constant * const sl = b->getSize(strideLength);
512
513                Value * remaining = nullptr;
514                if (input.getRate().isFixed()) {
515                    remaining = b->CreateMul(segNo, sl);
516                } else {
517                    remaining = b->getProcessedItemCount(name);
518                }
519                remaining = b->CreateSub(produced, remaining);
520
521                Value * const hasSufficientData = b->CreateOr(b->CreateICmpUGE(remaining, sl), isFinal);
522                terminated->addIncoming(b->getFalse(), b->GetInsertBlock());
523                b->CreateLikelyCondBr(hasSufficientData, sufficient, kernelExit);
524                b->SetInsertPoint(sufficient);
525            }
526        }
527    }
528
529    applyOutputBufferExpansions(b, kernel);
530
531    args[0] = kernel->getInstance();
532    args[1] = isFinal;
533
534    b->createDoSegmentCall(args);
535
536    if (inputs.empty() || kernel->canTerminateEarly()) {
537        isFinal = b->CreateOr(isFinal, b->getTerminationSignal());
538    }
539    b->setTerminationSignal(isFinal);
540//    b->CallPrintInt(kernel->getName() + "_finished", isFinal);
541    BasicBlock * const kernelFinished = b->GetInsertBlock();
542    kernelExit->moveAfter(kernelFinished);
543    b->CreateBr(kernelExit);
544
545    b->SetInsertPoint(kernelExit);
546    terminated->addIncoming(isFinal, kernelFinished);
547
548    updateProducedAndConsumedCounts(b, kernel);
549
550    return terminated;
551}
552
553/** ------------------------------------------------------------------------------------------------------------- *
554 * @brief applyOutputBufferExpansions
555 ** ------------------------------------------------------------------------------------------------------------- */
556void PipelineGenerator::applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * k) {
557    const auto & outputs = k->getStreamSetOutputBuffers();
558    for (unsigned i = 0; i < outputs.size(); i++) {
559        if (isa<DynamicBuffer>(outputs[i])) {
560            const auto baseSize = ceiling(k->getUpperBound(k->getStreamOutput(i).getRate()) * k->getStride() * codegen::SegmentSize);
561            if (LLVM_LIKELY(baseSize > 0)) {
562
563                const auto & name = k->getStreamOutput(i).getName();
564
565                BasicBlock * const doExpand = b->CreateBasicBlock(name + "Expand");
566                BasicBlock * const nextBlock = b->GetInsertBlock()->getNextNode();
567                doExpand->moveAfter(b->GetInsertBlock());
568                BasicBlock * const bufferReady = b->CreateBasicBlock(name + "Ready");
569                bufferReady->moveAfter(doExpand);
570                if (nextBlock) nextBlock->moveAfter(bufferReady);
571
572                Value * const produced = b->getProducedItemCount(name);
573                Value * const consumed = b->getConsumedItemCount(name);
574                Value * const required = b->CreateAdd(b->CreateSub(produced, consumed), b->getSize(2 * baseSize));
575
576                b->CreateCondBr(b->CreateICmpUGT(required, b->getBufferedSize(name)), doExpand, bufferReady);
577                b->SetInsertPoint(doExpand);
578
579                b->doubleCapacity(name);
580                // Ensure that capacity is sufficient by successive doubling, if necessary.
581                b->CreateCondBr(b->CreateICmpUGT(required, b->getBufferedSize(name)), doExpand, bufferReady);
582
583                b->SetInsertPoint(bufferReady);
584
585            }
586        }
587    }
588}
589
590/** ------------------------------------------------------------------------------------------------------------- *
591 * @brief updateProducedAndConsumedCounts
592 ** ------------------------------------------------------------------------------------------------------------- */
593void PipelineGenerator::updateProducedAndConsumedCounts(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel) {
594
595    const auto & inputs = kernel->getStreamInputs();
596    for (unsigned i = 0; i < inputs.size(); ++i) {
597        Value * const processed = b->getProcessedItemCount(inputs[i].getName());
598
599        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
600        auto f = consumedItemCount.find(buffer);
601        Value * consumed = processed;
602        if (f == consumedItemCount.end()) {
603            consumedItemCount.emplace(buffer, consumed);
604        } else {
605            consumed = b->CreateUMin(consumed, f->second);
606            f->second = consumed;
607        }
608
609        // If this kernel is the last consumer of a input buffer, update the consumed count for that buffer.
610        const auto c = lastConsumer.find(buffer);
611        assert (c != lastConsumer.end());
612        if (c->second == kernel) {
613            Kernel * const producer = buffer->getProducer();
614            const auto & output = producer->getStreamOutput(buffer);
615            if (output.getRate().isRelative()) continue;
616            b->setKernel(producer);
617
618            b->setConsumedItemCount(output.getName(), consumed);
619            b->setKernel(kernel);
620        }
621    }
622
623    const auto & outputs = kernel->getStreamOutputs();
624    for (unsigned i = 0; i < outputs.size(); ++i) {
625        Value * const produced = b->getProducedItemCount(outputs[i].getName());
626        const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
627        assert (producedItemCount.count(buf) == 0);
628        producedItemCount.emplace(buf, produced);
629    }
630
631}
632
633
Note: See TracBrowser for help on using the repository browser.