Changeset 5363 for icGREP


Ignore:
Timestamp:
Mar 14, 2017, 10:33:04 AM (2 years ago)
Author:
lindanl
Message:

pipeline parallel for icgrep.

Location:
icGREP/icgrep-devel/icgrep
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • icGREP/icgrep-devel/icgrep/grep_engine.cpp

    r5361 r5363  
    336336
    337337        if (pipelineParallel){
    338             generatePipelineParallel(iBuilder, KernelList);
     338            generateParallelPipeline(iBuilder, KernelList);
    339339        } else if (segmentPipelineParallel){
    340340            generateSegmentParallelPipeline(iBuilder, KernelList);
     
    352352
    353353        if (pipelineParallel) {
    354             generatePipelineParallel(iBuilder, KernelList);
     354            generateParallelPipeline(iBuilder, KernelList);
    355355        } else if (segmentPipelineParallel) {
    356356            generateSegmentParallelPipeline(iBuilder, KernelList);
     
    498498        icgrepK.generateKernel({&BasisBits, &LineBreakStream}, {});
    499499        if (pipelineParallel) {
    500             generatePipelineParallel(iBuilder, {&mmapK, &s2pk, &linebreakK, &icgrepK});
     500            generateParallelPipeline(iBuilder, {&mmapK, &s2pk, &linebreakK, &icgrepK});
    501501        } else if (segmentPipelineParallel) {
    502502            generateSegmentParallelPipeline(iBuilder, {&mmapK, &s2pk, &linebreakK, &icgrepK});
     
    527527           
    528528            if (pipelineParallel) {
    529                 generatePipelineParallel(iBuilder, {&mmapK, &s2pk, &linebreakK, &icgrepK, &scanMatchK});
     529                generateParallelPipeline(iBuilder, {&mmapK, &s2pk, &linebreakK, &icgrepK, &scanMatchK});
    530530            } else if (segmentPipelineParallel) {
    531531                generateSegmentParallelPipeline(iBuilder, {&mmapK, &s2pk, &linebreakK, &icgrepK, &scanMatchK});
  • icGREP/icgrep-devel/icgrep/kernels/pipeline.cpp

    r5320 r5363  
    1414using namespace parabix;
    1515using namespace llvm;
     16
     17#include <iostream>
    1618
    1719using ProducerTable = std::vector<std::vector<std::pair<unsigned, unsigned>>>;
     
    6567}
    6668
     69using ConsumerTable = std::vector<std::vector<std::vector<unsigned>>>;
     70
     71ConsumerTable createConsumerTable(const std::vector<KernelBuilder *> & kernels) {
     72    ConsumerTable consumerTable(kernels.size());
     73   
     74    // First prepare a map from streamSet input buffers to their consuming kernel and input index.
     75    std::unordered_map<const StreamSetBuffer *, std::vector<unsigned>> bufferMap;
     76   
     77    for (unsigned k = 0; k < kernels.size(); k++) {
     78        auto inputSets = kernels[k]->getStreamSetInputBuffers();
     79        for (unsigned j = 0; j < inputSets.size(); j++) {
     80            auto f = bufferMap.find(inputSets[j]);
     81            std::vector<unsigned> kernelNo;
     82            kernelNo.push_back(k);
     83            if (f == bufferMap.end()) {
     84                bufferMap.insert(std::make_pair(inputSets[j], kernelNo));
     85            }
     86            else{
     87                f->second.push_back(k);
     88            }
     89        }
     90    }
     91    for (unsigned k = 0; k < kernels.size(); k++) {
     92        auto outputSets = kernels[k]->getStreamSetOutputBuffers();
     93        for (unsigned i = 0; i < outputSets.size(); i++) {
     94            auto f = bufferMap.find(outputSets[i]);
     95            if (f == bufferMap.end()) {
     96                llvm::report_fatal_error("Pipeline error: output buffer #" + std::to_string(i) + " of " + kernels[k]->getName() + ": not used by any kernel. ");
     97            }
     98            else {
     99                consumerTable[k].push_back(f->second); 
     100            }         
     101        }
     102    }
     103    return consumerTable;
     104}
     105
    67106Function * generateSegmentParallelPipelineThreadFunction(std::string name, IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels, Type * sharedStructType, ProducerTable & producerTable, int id) {
    68107   
     
    247286}
    248287
    249 void generatePipelineParallel(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
    250     llvm::report_fatal_error("Pipeline parallelism no longer supported!");
    251 }
    252 
     288Function * generateParallelPipelineThreadFunction(std::string name, IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels, Type * sharedStructType, ProducerTable & producerTable, ConsumerTable & consumerTable, int id) {
     289       
     290    const auto ip = iBuilder->saveIP();
     291   
     292    Module * m = iBuilder->getModule();
     293    Type * const voidTy = iBuilder->getVoidTy();
     294    IntegerType * const size_ty = iBuilder->getSizeTy();
     295    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
     296    PointerType * const int8PtrTy = iBuilder->getInt8PtrTy();
     297    IntegerType * const int1ty = iBuilder->getInt1Ty();
     298
     299    Function * const threadFunc = cast<Function>(m->getOrInsertFunction(name, voidTy, int8PtrTy, nullptr));
     300    threadFunc->setCallingConv(CallingConv::C);
     301    Function::arg_iterator args = threadFunc->arg_begin();
     302
     303    Value * const input = &*(args++);
     304    input->setName("input");
     305
     306    KernelBuilder * targetK = kernels[id];
     307    Value * bufferSegments = ConstantInt::get(size_ty, codegen::BufferSegments - 1);
     308    ConstantInt * segmentItems = iBuilder->getSize(codegen::SegmentSize * iBuilder->getBitBlockWidth());
     309    Value * waitCondTest = nullptr;
     310
     311     // Create the basic blocks for the thread function.
     312    BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc, 0);
     313    BasicBlock * outputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "outputCheck", threadFunc, 0);
     314    BasicBlock * doSegmentBlock = BasicBlock::Create(iBuilder->getContext(), "doSegment", threadFunc, 0);
     315    BasicBlock * exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc, 0);
     316
     317    iBuilder->SetInsertPoint(entryBlock);
     318   
     319    Value * sharedStruct = iBuilder->CreateBitCast(input, PointerType::get(sharedStructType, 0));
     320    std::vector<Value *> instancePtrs;
     321    std::vector<std::vector<Value *>> ProducerPos;
     322    for (unsigned k = 0; k < kernels.size(); k++) {
     323        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
     324        instancePtrs.push_back(iBuilder->CreateLoad(ptr));
     325
     326        std::vector<Value *> produced;
     327        for (unsigned i = 0; i < kernels[k]->getStreamOutputs().size(); i++) {
     328            produced.push_back(kernels[k]->getProducedItemCount(instancePtrs[k], kernels[k]->getStreamOutputs()[i].name));
     329        }
     330        ProducerPos.push_back(produced);
     331    }
     332
     333    iBuilder->CreateBr(outputCheckBlock);
     334
     335    iBuilder->SetInsertPoint(outputCheckBlock);
     336    PHINode * segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 2, "segNo");
     337    segNo->addIncoming(iBuilder->getSize(0), entryBlock);
     338    segNo->addIncoming(segNo, outputCheckBlock);
     339
     340    waitCondTest = ConstantInt::get(int1ty, 1);
     341    for (unsigned j = 0; j < targetK->getStreamOutputs().size(); j++) {
     342        std::vector<unsigned> consumerKernels = consumerTable[id][j];
     343        for (unsigned k = 0; k < consumerKernels.size(); k++) {
     344            Value * consumerSegNo = kernels[consumerKernels[k]]->acquireLogicalSegmentNo(instancePtrs[consumerKernels[k]]);
     345            waitCondTest = iBuilder->CreateAnd(waitCondTest, iBuilder->CreateICmpULE(segNo, iBuilder->CreateAdd(consumerSegNo, bufferSegments)));
     346        }
     347    }
     348
     349    if(targetK->getStreamInputs().size() == 0) {
     350
     351        iBuilder->CreateCondBr(waitCondTest, doSegmentBlock, outputCheckBlock);
     352
     353        iBuilder->SetInsertPoint(doSegmentBlock);
     354
     355        Value * terminated = targetK->getTerminationSignal(instancePtrs[id]);
     356        std::vector<Value *> doSegmentArgs = {instancePtrs[id], terminated};       
     357        targetK->createDoSegmentCall(doSegmentArgs);
     358        Value * nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
     359        segNo->addIncoming(nextSegNo, doSegmentBlock);
     360        targetK->releaseLogicalSegmentNo(instancePtrs[id], nextSegNo);
     361
     362        iBuilder->CreateCondBr(terminated, exitThreadBlock, outputCheckBlock);
     363
     364    }
     365    else{
     366
     367        BasicBlock * inputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "inputCheck", threadFunc, 0);
     368
     369        iBuilder->CreateCondBr(waitCondTest, inputCheckBlock, outputCheckBlock);
     370
     371        iBuilder->SetInsertPoint(inputCheckBlock);
     372       
     373        waitCondTest = ConstantInt::get(int1ty, 1);
     374        for (unsigned j = 0; j < targetK->getStreamInputs().size(); j++) {
     375            unsigned producerKernel, outputIndex;
     376            std::tie(producerKernel, outputIndex) = producerTable[id][j];
     377            Value * producerSegNo = kernels[producerKernel]->acquireLogicalSegmentNo(instancePtrs[producerKernel]);
     378            waitCondTest = iBuilder->CreateAnd(waitCondTest, iBuilder->CreateICmpULT(segNo, producerSegNo)); 
     379        }
     380
     381        iBuilder->CreateCondBr(waitCondTest, doSegmentBlock, inputCheckBlock);
     382
     383        iBuilder->SetInsertPoint(doSegmentBlock);
     384
     385        Value * nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
     386        Value * terminated = ConstantInt::get(int1ty, 1);
     387        for (unsigned j = 0; j < targetK->getStreamInputs().size(); j++) {
     388            unsigned producerKernel, outputIndex;
     389            std::tie(producerKernel, outputIndex) = producerTable[id][j];
     390            terminated = iBuilder->CreateAnd(terminated, kernels[producerKernel]->getTerminationSignal(instancePtrs[producerKernel]));
     391            Value * producerSegNo = kernels[producerKernel]->acquireLogicalSegmentNo(instancePtrs[producerKernel]);
     392            terminated = iBuilder->CreateAnd(terminated, iBuilder->CreateICmpEQ(nextSegNo, producerSegNo));
     393        }
     394       
     395        std::vector<Value *> doSegmentArgs = {instancePtrs[id], terminated};
     396        for (unsigned j = 0; j < targetK->getStreamInputs().size(); j++) {
     397            unsigned producerKernel, outputIndex;
     398            std::tie(producerKernel, outputIndex) = producerTable[id][j];
     399            // doSegmentArgs.push_back(ProducerPos[producerKernel][outputIndex]);
     400            doSegmentArgs.push_back(iBuilder->CreateMul(segmentItems, segNo));
     401        }
     402        targetK->createDoSegmentCall(doSegmentArgs);
     403        segNo->addIncoming(nextSegNo, doSegmentBlock);
     404        targetK->releaseLogicalSegmentNo(instancePtrs[id], nextSegNo);
     405
     406        iBuilder->CreateCondBr(terminated, exitThreadBlock, outputCheckBlock);
     407    }
     408
     409    iBuilder->SetInsertPoint(exitThreadBlock);
     410
     411    Value * nullVal = Constant::getNullValue(voidPtrTy);
     412    iBuilder->CreatePThreadExitCall(nullVal);
     413    iBuilder->CreateRetVoid();
     414    iBuilder->restoreIP(ip);
     415
     416    return threadFunc;
     417
     418}
     419
     420void generateParallelPipeline(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
     421    const unsigned threadNum = kernels.size();
     422   
     423    Module * m = iBuilder->getModule();
     424   
     425    IntegerType * const size_ty = iBuilder->getSizeTy();
     426    PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
     427    PointerType * const int8PtrTy = iBuilder->getInt8PtrTy();
     428   
     429    for (auto k : kernels) k->createInstance();
     430   
     431    ProducerTable producerTable = createProducerTable(kernels);
     432    ConsumerTable consumerTable = createConsumerTable(kernels);
     433   
     434    Type * const pthreadsTy = ArrayType::get(size_ty, threadNum);
     435    AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
     436    std::vector<Value *> pthreadsPtrs;
     437    for (unsigned i = 0; i < threadNum; i++) {
     438        pthreadsPtrs.push_back(iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)}));
     439    }
     440    Value * nullVal = Constant::getNullValue(voidPtrTy);
     441    AllocaInst * const status = iBuilder->CreateAlloca(int8PtrTy);
     442   
     443    std::vector<Type *> structTypes;
     444    for (unsigned i = 0; i < kernels.size(); i++) {
     445        structTypes.push_back(kernels[i]->getInstance()->getType());
     446    }
     447    Type * sharedStructType = StructType::get(m->getContext(), structTypes);
     448   
     449    AllocaInst * sharedStruct = iBuilder->CreateAlloca(sharedStructType);
     450    for (unsigned i = 0; i < kernels.size(); i++) {
     451        Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
     452        iBuilder->CreateStore(kernels[i]->getInstance(), ptr);
     453    }
     454    for (unsigned i = 0; i < kernels.size(); i++) {
     455        kernels[i]->releaseLogicalSegmentNo(kernels[i]->getInstance(), iBuilder->getSize(0));
     456    }
     457
     458    std::vector<Function *> thread_functions;
     459    const auto ip = iBuilder->saveIP();
     460    for (unsigned i = 0; i < threadNum; i++) {
     461        thread_functions.push_back(generateParallelPipelineThreadFunction("thread"+std::to_string(i), iBuilder, kernels, sharedStructType, producerTable, consumerTable, i));
     462    }
     463    iBuilder->restoreIP(ip);
     464   
     465    for (unsigned i = 0; i < threadNum; i++) {
     466        iBuilder->CreatePThreadCreateCall(pthreadsPtrs[i], nullVal, thread_functions[i], iBuilder->CreateBitCast(sharedStruct, int8PtrTy));
     467    }
     468   
     469    std::vector<Value *> threadIDs;
     470    for (unsigned i = 0; i < threadNum; i++) {
     471        threadIDs.push_back(iBuilder->CreateLoad(pthreadsPtrs[i]));
     472    }
     473   
     474    for (unsigned i = 0; i < threadNum; i++) {
     475        iBuilder->CreatePThreadJoinCall(threadIDs[i], status);
     476    }
     477
     478}
    253479
    254480void generatePipelineLoop(IDISA::IDISA_Builder * iBuilder, const std::vector<KernelBuilder *> & kernels) {
  • icGREP/icgrep-devel/icgrep/kernels/pipeline.h

    r5260 r5363  
    1414void generatePipelineLoop(IDISA::IDISA_Builder * iBuilder, const std::vector<kernel::KernelBuilder *> & kernels);
    1515
    16 void generatePipelineParallel(IDISA::IDISA_Builder * iBuilder, const std::vector<kernel::KernelBuilder *> & kernels);
     16void generateParallelPipeline(IDISA::IDISA_Builder * iBuilder, const std::vector<kernel::KernelBuilder *> & kernels);
    1717
    1818#endif // PIPELINE_H
Note: See TracChangeset for help on using the changeset viewer.