source: icGREP/icgrep-devel/icgrep/toolchain/pipeline.cpp @ 5474

Last change on this file since 5474 was 5474, checked in by nmedfort, 2 years ago

Eliminated an ExecutionEngine memory leak. Intentionally broke compatibility with prior versions to ensure that projects not yet checked in are restructured.

File size: 28.5 KB
Line 
1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
6#include "pipeline.h"
7#include <toolchain/toolchain.h>
8#include <kernels/kernel.h>
9#include <kernels/streamset.h>
10#include <llvm/IR/Module.h>
11#include <boost/container/flat_set.hpp>
12#include <boost/container/flat_map.hpp>
13#include <kernels/kernel_builder.h>
14
15#include <llvm/Support/raw_ostream.h>
16
17using namespace kernel;
18using namespace parabix;
19using namespace llvm;
20
21
22template <typename Value>
23using StreamSetBufferMap = boost::container::flat_map<const StreamSetBuffer *, Value>;
24
25template <typename Value>
26using FlatSet = boost::container::flat_set<Value>;
27
28Function * makeThreadFunction(const std::unique_ptr<kernel::KernelBuilder> & b, const std::string & name) {
29    Function * const f = Function::Create(FunctionType::get(b->getVoidTy(), {b->getVoidPtrTy()}, false), Function::InternalLinkage, name, b->getModule());
30    f->setCallingConv(CallingConv::C);
31    f->arg_begin()->setName("input");
32    return f;
33}
34
35/** ------------------------------------------------------------------------------------------------------------- *
36 * @brief generateSegmentParallelPipeline
37 *
38 * Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
39 * operating over an input stream set S, a segment-parallel implementation divides the input
40 * into segments and coordinates a set of T <= K threads to each process one segment at a time.
41 * Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
42 * fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
43 ** ------------------------------------------------------------------------------------------------------------- */
44void generateSegmentParallelPipeline(const std::unique_ptr<KernelBuilder> & iBuilder, const std::vector<Kernel *> & kernels) {
45
46    const unsigned n = kernels.size();
47    Module * const m = iBuilder->getModule();
48    IntegerType * const sizeTy = iBuilder->getSizeTy();
49    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
50    Constant * nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
51    std::vector<Type *> structTypes;
52
53    Value * instance[n];
54    for (unsigned i = 0; i < n; ++i) {
55        instance[i] = kernels[i]->getInstance();
56        structTypes.push_back(instance[i]->getType());
57    }
58    StructType * const sharedStructType = StructType::get(m->getContext(), structTypes);
59    StructType * const threadStructType = StructType::get(sharedStructType->getPointerTo(), sizeTy, nullptr);
60
61    Function * const threadFunc = makeThreadFunction(iBuilder, "segment");
62
63    // -------------------------------------------------------------------------------------------------------------------------
64    // MAKE SEGMENT PARALLEL PIPELINE THREAD
65    // -------------------------------------------------------------------------------------------------------------------------
66    const auto ip = iBuilder->saveIP();
67
68     // Create the basic blocks for the thread function.
69    BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc);
70    iBuilder->SetInsertPoint(entryBlock);
71    Value * const input = &threadFunc->getArgumentList().front();
72    Value * const threadStruct = iBuilder->CreatePointerCast(input, threadStructType->getPointerTo());
73    Value * const sharedStatePtr = iBuilder->CreateLoad(iBuilder->CreateGEP(threadStruct, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
74    for (unsigned k = 0; k < n; ++k) {
75        Value * ptr = iBuilder->CreateLoad(iBuilder->CreateGEP(sharedStatePtr, {iBuilder->getInt32(0), iBuilder->getInt32(k)}));
76        kernels[k]->setInstance(ptr);
77    }
78    Value * const segOffset = iBuilder->CreateLoad(iBuilder->CreateGEP(threadStruct, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
79
80    BasicBlock * segmentLoop = BasicBlock::Create(iBuilder->getContext(), "segmentLoop", threadFunc);
81    iBuilder->CreateBr(segmentLoop);
82
83    iBuilder->SetInsertPoint(segmentLoop);
84    PHINode * const segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 2, "segNo");
85    segNo->addIncoming(segOffset, entryBlock);
86
87    Value * terminated = iBuilder->getFalse();
88    Value * const nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
89
90    BasicBlock * segmentLoopBody = nullptr;
91    BasicBlock * const exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc);
92
93    StreamSetBufferMap<Value *> producedPos;
94    StreamSetBufferMap<Value *> consumedPos;
95
96    Value * cycleCountStart = nullptr;
97    Value * cycleCountEnd = nullptr;
98    if (codegen::EnableCycleCounter) {
99        cycleCountStart = iBuilder->CreateReadCycleCounter();
100    }
101
102    for (unsigned k = 0; k < n; ++k) {
103
104        const auto & kernel = kernels[k];
105
106        BasicBlock * const segmentWait = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Wait", threadFunc);
107        iBuilder->CreateBr(segmentWait);
108
109        segmentLoopBody = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Do", threadFunc);
110
111        iBuilder->SetInsertPoint(segmentWait);
112        const unsigned waitIdx = codegen::DebugOptionIsSet(codegen::SerializeThreads) ? (n - 1) : k;
113
114        iBuilder->setKernel(kernels[waitIdx]);
115        Value * const processedSegmentCount = iBuilder->acquireLogicalSegmentNo();
116        iBuilder->setKernel(kernel);
117
118        assert (processedSegmentCount->getType() == segNo->getType());
119        Value * const ready = iBuilder->CreateICmpEQ(segNo, processedSegmentCount);
120
121        if (kernel->hasNoTerminateAttribute()) {
122            iBuilder->CreateCondBr(ready, segmentLoopBody, segmentWait);
123        } else { // If the kernel was terminated in a previous segment then the pipeline is done.
124            BasicBlock * completionTest = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Completed", threadFunc, 0);
125            BasicBlock * exitBlock = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Exit", threadFunc, 0);
126            iBuilder->CreateCondBr(ready, completionTest, segmentWait);
127
128            iBuilder->SetInsertPoint(completionTest);
129            Value * terminationSignal = iBuilder->getTerminationSignal();
130            iBuilder->CreateCondBr(terminationSignal, exitBlock, segmentLoopBody);
131            iBuilder->SetInsertPoint(exitBlock);
132            // Ensure that the next thread will also exit.
133            iBuilder->releaseLogicalSegmentNo(nextSegNo);
134            iBuilder->CreateBr(exitThreadBlock);
135        }
136
137        // Execute the kernel segment
138        iBuilder->SetInsertPoint(segmentLoopBody);
139        const auto & inputs = kernel->getStreamInputs();
140        std::vector<Value *> args = {kernel->getInstance(), terminated};
141        for (unsigned i = 0; i < inputs.size(); ++i) {
142            const auto f = producedPos.find(kernel->getStreamSetInputBuffer(i));
143            assert (f != producedPos.end());
144            args.push_back(f->second);
145        }
146
147        iBuilder->setKernel(kernel);
148        iBuilder->createDoSegmentCall(args);
149        if (!kernel->hasNoTerminateAttribute()) {
150            terminated = iBuilder->CreateOr(terminated, iBuilder->getTerminationSignal());
151        }
152
153        const auto & outputs = kernel->getStreamOutputs();
154        for (unsigned i = 0; i < outputs.size(); ++i) {           
155            Value * const produced = iBuilder->getProducedItemCount(outputs[i].name, terminated);
156            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
157            assert (producedPos.count(buf) == 0);
158            producedPos.emplace(buf, produced);
159        }
160        for (unsigned i = 0; i < inputs.size(); ++i) {
161            Value * const processedItemCount = iBuilder->getProcessedItemCount(inputs[i].name);
162            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);           
163            auto f = consumedPos.find(buf);
164            if (f == consumedPos.end()) {
165                consumedPos.emplace(buf, processedItemCount);
166            } else {
167                Value * lesser = iBuilder->CreateICmpULT(processedItemCount, f->second);
168                f->second = iBuilder->CreateSelect(lesser, processedItemCount, f->second);
169            }
170        }
171        if (codegen::EnableCycleCounter) {
172            cycleCountEnd = iBuilder->CreateReadCycleCounter();
173            //Value * counterPtr = iBuilder->CreateGEP(mCycleCounts, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
174            Value * counterPtr = iBuilder->getScalarFieldPtr(Kernel::CYCLECOUNT_SCALAR);
175            iBuilder->CreateStore(iBuilder->CreateAdd(iBuilder->CreateLoad(counterPtr), iBuilder->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
176            cycleCountStart = cycleCountEnd;
177        }
178       
179        iBuilder->releaseLogicalSegmentNo(nextSegNo);
180    }
181
182    assert (segmentLoopBody);
183    exitThreadBlock->moveAfter(segmentLoopBody);
184
185    for (const auto consumed : consumedPos) {
186        const StreamSetBuffer * const buf = consumed.first;
187        Kernel * kernel = buf->getProducer();
188        const auto & outputs = kernel->getStreamSetOutputBuffers();
189        for (unsigned i = 0; i < outputs.size(); ++i) {
190            if (outputs[i] == buf) {
191                iBuilder->setKernel(kernel);
192                iBuilder->setConsumedItemCount(kernel->getStreamOutput(i).name, consumed.second);
193                break;
194            }
195        }
196    }
197
198    segNo->addIncoming(iBuilder->CreateAdd(segNo, iBuilder->getSize(codegen::ThreadNum)), segmentLoopBody);
199    iBuilder->CreateCondBr(terminated, exitThreadBlock, segmentLoop);
200
201    iBuilder->SetInsertPoint(exitThreadBlock);
202
203    // only call pthread_exit() within spawned threads; otherwise it'll be equivalent to calling exit() within the process
204    BasicBlock * const exitThread = BasicBlock::Create(iBuilder->getContext(), "ExitThread", threadFunc);
205    BasicBlock * const exitFunction = BasicBlock::Create(iBuilder->getContext(), "ExitProcessFunction", threadFunc);
206
207    Value * const exitCond = iBuilder->CreateICmpEQ(segOffset, ConstantInt::getNullValue(segOffset->getType()));
208    iBuilder->CreateCondBr(exitCond, exitFunction, exitThread);
209    iBuilder->SetInsertPoint(exitThread);
210    iBuilder->CreatePThreadExitCall(nullVoidPtrVal);
211    iBuilder->CreateBr(exitFunction);
212    iBuilder->SetInsertPoint(exitFunction);
213    iBuilder->CreateRetVoid();
214
215    // -------------------------------------------------------------------------------------------------------------------------
216    iBuilder->restoreIP(ip);
217
218    for (unsigned i = 0; i < n; ++i) {
219        kernels[i]->setInstance(instance[i]);
220    }
221
222    // -------------------------------------------------------------------------------------------------------------------------
223    // MAKE SEGMENT PARALLEL PIPELINE DRIVER
224    // -------------------------------------------------------------------------------------------------------------------------
225    const unsigned threads = codegen::ThreadNum - 1;
226    assert (codegen::ThreadNum > 1);
227    Type * const pthreadsTy = ArrayType::get(sizeTy, threads);
228    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
229    Value * threadIdPtr[threads];
230
231    for (unsigned i = 0; i < threads; ++i) {
232        threadIdPtr[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
233    }
234
235    for (unsigned i = 0; i < n; ++i) {
236        iBuilder->setKernel(kernels[i]);
237        iBuilder->releaseLogicalSegmentNo(iBuilder->getSize(0));
238    }
239
240    AllocaInst * const sharedStruct = iBuilder->CreateCacheAlignedAlloca(sharedStructType);
241    for (unsigned i = 0; i < n; ++i) {
242        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
243        iBuilder->CreateStore(kernels[i]->getInstance(), ptr);
244    }
245
246    // use the process thread to handle the initial segment function after spawning (n - 1) threads to handle the subsequent offsets
247    for (unsigned i = 0; i < threads; ++i) {
248        AllocaInst * const threadState = iBuilder->CreateAlloca(threadStructType);
249        iBuilder->CreateStore(sharedStruct, iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
250        iBuilder->CreateStore(iBuilder->getSize(i + 1), iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
251        iBuilder->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, threadFunc, threadState);
252    }
253
254    AllocaInst * const threadState = iBuilder->CreateAlloca(threadStructType);
255    iBuilder->CreateStore(sharedStruct, iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
256    iBuilder->CreateStore(iBuilder->getSize(0), iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
257    iBuilder->CreateCall(threadFunc, iBuilder->CreatePointerCast(threadState, voidPtrTy));
258
259    AllocaInst * const status = iBuilder->CreateAlloca(voidPtrTy);
260    for (unsigned i = 0; i < threads; ++i) {
261        Value * threadId = iBuilder->CreateLoad(threadIdPtr[i]);
262        iBuilder->CreatePThreadJoinCall(threadId, status);
263    }
264   
265    if (codegen::EnableCycleCounter) {
266        for (unsigned k = 0; k < kernels.size(); k++) {
267            auto & kernel = kernels[k];
268            iBuilder->setKernel(kernel);
269            const auto & inputs = kernel->getStreamInputs();
270            const auto & outputs = kernel->getStreamOutputs();
271            Value * items = nullptr;
272            if (inputs.empty()) {
273                items = iBuilder->getProducedItemCount(outputs[0].name);
274            } else {
275                items = iBuilder->getProcessedItemCount(inputs[0].name);
276            }
277            Value * fItems = iBuilder->CreateUIToFP(items, iBuilder->getDoubleTy());
278            Value * cycles = iBuilder->CreateLoad(iBuilder->getScalarFieldPtr(Kernel::CYCLECOUNT_SCALAR));
279            Value * fCycles = iBuilder->CreateUIToFP(cycles, iBuilder->getDoubleTy());
280            std::string formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
281            Value * stringPtr = iBuilder->CreatePointerCast(iBuilder->GetString(formatString), iBuilder->getInt8PtrTy());
282            iBuilder->CreateCall(iBuilder->GetDprintf(), {iBuilder->getInt32(2), stringPtr, fItems, fCycles, iBuilder->CreateFDiv(fCycles, fItems)});
283        }
284    }
285   
286}
287
288
289/** ------------------------------------------------------------------------------------------------------------- *
290 * @brief generateParallelPipeline
291 ** ------------------------------------------------------------------------------------------------------------- */
292void generateParallelPipeline(const std::unique_ptr<KernelBuilder> & iBuilder, const std::vector<Kernel *> &kernels) {
293
294    Module * const m = iBuilder->getModule();
295    IntegerType * const sizeTy = iBuilder->getSizeTy();
296    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
297    ConstantInt * bufferSegments = ConstantInt::get(sizeTy, codegen::BufferSegments - 1);
298    ConstantInt * segmentItems = ConstantInt::get(sizeTy, codegen::SegmentSize * iBuilder->getBitBlockWidth());
299    Constant * const nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
300
301    const unsigned n = kernels.size();
302
303    Type * const pthreadsTy = ArrayType::get(sizeTy, n);
304    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
305    Value * threadIdPtr[n];
306    for (unsigned i = 0; i < n; ++i) {
307        threadIdPtr[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
308    }
309
310    Value * instance[n];
311    Type * structTypes[n];
312    for (unsigned i = 0; i < n; ++i) {
313        instance[i] = kernels[i]->getInstance();
314        structTypes[i] = instance[i]->getType();
315    }
316
317    Type * const sharedStructType = StructType::get(m->getContext(), ArrayRef<Type *>{structTypes, n});
318
319
320    AllocaInst * sharedStruct = iBuilder->CreateAlloca(sharedStructType);
321    for (unsigned i = 0; i < n; ++i) {
322        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
323        iBuilder->CreateStore(instance[i], ptr);
324    }
325
326    for (auto & kernel : kernels) {
327        iBuilder->setKernel(kernel);
328        iBuilder->releaseLogicalSegmentNo(iBuilder->getSize(0));
329    }
330
331    // GENERATE THE PRODUCING AND CONSUMING KERNEL MAPS
332    StreamSetBufferMap<unsigned> producingKernel;
333    StreamSetBufferMap<std::vector<unsigned>> consumingKernels;
334    for (unsigned id = 0; id < n; ++id) {
335        const auto & kernel = kernels[id];
336        const auto & inputs = kernel->getStreamInputs();
337        const auto & outputs = kernel->getStreamOutputs();
338        // add any outputs from this kernel to the producing kernel map
339        for (unsigned j = 0; j < outputs.size(); ++j) {
340            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(j);
341            if (LLVM_UNLIKELY(producingKernel.count(buf) != 0)) {
342                report_fatal_error(kernel->getName() + " redefines stream set " + outputs[j].name);
343            }
344            producingKernel.emplace(buf, id);
345        }
346        // and any inputs to the consuming kernels list
347        for (unsigned j = 0; j < inputs.size(); ++j) {
348            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(j);
349            auto f = consumingKernels.find(buf);
350            if (f == consumingKernels.end()) {
351                if (LLVM_UNLIKELY(producingKernel.count(buf) == 0)) {
352                    report_fatal_error(kernel->getName() + " uses stream set " + inputs[j].name + " prior to its definition");
353                }
354                consumingKernels.emplace(buf, std::vector<unsigned>{ id });
355            } else {
356                f->second.push_back(id);
357            }
358        }
359    }
360
361    const auto ip = iBuilder->saveIP();
362
363    // GENERATE UNIQUE PIPELINE PARALLEL THREAD FUNCTION FOR EACH KERNEL
364    FlatSet<unsigned> kernelSet;
365    kernelSet.reserve(n);
366
367    Function * thread_functions[n];
368    Value * producerSegNo[n];
369    for (unsigned id = 0; id < n; id++) {
370        const auto & kernel = kernels[id];
371
372        iBuilder->setKernel(kernel);
373
374        const auto & inputs = kernel->getStreamInputs();
375
376        Function * const threadFunc = makeThreadFunction(iBuilder, "ppt:" + kernel->getName());
377
378         // Create the basic blocks for the thread function.
379        BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc);
380        BasicBlock * outputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "outputCheck", threadFunc);
381        BasicBlock * inputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "inputCheck", threadFunc);
382        BasicBlock * doSegmentBlock = BasicBlock::Create(iBuilder->getContext(), "doSegment", threadFunc);
383        BasicBlock * exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc);
384
385        iBuilder->SetInsertPoint(entryBlock);
386
387        Value * sharedStruct = iBuilder->CreateBitCast(&threadFunc->getArgumentList().front(), sharedStructType->getPointerTo());
388
389        for (unsigned k = 0; k < n; k++) {
390            Value * const ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
391            kernels[k]->setInstance(iBuilder->CreateLoad(ptr));
392        }
393
394        iBuilder->CreateBr(outputCheckBlock);
395
396        // Check whether the output buffers are ready for more data
397        iBuilder->SetInsertPoint(outputCheckBlock);
398        PHINode * segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 3, "segNo");
399        segNo->addIncoming(iBuilder->getSize(0), entryBlock);
400        segNo->addIncoming(segNo, outputCheckBlock);
401
402        Value * outputWaitCond = iBuilder->getTrue();
403        for (const StreamSetBuffer * buf : kernel->getStreamSetOutputBuffers()) {
404            const auto & list = consumingKernels[buf];
405            assert(std::is_sorted(list.begin(), list.end()));
406            kernelSet.insert(list.begin(), list.end());
407        }
408        for (unsigned k : kernelSet) {
409            iBuilder->setKernel(kernels[k]);
410            Value * consumerSegNo = iBuilder->acquireLogicalSegmentNo();
411            assert (consumerSegNo->getType() == segNo->getType());
412            Value * consumedSegNo = iBuilder->CreateAdd(consumerSegNo, bufferSegments);
413            outputWaitCond = iBuilder->CreateAnd(outputWaitCond, iBuilder->CreateICmpULE(segNo, consumedSegNo));
414        }
415        kernelSet.clear();
416        iBuilder->setKernel(kernel);
417        iBuilder->CreateCondBr(outputWaitCond, inputCheckBlock, outputCheckBlock);
418
419        // Check whether the input buffers have enough data for this kernel to begin
420        iBuilder->SetInsertPoint(inputCheckBlock);
421        for (const StreamSetBuffer * buf : kernel->getStreamSetInputBuffers()) {
422            kernelSet.insert(producingKernel[buf]);
423        }
424
425        Value * inputWaitCond = iBuilder->getTrue();
426        for (unsigned k : kernelSet) {
427            iBuilder->setKernel(kernels[k]);
428            producerSegNo[k] = iBuilder->acquireLogicalSegmentNo();
429            assert (producerSegNo[k]->getType() == segNo->getType());
430            inputWaitCond = iBuilder->CreateAnd(inputWaitCond, iBuilder->CreateICmpULT(segNo, producerSegNo[k]));
431        }
432        iBuilder->setKernel(kernel);
433        iBuilder->CreateCondBr(inputWaitCond, doSegmentBlock, inputCheckBlock);
434
435        // Process the segment
436        iBuilder->SetInsertPoint(doSegmentBlock);
437
438        Value * const nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
439        Value * terminated = nullptr;
440        if (kernelSet.empty()) {
441            // if this kernel has no input streams, the kernel itself must decide when it terminates.
442            terminated = iBuilder->getTerminationSignal();
443        } else {
444            // ... otherwise the kernel terminates only when it exhausts all of its input streams
445            terminated = iBuilder->getTrue();
446            for (unsigned k : kernelSet) {
447                iBuilder->setKernel(kernels[k]);
448                terminated = iBuilder->CreateAnd(terminated, iBuilder->getTerminationSignal());
449                terminated = iBuilder->CreateAnd(terminated, iBuilder->CreateICmpEQ(nextSegNo, producerSegNo[k]));
450            }
451            kernelSet.clear();
452            iBuilder->setKernel(kernel);
453        }
454
455        std::vector<Value *> args = {kernel->getInstance(), terminated};
456        args.insert(args.end(), inputs.size(), iBuilder->CreateMul(segmentItems, segNo));
457
458        iBuilder->createDoSegmentCall(args);
459        segNo->addIncoming(nextSegNo, doSegmentBlock);
460        iBuilder->releaseLogicalSegmentNo(nextSegNo);
461
462        iBuilder->CreateCondBr(terminated, exitThreadBlock, outputCheckBlock);
463
464        iBuilder->SetInsertPoint(exitThreadBlock);
465
466        iBuilder->CreatePThreadExitCall(nullVoidPtrVal);
467
468        iBuilder->CreateRetVoid();
469
470        thread_functions[id] = threadFunc;
471    }
472
473    iBuilder->restoreIP(ip);
474
475    for (unsigned i = 0; i < n; ++i) {
476        kernels[i]->setInstance(instance[i]);
477    }
478
479    for (unsigned i = 0; i < n; ++i) {
480        iBuilder->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, thread_functions[i], sharedStruct);
481    }
482
483    AllocaInst * const status = iBuilder->CreateAlloca(voidPtrTy);
484    for (unsigned i = 0; i < n; ++i) {
485        Value * threadId = iBuilder->CreateLoad(threadIdPtr[i]);
486        iBuilder->CreatePThreadJoinCall(threadId, status);
487    }
488}
489
490/** ------------------------------------------------------------------------------------------------------------- *
491 * @brief generatePipelineLoop
492 ** ------------------------------------------------------------------------------------------------------------- */
493void generatePipelineLoop(const std::unique_ptr<KernelBuilder> & iBuilder, const std::vector<Kernel *> & kernels) {
494
495    BasicBlock * entryBlock = iBuilder->GetInsertBlock();
496    Function * main = entryBlock->getParent();
497
498    // Create the basic blocks for the loop.
499    BasicBlock * pipelineLoop = BasicBlock::Create(iBuilder->getContext(), "pipelineLoop", main);
500    BasicBlock * pipelineExit = BasicBlock::Create(iBuilder->getContext(), "pipelineExit", main);
501
502    StreamSetBufferMap<Value *> producedPos;
503    StreamSetBufferMap<Value *> consumedPos;
504
505    iBuilder->CreateBr(pipelineLoop);
506    iBuilder->SetInsertPoint(pipelineLoop);
507   
508    Value * cycleCountStart = nullptr;
509    Value * cycleCountEnd = nullptr;
510    if (codegen::EnableCycleCounter) {
511        cycleCountStart = iBuilder->CreateReadCycleCounter();
512    }
513    Value * terminated = iBuilder->getFalse();
514    for (unsigned k = 0; k < kernels.size(); k++) {
515
516        auto & kernel = kernels[k];
517
518        iBuilder->setKernel(kernel);
519        const auto & inputs = kernel->getStreamInputs();
520        const auto & outputs = kernel->getStreamOutputs();
521
522        std::vector<Value *> args = {kernel->getInstance(), terminated};
523        for (unsigned i = 0; i < inputs.size(); ++i) {
524            const auto f = producedPos.find(kernel->getStreamSetInputBuffer(i));
525            if (LLVM_UNLIKELY(f == producedPos.end())) {
526                report_fatal_error(kernel->getName() + " uses stream set " + inputs[i].name + " prior to its definition");
527            }
528            args.push_back(f->second);
529        }
530
531        iBuilder->createDoSegmentCall(args);
532        if (!kernel->hasNoTerminateAttribute()) {
533            Value * terminatedSignal = iBuilder->getTerminationSignal();
534            terminated = iBuilder->CreateOr(terminated, terminatedSignal);
535        }
536        for (unsigned i = 0; i < outputs.size(); ++i) {
537            Value * const produced = iBuilder->getProducedItemCount(outputs[i].name, terminated);
538            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
539            assert (producedPos.count(buf) == 0);
540            producedPos.emplace(buf, produced);
541        }
542
543        for (unsigned i = 0; i < inputs.size(); ++i) {
544            Value * const processedItemCount = iBuilder->getProcessedItemCount(inputs[i].name);
545            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);
546            auto f = consumedPos.find(buf);
547            if (f == consumedPos.end()) {
548                consumedPos.emplace(buf, processedItemCount);
549            } else {
550                Value * lesser = iBuilder->CreateICmpULT(processedItemCount, f->second);
551                f->second = iBuilder->CreateSelect(lesser, processedItemCount, f->second);
552            }
553        }
554        if (codegen::EnableCycleCounter) {
555            cycleCountEnd = iBuilder->CreateReadCycleCounter();
556            //Value * counterPtr = iBuilder->CreateGEP(mCycleCounts, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
557            Value * counterPtr = iBuilder->getScalarFieldPtr(Kernel::CYCLECOUNT_SCALAR);
558            iBuilder->CreateStore(iBuilder->CreateAdd(iBuilder->CreateLoad(counterPtr), iBuilder->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
559            cycleCountStart = cycleCountEnd;
560        }
561
562        Value * const segNo = iBuilder->acquireLogicalSegmentNo();
563        Value * nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
564        iBuilder->releaseLogicalSegmentNo(nextSegNo);
565    }
566
567    for (const auto consumed : consumedPos) {
568        const StreamSetBuffer * const buf = consumed.first;
569        Kernel * k = buf->getProducer();
570        const auto & outputs = k->getStreamSetOutputBuffers();
571        for (unsigned i = 0; i < outputs.size(); ++i) {
572            if (outputs[i] == buf) {
573                iBuilder->setKernel(k);
574                iBuilder->setConsumedItemCount(k->getStreamOutput(i).name, consumed.second);
575                break;
576            }
577        }
578    }
579
580    iBuilder->CreateCondBr(terminated, pipelineExit, pipelineLoop);
581    iBuilder->SetInsertPoint(pipelineExit);
582    if (codegen::EnableCycleCounter) {
583        for (unsigned k = 0; k < kernels.size(); k++) {
584            auto & kernel = kernels[k];
585            iBuilder->setKernel(kernel);
586            const auto & inputs = kernel->getStreamInputs();
587            const auto & outputs = kernel->getStreamOutputs();
588            Value * items = nullptr;
589            if (inputs.empty()) {
590                items = iBuilder->getProducedItemCount(outputs[0].name);
591            } else {
592                items = iBuilder->getProcessedItemCount(inputs[0].name);
593            }
594            Value * fItems = iBuilder->CreateUIToFP(items, iBuilder->getDoubleTy());
595            Value * cycles = iBuilder->CreateLoad(iBuilder->getScalarFieldPtr(Kernel::CYCLECOUNT_SCALAR));
596            Value * fCycles = iBuilder->CreateUIToFP(cycles, iBuilder->getDoubleTy());
597            std::string formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
598            Value * stringPtr = iBuilder->CreatePointerCast(iBuilder->GetString(formatString), iBuilder->getInt8PtrTy());
599            iBuilder->CreateCall(iBuilder->GetDprintf(), {iBuilder->getInt32(2), stringPtr, fItems, fCycles, iBuilder->CreateFDiv(fCycles, fItems)});
600        }
601    }
602}
Note: See TracBrowser for help on using the repository browser.