source: icGREP/icgrep-devel/icgrep/toolchain/pipeline.cpp @ 5865

Last change on this file since 5865 was 5865, checked in by nmedfort, 15 months ago

More work on the pipeline I/O rate handling

File size: 37.9 KB
Line 
1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
6#include "pipeline.h"
7#include <toolchain/toolchain.h>
8#include <kernels/kernel.h>
9#include <kernels/streamset.h>
10#include <llvm/IR/Module.h>
11#include <boost/container/flat_set.hpp>
12#include <boost/container/flat_map.hpp>
13#include <boost/graph/adjacency_list.hpp>
14#include <kernels/kernel_builder.h>
15
16#include <llvm/Support/raw_ostream.h>
17
18using namespace kernel;
19using namespace parabix;
20using namespace llvm;
21using namespace boost;
22using namespace boost::container;
23
24using Port = Kernel::Port;
25
26Function * makeThreadFunction(const std::unique_ptr<kernel::KernelBuilder> & b, const std::string & name) {
27    Function * const f = Function::Create(FunctionType::get(b->getVoidTy(), {b->getVoidPtrTy()}, false), Function::InternalLinkage, name, b->getModule());
28    f->setCallingConv(CallingConv::C);
29    f->arg_begin()->setName("state");
30    return f;
31}
32
33struct PipelineGenerator {
34
35    template <typename Value>
36    using StreamSetBufferMap = flat_map<const StreamSetBuffer *, Value>;
37
38    using CheckMap = flat_map<const Kernel *, std::vector<const StreamSetBuffer *>>;
39
40    using RateValue = ProcessingRate::RateValue;
41
42    struct Channel {
43        Channel() = default;
44        Channel(const RateValue & rate, const StreamSetBuffer * const buffer)
45        : rate(rate), buffer(buffer) { }
46
47        RateValue               rate;
48        const StreamSetBuffer * buffer;
49    };
50
51    using Graph = adjacency_list<vecS, vecS, bidirectionalS, const Kernel *, Channel, vecS>;
52
53    using Map = flat_map<const Kernel *, Graph::vertex_descriptor>;
54
55    void initialize(const std::vector<Kernel *> & kernels);
56
57    Value * executeKernel(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel, PHINode * const segNo, Value * const finished);
58
59protected:
60
61    Graph makeInputGraph(const std::vector<Kernel *> & kernels);
62
63    Graph makeOutputGraph(const std::vector<Kernel *> & kernels);
64
65    Graph printGraph(const bool input, Graph && G);
66
67    Graph pruneGraph(Graph && G);
68
69    void addChecks(Graph && G, CheckMap & M);
70
71    void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel);
72
73private:
74
75    CheckMap                            inputAvailabilityChecks;
76    CheckMap                            outputSpaceChecks;
77
78    StreamSetBufferMap<Value *>         producedItemCount;
79    StreamSetBufferMap<Value *>         consumedItemCount;
80    StreamSetBufferMap<const Kernel *>  lastConsumer;
81};
82
83/** ------------------------------------------------------------------------------------------------------------- *
84 * @brief generateSegmentParallelPipeline
85 *
86 * Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
87 * operating over an input stream set S, a segment-parallel implementation divides the input
88 * into segments and coordinates a set of T <= K threads to each process one segment at a time.
89 * Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
90 * fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
91 ** ------------------------------------------------------------------------------------------------------------- */
92void generateSegmentParallelPipeline(const std::unique_ptr<KernelBuilder> & b, const std::vector<Kernel *> & kernels) {
93
94    const unsigned n = kernels.size();
95    Module * const m = b->getModule();
96    IntegerType * const sizeTy = b->getSizeTy();
97    PointerType * const voidPtrTy = b->getVoidPtrTy();
98    Constant * nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
99    std::vector<Type *> structTypes;
100    codegen::BufferSegments = std::max(codegen::BufferSegments, codegen::ThreadNum);
101
102    Value * instance[n];
103    for (unsigned i = 0; i < n; ++i) {
104        instance[i] = kernels[i]->getInstance();
105        structTypes.push_back(instance[i]->getType());
106    }
107    StructType * const sharedStructType = StructType::get(m->getContext(), structTypes);
108    StructType * const threadStructType = StructType::get(m->getContext(), {sharedStructType->getPointerTo(), sizeTy});
109
110    const auto ip = b->saveIP();
111
112    Function * const threadFunc = makeThreadFunction(b, "segment");
113    auto args = threadFunc->arg_begin();
114
115    // -------------------------------------------------------------------------------------------------------------------------
116    // MAKE SEGMENT PARALLEL PIPELINE THREAD
117    // -------------------------------------------------------------------------------------------------------------------------
118
119     // Create the basic blocks for the thread function.
120    BasicBlock * entryBlock = BasicBlock::Create(b->getContext(), "entry", threadFunc);
121    b->SetInsertPoint(entryBlock);
122
123    Value * const threadStruct = b->CreateBitCast(&*(args), threadStructType->getPointerTo());
124
125    Value * const sharedStatePtr = b->CreateLoad(b->CreateGEP(threadStruct, {b->getInt32(0), b->getInt32(0)}));
126    for (unsigned k = 0; k < n; ++k) {
127        Value * ptr = b->CreateLoad(b->CreateGEP(sharedStatePtr, {b->getInt32(0), b->getInt32(k)}));
128        kernels[k]->setInstance(ptr);
129    }
130    Value * const segOffset = b->CreateLoad(b->CreateGEP(threadStruct, {b->getInt32(0), b->getInt32(1)}));
131
132    PipelineGenerator G;
133
134    BasicBlock * const segmentLoop = b->CreateBasicBlock("segmentLoop");
135    b->CreateBr(segmentLoop);
136
137    b->SetInsertPoint(segmentLoop);
138    G.initialize(kernels);
139    PHINode * const segNo = b->CreatePHI(b->getSizeTy(), 2, "segNo");
140    segNo->addIncoming(segOffset, entryBlock);
141    Value * finished = nullptr;
142
143    Value * cycleCountStart = nullptr;
144    Value * cycleCountEnd = nullptr;
145    if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
146        cycleCountStart = b->CreateReadCycleCounter();
147    }
148
149    const bool serialize = codegen::DebugOptionIsSet(codegen::SerializeThreads);
150
151    for (unsigned k = 0; k < n; ++k) {
152
153        const Kernel * const kernel = kernels[k];
154
155        BasicBlock * const kernelWait = b->CreateBasicBlock(kernel->getName() + "Wait");
156        b->CreateBr(kernelWait);
157
158        b->SetInsertPoint(kernelWait);
159        b->setKernel(kernels[serialize ? (n - 1) : k]);
160        Value * const processedSegmentCount = b->acquireLogicalSegmentNo();
161        b->setKernel(kernel);
162        assert (processedSegmentCount->getType() == segNo->getType());
163        Value * const ready = b->CreateICmpEQ(segNo, processedSegmentCount);
164
165        BasicBlock * const kernelCheck = b->CreateBasicBlock(kernel->getName() + "Check");
166        b->CreateCondBr(ready, kernelCheck, kernelWait);
167
168        b->SetInsertPoint(kernelCheck);
169
170        finished = G.executeKernel(b, kernel, segNo, finished);
171
172        if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
173            cycleCountEnd = b->CreateReadCycleCounter();
174            Value * counterPtr = b->getCycleCountPtr();
175            b->CreateStore(b->CreateAdd(b->CreateLoad(counterPtr), b->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
176            cycleCountStart = cycleCountEnd;
177        }
178
179        b->releaseLogicalSegmentNo(b->CreateAdd(segNo, b->getSize(1)));
180    }
181
182    segNo->addIncoming(b->CreateAdd(segNo, b->getSize(codegen::ThreadNum)), b->GetInsertBlock());
183
184    BasicBlock * const segmentExit = b->CreateBasicBlock("segmentExit");
185    b->CreateUnlikelyCondBr(finished, segmentExit, segmentLoop);
186
187    b->SetInsertPoint(segmentExit);
188
189    // only call pthread_exit() within spawned threads; otherwise it'll be equivalent to calling exit() within the process
190    BasicBlock * const exitThread = b->CreateBasicBlock("ExitThread");
191    BasicBlock * const exitFunction = b->CreateBasicBlock("ExitProcessFunction");
192
193    b->CreateCondBr(b->CreateIsNull(segOffset), exitFunction, exitThread);
194    b->SetInsertPoint(exitThread);
195    b->CreatePThreadExitCall(nullVoidPtrVal);
196    b->CreateBr(exitFunction);
197    b->SetInsertPoint(exitFunction);
198    b->CreateRetVoid();
199
200    // -------------------------------------------------------------------------------------------------------------------------
201    b->restoreIP(ip);
202
203    for (unsigned i = 0; i < n; ++i) {
204        kernels[i]->setInstance(instance[i]);
205    }
206
207    // -------------------------------------------------------------------------------------------------------------------------
208    // MAKE SEGMENT PARALLEL PIPELINE DRIVER
209    // -------------------------------------------------------------------------------------------------------------------------
210    const unsigned threads = codegen::ThreadNum - 1;
211    assert (codegen::ThreadNum > 0);
212    Type * const pthreadsTy = ArrayType::get(sizeTy, threads);
213    AllocaInst * const pthreads = b->CreateAlloca(pthreadsTy);
214    Value * threadIdPtr[threads];
215
216    for (unsigned i = 0; i < threads; ++i) {
217        threadIdPtr[i] = b->CreateGEP(pthreads, {b->getInt32(0), b->getInt32(i)});
218    }
219
220    for (unsigned i = 0; i < n; ++i) {
221        b->setKernel(kernels[i]);
222        b->releaseLogicalSegmentNo(b->getSize(0));
223    }
224
225    AllocaInst * const sharedStruct = b->CreateCacheAlignedAlloca(sharedStructType);
226    for (unsigned i = 0; i < n; ++i) {
227        Value * ptr = b->CreateGEP(sharedStruct, {b->getInt32(0), b->getInt32(i)});
228        b->CreateStore(kernels[i]->getInstance(), ptr);
229    }
230
231    // use the process thread to handle the initial segment function after spawning (n - 1) threads to handle the subsequent offsets
232    for (unsigned i = 0; i < threads; ++i) {
233        AllocaInst * const threadState = b->CreateAlloca(threadStructType);
234        b->CreateStore(sharedStruct, b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(0)}));
235        b->CreateStore(b->getSize(i + 1), b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(1)}));
236        b->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, threadFunc, threadState);
237    }
238
239    AllocaInst * const threadState = b->CreateAlloca(threadStructType);
240    b->CreateStore(sharedStruct, b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(0)}));
241    b->CreateStore(b->getSize(0), b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(1)}));
242    b->CreateCall(threadFunc, b->CreatePointerCast(threadState, voidPtrTy));
243
244    AllocaInst * const status = b->CreateAlloca(voidPtrTy);
245    for (unsigned i = 0; i < threads; ++i) {
246        Value * threadId = b->CreateLoad(threadIdPtr[i]);
247        b->CreatePThreadJoinCall(threadId, status);
248    }
249   
250    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
251        for (const Kernel * kernel : kernels) {
252            b->setKernel(kernel);
253            const auto & inputs = kernel->getStreamInputs();
254            const auto & outputs = kernel->getStreamOutputs();
255            Value * items = nullptr;
256            if (inputs.empty()) {
257                items = b->getProducedItemCount(outputs[0].getName());
258            } else {
259                items = b->getProcessedItemCount(inputs[0].getName());
260            }
261            Value * fItems = b->CreateUIToFP(items, b->getDoubleTy());
262            Value * cycles = b->CreateLoad(b->getCycleCountPtr());
263            Value * fCycles = b->CreateUIToFP(cycles, b->getDoubleTy());
264            const auto formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
265            Value * stringPtr = b->CreatePointerCast(b->GetString(formatString), b->getInt8PtrTy());
266            b->CreateCall(b->GetDprintf(), {b->getInt32(2), stringPtr, fItems, fCycles, b->CreateFDiv(fCycles, fItems)});
267        }
268    }
269   
270}
271
272
273/** ------------------------------------------------------------------------------------------------------------- *
274 * @brief generatePipelineLoop
275 ** ------------------------------------------------------------------------------------------------------------- */
276void generatePipelineLoop(const std::unique_ptr<KernelBuilder> & b, const std::vector<Kernel *> & kernels) {
277
278    // Create the basic blocks for the loop.
279    BasicBlock * const entryBlock = b->GetInsertBlock();
280    BasicBlock * const pipelineLoop = b->CreateBasicBlock("pipelineLoop");
281    BasicBlock * const pipelineExit = b->CreateBasicBlock("pipelineExit");
282
283    PipelineGenerator G;
284
285    b->CreateBr(pipelineLoop);
286
287    b->SetInsertPoint(pipelineLoop);
288    G.initialize(kernels);
289    PHINode * const segNo = b->CreatePHI(b->getSizeTy(), 2, "segNo");
290    segNo->addIncoming(b->getSize(0), entryBlock);
291    Value * finished = nullptr;
292
293    Value * cycleCountStart = nullptr;
294    Value * cycleCountEnd = nullptr;
295    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
296        cycleCountStart = b->CreateReadCycleCounter();
297    }
298
299    for (Kernel * const kernel : kernels) {
300
301        b->setKernel(kernel);
302
303        finished = G.executeKernel(b, kernel, segNo, finished);
304
305        if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
306            cycleCountEnd = b->CreateReadCycleCounter();
307            Value * counterPtr = b->getCycleCountPtr();
308            b->CreateStore(b->CreateAdd(b->CreateLoad(counterPtr), b->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
309            cycleCountStart = cycleCountEnd;
310        }
311    }
312
313    segNo->addIncoming(b->CreateAdd(segNo, b->getSize(1)), b->GetInsertBlock());
314    b->CreateCondBr(finished, pipelineExit, pipelineLoop);
315
316    pipelineExit->moveAfter(b->GetInsertBlock());
317
318    b->SetInsertPoint(pipelineExit);
319
320    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
321        for (unsigned k = 0; k < kernels.size(); k++) {
322            auto & kernel = kernels[k];
323            b->setKernel(kernel);
324            const auto & inputs = kernel->getStreamInputs();
325            const auto & outputs = kernel->getStreamOutputs();
326            Value * items = nullptr;
327            if (inputs.empty()) {
328                items = b->getProducedItemCount(outputs[0].getName());
329            } else {
330                items = b->getProcessedItemCount(inputs[0].getName());
331            }
332            Value * fItems = b->CreateUIToFP(items, b->getDoubleTy());
333            Value * cycles = b->CreateLoad(b->getCycleCountPtr());
334            Value * fCycles = b->CreateUIToFP(cycles, b->getDoubleTy());
335            const auto formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
336            Value * stringPtr = b->CreatePointerCast(b->GetString(formatString), b->getInt8PtrTy());
337            b->CreateCall(b->GetDprintf(), {b->getInt32(2), stringPtr, fItems, fCycles, b->CreateFDiv(fCycles, fItems)});
338        }
339    }
340
341}
342
343/** ------------------------------------------------------------------------------------------------------------- *
344 * @brief initialize
345 ** ------------------------------------------------------------------------------------------------------------- */
346void PipelineGenerator::initialize(const std::vector<Kernel *> & kernels) {
347
348    // Our goal when building G is *not* to model the dataflow of our program but instead to
349    // detetermine the minimum number of sufficient data tests needed to ensure each kernel has
350    // enough data to progress.
351
352    // For example, suppose we have kernels A, B and C, and that B has a fixed input and fixed
353    // output rate. C also has a fixed input rate but A does *not* have a fixed output rate.
354    // C must test whether it has enough input from B as B is not guaranteed to have enough
355    // input from A. Moreover if C is depedent on B, C could be skipped entirely.
356
357    // Note: we cannot simply test the output of A for both B and C. In the data-parallel
358    // pipeline, A's state may change by the time we process C.
359
360//    addChecks(printGraph(true, pruneGraph(printGraph(true, makeInputGraph(kernels)))), inputAvailabilityChecks);
361//    addChecks(printGraph(false, pruneGraph(printGraph(false, makeOutputGraph(kernels)))), outputSpaceChecks);
362
363    addChecks(pruneGraph(makeInputGraph(kernels)), inputAvailabilityChecks);
364    addChecks(pruneGraph(makeOutputGraph(kernels)), outputSpaceChecks);
365
366
367    // iterate through each kernel in order and determine which kernel last used a particular buffer
368    for (Kernel * const kernel : kernels) {
369        const auto & inputs = kernel->getStreamInputs();
370        for (unsigned i = 0; i < inputs.size(); ++i) {
371            lastConsumer[kernel->getStreamSetInputBuffer(i)] = kernel;
372        }
373    }
374
375}
376
377/** ------------------------------------------------------------------------------------------------------------- *
378 * @brief makeInputGraph
379 *
380 * The input graph models whether a kernel could *consume* more data than may be produced by a preceeding kernel.
381 ** ------------------------------------------------------------------------------------------------------------- */
382PipelineGenerator::Graph PipelineGenerator::makeInputGraph(const std::vector<Kernel *> & kernels) {
383
384    const auto n = kernels.size();
385    Graph   G(n);
386    Map     M;
387
388    for (Graph::vertex_descriptor v = 0; v < n; ++v) {
389
390        const Kernel * const consumer = kernels[v];
391        M.emplace(consumer, v);
392        G[v] = consumer;
393
394        const auto & inputs = consumer->getStreamInputs();
395        for (unsigned i = 0; i < inputs.size(); ++i) {
396
397            const Binding & input = inputs[i];
398            auto ub_in = consumer->getUpperBound(input.getRate()) * consumer->getStride(); assert (ub_in > 0);
399            if (input.hasLookahead()) {
400                ub_in += input.getLookahead();
401            }
402
403            const auto buffer = consumer->getStreamSetInputBuffer(i);
404            const Kernel * const producer = buffer->getProducer();
405            const Binding & output = producer->getStreamOutput(buffer);
406            const auto lb_out = producer->getLowerBound(output.getRate()) * producer->getStride();
407
408            const auto rate = lb_out / ub_in;
409            const auto f = M.find(producer); assert (f != M.end());
410            const auto u = f->second;
411            // If we have multiple inputs from the same kernel, we only need to consider the "slowest" one
412            bool slowest = true;
413            if (lb_out > 0) {
414                for (const auto e : make_iterator_range(in_edges(v, G))) {
415                    if (source(e, G) == u) {
416                        Channel & p = G[e];
417                        slowest = false;
418                        if (rate < p.rate) {
419                            p.rate = rate;
420                            p.buffer = buffer;
421                        }
422                        break;
423                    }
424                }
425            }
426            if (slowest) {
427                add_edge(u, v, Channel{rate, buffer}, G);
428            }
429        }
430    }
431    return G;
432}
433
434/** ------------------------------------------------------------------------------------------------------------- *
435 * @brief makeOutputGraph
436 *
437 * The output graph models whether a kernel could *produce* more data than may be consumed by a subsequent kernel.
438 ** ------------------------------------------------------------------------------------------------------------- */
439PipelineGenerator::Graph PipelineGenerator::makeOutputGraph(const std::vector<Kernel *> & kernels) {
440
441    const auto n = kernels.size();
442    Graph   G(n);
443    Map     M;
444
445    for (Graph::vertex_descriptor i = 0; i < n; ++i) {
446        const Kernel * const consumer = kernels[i];
447        const auto v = n - i - 1;
448        M.emplace(consumer, v);
449        G[v] = consumer;
450
451        const auto & inputs = consumer->getStreamInputs();
452        for (unsigned i = 0; i < inputs.size(); ++i) {
453            const auto buffer = consumer->getStreamSetInputBuffer(i);
454            if (isa<SourceBuffer>(buffer)) continue;
455            const Kernel * const producer = buffer->getProducer();
456            const Binding & output = producer->getStreamOutput(buffer);
457            auto ub_out = producer->getUpperBound(output.getRate()) * producer->getStride();
458            if (LLVM_UNLIKELY(ub_out > 0)) { // unknown output rates are handled by reallocating their buffers
459                const Binding & input = inputs[i];
460                if (input.hasLookahead()) {
461                    ub_out -= input.getLookahead();
462                }
463                const auto lb_in = consumer->getLowerBound(input.getRate()) * consumer->getStride();
464                const auto inverseRate = lb_in / ub_out;
465                const auto f = M.find(producer); assert (f != M.end());
466                const auto u = f->second;
467                // If we have multiple inputs from the same kernel, we only need to consider the "fastest" one
468                bool fastest = true;
469                if (ub_out > 0) {
470                    for (const auto e : make_iterator_range(in_edges(v, G))) {
471                        if (source(e, G) == u) {
472                            Channel & p = G[e];
473                            fastest = false;
474                            if (inverseRate < p.rate) {
475                                p.rate = inverseRate;
476                                p.buffer = buffer;
477                            }
478                            break;
479                        }
480                    }
481                }
482                if (fastest) {
483                    add_edge(v, u, Channel{inverseRate, buffer}, G);
484                }
485            }
486        }
487    }
488    return G;
489}
490
491/** ------------------------------------------------------------------------------------------------------------- *
492 * @brief printGraph
493 ** ------------------------------------------------------------------------------------------------------------- */
494PipelineGenerator::Graph PipelineGenerator::printGraph(const bool input, Graph && G) {
495
496    auto & out = errs();
497
498    out << "digraph " << (input ? "I" : "O") << " {\n";
499    for (auto u : make_iterator_range(vertices(G))) {
500        out << "v" << u << " [label=\"" << u << ": "
501            << G[u]->getName() << "\"];\n";
502    }
503
504    for (auto e : make_iterator_range(edges(G))) {
505        const Channel & c = G[e];
506        const auto s = source(e, G);
507        const auto t = target(e, G);
508        const Kernel * const S = G[input ? s : t];
509        const Kernel * const T = G[input ? t : s];
510
511        out << "v" << s << " -> v" << t
512            << " [label=\"";
513
514        if (c.buffer) {
515            out << S->getStreamOutput(c.buffer).getName()
516                << " -> "
517                << T->getStreamInput(c.buffer).getName()
518                << "   ";
519        }
520
521        out << c.rate.numerator() << " / " << c.rate.denominator()
522            << "\"];\n";
523    }
524
525    out << "}\n\n";
526    out.flush();
527
528    return G;
529}
530
531/** ------------------------------------------------------------------------------------------------------------- *
532 * @brief pruneGraph
533 ** ------------------------------------------------------------------------------------------------------------- */
534PipelineGenerator::Graph PipelineGenerator::pruneGraph(Graph && G) {
535
536    // Take a transitive closure of G but whenever we attempt to insert an edge into the closure
537    // that already exists, check instead whether the rate of our proposed edge is <= the existing
538    // edge's rate. If so, the data availability/consumption is transitively guaranteed.
539    for (const auto u : make_iterator_range(vertices(G))) {
540        for (auto ei : make_iterator_range(in_edges(u, G))) {
541            const auto v = source(ei, G);
542            const Channel & pu = G[ei];
543            for (auto ej : make_iterator_range(out_edges(u, G))) {
544                const auto w = target(ej, G);
545                const auto ratio = RateValue(G[u]->getStride(), G[w]->getStride());
546                const auto rate = pu.rate * ratio;
547                bool insert = true;
548                for (auto ek : make_iterator_range(in_edges(w, G))) {
549                    if (source(ek, G) == v) {
550                        Channel & pw = G[ek];
551                        if (rate <= pw.rate) {
552                            pw.buffer = nullptr;
553                        }
554                        insert = false;
555                        break;
556                    }
557                }
558                if (insert) {
559                    add_edge(v, w, Channel{rate, nullptr}, G);
560                }
561            }
562        }
563    }
564
565    // remove any closure edges from G
566    remove_edge_if([&G](const Graph::edge_descriptor e) { return G[e].buffer == nullptr; }, G);
567
568    // For any kernel, if we do not need to check any of its inputs, we can avoid checking any of its
569    // outputs that have a rate >= 1 (i.e., its production rates >= consumption rates.)
570    for (const auto u : make_iterator_range(vertices(G))) {
571        if (in_degree(u, G) == 0) {
572            remove_out_edge_if(u, [&G](const Graph::edge_descriptor e) { return G[e].rate >= RateValue{1, 1}; }, G);
573        }
574    }
575
576    return G;
577}
578
579/** ------------------------------------------------------------------------------------------------------------- *
580 * @brief addChecks
581 ** ------------------------------------------------------------------------------------------------------------- */
582void PipelineGenerator::addChecks(Graph && G, CheckMap & M) {
583    for (const auto u : make_iterator_range(vertices(G))) {
584        if (LLVM_LIKELY(in_degree(u, G) == 0)) continue;
585        flat_set<const StreamSetBuffer *> B;
586        for (auto e : make_iterator_range(in_edges(u, G))) {
587            B.insert(G[e].buffer);
588        }
589        M.emplace(G[u], std::vector<const StreamSetBuffer *>{B.begin(), B.end()});
590    }
591}
592
593/** ------------------------------------------------------------------------------------------------------------- *
594 * @brief executeKernel
595 ** ------------------------------------------------------------------------------------------------------------- */
596Value * PipelineGenerator::executeKernel(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel, PHINode * const segNo, Value * const finished) {
597
598    const auto & inputs = kernel->getStreamInputs();
599
600    std::vector<Value *> args(2 + inputs.size());
601
602    BasicBlock * const kernelEntry = b->GetInsertBlock();
603    BasicBlock * const kernelCode = b->CreateBasicBlock(kernel->getName());
604    BasicBlock * const kernelFinished = b->CreateBasicBlock(kernel->getName() + "Finished");
605    BasicBlock * const kernelExit = b->CreateBasicBlock(kernel->getName() + "Exit");
606
607    b->CreateUnlikelyCondBr(b->getTerminationSignal(), kernelExit, kernelCode);
608
609    b->SetInsertPoint(kernelFinished);
610    PHINode * const final = b->CreatePHI(b->getInt1Ty(), 2);
611
612    b->SetInsertPoint(kernelExit);
613    PHINode * const terminated = b->CreatePHI(b->getInt1Ty(), 2);
614    // The initial "isFinal" state is equal to the first kernel's termination signal state
615    terminated->addIncoming(finished ? finished : b->getTrue(), kernelEntry);
616    Value * isFinal = finished ? finished : b->getFalse();
617
618    // Since it is possible that a sole consumer of some stream could terminate early, set the
619    // initial consumed amount to the amount produced in this iteration.
620    std::vector<PHINode *> consumedItemCountPhi(inputs.size());
621    std::vector<Value *> priorConsumedItemCount(inputs.size());
622    for (unsigned i = 0; i < inputs.size(); ++i) {
623        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
624        auto c = consumedItemCount.find(buffer);
625        PHINode * const consumedPhi = b->CreatePHI(b->getSizeTy(), 2);
626        Value * consumed = nullptr;
627        if (c == consumedItemCount.end()) {
628            const auto p = producedItemCount.find(buffer);
629            assert (p != producedItemCount.end());
630            consumed = p->second;
631            consumedItemCount.emplace(buffer, consumedPhi);
632        } else {
633            consumed = c->second;
634            c->second = consumedPhi;
635        }
636        consumedPhi->addIncoming(consumed, kernelEntry);
637        consumedItemCountPhi[i] = consumedPhi;
638        priorConsumedItemCount[i] = consumed;
639    }
640
641    b->SetInsertPoint(kernelCode);
642
643    // Check for sufficient output space
644    const auto O = outputSpaceChecks.find(kernel);
645    if (O != outputSpaceChecks.end()) {
646        for (const StreamSetBuffer * buffer : O->second) {
647
648
649            const Binding & output = kernel->getStreamOutput(buffer);
650            const auto name = output.getName();
651            BasicBlock * const sufficient = b->CreateBasicBlock(name + "HasOutputSpace");
652            const auto ub = kernel->getUpperBound(output.getRate()); assert (ub > 0);
653            Constant * const strideLength = b->getSize(ceiling(ub * kernel->getStride()));
654            Value * const produced = b->getProducedItemCount(name);
655            Value * const consumed = b->getConsumedItemCount(name);
656            Value * const unused = b->CreateSub(produced, consumed);
657            Value * const potentialData = b->CreateAdd(unused, strideLength);
658            Value * const capacity = b->getBufferedSize(name);
659
660          //  b->CallPrintInt("< " + kernel->getName() + "_" + name + "_potential", potentialData);
661          //  b->CallPrintInt("< " + kernel->getName() + "_" + name + "_capacity", capacity);
662
663            Value * const hasSufficientSpace = b->CreateICmpULE(potentialData, capacity);
664
665          //  b->CallPrintInt("* < " + kernel->getName() + "_" + name + "_sufficientSpace", hasSufficientSpace);
666
667            final->addIncoming(b->getFalse(), b->GetInsertBlock());
668            b->CreateLikelyCondBr(hasSufficientSpace, sufficient, kernelFinished);
669            b->SetInsertPoint(sufficient);
670        }
671    }
672
673    for (unsigned i = 0; i < inputs.size(); ++i) {
674        const Binding & input = inputs[i];
675        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
676        const auto name = input.getName();
677
678        const auto p = producedItemCount.find(buffer);
679        if (LLVM_UNLIKELY(p == producedItemCount.end())) {
680            report_fatal_error(kernel->getName() + " uses stream set " + name + " prior to its definition");
681        }
682        Value * const produced = p->second;
683
684      //  b->CallPrintInt("< " + kernel->getName() + "_" + name + "_produced", produced);
685
686        const auto ub = kernel->getUpperBound(input.getRate()); assert (ub > 0);
687        const auto strideLength = ceiling(ub * kernel->getStride()) ;
688        Constant * const segmentLength = b->getSize(strideLength * codegen::SegmentSize);
689
690        if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts) && !isa<SourceBuffer>(buffer))) {
691            b->CreateAssert(b->CreateICmpULE(segmentLength, b->getCapacity(name)),
692                            kernel->getName() + ": " + name + " upper bound of segment length exceeds buffer capacity");
693        }
694
695//        Value * limit = nullptr;
696//        if (input.getRate().isFixed()) {
697//            // if the input is deferred, simply adding length to the processed item count may result in setting a limit
698//            // that is too low for. instead just calculate the limit of all fixed rates from the segment no.
699//            limit = b->CreateMul(b->CreateAdd(segNo, b->getSize(1)), segmentLength);
700//        } else {
701//            Value * const processed = b->getProcessedItemCount(name);
702//            limit = b->CreateAdd(processed, segmentLength);
703//        }
704
705        // if the input is deferred, simply adding length to the processed item count may result in setting a limit
706        // that is too low for. instead just calculate the limit of all fixed rates from the segment no.
707        Value * const limit = b->CreateMul(b->CreateAdd(segNo, b->getSize(1)), segmentLength);
708
709     //  b->CallPrintInt("< " + kernel->getName() + "_" + name + "_limit", limit);
710
711        // TODO: currently, if we produce the exact amount as our limit states, we will have to process one additional segment
712        // before we can consider this kernel finished. We ought to be able to avoid doing in some cases but need to prove its
713        // always safe to do so.
714
715        Value * const consumingAll = b->CreateICmpULT(produced, limit);
716        args[i + 2] = b->CreateSelect(consumingAll, produced, limit);
717        isFinal = b->CreateAnd(isFinal, consumingAll);
718    }
719
720    // Check for available input
721    const auto I = inputAvailabilityChecks.find(kernel);
722    if (I != inputAvailabilityChecks.end()) {
723        for (const StreamSetBuffer * buffer : I->second) {
724            const Binding & input = kernel->getStreamInput(buffer);
725            const auto name = input.getName();
726            BasicBlock * const sufficient = b->CreateBasicBlock(name + "HasInputData");
727            const auto ub = kernel->getUpperBound(input.getRate()); assert (ub > 0);
728            Constant * const strideLength = b->getSize(ceiling(ub * kernel->getStride()));
729            Value * const processed = b->getProcessedItemCount(name);
730//            if (input.getRate().isFixed()) {
731//                processed = b->CreateMul(segNo, strideLength);
732//            } else {
733//                processed = b->getProcessedItemCount(name);
734//            }
735            const auto p = producedItemCount.find(buffer);
736            assert (p != producedItemCount.end());
737            Value * const produced = p->second;
738            Value * const unprocessed = b->CreateSub(produced, processed);
739
740          //  b->CallPrintInt("< " + kernel->getName() + "_" + name + "_unprocessed", unprocessed);
741
742            Value * const hasSufficientData = b->CreateOr(b->CreateICmpUGE(unprocessed, strideLength), isFinal);
743
744          //  b->CallPrintInt("* < " + kernel->getName() + "_" + name + "_sufficientData", hasSufficientData);
745
746            final->addIncoming(b->getFalse(), b->GetInsertBlock());
747            b->CreateLikelyCondBr(hasSufficientData, sufficient, kernelFinished);
748            b->SetInsertPoint(sufficient);
749        }
750    }
751
752    applyOutputBufferExpansions(b, kernel);
753
754    args[0] = kernel->getInstance();
755    args[1] = isFinal;
756
757    b->createDoSegmentCall(args);
758
759    if (inputs.empty() || kernel->canTerminateEarly()) {
760        isFinal = b->CreateOr(isFinal, b->getTerminationSignal());
761    }
762    b->setTerminationSignal(isFinal);
763  //  b->CallPrintInt(kernel->getName() + "_finished", isFinal);
764    final->addIncoming(isFinal, b->GetInsertBlock());
765    b->CreateBr(kernelFinished);
766
767    b->SetInsertPoint(kernelFinished);
768
769    // update the consumed item counts
770    for (unsigned i = 0; i < inputs.size(); ++i) {
771        Value * const processed = b->getProcessedItemCount(inputs[i].getName());
772      //  b->CallPrintInt("> " + kernel->getName() + "_" + inputs[i].getName() + "_processed", processed);
773        Value * const consumed = b->CreateUMin(priorConsumedItemCount[i], processed);
774        consumedItemCountPhi[i]->addIncoming(consumed, kernelFinished);
775    }
776    b->CreateBr(kernelExit);
777
778    kernelExit->moveAfter(kernelFinished);
779
780    b->SetInsertPoint(kernelExit);
781    terminated->addIncoming(final, kernelFinished);
782
783
784    // If this kernel is the last consumer of a input buffer, update the consumed count for that buffer.
785    // NOTE: unless we can prove that this kernel cannot terminate before any prior consumer, we cannot
786    // put this code into the kernelFinished block.
787    for (unsigned i = 0; i < inputs.size(); ++i) {
788        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
789        const auto c = lastConsumer.find(buffer);
790        assert (c != lastConsumer.end());
791        if (c->second == kernel) {
792            Kernel * const producer = buffer->getProducer();
793            const auto & output = producer->getStreamOutput(buffer);
794            if (output.getRate().isRelative()) continue;
795
796           // b->CallPrintInt("* " + producer->getName() + "_" + output.getName() + "_consumed", consumedItemCountPhi[i]);
797
798            b->setKernel(producer);
799            if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
800                Value * const alreadyConsumed = b->getConsumedItemCount(output.getName());
801                b->CreateAssert(b->CreateICmpULE(alreadyConsumed, consumedItemCountPhi[i]),
802                                producer->getName() + ": " + output.getName() + " consumed item count is not monotonically non-decreasing!");
803            }
804            b->setConsumedItemCount(output.getName(), consumedItemCountPhi[i]);
805            b->setKernel(kernel);
806        }
807    }
808
809    const auto & outputs = kernel->getStreamOutputs();
810    for (unsigned i = 0; i < outputs.size(); ++i) {
811        Value * const produced = b->getProducedItemCount(outputs[i].getName());
812
813       // b->CallPrintInt("> " + kernel->getName() + "_" + outputs[i].getName() + "_produced", produced);
814
815        const StreamSetBuffer * const buffer = kernel->getStreamSetOutputBuffer(i);
816        assert (producedItemCount.count(buffer) == 0);
817        producedItemCount.emplace(buffer, produced);
818    }
819
820    return terminated;
821}
822
823
824/** ------------------------------------------------------------------------------------------------------------- *
825 * @brief applyOutputBufferExpansions
826 ** ------------------------------------------------------------------------------------------------------------- */
827void PipelineGenerator::applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * k) {
828    const auto & outputs = k->getStreamSetOutputBuffers();
829    for (unsigned i = 0; i < outputs.size(); i++) {
830        if (isa<DynamicBuffer>(outputs[i])) {
831            const auto baseSize = ceiling(k->getUpperBound(k->getStreamOutput(i).getRate()) * k->getStride() * codegen::SegmentSize);
832            if (LLVM_LIKELY(baseSize > 0)) {
833
834                const auto & name = k->getStreamOutput(i).getName();
835
836                BasicBlock * const doExpand = b->CreateBasicBlock(name + "Expand");
837                BasicBlock * const nextBlock = b->GetInsertBlock()->getNextNode();
838                doExpand->moveAfter(b->GetInsertBlock());
839                BasicBlock * const bufferReady = b->CreateBasicBlock(name + "Ready");
840                bufferReady->moveAfter(doExpand);
841                if (nextBlock) nextBlock->moveAfter(bufferReady);
842
843                Value * const produced = b->getProducedItemCount(name);
844                Value * const consumed = b->getConsumedItemCount(name);
845                Value * const required = b->CreateAdd(b->CreateSub(produced, consumed), b->getSize(2 * baseSize));
846
847                b->CreateCondBr(b->CreateICmpUGT(required, b->getBufferedSize(name)), doExpand, bufferReady);
848                b->SetInsertPoint(doExpand);
849
850                b->doubleCapacity(name);
851                // Ensure that capacity is sufficient by successive doubling, if necessary.
852                b->CreateCondBr(b->CreateICmpUGT(required, b->getBufferedSize(name)), doExpand, bufferReady);
853
854                b->SetInsertPoint(bufferReady);
855
856            }
857        }
858    }
859}
860
Note: See TracBrowser for help on using the repository browser.