source: icGREP/icgrep-devel/icgrep/toolchain/pipeline.cpp @ 5921

Last change on this file since 5921 was 5921, checked in by xwa163, 14 months ago
  1. Initial checkin for new approach for lz4 index decoder that always use 4MB buffer
  2. Add test case for new approach (for now test cases will fail when test file is larger than 4MB)
File size: 38.9 KB
Line 
1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5
6#include "pipeline.h"
7#include <toolchain/toolchain.h>
8#include <kernels/kernel.h>
9#include <kernels/streamset.h>
10#include <llvm/IR/Module.h>
11#include <boost/container/flat_set.hpp>
12#include <boost/container/flat_map.hpp>
13#include <boost/graph/adjacency_list.hpp>
14#include <kernels/kernel_builder.h>
15
16#include <llvm/Support/raw_ostream.h>
17
18using namespace kernel;
19using namespace parabix;
20using namespace llvm;
21using namespace boost;
22using namespace boost::container;
23
24using Port = Kernel::Port;
25
26Function * makeThreadFunction(const std::unique_ptr<kernel::KernelBuilder> & b, const std::string & name) {
27    Function * const f = Function::Create(FunctionType::get(b->getVoidTy(), {b->getVoidPtrTy()}, false), Function::InternalLinkage, name, b->getModule());
28    f->setCallingConv(CallingConv::C);
29    f->arg_begin()->setName("state");
30    return f;
31}
32
33struct PipelineGenerator {
34
35    template <typename Value>
36    using StreamSetBufferMap = flat_map<const StreamSetBuffer *, Value>;
37
38    using CheckMap = flat_map<const Kernel *, std::vector<const StreamSetBuffer *>>;
39
40    using RateValue = ProcessingRate::RateValue;
41
42    struct Channel {
43        Channel() = default;
44        Channel(const RateValue & rate, const StreamSetBuffer * const buffer)
45        : rate(rate), buffer(buffer) { }
46
47        RateValue               rate;
48        const StreamSetBuffer * buffer;
49    };
50
51    using Graph = adjacency_list<vecS, vecS, bidirectionalS, const Kernel *, Channel, vecS>;
52
53    using Map = flat_map<const Kernel *, Graph::vertex_descriptor>;
54
55    void initialize(const std::vector<Kernel *> & kernels);
56
57    Value * executeKernel(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel, PHINode * const segNo, Value * const finished);
58
59protected:
60
61    Graph makeInputGraph(const std::vector<Kernel *> & kernels);
62
63    Graph makeOutputGraph(const std::vector<Kernel *> & kernels);
64
65    Graph printGraph(const bool input, Graph && G);
66
67    Graph pruneGraph(Graph && G);
68
69    void addChecks(Graph && G, CheckMap & M);
70
71    void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel);
72
73private:
74
75    CheckMap                            inputAvailabilityChecks;
76    CheckMap                            outputSpaceChecks;
77
78    StreamSetBufferMap<Value *>         producedItemCount;
79    StreamSetBufferMap<Value *>         consumedItemCount;
80    StreamSetBufferMap<const Kernel *>  lastConsumer;
81};
82
83/** ------------------------------------------------------------------------------------------------------------- *
84 * @brief generateSegmentParallelPipeline
85 *
86 * Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1)
87 * operating over an input stream set S, a segment-parallel implementation divides the input
88 * into segments and coordinates a set of T <= K threads to each process one segment at a time.
89 * Let S_0, S_1, ... S_N be the segments of S.   Segments are assigned to threads in a round-robin
90 * fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T.
91 ** ------------------------------------------------------------------------------------------------------------- */
92void generateSegmentParallelPipeline(const std::unique_ptr<KernelBuilder> & b, const std::vector<Kernel *> & kernels) {
93
94    const unsigned n = kernels.size();
95    Module * const m = b->getModule();
96    IntegerType * const sizeTy = b->getSizeTy();
97    PointerType * const voidPtrTy = b->getVoidPtrTy();
98    Constant * nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy);
99    std::vector<Type *> structTypes;
100    codegen::BufferSegments = std::max(codegen::BufferSegments, codegen::ThreadNum);
101
102    Value * instance[n];
103    for (unsigned i = 0; i < n; ++i) {
104        instance[i] = kernels[i]->getInstance();
105        structTypes.push_back(instance[i]->getType());
106    }
107    StructType * const sharedStructType = StructType::get(m->getContext(), structTypes);
108    StructType * const threadStructType = StructType::get(m->getContext(), {sharedStructType->getPointerTo(), sizeTy});
109
110    const auto ip = b->saveIP();
111
112    Function * const threadFunc = makeThreadFunction(b, "segment");
113    auto args = threadFunc->arg_begin();
114
115    // -------------------------------------------------------------------------------------------------------------------------
116    // MAKE SEGMENT PARALLEL PIPELINE THREAD
117    // -------------------------------------------------------------------------------------------------------------------------
118
119     // Create the basic blocks for the thread function.
120    BasicBlock * entryBlock = BasicBlock::Create(b->getContext(), "entry", threadFunc);
121    b->SetInsertPoint(entryBlock);
122
123    Value * const threadStruct = b->CreateBitCast(&*(args), threadStructType->getPointerTo());
124
125    Value * const sharedStatePtr = b->CreateLoad(b->CreateGEP(threadStruct, {b->getInt32(0), b->getInt32(0)}));
126    for (unsigned k = 0; k < n; ++k) {
127        Value * ptr = b->CreateLoad(b->CreateGEP(sharedStatePtr, {b->getInt32(0), b->getInt32(k)}));
128        kernels[k]->setInstance(ptr);
129    }
130    Value * const segOffset = b->CreateLoad(b->CreateGEP(threadStruct, {b->getInt32(0), b->getInt32(1)}));
131
132    PipelineGenerator G;
133
134    BasicBlock * const segmentLoop = b->CreateBasicBlock("segmentLoop");
135    b->CreateBr(segmentLoop);
136
137    b->SetInsertPoint(segmentLoop);
138    G.initialize(kernels);
139    PHINode * const segNo = b->CreatePHI(b->getSizeTy(), 2, "segNo");
140    segNo->addIncoming(segOffset, entryBlock);
141    Value * finished = nullptr;
142
143    Value * cycleCountStart = nullptr;
144    Value * cycleCountEnd = nullptr;
145    if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
146        cycleCountStart = b->CreateReadCycleCounter();
147    }
148
149    const bool serialize = codegen::DebugOptionIsSet(codegen::SerializeThreads);
150
151    for (unsigned k = 0; k < n; ++k) {
152
153        const Kernel * const kernel = kernels[k];
154
155        BasicBlock * const kernelWait = b->CreateBasicBlock(kernel->getName() + "Wait");
156        b->CreateBr(kernelWait);
157
158        b->SetInsertPoint(kernelWait);
159        b->setKernel(kernels[serialize ? (n - 1) : k]);
160        Value * const processedSegmentCount = b->acquireLogicalSegmentNo();
161        b->setKernel(kernel);
162        assert (processedSegmentCount->getType() == segNo->getType());
163        Value * const ready = b->CreateICmpEQ(segNo, processedSegmentCount);
164
165        BasicBlock * const kernelCheck = b->CreateBasicBlock(kernel->getName() + "Check");
166        b->CreateCondBr(ready, kernelCheck, kernelWait);
167
168        b->SetInsertPoint(kernelCheck);
169
170        finished = G.executeKernel(b, kernel, segNo, finished);
171
172        if (DebugOptionIsSet(codegen::EnableCycleCounter)) {
173            cycleCountEnd = b->CreateReadCycleCounter();
174            Value * counterPtr = b->getCycleCountPtr();
175            b->CreateStore(b->CreateAdd(b->CreateLoad(counterPtr), b->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
176            cycleCountStart = cycleCountEnd;
177        }
178
179        b->releaseLogicalSegmentNo(b->CreateAdd(segNo, b->getSize(1)));
180    }
181
182    segNo->addIncoming(b->CreateAdd(segNo, b->getSize(codegen::ThreadNum)), b->GetInsertBlock());
183
184    BasicBlock * const segmentExit = b->CreateBasicBlock("segmentExit");
185    b->CreateUnlikelyCondBr(finished, segmentExit, segmentLoop);
186
187    b->SetInsertPoint(segmentExit);
188
189    // only call pthread_exit() within spawned threads; otherwise it'll be equivalent to calling exit() within the process
190    BasicBlock * const exitThread = b->CreateBasicBlock("ExitThread");
191    BasicBlock * const exitFunction = b->CreateBasicBlock("ExitProcessFunction");
192
193    b->CreateCondBr(b->CreateIsNull(segOffset), exitFunction, exitThread);
194    b->SetInsertPoint(exitThread);
195    b->CreatePThreadExitCall(nullVoidPtrVal);
196    b->CreateBr(exitFunction);
197    b->SetInsertPoint(exitFunction);
198    b->CreateRetVoid();
199
200    // -------------------------------------------------------------------------------------------------------------------------
201    b->restoreIP(ip);
202
203    for (unsigned i = 0; i < n; ++i) {
204        kernels[i]->setInstance(instance[i]);
205    }
206
207    // -------------------------------------------------------------------------------------------------------------------------
208    // MAKE SEGMENT PARALLEL PIPELINE DRIVER
209    // -------------------------------------------------------------------------------------------------------------------------
210    const unsigned threads = codegen::ThreadNum - 1;
211    assert (codegen::ThreadNum > 0);
212    Type * const pthreadsTy = ArrayType::get(sizeTy, threads);
213    AllocaInst * const pthreads = b->CreateAlloca(pthreadsTy);
214    Value * threadIdPtr[threads];
215
216    for (unsigned i = 0; i < threads; ++i) {
217        threadIdPtr[i] = b->CreateGEP(pthreads, {b->getInt32(0), b->getInt32(i)});
218    }
219
220    for (unsigned i = 0; i < n; ++i) {
221        b->setKernel(kernels[i]);
222        b->releaseLogicalSegmentNo(b->getSize(0));
223    }
224
225    AllocaInst * const sharedStruct = b->CreateCacheAlignedAlloca(sharedStructType);
226    for (unsigned i = 0; i < n; ++i) {
227        Value * ptr = b->CreateGEP(sharedStruct, {b->getInt32(0), b->getInt32(i)});
228        b->CreateStore(kernels[i]->getInstance(), ptr);
229    }
230
231    // use the process thread to handle the initial segment function after spawning (n - 1) threads to handle the subsequent offsets
232    for (unsigned i = 0; i < threads; ++i) {
233        AllocaInst * const threadState = b->CreateAlloca(threadStructType);
234        b->CreateStore(sharedStruct, b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(0)}));
235        b->CreateStore(b->getSize(i + 1), b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(1)}));
236        b->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, threadFunc, threadState);
237    }
238
239    AllocaInst * const threadState = b->CreateAlloca(threadStructType);
240    b->CreateStore(sharedStruct, b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(0)}));
241    b->CreateStore(b->getSize(0), b->CreateGEP(threadState, {b->getInt32(0), b->getInt32(1)}));
242    b->CreateCall(threadFunc, b->CreatePointerCast(threadState, voidPtrTy));
243
244    AllocaInst * const status = b->CreateAlloca(voidPtrTy);
245    for (unsigned i = 0; i < threads; ++i) {
246        Value * threadId = b->CreateLoad(threadIdPtr[i]);
247        b->CreatePThreadJoinCall(threadId, status);
248    }
249   
250    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
251        for (const Kernel * kernel : kernels) {
252            b->setKernel(kernel);
253            const auto & inputs = kernel->getStreamInputs();
254            const auto & outputs = kernel->getStreamOutputs();
255            Value * items = nullptr;
256            if (inputs.empty()) {
257                items = b->getProducedItemCount(outputs[0].getName());
258            } else {
259                items = b->getProcessedItemCount(inputs[0].getName());
260            }
261            Value * fItems = b->CreateUIToFP(items, b->getDoubleTy());
262            Value * cycles = b->CreateLoad(b->getCycleCountPtr());
263            Value * fCycles = b->CreateUIToFP(cycles, b->getDoubleTy());
264            const auto formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
265            Value * stringPtr = b->CreatePointerCast(b->GetString(formatString), b->getInt8PtrTy());
266            b->CreateCall(b->GetDprintf(), {b->getInt32(2), stringPtr, fItems, fCycles, b->CreateFDiv(fCycles, fItems)});
267        }
268    }
269   
270}
271
272
273/** ------------------------------------------------------------------------------------------------------------- *
274 * @brief generatePipelineLoop
275 ** ------------------------------------------------------------------------------------------------------------- */
276void generatePipelineLoop(const std::unique_ptr<KernelBuilder> & b, const std::vector<Kernel *> & kernels) {
277
278    // Create the basic blocks for the loop.
279    BasicBlock * const entryBlock = b->GetInsertBlock();
280    BasicBlock * const pipelineLoop = b->CreateBasicBlock("pipelineLoop");
281    BasicBlock * const pipelineExit = b->CreateBasicBlock("pipelineExit");
282
283    PipelineGenerator G;
284
285    b->CreateBr(pipelineLoop);
286
287    b->SetInsertPoint(pipelineLoop);
288    G.initialize(kernels);
289    PHINode * const segNo = b->CreatePHI(b->getSizeTy(), 2, "segNo");
290    segNo->addIncoming(b->getSize(0), entryBlock);
291    Value * finished = nullptr;
292
293    Value * cycleCountStart = nullptr;
294    Value * cycleCountEnd = nullptr;
295    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
296        cycleCountStart = b->CreateReadCycleCounter();
297    }
298
299    for (Kernel * const kernel : kernels) {
300
301        b->setKernel(kernel);
302
303        finished = G.executeKernel(b, kernel, segNo, finished);
304
305        if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
306            cycleCountEnd = b->CreateReadCycleCounter();
307            Value * counterPtr = b->getCycleCountPtr();
308            b->CreateStore(b->CreateAdd(b->CreateLoad(counterPtr), b->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr);
309            cycleCountStart = cycleCountEnd;
310        }
311    }
312
313    segNo->addIncoming(b->CreateAdd(segNo, b->getSize(1)), b->GetInsertBlock());
314    b->CreateCondBr(finished, pipelineExit, pipelineLoop);
315
316    pipelineExit->moveAfter(b->GetInsertBlock());
317
318    b->SetInsertPoint(pipelineExit);
319
320    if (LLVM_UNLIKELY(DebugOptionIsSet(codegen::EnableCycleCounter))) {
321        for (unsigned k = 0; k < kernels.size(); k++) {
322            auto & kernel = kernels[k];
323            b->setKernel(kernel);
324            const auto & inputs = kernel->getStreamInputs();
325            const auto & outputs = kernel->getStreamOutputs();
326            Value * items = nullptr;
327            if (inputs.empty()) {
328                items = b->getProducedItemCount(outputs[0].getName());
329            } else {
330                items = b->getProcessedItemCount(inputs[0].getName());
331            }
332            Value * fItems = b->CreateUIToFP(items, b->getDoubleTy());
333            Value * cycles = b->CreateLoad(b->getCycleCountPtr());
334            Value * fCycles = b->CreateUIToFP(cycles, b->getDoubleTy());
335            const auto formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles,  %6.2f cycles per item.\n";
336            Value * stringPtr = b->CreatePointerCast(b->GetString(formatString), b->getInt8PtrTy());
337            b->CreateCall(b->GetDprintf(), {b->getInt32(2), stringPtr, fItems, fCycles, b->CreateFDiv(fCycles, fItems)});
338        }
339    }
340
341}
342
343/** ------------------------------------------------------------------------------------------------------------- *
344 * @brief initialize
345 ** ------------------------------------------------------------------------------------------------------------- */
346void PipelineGenerator::initialize(const std::vector<Kernel *> & kernels) {
347
348    // Our goal when building G is *not* to model the dataflow of our program but instead to
349    // detetermine the minimum number of sufficient data tests needed to ensure each kernel has
350    // enough data to progress.
351
352    // For example, suppose we have kernels A, B and C, and that B has a fixed input and fixed
353    // output rate. C also has a fixed input rate but A does *not* have a fixed output rate.
354    // C must test whether it has enough input from B as B is not guaranteed to have enough
355    // input from A. Moreover if C is depedent on B, C could be skipped entirely.
356
357    // Note: we cannot simply test the output of A for both B and C. In the data-parallel
358    // pipeline, A's state may change by the time we process C.
359
360//    addChecks(printGraph(true, pruneGraph(printGraph(true, makeInputGraph(kernels)))), inputAvailabilityChecks);
361//    addChecks(printGraph(false, pruneGraph(printGraph(false, makeOutputGraph(kernels)))), outputSpaceChecks);
362
363    addChecks(pruneGraph(makeInputGraph(kernels)), inputAvailabilityChecks);
364    addChecks(pruneGraph(makeOutputGraph(kernels)), outputSpaceChecks);
365
366
367    // iterate through each kernel in order and determine which kernel last used a particular buffer
368    for (Kernel * const kernel : kernels) {
369        const auto & inputs = kernel->getStreamInputs();
370        for (unsigned i = 0; i < inputs.size(); ++i) {
371            lastConsumer[kernel->getStreamSetInputBuffer(i)] = kernel;
372        }
373    }
374
375}
376
377/** ------------------------------------------------------------------------------------------------------------- *
378 * @brief makeInputGraph
379 *
380 * The input graph models whether a kernel could *consume* more data than may be produced by a preceeding kernel.
381 ** ------------------------------------------------------------------------------------------------------------- */
382PipelineGenerator::Graph PipelineGenerator::makeInputGraph(const std::vector<Kernel *> & kernels) {
383
384    const auto n = kernels.size();
385    Graph   G(n);
386    Map     M;
387
388    for (Graph::vertex_descriptor v = 0; v < n; ++v) {
389
390        const Kernel * const consumer = kernels[v];
391        M.emplace(consumer, v);
392        G[v] = consumer;
393
394        const auto & inputs = consumer->getStreamInputs();
395        for (unsigned i = 0; i < inputs.size(); ++i) {
396
397            const Binding & input = inputs[i];
398            auto ub_in = consumer->getUpperBound(input.getRate()) * consumer->getStride(); assert (ub_in > 0);
399            if (input.hasLookahead()) {
400                ub_in += input.getLookahead();
401            }
402
403            const auto buffer = consumer->getStreamSetInputBuffer(i);
404            const Kernel * const producer = buffer->getProducer();
405            const Binding & output = producer->getStreamOutput(buffer);
406            const auto lb_out = producer->getLowerBound(output.getRate()) * producer->getStride();
407
408            const auto rate = lb_out / ub_in;
409            const auto f = M.find(producer); assert (f != M.end());
410            const auto u = f->second;
411            // If we have multiple inputs from the same kernel, we only need to consider the "slowest" one
412            bool slowest = true;
413            if (lb_out > 0) {
414                for (const auto e : make_iterator_range(in_edges(v, G))) {
415                    if (source(e, G) == u) {
416                        Channel & p = G[e];
417                        slowest = false;
418                        if (rate < p.rate) {
419                            p.rate = rate;
420                            p.buffer = buffer;
421                        }
422                        break;
423                    }
424                }
425            }
426            if (slowest) {
427                add_edge(u, v, Channel{rate, buffer}, G);
428            }
429        }
430    }
431    return G;
432}
433
434/** ------------------------------------------------------------------------------------------------------------- *
435 * @brief makeOutputGraph
436 *
437 * The output graph models whether a kernel could *produce* more data than may be consumed by a subsequent kernel.
438 ** ------------------------------------------------------------------------------------------------------------- */
439PipelineGenerator::Graph PipelineGenerator::makeOutputGraph(const std::vector<Kernel *> & kernels) {
440
441    const auto n = kernels.size();
442    Graph   G(n);
443    Map     M;
444
445    for (Graph::vertex_descriptor i = 0; i < n; ++i) {
446        const Kernel * const consumer = kernels[i];
447        const auto v = n - i - 1;
448        M.emplace(consumer, v);
449        G[v] = consumer;
450
451        const auto & inputs = consumer->getStreamInputs();
452        for (unsigned i = 0; i < inputs.size(); ++i) {
453            const auto buffer = consumer->getStreamSetInputBuffer(i);
454            if (isa<SourceBuffer>(buffer)) continue;
455            const Kernel * const producer = buffer->getProducer();
456            const Binding & output = producer->getStreamOutput(buffer);
457            auto ub_out = producer->getUpperBound(output.getRate()) * producer->getStride();
458            if (LLVM_UNLIKELY(ub_out > 0)) { // unknown output rates are handled by reallocating their buffers
459                const Binding & input = inputs[i];
460                if (input.hasLookahead()) {
461                    ub_out -= input.getLookahead();
462                }
463                const auto lb_in = consumer->getLowerBound(input.getRate()) * consumer->getStride();
464                const auto inverseRate = lb_in / ub_out;
465                const auto f = M.find(producer); assert (f != M.end());
466                const auto u = f->second;
467                // If we have multiple inputs from the same kernel, we only need to consider the "fastest" one
468                bool fastest = true;
469                if (ub_out > 0) {
470                    for (const auto e : make_iterator_range(in_edges(v, G))) {
471                        if (source(e, G) == u) {
472                            Channel & p = G[e];
473                            fastest = false;
474                            if (inverseRate < p.rate) {
475                                p.rate = inverseRate;
476                                p.buffer = buffer;
477                            }
478                            break;
479                        }
480                    }
481                }
482                if (fastest) {
483                    add_edge(v, u, Channel{inverseRate, buffer}, G);
484                }
485            }
486        }
487    }
488    return G;
489}
490
491/** ------------------------------------------------------------------------------------------------------------- *
492 * @brief printGraph
493 ** ------------------------------------------------------------------------------------------------------------- */
494PipelineGenerator::Graph PipelineGenerator::printGraph(const bool input, Graph && G) {
495
496    auto & out = errs();
497
498    out << "digraph " << (input ? "I" : "O") << " {\n";
499    for (auto u : make_iterator_range(vertices(G))) {
500        out << "v" << u << " [label=\"" << u << ": "
501            << G[u]->getName() << "\"];\n";
502    }
503
504    for (auto e : make_iterator_range(edges(G))) {
505        const Channel & c = G[e];
506        const auto s = source(e, G);
507        const auto t = target(e, G);
508        const Kernel * const S = G[input ? s : t];
509        const Kernel * const T = G[input ? t : s];
510
511        out << "v" << s << " -> v" << t
512            << " [label=\"";
513
514        if (c.buffer) {
515            out << S->getStreamOutput(c.buffer).getName()
516                << " -> "
517                << T->getStreamInput(c.buffer).getName()
518                << "   ";
519        }
520
521        out << c.rate.numerator() << " / " << c.rate.denominator()
522            << "\"];\n";
523    }
524
525    out << "}\n\n";
526    out.flush();
527
528    return G;
529}
530
531/** ------------------------------------------------------------------------------------------------------------- *
532 * @brief pruneGraph
533 ** ------------------------------------------------------------------------------------------------------------- */
534PipelineGenerator::Graph PipelineGenerator::pruneGraph(Graph && G) {
535
536    // Take a transitive closure of G but whenever we attempt to insert an edge into the closure
537    // that already exists, check instead whether the rate of our proposed edge is <= the existing
538    // edge's rate. If so, the data availability/consumption is transitively guaranteed.
539    for (const auto u : make_iterator_range(vertices(G))) {
540        for (auto ei : make_iterator_range(in_edges(u, G))) {
541            const auto v = source(ei, G);
542            const Channel & pu = G[ei];
543            for (auto ej : make_iterator_range(out_edges(u, G))) {
544                const auto w = target(ej, G);
545                const auto ratio = RateValue(G[u]->getStride(), G[w]->getStride());
546                const auto rate = pu.rate * ratio;
547                bool insert = true;
548                for (auto ek : make_iterator_range(in_edges(w, G))) {
549                    if (source(ek, G) == v) {
550                        Channel & pw = G[ek];
551                        if (rate <= pw.rate) {
552                            pw.buffer = nullptr;
553                        }
554                        insert = false;
555                        break;
556                    }
557                }
558                if (insert) {
559                    add_edge(v, w, Channel{rate, nullptr}, G);
560                }
561            }
562        }
563    }
564
565    // remove any closure edges from G
566    remove_edge_if([&G](const Graph::edge_descriptor e) { return G[e].buffer == nullptr; }, G);
567
568    // For any kernel, if we do not need to check any of its inputs, we can avoid checking any of its
569    // outputs that have a rate >= 1 (i.e., its production rates >= consumption rates.)
570    for (const auto u : make_iterator_range(vertices(G))) {
571        if (in_degree(u, G) == 0) {
572            remove_out_edge_if(u, [&G](const Graph::edge_descriptor e) { return G[e].rate >= RateValue{1, 1}; }, G);
573        }
574    }
575
576    return G;
577}
578
579/** ------------------------------------------------------------------------------------------------------------- *
580 * @brief addChecks
581 ** ------------------------------------------------------------------------------------------------------------- */
582void PipelineGenerator::addChecks(Graph && G, CheckMap & M) {
583    for (const auto u : make_iterator_range(vertices(G))) {
584        if (LLVM_LIKELY(in_degree(u, G) == 0)) continue;
585        flat_set<const StreamSetBuffer *> B;
586        for (auto e : make_iterator_range(in_edges(u, G))) {
587            B.insert(G[e].buffer);
588        }
589        M.emplace(G[u], std::vector<const StreamSetBuffer *>{B.begin(), B.end()});
590    }
591}
592
593/** ------------------------------------------------------------------------------------------------------------- *
594 * @brief executeKernel
595 ** ------------------------------------------------------------------------------------------------------------- */
596Value * PipelineGenerator::executeKernel(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel, PHINode * const segNo, Value * const finished) {
597
598    const auto & inputs = kernel->getStreamInputs();
599
600    std::vector<Value *> args(2 + inputs.size());
601
602    BasicBlock * const kernelEntry = b->GetInsertBlock();
603    BasicBlock * const kernelCode = b->CreateBasicBlock(kernel->getName());
604    BasicBlock * const kernelFinished = b->CreateBasicBlock(kernel->getName() + "Finished");
605    BasicBlock * const kernelExit = b->CreateBasicBlock(kernel->getName() + "Exit");
606
607    b->CreateUnlikelyCondBr(b->getTerminationSignal(), kernelExit, kernelCode);
608
609    b->SetInsertPoint(kernelFinished);
610    PHINode * const final = b->CreatePHI(b->getInt1Ty(), 2);
611
612    b->SetInsertPoint(kernelExit);
613    PHINode * const terminated = b->CreatePHI(b->getInt1Ty(), 2);
614    // The initial "isFinal" state is equal to the first kernel's termination signal state
615    terminated->addIncoming(finished ? finished : b->getTrue(), kernelEntry);
616    Value * isFinal = finished ? finished : b->getFalse();
617
618    // Since it is possible that a sole consumer of some stream could terminate early, set the
619    // initial consumed amount to the amount produced in this iteration.
620    std::vector<PHINode *> consumedItemCountPhi(inputs.size());
621    std::vector<Value *> priorConsumedItemCount(inputs.size());
622    for (unsigned i = 0; i < inputs.size(); ++i) {
623        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
624        auto c = consumedItemCount.find(buffer);
625        PHINode * const consumedPhi = b->CreatePHI(b->getSizeTy(), 2);
626        Value * consumed = nullptr;
627        if (c == consumedItemCount.end()) {
628            const auto p = producedItemCount.find(buffer);
629            assert (p != producedItemCount.end());
630            consumed = p->second;
631            consumedItemCount.emplace(buffer, consumedPhi);
632        } else {
633            consumed = c->second;
634            c->second = consumedPhi;
635        }
636        consumedPhi->addIncoming(consumed, kernelEntry);
637        consumedItemCountPhi[i] = consumedPhi;
638        priorConsumedItemCount[i] = consumed;
639    }
640
641    b->SetInsertPoint(kernelCode);
642
643    // Check for sufficient output space
644    const auto O = outputSpaceChecks.find(kernel);
645    if (O != outputSpaceChecks.end()) {
646        for (const StreamSetBuffer * buffer : O->second) {
647
648
649            const Binding & output = kernel->getStreamOutput(buffer);
650            const auto name = output.getName();
651            BasicBlock * const sufficient = b->CreateBasicBlock(name + "HasOutputSpace");
652            const auto ub = kernel->getUpperBound(output.getRate()); assert (ub > 0);
653            Constant * const strideLength = b->getSize(ceiling(ub * kernel->getStride()));
654            Value * const produced = b->getProducedItemCount(name);
655            Value * const consumed = b->getConsumedItemCount(name);
656            Value * const unused = b->CreateSub(produced, consumed);
657            Value * const potentialData = b->CreateAdd(unused, strideLength);
658            Value * const capacity = b->getBufferedSize(name);
659
660          //  b->CallPrintInt("< " + kernel->getName() + "_" + name + "_potential", potentialData);
661          //  b->CallPrintInt("< " + kernel->getName() + "_" + name + "_capacity", capacity);
662
663            Value * const hasSufficientSpace = b->CreateICmpULE(potentialData, capacity);
664
665          //  b->CallPrintInt("* < " + kernel->getName() + "_" + name + "_sufficientSpace", hasSufficientSpace);
666
667            final->addIncoming(b->getFalse(), b->GetInsertBlock());
668            b->CreateLikelyCondBr(hasSufficientSpace, sufficient, kernelFinished);
669            b->SetInsertPoint(sufficient);
670        }
671    }
672
673    for (unsigned i = 0; i < inputs.size(); ++i) {
674        const Binding & input = inputs[i];
675        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
676        const auto name = input.getName();
677
678        const auto p = producedItemCount.find(buffer);
679        if (LLVM_UNLIKELY(p == producedItemCount.end())) {
680            report_fatal_error(kernel->getName() + " uses stream set " + name + " prior to its definition");
681        }
682        Value * const produced = p->second;
683
684      //  b->CallPrintInt("< " + kernel->getName() + "_" + name + "_produced", produced);
685
686
687        const auto ub = kernel->getUpperBound(input.getRate()); assert (ub > 0);
688        const auto strideLength = ceiling(ub * kernel->getStride()) ;
689        Constant * const segmentLength = b->getSize(strideLength * codegen::SegmentSize);
690
691        if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts) && !isa<SourceBuffer>(buffer))) {
692            b->CreateAssert(b->CreateICmpULE(segmentLength, b->getCapacity(name)),
693                            kernel->getName() + ": " + name + " upper bound of segment length exceeds buffer capacity");
694        }
695
696//        Value * limit = nullptr;
697//        if (input.getRate().isFixed()) {
698//            // if the input is deferred, simply adding length to the processed item count may result in setting a limit
699//            // that is too low for. instead just calculate the limit of all fixed rates from the segment no.
700//            limit = b->CreateMul(b->CreateAdd(segNo, b->getSize(1)), segmentLength);
701//        } else {
702//            Value * const processed = b->getProcessedItemCount(name);
703//            limit = b->CreateAdd(processed, segmentLength);
704//        }
705
706        // if the input is deferred, simply adding length to the processed item count may result in setting a limit
707        // that is too low for. instead just calculate the limit of all fixed rates from the segment no.
708        Value * const limit = b->CreateMul(b->CreateAdd(segNo, b->getSize(1)), segmentLength);
709
710     //  b->CallPrintInt("< " + kernel->getName() + "_" + name + "_limit", limit);
711
712        // TODO: currently, if we produce the exact amount as our limit states, we will have to process one additional segment
713        // before we can consider this kernel finished. We ought to be able to avoid doing in some cases but need to prove its
714        // always safe to do so.
715
716        Value * const consumingAll = b->CreateICmpULT(produced, limit);
717        args[i + 2] = b->CreateSelect(consumingAll, produced, limit);
718        isFinal = b->CreateAnd(isFinal, consumingAll);
719    }
720
721    // Check for available input
722    const auto I = inputAvailabilityChecks.find(kernel);
723    if (I != inputAvailabilityChecks.end()) {
724        for (const StreamSetBuffer * buffer : I->second) {
725            const Binding & input = kernel->getStreamInput(buffer);
726            const auto name = input.getName();
727            BasicBlock * const sufficient = b->CreateBasicBlock(name + "HasInputData");
728
729            Constant * strideLength = nullptr;
730            if (LLVM_UNLIKELY(input.hasAttribute(kernel::Attribute::KindId::AlwaysConsume))) {
731                const auto lb = kernel->getLowerBound(input.getRate());
732                strideLength = b->getSize(ceiling(lb * kernel->getStride()));
733            } else {
734                const auto ub = kernel->getUpperBound(input.getRate()); assert (ub > 0);
735                strideLength = b->getSize(ceiling(ub * kernel->getStride()) - 1);
736            }
737
738            if (input.isConstantStrideLengthOne()) {
739                // TODO workaround here
740                strideLength = b->getSize(1);
741            }
742
743            Value * const processed = b->getProcessedItemCount(name);
744//            if (input.getRate().isFixed()) {
745//                processed = b->CreateMul(segNo, strideLength);
746//            } else {
747//                processed = b->getProcessedItemCount(name);
748//            }
749            const auto p = producedItemCount.find(buffer);
750            assert (p != producedItemCount.end());
751            Value * const produced = p->second;
752            Value * const unprocessed = b->CreateSub(produced, processed);
753
754          //  b->CallPrintInt("< " + kernel->getName() + "_" + name + "_unprocessed", unprocessed);
755
756            Value * const hasSufficientData = input.isConstantStrideLengthOne() ?
757                                              b->CreateOr(b->CreateICmpUGE(unprocessed, strideLength), isFinal) :
758                                              b->CreateOr(b->CreateICmpUGT(unprocessed, strideLength), isFinal);
759//            Value * const hasSufficientData = b->CreateOr(b->CreateICmpUGT(unprocessed, strideLength), isFinal);
760//            Value * const hasSufficientData = b->CreateOr(b->CreateICmpUGE(unprocessed, strideLength), isFinal);
761
762          //  b->CallPrintInt("* < " + kernel->getName() + "_" + name + "_sufficientData", hasSufficientData);
763
764            final->addIncoming(b->getFalse(), b->GetInsertBlock());
765            b->CreateLikelyCondBr(hasSufficientData, sufficient, kernelFinished);
766            b->SetInsertPoint(sufficient);
767        }
768    }
769
770    applyOutputBufferExpansions(b, kernel);
771
772    args[0] = kernel->getInstance();
773    args[1] = isFinal;
774
775    b->createDoSegmentCall(args);
776
777    if (kernel->hasAttribute(kernel::Attribute::KindId::MustExplicitlyTerminate)) {
778        isFinal = b->getTerminationSignal();
779    } else {
780        if (kernel->hasAttribute(kernel::Attribute::KindId::CanTerminateEarly)) {
781            isFinal = b->CreateOr(isFinal, b->getTerminationSignal());
782        }
783        b->setTerminationSignal(isFinal);
784    }
785
786  //  b->CallPrintInt(kernel->getName() + "_finished", isFinal);
787    final->addIncoming(isFinal, b->GetInsertBlock());
788    b->CreateBr(kernelFinished);
789
790    b->SetInsertPoint(kernelFinished);
791
792    // update the consumed item counts
793    for (unsigned i = 0; i < inputs.size(); ++i) {
794        Value * const processed = b->getProcessedItemCount(inputs[i].getName());
795      //  b->CallPrintInt("> " + kernel->getName() + "_" + inputs[i].getName() + "_processed", processed);
796        Value * const consumed = b->CreateUMin(priorConsumedItemCount[i], processed);
797        consumedItemCountPhi[i]->addIncoming(consumed, kernelFinished);
798    }
799    b->CreateBr(kernelExit);
800
801    kernelExit->moveAfter(kernelFinished);
802
803    b->SetInsertPoint(kernelExit);
804    terminated->addIncoming(final, kernelFinished);
805
806
807    // If this kernel is the last consumer of a input buffer, update the consumed count for that buffer.
808    // NOTE: unless we can prove that this kernel cannot terminate before any prior consumer, we cannot
809    // put this code into the kernelFinished block.
810    for (unsigned i = 0; i < inputs.size(); ++i) {
811        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
812        const auto c = lastConsumer.find(buffer);
813        assert (c != lastConsumer.end());
814        if (c->second == kernel) {
815            Kernel * const producer = buffer->getProducer();
816            const auto & output = producer->getStreamOutput(buffer);
817            if (output.getRate().isRelative()) continue;
818
819           // b->CallPrintInt("* " + producer->getName() + "_" + output.getName() + "_consumed", consumedItemCountPhi[i]);
820
821            b->setKernel(producer);
822            if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
823                Value * const alreadyConsumed = b->getConsumedItemCount(output.getName());
824                b->CreateAssert(b->CreateICmpULE(alreadyConsumed, consumedItemCountPhi[i]),
825                                producer->getName() + ": " + output.getName() + " consumed item count is not monotonically non-decreasing!");
826            }
827            b->setConsumedItemCount(output.getName(), consumedItemCountPhi[i]);
828            b->setKernel(kernel);
829        }
830    }
831
832    const auto & outputs = kernel->getStreamOutputs();
833    for (unsigned i = 0; i < outputs.size(); ++i) {
834        Value * const produced = b->getProducedItemCount(outputs[i].getName());
835
836       // b->CallPrintInt("> " + kernel->getName() + "_" + outputs[i].getName() + "_produced", produced);
837
838        const StreamSetBuffer * const buffer = kernel->getStreamSetOutputBuffer(i);
839        assert (producedItemCount.count(buffer) == 0);
840        producedItemCount.emplace(buffer, produced);
841    }
842
843    return terminated;
844}
845
846
847/** ------------------------------------------------------------------------------------------------------------- *
848 * @brief applyOutputBufferExpansions
849 ** ------------------------------------------------------------------------------------------------------------- */
850void PipelineGenerator::applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * k) {
851    const auto & outputs = k->getStreamSetOutputBuffers();
852    for (unsigned i = 0; i < outputs.size(); i++) {
853        if (isa<DynamicBuffer>(outputs[i])) {
854            const auto baseSize = ceiling(k->getUpperBound(k->getStreamOutput(i).getRate()) * k->getStride() * codegen::SegmentSize);
855            if (LLVM_LIKELY(baseSize > 0)) {
856
857                const auto & name = k->getStreamOutput(i).getName();
858
859                BasicBlock * const doExpand = b->CreateBasicBlock(name + "Expand");
860                BasicBlock * const nextBlock = b->GetInsertBlock()->getNextNode();
861                doExpand->moveAfter(b->GetInsertBlock());
862                BasicBlock * const bufferReady = b->CreateBasicBlock(name + "Ready");
863                bufferReady->moveAfter(doExpand);
864                if (nextBlock) nextBlock->moveAfter(bufferReady);
865
866                Value * const produced = b->getProducedItemCount(name);
867                Value * const consumed = b->getConsumedItemCount(name);
868                Value * const required = b->CreateAdd(b->CreateSub(produced, consumed), b->getSize(2 * baseSize));
869
870                b->CreateCondBr(b->CreateICmpUGT(required, b->getBufferedSize(name)), doExpand, bufferReady);
871                b->SetInsertPoint(doExpand);
872
873                b->doubleCapacity(name);
874                // Ensure that capacity is sufficient by successive doubling, if necessary.
875                b->CreateCondBr(b->CreateICmpUGT(required, b->getBufferedSize(name)), doExpand, bufferReady);
876
877                b->SetInsertPoint(bufferReady);
878
879            }
880        }
881    }
882}
883
Note: See TracBrowser for help on using the repository browser.