source: icGREP/icgrep-devel/icgrep/kernels/pipeline.cpp @ 5363

Last change on this file since 5363 was 5363, checked in by lindanl, 2 years ago

pipeline parallel for icgrep.

File size: 24.2 KB
Line 
1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
6#include "pipeline.h"
7#include <toolchain.h>
8#include <kernels/kernel.h>
9#include <kernels/streamset.h>
10#include <llvm/IR/Module.h>
11#include <unordered_map>
12
13using namespace kernel;
14using namespace parabix;
15using namespace llvm;
16
17#include <iostream>
18
19using ProducerTable = std::vector<std::vector<std::pair<unsigned, unsigned>>>;
20
21ProducerTable createProducerTable(const std::vector<KernelBuilder *> & kernels) {
22    ProducerTable producerTable(kernels.size());
23   
24    std::vector<std::vector<bool>> userTable(kernels.size());
25   
26    // First prepare a map from streamSet output buffers to their producing kernel and output index.
27    std::unordered_map<const StreamSetBuffer *, std::pair<unsigned, unsigned>> bufferMap;
28   
29    for (unsigned k = 0; k < kernels.size(); k++) {
30        auto outputSets = kernels[k]->getStreamSetOutputBuffers();
31        for (unsigned j = 0; j < outputSets.size(); j++) {
32            userTable[k].push_back(false);
33            bufferMap.insert(std::make_pair(outputSets[j], std::make_pair(k, j)));
34        }
35    }
36    for (unsigned k = 0; k < kernels.size(); k++) {
37        auto inputSets = kernels[k]->getStreamSetInputBuffers();
38        for (unsigned i = 0; i < inputSets.size(); i++) {
39            auto f = bufferMap.find(inputSets[i]);
40            if (f == bufferMap.end()) {
41                llvm::report_fatal_error("Pipeline error: input buffer #" + std::to_string(i) + " of " + kernels[k]->getName() + ": no corresponding output buffer. ");
42            }
43            producerTable[k].push_back(f->second);
44            unsigned sourceKernel, outputIndex;
45            std::tie(sourceKernel, outputIndex) = f->second;
46            if (sourceKernel >= k) {
47                llvm::report_fatal_error("Pipeline error: input buffer #" + std::to_string(i) + " of " + kernels[k]->getName() + ": not defined before use. ");
48            }
49            //errs() << "sourceKernel: " + std::to_string(sourceKernel) + ", outputIndex: " + std::to_string(outputIndex) + ", user: " + std::to_string(k) + "\n";
50            userTable[sourceKernel][outputIndex]= true;
51           
52        }
53    }
54    /*  TODO:  define sinks for  all outputs so that the following check succeeds on
55     *  well-structured pipelines.
56    for (unsigned k = 0; k < kernels.size(); k++) {
57        auto outputSets = kernels[k]->getStreamSetOutputBuffers();
58        //errs() << "kernel: " + kernels[k]->getName() + "\n";
59        for (unsigned j = 0; j < outputSets.size(); j++) {
60            if (userTable[k][j] == false) {
61                llvm::report_fatal_error("Pipeline error: output buffer #" + std::to_string(j) + " of " + kernels[k]->getName() + ": no users. ");
62            }
63        }
64    }
65    */
66    return producerTable;
67}
68
69using ConsumerTable = std::vector<std::vector<std::vector<unsigned>>>;
70
71ConsumerTable createConsumerTable(const std::vector<KernelBuilder *> & kernels) {
72    ConsumerTable consumerTable(kernels.size());
73   
74    // First prepare a map from streamSet input buffers to their consuming kernel and input index.
75    std::unordered_map<const StreamSetBuffer *, std::vector<unsigned>> bufferMap;
76   
77    for (unsigned k = 0; k < kernels.size(); k++) {
78        auto inputSets = kernels[k]->getStreamSetInputBuffers();
79        for (unsigned j = 0; j < inputSets.size(); j++) {
80            auto f = bufferMap.find(inputSets[j]);
81            std::vector<unsigned> kernelNo;
82            kernelNo.push_back(k);
83            if (f == bufferMap.end()) {
84                bufferMap.insert(std::make_pair(inputSets[j], kernelNo));
85            }
86            else{
87                f->second.push_back(k);
88            }
89        }
90    }
91    for (unsigned k = 0; k < kernels.size(); k++) {
92        auto outputSets = kernels[k]->getStreamSetOutputBuffers();
93        for (unsigned i = 0; i < outputSets.size(); i++) {
94            auto f = bufferMap.find(outputSets[i]);
95            if (f == bufferMap.end()) {
96                llvm::report_fatal_error("Pipeline error: output buffer #" + std::to_string(i) + " of " + kernels[k]->getName() + ": not used by any kernel. ");
97            }
98            else {
99                consumerTable[k].push_back(f->second); 
100            }         
101        }
102    }
103    return consumerTable;
104}
105
106Function * generateSegmentParallelPipelineThreadFunction(std::string name, IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels, Type * sharedStructType, ProducerTable & producerTable, int id) {
107   
108    // ProducerPos[k][i] will hold the producedItemCount of the i^th output stream
109    // set of the k^th kernel.  These values will be loaded immediately after the
110    // doSegment and finalSegment calls for kernel k and later used as the
111    // producer position arguments for later doSegment/finalSegment calls.
112   
113    std::vector<std::vector<Value *>> ProducerPos;
114   
115   
116    const auto ip = iBuilder->saveIP();
117   
118    Module * m = iBuilder->getModule();
119    Type * const voidTy = iBuilder->getVoidTy();
120    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
121    PointerType * const int8PtrTy = iBuilder->getInt8PtrTy();
122
123    Function * const threadFunc = cast<Function>(m->getOrInsertFunction(name, voidTy, int8PtrTy, nullptr));
124    threadFunc->setCallingConv(CallingConv::C);
125    Function::arg_iterator args = threadFunc->arg_begin();
126
127    Value * const input = &*(args++);
128    input->setName("input");
129
130
131     // Create the basic blocks for the thread function.
132    BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc, 0);
133    BasicBlock * segmentLoop = BasicBlock::Create(iBuilder->getContext(), "segmentLoop", threadFunc, 0);
134    BasicBlock * exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc, 0);
135   
136    std::vector<BasicBlock *> segmentWait;
137    std::vector<BasicBlock *> segmentLoopBody;
138    for (unsigned i = 0; i < kernels.size(); i++) {
139        auto kname = kernels[i]->getName();
140        segmentWait.push_back(BasicBlock::Create(iBuilder->getContext(), kname + "Wait", threadFunc, 0));
141        segmentLoopBody.push_back(BasicBlock::Create(iBuilder->getContext(), kname + "Do", threadFunc, 0));
142    }
143
144    iBuilder->SetInsertPoint(entryBlock);
145   
146    Value * sharedStruct = iBuilder->CreateBitCast(input, PointerType::get(sharedStructType, 0));
147    std::vector<Value *> instancePtrs;
148    for (unsigned k = 0; k < kernels.size(); k++) {
149        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
150        instancePtrs.push_back(iBuilder->CreateLoad(ptr));
151    }
152   
153    iBuilder->CreateBr(segmentLoop);
154
155    iBuilder->SetInsertPoint(segmentLoop);
156    PHINode * segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 2, "segNo");
157    segNo->addIncoming(iBuilder->getSize(id), entryBlock);
158    const unsigned last_kernel = kernels.size() - 1;
159    Value * doFinal = ConstantInt::getNullValue(iBuilder->getInt1Ty());
160    Value * nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
161    iBuilder->CreateBr(segmentWait[0]);
162    for (unsigned k = 0; k < kernels.size(); k++) {
163        iBuilder->SetInsertPoint(segmentWait[k]);
164        unsigned waitForKernel = k;
165        if (codegen::DebugOptionIsSet(codegen::SerializeThreads)) {
166            waitForKernel = last_kernel;
167        }
168        Value * processedSegmentCount = kernels[waitForKernel]->acquireLogicalSegmentNo(instancePtrs[waitForKernel]);
169        Value * ready = iBuilder->CreateICmpEQ(segNo, processedSegmentCount);
170
171        if (kernels[k]->hasNoTerminateAttribute()) {
172            iBuilder->CreateCondBr(ready, segmentLoopBody[k], segmentWait[k]);
173        } else { // If the kernel was terminated in a previous segment then the pipeline is done.
174            BasicBlock * completionTest = BasicBlock::Create(iBuilder->getContext(), kernels[k]->getName() + "Completed", threadFunc, 0);
175            BasicBlock * exitBlock = BasicBlock::Create(iBuilder->getContext(), kernels[k]->getName() + "Exit", threadFunc, 0);
176            iBuilder->CreateCondBr(ready, completionTest, segmentWait[k]);
177            iBuilder->SetInsertPoint(completionTest);
178            Value * alreadyDone = kernels[k]->getTerminationSignal(instancePtrs[k]);
179            iBuilder->CreateCondBr(alreadyDone, exitBlock, segmentLoopBody[k]);
180            iBuilder->SetInsertPoint(exitBlock);
181            // Ensure that the next thread will also exit.
182            kernels[k]->releaseLogicalSegmentNo(instancePtrs[k], nextSegNo);
183            iBuilder->CreateBr(exitThreadBlock);
184        }
185        iBuilder->SetInsertPoint(segmentLoopBody[k]);
186        std::vector<Value *> doSegmentArgs = {instancePtrs[k], doFinal};
187        for (unsigned j = 0; j < kernels[k]->getStreamInputs().size(); j++) {
188            unsigned producerKernel, outputIndex;
189            std::tie(producerKernel, outputIndex) = producerTable[k][j];
190            doSegmentArgs.push_back(ProducerPos[producerKernel][outputIndex]);
191        }
192        kernels[k]->createDoSegmentCall(doSegmentArgs);
193        std::vector<Value *> produced;
194        for (unsigned i = 0; i < kernels[k]->getStreamOutputs().size(); i++) {
195            produced.push_back(kernels[k]->getProducedItemCount(instancePtrs[k], kernels[k]->getStreamOutputs()[i].name));
196        }
197        ProducerPos.push_back(produced);
198        if (! (kernels[k]->hasNoTerminateAttribute())) {
199            Value * terminated = kernels[k]->getTerminationSignal(instancePtrs[k]);
200            doFinal = iBuilder->CreateOr(doFinal, terminated);
201        }
202
203        kernels[k]->releaseLogicalSegmentNo(instancePtrs[k], nextSegNo);
204        if (k == last_kernel) {
205            segNo->addIncoming(iBuilder->CreateAdd(segNo, iBuilder->getSize(codegen::ThreadNum)), segmentLoopBody[last_kernel]);
206            iBuilder->CreateCondBr(doFinal, exitThreadBlock, segmentLoop);
207        } else {
208            iBuilder->CreateBr(segmentWait[k+1]);
209        }
210    }
211
212    iBuilder->SetInsertPoint(exitThreadBlock);
213    Value * nullVal = Constant::getNullValue(voidPtrTy);
214    iBuilder->CreatePThreadExitCall(nullVal);
215    iBuilder->CreateRetVoid();
216    iBuilder->restoreIP(ip);
217
218    return threadFunc;
219}
220
221// Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
222// operating over an input stream set S, a segment-parallel implementation divides the input
223// into segments and coordinates a set of T <= K threads to each process one segment at a time.   
224// Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
225// fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
226
227
228void generateSegmentParallelPipeline(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
229   
230    const unsigned threadNum = codegen::ThreadNum;
231   
232    Module * m = iBuilder->getModule();
233   
234    IntegerType * const size_ty = iBuilder->getSizeTy();
235    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
236    PointerType * const int8PtrTy = iBuilder->getInt8PtrTy();
237   
238    for (auto k : kernels) k->createInstance();
239   
240    ProducerTable producerTable = createProducerTable(kernels);
241   
242    Type * const pthreadsTy = ArrayType::get(size_ty, threadNum);
243    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
244    std::vector<Value *> pthreadsPtrs;
245    for (unsigned i = 0; i < threadNum; i++) {
246        pthreadsPtrs.push_back(iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)}));
247    }
248    Value * nullVal = Constant::getNullValue(voidPtrTy);
249    AllocaInst * const status = iBuilder->CreateAlloca(int8PtrTy);
250   
251    std::vector<Type *> structTypes;
252    for (unsigned i = 0; i < kernels.size(); i++) {
253        structTypes.push_back(kernels[i]->getInstance()->getType());
254    }
255    Type * sharedStructType = StructType::get(m->getContext(), structTypes);
256   
257    AllocaInst * sharedStruct = iBuilder->CreateAlloca(sharedStructType);
258    for (unsigned i = 0; i < kernels.size(); i++) {
259        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
260        iBuilder->CreateStore(kernels[i]->getInstance(), ptr);
261    }
262    for (unsigned i = 0; i < kernels.size(); i++) {
263        kernels[i]->releaseLogicalSegmentNo(kernels[i]->getInstance(), iBuilder->getSize(0));
264    }
265
266    std::vector<Function *> thread_functions;
267    const auto ip = iBuilder->saveIP();
268    for (unsigned i = 0; i < threadNum; i++) {
269        thread_functions.push_back(generateSegmentParallelPipelineThreadFunction("thread"+std::to_string(i), iBuilder, kernels, sharedStructType, producerTable, i));
270    }
271    iBuilder->restoreIP(ip);
272   
273    for (unsigned i = 0; i < threadNum; i++) {
274        iBuilder->CreatePThreadCreateCall(pthreadsPtrs[i], nullVal, thread_functions[i], iBuilder->CreateBitCast(sharedStruct, int8PtrTy));
275    }
276   
277    std::vector<Value *> threadIDs;
278    for (unsigned i = 0; i < threadNum; i++) { 
279        threadIDs.push_back(iBuilder->CreateLoad(pthreadsPtrs[i]));
280    }
281   
282    for (unsigned i = 0; i < threadNum; i++) { 
283        iBuilder->CreatePThreadJoinCall(threadIDs[i], status);
284    }
285   
286}
287
288Function * generateParallelPipelineThreadFunction(std::string name, IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels, Type * sharedStructType, ProducerTable & producerTable, ConsumerTable & consumerTable, int id) {
289       
290    const auto ip = iBuilder->saveIP();
291   
292    Module * m = iBuilder->getModule();
293    Type * const voidTy = iBuilder->getVoidTy();
294    IntegerType * const size_ty = iBuilder->getSizeTy();
295    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
296    PointerType * const int8PtrTy = iBuilder->getInt8PtrTy();
297    IntegerType * const int1ty = iBuilder->getInt1Ty();
298
299    Function * const threadFunc = cast<Function>(m->getOrInsertFunction(name, voidTy, int8PtrTy, nullptr));
300    threadFunc->setCallingConv(CallingConv::C);
301    Function::arg_iterator args = threadFunc->arg_begin();
302
303    Value * const input = &*(args++);
304    input->setName("input");
305
306    KernelBuilder * targetK = kernels[id];
307    Value * bufferSegments = ConstantInt::get(size_ty, codegen::BufferSegments - 1);
308    ConstantInt * segmentItems = iBuilder->getSize(codegen::SegmentSize * iBuilder->getBitBlockWidth());
309    Value * waitCondTest = nullptr;
310
311     // Create the basic blocks for the thread function.
312    BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc, 0);
313    BasicBlock * outputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "outputCheck", threadFunc, 0);
314    BasicBlock * doSegmentBlock = BasicBlock::Create(iBuilder->getContext(), "doSegment", threadFunc, 0);
315    BasicBlock * exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc, 0);
316
317    iBuilder->SetInsertPoint(entryBlock);
318   
319    Value * sharedStruct = iBuilder->CreateBitCast(input, PointerType::get(sharedStructType, 0));
320    std::vector<Value *> instancePtrs;
321    std::vector<std::vector<Value *>> ProducerPos;
322    for (unsigned k = 0; k < kernels.size(); k++) {
323        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
324        instancePtrs.push_back(iBuilder->CreateLoad(ptr));
325
326        std::vector<Value *> produced;
327        for (unsigned i = 0; i < kernels[k]->getStreamOutputs().size(); i++) {
328            produced.push_back(kernels[k]->getProducedItemCount(instancePtrs[k], kernels[k]->getStreamOutputs()[i].name));
329        }
330        ProducerPos.push_back(produced);
331    }
332
333    iBuilder->CreateBr(outputCheckBlock);
334
335    iBuilder->SetInsertPoint(outputCheckBlock);
336    PHINode * segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 2, "segNo");
337    segNo->addIncoming(iBuilder->getSize(0), entryBlock);
338    segNo->addIncoming(segNo, outputCheckBlock);
339
340    waitCondTest = ConstantInt::get(int1ty, 1);
341    for (unsigned j = 0; j < targetK->getStreamOutputs().size(); j++) {
342        std::vector<unsigned> consumerKernels = consumerTable[id][j];
343        for (unsigned k = 0; k < consumerKernels.size(); k++) {
344            Value * consumerSegNo = kernels[consumerKernels[k]]->acquireLogicalSegmentNo(instancePtrs[consumerKernels[k]]);
345            waitCondTest = iBuilder->CreateAnd(waitCondTest, iBuilder->CreateICmpULE(segNo, iBuilder->CreateAdd(consumerSegNo, bufferSegments)));
346        } 
347    }
348
349    if(targetK->getStreamInputs().size() == 0) {
350
351        iBuilder->CreateCondBr(waitCondTest, doSegmentBlock, outputCheckBlock); 
352
353        iBuilder->SetInsertPoint(doSegmentBlock);
354
355        Value * terminated = targetK->getTerminationSignal(instancePtrs[id]);
356        std::vector<Value *> doSegmentArgs = {instancePtrs[id], terminated};       
357        targetK->createDoSegmentCall(doSegmentArgs);
358        Value * nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
359        segNo->addIncoming(nextSegNo, doSegmentBlock);
360        targetK->releaseLogicalSegmentNo(instancePtrs[id], nextSegNo);
361
362        iBuilder->CreateCondBr(terminated, exitThreadBlock, outputCheckBlock);
363
364    }
365    else{
366
367        BasicBlock * inputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "inputCheck", threadFunc, 0);
368
369        iBuilder->CreateCondBr(waitCondTest, inputCheckBlock, outputCheckBlock); 
370
371        iBuilder->SetInsertPoint(inputCheckBlock); 
372       
373        waitCondTest = ConstantInt::get(int1ty, 1);
374        for (unsigned j = 0; j < targetK->getStreamInputs().size(); j++) {
375            unsigned producerKernel, outputIndex;
376            std::tie(producerKernel, outputIndex) = producerTable[id][j];
377            Value * producerSegNo = kernels[producerKernel]->acquireLogicalSegmentNo(instancePtrs[producerKernel]);
378            waitCondTest = iBuilder->CreateAnd(waitCondTest, iBuilder->CreateICmpULT(segNo, producerSegNo)); 
379        }
380
381        iBuilder->CreateCondBr(waitCondTest, doSegmentBlock, inputCheckBlock);
382
383        iBuilder->SetInsertPoint(doSegmentBlock);
384
385        Value * nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
386        Value * terminated = ConstantInt::get(int1ty, 1);
387        for (unsigned j = 0; j < targetK->getStreamInputs().size(); j++) {
388            unsigned producerKernel, outputIndex;
389            std::tie(producerKernel, outputIndex) = producerTable[id][j];
390            terminated = iBuilder->CreateAnd(terminated, kernels[producerKernel]->getTerminationSignal(instancePtrs[producerKernel]));
391            Value * producerSegNo = kernels[producerKernel]->acquireLogicalSegmentNo(instancePtrs[producerKernel]);
392            terminated = iBuilder->CreateAnd(terminated, iBuilder->CreateICmpEQ(nextSegNo, producerSegNo));
393        }
394       
395        std::vector<Value *> doSegmentArgs = {instancePtrs[id], terminated};
396        for (unsigned j = 0; j < targetK->getStreamInputs().size(); j++) {
397            unsigned producerKernel, outputIndex;
398            std::tie(producerKernel, outputIndex) = producerTable[id][j];
399            // doSegmentArgs.push_back(ProducerPos[producerKernel][outputIndex]);
400            doSegmentArgs.push_back(iBuilder->CreateMul(segmentItems, segNo));
401        }
402        targetK->createDoSegmentCall(doSegmentArgs);
403        segNo->addIncoming(nextSegNo, doSegmentBlock);
404        targetK->releaseLogicalSegmentNo(instancePtrs[id], nextSegNo);
405
406        iBuilder->CreateCondBr(terminated, exitThreadBlock, outputCheckBlock);
407    }
408
409    iBuilder->SetInsertPoint(exitThreadBlock);
410
411    Value * nullVal = Constant::getNullValue(voidPtrTy);
412    iBuilder->CreatePThreadExitCall(nullVal);
413    iBuilder->CreateRetVoid();
414    iBuilder->restoreIP(ip);
415
416    return threadFunc;
417
418}
419
420void generateParallelPipeline(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
421    const unsigned threadNum = kernels.size();
422   
423    Module * m = iBuilder->getModule();
424   
425    IntegerType * const size_ty = iBuilder->getSizeTy();
426    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
427    PointerType * const int8PtrTy = iBuilder->getInt8PtrTy();
428   
429    for (auto k : kernels) k->createInstance();
430   
431    ProducerTable producerTable = createProducerTable(kernels);
432    ConsumerTable consumerTable = createConsumerTable(kernels);
433   
434    Type * const pthreadsTy = ArrayType::get(size_ty, threadNum);
435    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
436    std::vector<Value *> pthreadsPtrs;
437    for (unsigned i = 0; i < threadNum; i++) {
438        pthreadsPtrs.push_back(iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)}));
439    }
440    Value * nullVal = Constant::getNullValue(voidPtrTy);
441    AllocaInst * const status = iBuilder->CreateAlloca(int8PtrTy);
442   
443    std::vector<Type *> structTypes;
444    for (unsigned i = 0; i < kernels.size(); i++) {
445        structTypes.push_back(kernels[i]->getInstance()->getType());
446    }
447    Type * sharedStructType = StructType::get(m->getContext(), structTypes);
448   
449    AllocaInst * sharedStruct = iBuilder->CreateAlloca(sharedStructType);
450    for (unsigned i = 0; i < kernels.size(); i++) {
451        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
452        iBuilder->CreateStore(kernels[i]->getInstance(), ptr);
453    }
454    for (unsigned i = 0; i < kernels.size(); i++) {
455        kernels[i]->releaseLogicalSegmentNo(kernels[i]->getInstance(), iBuilder->getSize(0));
456    }
457
458    std::vector<Function *> thread_functions;
459    const auto ip = iBuilder->saveIP();
460    for (unsigned i = 0; i < threadNum; i++) {
461        thread_functions.push_back(generateParallelPipelineThreadFunction("thread"+std::to_string(i), iBuilder, kernels, sharedStructType, producerTable, consumerTable, i));
462    }
463    iBuilder->restoreIP(ip);
464   
465    for (unsigned i = 0; i < threadNum; i++) {
466        iBuilder->CreatePThreadCreateCall(pthreadsPtrs[i], nullVal, thread_functions[i], iBuilder->CreateBitCast(sharedStruct, int8PtrTy));
467    }
468   
469    std::vector<Value *> threadIDs;
470    for (unsigned i = 0; i < threadNum; i++) { 
471        threadIDs.push_back(iBuilder->CreateLoad(pthreadsPtrs[i]));
472    }
473   
474    for (unsigned i = 0; i < threadNum; i++) { 
475        iBuilder->CreatePThreadJoinCall(threadIDs[i], status);
476    }
477
478}
479
480void generatePipelineLoop(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
481    for (auto k : kernels) {
482        k->createInstance();
483    }
484    BasicBlock * entryBlock = iBuilder->GetInsertBlock();
485    Function * main = entryBlock->getParent();
486
487    // Create the basic blocks for the loop.
488    BasicBlock * segmentLoop = BasicBlock::Create(iBuilder->getContext(), "segmentLoop", main, 0);
489    BasicBlock * exitBlock = BasicBlock::Create(iBuilder->getContext(), "exitBlock", main, 0);
490   
491    ProducerTable producerTable = createProducerTable(kernels);
492   
493    // ProducerPos[k][i] will hold the producedItemCount of the i^th output stream
494    // set of the k^th kernel.  These values will be loaded immediately after the
495    // doSegment and finalSegment calls for kernel k and later used as the
496    // producer position arguments for later doSegment/finalSegment calls.
497   
498    std::vector<std::vector<Value *>> ProducerPos;
499   
500    iBuilder->CreateBr(segmentLoop);
501    iBuilder->SetInsertPoint(segmentLoop);
502
503    Value * terminationFound = ConstantInt::getNullValue(iBuilder->getInt1Ty());
504    for (unsigned k = 0; k < kernels.size(); k++) {
505        Value * instance = kernels[k]->getInstance();
506        std::vector<Value *> doSegmentArgs = {instance, terminationFound};
507        for (unsigned j = 0; j < kernels[k]->getStreamInputs().size(); j++) {
508            unsigned producerKernel, outputIndex;
509            std::tie(producerKernel, outputIndex) = producerTable[k][j];
510            doSegmentArgs.push_back(ProducerPos[producerKernel][outputIndex]);
511        }
512        kernels[k]->createDoSegmentCall(doSegmentArgs);
513        if (! (kernels[k]->hasNoTerminateAttribute())) {
514            Value * terminated = kernels[k]->getTerminationSignal(instance);
515            terminationFound = iBuilder->CreateOr(terminationFound, terminated);
516        }
517        std::vector<Value *> produced;
518        for (unsigned i = 0; i < kernels[k]->getStreamOutputs().size(); i++) {
519            produced.push_back(kernels[k]->getProducedItemCount(instance, kernels[k]->getStreamOutputs()[i].name));
520        }
521        ProducerPos.push_back(produced);
522        Value * segNo = kernels[k]->acquireLogicalSegmentNo(instance);
523        kernels[k]->releaseLogicalSegmentNo(instance, iBuilder->CreateAdd(segNo, iBuilder->getSize(1)));
524    }
525    iBuilder->CreateCondBr(terminationFound, exitBlock, segmentLoop);
526    iBuilder->SetInsertPoint(exitBlock);
527}
528
529   
Note: See TracBrowser for help on using the repository browser.