Ignore:
Timestamp:
Dec 20, 2017, 11:42:53 AM (20 months ago)
Author:
nmedfort
Message:

Bug fix for the pipeline: it terminated too early when a kernel had insufficient output space to process all of its input.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • icGREP/icgrep-devel/icgrep/toolchain/pipeline.cpp

    r5782 r5793  
    8585    Value * const segOffset = b->CreateLoad(b->CreateGEP(threadStruct, {b->getInt32(0), b->getInt32(1)}));
    8686
    87     BasicBlock * segmentLoop = BasicBlock::Create(b->getContext(), "segmentLoop", threadFunc);
     87    BasicBlock * const segmentLoop = BasicBlock::Create(b->getContext(), "segmentLoop", threadFunc);
    8888    b->CreateBr(segmentLoop);
    8989
     
    9191    PHINode * const segNo = b->CreatePHI(b->getSizeTy(), 2, "segNo");
    9292    segNo->addIncoming(segOffset, entryBlock);
    93 
    94     Value * terminated = b->getFalse();
    95     Value * const nextSegNo = b->CreateAdd(segNo, b->getSize(1));
    9693
    9794    BasicBlock * const exitThreadBlock = BasicBlock::Create(b->getContext(), "exitThread", threadFunc);
     
    106103    }
    107104
     105    Value * terminated = nullptr;
     106
     107    const bool serialize = codegen::DebugOptionIsSet(codegen::SerializeThreads);
     108
    108109    for (unsigned k = 0; k < n; ++k) {
    109110
     
    114115        b->CreateBr(kernelWait);
    115116
     117        BasicBlock * const kernelCheck = BasicBlock::Create(b->getContext(), kernel->getName() + "Check", threadFunc);
     118
    116119        BasicBlock * const kernelBody = BasicBlock::Create(b->getContext(), kernel->getName() + "Do", threadFunc);
    117120
     121        BasicBlock * const kernelEnd = BasicBlock::Create(b->getContext(), kernel->getName() + "End", threadFunc);
     122
    118123        b->SetInsertPoint(kernelWait);
    119         const unsigned waitIdx = codegen::DebugOptionIsSet(codegen::SerializeThreads) ? (n - 1) : k;
    120 
    121         b->setKernel(kernels[waitIdx]);
     124
     125        b->setKernel(kernels[serialize ? (n - 1) : k]);
    122126        Value * const processedSegmentCount = b->acquireLogicalSegmentNo();
    123127        b->setKernel(kernel);
    124128
    125129        assert (processedSegmentCount->getType() == segNo->getType());
    126         Value * const ready = b->CreateICmpEQ(segNo, processedSegmentCount);
    127 
    128         if (kernel->hasNoTerminateAttribute()) {
    129             b->CreateCondBr(ready, kernelBody, kernelWait);
    130         } else { // If the kernel was terminated in a previous segment then the pipeline is done.
    131             BasicBlock * kernelTerminated = BasicBlock::Create(b->getContext(), kernel->getName() + "Terminated", threadFunc, 0);
    132             BasicBlock * exitBlock = BasicBlock::Create(b->getContext(), kernel->getName() + "Exit", threadFunc, 0);
    133             b->CreateCondBr(ready, kernelTerminated, kernelWait);
    134 
    135             b->SetInsertPoint(kernelTerminated);
    136             Value * terminationSignal = b->getTerminationSignal();
    137             b->CreateCondBr(terminationSignal, exitBlock, kernelBody);
    138             b->SetInsertPoint(exitBlock);
    139             b->releaseLogicalSegmentNo(nextSegNo); // Ensure that the next thread will also exit.
    140             b->CreateBr(exitThreadBlock);
    141         }
    142 
    143         BasicBlock * const kernelEnd = BasicBlock::Create(b->getContext(), kernel->getName() + "End", threadFunc);
     130        Value * const ready = b->CreateICmpEQ(segNo, processedSegmentCount);       
     131        b->CreateCondBr(ready, kernelCheck, kernelWait);
     132
     133        b->SetInsertPoint(kernelCheck);
     134        b->CreateUnlikelyCondBr(b->getTerminationSignal(), kernelEnd, kernelBody);
    144135
    145136        // Execute the kernel segment
    146137        b->SetInsertPoint(kernelBody);
    147138        const auto & inputs = kernel->getStreamInputs();
    148         std::vector<Value *> args = {kernel->getInstance(), terminated};
     139        Value * const isFinal = b->CreateOr(terminated ? terminated : b->getFalse(), b->getTerminationSignal());
     140        std::vector<Value *> args = {kernel->getInstance(), isFinal};
    149141        for (unsigned i = 0; i < inputs.size(); ++i) {
    150142            const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
     
    153145            Value * const produced = f->second;
    154146            args.push_back(produced);
    155             handleInsufficientData(b, produced, terminated, kernelEnd, kernel, inputs[i], buffer);
     147            handleInsufficientData(b, produced, isFinal, kernelEnd, kernel, inputs[i], buffer);
    156148        }
    157149
     
    162154        b->SetInsertPoint(kernelEnd);
    163155
    164         if (!kernel->hasNoTerminateAttribute()) {
    165             terminated = b->CreateOr(terminated, b->getTerminationSignal());
     156        Value * const finished = b->getTerminationSignal();
     157        if (terminated) { // all kernels must terminate
     158            terminated = b->CreateAnd(terminated, finished);
     159        } else {
     160            terminated = finished;
    166161        }
    167162
     
    190185            b->CreateStore(b->CreateAdd(b->CreateLoad(counterPtr), b->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
    191186            cycleCountStart = cycleCountEnd;
    192         }       
    193         b->releaseLogicalSegmentNo(nextSegNo);
     187        }
     188
     189        b->releaseLogicalSegmentNo(b->CreateAdd(segNo, b->getSize(1)));
    194190    }
    195191
     
    213209
    214210    segNo->addIncoming(b->CreateAdd(segNo, b->getSize(codegen::ThreadNum)), b->GetInsertBlock());
     211    if (LLVM_UNLIKELY(terminated == nullptr)) {
     212        report_fatal_error("error: at least one kernel must have a termination signal");
     213    }
    215214    b->CreateUnlikelyCondBr(terminated, exitThreadBlock, segmentLoop);
    216215
     
    240239    // -------------------------------------------------------------------------------------------------------------------------
    241240    const unsigned threads = codegen::ThreadNum - 1;
    242     assert (codegen::ThreadNum > 1);
     241    assert (codegen::ThreadNum > 0);
    243242    Type * const pthreadsTy = ArrayType::get(sizeTy, threads);
    244243    AllocaInst * const pthreads = b->CreateAlloca(pthreadsTy);
     
    279278    }
    280279   
    281     if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
     280    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
     281        for (const Kernel * kernel : kernels) {
     282            b->setKernel(kernel);
     283            const auto & inputs = kernel->getStreamInputs();
     284            const auto & outputs = kernel->getStreamOutputs();
     285            Value * items = nullptr;
     286            if (inputs.empty()) {
     287                items = b->getProducedItemCount(outputs[0].getName());
     288            } else {
     289                items = b->getProcessedItemCount(inputs[0].getName());
     290            }
     291            Value * fItems = b->CreateUIToFP(items, b->getDoubleTy());
     292            Value * cycles = b->CreateLoad(b->getCycleCountPtr());
     293            Value * fCycles = b->CreateUIToFP(cycles, b->getDoubleTy());
     294            const auto formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
     295            Value * stringPtr = b->CreatePointerCast(b->GetString(formatString), b->getInt8PtrTy());
     296            b->CreateCall(b->GetDprintf(), {b->getInt32(2), stringPtr, fItems, fCycles, b->CreateFDiv(fCycles, fItems)});
     297        }
     298    }
     299   
     300}
     301
     302/** ------------------------------------------------------------------------------------------------------------- *
     303 * @brief generatePipelineLoop
     304 ** ------------------------------------------------------------------------------------------------------------- */
     305void generatePipelineLoop(const std::unique_ptr<KernelBuilder> & b, const std::vector<Kernel *> & kernels) {
     306
     307    BasicBlock * entryBlock = b->GetInsertBlock();
     308    Function * main = entryBlock->getParent();
     309
     310    // Create the basic blocks for the loop.
     311    BasicBlock * pipelineLoop = BasicBlock::Create(b->getContext(), "pipelineLoop", main);
     312    BasicBlock * pipelineExit = BasicBlock::Create(b->getContext(), "pipelineExit", main);
     313
     314    StreamSetBufferMap<Value *> producedItemCount;
     315    StreamSetBufferMap<Value *> consumedItemCount;
     316
     317    b->CreateBr(pipelineLoop);
     318    b->SetInsertPoint(pipelineLoop);
     319   
     320    Value * cycleCountStart = nullptr;
     321    Value * cycleCountEnd = nullptr;
     322    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
     323        cycleCountStart = b->CreateReadCycleCounter();
     324    }
     325    Value * terminated = nullptr;
     326
     327    for (Kernel * const kernel : kernels) {
     328
     329        b->setKernel(kernel);
     330        const auto & inputs = kernel->getStreamInputs();
     331        const auto & outputs = kernel->getStreamOutputs();
     332
     333        Value * const isFinal = terminated ? terminated : b->getFalse();
     334
     335        std::vector<Value *> args = {kernel->getInstance(), isFinal};
     336
     337        for (unsigned i = 0; i < inputs.size(); ++i) {
     338            const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
     339            const auto f = producedItemCount.find(buffer);
     340            if (LLVM_UNLIKELY(f == producedItemCount.end())) {
     341                report_fatal_error(kernel->getName() + " uses stream set " + inputs[i].getName() + " prior to its definition");
     342            }
     343            Value * const produced = f->second;
     344            args.push_back(produced);
     345            handleInsufficientData(b, produced, isFinal, pipelineLoop, kernel, inputs[i], buffer);
     346        }
     347
     348        applyOutputBufferExpansions(b, kernel);
     349
     350        b->createDoSegmentCall(args);
     351
     352        Value * const finished = b->getTerminationSignal();
     353        if (terminated) {
     354            // All kernels must agree that we've terminated.
     355            terminated = b->CreateAnd(terminated, finished);
     356        } else {
     357            terminated = finished;
     358        }
     359
     360        for (unsigned i = 0; i < outputs.size(); ++i) {
     361            Value * const produced = b->getProducedItemCount(outputs[i].getName());
     362            const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
     363            assert (producedItemCount.count(buf) == 0);
     364            producedItemCount.emplace(buf, produced);
     365        }
     366
     367        for (unsigned i = 0; i < inputs.size(); ++i) {
     368            Value * const processed = b->getProcessedItemCount(inputs[i].getName());
     369            const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);
     370            auto f = consumedItemCount.find(buf);
     371            if (f == consumedItemCount.end()) {
     372                consumedItemCount.emplace(buf, processed);
     373            } else {
     374                f->second = b->CreateUMin(processed, f->second);
     375            }
     376        }
     377
     378        if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
     379            cycleCountEnd = b->CreateReadCycleCounter();
     380            Value * counterPtr = b->getCycleCountPtr();
     381            b->CreateStore(b->CreateAdd(b->CreateLoad(counterPtr), b->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
     382            cycleCountStart = cycleCountEnd;
     383        }
     384//        Value * const segNo = b->acquireLogicalSegmentNo();
     385//        Value * nextSegNo = b->CreateAdd(segNo, b->getSize(1));
     386//        b->releaseLogicalSegmentNo(nextSegNo);
     387    }
     388
     389    for (const auto consumed : consumedItemCount) {
     390        const StreamSetBuffer * const buffer = consumed.first;
     391        Kernel * const kernel = buffer->getProducer();
     392        const auto & binding = kernel->getStreamOutput(buffer);
     393        if (LLVM_UNLIKELY(binding.getRate().isDerived())) {
     394            continue;
     395        }
     396        b->setKernel(kernel);
     397        b->setConsumedItemCount(binding.getName(), consumed.second);
     398    }
     399
     400    if (LLVM_UNLIKELY(terminated == nullptr)) {
     401        report_fatal_error("error: at least one kernel must have a termination signal");
     402    }
     403    b->CreateCondBr(terminated, pipelineExit, pipelineLoop);
     404
     405    pipelineExit->moveAfter(b->GetInsertBlock());
     406
     407    b->SetInsertPoint(pipelineExit);
     408
     409    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
    282410        for (unsigned k = 0; k < kernels.size(); k++) {
    283411            auto & kernel = kernels[k];
     
    299427        }
    300428    }
    301    
    302 }
    303 
    304 
    305 /** ------------------------------------------------------------------------------------------------------------- *
    306  * @brief generateParallelPipeline
    307  ** ------------------------------------------------------------------------------------------------------------- */
    308 void generateParallelPipeline(const std::unique_ptr<KernelBuilder> & iBuilder, const std::vector<Kernel *> &kernels) {
    309 
    310     Module * const m = iBuilder->getModule();
    311     IntegerType * const sizeTy = iBuilder->getSizeTy();
    312     PointerType * const voidPtrTy = iBuilder->getVoidPtrTy();
    313     ConstantInt * bufferSegments = ConstantInt::get(sizeTy, codegen::BufferSegments - 1);
    314     ConstantInt * segmentItems = ConstantInt::get(sizeTy, codegen::SegmentSize * iBuilder->getBitBlockWidth());
    315     Constant * const nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
    316 
    317     const unsigned n = kernels.size();
    318 
    319     Type * const pthreadsTy = ArrayType::get(sizeTy, n);
    320     AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy);
    321     Value * threadIdPtr[n];
    322     for (unsigned i = 0; i < n; ++i) {
    323         threadIdPtr[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
    324     }
    325 
    326     Value * instance[n];
    327     Type * structTypes[n];
    328     for (unsigned i = 0; i < n; ++i) {
    329         instance[i] = kernels[i]->getInstance();
    330         structTypes[i] = instance[i]->getType();
    331     }
    332 
    333     Type * const sharedStructType = StructType::get(m->getContext(), ArrayRef<Type *>{structTypes, n});
    334 
    335 
    336     AllocaInst * sharedStruct = iBuilder->CreateAlloca(sharedStructType);
    337     for (unsigned i = 0; i < n; ++i) {
    338         Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)});
    339         iBuilder->CreateStore(instance[i], ptr);
    340     }
    341 
    342     for (auto & kernel : kernels) {
    343         iBuilder->setKernel(kernel);
    344         iBuilder->releaseLogicalSegmentNo(iBuilder->getSize(0));
    345     }
    346 
    347     // GENERATE THE PRODUCING AND CONSUMING KERNEL MAPS
    348     StreamSetBufferMap<unsigned> producingKernel;
    349     StreamSetBufferMap<std::vector<unsigned>> consumingKernels;
    350     for (unsigned id = 0; id < n; ++id) {
    351         const auto & kernel = kernels[id];
    352         const auto & inputs = kernel->getStreamInputs();
    353         const auto & outputs = kernel->getStreamOutputs();
    354         // add any outputs from this kernel to the producing kernel map
    355         for (unsigned j = 0; j < outputs.size(); ++j) {
    356             const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(j);
    357             if (LLVM_UNLIKELY(producingKernel.count(buf) != 0)) {
    358                 report_fatal_error(kernel->getName() + " redefines stream set " + outputs[j].getName());
    359             }
    360             producingKernel.emplace(buf, id);
    361         }
    362         // and any inputs to the consuming kernels list
    363         for (unsigned j = 0; j < inputs.size(); ++j) {
    364             const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(j);
    365             auto f = consumingKernels.find(buf);
    366             if (f == consumingKernels.end()) {
    367                 if (LLVM_UNLIKELY(producingKernel.count(buf) == 0)) {
    368                     report_fatal_error(kernel->getName() + " uses stream set " + inputs[j].getName() + " prior to its definition");
    369                 }
    370                 consumingKernels.emplace(buf, std::vector<unsigned>{ id });
    371             } else {
    372                 f->second.push_back(id);
    373             }
    374         }
    375     }
    376 
    377     const auto ip = iBuilder->saveIP();
    378 
    379     // GENERATE UNIQUE PIPELINE PARALLEL THREAD FUNCTION FOR EACH KERNEL
    380     FlatSet<unsigned> kernelSet;
    381     kernelSet.reserve(n);
    382 
    383     Function * thread_functions[n];
    384     Value * producerSegNo[n];
    385     for (unsigned id = 0; id < n; id++) {
    386         const auto & kernel = kernels[id];
    387 
    388         iBuilder->setKernel(kernel);
    389 
    390         const auto & inputs = kernel->getStreamInputs();
    391 
    392         Function * const threadFunc = makeThreadFunction(iBuilder, "ppt:" + kernel->getName());
    393         auto ai = threadFunc->arg_begin();
    394        
    395          // Create the basic blocks for the thread function.
    396         BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc);
    397         BasicBlock * outputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "outputCheck", threadFunc);
    398         BasicBlock * inputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "inputCheck", threadFunc);
    399         BasicBlock * doSegmentBlock = BasicBlock::Create(iBuilder->getContext(), "doSegment", threadFunc);
    400         BasicBlock * exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc);
    401 
    402         iBuilder->SetInsertPoint(entryBlock);
    403 
    404         Value * const sharedStruct = iBuilder->CreateBitCast(&*(ai), sharedStructType->getPointerTo());
    405 
    406         for (unsigned k = 0; k < n; k++) {
    407             Value * const ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(k)});
    408             kernels[k]->setInstance(iBuilder->CreateLoad(ptr));
    409         }
    410 
    411         iBuilder->CreateBr(outputCheckBlock);
    412 
    413         // Check whether the output buffers are ready for more data
    414         iBuilder->SetInsertPoint(outputCheckBlock);
    415         PHINode * segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 3, "segNo");
    416         segNo->addIncoming(iBuilder->getSize(0), entryBlock);
    417         segNo->addIncoming(segNo, outputCheckBlock);
    418 
    419         Value * outputWaitCond = iBuilder->getTrue();
    420         for (const StreamSetBuffer * buf : kernel->getStreamSetOutputBuffers()) {
    421             const auto & list = consumingKernels[buf];
    422             assert(std::is_sorted(list.begin(), list.end()));
    423             kernelSet.insert(list.begin(), list.end());
    424         }
    425         for (unsigned k : kernelSet) {
    426             iBuilder->setKernel(kernels[k]);
    427             Value * consumerSegNo = iBuilder->acquireLogicalSegmentNo();
    428             assert (consumerSegNo->getType() == segNo->getType());
    429             Value * consumedSegNo = iBuilder->CreateAdd(consumerSegNo, bufferSegments);
    430             outputWaitCond = iBuilder->CreateAnd(outputWaitCond, iBuilder->CreateICmpULE(segNo, consumedSegNo));
    431         }
    432         kernelSet.clear();
    433         iBuilder->setKernel(kernel);
    434         iBuilder->CreateCondBr(outputWaitCond, inputCheckBlock, outputCheckBlock);
    435 
    436         // Check whether the input buffers have enough data for this kernel to begin
    437         iBuilder->SetInsertPoint(inputCheckBlock);
    438         for (const StreamSetBuffer * buf : kernel->getStreamSetInputBuffers()) {
    439             kernelSet.insert(producingKernel[buf]);
    440         }
    441 
    442         Value * inputWaitCond = iBuilder->getTrue();
    443         for (unsigned k : kernelSet) {
    444             iBuilder->setKernel(kernels[k]);
    445             producerSegNo[k] = iBuilder->acquireLogicalSegmentNo();
    446             assert (producerSegNo[k]->getType() == segNo->getType());
    447             inputWaitCond = iBuilder->CreateAnd(inputWaitCond, iBuilder->CreateICmpULT(segNo, producerSegNo[k]));
    448         }
    449         iBuilder->setKernel(kernel);
    450         iBuilder->CreateCondBr(inputWaitCond, doSegmentBlock, inputCheckBlock);
    451 
    452         // Process the segment
    453         iBuilder->SetInsertPoint(doSegmentBlock);
    454 
    455         Value * const nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1));
    456         Value * terminated = nullptr;
    457         if (kernelSet.empty()) {
    458             // if this kernel has no input streams, the kernel itself must decide when it terminates.
    459             terminated = iBuilder->getTerminationSignal();
    460         } else {
    461             // ... otherwise the kernel terminates only when it exhausts all of its input streams
    462             terminated = iBuilder->getTrue();
    463             for (unsigned k : kernelSet) {
    464                 iBuilder->setKernel(kernels[k]);
    465                 terminated = iBuilder->CreateAnd(terminated, iBuilder->getTerminationSignal());
    466                 terminated = iBuilder->CreateAnd(terminated, iBuilder->CreateICmpEQ(nextSegNo, producerSegNo[k]));
    467             }
    468             kernelSet.clear();
    469             iBuilder->setKernel(kernel);
    470         }
    471 
    472         std::vector<Value *> args = {kernel->getInstance(), terminated};
    473         args.insert(args.end(), inputs.size(), iBuilder->CreateMul(segmentItems, segNo));
    474 
    475         iBuilder->createDoSegmentCall(args);
    476         segNo->addIncoming(nextSegNo, doSegmentBlock);
    477         iBuilder->releaseLogicalSegmentNo(nextSegNo);
    478 
    479         iBuilder->CreateCondBr(terminated, exitThreadBlock, outputCheckBlock);
    480 
    481         iBuilder->SetInsertPoint(exitThreadBlock);
    482 
    483         iBuilder->CreatePThreadExitCall(nullVoidPtrVal);
    484 
    485         iBuilder->CreateRetVoid();
    486 
    487         thread_functions[id] = threadFunc;
    488     }
    489 
    490     iBuilder->restoreIP(ip);
    491 
    492     for (unsigned i = 0; i < n; ++i) {
    493         kernels[i]->setInstance(instance[i]);
    494     }
    495 
    496     for (unsigned i = 0; i < n; ++i) {
    497         iBuilder->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, thread_functions[i], sharedStruct);
    498     }
    499 
    500     AllocaInst * const status = iBuilder->CreateAlloca(voidPtrTy);
    501     for (unsigned i = 0; i < n; ++i) {
    502         Value * threadId = iBuilder->CreateLoad(threadIdPtr[i]);
    503         iBuilder->CreatePThreadJoinCall(threadId, status);
    504     }
    505 }
    506 
    507 /** ------------------------------------------------------------------------------------------------------------- *
    508  * @brief generatePipelineLoop
    509  ** ------------------------------------------------------------------------------------------------------------- */
    510 void generatePipelineLoop(const std::unique_ptr<KernelBuilder> & b, const std::vector<Kernel *> & kernels) {
    511 
    512     BasicBlock * entryBlock = b->GetInsertBlock();
    513     Function * main = entryBlock->getParent();
    514 
    515     // Create the basic blocks for the loop.
    516     BasicBlock * pipelineLoop = BasicBlock::Create(b->getContext(), "pipelineLoop", main);
    517     BasicBlock * pipelineExit = BasicBlock::Create(b->getContext(), "pipelineExit", main);
    518 
    519     StreamSetBufferMap<Value *> producedItemCount;
    520     StreamSetBufferMap<Value *> consumedItemCount;
    521 
    522     b->CreateBr(pipelineLoop);
    523     b->SetInsertPoint(pipelineLoop);
    524    
    525     Value * cycleCountStart = nullptr;
    526     Value * cycleCountEnd = nullptr;
    527     if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
    528         cycleCountStart = b->CreateReadCycleCounter();
    529     }
    530     Value * terminated = b->getFalse();
    531 
    532     for (Kernel * const kernel : kernels) {
    533 
    534         b->setKernel(kernel);
    535         const auto & inputs = kernel->getStreamInputs();
    536         const auto & outputs = kernel->getStreamOutputs();
    537 
    538         std::vector<Value *> args = {kernel->getInstance(), terminated};
    539 
    540         for (unsigned i = 0; i < inputs.size(); ++i) {
    541             const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
    542             const auto f = producedItemCount.find(buffer);
    543             if (LLVM_UNLIKELY(f == producedItemCount.end())) {
    544                 report_fatal_error(kernel->getName() + " uses stream set " + inputs[i].getName() + " prior to its definition");
    545             }
    546             Value * const produced = f->second;
    547             args.push_back(produced);
    548             handleInsufficientData(b, produced, terminated, pipelineLoop, kernel, inputs[i], buffer);
    549         }
    550 
    551         applyOutputBufferExpansions(b, kernel);
    552 
    553         b->createDoSegmentCall(args);
    554 
    555         if (!kernel->hasNoTerminateAttribute()) {
    556             Value * terminatedSignal = b->getTerminationSignal();
    557             terminated = b->CreateOr(terminated, terminatedSignal);
    558         }
    559         for (unsigned i = 0; i < outputs.size(); ++i) {
    560             Value * const produced = b->getProducedItemCount(outputs[i].getName());
    561             const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
    562             assert (producedItemCount.count(buf) == 0);
    563             producedItemCount.emplace(buf, produced);
    564         }
    565 
    566         for (unsigned i = 0; i < inputs.size(); ++i) {
    567             Value * const processed = b->getProcessedItemCount(inputs[i].getName());
    568             const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);
    569             auto f = consumedItemCount.find(buf);
    570             if (f == consumedItemCount.end()) {
    571                 consumedItemCount.emplace(buf, processed);
    572             } else {
    573                 f->second = b->CreateUMin(processed, f->second);
    574             }
    575         }
    576 
    577         if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
    578             cycleCountEnd = b->CreateReadCycleCounter();
    579             Value * counterPtr = b->getCycleCountPtr();
    580             b->CreateStore(b->CreateAdd(b->CreateLoad(counterPtr), b->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
    581             cycleCountStart = cycleCountEnd;
    582         }
    583 //        Value * const segNo = b->acquireLogicalSegmentNo();
    584 //        Value * nextSegNo = b->CreateAdd(segNo, b->getSize(1));
    585 //        b->releaseLogicalSegmentNo(nextSegNo);
    586     }
    587 
    588     for (const auto consumed : consumedItemCount) {
    589         const StreamSetBuffer * const buffer = consumed.first;
    590         Kernel * const kernel = buffer->getProducer();
    591         const auto & binding = kernel->getStreamOutput(buffer);
    592         if (LLVM_UNLIKELY(binding.getRate().isDerived())) {
    593             continue;
    594         }
    595         b->setKernel(kernel);
    596         b->setConsumedItemCount(binding.getName(), consumed.second);
    597     }
    598 
    599     b->CreateCondBr(terminated, pipelineExit, pipelineLoop);
    600 
    601     b->SetInsertPoint(pipelineExit);
    602 
    603     if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
    604         for (unsigned k = 0; k < kernels.size(); k++) {
    605             auto & kernel = kernels[k];
    606             b->setKernel(kernel);
    607             const auto & inputs = kernel->getStreamInputs();
    608             const auto & outputs = kernel->getStreamOutputs();
    609             Value * items = nullptr;
    610             if (inputs.empty()) {
    611                 items = b->getProducedItemCount(outputs[0].getName());
    612             } else {
    613                 items = b->getProcessedItemCount(inputs[0].getName());
    614             }
    615             Value * fItems = b->CreateUIToFP(items, b->getDoubleTy());
    616             Value * cycles = b->CreateLoad(b->getCycleCountPtr());
    617             Value * fCycles = b->CreateUIToFP(cycles, b->getDoubleTy());
    618             const auto formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
    619             Value * stringPtr = b->CreatePointerCast(b->GetString(formatString), b->getInt8PtrTy());
    620             b->CreateCall(b->GetDprintf(), {b->getInt32(2), stringPtr, fItems, fCycles, b->CreateFDiv(fCycles, fItems)});
    621         }
    622     }
     429
    623430}
    624431
     
    671478    const Kernel * const producer = buffer->getProducer();
    672479    const Binding & output = producer->getStreamOutput(buffer);
    673     auto producedRate = producer->getLowerBound(output.getRate()) * producer->getStride();
    674480    const auto consumedRate = consumer->getUpperBound(input.getRate()) * consumer->getStride();
    675     if (LLVM_UNLIKELY(input.hasLookahead())) {
    676         producedRate -= input.getLookahead();
    677 //        const auto amount = input.getLookahead();
    678 //        const auto strides = ((amount + consumer->getStride() - 1) / consumer->getStride());
    679 //        consumedRate += strides * consumer->getStride();
    680     }
    681     if (LLVM_UNLIKELY(producedRate < consumedRate)) {
    682         const auto name = input.getName();
    683         BasicBlock * const sufficient = BasicBlock::Create(b->getContext(), name + "IsSufficient", b->GetInsertBlock()->getParent());
    684         Value * const processed = b->getProcessedItemCount(name);
    685         Value * const unread = b->CreateSub(produced, processed);
    686         Constant * const amount = ConstantInt::get(unread->getType(), ceiling(consumedRate));
    687         Value * const cond = b->CreateOr(b->CreateICmpUGE(unread, amount), final);
    688         b->CreateLikelyCondBr(cond, sufficient, insufficient);
    689         b->SetInsertPoint(sufficient);
     481    if (consumedRate > 0) {
     482        auto producedRate = producer->getLowerBound(output.getRate()) * producer->getStride();
     483        if (LLVM_UNLIKELY(input.hasLookahead())) {
     484            producedRate -= input.getLookahead();
     485        }
     486        if (LLVM_UNLIKELY(producedRate < consumedRate)) {
     487            const auto name = input.getName();
     488            BasicBlock * const sufficient = BasicBlock::Create(b->getContext(), name + "IsSufficient", b->GetInsertBlock()->getParent());
     489            Value * const processed = b->getProcessedItemCount(name);
     490
     491            if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableAsserts))) {
     492                b->CreateAssert(b->CreateICmpULE(processed, produced), input.getName() + ": processed cannot exceed produced");
     493            }
     494            Value * const unread = b->CreateSub(produced, processed);
     495            Constant * const amount = ConstantInt::get(unread->getType(), ceiling(consumedRate));
     496            Value * const cond = b->CreateOr(b->CreateICmpUGE(unread, amount), final);
     497            b->CreateLikelyCondBr(cond, sufficient, insufficient);
     498            b->SetInsertPoint(sufficient);
     499        }
    690500    }
    691501}
Note: See TracChangeset for help on using the changeset viewer.