source: icGREP/icgrep-devel/icgrep/kernels/pipeline.cpp @ 5413

Last change on this file since 5413 was 5411, checked in by nmedfort, 2 years ago

Potential bug fix for 32-bit. Modified MRemap to check for Linux OS support. Added MMapAdvise to CBuilder.

File size: 20.5 KB
Line 
1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
6#include "pipeline.h"
7#include <kernels/toolchain.h>
8#include <kernels/kernel.h>
9#include <kernels/streamset.h>
10#include <llvm/IR/Module.h>
11#include <boost/container/flat_set.hpp>
12#include <boost/container/flat_map.hpp>
13
14using namespace kernel;
15using namespace parabix;
16using namespace llvm;
17
18template <typename Value>
19using StreamSetBufferMap = boost::container::flat_map<const StreamSetBuffer *, Value>;
20
21template <typename Value>
22using FlatSet = boost::container::flat_set<Value>;
23
24Function * makeThreadFunction(IDISA::IDISA_Builder * const b, const std::string & name) {
25    Function * const f = Function::Create(FunctionType::get(b->getVoidTy(), {b->getVoidPtrTy()}, false), Function::InternalLinkage, name, b->getModule());
26    f->setCallingConv(CallingConv::C);
27    f->arg_begin()->setName("input");
28    return f;
29}
30
31/** ------------------------------------------------------------------------------------------------------------- *
32 * @brief generateSegmentParallelPipeline
33 *
34 * Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
35 * operating over an input stream set S, a segment-parallel implementation divides the input
36 * into segments and coordinates a set of T <= K threads to each process one segment at a time.
37 * Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
38 * fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
39 ** ------------------------------------------------------------------------------------------------------------- */
40void generateSegmentParallelPipeline(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
41
42    const unsigned n = kernels.size();
43    Module * const m = iBuilder->getModule();
44    IntegerType * const sizeTy = iBuilder->getSizeTy();
45    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
46    const unsigned threads = codegen::ThreadNum;
47    Constant * nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
48
49    std::vector<Type *> structTypes;
50
51    Value * instance[n];
52    for (unsigned i = 0; i < n; ++i) {
53        instance[i] = kernels[i]->getInstance();
54        structTypes.push_back(instance[i]->getType());
55    }
56    StructType * const sharedStructType = StructType::get(m->getContext(), structTypes);
57    StructType * const threadStructType = StructType::get(sharedStructType->getPointerTo(), sizeTy, nullptr);
58
59    Function * const threadFunc = makeThreadFunction(iBuilder, "segment");
60
61    // -------------------------------------------------------------------------------------------------------------------------
62    // MAKE SEGMENT PARALLEL PIPELINE THREAD
63    // -------------------------------------------------------------------------------------------------------------------------
64    const auto ip = iBuilder->saveIP();
65
66     // Create the basic blocks for the thread function.
67    BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc);
68    iBuilder->SetInsertPoint(entryBlock);
69    Value * const input = &threadFunc->getArgumentList().front();
70    Value * const threadStruct = iBuilder->CreatePointerCast(input, threadStructType->getPointerTo());
71    Value * const sharedStatePtr = iBuilder->CreateLoad(iBuilder->CreateGEP(threadStruct, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
72    for (unsigned k = 0; k < n; ++k) {
73        Value * ptr = iBuilder->CreateLoad(iBuilder->CreateGEP(sharedStatePtr, {iBuilder->getInt32(0), iBuilder->getInt32(k)}));
74        kernels[k]->setInstance(ptr);
75    }
76    Value * const segOffset = iBuilder->CreateLoad(iBuilder->CreateGEP(threadStruct, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
77
78    BasicBlock * segmentLoop = BasicBlock::Create(iBuilder->getContext(), "segmentLoop", threadFunc);
79    iBuilder->CreateBr(segmentLoop);
80
81    iBuilder->SetInsertPoint(segmentLoop);
82    PHINode * const segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 2, "segNo");
83    segNo->addIncoming(segOffset, entryBlock);
84
85    Value * doFinal = iBuilder->getFalse();
86    Value * const nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
87
88    BasicBlock * segmentLoopBody = nullptr;
89    BasicBlock * const exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc);
90
91    StreamSetBufferMap<Value *> producedPos;
92
93    for (unsigned k = 0; k < n; ++k) {
94
95        const auto & kernel = kernels[k];
96
97        BasicBlock * const segmentWait = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Wait", threadFunc);
98        iBuilder->CreateBr(segmentWait);
99
100        segmentLoopBody = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Do", threadFunc);
101
102        iBuilder->SetInsertPoint(segmentWait);
103        const unsigned waitIdx = codegen::DebugOptionIsSet(codegen::SerializeThreads) ? (n - 1) : k;
104        Value * const processedSegmentCount = kernels[waitIdx]->acquireLogicalSegmentNo();
105        assert (processedSegmentCount->getType() == segNo->getType());
106        Value * const ready = iBuilder->CreateICmpEQ(segNo, processedSegmentCount);
107
108        if (kernel->hasNoTerminateAttribute()) {
109            iBuilder->CreateCondBr(ready, segmentLoopBody, segmentWait);
110        } else { // If the kernel was terminated in a previous segment then the pipeline is done.
111            BasicBlock * completionTest = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Completed", threadFunc, 0);
112            BasicBlock * exitBlock = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Exit", threadFunc, 0);
113            iBuilder->CreateCondBr(ready, completionTest, segmentWait);
114
115            iBuilder->SetInsertPoint(completionTest);
116            iBuilder->CreateCondBr(kernel->getTerminationSignal(), exitBlock, segmentLoopBody);
117            iBuilder->SetInsertPoint(exitBlock);
118            // Ensure that the next thread will also exit.
119            kernel->releaseLogicalSegmentNo(nextSegNo);
120            iBuilder->CreateBr(exitThreadBlock);
121        }
122
123        // Execute the kernel segment
124        iBuilder->SetInsertPoint(segmentLoopBody);
125        const auto & inputs = kernel->getStreamInputs();
126        std::vector<Value *> args = {kernel->getInstance(), doFinal};
127        for (unsigned i = 0; i < inputs.size(); ++i) {
128            const auto f = producedPos.find(kernel->getStreamSetInputBuffer(i));
129            if (LLVM_UNLIKELY(f == producedPos.end())) {
130                report_fatal_error(kernel->getName() + " uses stream set " + inputs[i].name + " prior to its definition");
131            }
132            args.push_back(f->second);
133        }
134
135        kernel->createDoSegmentCall(args);
136        if (!kernel->hasNoTerminateAttribute()) {
137            doFinal = iBuilder->CreateOr(doFinal, kernel->getTerminationSignal());
138        }
139
140        const auto & outputs = kernel->getStreamOutputs();
141        for (unsigned i = 0; i < outputs.size(); ++i) {
142            Value * const produced = kernel->getProducedItemCount(outputs[i].name, doFinal);
143            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
144            assert (producedPos.count(buf) == 0);
145            producedPos.emplace(buf, produced);
146        }
147
148        kernel->releaseLogicalSegmentNo(nextSegNo);
149    }
150
151    assert (segmentLoopBody);
152    exitThreadBlock->moveAfter(segmentLoopBody);
153    segNo->addIncoming(iBuilder->CreateAdd(segNo, iBuilder->getSize(threads)), segmentLoopBody);
154    iBuilder->CreateCondBr(doFinal, exitThreadBlock, segmentLoop);
155
156    iBuilder->SetInsertPoint(exitThreadBlock);
157    iBuilder->CreatePThreadExitCall(nullVoidPtrVal);
158    iBuilder->CreateRetVoid();
159
160    // -------------------------------------------------------------------------------------------------------------------------
161    iBuilder->restoreIP(ip);
162
163    for (unsigned i = 0; i < n; ++i) {
164        kernels[i]->setInstance(instance[i]);
165    }
166
167    // -------------------------------------------------------------------------------------------------------------------------
168    // MAKE SEGMENT PARALLEL PIPELINE DRIVER
169    // -------------------------------------------------------------------------------------------------------------------------
170    Type * const pthreadsTy = ArrayType::get(sizeTy, threads);
171    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
172    Value * threadIdPtr[threads];
173
174    for (unsigned i = 0; i < threads; ++i) {
175        threadIdPtr[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
176    }
177
178    for (unsigned i = 0; i < n; ++i) {
179        kernels[i]->releaseLogicalSegmentNo(iBuilder->getSize(0));
180    }
181
182    AllocaInst * const sharedStruct = iBuilder->CreateCacheAlignedAlloca(sharedStructType);
183    for (unsigned i = 0; i < n; ++i) {
184        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
185        iBuilder->CreateStore(kernels[i]->getInstance(), ptr);
186    }
187
188    for (unsigned i = 0; i < threads; ++i) {
189        AllocaInst * threadState = iBuilder->CreateAlloca(threadStructType);
190        Value * const sharedStatePtr = iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(0)});
191        iBuilder->CreateStore(sharedStruct, sharedStatePtr);
192        Value * const segmentOffsetPtr = iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(1)});
193        iBuilder->CreateStore(iBuilder->getSize(i), segmentOffsetPtr);
194        iBuilder->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, threadFunc, threadState);
195    }
196
197    AllocaInst * const status = iBuilder->CreateAlloca(voidPtrTy);
198    for (unsigned i = 0; i < threads; ++i) {
199        Value * threadId = iBuilder->CreateLoad(threadIdPtr[i]);
200        iBuilder->CreatePThreadJoinCall(threadId, status);
201    }
202}
203
204
205/** ------------------------------------------------------------------------------------------------------------- *
206 * @brief generateParallelPipeline
207 ** ------------------------------------------------------------------------------------------------------------- */
208void generateParallelPipeline(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> &kernels) {
209
210    Module * const m = iBuilder->getModule();
211    IntegerType * const sizeTy = iBuilder->getSizeTy();
212    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
213    ConstantInt * bufferSegments = ConstantInt::get(sizeTy, codegen::BufferSegments - 1);
214    ConstantInt * segmentItems = ConstantInt::get(sizeTy, codegen::SegmentSize * iBuilder->getBitBlockWidth());
215    Constant * const nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
216
217    const unsigned n = kernels.size();
218
219    Type * const pthreadsTy = ArrayType::get(sizeTy, n);
220    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
221    Value * threadIdPtr[n];
222    for (unsigned i = 0; i < n; ++i) {
223        threadIdPtr[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
224    }
225
226    Value * instance[n];
227    Type * structTypes[n];
228    for (unsigned i = 0; i < n; ++i) {
229        instance[i] = kernels[i]->getInstance();
230        structTypes[i] = instance[i]->getType();
231    }
232
233    Type * const sharedStructType = StructType::get(m->getContext(), ArrayRef<Type *>{structTypes, n});
234
235
236    AllocaInst * sharedStruct = iBuilder->CreateAlloca(sharedStructType);
237    for (unsigned i = 0; i < n; ++i) {
238        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
239        iBuilder->CreateStore(instance[i], ptr);
240    }
241
242    for (auto & kernel : kernels) {
243        kernel->releaseLogicalSegmentNo(iBuilder->getSize(0));
244    }
245
246    // GENERATE THE PRODUCING AND CONSUMING KERNEL MAPS
247    StreamSetBufferMap<unsigned> producingKernel;
248    StreamSetBufferMap<std::vector<unsigned>> consumingKernels;
249    for (unsigned id = 0; id < n; ++id) {
250        const auto & kernel = kernels[id];
251        const auto & inputs = kernel->getStreamInputs();
252        const auto & outputs = kernel->getStreamOutputs();
253        // add any outputs from this kernel to the producing kernel map
254        for (unsigned j = 0; j < outputs.size(); ++j) {
255            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(j);
256            if (LLVM_UNLIKELY(producingKernel.count(buf) != 0)) {
257                report_fatal_error(kernel->getName() + " redefines stream set " + outputs[j].name);
258            }
259            producingKernel.emplace(buf, id);
260        }
261        // and any inputs to the consuming kernels list
262        for (unsigned j = 0; j < inputs.size(); ++j) {
263            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(j);
264            auto f = consumingKernels.find(buf);
265            if (f == consumingKernels.end()) {
266                if (LLVM_UNLIKELY(producingKernel.count(buf) == 0)) {
267                    report_fatal_error(kernel->getName() + " uses stream set " + inputs[j].name + " prior to its definition");
268                }
269                consumingKernels.emplace(buf, std::vector<unsigned>{ id });
270            } else {
271                f->second.push_back(id);
272            }
273        }
274    }
275
276    const auto ip = iBuilder->saveIP();
277
278    // GENERATE UNIQUE PIPELINE PARALLEL THREAD FUNCTION FOR EACH KERNEL
279    FlatSet<unsigned> kernelSet;
280    kernelSet.reserve(n);
281
282    Function * thread_functions[n];
283    Value * producerSegNo[n];
284    for (unsigned id = 0; id < n; id++) {
285        const auto & kernel = kernels[id];
286        const auto & inputs = kernel->getStreamInputs();
287
288        Function * const threadFunc = makeThreadFunction(iBuilder, "ppt:" + kernel->getName());
289
290         // Create the basic blocks for the thread function.
291        BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc);
292        BasicBlock * outputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "outputCheck", threadFunc);
293        BasicBlock * inputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "inputCheck", threadFunc);
294        BasicBlock * doSegmentBlock = BasicBlock::Create(iBuilder->getContext(), "doSegment", threadFunc);
295        BasicBlock * exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc);
296
297        iBuilder->SetInsertPoint(entryBlock);
298
299        Value * sharedStruct = iBuilder->CreateBitCast(&threadFunc->getArgumentList().front(), sharedStructType->getPointerTo());
300
301        for (unsigned k = 0; k < n; k++) {
302            Value * const ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
303            kernels[k]->setInstance(iBuilder->CreateLoad(ptr));
304        }
305
306        iBuilder->CreateBr(outputCheckBlock);
307
308        // Check whether the output buffers are ready for more data
309        iBuilder->SetInsertPoint(outputCheckBlock);
310        PHINode * segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 3, "segNo");
311        segNo->addIncoming(iBuilder->getSize(0), entryBlock);
312        segNo->addIncoming(segNo, outputCheckBlock);
313
314        Value * outputWaitCond = iBuilder->getTrue();
315        for (const StreamSetBuffer * buf : kernel->getStreamSetOutputBuffers()) {
316            const auto & list = consumingKernels[buf];
317            assert(std::is_sorted(list.begin(), list.end()));
318            kernelSet.insert(list.begin(), list.end());
319        }
320        for (unsigned k : kernelSet) {
321            Value * consumerSegNo = kernels[k]->acquireLogicalSegmentNo();
322            assert (consumerSegNo->getType() == segNo->getType());
323            Value * consumedSegNo = iBuilder->CreateAdd(consumerSegNo, bufferSegments);
324            outputWaitCond = iBuilder->CreateAnd(outputWaitCond, iBuilder->CreateICmpULE(segNo, consumedSegNo));
325        }
326        kernelSet.clear();
327        iBuilder->CreateCondBr(outputWaitCond, inputCheckBlock, outputCheckBlock);
328
329        // Check whether the input buffers have enough data for this kernel to begin
330        iBuilder->SetInsertPoint(inputCheckBlock);
331        for (const StreamSetBuffer * buf : kernel->getStreamSetInputBuffers()) {
332            kernelSet.insert(producingKernel[buf]);
333        }
334
335        Value * inputWaitCond = iBuilder->getTrue();
336        for (unsigned k : kernelSet) {
337            producerSegNo[k] = kernels[k]->acquireLogicalSegmentNo();
338            assert (producerSegNo[k]->getType() == segNo->getType());
339            inputWaitCond = iBuilder->CreateAnd(inputWaitCond, iBuilder->CreateICmpULT(segNo, producerSegNo[k]));
340        }
341        iBuilder->CreateCondBr(inputWaitCond, doSegmentBlock, inputCheckBlock);
342
343        // Process the segment
344        iBuilder->SetInsertPoint(doSegmentBlock);
345
346        Value * const nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
347        Value * terminated = nullptr;
348        if (kernelSet.empty()) {
349            // if this kernel has no input streams, the kernel itself must decide when it terminates.
350            terminated = kernel->getTerminationSignal();
351        } else {
352            // ... otherwise the kernel terminates only when it exhausts all of its input streams
353            terminated = iBuilder->getTrue();
354            for (unsigned k : kernelSet) {
355                terminated = iBuilder->CreateAnd(terminated, kernels[k]->getTerminationSignal());
356                terminated = iBuilder->CreateAnd(terminated, iBuilder->CreateICmpEQ(nextSegNo, producerSegNo[k]));
357            }
358            kernelSet.clear();
359        }
360
361        std::vector<Value *> args = {kernel->getInstance(), terminated};
362        args.insert(args.end(), inputs.size(), iBuilder->CreateMul(segmentItems, segNo));
363
364        kernel->createDoSegmentCall(args);
365        segNo->addIncoming(nextSegNo, doSegmentBlock);
366        kernel->releaseLogicalSegmentNo(nextSegNo);
367
368        iBuilder->CreateCondBr(terminated, exitThreadBlock, outputCheckBlock);
369
370        iBuilder->SetInsertPoint(exitThreadBlock);
371        iBuilder->CreatePThreadExitCall(nullVoidPtrVal);
372        iBuilder->CreateRetVoid();
373
374        thread_functions[id] = threadFunc;
375    }
376
377    iBuilder->restoreIP(ip);
378
379    for (unsigned i = 0; i < n; ++i) {
380        kernels[i]->setInstance(instance[i]);
381    }
382
383    for (unsigned i = 0; i < n; ++i) {
384        iBuilder->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, thread_functions[i], sharedStruct);
385    }
386
387    AllocaInst * const status = iBuilder->CreateAlloca(voidPtrTy);
388    for (unsigned i = 0; i < n; ++i) {
389        Value * threadId = iBuilder->CreateLoad(threadIdPtr[i]);
390        iBuilder->CreatePThreadJoinCall(threadId, status);
391    }
392
393}
394
395/** ------------------------------------------------------------------------------------------------------------- *
396 * @brief generatePipelineLoop
397 ** ------------------------------------------------------------------------------------------------------------- */
398void generatePipelineLoop(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
399
400    BasicBlock * entryBlock = iBuilder->GetInsertBlock();
401    Function * main = entryBlock->getParent();
402
403    // Create the basic blocks for the loop.
404    BasicBlock * pipelineLoop = BasicBlock::Create(iBuilder->getContext(), "pipelineLoop", main);
405    BasicBlock * pipelineExit = BasicBlock::Create(iBuilder->getContext(), "pipelineExit", main);
406
407    StreamSetBufferMap<Value *> producedPos;
408
409    iBuilder->CreateBr(pipelineLoop);
410    iBuilder->SetInsertPoint(pipelineLoop);
411
412    Value * terminated = iBuilder->getFalse();
413    for (auto & kernel : kernels) {
414        const auto & inputs = kernel->getStreamInputs();
415        std::vector<Value *> args = {kernel->getInstance(), terminated};
416        for (unsigned i = 0; i < inputs.size(); ++i) {
417            const auto f = producedPos.find(kernel->getStreamSetInputBuffer(i));
418            if (LLVM_UNLIKELY(f == producedPos.end())) {
419                report_fatal_error(kernel->getName() + " uses stream set " + inputs[i].name + " prior to its definition");
420            }
421            args.push_back(f->second);
422        }
423        Value * const segNo = kernel->acquireLogicalSegmentNo();
424        kernel->createDoSegmentCall(args);
425        if (!kernel->hasNoTerminateAttribute()) {
426            terminated = iBuilder->CreateOr(terminated, kernel->getTerminationSignal());
427        }
428        const auto & outputs = kernel->getStreamOutputs();
429        for (unsigned i = 0; i < outputs.size(); ++i) {
430            Value * const produced = kernel->getProducedItemCount(outputs[i].name, terminated);
431            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
432            assert (producedPos.count(buf) == 0);
433            producedPos.emplace(buf, produced);
434        }
435
436        kernel->releaseLogicalSegmentNo(iBuilder->CreateAdd(segNo, iBuilder->getSize(1)));
437    }
438
439    iBuilder->CreateCondBr(terminated, pipelineExit, pipelineLoop);
440    iBuilder->SetInsertPoint(pipelineExit);
441}
Note: See TracBrowser for help on using the repository browser.