source: icGREP/icgrep-devel/icgrep/toolchain/pipeline.cpp @ 5446

Last change on this file since 5446 was 5446, checked in by nmedfort, 2 years ago

Refactoring work + correction for getRawItemPointer

File size: 26.8 KB
Line 
1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
6#include "pipeline.h"
7#include <toolchain/toolchain.h>
8#include <kernels/kernel.h>
9#include <kernels/streamset.h>
10#include <llvm/IR/Module.h>
11#include <boost/container/flat_set.hpp>
12#include <boost/container/flat_map.hpp>
13#include <kernels/kernel_builder.h>
14
15#include <llvm/Support/raw_ostream.h>
16
17using namespace kernel;
18using namespace parabix;
19using namespace llvm;
20
21
22template <typename Value>
23using StreamSetBufferMap = boost::container::flat_map<const StreamSetBuffer *, Value>;
24
25template <typename Value>
26using FlatSet = boost::container::flat_set<Value>;
27
28Function * makeThreadFunction(const std::unique_ptr<kernel::KernelBuilder> & b, const std::string & name) {
29    Function * const f = Function::Create(FunctionType::get(b->getVoidTy(), {b->getVoidPtrTy()}, false), Function::InternalLinkage, name, b->getModule());
30    f->setCallingConv(CallingConv::C);
31    f->arg_begin()->setName("input");
32    return f;
33}
34
35/** ------------------------------------------------------------------------------------------------------------- *
36 * @brief generateSegmentParallelPipeline
37 *
38 * Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
39 * operating over an input stream set S, a segment-parallel implementation divides the input
40 * into segments and coordinates a set of T <= K threads to each process one segment at a time.
41 * Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
42 * fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
43 ** ------------------------------------------------------------------------------------------------------------- */
44void generateSegmentParallelPipeline(const std::unique_ptr<KernelBuilder> & iBuilder, const std::vector<Kernel *> & kernels) {
45
46    const unsigned n = kernels.size();
47    Module * const m = iBuilder->getModule();
48    IntegerType * const sizeTy = iBuilder->getSizeTy();
49    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
50    Constant * nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
51    std::vector<Type *> structTypes;
52
53    Value * instance[n];
54    for (unsigned i = 0; i < n; ++i) {
55        instance[i] = kernels[i]->getInstance();
56        structTypes.push_back(instance[i]->getType());
57    }
58    StructType * const sharedStructType = StructType::get(m->getContext(), structTypes);
59    StructType * const threadStructType = StructType::get(sharedStructType->getPointerTo(), sizeTy, nullptr);
60
61    Function * const threadFunc = makeThreadFunction(iBuilder, "segment");
62
63    // -------------------------------------------------------------------------------------------------------------------------
64    // MAKE SEGMENT PARALLEL PIPELINE THREAD
65    // -------------------------------------------------------------------------------------------------------------------------
66    const auto ip = iBuilder->saveIP();
67
68     // Create the basic blocks for the thread function.
69    BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc);
70    iBuilder->SetInsertPoint(entryBlock);
71    Value * const input = &threadFunc->getArgumentList().front();
72    Value * const threadStruct = iBuilder->CreatePointerCast(input, threadStructType->getPointerTo());
73    Value * const sharedStatePtr = iBuilder->CreateLoad(iBuilder->CreateGEP(threadStruct, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
74    for (unsigned k = 0; k < n; ++k) {
75        Value * ptr = iBuilder->CreateLoad(iBuilder->CreateGEP(sharedStatePtr, {iBuilder->getInt32(0), iBuilder->getInt32(k)}));
76        kernels[k]->setInstance(ptr);
77    }
78    Value * const segOffset = iBuilder->CreateLoad(iBuilder->CreateGEP(threadStruct, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
79
80    BasicBlock * segmentLoop = BasicBlock::Create(iBuilder->getContext(), "segmentLoop", threadFunc);
81    iBuilder->CreateBr(segmentLoop);
82
83    iBuilder->SetInsertPoint(segmentLoop);
84    PHINode * const segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 2, "segNo");
85    segNo->addIncoming(segOffset, entryBlock);
86
87    Value * terminated = iBuilder->getFalse();
88    Value * const nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
89
90    BasicBlock * segmentLoopBody = nullptr;
91    BasicBlock * const exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc);
92
93    StreamSetBufferMap<Value *> producedPos;
94    StreamSetBufferMap<Value *> consumedPos;
95
96    for (unsigned k = 0; k < n; ++k) {
97
98        const auto & kernel = kernels[k];
99
100        BasicBlock * const segmentWait = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Wait", threadFunc);
101        iBuilder->CreateBr(segmentWait);
102
103        segmentLoopBody = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Do", threadFunc);
104
105        iBuilder->SetInsertPoint(segmentWait);
106        const unsigned waitIdx = codegen::DebugOptionIsSet(codegen::SerializeThreads) ? (n - 1) : k;
107
108        iBuilder->setKernel(kernels[waitIdx]);
109        Value * const processedSegmentCount = iBuilder->acquireLogicalSegmentNo();
110        iBuilder->setKernel(kernel);
111
112        assert (processedSegmentCount->getType() == segNo->getType());
113        Value * const ready = iBuilder->CreateICmpEQ(segNo, processedSegmentCount);
114
115        if (kernel->hasNoTerminateAttribute()) {
116            iBuilder->CreateCondBr(ready, segmentLoopBody, segmentWait);
117        } else { // If the kernel was terminated in a previous segment then the pipeline is done.
118            BasicBlock * completionTest = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Completed", threadFunc, 0);
119            BasicBlock * exitBlock = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Exit", threadFunc, 0);
120            iBuilder->CreateCondBr(ready, completionTest, segmentWait);
121
122            iBuilder->SetInsertPoint(completionTest);
123            Value * terminationSignal = iBuilder->getTerminationSignal();
124            iBuilder->CreateCondBr(terminationSignal, exitBlock, segmentLoopBody);
125            iBuilder->SetInsertPoint(exitBlock);
126            // Ensure that the next thread will also exit.
127            iBuilder->releaseLogicalSegmentNo(nextSegNo);
128            iBuilder->CreateBr(exitThreadBlock);
129        }
130
131        // Execute the kernel segment
132        iBuilder->SetInsertPoint(segmentLoopBody);
133        const auto & inputs = kernel->getStreamInputs();
134        std::vector<Value *> args = {kernel->getInstance(), terminated};
135        for (unsigned i = 0; i < inputs.size(); ++i) {
136            const auto f = producedPos.find(kernel->getStreamSetInputBuffer(i));
137            assert (f != producedPos.end());
138            args.push_back(f->second);
139        }
140
141        iBuilder->setKernel(kernel);
142        iBuilder->createDoSegmentCall(args);
143        if (!kernel->hasNoTerminateAttribute()) {
144            terminated = iBuilder->CreateOr(terminated, iBuilder->getTerminationSignal());
145        }
146
147        const auto & outputs = kernel->getStreamOutputs();
148        for (unsigned i = 0; i < outputs.size(); ++i) {           
149            Value * const produced = iBuilder->getProducedItemCount(outputs[i].name, terminated);
150            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
151            assert (producedPos.count(buf) == 0);
152            producedPos.emplace(buf, produced);
153        }
154        for (unsigned i = 0; i < inputs.size(); ++i) {
155            Value * const processedItemCount = iBuilder->getProcessedItemCount(inputs[i].name);
156            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);           
157            auto f = consumedPos.find(buf);
158            if (f == consumedPos.end()) {
159                consumedPos.emplace(buf, processedItemCount);
160            } else {
161                Value * lesser = iBuilder->CreateICmpULT(processedItemCount, f->second);
162                f->second = iBuilder->CreateSelect(lesser, processedItemCount, f->second);
163            }
164        }
165        iBuilder->releaseLogicalSegmentNo(nextSegNo);
166    }
167
168    assert (segmentLoopBody);
169    exitThreadBlock->moveAfter(segmentLoopBody);
170
171    for (const auto consumed : consumedPos) {
172        const StreamSetBuffer * const buf = consumed.first;
173        Kernel * kernel = buf->getProducer();
174        const auto & outputs = kernel->getStreamSetOutputBuffers();
175        for (unsigned i = 0; i < outputs.size(); ++i) {
176            if (outputs[i] == buf) {
177                iBuilder->setKernel(kernel);
178                iBuilder->setConsumedItemCount(kernel->getStreamOutput(i).name, consumed.second);
179                break;
180            }
181        }
182    }
183
184    segNo->addIncoming(iBuilder->CreateAdd(segNo, iBuilder->getSize(codegen::ThreadNum)), segmentLoopBody);
185    iBuilder->CreateCondBr(terminated, exitThreadBlock, segmentLoop);
186
187    iBuilder->SetInsertPoint(exitThreadBlock);
188
189    // only call pthread_exit() within spawned threads; otherwise it'll be equivalent to calling exit() within the process
190    BasicBlock * const exitThread = BasicBlock::Create(iBuilder->getContext(), "ExitThread", threadFunc);
191    BasicBlock * const exitFunction = BasicBlock::Create(iBuilder->getContext(), "ExitProcessFunction", threadFunc);
192
193    Value * const exitCond = iBuilder->CreateICmpEQ(segOffset, ConstantInt::getNullValue(segOffset->getType()));
194    iBuilder->CreateCondBr(exitCond, exitFunction, exitThread);
195    iBuilder->SetInsertPoint(exitThread);
196    iBuilder->CreatePThreadExitCall(nullVoidPtrVal);
197    iBuilder->CreateBr(exitFunction);
198    iBuilder->SetInsertPoint(exitFunction);
199    iBuilder->CreateRetVoid();
200
201    // -------------------------------------------------------------------------------------------------------------------------
202    iBuilder->restoreIP(ip);
203
204    for (unsigned i = 0; i < n; ++i) {
205        kernels[i]->setInstance(instance[i]);
206    }
207
208    // -------------------------------------------------------------------------------------------------------------------------
209    // MAKE SEGMENT PARALLEL PIPELINE DRIVER
210    // -------------------------------------------------------------------------------------------------------------------------
211    const unsigned threads = codegen::ThreadNum - 1;
212    assert (codegen::ThreadNum > 1);
213    Type * const pthreadsTy = ArrayType::get(sizeTy, threads);
214    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
215    Value * threadIdPtr[threads];
216
217    for (unsigned i = 0; i < threads; ++i) {
218        threadIdPtr[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
219    }
220
221    for (unsigned i = 0; i < n; ++i) {
222        iBuilder->setKernel(kernels[i]);
223        iBuilder->releaseLogicalSegmentNo(iBuilder->getSize(0));
224    }
225
226    AllocaInst * const sharedStruct = iBuilder->CreateCacheAlignedAlloca(sharedStructType);
227    for (unsigned i = 0; i < n; ++i) {
228        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
229        iBuilder->CreateStore(kernels[i]->getInstance(), ptr);
230    }
231
232    // use the process thread to handle the initial segment function after spawning (n - 1) threads to handle the subsequent offsets
233    for (unsigned i = 0; i < threads; ++i) {
234        AllocaInst * const threadState = iBuilder->CreateAlloca(threadStructType);
235        iBuilder->CreateStore(sharedStruct, iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
236        iBuilder->CreateStore(iBuilder->getSize(i + 1), iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
237        iBuilder->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, threadFunc, threadState);
238    }
239
240    AllocaInst * const threadState = iBuilder->CreateAlloca(threadStructType);
241    iBuilder->CreateStore(sharedStruct, iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
242    iBuilder->CreateStore(iBuilder->getSize(0), iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
243    iBuilder->CreateCall(threadFunc, iBuilder->CreatePointerCast(threadState, voidPtrTy));
244
245    AllocaInst * const status = iBuilder->CreateAlloca(voidPtrTy);
246    for (unsigned i = 0; i < threads; ++i) {
247        Value * threadId = iBuilder->CreateLoad(threadIdPtr[i]);
248        iBuilder->CreatePThreadJoinCall(threadId, status);
249    }
250}
251
252
253/** ------------------------------------------------------------------------------------------------------------- *
254 * @brief generateParallelPipeline
255 ** ------------------------------------------------------------------------------------------------------------- */
256void generateParallelPipeline(const std::unique_ptr<KernelBuilder> & iBuilder, const std::vector<Kernel *> &kernels) {
257
258    Module * const m = iBuilder->getModule();
259    IntegerType * const sizeTy = iBuilder->getSizeTy();
260    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
261    ConstantInt * bufferSegments = ConstantInt::get(sizeTy, codegen::BufferSegments - 1);
262    ConstantInt * segmentItems = ConstantInt::get(sizeTy, codegen::SegmentSize * iBuilder->getBitBlockWidth());
263    Constant * const nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
264
265    const unsigned n = kernels.size();
266
267    Type * const pthreadsTy = ArrayType::get(sizeTy, n);
268    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
269    Value * threadIdPtr[n];
270    for (unsigned i = 0; i < n; ++i) {
271        threadIdPtr[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
272    }
273
274    Value * instance[n];
275    Type * structTypes[n];
276    for (unsigned i = 0; i < n; ++i) {
277        instance[i] = kernels[i]->getInstance();
278        structTypes[i] = instance[i]->getType();
279    }
280
281    Type * const sharedStructType = StructType::get(m->getContext(), ArrayRef<Type *>{structTypes, n});
282
283
284    AllocaInst * sharedStruct = iBuilder->CreateAlloca(sharedStructType);
285    for (unsigned i = 0; i < n; ++i) {
286        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
287        iBuilder->CreateStore(instance[i], ptr);
288    }
289
290    for (auto & kernel : kernels) {
291        iBuilder->setKernel(kernel);
292        iBuilder->releaseLogicalSegmentNo(iBuilder->getSize(0));
293    }
294
295    // GENERATE THE PRODUCING AND CONSUMING KERNEL MAPS
296    StreamSetBufferMap<unsigned> producingKernel;
297    StreamSetBufferMap<std::vector<unsigned>> consumingKernels;
298    for (unsigned id = 0; id < n; ++id) {
299        const auto & kernel = kernels[id];
300        const auto & inputs = kernel->getStreamInputs();
301        const auto & outputs = kernel->getStreamOutputs();
302        // add any outputs from this kernel to the producing kernel map
303        for (unsigned j = 0; j < outputs.size(); ++j) {
304            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(j);
305            if (LLVM_UNLIKELY(producingKernel.count(buf) != 0)) {
306                report_fatal_error(kernel->getName() + " redefines stream set " + outputs[j].name);
307            }
308            producingKernel.emplace(buf, id);
309        }
310        // and any inputs to the consuming kernels list
311        for (unsigned j = 0; j < inputs.size(); ++j) {
312            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(j);
313            auto f = consumingKernels.find(buf);
314            if (f == consumingKernels.end()) {
315                if (LLVM_UNLIKELY(producingKernel.count(buf) == 0)) {
316                    report_fatal_error(kernel->getName() + " uses stream set " + inputs[j].name + " prior to its definition");
317                }
318                consumingKernels.emplace(buf, std::vector<unsigned>{ id });
319            } else {
320                f->second.push_back(id);
321            }
322        }
323    }
324
325    const auto ip = iBuilder->saveIP();
326
327    // GENERATE UNIQUE PIPELINE PARALLEL THREAD FUNCTION FOR EACH KERNEL
328    FlatSet<unsigned> kernelSet;
329    kernelSet.reserve(n);
330
331    Function * thread_functions[n];
332    Value * producerSegNo[n];
333    for (unsigned id = 0; id < n; id++) {
334        const auto & kernel = kernels[id];
335
336        iBuilder->setKernel(kernel);
337
338        const auto & inputs = kernel->getStreamInputs();
339
340        Function * const threadFunc = makeThreadFunction(iBuilder, "ppt:" + kernel->getName());
341
342         // Create the basic blocks for the thread function.
343        BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc);
344        BasicBlock * outputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "outputCheck", threadFunc);
345        BasicBlock * inputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "inputCheck", threadFunc);
346        BasicBlock * doSegmentBlock = BasicBlock::Create(iBuilder->getContext(), "doSegment", threadFunc);
347        BasicBlock * exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc);
348
349        iBuilder->SetInsertPoint(entryBlock);
350
351        Value * sharedStruct = iBuilder->CreateBitCast(&threadFunc->getArgumentList().front(), sharedStructType->getPointerTo());
352
353        for (unsigned k = 0; k < n; k++) {
354            Value * const ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
355            kernels[k]->setInstance(iBuilder->CreateLoad(ptr));
356        }
357
358        iBuilder->CreateBr(outputCheckBlock);
359
360        // Check whether the output buffers are ready for more data
361        iBuilder->SetInsertPoint(outputCheckBlock);
362        PHINode * segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 3, "segNo");
363        segNo->addIncoming(iBuilder->getSize(0), entryBlock);
364        segNo->addIncoming(segNo, outputCheckBlock);
365
366        Value * outputWaitCond = iBuilder->getTrue();
367        for (const StreamSetBuffer * buf : kernel->getStreamSetOutputBuffers()) {
368            const auto & list = consumingKernels[buf];
369            assert(std::is_sorted(list.begin(), list.end()));
370            kernelSet.insert(list.begin(), list.end());
371        }
372        for (unsigned k : kernelSet) {
373            iBuilder->setKernel(kernels[k]);
374            Value * consumerSegNo = iBuilder->acquireLogicalSegmentNo();
375            assert (consumerSegNo->getType() == segNo->getType());
376            Value * consumedSegNo = iBuilder->CreateAdd(consumerSegNo, bufferSegments);
377            outputWaitCond = iBuilder->CreateAnd(outputWaitCond, iBuilder->CreateICmpULE(segNo, consumedSegNo));
378        }
379        kernelSet.clear();
380        iBuilder->setKernel(kernel);
381        iBuilder->CreateCondBr(outputWaitCond, inputCheckBlock, outputCheckBlock);
382
383        // Check whether the input buffers have enough data for this kernel to begin
384        iBuilder->SetInsertPoint(inputCheckBlock);
385        for (const StreamSetBuffer * buf : kernel->getStreamSetInputBuffers()) {
386            kernelSet.insert(producingKernel[buf]);
387        }
388
389        Value * inputWaitCond = iBuilder->getTrue();
390        for (unsigned k : kernelSet) {
391            iBuilder->setKernel(kernels[k]);
392            producerSegNo[k] = iBuilder->acquireLogicalSegmentNo();
393            assert (producerSegNo[k]->getType() == segNo->getType());
394            inputWaitCond = iBuilder->CreateAnd(inputWaitCond, iBuilder->CreateICmpULT(segNo, producerSegNo[k]));
395        }
396        iBuilder->setKernel(kernel);
397        iBuilder->CreateCondBr(inputWaitCond, doSegmentBlock, inputCheckBlock);
398
399        // Process the segment
400        iBuilder->SetInsertPoint(doSegmentBlock);
401
402        Value * const nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
403        Value * terminated = nullptr;
404        if (kernelSet.empty()) {
405            // if this kernel has no input streams, the kernel itself must decide when it terminates.
406            terminated = iBuilder->getTerminationSignal();
407        } else {
408            // ... otherwise the kernel terminates only when it exhausts all of its input streams
409            terminated = iBuilder->getTrue();
410            for (unsigned k : kernelSet) {
411                iBuilder->setKernel(kernels[k]);
412                terminated = iBuilder->CreateAnd(terminated, iBuilder->getTerminationSignal());
413                terminated = iBuilder->CreateAnd(terminated, iBuilder->CreateICmpEQ(nextSegNo, producerSegNo[k]));
414            }
415            kernelSet.clear();
416            iBuilder->setKernel(kernel);
417        }
418
419        std::vector<Value *> args = {kernel->getInstance(), terminated};
420        args.insert(args.end(), inputs.size(), iBuilder->CreateMul(segmentItems, segNo));
421
422        iBuilder->createDoSegmentCall(args);
423        segNo->addIncoming(nextSegNo, doSegmentBlock);
424        iBuilder->releaseLogicalSegmentNo(nextSegNo);
425
426        iBuilder->CreateCondBr(terminated, exitThreadBlock, outputCheckBlock);
427
428        iBuilder->SetInsertPoint(exitThreadBlock);
429
430        iBuilder->CreatePThreadExitCall(nullVoidPtrVal);
431
432        iBuilder->CreateRetVoid();
433
434        thread_functions[id] = threadFunc;
435    }
436
437    iBuilder->restoreIP(ip);
438
439    for (unsigned i = 0; i < n; ++i) {
440        kernels[i]->setInstance(instance[i]);
441    }
442
443    for (unsigned i = 0; i < n; ++i) {
444        iBuilder->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, thread_functions[i], sharedStruct);
445    }
446
447    AllocaInst * const status = iBuilder->CreateAlloca(voidPtrTy);
448    for (unsigned i = 0; i < n; ++i) {
449        Value * threadId = iBuilder->CreateLoad(threadIdPtr[i]);
450        iBuilder->CreatePThreadJoinCall(threadId, status);
451    }
452}
453
454/** ------------------------------------------------------------------------------------------------------------- *
455 * @brief generatePipelineLoop
456 ** ------------------------------------------------------------------------------------------------------------- */
457void generatePipelineLoop(const std::unique_ptr<KernelBuilder> & iBuilder, const std::vector<Kernel *> & kernels) {
458
459    BasicBlock * entryBlock = iBuilder->GetInsertBlock();
460    Function * main = entryBlock->getParent();
461    Value * mCycleCounts = nullptr;
462    if (codegen::EnableCycleCounter) {
463        ArrayType * cycleCountArray = ArrayType::get(iBuilder->getInt64Ty(), kernels.size());
464        mCycleCounts = iBuilder->CreateAlloca(ArrayType::get(iBuilder->getInt64Ty(), kernels.size()));
465        iBuilder->CreateStore(Constant::getNullValue(cycleCountArray), mCycleCounts);
466    }
467
468    // Create the basic blocks for the loop.
469    BasicBlock * pipelineLoop = BasicBlock::Create(iBuilder->getContext(), "pipelineLoop", main);
470    BasicBlock * pipelineExit = BasicBlock::Create(iBuilder->getContext(), "pipelineExit", main);
471
472    StreamSetBufferMap<Value *> producedPos;
473    StreamSetBufferMap<Value *> consumedPos;
474
475    iBuilder->CreateBr(pipelineLoop);
476    iBuilder->SetInsertPoint(pipelineLoop);
477   
478    Value * cycleCountStart = nullptr;
479    Value * cycleCountEnd = nullptr;
480    if (codegen::EnableCycleCounter) {
481        cycleCountStart = iBuilder->CreateReadCycleCounter();
482    }
483    Value * terminated = iBuilder->getFalse();
484    for (unsigned k = 0; k < kernels.size(); k++) {
485
486        auto & kernel = kernels[k];
487
488        iBuilder->setKernel(kernel);
489        const auto & inputs = kernel->getStreamInputs();
490        const auto & outputs = kernel->getStreamOutputs();
491
492        std::vector<Value *> args = {kernel->getInstance(), terminated};
493        for (unsigned i = 0; i < inputs.size(); ++i) {
494            const auto f = producedPos.find(kernel->getStreamSetInputBuffer(i));
495            if (LLVM_UNLIKELY(f == producedPos.end())) {
496                report_fatal_error(kernel->getName() + " uses stream set " + inputs[i].name + " prior to its definition");
497            }
498            args.push_back(f->second);
499        }
500
501        iBuilder->createDoSegmentCall(args);
502        if (!kernel->hasNoTerminateAttribute()) {
503            Value * terminatedSignal = iBuilder->getTerminationSignal();
504            terminated = iBuilder->CreateOr(terminated, terminatedSignal);
505        }
506        for (unsigned i = 0; i < outputs.size(); ++i) {
507            Value * const produced = iBuilder->getProducedItemCount(outputs[i].name, terminated);
508            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
509            assert (producedPos.count(buf) == 0);
510            producedPos.emplace(buf, produced);
511        }
512
513        for (unsigned i = 0; i < inputs.size(); ++i) {
514            Value * const processedItemCount = iBuilder->getProcessedItemCount(inputs[i].name);
515            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);
516            auto f = consumedPos.find(buf);
517            if (f == consumedPos.end()) {
518                consumedPos.emplace(buf, processedItemCount);
519            } else {
520                Value * lesser = iBuilder->CreateICmpULT(processedItemCount, f->second);
521                f->second = iBuilder->CreateSelect(lesser, processedItemCount, f->second);
522            }
523        }
524        if (codegen::EnableCycleCounter) {
525            cycleCountEnd = iBuilder->CreateReadCycleCounter();
526            Value * counterPtr = iBuilder->CreateGEP(mCycleCounts, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
527            iBuilder->CreateStore(iBuilder->CreateAdd(iBuilder->CreateLoad(counterPtr), iBuilder->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
528            cycleCountStart = cycleCountEnd;
529        }
530
531
532        Value * const segNo = iBuilder->acquireLogicalSegmentNo();
533        Value * nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
534        iBuilder->releaseLogicalSegmentNo(nextSegNo);
535    }
536
537    for (const auto consumed : consumedPos) {
538        const StreamSetBuffer * const buf = consumed.first;
539        Kernel * k = buf->getProducer();
540        const auto & outputs = k->getStreamSetOutputBuffers();
541        for (unsigned i = 0; i < outputs.size(); ++i) {
542            if (outputs[i] == buf) {
543                iBuilder->setKernel(k);
544                iBuilder->setConsumedItemCount(k->getStreamOutput(i).name, consumed.second);
545                break;
546            }
547        }
548    }
549
550    iBuilder->CreateCondBr(terminated, pipelineExit, pipelineLoop);
551    iBuilder->SetInsertPoint(pipelineExit);
552    if (codegen::EnableCycleCounter) {
553        for (unsigned k = 0; k < kernels.size(); k++) {
554            auto & kernel = kernels[k];
555            iBuilder->setKernel(kernel);
556            const auto & inputs = kernel->getStreamInputs();
557            const auto & outputs = kernel->getStreamOutputs();
558            Value * items = nullptr;
559            if (inputs.empty()) {
560                items = iBuilder->getProducedItemCount(outputs[0].name);
561            } else {
562                items = iBuilder->getProcessedItemCount(inputs[0].name);
563            }
564            Value * fItems = iBuilder->CreateUIToFP(items, iBuilder->getDoubleTy());
565            Value * cycles = iBuilder->CreateLoad(iBuilder->CreateGEP(mCycleCounts, {iBuilder->getInt32(0), iBuilder->getInt32(k)}));
566            Value * fCycles = iBuilder->CreateUIToFP(cycles, iBuilder->getDoubleTy());
567            std::string formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
568            Value * stringPtr = iBuilder->CreatePointerCast(iBuilder->GetString(formatString), iBuilder->getInt8PtrTy());
569            iBuilder->CreateCall(iBuilder->GetDprintf(), {iBuilder->getInt32(2), stringPtr, fItems, fCycles, iBuilder->CreateFDiv(fCycles, fItems)});
570        }
571    }
572}
Note: See TracBrowser for help on using the repository browser.