source: icGREP/icgrep-devel/icgrep/toolchain/pipeline.cpp @ 5833

Last change on this file since 5833 was 5833, checked in by nmedfort, 15 months ago

Missing change

File size: 24.3 KB
Line 
1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
6#include "pipeline.h"
7#include <toolchain/toolchain.h>
8#include <kernels/kernel.h>
9#include <kernels/streamset.h>
10#include <llvm/IR/Module.h>
11#include <boost/container/flat_set.hpp>
12#include <boost/container/flat_map.hpp>
13#include <kernels/kernel_builder.h>
14
15using namespace kernel;
16using namespace parabix;
17using namespace llvm;
18
19using Port = Kernel::Port;
20
21template <typename Value>
22using StreamSetBufferMap = boost::container::flat_map<const StreamSetBuffer *, Value>;
23
24template <typename Value>
25using FlatSet = boost::container::flat_set<Value>;
26
27Function * makeThreadFunction(const std::unique_ptr<kernel::KernelBuilder> & b, const std::string & name) {
28    Function * const f = Function::Create(FunctionType::get(b->getVoidTy(), {b->getVoidPtrTy()}, false), Function::InternalLinkage, name, b->getModule());
29    f->setCallingConv(CallingConv::C);
30    f->arg_begin()->setName("state");
31    return f;
32}
33
34void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel);
35
36void handleInsufficientData(const std::unique_ptr<KernelBuilder> & b, Value * const produced, Value * const final, BasicBlock * const entry, const Kernel * const consumer,  const Binding & input, const StreamSetBuffer * const buffer);
37
38bool requiresCopyBack(const Kernel * k, const ProcessingRate & rate);
39
40/** ------------------------------------------------------------------------------------------------------------- *
41 * @brief generateSegmentParallelPipeline
42 *
43 * Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
44 * operating over an input stream set S, a segment-parallel implementation divides the input
45 * into segments and coordinates a set of T <= K threads to each process one segment at a time.
46 * Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
47 * fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
48 ** ------------------------------------------------------------------------------------------------------------- */
49void generateSegmentParallelPipeline(const std::unique_ptr<KernelBuilder> & b, const std::vector<Kernel *> & kernels) {
50
51    const unsigned n = kernels.size();
52    Module * const m = b->getModule();
53    IntegerType * const sizeTy = b->getSizeTy();
54    PointerType * const voidPtrTy = b->getVoidPtrTy();
55    Constant * nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
56    std::vector<Type *> structTypes;
57    codegen::BufferSegments = std::max(codegen::BufferSegments, codegen::ThreadNum);
58
59    Value * instance[n];
60    for (unsigned i = 0; i < n; ++i) {
61        instance[i] = kernels[i]->getInstance();
62        structTypes.push_back(instance[i]->getType());
63    }
64    StructType * const sharedStructType = StructType::get(m->getContext(), structTypes);
65    StructType * const threadStructType = StructType::get(m->getContext(), {sharedStructType->getPointerTo(), sizeTy});
66
67    const auto ip = b->saveIP();
68
69    Function * const threadFunc = makeThreadFunction(b, "segment");
70    auto args = threadFunc->arg_begin();
71
72    // -------------------------------------------------------------------------------------------------------------------------
73    // MAKE SEGMENT PARALLEL PIPELINE THREAD
74    // -------------------------------------------------------------------------------------------------------------------------
75
76     // Create the basic blocks for the thread function.
77    BasicBlock * entryBlock = BasicBlock::Create(b->getContext(), "entry", threadFunc);
78    b->SetInsertPoint(entryBlock);
79
80    Value * const threadStruct = b->CreateBitCast(&*(args), threadStructType->getPointerTo());
81
82    Value * const sharedStatePtr = b->CreateLoad(b->CreateGEP(threadStruct, {b->getInt32(0), b->getInt32(0)}));
83    for (unsigned k = 0; k < n; ++k) {
84        Value * ptr = b->CreateLoad(b->CreateGEP(sharedStatePtr, {b->getInt32(0), b->getInt32(k)}));
85        kernels[k]->setInstance(ptr);
86    }
87    Value * const segOffset = b->CreateLoad(b->CreateGEP(threadStruct, {b->getInt32(0), b->getInt32(1)}));
88
89    BasicBlock * const segmentLoop = BasicBlock::Create(b->getContext(), "segmentLoop", threadFunc);
90    b->CreateBr(segmentLoop);
91
92    b->SetInsertPoint(segmentLoop);
93    PHINode * const segNo = b->CreatePHI(b->getSizeTy(), 2, "segNo");
94    segNo->addIncoming(segOffset, entryBlock);
95
96    BasicBlock * const exitThreadBlock = BasicBlock::Create(b->getContext(), "exitThread", threadFunc);
97
98    StreamSetBufferMap<Value *> producedItemCount;
99    StreamSetBufferMap<Value *> consumedItemCount;
100
101    Value * cycleCountStart = nullptr;
102    Value * cycleCountEnd = nullptr;
103    if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
104        cycleCountStart = b->CreateReadCycleCounter();
105    }
106
107    Value * terminated = nullptr;
108
109    const bool serialize = codegen::DebugOptionIsSet(codegen::SerializeThreads);
110
111    for (unsigned k = 0; k < n; ++k) {
112
113        const auto & kernel = kernels[k];
114
115        BasicBlock * const kernelWait = BasicBlock::Create(b->getContext(), kernel->getName() + "Wait", threadFunc);
116
117        b->CreateBr(kernelWait);
118
119        BasicBlock * const kernelCheck = BasicBlock::Create(b->getContext(), kernel->getName() + "Check", threadFunc);
120
121        BasicBlock * const kernelBody = BasicBlock::Create(b->getContext(), kernel->getName() + "Do", threadFunc);
122
123        BasicBlock * const kernelEnd = BasicBlock::Create(b->getContext(), kernel->getName() + "End", threadFunc);
124
125        b->SetInsertPoint(kernelWait);
126
127        b->setKernel(kernels[serialize ? (n - 1) : k]);
128        Value * const processedSegmentCount = b->acquireLogicalSegmentNo();
129        b->setKernel(kernel);
130
131        assert (processedSegmentCount->getType() == segNo->getType());
132        Value * const ready = b->CreateICmpEQ(segNo, processedSegmentCount);       
133        b->CreateCondBr(ready, kernelCheck, kernelWait);
134
135        b->SetInsertPoint(kernelCheck);
136        b->CreateUnlikelyCondBr(b->getTerminationSignal(), kernelEnd, kernelBody);
137
138        // Execute the kernel segment
139        b->SetInsertPoint(kernelBody);
140        const auto & inputs = kernel->getStreamInputs();
141        Value * const isFinal = b->CreateOr(terminated ? terminated : b->getFalse(), b->getTerminationSignal());
142        std::vector<Value *> args = {kernel->getInstance(), isFinal};
143        for (unsigned i = 0; i < inputs.size(); ++i) {
144            const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
145            const auto f = producedItemCount.find(buffer);
146            assert (f != producedItemCount.end());
147            Value * const produced = f->second;
148            args.push_back(produced);
149            handleInsufficientData(b, produced, isFinal, kernelEnd, kernel, inputs[i], buffer);
150        }
151
152        b->setKernel(kernel);
153        b->createDoSegmentCall(args);
154        b->CreateBr(kernelEnd);
155
156        b->SetInsertPoint(kernelEnd);
157
158        Value * const finished = b->getTerminationSignal();
159        if (terminated) { // all kernels must terminate
160            terminated = b->CreateAnd(terminated, finished);
161        } else {
162            terminated = finished;
163        }
164
165        const auto & outputs = kernel->getStreamOutputs();
166        for (unsigned i = 0; i < outputs.size(); ++i) {           
167            Value * const produced = b->getProducedItemCount(outputs[i].getName());
168            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
169            assert (producedItemCount.count(buf) == 0);
170            producedItemCount.emplace(buf, produced);
171        }
172        for (unsigned i = 0; i < inputs.size(); ++i) {
173            Value * const processedItemCount = b->getProcessedItemCount(inputs[i].getName());
174            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);           
175            auto f = consumedItemCount.find(buf);
176            if (f == consumedItemCount.end()) {
177                consumedItemCount.emplace(buf, processedItemCount);
178            } else {
179                assert (f->second);
180                f->second = b->CreateUMin(processedItemCount, f->second);
181            }
182        }
183
184        if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
185            cycleCountEnd = b->CreateReadCycleCounter();
186            Value * counterPtr = b->getCycleCountPtr();
187            b->CreateStore(b->CreateAdd(b->CreateLoad(counterPtr), b->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
188            cycleCountStart = cycleCountEnd;
189        }
190
191        b->releaseLogicalSegmentNo(b->CreateAdd(segNo, b->getSize(1)));
192    }
193
194    exitThreadBlock->moveAfter(b->GetInsertBlock());
195    for (const auto consumed : consumedItemCount) {
196        const StreamSetBuffer * const buf = consumed.first;
197        Kernel * const k = buf->getProducer();
198        const auto & outputs = k->getStreamSetOutputBuffers();
199        for (unsigned i = 0; i < outputs.size(); ++i) {
200            if (outputs[i] == buf) {
201                const auto & binding = k->getStreamOutput(i);
202                if (LLVM_UNLIKELY(binding.getRate().isDerived())) {
203                    continue;
204                }
205                b->setKernel(k);
206                b->setConsumedItemCount(binding.getName(), consumed.second);
207                break;
208            }
209        }
210    }
211
212    segNo->addIncoming(b->CreateAdd(segNo, b->getSize(codegen::ThreadNum)), b->GetInsertBlock());
213
214    b->CreateUnlikelyCondBr(terminated, exitThreadBlock, segmentLoop);
215
216    b->SetInsertPoint(exitThreadBlock);
217
218    // only call pthread_exit() within spawned threads; otherwise it'll be equivalent to calling exit() within the process
219    BasicBlock * const exitThread = BasicBlock::Create(b->getContext(), "ExitThread", threadFunc);
220    BasicBlock * const exitFunction = BasicBlock::Create(b->getContext(), "ExitProcessFunction", threadFunc);
221
222    Value * const exitCond = b->CreateICmpEQ(segOffset, ConstantInt::getNullValue(segOffset->getType()));
223    b->CreateCondBr(exitCond, exitFunction, exitThread);
224    b->SetInsertPoint(exitThread);
225    b->CreatePThreadExitCall(nullVoidPtrVal);
226    b->CreateBr(exitFunction);
227    b->SetInsertPoint(exitFunction);
228    b->CreateRetVoid();
229
230    // -------------------------------------------------------------------------------------------------------------------------
231    b->restoreIP(ip);
232
233    for (unsigned i = 0; i < n; ++i) {
234        kernels[i]->setInstance(instance[i]);
235    }
236
237    // -------------------------------------------------------------------------------------------------------------------------
238    // MAKE SEGMENT PARALLEL PIPELINE DRIVER
239    // -------------------------------------------------------------------------------------------------------------------------
240    const unsigned threads = codegen::ThreadNum - 1;
241    assert (codegen::ThreadNum > 0);
242    Type * const pthreadsTy = ArrayType::get(sizeTy, threads);
243    AllocaInst * const pthreads = b->CreateAlloca(pthreadsTy);
244    Value * threadIdPtr[threads];
245
246    for (unsigned i = 0; i < threads; ++i) {
247        threadIdPtr[i] = b->CreateGEP(pthreads, {b->getInt32(0), b->getInt32(i)});
248    }
249
250    for (unsigned i = 0; i < n; ++i) {
251        b->setKernel(kernels[i]);
252        b->releaseLogicalSegmentNo(b->getSize(0));
253    }
254
255    AllocaInst * const sharedStruct = b->CreateCacheAlignedAlloca(sharedStructType);
256    for (unsigned i = 0; i < n; ++i) {
257        Value * ptr = b->CreateGEP(sharedStruct, {b->getInt32(0), b->getInt32(i)});
258        b->CreateStore(kernels[i]->getInstance(), ptr);
259    }
260
261    // use the process thread to handle the initial segment function after spawning (n - 1) threads to handle the subsequent offsets
262    for (unsigned i = 0; i < threads; ++i) {
263        AllocaInst * const threadState = b->CreateAlloca(threadStructType);
264        b->CreateStore(sharedStruct, b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(0)}));
265        b->CreateStore(b->getSize(i + 1), b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(1)}));
266        b->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, threadFunc, threadState);
267    }
268
269    AllocaInst * const threadState = b->CreateAlloca(threadStructType);
270    b->CreateStore(sharedStruct, b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(0)}));
271    b->CreateStore(b->getSize(0), b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(1)}));
272    b->CreateCall(threadFunc, b->CreatePointerCast(threadState, voidPtrTy));
273
274    AllocaInst * const status = b->CreateAlloca(voidPtrTy);
275    for (unsigned i = 0; i < threads; ++i) {
276        Value * threadId = b->CreateLoad(threadIdPtr[i]);
277        b->CreatePThreadJoinCall(threadId, status);
278    }
279   
280    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
281        for (const Kernel * kernel : kernels) {
282            b->setKernel(kernel);
283            const auto & inputs = kernel->getStreamInputs();
284            const auto & outputs = kernel->getStreamOutputs();
285            Value * items = nullptr;
286            if (inputs.empty()) {
287                items = b->getProducedItemCount(outputs[0].getName());
288            } else {
289                items = b->getProcessedItemCount(inputs[0].getName());
290            }
291            Value * fItems = b->CreateUIToFP(items, b->getDoubleTy());
292            Value * cycles = b->CreateLoad(b->getCycleCountPtr());
293            Value * fCycles = b->CreateUIToFP(cycles, b->getDoubleTy());
294            const auto formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
295            Value * stringPtr = b->CreatePointerCast(b->GetString(formatString), b->getInt8PtrTy());
296            b->CreateCall(b->GetDprintf(), {b->getInt32(2), stringPtr, fItems, fCycles, b->CreateFDiv(fCycles, fItems)});
297        }
298    }
299   
300}
301
302/** ------------------------------------------------------------------------------------------------------------- *
303 * @brief generatePipelineLoop
304 ** ------------------------------------------------------------------------------------------------------------- */
305void generatePipelineLoop(const std::unique_ptr<KernelBuilder> & b, const std::vector<Kernel *> & kernels) {
306
307    BasicBlock * entryBlock = b->GetInsertBlock();
308    Function * main = entryBlock->getParent();
309
310    // Create the basic blocks for the loop.
311    BasicBlock * const pipelineLoop = BasicBlock::Create(b->getContext(), "pipelineLoop", main);
312    BasicBlock * const pipelineExit = BasicBlock::Create(b->getContext(), "pipelineExit", main);
313
314    StreamSetBufferMap<Value *> producedItemCount;
315    StreamSetBufferMap<Value *> consumedItemCount;
316
317    b->CreateBr(pipelineLoop);
318    b->SetInsertPoint(pipelineLoop);
319   
320    Value * cycleCountStart = nullptr;
321    Value * cycleCountEnd = nullptr;
322    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
323        cycleCountStart = b->CreateReadCycleCounter();
324    }
325    Value * terminated = nullptr;
326
327    for (Kernel * const kernel : kernels) {
328
329        b->setKernel(kernel);
330
331        BasicBlock * const entry = b->GetInsertBlock();
332        BasicBlock * const kernelCode = BasicBlock::Create(b->getContext(), kernel->getName(), main);
333        BasicBlock * const kernelExit = BasicBlock::Create(b->getContext(), kernel->getName() + "_exit", main);
334
335        b->CreateUnlikelyCondBr(b->getTerminationSignal(), kernelExit, kernelCode);
336
337        b->SetInsertPoint(kernelCode);
338        const auto & inputs = kernel->getStreamInputs();
339        const auto & outputs = kernel->getStreamOutputs();
340
341        Value * const isFinal = terminated ? terminated : b->getFalse();
342
343        std::vector<Value *> args = {kernel->getInstance(), isFinal};
344
345        for (unsigned i = 0; i < inputs.size(); ++i) {
346            const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
347            const auto f = producedItemCount.find(buffer);
348            if (LLVM_UNLIKELY(f == producedItemCount.end())) {
349                report_fatal_error(kernel->getName() + " uses stream set " + inputs[i].getName() + " prior to its definition");
350            }
351            Value * const produced = f->second;
352            args.push_back(produced);
353            handleInsufficientData(b, produced, isFinal, pipelineLoop, kernel, inputs[i], buffer);
354        }
355
356        applyOutputBufferExpansions(b, kernel);
357
358        b->createDoSegmentCall(args);
359
360        BasicBlock * const kernelFinished = b->GetInsertBlock();
361        Value * const finished = b->getTerminationSignal();
362        b->CreateBr(kernelExit);
363
364        b->SetInsertPoint(kernelExit);
365        PHINode * const finishedPhi = b->CreatePHI(b->getInt1Ty(), 2);
366        finishedPhi->addIncoming(b->getTrue(), entry);
367        finishedPhi->addIncoming(finished, kernelFinished);
368        if (terminated) { // All kernels must agree that we've terminated.
369            terminated = b->CreateAnd(terminated, finishedPhi);
370        } else {
371            terminated = finishedPhi;
372        }
373
374        for (unsigned i = 0; i < outputs.size(); ++i) {
375            Value * const produced = b->getProducedItemCount(outputs[i].getName());
376            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
377            assert (producedItemCount.count(buf) == 0);
378            producedItemCount.emplace(buf, produced);
379        }
380
381        for (unsigned i = 0; i < inputs.size(); ++i) {
382            Value * const processed = b->getProcessedItemCount(inputs[i].getName());
383            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);
384            auto f = consumedItemCount.find(buf);
385            if (f == consumedItemCount.end()) {
386                consumedItemCount.emplace(buf, processed);
387            } else {
388                f->second = b->CreateUMin(processed, f->second);
389            }
390        }
391
392        if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
393            cycleCountEnd = b->CreateReadCycleCounter();
394            Value * counterPtr = b->getCycleCountPtr();
395            b->CreateStore(b->CreateAdd(b->CreateLoad(counterPtr), b->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
396            cycleCountStart = cycleCountEnd;
397        }
398//        Value * const segNo = b->acquireLogicalSegmentNo();
399//        Value * nextSegNo = b->CreateAdd(segNo, b->getSize(1));
400//        b->releaseLogicalSegmentNo(nextSegNo);
401    }
402
403    for (const auto consumed : consumedItemCount) {
404        const StreamSetBuffer * const buffer = consumed.first;
405        Kernel * const kernel = buffer->getProducer();
406        const auto & binding = kernel->getStreamOutput(buffer);
407        if (LLVM_UNLIKELY(binding.getRate().isDerived())) {
408            continue;
409        }
410        b->setKernel(kernel);
411        b->setConsumedItemCount(binding.getName(), consumed.second);
412    }
413
414    b->CreateCondBr(terminated, pipelineExit, pipelineLoop);
415
416    pipelineExit->moveAfter(b->GetInsertBlock());
417
418    b->SetInsertPoint(pipelineExit);
419
420    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
421        for (unsigned k = 0; k < kernels.size(); k++) {
422            auto & kernel = kernels[k];
423            b->setKernel(kernel);
424            const auto & inputs = kernel->getStreamInputs();
425            const auto & outputs = kernel->getStreamOutputs();
426            Value * items = nullptr;
427            if (inputs.empty()) {
428                items = b->getProducedItemCount(outputs[0].getName());
429            } else {
430                items = b->getProcessedItemCount(inputs[0].getName());
431            }
432            Value * fItems = b->CreateUIToFP(items, b->getDoubleTy());
433            Value * cycles = b->CreateLoad(b->getCycleCountPtr());
434            Value * fCycles = b->CreateUIToFP(cycles, b->getDoubleTy());
435            const auto formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
436            Value * stringPtr = b->CreatePointerCast(b->GetString(formatString), b->getInt8PtrTy());
437            b->CreateCall(b->GetDprintf(), {b->getInt32(2), stringPtr, fItems, fCycles, b->CreateFDiv(fCycles, fItems)});
438        }
439    }
440
441}
442
443/** ------------------------------------------------------------------------------------------------------------- *
444 * @brief applyOutputBufferExpansions
445 ** ------------------------------------------------------------------------------------------------------------- */
446void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const std::string & name, DynamicBuffer * const db, const uint64_t baseSize) {
447    BasicBlock * const doExpand = BasicBlock::Create(b->getContext(), name + "Expand", b->GetInsertBlock()->getParent());
448    BasicBlock * const nextBlock = b->GetInsertBlock()->getNextNode();
449    doExpand->moveAfter(b->GetInsertBlock());
450    BasicBlock * const bufferReady = b->CreateBasicBlock(name + "Ready");
451    bufferReady->moveAfter(doExpand);
452    if (nextBlock) nextBlock->moveAfter(bufferReady);
453
454    Value * const handle = db->getStreamSetHandle();
455
456    Value * const produced = b->getProducedItemCount(name);
457    Value * const consumed = b->getConsumedItemCount(name);
458    Value * const required = b->CreateAdd(b->CreateSub(produced, consumed), b->getSize(2 * baseSize));
459
460    b->CreateCondBr(b->CreateICmpUGT(required, db->getCapacity(b.get(), handle)), doExpand, bufferReady);
461
462    b->SetInsertPoint(doExpand);
463    db->doubleCapacity(b.get(), handle);
464    // Ensure that capacity is sufficient by successive doubling, if necessary.
465    b->CreateCondBr(b->CreateICmpUGT(required, db->getBufferedSize(b.get(), handle)), doExpand, bufferReady);
466
467    b->SetInsertPoint(bufferReady);
468}
469
470void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * k) {
471    const auto & outputs = k->getStreamSetOutputBuffers();
472    for (unsigned i = 0; i < outputs.size(); i++) {
473        if (isa<DynamicBuffer>(outputs[i])) {
474            const auto ub = k->getUpperBound(k->getStreamOutput(i).getRate());
475            const auto baseSize = (ub.numerator() * k->getStride() + ub.denominator() - 1) / ub.denominator();
476            if (LLVM_LIKELY(baseSize > 0)) {
477                const auto & name = k->getStreamOutput(i).getName();
478                applyOutputBufferExpansions(b, name, cast<DynamicBuffer>(outputs[i]), baseSize);
479            }
480        }
481    }
482}
483
484/** ------------------------------------------------------------------------------------------------------------- *
485 * @brief handleInsufficientData
486 ** ------------------------------------------------------------------------------------------------------------- */
487inline void handleInsufficientData(const std::unique_ptr<KernelBuilder> & b, Value * const produced, Value * const final, BasicBlock * const insufficient,
488                                   const Kernel * const consumer,  const Binding & input, const StreamSetBuffer * const buffer) {
489    const Kernel * const producer = buffer->getProducer();
490    const Binding & output = producer->getStreamOutput(buffer);
491    const auto consumedRate = consumer->getUpperBound(input.getRate()) * consumer->getStride();
492    if (consumedRate > 0) {
493        auto producedRate = producer->getLowerBound(output.getRate()) * producer->getStride();
494        if (LLVM_UNLIKELY(input.hasLookahead())) {
495            producedRate -= input.getLookahead();
496        }
497        if (LLVM_UNLIKELY(producedRate < consumedRate)) {
498            const auto name = input.getName();
499            BasicBlock * const sufficient = BasicBlock::Create(b->getContext(), name + "IsSufficient", b->GetInsertBlock()->getParent());
500            Value * const processed = b->getProcessedItemCount(name);
501
502            if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableAsserts))) {
503                b->CreateAssert(b->CreateICmpULE(processed, produced), input.getName() + ": processed cannot exceed produced");
504            }
505            Value * const unread = b->CreateSub(produced, processed);
506            Constant * const amount = ConstantInt::get(unread->getType(), ceiling(consumedRate));
507            Value * const cond = b->CreateOr(b->CreateICmpUGE(unread, amount), final);
508            b->CreateLikelyCondBr(cond, sufficient, insufficient);
509            b->SetInsertPoint(sufficient);
510        }
511    }
512}
513
514/** ------------------------------------------------------------------------------------------------------------- *
515 * @brief requiresCopyBack
516 ** ------------------------------------------------------------------------------------------------------------- */
517bool requiresCopyBack(const Kernel * k, const ProcessingRate & rate) {
518    if (rate.isBounded() || rate.isUnknown()) {
519        return true;
520    } else if (rate.isRelative()) {
521        return requiresCopyBack(k, k->getBinding(rate.getReference()).getRate());
522    }
523    return false;
524}
Note: See TracBrowser for help on using the repository browser.