Changeset 6240


Ignore:
Timestamp:
Dec 16, 2018, 2:45:14 PM (5 weeks ago)
Author:
nmedfort
Message:

Bug fix for multithreaded editd

Location:
icGREP/icgrep-devel/icgrep/kernels/pipeline
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • icGREP/icgrep-devel/icgrep/kernels/pipeline/pipeline_compiler.hpp

    r6237 r6240  
    175175protected:
    176176
     177// internal pipeline state construction functions
     178
     179    void addInternalKernelProperties(BuilderRef b, const unsigned kernelIndex);
     180
    177181// main pipeline functions
    178 
    179     void addInternalKernelProperties(BuilderRef b, const unsigned kernelIndex);
    180182
    181183    void start(BuilderRef b, Value * const initialSegNo);
     
    185187    void end(BuilderRef b, const unsigned step);
    186188
    187     StructType * getLocalStateType(BuilderRef b);
    188     Value * allocateThreadLocalSpace(BuilderRef b, StructType * localStateType);
    189     void setThreadLocalSpace(BuilderRef b, Value * const localState);
    190     void deallocateThreadLocalSpace(BuilderRef b, Value * const localState);
     189// internal pipeline functions
     190
     191    LLVM_READNONE StructType * getThreadStateType(BuilderRef b);
     192    AllocaInst * allocateThreadState(BuilderRef b, const unsigned segOffset);
     193    Value * setThreadState(BuilderRef b, Value * threadState);
     194    void deallocateThreadState(BuilderRef b, Value * const threadState);
     195
     196    LLVM_READNONE StructType * getLocalStateType(BuilderRef b);
     197    Value * allocateThreadLocalState(BuilderRef b, StructType * localStateType);
     198    void setThreadLocalState(BuilderRef b, Value * const localState);
     199    void deallocateThreadLocalState(BuilderRef b, Value * const localState);
    191200
    192201// inter-kernel functions
     
    408417    std::vector<Value *>                        mAccessibleInputItems;
    409418    std::vector<PHINode *>                      mLinearInputItemsPhi;
    410     std::vector<Value *>                        mFullyProcessedItemCount;
    411 
    412     std::vector<Value *>                        mInitiallyProducedItemCount; // entering the kernel
     419    std::vector<Value *>                        mFullyProcessedItemCount; // exiting the kernel
     420
     421    std::vector<Value *>                        mInitiallyProducedItemCount; // entering the *kernel*
    413422    std::vector<Value *>                        mAlreadyProducedItemCount; // entering the stride
    414423    std::vector<Value *>                        mOutputStrideLength;
  • icGREP/icgrep-devel/icgrep/kernels/pipeline/pipeline_logic.hpp

    r6238 r6240  
    77
    88/** ------------------------------------------------------------------------------------------------------------- *
    9  * @brief compileSingleThread
     9 * @brief generateSingleThreadKernelMethod
    1010 ** ------------------------------------------------------------------------------------------------------------- */
    1111void PipelineCompiler::generateSingleThreadKernelMethod(BuilderRef b) {
    1212    StructType * const localStateType = getLocalStateType(b);
    13     Value * const localState = allocateThreadLocalSpace(b, localStateType);
    14     setThreadLocalSpace(b, localState);
     13    Value * const localState = allocateThreadLocalState(b, localStateType);
     14    setThreadLocalState(b, localState);
    1515    start(b, b->getSize(0));
    1616    for (unsigned i = 0; i < mPipeline.size(); ++i) {
     
    1919    }
    2020    end(b, 1);
    21     deallocateThreadLocalSpace(b, localState);
    22 }
    23 
    24 
    25 
    26 /** ------------------------------------------------------------------------------------------------------------- *
    27  * @brief compileMultiThread
     21    deallocateThreadLocalState(b, localState);
     22}
     23
     24/** ------------------------------------------------------------------------------------------------------------- *
     25 * @brief generateMultiThreadKernelMethod
    2826 *
    2927 * Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
     
    3735    assert (numOfThreads > 1);
    3836
    39 
    4037    Module * const m = b->getModule();
    4138    IntegerType * const sizeTy = b->getSizeTy();
    4239    PointerType * const voidPtrTy = b->getVoidPtrTy();
    43 
    4440    ConstantInt * const ZERO = b->getInt32(0);
    45     ConstantInt * const ONE = b->getInt32(1);
    46     ConstantInt * const TWO = b->getInt32(2);
    47 
    48     Value * const handle = mPipelineKernel->getHandle(); assert (handle);
    49     StructType * const localStateTy = getLocalStateType(b);
    50     PointerType * const localStatePtrTy = localStateTy->getPointerTo();
    51     StructType * const threadStructType = StructType::get(m->getContext(), {handle->getType(), sizeTy, localStatePtrTy});
     41    Constant * const nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
     42
    5243    FunctionType * const threadFuncType = FunctionType::get(b->getVoidTy(), {voidPtrTy}, false);
    53 
    5444    const auto threadName = mPipelineKernel->getName() + "_DoSegmentThread";
    55     Function * const threadFunc = Function::Create(threadFuncType, Function::InternalLinkage, threadName, b->getModule());
     45    Function * const threadFunc = Function::Create(threadFuncType, Function::InternalLinkage, threadName, m);
    5646    threadFunc->setCallingConv(CallingConv::C);
    5747    auto args = threadFunc->arg_begin();
    5848    args->setName("kernelStateObject");
    5949
     50    // -------------------------------------------------------------------------------------------------------------------------
     51    // MAKE PIPELINE DRIVER CONTINUED
     52    // -------------------------------------------------------------------------------------------------------------------------
     53
     54    // use the process thread to handle the initial segment function after spawning
     55    // (n - 1) threads to handle the subsequent offsets
     56    const unsigned threads = numOfThreads - 1;
     57    Type * const pthreadsTy = ArrayType::get(sizeTy, threads);
     58    AllocaInst * const pthreads = b->CreateAlloca(pthreadsTy);
     59    std::vector<Value *> threadIdPtr(threads);
     60    std::vector<Value *> threadState(threads);
     61    for (unsigned i = 0; i < threads; ++i) {
     62        threadState[i] = allocateThreadState(b, i + 1);
     63        threadIdPtr[i] = b->CreateGEP(pthreads, {ZERO, b->getInt32(i)});
     64        b->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, threadFunc, threadState[i]);
     65    }
     66
     67    // execute the process thread
     68    Value * const processState = allocateThreadState(b, 0);
     69    b->CreateCall(threadFunc, b->CreatePointerCast(processState, voidPtrTy));
     70    deallocateThreadLocalState(b, processState);
     71
     72    // wait for all other threads to complete
     73    AllocaInst * const status = b->CreateAlloca(voidPtrTy);
     74    for (unsigned i = 0; i < threads; ++i) {
     75        Value * threadId = b->CreateLoad(threadIdPtr[i]);
     76        b->CreatePThreadJoinCall(threadId, status);
     77        deallocateThreadLocalState(b, threadState[i]);
     78    }
     79
    6080    // store where we'll resume compiling the DoSegment method
    6181    const auto resumePoint = b->saveIP();
     
    6484    // MAKE PIPELINE THREAD
    6585    // -------------------------------------------------------------------------------------------------------------------------
    66     b->SetInsertPoint(BasicBlock::Create(b->getContext(), "entry", threadFunc));
    67     Value * const threadStruct = b->CreateBitCast(&*(args), threadStructType->getPointerTo());
    68     mPipelineKernel->setHandle(b, b->CreateLoad(b->CreateGEP(threadStruct, {ZERO, ZERO})));
    69     Value * const segmentOffset = b->CreateLoad(b->CreateGEP(threadStruct, {ZERO, ONE}));
    70     setThreadLocalSpace(b, b->CreateLoad(b->CreateGEP(threadStruct, {ZERO, TWO})));
     86    b->SetInsertPoint(BasicBlock::Create(m->getContext(), "entry", threadFunc));
     87    Value * const threadStruct = b->CreateBitCast(&*(args), getThreadStateType(b)->getPointerTo());
     88    Value * const segmentOffset = setThreadState(b, threadStruct);
    7189    // generate the pipeline logic for this thread
    7290    start(b, segmentOffset);
     
    85103    b->CreateCondBr(b->CreateIsNull(segmentOffset), exitFunction, exitThread);
    86104    b->SetInsertPoint(exitThread);
    87     Constant * const nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
    88105    b->CreatePThreadExitCall(nullVoidPtrVal);
    89106    b->CreateBr(exitFunction);
     
    91108    b->CreateRetVoid();
    92109
    93     // -------------------------------------------------------------------------------------------------------------------------
    94     // MAKE PIPELINE DRIVER CONTINUED
    95     // -------------------------------------------------------------------------------------------------------------------------
     110    // Restore our position to allow the pipeline kernel to complete the function
    96111    b->restoreIP(resumePoint);
     112    setThreadState(b, processState);
     113
     114}
     115
     116/** ------------------------------------------------------------------------------------------------------------- *
     117 * @brief getThreadStateType
     118 ** ------------------------------------------------------------------------------------------------------------- */
     119inline StructType * PipelineCompiler::getThreadStateType(BuilderRef b) {
     120
     121    StructType * const localStateTy = getLocalStateType(b);
     122    std::vector<Type *> threadStructFields;
     123    threadStructFields.push_back(mPipelineKernel->getHandle()->getType());
     124    threadStructFields.push_back(b->getSizeTy());
     125    threadStructFields.push_back(localStateTy->getPointerTo());
     126    const auto numOfInputs = mPipelineKernel->getNumOfStreamInputs();
     127    for (unsigned i = 0; i < numOfInputs; ++i) {
     128        auto buffer = mPipelineKernel->getInputStreamSetBuffer(i);
     129        Value * const handle = buffer->getHandle();
     130        threadStructFields.push_back(handle->getType());
     131    }
     132    const auto numOfOutputs = mPipelineKernel->getNumOfStreamOutputs();
     133    for (unsigned i = 0; i < numOfOutputs; ++i) {
     134        auto buffer = mPipelineKernel->getOutputStreamSetBuffer(i);
     135        Value * const handle = buffer->getHandle();
     136        threadStructFields.push_back(handle->getType());
     137    }
     138    return StructType::get(b->getContext(), threadStructFields);
     139
     140}
     141
     142/** ------------------------------------------------------------------------------------------------------------- *
     143 * @brief allocateThreadState
     144 ** ------------------------------------------------------------------------------------------------------------- */
     145inline AllocaInst * PipelineCompiler::allocateThreadState(BuilderRef b, const unsigned segOffset) {
     146
     147    Constant * const ZERO = b->getInt32(0);
     148    Constant * const HANDLE = ZERO;
     149    Constant * const SEG_OFFSET = b->getInt32(1);
     150    Constant * const LOCAL_STATE = b->getInt32(2);
     151
     152    StructType * const threadStructType = getThreadStateType(b);
     153    AllocaInst * const threadState = b->CreateAlloca(threadStructType);
     154    b->CreateStore(mPipelineKernel->getHandle(), b->CreateGEP(threadState, {ZERO, HANDLE}));
     155    b->CreateStore(b->getSize(segOffset), b->CreateGEP(threadState, {ZERO, SEG_OFFSET}));
     156    StructType * const localStateTy = getLocalStateType(b);
     157    Value * const localState = allocateThreadLocalState(b, localStateTy);
     158    b->CreateStore(localState, b->CreateGEP(threadState, {ZERO, LOCAL_STATE}));
     159    const auto numOfInputs = mPipelineKernel->getNumOfStreamInputs();
     160    for (unsigned i = 0; i < numOfInputs; ++i) {
     161        auto buffer = mPipelineKernel->getInputStreamSetBuffer(i);
     162        Value * const handle = buffer->getHandle();
     163        b->CreateStore(handle, b->CreateGEP(threadState, {ZERO, b->getInt32(i + 3)}));
     164    }
     165    const auto numOfOutputs = mPipelineKernel->getNumOfStreamOutputs();
     166    for (unsigned i = 0; i < numOfOutputs; ++i) {
     167        auto buffer = mPipelineKernel->getOutputStreamSetBuffer(i);
     168        Value * const handle = buffer->getHandle();
     169        b->CreateStore(handle, b->CreateGEP(threadState, {ZERO, b->getInt32(i + numOfInputs + 3)}));
     170    }
     171
     172    return threadState;
     173}
     174
     175/** ------------------------------------------------------------------------------------------------------------- *
     176 * @brief setThreadState
     177 ** ------------------------------------------------------------------------------------------------------------- */
     178inline Value * PipelineCompiler::setThreadState(BuilderRef b, Value * threadState) {
     179
     180    Constant * const ZERO = b->getInt32(0);
     181    Constant * const HANDLE = ZERO;
     182    Constant * const SEG_OFFSET = b->getInt32(1);
     183    Constant * const LOCAL_STATE = b->getInt32(2);
     184
     185    Value * handle = b->CreateLoad(b->CreateGEP(threadState, {ZERO, HANDLE}));
     186
    97187    mPipelineKernel->setHandle(b, handle);
    98 
    99     // use the process thread to handle the initial segment function after spawning
    100     // (n - 1) threads to handle the subsequent offsets
    101     const unsigned threads = numOfThreads - 1;
    102     Type * const pthreadsTy = ArrayType::get(sizeTy, threads);
    103     AllocaInst * const pthreads = b->CreateAlloca(pthreadsTy);
    104     std::vector<Value *> threadIdPtr(threads);
    105     std::vector<Value *> localState(threads);
    106     for (unsigned i = 0; i < threads; ++i) {
    107         AllocaInst * const threadState = b->CreateAlloca(threadStructType);
    108         b->CreateStore(handle, b->CreateGEP(threadState, {ZERO, ZERO}));
    109         b->CreateStore(b->getSize(i + 1), b->CreateGEP(threadState, {ZERO, ONE}));
    110         localState[i] = allocateThreadLocalSpace(b, localStateTy);
    111         b->CreateStore(localState[i], b->CreateGEP(threadState, {ZERO, TWO}));
    112         threadIdPtr[i] = b->CreateGEP(pthreads, {ZERO, b->getInt32(i)});
    113         b->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, threadFunc, threadState);
    114     }
    115 
    116     AllocaInst * const threadState = b->CreateAlloca(threadStructType);
    117     b->CreateStore(handle, b->CreateGEP(threadState, {ZERO, ZERO}));
    118     b->CreateStore(b->getSize(0), b->CreateGEP(threadState, {ZERO, ONE}));
    119     Value * const processLocalState = allocateThreadLocalSpace(b, localStateTy);
    120     b->CreateStore(processLocalState, b->CreateGEP(threadState, {ZERO, TWO}));
    121     b->CreateCall(threadFunc, b->CreatePointerCast(threadState, voidPtrTy));
    122 
    123     AllocaInst * const status = b->CreateAlloca(voidPtrTy);
    124     for (unsigned i = 0; i < threads; ++i) {
    125         Value * threadId = b->CreateLoad(threadIdPtr[i]);
    126         b->CreatePThreadJoinCall(threadId, status);
    127         deallocateThreadLocalSpace(b, localState[i]);
    128     }
    129     deallocateThreadLocalSpace(b, processLocalState);
    130 
    131 }
     188    Value * const segmentOffset = b->CreateLoad(b->CreateGEP(threadState, {ZERO, SEG_OFFSET}));
     189    setThreadLocalState(b, b->CreateLoad(b->CreateGEP(threadState, {ZERO, LOCAL_STATE})));
     190    const auto numOfInputs = mPipelineKernel->getNumOfStreamInputs();
     191    for (unsigned i = 0; i < numOfInputs; ++i) {
     192        Value * streamHandle = b->CreateLoad(b->CreateGEP(threadState, {ZERO, b->getInt32(i + 3)}));
     193        auto buffer = mPipelineKernel->getInputStreamSetBuffer(i);
     194        buffer->setHandle(b, streamHandle);
     195    }
     196    const auto numOfOutputs = mPipelineKernel->getNumOfStreamOutputs();
     197    for (unsigned i = 0; i < numOfOutputs; ++i) {
     198        Value * streamHandle = b->CreateLoad(b->CreateGEP(threadState, {ZERO, b->getInt32(i + numOfInputs + 3)}));
     199        auto buffer = mPipelineKernel->getOutputStreamSetBuffer(i);
     200        buffer->setHandle(b, streamHandle);
     201    }
     202
     203    return segmentOffset;
     204}
     205
     206/** ------------------------------------------------------------------------------------------------------------- *
     207 * @brief deallocateThreadState
     208 ** ------------------------------------------------------------------------------------------------------------- */
     209inline void PipelineCompiler::deallocateThreadState(BuilderRef b, Value * const threadState) {
     210    Constant * const ZERO = b->getInt32(0);
     211    Constant * const LOCAL_STATE = b->getInt32(2);
     212    deallocateThreadLocalState(b, b->CreateGEP(threadState, {ZERO, LOCAL_STATE}));
     213}
     214
    132215
    133216enum : int {
     
    144227
    145228/** ------------------------------------------------------------------------------------------------------------- *
    146  * @brief allocateThreadLocalSpace
    147  ** ------------------------------------------------------------------------------------------------------------- */
    148 inline Value * PipelineCompiler::allocateThreadLocalSpace(BuilderRef b, StructType * localStateType) {
     229 * @brief allocateThreadLocalState
     230 ** ------------------------------------------------------------------------------------------------------------- */
     231inline Value * PipelineCompiler::allocateThreadLocalState(BuilderRef b, StructType * localStateType) {
    149232    Value * const localState = b->CreateCacheAlignedAlloca(localStateType);
    150233    Constant * const ZERO = b->getInt32(0);
     
    155238
    156239/** ------------------------------------------------------------------------------------------------------------- *
    157  * @brief setThreadLocalSpace
    158  ** ------------------------------------------------------------------------------------------------------------- */
    159 inline void PipelineCompiler::setThreadLocalSpace(BuilderRef b, Value * const localState) {
     240 * @brief setThreadLocalState
     241 ** ------------------------------------------------------------------------------------------------------------- */
     242inline void PipelineCompiler::setThreadLocalState(BuilderRef b, Value * const localState) {
    160243    Constant * const ZERO = b->getInt32(0);
    161244    Constant * const POP_COUNT_STRUCT = b->getInt32(POP_COUNT_STRUCT_INDEX);
     
    164247
    165248/** ------------------------------------------------------------------------------------------------------------- *
    166  * @brief deallocateThreadLocalSpace
    167  ** ------------------------------------------------------------------------------------------------------------- */
    168 inline void PipelineCompiler::deallocateThreadLocalSpace(BuilderRef b, Value * const localState) {
     249 * @brief deallocateThreadLocalState
     250 ** ------------------------------------------------------------------------------------------------------------- */
     251inline void PipelineCompiler::deallocateThreadLocalState(BuilderRef b, Value * const localState) {
    169252    Constant * const ZERO = b->getInt32(0);
    170253    Constant * const POP_COUNT_STRUCT = b->getInt32(POP_COUNT_STRUCT_INDEX);
Note: See TracChangeset for help on using the changeset viewer.