source: icGREP/icgrep-devel/icgrep/toolchain/pipeline.cpp @ 5435

Last change on this file since 5435 was 5435, checked in by nmedfort, 2 years ago

Continued refactoring work.

File size: 26.6 KB
Line 
1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
6#include "pipeline.h"
7#include <toolchain/toolchain.h>
8#include <kernels/kernel.h>
9#include <kernels/streamset.h>
10#include <llvm/IR/Module.h>
11#include <boost/container/flat_set.hpp>
12#include <boost/container/flat_map.hpp>
13
14using namespace kernel;
15using namespace parabix;
16using namespace llvm;
17
18template <typename Value>
19using StreamSetBufferMap = boost::container::flat_map<const StreamSetBuffer *, Value>;
20
21template <typename Value>
22using FlatSet = boost::container::flat_set<Value>;
23
24Function * makeThreadFunction(IDISA::IDISA_Builder * const b, const std::string & name) {
25    Function * const f = Function::Create(FunctionType::get(b->getVoidTy(), {b->getVoidPtrTy()}, false), Function::InternalLinkage, name, b->getModule());
26    f->setCallingConv(CallingConv::C);
27    f->arg_begin()->setName("input");
28    return f;
29}
30
31/** ------------------------------------------------------------------------------------------------------------- *
32 * @brief generateSegmentParallelPipeline
33 *
34 * Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
35 * operating over an input stream set S, a segment-parallel implementation divides the input
36 * into segments and coordinates a set of T <= K threads to each process one segment at a time.
37 * Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
38 * fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
39 ** ------------------------------------------------------------------------------------------------------------- */
40void generateSegmentParallelPipeline(IDISA::IDISA_Builder * const iBuilder, const std::vector<Kernel *> & kernels) {
41
42    const unsigned n = kernels.size();
43    Module * const m = iBuilder->getModule();
44    IntegerType * const sizeTy = iBuilder->getSizeTy();
45    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
46    Constant * nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
47    std::vector<Type *> structTypes;
48
49    Value * instance[n];
50    for (unsigned i = 0; i < n; ++i) {
51        instance[i] = kernels[i]->getInstance();
52        structTypes.push_back(instance[i]->getType());
53    }
54    StructType * const sharedStructType = StructType::get(m->getContext(), structTypes);
55    StructType * const threadStructType = StructType::get(sharedStructType->getPointerTo(), sizeTy, nullptr);
56
57    Function * const threadFunc = makeThreadFunction(iBuilder, "segment");
58
59    // -------------------------------------------------------------------------------------------------------------------------
60    // MAKE SEGMENT PARALLEL PIPELINE THREAD
61    // -------------------------------------------------------------------------------------------------------------------------
62    const auto ip = iBuilder->saveIP();
63
64     // Create the basic blocks for the thread function.
65    BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc);
66    iBuilder->SetInsertPoint(entryBlock);
67    Value * const input = &threadFunc->getArgumentList().front();
68    Value * const threadStruct = iBuilder->CreatePointerCast(input, threadStructType->getPointerTo());
69    Value * const sharedStatePtr = iBuilder->CreateLoad(iBuilder->CreateGEP(threadStruct, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
70    for (unsigned k = 0; k < n; ++k) {
71        Value * ptr = iBuilder->CreateLoad(iBuilder->CreateGEP(sharedStatePtr, {iBuilder->getInt32(0), iBuilder->getInt32(k)}));
72        kernels[k]->setInstance(ptr);
73    }
74    Value * const segOffset = iBuilder->CreateLoad(iBuilder->CreateGEP(threadStruct, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
75
76    BasicBlock * segmentLoop = BasicBlock::Create(iBuilder->getContext(), "segmentLoop", threadFunc);
77    iBuilder->CreateBr(segmentLoop);
78
79    iBuilder->SetInsertPoint(segmentLoop);
80    PHINode * const segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 2, "segNo");
81    segNo->addIncoming(segOffset, entryBlock);
82
83    Value * terminated = iBuilder->getFalse();
84    Value * const nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
85
86    BasicBlock * segmentLoopBody = nullptr;
87    BasicBlock * const exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc);
88
89    StreamSetBufferMap<Value *> producedPos;
90    StreamSetBufferMap<Value *> consumedPos;
91
92    for (unsigned k = 0; k < n; ++k) {
93
94        const auto & kernel = kernels[k];
95
96        BasicBlock * const segmentWait = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Wait", threadFunc);
97        iBuilder->CreateBr(segmentWait);
98
99        segmentLoopBody = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Do", threadFunc);
100
101        iBuilder->SetInsertPoint(segmentWait);
102        const unsigned waitIdx = codegen::DebugOptionIsSet(codegen::SerializeThreads) ? (n - 1) : k;
103        kernels[waitIdx]->setBuilder(iBuilder);
104        Value * const processedSegmentCount = kernels[waitIdx]->acquireLogicalSegmentNo();
105
106
107        assert (processedSegmentCount->getType() == segNo->getType());
108        Value * const ready = iBuilder->CreateICmpEQ(segNo, processedSegmentCount);
109
110        if (kernel->hasNoTerminateAttribute()) {
111            iBuilder->CreateCondBr(ready, segmentLoopBody, segmentWait);
112        } else { // If the kernel was terminated in a previous segment then the pipeline is done.
113            BasicBlock * completionTest = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Completed", threadFunc, 0);
114            BasicBlock * exitBlock = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Exit", threadFunc, 0);
115            iBuilder->CreateCondBr(ready, completionTest, segmentWait);
116
117            iBuilder->SetInsertPoint(completionTest);
118
119            kernel->setBuilder(iBuilder);
120            Value * terminationSignal = kernel->getTerminationSignal();
121
122            iBuilder->CreateCondBr(terminationSignal, exitBlock, segmentLoopBody);
123            iBuilder->SetInsertPoint(exitBlock);
124            // Ensure that the next thread will also exit.
125            kernel->releaseLogicalSegmentNo(nextSegNo);
126            iBuilder->CreateBr(exitThreadBlock);
127        }
128
129        // Execute the kernel segment
130        iBuilder->SetInsertPoint(segmentLoopBody);
131        const auto & inputs = kernel->getStreamInputs();
132        std::vector<Value *> args = {kernel->getInstance(), terminated};
133        for (unsigned i = 0; i < inputs.size(); ++i) {
134            const auto f = producedPos.find(kernel->getStreamSetInputBuffer(i));
135            assert (f != producedPos.end());
136            args.push_back(f->second);
137        }
138
139        kernel->setBuilder(iBuilder);
140        kernel->createDoSegmentCall(args);       
141        if (!kernel->hasNoTerminateAttribute()) {
142            terminated = iBuilder->CreateOr(terminated, kernel->getTerminationSignal());
143        }
144
145        const auto & outputs = kernel->getStreamOutputs();
146        for (unsigned i = 0; i < outputs.size(); ++i) {
147            kernel->setBuilder(iBuilder);
148            Value * const produced = kernel->getProducedItemCount(outputs[i].name, terminated);
149            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
150            assert (producedPos.count(buf) == 0);
151            producedPos.emplace(buf, produced);
152        }
153        for (unsigned i = 0; i < inputs.size(); ++i) {
154            kernel->setBuilder(iBuilder);
155            Value * const processedItemCount = kernel->getProcessedItemCount(inputs[i].name);
156            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);           
157            auto f = consumedPos.find(buf);
158            if (f == consumedPos.end()) {
159                consumedPos.emplace(buf, processedItemCount);
160            } else {
161                Value * lesser = iBuilder->CreateICmpULT(processedItemCount, f->second);
162                f->second = iBuilder->CreateSelect(lesser, processedItemCount, f->second);
163            }
164        }
165        kernel->setBuilder(iBuilder);
166        kernel->releaseLogicalSegmentNo(nextSegNo);
167    }
168
169    assert (segmentLoopBody);
170    exitThreadBlock->moveAfter(segmentLoopBody);
171
172    for (const auto consumed : consumedPos) {
173        const StreamSetBuffer * const buf = consumed.first;
174        Kernel * kernel = buf->getProducer();
175        const auto & outputs = kernel->getStreamSetOutputBuffers();
176        for (unsigned i = 0; i < outputs.size(); ++i) {
177            if (outputs[i] == buf) {
178                kernel->setBuilder(iBuilder);
179                kernel->setConsumedItemCount(kernel->getStreamOutputs()[i].name, consumed.second);
180                break;
181            }
182        }
183    }
184
185    segNo->addIncoming(iBuilder->CreateAdd(segNo, iBuilder->getSize(codegen::ThreadNum)), segmentLoopBody);
186    iBuilder->CreateCondBr(terminated, exitThreadBlock, segmentLoop);
187
188    iBuilder->SetInsertPoint(exitThreadBlock);
189
190    // only call pthread_exit() within spawned threads; otherwise it'll be equivalent to calling exit() within the process
191    BasicBlock * const exitThread = BasicBlock::Create(iBuilder->getContext(), "ExitThread", threadFunc);
192    BasicBlock * const exitFunction = BasicBlock::Create(iBuilder->getContext(), "ExitProcessFunction", threadFunc);
193
194    Value * const exitCond = iBuilder->CreateICmpEQ(segOffset, ConstantInt::getNullValue(segOffset->getType()));
195    iBuilder->CreateCondBr(exitCond, exitFunction, exitThread);
196    iBuilder->SetInsertPoint(exitThread);
197    iBuilder->CreatePThreadExitCall(nullVoidPtrVal);
198    iBuilder->CreateBr(exitFunction);
199    iBuilder->SetInsertPoint(exitFunction);
200    iBuilder->CreateRetVoid();
201
202    // -------------------------------------------------------------------------------------------------------------------------
203    iBuilder->restoreIP(ip);
204
205    for (unsigned i = 0; i < n; ++i) {
206        kernels[i]->setInstance(instance[i]);
207    }
208
209    // -------------------------------------------------------------------------------------------------------------------------
210    // MAKE SEGMENT PARALLEL PIPELINE DRIVER
211    // -------------------------------------------------------------------------------------------------------------------------
212    const unsigned threads = codegen::ThreadNum - 1;
213    assert (codegen::ThreadNum > 1);
214    Type * const pthreadsTy = ArrayType::get(sizeTy, threads);
215    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
216    Value * threadIdPtr[threads];
217
218    for (unsigned i = 0; i < threads; ++i) {
219        threadIdPtr[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
220    }
221
222    for (unsigned i = 0; i < n; ++i) {
223        auto kernel = kernels[i];
224        kernel->setBuilder(iBuilder);
225        kernel->releaseLogicalSegmentNo(iBuilder->getSize(0));
226    }
227
228    AllocaInst * const sharedStruct = iBuilder->CreateCacheAlignedAlloca(sharedStructType);
229    for (unsigned i = 0; i < n; ++i) {
230        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
231        iBuilder->CreateStore(kernels[i]->getInstance(), ptr);
232    }
233
234    // use the process thread to handle the initial segment function after spawning (n - 1) threads to handle the subsequent offsets
235    for (unsigned i = 0; i < threads; ++i) {
236        AllocaInst * const threadState = iBuilder->CreateAlloca(threadStructType);
237        iBuilder->CreateStore(sharedStruct, iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
238        iBuilder->CreateStore(iBuilder->getSize(i + 1), iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
239        iBuilder->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, threadFunc, threadState);
240    }
241
242    AllocaInst * const threadState = iBuilder->CreateAlloca(threadStructType);
243    iBuilder->CreateStore(sharedStruct, iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
244    iBuilder->CreateStore(iBuilder->getSize(0), iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
245    iBuilder->CreateCall(threadFunc, iBuilder->CreatePointerCast(threadState, voidPtrTy));
246
247    AllocaInst * const status = iBuilder->CreateAlloca(voidPtrTy);
248    for (unsigned i = 0; i < threads; ++i) {
249        Value * threadId = iBuilder->CreateLoad(threadIdPtr[i]);
250        iBuilder->CreatePThreadJoinCall(threadId, status);
251    }
252}
253
254
255/** ------------------------------------------------------------------------------------------------------------- *
256 * @brief generateParallelPipeline
257 ** ------------------------------------------------------------------------------------------------------------- */
258void generateParallelPipeline(IDISA::IDISA_Builder * const iBuilder, const std::vector<Kernel *> &kernels) {
259
260    Module * const m = iBuilder->getModule();
261    IntegerType * const sizeTy = iBuilder->getSizeTy();
262    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
263    ConstantInt * bufferSegments = ConstantInt::get(sizeTy, codegen::BufferSegments - 1);
264    ConstantInt * segmentItems = ConstantInt::get(sizeTy, codegen::SegmentSize * iBuilder->getBitBlockWidth());
265    Constant * const nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
266
267    const unsigned n = kernels.size();
268
269    Type * const pthreadsTy = ArrayType::get(sizeTy, n);
270    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
271    Value * threadIdPtr[n];
272    for (unsigned i = 0; i < n; ++i) {
273        threadIdPtr[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
274    }
275
276    Value * instance[n];
277    Type * structTypes[n];
278    for (unsigned i = 0; i < n; ++i) {
279        instance[i] = kernels[i]->getInstance();
280        structTypes[i] = instance[i]->getType();
281    }
282
283    Type * const sharedStructType = StructType::get(m->getContext(), ArrayRef<Type *>{structTypes, n});
284
285
286    AllocaInst * sharedStruct = iBuilder->CreateAlloca(sharedStructType);
287    for (unsigned i = 0; i < n; ++i) {
288        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
289        iBuilder->CreateStore(instance[i], ptr);
290    }
291
292    for (auto & kernel : kernels) {
293        kernel->setBuilder(iBuilder);
294        kernel->releaseLogicalSegmentNo(iBuilder->getSize(0));
295    }
296
297    // GENERATE THE PRODUCING AND CONSUMING KERNEL MAPS
298    StreamSetBufferMap<unsigned> producingKernel;
299    StreamSetBufferMap<std::vector<unsigned>> consumingKernels;
300    for (unsigned id = 0; id < n; ++id) {
301        const auto & kernel = kernels[id];
302        const auto & inputs = kernel->getStreamInputs();
303        const auto & outputs = kernel->getStreamOutputs();
304        // add any outputs from this kernel to the producing kernel map
305        for (unsigned j = 0; j < outputs.size(); ++j) {
306            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(j);
307            if (LLVM_UNLIKELY(producingKernel.count(buf) != 0)) {
308                report_fatal_error(kernel->getName() + " redefines stream set " + outputs[j].name);
309            }
310            producingKernel.emplace(buf, id);
311        }
312        // and any inputs to the consuming kernels list
313        for (unsigned j = 0; j < inputs.size(); ++j) {
314            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(j);
315            auto f = consumingKernels.find(buf);
316            if (f == consumingKernels.end()) {
317                if (LLVM_UNLIKELY(producingKernel.count(buf) == 0)) {
318                    report_fatal_error(kernel->getName() + " uses stream set " + inputs[j].name + " prior to its definition");
319                }
320                consumingKernels.emplace(buf, std::vector<unsigned>{ id });
321            } else {
322                f->second.push_back(id);
323            }
324        }
325    }
326
327    const auto ip = iBuilder->saveIP();
328
329    // GENERATE UNIQUE PIPELINE PARALLEL THREAD FUNCTION FOR EACH KERNEL
330    FlatSet<unsigned> kernelSet;
331    kernelSet.reserve(n);
332
333    Function * thread_functions[n];
334    Value * producerSegNo[n];
335    for (unsigned id = 0; id < n; id++) {
336        const auto & kernel = kernels[id];
337        const auto & inputs = kernel->getStreamInputs();
338
339        Function * const threadFunc = makeThreadFunction(iBuilder, "ppt:" + kernel->getName());
340
341         // Create the basic blocks for the thread function.
342        BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc);
343        BasicBlock * outputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "outputCheck", threadFunc);
344        BasicBlock * inputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "inputCheck", threadFunc);
345        BasicBlock * doSegmentBlock = BasicBlock::Create(iBuilder->getContext(), "doSegment", threadFunc);
346        BasicBlock * exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc);
347
348        iBuilder->SetInsertPoint(entryBlock);
349
350        Value * sharedStruct = iBuilder->CreateBitCast(&threadFunc->getArgumentList().front(), sharedStructType->getPointerTo());
351
352        for (unsigned k = 0; k < n; k++) {
353            Value * const ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
354            kernels[k]->setInstance(iBuilder->CreateLoad(ptr));
355        }
356
357        iBuilder->CreateBr(outputCheckBlock);
358
359        // Check whether the output buffers are ready for more data
360        iBuilder->SetInsertPoint(outputCheckBlock);
361        PHINode * segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 3, "segNo");
362        segNo->addIncoming(iBuilder->getSize(0), entryBlock);
363        segNo->addIncoming(segNo, outputCheckBlock);
364
365        Value * outputWaitCond = iBuilder->getTrue();
366        for (const StreamSetBuffer * buf : kernel->getStreamSetOutputBuffers()) {
367            const auto & list = consumingKernels[buf];
368            assert(std::is_sorted(list.begin(), list.end()));
369            kernelSet.insert(list.begin(), list.end());
370        }
371        for (unsigned k : kernelSet) {
372            Value * consumerSegNo = kernels[k]->acquireLogicalSegmentNo();
373            assert (consumerSegNo->getType() == segNo->getType());
374            Value * consumedSegNo = iBuilder->CreateAdd(consumerSegNo, bufferSegments);
375            outputWaitCond = iBuilder->CreateAnd(outputWaitCond, iBuilder->CreateICmpULE(segNo, consumedSegNo));
376        }
377        kernelSet.clear();
378        iBuilder->CreateCondBr(outputWaitCond, inputCheckBlock, outputCheckBlock);
379
380        // Check whether the input buffers have enough data for this kernel to begin
381        iBuilder->SetInsertPoint(inputCheckBlock);
382        for (const StreamSetBuffer * buf : kernel->getStreamSetInputBuffers()) {
383            kernelSet.insert(producingKernel[buf]);
384        }
385
386        Value * inputWaitCond = iBuilder->getTrue();
387        for (unsigned k : kernelSet) {
388            Kernel * kernel = kernels[k];
389            kernel->setBuilder(iBuilder);
390            producerSegNo[k] = kernel->acquireLogicalSegmentNo();
391            assert (producerSegNo[k]->getType() == segNo->getType());
392            inputWaitCond = iBuilder->CreateAnd(inputWaitCond, iBuilder->CreateICmpULT(segNo, producerSegNo[k]));
393        }
394        iBuilder->CreateCondBr(inputWaitCond, doSegmentBlock, inputCheckBlock);
395
396        // Process the segment
397        iBuilder->SetInsertPoint(doSegmentBlock);
398
399        Value * const nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
400        Value * terminated = nullptr;
401        if (kernelSet.empty()) {
402            // if this kernel has no input streams, the kernel itself must decide when it terminates.
403            terminated = kernel->getTerminationSignal();
404        } else {
405            // ... otherwise the kernel terminates only when it exhausts all of its input streams
406            terminated = iBuilder->getTrue();
407            for (unsigned k : kernelSet) {
408                terminated = iBuilder->CreateAnd(terminated, kernels[k]->getTerminationSignal());
409                terminated = iBuilder->CreateAnd(terminated, iBuilder->CreateICmpEQ(nextSegNo, producerSegNo[k]));
410            }
411            kernelSet.clear();
412        }
413
414        std::vector<Value *> args = {kernel->getInstance(), terminated};
415        args.insert(args.end(), inputs.size(), iBuilder->CreateMul(segmentItems, segNo));
416
417        kernel->createDoSegmentCall(args);
418        segNo->addIncoming(nextSegNo, doSegmentBlock);
419        kernel->releaseLogicalSegmentNo(nextSegNo);
420
421        iBuilder->CreateCondBr(terminated, exitThreadBlock, outputCheckBlock);
422
423        iBuilder->SetInsertPoint(exitThreadBlock);
424
425        iBuilder->CreatePThreadExitCall(nullVoidPtrVal);
426
427        iBuilder->CreateRetVoid();
428
429        thread_functions[id] = threadFunc;
430    }
431
432    iBuilder->restoreIP(ip);
433
434    for (unsigned i = 0; i < n; ++i) {
435        kernels[i]->setInstance(instance[i]);
436    }
437
438    for (unsigned i = 0; i < n; ++i) {
439        iBuilder->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, thread_functions[i], sharedStruct);
440    }
441
442    AllocaInst * const status = iBuilder->CreateAlloca(voidPtrTy);
443    for (unsigned i = 0; i < n; ++i) {
444        Value * threadId = iBuilder->CreateLoad(threadIdPtr[i]);
445        iBuilder->CreatePThreadJoinCall(threadId, status);
446    }
447}
448
449/** ------------------------------------------------------------------------------------------------------------- *
450 * @brief generatePipelineLoop
451 ** ------------------------------------------------------------------------------------------------------------- */
452void generatePipelineLoop(IDISA::IDISA_Builder * const iBuilder, const std::vector<Kernel *> & kernels) {
453
454    BasicBlock * entryBlock = iBuilder->GetInsertBlock();
455    Function * main = entryBlock->getParent();
456    Value * mCycleCounts = nullptr;
457    if (codegen::EnableCycleCounter) {
458        ArrayType * cycleCountArray = ArrayType::get(iBuilder->getInt64Ty(), kernels.size());
459        mCycleCounts = iBuilder->CreateAlloca(ArrayType::get(iBuilder->getInt64Ty(), kernels.size()));
460        iBuilder->CreateStore(Constant::getNullValue(cycleCountArray), mCycleCounts);
461    }
462
463    // Create the basic blocks for the loop.
464    BasicBlock * pipelineLoop = BasicBlock::Create(iBuilder->getContext(), "pipelineLoop", main);
465    BasicBlock * pipelineExit = BasicBlock::Create(iBuilder->getContext(), "pipelineExit", main);
466
467    StreamSetBufferMap<Value *> producedPos;
468    StreamSetBufferMap<Value *> consumedPos;
469
470    iBuilder->CreateBr(pipelineLoop);
471    iBuilder->SetInsertPoint(pipelineLoop);
472   
473    Value * cycleCountStart = nullptr;
474    Value * cycleCountEnd = nullptr;
475    if (codegen::EnableCycleCounter) {
476        cycleCountStart = iBuilder->CreateReadCycleCounter();
477    }
478    Value * terminated = iBuilder->getFalse();
479    for (unsigned k = 0; k < kernels.size(); k++) {
480        auto & kernel = kernels[k];
481
482        const auto & inputs = kernel->getStreamInputs();
483        const auto & outputs = kernel->getStreamOutputs();
484
485        std::vector<Value *> args = {kernel->getInstance(), terminated};
486        for (unsigned i = 0; i < inputs.size(); ++i) {
487            const auto f = producedPos.find(kernel->getStreamSetInputBuffer(i));
488            if (LLVM_UNLIKELY(f == producedPos.end())) {
489                report_fatal_error(kernel->getName() + " uses stream set " + inputs[i].name + " prior to its definition");
490            }
491            args.push_back(f->second);
492        }
493
494        kernel->setBuilder(iBuilder);
495        kernel->createDoSegmentCall(args);
496        if (!kernel->hasNoTerminateAttribute()) {
497            Value * terminatedSignal = kernel->getTerminationSignal();
498            terminated = iBuilder->CreateOr(terminated, terminatedSignal);
499            kernel->setBuilder(iBuilder);
500        }
501        for (unsigned i = 0; i < outputs.size(); ++i) {
502            Value * const produced = kernel->getProducedItemCount(outputs[i].name, terminated);
503            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
504            assert (producedPos.count(buf) == 0);
505            producedPos.emplace(buf, produced);
506        }
507
508        for (unsigned i = 0; i < inputs.size(); ++i) {
509            kernel->setBuilder(iBuilder);
510            Value * const processedItemCount = kernel->getProcessedItemCount(inputs[i].name);
511            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);
512
513            auto f = consumedPos.find(buf);
514            if (f == consumedPos.end()) {
515                consumedPos.emplace(buf, processedItemCount);
516            } else {
517                Value * lesser = iBuilder->CreateICmpULT(processedItemCount, f->second);
518                f->second = iBuilder->CreateSelect(lesser, processedItemCount, f->second);
519            }
520        }
521        if (codegen::EnableCycleCounter) {
522            cycleCountEnd = iBuilder->CreateReadCycleCounter();
523            Value * counterPtr = iBuilder->CreateGEP(mCycleCounts, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
524            iBuilder->CreateStore(iBuilder->CreateAdd(iBuilder->CreateLoad(counterPtr), iBuilder->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
525            cycleCountStart = cycleCountEnd;
526        }
527
528        kernel->setBuilder(iBuilder);
529        Value * const segNo = kernel->acquireLogicalSegmentNo();
530        Value * nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
531        kernel->releaseLogicalSegmentNo(nextSegNo);
532    }
533
534    for (const auto consumed : consumedPos) {
535        const StreamSetBuffer * const buf = consumed.first;
536        Kernel * k = buf->getProducer();
537        const auto & outputs = k->getStreamSetOutputBuffers();
538        for (unsigned i = 0; i < outputs.size(); ++i) {
539            if (outputs[i] == buf) {
540                k->setBuilder(iBuilder);
541                k->setConsumedItemCount(k->getStreamOutputs()[i].name, consumed.second);
542                break;
543            }
544        }
545    }
546
547    iBuilder->CreateCondBr(terminated, pipelineExit, pipelineLoop);
548    iBuilder->SetInsertPoint(pipelineExit);
549    if (codegen::EnableCycleCounter) {
550        for (unsigned k = 0; k < kernels.size(); k++) {
551            auto & kernel = kernels[k];
552            const auto & inputs = kernel->getStreamInputs();
553            const auto & outputs = kernel->getStreamOutputs();
554            Value * items = inputs.size() > 0 ? kernel->getProcessedItemCount(inputs[0].name) : kernel->getProducedItemCount(outputs[0].name);
555            Value * fItems = iBuilder->CreateUIToFP(items, iBuilder->getDoubleTy());
556            Value * cycles = iBuilder->CreateLoad(iBuilder->CreateGEP(mCycleCounts, {iBuilder->getInt32(0), iBuilder->getInt32(k)}));
557            Value * fCycles = iBuilder->CreateUIToFP(cycles, iBuilder->getDoubleTy());
558            std::string formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
559            Value * stringPtr = iBuilder->CreatePointerCast(iBuilder->GetString(formatString), iBuilder->getInt8PtrTy());
560            iBuilder->CreateCall(iBuilder->GetDprintf(), {iBuilder->getInt32(2), stringPtr, fItems, fCycles, iBuilder->CreateFDiv(fCycles, fItems)});
561        }
562    }
563}
Note: See TracBrowser for help on using the repository browser.