source: icGREP/icgrep-devel/icgrep/kernels/pipeline.cpp @ 5353

Last change on this file since 5353 was 5320, checked in by nmedfort, 3 years ago

memcpy/memset support for 32-bit systems; more error messages/handling; bug fix for ParabixCharacterClassKernelBuilder. Continued work on parenthesis matching + expandable buffers.

File size: 14.1 KB
RevLine 
[4929]1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
[5227]6#include "pipeline.h"
[5033]7#include <toolchain.h>
[5086]8#include <kernels/kernel.h>
[5276]9#include <kernels/streamset.h>
[5267]10#include <llvm/IR/Module.h>
[5251]11#include <unordered_map>
[4929]12
[4974]13using namespace kernel;
[5260]14using namespace parabix;
15using namespace llvm;
[4929]16
[5263]17using ProducerTable = std::vector<std::vector<std::pair<unsigned, unsigned>>>;
[5260]18
[5263]19ProducerTable createProducerTable(const std::vector<KernelBuilder *> & kernels) {
[5264]20    ProducerTable producerTable(kernels.size());
[5263]21   
[5264]22    std::vector<std::vector<bool>> userTable(kernels.size());
[5263]23   
24    // First prepare a map from streamSet output buffers to their producing kernel and output index.
25    std::unordered_map<const StreamSetBuffer *, std::pair<unsigned, unsigned>> bufferMap;
26   
27    for (unsigned k = 0; k < kernels.size(); k++) {
28        auto outputSets = kernels[k]->getStreamSetOutputBuffers();
29        for (unsigned j = 0; j < outputSets.size(); j++) {
30            userTable[k].push_back(false);
31            bufferMap.insert(std::make_pair(outputSets[j], std::make_pair(k, j)));
[5251]32        }
33    }
[5263]34    for (unsigned k = 0; k < kernels.size(); k++) {
35        auto inputSets = kernels[k]->getStreamSetInputBuffers();
[5251]36        for (unsigned i = 0; i < inputSets.size(); i++) {
[5263]37            auto f = bufferMap.find(inputSets[i]);
38            if (f == bufferMap.end()) {
39                llvm::report_fatal_error("Pipeline error: input buffer #" + std::to_string(i) + " of " + kernels[k]->getName() + ": no corresponding output buffer. ");
[5251]40            }
[5263]41            producerTable[k].push_back(f->second);
42            unsigned sourceKernel, outputIndex;
43            std::tie(sourceKernel, outputIndex) = f->second;
44            if (sourceKernel >= k) {
45                llvm::report_fatal_error("Pipeline error: input buffer #" + std::to_string(i) + " of " + kernels[k]->getName() + ": not defined before use. ");
46            }
47            //errs() << "sourceKernel: " + std::to_string(sourceKernel) + ", outputIndex: " + std::to_string(outputIndex) + ", user: " + std::to_string(k) + "\n";
48            userTable[sourceKernel][outputIndex]= true;
49           
[5251]50        }
51    }
[5273]52    /*  TODO:  define sinks for  all outputs so that the following check succeeds on
53     *  well-structured pipelines.
[5263]54    for (unsigned k = 0; k < kernels.size(); k++) {
55        auto outputSets = kernels[k]->getStreamSetOutputBuffers();
56        //errs() << "kernel: " + kernels[k]->getName() + "\n";
57        for (unsigned j = 0; j < outputSets.size(); j++) {
58            if (userTable[k][j] == false) {
59                llvm::report_fatal_error("Pipeline error: output buffer #" + std::to_string(j) + " of " + kernels[k]->getName() + ": no users. ");
60            }
61        }
62    }
[5264]63    */
[5263]64    return producerTable;
[5251]65}
66
[5263]67Function * generateSegmentParallelPipelineThreadFunction(std::string name, IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels, Type * sharedStructType, ProducerTable & producerTable, int id) {
68   
69    // ProducerPos[k][i] will hold the producedItemCount of the i^th output stream
70    // set of the k^th kernel.  These values will be loaded immediately after the
71    // doSegment and finalSegment calls for kernel k and later used as the
72    // producer position arguments for later doSegment/finalSegment calls.
73   
74    std::vector<std::vector<Value *>> ProducerPos;
75   
76   
77    const auto ip = iBuilder->saveIP();
78   
[5165]79    Module * m = iBuilder->getModule();
[5227]80    Type * const voidTy = iBuilder->getVoidTy();
[5267]81    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
82    PointerType * const int8PtrTy = iBuilder->getInt8PtrTy();
[5165]83
84    Function * const threadFunc = cast<Function>(m->getOrInsertFunction(name, voidTy, int8PtrTy, nullptr));
85    threadFunc->setCallingConv(CallingConv::C);
86    Function::arg_iterator args = threadFunc->arg_begin();
87
88    Value * const input = &*(args++);
89    input->setName("input");
90
91
92     // Create the basic blocks for the thread function.
93    BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc, 0);
[5194]94    BasicBlock * segmentLoop = BasicBlock::Create(iBuilder->getContext(), "segmentLoop", threadFunc, 0);
[5165]95    BasicBlock * exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc, 0);
[5253]96   
[5165]97    std::vector<BasicBlock *> segmentWait;
98    std::vector<BasicBlock *> segmentLoopBody;
99    for (unsigned i = 0; i < kernels.size(); i++) {
[5283]100        auto kname = kernels[i]->getName();
[5253]101        segmentWait.push_back(BasicBlock::Create(iBuilder->getContext(), kname + "Wait", threadFunc, 0));
[5283]102        segmentLoopBody.push_back(BasicBlock::Create(iBuilder->getContext(), kname + "Do", threadFunc, 0));
[5165]103    }
104
105    iBuilder->SetInsertPoint(entryBlock);
[5273]106   
[5165]107    Value * sharedStruct = iBuilder->CreateBitCast(input, PointerType::get(sharedStructType, 0));
108    std::vector<Value *> instancePtrs;
[5274]109    for (unsigned k = 0; k < kernels.size(); k++) {
110        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
[5165]111        instancePtrs.push_back(iBuilder->CreateLoad(ptr));
112    }
113   
114    iBuilder->CreateBr(segmentLoop);
115
116    iBuilder->SetInsertPoint(segmentLoop);
[5283]117    PHINode * segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 2, "segNo");
118    segNo->addIncoming(iBuilder->getSize(id), entryBlock);
119    const unsigned last_kernel = kernels.size() - 1;
[5275]120    Value * doFinal = ConstantInt::getNullValue(iBuilder->getInt1Ty());
[5295]121    Value * nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
[5275]122    iBuilder->CreateBr(segmentWait[0]);
[5263]123    for (unsigned k = 0; k < kernels.size(); k++) {
124        iBuilder->SetInsertPoint(segmentWait[k]);
[5295]125        unsigned waitForKernel = k;
126        if (codegen::DebugOptionIsSet(codegen::SerializeThreads)) {
127            waitForKernel = last_kernel;
128        }
129        Value * processedSegmentCount = kernels[waitForKernel]->acquireLogicalSegmentNo(instancePtrs[waitForKernel]);
[5305]130        Value * ready = iBuilder->CreateICmpEQ(segNo, processedSegmentCount);
[5274]131
132        if (kernels[k]->hasNoTerminateAttribute()) {
[5305]133            iBuilder->CreateCondBr(ready, segmentLoopBody[k], segmentWait[k]);
[5292]134        } else { // If the kernel was terminated in a previous segment then the pipeline is done.
[5274]135            BasicBlock * completionTest = BasicBlock::Create(iBuilder->getContext(), kernels[k]->getName() + "Completed", threadFunc, 0);
[5305]136            BasicBlock * exitBlock = BasicBlock::Create(iBuilder->getContext(), kernels[k]->getName() + "Exit", threadFunc, 0);
137            iBuilder->CreateCondBr(ready, completionTest, segmentWait[k]);
[5274]138            iBuilder->SetInsertPoint(completionTest);
139            Value * alreadyDone = kernels[k]->getTerminationSignal(instancePtrs[k]);
[5305]140            iBuilder->CreateCondBr(alreadyDone, exitBlock, segmentLoopBody[k]);
141            iBuilder->SetInsertPoint(exitBlock);
142            // Ensure that the next thread will also exit.
143            kernels[k]->releaseLogicalSegmentNo(instancePtrs[k], nextSegNo);
144            iBuilder->CreateBr(exitThreadBlock);
[5274]145        }
[5263]146        iBuilder->SetInsertPoint(segmentLoopBody[k]);
147        std::vector<Value *> doSegmentArgs = {instancePtrs[k], doFinal};
148        for (unsigned j = 0; j < kernels[k]->getStreamInputs().size(); j++) {
149            unsigned producerKernel, outputIndex;
150            std::tie(producerKernel, outputIndex) = producerTable[k][j];
151            doSegmentArgs.push_back(ProducerPos[producerKernel][outputIndex]);
[5253]152        }
[5263]153        kernels[k]->createDoSegmentCall(doSegmentArgs);
154        std::vector<Value *> produced;
155        for (unsigned i = 0; i < kernels[k]->getStreamOutputs().size(); i++) {
156            produced.push_back(kernels[k]->getProducedItemCount(instancePtrs[k], kernels[k]->getStreamOutputs()[i].name));
157        }
158        ProducerPos.push_back(produced);
159        if (! (kernels[k]->hasNoTerminateAttribute())) {
160            Value * terminated = kernels[k]->getTerminationSignal(instancePtrs[k]);
161            doFinal = iBuilder->CreateOr(doFinal, terminated);
162        }
[5283]163
[5263]164        kernels[k]->releaseLogicalSegmentNo(instancePtrs[k], nextSegNo);
[5273]165        if (k == last_kernel) {
[5283]166            segNo->addIncoming(iBuilder->CreateAdd(segNo, iBuilder->getSize(codegen::ThreadNum)), segmentLoopBody[last_kernel]);
[5274]167            iBuilder->CreateCondBr(doFinal, exitThreadBlock, segmentLoop);
[5292]168        } else {
[5274]169            iBuilder->CreateBr(segmentWait[k+1]);
170        }
[5165]171    }
[5266]172
[5165]173    iBuilder->SetInsertPoint(exitThreadBlock);
174    Value * nullVal = Constant::getNullValue(voidPtrTy);
[5242]175    iBuilder->CreatePThreadExitCall(nullVal);
[5165]176    iBuilder->CreateRetVoid();
[5263]177    iBuilder->restoreIP(ip);
[5165]178
179    return threadFunc;
180}
181
[5251]182// Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
183// operating over an input stream set S, a segment-parallel implementation divides the input
184// into segments and coordinates a set of T <= K threads to each process one segment at a time.   
185// Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
186// fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
187
188
[5260]189void generateSegmentParallelPipeline(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
[5165]190   
[5283]191    const unsigned threadNum = codegen::ThreadNum;
[5263]192   
[5165]193    Module * m = iBuilder->getModule();
[5263]194   
[5267]195    IntegerType * const size_ty = iBuilder->getSizeTy();
196    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
197    PointerType * const int8PtrTy = iBuilder->getInt8PtrTy();
[5263]198   
[5220]199    for (auto k : kernels) k->createInstance();
[5263]200   
201    ProducerTable producerTable = createProducerTable(kernels);
202   
[5165]203    Type * const pthreadsTy = ArrayType::get(size_ty, threadNum);
204    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
205    std::vector<Value *> pthreadsPtrs;
206    for (unsigned i = 0; i < threadNum; i++) {
207        pthreadsPtrs.push_back(iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)}));
208    }
209    Value * nullVal = Constant::getNullValue(voidPtrTy);
210    AllocaInst * const status = iBuilder->CreateAlloca(int8PtrTy);
[5263]211   
[5165]212    std::vector<Type *> structTypes;
[5220]213    for (unsigned i = 0; i < kernels.size(); i++) {
214        structTypes.push_back(kernels[i]->getInstance()->getType());
[5165]215    }
216    Type * sharedStructType = StructType::get(m->getContext(), structTypes);
[5263]217   
[5202]218    AllocaInst * sharedStruct = iBuilder->CreateAlloca(sharedStructType);
[5220]219    for (unsigned i = 0; i < kernels.size(); i++) {
[5221]220        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
[5220]221        iBuilder->CreateStore(kernels[i]->getInstance(), ptr);
[5165]222    }
[5273]223    for (unsigned i = 0; i < kernels.size(); i++) {
224        kernels[i]->releaseLogicalSegmentNo(kernels[i]->getInstance(), iBuilder->getSize(0));
225    }
226
[5165]227    std::vector<Function *> thread_functions;
228    const auto ip = iBuilder->saveIP();
229    for (unsigned i = 0; i < threadNum; i++) {
[5263]230        thread_functions.push_back(generateSegmentParallelPipelineThreadFunction("thread"+std::to_string(i), iBuilder, kernels, sharedStructType, producerTable, i));
[5165]231    }
232    iBuilder->restoreIP(ip);
[5263]233   
[5165]234    for (unsigned i = 0; i < threadNum; i++) {
[5242]235        iBuilder->CreatePThreadCreateCall(pthreadsPtrs[i], nullVal, thread_functions[i], iBuilder->CreateBitCast(sharedStruct, int8PtrTy));
[5165]236    }
[5263]237   
[5165]238    std::vector<Value *> threadIDs;
239    for (unsigned i = 0; i < threadNum; i++) { 
240        threadIDs.push_back(iBuilder->CreateLoad(pthreadsPtrs[i]));
241    }
242   
243    for (unsigned i = 0; i < threadNum; i++) { 
[5242]244        iBuilder->CreatePThreadJoinCall(threadIDs[i], status);
[5165]245    }
[5263]246   
[5165]247}
248
[5260]249void generatePipelineParallel(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
[5276]250    llvm::report_fatal_error("Pipeline parallelism no longer supported!");
[5135]251}
252
253
[5260]254void generatePipelineLoop(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
[5320]255    for (auto k : kernels) {
256        k->createInstance();
257    }
[5086]258    BasicBlock * entryBlock = iBuilder->GetInsertBlock();
259    Function * main = entryBlock->getParent();
[5273]260
[5263]261    // Create the basic blocks for the loop.
[5252]262    BasicBlock * segmentLoop = BasicBlock::Create(iBuilder->getContext(), "segmentLoop", main, 0);
[5194]263    BasicBlock * exitBlock = BasicBlock::Create(iBuilder->getContext(), "exitBlock", main, 0);
[5252]264   
[5263]265    ProducerTable producerTable = createProducerTable(kernels);
[5252]266   
[5263]267    // ProducerPos[k][i] will hold the producedItemCount of the i^th output stream
268    // set of the k^th kernel.  These values will be loaded immediately after the
269    // doSegment and finalSegment calls for kernel k and later used as the
270    // producer position arguments for later doSegment/finalSegment calls.
271   
272    std::vector<std::vector<Value *>> ProducerPos;
273   
[5252]274    iBuilder->CreateBr(segmentLoop);
[5263]275    iBuilder->SetInsertPoint(segmentLoop);
276
277    Value * terminationFound = ConstantInt::getNullValue(iBuilder->getInt1Ty());
278    for (unsigned k = 0; k < kernels.size(); k++) {
279        Value * instance = kernels[k]->getInstance();
280        std::vector<Value *> doSegmentArgs = {instance, terminationFound};
281        for (unsigned j = 0; j < kernels[k]->getStreamInputs().size(); j++) {
282            unsigned producerKernel, outputIndex;
283            std::tie(producerKernel, outputIndex) = producerTable[k][j];
284            doSegmentArgs.push_back(ProducerPos[producerKernel][outputIndex]);
[5252]285        }
[5263]286        kernels[k]->createDoSegmentCall(doSegmentArgs);
287        if (! (kernels[k]->hasNoTerminateAttribute())) {
288            Value * terminated = kernels[k]->getTerminationSignal(instance);
289            terminationFound = iBuilder->CreateOr(terminationFound, terminated);
[5252]290        }
[5263]291        std::vector<Value *> produced;
292        for (unsigned i = 0; i < kernels[k]->getStreamOutputs().size(); i++) {
293            produced.push_back(kernels[k]->getProducedItemCount(instance, kernels[k]->getStreamOutputs()[i].name));
[5252]294        }
[5263]295        ProducerPos.push_back(produced);
296        Value * segNo = kernels[k]->acquireLogicalSegmentNo(instance);
297        kernels[k]->releaseLogicalSegmentNo(instance, iBuilder->CreateAdd(segNo, iBuilder->getSize(1)));
[5025]298    }
[5263]299    iBuilder->CreateCondBr(terminationFound, exitBlock, segmentLoop);
[5174]300    iBuilder->SetInsertPoint(exitBlock);
[5252]301}
[5263]302
303   
Note: See TracBrowser for help on using the repository browser.