source: icGREP/icgrep-devel/icgrep/kernels/pipeline.cpp @ 5263

Last change on this file since 5263 was 5263, checked in by cameron, 9 months ago

New doSegment partial progress

File size: 14.4 KB
Line 
1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
6#include "pipeline.h"
7#include <toolchain.h>
8#include <IR_Gen/idisa_builder.h>
9#include <kernels/interface.h>
10#include <kernels/kernel.h>
11#include <iostream>
12#include <unordered_map>
13
14using namespace kernel;
15using namespace parabix;
16using namespace llvm;
17
18using ProducerTable = std::vector<std::vector<std::pair<unsigned, unsigned>>>;
19
20ProducerTable createProducerTable(const std::vector<KernelBuilder *> & kernels) {
21    ProducerTable producerTable;
22    producerTable.reserve(kernels.size());
23   
24    std::vector<std::vector<bool>> userTable;
25    userTable.reserve(kernels.size());
26   
27    // First prepare a map from streamSet output buffers to their producing kernel and output index.
28    std::unordered_map<const StreamSetBuffer *, std::pair<unsigned, unsigned>> bufferMap;
29   
30    for (unsigned k = 0; k < kernels.size(); k++) {
31        auto outputSets = kernels[k]->getStreamSetOutputBuffers();
32        for (unsigned j = 0; j < outputSets.size(); j++) {
33            userTable[k].push_back(false);
34            bufferMap.insert(std::make_pair(outputSets[j], std::make_pair(k, j)));
35        }
36    }
37    for (unsigned k = 0; k < kernels.size(); k++) {
38        auto inputSets = kernels[k]->getStreamSetInputBuffers();
39        for (unsigned i = 0; i < inputSets.size(); i++) {
40            auto f = bufferMap.find(inputSets[i]);
41            if (f == bufferMap.end()) {
42                llvm::report_fatal_error("Pipeline error: input buffer #" + std::to_string(i) + " of " + kernels[k]->getName() + ": no corresponding output buffer. ");
43            }
44            producerTable[k].push_back(f->second);
45            unsigned sourceKernel, outputIndex;
46            std::tie(sourceKernel, outputIndex) = f->second;
47            if (sourceKernel >= k) {
48                llvm::report_fatal_error("Pipeline error: input buffer #" + std::to_string(i) + " of " + kernels[k]->getName() + ": not defined before use. ");
49            }
50            //errs() << "sourceKernel: " + std::to_string(sourceKernel) + ", outputIndex: " + std::to_string(outputIndex) + ", user: " + std::to_string(k) + "\n";
51            userTable[sourceKernel][outputIndex]= true;
52           
53        }
54    }
55    for (unsigned k = 0; k < kernels.size(); k++) {
56        auto outputSets = kernels[k]->getStreamSetOutputBuffers();
57        //errs() << "kernel: " + kernels[k]->getName() + "\n";
58        for (unsigned j = 0; j < outputSets.size(); j++) {
59            if (userTable[k][j] == false) {
60                llvm::report_fatal_error("Pipeline error: output buffer #" + std::to_string(j) + " of " + kernels[k]->getName() + ": no users. ");
61            }
62        }
63    }
64    return producerTable;
65}
66
67
68Function * generateSegmentParallelPipelineThreadFunction(std::string name, IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels, Type * sharedStructType, ProducerTable & producerTable, int id) {
69   
70    // ProducerPos[k][i] will hold the producedItemCount of the i^th output stream
71    // set of the k^th kernel.  These values will be loaded immediately after the
72    // doSegment and finalSegment calls for kernel k and later used as the
73    // producer position arguments for later doSegment/finalSegment calls.
74   
75    std::vector<std::vector<Value *>> ProducerPos;
76   
77   
78    const auto ip = iBuilder->saveIP();
79   
80    Module * m = iBuilder->getModule();
81    Type * const size_ty = iBuilder->getSizeTy();
82    Type * const voidTy = iBuilder->getVoidTy();
83    Type * const voidPtrTy = iBuilder->getVoidPtrTy();
84    Type * const int8PtrTy = iBuilder->getInt8PtrTy();
85
86    Function * const threadFunc = cast<Function>(m->getOrInsertFunction(name, voidTy, int8PtrTy, nullptr));
87    threadFunc->setCallingConv(CallingConv::C);
88    Function::arg_iterator args = threadFunc->arg_begin();
89
90    Value * const input = &*(args++);
91    input->setName("input");
92
93    unsigned threadNum = codegen::ThreadNum;
94
95     // Create the basic blocks for the thread function.
96    BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc, 0);
97    BasicBlock * segmentLoop = BasicBlock::Create(iBuilder->getContext(), "segmentLoop", threadFunc, 0);
98    BasicBlock * exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc, 0);
99   
100    std::vector<BasicBlock *> segmentWait;
101    std::vector<BasicBlock *> segmentLoopBody;
102    for (unsigned i = 0; i < kernels.size(); i++) {
103        std::string kname = kernels[i]->getName();
104        segmentWait.push_back(BasicBlock::Create(iBuilder->getContext(), kname + "Wait", threadFunc, 0));
105        segmentLoopBody.push_back(BasicBlock::Create(iBuilder->getContext(), "do_" + kname, threadFunc, 0));
106    }
107
108    iBuilder->SetInsertPoint(entryBlock);
109    Value * sharedStruct = iBuilder->CreateBitCast(input, PointerType::get(sharedStructType, 0));
110    Constant * myThreadId = ConstantInt::get(size_ty, id);
111    std::vector<Value *> instancePtrs;
112    for (unsigned i = 0; i < kernels.size(); i++) {
113        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
114        instancePtrs.push_back(iBuilder->CreateLoad(ptr));
115    }
116   
117    iBuilder->CreateBr(segmentLoop);
118
119    iBuilder->SetInsertPoint(segmentLoop);
120    PHINode * segNo = iBuilder->CreatePHI(size_ty, 2, "segNo");
121    segNo->addIncoming(myThreadId, entryBlock);
122    Value * nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
123    unsigned last_kernel = kernels.size() - 1;
124    Value * alreadyDone = kernels[last_kernel]->getTerminationSignal(instancePtrs[last_kernel]);
125    iBuilder->CreateCondBr(alreadyDone, exitThreadBlock, segmentWait[0]);
126   
127    Value * doFinal = ConstantInt::getNullValue(iBuilder->getInt1Ty());
128
129    for (unsigned k = 0; k < kernels.size(); k++) {
130        iBuilder->SetInsertPoint(segmentWait[k]);
131        Value * processedSegmentCount = kernels[k]->acquireLogicalSegmentNo(instancePtrs[k]);
132        Value * cond = iBuilder->CreateICmpEQ(segNo, processedSegmentCount);
133        iBuilder->CreateCondBr(cond, segmentLoopBody[k], segmentWait[k]);
134       
135        iBuilder->SetInsertPoint(segmentLoopBody[k]);
136        if (k == last_kernel) {
137            segNo->addIncoming(iBuilder->CreateAdd(segNo, ConstantInt::get(size_ty, threadNum)), segmentLoopBody[last_kernel]);
138        }
139       
140       
141       
142       
143        std::vector<Value *> doSegmentArgs = {instancePtrs[k], doFinal};
144        for (unsigned j = 0; j < kernels[k]->getStreamInputs().size(); j++) {
145            unsigned producerKernel, outputIndex;
146            std::tie(producerKernel, outputIndex) = producerTable[k][j];
147            doSegmentArgs.push_back(ProducerPos[producerKernel][outputIndex]);
148        }
149        kernels[k]->createDoSegmentCall(doSegmentArgs);
150        std::vector<Value *> produced;
151        for (unsigned i = 0; i < kernels[k]->getStreamOutputs().size(); i++) {
152            produced.push_back(kernels[k]->getProducedItemCount(instancePtrs[k], kernels[k]->getStreamOutputs()[i].name));
153        }
154        ProducerPos.push_back(produced);
155        if (! (kernels[k]->hasNoTerminateAttribute())) {
156            Value * terminated = kernels[k]->getTerminationSignal(instancePtrs[k]);
157            doFinal = iBuilder->CreateOr(doFinal, terminated);
158        }
159        kernels[k]->releaseLogicalSegmentNo(instancePtrs[k], nextSegNo);
160        if (k == last_kernel) {
161            iBuilder->CreateCondBr(doFinal, exitThreadBlock, segmentLoop);
162        }
163        else {
164            iBuilder->CreateBr(segmentWait[k+1]);
165        }
166    }
167   
168    iBuilder->SetInsertPoint(exitThreadBlock);
169    Value * nullVal = Constant::getNullValue(voidPtrTy);
170    iBuilder->CreatePThreadExitCall(nullVal);
171    iBuilder->CreateRetVoid();
172    iBuilder->restoreIP(ip);
173
174    return threadFunc;
175}
176
177// Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
178// operating over an input stream set S, a segment-parallel implementation divides the input
179// into segments and coordinates a set of T <= K threads to each process one segment at a time.   
180// Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
181// fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
182
183
184void generateSegmentParallelPipeline(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
185   
186    unsigned threadNum = codegen::ThreadNum;
187   
188    Module * m = iBuilder->getModule();
189   
190    Type * const size_ty = iBuilder->getSizeTy();
191    Type * const voidPtrTy = iBuilder->getVoidPtrTy();
192    Type * const int8PtrTy = iBuilder->getInt8PtrTy();
193   
194    for (auto k : kernels) k->createInstance();
195   
196    ProducerTable producerTable = createProducerTable(kernels);
197   
198    Type * const pthreadsTy = ArrayType::get(size_ty, threadNum);
199    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
200    std::vector<Value *> pthreadsPtrs;
201    for (unsigned i = 0; i < threadNum; i++) {
202        pthreadsPtrs.push_back(iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)}));
203    }
204    Value * nullVal = Constant::getNullValue(voidPtrTy);
205    AllocaInst * const status = iBuilder->CreateAlloca(int8PtrTy);
206   
207    std::vector<Type *> structTypes;
208    for (unsigned i = 0; i < kernels.size(); i++) {
209        structTypes.push_back(kernels[i]->getInstance()->getType());
210    }
211    Type * sharedStructType = StructType::get(m->getContext(), structTypes);
212   
213    AllocaInst * sharedStruct = iBuilder->CreateAlloca(sharedStructType);
214    for (unsigned i = 0; i < kernels.size(); i++) {
215        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
216        iBuilder->CreateStore(kernels[i]->getInstance(), ptr);
217    }
218   
219    std::vector<Function *> thread_functions;
220    const auto ip = iBuilder->saveIP();
221    for (unsigned i = 0; i < threadNum; i++) {
222        thread_functions.push_back(generateSegmentParallelPipelineThreadFunction("thread"+std::to_string(i), iBuilder, kernels, sharedStructType, producerTable, i));
223    }
224    iBuilder->restoreIP(ip);
225   
226    for (unsigned i = 0; i < threadNum; i++) {
227        iBuilder->CreatePThreadCreateCall(pthreadsPtrs[i], nullVal, thread_functions[i], iBuilder->CreateBitCast(sharedStruct, int8PtrTy));
228    }
229   
230    std::vector<Value *> threadIDs;
231    for (unsigned i = 0; i < threadNum; i++) { 
232        threadIDs.push_back(iBuilder->CreateLoad(pthreadsPtrs[i]));
233    }
234   
235    for (unsigned i = 0; i < threadNum; i++) { 
236        iBuilder->CreatePThreadJoinCall(threadIDs[i], status);
237    }
238   
239}
240
241void generatePipelineParallel(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
242   
243    Type * pthreadTy = iBuilder->getSizeTy();
244    Type * const voidPtrTy = iBuilder->getVoidPtrTy();
245    Type * const int8PtrTy = iBuilder->getInt8PtrTy();
246   
247    Type * const pthreadsTy = ArrayType::get(pthreadTy, kernels.size());
248   
249    for (auto k : kernels) k->createInstance();
250   
251    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
252    std::vector<Value *> pthreadsPtrs;
253    for (unsigned i = 0; i < kernels.size(); i++) {
254        pthreadsPtrs.push_back(iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)}));
255    }
256    Value * nullVal = Constant::getNullValue(voidPtrTy);
257    AllocaInst * const status = iBuilder->CreateAlloca(int8PtrTy);
258   
259    std::vector<Function *> kernel_functions;
260    const auto ip = iBuilder->saveIP();
261    for (unsigned i = 0; i < kernels.size(); i++) {
262        kernel_functions.push_back(kernels[i]->generateThreadFunction("k_"+std::to_string(i)));
263    }
264    iBuilder->restoreIP(ip);
265   
266    for (unsigned i = 0; i < kernels.size(); i++) {
267        iBuilder->CreatePThreadCreateCall(pthreadsPtrs[i], nullVal, kernel_functions[i], iBuilder->CreateBitCast(kernels[i]->getInstance(), int8PtrTy));
268    }
269   
270    std::vector<Value *> threadIDs;
271    for (unsigned i = 0; i < kernels.size(); i++) { 
272        threadIDs.push_back(iBuilder->CreateLoad(pthreadsPtrs[i]));
273    }
274   
275    for (unsigned i = 0; i < kernels.size(); i++) { 
276        iBuilder->CreatePThreadJoinCall(threadIDs[i], status);
277    }
278}
279
280
281void generatePipelineLoop(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
282    for (auto k : kernels) k->createInstance();
283   
284    BasicBlock * entryBlock = iBuilder->GetInsertBlock();
285    Function * main = entryBlock->getParent();
286   
287    // Create the basic blocks for the loop.
288    BasicBlock * segmentLoop = BasicBlock::Create(iBuilder->getContext(), "segmentLoop", main, 0);
289    BasicBlock * exitBlock = BasicBlock::Create(iBuilder->getContext(), "exitBlock", main, 0);
290   
291    ProducerTable producerTable = createProducerTable(kernels);
292   
293    // ProducerPos[k][i] will hold the producedItemCount of the i^th output stream
294    // set of the k^th kernel.  These values will be loaded immediately after the
295    // doSegment and finalSegment calls for kernel k and later used as the
296    // producer position arguments for later doSegment/finalSegment calls.
297   
298    std::vector<std::vector<Value *>> ProducerPos;
299   
300    iBuilder->CreateBr(segmentLoop);
301    iBuilder->SetInsertPoint(segmentLoop);
302
303    Value * terminationFound = ConstantInt::getNullValue(iBuilder->getInt1Ty());
304    for (unsigned k = 0; k < kernels.size(); k++) {
305        Value * instance = kernels[k]->getInstance();
306        std::vector<Value *> doSegmentArgs = {instance, terminationFound};
307        for (unsigned j = 0; j < kernels[k]->getStreamInputs().size(); j++) {
308            unsigned producerKernel, outputIndex;
309            std::tie(producerKernel, outputIndex) = producerTable[k][j];
310            doSegmentArgs.push_back(ProducerPos[producerKernel][outputIndex]);
311        }
312        kernels[k]->createDoSegmentCall(doSegmentArgs);
313        if (! (kernels[k]->hasNoTerminateAttribute())) {
314            Value * terminated = kernels[k]->getTerminationSignal(instance);
315            terminationFound = iBuilder->CreateOr(terminationFound, terminated);
316        }
317        std::vector<Value *> produced;
318        for (unsigned i = 0; i < kernels[k]->getStreamOutputs().size(); i++) {
319            produced.push_back(kernels[k]->getProducedItemCount(instance, kernels[k]->getStreamOutputs()[i].name));
320        }
321        ProducerPos.push_back(produced);
322        Value * segNo = kernels[k]->acquireLogicalSegmentNo(instance);
323        kernels[k]->releaseLogicalSegmentNo(instance, iBuilder->CreateAdd(segNo, iBuilder->getSize(1)));
324    }
325    iBuilder->CreateCondBr(terminationFound, exitBlock, segmentLoop);
326    iBuilder->SetInsertPoint(exitBlock);
327}
328
329   
Note: See TracBrowser for help on using the repository browser.