source: icGREP/icgrep-devel/icgrep/kernels/pipeline.cpp @ 5266

Last change on this file since 5266 was 5266, checked in by cameron, 2 years ago

Fixes

File size: 14.5 KB
RevLine 
[4929]1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
[5227]6#include "pipeline.h"
[5033]7#include <toolchain.h>
[5238]8#include <IR_Gen/idisa_builder.h>
[5086]9#include <kernels/interface.h>
10#include <kernels/kernel.h>
[5165]11#include <iostream>
[5251]12#include <unordered_map>
[4929]13
[4974]14using namespace kernel;
[5260]15using namespace parabix;
16using namespace llvm;
[4929]17
[5263]18using ProducerTable = std::vector<std::vector<std::pair<unsigned, unsigned>>>;
[5260]19
[5263]20ProducerTable createProducerTable(const std::vector<KernelBuilder *> & kernels) {
[5264]21    ProducerTable producerTable(kernels.size());
[5263]22   
[5264]23    std::vector<std::vector<bool>> userTable(kernels.size());
[5263]24   
25    // First prepare a map from streamSet output buffers to their producing kernel and output index.
26    std::unordered_map<const StreamSetBuffer *, std::pair<unsigned, unsigned>> bufferMap;
27   
28    for (unsigned k = 0; k < kernels.size(); k++) {
29        auto outputSets = kernels[k]->getStreamSetOutputBuffers();
30        for (unsigned j = 0; j < outputSets.size(); j++) {
31            userTable[k].push_back(false);
32            bufferMap.insert(std::make_pair(outputSets[j], std::make_pair(k, j)));
[5251]33        }
34    }
[5263]35    for (unsigned k = 0; k < kernels.size(); k++) {
36        auto inputSets = kernels[k]->getStreamSetInputBuffers();
[5251]37        for (unsigned i = 0; i < inputSets.size(); i++) {
[5263]38            auto f = bufferMap.find(inputSets[i]);
39            if (f == bufferMap.end()) {
40                llvm::report_fatal_error("Pipeline error: input buffer #" + std::to_string(i) + " of " + kernels[k]->getName() + ": no corresponding output buffer. ");
[5251]41            }
[5263]42            producerTable[k].push_back(f->second);
43            unsigned sourceKernel, outputIndex;
44            std::tie(sourceKernel, outputIndex) = f->second;
45            if (sourceKernel >= k) {
46                llvm::report_fatal_error("Pipeline error: input buffer #" + std::to_string(i) + " of " + kernels[k]->getName() + ": not defined before use. ");
47            }
48            //errs() << "sourceKernel: " + std::to_string(sourceKernel) + ", outputIndex: " + std::to_string(outputIndex) + ", user: " + std::to_string(k) + "\n";
49            userTable[sourceKernel][outputIndex]= true;
50           
[5251]51        }
52    }
[5264]53    /*
[5263]54    for (unsigned k = 0; k < kernels.size(); k++) {
55        auto outputSets = kernels[k]->getStreamSetOutputBuffers();
56        //errs() << "kernel: " + kernels[k]->getName() + "\n";
57        for (unsigned j = 0; j < outputSets.size(); j++) {
58            if (userTable[k][j] == false) {
59                llvm::report_fatal_error("Pipeline error: output buffer #" + std::to_string(j) + " of " + kernels[k]->getName() + ": no users. ");
60            }
61        }
62    }
[5264]63    */
[5263]64    return producerTable;
[5251]65}
66
67
[5263]68Function * generateSegmentParallelPipelineThreadFunction(std::string name, IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels, Type * sharedStructType, ProducerTable & producerTable, int id) {
69   
70    // ProducerPos[k][i] will hold the producedItemCount of the i^th output stream
71    // set of the k^th kernel.  These values will be loaded immediately after the
72    // doSegment and finalSegment calls for kernel k and later used as the
73    // producer position arguments for later doSegment/finalSegment calls.
74   
75    std::vector<std::vector<Value *>> ProducerPos;
76   
77   
78    const auto ip = iBuilder->saveIP();
79   
[5165]80    Module * m = iBuilder->getModule();
81    Type * const size_ty = iBuilder->getSizeTy();
[5227]82    Type * const voidTy = iBuilder->getVoidTy();
83    Type * const voidPtrTy = iBuilder->getVoidPtrTy();
[5165]84    Type * const int8PtrTy = iBuilder->getInt8PtrTy();
85
86    Function * const threadFunc = cast<Function>(m->getOrInsertFunction(name, voidTy, int8PtrTy, nullptr));
87    threadFunc->setCallingConv(CallingConv::C);
88    Function::arg_iterator args = threadFunc->arg_begin();
89
90    Value * const input = &*(args++);
91    input->setName("input");
92
[5175]93    unsigned threadNum = codegen::ThreadNum;
[5165]94
95     // Create the basic blocks for the thread function.
96    BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc, 0);
[5194]97    BasicBlock * segmentLoop = BasicBlock::Create(iBuilder->getContext(), "segmentLoop", threadFunc, 0);
[5165]98    BasicBlock * exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc, 0);
[5253]99   
[5165]100    std::vector<BasicBlock *> segmentWait;
101    std::vector<BasicBlock *> segmentLoopBody;
102    for (unsigned i = 0; i < kernels.size(); i++) {
[5253]103        std::string kname = kernels[i]->getName();
104        segmentWait.push_back(BasicBlock::Create(iBuilder->getContext(), kname + "Wait", threadFunc, 0));
105        segmentLoopBody.push_back(BasicBlock::Create(iBuilder->getContext(), "do_" + kname, threadFunc, 0));
[5165]106    }
107
108    iBuilder->SetInsertPoint(entryBlock);
109    Value * sharedStruct = iBuilder->CreateBitCast(input, PointerType::get(sharedStructType, 0));
[5194]110    Constant * myThreadId = ConstantInt::get(size_ty, id);
[5165]111    std::vector<Value *> instancePtrs;
112    for (unsigned i = 0; i < kernels.size(); i++) {
[5221]113        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
[5165]114        instancePtrs.push_back(iBuilder->CreateLoad(ptr));
115    }
116   
117    iBuilder->CreateBr(segmentLoop);
118
119    iBuilder->SetInsertPoint(segmentLoop);
[5174]120    PHINode * segNo = iBuilder->CreatePHI(size_ty, 2, "segNo");
[5194]121    segNo->addIncoming(myThreadId, entryBlock);
[5258]122    Value * nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
[5194]123    unsigned last_kernel = kernels.size() - 1;
124    Value * alreadyDone = kernels[last_kernel]->getTerminationSignal(instancePtrs[last_kernel]);
125    iBuilder->CreateCondBr(alreadyDone, exitThreadBlock, segmentWait[0]);
[5263]126   
127    Value * doFinal = ConstantInt::getNullValue(iBuilder->getInt1Ty());
[5165]128
[5263]129    for (unsigned k = 0; k < kernels.size(); k++) {
130        iBuilder->SetInsertPoint(segmentWait[k]);
131        Value * processedSegmentCount = kernels[k]->acquireLogicalSegmentNo(instancePtrs[k]);
[5174]132        Value * cond = iBuilder->CreateICmpEQ(segNo, processedSegmentCount);
[5263]133        iBuilder->CreateCondBr(cond, segmentLoopBody[k], segmentWait[k]);
134       
135        iBuilder->SetInsertPoint(segmentLoopBody[k]);
[5266]136       
137        //iBuilder->CallPrintInt(kernels[k]->getName() + " segment #", segNo);
[5263]138        if (k == last_kernel) {
[5253]139            segNo->addIncoming(iBuilder->CreateAdd(segNo, ConstantInt::get(size_ty, threadNum)), segmentLoopBody[last_kernel]);
140        }
[5263]141        std::vector<Value *> doSegmentArgs = {instancePtrs[k], doFinal};
142        for (unsigned j = 0; j < kernels[k]->getStreamInputs().size(); j++) {
143            unsigned producerKernel, outputIndex;
144            std::tie(producerKernel, outputIndex) = producerTable[k][j];
145            doSegmentArgs.push_back(ProducerPos[producerKernel][outputIndex]);
[5266]146            //iBuilder->CallPrintInt(kernels[k]->getName() + " producerPos[" + std::to_string(j) + "] ", doSegmentArgs.back());
147
[5253]148        }
[5263]149        kernels[k]->createDoSegmentCall(doSegmentArgs);
150        std::vector<Value *> produced;
151        for (unsigned i = 0; i < kernels[k]->getStreamOutputs().size(); i++) {
152            produced.push_back(kernels[k]->getProducedItemCount(instancePtrs[k], kernels[k]->getStreamOutputs()[i].name));
153        }
154        ProducerPos.push_back(produced);
155        if (! (kernels[k]->hasNoTerminateAttribute())) {
156            Value * terminated = kernels[k]->getTerminationSignal(instancePtrs[k]);
157            doFinal = iBuilder->CreateOr(doFinal, terminated);
158        }
159        kernels[k]->releaseLogicalSegmentNo(instancePtrs[k], nextSegNo);
[5266]160        if (k == last_kernel) break;
161        iBuilder->CreateBr(segmentWait[k+1]);
[5165]162    }
[5266]163    iBuilder->CreateCondBr(doFinal, exitThreadBlock, segmentLoop);
164
[5165]165    iBuilder->SetInsertPoint(exitThreadBlock);
166    Value * nullVal = Constant::getNullValue(voidPtrTy);
[5242]167    iBuilder->CreatePThreadExitCall(nullVal);
[5165]168    iBuilder->CreateRetVoid();
[5263]169    iBuilder->restoreIP(ip);
[5165]170
171    return threadFunc;
172}
173
// Given a computation expressed as a logical pipeline of K kernels k_0, k_1, ..., k_(K-1)
// operating over an input stream set S, a segment-parallel implementation divides the input
// into segments and coordinates a set of T <= K threads, each of which processes one segment at a time.
// Let S_0, S_1, ..., S_N be the segments of S.  Segments are assigned to threads in a round-robin
// fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
179
180
[5260]181void generateSegmentParallelPipeline(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
[5165]182   
[5175]183    unsigned threadNum = codegen::ThreadNum;
[5263]184   
[5165]185    Module * m = iBuilder->getModule();
[5263]186   
[5165]187    Type * const size_ty = iBuilder->getSizeTy();
[5227]188    Type * const voidPtrTy = iBuilder->getVoidPtrTy();
[5165]189    Type * const int8PtrTy = iBuilder->getInt8PtrTy();
[5263]190   
[5220]191    for (auto k : kernels) k->createInstance();
[5263]192   
193    ProducerTable producerTable = createProducerTable(kernels);
194   
[5165]195    Type * const pthreadsTy = ArrayType::get(size_ty, threadNum);
196    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
197    std::vector<Value *> pthreadsPtrs;
198    for (unsigned i = 0; i < threadNum; i++) {
199        pthreadsPtrs.push_back(iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)}));
200    }
201    Value * nullVal = Constant::getNullValue(voidPtrTy);
202    AllocaInst * const status = iBuilder->CreateAlloca(int8PtrTy);
[5263]203   
[5165]204    std::vector<Type *> structTypes;
[5220]205    for (unsigned i = 0; i < kernels.size(); i++) {
206        structTypes.push_back(kernels[i]->getInstance()->getType());
[5165]207    }
208    Type * sharedStructType = StructType::get(m->getContext(), structTypes);
[5263]209   
[5202]210    AllocaInst * sharedStruct = iBuilder->CreateAlloca(sharedStructType);
[5220]211    for (unsigned i = 0; i < kernels.size(); i++) {
[5221]212        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
[5220]213        iBuilder->CreateStore(kernels[i]->getInstance(), ptr);
[5165]214    }
[5263]215   
[5165]216    std::vector<Function *> thread_functions;
217    const auto ip = iBuilder->saveIP();
218    for (unsigned i = 0; i < threadNum; i++) {
[5263]219        thread_functions.push_back(generateSegmentParallelPipelineThreadFunction("thread"+std::to_string(i), iBuilder, kernels, sharedStructType, producerTable, i));
[5165]220    }
221    iBuilder->restoreIP(ip);
[5263]222   
[5165]223    for (unsigned i = 0; i < threadNum; i++) {
[5242]224        iBuilder->CreatePThreadCreateCall(pthreadsPtrs[i], nullVal, thread_functions[i], iBuilder->CreateBitCast(sharedStruct, int8PtrTy));
[5165]225    }
[5263]226   
[5165]227    std::vector<Value *> threadIDs;
228    for (unsigned i = 0; i < threadNum; i++) { 
229        threadIDs.push_back(iBuilder->CreateLoad(pthreadsPtrs[i]));
230    }
231   
232    for (unsigned i = 0; i < threadNum; i++) { 
[5242]233        iBuilder->CreatePThreadJoinCall(threadIDs[i], status);
[5165]234    }
[5263]235   
[5165]236}
237
[5260]238void generatePipelineParallel(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
[5263]239   
[5242]240    Type * pthreadTy = iBuilder->getSizeTy();
[5227]241    Type * const voidPtrTy = iBuilder->getVoidPtrTy();
[5135]242    Type * const int8PtrTy = iBuilder->getInt8PtrTy();
[5263]243   
[5135]244    Type * const pthreadsTy = ArrayType::get(pthreadTy, kernels.size());
[5263]245   
[5220]246    for (auto k : kernels) k->createInstance();
[5263]247   
[5135]248    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
249    std::vector<Value *> pthreadsPtrs;
250    for (unsigned i = 0; i < kernels.size(); i++) {
251        pthreadsPtrs.push_back(iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)}));
252    }
253    Value * nullVal = Constant::getNullValue(voidPtrTy);
254    AllocaInst * const status = iBuilder->CreateAlloca(int8PtrTy);
[5263]255   
[5135]256    std::vector<Function *> kernel_functions;
257    const auto ip = iBuilder->saveIP();
258    for (unsigned i = 0; i < kernels.size(); i++) {
259        kernel_functions.push_back(kernels[i]->generateThreadFunction("k_"+std::to_string(i)));
260    }
261    iBuilder->restoreIP(ip);
[5263]262   
[5135]263    for (unsigned i = 0; i < kernels.size(); i++) {
[5242]264        iBuilder->CreatePThreadCreateCall(pthreadsPtrs[i], nullVal, kernel_functions[i], iBuilder->CreateBitCast(kernels[i]->getInstance(), int8PtrTy));
[5135]265    }
[5263]266   
[5135]267    std::vector<Value *> threadIDs;
268    for (unsigned i = 0; i < kernels.size(); i++) { 
269        threadIDs.push_back(iBuilder->CreateLoad(pthreadsPtrs[i]));
270    }
271   
272    for (unsigned i = 0; i < kernels.size(); i++) { 
[5242]273        iBuilder->CreatePThreadJoinCall(threadIDs[i], status);
[5135]274    }
275}
276
277
[5260]278void generatePipelineLoop(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
[5252]279    for (auto k : kernels) k->createInstance();
280   
[5086]281    BasicBlock * entryBlock = iBuilder->GetInsertBlock();
282    Function * main = entryBlock->getParent();
[5263]283   
284    // Create the basic blocks for the loop.
[5252]285    BasicBlock * segmentLoop = BasicBlock::Create(iBuilder->getContext(), "segmentLoop", main, 0);
[5194]286    BasicBlock * exitBlock = BasicBlock::Create(iBuilder->getContext(), "exitBlock", main, 0);
[5252]287   
[5263]288    ProducerTable producerTable = createProducerTable(kernels);
[5252]289   
[5263]290    // ProducerPos[k][i] will hold the producedItemCount of the i^th output stream
291    // set of the k^th kernel.  These values will be loaded immediately after the
292    // doSegment and finalSegment calls for kernel k and later used as the
293    // producer position arguments for later doSegment/finalSegment calls.
294   
295    std::vector<std::vector<Value *>> ProducerPos;
296   
[5252]297    iBuilder->CreateBr(segmentLoop);
[5263]298    iBuilder->SetInsertPoint(segmentLoop);
299
300    Value * terminationFound = ConstantInt::getNullValue(iBuilder->getInt1Ty());
301    for (unsigned k = 0; k < kernels.size(); k++) {
302        Value * instance = kernels[k]->getInstance();
303        std::vector<Value *> doSegmentArgs = {instance, terminationFound};
304        for (unsigned j = 0; j < kernels[k]->getStreamInputs().size(); j++) {
305            unsigned producerKernel, outputIndex;
306            std::tie(producerKernel, outputIndex) = producerTable[k][j];
307            doSegmentArgs.push_back(ProducerPos[producerKernel][outputIndex]);
[5252]308        }
[5263]309        kernels[k]->createDoSegmentCall(doSegmentArgs);
310        if (! (kernels[k]->hasNoTerminateAttribute())) {
311            Value * terminated = kernels[k]->getTerminationSignal(instance);
312            terminationFound = iBuilder->CreateOr(terminationFound, terminated);
[5252]313        }
[5263]314        std::vector<Value *> produced;
315        for (unsigned i = 0; i < kernels[k]->getStreamOutputs().size(); i++) {
316            produced.push_back(kernels[k]->getProducedItemCount(instance, kernels[k]->getStreamOutputs()[i].name));
[5252]317        }
[5263]318        ProducerPos.push_back(produced);
319        Value * segNo = kernels[k]->acquireLogicalSegmentNo(instance);
320        kernels[k]->releaseLogicalSegmentNo(instance, iBuilder->CreateAdd(segNo, iBuilder->getSize(1)));
[5025]321    }
[5263]322    iBuilder->CreateCondBr(terminationFound, exitBlock, segmentLoop);
[5174]323    iBuilder->SetInsertPoint(exitBlock);
[5252]324}
[5263]325
326   
Note: See TracBrowser for help on using the repository browser.