source: icGREP/icgrep-devel/icgrep/toolchain/pipeline.cpp @ 5436

Last change on this file since 5436 was 5436, checked in by nmedfort, 2 years ago

Continued refactoring work. PabloKernel is now an abstract base type with a 'generatePabloMethod' hook to generate Pablo code.

File size: 26.7 KB
Line 
1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
6#include "pipeline.h"
7#include <toolchain/toolchain.h>
8#include <kernels/kernel.h>
9#include <kernels/streamset.h>
10#include <llvm/IR/Module.h>
11#include <boost/container/flat_set.hpp>
12#include <boost/container/flat_map.hpp>
13#include <kernels/kernel_builder.h>
14
15using namespace kernel;
16using namespace parabix;
17using namespace llvm;
18
19
20template <typename Value>
21using StreamSetBufferMap = boost::container::flat_map<const StreamSetBuffer *, Value>;
22
23template <typename Value>
24using FlatSet = boost::container::flat_set<Value>;
25
26Function * makeThreadFunction(const std::unique_ptr<kernel::KernelBuilder> & b, const std::string & name) {
27    Function * const f = Function::Create(FunctionType::get(b->getVoidTy(), {b->getVoidPtrTy()}, false), Function::InternalLinkage, name, b->getModule());
28    f->setCallingConv(CallingConv::C);
29    f->arg_begin()->setName("input");
30    return f;
31}
32
33/** ------------------------------------------------------------------------------------------------------------- *
34 * @brief generateSegmentParallelPipeline
35 *
36 * Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
37 * operating over an input stream set S, a segment-parallel implementation divides the input
38 * into segments and coordinates a set of T <= K threads to each process one segment at a time.
39 * Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
40 * fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
41 ** ------------------------------------------------------------------------------------------------------------- */
42void generateSegmentParallelPipeline(const std::unique_ptr<KernelBuilder> & iBuilder, const std::vector<Kernel *> & kernels) {
43
44    const unsigned n = kernels.size();
45    Module * const m = iBuilder->getModule();
46    IntegerType * const sizeTy = iBuilder->getSizeTy();
47    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
48    Constant * nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
49    std::vector<Type *> structTypes;
50
51    Value * instance[n];
52    for (unsigned i = 0; i < n; ++i) {
53        instance[i] = kernels[i]->getInstance();
54        structTypes.push_back(instance[i]->getType());
55    }
56    StructType * const sharedStructType = StructType::get(m->getContext(), structTypes);
57    StructType * const threadStructType = StructType::get(sharedStructType->getPointerTo(), sizeTy, nullptr);
58
59    Function * const threadFunc = makeThreadFunction(iBuilder, "segment");
60
61    // -------------------------------------------------------------------------------------------------------------------------
62    // MAKE SEGMENT PARALLEL PIPELINE THREAD
63    // -------------------------------------------------------------------------------------------------------------------------
64    const auto ip = iBuilder->saveIP();
65
66     // Create the basic blocks for the thread function.
67    BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc);
68    iBuilder->SetInsertPoint(entryBlock);
69    Value * const input = &threadFunc->getArgumentList().front();
70    Value * const threadStruct = iBuilder->CreatePointerCast(input, threadStructType->getPointerTo());
71    Value * const sharedStatePtr = iBuilder->CreateLoad(iBuilder->CreateGEP(threadStruct, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
72    for (unsigned k = 0; k < n; ++k) {
73        Value * ptr = iBuilder->CreateLoad(iBuilder->CreateGEP(sharedStatePtr, {iBuilder->getInt32(0), iBuilder->getInt32(k)}));
74        kernels[k]->setInstance(ptr);
75    }
76    Value * const segOffset = iBuilder->CreateLoad(iBuilder->CreateGEP(threadStruct, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
77
78    BasicBlock * segmentLoop = BasicBlock::Create(iBuilder->getContext(), "segmentLoop", threadFunc);
79    iBuilder->CreateBr(segmentLoop);
80
81    iBuilder->SetInsertPoint(segmentLoop);
82    PHINode * const segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 2, "segNo");
83    segNo->addIncoming(segOffset, entryBlock);
84
85    Value * terminated = iBuilder->getFalse();
86    Value * const nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
87
88    BasicBlock * segmentLoopBody = nullptr;
89    BasicBlock * const exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc);
90
91    StreamSetBufferMap<Value *> producedPos;
92    StreamSetBufferMap<Value *> consumedPos;
93
94    for (unsigned k = 0; k < n; ++k) {
95
96        const auto & kernel = kernels[k];
97
98        BasicBlock * const segmentWait = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Wait", threadFunc);
99        iBuilder->CreateBr(segmentWait);
100
101        segmentLoopBody = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Do", threadFunc);
102
103        iBuilder->SetInsertPoint(segmentWait);
104        const unsigned waitIdx = codegen::DebugOptionIsSet(codegen::SerializeThreads) ? (n - 1) : k;
105        kernels[waitIdx]->setBuilder(iBuilder);
106        Value * const processedSegmentCount = kernels[waitIdx]->acquireLogicalSegmentNo();
107
108
109        assert (processedSegmentCount->getType() == segNo->getType());
110        Value * const ready = iBuilder->CreateICmpEQ(segNo, processedSegmentCount);
111
112        if (kernel->hasNoTerminateAttribute()) {
113            iBuilder->CreateCondBr(ready, segmentLoopBody, segmentWait);
114        } else { // If the kernel was terminated in a previous segment then the pipeline is done.
115            BasicBlock * completionTest = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Completed", threadFunc, 0);
116            BasicBlock * exitBlock = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Exit", threadFunc, 0);
117            iBuilder->CreateCondBr(ready, completionTest, segmentWait);
118
119            iBuilder->SetInsertPoint(completionTest);
120
121            kernel->setBuilder(iBuilder);
122            Value * terminationSignal = kernel->getTerminationSignal();
123
124            iBuilder->CreateCondBr(terminationSignal, exitBlock, segmentLoopBody);
125            iBuilder->SetInsertPoint(exitBlock);
126            // Ensure that the next thread will also exit.
127            kernel->releaseLogicalSegmentNo(nextSegNo);
128            iBuilder->CreateBr(exitThreadBlock);
129        }
130
131        // Execute the kernel segment
132        iBuilder->SetInsertPoint(segmentLoopBody);
133        const auto & inputs = kernel->getStreamInputs();
134        std::vector<Value *> args = {kernel->getInstance(), terminated};
135        for (unsigned i = 0; i < inputs.size(); ++i) {
136            const auto f = producedPos.find(kernel->getStreamSetInputBuffer(i));
137            assert (f != producedPos.end());
138            args.push_back(f->second);
139        }
140
141        kernel->setBuilder(iBuilder);
142        kernel->createDoSegmentCall(args);       
143        if (!kernel->hasNoTerminateAttribute()) {
144            terminated = iBuilder->CreateOr(terminated, kernel->getTerminationSignal());
145        }
146
147        const auto & outputs = kernel->getStreamOutputs();
148        for (unsigned i = 0; i < outputs.size(); ++i) {
149            kernel->setBuilder(iBuilder);
150            Value * const produced = kernel->getProducedItemCount(outputs[i].name, terminated);
151            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
152            assert (producedPos.count(buf) == 0);
153            producedPos.emplace(buf, produced);
154        }
155        for (unsigned i = 0; i < inputs.size(); ++i) {
156            kernel->setBuilder(iBuilder);
157            Value * const processedItemCount = kernel->getProcessedItemCount(inputs[i].name);
158            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);           
159            auto f = consumedPos.find(buf);
160            if (f == consumedPos.end()) {
161                consumedPos.emplace(buf, processedItemCount);
162            } else {
163                Value * lesser = iBuilder->CreateICmpULT(processedItemCount, f->second);
164                f->second = iBuilder->CreateSelect(lesser, processedItemCount, f->second);
165            }
166        }
167        kernel->setBuilder(iBuilder);
168        kernel->releaseLogicalSegmentNo(nextSegNo);
169    }
170
171    assert (segmentLoopBody);
172    exitThreadBlock->moveAfter(segmentLoopBody);
173
174    for (const auto consumed : consumedPos) {
175        const StreamSetBuffer * const buf = consumed.first;
176        Kernel * kernel = buf->getProducer();
177        const auto & outputs = kernel->getStreamSetOutputBuffers();
178        for (unsigned i = 0; i < outputs.size(); ++i) {
179            if (outputs[i] == buf) {
180                kernel->setBuilder(iBuilder);
181                kernel->setConsumedItemCount(kernel->getStreamOutputs()[i].name, consumed.second);
182                break;
183            }
184        }
185    }
186
187    segNo->addIncoming(iBuilder->CreateAdd(segNo, iBuilder->getSize(codegen::ThreadNum)), segmentLoopBody);
188    iBuilder->CreateCondBr(terminated, exitThreadBlock, segmentLoop);
189
190    iBuilder->SetInsertPoint(exitThreadBlock);
191
192    // only call pthread_exit() within spawned threads; otherwise it'll be equivalent to calling exit() within the process
193    BasicBlock * const exitThread = BasicBlock::Create(iBuilder->getContext(), "ExitThread", threadFunc);
194    BasicBlock * const exitFunction = BasicBlock::Create(iBuilder->getContext(), "ExitProcessFunction", threadFunc);
195
196    Value * const exitCond = iBuilder->CreateICmpEQ(segOffset, ConstantInt::getNullValue(segOffset->getType()));
197    iBuilder->CreateCondBr(exitCond, exitFunction, exitThread);
198    iBuilder->SetInsertPoint(exitThread);
199    iBuilder->CreatePThreadExitCall(nullVoidPtrVal);
200    iBuilder->CreateBr(exitFunction);
201    iBuilder->SetInsertPoint(exitFunction);
202    iBuilder->CreateRetVoid();
203
204    // -------------------------------------------------------------------------------------------------------------------------
205    iBuilder->restoreIP(ip);
206
207    for (unsigned i = 0; i < n; ++i) {
208        kernels[i]->setInstance(instance[i]);
209    }
210
211    // -------------------------------------------------------------------------------------------------------------------------
212    // MAKE SEGMENT PARALLEL PIPELINE DRIVER
213    // -------------------------------------------------------------------------------------------------------------------------
214    const unsigned threads = codegen::ThreadNum - 1;
215    assert (codegen::ThreadNum > 1);
216    Type * const pthreadsTy = ArrayType::get(sizeTy, threads);
217    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
218    Value * threadIdPtr[threads];
219
220    for (unsigned i = 0; i < threads; ++i) {
221        threadIdPtr[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
222    }
223
224    for (unsigned i = 0; i < n; ++i) {
225        auto kernel = kernels[i];
226        kernel->setBuilder(iBuilder);
227        kernel->releaseLogicalSegmentNo(iBuilder->getSize(0));
228    }
229
230    AllocaInst * const sharedStruct = iBuilder->CreateCacheAlignedAlloca(sharedStructType);
231    for (unsigned i = 0; i < n; ++i) {
232        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
233        iBuilder->CreateStore(kernels[i]->getInstance(), ptr);
234    }
235
236    // use the process thread to handle the initial segment function after spawning (n - 1) threads to handle the subsequent offsets
237    for (unsigned i = 0; i < threads; ++i) {
238        AllocaInst * const threadState = iBuilder->CreateAlloca(threadStructType);
239        iBuilder->CreateStore(sharedStruct, iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
240        iBuilder->CreateStore(iBuilder->getSize(i + 1), iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
241        iBuilder->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, threadFunc, threadState);
242    }
243
244    AllocaInst * const threadState = iBuilder->CreateAlloca(threadStructType);
245    iBuilder->CreateStore(sharedStruct, iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
246    iBuilder->CreateStore(iBuilder->getSize(0), iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
247    iBuilder->CreateCall(threadFunc, iBuilder->CreatePointerCast(threadState, voidPtrTy));
248
249    AllocaInst * const status = iBuilder->CreateAlloca(voidPtrTy);
250    for (unsigned i = 0; i < threads; ++i) {
251        Value * threadId = iBuilder->CreateLoad(threadIdPtr[i]);
252        iBuilder->CreatePThreadJoinCall(threadId, status);
253    }
254}
255
256
257/** ------------------------------------------------------------------------------------------------------------- *
258 * @brief generateParallelPipeline
259 ** ------------------------------------------------------------------------------------------------------------- */
260void generateParallelPipeline(const std::unique_ptr<KernelBuilder> & iBuilder, const std::vector<Kernel *> &kernels) {
261
262    Module * const m = iBuilder->getModule();
263    IntegerType * const sizeTy = iBuilder->getSizeTy();
264    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
265    ConstantInt * bufferSegments = ConstantInt::get(sizeTy, codegen::BufferSegments - 1);
266    ConstantInt * segmentItems = ConstantInt::get(sizeTy, codegen::SegmentSize * iBuilder->getBitBlockWidth());
267    Constant * const nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
268
269    const unsigned n = kernels.size();
270
271    Type * const pthreadsTy = ArrayType::get(sizeTy, n);
272    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
273    Value * threadIdPtr[n];
274    for (unsigned i = 0; i < n; ++i) {
275        threadIdPtr[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
276    }
277
278    Value * instance[n];
279    Type * structTypes[n];
280    for (unsigned i = 0; i < n; ++i) {
281        instance[i] = kernels[i]->getInstance();
282        structTypes[i] = instance[i]->getType();
283    }
284
285    Type * const sharedStructType = StructType::get(m->getContext(), ArrayRef<Type *>{structTypes, n});
286
287
288    AllocaInst * sharedStruct = iBuilder->CreateAlloca(sharedStructType);
289    for (unsigned i = 0; i < n; ++i) {
290        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
291        iBuilder->CreateStore(instance[i], ptr);
292    }
293
294    for (auto & kernel : kernels) {
295        kernel->setBuilder(iBuilder);
296        kernel->releaseLogicalSegmentNo(iBuilder->getSize(0));
297    }
298
299    // GENERATE THE PRODUCING AND CONSUMING KERNEL MAPS
300    StreamSetBufferMap<unsigned> producingKernel;
301    StreamSetBufferMap<std::vector<unsigned>> consumingKernels;
302    for (unsigned id = 0; id < n; ++id) {
303        const auto & kernel = kernels[id];
304        const auto & inputs = kernel->getStreamInputs();
305        const auto & outputs = kernel->getStreamOutputs();
306        // add any outputs from this kernel to the producing kernel map
307        for (unsigned j = 0; j < outputs.size(); ++j) {
308            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(j);
309            if (LLVM_UNLIKELY(producingKernel.count(buf) != 0)) {
310                report_fatal_error(kernel->getName() + " redefines stream set " + outputs[j].name);
311            }
312            producingKernel.emplace(buf, id);
313        }
314        // and any inputs to the consuming kernels list
315        for (unsigned j = 0; j < inputs.size(); ++j) {
316            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(j);
317            auto f = consumingKernels.find(buf);
318            if (f == consumingKernels.end()) {
319                if (LLVM_UNLIKELY(producingKernel.count(buf) == 0)) {
320                    report_fatal_error(kernel->getName() + " uses stream set " + inputs[j].name + " prior to its definition");
321                }
322                consumingKernels.emplace(buf, std::vector<unsigned>{ id });
323            } else {
324                f->second.push_back(id);
325            }
326        }
327    }
328
329    const auto ip = iBuilder->saveIP();
330
331    // GENERATE UNIQUE PIPELINE PARALLEL THREAD FUNCTION FOR EACH KERNEL
332    FlatSet<unsigned> kernelSet;
333    kernelSet.reserve(n);
334
335    Function * thread_functions[n];
336    Value * producerSegNo[n];
337    for (unsigned id = 0; id < n; id++) {
338        const auto & kernel = kernels[id];
339        const auto & inputs = kernel->getStreamInputs();
340
341        Function * const threadFunc = makeThreadFunction(iBuilder, "ppt:" + kernel->getName());
342
343         // Create the basic blocks for the thread function.
344        BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc);
345        BasicBlock * outputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "outputCheck", threadFunc);
346        BasicBlock * inputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "inputCheck", threadFunc);
347        BasicBlock * doSegmentBlock = BasicBlock::Create(iBuilder->getContext(), "doSegment", threadFunc);
348        BasicBlock * exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc);
349
350        iBuilder->SetInsertPoint(entryBlock);
351
352        Value * sharedStruct = iBuilder->CreateBitCast(&threadFunc->getArgumentList().front(), sharedStructType->getPointerTo());
353
354        for (unsigned k = 0; k < n; k++) {
355            Value * const ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
356            kernels[k]->setInstance(iBuilder->CreateLoad(ptr));
357        }
358
359        iBuilder->CreateBr(outputCheckBlock);
360
361        // Check whether the output buffers are ready for more data
362        iBuilder->SetInsertPoint(outputCheckBlock);
363        PHINode * segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 3, "segNo");
364        segNo->addIncoming(iBuilder->getSize(0), entryBlock);
365        segNo->addIncoming(segNo, outputCheckBlock);
366
367        Value * outputWaitCond = iBuilder->getTrue();
368        for (const StreamSetBuffer * buf : kernel->getStreamSetOutputBuffers()) {
369            const auto & list = consumingKernels[buf];
370            assert(std::is_sorted(list.begin(), list.end()));
371            kernelSet.insert(list.begin(), list.end());
372        }
373        for (unsigned k : kernelSet) {
374            Value * consumerSegNo = kernels[k]->acquireLogicalSegmentNo();
375            assert (consumerSegNo->getType() == segNo->getType());
376            Value * consumedSegNo = iBuilder->CreateAdd(consumerSegNo, bufferSegments);
377            outputWaitCond = iBuilder->CreateAnd(outputWaitCond, iBuilder->CreateICmpULE(segNo, consumedSegNo));
378        }
379        kernelSet.clear();
380        iBuilder->CreateCondBr(outputWaitCond, inputCheckBlock, outputCheckBlock);
381
382        // Check whether the input buffers have enough data for this kernel to begin
383        iBuilder->SetInsertPoint(inputCheckBlock);
384        for (const StreamSetBuffer * buf : kernel->getStreamSetInputBuffers()) {
385            kernelSet.insert(producingKernel[buf]);
386        }
387
388        Value * inputWaitCond = iBuilder->getTrue();
389        for (unsigned k : kernelSet) {
390            Kernel * kernel = kernels[k];
391            kernel->setBuilder(iBuilder);
392            producerSegNo[k] = kernel->acquireLogicalSegmentNo();
393            assert (producerSegNo[k]->getType() == segNo->getType());
394            inputWaitCond = iBuilder->CreateAnd(inputWaitCond, iBuilder->CreateICmpULT(segNo, producerSegNo[k]));
395        }
396        iBuilder->CreateCondBr(inputWaitCond, doSegmentBlock, inputCheckBlock);
397
398        // Process the segment
399        iBuilder->SetInsertPoint(doSegmentBlock);
400
401        Value * const nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
402        Value * terminated = nullptr;
403        if (kernelSet.empty()) {
404            // if this kernel has no input streams, the kernel itself must decide when it terminates.
405            terminated = kernel->getTerminationSignal();
406        } else {
407            // ... otherwise the kernel terminates only when it exhausts all of its input streams
408            terminated = iBuilder->getTrue();
409            for (unsigned k : kernelSet) {
410                terminated = iBuilder->CreateAnd(terminated, kernels[k]->getTerminationSignal());
411                terminated = iBuilder->CreateAnd(terminated, iBuilder->CreateICmpEQ(nextSegNo, producerSegNo[k]));
412            }
413            kernelSet.clear();
414        }
415
416        std::vector<Value *> args = {kernel->getInstance(), terminated};
417        args.insert(args.end(), inputs.size(), iBuilder->CreateMul(segmentItems, segNo));
418
419        kernel->createDoSegmentCall(args);
420        segNo->addIncoming(nextSegNo, doSegmentBlock);
421        kernel->releaseLogicalSegmentNo(nextSegNo);
422
423        iBuilder->CreateCondBr(terminated, exitThreadBlock, outputCheckBlock);
424
425        iBuilder->SetInsertPoint(exitThreadBlock);
426
427        iBuilder->CreatePThreadExitCall(nullVoidPtrVal);
428
429        iBuilder->CreateRetVoid();
430
431        thread_functions[id] = threadFunc;
432    }
433
434    iBuilder->restoreIP(ip);
435
436    for (unsigned i = 0; i < n; ++i) {
437        kernels[i]->setInstance(instance[i]);
438    }
439
440    for (unsigned i = 0; i < n; ++i) {
441        iBuilder->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, thread_functions[i], sharedStruct);
442    }
443
444    AllocaInst * const status = iBuilder->CreateAlloca(voidPtrTy);
445    for (unsigned i = 0; i < n; ++i) {
446        Value * threadId = iBuilder->CreateLoad(threadIdPtr[i]);
447        iBuilder->CreatePThreadJoinCall(threadId, status);
448    }
449}
450
451/** ------------------------------------------------------------------------------------------------------------- *
452 * @brief generatePipelineLoop
453 ** ------------------------------------------------------------------------------------------------------------- */
454void generatePipelineLoop(const std::unique_ptr<KernelBuilder> & iBuilder, const std::vector<Kernel *> & kernels) {
455
456    BasicBlock * entryBlock = iBuilder->GetInsertBlock();
457    Function * main = entryBlock->getParent();
458    Value * mCycleCounts = nullptr;
459    if (codegen::EnableCycleCounter) {
460        ArrayType * cycleCountArray = ArrayType::get(iBuilder->getInt64Ty(), kernels.size());
461        mCycleCounts = iBuilder->CreateAlloca(ArrayType::get(iBuilder->getInt64Ty(), kernels.size()));
462        iBuilder->CreateStore(Constant::getNullValue(cycleCountArray), mCycleCounts);
463    }
464
465    // Create the basic blocks for the loop.
466    BasicBlock * pipelineLoop = BasicBlock::Create(iBuilder->getContext(), "pipelineLoop", main);
467    BasicBlock * pipelineExit = BasicBlock::Create(iBuilder->getContext(), "pipelineExit", main);
468
469    StreamSetBufferMap<Value *> producedPos;
470    StreamSetBufferMap<Value *> consumedPos;
471
472    iBuilder->CreateBr(pipelineLoop);
473    iBuilder->SetInsertPoint(pipelineLoop);
474   
475    Value * cycleCountStart = nullptr;
476    Value * cycleCountEnd = nullptr;
477    if (codegen::EnableCycleCounter) {
478        cycleCountStart = iBuilder->CreateReadCycleCounter();
479    }
480    Value * terminated = iBuilder->getFalse();
481    for (unsigned k = 0; k < kernels.size(); k++) {
482        auto & kernel = kernels[k];
483
484        const auto & inputs = kernel->getStreamInputs();
485        const auto & outputs = kernel->getStreamOutputs();
486
487        std::vector<Value *> args = {kernel->getInstance(), terminated};
488        for (unsigned i = 0; i < inputs.size(); ++i) {
489            const auto f = producedPos.find(kernel->getStreamSetInputBuffer(i));
490            if (LLVM_UNLIKELY(f == producedPos.end())) {
491                report_fatal_error(kernel->getName() + " uses stream set " + inputs[i].name + " prior to its definition");
492            }
493            args.push_back(f->second);
494        }
495
496        kernel->setBuilder(iBuilder);
497        kernel->createDoSegmentCall(args);
498        if (!kernel->hasNoTerminateAttribute()) {
499            Value * terminatedSignal = kernel->getTerminationSignal();
500            terminated = iBuilder->CreateOr(terminated, terminatedSignal);
501            kernel->setBuilder(iBuilder);
502        }
503        for (unsigned i = 0; i < outputs.size(); ++i) {
504            Value * const produced = kernel->getProducedItemCount(outputs[i].name, terminated);
505            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
506            assert (producedPos.count(buf) == 0);
507            producedPos.emplace(buf, produced);
508        }
509
510        for (unsigned i = 0; i < inputs.size(); ++i) {
511            kernel->setBuilder(iBuilder);
512            Value * const processedItemCount = kernel->getProcessedItemCount(inputs[i].name);
513            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);
514
515            auto f = consumedPos.find(buf);
516            if (f == consumedPos.end()) {
517                consumedPos.emplace(buf, processedItemCount);
518            } else {
519                Value * lesser = iBuilder->CreateICmpULT(processedItemCount, f->second);
520                f->second = iBuilder->CreateSelect(lesser, processedItemCount, f->second);
521            }
522        }
523        if (codegen::EnableCycleCounter) {
524            cycleCountEnd = iBuilder->CreateReadCycleCounter();
525            Value * counterPtr = iBuilder->CreateGEP(mCycleCounts, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
526            iBuilder->CreateStore(iBuilder->CreateAdd(iBuilder->CreateLoad(counterPtr), iBuilder->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
527            cycleCountStart = cycleCountEnd;
528        }
529
530        kernel->setBuilder(iBuilder);
531        Value * const segNo = kernel->acquireLogicalSegmentNo();
532        Value * nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
533        kernel->releaseLogicalSegmentNo(nextSegNo);
534    }
535
536    for (const auto consumed : consumedPos) {
537        const StreamSetBuffer * const buf = consumed.first;
538        Kernel * k = buf->getProducer();
539        const auto & outputs = k->getStreamSetOutputBuffers();
540        for (unsigned i = 0; i < outputs.size(); ++i) {
541            if (outputs[i] == buf) {
542                k->setBuilder(iBuilder);
543                k->setConsumedItemCount(k->getStreamOutputs()[i].name, consumed.second);
544                break;
545            }
546        }
547    }
548
549    iBuilder->CreateCondBr(terminated, pipelineExit, pipelineLoop);
550    iBuilder->SetInsertPoint(pipelineExit);
551    if (codegen::EnableCycleCounter) {
552        for (unsigned k = 0; k < kernels.size(); k++) {
553            auto & kernel = kernels[k];
554            const auto & inputs = kernel->getStreamInputs();
555            const auto & outputs = kernel->getStreamOutputs();
556            Value * items = inputs.size() > 0 ? kernel->getProcessedItemCount(inputs[0].name) : kernel->getProducedItemCount(outputs[0].name);
557            Value * fItems = iBuilder->CreateUIToFP(items, iBuilder->getDoubleTy());
558            Value * cycles = iBuilder->CreateLoad(iBuilder->CreateGEP(mCycleCounts, {iBuilder->getInt32(0), iBuilder->getInt32(k)}));
559            Value * fCycles = iBuilder->CreateUIToFP(cycles, iBuilder->getDoubleTy());
560            std::string formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
561            Value * stringPtr = iBuilder->CreatePointerCast(iBuilder->GetString(formatString), iBuilder->getInt8PtrTy());
562            iBuilder->CreateCall(iBuilder->GetDprintf(), {iBuilder->getInt32(2), stringPtr, fItems, fCycles, iBuilder->CreateFDiv(fCycles, fItems)});
563        }
564    }
565}
Note: See TracBrowser for help on using the repository browser.