Ignore:
Timestamp:
Apr 28, 2018, 3:54:43 PM (16 months ago)
Author:
nmedfort
Message:

Added temporary buffer functionality to the pipeline for single-stream source buffers. Fixed memory leak from UCD::UnicodeBreakRE().

Location:
icGREP/icgrep-devel/icgrep/toolchain
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • icGREP/icgrep-devel/icgrep/toolchain/driver.cpp

    r5755 r5998  
    2323    }
    2424}
     25
     26Driver::~Driver() {}
  • icGREP/icgrep-devel/icgrep/toolchain/driver.h

    r5856 r5998  
    1717public:
    1818    Driver(std::string && moduleName);
    19 
    20     virtual ~Driver() = default;
    2119
    2220    const std::unique_ptr<kernel::KernelBuilder> & getBuilder() {
     
    5856    virtual void performIncrementalCacheCleanupStep() = 0;
    5957
     58    virtual ~Driver() = 0;
     59
    6060protected:
    6161
  • icGREP/icgrep-devel/icgrep/toolchain/pipeline.cpp

    r5996 r5998  
    8484    ChannelGraph pruneGraph(ChannelGraph && G, VertexList && V) const;
    8585
    86     void checkIfAllInputKernelsAreTerminated(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
     86    bool isPotentiallyUnsafeInputBuffer(const StreamSetBuffer * const buffer);
     87
     88    void allocateTemporaryBufferPointerArray(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel);
     89
     90    void checkIfAllInputKernelsHaveFinished(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
    8791
    8892    void checkAvailableInputData(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
     
    9498    Value * callKernel(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
    9599
    96     void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const unsigned index);
     100    void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel *kernel);
     101
     102    void runKernel(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel);
     103
     104    void allocateTemporaryBuffers(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel, Value * const requiresTemporaryBuffers);
     105
     106    void freeTemporaryBuffers(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel, Value * const requiresTemporaryBuffers);
    97107
    98108    Value * getFullyProcessedItemCount(const std::unique_ptr<KernelBuilder> & b, const Binding & binding, Value * const final) const;
     
    112122    ChannelGraph                        inputGraph;
    113123    ChannelGraph                        outputGraph;
     124
     125    std::vector<Value *>                temporaryBufferPtrs;
    114126
    115127    BasicBlock *                        kernelFinished;
     
    709721    b->SetInsertPoint(kernelCode);
    710722
    711     checkIfAllInputKernelsAreTerminated(b, index);
    712 
    713     checkAvailableInputData(b, index);
    714 
    715     checkAvailableOutputSpace(b, index);
    716 
    717     applyOutputBufferExpansions(b, index);
    718 
    719723    Value * const finalStride = callKernel(b, index);
    720724
     
    765769
    766770
     771    // If this kernel is the last consumer of a input buffer, update the consumed count for that buffer.
     772
    767773    // TODO: if all consumers process the data at a fixed rate, we can just set the consumed item count
    768774    // by the strideNo rather than tracking it.
    769775
    770 
    771     // If this kernel is the last consumer of a input buffer, update the consumed count for that buffer.
    772     // NOTE: unless we can prove that this kernel cannot terminate before any prior consumer, we cannot
    773     // put this code into the kernelFinished block.
     776    // TODO: a kernel could take the same stream set for multiple arguments.
     777
     778    // TODO: if we can prove that this kernel cannot terminate before any prior consumer, this code
     779    // could be executed in kernelFinished block.
    774780    for (unsigned i = 0; i < inputs.size(); ++i) {
    775781        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
     
    782788            if (output.getRate().isRelative()) continue;
    783789            b->setKernel(producer);
    784             if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
    785                 Value * const alreadyConsumed = b->getConsumedItemCount(output.getName());
    786                 b->CreateAssert(b->CreateICmpULE(alreadyConsumed, consumedItemCountPhi[i]),
    787                                 producer->getName() + ": " + output.getName() + " consumed item count is not monotonically non-decreasing!");
    788             }
    789790            b->setConsumedItemCount(output.getName(), consumedItemCountPhi[i]);
    790791            b->setKernel(kernel);
     
    796797
    797798/** ------------------------------------------------------------------------------------------------------------- *
    798  * @brief checkAvailableInputData
    799  ** ------------------------------------------------------------------------------------------------------------- */
    800 void PipelineGenerator::checkIfAllInputKernelsAreTerminated(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
     799 * @brief checkIfAllInputKernelsHaveFinished
     800 ** ------------------------------------------------------------------------------------------------------------- */
     801void PipelineGenerator::checkIfAllInputKernelsHaveFinished(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
    801802    const auto n = in_degree(index, dependencyGraph);
    802803    if (LLVM_UNLIKELY(n == 0)) {
     
    807808            const auto u = source(e, dependencyGraph);
    808809            Value * const finished = dependencyGraph[u];
    809             //b->CallPrintInt("* " + kernels[u]->getName() + "_hasFinished", finished);
    810810            noMore = b->CreateAnd(noMore, finished);
    811811        }
     
    818818void PipelineGenerator::checkAvailableInputData(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
    819819    const Kernel * const kernel = kernels[index];
    820     b->setKernel(kernel);
     820    b->setKernel(kernel);   
    821821    for (auto e : make_iterator_range(in_edges(index, inputGraph))) {
    822822        const Channel & c = inputGraph[e];
     
    834834        Value * const unprocessed = b->CreateSub(produced, processed);
    835835        Value * const hasEnough = b->CreateICmpUGE(unprocessed, requiredInput);
    836         Value * const check = b->CreateOr(hasEnough, noMore);
    837836        terminated->addIncoming(b->getFalse(), b->GetInsertBlock());
    838837        if (LLVM_UNLIKELY(codegen::DebugOptionIsSet(codegen::EnableAsserts))) {
    839838            madeProgress->addIncoming(anyProgress, b->GetInsertBlock());
    840839        }
    841         BasicBlock * const hasSufficientInput = b->CreateBasicBlock(kernel->getName() + "_" + input.getName() + "_hasSufficientInput");
     840        const auto prefix = kernel->getName() + "_" + input.getName();
     841        BasicBlock * const hasSufficientInput = b->CreateBasicBlock(prefix + "_hasSufficientInput");
     842        Value * const check = b->CreateOr(hasEnough, noMore);
    842843        b->CreateLikelyCondBr(check, hasSufficientInput, kernelFinished);
    843844        b->SetInsertPoint(hasSufficientInput);
     
    868869            madeProgress->addIncoming(anyProgress, b->GetInsertBlock());
    869870        }
    870         BasicBlock * const hasOutputSpace = b->CreateBasicBlock(kernel->getName() + "_" + name + "_hasOutputSpace");
     871        const auto prefix = kernel->getName() + "_" + name;
     872        BasicBlock * const hasOutputSpace = b->CreateBasicBlock(prefix + "_hasOutputSpace");
    871873        b->CreateLikelyCondBr(check, hasOutputSpace, kernelFinished);
    872874        b->SetInsertPoint(hasOutputSpace);
     
    907909    b->setKernel(kernel);
    908910
     911    checkIfAllInputKernelsHaveFinished(b, index);
     912
     913    checkAvailableInputData(b, index);
     914
     915    checkAvailableOutputSpace(b, index);
     916
     917    applyOutputBufferExpansions(b, kernel);
     918
    909919    #ifndef DISABLE_COPY_TO_OVERFLOW
    910920    // Store how many items we produced by this kernel in the prior iteration. We'll use this to determine when
     
    924934    #endif
    925935
    926     const auto & inputs = kernel->getStreamInputs();
    927     const auto n = inputs.size();
    928     std::vector<Value *> arguments(n + 2);
    929 
    930     Value * isFinal = noMore;
    931     for (unsigned i = 0; i < n; ++i) {
    932         const Binding & input = inputs[i];
    933         const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
    934 
    935         const auto p = producedItemCount.find(buffer);
    936         assert (p != producedItemCount.end());
    937         Value * const produced = p->second;
    938 
    939         const ProcessingRate & rate = input.getRate();
    940         if (rate.isPopCount()) {
    941             arguments[i + 2] = produced;
    942         } else {
    943             const unsigned strideSize = ceiling(kernel->getUpperBound(rate) * kernel->getStride());
    944             Value * const processed = b->getNonDeferredProcessedItemCount(input);
    945             Value * const limit = b->CreateAdd(processed, b->getSize(strideSize * codegen::SegmentSize));
    946             Value * const partial = b->CreateICmpULT(produced, limit);
    947             arguments[i + 2] = b->CreateSelect(partial, produced, limit);
    948             isFinal = b->CreateAnd(isFinal, partial);
    949         }
    950     }
    951 
    952     // TODO: pass in a strideNo for fixed rate streams to allow the kernel to calculate the current avail,
    953     // processed, and produced counts
    954 
    955     arguments[0] = kernel->getInstance();
    956     arguments[1] = isFinal;
    957 
    958     b->createDoSegmentCall(arguments);
     936    allocateTemporaryBufferPointerArray(b, kernel);
     937
     938    runKernel(b, kernel);
    959939
    960940    #ifndef DISABLE_COPY_TO_OVERFLOW
     
    986966
    987967/** ------------------------------------------------------------------------------------------------------------- *
     968 * @brief runKernel
     969 ** ------------------------------------------------------------------------------------------------------------- */
     970void PipelineGenerator::runKernel(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel) {
     971
     972    const auto & inputs = kernel->getStreamInputs();
     973    const auto n = inputs.size();
     974    std::vector<Value *> arguments(n + 2);
     975
     976    Value * isFinal = noMore;
     977
     978    Value * requiresTemporaryBuffers = nullptr;
     979
     980    for (unsigned i = 0; i < n; ++i) {
     981        const Binding & input = inputs[i];
     982        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
     983
     984        const auto p = producedItemCount.find(buffer);
     985        assert (p != producedItemCount.end());
     986        Value * const produced = p->second;
     987
     988        const ProcessingRate & rate = input.getRate();
     989        if (rate.isPopCount()) {
     990            arguments[i + 2] = produced;
     991        } else {
     992            const unsigned strideSize = ceiling(kernel->getUpperBound(rate) * kernel->getStride());
     993            Value * const processed = b->getNonDeferredProcessedItemCount(input);
     994            Value * const limit = b->CreateAdd(processed, b->getSize(strideSize * codegen::SegmentSize));
     995            Value * const hasPartial = b->CreateICmpULT(produced, limit);
     996            arguments[i + 2] = b->CreateSelect(hasPartial, produced, limit);
     997            isFinal = b->CreateAnd(isFinal, hasPartial);
     998            if (!temporaryBufferPtrs.empty() && temporaryBufferPtrs[i]) {
     999                if (requiresTemporaryBuffers) {
     1000                    requiresTemporaryBuffers = b->CreateOr(requiresTemporaryBuffers, hasPartial);
     1001                } else {
     1002                    requiresTemporaryBuffers = hasPartial;
     1003                }
     1004            }
     1005        }
     1006    }
     1007
     1008    // TODO: pass in a strideNo for fixed rate streams to allow the kernel to calculate the current avail,
     1009    // processed, and produced counts
     1010
     1011    arguments[0] = kernel->getInstance();
     1012    arguments[1] = isFinal;
     1013
     1014    if (requiresTemporaryBuffers) {
     1015        allocateTemporaryBuffers(b, kernel, requiresTemporaryBuffers);
     1016    }
     1017
     1018    b->createDoSegmentCall(arguments);
     1019
     1020    if (requiresTemporaryBuffers) {
     1021        freeTemporaryBuffers(b, kernel, requiresTemporaryBuffers);
     1022    }
     1023}
     1024
     1025
     1026/** ------------------------------------------------------------------------------------------------------------- *
    9881027 * @brief applyOutputBufferExpansions
    9891028 ** ------------------------------------------------------------------------------------------------------------- */
    990 void PipelineGenerator::applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const unsigned index) {
    991     const Kernel * const kernel = kernels[index];
     1029void PipelineGenerator::applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel) {
    9921030    const auto & outputs = kernel->getStreamSetOutputBuffers();
    9931031    for (unsigned i = 0; i < outputs.size(); i++) {
     
    10251063}
    10261064
     1065/** ------------------------------------------------------------------------------------------------------------- *
     1066 * @brief allocateTemporaryBufferPointerArray
     1067 ** ------------------------------------------------------------------------------------------------------------- */
     1068void PipelineGenerator::allocateTemporaryBufferPointerArray(const std::unique_ptr<KernelBuilder> & b, const Kernel * const kernel) {
     1069    // TODO: whenever two kernels are using the same "unsafe" buffer, they'll both create and destroy their own
     1070    // temporary copies of it. This could be optimized to have it done at production and deleted after the last
     1071    // consuming kernel utilizes it.
     1072    temporaryBufferPtrs.clear();
     1073
     1074    const auto & inputs = kernel->getStreamInputs();
     1075    for (unsigned i = 0; i < inputs.size(); ++i) {
     1076        const StreamSetBuffer * const buffer = kernel->getStreamSetInputBuffer(i);
     1077        if (LLVM_UNLIKELY(isPotentiallyUnsafeInputBuffer(buffer))) {
     1078            if (temporaryBufferPtrs.empty()) {
     1079                temporaryBufferPtrs.resize(inputs.size(), nullptr);
     1080            }
     1081            assert (temporaryBufferPtrs[i] == nullptr);
     1082            PointerType * const ptrTy = buffer->getStreamSetPointerType();
     1083            StructType * const structTy = StructType::create(b->getContext(), {ptrTy, ptrTy});
     1084            AllocaInst * const tempBuffer = b->CreateAlloca(structTy);
     1085            b->CreateStore(Constant::getNullValue(structTy), tempBuffer);
     1086            temporaryBufferPtrs[i] = tempBuffer;
     1087        }
     1088    }
     1089
     1090}
     1091
     1092/** ------------------------------------------------------------------------------------------------------------- *
     1093 * @brief isPotentiallyUnsafeInputBuffer
     1094 *
     1095 * We cannot trust that the final block of any single stream source or external buffer can be safely read past its
     1096 * final item since kernels may attempt to load aligned blocks of data, leading to potentially-intermittent
     1097 * segmentation faults, depending on whether the access crosses a page boundary.
     1098 ** ------------------------------------------------------------------------------------------------------------- */
     1099inline bool PipelineGenerator::isPotentiallyUnsafeInputBuffer(const StreamSetBuffer * const buffer) {
     1100    return isa<SourceBuffer>(buffer) && buffer->getNumOfStreams() == 1;
     1101}
     1102
     1103/** ------------------------------------------------------------------------------------------------------------- *
     1104 * @brief allocateTemporaryBuffers
     1105 ** ------------------------------------------------------------------------------------------------------------- */
     1106void PipelineGenerator::allocateTemporaryBuffers(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel, Value * const requiresTemporaryBuffers) {
     1107    ConstantInt * const ZERO = b->getInt32(0);
     1108    ConstantInt * const ONE = b->getInt32(1);
     1109    BasicBlock * const allocateBuffers = b->CreateBasicBlock();
     1110    BasicBlock * const runKernel = b->CreateBasicBlock();
     1111    b->CreateUnlikelyCondBr(requiresTemporaryBuffers, allocateBuffers, runKernel);
     1112
     1113    b->SetInsertPoint(allocateBuffers);
     1114    for (unsigned i = 0; i < temporaryBufferPtrs.size(); ++i) {
     1115        if (temporaryBufferPtrs[i]) {
     1116            const Binding & input = kernel->getStreamInput(i);
     1117            const auto p = producedItemCount.find(kernel->getStreamSetInputBuffer(i));
     1118            assert (p != producedItemCount.end());
     1119            Value * const produced = p->second;
     1120            Value * const processed = b->getProcessedItemCount(input.getName());
     1121            Value * const unprocessed = b->CreateSub(produced, processed);
     1122            const auto temp = b->AcquireTemporaryBuffer(input.getName(), processed, unprocessed);
     1123            b->CreateStore(temp.first, b->CreateGEP(temporaryBufferPtrs[i], { ZERO, ZERO }));
     1124            b->CreateStore(temp.second, b->CreateGEP(temporaryBufferPtrs[i], { ZERO, ONE }));
     1125        }
     1126    }
     1127    b->CreateBr(runKernel);
     1128
     1129    b->SetInsertPoint(runKernel);
     1130}
     1131
     1132/** ------------------------------------------------------------------------------------------------------------- *
     1133 * @brief freeTemporaryBuffers
     1134 ** ------------------------------------------------------------------------------------------------------------- */
     1135void PipelineGenerator::freeTemporaryBuffers(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel, Value * const requiresTemporaryBuffers) {
     1136    ConstantInt * const ZERO = b->getInt32(0);
     1137    ConstantInt * const ONE = b->getInt32(1);
     1138
     1139    BasicBlock * const freeBuffers = b->CreateBasicBlock();
     1140    BasicBlock * const finishedKernel = b->CreateBasicBlock();
     1141    b->CreateUnlikelyCondBr(requiresTemporaryBuffers, freeBuffers, finishedKernel);
     1142    b->SetInsertPoint(freeBuffers);
     1143    for (unsigned i = 0; i < temporaryBufferPtrs.size(); ++i) {
     1144        if (temporaryBufferPtrs[i]) {
     1145            Value * const originalBuffer = b->CreateLoad(b->CreateGEP(temporaryBufferPtrs[i], { ZERO, ZERO }));
     1146            const Binding & input = kernel->getStreamInput(i);
     1147            b->setBaseAddress(input.getName(), originalBuffer);
     1148            Value * const temporaryBuffer = b->CreateLoad(b->CreateGEP(temporaryBufferPtrs[i], { ZERO, ONE }));
     1149            b->CreateFree(temporaryBuffer);
     1150        }
     1151    }
     1152    b->CreateBr(finishedKernel);
     1153
     1154    b->SetInsertPoint(finishedKernel);
     1155}
    10271156
    10281157/** ------------------------------------------------------------------------------------------------------------- *
     
    10621191    }
    10631192    return final;
    1064 
    10651193}
    10661194
Note: See TracChangeset for help on using the changeset viewer.