source: icGREP/icgrep-devel/icgrep/kernels/pipeline.cpp @ 5403

Last change on this file since 5403 was 5403, checked in by nmedfort, 2 years ago

Work on the pipeline algorithms.

File size: 23.9 KB
Line 
1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
6#include "pipeline.h"
7#include <kernels/toolchain.h>
8#include <kernels/kernel.h>
9#include <kernels/streamset.h>
10#include <llvm/IR/Module.h>
11#include <boost/container/flat_set.hpp>
12#include <boost/container/flat_map.hpp>
13
14using namespace kernel;
15using namespace parabix;
16using namespace llvm;
17
18template <typename Value>
19using StreamSetBufferMap = boost::container::flat_map<const StreamSetBuffer *, Value>;
20
21template <typename Value>
22using FlatSet = boost::container::flat_set<Value>;
23
24Function * makeThreadFunction(const std::string & name, Module * const m) {
25    LLVMContext & C = m->getContext();
26    Type * const voidTy = Type::getVoidTy(C);
27    PointerType * const int8PtrTy = Type::getInt8PtrTy(C);
28    Function * const f = Function::Create(FunctionType::get(voidTy, {int8PtrTy}, false), Function::InternalLinkage, name, m);
29    f->setCallingConv(CallingConv::C);
30    f->arg_begin()->setName("input");
31    return f;
32}
33
34/** ------------------------------------------------------------------------------------------------------------- *
35 * @brief generateSegmentParallelPipeline
36 *
37 * Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
38 * operating over an input stream set S, a segment-parallel implementation divides the input
39 * into segments and coordinates a set of T <= K threads to each process one segment at a time.
40 * Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
41 * fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
42 ** ------------------------------------------------------------------------------------------------------------- */
43void generateSegmentParallelPipeline(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
44
45    const unsigned n = kernels.size(); assert (n > 0);
46    Module * const m = iBuilder->getModule();
47    IntegerType * const sizeTy = iBuilder->getSizeTy();
48    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
49    PointerType * const int8PtrTy = iBuilder->getInt8PtrTy();
50    const unsigned threads = codegen::ThreadNum;
51    Constant * nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
52
53    std::vector<Type *> structTypes;
54    for (unsigned i = 0; i < n; i++) {
55        kernels[i]->createInstance();
56        structTypes.push_back(kernels[i]->getInstance()->getType());
57    }
58    StructType * const sharedStructType = StructType::get(m->getContext(), structTypes);
59    StructType * const threadStructType = StructType::get(sharedStructType->getPointerTo(), sizeTy, nullptr);
60
61    // -------------------------------------------------------------------------------------------------------------------------
62    // MAKE SEGMENT PARALLEL PIPELINE THREAD
63    // -------------------------------------------------------------------------------------------------------------------------
64    const auto ip = iBuilder->saveIP();
65    Function * const threadFunc = makeThreadFunction("sppt", m);
66
67     // Create the basic blocks for the thread function.
68    BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc);
69    iBuilder->SetInsertPoint(entryBlock);
70    Value * const input = &threadFunc->getArgumentList().front();
71    Value * const threadStruct = iBuilder->CreatePointerCast(input, threadStructType->getPointerTo());
72    Value * const sharedStatePtr = iBuilder->CreateLoad(iBuilder->CreateGEP(threadStruct, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
73    Value * instance[n];
74    for (unsigned k = 0; k < n; k++) {
75        Value * ptr = iBuilder->CreateGEP(sharedStatePtr, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
76        instance[k] = iBuilder->CreateLoad(ptr);
77    }
78    Value * const segOffset = iBuilder->CreateLoad(iBuilder->CreateGEP(threadStruct, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
79
80    BasicBlock * segmentLoop = BasicBlock::Create(iBuilder->getContext(), "segmentLoop", threadFunc);
81    iBuilder->CreateBr(segmentLoop);
82
83    iBuilder->SetInsertPoint(segmentLoop);
84    PHINode * const segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 2, "segNo");
85    segNo->addIncoming(segOffset, entryBlock);
86
87    Value * doFinal = iBuilder->getFalse();
88    Value * const nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
89
90    BasicBlock * segmentWait = BasicBlock::Create(iBuilder->getContext(), kernels[0]->getName() + "Wait", threadFunc);
91
92    BasicBlock * segmentLoopBody = nullptr;
93
94    BasicBlock * const exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc);
95
96    iBuilder->CreateBr(segmentWait);
97
98    StreamSetBufferMap<Value *> producedPos;
99
100    for (unsigned k = 0;;) {
101        KernelBuilder * const kernel = kernels[k];
102        const unsigned waitIdx = codegen::DebugOptionIsSet(codegen::SerializeThreads) ? (n - 1) : k;
103
104        segmentLoopBody = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Do", threadFunc);
105
106        iBuilder->SetInsertPoint(segmentWait);
107        Value * const processedSegmentCount = kernels[waitIdx]->acquireLogicalSegmentNo(instance[waitIdx]);
108        assert (processedSegmentCount->getType() == segNo->getType());
109        Value * const ready = iBuilder->CreateICmpEQ(segNo, processedSegmentCount);
110
111        if (kernel->hasNoTerminateAttribute()) {
112            iBuilder->CreateCondBr(ready, segmentLoopBody, segmentWait);
113        } else { // If the kernel was terminated in a previous segment then the pipeline is done.
114            BasicBlock * completionTest = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Completed", threadFunc, 0);
115            BasicBlock * exitBlock = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Exit", threadFunc, 0);
116            iBuilder->CreateCondBr(ready, completionTest, segmentWait);
117            iBuilder->SetInsertPoint(completionTest);
118            Value * alreadyDone = kernel->getTerminationSignal(instance[k]);
119            iBuilder->CreateCondBr(alreadyDone, exitBlock, segmentLoopBody);
120            iBuilder->SetInsertPoint(exitBlock);
121            // Ensure that the next thread will also exit.
122            kernel->releaseLogicalSegmentNo(instance[k], nextSegNo);
123            iBuilder->CreateBr(exitThreadBlock);
124        }
125
126        // Execute the kernel segment
127        iBuilder->SetInsertPoint(segmentLoopBody);
128        const auto & inputs = kernel->getStreamInputs();
129        const auto & outputs = kernel->getStreamOutputs();
130        std::vector<Value *> args = {instance[k], doFinal};
131        for (unsigned i = 0; i < inputs.size(); ++i) {
132            const auto f = producedPos.find(kernel->getStreamSetInputBuffer(i));
133            if (LLVM_UNLIKELY(f == producedPos.end())) {
134                report_fatal_error(kernel->getName() + " uses stream set " + inputs[i].name + " prior to its definition");
135            }
136            args.push_back(f->second);
137        }
138        for (unsigned i = 0; i < outputs.size(); ++i) {
139            args.push_back(iBuilder->getSize(0));
140        }
141        CallInst * ci = kernel->createDoSegmentCall(args);
142        // TODO: investigate whether this actually inlines the function call correctly despite being in a seperate module.
143        ci->addAttribute(AttributeSet::FunctionIndex, Attribute::AlwaysInline);
144
145        if (!kernel->hasNoTerminateAttribute()) {
146            doFinal = iBuilder->CreateOr(doFinal, kernel->getTerminationSignal(instance[k]));
147        }
148        for (unsigned i = 0; i < outputs.size(); i++) {
149            Value * const produced = kernel->getProducedItemCount(instance[k], outputs[i].name, doFinal);
150            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
151            assert (producedPos.count(buf) == 0);
152            producedPos.emplace(buf, produced);
153        }
154
155        kernel->releaseLogicalSegmentNo(instance[k], nextSegNo);
156        if (LLVM_UNLIKELY(++k == n)) {
157            assert (segmentLoopBody);
158            exitThreadBlock->moveAfter(segmentLoopBody);
159            segNo->addIncoming(iBuilder->CreateAdd(segNo, iBuilder->getSize(threads)), segmentLoopBody);
160            iBuilder->CreateCondBr(doFinal, exitThreadBlock, segmentLoop);
161
162            iBuilder->SetInsertPoint(exitThreadBlock);
163            iBuilder->CreatePThreadExitCall(nullVoidPtrVal);
164            iBuilder->CreateRetVoid();
165            break;
166        } else {
167            segmentWait = BasicBlock::Create(iBuilder->getContext(), kernels[k]->getName() + "Wait", threadFunc);
168            iBuilder->CreateBr(segmentWait);
169        }
170    }
171
172    // -------------------------------------------------------------------------------------------------------------------------
173    iBuilder->restoreIP(ip);
174
175    // -------------------------------------------------------------------------------------------------------------------------
176    // MAKE SEGMENT PARALLEL PIPELINE DRIVER
177    // -------------------------------------------------------------------------------------------------------------------------
178
179    Type * const pthreadsTy = ArrayType::get(sizeTy, threads);
180    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
181    Value * threadIdPtr[threads];
182    for (unsigned i = 0; i < threads; i++) {
183        threadIdPtr[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
184    }
185
186    for (unsigned i = 0; i < n; i++) {
187        kernels[i]->releaseLogicalSegmentNo(kernels[i]->getInstance(), iBuilder->getSize(0));
188    }
189
190    AllocaInst * const sharedStruct = iBuilder->CreateCacheAlignedAlloca(sharedStructType);
191    for (unsigned i = 0; i < n; i++) {
192        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
193        iBuilder->CreateStore(kernels[i]->getInstance(), ptr);
194    }
195
196    for (unsigned i = 0; i < threads; i++) {
197        AllocaInst * threadState = iBuilder->CreateAlloca(threadStructType);
198        Value * const sharedStatePtr = iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(0)});
199        iBuilder->CreateStore(sharedStruct, sharedStatePtr);
200        Value * const segmentOffsetPtr = iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(1)});
201        iBuilder->CreateStore(iBuilder->getSize(i), segmentOffsetPtr);
202        iBuilder->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, threadFunc, threadState);
203    }
204
205    AllocaInst * const status = iBuilder->CreateAlloca(int8PtrTy);
206    for (unsigned i = 0; i < threads; i++) {
207        Value * threadId = iBuilder->CreateLoad(threadIdPtr[i]);
208        iBuilder->CreatePThreadJoinCall(threadId, status);
209    }
210}
211
212
213/** ------------------------------------------------------------------------------------------------------------- *
214 * @brief generateParallelPipeline
215 ** ------------------------------------------------------------------------------------------------------------- */
216void generateParallelPipeline(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
217
218    Module * const m = iBuilder->getModule();
219    IntegerType * const sizeTy = iBuilder->getSizeTy();
220    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
221    PointerType * const int8PtrTy = iBuilder->getInt8PtrTy();
222    ConstantInt * bufferSegments = ConstantInt::get(sizeTy, codegen::BufferSegments - 1);
223    ConstantInt * segmentItems = ConstantInt::get(sizeTy, codegen::SegmentSize * iBuilder->getBitBlockWidth());
224    Constant * const nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
225
226    for (auto k : kernels) {
227        k->createInstance();
228    }
229
230    const unsigned n = kernels.size();
231
232    Type * const pthreadsTy = ArrayType::get(sizeTy, n);
233    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
234    Value * threadIdPtr[n];
235    for (unsigned i = 0; i < n; i++) {
236        threadIdPtr[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
237    }
238
239    Type * structTypes[n];
240    for (unsigned i = 0; i < n; i++) {
241        structTypes[i] = kernels[i]->getInstance()->getType();
242    }
243    Type * const sharedStructType = StructType::get(m->getContext(), ArrayRef<Type *>{structTypes, n});
244    AllocaInst * sharedStruct = iBuilder->CreateAlloca(sharedStructType);
245    for (unsigned i = 0; i < n; i++) {
246        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
247        iBuilder->CreateStore(kernels[i]->getInstance(), ptr);
248    }
249    for (unsigned i = 0; i < n; i++) {
250        kernels[i]->releaseLogicalSegmentNo(kernels[i]->getInstance(), iBuilder->getSize(0));
251    }
252
253    // GENERATE THE PRODUCING AND CONSUMING KERNEL MAPS
254    StreamSetBufferMap<unsigned> producingKernel;
255    StreamSetBufferMap<std::vector<unsigned>> consumingKernels;
256    for (unsigned id = 0; id < n; ++id) {
257        KernelBuilder * const kernel = kernels[id];
258        const auto & inputs = kernel->getStreamInputs();
259        const auto & outputs = kernel->getStreamOutputs();
260        // add any outputs from this kernel to the producing kernel map
261        for (unsigned j = 0; j < outputs.size(); ++j) {
262            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(j);
263            if (LLVM_UNLIKELY(producingKernel.count(buf) != 0)) {
264                report_fatal_error(kernel->getName() + " redefines stream set " + outputs[j].name);
265            }
266            producingKernel.emplace(buf, id);
267        }
268        // and any inputs to the consuming kernels list
269        for (unsigned j = 0; j < inputs.size(); ++j) {
270            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(j);
271            auto f = consumingKernels.find(buf);
272            if (f == consumingKernels.end()) {
273                if (LLVM_UNLIKELY(producingKernel.count(buf) == 0)) {
274                    report_fatal_error(kernel->getName() + " uses stream set " + inputs[j].name + " prior to its definition");
275                }
276                consumingKernels.emplace(buf, std::vector<unsigned>{ id });
277            } else {
278                f->second.push_back(id);
279            }
280        }
281    }
282
283    const auto ip = iBuilder->saveIP();
284
285    // GENERATE UNIQUE PIPELINE PARALLEL THREAD FUNCTION FOR EACH KERNEL
286    FlatSet<unsigned> kernelSet;
287    kernelSet.reserve(n);
288
289    Function * thread_functions[n];
290    for (unsigned id = 0; id < n; id++) {
291        KernelBuilder * const kernel = kernels[id];
292        const auto & inputs = kernel->getStreamInputs();
293        const auto & outputs = kernel->getStreamOutputs();
294
295        Function * const threadFunc = makeThreadFunction("ppt:" + kernel->getName(), m);
296
297         // Create the basic blocks for the thread function.
298        BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc);
299        BasicBlock * outputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "outputCheck", threadFunc);
300        BasicBlock * inputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "inputCheck", threadFunc);
301        BasicBlock * doSegmentBlock = BasicBlock::Create(iBuilder->getContext(), "doSegment", threadFunc);
302        BasicBlock * exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc);
303
304        iBuilder->SetInsertPoint(entryBlock);
305
306        Value * sharedStruct = iBuilder->CreateBitCast(&threadFunc->getArgumentList().front(), sharedStructType->getPointerTo());
307
308        Value * instancePtrs[n];
309        for (unsigned k = 0; k < n; k++) {
310            Value * const ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
311            instancePtrs[k] = iBuilder->CreateLoad(ptr);
312        }
313
314        iBuilder->CreateBr(outputCheckBlock);
315
316        // Check whether the output buffers are ready for more data
317        iBuilder->SetInsertPoint(outputCheckBlock);
318        PHINode * segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 3, "segNo");
319        segNo->addIncoming(iBuilder->getSize(0), entryBlock);
320        segNo->addIncoming(segNo, outputCheckBlock);
321
322        Value * outputWaitCond = iBuilder->getTrue();
323        for (const StreamSetBuffer * buf : kernel->getStreamSetOutputBuffers()) {
324            const auto & list = consumingKernels[buf];
325            assert(std::is_sorted(list.begin(), list.end()));
326            kernelSet.insert(list.begin(), list.end());
327        }
328        for (unsigned k : kernelSet) {
329            Value * consumerSegNo = kernels[k]->acquireLogicalSegmentNo(instancePtrs[k]);
330            assert (consumerSegNo->getType() == segNo->getType());
331            Value * consumedSegNo = iBuilder->CreateAdd(consumerSegNo, bufferSegments);
332            outputWaitCond = iBuilder->CreateAnd(outputWaitCond, iBuilder->CreateICmpULE(segNo, consumedSegNo));
333        }
334        kernelSet.clear();
335        iBuilder->CreateCondBr(outputWaitCond, inputCheckBlock, outputCheckBlock);
336
337        // Check whether the input buffers have enough data for this kernel to begin
338        iBuilder->SetInsertPoint(inputCheckBlock);
339        for (const StreamSetBuffer * buf : kernel->getStreamSetInputBuffers()) {
340            kernelSet.insert(producingKernel[buf]);
341        }
342        const auto m = kernelSet.size();
343        Value * producerSegNo[m];
344
345        Value * inputWaitCond = iBuilder->getTrue();
346        if (m) {
347            unsigned j = 0;
348            for (unsigned k : kernelSet) {
349                producerSegNo[j] = kernels[k]->acquireLogicalSegmentNo(instancePtrs[k]);
350                assert (producerSegNo[j]->getType() == segNo->getType());
351                inputWaitCond = iBuilder->CreateAnd(inputWaitCond, iBuilder->CreateICmpULT(segNo, producerSegNo[j++]));
352            }
353        }
354        iBuilder->CreateCondBr(inputWaitCond, doSegmentBlock, inputCheckBlock);
355
356        // Process the segment
357        iBuilder->SetInsertPoint(doSegmentBlock);
358
359        Value * const nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
360
361        Value * terminated = nullptr;
362        if (m == 0) {
363            // if this kernel has no input streams, the kernel itself must decide when it terminates.
364            terminated = kernel->getTerminationSignal(instancePtrs[id]);
365        } else {
366            // ... otherwise the kernel terminates only when it exhausts all of its input streams
367            terminated = iBuilder->getTrue();
368            unsigned j = 0;
369            for (unsigned k : kernelSet) {
370                terminated = iBuilder->CreateAnd(terminated, kernels[k]->getTerminationSignal(instancePtrs[k]));
371                terminated = iBuilder->CreateAnd(terminated, iBuilder->CreateICmpEQ(nextSegNo, producerSegNo[j++]));
372            }
373            kernelSet.clear();
374        }
375
376        std::vector<Value *> args = {instancePtrs[id], terminated};
377        args.insert(args.end(), inputs.size(), iBuilder->CreateMul(segmentItems, segNo));
378        args.insert(args.end(), outputs.size(), iBuilder->getSize(0));
379
380        kernel->createDoSegmentCall(args);
381        segNo->addIncoming(nextSegNo, doSegmentBlock);
382        kernel->releaseLogicalSegmentNo(instancePtrs[id], nextSegNo);
383
384        iBuilder->CreateCondBr(terminated, exitThreadBlock, outputCheckBlock);
385
386        iBuilder->SetInsertPoint(exitThreadBlock);
387        iBuilder->CreatePThreadExitCall(nullVoidPtrVal);
388        iBuilder->CreateRetVoid();
389
390        thread_functions[id] = threadFunc;
391    }
392
393    iBuilder->restoreIP(ip);
394
395    for (unsigned i = 0; i < n; i++) {
396        iBuilder->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, thread_functions[i], sharedStruct);
397    }
398
399    AllocaInst * const status = iBuilder->CreateAlloca(int8PtrTy);
400    for (unsigned i = 0; i < n; i++) {
401        Value * threadId = iBuilder->CreateLoad(threadIdPtr[i]);
402        iBuilder->CreatePThreadJoinCall(threadId, status);
403    }
404
405}
406
407/** ------------------------------------------------------------------------------------------------------------- *
408 * @brief generatePipelineLoop
409 ** ------------------------------------------------------------------------------------------------------------- */
410void generatePipelineLoop(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
411
412    for (auto k : kernels) {
413        k->createInstance();
414    }
415    BasicBlock * entryBlock = iBuilder->GetInsertBlock();
416    Function * main = entryBlock->getParent();
417
418    // Create the basic blocks for the loop.
419    BasicBlock * pipelineLoop = BasicBlock::Create(iBuilder->getContext(), "pipelineLoop", main);
420    BasicBlock * pipelineExit = BasicBlock::Create(iBuilder->getContext(), "pipelineExit", main);
421
422    StreamSetBufferMap<Value *> producedPos;
423    StreamSetBufferMap<std::pair<PHINode *, Value *>> consumedPos;
424
425    iBuilder->CreateBr(pipelineLoop);
426    iBuilder->SetInsertPoint(pipelineLoop);
427
428    // Build up the initial phi nodes for each of the consumed (minimum processed) positions
429    for (unsigned k = 0; k < kernels.size(); k++) {
430        KernelBuilder * const kernel = kernels[k];
431        const auto & outputs = kernel->getStreamOutputs();
432        for (unsigned i = 0; i < outputs.size(); ++i) {
433            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
434            if (LLVM_UNLIKELY(consumedPos.count(buf) != 0)) {
435                report_fatal_error(kernel->getName() + " redefines stream set " + outputs[i].name);
436            }
437            PHINode * phi = iBuilder->CreatePHI(iBuilder->getSizeTy(), 2);
438            phi->addIncoming(iBuilder->getSize(0), entryBlock);
439            consumedPos.emplace(buf, std::make_pair(phi, nullptr));
440        }
441    }
442
443    Value * terminated = iBuilder->getFalse();
444    for (unsigned k = 0; k < kernels.size(); k++) {
445        KernelBuilder * const kernel = kernels[k];
446        Value * const instance = kernel->getInstance();
447        const auto & inputs = kernel->getStreamInputs();
448        const auto & outputs = kernel->getStreamOutputs();
449        std::vector<Value *> args = {instance, terminated};
450        for (unsigned i = 0; i < inputs.size(); ++i) {
451            const auto f = producedPos.find(kernel->getStreamSetInputBuffer(i));
452            if (LLVM_UNLIKELY(f == producedPos.end())) {
453                report_fatal_error(kernel->getName() + " uses stream set " + inputs[i].name + " prior to its definition");
454            }
455            args.push_back(f->second);
456        }
457        for (unsigned i = 0; i < outputs.size(); ++i) {
458            const auto f = consumedPos.find(kernel->getStreamSetOutputBuffer(i));
459            assert (f != consumedPos.end());
460            args.push_back(std::get<0>(f->second));
461        }
462        kernel->createDoSegmentCall(args);
463        if (!kernel->hasNoTerminateAttribute()) {
464            terminated = iBuilder->CreateOr(terminated, kernel->getTerminationSignal(instance));
465        }
466        for (unsigned i = 0; i < outputs.size(); i++) {
467            Value * const produced = kernel->getProducedItemCount(instance, outputs[i].name, terminated);
468            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
469            assert (producedPos.count(buf) == 0);
470            producedPos.emplace(buf, produced);
471        }
472        for (unsigned i = 0; i < inputs.size(); i++) {
473            Value * const processed = kernel->getProcessedItemCount(instance, inputs[i].name);
474            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);
475            const auto f = consumedPos.find(buf);
476            assert (f != consumedPos.end());
477            Value *& consumed = std::get<1>(f->second);
478            if (consumed) {
479                consumed = iBuilder->CreateSelect(iBuilder->CreateICmpULT(processed, consumed), processed, consumed);
480            } else {
481                consumed = processed;
482            }
483        }
484        Value * const segNo = kernel->acquireLogicalSegmentNo(instance);
485        kernel->releaseLogicalSegmentNo(instance, iBuilder->CreateAdd(segNo, iBuilder->getSize(1)));
486    }
487    // update the consumed position phi nodes with the last min processed count of each input stream
488    for (const auto entry : consumedPos) {
489        PHINode * const phi = std::get<0>(entry.second);
490        Value * const value = std::get<1>(entry.second);
491        phi->addIncoming(value ? value : phi, pipelineLoop);
492    }
493    iBuilder->CreateCondBr(terminated, pipelineExit, pipelineLoop);
494    iBuilder->SetInsertPoint(pipelineExit);
495}
Note: See TracBrowser for help on using the repository browser.