source: icGREP/icgrep-devel/icgrep/toolchain/pipeline.cpp @ 5793

Last change on this file since 5793 was 5793, checked in by nmedfort, 16 months ago

Bug fix for pipeline: it was terminating too early when there was insufficient output space to process all of the input for a kernel.

File size: 23.2 KB
Line 
1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
6#include "pipeline.h"
7#include <toolchain/toolchain.h>
8#include <kernels/kernel.h>
9#include <kernels/streamset.h>
10#include <llvm/IR/Module.h>
11#include <boost/container/flat_set.hpp>
12#include <boost/container/flat_map.hpp>
13#include <kernels/kernel_builder.h>
14
15using namespace kernel;
16using namespace parabix;
17using namespace llvm;
18
19using Port = Kernel::Port;
20
21template <typename Value>
22using StreamSetBufferMap = boost::container::flat_map<const StreamSetBuffer *, Value>;
23
24template <typename Value>
25using FlatSet = boost::container::flat_set<Value>;
26
27Function * makeThreadFunction(const std::unique_ptr<kernel::KernelBuilder> & b, const std::string & name) {
28    Function * const f = Function::Create(FunctionType::get(b->getVoidTy(), {b->getVoidPtrTy()}, false), Function::InternalLinkage, name, b->getModule());
29    f->setCallingConv(CallingConv::C);
30    f->arg_begin()->setName("state");
31    return f;
32}
33
34void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel);
35
36void handleInsufficientData(const std::unique_ptr<KernelBuilder> & b, Value * const produced, Value * const final, BasicBlock * const entry, const Kernel * const consumer,  const Binding & input, const StreamSetBuffer * const buffer);
37
38/** ------------------------------------------------------------------------------------------------------------- *
39 * @brief generateSegmentParallelPipeline
40 *
41 * Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
42 * operating over an input stream set S, a segment-parallel implementation divides the input
43 * into segments and coordinates a set of T <= K threads to each process one segment at a time.
44 * Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
45 * fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
46 ** ------------------------------------------------------------------------------------------------------------- */
47void generateSegmentParallelPipeline(const std::unique_ptr<KernelBuilder> & b, const std::vector<Kernel *> & kernels) {
48
49    const unsigned n = kernels.size();
50    Module * const m = b->getModule();
51    IntegerType * const sizeTy = b->getSizeTy();
52    PointerType * const voidPtrTy = b->getVoidPtrTy();
53    Constant * nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
54    std::vector<Type *> structTypes;
55    codegen::BufferSegments = std::max(codegen::BufferSegments, codegen::ThreadNum);
56
57    Value * instance[n];
58    for (unsigned i = 0; i < n; ++i) {
59        instance[i] = kernels[i]->getInstance();
60        structTypes.push_back(instance[i]->getType());
61    }
62    StructType * const sharedStructType = StructType::get(m->getContext(), structTypes);
63    StructType * const threadStructType = StructType::get(m->getContext(), {sharedStructType->getPointerTo(), sizeTy});
64
65    const auto ip = b->saveIP();
66
67    Function * const threadFunc = makeThreadFunction(b, "segment");
68    auto args = threadFunc->arg_begin();
69
70    // -------------------------------------------------------------------------------------------------------------------------
71    // MAKE SEGMENT PARALLEL PIPELINE THREAD
72    // -------------------------------------------------------------------------------------------------------------------------
73
74     // Create the basic blocks for the thread function.
75    BasicBlock * entryBlock = BasicBlock::Create(b->getContext(), "entry", threadFunc);
76    b->SetInsertPoint(entryBlock);
77
78    Value * const threadStruct = b->CreateBitCast(&*(args), threadStructType->getPointerTo());
79
80    Value * const sharedStatePtr = b->CreateLoad(b->CreateGEP(threadStruct, {b->getInt32(0), b->getInt32(0)}));
81    for (unsigned k = 0; k < n; ++k) {
82        Value * ptr = b->CreateLoad(b->CreateGEP(sharedStatePtr, {b->getInt32(0), b->getInt32(k)}));
83        kernels[k]->setInstance(ptr);
84    }
85    Value * const segOffset = b->CreateLoad(b->CreateGEP(threadStruct, {b->getInt32(0), b->getInt32(1)}));
86
87    BasicBlock * const segmentLoop = BasicBlock::Create(b->getContext(), "segmentLoop", threadFunc);
88    b->CreateBr(segmentLoop);
89
90    b->SetInsertPoint(segmentLoop);
91    PHINode * const segNo = b->CreatePHI(b->getSizeTy(), 2, "segNo");
92    segNo->addIncoming(segOffset, entryBlock);
93
94    BasicBlock * const exitThreadBlock = BasicBlock::Create(b->getContext(), "exitThread", threadFunc);
95
96    StreamSetBufferMap<Value *> producedItemCount;
97    StreamSetBufferMap<Value *> consumedItemCount;
98
99    Value * cycleCountStart = nullptr;
100    Value * cycleCountEnd = nullptr;
101    if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
102        cycleCountStart = b->CreateReadCycleCounter();
103    }
104
105    Value * terminated = nullptr;
106
107    const bool serialize = codegen::DebugOptionIsSet(codegen::SerializeThreads);
108
109    for (unsigned k = 0; k < n; ++k) {
110
111        const auto & kernel = kernels[k];
112
113        BasicBlock * const kernelWait = BasicBlock::Create(b->getContext(), kernel->getName() + "Wait", threadFunc);
114
115        b->CreateBr(kernelWait);
116
117        BasicBlock * const kernelCheck = BasicBlock::Create(b->getContext(), kernel->getName() + "Check", threadFunc);
118
119        BasicBlock * const kernelBody = BasicBlock::Create(b->getContext(), kernel->getName() + "Do", threadFunc);
120
121        BasicBlock * const kernelEnd = BasicBlock::Create(b->getContext(), kernel->getName() + "End", threadFunc);
122
123        b->SetInsertPoint(kernelWait);
124
125        b->setKernel(kernels[serialize ? (n - 1) : k]);
126        Value * const processedSegmentCount = b->acquireLogicalSegmentNo();
127        b->setKernel(kernel);
128
129        assert (processedSegmentCount->getType() == segNo->getType());
130        Value * const ready = b->CreateICmpEQ(segNo, processedSegmentCount);       
131        b->CreateCondBr(ready, kernelCheck, kernelWait);
132
133        b->SetInsertPoint(kernelCheck);
134        b->CreateUnlikelyCondBr(b->getTerminationSignal(), kernelEnd, kernelBody);
135
136        // Execute the kernel segment
137        b->SetInsertPoint(kernelBody);
138        const auto & inputs = kernel->getStreamInputs();
139        Value * const isFinal = b->CreateOr(terminated ? terminated : b->getFalse(), b->getTerminationSignal());
140        std::vector<Value *> args = {kernel->getInstance(), isFinal};
141        for (unsigned i = 0; i < inputs.size(); ++i) {
142            const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
143            const auto f = producedItemCount.find(buffer);
144            assert (f != producedItemCount.end());
145            Value * const produced = f->second;
146            args.push_back(produced);
147            handleInsufficientData(b, produced, isFinal, kernelEnd, kernel, inputs[i], buffer);
148        }
149
150        b->setKernel(kernel);
151        b->createDoSegmentCall(args);
152        b->CreateBr(kernelEnd);
153
154        b->SetInsertPoint(kernelEnd);
155
156        Value * const finished = b->getTerminationSignal();
157        if (terminated) { // all kernels must terminate
158            terminated = b->CreateAnd(terminated, finished);
159        } else {
160            terminated = finished;
161        }
162
163        const auto & outputs = kernel->getStreamOutputs();
164        for (unsigned i = 0; i < outputs.size(); ++i) {           
165            Value * const produced = b->getProducedItemCount(outputs[i].getName());
166            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
167            assert (producedItemCount.count(buf) == 0);
168            producedItemCount.emplace(buf, produced);
169        }
170        for (unsigned i = 0; i < inputs.size(); ++i) {
171            Value * const processedItemCount = b->getProcessedItemCount(inputs[i].getName());
172            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);           
173            auto f = consumedItemCount.find(buf);
174            if (f == consumedItemCount.end()) {
175                consumedItemCount.emplace(buf, processedItemCount);
176            } else {
177                assert (f->second);
178                f->second = b->CreateUMin(processedItemCount, f->second);
179            }
180        }
181
182        if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
183            cycleCountEnd = b->CreateReadCycleCounter();
184            Value * counterPtr = b->getCycleCountPtr();
185            b->CreateStore(b->CreateAdd(b->CreateLoad(counterPtr), b->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
186            cycleCountStart = cycleCountEnd;
187        }
188
189        b->releaseLogicalSegmentNo(b->CreateAdd(segNo, b->getSize(1)));
190    }
191
192    exitThreadBlock->moveAfter(b->GetInsertBlock());
193    for (const auto consumed : consumedItemCount) {
194        const StreamSetBuffer * const buf = consumed.first;
195        Kernel * const k = buf->getProducer();
196        const auto & outputs = k->getStreamSetOutputBuffers();
197        for (unsigned i = 0; i < outputs.size(); ++i) {
198            if (outputs[i] == buf) {
199                const auto & binding = k->getStreamOutput(i);
200                if (LLVM_UNLIKELY(binding.getRate().isDerived())) {
201                    continue;
202                }
203                b->setKernel(k);
204                b->setConsumedItemCount(binding.getName(), consumed.second);
205                break;
206            }
207        }
208    }
209
210    segNo->addIncoming(b->CreateAdd(segNo, b->getSize(codegen::ThreadNum)), b->GetInsertBlock());
211    if (LLVM_UNLIKELY(terminated == nullptr)) {
212        report_fatal_error("error: at least one kernel must have a termination signal");
213    }
214    b->CreateUnlikelyCondBr(terminated, exitThreadBlock, segmentLoop);
215
216    b->SetInsertPoint(exitThreadBlock);
217
218    // only call pthread_exit() within spawned threads; otherwise it'll be equivalent to calling exit() within the process
219    BasicBlock * const exitThread = BasicBlock::Create(b->getContext(), "ExitThread", threadFunc);
220    BasicBlock * const exitFunction = BasicBlock::Create(b->getContext(), "ExitProcessFunction", threadFunc);
221
222    Value * const exitCond = b->CreateICmpEQ(segOffset, ConstantInt::getNullValue(segOffset->getType()));
223    b->CreateCondBr(exitCond, exitFunction, exitThread);
224    b->SetInsertPoint(exitThread);
225    b->CreatePThreadExitCall(nullVoidPtrVal);
226    b->CreateBr(exitFunction);
227    b->SetInsertPoint(exitFunction);
228    b->CreateRetVoid();
229
230    // -------------------------------------------------------------------------------------------------------------------------
231    b->restoreIP(ip);
232
233    for (unsigned i = 0; i < n; ++i) {
234        kernels[i]->setInstance(instance[i]);
235    }
236
237    // -------------------------------------------------------------------------------------------------------------------------
238    // MAKE SEGMENT PARALLEL PIPELINE DRIVER
239    // -------------------------------------------------------------------------------------------------------------------------
240    const unsigned threads = codegen::ThreadNum - 1;
241    assert (codegen::ThreadNum > 0);
242    Type * const pthreadsTy = ArrayType::get(sizeTy, threads);
243    AllocaInst * const pthreads = b->CreateAlloca(pthreadsTy);
244    Value * threadIdPtr[threads];
245
246    for (unsigned i = 0; i < threads; ++i) {
247        threadIdPtr[i] = b->CreateGEP(pthreads, {b->getInt32(0), b->getInt32(i)});
248    }
249
250    for (unsigned i = 0; i < n; ++i) {
251        b->setKernel(kernels[i]);
252        b->releaseLogicalSegmentNo(b->getSize(0));
253    }
254
255    AllocaInst * const sharedStruct = b->CreateCacheAlignedAlloca(sharedStructType);
256    for (unsigned i = 0; i < n; ++i) {
257        Value * ptr = b->CreateGEP(sharedStruct, {b->getInt32(0), b->getInt32(i)});
258        b->CreateStore(kernels[i]->getInstance(), ptr);
259    }
260
261    // use the process thread to handle the initial segment function after spawning (n - 1) threads to handle the subsequent offsets
262    for (unsigned i = 0; i < threads; ++i) {
263        AllocaInst * const threadState = b->CreateAlloca(threadStructType);
264        b->CreateStore(sharedStruct, b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(0)}));
265        b->CreateStore(b->getSize(i + 1), b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(1)}));
266        b->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, threadFunc, threadState);
267    }
268
269    AllocaInst * const threadState = b->CreateAlloca(threadStructType);
270    b->CreateStore(sharedStruct, b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(0)}));
271    b->CreateStore(b->getSize(0), b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(1)}));
272    b->CreateCall(threadFunc, b->CreatePointerCast(threadState, voidPtrTy));
273
274    AllocaInst * const status = b->CreateAlloca(voidPtrTy);
275    for (unsigned i = 0; i < threads; ++i) {
276        Value * threadId = b->CreateLoad(threadIdPtr[i]);
277        b->CreatePThreadJoinCall(threadId, status);
278    }
279   
280    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
281        for (const Kernel * kernel : kernels) {
282            b->setKernel(kernel);
283            const auto & inputs = kernel->getStreamInputs();
284            const auto & outputs = kernel->getStreamOutputs();
285            Value * items = nullptr;
286            if (inputs.empty()) {
287                items = b->getProducedItemCount(outputs[0].getName());
288            } else {
289                items = b->getProcessedItemCount(inputs[0].getName());
290            }
291            Value * fItems = b->CreateUIToFP(items, b->getDoubleTy());
292            Value * cycles = b->CreateLoad(b->getCycleCountPtr());
293            Value * fCycles = b->CreateUIToFP(cycles, b->getDoubleTy());
294            const auto formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
295            Value * stringPtr = b->CreatePointerCast(b->GetString(formatString), b->getInt8PtrTy());
296            b->CreateCall(b->GetDprintf(), {b->getInt32(2), stringPtr, fItems, fCycles, b->CreateFDiv(fCycles, fItems)});
297        }
298    }
299   
300}
301
302/** ------------------------------------------------------------------------------------------------------------- *
303 * @brief generatePipelineLoop
304 ** ------------------------------------------------------------------------------------------------------------- */
305void generatePipelineLoop(const std::unique_ptr<KernelBuilder> & b, const std::vector<Kernel *> & kernels) {
306
307    BasicBlock * entryBlock = b->GetInsertBlock();
308    Function * main = entryBlock->getParent();
309
310    // Create the basic blocks for the loop.
311    BasicBlock * pipelineLoop = BasicBlock::Create(b->getContext(), "pipelineLoop", main);
312    BasicBlock * pipelineExit = BasicBlock::Create(b->getContext(), "pipelineExit", main);
313
314    StreamSetBufferMap<Value *> producedItemCount;
315    StreamSetBufferMap<Value *> consumedItemCount;
316
317    b->CreateBr(pipelineLoop);
318    b->SetInsertPoint(pipelineLoop);
319   
320    Value * cycleCountStart = nullptr;
321    Value * cycleCountEnd = nullptr;
322    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
323        cycleCountStart = b->CreateReadCycleCounter();
324    }
325    Value * terminated = nullptr;
326
327    for (Kernel * const kernel : kernels) {
328
329        b->setKernel(kernel);
330        const auto & inputs = kernel->getStreamInputs();
331        const auto & outputs = kernel->getStreamOutputs();
332
333        Value * const isFinal = terminated ? terminated : b->getFalse();
334
335        std::vector<Value *> args = {kernel->getInstance(), isFinal};
336
337        for (unsigned i = 0; i < inputs.size(); ++i) {
338            const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
339            const auto f = producedItemCount.find(buffer);
340            if (LLVM_UNLIKELY(f == producedItemCount.end())) {
341                report_fatal_error(kernel->getName() + " uses stream set " + inputs[i].getName() + " prior to its definition");
342            }
343            Value * const produced = f->second;
344            args.push_back(produced);
345            handleInsufficientData(b, produced, isFinal, pipelineLoop, kernel, inputs[i], buffer);
346        }
347
348        applyOutputBufferExpansions(b, kernel);
349
350        b->createDoSegmentCall(args);
351
352        Value * const finished = b->getTerminationSignal();
353        if (terminated) {
354            // All kernels must agree that we've terminated.
355            terminated = b->CreateAnd(terminated, finished);
356        } else {
357            terminated = finished;
358        }
359
360        for (unsigned i = 0; i < outputs.size(); ++i) {
361            Value * const produced = b->getProducedItemCount(outputs[i].getName());
362            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
363            assert (producedItemCount.count(buf) == 0);
364            producedItemCount.emplace(buf, produced);
365        }
366
367        for (unsigned i = 0; i < inputs.size(); ++i) {
368            Value * const processed = b->getProcessedItemCount(inputs[i].getName());
369            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);
370            auto f = consumedItemCount.find(buf);
371            if (f == consumedItemCount.end()) {
372                consumedItemCount.emplace(buf, processed);
373            } else {
374                f->second = b->CreateUMin(processed, f->second);
375            }
376        }
377
378        if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
379            cycleCountEnd = b->CreateReadCycleCounter();
380            Value * counterPtr = b->getCycleCountPtr();
381            b->CreateStore(b->CreateAdd(b->CreateLoad(counterPtr), b->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
382            cycleCountStart = cycleCountEnd;
383        }
384//        Value * const segNo = b->acquireLogicalSegmentNo();
385//        Value * nextSegNo = b->CreateAdd(segNo, b->getSize(1));
386//        b->releaseLogicalSegmentNo(nextSegNo);
387    }
388
389    for (const auto consumed : consumedItemCount) {
390        const StreamSetBuffer * const buffer = consumed.first;
391        Kernel * const kernel = buffer->getProducer();
392        const auto & binding = kernel->getStreamOutput(buffer);
393        if (LLVM_UNLIKELY(binding.getRate().isDerived())) {
394            continue;
395        }
396        b->setKernel(kernel);
397        b->setConsumedItemCount(binding.getName(), consumed.second);
398    }
399
400    if (LLVM_UNLIKELY(terminated == nullptr)) {
401        report_fatal_error("error: at least one kernel must have a termination signal");
402    }
403    b->CreateCondBr(terminated, pipelineExit, pipelineLoop);
404
405    pipelineExit->moveAfter(b->GetInsertBlock());
406
407    b->SetInsertPoint(pipelineExit);
408
409    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
410        for (unsigned k = 0; k < kernels.size(); k++) {
411            auto & kernel = kernels[k];
412            b->setKernel(kernel);
413            const auto & inputs = kernel->getStreamInputs();
414            const auto & outputs = kernel->getStreamOutputs();
415            Value * items = nullptr;
416            if (inputs.empty()) {
417                items = b->getProducedItemCount(outputs[0].getName());
418            } else {
419                items = b->getProcessedItemCount(inputs[0].getName());
420            }
421            Value * fItems = b->CreateUIToFP(items, b->getDoubleTy());
422            Value * cycles = b->CreateLoad(b->getCycleCountPtr());
423            Value * fCycles = b->CreateUIToFP(cycles, b->getDoubleTy());
424            const auto formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
425            Value * stringPtr = b->CreatePointerCast(b->GetString(formatString), b->getInt8PtrTy());
426            b->CreateCall(b->GetDprintf(), {b->getInt32(2), stringPtr, fItems, fCycles, b->CreateFDiv(fCycles, fItems)});
427        }
428    }
429
430}
431
432/** ------------------------------------------------------------------------------------------------------------- *
433 * @brief applyOutputBufferExpansions
434 ** ------------------------------------------------------------------------------------------------------------- */
435void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const std::string & name, DynamicBuffer * const db, const uint64_t baseSize) {
436    BasicBlock * const doExpand = BasicBlock::Create(b->getContext(), name + "Expand", b->GetInsertBlock()->getParent());
437    BasicBlock * const nextBlock = b->GetInsertBlock()->getNextNode();
438    doExpand->moveAfter(b->GetInsertBlock());
439    BasicBlock * const bufferReady = b->CreateBasicBlock(name + "Ready");
440    bufferReady->moveAfter(doExpand);
441    if (nextBlock) nextBlock->moveAfter(bufferReady);
442
443    Value * const handle = db->getStreamSetHandle();
444
445    Value * const produced = b->getProducedItemCount(name);
446    Value * const consumed = b->getConsumedItemCount(name);
447    Value * const required = b->CreateAdd(b->CreateSub(produced, consumed), b->getSize(2 * baseSize));
448
449    b->CreateCondBr(b->CreateICmpUGT(required, db->getCapacity(b.get(), handle)), doExpand, bufferReady);
450
451    b->SetInsertPoint(doExpand);
452    db->doubleCapacity(b.get(), handle);
453    // Ensure that capacity is sufficient by successive doubling, if necessary.
454    b->CreateCondBr(b->CreateICmpUGT(required, db->getBufferedSize(b.get(), handle)), doExpand, bufferReady);
455
456    b->SetInsertPoint(bufferReady);
457}
458
459void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * k) {
460    const auto & outputs = k->getStreamSetOutputBuffers();
461    for (unsigned i = 0; i < outputs.size(); i++) {
462        if (isa<DynamicBuffer>(outputs[i])) {
463            const auto ub = k->getUpperBound(k->getStreamOutput(i).getRate());
464            const auto baseSize = (ub.numerator() * k->getStride() + ub.denominator() - 1) / ub.denominator();
465            if (LLVM_LIKELY(baseSize > 0)) {
466                const auto & name = k->getStreamOutput(i).getName();
467                applyOutputBufferExpansions(b, name, cast<DynamicBuffer>(outputs[i]), baseSize);
468            }
469        }
470    }
471}
472
473/** ------------------------------------------------------------------------------------------------------------- *
474 * @brief handleInsufficientData
475 ** ------------------------------------------------------------------------------------------------------------- */
476inline void handleInsufficientData(const std::unique_ptr<KernelBuilder> & b, Value * const produced, Value * const final, BasicBlock * const insufficient,
477                                   const Kernel * const consumer,  const Binding & input, const StreamSetBuffer * const buffer) {
478    const Kernel * const producer = buffer->getProducer();
479    const Binding & output = producer->getStreamOutput(buffer);
480    const auto consumedRate = consumer->getUpperBound(input.getRate()) * consumer->getStride();
481    if (consumedRate > 0) {
482        auto producedRate = producer->getLowerBound(output.getRate()) * producer->getStride();
483        if (LLVM_UNLIKELY(input.hasLookahead())) {
484            producedRate -= input.getLookahead();
485        }
486        if (LLVM_UNLIKELY(producedRate < consumedRate)) {
487            const auto name = input.getName();
488            BasicBlock * const sufficient = BasicBlock::Create(b->getContext(), name + "IsSufficient", b->GetInsertBlock()->getParent());
489            Value * const processed = b->getProcessedItemCount(name);
490
491            if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableAsserts))) {
492                b->CreateAssert(b->CreateICmpULE(processed, produced), input.getName() + ": processed cannot exceed produced");
493            }
494            Value * const unread = b->CreateSub(produced, processed);
495            Constant * const amount = ConstantInt::get(unread->getType(), ceiling(consumedRate));
496            Value * const cond = b->CreateOr(b->CreateICmpUGE(unread, amount), final);
497            b->CreateLikelyCondBr(cond, sufficient, insufficient);
498            b->SetInsertPoint(sufficient);
499        }
500    }
501}
502
Note: See TracBrowser for help on using the repository browser.