Ignore:
Timestamp:
Apr 24, 2018, 2:57:34 PM (16 months ago)
Author:
nmedfort
Message:

Restructured MultiBlock? kernel. Removal of Swizzled buffers. Inclusion of PopCount? rates / non-linear access. Modifications to several kernels to better align them with the kernel and pipeline changes.

Location:
icGREP/icgrep-devel/icgrep/toolchain
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • icGREP/icgrep-devel/icgrep/toolchain/cpudriver.cpp

    r5932 r5985  
    158158        kernel->prepareKernel(iBuilder);
    159159    }
    160 
    161160    // note: instantiation of all kernels must occur prior to initialization
    162161    for (Kernel * const k : mPipeline) {
     
    198197
    199198std::string ParabixDriver::getMangledName(std::string s) {
     199    #if LLVM_VERSION_INTEGER >= LLVM_VERSION_CODE(3, 9, 0)
    200200    DataLayout DL(mTarget->createDataLayout());   
    201201    std::string MangledName;
     
    203203    Mangler::getNameWithPrefix(MangledNameStream, s, DL);
    204204    return MangledName;
     205    #else
     206    return s;
     207    #endif
    205208}
    206209
  • icGREP/icgrep-devel/icgrep/toolchain/pipeline.cpp

    r5967 r5985  
    1212#include <boost/container/flat_map.hpp>
    1313#include <boost/graph/adjacency_list.hpp>
     14#include <boost/range/adaptor/reversed.hpp>
    1415#include <kernels/kernel_builder.h>
    15 
    1616#include <llvm/Support/raw_ostream.h>
    1717
     
    2222using namespace boost::container;
    2323
     24#define DISABLE_COPY_TO_OVERFLOW
     25
    2426using Port = Kernel::Port;
    2527
     
    3133}
    3234
    33 struct PipelineGenerator {
     35class PipelineGenerator {
     36public:
    3437
    3538    template <typename Value>
    3639    using StreamSetBufferMap = flat_map<const StreamSetBuffer *, Value>;
    3740
    38     using CheckMap = flat_map<const Kernel *, std::vector<const StreamSetBuffer *>>;
    39 
    4041    using RateValue = ProcessingRate::RateValue;
     42
     43    PipelineGenerator(const std::vector<Kernel *> & kernels)
     44    : kernels(kernels)
     45    , terminated(nullptr)
     46    , noMore(nullptr)
     47    , deadLockCounter(nullptr)
     48    , anyProgress(nullptr)
     49    , madeProgress(nullptr) {
     50
     51    }
    4152
    4253    struct Channel {
    4354        Channel() = default;
    44         Channel(const RateValue & rate, const StreamSetBuffer * const buffer)
    45         : rate(rate), buffer(buffer) { }
    46 
    47         RateValue               rate;
     55        Channel(const RateValue & ratio, const StreamSetBuffer * const buffer = nullptr, const unsigned operand = 0)
     56        : ratio(ratio), buffer(buffer), operand(operand) { }
     57
     58        RateValue               ratio;
    4859        const StreamSetBuffer * buffer;
     60        unsigned                operand;
    4961    };
    5062
    51     using Graph = adjacency_list<vecS, vecS, bidirectionalS, const Kernel *, Channel, vecS>;
    52 
    53     using Map = flat_map<const Kernel *, Graph::vertex_descriptor>;
    54 
    55     void initialize(const std::vector<Kernel *> & kernels);
    56 
    57     Value * executeKernel(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel, PHINode * const segNo, Value * const finished);
     63    using ChannelGraph = adjacency_list<vecS, vecS, bidirectionalS, const Kernel *, Channel>;
     64
     65    using DependencyGraph = adjacency_list<hash_setS, vecS, bidirectionalS, Value *>;
     66
     67    using KernelMap = flat_map<const Kernel *, unsigned>;
     68
     69    void initialize(const std::unique_ptr<KernelBuilder> & b, BasicBlock * const entryBlock);
     70
     71    void execute(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
     72
     73    Value * finalize(const std::unique_ptr<KernelBuilder> & b);
    5874
    5975protected:
    6076
    61     Graph makeInputGraph(const std::vector<Kernel *> & kernels);
    62 
    63     Graph makeOutputGraph(const std::vector<Kernel *> & kernels);
    64 
    65     Graph printGraph(const bool input, Graph && G);
    66 
    67     Graph pruneGraph(Graph && G);
    68 
    69     void addChecks(Graph && G, CheckMap & M);
    70 
    71     void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel);
     77    ChannelGraph makeInputGraph() const;
     78
     79    ChannelGraph makeOutputGraph() const;
     80
     81    DependencyGraph makeDependencyGraph() const;
     82
     83    template<class VertexList>
     84    ChannelGraph pruneGraph(ChannelGraph && G, VertexList && V) const;
     85
     86    void checkIfAllInputKernelsAreTerminated(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
     87
     88    void checkAvailableInputData(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
     89
     90    void checkAvailableOutputSpace(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
     91
     92    Value * getStrideLength(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel, const Binding & binding);
     93
     94    Value * callKernel(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
     95
     96    void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
     97
     98    Value * getFullyProcessedItemCount(const std::unique_ptr<KernelBuilder> & b, const Binding & binding, Value * const final) const;
     99
     100    void printGraph(const ChannelGraph & G) const;
     101
     102    void printGraph(const DependencyGraph & G) const;
    72103
    73104private:
    74105
    75     CheckMap                            inputAvailabilityChecks;
    76     CheckMap                            outputSpaceChecks;
     106    const std::vector<Kernel *> &       kernels;
     107    PHINode *                           terminated;
     108
     109    Value *                             noMore;
     110
     111    DependencyGraph                     dependencyGraph;
     112    ChannelGraph                        inputGraph;
     113    ChannelGraph                        outputGraph;
     114
     115    BasicBlock *                        kernelFinished;
     116
     117    PHINode *                           deadLockCounter;
     118    Value *                             anyProgress;
     119    PHINode *                           madeProgress;
    77120
    78121    StreamSetBufferMap<Value *>         producedItemCount;
    79122    StreamSetBufferMap<Value *>         consumedItemCount;
    80123    StreamSetBufferMap<const Kernel *>  lastConsumer;
     124    flat_set<const StreamSetBuffer *>   isConsumedAtNonFixedRate;
    81125};
    82126
     
    130174    Value * const segOffset = b->CreateLoad(b->CreateGEP(threadStruct, {b->getInt32(0), b->getInt32(1)}));
    131175
    132     PipelineGenerator G;
     176    PipelineGenerator G(kernels);
    133177
    134178    BasicBlock * const segmentLoop = b->CreateBasicBlock("segmentLoop");
    135179    b->CreateBr(segmentLoop);
    136180
    137     b->SetInsertPoint(segmentLoop);
    138     G.initialize(kernels);
     181    b->SetInsertPoint(segmentLoop);   
    139182    PHINode * const segNo = b->CreatePHI(b->getSizeTy(), 2, "segNo");
    140183    segNo->addIncoming(segOffset, entryBlock);
    141     Value * finished = nullptr;
     184    G.initialize(b, entryBlock);
    142185
    143186    Value * cycleCountStart = nullptr;
     
    159202        b->setKernel(kernels[serialize ? (n - 1) : k]);
    160203        Value * const processedSegmentCount = b->acquireLogicalSegmentNo();
    161         b->setKernel(kernel);
    162204        assert (processedSegmentCount->getType() == segNo->getType());
    163205        Value * const ready = b->CreateICmpEQ(segNo, processedSegmentCount);
     
    168210        b->SetInsertPoint(kernelCheck);
    169211
    170         finished = G.executeKernel(b, kernel, segNo, finished);
     212        G.execute(b, k);
     213
     214        b->releaseLogicalSegmentNo(b->CreateAdd(segNo, b->getSize(1)));
    171215
    172216        if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
     
    177221        }
    178222
    179         b->releaseLogicalSegmentNo(b->CreateAdd(segNo, b->getSize(1)));
    180223    }
    181224
    182225    segNo->addIncoming(b->CreateAdd(segNo, b->getSize(codegen::ThreadNum)), b->GetInsertBlock());
    183 
     226    Value * const finished = G.finalize(b);
    184227    BasicBlock * const segmentExit = b->CreateBasicBlock("segmentExit");
    185228    b->CreateUnlikelyCondBr(finished, segmentExit, segmentLoop);
    186229
    187230    b->SetInsertPoint(segmentExit);
    188 
    189231    // only call pthread_exit() within spawned threads; otherwise it'll be equivalent to calling exit() within the process
    190232    BasicBlock * const exitThread = b->CreateBasicBlock("ExitThread");
    191     BasicBlock * const exitFunction = b->CreateBasicBlock("ExitProcessFunction");
    192 
     233    BasicBlock * const exitFunction = b->CreateBasicBlock("ExitProcessFunction");   
    193234    b->CreateCondBr(b->CreateIsNull(segOffset), exitFunction, exitThread);
    194235    b->SetInsertPoint(exitThread);
    195236    b->CreatePThreadExitCall(nullVoidPtrVal);
    196     b->CreateBr(exitFunction);
     237    b->CreateBr(exitFunction);   
    197238    b->SetInsertPoint(exitFunction);
    198239    b->CreateRetVoid();
     
    279320    BasicBlock * const entryBlock = b->GetInsertBlock();
    280321    BasicBlock * const pipelineLoop = b->CreateBasicBlock("pipelineLoop");
    281     BasicBlock * const pipelineExit = b->CreateBasicBlock("pipelineExit");
    282 
    283     PipelineGenerator G;
     322
     323    PipelineGenerator G(kernels);
    284324
    285325    b->CreateBr(pipelineLoop);
    286326
    287     b->SetInsertPoint(pipelineLoop);
    288     G.initialize(kernels);
     327    b->SetInsertPoint(pipelineLoop);   
    289328    PHINode * const segNo = b->CreatePHI(b->getSizeTy(), 2, "segNo");
    290329    segNo->addIncoming(b->getSize(0), entryBlock);
    291     Value * finished = nullptr;
     330    G.initialize(b, entryBlock);
     331
     332    Value * const nextSegNo = b->CreateAdd(segNo, b->getSize(1));
    292333
    293334    Value * cycleCountStart = nullptr;
     
    297338    }
    298339
    299     for (Kernel * const kernel : kernels) {
    300 
    301         b->setKernel(kernel);
    302 
    303         finished = G.executeKernel(b, kernel, segNo, finished);
     340    for (unsigned i = 0; i < kernels.size(); ++i) {
     341
     342        G.execute(b, i);
     343
     344        b->releaseLogicalSegmentNo(nextSegNo);
    304345
    305346        if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
     
    311352    }
    312353
    313     segNo->addIncoming(b->CreateAdd(segNo, b->getSize(1)), b->GetInsertBlock());
     354    segNo->addIncoming(nextSegNo, b->GetInsertBlock());
     355    BasicBlock * const pipelineExit = b->CreateBasicBlock("pipelineExit");
     356    Value * const finished = G.finalize(b);
    314357    b->CreateCondBr(finished, pipelineExit, pipelineLoop);
    315358
    316     pipelineExit->moveAfter(b->GetInsertBlock());
    317 
    318359    b->SetInsertPoint(pipelineExit);
    319 
    320360    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
    321361        for (unsigned k = 0; k < kernels.size(); k++) {
     
    344384 * @brief initialize
    345385 ** ------------------------------------------------------------------------------------------------------------- */
    346 void PipelineGenerator::initialize(const std::vector<Kernel *> & kernels) {
    347 
    348     // Our goal when building G is *not* to model the dataflow of our program but instead to
    349     // detetermine the minimum number of sufficient data tests needed to ensure each kernel has
    350     // enough data to progress.
    351 
    352     // For example, suppose we have kernels A, B and C, and that B has a fixed input and fixed
    353     // output rate. C also has a fixed input rate but A does *not* have a fixed output rate.
    354     // C must test whether it has enough input from B as B is not guaranteed to have enough
    355     // input from A. Moreover if C is depedent on B, C could be skipped entirely.
    356 
    357     // Note: we cannot simply test the output of A for both B and C. In the data-parallel
    358     // pipeline, A's state may change by the time we process C.
    359 
    360 //    addChecks(printGraph(true, pruneGraph(printGraph(true, makeInputGraph(kernels)))), inputAvailabilityChecks);
    361 //    addChecks(printGraph(false, pruneGraph(printGraph(false, makeOutputGraph(kernels)))), outputSpaceChecks);
    362 
    363     addChecks(pruneGraph(makeInputGraph(kernels)), inputAvailabilityChecks);
    364     addChecks(pruneGraph(makeOutputGraph(kernels)), outputSpaceChecks);
    365 
    366 
    367     // iterate through each kernel in order and determine which kernel last used a particular buffer
     386void PipelineGenerator::initialize(const std::unique_ptr<KernelBuilder> & b, BasicBlock * const entryBlock) {
     387
     388    dependencyGraph = makeDependencyGraph();
     389    inputGraph = makeInputGraph();
     390    outputGraph = makeOutputGraph();
     391
    368392    for (Kernel * const kernel : kernels) {
    369393        const auto & inputs = kernel->getStreamInputs();
    370394        for (unsigned i = 0; i < inputs.size(); ++i) {
    371             lastConsumer[kernel->getStreamSetInputBuffer(i)] = kernel;
    372         }
    373     }
    374 
     395            const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
     396            if (kernel->requiresCopyBack(inputs[i]) && !buffer->isUnbounded()) {
     397                if (LLVM_LIKELY(buffer->supportsCopyBack())) {
     398                    isConsumedAtNonFixedRate.insert(buffer);
     399                } else {
     400    //                std::string tmp;
     401    //                raw_string_ostream out(tmp);
     402    //                out << kernel->getName() << " : " << name << " must have an overflow";
     403    //                report_fatal_error(out.str());
     404                }
     405            }
     406        }
     407        // if this kernel consumes this buffer, update the last consumer
     408        for (const StreamSetBuffer * const buffer : kernel->getStreamSetInputBuffers()) {
     409            auto f = lastConsumer.find(buffer);
     410            assert (f != lastConsumer.end());
     411            f->second = kernel;
     412        }
     413        // incase some output is never consumed, make the kernel that produced it the initial "last consumer"
     414        for (const StreamSetBuffer * const buffer : kernel->getStreamSetOutputBuffers()) {
     415            assert (buffer->getProducer() == kernel);
     416            assert (lastConsumer.count(buffer) == 0);
     417            lastConsumer.emplace(buffer, kernel);
     418        }
     419    }
     420
     421    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
     422        deadLockCounter = b->CreatePHI(b->getSizeTy(), 2, "deadLockCounter");
     423        deadLockCounter->addIncoming(b->getSize(0), entryBlock);
     424        anyProgress = b->getFalse();
     425    }
     426
     427}
     428
     429/** ------------------------------------------------------------------------------------------------------------- *
     430 * @brief makeTerminationGraph
     431 *
     432 * The input graph models whether a kernel could *consume* more data than may be produced by a preceeding kernel.
     433 ** ------------------------------------------------------------------------------------------------------------- */
     434PipelineGenerator::DependencyGraph PipelineGenerator::makeDependencyGraph() const {
     435    const auto n = kernels.size();
     436    DependencyGraph G(n);
     437    KernelMap M;
     438    // construct a kernel dependency graph
     439    for (unsigned v = 0; v < n; ++v) {
     440        const Kernel * const kernel = kernels[v];       
     441        for (const StreamSetBuffer * buf : kernel->getStreamSetInputBuffers()) {
     442            const auto f = M.find(buf->getProducer()); assert (f != M.end());
     443            add_edge(f->second, v, G);
     444        }
     445        M.emplace(kernel, v);
     446    }
     447    // generate a transitive closure
     448    for (unsigned u = 0; u < n; ++u) {
     449        for (auto e : make_iterator_range(in_edges(u, G))) {
     450            const auto s = source(e, G);
     451            for (auto f : make_iterator_range(out_edges(u, G))) {
     452                add_edge(s, target(f, G), G);
     453            }
     454        }
     455    }
     456    // then take the transitive reduction
     457    std::vector<unsigned> sources;
     458    for (unsigned u = n; u-- > 0; ) {
     459        if (in_degree(u, G) > 0 && out_degree(u, G) > 0) {
     460            for (auto e : make_iterator_range(in_edges(u, G))) {
     461                sources.push_back(source(e, G));
     462            }
     463            std::sort(sources.begin(), sources.end());
     464            for (auto e : make_iterator_range(out_edges(u, G))) {
     465                remove_in_edge_if(target(e, G), [&G, &sources](const DependencyGraph::edge_descriptor f) {
     466                    return std::binary_search(sources.begin(), sources.end(), source(f, G));
     467                }, G);
     468            }
     469            sources.clear();
     470        }
     471    }
     472    return G;
    375473}
    376474
     
    380478 * The input graph models whether a kernel could *consume* more data than may be produced by a preceeding kernel.
    381479 ** ------------------------------------------------------------------------------------------------------------- */
    382 PipelineGenerator::Graph PipelineGenerator::makeInputGraph(const std::vector<Kernel *> & kernels) {
    383 
     480PipelineGenerator::ChannelGraph PipelineGenerator::makeInputGraph() const {
    384481    const auto n = kernels.size();
    385     Graph   G(n);
    386     Map     M;
    387 
    388     for (Graph::vertex_descriptor v = 0; v < n; ++v) {
     482    ChannelGraph G(n);
     483    KernelMap M;
     484    for (unsigned v = 0; v < n; ++v) {
    389485
    390486        const Kernel * const consumer = kernels[v];
     487        G[v] = consumer;
    391488        M.emplace(consumer, v);
    392         G[v] = consumer;
    393489
    394490        const auto & inputs = consumer->getStreamInputs();
     
    396492
    397493            const Binding & input = inputs[i];
    398             auto ub_in = consumer->getUpperBound(input.getRate()) * consumer->getStride(); assert (ub_in > 0);
     494            auto ub_in = consumer->getUpperBound(input.getRate()) * consumer->getStride() * codegen::SegmentSize; assert (ub_in > 0);
    399495            if (input.hasLookahead()) {
    400496                ub_in += input.getLookahead();
     
    404500            const Kernel * const producer = buffer->getProducer();
    405501            const Binding & output = producer->getStreamOutput(buffer);
    406             const auto lb_out = producer->getLowerBound(output.getRate()) * producer->getStride();
    407 
    408             const auto rate = lb_out / ub_in;
     502            const auto lb_out = producer->getLowerBound(output.getRate()) * producer->getStride() * codegen::SegmentSize;
     503
     504            const auto min_oi_ratio = lb_out / ub_in;
    409505            const auto f = M.find(producer); assert (f != M.end());
    410506            const auto u = f->second;
    411507            // If we have multiple inputs from the same kernel, we only need to consider the "slowest" one
    412508            bool slowest = true;
    413             if (lb_out > 0) {
    414                 for (const auto e : make_iterator_range(in_edges(v, G))) {
    415                     if (source(e, G) == u) {
    416                         Channel & p = G[e];
     509            for (const auto e : make_iterator_range(in_edges(v, G))) {
     510                if (source(e, G) == u) {
     511                    const Channel & p = G[e];
     512                    if (min_oi_ratio > p.ratio) {
    417513                        slowest = false;
    418                         if (rate < p.rate) {
    419                             p.rate = rate;
    420                             p.buffer = buffer;
    421                         }
    422                         break;
     514                    } else if (min_oi_ratio < p.ratio) {
     515                        clear_in_edges(v, G);
    423516                    }
     517                    break;
    424518                }
    425519            }
    426520            if (slowest) {
    427                 add_edge(u, v, Channel{rate, buffer}, G);
    428             }
    429         }
    430     }
    431     return G;
     521                add_edge(u, v, Channel{min_oi_ratio, buffer, i}, G);
     522            }
     523        }
     524    }
     525
     526    return pruneGraph(std::move(G), std::move(make_iterator_range(vertices(G))));
    432527}
    433528
     
    437532 * The output graph models whether a kernel could *produce* more data than may be consumed by a subsequent kernel.
    438533 ** ------------------------------------------------------------------------------------------------------------- */
    439 PipelineGenerator::Graph PipelineGenerator::makeOutputGraph(const std::vector<Kernel *> & kernels) {
    440 
     534PipelineGenerator::ChannelGraph PipelineGenerator::makeOutputGraph() const {
    441535    const auto n = kernels.size();
    442     Graph   G(n);
    443     Map     M;
    444 
    445     for (Graph::vertex_descriptor i = 0; i < n; ++i) {
    446         const Kernel * const consumer = kernels[i];
    447         const auto v = n - i - 1;
     536    ChannelGraph G(n);
     537    KernelMap M;
     538    for (unsigned v = 0; v < n; ++v) {
     539        const Kernel * const consumer = kernels[v];
     540        G[v] = consumer;
    448541        M.emplace(consumer, v);
    449         G[v] = consumer;
    450542
    451543        const auto & inputs = consumer->getStreamInputs();
     
    454546            if (isa<SourceBuffer>(buffer)) continue;
    455547            const Kernel * const producer = buffer->getProducer();
     548            assert (consumer != producer);
    456549            const Binding & output = producer->getStreamOutput(buffer);
    457             auto ub_out = producer->getUpperBound(output.getRate()) * producer->getStride();
    458             if (LLVM_UNLIKELY(ub_out > 0)) { // unknown output rates are handled by reallocating their buffers
     550            auto ub_out = producer->getUpperBound(output.getRate()) * producer->getStride() * codegen::SegmentSize;
     551            if (ub_out > 0) { // unknown output rates are handled by reallocating their buffers
    459552                const Binding & input = inputs[i];
    460553                if (input.hasLookahead()) {
    461                     ub_out -= input.getLookahead();
     554                    const auto la = input.getLookahead();
     555                    if (LLVM_UNLIKELY(ub_out <= la)) {
     556                        llvm::report_fatal_error("lookahead exceeds segment size");
     557                    }
     558                    ub_out += la;
    462559                }
    463                 const auto lb_in = consumer->getLowerBound(input.getRate()) * consumer->getStride();
    464                 const auto inverseRate = lb_in / ub_out;
     560                const auto lb_in = consumer->getLowerBound(input.getRate()) * consumer->getStride() * codegen::SegmentSize;
     561                const auto min_io_ratio = lb_in / ub_out;
    465562                const auto f = M.find(producer); assert (f != M.end());
    466563                const auto u = f->second;
     564                assert (v != u);
     565                assert (G[u] == producer);
    467566                // If we have multiple inputs from the same kernel, we only need to consider the "fastest" one
    468567                bool fastest = true;
    469                 if (ub_out > 0) {
    470                     for (const auto e : make_iterator_range(in_edges(v, G))) {
    471                         if (source(e, G) == u) {
    472                             Channel & p = G[e];
     568                for (const auto e : make_iterator_range(in_edges(v, G))) {
     569                    if (source(e, G) == u) {
     570                        const Channel & p = G[e];
     571                        if (min_io_ratio > p.ratio) {
    473572                            fastest = false;
    474                             if (inverseRate < p.rate) {
    475                                 p.rate = inverseRate;
    476                                 p.buffer = buffer;
    477                             }
    478                             break;
     573                        } else if (min_io_ratio < p.ratio) {
     574                            clear_in_edges(v, G);
    479575                        }
     576                        break;
    480577                    }
    481578                }
    482579                if (fastest) {
    483                     add_edge(v, u, Channel{inverseRate, buffer}, G);
     580                    add_edge(v, u, Channel{min_io_ratio, buffer, i}, G);
    484581                }
    485582            }
    486583        }
    487584    }
    488     return G;
    489 }
    490 
    491 /** ------------------------------------------------------------------------------------------------------------- *
    492  * @brief printGraph
    493  ** ------------------------------------------------------------------------------------------------------------- */
    494 PipelineGenerator::Graph PipelineGenerator::printGraph(const bool input, Graph && G) {
    495 
    496     auto & out = errs();
    497 
    498     out << "digraph " << (input ? "I" : "O") << " {\n";
    499     for (auto u : make_iterator_range(vertices(G))) {
    500         out << "v" << u << " [label=\"" << u << ": "
    501             << G[u]->getName() << "\"];\n";
    502     }
    503 
    504     for (auto e : make_iterator_range(edges(G))) {
    505         const Channel & c = G[e];
    506         const auto s = source(e, G);
    507         const auto t = target(e, G);
    508         const Kernel * const S = G[input ? s : t];
    509         const Kernel * const T = G[input ? t : s];
    510 
    511         out << "v" << s << " -> v" << t
    512             << " [label=\"";
    513 
    514         if (c.buffer) {
    515             out << S->getStreamOutput(c.buffer).getName()
    516                 << " -> "
    517                 << T->getStreamInput(c.buffer).getName()
    518                 << "   ";
    519         }
    520 
    521         out << c.rate.numerator() << " / " << c.rate.denominator()
    522             << "\"];\n";
    523     }
    524 
    525     out << "}\n\n";
    526     out.flush();
    527 
    528     return G;
     585
     586    return pruneGraph(std::move(G), std::move(boost::adaptors::reverse(make_iterator_range(vertices(G)))));
    529587}
    530588
     
    532590 * @brief pruneGraph
    533591 ** ------------------------------------------------------------------------------------------------------------- */
    534 PipelineGenerator::Graph PipelineGenerator::pruneGraph(Graph && G) {
    535 
     592template<class VertexList>
     593inline PipelineGenerator::ChannelGraph PipelineGenerator::pruneGraph(ChannelGraph && G, VertexList && V) const {
    536594    // Take a transitive closure of G but whenever we attempt to insert an edge into the closure
    537595    // that already exists, check instead whether the rate of our proposed edge is <= the existing
    538596    // edge's rate. If so, the data availability/consumption is transitively guaranteed.
    539     for (const auto u : make_iterator_range(vertices(G))) {
     597    for (const auto u : V) {
    540598        for (auto ei : make_iterator_range(in_edges(u, G))) {
    541599            const auto v = source(ei, G);
    542             const Channel & pu = G[ei];
     600            const Channel & ci = G[ei];
    543601            for (auto ej : make_iterator_range(out_edges(u, G))) {
    544602                const auto w = target(ej, G);
    545                 const auto ratio = RateValue(G[u]->getStride(), G[w]->getStride());
    546                 const auto rate = pu.rate * ratio;
     603                // ci.rate = BOUND_RATIO(u, v) * (STRIDE(u) / STRIDE(v))
     604                const auto scaling = RateValue(G[v]->getStride(), G[w]->getStride());
     605                const auto rate = ci.ratio * scaling;
    547606                bool insert = true;
    548607                for (auto ek : make_iterator_range(in_edges(w, G))) {
     608                    // do we already have a vw edge?
    549609                    if (source(ek, G) == v) {
    550                         Channel & pw = G[ek];
    551                         if (rate <= pw.rate) {
    552                             pw.buffer = nullptr;
     610                        Channel & ck = G[ek];
     611                        if (rate <= ck.ratio) {
     612                            ck.buffer = nullptr;
    553613                        }
    554614                        insert = false;
    555                         break;
    556615                    }
    557616                }
    558617                if (insert) {
    559                     add_edge(v, w, Channel{rate, nullptr}, G);
     618                    add_edge(v, w, Channel{rate}, G);
    560619                }
    561620            }
     
    564623
    565624    // remove any closure edges from G
    566     remove_edge_if([&G](const Graph::edge_descriptor e) { return G[e].buffer == nullptr; }, G);
    567 
    568     // For any kernel, if we do not need to check any of its inputs, we can avoid checking any of its
    569     // outputs that have a rate >= 1 (i.e., its production rates >= consumption rates.)
    570     for (const auto u : make_iterator_range(vertices(G))) {
     625    remove_edge_if([&G](const ChannelGraph::edge_descriptor e) { return G[e].buffer == nullptr; }, G);
     626
     627    for (const auto u : V) {
    571628        if (in_degree(u, G) == 0) {
    572             remove_out_edge_if(u, [&G](const Graph::edge_descriptor e) { return G[e].rate >= RateValue{1, 1}; }, G);
     629            // If we do not need to check any of its inputs, we can avoid testing any output of a kernel
     630            // with a rate ratio of at least 1
     631            remove_out_edge_if(u, [&G](const ChannelGraph::edge_descriptor e) {
     632                return G[e].ratio >= RateValue{1, 1};
     633            }, G);
    573634        }
    574635    }
     
    578639
    579640/** ------------------------------------------------------------------------------------------------------------- *
    580  * @brief addChecks
    581  ** ------------------------------------------------------------------------------------------------------------- */
    582 void PipelineGenerator::addChecks(Graph && G, CheckMap & M) {
    583     for (const auto u : make_iterator_range(vertices(G))) {
    584         if (LLVM_LIKELY(in_degree(u, G) == 0)) continue;
    585         flat_set<const StreamSetBuffer *> B;
    586         for (auto e : make_iterator_range(in_edges(u, G))) {
    587             B.insert(G[e].buffer);
    588         }
    589         M.emplace(G[u], std::vector<const StreamSetBuffer *>{B.begin(), B.end()});
    590     }
    591 }
    592 
    593 /** ------------------------------------------------------------------------------------------------------------- *
    594641 * @brief executeKernel
    595642 ** ------------------------------------------------------------------------------------------------------------- */
    596 Value * PipelineGenerator::executeKernel(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel, PHINode * const segNo, Value * const finished) {
     643void PipelineGenerator::execute(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
     644
     645    const Kernel * const kernel = kernels[index];
     646    b->setKernel(kernel);
    597647
    598648    const auto & inputs = kernel->getStreamInputs();
    599 
    600     std::vector<Value *> args(2 + inputs.size());
     649    const auto & outputs = kernel->getStreamOutputs();
    601650
    602651    BasicBlock * const kernelEntry = b->GetInsertBlock();
    603652    BasicBlock * const kernelCode = b->CreateBasicBlock(kernel->getName());
    604     BasicBlock * const kernelFinished = b->CreateBasicBlock(kernel->getName() + "Finished");
     653    kernelFinished = b->CreateBasicBlock(kernel->getName() + "Finished");
    605654    BasicBlock * const kernelExit = b->CreateBasicBlock(kernel->getName() + "Exit");
    606655
     
    608657
    609658    b->SetInsertPoint(kernelFinished);
    610     PHINode * const final = b->CreatePHI(b->getInt1Ty(), 2);
    611 
     659    terminated = b->CreatePHI(b->getInt1Ty(), 2);
     660    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
     661        madeProgress = b->CreatePHI(b->getInt1Ty(), 3);
     662    }
    612663    b->SetInsertPoint(kernelExit);
    613     PHINode * const terminated = b->CreatePHI(b->getInt1Ty(), 2);
    614     // The initial "isFinal" state is equal to the first kernel's termination signal state
    615     terminated->addIncoming(finished ? finished : b->getTrue(), kernelEntry);
    616     Value * isFinal = finished ? finished : b->getFalse();
     664    PHINode * const isFinal = b->CreatePHI(b->getInt1Ty(), 2, kernel->getName() + "_isFinal");
     665    isFinal->addIncoming(b->getTrue(), kernelEntry);
     666    isFinal->addIncoming(terminated, kernelFinished);
     667
     668    PHINode * progress = nullptr;
     669    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
     670        progress = b->CreatePHI(b->getInt1Ty(), 2, kernel->getName() + "_anyProgress");
     671        progress->addIncoming(anyProgress, kernelEntry);
     672        progress->addIncoming(madeProgress, kernelFinished);
     673    }
    617674
    618675    // Since it is possible that a sole consumer of some stream could terminate early, set the
     
    620677    std::vector<PHINode *> consumedItemCountPhi(inputs.size());
    621678    std::vector<Value *> priorConsumedItemCount(inputs.size());
     679
    622680    for (unsigned i = 0; i < inputs.size(); ++i) {
    623681        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
     
    641699    b->SetInsertPoint(kernelCode);
    642700
    643     // Check for sufficient output space
    644     const auto O = outputSpaceChecks.find(kernel);
    645     if (O != outputSpaceChecks.end()) {
    646         for (const StreamSetBuffer * buffer : O->second) {
    647 
    648 
    649             const Binding & output = kernel->getStreamOutput(buffer);
    650 
    651             if (output.isDisableSufficientChecking()) {
    652                 continue;
    653             }
    654 
    655             const auto name = output.getName();
    656             BasicBlock * const sufficient = b->CreateBasicBlock(name + "HasOutputSpace");
    657             const auto ub = kernel->getUpperBound(output.getRate()); assert (ub > 0);
    658             Constant * const strideLength = b->getSize(ceiling(ub * kernel->getStride()));
    659             Value * const produced = b->getProducedItemCount(name);
    660             Value * const consumed = b->getConsumedItemCount(name);
    661             Value * const unused = b->CreateSub(produced, consumed);
    662             Value * const potentialData = b->CreateAdd(unused, strideLength);
    663             Value * const capacity = b->getBufferedSize(name);
    664 
    665           //  b->CallPrintInt("< " + kernel->getName() + "_" + name + "_potential", potentialData);
    666           //  b->CallPrintInt("< " + kernel->getName() + "_" + name + "_capacity", capacity);
    667 
    668             Value * const hasSufficientSpace = b->CreateICmpULE(potentialData, capacity);
    669 
    670           //  b->CallPrintInt("* < " + kernel->getName() + "_" + name + "_sufficientSpace", hasSufficientSpace);
    671 
    672             final->addIncoming(b->getFalse(), b->GetInsertBlock());
    673             b->CreateLikelyCondBr(hasSufficientSpace, sufficient, kernelFinished);
    674             b->SetInsertPoint(sufficient);
    675         }
    676     }
    677 
     701    checkIfAllInputKernelsAreTerminated(b, index);
     702
     703    checkAvailableInputData(b, index);
     704
     705    checkAvailableOutputSpace(b, index);
     706
     707    applyOutputBufferExpansions(b, index);
     708
     709    Value * const finalStride = callKernel(b, index);
     710
     711    // TODO: add in some checks to verify the kernel actually adheres to its rate
     712
     713    BasicBlock * const kernelCodeExit = b->GetInsertBlock();
     714    terminated->addIncoming(finalStride, kernelCodeExit);
     715    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
     716        madeProgress->addIncoming(b->getTrue(), kernelCodeExit);
     717        anyProgress = progress;
     718    }
     719    b->CreateBr(kernelFinished);
     720
     721    b->SetInsertPoint(kernelFinished);
     722
     723    // update the consumed item counts
    678724    for (unsigned i = 0; i < inputs.size(); ++i) {
    679725        const Binding & input = inputs[i];
    680         const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
    681         const auto name = input.getName();
    682 
    683         const auto p = producedItemCount.find(buffer);
    684         if (LLVM_UNLIKELY(p == producedItemCount.end())) {
    685             report_fatal_error(kernel->getName() + " uses stream set " + name + " prior to its definition");
    686         }
    687         Value * const produced = p->second;
    688 
    689       //  b->CallPrintInt("< " + kernel->getName() + "_" + name + "_produced", produced);
    690 
    691 
    692         const auto ub = kernel->getUpperBound(input.getRate()); assert (ub > 0);
    693         const auto strideLength = ceiling(ub * kernel->getStride()) ;
    694         Constant * const segmentLength = b->getSize(strideLength * codegen::SegmentSize);
    695 
    696 //        if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts) && !isa<SourceBuffer>(buffer))) {
    697 //            b->CreateAssert(b->CreateICmpULE(segmentLength, b->getCapacity(name)),
    698 //                            kernel->getName() + ": " + name + " upper bound of segment length exceeds buffer capacity");
    699 //        }
    700 
    701 //        Value * limit = nullptr;
    702 //        if (input.getRate().isFixed()) {
    703 //            // if the input is deferred, simply adding length to the processed item count may result in setting a limit
    704 //            // that is too low for. instead just calculate the limit of all fixed rates from the segment no.
    705 //            limit = b->CreateMul(b->CreateAdd(segNo, b->getSize(1)), segmentLength);
    706 //        } else {
    707 //            Value * const processed = b->getProcessedItemCount(name);
    708 //            limit = b->CreateAdd(processed, segmentLength);
    709 //        }
    710 
    711         // if the input is deferred, simply adding length to the processed item count may result in setting a limit
    712         // that is too low for. instead just calculate the limit of all fixed rates from the segment no.
    713         Value * const limit = b->CreateMul(b->CreateAdd(segNo, b->getSize(1)), segmentLength);
    714 
    715      //  b->CallPrintInt("< " + kernel->getName() + "_" + name + "_limit", limit);
    716 
    717         // TODO: currently, if we produce the exact amount as our limit states, we will have to process one additional segment
    718         // before we can consider this kernel finished. We ought to be able to avoid doing in some cases but need to prove its
    719         // always safe to do so.
    720 
    721         Value * const consumingAll = b->CreateICmpULT(produced, limit);
    722         args[i + 2] = b->CreateSelect(consumingAll, produced, limit);
    723         isFinal = b->CreateAnd(isFinal, consumingAll);
    724     }
    725 
    726     // Check for available input
    727     const auto I = inputAvailabilityChecks.find(kernel);
    728     if (I != inputAvailabilityChecks.end()) {
    729         for (const StreamSetBuffer * buffer : I->second) {
    730             const Binding & input = kernel->getStreamInput(buffer);
    731             if (input.isDisableSufficientChecking()) {
     726        Value * const fullyProcessed = getFullyProcessedItemCount(b, input, terminated);
     727        Value * const consumed = b->CreateUMin(priorConsumedItemCount[i], fullyProcessed);
     728        consumedItemCountPhi[i]->addIncoming(consumed, kernelFinished);
     729    }
     730    b->CreateBr(kernelExit);
     731
     732    kernelExit->moveAfter(kernelFinished);
     733
     734    b->SetInsertPoint(kernelExit);
     735
     736    for (unsigned i = 0; i < outputs.size(); ++i) {
     737        const Binding & output = outputs[i];
     738        Value * const produced = b->getProducedItemCount(output.getName());
     739        const StreamSetBuffer * const buffer = kernel->getStreamSetOutputBuffer(i);
     740        // if some stream has no consumer, set the consumed item count to the produced item count
     741        const auto c = lastConsumer.find(buffer);
     742        assert (c != lastConsumer.end());
     743        if (LLVM_UNLIKELY(c->second == kernel)) {
     744            assert (buffer->getProducer() == kernel);
     745            if (LLVM_UNLIKELY(output.getRate().isRelative())) {
    732746                continue;
    733747            }
    734 
    735             const auto name = input.getName();
    736             BasicBlock * const sufficient = b->CreateBasicBlock(name + "HasInputData");
    737 
    738             Constant * strideLength = nullptr;
    739             if (LLVM_UNLIKELY(input.hasAttribute(kernel::Attribute::KindId::AlwaysConsume))) {
    740                 const auto lb = kernel->getLowerBound(input.getRate());
    741                 strideLength = b->getSize(ceiling(lb * kernel->getStride()));
    742             } else {
    743                 const auto ub = kernel->getUpperBound(input.getRate()); assert (ub > 0);
    744                 strideLength = b->getSize(ceiling(ub * kernel->getStride()) - 1);
    745             }
    746 
    747             Value * const processed = b->getProcessedItemCount(name);
    748 //            if (input.getRate().isFixed()) {
    749 //                processed = b->CreateMul(segNo, strideLength);
    750 //            } else {
    751 //                processed = b->getProcessedItemCount(name);
    752 //            }
    753             const auto p = producedItemCount.find(buffer);
    754             assert (p != producedItemCount.end());
    755             Value * const produced = p->second;
    756             Value * const unprocessed = b->CreateSub(produced, processed);
    757 
    758           //  b->CallPrintInt("< " + kernel->getName() + "_" + name + "_unprocessed", unprocessed);
    759 
    760             Value * const hasSufficientData = b->CreateOr(b->CreateICmpUGT(unprocessed, strideLength), isFinal);
    761 
    762           //  b->CallPrintInt("* < " + kernel->getName() + "_" + name + "_sufficientData", hasSufficientData);
    763 
    764             final->addIncoming(b->getFalse(), b->GetInsertBlock());
    765             b->CreateLikelyCondBr(hasSufficientData, sufficient, kernelFinished);
    766             b->SetInsertPoint(sufficient);
    767         }
    768     }
    769 
    770     applyOutputBufferExpansions(b, kernel);
    771 
    772     args[0] = kernel->getInstance();
    773     args[1] = isFinal;
    774 
    775     b->createDoSegmentCall(args);
    776 
    777     if (kernel->hasAttribute(kernel::Attribute::KindId::MustConsumeAll)) {
    778         // workaround to modify is final
    779         for (unsigned i = 0; i < inputs.size(); ++i) {
    780             const Binding &input = inputs[i];
    781             const StreamSetBuffer *const buffer = kernel->getStreamSetInputBuffer(i);
    782             const auto name = input.getName();
    783 
    784             auto processed = b->getProcessedItemCount(name);
    785             const auto p = producedItemCount.find(buffer);
    786             assert (p != producedItemCount.end());
    787             Value * const available = p->second;
    788 //            auto available = b->getAvailableItemCount(name);
    789             isFinal = b->CreateAnd(isFinal, b->CreateICmpEQ(processed, available));
    790 
    791 
    792 //            Value * const unprocessed = b->CreateSub(produced, processed);
    793         }
    794     }
    795 
    796 
    797     if (kernel->hasAttribute(kernel::Attribute::KindId::MustExplicitlyTerminate)) {
    798         isFinal = b->getTerminationSignal();
    799     } else {
    800         if (kernel->hasAttribute(kernel::Attribute::KindId::CanTerminateEarly)) {
    801             isFinal = b->CreateOr(isFinal, b->getTerminationSignal());
    802         }
    803         b->setTerminationSignal(isFinal);
    804     }
    805 
    806   //  b->CallPrintInt(kernel->getName() + "_finished", isFinal);
    807     final->addIncoming(isFinal, b->GetInsertBlock());
    808     b->CreateBr(kernelFinished);
    809 
    810     b->SetInsertPoint(kernelFinished);
    811 
    812     // update the consumed item counts
    813     for (unsigned i = 0; i < inputs.size(); ++i) {
    814         Value * const processed = b->getProcessedItemCount(inputs[i].getName());
    815       //  b->CallPrintInt("> " + kernel->getName() + "_" + inputs[i].getName() + "_processed", processed);
    816         Value * const consumed = b->CreateUMin(priorConsumedItemCount[i], processed);
    817         consumedItemCountPhi[i]->addIncoming(consumed, kernelFinished);
    818     }
    819     b->CreateBr(kernelExit);
    820 
    821     kernelExit->moveAfter(kernelFinished);
    822 
    823     b->SetInsertPoint(kernelExit);
    824     terminated->addIncoming(final, kernelFinished);
     748            b->setConsumedItemCount(output.getName(), produced);
     749        } else { // otherwise record how many items were produced
     750            assert (producedItemCount.count(buffer) == 0);
     751            producedItemCount.emplace(buffer, produced);
     752        }
     753
     754    }
     755
     756
     757    // TODO: if all consumers process the data at a fixed rate, we can just set the consumed item count
     758    // by the strideNo rather than tracking it.
    825759
    826760
     
    832766        const auto c = lastConsumer.find(buffer);
    833767        assert (c != lastConsumer.end());
    834         if (c->second == kernel) {
     768        if (LLVM_LIKELY(c->second == kernel)) {
    835769            Kernel * const producer = buffer->getProducer();
     770            assert (producer != kernel);
    836771            const auto & output = producer->getStreamOutput(buffer);
    837772            if (output.getRate().isRelative()) continue;
    838 
    839            // b->CallPrintInt("* " + producer->getName() + "_" + output.getName() + "_consumed", consumedItemCountPhi[i]);
    840 
    841773            b->setKernel(producer);
    842774            if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
     
    850782    }
    851783
     784    dependencyGraph[index] = isFinal;
     785}
     786
     787/** ------------------------------------------------------------------------------------------------------------- *
     788 * @brief checkAvailableInputData
     789 ** ------------------------------------------------------------------------------------------------------------- */
     790void PipelineGenerator::checkIfAllInputKernelsAreTerminated(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
     791    const auto n = in_degree(index, dependencyGraph);
     792    if (LLVM_UNLIKELY(n == 0)) {
     793        noMore = b->getFalse();
     794    } else {
     795        noMore = b->getTrue();
     796        for (auto e : make_iterator_range(in_edges(index, dependencyGraph))) {
     797            const auto u = source(e, dependencyGraph);
     798            Value * const finished = dependencyGraph[u];
     799            //b->CallPrintInt("* " + kernels[u]->getName() + "_hasFinished", finished);
     800            noMore = b->CreateAnd(noMore, finished);
     801        }
     802    }
     803}
     804
     805/** ------------------------------------------------------------------------------------------------------------- *
     806 * @brief checkAvailableInputData
     807 ** ------------------------------------------------------------------------------------------------------------- */
     808void PipelineGenerator::checkAvailableInputData(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
     809    const Kernel * const kernel = kernels[index];
     810    b->setKernel(kernel);
     811    for (auto e : make_iterator_range(in_edges(index, inputGraph))) {
     812        const Channel & c = inputGraph[e];
     813        const Binding & input = kernel->getStreamInput(c.operand);
     814
     815        Value * requiredInput = getStrideLength(b, kernel, input);
     816        if (LLVM_UNLIKELY(input.hasLookahead())) {
     817            Constant * const lookahead = b->getSize(input.getLookahead());
     818            requiredInput = b->CreateAdd(requiredInput, lookahead);
     819        }
     820        const auto p = producedItemCount.find(c.buffer);
     821        assert (p != producedItemCount.end());
     822        Value * const produced = p->second;
     823        Value * const processed = b->getNonDeferredProcessedItemCount(input);
     824        Value * const unprocessed = b->CreateSub(produced, processed);
     825        Value * const hasEnough = b->CreateICmpUGE(unprocessed, requiredInput);
     826        Value * const check = b->CreateOr(hasEnough, noMore);
     827        terminated->addIncoming(b->getFalse(), b->GetInsertBlock());
     828        if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
     829            madeProgress->addIncoming(anyProgress, b->GetInsertBlock());
     830        }
     831        BasicBlock * const hasSufficientInput = b->CreateBasicBlock(kernel->getName() + "_" + input.getName() + "_hasSufficientInput");
     832        b->CreateLikelyCondBr(check, hasSufficientInput, kernelFinished);
     833        b->SetInsertPoint(hasSufficientInput);
     834    }
     835}
     836
     837/** ------------------------------------------------------------------------------------------------------------- *
     838 * @brief checkAvailableOutputSpace
     839 ** ------------------------------------------------------------------------------------------------------------- */
     840void PipelineGenerator::checkAvailableOutputSpace(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
     841    const Kernel * const kernel = kernels[index];
     842    b->setKernel(kernel);
     843    for (auto e : make_iterator_range(in_edges(index, outputGraph))) {
     844        const Channel & c = outputGraph[e];
     845        assert (c.buffer->getProducer() == kernel);
     846        const Binding & output = kernel->getStreamOutput(c.buffer);
     847
     848        Value * requiredSpace = getStrideLength(b, kernel, output);
     849        const auto & name = output.getName();
     850        Value * const produced = b->getNonDeferredProducedItemCount(output);
     851        Value * const consumed = b->getConsumedItemCount(name);
     852        Value * const unconsumed = b->CreateSub(produced, consumed);
     853        requiredSpace = b->CreateAdd(requiredSpace, unconsumed);
     854        Value * const capacity = b->getBufferedSize(name);
     855        Value * const check = b->CreateICmpULE(requiredSpace, capacity);
     856        terminated->addIncoming(b->getFalse(), b->GetInsertBlock());
     857        if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
     858            madeProgress->addIncoming(anyProgress, b->GetInsertBlock());
     859        }
     860        BasicBlock * const hasOutputSpace = b->CreateBasicBlock(kernel->getName() + "_" + name + "_hasOutputSpace");
     861        b->CreateLikelyCondBr(check, hasOutputSpace, kernelFinished);
     862        b->SetInsertPoint(hasOutputSpace);
     863    }
     864}
     865
     866/** ------------------------------------------------------------------------------------------------------------- *
     867 * @brief getStrideLength
     868 ** ------------------------------------------------------------------------------------------------------------- */
     869Value * PipelineGenerator::getStrideLength(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel, const Binding & binding) {
     870    Value * strideLength = nullptr;
     871    const ProcessingRate & rate = binding.getRate();
     872    if (rate.isPopCount() || rate.isNegatedPopCount()) {
     873        Port refPort; unsigned refIndex;
     874        std::tie(refPort, refIndex) = kernel->getStreamPort(rate.getReference());
     875        const Binding & ref = kernel->getStreamInput(refIndex);
     876        Value * markers = b->loadInputStreamBlock(ref.getName(), b->getSize(0));
     877        if (rate.isNegatedPopCount()) {
     878            markers = b->CreateNot(markers);
     879        }
     880        strideLength = b->bitblock_popcount(markers);
     881    } else if (binding.hasAttribute(kernel::Attribute::KindId::AlwaysConsume)) {
     882        const auto lb = kernel->getLowerBound(rate);
     883        strideLength = b->getSize(ceiling(lb * kernel->getStride()));
     884    } else {
     885        const auto ub = kernel->getUpperBound(rate); assert (ub > 0);
     886        strideLength = b->getSize(ceiling(ub * kernel->getStride()));
     887    }
     888    return strideLength;
     889}
     890
     891/** ------------------------------------------------------------------------------------------------------------- *
     892 * @brief callKernel
     893 ** ------------------------------------------------------------------------------------------------------------- */
     894Value * PipelineGenerator::callKernel(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
     895
     896    const Kernel * const kernel = kernels[index];
     897    b->setKernel(kernel);
     898
     899    #ifndef DISABLE_COPY_TO_OVERFLOW
     900    // Store how many items we produced by this kernel in the prior iteration. We'll use this to determine when
     901    // to mirror the first K segments
    852902    const auto & outputs = kernel->getStreamOutputs();
    853     for (unsigned i = 0; i < outputs.size(); ++i) {
    854         Value * const produced = b->getProducedItemCount(outputs[i].getName());
    855 
    856        // b->CallPrintInt("> " + kernel->getName() + "_" + outputs[i].getName() + "_produced", produced);
    857 
    858         const StreamSetBuffer * const buffer = kernel->getStreamSetOutputBuffer(i);
    859         assert (producedItemCount.count(buffer) == 0);
    860         producedItemCount.emplace(buffer, produced);
    861     }
    862 
    863     return terminated;
    864 }
    865 
     903    const auto m = outputs.size();
     904
     905    std::vector<Value *> initiallyProducedItemCount(m, nullptr);
     906    for (unsigned i = 0; i < m; ++i) {
     907        const Binding & output = outputs[i];
     908        const auto & name = output.getName();
     909        const StreamSetBuffer * const buffer = kernel->getOutputStreamSetBuffer(name);
     910        if (isConsumedAtNonFixedRate.count(buffer)) {
     911            initiallyProducedItemCount[i] = b->getProducedItemCount(name);
     912        }
     913    }
     914    #endif
     915
     916    const auto & inputs = kernel->getStreamInputs();
     917    const auto n = inputs.size();
     918    std::vector<Value *> arguments(n + 2);
     919
     920    Value * isFinal = noMore;
     921    for (unsigned i = 0; i < n; ++i) {
     922        const Binding & input = inputs[i];
     923        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
     924
     925        const auto p = producedItemCount.find(buffer);
     926        assert (p != producedItemCount.end());
     927        Value * const produced = p->second;
     928
     929        const ProcessingRate & rate = input.getRate();
     930        if (rate.isPopCount()) {
     931            arguments[i + 2] = produced;
     932        } else {
     933            const unsigned strideSize = ceiling(kernel->getUpperBound(rate) * kernel->getStride());
     934            Value * const processed = b->getNonDeferredProcessedItemCount(input);
     935            Value * const limit = b->CreateAdd(processed, b->getSize(strideSize * codegen::SegmentSize));
     936            Value * const partial = b->CreateICmpULT(produced, limit);
     937            arguments[i + 2] = b->CreateSelect(partial, produced, limit);
     938            isFinal = b->CreateAnd(isFinal, partial);
     939        }
     940    }
     941
     942    // TODO: pass in a strideNo for fixed rate streams to allow the kernel to calculate the current avail,
     943    // processed, and produced counts
     944
     945    arguments[0] = kernel->getInstance();
     946    arguments[1] = isFinal;
     947
     948    b->createDoSegmentCall(arguments);
     949
     950    #ifndef DISABLE_COPY_TO_OVERFLOW
     951    // For each buffer with an overflow region of K blocks, overwrite the overflow with the first K blocks of
     952    // data to ensure that even if this stream is produced at a fixed rate but consumed at a bounded rate,
     953    // every kernel has a consistent view of the stream data.
     954    for (unsigned i = 0; i < m; ++i) {
     955        const Binding & output = outputs[i];
     956        const auto & name = output.getName();
     957        if (initiallyProducedItemCount[i]) {
     958            Value * const bufferSize = b->getBufferedSize(name);
     959            Value * const prior = initiallyProducedItemCount[i];
     960            Value * const offset = b->CreateURem(prior, bufferSize);
     961            Value * const produced = b->getNonDeferredProducedItemCount(output);
     962            Value * const buffered = b->CreateAdd(offset, b->CreateSub(produced, prior));
     963            BasicBlock * const copyBack = b->CreateBasicBlock(name + "MirrorOverflow");
     964            BasicBlock * const done = b->CreateBasicBlock(name + "MirrorOverflowDone");
     965            b->CreateCondBr(b->CreateICmpUGT(buffered, bufferSize), copyBack, done);
     966            b->SetInsertPoint(copyBack);
     967            b->CreateCopyToOverflow(name);
     968            b->CreateBr(done);
     969            b->SetInsertPoint(done);
     970        }
     971    }
     972    #endif
     973
     974    return b->getTerminationSignal();
     975}
    866976
    867977/** ------------------------------------------------------------------------------------------------------------- *
    868978 * @brief applyOutputBufferExpansions
    869979 ** ------------------------------------------------------------------------------------------------------------- */
    870 void PipelineGenerator::applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * k) {
    871     const auto & outputs = k->getStreamSetOutputBuffers();
     980void PipelineGenerator::applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
     981    const Kernel * const kernel = kernels[index];
     982    const auto & outputs = kernel->getStreamSetOutputBuffers();
    872983    for (unsigned i = 0; i < outputs.size(); i++) {
    873984        if (isa<DynamicBuffer>(outputs[i])) {
    874             const auto baseSize = ceiling(k->getUpperBound(k->getStreamOutput(i).getRate()) * k->getStride() * codegen::SegmentSize);
     985
     986
     987            const auto baseSize = ceiling(kernel->getUpperBound(kernel->getStreamOutput(i).getRate()) * kernel->getStride() * codegen::SegmentSize);
    875988            if (LLVM_LIKELY(baseSize > 0)) {
    876989
    877                 const auto & name = k->getStreamOutput(i).getName();
     990                const auto & name = kernel->getStreamOutput(i).getName();
    878991
    879992                BasicBlock * const doExpand = b->CreateBasicBlock(name + "Expand");
     
    9021015}
    9031016
     1017
     1018/** ------------------------------------------------------------------------------------------------------------- *
     1019 * @brief getFullyProcessedItemCount
     1020 ** ------------------------------------------------------------------------------------------------------------- */
     1021inline Value * PipelineGenerator::getFullyProcessedItemCount(const std::unique_ptr<KernelBuilder> & b, const Binding & input, Value * const isFinal) const {
     1022    Value * const processed = b->getProcessedItemCount(input.getName());
     1023    if (LLVM_UNLIKELY(input.hasAttribute(kernel::Attribute::KindId::BlockSize))) {
     1024        // If the input rate has a block size attribute then --- for the purpose of determining how many
     1025        // items have been consumed --- we consider a stream set to be fully processed when an entire
     1026        // block (stride?) has been processed.
     1027        Constant * const BLOCK_WIDTH = b->getSize(b->getBitBlockWidth());
     1028        Value * const partial = b->CreateAnd(processed, ConstantExpr::getNeg(BLOCK_WIDTH));
     1029        return b->CreateSelect(isFinal, processed, partial);
     1030    }
     1031    return processed;
     1032}
     1033
     1034/** ------------------------------------------------------------------------------------------------------------- *
     1035 * @brief finalize
     1036 ** ------------------------------------------------------------------------------------------------------------- */
     1037Value * PipelineGenerator::finalize(const std::unique_ptr<KernelBuilder> & b) {
     1038    if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
     1039        ConstantInt * const ZERO = b->getSize(0);
     1040        ConstantInt * const ONE = b->getSize(1);
     1041        ConstantInt * const TWO = b->getSize(2);
     1042        Value * const count = b->CreateSelect(anyProgress, ZERO, b->CreateAdd(deadLockCounter, ONE));
     1043        b->CreateAssert(b->CreateICmpNE(count, TWO), "Dead lock detected: pipeline could not progress after two iterations");
     1044        deadLockCounter->addIncoming(count, b->GetInsertBlock());
     1045    }
     1046    // return whether each sink has terminated
     1047    Value * final = b->getTrue();
     1048    for (const auto u : make_iterator_range(vertices(dependencyGraph))) {
     1049        if (out_degree(u, dependencyGraph) == 0) {
     1050            final = b->CreateAnd(final, dependencyGraph[u]);
     1051        }
     1052    }
     1053    return final;
     1054
     1055}
     1056
     1057/** ------------------------------------------------------------------------------------------------------------- *
     1058 * @brief printGraph
     1059 ** ------------------------------------------------------------------------------------------------------------- */
     1060void PipelineGenerator::printGraph(const ChannelGraph & G) const {
     1061
     1062    auto & out = errs();
     1063
     1064    out << "digraph G {\n";
     1065    for (auto u : make_iterator_range(vertices(G))) {
     1066        assert (G[u] == kernels[u]);
     1067        if (in_degree(u, G) > 0 || out_degree(u, G) > 0) {
     1068            out << "v" << u << " [label=\"" << u << ": "
     1069                << G[u]->getName() << "\"];\n";
     1070        }
     1071    }
     1072
     1073    for (auto e : make_iterator_range(edges(G))) {
     1074        const Channel & c = G[e];
     1075        const auto s = source(e, G);
     1076        const auto t = target(e, G);
     1077
     1078        out << "v" << s << " -> v" << t
     1079            << " [label=\""
     1080
     1081
     1082
     1083            << c.ratio.numerator() << " / " << c.ratio.denominator()
     1084            << "\"];\n";
     1085    }
     1086
     1087    out << "}\n\n";
     1088    out.flush();
     1089
     1090}
     1091
     1092/** ------------------------------------------------------------------------------------------------------------- *
     1093 * @brief printGraph
     1094 ** ------------------------------------------------------------------------------------------------------------- */
     1095void PipelineGenerator::printGraph(const DependencyGraph & G) const {
     1096
     1097    auto & out = errs();
     1098
     1099    out << "digraph G {\n";
     1100    for (auto u : make_iterator_range(vertices(G))) {
     1101            out << "v" << u << " [label=\"" << u << ": "
     1102                << kernels[u]->getName() << "\"];\n";
     1103    }
     1104
     1105    for (auto e : make_iterator_range(edges(G))) {
     1106        const auto s = source(e, G);
     1107        const auto t = target(e, G);
     1108        out << "v" << s << " -> v" << t << ";\n";
     1109    }
     1110
     1111    out << "}\n\n";
     1112    out.flush();
     1113
     1114}
Note: See TracChangeset for help on using the changeset viewer.