source: icGREP/icgrep-devel/icgrep/toolchain/pipeline.cpp @ 5845

Last change on this file since 5845 was 5845, checked in by xwa163, 15 months ago

Update consumedItemCount early in pipeline

File size: 25.9 KB
Line 
1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
6#include "pipeline.h"
7#include <toolchain/toolchain.h>
8#include <kernels/kernel.h>
9#include <kernels/streamset.h>
10#include <llvm/IR/Module.h>
11#include <boost/container/flat_set.hpp>
12#include <boost/container/flat_map.hpp>
13#include <kernels/kernel_builder.h>
14
15using namespace kernel;
16using namespace parabix;
17using namespace llvm;
18
19using Port = Kernel::Port;
20
21template <typename Value>
22using StreamSetBufferMap = boost::container::flat_map<const StreamSetBuffer *, Value>;
23
24template <typename Value>
25using FlatSet = boost::container::flat_set<Value>;
26
27Function * makeThreadFunction(const std::unique_ptr<kernel::KernelBuilder> & b, const std::string & name) {
28    Function * const f = Function::Create(FunctionType::get(b->getVoidTy(), {b->getVoidPtrTy()}, false), Function::InternalLinkage, name, b->getModule());
29    f->setCallingConv(CallingConv::C);
30    f->arg_begin()->setName("state");
31    return f;
32}
33
34void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel);
35
36void handleInsufficientData(const std::unique_ptr<KernelBuilder> & b, Value * const produced, Value * const final, BasicBlock * const entry, const Kernel * const consumer,  const Binding & input, const StreamSetBuffer * const buffer);
37
38bool requiresCopyBack(const Kernel * k, const ProcessingRate & rate);
39
40/** ------------------------------------------------------------------------------------------------------------- *
41 * @brief generateSegmentParallelPipeline
42 *
43 * Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
44 * operating over an input stream set S, a segment-parallel implementation divides the input
45 * into segments and coordinates a set of T <= K threads to each process one segment at a time.
46 * Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
47 * fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
48 ** ------------------------------------------------------------------------------------------------------------- */
49void generateSegmentParallelPipeline(const std::unique_ptr<KernelBuilder> & b, const std::vector<Kernel *> & kernels) {
50
51    const unsigned n = kernels.size();
52    Module * const m = b->getModule();
53    IntegerType * const sizeTy = b->getSizeTy();
54    PointerType * const voidPtrTy = b->getVoidPtrTy();
55    Constant * nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
56    std::vector<Type *> structTypes;
57    codegen::BufferSegments = std::max(codegen::BufferSegments, codegen::ThreadNum);
58
59    Value * instance[n];
60    for (unsigned i = 0; i < n; ++i) {
61        instance[i] = kernels[i]->getInstance();
62        structTypes.push_back(instance[i]->getType());
63    }
64    StructType * const sharedStructType = StructType::get(m->getContext(), structTypes);
65    StructType * const threadStructType = StructType::get(m->getContext(), {sharedStructType->getPointerTo(), sizeTy});
66
67    const auto ip = b->saveIP();
68
69    Function * const threadFunc = makeThreadFunction(b, "segment");
70    auto args = threadFunc->arg_begin();
71
72    // -------------------------------------------------------------------------------------------------------------------------
73    // MAKE SEGMENT PARALLEL PIPELINE THREAD
74    // -------------------------------------------------------------------------------------------------------------------------
75
76     // Create the basic blocks for the thread function.
77    BasicBlock * entryBlock = BasicBlock::Create(b->getContext(), "entry", threadFunc);
78    b->SetInsertPoint(entryBlock);
79
80    Value * const threadStruct = b->CreateBitCast(&*(args), threadStructType->getPointerTo());
81
82    Value * const sharedStatePtr = b->CreateLoad(b->CreateGEP(threadStruct, {b->getInt32(0), b->getInt32(0)}));
83    for (unsigned k = 0; k < n; ++k) {
84        Value * ptr = b->CreateLoad(b->CreateGEP(sharedStatePtr, {b->getInt32(0), b->getInt32(k)}));
85        kernels[k]->setInstance(ptr);
86    }
87    Value * const segOffset = b->CreateLoad(b->CreateGEP(threadStruct, {b->getInt32(0), b->getInt32(1)}));
88
89    BasicBlock * const segmentLoop = BasicBlock::Create(b->getContext(), "segmentLoop", threadFunc);
90    b->CreateBr(segmentLoop);
91
92    b->SetInsertPoint(segmentLoop);
93    PHINode * const segNo = b->CreatePHI(b->getSizeTy(), 2, "segNo");
94    segNo->addIncoming(segOffset, entryBlock);
95
96    BasicBlock * const exitThreadBlock = BasicBlock::Create(b->getContext(), "exitThread", threadFunc);
97
98    StreamSetBufferMap<Value *> producedItemCount;
99    StreamSetBufferMap<Value *> consumedItemCount;
100    StreamSetBufferMap<Kernel *> lastUsedKernel;
101
102    Value * cycleCountStart = nullptr;
103    Value * cycleCountEnd = nullptr;
104    if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
105        cycleCountStart = b->CreateReadCycleCounter();
106    }
107
108    Value * terminated = nullptr;
109
110    const bool serialize = codegen::DebugOptionIsSet(codegen::SerializeThreads);
111
112    for (Kernel * const kernel : kernels) {
113        const auto & inputs = kernel->getStreamInputs();
114        for (unsigned i = 0; i < inputs.size(); ++i) {
115            const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
116            auto f = lastUsedKernel.find(buffer);
117            if (f == lastUsedKernel.end()) {
118                lastUsedKernel.emplace(buffer, kernel);
119            } else {
120                f->second = kernel;
121            }
122        }
123    }
124
125    for (unsigned k = 0; k < n; ++k) {
126
127        const auto & kernel = kernels[k];
128
129        BasicBlock * const kernelWait = BasicBlock::Create(b->getContext(), kernel->getName() + "Wait", threadFunc);
130
131        b->CreateBr(kernelWait);
132
133        BasicBlock * const kernelCheck = BasicBlock::Create(b->getContext(), kernel->getName() + "Check", threadFunc);
134
135        BasicBlock * const kernelBody = BasicBlock::Create(b->getContext(), kernel->getName() + "Do", threadFunc);
136
137        BasicBlock * const kernelEnd = BasicBlock::Create(b->getContext(), kernel->getName() + "End", threadFunc);
138
139        b->SetInsertPoint(kernelWait);
140
141        b->setKernel(kernels[serialize ? (n - 1) : k]);
142        Value * const processedSegmentCount = b->acquireLogicalSegmentNo();
143        b->setKernel(kernel);
144
145        assert (processedSegmentCount->getType() == segNo->getType());
146        Value * const ready = b->CreateICmpEQ(segNo, processedSegmentCount);       
147        b->CreateCondBr(ready, kernelCheck, kernelWait);
148
149        b->SetInsertPoint(kernelCheck);
150        b->CreateUnlikelyCondBr(b->getTerminationSignal(), kernelEnd, kernelBody);
151
152        // Execute the kernel segment
153        b->SetInsertPoint(kernelBody);
154        const auto & inputs = kernel->getStreamInputs();
155        Value * const isFinal = b->CreateOr(terminated ? terminated : b->getFalse(), b->getTerminationSignal());
156        std::vector<Value *> args = {kernel->getInstance(), isFinal};
157        for (unsigned i = 0; i < inputs.size(); ++i) {
158            const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
159            const auto f = producedItemCount.find(buffer);
160            assert (f != producedItemCount.end());
161            Value * const produced = f->second;
162            args.push_back(produced);
163            handleInsufficientData(b, produced, isFinal, kernelEnd, kernel, inputs[i], buffer);
164        }
165
166        b->setKernel(kernel);
167        b->createDoSegmentCall(args);
168        b->CreateBr(kernelEnd);
169
170        b->SetInsertPoint(kernelEnd);
171
172        Value * const finished = b->getTerminationSignal();
173        if (terminated) { // all kernels must terminate
174            terminated = b->CreateAnd(terminated, finished);
175        } else {
176            terminated = finished;
177        }
178
179        const auto & outputs = kernel->getStreamOutputs();
180        for (unsigned i = 0; i < outputs.size(); ++i) {           
181            Value * const produced = b->getProducedItemCount(outputs[i].getName());
182            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
183            assert (producedItemCount.count(buf) == 0);
184            producedItemCount.emplace(buf, produced);
185        }
186        for (unsigned i = 0; i < inputs.size(); ++i) {
187            Value * const processedItemCount = b->getProcessedItemCount(inputs[i].getName());
188            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);           
189            auto f = consumedItemCount.find(buf);
190            if (f == consumedItemCount.end()) {
191                consumedItemCount.emplace(buf, processedItemCount);
192            } else {
193                assert (f->second);
194                f->second = b->CreateUMin(processedItemCount, f->second);
195            }
196        }
197
198        for (auto i = lastUsedKernel.begin(); i != lastUsedKernel.end(); i++) {
199            if (i->second == kernel) {
200                const StreamSetBuffer * const buffer = i->first;
201                Kernel * const producerKernel = buffer->getProducer();
202                const auto & binding = producerKernel->getStreamOutput(buffer);
203                if (LLVM_UNLIKELY(binding.getRate().isDerived())) {
204                    continue;
205                }
206                auto f = consumedItemCount.find(buffer);
207                if (f != consumedItemCount.end()) {
208                    const Kernel* tempKernel = b->getKernel();
209                    b->setKernel(producerKernel);
210                    b->setConsumedItemCount(binding.getName(), f->second);
211                    b->setKernel(tempKernel);
212                }
213            }
214        }
215
216
217        if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
218            cycleCountEnd = b->CreateReadCycleCounter();
219            Value * counterPtr = b->getCycleCountPtr();
220            b->CreateStore(b->CreateAdd(b->CreateLoad(counterPtr), b->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
221            cycleCountStart = cycleCountEnd;
222        }
223
224        b->releaseLogicalSegmentNo(b->CreateAdd(segNo, b->getSize(1)));
225    }
226
227    exitThreadBlock->moveAfter(b->GetInsertBlock());
228
229    segNo->addIncoming(b->CreateAdd(segNo, b->getSize(codegen::ThreadNum)), b->GetInsertBlock());
230
231    b->CreateUnlikelyCondBr(terminated, exitThreadBlock, segmentLoop);
232
233    b->SetInsertPoint(exitThreadBlock);
234
235    // only call pthread_exit() within spawned threads; otherwise it'll be equivalent to calling exit() within the process
236    BasicBlock * const exitThread = BasicBlock::Create(b->getContext(), "ExitThread", threadFunc);
237    BasicBlock * const exitFunction = BasicBlock::Create(b->getContext(), "ExitProcessFunction", threadFunc);
238
239    Value * const exitCond = b->CreateICmpEQ(segOffset, ConstantInt::getNullValue(segOffset->getType()));
240    b->CreateCondBr(exitCond, exitFunction, exitThread);
241    b->SetInsertPoint(exitThread);
242    b->CreatePThreadExitCall(nullVoidPtrVal);
243    b->CreateBr(exitFunction);
244    b->SetInsertPoint(exitFunction);
245    b->CreateRetVoid();
246
247    // -------------------------------------------------------------------------------------------------------------------------
248    b->restoreIP(ip);
249
250    for (unsigned i = 0; i < n; ++i) {
251        kernels[i]->setInstance(instance[i]);
252    }
253
254    // -------------------------------------------------------------------------------------------------------------------------
255    // MAKE SEGMENT PARALLEL PIPELINE DRIVER
256    // -------------------------------------------------------------------------------------------------------------------------
257    const unsigned threads = codegen::ThreadNum - 1;
258    assert (codegen::ThreadNum > 0);
259    Type * const pthreadsTy = ArrayType::get(sizeTy, threads);
260    AllocaInst * const pthreads = b->CreateAlloca(pthreadsTy);
261    Value * threadIdPtr[threads];
262
263    for (unsigned i = 0; i < threads; ++i) {
264        threadIdPtr[i] = b->CreateGEP(pthreads, {b->getInt32(0), b->getInt32(i)});
265    }
266
267    for (unsigned i = 0; i < n; ++i) {
268        b->setKernel(kernels[i]);
269        b->releaseLogicalSegmentNo(b->getSize(0));
270    }
271
272    AllocaInst * const sharedStruct = b->CreateCacheAlignedAlloca(sharedStructType);
273    for (unsigned i = 0; i < n; ++i) {
274        Value * ptr = b->CreateGEP(sharedStruct, {b->getInt32(0), b->getInt32(i)});
275        b->CreateStore(kernels[i]->getInstance(), ptr);
276    }
277
278    // use the process thread to handle the initial segment function after spawning (n - 1) threads to handle the subsequent offsets
279    for (unsigned i = 0; i < threads; ++i) {
280        AllocaInst * const threadState = b->CreateAlloca(threadStructType);
281        b->CreateStore(sharedStruct, b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(0)}));
282        b->CreateStore(b->getSize(i + 1), b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(1)}));
283        b->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, threadFunc, threadState);
284    }
285
286    AllocaInst * const threadState = b->CreateAlloca(threadStructType);
287    b->CreateStore(sharedStruct, b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(0)}));
288    b->CreateStore(b->getSize(0), b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(1)}));
289    b->CreateCall(threadFunc, b->CreatePointerCast(threadState, voidPtrTy));
290
291    AllocaInst * const status = b->CreateAlloca(voidPtrTy);
292    for (unsigned i = 0; i < threads; ++i) {
293        Value * threadId = b->CreateLoad(threadIdPtr[i]);
294        b->CreatePThreadJoinCall(threadId, status);
295    }
296   
297    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
298        for (const Kernel * kernel : kernels) {
299            b->setKernel(kernel);
300            const auto & inputs = kernel->getStreamInputs();
301            const auto & outputs = kernel->getStreamOutputs();
302            Value * items = nullptr;
303            if (inputs.empty()) {
304                items = b->getProducedItemCount(outputs[0].getName());
305            } else {
306                items = b->getProcessedItemCount(inputs[0].getName());
307            }
308            Value * fItems = b->CreateUIToFP(items, b->getDoubleTy());
309            Value * cycles = b->CreateLoad(b->getCycleCountPtr());
310            Value * fCycles = b->CreateUIToFP(cycles, b->getDoubleTy());
311            const auto formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
312            Value * stringPtr = b->CreatePointerCast(b->GetString(formatString), b->getInt8PtrTy());
313            b->CreateCall(b->GetDprintf(), {b->getInt32(2), stringPtr, fItems, fCycles, b->CreateFDiv(fCycles, fItems)});
314        }
315    }
316   
317}
318
319/** ------------------------------------------------------------------------------------------------------------- *
320 * @brief generatePipelineLoop
321 ** ------------------------------------------------------------------------------------------------------------- */
322void generatePipelineLoop(const std::unique_ptr<KernelBuilder> & b, const std::vector<Kernel *> & kernels) {
323
324    BasicBlock * entryBlock = b->GetInsertBlock();
325    Function * main = entryBlock->getParent();
326
327    // Create the basic blocks for the loop.
328    BasicBlock * const pipelineLoop = BasicBlock::Create(b->getContext(), "pipelineLoop", main);
329    BasicBlock * const pipelineExit = BasicBlock::Create(b->getContext(), "pipelineExit", main);
330
331    StreamSetBufferMap<Value *> producedItemCount;
332    StreamSetBufferMap<Value *> consumedItemCount;
333    StreamSetBufferMap<Kernel *> lastUsedKernel;
334
335    b->CreateBr(pipelineLoop);
336    b->SetInsertPoint(pipelineLoop);
337   
338    Value * cycleCountStart = nullptr;
339    Value * cycleCountEnd = nullptr;
340    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
341        cycleCountStart = b->CreateReadCycleCounter();
342    }
343    Value * terminated = nullptr;
344
345    for (Kernel * const kernel : kernels) {
346        const auto & inputs = kernel->getStreamInputs();
347        for (unsigned i = 0; i < inputs.size(); ++i) {
348            const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
349            auto f = lastUsedKernel.find(buffer);
350            if (f == lastUsedKernel.end()) {
351                lastUsedKernel.emplace(buffer, kernel);
352            } else {
353                f->second = kernel;
354            }
355        }
356    }
357
358    for (Kernel * const kernel : kernels) {
359
360        b->setKernel(kernel);
361
362        BasicBlock * const entry = b->GetInsertBlock();
363        BasicBlock * const kernelCode = BasicBlock::Create(b->getContext(), kernel->getName(), main);
364        BasicBlock * const kernelExit = BasicBlock::Create(b->getContext(), kernel->getName() + "_exit", main);
365
366        b->CreateUnlikelyCondBr(b->getTerminationSignal(), kernelExit, kernelCode);
367
368        b->SetInsertPoint(kernelCode);
369        const auto & inputs = kernel->getStreamInputs();
370        const auto & outputs = kernel->getStreamOutputs();
371
372        Value * const isFinal = terminated ? terminated : b->getFalse();
373
374        std::vector<Value *> args = {kernel->getInstance(), isFinal};
375
376        const auto name = kernel->getName();
377        for (unsigned i = 0; i < inputs.size(); ++i) {
378            const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
379            const auto f = producedItemCount.find(buffer);
380            if (LLVM_UNLIKELY(f == producedItemCount.end())) {
381                report_fatal_error(kernel->getName() + " uses stream set " + inputs[i].getName() + " prior to its definition");
382            }
383            Value * const produced = f->second;
384            args.push_back(produced);
385            handleInsufficientData(b, produced, isFinal, pipelineLoop, kernel, inputs[i], buffer);
386        }
387
388        applyOutputBufferExpansions(b, kernel);
389
390        b->createDoSegmentCall(args);
391
392        BasicBlock * const kernelFinished = b->GetInsertBlock();
393        Value * const finished = b->getTerminationSignal();
394        b->CreateBr(kernelExit);
395
396        b->SetInsertPoint(kernelExit);
397        PHINode * const finishedPhi = b->CreatePHI(b->getInt1Ty(), 2);
398        finishedPhi->addIncoming(b->getTrue(), entry);
399        finishedPhi->addIncoming(finished, kernelFinished);
400        if (terminated) { // All kernels must agree that we've terminated.
401            terminated = b->CreateAnd(terminated, finishedPhi);
402        } else {
403            terminated = finishedPhi;
404        }
405
406        for (unsigned i = 0; i < outputs.size(); ++i) {
407            Value * const produced = b->getProducedItemCount(outputs[i].getName());
408            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
409            assert (producedItemCount.count(buf) == 0);
410            producedItemCount.emplace(buf, produced);
411        }
412
413        for (unsigned i = 0; i < inputs.size(); ++i) {
414            Value * const processed = b->getProcessedItemCount(inputs[i].getName());
415            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);
416            auto f = consumedItemCount.find(buf);
417            if (f == consumedItemCount.end()) {
418                consumedItemCount.emplace(buf, processed);
419            } else {
420                f->second = b->CreateUMin(processed, f->second);
421            }
422        }
423
424        for (auto i = lastUsedKernel.begin(); i != lastUsedKernel.end(); i++) {
425            if (i->second == kernel) {
426                const StreamSetBuffer * const buffer = i->first;
427                Kernel * const producerKernel = buffer->getProducer();
428                const auto & binding = producerKernel->getStreamOutput(buffer);
429                if (LLVM_UNLIKELY(binding.getRate().isDerived())) {
430                    continue;
431                }
432                auto f = consumedItemCount.find(buffer);
433                if (f != consumedItemCount.end()) {
434                    const Kernel* tempKernel = b->getKernel();
435                    b->setKernel(producerKernel);
436                    b->setConsumedItemCount(binding.getName(), f->second);
437                    b->setKernel(tempKernel);
438                }
439            }
440        }
441
442        if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
443            cycleCountEnd = b->CreateReadCycleCounter();
444            Value * counterPtr = b->getCycleCountPtr();
445            b->CreateStore(b->CreateAdd(b->CreateLoad(counterPtr), b->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
446            cycleCountStart = cycleCountEnd;
447        }
448//        Value * const segNo = b->acquireLogicalSegmentNo();
449//        Value * nextSegNo = b->CreateAdd(segNo, b->getSize(1));
450//        b->releaseLogicalSegmentNo(nextSegNo);
451    }
452
453    b->CreateCondBr(terminated, pipelineExit, pipelineLoop);
454
455    pipelineExit->moveAfter(b->GetInsertBlock());
456
457    b->SetInsertPoint(pipelineExit);
458
459    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
460        for (unsigned k = 0; k < kernels.size(); k++) {
461            auto & kernel = kernels[k];
462            b->setKernel(kernel);
463            const auto & inputs = kernel->getStreamInputs();
464            const auto & outputs = kernel->getStreamOutputs();
465            Value * items = nullptr;
466            if (inputs.empty()) {
467                items = b->getProducedItemCount(outputs[0].getName());
468            } else {
469                items = b->getProcessedItemCount(inputs[0].getName());
470            }
471            Value * fItems = b->CreateUIToFP(items, b->getDoubleTy());
472            Value * cycles = b->CreateLoad(b->getCycleCountPtr());
473            Value * fCycles = b->CreateUIToFP(cycles, b->getDoubleTy());
474            const auto formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
475            Value * stringPtr = b->CreatePointerCast(b->GetString(formatString), b->getInt8PtrTy());
476            b->CreateCall(b->GetDprintf(), {b->getInt32(2), stringPtr, fItems, fCycles, b->CreateFDiv(fCycles, fItems)});
477        }
478    }
479
480}
481
482/** ------------------------------------------------------------------------------------------------------------- *
483 * @brief applyOutputBufferExpansions
484 ** ------------------------------------------------------------------------------------------------------------- */
485void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const std::string & name, DynamicBuffer * const db, const uint64_t baseSize) {
486    BasicBlock * const doExpand = BasicBlock::Create(b->getContext(), name + "Expand", b->GetInsertBlock()->getParent());
487    BasicBlock * const nextBlock = b->GetInsertBlock()->getNextNode();
488    doExpand->moveAfter(b->GetInsertBlock());
489    BasicBlock * const bufferReady = b->CreateBasicBlock(name + "Ready");
490    bufferReady->moveAfter(doExpand);
491    if (nextBlock) nextBlock->moveAfter(bufferReady);
492
493    Value * const handle = db->getStreamSetHandle();
494
495    Value * const produced = b->getProducedItemCount(name);
496    Value * const consumed = b->getConsumedItemCount(name);
497    Value * const required = b->CreateAdd(b->CreateSub(produced, consumed), b->getSize(2 * baseSize));
498
499    b->CreateCondBr(b->CreateICmpUGT(required, db->getCapacity(b.get(), handle)), doExpand, bufferReady);
500
501    b->SetInsertPoint(doExpand);
502    db->doubleCapacity(b.get(), handle);
503    // Ensure that capacity is sufficient by successive doubling, if necessary.
504    b->CreateCondBr(b->CreateICmpUGT(required, db->getBufferedSize(b.get(), handle)), doExpand, bufferReady);
505
506    b->SetInsertPoint(bufferReady);
507}
508
509void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * k) {
510    const auto & outputs = k->getStreamSetOutputBuffers();
511    for (unsigned i = 0; i < outputs.size(); i++) {
512        if (isa<DynamicBuffer>(outputs[i])) {
513            const auto ub = k->getUpperBound(k->getStreamOutput(i).getRate());
514            const auto baseSize = (ub.numerator() * k->getStride() + ub.denominator() - 1) / ub.denominator();
515            if (LLVM_LIKELY(baseSize > 0)) {
516                const auto & name = k->getStreamOutput(i).getName();
517                applyOutputBufferExpansions(b, name, cast<DynamicBuffer>(outputs[i]), baseSize);
518            }
519        }
520    }
521}
522
523/** ------------------------------------------------------------------------------------------------------------- *
524 * @brief handleInsufficientData
525 ** ------------------------------------------------------------------------------------------------------------- */
526inline void handleInsufficientData(const std::unique_ptr<KernelBuilder> & b, Value * const produced, Value * const final, BasicBlock * const insufficient,
527                                   const Kernel * const consumer,  const Binding & input, const StreamSetBuffer * const buffer) {
528    const Kernel * const producer = buffer->getProducer();
529    const Binding & output = producer->getStreamOutput(buffer);
530    const auto consumedRate = consumer->getUpperBound(input.getRate()) * consumer->getStride();
531    if (consumedRate > 0) {
532        auto producedRate = producer->getLowerBound(output.getRate()) * producer->getStride();
533        if (LLVM_UNLIKELY(input.hasLookahead())) {
534            producedRate -= input.getLookahead();
535        }
536        if (LLVM_UNLIKELY(producedRate < consumedRate)) {
537            const auto name = input.getName();
538            BasicBlock * const sufficient = BasicBlock::Create(b->getContext(), name + "IsSufficient", b->GetInsertBlock()->getParent());
539            Value * const processed = b->getProcessedItemCount(name);
540
541            if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableAsserts))) {
542                b->CreateAssert(b->CreateICmpULE(processed, produced), input.getName() + ": processed cannot exceed produced");
543            }
544            Value * const unread = b->CreateSub(produced, processed);
545            Constant * const amount = ConstantInt::get(unread->getType(), ceiling(consumedRate));
546            Value * const cond = b->CreateOr(b->CreateICmpUGE(unread, amount), final);
547            b->CreateLikelyCondBr(cond, sufficient, insufficient);
548            b->SetInsertPoint(sufficient);
549        }
550    }
551}
552
553/** ------------------------------------------------------------------------------------------------------------- *
554 * @brief requiresCopyBack
555 ** ------------------------------------------------------------------------------------------------------------- */
556bool requiresCopyBack(const Kernel * k, const ProcessingRate & rate) {
557    if (rate.isBounded() || rate.isUnknown()) {
558        return true;
559    } else if (rate.isRelative()) {
560        return requiresCopyBack(k, k->getBinding(rate.getReference()).getRate());
561    }
562    return false;
563}
Note: See TracBrowser for help on using the repository browser.