source: icGREP/icgrep-devel/icgrep/kernels/pipeline.cpp @ 5385

Last change on this file since 5385 was 5370, checked in by xuedongx, 2 years ago

Add1 processing rate; pablo Count only up to EOFbit

File size: 24.2 KB
Line 
1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
6#include "pipeline.h"
7#include <toolchain.h>
8#include <kernels/kernel.h>
9#include <kernels/streamset.h>
10#include <llvm/IR/Module.h>
11#include <unordered_map>
12
13using namespace kernel;
14using namespace parabix;
15using namespace llvm;
16
17#include <iostream>
18
19using ProducerTable = std::vector<std::vector<std::pair<unsigned, unsigned>>>;
20
21ProducerTable createProducerTable(const std::vector<KernelBuilder *> & kernels) {
22    ProducerTable producerTable(kernels.size());
23   
24    std::vector<std::vector<bool>> userTable(kernels.size());
25   
26    // First prepare a map from streamSet output buffers to their producing kernel and output index.
27    std::unordered_map<const StreamSetBuffer *, std::pair<unsigned, unsigned>> bufferMap;
28   
29    for (unsigned k = 0; k < kernels.size(); k++) {
30        auto outputSets = kernels[k]->getStreamSetOutputBuffers();
31        for (unsigned j = 0; j < outputSets.size(); j++) {
32            userTable[k].push_back(false);
33            bufferMap.insert(std::make_pair(outputSets[j], std::make_pair(k, j)));
34        }
35    }
36    for (unsigned k = 0; k < kernels.size(); k++) {
37        auto inputSets = kernels[k]->getStreamSetInputBuffers();
38        for (unsigned i = 0; i < inputSets.size(); i++) {
39            auto f = bufferMap.find(inputSets[i]);
40            if (f == bufferMap.end()) {
41                llvm::report_fatal_error("Pipeline error: input buffer #" + std::to_string(i) + " of " + kernels[k]->getName() + ": no corresponding output buffer. ");
42            }
43            producerTable[k].push_back(f->second);
44            unsigned sourceKernel, outputIndex;
45            std::tie(sourceKernel, outputIndex) = f->second;
46            if (sourceKernel >= k) {
47                llvm::report_fatal_error("Pipeline error: input buffer #" + std::to_string(i) + " of " + kernels[k]->getName() + ": not defined before use. ");
48            }
49            //errs() << "sourceKernel: " + std::to_string(sourceKernel) + ", outputIndex: " + std::to_string(outputIndex) + ", user: " + std::to_string(k) + "\n";
50            userTable[sourceKernel][outputIndex]= true;
51           
52        }
53    }
54    /*  TODO:  define sinks for  all outputs so that the following check succeeds on
55     *  well-structured pipelines.
56    for (unsigned k = 0; k < kernels.size(); k++) {
57        auto outputSets = kernels[k]->getStreamSetOutputBuffers();
58        //errs() << "kernel: " + kernels[k]->getName() + "\n";
59        for (unsigned j = 0; j < outputSets.size(); j++) {
60            if (userTable[k][j] == false) {
61                llvm::report_fatal_error("Pipeline error: output buffer #" + std::to_string(j) + " of " + kernels[k]->getName() + ": no users. ");
62            }
63        }
64    }
65    */
66    return producerTable;
67}
68
69using ConsumerTable = std::vector<std::vector<std::vector<unsigned>>>;
70
71ConsumerTable createConsumerTable(const std::vector<KernelBuilder *> & kernels) {
72    ConsumerTable consumerTable(kernels.size());
73   
74    // First prepare a map from streamSet input buffers to their consuming kernel and input index.
75    std::unordered_map<const StreamSetBuffer *, std::vector<unsigned>> bufferMap;
76   
77    for (unsigned k = 0; k < kernels.size(); k++) {
78        auto inputSets = kernels[k]->getStreamSetInputBuffers();
79        for (unsigned j = 0; j < inputSets.size(); j++) {
80            auto f = bufferMap.find(inputSets[j]);
81            std::vector<unsigned> kernelNo;
82            kernelNo.push_back(k);
83            if (f == bufferMap.end()) {
84                bufferMap.insert(std::make_pair(inputSets[j], kernelNo));
85            }
86            else{
87                f->second.push_back(k);
88            }
89        }
90    }
91    for (unsigned k = 0; k < kernels.size(); k++) {
92        auto outputSets = kernels[k]->getStreamSetOutputBuffers();
93        for (unsigned i = 0; i < outputSets.size(); i++) {
94            auto f = bufferMap.find(outputSets[i]);
95            if (f == bufferMap.end()) {
96                llvm::report_fatal_error("Pipeline error: output buffer #" + std::to_string(i) + " of " + kernels[k]->getName() + ": not used by any kernel. ");
97            }
98            else {
99                consumerTable[k].push_back(f->second); 
100            }         
101        }
102    }
103    return consumerTable;
104}
105
106Function * generateSegmentParallelPipelineThreadFunction(std::string name, IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels, Type * sharedStructType, ProducerTable & producerTable, int id) {
107   
108    // ProducerPos[k][i] will hold the producedItemCount of the i^th output stream
109    // set of the k^th kernel.  These values will be loaded immediately after the
110    // doSegment and finalSegment calls for kernel k and later used as the
111    // producer position arguments for later doSegment/finalSegment calls.
112   
113    std::vector<std::vector<Value *>> ProducerPos;
114   
115   
116    const auto ip = iBuilder->saveIP();
117   
118    Module * m = iBuilder->getModule();
119    Type * const voidTy = iBuilder->getVoidTy();
120    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
121    PointerType * const int8PtrTy = iBuilder->getInt8PtrTy();
122
123    Function * const threadFunc = cast<Function>(m->getOrInsertFunction(name, voidTy, int8PtrTy, nullptr));
124    threadFunc->setCallingConv(CallingConv::C);
125    Function::arg_iterator args = threadFunc->arg_begin();
126
127    Value * const input = &*(args++);
128    input->setName("input");
129
130
131     // Create the basic blocks for the thread function.
132    BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc, 0);
133    BasicBlock * segmentLoop = BasicBlock::Create(iBuilder->getContext(), "segmentLoop", threadFunc, 0);
134    BasicBlock * exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc, 0);
135   
136    std::vector<BasicBlock *> segmentWait;
137    std::vector<BasicBlock *> segmentLoopBody;
138    for (unsigned i = 0; i < kernels.size(); i++) {
139        auto kname = kernels[i]->getName();
140        segmentWait.push_back(BasicBlock::Create(iBuilder->getContext(), kname + "Wait", threadFunc, 0));
141        segmentLoopBody.push_back(BasicBlock::Create(iBuilder->getContext(), kname + "Do", threadFunc, 0));
142    }
143
144    iBuilder->SetInsertPoint(entryBlock);
145   
146    Value * sharedStruct = iBuilder->CreateBitCast(input, PointerType::get(sharedStructType, 0));
147    std::vector<Value *> instancePtrs;
148    for (unsigned k = 0; k < kernels.size(); k++) {
149        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
150        instancePtrs.push_back(iBuilder->CreateLoad(ptr));
151    }
152   
153    iBuilder->CreateBr(segmentLoop);
154
155    iBuilder->SetInsertPoint(segmentLoop);
156    PHINode * segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 2, "segNo");
157    segNo->addIncoming(iBuilder->getSize(id), entryBlock);
158    const unsigned last_kernel = kernels.size() - 1;
159    Value * doFinal = ConstantInt::getNullValue(iBuilder->getInt1Ty());
160    Value * nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
161    iBuilder->CreateBr(segmentWait[0]);
162    for (unsigned k = 0; k < kernels.size(); k++) {
163        iBuilder->SetInsertPoint(segmentWait[k]);
164        unsigned waitForKernel = k;
165        if (codegen::DebugOptionIsSet(codegen::SerializeThreads)) {
166            waitForKernel = last_kernel;
167        }
168        Value * processedSegmentCount = kernels[waitForKernel]->acquireLogicalSegmentNo(instancePtrs[waitForKernel]);
169        Value * ready = iBuilder->CreateICmpEQ(segNo, processedSegmentCount);
170
171        if (kernels[k]->hasNoTerminateAttribute()) {
172            iBuilder->CreateCondBr(ready, segmentLoopBody[k], segmentWait[k]);
173        } else { // If the kernel was terminated in a previous segment then the pipeline is done.
174            BasicBlock * completionTest = BasicBlock::Create(iBuilder->getContext(), kernels[k]->getName() + "Completed", threadFunc, 0);
175            BasicBlock * exitBlock = BasicBlock::Create(iBuilder->getContext(), kernels[k]->getName() + "Exit", threadFunc, 0);
176            iBuilder->CreateCondBr(ready, completionTest, segmentWait[k]);
177            iBuilder->SetInsertPoint(completionTest);
178            Value * alreadyDone = kernels[k]->getTerminationSignal(instancePtrs[k]);
179            iBuilder->CreateCondBr(alreadyDone, exitBlock, segmentLoopBody[k]);
180            iBuilder->SetInsertPoint(exitBlock);
181            // Ensure that the next thread will also exit.
182            kernels[k]->releaseLogicalSegmentNo(instancePtrs[k], nextSegNo);
183            iBuilder->CreateBr(exitThreadBlock);
184        }
185        iBuilder->SetInsertPoint(segmentLoopBody[k]);
186        std::vector<Value *> doSegmentArgs = {instancePtrs[k], doFinal};
187        for (unsigned j = 0; j < kernels[k]->getStreamInputs().size(); j++) {
188            unsigned producerKernel, outputIndex;
189            std::tie(producerKernel, outputIndex) = producerTable[k][j];
190            doSegmentArgs.push_back(ProducerPos[producerKernel][outputIndex]);
191        }
192        kernels[k]->createDoSegmentCall(doSegmentArgs);
193         if (! (kernels[k]->hasNoTerminateAttribute())) {
194            Value * terminated = kernels[k]->getTerminationSignal(instancePtrs[k]);
195            doFinal = iBuilder->CreateOr(doFinal, terminated);
196        }
197       std::vector<Value *> produced;
198        for (unsigned i = 0; i < kernels[k]->getStreamOutputs().size(); i++) {
199            produced.push_back(kernels[k]->getProducedItemCount(instancePtrs[k], kernels[k]->getStreamOutputs()[i].name, doFinal));
200        }
201        ProducerPos.push_back(produced);
202
203        kernels[k]->releaseLogicalSegmentNo(instancePtrs[k], nextSegNo);
204        if (k == last_kernel) {
205            segNo->addIncoming(iBuilder->CreateAdd(segNo, iBuilder->getSize(codegen::ThreadNum)), segmentLoopBody[last_kernel]);
206            iBuilder->CreateCondBr(doFinal, exitThreadBlock, segmentLoop);
207        } else {
208            iBuilder->CreateBr(segmentWait[k+1]);
209        }
210    }
211
212    iBuilder->SetInsertPoint(exitThreadBlock);
213    Value * nullVal = Constant::getNullValue(voidPtrTy);
214    iBuilder->CreatePThreadExitCall(nullVal);
215    iBuilder->CreateRetVoid();
216    iBuilder->restoreIP(ip);
217
218    return threadFunc;
219}
220
221// Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
222// operating over an input stream set S, a segment-parallel implementation divides the input
223// into segments and coordinates a set of T <= K threads to each process one segment at a time.   
224// Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
225// fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
226
227
228void generateSegmentParallelPipeline(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
229       
230    Module * m = iBuilder->getModule();
231   
232    IntegerType * const size_ty = iBuilder->getSizeTy();
233    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
234    PointerType * const int8PtrTy = iBuilder->getInt8PtrTy();
235   
236    for (auto k : kernels) k->createInstance();
237   
238    ProducerTable producerTable = createProducerTable(kernels);
239   
240    Type * const pthreadsTy = ArrayType::get(size_ty, codegen::ThreadNum);
241    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
242    std::vector<Value *> pthreadsPtrs;
243    for (int i = 0; i < codegen::ThreadNum; i++) {
244        pthreadsPtrs.push_back(iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)}));
245    }
246    Value * nullVal = Constant::getNullValue(voidPtrTy);
247    AllocaInst * const status = iBuilder->CreateAlloca(int8PtrTy);
248   
249    std::vector<Type *> structTypes;
250    for (unsigned i = 0; i < kernels.size(); i++) {
251        structTypes.push_back(kernels[i]->getInstance()->getType());
252    }
253    Type * sharedStructType = StructType::get(m->getContext(), structTypes);
254   
255    AllocaInst * sharedStruct = iBuilder->CreateAlloca(sharedStructType);
256    for (unsigned i = 0; i < kernels.size(); i++) {
257        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
258        iBuilder->CreateStore(kernels[i]->getInstance(), ptr);
259    }
260    for (unsigned i = 0; i < kernels.size(); i++) {
261        kernels[i]->releaseLogicalSegmentNo(kernels[i]->getInstance(), iBuilder->getSize(0));
262    }
263
264    std::vector<Function *> thread_functions;
265    const auto ip = iBuilder->saveIP();
266    for (int i = 0; i < codegen::ThreadNum; i++) {
267        thread_functions.push_back(generateSegmentParallelPipelineThreadFunction("thread"+std::to_string(i), iBuilder, kernels, sharedStructType, producerTable, i));
268    }
269    iBuilder->restoreIP(ip);
270   
271    for (int i = 0; i < codegen::ThreadNum; i++) {
272        iBuilder->CreatePThreadCreateCall(pthreadsPtrs[i], nullVal, thread_functions[i], iBuilder->CreateBitCast(sharedStruct, int8PtrTy));
273    }
274   
275    std::vector<Value *> threadIDs;
276    for (int i = 0; i < codegen::ThreadNum; i++) {
277        threadIDs.push_back(iBuilder->CreateLoad(pthreadsPtrs[i]));
278    }
279   
280    for (int i = 0; i < codegen::ThreadNum; i++) {
281        iBuilder->CreatePThreadJoinCall(threadIDs[i], status);
282    }
283   
284}
285
286Function * generateParallelPipelineThreadFunction(std::string name, IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels, Type * sharedStructType, ProducerTable & producerTable, ConsumerTable & consumerTable, int id) {
287       
288    const auto ip = iBuilder->saveIP();
289   
290    Module * m = iBuilder->getModule();
291    Type * const voidTy = iBuilder->getVoidTy();
292    IntegerType * const size_ty = iBuilder->getSizeTy();
293    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
294    PointerType * const int8PtrTy = iBuilder->getInt8PtrTy();
295    IntegerType * const int1ty = iBuilder->getInt1Ty();
296
297    Function * const threadFunc = cast<Function>(m->getOrInsertFunction(name, voidTy, int8PtrTy, nullptr));
298    threadFunc->setCallingConv(CallingConv::C);
299    Function::arg_iterator args = threadFunc->arg_begin();
300
301    Value * const input = &*(args++);
302    input->setName("input");
303
304    KernelBuilder * targetK = kernels[id];
305    Value * bufferSegments = ConstantInt::get(size_ty, codegen::BufferSegments - 1);
306    ConstantInt * segmentItems = iBuilder->getSize(codegen::SegmentSize * iBuilder->getBitBlockWidth());
307    Value * waitCondTest = nullptr;
308
309     // Create the basic blocks for the thread function.
310    BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc, 0);
311    BasicBlock * outputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "outputCheck", threadFunc, 0);
312    BasicBlock * doSegmentBlock = BasicBlock::Create(iBuilder->getContext(), "doSegment", threadFunc, 0);
313    BasicBlock * exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc, 0);
314
315    iBuilder->SetInsertPoint(entryBlock);
316   
317    Value * sharedStruct = iBuilder->CreateBitCast(input, PointerType::get(sharedStructType, 0));
318    std::vector<Value *> instancePtrs;
319    std::vector<std::vector<Value *>> ProducerPos;
320    for (unsigned k = 0; k < kernels.size(); k++) {
321        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
322        instancePtrs.push_back(iBuilder->CreateLoad(ptr));
323
324        std::vector<Value *> produced;
325        for (unsigned i = 0; i < kernels[k]->getStreamOutputs().size(); i++) {
326            produced.push_back(kernels[k]->getProducedItemCount(instancePtrs[k], kernels[k]->getStreamOutputs()[i].name));
327        }
328        ProducerPos.push_back(produced);
329    }
330
331    iBuilder->CreateBr(outputCheckBlock);
332
333    iBuilder->SetInsertPoint(outputCheckBlock);
334    PHINode * segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 2, "segNo");
335    segNo->addIncoming(iBuilder->getSize(0), entryBlock);
336    segNo->addIncoming(segNo, outputCheckBlock);
337
338    waitCondTest = ConstantInt::get(int1ty, 1);
339    for (unsigned j = 0; j < targetK->getStreamOutputs().size(); j++) {
340        std::vector<unsigned> consumerKernels = consumerTable[id][j];
341        for (unsigned k = 0; k < consumerKernels.size(); k++) {
342            Value * consumerSegNo = kernels[consumerKernels[k]]->acquireLogicalSegmentNo(instancePtrs[consumerKernels[k]]);
343            waitCondTest = iBuilder->CreateAnd(waitCondTest, iBuilder->CreateICmpULE(segNo, iBuilder->CreateAdd(consumerSegNo, bufferSegments)));
344        } 
345    }
346
347    if(targetK->getStreamInputs().size() == 0) {
348
349        iBuilder->CreateCondBr(waitCondTest, doSegmentBlock, outputCheckBlock); 
350
351        iBuilder->SetInsertPoint(doSegmentBlock);
352
353        Value * terminated = targetK->getTerminationSignal(instancePtrs[id]);
354        std::vector<Value *> doSegmentArgs = {instancePtrs[id], terminated};       
355        targetK->createDoSegmentCall(doSegmentArgs);
356        Value * nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
357        segNo->addIncoming(nextSegNo, doSegmentBlock);
358        targetK->releaseLogicalSegmentNo(instancePtrs[id], nextSegNo);
359
360        iBuilder->CreateCondBr(terminated, exitThreadBlock, outputCheckBlock);
361
362    }
363    else{
364
365        BasicBlock * inputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "inputCheck", threadFunc, 0);
366
367        iBuilder->CreateCondBr(waitCondTest, inputCheckBlock, outputCheckBlock); 
368
369        iBuilder->SetInsertPoint(inputCheckBlock); 
370       
371        waitCondTest = ConstantInt::get(int1ty, 1);
372        for (unsigned j = 0; j < targetK->getStreamInputs().size(); j++) {
373            unsigned producerKernel, outputIndex;
374            std::tie(producerKernel, outputIndex) = producerTable[id][j];
375            Value * producerSegNo = kernels[producerKernel]->acquireLogicalSegmentNo(instancePtrs[producerKernel]);
376            waitCondTest = iBuilder->CreateAnd(waitCondTest, iBuilder->CreateICmpULT(segNo, producerSegNo)); 
377        }
378
379        iBuilder->CreateCondBr(waitCondTest, doSegmentBlock, inputCheckBlock);
380
381        iBuilder->SetInsertPoint(doSegmentBlock);
382
383        Value * nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
384        Value * terminated = ConstantInt::get(int1ty, 1);
385        for (unsigned j = 0; j < targetK->getStreamInputs().size(); j++) {
386            unsigned producerKernel, outputIndex;
387            std::tie(producerKernel, outputIndex) = producerTable[id][j];
388            terminated = iBuilder->CreateAnd(terminated, kernels[producerKernel]->getTerminationSignal(instancePtrs[producerKernel]));
389            Value * producerSegNo = kernels[producerKernel]->acquireLogicalSegmentNo(instancePtrs[producerKernel]);
390            terminated = iBuilder->CreateAnd(terminated, iBuilder->CreateICmpEQ(nextSegNo, producerSegNo));
391        }
392       
393        std::vector<Value *> doSegmentArgs = {instancePtrs[id], terminated};
394        for (unsigned j = 0; j < targetK->getStreamInputs().size(); j++) {
395            unsigned producerKernel, outputIndex;
396            std::tie(producerKernel, outputIndex) = producerTable[id][j];
397            // doSegmentArgs.push_back(ProducerPos[producerKernel][outputIndex]);
398            doSegmentArgs.push_back(iBuilder->CreateMul(segmentItems, segNo));
399        }
400        targetK->createDoSegmentCall(doSegmentArgs);
401        segNo->addIncoming(nextSegNo, doSegmentBlock);
402        targetK->releaseLogicalSegmentNo(instancePtrs[id], nextSegNo);
403
404        iBuilder->CreateCondBr(terminated, exitThreadBlock, outputCheckBlock);
405    }
406
407    iBuilder->SetInsertPoint(exitThreadBlock);
408
409    Value * nullVal = Constant::getNullValue(voidPtrTy);
410    iBuilder->CreatePThreadExitCall(nullVal);
411    iBuilder->CreateRetVoid();
412    iBuilder->restoreIP(ip);
413
414    return threadFunc;
415
416}
417
418void generateParallelPipeline(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
419    const unsigned threadNum = kernels.size();
420   
421    Module * m = iBuilder->getModule();
422   
423    IntegerType * const size_ty = iBuilder->getSizeTy();
424    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
425    PointerType * const int8PtrTy = iBuilder->getInt8PtrTy();
426   
427    for (auto k : kernels) k->createInstance();
428   
429    ProducerTable producerTable = createProducerTable(kernels);
430    ConsumerTable consumerTable = createConsumerTable(kernels);
431   
432    Type * const pthreadsTy = ArrayType::get(size_ty, threadNum);
433    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
434    std::vector<Value *> pthreadsPtrs;
435    for (unsigned i = 0; i < threadNum; i++) {
436        pthreadsPtrs.push_back(iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)}));
437    }
438    Value * nullVal = Constant::getNullValue(voidPtrTy);
439    AllocaInst * const status = iBuilder->CreateAlloca(int8PtrTy);
440   
441    std::vector<Type *> structTypes;
442    for (unsigned i = 0; i < kernels.size(); i++) {
443        structTypes.push_back(kernels[i]->getInstance()->getType());
444    }
445    Type * sharedStructType = StructType::get(m->getContext(), structTypes);
446   
447    AllocaInst * sharedStruct = iBuilder->CreateAlloca(sharedStructType);
448    for (unsigned i = 0; i < kernels.size(); i++) {
449        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
450        iBuilder->CreateStore(kernels[i]->getInstance(), ptr);
451    }
452    for (unsigned i = 0; i < kernels.size(); i++) {
453        kernels[i]->releaseLogicalSegmentNo(kernels[i]->getInstance(), iBuilder->getSize(0));
454    }
455
456    std::vector<Function *> thread_functions;
457    const auto ip = iBuilder->saveIP();
458    for (unsigned i = 0; i < threadNum; i++) {
459        thread_functions.push_back(generateParallelPipelineThreadFunction("thread"+std::to_string(i), iBuilder, kernels, sharedStructType, producerTable, consumerTable, i));
460    }
461    iBuilder->restoreIP(ip);
462   
463    for (unsigned i = 0; i < threadNum; i++) {
464        iBuilder->CreatePThreadCreateCall(pthreadsPtrs[i], nullVal, thread_functions[i], iBuilder->CreateBitCast(sharedStruct, int8PtrTy));
465    }
466   
467    std::vector<Value *> threadIDs;
468    for (unsigned i = 0; i < threadNum; i++) { 
469        threadIDs.push_back(iBuilder->CreateLoad(pthreadsPtrs[i]));
470    }
471   
472    for (unsigned i = 0; i < threadNum; i++) { 
473        iBuilder->CreatePThreadJoinCall(threadIDs[i], status);
474    }
475
476}
477
478void generatePipelineLoop(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
479    for (auto k : kernels) {
480        k->createInstance();
481    }
482    BasicBlock * entryBlock = iBuilder->GetInsertBlock();
483    Function * main = entryBlock->getParent();
484
485    // Create the basic blocks for the loop.
486    BasicBlock * segmentLoop = BasicBlock::Create(iBuilder->getContext(), "segmentLoop", main, 0);
487    BasicBlock * exitBlock = BasicBlock::Create(iBuilder->getContext(), "exitBlock", main, 0);
488   
489    ProducerTable producerTable = createProducerTable(kernels);
490   
491    // ProducerPos[k][i] will hold the producedItemCount of the i^th output stream
492    // set of the k^th kernel.  These values will be loaded immediately after the
493    // doSegment and finalSegment calls for kernel k and later used as the
494    // producer position arguments for later doSegment/finalSegment calls.
495   
496    std::vector<std::vector<Value *>> ProducerPos;
497   
498    iBuilder->CreateBr(segmentLoop);
499    iBuilder->SetInsertPoint(segmentLoop);
500
501    Value * terminationFound = ConstantInt::getNullValue(iBuilder->getInt1Ty());
502    for (unsigned k = 0; k < kernels.size(); k++) {
503        Value * instance = kernels[k]->getInstance();
504        std::vector<Value *> doSegmentArgs = {instance, terminationFound};
505        for (unsigned j = 0; j < kernels[k]->getStreamInputs().size(); j++) {
506            unsigned producerKernel, outputIndex;
507            std::tie(producerKernel, outputIndex) = producerTable[k][j];
508            doSegmentArgs.push_back(ProducerPos[producerKernel][outputIndex]);
509        }
510        kernels[k]->createDoSegmentCall(doSegmentArgs);
511        if (! (kernels[k]->hasNoTerminateAttribute())) {
512            Value * terminated = kernels[k]->getTerminationSignal(instance);
513            terminationFound = iBuilder->CreateOr(terminationFound, terminated);
514        }
515        std::vector<Value *> produced;
516        for (unsigned i = 0; i < kernels[k]->getStreamOutputs().size(); i++) {
517            produced.push_back(kernels[k]->getProducedItemCount(instance, kernels[k]->getStreamOutputs()[i].name, terminationFound));
518        }
519        ProducerPos.push_back(produced);
520        Value * segNo = kernels[k]->acquireLogicalSegmentNo(instance);
521        kernels[k]->releaseLogicalSegmentNo(instance, iBuilder->CreateAdd(segNo, iBuilder->getSize(1)));
522    }
523    iBuilder->CreateCondBr(terminationFound, exitBlock, segmentLoop);
524    iBuilder->SetInsertPoint(exitBlock);
525}
526
527   
Note: See TracBrowser for help on using the repository browser.