source: icGREP/icgrep-devel/icgrep/toolchain/pipeline.cpp @ 5781

Last change on this file since 5781 was 5761, checked in by nmedfort, 19 months ago

Cache signature is now written into the .kernel bitcode file. Minor bug fix and revision of GrepEngine::DoGrepThreadMethod.

File size: 30.7 KB
Line 
1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
6#include "pipeline.h"
7#include <toolchain/toolchain.h>
8#include <kernels/kernel.h>
9#include <kernels/streamset.h>
10#include <llvm/IR/Module.h>
11#include <boost/container/flat_set.hpp>
12#include <boost/container/flat_map.hpp>
13#include <kernels/kernel_builder.h>
14
15using namespace kernel;
16using namespace parabix;
17using namespace llvm;
18
19using Port = Kernel::Port;
20
21template <typename Value>
22using StreamSetBufferMap = boost::container::flat_map<const StreamSetBuffer *, Value>;
23
24template <typename Value>
25using FlatSet = boost::container::flat_set<Value>;
26
27Function * makeThreadFunction(const std::unique_ptr<kernel::KernelBuilder> & b, const std::string & name) {
28    Function * const f = Function::Create(FunctionType::get(b->getVoidTy(), {b->getVoidPtrTy()}, false), Function::InternalLinkage, name, b->getModule());
29    f->setCallingConv(CallingConv::C);
30    f->arg_begin()->setName("state");
31    return f;
32}
33
34void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel);
35
36/** ------------------------------------------------------------------------------------------------------------- *
37 * @brief generateSegmentParallelPipeline
38 *
39 * Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
40 * operating over an input stream set S, a segment-parallel implementation divides the input
41 * into segments and coordinates a set of T <= K threads to each process one segment at a time.
42 * Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
43 * fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
44 ** ------------------------------------------------------------------------------------------------------------- */
45void generateSegmentParallelPipeline(const std::unique_ptr<KernelBuilder> & iBuilder, const std::vector<Kernel *> & kernels) {
46
47    const unsigned n = kernels.size();
48    Module * const m = iBuilder->getModule();
49    IntegerType * const sizeTy = iBuilder->getSizeTy();
50    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
51    Constant * nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
52    std::vector<Type *> structTypes;
53    codegen::BufferSegments = std::max(codegen::BufferSegments, codegen::ThreadNum);
54
55    Value * instance[n];
56    for (unsigned i = 0; i < n; ++i) {
57        instance[i] = kernels[i]->getInstance();
58        structTypes.push_back(instance[i]->getType());
59    }
60    StructType * const sharedStructType = StructType::get(m->getContext(), structTypes);
61    StructType * const threadStructType = StructType::get(m->getContext(), {sharedStructType->getPointerTo(), sizeTy});
62
63    const auto ip = iBuilder->saveIP();
64
65    Function * const threadFunc = makeThreadFunction(iBuilder, "segment");
66    auto args = threadFunc->arg_begin();
67
68    // -------------------------------------------------------------------------------------------------------------------------
69    // MAKE SEGMENT PARALLEL PIPELINE THREAD
70    // -------------------------------------------------------------------------------------------------------------------------
71
72     // Create the basic blocks for the thread function.
73    BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc);
74    iBuilder->SetInsertPoint(entryBlock);
75
76    Value * const threadStruct = iBuilder->CreateBitCast(&*(args), threadStructType->getPointerTo());
77
78    Value * const sharedStatePtr = iBuilder->CreateLoad(iBuilder->CreateGEP(threadStruct, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
79    for (unsigned k = 0; k < n; ++k) {
80        Value * ptr = iBuilder->CreateLoad(iBuilder->CreateGEP(sharedStatePtr, {iBuilder->getInt32(0), iBuilder->getInt32(k)}));
81        kernels[k]->setInstance(ptr);
82    }
83    Value * const segOffset = iBuilder->CreateLoad(iBuilder->CreateGEP(threadStruct, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
84
85    BasicBlock * segmentLoop = BasicBlock::Create(iBuilder->getContext(), "segmentLoop", threadFunc);
86    iBuilder->CreateBr(segmentLoop);
87
88    iBuilder->SetInsertPoint(segmentLoop);
89    PHINode * const segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 2, "segNo");
90    segNo->addIncoming(segOffset, entryBlock);
91
92    Value * terminated = iBuilder->getFalse();
93    Value * const nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
94
95    BasicBlock * segmentLoopBody = nullptr;
96    BasicBlock * const exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc);
97
98    StreamSetBufferMap<Value *> producedPos;
99    StreamSetBufferMap<Value *> consumedPos;
100
101    Value * cycleCountStart = nullptr;
102    Value * cycleCountEnd = nullptr;
103    if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
104        cycleCountStart = iBuilder->CreateReadCycleCounter();
105    }
106
107    for (unsigned k = 0; k < n; ++k) {
108
109        const auto & kernel = kernels[k];
110
111        BasicBlock * const segmentWait = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Wait", threadFunc);
112
113        BasicBlock * segmentYield = segmentWait;
114        iBuilder->CreateBr(segmentWait);
115
116        segmentLoopBody = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Do", threadFunc);
117
118        iBuilder->SetInsertPoint(segmentWait);
119        const unsigned waitIdx = codegen::DebugOptionIsSet(codegen::SerializeThreads) ? (n - 1) : k;
120
121        iBuilder->setKernel(kernels[waitIdx]);
122        Value * const processedSegmentCount = iBuilder->acquireLogicalSegmentNo();
123        iBuilder->setKernel(kernel);
124
125        assert (processedSegmentCount->getType() == segNo->getType());
126        Value * const ready = iBuilder->CreateICmpEQ(segNo, processedSegmentCount);
127
128        if (kernel->hasNoTerminateAttribute()) {
129            iBuilder->CreateCondBr(ready, segmentLoopBody, segmentYield);
130        } else { // If the kernel was terminated in a previous segment then the pipeline is done.
131            BasicBlock * completionTest = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Completed", threadFunc, 0);
132            BasicBlock * exitBlock = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Exit", threadFunc, 0);
133            iBuilder->CreateCondBr(ready, completionTest, segmentYield);
134
135            iBuilder->SetInsertPoint(completionTest);
136            Value * terminationSignal = iBuilder->getTerminationSignal();
137            iBuilder->CreateCondBr(terminationSignal, exitBlock, segmentLoopBody);
138            iBuilder->SetInsertPoint(exitBlock);
139            // Ensure that the next thread will also exit.
140            iBuilder->releaseLogicalSegmentNo(nextSegNo);
141            iBuilder->CreateBr(exitThreadBlock);
142        }
143
144        // Execute the kernel segment
145        iBuilder->SetInsertPoint(segmentLoopBody);
146        const auto & inputs = kernel->getStreamInputs();
147        std::vector<Value *> args = {kernel->getInstance(), terminated};
148        for (unsigned i = 0; i < inputs.size(); ++i) {
149            const auto f = producedPos.find(kernel->getStreamSetInputBuffer(i));
150            assert (f != producedPos.end());
151            args.push_back(f->second);
152        }
153
154        iBuilder->setKernel(kernel);
155        iBuilder->createDoSegmentCall(args);
156        if (!kernel->hasNoTerminateAttribute()) {
157            terminated = iBuilder->CreateOr(terminated, iBuilder->getTerminationSignal());
158        }
159
160        const auto & outputs = kernel->getStreamOutputs();
161        for (unsigned i = 0; i < outputs.size(); ++i) {           
162            Value * const produced = iBuilder->getProducedItemCount(outputs[i].getName()); // terminated
163            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
164            assert (producedPos.count(buf) == 0);
165            producedPos.emplace(buf, produced);
166        }
167        for (unsigned i = 0; i < inputs.size(); ++i) {
168            Value * const processedItemCount = iBuilder->getProcessedItemCount(inputs[i].getName());
169            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);           
170            auto f = consumedPos.find(buf);
171            if (f == consumedPos.end()) {
172                consumedPos.emplace(buf, processedItemCount);
173            } else {
174                Value * lesser = iBuilder->CreateICmpULT(processedItemCount, f->second);
175                f->second = iBuilder->CreateSelect(lesser, processedItemCount, f->second);
176            }
177        }
178        if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
179            cycleCountEnd = iBuilder->CreateReadCycleCounter();
180            Value * counterPtr = iBuilder->getCycleCountPtr();
181            iBuilder->CreateStore(iBuilder->CreateAdd(iBuilder->CreateLoad(counterPtr), iBuilder->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
182            cycleCountStart = cycleCountEnd;
183        }
184       
185        iBuilder->releaseLogicalSegmentNo(nextSegNo);
186    }
187
188    assert (segmentLoopBody);
189    exitThreadBlock->moveAfter(segmentLoopBody);
190
191    for (const auto consumed : consumedPos) {
192        const StreamSetBuffer * const buf = consumed.first;
193        Kernel * const k = buf->getProducer();
194        const auto & outputs = k->getStreamSetOutputBuffers();
195        for (unsigned i = 0; i < outputs.size(); ++i) {
196            if (outputs[i] == buf) {
197                const auto & binding = k->getStreamOutput(i);
198                if (LLVM_UNLIKELY(binding.getRate().isDerived())) {
199                    continue;
200                }
201                iBuilder->setKernel(k);
202                iBuilder->setConsumedItemCount(binding.getName(), consumed.second);
203                break;
204            }
205        }
206    }
207
208    segNo->addIncoming(iBuilder->CreateAdd(segNo, iBuilder->getSize(codegen::ThreadNum)), segmentLoopBody);
209    iBuilder->CreateCondBr(terminated, exitThreadBlock, segmentLoop);
210
211    iBuilder->SetInsertPoint(exitThreadBlock);
212
213    // only call pthread_exit() within spawned threads; otherwise it'll be equivalent to calling exit() within the process
214    BasicBlock * const exitThread = BasicBlock::Create(iBuilder->getContext(), "ExitThread", threadFunc);
215    BasicBlock * const exitFunction = BasicBlock::Create(iBuilder->getContext(), "ExitProcessFunction", threadFunc);
216
217    Value * const exitCond = iBuilder->CreateICmpEQ(segOffset, ConstantInt::getNullValue(segOffset->getType()));
218    iBuilder->CreateCondBr(exitCond, exitFunction, exitThread);
219    iBuilder->SetInsertPoint(exitThread);
220    iBuilder->CreatePThreadExitCall(nullVoidPtrVal);
221    iBuilder->CreateBr(exitFunction);
222    iBuilder->SetInsertPoint(exitFunction);
223    iBuilder->CreateRetVoid();
224
225    // -------------------------------------------------------------------------------------------------------------------------
226    iBuilder->restoreIP(ip);
227
228    for (unsigned i = 0; i < n; ++i) {
229        kernels[i]->setInstance(instance[i]);
230    }
231
232    // -------------------------------------------------------------------------------------------------------------------------
233    // MAKE SEGMENT PARALLEL PIPELINE DRIVER
234    // -------------------------------------------------------------------------------------------------------------------------
235    const unsigned threads = codegen::ThreadNum - 1;
236    assert (codegen::ThreadNum > 1);
237    Type * const pthreadsTy = ArrayType::get(sizeTy, threads);
238    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
239    Value * threadIdPtr[threads];
240
241    for (unsigned i = 0; i < threads; ++i) {
242        threadIdPtr[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
243    }
244
245    for (unsigned i = 0; i < n; ++i) {
246        iBuilder->setKernel(kernels[i]);
247        iBuilder->releaseLogicalSegmentNo(iBuilder->getSize(0));
248    }
249
250    AllocaInst * const sharedStruct = iBuilder->CreateCacheAlignedAlloca(sharedStructType);
251    for (unsigned i = 0; i < n; ++i) {
252        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
253        iBuilder->CreateStore(kernels[i]->getInstance(), ptr);
254    }
255
256    // use the process thread to handle the initial segment function after spawning (n - 1) threads to handle the subsequent offsets
257    for (unsigned i = 0; i < threads; ++i) {
258        AllocaInst * const threadState = iBuilder->CreateAlloca(threadStructType);
259        iBuilder->CreateStore(sharedStruct, iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
260        iBuilder->CreateStore(iBuilder->getSize(i + 1), iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
261        iBuilder->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, threadFunc, threadState);
262    }
263
264    AllocaInst * const threadState = iBuilder->CreateAlloca(threadStructType);
265    iBuilder->CreateStore(sharedStruct, iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
266    iBuilder->CreateStore(iBuilder->getSize(0), iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
267    iBuilder->CreateCall(threadFunc, iBuilder->CreatePointerCast(threadState, voidPtrTy));
268
269    AllocaInst * const status = iBuilder->CreateAlloca(voidPtrTy);
270    for (unsigned i = 0; i < threads; ++i) {
271        Value * threadId = iBuilder->CreateLoad(threadIdPtr[i]);
272        iBuilder->CreatePThreadJoinCall(threadId, status);
273    }
274   
275    if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
276        for (unsigned k = 0; k < kernels.size(); k++) {
277            auto & kernel = kernels[k];
278            iBuilder->setKernel(kernel);
279            const auto & inputs = kernel->getStreamInputs();
280            const auto & outputs = kernel->getStreamOutputs();
281            Value * items = nullptr;
282            if (inputs.empty()) {
283                items = iBuilder->getProducedItemCount(outputs[0].getName());
284            } else {
285                items = iBuilder->getProcessedItemCount(inputs[0].getName());
286            }
287            Value * fItems = iBuilder->CreateUIToFP(items, iBuilder->getDoubleTy());
288            Value * cycles = iBuilder->CreateLoad(iBuilder->getCycleCountPtr());
289            Value * fCycles = iBuilder->CreateUIToFP(cycles, iBuilder->getDoubleTy());
290            const auto formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
291            Value * stringPtr = iBuilder->CreatePointerCast(iBuilder->GetString(formatString), iBuilder->getInt8PtrTy());
292            iBuilder->CreateCall(iBuilder->GetDprintf(), {iBuilder->getInt32(2), stringPtr, fItems, fCycles, iBuilder->CreateFDiv(fCycles, fItems)});
293        }
294    }
295   
296}
297
298
299/** ------------------------------------------------------------------------------------------------------------- *
300 * @brief generateParallelPipeline
301 ** ------------------------------------------------------------------------------------------------------------- */
302void generateParallelPipeline(const std::unique_ptr<KernelBuilder> & iBuilder, const std::vector<Kernel *> &kernels) {
303
304    Module * const m = iBuilder->getModule();
305    IntegerType * const sizeTy = iBuilder->getSizeTy();
306    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
307    ConstantInt * bufferSegments = ConstantInt::get(sizeTy, codegen::BufferSegments - 1);
308    ConstantInt * segmentItems = ConstantInt::get(sizeTy, codegen::SegmentSize * iBuilder->getBitBlockWidth());
309    Constant * const nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
310
311    const unsigned n = kernels.size();
312
313    Type * const pthreadsTy = ArrayType::get(sizeTy, n);
314    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
315    Value * threadIdPtr[n];
316    for (unsigned i = 0; i < n; ++i) {
317        threadIdPtr[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
318    }
319
320    Value * instance[n];
321    Type * structTypes[n];
322    for (unsigned i = 0; i < n; ++i) {
323        instance[i] = kernels[i]->getInstance();
324        structTypes[i] = instance[i]->getType();
325    }
326
327    Type * const sharedStructType = StructType::get(m->getContext(), ArrayRef<Type *>{structTypes, n});
328
329
330    AllocaInst * sharedStruct = iBuilder->CreateAlloca(sharedStructType);
331    for (unsigned i = 0; i < n; ++i) {
332        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
333        iBuilder->CreateStore(instance[i], ptr);
334    }
335
336    for (auto & kernel : kernels) {
337        iBuilder->setKernel(kernel);
338        iBuilder->releaseLogicalSegmentNo(iBuilder->getSize(0));
339    }
340
341    // GENERATE THE PRODUCING AND CONSUMING KERNEL MAPS
342    StreamSetBufferMap<unsigned> producingKernel;
343    StreamSetBufferMap<std::vector<unsigned>> consumingKernels;
344    for (unsigned id = 0; id < n; ++id) {
345        const auto & kernel = kernels[id];
346        const auto & inputs = kernel->getStreamInputs();
347        const auto & outputs = kernel->getStreamOutputs();
348        // add any outputs from this kernel to the producing kernel map
349        for (unsigned j = 0; j < outputs.size(); ++j) {
350            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(j);
351            if (LLVM_UNLIKELY(producingKernel.count(buf) != 0)) {
352                report_fatal_error(kernel->getName() + " redefines stream set " + outputs[j].getName());
353            }
354            producingKernel.emplace(buf, id);
355        }
356        // and any inputs to the consuming kernels list
357        for (unsigned j = 0; j < inputs.size(); ++j) {
358            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(j);
359            auto f = consumingKernels.find(buf);
360            if (f == consumingKernels.end()) {
361                if (LLVM_UNLIKELY(producingKernel.count(buf) == 0)) {
362                    report_fatal_error(kernel->getName() + " uses stream set " + inputs[j].getName() + " prior to its definition");
363                }
364                consumingKernels.emplace(buf, std::vector<unsigned>{ id });
365            } else {
366                f->second.push_back(id);
367            }
368        }
369    }
370
371    const auto ip = iBuilder->saveIP();
372
373    // GENERATE UNIQUE PIPELINE PARALLEL THREAD FUNCTION FOR EACH KERNEL
374    FlatSet<unsigned> kernelSet;
375    kernelSet.reserve(n);
376
377    Function * thread_functions[n];
378    Value * producerSegNo[n];
379    for (unsigned id = 0; id < n; id++) {
380        const auto & kernel = kernels[id];
381
382        iBuilder->setKernel(kernel);
383
384        const auto & inputs = kernel->getStreamInputs();
385
386        Function * const threadFunc = makeThreadFunction(iBuilder, "ppt:" + kernel->getName());
387        auto ai = threadFunc->arg_begin();
388       
389         // Create the basic blocks for the thread function.
390        BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc);
391        BasicBlock * outputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "outputCheck", threadFunc);
392        BasicBlock * inputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "inputCheck", threadFunc);
393        BasicBlock * doSegmentBlock = BasicBlock::Create(iBuilder->getContext(), "doSegment", threadFunc);
394        BasicBlock * exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc);
395
396        iBuilder->SetInsertPoint(entryBlock);
397
398        Value * const sharedStruct = iBuilder->CreateBitCast(&*(ai), sharedStructType->getPointerTo());
399
400        for (unsigned k = 0; k < n; k++) {
401            Value * const ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
402            kernels[k]->setInstance(iBuilder->CreateLoad(ptr));
403        }
404
405        iBuilder->CreateBr(outputCheckBlock);
406
407        // Check whether the output buffers are ready for more data
408        iBuilder->SetInsertPoint(outputCheckBlock);
409        PHINode * segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 3, "segNo");
410        segNo->addIncoming(iBuilder->getSize(0), entryBlock);
411        segNo->addIncoming(segNo, outputCheckBlock);
412
413        Value * outputWaitCond = iBuilder->getTrue();
414        for (const StreamSetBuffer * buf : kernel->getStreamSetOutputBuffers()) {
415            const auto & list = consumingKernels[buf];
416            assert(std::is_sorted(list.begin(), list.end()));
417            kernelSet.insert(list.begin(), list.end());
418        }
419        for (unsigned k : kernelSet) {
420            iBuilder->setKernel(kernels[k]);
421            Value * consumerSegNo = iBuilder->acquireLogicalSegmentNo();
422            assert (consumerSegNo->getType() == segNo->getType());
423            Value * consumedSegNo = iBuilder->CreateAdd(consumerSegNo, bufferSegments);
424            outputWaitCond = iBuilder->CreateAnd(outputWaitCond, iBuilder->CreateICmpULE(segNo, consumedSegNo));
425        }
426        kernelSet.clear();
427        iBuilder->setKernel(kernel);
428        iBuilder->CreateCondBr(outputWaitCond, inputCheckBlock, outputCheckBlock);
429
430        // Check whether the input buffers have enough data for this kernel to begin
431        iBuilder->SetInsertPoint(inputCheckBlock);
432        for (const StreamSetBuffer * buf : kernel->getStreamSetInputBuffers()) {
433            kernelSet.insert(producingKernel[buf]);
434        }
435
436        Value * inputWaitCond = iBuilder->getTrue();
437        for (unsigned k : kernelSet) {
438            iBuilder->setKernel(kernels[k]);
439            producerSegNo[k] = iBuilder->acquireLogicalSegmentNo();
440            assert (producerSegNo[k]->getType() == segNo->getType());
441            inputWaitCond = iBuilder->CreateAnd(inputWaitCond, iBuilder->CreateICmpULT(segNo, producerSegNo[k]));
442        }
443        iBuilder->setKernel(kernel);
444        iBuilder->CreateCondBr(inputWaitCond, doSegmentBlock, inputCheckBlock);
445
446        // Process the segment
447        iBuilder->SetInsertPoint(doSegmentBlock);
448
449        Value * const nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
450        Value * terminated = nullptr;
451        if (kernelSet.empty()) {
452            // if this kernel has no input streams, the kernel itself must decide when it terminates.
453            terminated = iBuilder->getTerminationSignal();
454        } else {
455            // ... otherwise the kernel terminates only when it exhausts all of its input streams
456            terminated = iBuilder->getTrue();
457            for (unsigned k : kernelSet) {
458                iBuilder->setKernel(kernels[k]);
459                terminated = iBuilder->CreateAnd(terminated, iBuilder->getTerminationSignal());
460                terminated = iBuilder->CreateAnd(terminated, iBuilder->CreateICmpEQ(nextSegNo, producerSegNo[k]));
461            }
462            kernelSet.clear();
463            iBuilder->setKernel(kernel);
464        }
465
466        std::vector<Value *> args = {kernel->getInstance(), terminated};
467        args.insert(args.end(), inputs.size(), iBuilder->CreateMul(segmentItems, segNo));
468
469        iBuilder->createDoSegmentCall(args);
470        segNo->addIncoming(nextSegNo, doSegmentBlock);
471        iBuilder->releaseLogicalSegmentNo(nextSegNo);
472
473        iBuilder->CreateCondBr(terminated, exitThreadBlock, outputCheckBlock);
474
475        iBuilder->SetInsertPoint(exitThreadBlock);
476
477        iBuilder->CreatePThreadExitCall(nullVoidPtrVal);
478
479        iBuilder->CreateRetVoid();
480
481        thread_functions[id] = threadFunc;
482    }
483
484    iBuilder->restoreIP(ip);
485
486    for (unsigned i = 0; i < n; ++i) {
487        kernels[i]->setInstance(instance[i]);
488    }
489
490    for (unsigned i = 0; i < n; ++i) {
491        iBuilder->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, thread_functions[i], sharedStruct);
492    }
493
494    AllocaInst * const status = iBuilder->CreateAlloca(voidPtrTy);
495    for (unsigned i = 0; i < n; ++i) {
496        Value * threadId = iBuilder->CreateLoad(threadIdPtr[i]);
497        iBuilder->CreatePThreadJoinCall(threadId, status);
498    }
499}
500
501/** ------------------------------------------------------------------------------------------------------------- *
502 * @brief generatePipelineLoop
503 ** ------------------------------------------------------------------------------------------------------------- */
504void generatePipelineLoop(const std::unique_ptr<KernelBuilder> & iBuilder, const std::vector<Kernel *> & kernels) {
505
506    BasicBlock * entryBlock = iBuilder->GetInsertBlock();
507    Function * main = entryBlock->getParent();
508
509    // Create the basic blocks for the loop.
510    BasicBlock * pipelineLoop = BasicBlock::Create(iBuilder->getContext(), "pipelineLoop", main);
511    BasicBlock * pipelineExit = BasicBlock::Create(iBuilder->getContext(), "pipelineExit", main);
512
513    StreamSetBufferMap<Value *> producedPos;
514    StreamSetBufferMap<Value *> consumedPos;
515
516    iBuilder->CreateBr(pipelineLoop);
517    iBuilder->SetInsertPoint(pipelineLoop);
518   
519    Value * cycleCountStart = nullptr;
520    Value * cycleCountEnd = nullptr;
521    if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
522        cycleCountStart = iBuilder->CreateReadCycleCounter();
523    }
524    Value * terminated = iBuilder->getFalse();
525
526    for (Kernel * const kernel : kernels) {
527
528        iBuilder->setKernel(kernel);
529        const auto & inputs = kernel->getStreamInputs();
530        const auto & outputs = kernel->getStreamOutputs();
531
532        std::vector<Value *> args = {kernel->getInstance(), terminated};
533
534        for (unsigned i = 0; i < inputs.size(); ++i) {
535            const auto f = producedPos.find(kernel->getStreamSetInputBuffer(i));
536            if (LLVM_UNLIKELY(f == producedPos.end())) {
537                report_fatal_error(kernel->getName() + " uses stream set " + inputs[i].getName() + " prior to its definition");
538            }
539            args.push_back(f->second);
540        }
541
542        applyOutputBufferExpansions(iBuilder, kernel);
543
544        iBuilder->createDoSegmentCall(args);
545
546        if (!kernel->hasNoTerminateAttribute()) {
547            Value * terminatedSignal = iBuilder->getTerminationSignal();
548            terminated = iBuilder->CreateOr(terminated, terminatedSignal);
549        }
550        for (unsigned i = 0; i < outputs.size(); ++i) {
551            Value * const produced = iBuilder->getProducedItemCount(outputs[i].getName()); // , terminated
552            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
553            assert (producedPos.count(buf) == 0);
554            producedPos.emplace(buf, produced);
555        }
556
557        for (unsigned i = 0; i < inputs.size(); ++i) {
558            Value * const processed = iBuilder->getProcessedItemCount(inputs[i].getName());
559            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);
560            auto f = consumedPos.find(buf);
561            if (f == consumedPos.end()) {
562                consumedPos.emplace(buf, processed);
563            } else {
564                Value * lesser = iBuilder->CreateICmpULT(processed, f->second);
565                f->second = iBuilder->CreateSelect(lesser, processed, f->second);
566            }
567        }
568        if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
569            cycleCountEnd = iBuilder->CreateReadCycleCounter();
570            Value * counterPtr = iBuilder->getCycleCountPtr();
571            iBuilder->CreateStore(iBuilder->CreateAdd(iBuilder->CreateLoad(counterPtr), iBuilder->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
572            cycleCountStart = cycleCountEnd;
573        }
574
575        Value * const segNo = iBuilder->acquireLogicalSegmentNo();
576        Value * nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
577        iBuilder->releaseLogicalSegmentNo(nextSegNo);
578    }
579
580    for (const auto consumed : consumedPos) {
581        const StreamSetBuffer * const buf = consumed.first;
582        Kernel * const k = buf->getProducer();
583        const auto & outputs = k->getStreamSetOutputBuffers();
584        for (unsigned i = 0; i < outputs.size(); ++i) {
585            if (outputs[i] == buf) {
586                const auto & binding = k->getStreamOutput(i);
587                if (LLVM_UNLIKELY(binding.getRate().isDerived())) {
588                    continue;
589                }
590                iBuilder->setKernel(k);
591                iBuilder->setConsumedItemCount(binding.getName(), consumed.second);
592                break;
593            }
594        }
595    }
596
597    iBuilder->CreateCondBr(terminated, pipelineExit, pipelineLoop);
598
599    iBuilder->SetInsertPoint(pipelineExit);
600
601    if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
602        for (unsigned k = 0; k < kernels.size(); k++) {
603            auto & kernel = kernels[k];
604            iBuilder->setKernel(kernel);
605            const auto & inputs = kernel->getStreamInputs();
606            const auto & outputs = kernel->getStreamOutputs();
607            Value * items = nullptr;
608            if (inputs.empty()) {
609                items = iBuilder->getProducedItemCount(outputs[0].getName());
610            } else {
611                items = iBuilder->getProcessedItemCount(inputs[0].getName());
612            }
613            Value * fItems = iBuilder->CreateUIToFP(items, iBuilder->getDoubleTy());
614            Value * cycles = iBuilder->CreateLoad(iBuilder->getCycleCountPtr());
615            Value * fCycles = iBuilder->CreateUIToFP(cycles, iBuilder->getDoubleTy());
616            const auto formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
617            Value * stringPtr = iBuilder->CreatePointerCast(iBuilder->GetString(formatString), iBuilder->getInt8PtrTy());
618            iBuilder->CreateCall(iBuilder->GetDprintf(), {iBuilder->getInt32(2), stringPtr, fItems, fCycles, iBuilder->CreateFDiv(fCycles, fItems)});
619        }
620    }
621}
622
623void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const std::string & name, DynamicBuffer * const db, const uint64_t baseSize) {
624
625    BasicBlock * const doExpand = b->CreateBasicBlock(name + "Expand");
626    BasicBlock * const nextBlock = b->GetInsertBlock()->getNextNode();
627    doExpand->moveAfter(b->GetInsertBlock());
628    BasicBlock * const bufferReady = b->CreateBasicBlock(name + "Ready");
629    bufferReady->moveAfter(doExpand);
630    if (nextBlock) nextBlock->moveAfter(bufferReady);
631
632    Value * const handle = db->getStreamSetHandle();
633
634    Value * const produced = b->getProducedItemCount(name);
635    Value * const consumed = b->getConsumedItemCount(name);
636    Value * const required = b->CreateAdd(b->CreateSub(produced, consumed), b->getSize(2 * baseSize));
637
638    b->CreateCondBr(b->CreateICmpUGT(required, db->getCapacity(b.get(), handle)), doExpand, bufferReady);
639
640    b->SetInsertPoint(doExpand);
641    db->doubleCapacity(b.get(), handle);
642    // Ensure that capacity is sufficient by successive doubling, if necessary.
643    b->CreateCondBr(b->CreateICmpUGT(required, db->getBufferedSize(b.get(), handle)), doExpand, bufferReady);
644
645    b->SetInsertPoint(bufferReady);
646}
647
648void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * k) {
649    const auto & outputs = k->getStreamSetOutputBuffers();
650    for (unsigned i = 0; i < outputs.size(); i++) {
651        if (isa<DynamicBuffer>(outputs[i])) {
652            const auto ub = k->getUpperBound(k->getStreamOutput(i).getRate());
653            const auto baseSize = (ub.numerator() * k->getStride() + ub.denominator() - 1) / ub.denominator();
654            if (LLVM_LIKELY(baseSize > 0)) {
655                const auto & name = k->getStreamOutput(i).getName();
656                applyOutputBufferExpansions(b, name, cast<DynamicBuffer>(outputs[i]), baseSize);
657            }
658        }
659    }
660}
Note: See TracBrowser for help on using the repository browser.