source: icGREP/icgrep-devel/icgrep/toolchain/pipeline.cpp @ 5464

Last change on this file since 5464 was 5464, checked in by nmedfort, 2 years ago

Restructuring work for the Driver classes. Start of work to eliminate the memory leaks with the ExecutionEngine. Replaced custom AlignedMalloc with backend call to std::aligned_malloc. Salvaged some work on DistributionPass for reevaluation.

File size: 28.5 KB
Line 
1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
6#include "pipeline.h"
7#include <toolchain/toolchain.h>
8#include <kernels/kernel.h>
9#include <kernels/streamset.h>
10#include <llvm/IR/Module.h>
11#include <boost/container/flat_set.hpp>
12#include <boost/container/flat_map.hpp>
13#include <kernels/kernel_builder.h>
14
15#include <llvm/Support/raw_ostream.h>
16
17using namespace kernel;
18using namespace parabix;
19using namespace llvm;
20
21
22template <typename Value>
23using StreamSetBufferMap = boost::container::flat_map<const StreamSetBuffer *, Value>;
24
25template <typename Value>
26using FlatSet = boost::container::flat_set<Value>;
27
28Function * makeThreadFunction(const std::unique_ptr<kernel::KernelBuilder> & b, const std::string & name) {
29    Function * const f = Function::Create(FunctionType::get(b->getVoidTy(), {b->getVoidPtrTy()}, false), Function::InternalLinkage, name, b->getModule());
30    f->setCallingConv(CallingConv::C);
31    f->arg_begin()->setName("input");
32    return f;
33}
34
35/** ------------------------------------------------------------------------------------------------------------- *
36 * @brief generateSegmentParallelPipeline
37 *
38 * Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
39 * operating over an input stream set S, a segment-parallel implementation divides the input
40 * into segments and coordinates a set of T <= K threads to each process one segment at a time.
41 * Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
42 * fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
43 ** ------------------------------------------------------------------------------------------------------------- */
44void generateSegmentParallelPipeline(const std::unique_ptr<KernelBuilder> & iBuilder, const std::vector<Kernel *> & kernels) {
45
46    const unsigned n = kernels.size();
47    Module * const m = iBuilder->getModule();
48    IntegerType * const sizeTy = iBuilder->getSizeTy();
49    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
50    Constant * nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
51    std::vector<Type *> structTypes;
52
53    Value * instance[n];
54    for (unsigned i = 0; i < n; ++i) {
55        instance[i] = kernels[i]->getInstance();
56        structTypes.push_back(instance[i]->getType());
57    }
58    StructType * const sharedStructType = StructType::get(m->getContext(), structTypes);
59    StructType * const threadStructType = StructType::get(sharedStructType->getPointerTo(), sizeTy, nullptr);
60
61    Function * const threadFunc = makeThreadFunction(iBuilder, "segment");
62
63    // -------------------------------------------------------------------------------------------------------------------------
64    // MAKE SEGMENT PARALLEL PIPELINE THREAD
65    // -------------------------------------------------------------------------------------------------------------------------
66    const auto ip = iBuilder->saveIP();
67
68     // Create the basic blocks for the thread function.
69    BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc);
70    iBuilder->SetInsertPoint(entryBlock);
71    Value * const input = &threadFunc->getArgumentList().front();
72    Value * const threadStruct = iBuilder->CreatePointerCast(input, threadStructType->getPointerTo());
73    Value * const sharedStatePtr = iBuilder->CreateLoad(iBuilder->CreateGEP(threadStruct, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
74    for (unsigned k = 0; k < n; ++k) {
75        Value * ptr = iBuilder->CreateLoad(iBuilder->CreateGEP(sharedStatePtr, {iBuilder->getInt32(0), iBuilder->getInt32(k)}));
76        kernels[k]->setInstance(ptr);
77    }
78    Value * const segOffset = iBuilder->CreateLoad(iBuilder->CreateGEP(threadStruct, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
79
80
81
82
83    BasicBlock * segmentLoop = BasicBlock::Create(iBuilder->getContext(), "segmentLoop", threadFunc);
84    iBuilder->CreateBr(segmentLoop);
85
86    iBuilder->SetInsertPoint(segmentLoop);
87    PHINode * const segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 2, "segNo");
88    segNo->addIncoming(segOffset, entryBlock);
89
90    Value * terminated = iBuilder->getFalse();
91    Value * const nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
92
93    BasicBlock * segmentLoopBody = nullptr;
94    BasicBlock * const exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc);
95
96    StreamSetBufferMap<Value *> producedPos;
97    StreamSetBufferMap<Value *> consumedPos;
98
99    Value * cycleCountStart = nullptr;
100    Value * cycleCountEnd = nullptr;
101    if (codegen::EnableCycleCounter) {
102        cycleCountStart = iBuilder->CreateReadCycleCounter();
103    }
104
105    for (unsigned k = 0; k < n; ++k) {
106
107        const auto & kernel = kernels[k];
108
109        BasicBlock * const segmentWait = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Wait", threadFunc);
110        iBuilder->CreateBr(segmentWait);
111
112        segmentLoopBody = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Do", threadFunc);
113
114        iBuilder->SetInsertPoint(segmentWait);
115        const unsigned waitIdx = codegen::DebugOptionIsSet(codegen::SerializeThreads) ? (n - 1) : k;
116
117        iBuilder->setKernel(kernels[waitIdx]);
118        Value * const processedSegmentCount = iBuilder->acquireLogicalSegmentNo();
119        iBuilder->setKernel(kernel);
120
121        assert (processedSegmentCount->getType() == segNo->getType());
122        Value * const ready = iBuilder->CreateICmpEQ(segNo, processedSegmentCount);
123
124        if (kernel->hasNoTerminateAttribute()) {
125            iBuilder->CreateCondBr(ready, segmentLoopBody, segmentWait);
126        } else { // If the kernel was terminated in a previous segment then the pipeline is done.
127            BasicBlock * completionTest = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Completed", threadFunc, 0);
128            BasicBlock * exitBlock = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Exit", threadFunc, 0);
129            iBuilder->CreateCondBr(ready, completionTest, segmentWait);
130
131            iBuilder->SetInsertPoint(completionTest);
132            Value * terminationSignal = iBuilder->getTerminationSignal();
133            iBuilder->CreateCondBr(terminationSignal, exitBlock, segmentLoopBody);
134            iBuilder->SetInsertPoint(exitBlock);
135            // Ensure that the next thread will also exit.
136            iBuilder->releaseLogicalSegmentNo(nextSegNo);
137            iBuilder->CreateBr(exitThreadBlock);
138        }
139
140        // Execute the kernel segment
141        iBuilder->SetInsertPoint(segmentLoopBody);
142        const auto & inputs = kernel->getStreamInputs();
143        std::vector<Value *> args = {kernel->getInstance(), terminated};
144        for (unsigned i = 0; i < inputs.size(); ++i) {
145            const auto f = producedPos.find(kernel->getStreamSetInputBuffer(i));
146            assert (f != producedPos.end());
147            args.push_back(f->second);
148        }
149
150        iBuilder->setKernel(kernel);
151        iBuilder->createDoSegmentCall(args);
152        if (!kernel->hasNoTerminateAttribute()) {
153            terminated = iBuilder->CreateOr(terminated, iBuilder->getTerminationSignal());
154        }
155
156        const auto & outputs = kernel->getStreamOutputs();
157        for (unsigned i = 0; i < outputs.size(); ++i) {           
158            Value * const produced = iBuilder->getProducedItemCount(outputs[i].name, terminated);
159            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
160            assert (producedPos.count(buf) == 0);
161            producedPos.emplace(buf, produced);
162        }
163        for (unsigned i = 0; i < inputs.size(); ++i) {
164            Value * const processedItemCount = iBuilder->getProcessedItemCount(inputs[i].name);
165            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);           
166            auto f = consumedPos.find(buf);
167            if (f == consumedPos.end()) {
168                consumedPos.emplace(buf, processedItemCount);
169            } else {
170                Value * lesser = iBuilder->CreateICmpULT(processedItemCount, f->second);
171                f->second = iBuilder->CreateSelect(lesser, processedItemCount, f->second);
172            }
173        }
174        if (codegen::EnableCycleCounter) {
175            cycleCountEnd = iBuilder->CreateReadCycleCounter();
176            //Value * counterPtr = iBuilder->CreateGEP(mCycleCounts, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
177            Value * counterPtr = iBuilder->getScalarFieldPtr(Kernel::CYCLECOUNT_SCALAR);
178            iBuilder->CreateStore(iBuilder->CreateAdd(iBuilder->CreateLoad(counterPtr), iBuilder->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
179            cycleCountStart = cycleCountEnd;
180        }
181       
182        iBuilder->releaseLogicalSegmentNo(nextSegNo);
183    }
184
185    assert (segmentLoopBody);
186    exitThreadBlock->moveAfter(segmentLoopBody);
187
188    for (const auto consumed : consumedPos) {
189        const StreamSetBuffer * const buf = consumed.first;
190        Kernel * kernel = buf->getProducer();
191        const auto & outputs = kernel->getStreamSetOutputBuffers();
192        for (unsigned i = 0; i < outputs.size(); ++i) {
193            if (outputs[i] == buf) {
194                iBuilder->setKernel(kernel);
195                iBuilder->setConsumedItemCount(kernel->getStreamOutput(i).name, consumed.second);
196                break;
197            }
198        }
199    }
200
201    segNo->addIncoming(iBuilder->CreateAdd(segNo, iBuilder->getSize(codegen::ThreadNum)), segmentLoopBody);
202    iBuilder->CreateCondBr(terminated, exitThreadBlock, segmentLoop);
203
204    iBuilder->SetInsertPoint(exitThreadBlock);
205
206    // only call pthread_exit() within spawned threads; otherwise it'll be equivalent to calling exit() within the process
207    BasicBlock * const exitThread = BasicBlock::Create(iBuilder->getContext(), "ExitThread", threadFunc);
208    BasicBlock * const exitFunction = BasicBlock::Create(iBuilder->getContext(), "ExitProcessFunction", threadFunc);
209
210    Value * const exitCond = iBuilder->CreateICmpEQ(segOffset, ConstantInt::getNullValue(segOffset->getType()));
211    iBuilder->CreateCondBr(exitCond, exitFunction, exitThread);
212    iBuilder->SetInsertPoint(exitThread);
213    iBuilder->CreatePThreadExitCall(nullVoidPtrVal);
214    iBuilder->CreateBr(exitFunction);
215    iBuilder->SetInsertPoint(exitFunction);
216    iBuilder->CreateRetVoid();
217
218    // -------------------------------------------------------------------------------------------------------------------------
219    iBuilder->restoreIP(ip);
220
221    for (unsigned i = 0; i < n; ++i) {
222        kernels[i]->setInstance(instance[i]);
223    }
224
225    // -------------------------------------------------------------------------------------------------------------------------
226    // MAKE SEGMENT PARALLEL PIPELINE DRIVER
227    // -------------------------------------------------------------------------------------------------------------------------
228    const unsigned threads = codegen::ThreadNum - 1;
229    assert (codegen::ThreadNum > 1);
230    Type * const pthreadsTy = ArrayType::get(sizeTy, threads);
231    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
232    Value * threadIdPtr[threads];
233
234    for (unsigned i = 0; i < threads; ++i) {
235        threadIdPtr[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
236    }
237
238    for (unsigned i = 0; i < n; ++i) {
239        iBuilder->setKernel(kernels[i]);
240        iBuilder->releaseLogicalSegmentNo(iBuilder->getSize(0));
241    }
242
243    AllocaInst * const sharedStruct = iBuilder->CreateCacheAlignedAlloca(sharedStructType);
244    for (unsigned i = 0; i < n; ++i) {
245        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
246        iBuilder->CreateStore(kernels[i]->getInstance(), ptr);
247    }
248
249    // use the process thread to handle the initial segment function after spawning (n - 1) threads to handle the subsequent offsets
250    for (unsigned i = 0; i < threads; ++i) {
251        AllocaInst * const threadState = iBuilder->CreateAlloca(threadStructType);
252        iBuilder->CreateStore(sharedStruct, iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
253        iBuilder->CreateStore(iBuilder->getSize(i + 1), iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
254        iBuilder->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, threadFunc, threadState);
255    }
256
257    AllocaInst * const threadState = iBuilder->CreateAlloca(threadStructType);
258    iBuilder->CreateStore(sharedStruct, iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
259    iBuilder->CreateStore(iBuilder->getSize(0), iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
260    iBuilder->CreateCall(threadFunc, iBuilder->CreatePointerCast(threadState, voidPtrTy));
261
262    AllocaInst * const status = iBuilder->CreateAlloca(voidPtrTy);
263    for (unsigned i = 0; i < threads; ++i) {
264        Value * threadId = iBuilder->CreateLoad(threadIdPtr[i]);
265        iBuilder->CreatePThreadJoinCall(threadId, status);
266    }
267   
268    if (codegen::EnableCycleCounter) {
269        for (unsigned k = 0; k < kernels.size(); k++) {
270            auto & kernel = kernels[k];
271            iBuilder->setKernel(kernel);
272            const auto & inputs = kernel->getStreamInputs();
273            const auto & outputs = kernel->getStreamOutputs();
274            Value * items = nullptr;
275            if (inputs.empty()) {
276                items = iBuilder->getProducedItemCount(outputs[0].name);
277            } else {
278                items = iBuilder->getProcessedItemCount(inputs[0].name);
279            }
280            Value * fItems = iBuilder->CreateUIToFP(items, iBuilder->getDoubleTy());
281            Value * cycles = iBuilder->CreateLoad(iBuilder->getScalarFieldPtr(Kernel::CYCLECOUNT_SCALAR));
282            Value * fCycles = iBuilder->CreateUIToFP(cycles, iBuilder->getDoubleTy());
283            std::string formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
284            Value * stringPtr = iBuilder->CreatePointerCast(iBuilder->GetString(formatString), iBuilder->getInt8PtrTy());
285            iBuilder->CreateCall(iBuilder->GetDprintf(), {iBuilder->getInt32(2), stringPtr, fItems, fCycles, iBuilder->CreateFDiv(fCycles, fItems)});
286        }
287    }
288   
289}
290
291
292/** ------------------------------------------------------------------------------------------------------------- *
293 * @brief generateParallelPipeline
294 ** ------------------------------------------------------------------------------------------------------------- */
295void generateParallelPipeline(const std::unique_ptr<KernelBuilder> & iBuilder, const std::vector<Kernel *> &kernels) {
296
297    Module * const m = iBuilder->getModule();
298    IntegerType * const sizeTy = iBuilder->getSizeTy();
299    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
300    ConstantInt * bufferSegments = ConstantInt::get(sizeTy, codegen::BufferSegments - 1);
301    ConstantInt * segmentItems = ConstantInt::get(sizeTy, codegen::SegmentSize * iBuilder->getBitBlockWidth());
302    Constant * const nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
303
304    const unsigned n = kernels.size();
305
306    Type * const pthreadsTy = ArrayType::get(sizeTy, n);
307    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
308    Value * threadIdPtr[n];
309    for (unsigned i = 0; i < n; ++i) {
310        threadIdPtr[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
311    }
312
313    Value * instance[n];
314    Type * structTypes[n];
315    for (unsigned i = 0; i < n; ++i) {
316        instance[i] = kernels[i]->getInstance();
317        structTypes[i] = instance[i]->getType();
318    }
319
320    Type * const sharedStructType = StructType::get(m->getContext(), ArrayRef<Type *>{structTypes, n});
321
322
323    AllocaInst * sharedStruct = iBuilder->CreateAlloca(sharedStructType);
324    for (unsigned i = 0; i < n; ++i) {
325        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
326        iBuilder->CreateStore(instance[i], ptr);
327    }
328
329    for (auto & kernel : kernels) {
330        iBuilder->setKernel(kernel);
331        iBuilder->releaseLogicalSegmentNo(iBuilder->getSize(0));
332    }
333
334    // GENERATE THE PRODUCING AND CONSUMING KERNEL MAPS
335    StreamSetBufferMap<unsigned> producingKernel;
336    StreamSetBufferMap<std::vector<unsigned>> consumingKernels;
337    for (unsigned id = 0; id < n; ++id) {
338        const auto & kernel = kernels[id];
339        const auto & inputs = kernel->getStreamInputs();
340        const auto & outputs = kernel->getStreamOutputs();
341        // add any outputs from this kernel to the producing kernel map
342        for (unsigned j = 0; j < outputs.size(); ++j) {
343            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(j);
344            if (LLVM_UNLIKELY(producingKernel.count(buf) != 0)) {
345                report_fatal_error(kernel->getName() + " redefines stream set " + outputs[j].name);
346            }
347            producingKernel.emplace(buf, id);
348        }
349        // and any inputs to the consuming kernels list
350        for (unsigned j = 0; j < inputs.size(); ++j) {
351            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(j);
352            auto f = consumingKernels.find(buf);
353            if (f == consumingKernels.end()) {
354                if (LLVM_UNLIKELY(producingKernel.count(buf) == 0)) {
355                    report_fatal_error(kernel->getName() + " uses stream set " + inputs[j].name + " prior to its definition");
356                }
357                consumingKernels.emplace(buf, std::vector<unsigned>{ id });
358            } else {
359                f->second.push_back(id);
360            }
361        }
362    }
363
364    const auto ip = iBuilder->saveIP();
365
366    // GENERATE UNIQUE PIPELINE PARALLEL THREAD FUNCTION FOR EACH KERNEL
367    FlatSet<unsigned> kernelSet;
368    kernelSet.reserve(n);
369
370    Function * thread_functions[n];
371    Value * producerSegNo[n];
372    for (unsigned id = 0; id < n; id++) {
373        const auto & kernel = kernels[id];
374
375        iBuilder->setKernel(kernel);
376
377        const auto & inputs = kernel->getStreamInputs();
378
379        Function * const threadFunc = makeThreadFunction(iBuilder, "ppt:" + kernel->getName());
380
381         // Create the basic blocks for the thread function.
382        BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc);
383        BasicBlock * outputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "outputCheck", threadFunc);
384        BasicBlock * inputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "inputCheck", threadFunc);
385        BasicBlock * doSegmentBlock = BasicBlock::Create(iBuilder->getContext(), "doSegment", threadFunc);
386        BasicBlock * exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc);
387
388        iBuilder->SetInsertPoint(entryBlock);
389
390        Value * sharedStruct = iBuilder->CreateBitCast(&threadFunc->getArgumentList().front(), sharedStructType->getPointerTo());
391
392        for (unsigned k = 0; k < n; k++) {
393            Value * const ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
394            kernels[k]->setInstance(iBuilder->CreateLoad(ptr));
395        }
396
397        iBuilder->CreateBr(outputCheckBlock);
398
399        // Check whether the output buffers are ready for more data
400        iBuilder->SetInsertPoint(outputCheckBlock);
401        PHINode * segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 3, "segNo");
402        segNo->addIncoming(iBuilder->getSize(0), entryBlock);
403        segNo->addIncoming(segNo, outputCheckBlock);
404
405        Value * outputWaitCond = iBuilder->getTrue();
406        for (const StreamSetBuffer * buf : kernel->getStreamSetOutputBuffers()) {
407            const auto & list = consumingKernels[buf];
408            assert(std::is_sorted(list.begin(), list.end()));
409            kernelSet.insert(list.begin(), list.end());
410        }
411        for (unsigned k : kernelSet) {
412            iBuilder->setKernel(kernels[k]);
413            Value * consumerSegNo = iBuilder->acquireLogicalSegmentNo();
414            assert (consumerSegNo->getType() == segNo->getType());
415            Value * consumedSegNo = iBuilder->CreateAdd(consumerSegNo, bufferSegments);
416            outputWaitCond = iBuilder->CreateAnd(outputWaitCond, iBuilder->CreateICmpULE(segNo, consumedSegNo));
417        }
418        kernelSet.clear();
419        iBuilder->setKernel(kernel);
420        iBuilder->CreateCondBr(outputWaitCond, inputCheckBlock, outputCheckBlock);
421
422        // Check whether the input buffers have enough data for this kernel to begin
423        iBuilder->SetInsertPoint(inputCheckBlock);
424        for (const StreamSetBuffer * buf : kernel->getStreamSetInputBuffers()) {
425            kernelSet.insert(producingKernel[buf]);
426        }
427
428        Value * inputWaitCond = iBuilder->getTrue();
429        for (unsigned k : kernelSet) {
430            iBuilder->setKernel(kernels[k]);
431            producerSegNo[k] = iBuilder->acquireLogicalSegmentNo();
432            assert (producerSegNo[k]->getType() == segNo->getType());
433            inputWaitCond = iBuilder->CreateAnd(inputWaitCond, iBuilder->CreateICmpULT(segNo, producerSegNo[k]));
434        }
435        iBuilder->setKernel(kernel);
436        iBuilder->CreateCondBr(inputWaitCond, doSegmentBlock, inputCheckBlock);
437
438        // Process the segment
439        iBuilder->SetInsertPoint(doSegmentBlock);
440
441        Value * const nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
442        Value * terminated = nullptr;
443        if (kernelSet.empty()) {
444            // if this kernel has no input streams, the kernel itself must decide when it terminates.
445            terminated = iBuilder->getTerminationSignal();
446        } else {
447            // ... otherwise the kernel terminates only when it exhausts all of its input streams
448            terminated = iBuilder->getTrue();
449            for (unsigned k : kernelSet) {
450                iBuilder->setKernel(kernels[k]);
451                terminated = iBuilder->CreateAnd(terminated, iBuilder->getTerminationSignal());
452                terminated = iBuilder->CreateAnd(terminated, iBuilder->CreateICmpEQ(nextSegNo, producerSegNo[k]));
453            }
454            kernelSet.clear();
455            iBuilder->setKernel(kernel);
456        }
457
458        std::vector<Value *> args = {kernel->getInstance(), terminated};
459        args.insert(args.end(), inputs.size(), iBuilder->CreateMul(segmentItems, segNo));
460
461        iBuilder->createDoSegmentCall(args);
462        segNo->addIncoming(nextSegNo, doSegmentBlock);
463        iBuilder->releaseLogicalSegmentNo(nextSegNo);
464
465        iBuilder->CreateCondBr(terminated, exitThreadBlock, outputCheckBlock);
466
467        iBuilder->SetInsertPoint(exitThreadBlock);
468
469        iBuilder->CreatePThreadExitCall(nullVoidPtrVal);
470
471        iBuilder->CreateRetVoid();
472
473        thread_functions[id] = threadFunc;
474    }
475
476    iBuilder->restoreIP(ip);
477
478    for (unsigned i = 0; i < n; ++i) {
479        kernels[i]->setInstance(instance[i]);
480    }
481
482    for (unsigned i = 0; i < n; ++i) {
483        iBuilder->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, thread_functions[i], sharedStruct);
484    }
485
486    AllocaInst * const status = iBuilder->CreateAlloca(voidPtrTy);
487    for (unsigned i = 0; i < n; ++i) {
488        Value * threadId = iBuilder->CreateLoad(threadIdPtr[i]);
489        iBuilder->CreatePThreadJoinCall(threadId, status);
490    }
491}
492
493/** ------------------------------------------------------------------------------------------------------------- *
494 * @brief generatePipelineLoop
495 ** ------------------------------------------------------------------------------------------------------------- */
496void generatePipelineLoop(const std::unique_ptr<KernelBuilder> & iBuilder, const std::vector<Kernel *> & kernels) {
497
498    BasicBlock * entryBlock = iBuilder->GetInsertBlock();
499    Function * main = entryBlock->getParent();
500
501    // Create the basic blocks for the loop.
502    BasicBlock * pipelineLoop = BasicBlock::Create(iBuilder->getContext(), "pipelineLoop", main);
503    BasicBlock * pipelineExit = BasicBlock::Create(iBuilder->getContext(), "pipelineExit", main);
504
505    StreamSetBufferMap<Value *> producedPos;
506    StreamSetBufferMap<Value *> consumedPos;
507
508    iBuilder->CreateBr(pipelineLoop);
509    iBuilder->SetInsertPoint(pipelineLoop);
510   
511    Value * cycleCountStart = nullptr;
512    Value * cycleCountEnd = nullptr;
513    if (codegen::EnableCycleCounter) {
514        cycleCountStart = iBuilder->CreateReadCycleCounter();
515    }
516    Value * terminated = iBuilder->getFalse();
517    for (unsigned k = 0; k < kernels.size(); k++) {
518
519        auto & kernel = kernels[k];
520
521        iBuilder->setKernel(kernel);
522        const auto & inputs = kernel->getStreamInputs();
523        const auto & outputs = kernel->getStreamOutputs();
524
525        std::vector<Value *> args = {kernel->getInstance(), terminated};
526        for (unsigned i = 0; i < inputs.size(); ++i) {
527            const auto f = producedPos.find(kernel->getStreamSetInputBuffer(i));
528            if (LLVM_UNLIKELY(f == producedPos.end())) {
529                report_fatal_error(kernel->getName() + " uses stream set " + inputs[i].name + " prior to its definition");
530            }
531            args.push_back(f->second);
532        }
533
534        iBuilder->createDoSegmentCall(args);
535        if (!kernel->hasNoTerminateAttribute()) {
536            Value * terminatedSignal = iBuilder->getTerminationSignal();
537            terminated = iBuilder->CreateOr(terminated, terminatedSignal);
538        }
539        for (unsigned i = 0; i < outputs.size(); ++i) {
540            Value * const produced = iBuilder->getProducedItemCount(outputs[i].name, terminated);
541            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
542            assert (producedPos.count(buf) == 0);
543            producedPos.emplace(buf, produced);
544        }
545
546        for (unsigned i = 0; i < inputs.size(); ++i) {
547            Value * const processedItemCount = iBuilder->getProcessedItemCount(inputs[i].name);
548            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);
549            auto f = consumedPos.find(buf);
550            if (f == consumedPos.end()) {
551                consumedPos.emplace(buf, processedItemCount);
552            } else {
553                Value * lesser = iBuilder->CreateICmpULT(processedItemCount, f->second);
554                f->second = iBuilder->CreateSelect(lesser, processedItemCount, f->second);
555            }
556        }
557        if (codegen::EnableCycleCounter) {
558            cycleCountEnd = iBuilder->CreateReadCycleCounter();
559            //Value * counterPtr = iBuilder->CreateGEP(mCycleCounts, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
560            Value * counterPtr = iBuilder->getScalarFieldPtr(Kernel::CYCLECOUNT_SCALAR);
561            iBuilder->CreateStore(iBuilder->CreateAdd(iBuilder->CreateLoad(counterPtr), iBuilder->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
562            cycleCountStart = cycleCountEnd;
563        }
564
565        Value * const segNo = iBuilder->acquireLogicalSegmentNo();
566        Value * nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
567        iBuilder->releaseLogicalSegmentNo(nextSegNo);
568    }
569
570    for (const auto consumed : consumedPos) {
571        const StreamSetBuffer * const buf = consumed.first;
572        Kernel * k = buf->getProducer();
573        const auto & outputs = k->getStreamSetOutputBuffers();
574        for (unsigned i = 0; i < outputs.size(); ++i) {
575            if (outputs[i] == buf) {
576                iBuilder->setKernel(k);
577                iBuilder->setConsumedItemCount(k->getStreamOutput(i).name, consumed.second);
578                break;
579            }
580        }
581    }
582
583    iBuilder->CreateCondBr(terminated, pipelineExit, pipelineLoop);
584    iBuilder->SetInsertPoint(pipelineExit);
585    if (codegen::EnableCycleCounter) {
586        for (unsigned k = 0; k < kernels.size(); k++) {
587            auto & kernel = kernels[k];
588            iBuilder->setKernel(kernel);
589            const auto & inputs = kernel->getStreamInputs();
590            const auto & outputs = kernel->getStreamOutputs();
591            Value * items = nullptr;
592            if (inputs.empty()) {
593                items = iBuilder->getProducedItemCount(outputs[0].name);
594            } else {
595                items = iBuilder->getProcessedItemCount(inputs[0].name);
596            }
597            Value * fItems = iBuilder->CreateUIToFP(items, iBuilder->getDoubleTy());
598            Value * cycles = iBuilder->CreateLoad(iBuilder->getScalarFieldPtr(Kernel::CYCLECOUNT_SCALAR));
599            Value * fCycles = iBuilder->CreateUIToFP(cycles, iBuilder->getDoubleTy());
600            std::string formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
601            Value * stringPtr = iBuilder->CreatePointerCast(iBuilder->GetString(formatString), iBuilder->getInt8PtrTy());
602            iBuilder->CreateCall(iBuilder->GetDprintf(), {iBuilder->getInt32(2), stringPtr, fItems, fCycles, iBuilder->CreateFDiv(fCycles, fItems)});
603        }
604    }
605}
Note: See TracBrowser for help on using the repository browser.