source: icGREP/icgrep-devel/icgrep/kernels/pipeline.cpp @ 5267

Last change on this file since 5267 was 5267, checked in by nmedfort, 19 months ago

Code clean-up. Removed Pablo Call, SetIthBit and Prototype.

File size: 14.5 KB
Line 
1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
6#include "pipeline.h"
7#include <toolchain.h>
8#include <kernels/kernel.h>
9#include <llvm/IR/Module.h>
10#include <unordered_map>
11
12using namespace kernel;
13using namespace parabix;
14using namespace llvm;
15
16using ProducerTable = std::vector<std::vector<std::pair<unsigned, unsigned>>>;
17
18ProducerTable createProducerTable(const std::vector<KernelBuilder *> & kernels) {
19    ProducerTable producerTable(kernels.size());
20   
21    std::vector<std::vector<bool>> userTable(kernels.size());
22   
23    // First prepare a map from streamSet output buffers to their producing kernel and output index.
24    std::unordered_map<const StreamSetBuffer *, std::pair<unsigned, unsigned>> bufferMap;
25   
26    for (unsigned k = 0; k < kernels.size(); k++) {
27        auto outputSets = kernels[k]->getStreamSetOutputBuffers();
28        for (unsigned j = 0; j < outputSets.size(); j++) {
29            userTable[k].push_back(false);
30            bufferMap.insert(std::make_pair(outputSets[j], std::make_pair(k, j)));
31        }
32    }
33    for (unsigned k = 0; k < kernels.size(); k++) {
34        auto inputSets = kernels[k]->getStreamSetInputBuffers();
35        for (unsigned i = 0; i < inputSets.size(); i++) {
36            auto f = bufferMap.find(inputSets[i]);
37            if (f == bufferMap.end()) {
38                llvm::report_fatal_error("Pipeline error: input buffer #" + std::to_string(i) + " of " + kernels[k]->getName() + ": no corresponding output buffer. ");
39            }
40            producerTable[k].push_back(f->second);
41            unsigned sourceKernel, outputIndex;
42            std::tie(sourceKernel, outputIndex) = f->second;
43            if (sourceKernel >= k) {
44                llvm::report_fatal_error("Pipeline error: input buffer #" + std::to_string(i) + " of " + kernels[k]->getName() + ": not defined before use. ");
45            }
46            //errs() << "sourceKernel: " + std::to_string(sourceKernel) + ", outputIndex: " + std::to_string(outputIndex) + ", user: " + std::to_string(k) + "\n";
47            userTable[sourceKernel][outputIndex]= true;
48           
49        }
50    }
51    /*
52    for (unsigned k = 0; k < kernels.size(); k++) {
53        auto outputSets = kernels[k]->getStreamSetOutputBuffers();
54        //errs() << "kernel: " + kernels[k]->getName() + "\n";
55        for (unsigned j = 0; j < outputSets.size(); j++) {
56            if (userTable[k][j] == false) {
57                llvm::report_fatal_error("Pipeline error: output buffer #" + std::to_string(j) + " of " + kernels[k]->getName() + ": no users. ");
58            }
59        }
60    }
61    */
62    return producerTable;
63}
64
65
66Function * generateSegmentParallelPipelineThreadFunction(std::string name, IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels, Type * sharedStructType, ProducerTable & producerTable, int id) {
67   
68    // ProducerPos[k][i] will hold the producedItemCount of the i^th output stream
69    // set of the k^th kernel.  These values will be loaded immediately after the
70    // doSegment and finalSegment calls for kernel k and later used as the
71    // producer position arguments for later doSegment/finalSegment calls.
72   
73    std::vector<std::vector<Value *>> ProducerPos;
74   
75   
76    const auto ip = iBuilder->saveIP();
77   
78    Module * m = iBuilder->getModule();
79    Type * const size_ty = iBuilder->getSizeTy();
80    Type * const voidTy = iBuilder->getVoidTy();
81    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
82    PointerType * const int8PtrTy = iBuilder->getInt8PtrTy();
83
84    Function * const threadFunc = cast<Function>(m->getOrInsertFunction(name, voidTy, int8PtrTy, nullptr));
85    threadFunc->setCallingConv(CallingConv::C);
86    Function::arg_iterator args = threadFunc->arg_begin();
87
88    Value * const input = &*(args++);
89    input->setName("input");
90
91    unsigned threadNum = codegen::ThreadNum;
92
93     // Create the basic blocks for the thread function.
94    BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc, 0);
95    BasicBlock * segmentLoop = BasicBlock::Create(iBuilder->getContext(), "segmentLoop", threadFunc, 0);
96    BasicBlock * exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc, 0);
97   
98    std::vector<BasicBlock *> segmentWait;
99    std::vector<BasicBlock *> segmentLoopBody;
100    for (unsigned i = 0; i < kernels.size(); i++) {
101        std::string kname = kernels[i]->getName();
102        segmentWait.push_back(BasicBlock::Create(iBuilder->getContext(), kname + "Wait", threadFunc, 0));
103        segmentLoopBody.push_back(BasicBlock::Create(iBuilder->getContext(), "do_" + kname, threadFunc, 0));
104    }
105
106    iBuilder->SetInsertPoint(entryBlock);
107    Value * sharedStruct = iBuilder->CreateBitCast(input, PointerType::get(sharedStructType, 0));
108    Constant * myThreadId = ConstantInt::get(size_ty, id);
109    std::vector<Value *> instancePtrs;
110    for (unsigned i = 0; i < kernels.size(); i++) {
111        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
112        instancePtrs.push_back(iBuilder->CreateLoad(ptr));
113    }
114   
115    iBuilder->CreateBr(segmentLoop);
116
117    iBuilder->SetInsertPoint(segmentLoop);
118    PHINode * segNo = iBuilder->CreatePHI(size_ty, 2, "segNo");
119    segNo->addIncoming(myThreadId, entryBlock);
120    Value * nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
121    unsigned last_kernel = kernels.size() - 1;
122    Value * alreadyDone = kernels[last_kernel]->getTerminationSignal(instancePtrs[last_kernel]);
123    iBuilder->CreateCondBr(alreadyDone, exitThreadBlock, segmentWait[0]);
124   
125    Value * doFinal = ConstantInt::getNullValue(iBuilder->getInt1Ty());
126
127    for (unsigned k = 0; k < kernels.size(); k++) {
128        iBuilder->SetInsertPoint(segmentWait[k]);
129        Value * processedSegmentCount = kernels[k]->acquireLogicalSegmentNo(instancePtrs[k]);
130        Value * cond = iBuilder->CreateICmpEQ(segNo, processedSegmentCount);
131        iBuilder->CreateCondBr(cond, segmentLoopBody[k], segmentWait[k]);
132       
133        iBuilder->SetInsertPoint(segmentLoopBody[k]);
134       
135        //iBuilder->CallPrintInt(kernels[k]->getName() + " segment #", segNo);
136        if (k == last_kernel) {
137            segNo->addIncoming(iBuilder->CreateAdd(segNo, ConstantInt::get(size_ty, threadNum)), segmentLoopBody[last_kernel]);
138        }
139        std::vector<Value *> doSegmentArgs = {instancePtrs[k], doFinal};
140        for (unsigned j = 0; j < kernels[k]->getStreamInputs().size(); j++) {
141            unsigned producerKernel, outputIndex;
142            std::tie(producerKernel, outputIndex) = producerTable[k][j];
143            doSegmentArgs.push_back(ProducerPos[producerKernel][outputIndex]);
144            //iBuilder->CallPrintInt(kernels[k]->getName() + " producerPos[" + std::to_string(j) + "] ", doSegmentArgs.back());
145
146        }
147        kernels[k]->createDoSegmentCall(doSegmentArgs);
148        std::vector<Value *> produced;
149        for (unsigned i = 0; i < kernels[k]->getStreamOutputs().size(); i++) {
150            produced.push_back(kernels[k]->getProducedItemCount(instancePtrs[k], kernels[k]->getStreamOutputs()[i].name));
151        }
152        ProducerPos.push_back(produced);
153        if (! (kernels[k]->hasNoTerminateAttribute())) {
154            Value * terminated = kernels[k]->getTerminationSignal(instancePtrs[k]);
155            doFinal = iBuilder->CreateOr(doFinal, terminated);
156        }
157        kernels[k]->releaseLogicalSegmentNo(instancePtrs[k], nextSegNo);
158        if (k == last_kernel) break;
159        iBuilder->CreateBr(segmentWait[k+1]);
160    }
161    iBuilder->CreateCondBr(doFinal, exitThreadBlock, segmentLoop);
162
163    iBuilder->SetInsertPoint(exitThreadBlock);
164    Value * nullVal = Constant::getNullValue(voidPtrTy);
165    iBuilder->CreatePThreadExitCall(nullVal);
166    iBuilder->CreateRetVoid();
167    iBuilder->restoreIP(ip);
168
169    return threadFunc;
170}
171
// Given a computation expressed as a logical pipeline of K kernels k_0, k_1, ..., k_(K-1)
// operating over an input stream set S, a segment-parallel implementation divides the input
// into segments and coordinates a set of T <= K threads to each process one segment at a time.
// Let S_0, S_1, ..., S_N be the segments of S.  Segments are assigned to threads in a round-robin
// fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
177
178
179void generateSegmentParallelPipeline(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
180   
181    unsigned threadNum = codegen::ThreadNum;
182   
183    Module * m = iBuilder->getModule();
184   
185    IntegerType * const size_ty = iBuilder->getSizeTy();
186    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
187    PointerType * const int8PtrTy = iBuilder->getInt8PtrTy();
188   
189    for (auto k : kernels) k->createInstance();
190   
191    ProducerTable producerTable = createProducerTable(kernels);
192   
193    Type * const pthreadsTy = ArrayType::get(size_ty, threadNum);
194    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
195    std::vector<Value *> pthreadsPtrs;
196    for (unsigned i = 0; i < threadNum; i++) {
197        pthreadsPtrs.push_back(iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)}));
198    }
199    Value * nullVal = Constant::getNullValue(voidPtrTy);
200    AllocaInst * const status = iBuilder->CreateAlloca(int8PtrTy);
201   
202    std::vector<Type *> structTypes;
203    for (unsigned i = 0; i < kernels.size(); i++) {
204        structTypes.push_back(kernels[i]->getInstance()->getType());
205    }
206    Type * sharedStructType = StructType::get(m->getContext(), structTypes);
207   
208    AllocaInst * sharedStruct = iBuilder->CreateAlloca(sharedStructType);
209    for (unsigned i = 0; i < kernels.size(); i++) {
210        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
211        iBuilder->CreateStore(kernels[i]->getInstance(), ptr);
212    }
213   
214    std::vector<Function *> thread_functions;
215    const auto ip = iBuilder->saveIP();
216    for (unsigned i = 0; i < threadNum; i++) {
217        thread_functions.push_back(generateSegmentParallelPipelineThreadFunction("thread"+std::to_string(i), iBuilder, kernels, sharedStructType, producerTable, i));
218    }
219    iBuilder->restoreIP(ip);
220   
221    for (unsigned i = 0; i < threadNum; i++) {
222        iBuilder->CreatePThreadCreateCall(pthreadsPtrs[i], nullVal, thread_functions[i], iBuilder->CreateBitCast(sharedStruct, int8PtrTy));
223    }
224   
225    std::vector<Value *> threadIDs;
226    for (unsigned i = 0; i < threadNum; i++) { 
227        threadIDs.push_back(iBuilder->CreateLoad(pthreadsPtrs[i]));
228    }
229   
230    for (unsigned i = 0; i < threadNum; i++) { 
231        iBuilder->CreatePThreadJoinCall(threadIDs[i], status);
232    }
233   
234}
235
236void generatePipelineParallel(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
237   
238    IntegerType * pthreadTy = iBuilder->getSizeTy();
239    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
240    PointerType * const int8PtrTy = iBuilder->getInt8PtrTy();
241   
242    ArrayType * const pthreadsTy = ArrayType::get(pthreadTy, kernels.size());
243   
244    for (auto k : kernels) k->createInstance();
245   
246    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
247    std::vector<Value *> pthreadsPtrs;
248    for (unsigned i = 0; i < kernels.size(); i++) {
249        pthreadsPtrs.push_back(iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)}));
250    }
251    Value * nullVal = Constant::getNullValue(voidPtrTy);
252    AllocaInst * const status = iBuilder->CreateAlloca(int8PtrTy);
253   
254    std::vector<Function *> kernel_functions;
255    const auto ip = iBuilder->saveIP();
256    for (unsigned i = 0; i < kernels.size(); i++) {
257        kernel_functions.push_back(kernels[i]->generateThreadFunction("k_"+std::to_string(i)));
258    }
259    iBuilder->restoreIP(ip);
260   
261    for (unsigned i = 0; i < kernels.size(); i++) {
262        iBuilder->CreatePThreadCreateCall(pthreadsPtrs[i], nullVal, kernel_functions[i], iBuilder->CreateBitCast(kernels[i]->getInstance(), int8PtrTy));
263    }
264   
265    std::vector<Value *> threadIDs;
266    for (unsigned i = 0; i < kernels.size(); i++) { 
267        threadIDs.push_back(iBuilder->CreateLoad(pthreadsPtrs[i]));
268    }
269   
270    for (unsigned i = 0; i < kernels.size(); i++) { 
271        iBuilder->CreatePThreadJoinCall(threadIDs[i], status);
272    }
273}
274
275
276void generatePipelineLoop(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
277    for (auto k : kernels) k->createInstance();
278   
279    BasicBlock * entryBlock = iBuilder->GetInsertBlock();
280    Function * main = entryBlock->getParent();
281   
282    // Create the basic blocks for the loop.
283    BasicBlock * segmentLoop = BasicBlock::Create(iBuilder->getContext(), "segmentLoop", main, 0);
284    BasicBlock * exitBlock = BasicBlock::Create(iBuilder->getContext(), "exitBlock", main, 0);
285   
286    ProducerTable producerTable = createProducerTable(kernels);
287   
288    // ProducerPos[k][i] will hold the producedItemCount of the i^th output stream
289    // set of the k^th kernel.  These values will be loaded immediately after the
290    // doSegment and finalSegment calls for kernel k and later used as the
291    // producer position arguments for later doSegment/finalSegment calls.
292   
293    std::vector<std::vector<Value *>> ProducerPos;
294   
295    iBuilder->CreateBr(segmentLoop);
296    iBuilder->SetInsertPoint(segmentLoop);
297
298    Value * terminationFound = ConstantInt::getNullValue(iBuilder->getInt1Ty());
299    for (unsigned k = 0; k < kernels.size(); k++) {
300        Value * instance = kernels[k]->getInstance();
301        std::vector<Value *> doSegmentArgs = {instance, terminationFound};
302        for (unsigned j = 0; j < kernels[k]->getStreamInputs().size(); j++) {
303            unsigned producerKernel, outputIndex;
304            std::tie(producerKernel, outputIndex) = producerTable[k][j];
305            doSegmentArgs.push_back(ProducerPos[producerKernel][outputIndex]);
306        }
307        kernels[k]->createDoSegmentCall(doSegmentArgs);
308        if (! (kernels[k]->hasNoTerminateAttribute())) {
309            Value * terminated = kernels[k]->getTerminationSignal(instance);
310            terminationFound = iBuilder->CreateOr(terminationFound, terminated);
311        }
312        std::vector<Value *> produced;
313        for (unsigned i = 0; i < kernels[k]->getStreamOutputs().size(); i++) {
314            produced.push_back(kernels[k]->getProducedItemCount(instance, kernels[k]->getStreamOutputs()[i].name));
315        }
316        ProducerPos.push_back(produced);
317        Value * segNo = kernels[k]->acquireLogicalSegmentNo(instance);
318        kernels[k]->releaseLogicalSegmentNo(instance, iBuilder->CreateAdd(segNo, iBuilder->getSize(1)));
319    }
320    iBuilder->CreateCondBr(terminationFound, exitBlock, segmentLoop);
321    iBuilder->SetInsertPoint(exitBlock);
322}
323
324   
Note: See TracBrowser for help on using the repository browser.