source: icGREP/icgrep-devel/icgrep/kernels/pipeline.cpp @ 5255

Last change on this file since 5255 was 5253, checked in by cameron, 3 years ago

Termination signal checking and doSegment/final segment separation for segment-pipeline parallel thread functions.

File size: 15.4 KB
Line 
1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
6#include "pipeline.h"
7#include <toolchain.h>
8#include <IR_Gen/idisa_builder.h>
9#include <kernels/interface.h>
10#include <kernels/kernel.h>
11#include <kernels/s2p_kernel.h>
12#include <iostream>
13#include <unordered_map>
14
15using namespace kernel;
16
17using BufferMap = std::unordered_map<StreamSetBuffer *, std::pair<KernelBuilder *, unsigned>>;
18
19
20static void createStreamBufferMap(BufferMap & bufferMap, std::vector<KernelBuilder *> kernels) {
21    for (auto k: kernels) {
22        auto outputSets = k->getStreamSetOutputBuffers();
23        for (unsigned i = 0; i < outputSets.size(); i++) {
24            bufferMap.insert(std::make_pair(outputSets[i], std::make_pair(k, i)));
25        }
26    }
27    for (auto k: kernels) {
28        auto inputSets = k->getStreamSetInputBuffers();
29        for (unsigned i = 0; i < inputSets.size(); i++) {
30            if (bufferMap.find(inputSets[i]) == bufferMap.end()) {
31                llvm::report_fatal_error("Pipeline error: input buffer #" + std::to_string(i) + " of " + k->getName() + ": no corresponding output buffer. ");
32            }
33        }
34    }
35}
36
37static Value * getSegmentBlocks(BufferMap & bufferMap, KernelBuilder * kernel) {
38    IDISA::IDISA_Builder * iBuilder = kernel->getBuilder();
39    std::cerr << "getSegmentBlocks\n"; 
40
41    KernelBuilder * sourceKernel;
42
43    unsigned outputIndex;
44    auto inputs = kernel->getStreamSetInputBuffers();
45    if (inputs.empty()) return iBuilder->getSize(codegen::SegmentSize * iBuilder->getStride() / iBuilder->getBitBlockWidth());
46    std::string inputSetName = kernel->getStreamInputs()[0].name;
47    std::cerr << "inputSetName = " << inputSetName << "\n"; 
48    auto f = bufferMap.find(inputs[0]);
49    assert(f != bufferMap.end()  && "bufferMap failure");
50    std::tie(sourceKernel, outputIndex) = f->second;
51    std::cerr << "outputIndex = " << outputIndex << "\n"; 
52    Value * produced = sourceKernel->getProducedItemCount(sourceKernel->getInstance(), sourceKernel->getStreamOutputs()[outputIndex].name);
53    iBuilder->CallPrintInt("produced", produced);
54    Value * processed = kernel->getProcessedItemCount(kernel->getInstance(), inputSetName);
55    iBuilder->CallPrintInt("processed", processed);
56    Value * itemsToDo = iBuilder->CreateSub(produced, processed);
57    return iBuilder->CreateUDiv(itemsToDo, iBuilder->getSize(iBuilder->getStride()));
58}
59                                   
60
61
62Function * generateSegmentParallelPipelineThreadFunction(std::string name, IDISA::IDISA_Builder * iBuilder, std::vector<KernelBuilder *> kernels, Type * sharedStructType, int id) {
63
64    Module * m = iBuilder->getModule();
65    Type * const size_ty = iBuilder->getSizeTy();
66    Type * const voidTy = iBuilder->getVoidTy();
67    Type * const voidPtrTy = iBuilder->getVoidPtrTy();
68    Type * const int8PtrTy = iBuilder->getInt8PtrTy();
69
70    Function * const threadFunc = cast<Function>(m->getOrInsertFunction(name, voidTy, int8PtrTy, nullptr));
71    threadFunc->setCallingConv(CallingConv::C);
72    Function::arg_iterator args = threadFunc->arg_begin();
73
74    Value * const input = &*(args++);
75    input->setName("input");
76
77    unsigned threadNum = codegen::ThreadNum;
78
79     // Create the basic blocks for the thread function.
80    BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc, 0);
81    BasicBlock * segmentLoop = BasicBlock::Create(iBuilder->getContext(), "segmentLoop", threadFunc, 0);
82    BasicBlock * exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc, 0);
83   
84    std::vector<BasicBlock *> segmentWait;
85    std::vector<BasicBlock *> segmentLoopBody;
86    std::vector<BasicBlock *> partialSegmentWait;
87    std::vector<BasicBlock *> partialSegmentLoopBody;
88    bool terminationSignalEncountered = false;
89    for (unsigned i = 0; i < kernels.size(); i++) {
90        std::string kname = kernels[i]->getName();
91        segmentWait.push_back(BasicBlock::Create(iBuilder->getContext(), kname + "Wait", threadFunc, 0));
92        segmentLoopBody.push_back(BasicBlock::Create(iBuilder->getContext(), "do_" + kname, threadFunc, 0));
93        if (terminationSignalEncountered) {
94            partialSegmentWait.push_back(BasicBlock::Create(iBuilder->getContext(), kname + "WaitFinal", threadFunc, 0));
95            partialSegmentLoopBody.push_back(BasicBlock::Create(iBuilder->getContext(), "finish_" + kname, threadFunc, 0));
96        }
97        else {
98            partialSegmentWait.push_back(nullptr);
99            partialSegmentLoopBody.push_back(nullptr);
100            terminationSignalEncountered = kernels[i]->hasNoTerminateAttribute() == false;
101        }
102    }
103    segmentWait.push_back(segmentLoop); // If the last kernel does not terminate, loop back.
104    partialSegmentWait.push_back(exitThreadBlock); // If the last kernel does terminate, we're done.
105
106    iBuilder->SetInsertPoint(entryBlock);
107    Value * sharedStruct = iBuilder->CreateBitCast(input, PointerType::get(sharedStructType, 0));
108    Constant * myThreadId = ConstantInt::get(size_ty, id);
109    std::vector<Value *> instancePtrs;
110    for (unsigned i = 0; i < kernels.size(); i++) {
111        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
112        instancePtrs.push_back(iBuilder->CreateLoad(ptr));
113    }
114   
115    // Some important constant values.
116    int segmentSize = codegen::SegmentSize;
117    Constant * segmentBlocks = ConstantInt::get(size_ty, segmentSize);
118    iBuilder->CreateBr(segmentLoop);
119
120    iBuilder->SetInsertPoint(segmentLoop);
121    PHINode * segNo = iBuilder->CreatePHI(size_ty, 2, "segNo");
122    segNo->addIncoming(myThreadId, entryBlock);
123    unsigned last_kernel = kernels.size() - 1;
124    Value * alreadyDone = kernels[last_kernel]->getTerminationSignal(instancePtrs[last_kernel]);
125    iBuilder->CreateCondBr(alreadyDone, exitThreadBlock, segmentWait[0]);
126
127    for (unsigned i = 0; i < kernels.size(); i++) {
128        iBuilder->SetInsertPoint(segmentWait[i]);
129        Value * processedSegmentCount = kernels[i]->acquireLogicalSegmentNo(instancePtrs[i]);
130        Value * cond = iBuilder->CreateICmpEQ(segNo, processedSegmentCount);
131        iBuilder->CreateCondBr(cond, segmentLoopBody[i], segmentWait[i]);
132
133        iBuilder->SetInsertPoint(segmentLoopBody[i]);
134        if (i == last_kernel) {
135            segNo->addIncoming(iBuilder->CreateAdd(segNo, ConstantInt::get(size_ty, threadNum)), segmentLoopBody[last_kernel]);
136        }
137        kernels[i]->createDoSegmentCall(instancePtrs[i], segmentBlocks);
138        if (kernels[i]->hasNoTerminateAttribute()) {
139            kernels[i]->releaseLogicalSegmentNo(instancePtrs[i], iBuilder->CreateAdd(processedSegmentCount, iBuilder->getSize(1)));
140            iBuilder->CreateBr(segmentWait[i+1]);
141        }
142        else {
143            Value * terminated = kernels[i]->getTerminationSignal(instancePtrs[i]);
144            kernels[i]->releaseLogicalSegmentNo(instancePtrs[i], iBuilder->CreateAdd(processedSegmentCount, iBuilder->getSize(1)));
145            iBuilder->CreateCondBr(terminated, partialSegmentWait[i+1], segmentWait[i+1]);
146        }
147        if (partialSegmentWait[i] != nullptr) {
148            iBuilder->SetInsertPoint(partialSegmentWait[i]);
149            Value * processedSegmentCount = kernels[i]->acquireLogicalSegmentNo(instancePtrs[i]);
150            Value * cond = iBuilder->CreateICmpEQ(segNo, processedSegmentCount);
151            iBuilder->CreateCondBr(cond, partialSegmentLoopBody[i], partialSegmentWait[i]);
152           
153            iBuilder->SetInsertPoint(partialSegmentLoopBody[i]);
154            kernels[i]->createDoSegmentCall(instancePtrs[i], segmentBlocks);
155            kernels[i]->releaseLogicalSegmentNo(instancePtrs[i], iBuilder->CreateAdd(processedSegmentCount, iBuilder->getSize(1)));
156            iBuilder->CreateBr(partialSegmentWait[i+1]);
157        }
158    }
159   
160    iBuilder->SetInsertPoint(exitThreadBlock);
161    Value * nullVal = Constant::getNullValue(voidPtrTy);
162    iBuilder->CreatePThreadExitCall(nullVal);
163    iBuilder->CreateRetVoid();
164
165    return threadFunc;
166}
167
168// Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
169// operating over an input stream set S, a segment-parallel implementation divides the input
170// into segments and coordinates a set of T <= K threads to each process one segment at a time.   
171// Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
172// fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
173
174
175void generateSegmentParallelPipeline(IDISA::IDISA_Builder * iBuilder, std::vector<KernelBuilder *> kernels) {
176   
177    unsigned threadNum = codegen::ThreadNum;
178
179    Module * m = iBuilder->getModule();
180
181    Type * const size_ty = iBuilder->getSizeTy();
182    Type * const voidPtrTy = iBuilder->getVoidPtrTy();
183    Type * const int8PtrTy = iBuilder->getInt8PtrTy();
184
185    for (auto k : kernels) k->createInstance();
186
187    Type * const pthreadsTy = ArrayType::get(size_ty, threadNum);
188    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
189    std::vector<Value *> pthreadsPtrs;
190    for (unsigned i = 0; i < threadNum; i++) {
191        pthreadsPtrs.push_back(iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)}));
192    }
193    Value * nullVal = Constant::getNullValue(voidPtrTy);
194    AllocaInst * const status = iBuilder->CreateAlloca(int8PtrTy);
195
196    std::vector<Type *> structTypes;
197    for (unsigned i = 0; i < kernels.size(); i++) {
198        structTypes.push_back(kernels[i]->getInstance()->getType());
199    }
200    Type * sharedStructType = StructType::get(m->getContext(), structTypes);
201
202    AllocaInst * sharedStruct = iBuilder->CreateAlloca(sharedStructType);
203    for (unsigned i = 0; i < kernels.size(); i++) {
204        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
205        iBuilder->CreateStore(kernels[i]->getInstance(), ptr);
206    }
207
208    std::vector<Function *> thread_functions;
209    const auto ip = iBuilder->saveIP();
210    for (unsigned i = 0; i < threadNum; i++) {
211        thread_functions.push_back(generateSegmentParallelPipelineThreadFunction("thread"+std::to_string(i), iBuilder, kernels, sharedStructType, i));
212    }
213    iBuilder->restoreIP(ip);
214
215    for (unsigned i = 0; i < threadNum; i++) {
216        iBuilder->CreatePThreadCreateCall(pthreadsPtrs[i], nullVal, thread_functions[i], iBuilder->CreateBitCast(sharedStruct, int8PtrTy));
217    }
218
219    std::vector<Value *> threadIDs;
220    for (unsigned i = 0; i < threadNum; i++) { 
221        threadIDs.push_back(iBuilder->CreateLoad(pthreadsPtrs[i]));
222    }
223   
224    for (unsigned i = 0; i < threadNum; i++) { 
225        iBuilder->CreatePThreadJoinCall(threadIDs[i], status);
226    }
227
228}
229
230void generatePipelineParallel(IDISA::IDISA_Builder * iBuilder, std::vector<KernelBuilder *> kernels) {
231 
232    Type * pthreadTy = iBuilder->getSizeTy();
233    Type * const voidPtrTy = iBuilder->getVoidPtrTy();
234    Type * const int8PtrTy = iBuilder->getInt8PtrTy();
235
236    Type * const pthreadsTy = ArrayType::get(pthreadTy, kernels.size());
237
238    for (auto k : kernels) k->createInstance();
239
240    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
241    std::vector<Value *> pthreadsPtrs;
242    for (unsigned i = 0; i < kernels.size(); i++) {
243        pthreadsPtrs.push_back(iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)}));
244    }
245    Value * nullVal = Constant::getNullValue(voidPtrTy);
246    AllocaInst * const status = iBuilder->CreateAlloca(int8PtrTy);
247
248    std::vector<Function *> kernel_functions;
249    const auto ip = iBuilder->saveIP();
250    for (unsigned i = 0; i < kernels.size(); i++) {
251        kernel_functions.push_back(kernels[i]->generateThreadFunction("k_"+std::to_string(i)));
252    }
253    iBuilder->restoreIP(ip);
254
255    for (unsigned i = 0; i < kernels.size(); i++) {
256        iBuilder->CreatePThreadCreateCall(pthreadsPtrs[i], nullVal, kernel_functions[i], iBuilder->CreateBitCast(kernels[i]->getInstance(), int8PtrTy));
257    }
258
259    std::vector<Value *> threadIDs;
260    for (unsigned i = 0; i < kernels.size(); i++) { 
261        threadIDs.push_back(iBuilder->CreateLoad(pthreadsPtrs[i]));
262    }
263   
264    for (unsigned i = 0; i < kernels.size(); i++) { 
265        iBuilder->CreatePThreadJoinCall(threadIDs[i], status);
266    }
267}
268
269
270void generatePipelineLoop(IDISA::IDISA_Builder * iBuilder, std::vector<KernelBuilder *> kernels) {
271    for (auto k : kernels) k->createInstance();
272    //BufferMap bufferMap;
273    //createStreamBufferMap(bufferMap, kernels);
274   
275    BasicBlock * entryBlock = iBuilder->GetInsertBlock();
276    Function * main = entryBlock->getParent();
277
278    // Create the basic blocks. 
279    BasicBlock * segmentLoop = BasicBlock::Create(iBuilder->getContext(), "segmentLoop", main, 0);
280    BasicBlock * exitBlock = BasicBlock::Create(iBuilder->getContext(), "exitBlock", main, 0);
281    // We create vectors of loop body and final segment blocks indexed by kernel.
282    std::vector<BasicBlock *> loopBodyBlocks;
283    std::vector<BasicBlock *> finalSegmentBlocks;
284
285    loopBodyBlocks.push_back(segmentLoop); 
286    finalSegmentBlocks.push_back(nullptr); 
287   
288    for (unsigned i = 1; i < kernels.size(); i++) {
289        if (kernels[i-1]->hasNoTerminateAttribute()) {
290            // Previous kernel cannot terminate.   Continue with the previous blocks;
291            loopBodyBlocks.push_back(loopBodyBlocks.back());
292            finalSegmentBlocks.push_back(finalSegmentBlocks.back());
293        }
294        else {
295            loopBodyBlocks.push_back(BasicBlock::Create(iBuilder->getContext(), "do_" + kernels[i]->getName(), main, 0));
296            finalSegmentBlocks.push_back(BasicBlock::Create(iBuilder->getContext(), "finish_" + kernels[i]->getName(), main, 0));
297        }
298    }
299    loopBodyBlocks.push_back(segmentLoop); // If the last kernel does not terminate, loop back.
300    finalSegmentBlocks.push_back(exitBlock); // If the last kernel does terminate, we're done.
301   
302    iBuilder->CreateBr(segmentLoop);
303    Constant * segBlocks = iBuilder->getSize(codegen::SegmentSize * iBuilder->getStride() / iBuilder->getBitBlockWidth());
304    for (unsigned i = 0; i < kernels.size(); i++) {
305        iBuilder->SetInsertPoint(loopBodyBlocks[i]);
306        //Value * segBlocks = getSegmentBlocks(bufferMap, kernels[i]);
307        Value * segNo = kernels[i]->acquireLogicalSegmentNo(kernels[i]->getInstance());
308        kernels[i]->createDoSegmentCall(kernels[i]->getInstance(), segBlocks);
309        if (kernels[i]->hasNoTerminateAttribute()) {
310            kernels[i]->releaseLogicalSegmentNo(kernels[i]->getInstance(), iBuilder->CreateAdd(segNo, iBuilder->getSize(1)));
311            if (i == kernels.size() - 1) {
312                iBuilder->CreateBr(segmentLoop);
313            }
314        }
315        else {
316            Value * terminated = kernels[i]->getTerminationSignal(kernels[i]->getInstance());
317            kernels[i]->releaseLogicalSegmentNo(kernels[i]->getInstance(), iBuilder->CreateAdd(segNo, iBuilder->getSize(1)));
318            iBuilder->CreateCondBr(terminated, finalSegmentBlocks[i+1], loopBodyBlocks[i+1]);
319        }
320        if (finalSegmentBlocks[i] != nullptr) {
321            iBuilder->SetInsertPoint(finalSegmentBlocks[i]);
322            Value * segNo = kernels[i]->acquireLogicalSegmentNo(kernels[i]->getInstance());
323            kernels[i]->createDoSegmentCall(kernels[i]->getInstance(), segBlocks);
324            kernels[i]->releaseLogicalSegmentNo(kernels[i]->getInstance(), iBuilder->CreateAdd(segNo, iBuilder->getSize(1)));
325            if (finalSegmentBlocks[i] != finalSegmentBlocks[i+1]) {
326                iBuilder->CreateBr(finalSegmentBlocks[i+1]);
327            }
328        }
329    }
330    iBuilder->SetInsertPoint(exitBlock);
331}
Note: See TracBrowser for help on using the repository browser.