source: icGREP/icgrep-devel/icgrep/toolchain/pipeline.cpp @ 5425

Last change on this file since 5425 was 5425, checked in by nmedfort, 2 years ago

Changes towards separate compilation

File size: 25.8 KB
Line 
1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
6#include "pipeline.h"
7#include <toolchain/toolchain.h>
8#include <kernels/kernel.h>
9#include <kernels/streamset.h>
10#include <llvm/IR/Module.h>
11#include <boost/container/flat_set.hpp>
12#include <boost/container/flat_map.hpp>
13
14using namespace kernel;
15using namespace parabix;
16using namespace llvm;
17
18template <typename Value>
19using StreamSetBufferMap = boost::container::flat_map<const StreamSetBuffer *, Value>;
20
21template <typename Value>
22using FlatSet = boost::container::flat_set<Value>;
23
24Function * makeThreadFunction(std::unique_ptr<IDISA::IDISA_Builder> & b, const std::string & name) {
25    Function * const f = Function::Create(FunctionType::get(b->getVoidTy(), {b->getVoidPtrTy()}, false), Function::InternalLinkage, name, b->getModule());
26    f->setCallingConv(CallingConv::C);
27    f->arg_begin()->setName("input");
28    return f;
29}
30
31/** ------------------------------------------------------------------------------------------------------------- *
32 * @brief generateSegmentParallelPipeline
33 *
34 * Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
35 * operating over an input stream set S, a segment-parallel implementation divides the input
36 * into segments and coordinates a set of T <= K threads to each process one segment at a time.
37 * Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
38 * fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
39 ** ------------------------------------------------------------------------------------------------------------- */
40void generateSegmentParallelPipeline(std::unique_ptr<IDISA::IDISA_Builder> & iBuilder, const std::vector<KernelBuilder *> & kernels) {
41
42    const unsigned n = kernels.size();
43    Module * const m = iBuilder->getModule();
44    IntegerType * const sizeTy = iBuilder->getSizeTy();
45    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
46    Constant * nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
47    std::vector<Type *> structTypes;
48
49    Value * instance[n];
50    for (unsigned i = 0; i < n; ++i) {
51        instance[i] = kernels[i]->getInstance();
52        structTypes.push_back(instance[i]->getType());
53    }
54    StructType * const sharedStructType = StructType::get(m->getContext(), structTypes);
55    StructType * const threadStructType = StructType::get(sharedStructType->getPointerTo(), sizeTy, nullptr);
56
57    Function * const threadFunc = makeThreadFunction(iBuilder, "segment");
58
59    // -------------------------------------------------------------------------------------------------------------------------
60    // MAKE SEGMENT PARALLEL PIPELINE THREAD
61    // -------------------------------------------------------------------------------------------------------------------------
62    const auto ip = iBuilder->saveIP();
63
64     // Create the basic blocks for the thread function.
65    BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc);
66    iBuilder->SetInsertPoint(entryBlock);
67    Value * const input = &threadFunc->getArgumentList().front();
68    Value * const threadStruct = iBuilder->CreatePointerCast(input, threadStructType->getPointerTo());
69    Value * const sharedStatePtr = iBuilder->CreateLoad(iBuilder->CreateGEP(threadStruct, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
70    for (unsigned k = 0; k < n; ++k) {
71        Value * ptr = iBuilder->CreateLoad(iBuilder->CreateGEP(sharedStatePtr, {iBuilder->getInt32(0), iBuilder->getInt32(k)}));
72        kernels[k]->setInstance(ptr);
73    }
74    Value * const segOffset = iBuilder->CreateLoad(iBuilder->CreateGEP(threadStruct, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
75
76    BasicBlock * segmentLoop = BasicBlock::Create(iBuilder->getContext(), "segmentLoop", threadFunc);
77    iBuilder->CreateBr(segmentLoop);
78
79    iBuilder->SetInsertPoint(segmentLoop);
80    PHINode * const segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 2, "segNo");
81    segNo->addIncoming(segOffset, entryBlock);
82
83    Value * terminated = iBuilder->getFalse();
84    Value * const nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
85
86    BasicBlock * segmentLoopBody = nullptr;
87    BasicBlock * const exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc);
88
89    StreamSetBufferMap<Value *> producedPos;
90    StreamSetBufferMap<Value *> consumedPos;
91
92    for (unsigned k = 0; k < n; ++k) {
93
94        const auto & kernel = kernels[k];
95
96        BasicBlock * const segmentWait = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Wait", threadFunc);
97        iBuilder->CreateBr(segmentWait);
98
99        segmentLoopBody = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Do", threadFunc);
100
101        iBuilder->SetInsertPoint(segmentWait);
102        const unsigned waitIdx = codegen::DebugOptionIsSet(codegen::SerializeThreads) ? (n - 1) : k;
103        Value * const processedSegmentCount = kernels[waitIdx]->acquireLogicalSegmentNo();
104        assert (processedSegmentCount->getType() == segNo->getType());
105        Value * const ready = iBuilder->CreateICmpEQ(segNo, processedSegmentCount);
106
107        if (kernel->hasNoTerminateAttribute()) {
108            iBuilder->CreateCondBr(ready, segmentLoopBody, segmentWait);
109        } else { // If the kernel was terminated in a previous segment then the pipeline is done.
110            BasicBlock * completionTest = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Completed", threadFunc, 0);
111            BasicBlock * exitBlock = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Exit", threadFunc, 0);
112            iBuilder->CreateCondBr(ready, completionTest, segmentWait);
113
114            iBuilder->SetInsertPoint(completionTest);
115            iBuilder->CreateCondBr(kernel->getTerminationSignal(), exitBlock, segmentLoopBody);
116            iBuilder->SetInsertPoint(exitBlock);
117            // Ensure that the next thread will also exit.
118            kernel->releaseLogicalSegmentNo(nextSegNo);
119            iBuilder->CreateBr(exitThreadBlock);
120        }
121
122        // Execute the kernel segment
123        iBuilder->SetInsertPoint(segmentLoopBody);
124        const auto & inputs = kernel->getStreamInputs();
125        std::vector<Value *> args = {kernel->getInstance(), terminated};
126        for (unsigned i = 0; i < inputs.size(); ++i) {
127            const auto f = producedPos.find(kernel->getStreamSetInputBuffer(i));
128            assert (f != producedPos.end());
129            args.push_back(f->second);
130        }
131
132        kernel->createDoSegmentCall(args);       
133        if (!kernel->hasNoTerminateAttribute()) {
134            terminated = iBuilder->CreateOr(terminated, kernel->getTerminationSignal());
135        }
136
137        const auto & outputs = kernel->getStreamOutputs();
138        for (unsigned i = 0; i < outputs.size(); ++i) {
139            Value * const produced = kernel->getProducedItemCount(outputs[i].name, terminated);
140            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
141            assert (producedPos.count(buf) == 0);
142            producedPos.emplace(buf, produced);
143        }
144        for (unsigned i = 0; i < inputs.size(); ++i) {
145            Value * const processedItemCount = kernel->getProcessedItemCount(inputs[i].name);
146            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);
147            auto f = consumedPos.find(buf);
148            if (f == consumedPos.end()) {
149                consumedPos.emplace(buf, processedItemCount);
150            } else {
151                Value * lesser = iBuilder->CreateICmpULT(processedItemCount, f->second);
152                f->second = iBuilder->CreateSelect(lesser, processedItemCount, f->second);
153            }
154        }
155
156        kernel->releaseLogicalSegmentNo(nextSegNo);
157    }
158
159    assert (segmentLoopBody);
160    exitThreadBlock->moveAfter(segmentLoopBody);
161
162    for (const auto consumed : consumedPos) {
163        const StreamSetBuffer * const buf = consumed.first;
164        KernelBuilder * k = buf->getProducer();
165        const auto & outputs = k->getStreamSetOutputBuffers();
166        for (unsigned i = 0; i < outputs.size(); ++i) {
167            if (outputs[i] == buf) {
168                k->setConsumedItemCount(k->getStreamOutputs()[i].name, consumed.second);
169                break;
170            }
171        }
172    }
173
174    segNo->addIncoming(iBuilder->CreateAdd(segNo, iBuilder->getSize(codegen::ThreadNum)), segmentLoopBody);
175    iBuilder->CreateCondBr(terminated, exitThreadBlock, segmentLoop);
176
177    iBuilder->SetInsertPoint(exitThreadBlock);
178
179    // only call pthread_exit() within spawned threads; otherwise it'll be equivalent to calling exit() within the process
180    BasicBlock * const exitThread = BasicBlock::Create(iBuilder->getContext(), "ExitThread", threadFunc);
181    BasicBlock * const exitFunction = BasicBlock::Create(iBuilder->getContext(), "ExitProcessFunction", threadFunc);
182
183    Value * const exitCond = iBuilder->CreateICmpEQ(segOffset, ConstantInt::getNullValue(segOffset->getType()));
184    iBuilder->CreateCondBr(exitCond, exitFunction, exitThread);
185    iBuilder->SetInsertPoint(exitThread);
186    iBuilder->CreatePThreadExitCall(nullVoidPtrVal);
187    iBuilder->CreateBr(exitFunction);
188    iBuilder->SetInsertPoint(exitFunction);
189    iBuilder->CreateRetVoid();
190
191    // -------------------------------------------------------------------------------------------------------------------------
192    iBuilder->restoreIP(ip);
193
194    for (unsigned i = 0; i < n; ++i) {
195        kernels[i]->setInstance(instance[i]);
196    }
197
198    // -------------------------------------------------------------------------------------------------------------------------
199    // MAKE SEGMENT PARALLEL PIPELINE DRIVER
200    // -------------------------------------------------------------------------------------------------------------------------
201    const unsigned threads = codegen::ThreadNum - 1;
202    assert (codegen::ThreadNum > 1);
203    Type * const pthreadsTy = ArrayType::get(sizeTy, threads);
204    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
205    Value * threadIdPtr[threads];
206
207    for (unsigned i = 0; i < threads; ++i) {
208        threadIdPtr[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
209    }
210
211    for (unsigned i = 0; i < n; ++i) {
212        kernels[i]->releaseLogicalSegmentNo(iBuilder->getSize(0));
213    }
214
215    AllocaInst * const sharedStruct = iBuilder->CreateCacheAlignedAlloca(sharedStructType);
216    for (unsigned i = 0; i < n; ++i) {
217        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
218        iBuilder->CreateStore(kernels[i]->getInstance(), ptr);
219    }
220
221    // use the process thread to handle the initial segment function after spawning (n - 1) threads to handle the subsequent offsets
222    for (unsigned i = 0; i < threads; ++i) {
223        AllocaInst * const threadState = iBuilder->CreateAlloca(threadStructType);
224        iBuilder->CreateStore(sharedStruct, iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
225        iBuilder->CreateStore(iBuilder->getSize(i + 1), iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
226        iBuilder->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, threadFunc, threadState);
227    }
228
229    AllocaInst * const threadState = iBuilder->CreateAlloca(threadStructType);
230    iBuilder->CreateStore(sharedStruct, iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
231    iBuilder->CreateStore(iBuilder->getSize(0), iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
232    iBuilder->CreateCall(threadFunc, iBuilder->CreatePointerCast(threadState, voidPtrTy));
233
234    AllocaInst * const status = iBuilder->CreateAlloca(voidPtrTy);
235    for (unsigned i = 0; i < threads; ++i) {
236        Value * threadId = iBuilder->CreateLoad(threadIdPtr[i]);
237        iBuilder->CreatePThreadJoinCall(threadId, status);
238    }
239}
240
241
242/** ------------------------------------------------------------------------------------------------------------- *
243 * @brief generateParallelPipeline
244 ** ------------------------------------------------------------------------------------------------------------- */
245void generateParallelPipeline(std::unique_ptr<IDISA::IDISA_Builder> & iBuilder, const std::vector<KernelBuilder *> &kernels) {
246
247    Module * const m = iBuilder->getModule();
248    IntegerType * const sizeTy = iBuilder->getSizeTy();
249    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
250    ConstantInt * bufferSegments = ConstantInt::get(sizeTy, codegen::BufferSegments - 1);
251    ConstantInt * segmentItems = ConstantInt::get(sizeTy, codegen::SegmentSize * iBuilder->getBitBlockWidth());
252    Constant * const nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
253
254    const unsigned n = kernels.size();
255
256    Type * const pthreadsTy = ArrayType::get(sizeTy, n);
257    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
258    Value * threadIdPtr[n];
259    for (unsigned i = 0; i < n; ++i) {
260        threadIdPtr[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
261    }
262
263    Value * instance[n];
264    Type * structTypes[n];
265    for (unsigned i = 0; i < n; ++i) {
266        instance[i] = kernels[i]->getInstance();
267        structTypes[i] = instance[i]->getType();
268    }
269
270    Type * const sharedStructType = StructType::get(m->getContext(), ArrayRef<Type *>{structTypes, n});
271
272
273    AllocaInst * sharedStruct = iBuilder->CreateAlloca(sharedStructType);
274    for (unsigned i = 0; i < n; ++i) {
275        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
276        iBuilder->CreateStore(instance[i], ptr);
277    }
278
279    for (auto & kernel : kernels) {
280        kernel->releaseLogicalSegmentNo(iBuilder->getSize(0));
281    }
282
283    // GENERATE THE PRODUCING AND CONSUMING KERNEL MAPS
284    StreamSetBufferMap<unsigned> producingKernel;
285    StreamSetBufferMap<std::vector<unsigned>> consumingKernels;
286    for (unsigned id = 0; id < n; ++id) {
287        const auto & kernel = kernels[id];
288        const auto & inputs = kernel->getStreamInputs();
289        const auto & outputs = kernel->getStreamOutputs();
290        // add any outputs from this kernel to the producing kernel map
291        for (unsigned j = 0; j < outputs.size(); ++j) {
292            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(j);
293            if (LLVM_UNLIKELY(producingKernel.count(buf) != 0)) {
294                report_fatal_error(kernel->getName() + " redefines stream set " + outputs[j].name);
295            }
296            producingKernel.emplace(buf, id);
297        }
298        // and any inputs to the consuming kernels list
299        for (unsigned j = 0; j < inputs.size(); ++j) {
300            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(j);
301            auto f = consumingKernels.find(buf);
302            if (f == consumingKernels.end()) {
303                if (LLVM_UNLIKELY(producingKernel.count(buf) == 0)) {
304                    report_fatal_error(kernel->getName() + " uses stream set " + inputs[j].name + " prior to its definition");
305                }
306                consumingKernels.emplace(buf, std::vector<unsigned>{ id });
307            } else {
308                f->second.push_back(id);
309            }
310        }
311    }
312
313    const auto ip = iBuilder->saveIP();
314
315    // GENERATE UNIQUE PIPELINE PARALLEL THREAD FUNCTION FOR EACH KERNEL
316    FlatSet<unsigned> kernelSet;
317    kernelSet.reserve(n);
318
319    Function * thread_functions[n];
320    Value * producerSegNo[n];
321    for (unsigned id = 0; id < n; id++) {
322        const auto & kernel = kernels[id];
323        const auto & inputs = kernel->getStreamInputs();
324
325        Function * const threadFunc = makeThreadFunction(iBuilder, "ppt:" + kernel->getName());
326
327         // Create the basic blocks for the thread function.
328        BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc);
329        BasicBlock * outputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "outputCheck", threadFunc);
330        BasicBlock * inputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "inputCheck", threadFunc);
331        BasicBlock * doSegmentBlock = BasicBlock::Create(iBuilder->getContext(), "doSegment", threadFunc);
332        BasicBlock * exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc);
333
334        iBuilder->SetInsertPoint(entryBlock);
335
336        Value * sharedStruct = iBuilder->CreateBitCast(&threadFunc->getArgumentList().front(), sharedStructType->getPointerTo());
337
338        for (unsigned k = 0; k < n; k++) {
339            Value * const ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
340            kernels[k]->setInstance(iBuilder->CreateLoad(ptr));
341        }
342
343        iBuilder->CreateBr(outputCheckBlock);
344
345        // Check whether the output buffers are ready for more data
346        iBuilder->SetInsertPoint(outputCheckBlock);
347        PHINode * segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 3, "segNo");
348        segNo->addIncoming(iBuilder->getSize(0), entryBlock);
349        segNo->addIncoming(segNo, outputCheckBlock);
350
351        Value * outputWaitCond = iBuilder->getTrue();
352        for (const StreamSetBuffer * buf : kernel->getStreamSetOutputBuffers()) {
353            const auto & list = consumingKernels[buf];
354            assert(std::is_sorted(list.begin(), list.end()));
355            kernelSet.insert(list.begin(), list.end());
356        }
357        for (unsigned k : kernelSet) {
358            Value * consumerSegNo = kernels[k]->acquireLogicalSegmentNo();
359            assert (consumerSegNo->getType() == segNo->getType());
360            Value * consumedSegNo = iBuilder->CreateAdd(consumerSegNo, bufferSegments);
361            outputWaitCond = iBuilder->CreateAnd(outputWaitCond, iBuilder->CreateICmpULE(segNo, consumedSegNo));
362        }
363        kernelSet.clear();
364        iBuilder->CreateCondBr(outputWaitCond, inputCheckBlock, outputCheckBlock);
365
366        // Check whether the input buffers have enough data for this kernel to begin
367        iBuilder->SetInsertPoint(inputCheckBlock);
368        for (const StreamSetBuffer * buf : kernel->getStreamSetInputBuffers()) {
369            kernelSet.insert(producingKernel[buf]);
370        }
371
372        Value * inputWaitCond = iBuilder->getTrue();
373        for (unsigned k : kernelSet) {
374            producerSegNo[k] = kernels[k]->acquireLogicalSegmentNo();
375            assert (producerSegNo[k]->getType() == segNo->getType());
376            inputWaitCond = iBuilder->CreateAnd(inputWaitCond, iBuilder->CreateICmpULT(segNo, producerSegNo[k]));
377        }
378        iBuilder->CreateCondBr(inputWaitCond, doSegmentBlock, inputCheckBlock);
379
380        // Process the segment
381        iBuilder->SetInsertPoint(doSegmentBlock);
382
383        Value * const nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
384        Value * terminated = nullptr;
385        if (kernelSet.empty()) {
386            // if this kernel has no input streams, the kernel itself must decide when it terminates.
387            terminated = kernel->getTerminationSignal();
388        } else {
389            // ... otherwise the kernel terminates only when it exhausts all of its input streams
390            terminated = iBuilder->getTrue();
391            for (unsigned k : kernelSet) {
392                terminated = iBuilder->CreateAnd(terminated, kernels[k]->getTerminationSignal());
393                terminated = iBuilder->CreateAnd(terminated, iBuilder->CreateICmpEQ(nextSegNo, producerSegNo[k]));
394            }
395            kernelSet.clear();
396        }
397
398        std::vector<Value *> args = {kernel->getInstance(), terminated};
399        args.insert(args.end(), inputs.size(), iBuilder->CreateMul(segmentItems, segNo));
400
401        kernel->createDoSegmentCall(args);
402        segNo->addIncoming(nextSegNo, doSegmentBlock);
403        kernel->releaseLogicalSegmentNo(nextSegNo);
404
405        iBuilder->CreateCondBr(terminated, exitThreadBlock, outputCheckBlock);
406
407        iBuilder->SetInsertPoint(exitThreadBlock);
408
409        iBuilder->CreatePThreadExitCall(nullVoidPtrVal);
410
411        iBuilder->CreateRetVoid();
412
413        thread_functions[id] = threadFunc;
414    }
415
416    iBuilder->restoreIP(ip);
417
418    for (unsigned i = 0; i < n; ++i) {
419        kernels[i]->setInstance(instance[i]);
420    }
421
422    for (unsigned i = 0; i < n; ++i) {
423        iBuilder->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, thread_functions[i], sharedStruct);
424    }
425
426    AllocaInst * const status = iBuilder->CreateAlloca(voidPtrTy);
427    for (unsigned i = 0; i < n; ++i) {
428        Value * threadId = iBuilder->CreateLoad(threadIdPtr[i]);
429        iBuilder->CreatePThreadJoinCall(threadId, status);
430    }
431
432}
433
434/** ------------------------------------------------------------------------------------------------------------- *
435 * @brief generatePipelineLoop
436 ** ------------------------------------------------------------------------------------------------------------- */
437void generatePipelineLoop(std::unique_ptr<IDISA::IDISA_Builder> & iBuilder, const std::vector<KernelBuilder *> & kernels) {
438
439    BasicBlock * entryBlock = iBuilder->GetInsertBlock();
440    Function * main = entryBlock->getParent();
441    Value * mCycleCounts = nullptr;
442    if (codegen::EnableCycleCounter) {
443        ArrayType * cycleCountArray = ArrayType::get(iBuilder->getInt64Ty(), kernels.size());
444        mCycleCounts = iBuilder->CreateAlloca(ArrayType::get(iBuilder->getInt64Ty(), kernels.size()));
445        iBuilder->CreateStore(Constant::getNullValue(cycleCountArray), mCycleCounts);
446    }
447
448    // Create the basic blocks for the loop.
449    BasicBlock * pipelineLoop = BasicBlock::Create(iBuilder->getContext(), "pipelineLoop", main);
450    BasicBlock * pipelineExit = BasicBlock::Create(iBuilder->getContext(), "pipelineExit", main);
451
452    StreamSetBufferMap<Value *> producedPos;
453    StreamSetBufferMap<Value *> consumedPos;
454
455    iBuilder->CreateBr(pipelineLoop);
456    iBuilder->SetInsertPoint(pipelineLoop);
457   
458    Value * cycleCountStart = nullptr;
459    Value * cycleCountEnd = nullptr;
460    if (codegen::EnableCycleCounter) {
461        cycleCountStart = iBuilder->CreateReadCycleCounter();
462    }
463    Value * terminated = iBuilder->getFalse();
464    for (unsigned k = 0; k < kernels.size(); k++) {
465        auto & kernel = kernels[k];
466
467        const auto & inputs = kernel->getStreamInputs();
468        const auto & outputs = kernel->getStreamOutputs();
469
470        std::vector<Value *> args = {kernel->getInstance(), terminated};
471        for (unsigned i = 0; i < inputs.size(); ++i) {
472            const auto f = producedPos.find(kernel->getStreamSetInputBuffer(i));
473            if (LLVM_UNLIKELY(f == producedPos.end())) {
474                report_fatal_error(kernel->getName() + " uses stream set " + inputs[i].name + " prior to its definition");
475            }
476            args.push_back(f->second);
477        }
478
479        kernel->createDoSegmentCall(args);
480        if (!kernel->hasNoTerminateAttribute()) {
481            terminated = iBuilder->CreateOr(terminated, kernel->getTerminationSignal());
482        }
483        for (unsigned i = 0; i < outputs.size(); ++i) {
484            Value * const produced = kernel->getProducedItemCount(outputs[i].name, terminated);
485            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
486            assert (producedPos.count(buf) == 0);
487            producedPos.emplace(buf, produced);
488        }
489
490        for (unsigned i = 0; i < inputs.size(); ++i) {
491            Value * const processedItemCount = kernel->getProcessedItemCount(inputs[i].name);
492            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);
493            auto f = consumedPos.find(buf);
494            if (f == consumedPos.end()) {
495                consumedPos.emplace(buf, processedItemCount);
496            } else {
497                Value * lesser = iBuilder->CreateICmpULT(processedItemCount, f->second);
498                f->second = iBuilder->CreateSelect(lesser, processedItemCount, f->second);
499            }
500        }
501        if (codegen::EnableCycleCounter) {
502            cycleCountEnd = iBuilder->CreateReadCycleCounter();
503            Value * counterPtr = iBuilder->CreateGEP(mCycleCounts, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
504            iBuilder->CreateStore(iBuilder->CreateAdd(iBuilder->CreateLoad(counterPtr), iBuilder->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
505            cycleCountStart = cycleCountEnd;
506        }
507        Value * const segNo = kernel->acquireLogicalSegmentNo();
508        kernel->releaseLogicalSegmentNo(iBuilder->CreateAdd(segNo, iBuilder->getSize(1)));
509    }
510
511    for (const auto consumed : consumedPos) {
512        const StreamSetBuffer * const buf = consumed.first;
513        KernelBuilder * k = buf->getProducer();
514        const auto & outputs = k->getStreamSetOutputBuffers();
515        for (unsigned i = 0; i < outputs.size(); ++i) {
516            if (outputs[i] == buf) {
517                k->setConsumedItemCount(k->getStreamOutputs()[i].name, consumed.second);
518                break;
519            }
520        }
521    }
522
523    iBuilder->CreateCondBr(terminated, pipelineExit, pipelineLoop);
524    iBuilder->SetInsertPoint(pipelineExit);
525    if (codegen::EnableCycleCounter) {
526        for (unsigned k = 0; k < kernels.size(); k++) {
527            auto & kernel = kernels[k];
528            const auto & inputs = kernel->getStreamInputs();
529            const auto & outputs = kernel->getStreamOutputs();
530            Value * items = inputs.size() > 0 ? kernel->getProcessedItemCount(inputs[0].name) : kernel->getProducedItemCount(outputs[0].name);
531            Value * fItems = iBuilder->CreateUIToFP(items, iBuilder->getDoubleTy());
532            Value * cycles = iBuilder->CreateLoad(iBuilder->CreateGEP(mCycleCounts, {iBuilder->getInt32(0), iBuilder->getInt32(k)}));
533            Value * fCycles = iBuilder->CreateUIToFP(cycles, iBuilder->getDoubleTy());
534            std::string formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
535            Value * stringPtr = iBuilder->CreatePointerCast(iBuilder->CreateGlobalString(formatString.c_str()), iBuilder->getInt8PtrTy());
536            iBuilder->CreateCall(iBuilder->GetDprintf(), {iBuilder->getInt32(2), stringPtr, fItems, fCycles, iBuilder->CreateFDiv(fCycles, fItems)});
537        }
538    }
539}
Note: See TracBrowser for help on using the repository browser.