source: icGREP/icgrep-devel/icgrep/toolchain/pipeline.cpp @ 5733

Last change on this file since 5733 was 5733, checked in by cameron, 19 months ago

Changes for compatibility with LLVM 5.0.0

File size: 31.5 KB
Line 
1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
6#include "pipeline.h"
7#include <toolchain/toolchain.h>
8#include <kernels/kernel.h>
9#include <kernels/streamset.h>
10#include <llvm/IR/Module.h>
11#include <boost/container/flat_set.hpp>
12#include <boost/container/flat_map.hpp>
13#include <kernels/kernel_builder.h>
14
15using namespace kernel;
16using namespace parabix;
17using namespace llvm;
18
19using Port = Kernel::Port;
20
21template <typename Value>
22using StreamSetBufferMap = boost::container::flat_map<const StreamSetBuffer *, Value>;
23
24template <typename Value>
25using FlatSet = boost::container::flat_set<Value>;
26
27Function * makeThreadFunction(const std::unique_ptr<kernel::KernelBuilder> & b, const std::string & name) {
28    Function * const f = Function::Create(FunctionType::get(b->getVoidTy(), {b->getVoidPtrTy()}, false), Function::InternalLinkage, name, b->getModule());
29    f->setCallingConv(CallingConv::C);
30    f->arg_begin()->setName("input");
31    return f;
32}
33
34void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel);
35
36/** ------------------------------------------------------------------------------------------------------------- *
37 * @brief generateSegmentParallelPipeline
38 *
39 * Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
40 * operating over an input stream set S, a segment-parallel implementation divides the input
41 * into segments and coordinates a set of T <= K threads to each process one segment at a time.
42 * Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
43 * fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
44 ** ------------------------------------------------------------------------------------------------------------- */
45void generateSegmentParallelPipeline(const std::unique_ptr<KernelBuilder> & iBuilder, const std::vector<Kernel *> & kernels) {
46
47    const unsigned n = kernels.size();
48    Module * const m = iBuilder->getModule();
49    IntegerType * const sizeTy = iBuilder->getSizeTy();
50    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
51    Constant * nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
52    std::vector<Type *> structTypes;
53
54    codegen::BufferSegments = std::max(codegen::BufferSegments, codegen::ThreadNum);
55
56    Value * instance[n];
57    for (unsigned i = 0; i < n; ++i) {
58        instance[i] = kernels[i]->getInstance();
59        structTypes.push_back(instance[i]->getType());
60    }
61    StructType * const sharedStructType = StructType::get(m->getContext(), structTypes);
62    StructType * const threadStructType = StructType::get(m->getContext(), {sharedStructType->getPointerTo(), sizeTy});
63
64    Function * const threadFunc = makeThreadFunction(iBuilder, "segment");
65    Function::arg_iterator args = threadFunc->arg_begin();
66    Value * threadStruct = iBuilder->CreateBitCast(&*(args), threadStructType->getPointerTo());
67
68    // -------------------------------------------------------------------------------------------------------------------------
69    // MAKE SEGMENT PARALLEL PIPELINE THREAD
70    // -------------------------------------------------------------------------------------------------------------------------
71    const auto ip = iBuilder->saveIP();
72
73     // Create the basic blocks for the thread function.
74    BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc);
75    iBuilder->SetInsertPoint(entryBlock);
76    Value * const sharedStatePtr = iBuilder->CreateLoad(iBuilder->CreateGEP(threadStruct, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
77    for (unsigned k = 0; k < n; ++k) {
78        Value * ptr = iBuilder->CreateLoad(iBuilder->CreateGEP(sharedStatePtr, {iBuilder->getInt32(0), iBuilder->getInt32(k)}));
79        kernels[k]->setInstance(ptr);
80    }
81    Value * const segOffset = iBuilder->CreateLoad(iBuilder->CreateGEP(threadStruct, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
82
83    BasicBlock * segmentLoop = BasicBlock::Create(iBuilder->getContext(), "segmentLoop", threadFunc);
84    iBuilder->CreateBr(segmentLoop);
85
86    iBuilder->SetInsertPoint(segmentLoop);
87    PHINode * const segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 2, "segNo");
88    segNo->addIncoming(segOffset, entryBlock);
89
90    Value * terminated = iBuilder->getFalse();
91    Value * const nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
92
93    BasicBlock * segmentLoopBody = nullptr;
94    BasicBlock * const exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc);
95
96    StreamSetBufferMap<Value *> producedPos;
97    StreamSetBufferMap<Value *> consumedPos;
98
99    Value * cycleCountStart = nullptr;
100    Value * cycleCountEnd = nullptr;
101    if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
102        cycleCountStart = iBuilder->CreateReadCycleCounter();
103    }
104
105    for (unsigned k = 0; k < n; ++k) {
106
107        const auto & kernel = kernels[k];
108
109        BasicBlock * const segmentWait = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Wait", threadFunc);
110
111        BasicBlock * segmentYield = segmentWait;
112        iBuilder->CreateBr(segmentWait);
113
114        segmentLoopBody = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Do", threadFunc);
115
116        iBuilder->SetInsertPoint(segmentWait);
117        const unsigned waitIdx = codegen::DebugOptionIsSet(codegen::SerializeThreads) ? (n - 1) : k;
118
119        iBuilder->setKernel(kernels[waitIdx]);
120        Value * const processedSegmentCount = iBuilder->acquireLogicalSegmentNo();
121        iBuilder->setKernel(kernel);
122
123        assert (processedSegmentCount->getType() == segNo->getType());
124        Value * const ready = iBuilder->CreateICmpEQ(segNo, processedSegmentCount);
125
126        if (kernel->hasNoTerminateAttribute()) {
127            iBuilder->CreateCondBr(ready, segmentLoopBody, segmentYield);
128        } else { // If the kernel was terminated in a previous segment then the pipeline is done.
129            BasicBlock * completionTest = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Completed", threadFunc, 0);
130            BasicBlock * exitBlock = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Exit", threadFunc, 0);
131            iBuilder->CreateCondBr(ready, completionTest, segmentYield);
132
133            iBuilder->SetInsertPoint(completionTest);
134            Value * terminationSignal = iBuilder->getTerminationSignal();
135            iBuilder->CreateCondBr(terminationSignal, exitBlock, segmentLoopBody);
136            iBuilder->SetInsertPoint(exitBlock);
137            // Ensure that the next thread will also exit.
138            iBuilder->releaseLogicalSegmentNo(nextSegNo);
139            iBuilder->CreateBr(exitThreadBlock);
140        }
141
142        // Execute the kernel segment
143        iBuilder->SetInsertPoint(segmentLoopBody);
144        const auto & inputs = kernel->getStreamInputs();
145        std::vector<Value *> args = {kernel->getInstance(), terminated};
146        for (unsigned i = 0; i < inputs.size(); ++i) {
147            const auto f = producedPos.find(kernel->getStreamSetInputBuffer(i));
148            assert (f != producedPos.end());
149            args.push_back(f->second);
150        }
151
152        iBuilder->setKernel(kernel);
153        iBuilder->createDoSegmentCall(args);
154        if (!kernel->hasNoTerminateAttribute()) {
155            terminated = iBuilder->CreateOr(terminated, iBuilder->getTerminationSignal());
156        }
157
158        const auto & outputs = kernel->getStreamOutputs();
159        for (unsigned i = 0; i < outputs.size(); ++i) {           
160            Value * const produced = iBuilder->getProducedItemCount(outputs[i].getName()); // terminated
161            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
162            assert (producedPos.count(buf) == 0);
163            producedPos.emplace(buf, produced);
164        }
165        for (unsigned i = 0; i < inputs.size(); ++i) {
166            Value * const processedItemCount = iBuilder->getProcessedItemCount(inputs[i].getName());
167            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);           
168            auto f = consumedPos.find(buf);
169            if (f == consumedPos.end()) {
170                consumedPos.emplace(buf, processedItemCount);
171            } else {
172                Value * lesser = iBuilder->CreateICmpULT(processedItemCount, f->second);
173                f->second = iBuilder->CreateSelect(lesser, processedItemCount, f->second);
174            }
175        }
176        if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
177            cycleCountEnd = iBuilder->CreateReadCycleCounter();
178            Value * counterPtr = iBuilder->getCycleCountPtr();
179            iBuilder->CreateStore(iBuilder->CreateAdd(iBuilder->CreateLoad(counterPtr), iBuilder->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
180            cycleCountStart = cycleCountEnd;
181        }
182       
183        iBuilder->releaseLogicalSegmentNo(nextSegNo);
184    }
185
186    assert (segmentLoopBody);
187    exitThreadBlock->moveAfter(segmentLoopBody);
188
189    for (const auto consumed : consumedPos) {
190        const StreamSetBuffer * const buf = consumed.first;
191        Kernel * const k = buf->getProducer();
192        const auto & outputs = k->getStreamSetOutputBuffers();
193        for (unsigned i = 0; i < outputs.size(); ++i) {
194            if (outputs[i] == buf) {
195                const auto binding = k->getStreamOutput(i);
196                if (LLVM_UNLIKELY(binding.getRate().isDerived())) {
197                    continue;
198                }
199                iBuilder->setKernel(k);
200                iBuilder->setConsumedItemCount(binding.getName(), consumed.second);
201                break;
202            }
203        }
204    }
205
206    segNo->addIncoming(iBuilder->CreateAdd(segNo, iBuilder->getSize(codegen::ThreadNum)), segmentLoopBody);
207    iBuilder->CreateCondBr(terminated, exitThreadBlock, segmentLoop);
208
209    iBuilder->SetInsertPoint(exitThreadBlock);
210
211    // only call pthread_exit() within spawned threads; otherwise it'll be equivalent to calling exit() within the process
212    BasicBlock * const exitThread = BasicBlock::Create(iBuilder->getContext(), "ExitThread", threadFunc);
213    BasicBlock * const exitFunction = BasicBlock::Create(iBuilder->getContext(), "ExitProcessFunction", threadFunc);
214
215    Value * const exitCond = iBuilder->CreateICmpEQ(segOffset, ConstantInt::getNullValue(segOffset->getType()));
216    iBuilder->CreateCondBr(exitCond, exitFunction, exitThread);
217    iBuilder->SetInsertPoint(exitThread);
218    iBuilder->CreatePThreadExitCall(nullVoidPtrVal);
219    iBuilder->CreateBr(exitFunction);
220    iBuilder->SetInsertPoint(exitFunction);
221    iBuilder->CreateRetVoid();
222
223    // -------------------------------------------------------------------------------------------------------------------------
224    iBuilder->restoreIP(ip);
225
226    for (unsigned i = 0; i < n; ++i) {
227        kernels[i]->setInstance(instance[i]);
228    }
229
230    // -------------------------------------------------------------------------------------------------------------------------
231    // MAKE SEGMENT PARALLEL PIPELINE DRIVER
232    // -------------------------------------------------------------------------------------------------------------------------
233    const unsigned threads = codegen::ThreadNum - 1;
234    assert (codegen::ThreadNum > 1);
235    Type * const pthreadsTy = ArrayType::get(sizeTy, threads);
236    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
237    Value * threadIdPtr[threads];
238
239    for (unsigned i = 0; i < threads; ++i) {
240        threadIdPtr[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
241    }
242
243    for (unsigned i = 0; i < n; ++i) {
244        iBuilder->setKernel(kernels[i]);
245        iBuilder->releaseLogicalSegmentNo(iBuilder->getSize(0));
246    }
247
248    AllocaInst * const sharedStruct = iBuilder->CreateCacheAlignedAlloca(sharedStructType);
249    for (unsigned i = 0; i < n; ++i) {
250        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
251        iBuilder->CreateStore(kernels[i]->getInstance(), ptr);
252    }
253
254    // use the process thread to handle the initial segment function after spawning (n - 1) threads to handle the subsequent offsets
255    for (unsigned i = 0; i < threads; ++i) {
256        AllocaInst * const threadState = iBuilder->CreateAlloca(threadStructType);
257        iBuilder->CreateStore(sharedStruct, iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
258        iBuilder->CreateStore(iBuilder->getSize(i + 1), iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
259        iBuilder->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, threadFunc, threadState);
260    }
261
262    AllocaInst * const threadState = iBuilder->CreateAlloca(threadStructType);
263    iBuilder->CreateStore(sharedStruct, iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
264    iBuilder->CreateStore(iBuilder->getSize(0), iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
265    iBuilder->CreateCall(threadFunc, iBuilder->CreatePointerCast(threadState, voidPtrTy));
266
267    AllocaInst * const status = iBuilder->CreateAlloca(voidPtrTy);
268    for (unsigned i = 0; i < threads; ++i) {
269        Value * threadId = iBuilder->CreateLoad(threadIdPtr[i]);
270        iBuilder->CreatePThreadJoinCall(threadId, status);
271    }
272   
273    if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
274        for (unsigned k = 0; k < kernels.size(); k++) {
275            auto & kernel = kernels[k];
276            iBuilder->setKernel(kernel);
277            const auto & inputs = kernel->getStreamInputs();
278            const auto & outputs = kernel->getStreamOutputs();
279            Value * items = nullptr;
280            if (inputs.empty()) {
281                items = iBuilder->getProducedItemCount(outputs[0].getName());
282            } else {
283                items = iBuilder->getProcessedItemCount(inputs[0].getName());
284            }
285            Value * fItems = iBuilder->CreateUIToFP(items, iBuilder->getDoubleTy());
286            Value * cycles = iBuilder->CreateLoad(iBuilder->getCycleCountPtr());
287            Value * fCycles = iBuilder->CreateUIToFP(cycles, iBuilder->getDoubleTy());
288            std::string formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
289            Value * stringPtr = iBuilder->CreatePointerCast(iBuilder->GetString(formatString), iBuilder->getInt8PtrTy());
290            iBuilder->CreateCall(iBuilder->GetDprintf(), {iBuilder->getInt32(2), stringPtr, fItems, fCycles, iBuilder->CreateFDiv(fCycles, fItems)});
291        }
292    }
293   
294}
295
296
297/** ------------------------------------------------------------------------------------------------------------- *
298 * @brief generateParallelPipeline
299 ** ------------------------------------------------------------------------------------------------------------- */
300void generateParallelPipeline(const std::unique_ptr<KernelBuilder> & iBuilder, const std::vector<Kernel *> &kernels) {
301
302    Module * const m = iBuilder->getModule();
303    IntegerType * const sizeTy = iBuilder->getSizeTy();
304    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
305    ConstantInt * bufferSegments = ConstantInt::get(sizeTy, codegen::BufferSegments - 1);
306    ConstantInt * segmentItems = ConstantInt::get(sizeTy, codegen::SegmentSize * iBuilder->getBitBlockWidth());
307    Constant * const nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
308
309    const unsigned n = kernels.size();
310
311    Type * const pthreadsTy = ArrayType::get(sizeTy, n);
312    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
313    Value * threadIdPtr[n];
314    for (unsigned i = 0; i < n; ++i) {
315        threadIdPtr[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
316    }
317
318    Value * instance[n];
319    Type * structTypes[n];
320    for (unsigned i = 0; i < n; ++i) {
321        instance[i] = kernels[i]->getInstance();
322        structTypes[i] = instance[i]->getType();
323    }
324
325    Type * const sharedStructType = StructType::get(m->getContext(), ArrayRef<Type *>{structTypes, n});
326
327
328    AllocaInst * sharedStruct = iBuilder->CreateAlloca(sharedStructType);
329    for (unsigned i = 0; i < n; ++i) {
330        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
331        iBuilder->CreateStore(instance[i], ptr);
332    }
333
334    for (auto & kernel : kernels) {
335        iBuilder->setKernel(kernel);
336        iBuilder->releaseLogicalSegmentNo(iBuilder->getSize(0));
337    }
338
339    // GENERATE THE PRODUCING AND CONSUMING KERNEL MAPS
340    StreamSetBufferMap<unsigned> producingKernel;
341    StreamSetBufferMap<std::vector<unsigned>> consumingKernels;
342    for (unsigned id = 0; id < n; ++id) {
343        const auto & kernel = kernels[id];
344        const auto & inputs = kernel->getStreamInputs();
345        const auto & outputs = kernel->getStreamOutputs();
346        // add any outputs from this kernel to the producing kernel map
347        for (unsigned j = 0; j < outputs.size(); ++j) {
348            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(j);
349            if (LLVM_UNLIKELY(producingKernel.count(buf) != 0)) {
350                report_fatal_error(kernel->getName() + " redefines stream set " + outputs[j].getName());
351            }
352            producingKernel.emplace(buf, id);
353        }
354        // and any inputs to the consuming kernels list
355        for (unsigned j = 0; j < inputs.size(); ++j) {
356            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(j);
357            auto f = consumingKernels.find(buf);
358            if (f == consumingKernels.end()) {
359                if (LLVM_UNLIKELY(producingKernel.count(buf) == 0)) {
360                    report_fatal_error(kernel->getName() + " uses stream set " + inputs[j].getName() + " prior to its definition");
361                }
362                consumingKernels.emplace(buf, std::vector<unsigned>{ id });
363            } else {
364                f->second.push_back(id);
365            }
366        }
367    }
368
369    const auto ip = iBuilder->saveIP();
370
371    // GENERATE UNIQUE PIPELINE PARALLEL THREAD FUNCTION FOR EACH KERNEL
372    FlatSet<unsigned> kernelSet;
373    kernelSet.reserve(n);
374
375    Function * thread_functions[n];
376    Value * producerSegNo[n];
377    for (unsigned id = 0; id < n; id++) {
378        const auto & kernel = kernels[id];
379
380        iBuilder->setKernel(kernel);
381
382        const auto & inputs = kernel->getStreamInputs();
383
384        Function * const threadFunc = makeThreadFunction(iBuilder, "ppt:" + kernel->getName());
385        Function::arg_iterator ai = threadFunc->arg_begin();
386        Value * sharedStruct = iBuilder->CreateBitCast(&*(ai), sharedStructType->getPointerTo());
387       
388         // Create the basic blocks for the thread function.
389        BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc);
390        BasicBlock * outputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "outputCheck", threadFunc);
391        BasicBlock * inputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "inputCheck", threadFunc);
392        BasicBlock * doSegmentBlock = BasicBlock::Create(iBuilder->getContext(), "doSegment", threadFunc);
393        BasicBlock * exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc);
394
395        iBuilder->SetInsertPoint(entryBlock);
396
397
398        for (unsigned k = 0; k < n; k++) {
399            Value * const ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
400            kernels[k]->setInstance(iBuilder->CreateLoad(ptr));
401        }
402
403        iBuilder->CreateBr(outputCheckBlock);
404
405        // Check whether the output buffers are ready for more data
406        iBuilder->SetInsertPoint(outputCheckBlock);
407        PHINode * segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 3, "segNo");
408        segNo->addIncoming(iBuilder->getSize(0), entryBlock);
409        segNo->addIncoming(segNo, outputCheckBlock);
410
411        Value * outputWaitCond = iBuilder->getTrue();
412        for (const StreamSetBuffer * buf : kernel->getStreamSetOutputBuffers()) {
413            const auto & list = consumingKernels[buf];
414            assert(std::is_sorted(list.begin(), list.end()));
415            kernelSet.insert(list.begin(), list.end());
416        }
417        for (unsigned k : kernelSet) {
418            iBuilder->setKernel(kernels[k]);
419            Value * consumerSegNo = iBuilder->acquireLogicalSegmentNo();
420            assert (consumerSegNo->getType() == segNo->getType());
421            Value * consumedSegNo = iBuilder->CreateAdd(consumerSegNo, bufferSegments);
422            outputWaitCond = iBuilder->CreateAnd(outputWaitCond, iBuilder->CreateICmpULE(segNo, consumedSegNo));
423        }
424        kernelSet.clear();
425        iBuilder->setKernel(kernel);
426        iBuilder->CreateCondBr(outputWaitCond, inputCheckBlock, outputCheckBlock);
427
428        // Check whether the input buffers have enough data for this kernel to begin
429        iBuilder->SetInsertPoint(inputCheckBlock);
430        for (const StreamSetBuffer * buf : kernel->getStreamSetInputBuffers()) {
431            kernelSet.insert(producingKernel[buf]);
432        }
433
434        Value * inputWaitCond = iBuilder->getTrue();
435        for (unsigned k : kernelSet) {
436            iBuilder->setKernel(kernels[k]);
437            producerSegNo[k] = iBuilder->acquireLogicalSegmentNo();
438            assert (producerSegNo[k]->getType() == segNo->getType());
439            inputWaitCond = iBuilder->CreateAnd(inputWaitCond, iBuilder->CreateICmpULT(segNo, producerSegNo[k]));
440        }
441        iBuilder->setKernel(kernel);
442        iBuilder->CreateCondBr(inputWaitCond, doSegmentBlock, inputCheckBlock);
443
444        // Process the segment
445        iBuilder->SetInsertPoint(doSegmentBlock);
446
447        Value * const nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
448        Value * terminated = nullptr;
449        if (kernelSet.empty()) {
450            // if this kernel has no input streams, the kernel itself must decide when it terminates.
451            terminated = iBuilder->getTerminationSignal();
452        } else {
453            // ... otherwise the kernel terminates only when it exhausts all of its input streams
454            terminated = iBuilder->getTrue();
455            for (unsigned k : kernelSet) {
456                iBuilder->setKernel(kernels[k]);
457                terminated = iBuilder->CreateAnd(terminated, iBuilder->getTerminationSignal());
458                terminated = iBuilder->CreateAnd(terminated, iBuilder->CreateICmpEQ(nextSegNo, producerSegNo[k]));
459            }
460            kernelSet.clear();
461            iBuilder->setKernel(kernel);
462        }
463
464        std::vector<Value *> args = {kernel->getInstance(), terminated};
465        args.insert(args.end(), inputs.size(), iBuilder->CreateMul(segmentItems, segNo));
466
467        iBuilder->createDoSegmentCall(args);
468        segNo->addIncoming(nextSegNo, doSegmentBlock);
469        iBuilder->releaseLogicalSegmentNo(nextSegNo);
470
471        iBuilder->CreateCondBr(terminated, exitThreadBlock, outputCheckBlock);
472
473        iBuilder->SetInsertPoint(exitThreadBlock);
474
475        iBuilder->CreatePThreadExitCall(nullVoidPtrVal);
476
477        iBuilder->CreateRetVoid();
478
479        thread_functions[id] = threadFunc;
480    }
481
482    iBuilder->restoreIP(ip);
483
484    for (unsigned i = 0; i < n; ++i) {
485        kernels[i]->setInstance(instance[i]);
486    }
487
488    for (unsigned i = 0; i < n; ++i) {
489        iBuilder->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, thread_functions[i], sharedStruct);
490    }
491
492    AllocaInst * const status = iBuilder->CreateAlloca(voidPtrTy);
493    for (unsigned i = 0; i < n; ++i) {
494        Value * threadId = iBuilder->CreateLoad(threadIdPtr[i]);
495        iBuilder->CreatePThreadJoinCall(threadId, status);
496    }
497}
498
499/** ------------------------------------------------------------------------------------------------------------- *
500 * @brief generatePipelineLoop
501 ** ------------------------------------------------------------------------------------------------------------- */
502void generatePipelineLoop(const std::unique_ptr<KernelBuilder> & iBuilder, const std::vector<Kernel *> & kernels) {
503
504    BasicBlock * entryBlock = iBuilder->GetInsertBlock();
505    Function * main = entryBlock->getParent();
506
507    // Create the basic blocks for the loop.
508    BasicBlock * pipelineLoop = BasicBlock::Create(iBuilder->getContext(), "pipelineLoop", main);
509    BasicBlock * pipelineExit = BasicBlock::Create(iBuilder->getContext(), "pipelineExit", main);
510
511    StreamSetBufferMap<Value *> producedPos;
512    StreamSetBufferMap<Value *> consumedPos;
513
514    iBuilder->CreateBr(pipelineLoop);
515    iBuilder->SetInsertPoint(pipelineLoop);
516   
517    Value * cycleCountStart = nullptr;
518    Value * cycleCountEnd = nullptr;
519    if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
520        cycleCountStart = iBuilder->CreateReadCycleCounter();
521    }
522    Value * terminated = iBuilder->getFalse();
523
524    for (Kernel * const kernel : kernels) {
525
526        iBuilder->setKernel(kernel);
527        const auto & inputs = kernel->getStreamInputs();
528        const auto & outputs = kernel->getStreamOutputs();
529
530        std::vector<Value *> args = {kernel->getInstance(), terminated};
531
532        for (unsigned i = 0; i < inputs.size(); ++i) {
533            const auto f = producedPos.find(kernel->getStreamSetInputBuffer(i));
534            if (LLVM_UNLIKELY(f == producedPos.end())) {
535                report_fatal_error(kernel->getName() + " uses stream set " + inputs[i].getName() + " prior to its definition");
536            }
537            args.push_back(f->second);
538        }
539
540        applyOutputBufferExpansions(iBuilder, kernel);
541
542        iBuilder->createDoSegmentCall(args);
543
544        if (!kernel->hasNoTerminateAttribute()) {
545            Value * terminatedSignal = iBuilder->getTerminationSignal();
546            terminated = iBuilder->CreateOr(terminated, terminatedSignal);
547        }
548        for (unsigned i = 0; i < outputs.size(); ++i) {
549            Value * const produced = iBuilder->getProducedItemCount(outputs[i].getName()); // , terminated
550            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
551            assert (producedPos.count(buf) == 0);
552            producedPos.emplace(buf, produced);
553        }
554
555        for (unsigned i = 0; i < inputs.size(); ++i) {
556            Value * const processed = iBuilder->getProcessedItemCount(inputs[i].getName());
557            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);
558            auto f = consumedPos.find(buf);
559            if (f == consumedPos.end()) {
560                consumedPos.emplace(buf, processed);
561            } else {
562                Value * lesser = iBuilder->CreateICmpULT(processed, f->second);
563                f->second = iBuilder->CreateSelect(lesser, processed, f->second);
564            }
565        }
566        if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
567            cycleCountEnd = iBuilder->CreateReadCycleCounter();
568            Value * counterPtr = iBuilder->getCycleCountPtr();
569            iBuilder->CreateStore(iBuilder->CreateAdd(iBuilder->CreateLoad(counterPtr), iBuilder->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
570            cycleCountStart = cycleCountEnd;
571        }
572
573        Value * const segNo = iBuilder->acquireLogicalSegmentNo();
574        Value * nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
575        iBuilder->releaseLogicalSegmentNo(nextSegNo);
576    }
577
578    for (const auto consumed : consumedPos) {
579        const StreamSetBuffer * const buf = consumed.first;
580        Kernel * const k = buf->getProducer();
581        const auto & outputs = k->getStreamSetOutputBuffers();
582        for (unsigned i = 0; i < outputs.size(); ++i) {
583            if (outputs[i] == buf) {
584                const auto binding = k->getStreamOutput(i);
585                if (LLVM_UNLIKELY(binding.getRate().isDerived())) {
586                    continue;
587                }
588                iBuilder->setKernel(k);
589                iBuilder->setConsumedItemCount(binding.getName(), consumed.second);
590                break;
591            }
592        }
593    }
594
595    iBuilder->CreateCondBr(terminated, pipelineExit, pipelineLoop);
596
597    iBuilder->SetInsertPoint(pipelineExit);
598
599    if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
600        for (unsigned k = 0; k < kernels.size(); k++) {
601            auto & kernel = kernels[k];
602            iBuilder->setKernel(kernel);
603            const auto & inputs = kernel->getStreamInputs();
604            const auto & outputs = kernel->getStreamOutputs();
605            Value * items = nullptr;
606            if (inputs.empty()) {
607                items = iBuilder->getProducedItemCount(outputs[0].getName());
608            } else {
609                items = iBuilder->getProcessedItemCount(inputs[0].getName());
610            }
611            Value * fItems = iBuilder->CreateUIToFP(items, iBuilder->getDoubleTy());
612            Value * cycles = iBuilder->CreateLoad(iBuilder->getCycleCountPtr());
613            Value * fCycles = iBuilder->CreateUIToFP(cycles, iBuilder->getDoubleTy());
614            std::string formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
615            Value * stringPtr = iBuilder->CreatePointerCast(iBuilder->GetString(formatString), iBuilder->getInt8PtrTy());
616            iBuilder->CreateCall(iBuilder->GetDprintf(), {iBuilder->getInt32(2), stringPtr, fItems, fCycles, iBuilder->CreateFDiv(fCycles, fItems)});
617        }
618    }
619}
620
621void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const std::string & name, DynamicBuffer * const db, const uint64_t l) {
622
623    BasicBlock * const doExpand = b->CreateBasicBlock(name + "Expand");
624    BasicBlock * const nextBlock = b->GetInsertBlock()->getNextNode();
625    doExpand->moveAfter(b->GetInsertBlock());
626    BasicBlock * const bufferReady = b->CreateBasicBlock(name + "Ready");
627    bufferReady->moveAfter(doExpand);
628    if (nextBlock) nextBlock->moveAfter(bufferReady);
629
630    Value * const handle = db->getStreamSetHandle();
631
632    Value * const produced = b->getProducedItemCount(name);
633    Value * const consumed = b->getConsumedItemCount(name);
634    Value * const required = b->CreateAdd(b->CreateSub(produced, consumed), b->getSize(2 * l));
635
636    b->CreateCondBr(b->CreateICmpUGT(required, db->getCapacity(b.get(), handle)), doExpand, bufferReady);
637
638    b->SetInsertPoint(doExpand);
639    db->doubleCapacity(b.get(), handle);
640    // Ensure that capacity is sufficient by successive doubling, if necessary.
641    b->CreateCondBr(b->CreateICmpUGT(required, db->getBufferedSize(b.get(), handle)), doExpand, bufferReady);
642
643    b->SetInsertPoint(bufferReady);
644}
645
646inline const Binding & getBinding(const Kernel * k, const std::string & name) {
647    Port port; unsigned index;
648    std::tie(port, index) = k->getStreamPort(name);
649    if (port == Port::Input) {
650        return k->getStreamInput(index);
651    } else {
652        return k->getStreamOutput(index);
653    }
654}
655
656void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * k) {
657    const auto & outputs = k->getStreamSetOutputBuffers();
658    for (unsigned i = 0; i < outputs.size(); i++) {
659        if (isa<DynamicBuffer>(outputs[i])) {
660            const ProcessingRate & rate = k->getStreamOutput(i).getRate();
661            if (rate.isFixed() || rate.isBounded()) {
662                const auto & name = k->getStreamOutput(i).getName();
663                const auto l = rate.getUpperBound() * k->getKernelStride();
664                applyOutputBufferExpansions(b, name, cast<DynamicBuffer>(outputs[i]), l);
665            } else if (rate.isExactlyRelative()) {
666                const auto binding = getBinding(k, rate.getReference());
667                const ProcessingRate & ref = binding.getRate();
668                if (rate.isFixed() || rate.isBounded()) {
669                    const auto & name = k->getStreamOutput(i).getName();
670                    const auto l = (ref.getUpperBound() * rate.getNumerator() * k->getKernelStride() + rate.getDenominator() - 1) / rate.getDenominator();
671                    applyOutputBufferExpansions(b, name, cast<DynamicBuffer>(outputs[i]), l);
672                }
673            }
674        }
675    }
676}
Note: See TracBrowser for help on using the repository browser.