Ignore:
Timestamp:
Feb 2, 2018, 2:49:08 PM (19 months ago)
Author:
nmedfort
Message:

Revised pipeline structure to better control I/O rates

Location:
icGREP/icgrep-devel/icgrep/toolchain
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • icGREP/icgrep-devel/icgrep/toolchain/cpudriver.cpp

    r5841 r5856  
    9191
    9292void ParabixDriver::makeKernelCall(Kernel * kernel, const std::vector<StreamSetBuffer *> & inputs, const std::vector<StreamSetBuffer *> & outputs) {
    93     assert ("addKernelCall or makeKernelCall was already run on this kernel." && (kernel->getModule() == nullptr));
     93    assert ("makeKernelCall was already run on this kernel." && (kernel->getModule() == nullptr));
    9494    mPipeline.emplace_back(kernel);
    9595    kernel->bindPorts(inputs, outputs);
  • icGREP/icgrep-devel/icgrep/toolchain/driver.h

    r5755 r5856  
    4141    }
    4242
    43     void addKernelCall(kernel::Kernel & kb, const std::vector<parabix::StreamSetBuffer *> & inputs, const std::vector<parabix::StreamSetBuffer *> & outputs) {
    44         return makeKernelCall(&kb, inputs, outputs);
    45     }
    46 
    4743    virtual void makeKernelCall(kernel::Kernel * kb, const std::vector<parabix::StreamSetBuffer *> & inputs, const std::vector<parabix::StreamSetBuffer *> & outputs) = 0;
    4844
  • icGREP/icgrep-devel/icgrep/toolchain/grep_pipeline.cpp

    r5836 r5856  
    5858   
    5959    StreamSetBuffer * ByteStream = pxDriver.addBuffer<SourceBuffer>(idb, idb->getStreamSetTy(1, 8));
    60     kernel::Kernel * sourceK = pxDriver.addKernelInstance<kernel::MemorySourceKernel>(idb, idb->getInt8PtrTy(), segmentSize);
     60    kernel::Kernel * sourceK = pxDriver.addKernelInstance<kernel::MemorySourceKernel>(idb, idb->getInt8PtrTy());
    6161    sourceK->setInitialArguments({buffer, length});
    6262    pxDriver.makeKernelCall(sourceK, {}, {ByteStream});
  • icGREP/icgrep-devel/icgrep/toolchain/pipeline.cpp

    r5845 r5856  
    1111#include <boost/container/flat_set.hpp>
    1212#include <boost/container/flat_map.hpp>
     13#include <boost/graph/adjacency_list.hpp>
    1314#include <kernels/kernel_builder.h>
     15
     16#include <llvm/Support/raw_ostream.h>
    1417
    1518using namespace kernel;
    1619using namespace parabix;
    1720using namespace llvm;
     21using namespace boost;
     22using namespace boost::container;
    1823
    1924using Port = Kernel::Port;
    20 
    21 template <typename Value>
    22 using StreamSetBufferMap = boost::container::flat_map<const StreamSetBuffer *, Value>;
    23 
    24 template <typename Value>
    25 using FlatSet = boost::container::flat_set<Value>;
    2625
    2726Function * makeThreadFunction(const std::unique_ptr<kernel::KernelBuilder> & b, const std::string & name) {
     
    3231}
    3332
    34 void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel);
    35 
    36 void handleInsufficientData(const std::unique_ptr<KernelBuilder> & b, Value * const produced, Value * const final, BasicBlock * const entry, const Kernel * const consumer,  const Binding & input, const StreamSetBuffer * const buffer);
    37 
    38 bool requiresCopyBack(const Kernel * k, const ProcessingRate & rate);
     33struct PipelineGenerator {
     34
     35    template <typename Value>
     36    using StreamSetBufferMap = flat_map<const StreamSetBuffer *, Value>;
     37
     38    using RateValue = ProcessingRate::RateValue;
     39
     40    struct Channel {
     41        Channel() = default;
     42        Channel(const RateValue & rate, const StreamSetBuffer * const buffer)
     43        : rate(rate), buffer(buffer) { }
     44
     45        RateValue               rate;
     46        const StreamSetBuffer * buffer;
     47    };
     48
     49    using Graph = adjacency_list<vecS, vecS, bidirectionalS, const Kernel *, Channel, vecS>;
     50
     51    using Map = flat_map<const Kernel *, Graph::vertex_descriptor>;
     52
     53    void initialize(const std::vector<Kernel *> & kernels);
     54
     55    Value * executeKernel(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel, PHINode * const segNo, Value * const finished);
     56
     57    void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel);
     58
     59    void updateProducedAndConsumedCounts(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel);
     60
     61private:
     62
     63    Graph   G;
     64    Map     M;
     65
     66    StreamSetBufferMap<Value *>         producedItemCount;
     67    StreamSetBufferMap<Value *>         consumedItemCount;
     68    StreamSetBufferMap<const Kernel *>  lastConsumer;
     69};
    3970
    4071/** ------------------------------------------------------------------------------------------------------------- *
     
    87118    Value * const segOffset = b->CreateLoad(b->CreateGEP(threadStruct, {b->getInt32(0), b->getInt32(1)}));
    88119
    89     BasicBlock * const segmentLoop = BasicBlock::Create(b->getContext(), "segmentLoop", threadFunc);
     120    PipelineGenerator G;
     121
     122    BasicBlock * const segmentLoop = b->CreateBasicBlock("segmentLoop");
    90123    b->CreateBr(segmentLoop);
    91124
    92125    b->SetInsertPoint(segmentLoop);
     126    G.initialize(kernels);
    93127    PHINode * const segNo = b->CreatePHI(b->getSizeTy(), 2, "segNo");
    94128    segNo->addIncoming(segOffset, entryBlock);
    95 
    96     BasicBlock * const exitThreadBlock = BasicBlock::Create(b->getContext(), "exitThread", threadFunc);
    97 
    98     StreamSetBufferMap<Value *> producedItemCount;
    99     StreamSetBufferMap<Value *> consumedItemCount;
    100     StreamSetBufferMap<Kernel *> lastUsedKernel;
     129    Value * finished = nullptr;
    101130
    102131    Value * cycleCountStart = nullptr;
     
    106135    }
    107136
    108     Value * terminated = nullptr;
    109 
    110137    const bool serialize = codegen::DebugOptionIsSet(codegen::SerializeThreads);
    111138
    112     for (Kernel * const kernel : kernels) {
    113         const auto & inputs = kernel->getStreamInputs();
    114         for (unsigned i = 0; i < inputs.size(); ++i) {
    115             const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
    116             auto f = lastUsedKernel.find(buffer);
    117             if (f == lastUsedKernel.end()) {
    118                 lastUsedKernel.emplace(buffer, kernel);
    119             } else {
    120                 f->second = kernel;
    121             }
    122         }
    123     }
    124 
    125139    for (unsigned k = 0; k < n; ++k) {
    126140
    127         const auto & kernel = kernels[k];
    128 
    129         BasicBlock * const kernelWait = BasicBlock::Create(b->getContext(), kernel->getName() + "Wait", threadFunc);
    130 
     141        const Kernel * const kernel = kernels[k];
     142
     143        BasicBlock * const kernelWait = b->CreateBasicBlock(kernel->getName() + "Wait");
    131144        b->CreateBr(kernelWait);
    132145
    133         BasicBlock * const kernelCheck = BasicBlock::Create(b->getContext(), kernel->getName() + "Check", threadFunc);
    134 
    135         BasicBlock * const kernelBody = BasicBlock::Create(b->getContext(), kernel->getName() + "Do", threadFunc);
    136 
    137         BasicBlock * const kernelEnd = BasicBlock::Create(b->getContext(), kernel->getName() + "End", threadFunc);
    138 
    139146        b->SetInsertPoint(kernelWait);
    140 
    141147        b->setKernel(kernels[serialize ? (n - 1) : k]);
    142148        Value * const processedSegmentCount = b->acquireLogicalSegmentNo();
    143149        b->setKernel(kernel);
    144 
    145150        assert (processedSegmentCount->getType() == segNo->getType());
    146         Value * const ready = b->CreateICmpEQ(segNo, processedSegmentCount);       
     151        Value * const ready = b->CreateICmpEQ(segNo, processedSegmentCount);
     152
     153        BasicBlock * const kernelCheck = b->CreateBasicBlock(kernel->getName() + "Check");
    147154        b->CreateCondBr(ready, kernelCheck, kernelWait);
    148155
    149156        b->SetInsertPoint(kernelCheck);
    150         b->CreateUnlikelyCondBr(b->getTerminationSignal(), kernelEnd, kernelBody);
    151 
    152         // Execute the kernel segment
    153         b->SetInsertPoint(kernelBody);
    154         const auto & inputs = kernel->getStreamInputs();
    155         Value * const isFinal = b->CreateOr(terminated ? terminated : b->getFalse(), b->getTerminationSignal());
    156         std::vector<Value *> args = {kernel->getInstance(), isFinal};
    157         for (unsigned i = 0; i < inputs.size(); ++i) {
    158             const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
    159             const auto f = producedItemCount.find(buffer);
    160             assert (f != producedItemCount.end());
    161             Value * const produced = f->second;
    162             args.push_back(produced);
    163             handleInsufficientData(b, produced, isFinal, kernelEnd, kernel, inputs[i], buffer);
    164         }
    165 
    166         b->setKernel(kernel);
    167         b->createDoSegmentCall(args);
    168         b->CreateBr(kernelEnd);
    169 
    170         b->SetInsertPoint(kernelEnd);
    171 
    172         Value * const finished = b->getTerminationSignal();
    173         if (terminated) { // all kernels must terminate
    174             terminated = b->CreateAnd(terminated, finished);
    175         } else {
    176             terminated = finished;
    177         }
    178 
    179         const auto & outputs = kernel->getStreamOutputs();
    180         for (unsigned i = 0; i < outputs.size(); ++i) {           
    181             Value * const produced = b->getProducedItemCount(outputs[i].getName());
    182             const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
    183             assert (producedItemCount.count(buf) == 0);
    184             producedItemCount.emplace(buf, produced);
    185         }
    186         for (unsigned i = 0; i < inputs.size(); ++i) {
    187             Value * const processedItemCount = b->getProcessedItemCount(inputs[i].getName());
    188             const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);           
    189             auto f = consumedItemCount.find(buf);
    190             if (f == consumedItemCount.end()) {
    191                 consumedItemCount.emplace(buf, processedItemCount);
    192             } else {
    193                 assert (f->second);
    194                 f->second = b->CreateUMin(processedItemCount, f->second);
    195             }
    196         }
    197 
    198         for (auto i = lastUsedKernel.begin(); i != lastUsedKernel.end(); i++) {
    199             if (i->second == kernel) {
    200                 const StreamSetBuffer * const buffer = i->first;
    201                 Kernel * const producerKernel = buffer->getProducer();
    202                 const auto & binding = producerKernel->getStreamOutput(buffer);
    203                 if (LLVM_UNLIKELY(binding.getRate().isDerived())) {
    204                     continue;
    205                 }
    206                 auto f = consumedItemCount.find(buffer);
    207                 if (f != consumedItemCount.end()) {
    208                     const Kernel* tempKernel = b->getKernel();
    209                     b->setKernel(producerKernel);
    210                     b->setConsumedItemCount(binding.getName(), f->second);
    211                     b->setKernel(tempKernel);
    212                 }
    213             }
    214         }
    215 
     157
     158        finished = G.executeKernel(b, kernel, segNo, finished);
    216159
    217160        if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
     
    225168    }
    226169
    227     exitThreadBlock->moveAfter(b->GetInsertBlock());
    228 
    229170    segNo->addIncoming(b->CreateAdd(segNo, b->getSize(codegen::ThreadNum)), b->GetInsertBlock());
    230171
    231     b->CreateUnlikelyCondBr(terminated, exitThreadBlock, segmentLoop);
    232 
    233     b->SetInsertPoint(exitThreadBlock);
     172    BasicBlock * const segmentExit = b->CreateBasicBlock("segmentExit");
     173    b->CreateUnlikelyCondBr(finished, segmentExit, segmentLoop);
     174
     175    b->SetInsertPoint(segmentExit);
    234176
    235177    // only call pthread_exit() within spawned threads; otherwise it'll be equivalent to calling exit() within the process
    236     BasicBlock * const exitThread = BasicBlock::Create(b->getContext(), "ExitThread", threadFunc);
    237     BasicBlock * const exitFunction = BasicBlock::Create(b->getContext(), "ExitProcessFunction", threadFunc);
    238 
    239     Value * const exitCond = b->CreateICmpEQ(segOffset, ConstantInt::getNullValue(segOffset->getType()));
    240     b->CreateCondBr(exitCond, exitFunction, exitThread);
     178    BasicBlock * const exitThread = b->CreateBasicBlock("ExitThread");
     179    BasicBlock * const exitFunction = b->CreateBasicBlock("ExitProcessFunction");
     180
     181    b->CreateCondBr(b->CreateIsNull(segOffset), exitFunction, exitThread);
    241182    b->SetInsertPoint(exitThread);
    242183    b->CreatePThreadExitCall(nullVoidPtrVal);
     
    317258}
    318259
     260
    319261/** ------------------------------------------------------------------------------------------------------------- *
    320262 * @brief generatePipelineLoop
     
    322264void generatePipelineLoop(const std::unique_ptr<KernelBuilder> & b, const std::vector<Kernel *> & kernels) {
    323265
    324     BasicBlock * entryBlock = b->GetInsertBlock();
    325     Function * main = entryBlock->getParent();
    326 
    327266    // Create the basic blocks for the loop.
    328     BasicBlock * const pipelineLoop = BasicBlock::Create(b->getContext(), "pipelineLoop", main);
    329     BasicBlock * const pipelineExit = BasicBlock::Create(b->getContext(), "pipelineExit", main);
    330 
    331     StreamSetBufferMap<Value *> producedItemCount;
    332     StreamSetBufferMap<Value *> consumedItemCount;
    333     StreamSetBufferMap<Kernel *> lastUsedKernel;
     267    BasicBlock * const entryBlock = b->GetInsertBlock();
     268    BasicBlock * const pipelineLoop = b->CreateBasicBlock("pipelineLoop");
     269    BasicBlock * const pipelineExit = b->CreateBasicBlock("pipelineExit");
     270
     271    PipelineGenerator G;
    334272
    335273    b->CreateBr(pipelineLoop);
     274
    336275    b->SetInsertPoint(pipelineLoop);
    337    
     276    G.initialize(kernels);
     277    PHINode * const segNo = b->CreatePHI(b->getSizeTy(), 2, "segNo");
     278    segNo->addIncoming(b->getSize(0), entryBlock);
     279    Value * finished = nullptr;
     280
    338281    Value * cycleCountStart = nullptr;
    339282    Value * cycleCountEnd = nullptr;
     
    341284        cycleCountStart = b->CreateReadCycleCounter();
    342285    }
    343     Value * terminated = nullptr;
    344286
    345287    for (Kernel * const kernel : kernels) {
    346         const auto & inputs = kernel->getStreamInputs();
    347         for (unsigned i = 0; i < inputs.size(); ++i) {
    348             const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
    349             auto f = lastUsedKernel.find(buffer);
    350             if (f == lastUsedKernel.end()) {
    351                 lastUsedKernel.emplace(buffer, kernel);
    352             } else {
    353                 f->second = kernel;
    354             }
    355         }
    356     }
    357 
    358     for (Kernel * const kernel : kernels) {
    359288
    360289        b->setKernel(kernel);
    361290
    362         BasicBlock * const entry = b->GetInsertBlock();
    363         BasicBlock * const kernelCode = BasicBlock::Create(b->getContext(), kernel->getName(), main);
    364         BasicBlock * const kernelExit = BasicBlock::Create(b->getContext(), kernel->getName() + "_exit", main);
    365 
    366         b->CreateUnlikelyCondBr(b->getTerminationSignal(), kernelExit, kernelCode);
    367 
    368         b->SetInsertPoint(kernelCode);
    369         const auto & inputs = kernel->getStreamInputs();
    370         const auto & outputs = kernel->getStreamOutputs();
    371 
    372         Value * const isFinal = terminated ? terminated : b->getFalse();
    373 
    374         std::vector<Value *> args = {kernel->getInstance(), isFinal};
    375 
    376         const auto name = kernel->getName();
    377         for (unsigned i = 0; i < inputs.size(); ++i) {
    378             const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
    379             const auto f = producedItemCount.find(buffer);
    380             if (LLVM_UNLIKELY(f == producedItemCount.end())) {
    381                 report_fatal_error(kernel->getName() + " uses stream set " + inputs[i].getName() + " prior to its definition");
    382             }
    383             Value * const produced = f->second;
    384             args.push_back(produced);
    385             handleInsufficientData(b, produced, isFinal, pipelineLoop, kernel, inputs[i], buffer);
    386         }
    387 
    388         applyOutputBufferExpansions(b, kernel);
    389 
    390         b->createDoSegmentCall(args);
    391 
    392         BasicBlock * const kernelFinished = b->GetInsertBlock();
    393         Value * const finished = b->getTerminationSignal();
    394         b->CreateBr(kernelExit);
    395 
    396         b->SetInsertPoint(kernelExit);
    397         PHINode * const finishedPhi = b->CreatePHI(b->getInt1Ty(), 2);
    398         finishedPhi->addIncoming(b->getTrue(), entry);
    399         finishedPhi->addIncoming(finished, kernelFinished);
    400         if (terminated) { // All kernels must agree that we've terminated.
    401             terminated = b->CreateAnd(terminated, finishedPhi);
    402         } else {
    403             terminated = finishedPhi;
    404         }
    405 
    406         for (unsigned i = 0; i < outputs.size(); ++i) {
    407             Value * const produced = b->getProducedItemCount(outputs[i].getName());
    408             const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
    409             assert (producedItemCount.count(buf) == 0);
    410             producedItemCount.emplace(buf, produced);
    411         }
    412 
    413         for (unsigned i = 0; i < inputs.size(); ++i) {
    414             Value * const processed = b->getProcessedItemCount(inputs[i].getName());
    415             const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i);
    416             auto f = consumedItemCount.find(buf);
    417             if (f == consumedItemCount.end()) {
    418                 consumedItemCount.emplace(buf, processed);
    419             } else {
    420                 f->second = b->CreateUMin(processed, f->second);
    421             }
    422         }
    423 
    424         for (auto i = lastUsedKernel.begin(); i != lastUsedKernel.end(); i++) {
    425             if (i->second == kernel) {
    426                 const StreamSetBuffer * const buffer = i->first;
    427                 Kernel * const producerKernel = buffer->getProducer();
    428                 const auto & binding = producerKernel->getStreamOutput(buffer);
    429                 if (LLVM_UNLIKELY(binding.getRate().isDerived())) {
    430                     continue;
    431                 }
    432                 auto f = consumedItemCount.find(buffer);
    433                 if (f != consumedItemCount.end()) {
    434                     const Kernel* tempKernel = b->getKernel();
    435                     b->setKernel(producerKernel);
    436                     b->setConsumedItemCount(binding.getName(), f->second);
    437                     b->setKernel(tempKernel);
    438                 }
    439             }
    440         }
     291        finished = G.executeKernel(b, kernel, segNo, finished);
    441292
    442293        if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
     
    446297            cycleCountStart = cycleCountEnd;
    447298        }
    448 //        Value * const segNo = b->acquireLogicalSegmentNo();
    449 //        Value * nextSegNo = b->CreateAdd(segNo, b->getSize(1));
    450 //        b->releaseLogicalSegmentNo(nextSegNo);
    451     }
    452 
    453     b->CreateCondBr(terminated, pipelineExit, pipelineLoop);
     299    }
     300
     301    segNo->addIncoming(b->CreateAdd(segNo, b->getSize(1)), b->GetInsertBlock());
     302    b->CreateCondBr(finished, pipelineExit, pipelineLoop);
    454303
    455304    pipelineExit->moveAfter(b->GetInsertBlock());
     
    481330
    482331/** ------------------------------------------------------------------------------------------------------------- *
     332 * @brief initialize
     333 ** ------------------------------------------------------------------------------------------------------------- */
     334void PipelineGenerator::initialize(const std::vector<Kernel *> & kernels) {
     335
     336    // Our goal when building G is *not* to model the dataflow of our program but instead to
     337    // determine the minimum number of sufficient data tests needed to ensure each kernel has
     338    // enough data to progress.
     339
     340    // For example, suppose we have kernels A, B and C, and that B has a fixed input and fixed
     341    // output rate. C also has a fixed input rate but A does *not* have a fixed output rate.
     342    // C must test whether it has enough input from B as B is not guaranteed to have enough
     343    // input from A. Moreover if C is dependent on B, C could be skipped entirely.
     344
     345    // Note: we cannot simply test the output of A for both B and C. In our data-parallel
     346    // pipeline A's state may change by the time we process C.
     347
     348    for (const Kernel * const consumer : kernels) {
     349        const auto v = add_vertex(consumer, G);
     350        M.emplace(consumer, v);
     351        const auto & inputs = consumer->getStreamInputs();
     352        for (unsigned i = 0; i < inputs.size(); ++i) {
     353
     354            const auto buffer = consumer->getStreamSetInputBuffer(i);
     355            const Kernel * const producer = buffer->getProducer();
     356            const Binding & output = producer->getStreamOutput(buffer);
     357            if (output.getRate().isRelative()) continue;
     358
     359            const Binding & input = inputs[i];
     360            auto ub_in = consumer->getUpperBound(input.getRate()) * consumer->getStride();
     361            if (input.hasLookahead()) {
     362                ub_in += input.getLookahead();
     363            }
     364
     365            const auto lb_out = producer->getLowerBound(output.getRate()) * producer->getStride();
     366
     367            const auto rate = lb_out / ub_in;
     368            const auto f = M.find(producer); assert (f != M.end());
     369            const auto u = f->second;
     370            // If we have multiple inputs from the same kernel, we only need to consider the "slowest" one
     371            bool slowest = true;
     372            if (lb_out > 0) {
     373                for (const auto e : make_iterator_range(in_edges(v, G))) {
     374                    if (source(e, G) == u) {
     375                        Channel & p = G[e];
     376                        slowest = false;
     377                        if (rate < p.rate) {
     378                            p.rate = rate;
     379                            p.buffer = buffer;
     380                        }
     381                        break;
     382                    }
     383                }
     384            }
     385            if (slowest) {
     386                add_edge(u, v, Channel{rate, buffer}, G);
     387            }
     388        }
     389    }
     390
     391    // Take a transitive closure of G but whenever we attempt to insert an edge into the closure
     392    // that already exists, check instead whether the rate of our proposed edge is <= the existing
     393    // edge's rate. If so, the data availability is transitively guaranteed.
     394    for (const auto u : make_iterator_range(vertices(G))) {
     395        for (auto ei : make_iterator_range(in_edges(u, G))) {
     396            const auto v = source(ei, G);
     397            const Channel & pu = G[ei];           
     398            for (auto ej : make_iterator_range(out_edges(u, G))) {               
     399                const auto w = target(ej, G);
     400                const auto ratio = RateValue(G[u]->getStride(), G[w]->getStride());
     401                const auto rate = pu.rate * ratio;
     402                bool insert = true;
     403                for (auto ek : make_iterator_range(in_edges(w, G))) {
     404                    if (source(ek, G) == v) {
     405                        Channel & pw = G[ek];
     406                        if (rate <= pw.rate && pw.rate > 0) {
     407                            pw.buffer = nullptr;
     408                        }
     409                        insert = false;
     410                        break;
     411                    }
     412                }
     413                if (insert) {
     414                    add_edge(v, w, Channel{rate, nullptr}, G);
     415                }
     416            }
     417        }
     418    }
     419
     420    // remove any closure edges from G
     421    remove_edge_if([&](const Graph::edge_descriptor e) { return G[e].buffer == nullptr; }, G);
     422
     423    // If a kernel has no 'necessary to check' inputs then we can remove every output with a rate >= 1 from G
     424    for (const auto u : make_iterator_range(vertices(G))) {
     425        if (in_degree(u, G) == 0) {
     426            remove_out_edge_if(u, [&](const Graph::edge_descriptor e) { return G[e].rate >= RateValue{1, 1}; }, G);
     427        }
     428    }
     429
     430    // iterate through each kernel in order and determine which kernel last used a particular buffer
     431    for (Kernel * const kernel : kernels) {
     432        const auto & inputs = kernel->getStreamInputs();
     433        for (unsigned i = 0; i < inputs.size(); ++i) {
     434            lastConsumer[kernel->getStreamSetInputBuffer(i)] = kernel;
     435        }
     436    }
     437
     438}
     439
     440/** ------------------------------------------------------------------------------------------------------------- *
     441 * @brief executeKernel
     442 ** ------------------------------------------------------------------------------------------------------------- */
     443Value *PipelineGenerator::executeKernel(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel, PHINode * const segNo, Value * const finished) {
     444
     445    const auto & inputs = kernel->getStreamInputs();
     446
     447    std::vector<Value *> args(2 + inputs.size());
     448
     449    const auto f = M.find(kernel); assert (f != M.end());
     450    const auto u = f->second;
     451
     452    BasicBlock * const kernelEntry = b->GetInsertBlock();
     453    BasicBlock * const kernelCode = b->CreateBasicBlock(kernel->getName());
     454    BasicBlock * const kernelExit = b->CreateBasicBlock(kernel->getName() + "_exit");
     455
     456    b->CreateUnlikelyCondBr(b->getTerminationSignal(), kernelExit, kernelCode);
     457
     458    b->SetInsertPoint(kernelExit);
     459    PHINode * const terminated = b->CreatePHI(b->getInt1Ty(), 2);
     460    // Our initial "isFinal" state is equal to the first kernel's termination signal state.
     461    terminated->addIncoming(finished ? finished : b->getTrue(), kernelEntry);
     462    Value * isFinal = finished ? finished : b->getFalse();
     463
     464    b->SetInsertPoint(kernelCode);
     465    for (unsigned i = 0; i < inputs.size(); ++i) {
     466
     467        const Binding & input = inputs[i];
     468
     469        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
     470
     471        const auto name = input.getName();
     472
     473        const auto p = producedItemCount.find(buffer);
     474        if (LLVM_UNLIKELY(p == producedItemCount.end())) {
     475            report_fatal_error(kernel->getName() + " uses stream set " + name + " prior to its definition");
     476        }
     477        Value * const produced = p->second;
     478        const auto ub = kernel->getUpperBound(input.getRate()); assert (ub > 0);
     479        const auto strideLength = ceiling(ub * kernel->getStride()) ;
     480        Constant * const segmentLength = b->getSize(strideLength * codegen::SegmentSize);
     481
     482        if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
     483            b->CreateAssert(b->CreateICmpULE(segmentLength, b->getCapacity(name)),
     484                            kernel->getName() + ": " + name + " upper bound of segment length exceeds buffer capacity");
     485        }
     486
     487        Value * limit = nullptr;
     488        if (input.getRate().isFixed()) {
     489            // if the input is deferred, simply adding length to the processed item count may result in setting a limit
     490            // that is too low. Instead, just calculate the limit of all fixed rates from the segment no.
     491            limit = b->CreateMul(b->CreateAdd(segNo, b->getSize(1)), segmentLength);
     492        } else {
     493            Value * const processed = b->getProcessedItemCount(name);
     494            limit = b->CreateAdd(processed, segmentLength);
     495        }
     496
     497        // TODO: currently, if we produce the exact amount as our limit states, we will have to process one additional segment
     498        // before we can consider this kernel finished. We ought to be able to avoid doing so in some cases but need to prove it's
     499        // always safe to do so.
     500
     501        Value * const consumingAll = b->CreateICmpULT(produced, limit);
     502        args[i + 2] = b->CreateSelect(consumingAll, produced, limit);
     503        isFinal = b->CreateAnd(isFinal, consumingAll);
     504
     505        // Check for available input (if it's both computable and not guaranteed to be sufficient by the processing rates)
     506        for (auto e : make_iterator_range(in_edges(u, G))) {
     507            const auto p = G[e];
     508            if (p.buffer == buffer) {
     509                BasicBlock * const sufficient = b->CreateBasicBlock(name + "_hasSufficientData");
     510
     511                Constant * const sl = b->getSize(strideLength);
     512
     513                Value * remaining = nullptr;
     514                if (input.getRate().isFixed()) {
     515                    remaining = b->CreateMul(segNo, sl);
     516                } else {
     517                    remaining = b->getProcessedItemCount(name);
     518                }
     519                remaining = b->CreateSub(produced, remaining);
     520
     521                Value * const hasSufficientData = b->CreateOr(b->CreateICmpUGE(remaining, sl), isFinal);
     522                terminated->addIncoming(b->getFalse(), b->GetInsertBlock());
     523                b->CreateLikelyCondBr(hasSufficientData, sufficient, kernelExit);
     524                b->SetInsertPoint(sufficient);
     525            }
     526        }
     527    }
     528
     529    applyOutputBufferExpansions(b, kernel);
     530
     531    args[0] = kernel->getInstance();
     532    args[1] = isFinal;
     533
     534    b->createDoSegmentCall(args);
     535
     536    if (inputs.empty() || kernel->canTerminateEarly()) {
     537        isFinal = b->CreateOr(isFinal, b->getTerminationSignal());
     538    }
     539    b->setTerminationSignal(isFinal);
     540//    b->CallPrintInt(kernel->getName() + "_finished", isFinal);
     541    BasicBlock * const kernelFinished = b->GetInsertBlock();
     542    kernelExit->moveAfter(kernelFinished);
     543    b->CreateBr(kernelExit);
     544
     545    b->SetInsertPoint(kernelExit);
     546    terminated->addIncoming(isFinal, kernelFinished);
     547
     548    updateProducedAndConsumedCounts(b, kernel);
     549
     550    return terminated;
     551}
     552
     553/** ------------------------------------------------------------------------------------------------------------- *
    483554 * @brief applyOutputBufferExpansions
    484555 ** ------------------------------------------------------------------------------------------------------------- */
    485 void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const std::string & name, DynamicBuffer * const db, const uint64_t baseSize) {
            // Emits IR that checks whether the dynamic buffer `db` backing output stream `name` is large enough
            // and, if not, doubles its capacity in a loop. When this function returns, the builder's insert
            // point is the newly created "<name>Ready" block.
            //
            // Block layout: "<name>Expand" and "<name>Ready" are placed directly after the current insert block;
            // the block that previously followed it (if any) is moved after "<name>Ready".
    486     BasicBlock * const doExpand = BasicBlock::Create(b->getContext(), name + "Expand", b->GetInsertBlock()->getParent());
    487     BasicBlock * const nextBlock = b->GetInsertBlock()->getNextNode();
    488     doExpand->moveAfter(b->GetInsertBlock());
    489     BasicBlock * const bufferReady = b->CreateBasicBlock(name + "Ready");
    490     bufferReady->moveAfter(doExpand);
    491     if (nextBlock) nextBlock->moveAfter(bufferReady);
    492 
    493     Value * const handle = db->getStreamSetHandle();
    494 
            // required = (produced - consumed) + 2 * baseSize
    495     Value * const produced = b->getProducedItemCount(name);
    496     Value * const consumed = b->getConsumedItemCount(name);
    497     Value * const required = b->CreateAdd(b->CreateSub(produced, consumed), b->getSize(2 * baseSize));
    498 
            // NOTE(review): this first test compares against getCapacity() while the loop test below compares
            // against getBufferedSize() -- confirm that the two are meant to differ.
    499     b->CreateCondBr(b->CreateICmpUGT(required, db->getCapacity(b.get(), handle)), doExpand, bufferReady);
    500 
    501     b->SetInsertPoint(doExpand);
    502     db->doubleCapacity(b.get(), handle);
    503     // Ensure that capacity is sufficient by successive doubling, if necessary.
    504     b->CreateCondBr(b->CreateICmpUGT(required, db->getBufferedSize(b.get(), handle)), doExpand, bufferReady);
    505 
            // Subsequent IR emitted by the caller goes into the "Ready" block.
    506     b->SetInsertPoint(bufferReady);
    507 }
    508 
    509 void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * k) {
     556void PipelineGenerator::applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * k) {
    510557    const auto & outputs = k->getStreamSetOutputBuffers();
    511558    for (unsigned i = 0; i < outputs.size(); i++) {
    512559        if (isa<DynamicBuffer>(outputs[i])) {
    513             const auto ub = k->getUpperBound(k->getStreamOutput(i).getRate());
    514             const auto baseSize = (ub.numerator() * k->getStride() + ub.denominator() - 1) / ub.denominator();
     560            const auto baseSize = ceiling(k->getUpperBound(k->getStreamOutput(i).getRate()) * k->getStride() * codegen::SegmentSize);
    515561            if (LLVM_LIKELY(baseSize > 0)) {
     562
    516563                const auto & name = k->getStreamOutput(i).getName();
    517                 applyOutputBufferExpansions(b, name, cast<DynamicBuffer>(outputs[i]), baseSize);
     564
     565                BasicBlock * const doExpand = b->CreateBasicBlock(name + "Expand");
     566                BasicBlock * const nextBlock = b->GetInsertBlock()->getNextNode();
     567                doExpand->moveAfter(b->GetInsertBlock());
     568                BasicBlock * const bufferReady = b->CreateBasicBlock(name + "Ready");
     569                bufferReady->moveAfter(doExpand);
     570                if (nextBlock) nextBlock->moveAfter(bufferReady);
     571
     572                Value * const produced = b->getProducedItemCount(name);
     573                Value * const consumed = b->getConsumedItemCount(name);
     574                Value * const required = b->CreateAdd(b->CreateSub(produced, consumed), b->getSize(2 * baseSize));
     575
     576                b->CreateCondBr(b->CreateICmpUGT(required, b->getBufferedSize(name)), doExpand, bufferReady);
     577                b->SetInsertPoint(doExpand);
     578
     579                b->doubleCapacity(name);
     580                // Ensure that capacity is sufficient by successive doubling, if necessary.
     581                b->CreateCondBr(b->CreateICmpUGT(required, b->getBufferedSize(name)), doExpand, bufferReady);
     582
     583                b->SetInsertPoint(bufferReady);
     584
    518585            }
    519586        }
     
    522589
    523590/** ------------------------------------------------------------------------------------------------------------- *
    524  * @brief handleInsufficientData
     591 * @brief updateProducedAndConsumedCounts
    525592 ** ------------------------------------------------------------------------------------------------------------- */
    526 inline void handleInsufficientData(const std::unique_ptr<KernelBuilder> & b, Value * const produced, Value * const final, BasicBlock * const insufficient,
    527                                    const Kernel * const consumer,  const Binding & input, const StreamSetBuffer * const buffer) {
    528     const Kernel * const producer = buffer->getProducer();
    529     const Binding & output = producer->getStreamOutput(buffer);
    530     const auto consumedRate = consumer->getUpperBound(input.getRate()) * consumer->getStride();
    531     if (consumedRate > 0) {
    532         auto producedRate = producer->getLowerBound(output.getRate()) * producer->getStride();
    533         if (LLVM_UNLIKELY(input.hasLookahead())) {
    534             producedRate -= input.getLookahead();
    535         }
    536         if (LLVM_UNLIKELY(producedRate < consumedRate)) {
    537             const auto name = input.getName();
    538             BasicBlock * const sufficient = BasicBlock::Create(b->getContext(), name + "IsSufficient", b->GetInsertBlock()->getParent());
    539             Value * const processed = b->getProcessedItemCount(name);
    540 
    541             if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableAsserts))) {
    542                 b->CreateAssert(b->CreateICmpULE(processed, produced), input.getName() + ": processed cannot exceed produced");
    543             }
    544             Value * const unread = b->CreateSub(produced, processed);
    545             Constant * const amount = ConstantInt::get(unread->getType(), ceiling(consumedRate));
    546             Value * const cond = b->CreateOr(b->CreateICmpUGE(unread, amount), final);
    547             b->CreateLikelyCondBr(cond, sufficient, insufficient);
    548             b->SetInsertPoint(sufficient);
    549         }
    550     }
     593void PipelineGenerator::updateProducedAndConsumedCounts(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel) {
            // Two passes over the kernel's stream bindings:
            //  1. For every input, fold this kernel's processed item count into the per-buffer entry of
            //     consumedItemCount (unsigned minimum with any value already stored) and, if this kernel is
            //     the buffer's last consumer, store that value as the producer's consumed item count.
            //  2. For every output, record its produced item count in producedItemCount.
            // consumedItemCount, lastConsumer and producedItemCount are declared outside this view.
     594
     595    const auto & inputs = kernel->getStreamInputs();
     596    for (unsigned i = 0; i < inputs.size(); ++i) {
     597        Value * const processed = b->getProcessedItemCount(inputs[i].getName());
     598
     599        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
     600        auto f = consumedItemCount.find(buffer);
     601        Value * consumed = processed;
                // First kernel seen for this buffer: store its processed count as-is. Otherwise combine it
                // with the stored value via CreateUMin and store the result back.
     602        if (f == consumedItemCount.end()) {
     603            consumedItemCount.emplace(buffer, consumed);
     604        } else {
     605            consumed = b->CreateUMin(consumed, f->second);
     606            f->second = consumed;
     607        }
     608
     609        // If this kernel is the last consumer of an input buffer, update the consumed count for that buffer.
     610        const auto c = lastConsumer.find(buffer);
     611        assert (c != lastConsumer.end());
     612        if (c->second == kernel) {
     613            Kernel * const producer = buffer->getProducer();
     614            const auto & output = producer->getStreamOutput(buffer);
                    // Producer outputs with a relative rate are skipped: no consumed count is written for them.
     615            if (output.getRate().isRelative()) continue;
                    // Switch the builder to the producer kernel for the store, then switch back to `kernel`.
     616            b->setKernel(producer);
     617
     618            b->setConsumedItemCount(output.getName(), consumed);
     619            b->setKernel(kernel);
     620        }
     621    }
     622
     623    const auto & outputs = kernel->getStreamOutputs();
     624    for (unsigned i = 0; i < outputs.size(); ++i) {
     625        Value * const produced = b->getProducedItemCount(outputs[i].getName());
     626        const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i);
                // Each output buffer is expected to be recorded exactly once.
     627        assert (producedItemCount.count(buf) == 0);
     628        producedItemCount.emplace(buf, produced);
     629    }
     630
    551631}
    552632
    553 /** ------------------------------------------------------------------------------------------------------------- *
    554  * @brief requiresCopyBack
    555  ** ------------------------------------------------------------------------------------------------------------- */
    556 bool requiresCopyBack(const Kernel * k, const ProcessingRate & rate) {
            // Returns true when `rate` is bounded or unknown. A relative rate is resolved by recursing on the
            // rate of the binding it references (looked up through `k`). Every other rate yields false.
    557     if (rate.isBounded() || rate.isUnknown()) {
    558         return true;
    559     } else if (rate.isRelative()) {
    560         return requiresCopyBack(k, k->getBinding(rate.getReference()).getRate());
    561     }
    562     return false;
    563 }
     633
Note: See TracChangeset for help on using the changeset viewer.