Changeset 5403 for icGREP


Ignore:
Timestamp:
Apr 11, 2017, 4:42:34 PM (2 years ago)
Author:
nmedfort
Message:

Work on the pipeline algorithms.

Location:
icGREP/icgrep-devel/icgrep
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • icGREP/icgrep-devel/icgrep/IR_Gen/CBuilder.cpp

    r5402 r5403  
    393393
    394394Value * CBuilder::CreatePThreadCreateCall(Value * thread, Value * attr, Function * start_routine, Value * arg) {
     395    Type * const voidPtrTy = getVoidPtrTy();
    395396    Function * pthreadCreateFunc = mMod->getFunction("pthread_create");
    396397    if (pthreadCreateFunc == nullptr) {
    397398        Type * pthreadTy = getSizeTy();
    398399        FunctionType * funVoidPtrVoidTy = FunctionType::get(getVoidTy(), {getVoidPtrTy()}, false);
    399         FunctionType * fty = FunctionType::get(getInt32Ty(), {pthreadTy->getPointerTo(), getVoidPtrTy(), funVoidPtrVoidTy->getPointerTo(), getVoidPtrTy()}, false);
     400        FunctionType * fty = FunctionType::get(getInt32Ty(), {pthreadTy->getPointerTo(), voidPtrTy, funVoidPtrVoidTy->getPointerTo(), voidPtrTy}, false);
    400401        pthreadCreateFunc = Function::Create(fty, Function::ExternalLinkage, "pthread_create", mMod);
    401402        pthreadCreateFunc->setCallingConv(CallingConv::C);
    402403    }
    403     return CreateCall(pthreadCreateFunc, {thread, attr, start_routine, CreatePointerCast(arg, getVoidPtrTy())});
     404    return CreateCall(pthreadCreateFunc, {thread, attr, start_routine, CreatePointerCast(arg, voidPtrTy)});
    404405}
    405406
  • icGREP/icgrep-devel/icgrep/kernels/kernel.cpp

    r5402 r5403  
    517517}
    518518
    519 Value * KernelBuilder::createDoSegmentCall(const std::vector<Value *> & args) const {
     519CallInst * KernelBuilder::createDoSegmentCall(const std::vector<Value *> & args) const {
    520520    return iBuilder->CreateCall(getDoSegmentFunction(), args);
    521521}
    522522
    523 Value * KernelBuilder::createGetAccumulatorCall(Value * self, const std::string & accumName) const {
     523CallInst * KernelBuilder::createGetAccumulatorCall(Value * self, const std::string & accumName) const {
    524524    return iBuilder->CreateCall(getAccumulatorFunction(accumName), {self});
    525525}
  • icGREP/icgrep-devel/icgrep/kernels/kernel.h

    r5402 r5403  
    137137    const parabix::StreamSetBuffer * getStreamSetOutputBuffer(const unsigned i) const { return mStreamSetOutputBuffers[i]; }
    138138
    139     llvm::Value * createDoSegmentCall(const std::vector<llvm::Value *> & args) const;
    140 
    141     llvm::Value * createGetAccumulatorCall(llvm::Value * self, const std::string & accumName) const;
     139    llvm::CallInst * createDoSegmentCall(const std::vector<llvm::Value *> & args) const;
     140
     141    llvm::CallInst * createGetAccumulatorCall(llvm::Value * self, const std::string & accumName) const;
    142142
    143143protected:
  • icGREP/icgrep-devel/icgrep/kernels/pipeline.cpp

    r5402 r5403  
    99#include <kernels/streamset.h>
    1010#include <llvm/IR/Module.h>
     11#include <boost/container/flat_set.hpp>
    1112#include <boost/container/flat_map.hpp>
    1213
     
    1516using namespace llvm;
    1617
    17 using ProducerTable = std::vector<std::vector<std::pair<unsigned, unsigned>>>;
    18 
    19 using ConsumerTable = std::vector<std::vector<std::vector<unsigned>>>;
    20 
    2118template <typename Value>
    2219using StreamSetBufferMap = boost::container::flat_map<const StreamSetBuffer *, Value>;
    2320
    24 
    25 ProducerTable createProducerTable(const std::vector<KernelBuilder *> & kernels) {
    26     // map each output streamSet to its producing kernel and output index.
    27     StreamSetBufferMap<std::pair<unsigned, unsigned>> map;
    28     for (unsigned k = 0; k < kernels.size(); k++) {
    29         const auto & outputSets = kernels[k]->getStreamSetOutputBuffers();
    30         for (unsigned j = 0; j < outputSets.size(); j++) {
    31             map.emplace(outputSets[j], std::make_pair(k, j));
    32         }
    33     }
    34     // TODO: replace this with a sparse matrix? it would be easier to understand that the i,j-th element indicated kernel i's input was from the j-th kernel
    35     ProducerTable producerTable(kernels.size());
    36     for (unsigned k = 0; k < kernels.size(); k++) {
    37         const KernelBuilder * const kernel = kernels[k];
    38         const auto & inputSets = kernel->getStreamSetInputBuffers();
    39         for (unsigned i = 0; i < inputSets.size(); i++) {
    40             const auto f = map.find(inputSets[i]);
    41             if (LLVM_UNLIKELY(f == map.end())) {
    42                 report_fatal_error("Pipeline error: input buffer #" + std::to_string(i) + " of " + kernel->getName() + ": no corresponding output buffer. ");
    43             }
    44             unsigned sourceKernel, outputIndex;
    45             std::tie(sourceKernel, outputIndex) = f->second;
    46             producerTable[k].emplace_back(sourceKernel, outputIndex);
    47             if (LLVM_UNLIKELY(sourceKernel >= k)) {
    48                 report_fatal_error("Pipeline error: input buffer #" + std::to_string(i) + " of " + kernel->getName() + ": not defined before use. ");
    49             }
    50         }
    51     }
    52     return producerTable;
    53 }
    54 
    55 ConsumerTable createConsumerTable(const std::vector<KernelBuilder *> & kernels) {
    56     // map each input streamSet to its set of consuming kernels
    57     StreamSetBufferMap<std::vector<unsigned>> map;
    58     for (unsigned k = 0; k < kernels.size(); k++) {
    59         const auto & inputSets = kernels[k]->getStreamSetInputBuffers();
    60         for (const StreamSetBuffer * inputSet : inputSets) {
    61             auto f = map.find(inputSet);
    62             if (f == map.end()) {
    63                 map.emplace(inputSet, std::vector<unsigned>({k}));
    64             } else {
    65                 f->second.push_back(k);
    66             }
    67         }
    68     }
    69     ConsumerTable consumerTable(kernels.size());
    70     for (unsigned k = 0; k < kernels.size(); k++) {
    71         const auto & outputSets = kernels[k]->getStreamSetOutputBuffers();
    72         for (const StreamSetBuffer * outputSet : outputSets) {
    73             auto f = map.find(outputSet);
    74             if (LLVM_LIKELY(f != map.end())) {
    75                 consumerTable[k].emplace_back(std::move(f->second));
    76             }         
    77         }
    78     }
    79     return consumerTable;
    80 }
     21template <typename Value>
     22using FlatSet = boost::container::flat_set<Value>;
    8123
    8224Function * makeThreadFunction(const std::string & name, Module * const m) {
     
    9032}
    9133
    92 Function * generateSegmentParallelPipelineThreadFunction(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels, Type * sharedStructType, const ProducerTable & producerTable, const unsigned id) {
    93    
    94     // ProducerPos[k][i] will hold the producedItemCount of the i^th output stream
    95     // set of the k^th kernel.  These values will be loaded immediately after the
    96     // doSegment and finalSegment calls for kernel k and later used as the
    97     // producer position arguments for later doSegment/finalSegment calls.
    98    
    99     std::vector<std::vector<Value *>> ProducerPos;
    100    
    101    
     34/** ------------------------------------------------------------------------------------------------------------- *
     35 * @brief generateSegmentParallelPipeline
     36 *
     37 * Given a computation expressed as a logical pipeline of K kernels k_0, k_1, ..., k_(K-1)
     38 * operating over an input stream set S, a segment-parallel implementation divides the input
     39 * into segments and coordinates a set of T <= K threads to each process one segment at a time.
     40 * Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
     41 * fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
     42 ** ------------------------------------------------------------------------------------------------------------- */
     43void generateSegmentParallelPipeline(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
     44
     45    const unsigned n = kernels.size(); assert (n > 0);
     46    Module * const m = iBuilder->getModule();
     47    IntegerType * const sizeTy = iBuilder->getSizeTy();
     48    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
     49    PointerType * const int8PtrTy = iBuilder->getInt8PtrTy();
     50    const unsigned threads = codegen::ThreadNum;
     51    Constant * nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
     52
     53    std::vector<Type *> structTypes;
     54    for (unsigned i = 0; i < n; i++) {
     55        kernels[i]->createInstance();
     56        structTypes.push_back(kernels[i]->getInstance()->getType());
     57    }
     58    StructType * const sharedStructType = StructType::get(m->getContext(), structTypes);
     59    StructType * const threadStructType = StructType::get(sharedStructType->getPointerTo(), sizeTy, nullptr);
     60
     61    // -------------------------------------------------------------------------------------------------------------------------
     62    // MAKE SEGMENT PARALLEL PIPELINE THREAD
     63    // -------------------------------------------------------------------------------------------------------------------------
    10264    const auto ip = iBuilder->saveIP();
    103    
    104     Module * const m = iBuilder->getModule();
    105     PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
    106 
    107     Function * const threadFunc = makeThreadFunction("thread" + std::to_string(id), m);
     65    Function * const threadFunc = makeThreadFunction("sppt", m);
    10866
    10967     // Create the basic blocks for the thread function.
    11068    BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc);
     69    iBuilder->SetInsertPoint(entryBlock);
     70    Value * const input = &threadFunc->getArgumentList().front();
     71    Value * const threadStruct = iBuilder->CreatePointerCast(input, threadStructType->getPointerTo());
     72    Value * const sharedStatePtr = iBuilder->CreateLoad(iBuilder->CreateGEP(threadStruct, {iBuilder->getInt32(0), iBuilder->getInt32(0)}));
     73    Value * instance[n];
     74    for (unsigned k = 0; k < n; k++) {
     75        Value * ptr = iBuilder->CreateGEP(sharedStatePtr, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
     76        instance[k] = iBuilder->CreateLoad(ptr);
     77    }
     78    Value * const segOffset = iBuilder->CreateLoad(iBuilder->CreateGEP(threadStruct, {iBuilder->getInt32(0), iBuilder->getInt32(1)}));
     79
    11180    BasicBlock * segmentLoop = BasicBlock::Create(iBuilder->getContext(), "segmentLoop", threadFunc);
    112     BasicBlock * exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc);
    113    
    114     std::vector<BasicBlock *> segmentWait;
    115     std::vector<BasicBlock *> segmentLoopBody;
    116     for (unsigned i = 0; i < kernels.size(); i++) {
    117         auto kname = kernels[i]->getName();
    118         segmentWait.push_back(BasicBlock::Create(iBuilder->getContext(), kname + "Wait", threadFunc));
    119         segmentLoopBody.push_back(BasicBlock::Create(iBuilder->getContext(), kname + "Do", threadFunc));
    120     }
    121 
    122     iBuilder->SetInsertPoint(entryBlock);
    123    
    124     Value * input = &(*threadFunc->arg_begin());
    125     Value * sharedStruct = iBuilder->CreateBitCast(input, sharedStructType->getPointerTo());
    126     std::vector<Value *> instancePtrs;
    127     for (unsigned k = 0; k < kernels.size(); k++) {
    128         Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
    129         instancePtrs.push_back(iBuilder->CreateLoad(ptr));
    130     }
    131    
    13281    iBuilder->CreateBr(segmentLoop);
    13382
    13483    iBuilder->SetInsertPoint(segmentLoop);
    135     PHINode * segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 2, "segNo");
    136     segNo->addIncoming(iBuilder->getSize(id), entryBlock);
    137     const unsigned last_kernel = kernels.size() - 1;
    138     Value * doFinal = ConstantInt::getNullValue(iBuilder->getInt1Ty());
    139     Value * nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
    140     iBuilder->CreateBr(segmentWait[0]);
    141     for (unsigned k = 0; k < kernels.size(); k++) {
    142         iBuilder->SetInsertPoint(segmentWait[k]);
    143         unsigned waitForKernel = k;
    144         if (codegen::DebugOptionIsSet(codegen::SerializeThreads)) {
    145             waitForKernel = last_kernel;
    146         }
    147         Value * processedSegmentCount = kernels[waitForKernel]->acquireLogicalSegmentNo(instancePtrs[waitForKernel]);
    148         Value * ready = iBuilder->CreateICmpEQ(segNo, processedSegmentCount);
    149 
     84    PHINode * const segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 2, "segNo");
     85    segNo->addIncoming(segOffset, entryBlock);
     86
     87    Value * doFinal = iBuilder->getFalse();
     88    Value * const nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
     89
     90    BasicBlock * segmentWait = BasicBlock::Create(iBuilder->getContext(), kernels[0]->getName() + "Wait", threadFunc);
     91
     92    BasicBlock * segmentLoopBody = nullptr;
     93
     94    BasicBlock * const exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc);
     95
     96    iBuilder->CreateBr(segmentWait);
     97
     98    StreamSetBufferMap<Value *> producedPos;
     99
     100    for (unsigned k = 0;;) {
    150101        KernelBuilder * const kernel = kernels[k];
     102        const unsigned waitIdx = codegen::DebugOptionIsSet(codegen::SerializeThreads) ? (n - 1) : k;
     103
     104        segmentLoopBody = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Do", threadFunc);
     105
     106        iBuilder->SetInsertPoint(segmentWait);
     107        Value * const processedSegmentCount = kernels[waitIdx]->acquireLogicalSegmentNo(instance[waitIdx]);
     108        assert (processedSegmentCount->getType() == segNo->getType());
     109        Value * const ready = iBuilder->CreateICmpEQ(segNo, processedSegmentCount);
    151110
    152111        if (kernel->hasNoTerminateAttribute()) {
    153             iBuilder->CreateCondBr(ready, segmentLoopBody[k], segmentWait[k]);
     112            iBuilder->CreateCondBr(ready, segmentLoopBody, segmentWait);
    154113        } else { // If the kernel was terminated in a previous segment then the pipeline is done.
    155114            BasicBlock * completionTest = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Completed", threadFunc, 0);
    156115            BasicBlock * exitBlock = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Exit", threadFunc, 0);
    157             iBuilder->CreateCondBr(ready, completionTest, segmentWait[k]);
     116            iBuilder->CreateCondBr(ready, completionTest, segmentWait);
    158117            iBuilder->SetInsertPoint(completionTest);
    159             Value * alreadyDone = kernel->getTerminationSignal(instancePtrs[k]);
    160             iBuilder->CreateCondBr(alreadyDone, exitBlock, segmentLoopBody[k]);
     118            Value * alreadyDone = kernel->getTerminationSignal(instance[k]);
     119            iBuilder->CreateCondBr(alreadyDone, exitBlock, segmentLoopBody);
    161120            iBuilder->SetInsertPoint(exitBlock);
    162121            // Ensure that the next thread will also exit.
    163             kernel->releaseLogicalSegmentNo(instancePtrs[k], nextSegNo);
     122            kernel->releaseLogicalSegmentNo(instance[k], nextSegNo);
    164123            iBuilder->CreateBr(exitThreadBlock);
    165124        }
    166         iBuilder->SetInsertPoint(segmentLoopBody[k]);
    167         std::vector<Value *> args = {instancePtrs[k], doFinal};
    168         for (unsigned j = 0; j < kernel->getStreamInputs().size(); j++) {
    169             unsigned producerKernel, outputIndex;
    170             std::tie(producerKernel, outputIndex) = producerTable[k][j];
    171             args.push_back(ProducerPos[producerKernel][outputIndex]);
    172         }
    173         for (unsigned i = 0; i < kernel->getStreamOutputs().size(); ++i) {
     125
     126        // Execute the kernel segment
     127        iBuilder->SetInsertPoint(segmentLoopBody);
     128        const auto & inputs = kernel->getStreamInputs();
     129        const auto & outputs = kernel->getStreamOutputs();
     130        std::vector<Value *> args = {instance[k], doFinal};
     131        for (unsigned i = 0; i < inputs.size(); ++i) {
     132            const auto f = producedPos.find(kernel->getStreamSetInputBuffer(i));
     133            if (LLVM_UNLIKELY(f == producedPos.end())) {
     134                report_fatal_error(kernel->getName() + " uses stream set " + inputs[i].name + " prior to its definition");
     135            }
     136            args.push_back(f->second);
     137        }
     138        for (unsigned i = 0; i < outputs.size(); ++i) {
    174139            args.push_back(iBuilder->getSize(0));
    175140        }
    176         kernel->createDoSegmentCall(args);
    177          if (!(kernel->hasNoTerminateAttribute())) {
    178             Value * terminated = kernel->getTerminationSignal(instancePtrs[k]);
    179             doFinal = iBuilder->CreateOr(doFinal, terminated);
    180         }
    181         std::vector<Value *> produced;
    182         for (unsigned i = 0; i < kernel->getStreamOutputs().size(); i++) {
    183             produced.push_back(kernel->getProducedItemCount(instancePtrs[k], kernel->getStreamOutputs()[i].name, doFinal));
    184         }
    185         ProducerPos.push_back(produced);
    186 
    187         kernel->releaseLogicalSegmentNo(instancePtrs[k], nextSegNo);
    188         if (k == last_kernel) {
    189             segNo->addIncoming(iBuilder->CreateAdd(segNo, iBuilder->getSize(codegen::ThreadNum)), segmentLoopBody[last_kernel]);
     141        CallInst * ci = kernel->createDoSegmentCall(args);
     142        // TODO: investigate whether this actually inlines the function call correctly despite being in a separate module.
     143        ci->addAttribute(AttributeSet::FunctionIndex, Attribute::AlwaysInline);
     144
     145        if (!kernel->hasNoTerminateAttribute()) {
     146            doFinal = iBuilder->CreateOr(doFinal, kernel->getTerminationSignal(instance[k]));
     147        }
     148        for (unsigned i = 0; i < outputs.size(); i++) {
     149            Value * const produced = kernel->getProducedItemCount(instance[k], outputs[i].name, doFinal);
     150            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
     151            assert (producedPos.count(buf) == 0);
     152            producedPos.emplace(buf, produced);
     153        }
     154
     155        kernel->releaseLogicalSegmentNo(instance[k], nextSegNo);
     156        if (LLVM_UNLIKELY(++k == n)) {
     157            assert (segmentLoopBody);
     158            exitThreadBlock->moveAfter(segmentLoopBody);
     159            segNo->addIncoming(iBuilder->CreateAdd(segNo, iBuilder->getSize(threads)), segmentLoopBody);
    190160            iBuilder->CreateCondBr(doFinal, exitThreadBlock, segmentLoop);
     161
     162            iBuilder->SetInsertPoint(exitThreadBlock);
     163            iBuilder->CreatePThreadExitCall(nullVoidPtrVal);
     164            iBuilder->CreateRetVoid();
     165            break;
    191166        } else {
    192             iBuilder->CreateBr(segmentWait[k+1]);
    193         }
    194     }
    195 
    196     iBuilder->SetInsertPoint(exitThreadBlock);
    197     Value * nullVal = Constant::getNullValue(voidPtrTy);
    198     iBuilder->CreatePThreadExitCall(nullVal);
    199     iBuilder->CreateRetVoid();
     167            segmentWait = BasicBlock::Create(iBuilder->getContext(), kernels[k]->getName() + "Wait", threadFunc);
     168            iBuilder->CreateBr(segmentWait);
     169        }
     170    }
     171
     172    // -------------------------------------------------------------------------------------------------------------------------
    200173    iBuilder->restoreIP(ip);
    201     return threadFunc;
     174
     175    // -------------------------------------------------------------------------------------------------------------------------
     176    // MAKE SEGMENT PARALLEL PIPELINE DRIVER
     177    // -------------------------------------------------------------------------------------------------------------------------
     178
     179    Type * const pthreadsTy = ArrayType::get(sizeTy, threads);
     180    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
     181    Value * threadIdPtr[threads];
     182    for (unsigned i = 0; i < threads; i++) {
     183        threadIdPtr[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
     184    }
     185
     186    for (unsigned i = 0; i < n; i++) {
     187        kernels[i]->releaseLogicalSegmentNo(kernels[i]->getInstance(), iBuilder->getSize(0));
     188    }
     189
     190    AllocaInst * const sharedStruct = iBuilder->CreateCacheAlignedAlloca(sharedStructType);
     191    for (unsigned i = 0; i < n; i++) {
     192        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
     193        iBuilder->CreateStore(kernels[i]->getInstance(), ptr);
     194    }
     195
     196    for (unsigned i = 0; i < threads; i++) {
     197        AllocaInst * threadState = iBuilder->CreateAlloca(threadStructType);
     198        Value * const sharedStatePtr = iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(0)});
     199        iBuilder->CreateStore(sharedStruct, sharedStatePtr);
     200        Value * const segmentOffsetPtr = iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(1)});
     201        iBuilder->CreateStore(iBuilder->getSize(i), segmentOffsetPtr);
     202        iBuilder->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, threadFunc, threadState);
     203    }
     204
     205    AllocaInst * const status = iBuilder->CreateAlloca(int8PtrTy);
     206    for (unsigned i = 0; i < threads; i++) {
     207        Value * threadId = iBuilder->CreateLoad(threadIdPtr[i]);
     208        iBuilder->CreatePThreadJoinCall(threadId, status);
     209    }
    202210}
    203211
    204 // Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
    205 // operating over an input stream set S, a segment-parallel implementation divides the input
    206 // into segments and coordinates a set of T <= K threads to each process one segment at a time.   
    207 // Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
    208 // fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
    209 
    210 
    211 void generateSegmentParallelPipeline(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
    212        
    213     Module * m = iBuilder->getModule();
    214    
    215     IntegerType * const size_ty = iBuilder->getSizeTy();
     212
     213/** ------------------------------------------------------------------------------------------------------------- *
     214 * @brief generateParallelPipeline
     215 ** ------------------------------------------------------------------------------------------------------------- */
     216void generateParallelPipeline(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
     217
     218    Module * const m = iBuilder->getModule();
     219    IntegerType * const sizeTy = iBuilder->getSizeTy();
    216220    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
    217221    PointerType * const int8PtrTy = iBuilder->getInt8PtrTy();
    218    
     222    ConstantInt * bufferSegments = ConstantInt::get(sizeTy, codegen::BufferSegments - 1);
     223    ConstantInt * segmentItems = ConstantInt::get(sizeTy, codegen::SegmentSize * iBuilder->getBitBlockWidth());
     224    Constant * const nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
     225
    219226    for (auto k : kernels) {
    220227        k->createInstance();
    221228    }
    222229
    223     const ProducerTable producerTable = createProducerTable(kernels);
    224    
    225     Type * const pthreadsTy = ArrayType::get(size_ty, codegen::ThreadNum);
     230    const unsigned n = kernels.size();
     231
     232    Type * const pthreadsTy = ArrayType::get(sizeTy, n);
    226233    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
    227     std::vector<Value *> pthreadsPtrs;
    228     for (int i = 0; i < codegen::ThreadNum; i++) {
    229         pthreadsPtrs.push_back(iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)}));
    230     }
    231     Value * nullVal = Constant::getNullValue(voidPtrTy);
    232     AllocaInst * const status = iBuilder->CreateAlloca(int8PtrTy);
    233    
    234     std::vector<Type *> structTypes;
    235     for (unsigned i = 0; i < kernels.size(); i++) {
    236         structTypes.push_back(kernels[i]->getInstance()->getType());
    237     }
    238     Type * sharedStructType = StructType::get(m->getContext(), structTypes);
    239    
     234    Value * threadIdPtr[n];
     235    for (unsigned i = 0; i < n; i++) {
     236        threadIdPtr[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
     237    }
     238
     239    Type * structTypes[n];
     240    for (unsigned i = 0; i < n; i++) {
     241        structTypes[i] = kernels[i]->getInstance()->getType();
     242    }
     243    Type * const sharedStructType = StructType::get(m->getContext(), ArrayRef<Type *>{structTypes, n});
    240244    AllocaInst * sharedStruct = iBuilder->CreateAlloca(sharedStructType);
    241     for (unsigned i = 0; i < kernels.size(); i++) {
     245    for (unsigned i = 0; i < n; i++) {
    242246        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
    243247        iBuilder->CreateStore(kernels[i]->getInstance(), ptr);
    244248    }
    245     for (unsigned i = 0; i < kernels.size(); i++) {
     249    for (unsigned i = 0; i < n; i++) {
    246250        kernels[i]->releaseLogicalSegmentNo(kernels[i]->getInstance(), iBuilder->getSize(0));
    247251    }
    248252
    249     std::vector<Function *> thread_functions;
     253    // GENERATE THE PRODUCING AND CONSUMING KERNEL MAPS
     254    StreamSetBufferMap<unsigned> producingKernel;
     255    StreamSetBufferMap<std::vector<unsigned>> consumingKernels;
     256    for (unsigned id = 0; id < n; ++id) {
     257        KernelBuilder * const kernel = kernels[id];
     258        const auto & inputs = kernel->getStreamInputs();
     259        const auto & outputs = kernel->getStreamOutputs();
     260        // add any outputs from this kernel to the producing kernel map
     261        for (unsigned j = 0; j < outputs.size(); ++j) {
     262            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(j);
     263            if (LLVM_UNLIKELY(producingKernel.count(buf) != 0)) {
     264                report_fatal_error(kernel->getName() + " redefines stream set " + outputs[j].name);
     265            }
     266            producingKernel.emplace(buf, id);
     267        }
     268        // and any inputs to the consuming kernels list
     269        for (unsigned j = 0; j < inputs.size(); ++j) {
     270            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(j);
     271            auto f = consumingKernels.find(buf);
     272            if (f == consumingKernels.end()) {
     273                if (LLVM_UNLIKELY(producingKernel.count(buf) == 0)) {
     274                    report_fatal_error(kernel->getName() + " uses stream set " + inputs[j].name + " prior to its definition");
     275                }
     276                consumingKernels.emplace(buf, std::vector<unsigned>{ id });
     277            } else {
     278                f->second.push_back(id);
     279            }
     280        }
     281    }
     282
    250283    const auto ip = iBuilder->saveIP();
    251     for (int i = 0; i < codegen::ThreadNum; i++) {
    252         thread_functions.push_back(generateSegmentParallelPipelineThreadFunction(iBuilder, kernels, sharedStructType, producerTable, i));
    253     }
    254     iBuilder->restoreIP(ip);
    255    
    256     for (int i = 0; i < codegen::ThreadNum; i++) {
    257         iBuilder->CreatePThreadCreateCall(pthreadsPtrs[i], nullVal, thread_functions[i], sharedStruct);
    258     }
    259    
    260     std::vector<Value *> threadIDs;
    261     for (int i = 0; i < codegen::ThreadNum; i++) {
    262         threadIDs.push_back(iBuilder->CreateLoad(pthreadsPtrs[i]));
    263     }
    264    
    265     for (int i = 0; i < codegen::ThreadNum; i++) {
    266         iBuilder->CreatePThreadJoinCall(threadIDs[i], status);
    267     }
    268    
    269 }
    270 
    271 Function * generateParallelPipelineThreadFunction(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels, Type * sharedStructType, const ProducerTable & producerTable, const ConsumerTable & consumerTable, const unsigned id) {
    272        
    273     const auto ip = iBuilder->saveIP();
    274    
    275     Module * m = iBuilder->getModule();
    276     Function * const threadFunc = makeThreadFunction("thread" + std::to_string(id), m);
    277 
    278     KernelBuilder * const kernel = kernels[id];
    279     Value * bufferSegments = ConstantInt::get(iBuilder->getSizeTy(), codegen::BufferSegments - 1);
    280     ConstantInt * segmentItems = iBuilder->getSize(codegen::SegmentSize * iBuilder->getBitBlockWidth());
    281 
    282      // Create the basic blocks for the thread function.
    283     BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc);
    284     BasicBlock * outputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "outputCheck", threadFunc);
    285     BasicBlock * doSegmentBlock = BasicBlock::Create(iBuilder->getContext(), "doSegment", threadFunc);
    286     BasicBlock * exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc);
    287 
    288     iBuilder->SetInsertPoint(entryBlock);
    289    
    290     Value * input = &(*threadFunc->arg_begin());
    291     Value * sharedStruct = iBuilder->CreateBitCast(input, sharedStructType->getPointerTo());
    292 
    293     const unsigned n = kernels.size();
    294 
    295     Value * instancePtrs[n];
    296     for (unsigned k = 0; k < n; k++) {
    297         Value * const ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
    298         instancePtrs[k] = iBuilder->CreateLoad(ptr);
    299     }
    300 
    301     iBuilder->CreateBr(outputCheckBlock);
    302 
    303     iBuilder->SetInsertPoint(outputCheckBlock);
    304     PHINode * segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 3, "segNo");
    305     segNo->addIncoming(iBuilder->getSize(0), entryBlock);
    306     segNo->addIncoming(segNo, outputCheckBlock);
    307 
    308     Value * outputWaitCond = iBuilder->getTrue();
    309     for (unsigned j = 0; j < kernel->getStreamOutputs().size(); j++) {
    310         const auto & consumerKernels = consumerTable[id][j];
    311         for (unsigned k = 0; k < consumerKernels.size(); k++) {
    312             Value * consumerSegNo = kernels[consumerKernels[k]]->acquireLogicalSegmentNo(instancePtrs[consumerKernels[k]]);
    313             outputWaitCond = iBuilder->CreateAnd(outputWaitCond, iBuilder->CreateICmpULE(segNo, iBuilder->CreateAdd(consumerSegNo, bufferSegments)));
    314         }
    315     }
    316 
    317     if (kernel->getStreamInputs().empty()) {
    318 
    319         iBuilder->CreateCondBr(outputWaitCond, doSegmentBlock, outputCheckBlock);
    320 
     284
     285    // GENERATE UNIQUE PIPELINE PARALLEL THREAD FUNCTION FOR EACH KERNEL
     286    FlatSet<unsigned> kernelSet;
     287    kernelSet.reserve(n);
     288
     289    Function * thread_functions[n];
     290    for (unsigned id = 0; id < n; id++) {
     291        KernelBuilder * const kernel = kernels[id];
     292        const auto & inputs = kernel->getStreamInputs();
     293        const auto & outputs = kernel->getStreamOutputs();
     294
     295        Function * const threadFunc = makeThreadFunction("ppt:" + kernel->getName(), m);
     296
     297         // Create the basic blocks for the thread function.
     298        BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc);
     299        BasicBlock * outputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "outputCheck", threadFunc);
     300        BasicBlock * inputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "inputCheck", threadFunc);
     301        BasicBlock * doSegmentBlock = BasicBlock::Create(iBuilder->getContext(), "doSegment", threadFunc);
     302        BasicBlock * exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc);
     303
     304        iBuilder->SetInsertPoint(entryBlock);
     305
     306        Value * sharedStruct = iBuilder->CreateBitCast(&threadFunc->getArgumentList().front(), sharedStructType->getPointerTo());
     307
     308        Value * instancePtrs[n];
     309        for (unsigned k = 0; k < n; k++) {
     310            Value * const ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
     311            instancePtrs[k] = iBuilder->CreateLoad(ptr);
     312        }
     313
     314        iBuilder->CreateBr(outputCheckBlock);
     315
     316        // Check whether the output buffers are ready for more data
     317        iBuilder->SetInsertPoint(outputCheckBlock);
     318        PHINode * segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 3, "segNo");
     319        segNo->addIncoming(iBuilder->getSize(0), entryBlock);
     320        segNo->addIncoming(segNo, outputCheckBlock);
     321
     322        Value * outputWaitCond = iBuilder->getTrue();
     323        for (const StreamSetBuffer * buf : kernel->getStreamSetOutputBuffers()) {
     324            const auto & list = consumingKernels[buf];
     325            assert(std::is_sorted(list.begin(), list.end()));
     326            kernelSet.insert(list.begin(), list.end());
     327        }
     328        for (unsigned k : kernelSet) {
     329            Value * consumerSegNo = kernels[k]->acquireLogicalSegmentNo(instancePtrs[k]);
     330            assert (consumerSegNo->getType() == segNo->getType());
     331            Value * consumedSegNo = iBuilder->CreateAdd(consumerSegNo, bufferSegments);
     332            outputWaitCond = iBuilder->CreateAnd(outputWaitCond, iBuilder->CreateICmpULE(segNo, consumedSegNo));
     333        }
     334        kernelSet.clear();
     335        iBuilder->CreateCondBr(outputWaitCond, inputCheckBlock, outputCheckBlock);
     336
     337        // Check whether the input buffers have enough data for this kernel to begin
     338        iBuilder->SetInsertPoint(inputCheckBlock);
     339        for (const StreamSetBuffer * buf : kernel->getStreamSetInputBuffers()) {
     340            kernelSet.insert(producingKernel[buf]);
     341        }
     342        const auto m = kernelSet.size();
     343        Value * producerSegNo[m];
     344
     345        Value * inputWaitCond = iBuilder->getTrue();
     346        if (m) {
     347            unsigned j = 0;
     348            for (unsigned k : kernelSet) {
     349                producerSegNo[j] = kernels[k]->acquireLogicalSegmentNo(instancePtrs[k]);
     350                assert (producerSegNo[j]->getType() == segNo->getType());
     351                inputWaitCond = iBuilder->CreateAnd(inputWaitCond, iBuilder->CreateICmpULT(segNo, producerSegNo[j++]));
     352            }
     353        }
     354        iBuilder->CreateCondBr(inputWaitCond, doSegmentBlock, inputCheckBlock);
     355
     356        // Process the segment
    321357        iBuilder->SetInsertPoint(doSegmentBlock);
    322358
    323         std::vector<Value *> args = {instancePtrs[id], iBuilder->getFalse()};
    324         for (unsigned i = 0; i < kernel->getStreamOutputs().size(); ++i) {
    325             args.push_back(iBuilder->getSize(0));
    326         }
    327         kernel->createDoSegmentCall(args);
    328         Value * nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
    329         segNo->addIncoming(nextSegNo, doSegmentBlock);
    330         Value * const terminated = kernel->getTerminationSignal(instancePtrs[id]);
    331         kernel->releaseLogicalSegmentNo(instancePtrs[id], nextSegNo);
    332         iBuilder->CreateCondBr(terminated, exitThreadBlock, outputCheckBlock);
    333 
    334     } else {
    335 
    336         BasicBlock * inputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "inputCheck", threadFunc, doSegmentBlock);
    337 
    338         iBuilder->CreateCondBr(outputWaitCond, inputCheckBlock, outputCheckBlock);
    339 
    340         iBuilder->SetInsertPoint(inputCheckBlock);
    341        
    342         Value * inputWaitCond = iBuilder->getTrue();
    343         for (unsigned j = 0; j < kernel->getStreamInputs().size(); j++) {
    344             unsigned producerKernel, outputIndex;
    345             std::tie(producerKernel, outputIndex) = producerTable[id][j];
    346             Value * producerSegNo = kernels[producerKernel]->acquireLogicalSegmentNo(instancePtrs[producerKernel]);
    347             inputWaitCond = iBuilder->CreateAnd(inputWaitCond, iBuilder->CreateICmpULT(segNo, producerSegNo));
    348         }
    349 
    350         iBuilder->CreateCondBr(inputWaitCond, doSegmentBlock, inputCheckBlock);
    351 
    352         iBuilder->SetInsertPoint(doSegmentBlock);
    353 
    354         Value * nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
    355 
    356         Value * terminated = iBuilder->getTrue();
    357         for (unsigned j = 0; j < kernel->getStreamInputs().size(); j++) {
    358             unsigned producerKernel, outputIndex;
    359             std::tie(producerKernel, outputIndex) = producerTable[id][j];
    360             terminated = iBuilder->CreateAnd(terminated, kernels[producerKernel]->getTerminationSignal(instancePtrs[producerKernel]));
    361             Value * producerSegNo = kernels[producerKernel]->acquireLogicalSegmentNo(instancePtrs[producerKernel]);
    362             terminated = iBuilder->CreateAnd(terminated, iBuilder->CreateICmpEQ(nextSegNo, producerSegNo));
    363         }
    364        
     359        Value * const nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
     360
     361        Value * terminated = nullptr;
     362        if (m == 0) {
     363            // if this kernel has no input streams, the kernel itself must decide when it terminates.
     364            terminated = kernel->getTerminationSignal(instancePtrs[id]);
     365        } else {
     366            // ... otherwise the kernel terminates only when it exhausts all of its input streams
     367            terminated = iBuilder->getTrue();
     368            unsigned j = 0;
     369            for (unsigned k : kernelSet) {
     370                terminated = iBuilder->CreateAnd(terminated, kernels[k]->getTerminationSignal(instancePtrs[k]));
     371                terminated = iBuilder->CreateAnd(terminated, iBuilder->CreateICmpEQ(nextSegNo, producerSegNo[j++]));
     372            }
     373            kernelSet.clear();
     374        }
     375
    365376        std::vector<Value *> args = {instancePtrs[id], terminated};
    366         for (unsigned j = 0; j < kernel->getStreamInputs().size(); j++) {
    367             unsigned producerKernel, outputIndex;
    368             std::tie(producerKernel, outputIndex) = producerTable[id][j];
    369             args.push_back(iBuilder->CreateMul(segmentItems, segNo));
    370         }
    371         for (unsigned i = 0; i < kernel->getStreamOutputs().size(); ++i) {
    372             args.push_back(iBuilder->getSize(0));
    373         }
     377        args.insert(args.end(), inputs.size(), iBuilder->CreateMul(segmentItems, segNo));
     378        args.insert(args.end(), outputs.size(), iBuilder->getSize(0));
     379
    374380        kernel->createDoSegmentCall(args);
    375381        segNo->addIncoming(nextSegNo, doSegmentBlock);
     
    377383
    378384        iBuilder->CreateCondBr(terminated, exitThreadBlock, outputCheckBlock);
    379     }
    380 
    381     iBuilder->SetInsertPoint(exitThreadBlock);
    382     iBuilder->CreatePThreadExitCall(ConstantPointerNull::getNullValue(iBuilder->getVoidPtrTy()));
    383     iBuilder->CreateRetVoid();
     385
     386        iBuilder->SetInsertPoint(exitThreadBlock);
     387        iBuilder->CreatePThreadExitCall(nullVoidPtrVal);
     388        iBuilder->CreateRetVoid();
     389
     390        thread_functions[id] = threadFunc;
     391    }
     392
    384393    iBuilder->restoreIP(ip);
    385     return threadFunc;
     394
     395    for (unsigned i = 0; i < n; i++) {
     396        iBuilder->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, thread_functions[i], sharedStruct);
     397    }
     398
     399    AllocaInst * const status = iBuilder->CreateAlloca(int8PtrTy);
     400    for (unsigned i = 0; i < n; i++) {
     401        Value * threadId = iBuilder->CreateLoad(threadIdPtr[i]);
     402        iBuilder->CreatePThreadJoinCall(threadId, status);
     403    }
     404
    386405}
    387406
    388 void generateParallelPipeline(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
    389    
    390     Module * const m = iBuilder->getModule();
    391     IntegerType * const size_ty = iBuilder->getSizeTy();
    392     PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
    393     PointerType * const int8PtrTy = iBuilder->getInt8PtrTy();
    394    
    395     for (auto k : kernels) {
    396         k->createInstance();
    397     }
    398 
    399     const ProducerTable producerTable = createProducerTable(kernels);
    400     const ConsumerTable consumerTable = createConsumerTable(kernels);
    401     const unsigned n = kernels.size();
    402 
    403     Type * const pthreadsTy = ArrayType::get(size_ty, n);
    404     AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);   
    405     Value * pthreadsPtrs[n];
    406     for (unsigned i = 0; i < n; i++) {
    407         pthreadsPtrs[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
    408     }
    409    
    410     std::vector<Type *> structTypes;
    411     for (unsigned i = 0; i < n; i++) {
    412         structTypes.push_back(kernels[i]->getInstance()->getType());
    413     }
    414     Type * const sharedStructType = StructType::get(m->getContext(), structTypes);
    415     AllocaInst * sharedStruct = iBuilder->CreateAlloca(sharedStructType);
    416     for (unsigned i = 0; i < n; i++) {
    417         Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
    418         iBuilder->CreateStore(kernels[i]->getInstance(), ptr);
    419     }
    420     for (unsigned i = 0; i < n; i++) {
    421         KernelBuilder * const K = kernels[i];
    422         K->releaseLogicalSegmentNo(K->getInstance(), iBuilder->getSize(0));
    423     }
    424 
    425     Function * thread_functions[n];
    426     for (unsigned i = 0; i < n; i++) {
    427         thread_functions[i] = generateParallelPipelineThreadFunction(iBuilder, kernels, sharedStructType, producerTable, consumerTable, i);
    428     }
    429    
    430     Value * nullVal = Constant::getNullValue(voidPtrTy);
    431     for (unsigned i = 0; i < n; i++) {
    432         iBuilder->CreatePThreadCreateCall(pthreadsPtrs[i], nullVal, thread_functions[i], sharedStruct);
    433     }
    434 
    435     AllocaInst * const status = iBuilder->CreateAlloca(int8PtrTy);
    436     for (unsigned i = 0; i < n; i++) {
    437         Value * threadId = iBuilder->CreateLoad(pthreadsPtrs[i]);
    438         iBuilder->CreatePThreadJoinCall(threadId, status);
    439     }
    440 
    441 }
    442 
     407/** ------------------------------------------------------------------------------------------------------------- *
     408 * @brief generatePipelineLoop
     409 ** ------------------------------------------------------------------------------------------------------------- */
    443410void generatePipelineLoop(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
    444411
     
    459426    iBuilder->SetInsertPoint(pipelineLoop);
    460427
    461     // Gather all of our
     428    // Build up the initial phi nodes for each of the consumed (minimum processed) positions
    462429    for (unsigned k = 0; k < kernels.size(); k++) {
    463430        KernelBuilder * const kernel = kernels[k];
Note: See TracChangeset for help on using the changeset viewer.