Ignore:
Timestamp:
Feb 6, 2018, 4:57:35 PM (15 months ago)
Author:
nmedfort
Message:

More work on the pipeline I/O rate handling

File:
1 edited

Legend:

Unmodified
Added
Removed
  • icGREP/icgrep-devel/icgrep/toolchain/pipeline.cpp

    r5856 r5865  
    3636    using StreamSetBufferMap = flat_map<const StreamSetBuffer *, Value>;
    3737
     38    using CheckMap = flat_map<const Kernel *, std::vector<const StreamSetBuffer *>>;
     39
    3840    using RateValue = ProcessingRate::RateValue;
    3941
     
    5557    Value * executeKernel(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel, PHINode * const segNo, Value * const finished);
    5658
     59protected:
     60
     61    Graph makeInputGraph(const std::vector<Kernel *> & kernels);
     62
     63    Graph makeOutputGraph(const std::vector<Kernel *> & kernels);
     64
     65    Graph printGraph(const bool input, Graph && G);
     66
     67    Graph pruneGraph(Graph && G);
     68
     69    void addChecks(Graph && G, CheckMap & M);
     70
    5771    void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel);
    5872
    59     void updateProducedAndConsumedCounts(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel);
    60 
    6173private:
    6274
    63     Graph   G;
    64     Map     M;
     75    CheckMap                            inputAvailabilityChecks;
     76    CheckMap                            outputSpaceChecks;
    6577
    6678    StreamSetBufferMap<Value *>         producedItemCount;
     
    343355    // input from A. Moreover if C is depedent on B, C could be skipped entirely.
    344356
    345     // Note: we cannot simply test the output of A for both B and C. In a our data-parallel
    346     // pipeline A's state may change by the time we process C.
    347 
    348     for (const Kernel * const consumer : kernels) {
    349         const auto v = add_vertex(consumer, G);
     357    // Note: we cannot simply test the output of A for both B and C. In the data-parallel
     358    // pipeline, A's state may change by the time we process C.
     359
     360//    addChecks(printGraph(true, pruneGraph(printGraph(true, makeInputGraph(kernels)))), inputAvailabilityChecks);
     361//    addChecks(printGraph(false, pruneGraph(printGraph(false, makeOutputGraph(kernels)))), outputSpaceChecks);
     362
     363    addChecks(pruneGraph(makeInputGraph(kernels)), inputAvailabilityChecks);
     364    addChecks(pruneGraph(makeOutputGraph(kernels)), outputSpaceChecks);
     365
     366
     367    // iterate through each kernel in order and determine which kernel last used a particular buffer
     368    for (Kernel * const kernel : kernels) {
     369        const auto & inputs = kernel->getStreamInputs();
     370        for (unsigned i = 0; i < inputs.size(); ++i) {
     371            lastConsumer[kernel->getStreamSetInputBuffer(i)] = kernel;
     372        }
     373    }
     374
     375}
     376
     377/** ------------------------------------------------------------------------------------------------------------- *
     378 * @brief makeInputGraph
     379 *
     380 * The input graph models whether a kernel could *consume* more data than may be produced by a preceeding kernel.
     381 ** ------------------------------------------------------------------------------------------------------------- */
     382PipelineGenerator::Graph PipelineGenerator::makeInputGraph(const std::vector<Kernel *> & kernels) {
     383
     384    const auto n = kernels.size();
     385    Graph   G(n);
     386    Map     M;
     387
     388    for (Graph::vertex_descriptor v = 0; v < n; ++v) {
     389
     390        const Kernel * const consumer = kernels[v];
    350391        M.emplace(consumer, v);
     392        G[v] = consumer;
     393
    351394        const auto & inputs = consumer->getStreamInputs();
    352395        for (unsigned i = 0; i < inputs.size(); ++i) {
     396
     397            const Binding & input = inputs[i];
     398            auto ub_in = consumer->getUpperBound(input.getRate()) * consumer->getStride(); assert (ub_in > 0);
     399            if (input.hasLookahead()) {
     400                ub_in += input.getLookahead();
     401            }
    353402
    354403            const auto buffer = consumer->getStreamSetInputBuffer(i);
    355404            const Kernel * const producer = buffer->getProducer();
    356405            const Binding & output = producer->getStreamOutput(buffer);
    357             if (output.getRate().isRelative()) continue;
    358 
    359             const Binding & input = inputs[i];
    360             auto ub_in = consumer->getUpperBound(input.getRate()) * consumer->getStride();
    361             if (input.hasLookahead()) {
    362                 ub_in += input.getLookahead();
    363             }
    364 
    365406            const auto lb_out = producer->getLowerBound(output.getRate()) * producer->getStride();
    366407
     
    388429        }
    389430    }
     431    return G;
     432}
     433
     434/** ------------------------------------------------------------------------------------------------------------- *
     435 * @brief makeOutputGraph
     436 *
     437 * The output graph models whether a kernel could *produce* more data than may be consumed by a subsequent kernel.
     438 ** ------------------------------------------------------------------------------------------------------------- */
     439PipelineGenerator::Graph PipelineGenerator::makeOutputGraph(const std::vector<Kernel *> & kernels) {
     440
     441    const auto n = kernels.size();
     442    Graph   G(n);
     443    Map     M;
     444
     445    for (Graph::vertex_descriptor i = 0; i < n; ++i) {
     446        const Kernel * const consumer = kernels[i];
     447        const auto v = n - i - 1;
     448        M.emplace(consumer, v);
     449        G[v] = consumer;
     450
     451        const auto & inputs = consumer->getStreamInputs();
     452        for (unsigned i = 0; i < inputs.size(); ++i) {
     453            const auto buffer = consumer->getStreamSetInputBuffer(i);
     454            if (isa<SourceBuffer>(buffer)) continue;
     455            const Kernel * const producer = buffer->getProducer();
     456            const Binding & output = producer->getStreamOutput(buffer);
     457            auto ub_out = producer->getUpperBound(output.getRate()) * producer->getStride();
     458            if (LLVM_UNLIKELY(ub_out > 0)) { // unknown output rates are handled by reallocating their buffers
     459                const Binding & input = inputs[i];
     460                if (input.hasLookahead()) {
     461                    ub_out -= input.getLookahead();
     462                }
     463                const auto lb_in = consumer->getLowerBound(input.getRate()) * consumer->getStride();
     464                const auto inverseRate = lb_in / ub_out;
     465                const auto f = M.find(producer); assert (f != M.end());
     466                const auto u = f->second;
     467                // If we have multiple inputs from the same kernel, we only need to consider the "fastest" one
     468                bool fastest = true;
     469                if (ub_out > 0) {
     470                    for (const auto e : make_iterator_range(in_edges(v, G))) {
     471                        if (source(e, G) == u) {
     472                            Channel & p = G[e];
     473                            fastest = false;
     474                            if (inverseRate < p.rate) {
     475                                p.rate = inverseRate;
     476                                p.buffer = buffer;
     477                            }
     478                            break;
     479                        }
     480                    }
     481                }
     482                if (fastest) {
     483                    add_edge(v, u, Channel{inverseRate, buffer}, G);
     484                }
     485            }
     486        }
     487    }
     488    return G;
     489}
     490
     491/** ------------------------------------------------------------------------------------------------------------- *
     492 * @brief printGraph
     493 ** ------------------------------------------------------------------------------------------------------------- */
     494PipelineGenerator::Graph PipelineGenerator::printGraph(const bool input, Graph && G) {
     495
     496    auto & out = errs();
     497
     498    out << "digraph " << (input ? "I" : "O") << " {\n";
     499    for (auto u : make_iterator_range(vertices(G))) {
     500        out << "v" << u << " [label=\"" << u << ": "
     501            << G[u]->getName() << "\"];\n";
     502    }
     503
     504    for (auto e : make_iterator_range(edges(G))) {
     505        const Channel & c = G[e];
     506        const auto s = source(e, G);
     507        const auto t = target(e, G);
     508        const Kernel * const S = G[input ? s : t];
     509        const Kernel * const T = G[input ? t : s];
     510
     511        out << "v" << s << " -> v" << t
     512            << " [label=\"";
     513
     514        if (c.buffer) {
     515            out << S->getStreamOutput(c.buffer).getName()
     516                << " -> "
     517                << T->getStreamInput(c.buffer).getName()
     518                << "   ";
     519        }
     520
     521        out << c.rate.numerator() << " / " << c.rate.denominator()
     522            << "\"];\n";
     523    }
     524
     525    out << "}\n\n";
     526    out.flush();
     527
     528    return G;
     529}
     530
     531/** ------------------------------------------------------------------------------------------------------------- *
     532 * @brief pruneGraph
     533 ** ------------------------------------------------------------------------------------------------------------- */
     534PipelineGenerator::Graph PipelineGenerator::pruneGraph(Graph && G) {
    390535
    391536    // Take a transitive closure of G but whenever we attempt to insert an edge into the closure
    392537    // that already exists, check instead whether the rate of our proposed edge is <= the existing
    393     // edge's rate. If so, the data availability is transitively guaranteed.
     538    // edge's rate. If so, the data availability/consumption is transitively guaranteed.
    394539    for (const auto u : make_iterator_range(vertices(G))) {
    395540        for (auto ei : make_iterator_range(in_edges(u, G))) {
    396541            const auto v = source(ei, G);
    397             const Channel & pu = G[ei];           
    398             for (auto ej : make_iterator_range(out_edges(u, G))) {               
     542            const Channel & pu = G[ei];
     543            for (auto ej : make_iterator_range(out_edges(u, G))) {
    399544                const auto w = target(ej, G);
    400545                const auto ratio = RateValue(G[u]->getStride(), G[w]->getStride());
     
    404549                    if (source(ek, G) == v) {
    405550                        Channel & pw = G[ek];
    406                         if (rate <= pw.rate && pw.rate > 0) {
     551                        if (rate <= pw.rate) {
    407552                            pw.buffer = nullptr;
    408553                        }
     
    419564
    420565    // remove any closure edges from G
    421     remove_edge_if([&](const Graph::edge_descriptor e) { return G[e].buffer == nullptr; }, G);
    422 
    423     // If a kernel has no 'necessary to check' inputs then we can remove every output with a rate >= 1 from G
     566    remove_edge_if([&G](const Graph::edge_descriptor e) { return G[e].buffer == nullptr; }, G);
     567
     568    // For any kernel, if we do not need to check any of its inputs, we can avoid checking any of its
     569    // outputs that have a rate >= 1 (i.e., its production rates >= consumption rates.)
    424570    for (const auto u : make_iterator_range(vertices(G))) {
    425571        if (in_degree(u, G) == 0) {
    426             remove_out_edge_if(u, [&](const Graph::edge_descriptor e) { return G[e].rate >= RateValue{1, 1}; }, G);
    427         }
    428     }
    429 
    430     // iterate through each kernel in order and determine which kernel last used a particular buffer
    431     for (Kernel * const kernel : kernels) {
    432         const auto & inputs = kernel->getStreamInputs();
    433         for (unsigned i = 0; i < inputs.size(); ++i) {
    434             lastConsumer[kernel->getStreamSetInputBuffer(i)] = kernel;
    435         }
    436     }
    437 
     572            remove_out_edge_if(u, [&G](const Graph::edge_descriptor e) { return G[e].rate >= RateValue{1, 1}; }, G);
     573        }
     574    }
     575
     576    return G;
     577}
     578
     579/** ------------------------------------------------------------------------------------------------------------- *
     580 * @brief addChecks
     581 ** ------------------------------------------------------------------------------------------------------------- */
     582void PipelineGenerator::addChecks(Graph && G, CheckMap & M) {
     583    for (const auto u : make_iterator_range(vertices(G))) {
     584        if (LLVM_LIKELY(in_degree(u, G) == 0)) continue;
     585        flat_set<const StreamSetBuffer *> B;
     586        for (auto e : make_iterator_range(in_edges(u, G))) {
     587            B.insert(G[e].buffer);
     588        }
     589        M.emplace(G[u], std::vector<const StreamSetBuffer *>{B.begin(), B.end()});
     590    }
    438591}
    439592
     
    441594 * @brief executeKernel
    442595 ** ------------------------------------------------------------------------------------------------------------- */
    443 Value *PipelineGenerator::executeKernel(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel, PHINode * const segNo, Value * const finished) {
     596Value * PipelineGenerator::executeKernel(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel, PHINode * const segNo, Value * const finished) {
    444597
    445598    const auto & inputs = kernel->getStreamInputs();
    446599
    447600    std::vector<Value *> args(2 + inputs.size());
    448 
    449     const auto f = M.find(kernel); assert (f != M.end());
    450     const auto u = f->second;
    451601
    452602    BasicBlock * const kernelEntry = b->GetInsertBlock();
    453603    BasicBlock * const kernelCode = b->CreateBasicBlock(kernel->getName());
    454     BasicBlock * const kernelExit = b->CreateBasicBlock(kernel->getName() + "_exit");
     604    BasicBlock * const kernelFinished = b->CreateBasicBlock(kernel->getName() + "Finished");
     605    BasicBlock * const kernelExit = b->CreateBasicBlock(kernel->getName() + "Exit");
    455606
    456607    b->CreateUnlikelyCondBr(b->getTerminationSignal(), kernelExit, kernelCode);
     608
     609    b->SetInsertPoint(kernelFinished);
     610    PHINode * const final = b->CreatePHI(b->getInt1Ty(), 2);
    457611
    458612    b->SetInsertPoint(kernelExit);
    459613    PHINode * const terminated = b->CreatePHI(b->getInt1Ty(), 2);
    460     // Since our initial "isFinal" state is equal to what the first kernel's termination signal state
     614    // The initial "isFinal" state is equal to the first kernel's termination signal state
    461615    terminated->addIncoming(finished ? finished : b->getTrue(), kernelEntry);
    462616    Value * isFinal = finished ? finished : b->getFalse();
    463617
     618    // Since it is possible that a sole consumer of some stream could terminate early, set the
     619    // initial consumed amount to the amount produced in this iteration.
     620    std::vector<PHINode *> consumedItemCountPhi(inputs.size());
     621    std::vector<Value *> priorConsumedItemCount(inputs.size());
     622    for (unsigned i = 0; i < inputs.size(); ++i) {
     623        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
     624        auto c = consumedItemCount.find(buffer);
     625        PHINode * const consumedPhi = b->CreatePHI(b->getSizeTy(), 2);
     626        Value * consumed = nullptr;
     627        if (c == consumedItemCount.end()) {
     628            const auto p = producedItemCount.find(buffer);
     629            assert (p != producedItemCount.end());
     630            consumed = p->second;
     631            consumedItemCount.emplace(buffer, consumedPhi);
     632        } else {
     633            consumed = c->second;
     634            c->second = consumedPhi;
     635        }
     636        consumedPhi->addIncoming(consumed, kernelEntry);
     637        consumedItemCountPhi[i] = consumedPhi;
     638        priorConsumedItemCount[i] = consumed;
     639    }
     640
    464641    b->SetInsertPoint(kernelCode);
     642
     643    // Check for sufficient output space
     644    const auto O = outputSpaceChecks.find(kernel);
     645    if (O != outputSpaceChecks.end()) {
     646        for (const StreamSetBuffer * buffer : O->second) {
     647
     648
     649            const Binding & output = kernel->getStreamOutput(buffer);
     650            const auto name = output.getName();
     651            BasicBlock * const sufficient = b->CreateBasicBlock(name + "HasOutputSpace");
     652            const auto ub = kernel->getUpperBound(output.getRate()); assert (ub > 0);
     653            Constant * const strideLength = b->getSize(ceiling(ub * kernel->getStride()));
     654            Value * const produced = b->getProducedItemCount(name);
     655            Value * const consumed = b->getConsumedItemCount(name);
     656            Value * const unused = b->CreateSub(produced, consumed);
     657            Value * const potentialData = b->CreateAdd(unused, strideLength);
     658            Value * const capacity = b->getBufferedSize(name);
     659
     660          //  b->CallPrintInt("< " + kernel->getName() + "_" + name + "_potential", potentialData);
     661          //  b->CallPrintInt("< " + kernel->getName() + "_" + name + "_capacity", capacity);
     662
     663            Value * const hasSufficientSpace = b->CreateICmpULE(potentialData, capacity);
     664
     665          //  b->CallPrintInt("* < " + kernel->getName() + "_" + name + "_sufficientSpace", hasSufficientSpace);
     666
     667            final->addIncoming(b->getFalse(), b->GetInsertBlock());
     668            b->CreateLikelyCondBr(hasSufficientSpace, sufficient, kernelFinished);
     669            b->SetInsertPoint(sufficient);
     670        }
     671    }
     672
    465673    for (unsigned i = 0; i < inputs.size(); ++i) {
    466 
    467674        const Binding & input = inputs[i];
    468 
    469675        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
    470 
    471676        const auto name = input.getName();
    472677
     
    476681        }
    477682        Value * const produced = p->second;
     683
     684      //  b->CallPrintInt("< " + kernel->getName() + "_" + name + "_produced", produced);
     685
    478686        const auto ub = kernel->getUpperBound(input.getRate()); assert (ub > 0);
    479687        const auto strideLength = ceiling(ub * kernel->getStride()) ;
    480688        Constant * const segmentLength = b->getSize(strideLength * codegen::SegmentSize);
    481689
    482         if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
     690        if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts) && !isa<SourceBuffer>(buffer))) {
    483691            b->CreateAssert(b->CreateICmpULE(segmentLength, b->getCapacity(name)),
    484692                            kernel->getName() + ": " + name + " upper bound of segment length exceeds buffer capacity");
    485693        }
    486694
    487         Value * limit = nullptr;
    488         if (input.getRate().isFixed()) {
    489             // if the input is deferred, simply adding length to the processed item count may result in setting a limit
    490             // that is too low for. instead just calculate the limit of all fixed rates from the segment no.
    491             limit = b->CreateMul(b->CreateAdd(segNo, b->getSize(1)), segmentLength);
    492         } else {
    493             Value * const processed = b->getProcessedItemCount(name);
    494             limit = b->CreateAdd(processed, segmentLength);
    495         }
     695//        Value * limit = nullptr;
     696//        if (input.getRate().isFixed()) {
     697//            // if the input is deferred, simply adding length to the processed item count may result in setting a limit
     698//            // that is too low for. instead just calculate the limit of all fixed rates from the segment no.
     699//            limit = b->CreateMul(b->CreateAdd(segNo, b->getSize(1)), segmentLength);
     700//        } else {
     701//            Value * const processed = b->getProcessedItemCount(name);
     702//            limit = b->CreateAdd(processed, segmentLength);
     703//        }
     704
     705        // if the input is deferred, simply adding length to the processed item count may result in setting a limit
     706        // that is too low for. instead just calculate the limit of all fixed rates from the segment no.
     707        Value * const limit = b->CreateMul(b->CreateAdd(segNo, b->getSize(1)), segmentLength);
     708
     709     //  b->CallPrintInt("< " + kernel->getName() + "_" + name + "_limit", limit);
    496710
    497711        // TODO: currently, if we produce the exact amount as our limit states, we will have to process one additional segment
     
    502716        args[i + 2] = b->CreateSelect(consumingAll, produced, limit);
    503717        isFinal = b->CreateAnd(isFinal, consumingAll);
    504 
    505         // Check for available input (if it's both computable and not guaranteed to be sufficient by the processing rates)
    506         for (auto e : make_iterator_range(in_edges(u, G))) {
    507             const auto p = G[e];
    508             if (p.buffer == buffer) {
    509                 BasicBlock * const sufficient = b->CreateBasicBlock(name + "_hasSufficientData");
    510 
    511                 Constant * const sl = b->getSize(strideLength);
    512 
    513                 Value * remaining = nullptr;
    514                 if (input.getRate().isFixed()) {
    515                     remaining = b->CreateMul(segNo, sl);
    516                 } else {
    517                     remaining = b->getProcessedItemCount(name);
    518                 }
    519                 remaining = b->CreateSub(produced, remaining);
    520 
    521                 Value * const hasSufficientData = b->CreateOr(b->CreateICmpUGE(remaining, sl), isFinal);
    522                 terminated->addIncoming(b->getFalse(), b->GetInsertBlock());
    523                 b->CreateLikelyCondBr(hasSufficientData, sufficient, kernelExit);
    524                 b->SetInsertPoint(sufficient);
    525             }
     718    }
     719
     720    // Check for available input
     721    const auto I = inputAvailabilityChecks.find(kernel);
     722    if (I != inputAvailabilityChecks.end()) {
     723        for (const StreamSetBuffer * buffer : I->second) {
     724            const Binding & input = kernel->getStreamInput(buffer);
     725            const auto name = input.getName();
     726            BasicBlock * const sufficient = b->CreateBasicBlock(name + "HasInputData");
     727            const auto ub = kernel->getUpperBound(input.getRate()); assert (ub > 0);
     728            Constant * const strideLength = b->getSize(ceiling(ub * kernel->getStride()));
     729            Value * const processed = b->getProcessedItemCount(name);
     730//            if (input.getRate().isFixed()) {
     731//                processed = b->CreateMul(segNo, strideLength);
     732//            } else {
     733//                processed = b->getProcessedItemCount(name);
     734//            }
     735            const auto p = producedItemCount.find(buffer);
     736            assert (p != producedItemCount.end());
     737            Value * const produced = p->second;
     738            Value * const unprocessed = b->CreateSub(produced, processed);
     739
     740          //  b->CallPrintInt("< " + kernel->getName() + "_" + name + "_unprocessed", unprocessed);
     741
     742            Value * const hasSufficientData = b->CreateOr(b->CreateICmpUGE(unprocessed, strideLength), isFinal);
     743
     744          //  b->CallPrintInt("* < " + kernel->getName() + "_" + name + "_sufficientData", hasSufficientData);
     745
     746            final->addIncoming(b->getFalse(), b->GetInsertBlock());
     747            b->CreateLikelyCondBr(hasSufficientData, sufficient, kernelFinished);
     748            b->SetInsertPoint(sufficient);
    526749        }
    527750    }
     
    538761    }
    539762    b->setTerminationSignal(isFinal);
    540 //    b->CallPrintInt(kernel->getName() + "_finished", isFinal);
    541     BasicBlock * const kernelFinished = b->GetInsertBlock();
     763  //  b->CallPrintInt(kernel->getName() + "_finished", isFinal);
     764    final->addIncoming(isFinal, b->GetInsertBlock());
     765    b->CreateBr(kernelFinished);
     766
     767    b->SetInsertPoint(kernelFinished);
     768
     769    // update the consumed item counts
     770    for (unsigned i = 0; i < inputs.size(); ++i) {
     771        Value * const processed = b->getProcessedItemCount(inputs[i].getName());
     772      //  b->CallPrintInt("> " + kernel->getName() + "_" + inputs[i].getName() + "_processed", processed);
     773        Value * const consumed = b->CreateUMin(priorConsumedItemCount[i], processed);
     774        consumedItemCountPhi[i]->addIncoming(consumed, kernelFinished);
     775    }
     776    b->CreateBr(kernelExit);
     777
    542778    kernelExit->moveAfter(kernelFinished);
    543     b->CreateBr(kernelExit);
    544779
    545780    b->SetInsertPoint(kernelExit);
    546     terminated->addIncoming(isFinal, kernelFinished);
    547 
    548     updateProducedAndConsumedCounts(b, kernel);
     781    terminated->addIncoming(final, kernelFinished);
     782
     783
     784    // If this kernel is the last consumer of a input buffer, update the consumed count for that buffer.
     785    // NOTE: unless we can prove that this kernel cannot terminate before any prior consumer, we cannot
     786    // put this code into the kernelFinished block.
     787    for (unsigned i = 0; i < inputs.size(); ++i) {
     788        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
     789        const auto c = lastConsumer.find(buffer);
     790        assert (c != lastConsumer.end());
     791        if (c->second == kernel) {
     792            Kernel * const producer = buffer->getProducer();
     793            const auto & output = producer->getStreamOutput(buffer);
     794            if (output.getRate().isRelative()) continue;
     795
     796           // b->CallPrintInt("* " + producer->getName() + "_" + output.getName() + "_consumed", consumedItemCountPhi[i]);
     797
     798            b->setKernel(producer);
     799            if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
     800                Value * const alreadyConsumed = b->getConsumedItemCount(output.getName());
     801                b->CreateAssert(b->CreateICmpULE(alreadyConsumed, consumedItemCountPhi[i]),
     802                                producer->getName() + ": " + output.getName() + " consumed item count is not monotonically non-decreasing!");
     803            }
     804            b->setConsumedItemCount(output.getName(), consumedItemCountPhi[i]);
     805            b->setKernel(kernel);
     806        }
     807    }
     808
     809    const auto & outputs = kernel->getStreamOutputs();
     810    for (unsigned i = 0; i < outputs.size(); ++i) {
     811        Value * const produced = b->getProducedItemCount(outputs[i].getName());
     812
     813       // b->CallPrintInt("> " + kernel->getName() + "_" + outputs[i].getName() + "_produced", produced);
     814
     815        const StreamSetBuffer * const buffer = kernel->getStreamSetOutputBuffer(i);
     816        assert (producedItemCount.count(buffer) == 0);
     817        producedItemCount.emplace(buffer, produced);
     818    }
    549819
    550820    return terminated;
    551821}
     822
    552823
    553824/** ------------------------------------------------------------------------------------------------------------- *
     
    588859}
    589860
    590 /** ------------------------------------------------------------------------------------------------------------- *
    591  * @brief updateProducedAndConsumedCounts
    592  ** ------------------------------------------------------------------------------------------------------------- */
    593 void PipelineGenerator::updateProducedAndConsumedCounts(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel) {
    594 
    595     const auto & inputs = kernel->getStreamInputs();
    596     for (unsigned i = 0; i < inputs.size(); ++i) {
    597         Value * const processed = b->getProcessedItemCount(inputs[i].getName());
    598 
    599         const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
    600         auto f = consumedItemCount.find(buffer);
    601         Value * consumed = processed;
    602         if (f == consumedItemCount.end()) {
    603             consumedItemCount.emplace(buffer, consumed);
    604         } else {
    605             consumed = b->CreateUMin(consumed, f->second);
    606             f->second = consumed;
    607         }
    608 
    609         // If this kernel is the last consumer of a input buffer, update the consumed count for that buffer.
    610         const auto c = lastConsumer.find(buffer);
    611         assert (c != lastConsumer.end());
    612         if (c->second == kernel) {
    613             Kernel * const producer = buffer->getProducer();
    614             const auto & output = producer->getStreamOutput(buffer);
    615             if (output.getRate().isRelative()) continue;
    616             b->setKernel(producer);
    617 
    618             b->setConsumedItemCount(output.getName(), consumed);
    619             b->setKernel(kernel);
    620         }
    621     }
    622 
    623     const auto & outputs = kernel->getStreamOutputs();
    624     for (unsigned i = 0; i < outputs.size(); ++i) {
    625         Value * const produced = b->getProducedItemCount(outputs[i].getName());
    626         const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
    627         assert (producedItemCount.count(buf) == 0);
    628         producedItemCount.emplace(buf, produced);
    629     }
    630 
    631 }
    632 
    633 
Note: See TracChangeset for help on using the changeset viewer.