source: icGREP/icgrep-devel/icgrep/kernels/pipeline/pipeline_logic.hpp @ 6237

Last change on this file since 6237 was 6237, checked in by nmedfort, 4 months ago

Re-enabled segment pipeline parallelism; moved logical segment number into pipeline kernel.

File size: 8.7 KB
Line 
1#ifndef PIPELINE_LOGIC_HPP
2#define PIPELINE_LOGIC_HPP
3
4#include "pipeline_compiler.hpp"
5
6namespace kernel {
7
8/** ------------------------------------------------------------------------------------------------------------- *
9 * @brief compileSingleThread
10 ** ------------------------------------------------------------------------------------------------------------- */
11void PipelineCompiler::generateSingleThreadKernelMethod(BuilderRef b) {
12    StructType * const localStateType = getLocalStateType(b);
13    Value * const localState = allocateThreadLocalSpace(b, localStateType);
14    setThreadLocalSpace(b, localState);
15    start(b, b->getSize(0));
16    for (unsigned i = 0; i < mPipeline.size(); ++i) {
17        setActiveKernel(b, i);
18        executeKernel(b);
19    }
20    end(b, 1);
21    deallocateThreadLocalSpace(b, localState);
22}
23
24
25
26/** ------------------------------------------------------------------------------------------------------------- *
27 * @brief compileMultiThread
28 *
29 * Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
30 * operating over an input stream set S, a segment-parallel implementation divides the input
31 * into segments and coordinates a set of T <= K threads to each process one segment at a time.
32 * Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
33 * fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
34 ** ------------------------------------------------------------------------------------------------------------- */
35void PipelineCompiler::generateMultiThreadKernelMethod(BuilderRef b, const unsigned numOfThreads) {
36
37    assert (numOfThreads > 1);
38
39
40    Module * const m = b->getModule();
41    IntegerType * const sizeTy = b->getSizeTy();
42    PointerType * const voidPtrTy = b->getVoidPtrTy();
43
44    ConstantInt * const ZERO = b->getInt32(0);
45    ConstantInt * const ONE = b->getInt32(1);
46    ConstantInt * const TWO = b->getInt32(2);
47
48    Value * const handle = mPipelineKernel->getHandle(); assert (handle);
49    StructType * const localStateTy = getLocalStateType(b);
50    PointerType * const localStatePtrTy = localStateTy->getPointerTo();
51    StructType * const threadStructType = StructType::get(m->getContext(), {handle->getType(), sizeTy, localStatePtrTy});
52    FunctionType * const threadFuncType = FunctionType::get(b->getVoidTy(), {voidPtrTy}, false);
53
54    const auto threadName = mPipelineKernel->getName() + "_DoSegmentThread";
55    Function * const threadFunc = Function::Create(threadFuncType, Function::InternalLinkage, threadName, b->getModule());
56    threadFunc->setCallingConv(CallingConv::C);
57    auto args = threadFunc->arg_begin();
58    args->setName("kernelStateObject");
59
60    // store where we'll resume compiling the DoSegment method
61    const auto resumePoint = b->saveIP();
62
63    // -------------------------------------------------------------------------------------------------------------------------
64    // MAKE PIPELINE THREAD
65    // -------------------------------------------------------------------------------------------------------------------------
66    b->SetInsertPoint(BasicBlock::Create(b->getContext(), "entry", threadFunc));
67    Value * const threadStruct = b->CreateBitCast(&*(args), threadStructType->getPointerTo());
68    mPipelineKernel->setHandle(b, b->CreateLoad(b->CreateGEP(threadStruct, {ZERO, ZERO})));
69    Value * const segmentOffset = b->CreateLoad(b->CreateGEP(threadStruct, {ZERO, ONE}));
70    setThreadLocalSpace(b, b->CreateLoad(b->CreateGEP(threadStruct, {ZERO, TWO})));
71    // generate the pipeline logic for this thread
72    start(b, segmentOffset);
73    for (unsigned i = 0; i < mPipeline.size(); ++i) {
74        setActiveKernel(b, i);
75        synchronize(b);
76        executeKernel(b);
77        releaseCurrentSegment(b);
78    }
79    mKernel = nullptr;
80    mKernelIndex = 0;
81    end(b, numOfThreads);
82    // only call pthread_exit() within spawned threads; otherwise it'll be equivalent to calling exit() within the process
83    BasicBlock * const exitThread = b->CreateBasicBlock("ExitThread");
84    BasicBlock * const exitFunction = b->CreateBasicBlock("ExitProcessFunction");
85    b->CreateCondBr(b->CreateIsNull(segmentOffset), exitFunction, exitThread);
86    b->SetInsertPoint(exitThread);
87    Constant * const nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
88    b->CreatePThreadExitCall(nullVoidPtrVal);
89    b->CreateBr(exitFunction);
90    b->SetInsertPoint(exitFunction);
91    b->CreateRetVoid();
92
93    // -------------------------------------------------------------------------------------------------------------------------
94    // MAKE PIPELINE DRIVER CONTINUED
95    // -------------------------------------------------------------------------------------------------------------------------
96    b->restoreIP(resumePoint);
97    mPipelineKernel->setHandle(b, handle);
98    const unsigned threads = numOfThreads - 1;
99    Type * const pthreadsTy = ArrayType::get(sizeTy, threads);
100    AllocaInst * const pthreads = b->CreateAlloca(pthreadsTy);
101    Value * threadIdPtr[threads];
102    for (unsigned i = 0; i < threads; ++i) {
103        threadIdPtr[i] = b->CreateGEP(pthreads, {ZERO, b->getInt32(i)});
104    }
105
106    // use the process thread to handle the initial segment function after spawning
107    // (n - 1) threads to handle the subsequent offsets
108    std::vector<Value *> localState(numOfThreads);
109    for (unsigned i = 0; i < threads; ++i) {
110        AllocaInst * const threadState = b->CreateAlloca(threadStructType);
111        b->CreateStore(handle, b->CreateGEP(threadState, {ZERO, ZERO}));
112        b->CreateStore(b->getSize(i + 1), b->CreateGEP(threadState, {ZERO, ONE}));
113        localState[i] = allocateThreadLocalSpace(b, localStateTy);
114        b->CreateStore(localState[i], b->CreateGEP(threadState, {ZERO, TWO}));
115        b->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, threadFunc, threadState);
116    }
117
118    AllocaInst * const threadState = b->CreateAlloca(threadStructType);
119    b->CreateStore(handle, b->CreateGEP(threadState, {ZERO, ZERO}));
120    b->CreateStore(b->getSize(0), b->CreateGEP(threadState, {ZERO, ONE}));
121    b->CreateCall(threadFunc, b->CreatePointerCast(threadState, voidPtrTy));
122
123    AllocaInst * const status = b->CreateAlloca(voidPtrTy);
124    for (unsigned i = 0; i < threads; ++i) {
125        Value * threadId = b->CreateLoad(threadIdPtr[i]);
126        b->CreatePThreadJoinCall(threadId, status);
127        deallocateThreadLocalSpace(b, localState[i]);
128    }
129
130}
131
enum : int {
    // field index of the pop count state within the thread-local state struct
    POP_COUNT_STRUCT_INDEX = 0,
};
135
136/** ------------------------------------------------------------------------------------------------------------- *
137 * @brief getLocalStateType
138 ** ------------------------------------------------------------------------------------------------------------- */
139inline StructType * PipelineCompiler::getLocalStateType(BuilderRef b) {
140    StructType * const popCountTy = getPopCountThreadLocalStateType(b);
141    return StructType::get(popCountTy, nullptr);
142}
143
144/** ------------------------------------------------------------------------------------------------------------- *
145 * @brief allocateThreadLocalSpace
146 ** ------------------------------------------------------------------------------------------------------------- */
147inline Value * PipelineCompiler::allocateThreadLocalSpace(BuilderRef b, StructType * localStateType) {
148    Value * const localState = b->CreateCacheAlignedAlloca(localStateType);
149    Constant * const ZERO = b->getInt32(0);
150    Constant * const POP_COUNT_STRUCT = b->getInt32(POP_COUNT_STRUCT_INDEX);
151    allocatePopCountArrays(b, b->CreateGEP(localState, {ZERO, POP_COUNT_STRUCT}));
152    return localState;
153}
154
155/** ------------------------------------------------------------------------------------------------------------- *
156 * @brief setThreadLocalSpace
157 ** ------------------------------------------------------------------------------------------------------------- */
158inline void PipelineCompiler::setThreadLocalSpace(BuilderRef b, Value * const localState) {
159    Constant * const ZERO = b->getInt32(0);
160    Constant * const POP_COUNT_STRUCT = b->getInt32(POP_COUNT_STRUCT_INDEX);
161    mPopCountState = b->CreateGEP(localState, {ZERO, POP_COUNT_STRUCT});
162}
163
164/** ------------------------------------------------------------------------------------------------------------- *
165 * @brief deallocateThreadLocalSpace
166 ** ------------------------------------------------------------------------------------------------------------- */
167inline void PipelineCompiler::deallocateThreadLocalSpace(BuilderRef b, Value * const localState) {
168    Constant * const ZERO = b->getInt32(0);
169    Constant * const POP_COUNT_STRUCT = b->getInt32(POP_COUNT_STRUCT_INDEX);
170    deallocatePopCountArrays(b, b->CreateGEP(localState, {ZERO, POP_COUNT_STRUCT}));
171}
172
173}
174
175#endif // PIPELINE_LOGIC_HPP
Note: See TracBrowser for help on using the repository browser.