source: icGREP/icgrep-devel/icgrep/base64.cpp @ 5864

Last change on this file since 5864 was 5856, checked in by nmedfort, 18 months ago

Revised pipeline structure to better control I/O rates

File size: 8.4 KB
RevLine 
[5232]1/*
2 *  Copyright (c) 2016 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 *  icgrep is a trademark of International Characters.
5 */
6
7#include <iostream>
8#include <llvm/IR/Function.h>
9#include <llvm/IR/Module.h>
10#include <llvm/ExecutionEngine/ExecutionEngine.h>
11#include <llvm/IR/Verifier.h>
12#include <llvm/Support/CommandLine.h>
[5425]13#include <toolchain/toolchain.h>
[5464]14#include <toolchain/cpudriver.h>
[5238]15#include <IR_Gen/idisa_target.h>
[5429]16#include <kernels/source_kernel.h>
[5267]17#include <kernels/streamset.h>
[5232]18#include <kernels/radix64.h>
19#include <kernels/stdout_kernel.h>
[5436]20#include <kernels/kernel_builder.h>
[5418]21#include <boost/interprocess/mapped_region.hpp>
[5232]22#include <boost/interprocess/anonymous_shared_memory.hpp>
[5418]23#include <sys/stat.h>
24#include <fcntl.h>
[5601]25#include <mutex>
[5260]26
27using namespace llvm;
28
[5232]29static cl::OptionCategory base64Options("base64 Options",
30                                            "Transcoding control options.");
31
32static cl::list<std::string> inputFiles(cl::Positional, cl::desc("<input file ...>"), cl::OneOrMore, cl::cat(base64Options));
33
34static cl::opt<bool> mMapBuffering("mmap-buffering", cl::desc("Enable mmap buffering."), cl::cat(base64Options));
35static cl::opt<bool> memAlignBuffering("memalign-buffering", cl::desc("Enable posix_memalign buffering."), cl::cat(base64Options));
[5601]36static cl::opt<int> Threads("threads", cl::desc("Total number of threads."), cl::init(1));
[5232]37
38
39using namespace kernel;
40using namespace parabix;
41
[5396]42void base64PipelineGen(ParabixDriver & pxDriver) {
43       
[5435]44    auto & iBuilder = pxDriver.getBuilder();
[5396]45    Module * mod = iBuilder->getModule();
[5425]46    Type * bitBlockType = iBuilder->getBitBlockType();
[5232]47
[5425]48    Type * const voidTy = iBuilder->getVoidTy();
[5418]49    Type * const int32Ty = iBuilder->getInt32Ty();
[5425]50    Type * const outputType = PointerType::get(ArrayType::get(ArrayType::get(bitBlockType, 8), 1), 0);
[5254]51   
52   
[5418]53    Function * const main = cast<Function>(mod->getOrInsertFunction("Main", voidTy, int32Ty, outputType, nullptr));
[5254]54    main->setCallingConv(CallingConv::C);
[5771]55    auto args = main->arg_begin();
[5254]56   
[5418]57    Value * const fileDescriptor = &*(args++);
58    fileDescriptor->setName("fileDescriptor");
[5254]59    Value * const outputStream = &*(args++);
60    outputStream->setName("outputStream");
[5504]61    iBuilder->SetInsertPoint(BasicBlock::Create(mod->getContext(), "entry", main,0));
[5254]62
[5277]63    //Round up to a multiple of 3.
[5856]64    const auto bufferSize = ((codegen::SegmentSize * codegen::BufferSegments * codegen::ThreadNum + 2) / 3) * 3;
[5504]65
[5755]66    StreamSetBuffer * ByteStream = pxDriver.addBuffer<SourceBuffer>(iBuilder, iBuilder->getStreamSetTy(1, 8));
[5856]67    Kernel * mmapK = pxDriver.addKernelInstance<MMapSourceKernel>(iBuilder);
[5504]68    mmapK->setInitialArguments({fileDescriptor});
69    pxDriver.makeKernelCall(mmapK, {}, {ByteStream});
[5232]70   
[5856]71    const auto outputBufferSize = ((bufferSize + 2) / 3) * 4;
72
73    StreamSetBuffer * Expanded3_4Out = pxDriver.addBuffer<DynamicBuffer>(iBuilder, iBuilder->getStreamSetTy(1, 8), outputBufferSize);
[5755]74    Kernel * expandK = pxDriver.addKernelInstance<expand3_4Kernel>(iBuilder);
[5504]75    pxDriver.makeKernelCall(expandK, {ByteStream}, {Expanded3_4Out});
[5232]76   
[5856]77    StreamSetBuffer * Radix64out = pxDriver.addBuffer<DynamicBuffer>(iBuilder, iBuilder->getStreamSetTy(1, 8), outputBufferSize);
[5755]78    Kernel * radix64K = pxDriver.addKernelInstance<radix64Kernel>(iBuilder);
[5504]79    pxDriver.makeKernelCall(radix64K, {Expanded3_4Out}, {Radix64out});
[5254]80   
[5601]81    if (memAlignBuffering){
[5755]82        auto Base64out = pxDriver.addBuffer<ExternalBuffer>(iBuilder, iBuilder->getStreamSetTy(1, 8), outputStream);
83        Kernel * base64K = pxDriver.addKernelInstance<base64Kernel>(iBuilder);
[5601]84        pxDriver.makeKernelCall(base64K, {Radix64out}, {Base64out});
[5856]85    } else {
86        StreamSetBuffer * Base64out = pxDriver.addBuffer<DynamicBuffer>(iBuilder, iBuilder->getStreamSetTy(1, 8), outputBufferSize);
[5755]87        Kernel * base64K = pxDriver.addKernelInstance<base64Kernel>(iBuilder);
[5856]88        pxDriver.makeKernelCall(base64K, {Radix64out}, {Base64out});       
[5755]89        Kernel * outK = pxDriver.addKernelInstance<StdOutKernel>(iBuilder, 8);
[5601]90        pxDriver.makeKernelCall(outK, {Base64out}, {});
91    }
[5396]92   
93    pxDriver.generatePipelineIR();
[5597]94    pxDriver.deallocateBuffers();
[5232]95    iBuilder->CreateRetVoid();
[5401]96
[5474]97    pxDriver.finalizeObject();
[5232]98}
99
/// Pointer type through which the pipeline's compiled entry point is called: it receives the
/// input file descriptor and a pointer to the output buffer (nullptr when none is supplied).
using base64FunctionType = void (*)(uint32_t fd, char * outputBuffer);
[5232]101
/**
 * Returns the size in bytes that fstat() reports for an open file descriptor.
 *
 * @param fd open file descriptor to query.
 * @return st_size of the file, or 0 when fstat() fails.
 */
size_t file_size(const int fd) {
    struct stat info;
    const bool statSucceeded = (fstat(fd, &info) == 0);
    return statSucceeded ? static_cast<size_t>(info.st_size) : static_cast<size_t>(0);
}
109
[5232]110void base64(base64FunctionType fn_ptr, const std::string & fileName) {
[5240]111
[5418]112    const int fd = open(fileName.c_str(), O_RDONLY);
113    if (LLVM_UNLIKELY(fd == -1)) {
114        std::cerr << "Error: cannot open " << fileName << " for processing. Skipped.\n";
[5232]115        return;
116    }
117    if (mMapBuffering) {
[5418]118        boost::interprocess::mapped_region outputBuffer(boost::interprocess::anonymous_shared_memory(2 * file_size(fd)));
[5232]119        outputBuffer.advise(boost::interprocess::mapped_region::advice_willneed);
120        outputBuffer.advise(boost::interprocess::mapped_region::advice_sequential);
[5418]121        fn_ptr(fd, static_cast<char*>(outputBuffer.get_address()));
122    } else if (memAlignBuffering) {
[5601]123        unsigned inputSize = file_size(fd);
124        unsigned paddingSize = (inputSize % 3) ? (4 - (inputSize % 3)) : 0;
125        unsigned outputSize = inputSize * 4/3 + paddingSize;
126
[5232]127        char * outputBuffer;
[5601]128        if (posix_memalign(reinterpret_cast<void **>(&outputBuffer), 32, inputSize * 2)) {
[5240]129            throw std::bad_alloc();
130        }
[5418]131        fn_ptr(fd, outputBuffer);
[5601]132        fwrite(outputBuffer, outputSize, 1, stdout);
[5232]133        free(reinterpret_cast<void *>(outputBuffer));
[5418]134    } else { /* No external output buffer */
135        fn_ptr(fd, nullptr);
[5232]136    }
[5418]137    close(fd);
[5232]138   
139}
140
[5601]141std::mutex count_mutex;
142size_t fileCount;
143base64FunctionType fn_ptr;
144
145std::vector<char *> resultStrs;
146std::vector<int> filesizes;
147
148void *Base64ThreadFunction(void *args)
149{
150    size_t fileIdx;
151
152    count_mutex.lock();
153    fileIdx = fileCount;
154    fileCount++;
155    count_mutex.unlock();
156
157    while (fileIdx < inputFiles.size()) {
158        const int fd = open(inputFiles[fileIdx].c_str(), O_RDONLY);
159        if (LLVM_UNLIKELY(fd == -1)) {
160            std::cerr << "Error: cannot open " << inputFiles[fileIdx] << " for processing. Skipped.\n";
161            exit(-1);
162        }
163
164        char * outputBuffer;
165        if (posix_memalign(reinterpret_cast<void **>(&outputBuffer), 32, 2 * file_size(fd))) {
166            throw std::bad_alloc();
167        }
168   
169        fn_ptr(fd, outputBuffer);
170        resultStrs[fileIdx] = outputBuffer;
171        filesizes[fileIdx] = file_size(fd);
172
173        count_mutex.lock();
174        fileIdx = fileCount;
175        fileCount++;
176        count_mutex.unlock();
177    }
178
179    pthread_exit(nullptr);
180}
181
[5232]182int main(int argc, char *argv[]) {
[5486]183    codegen::ParseCommandLineOptions(argc, argv, {&base64Options, codegen::codegen_flags()});
[5232]184
[5601]185    if (Threads == 1) {
186        ParabixDriver pxDriver("base64");
187        base64PipelineGen(pxDriver);
188        fn_ptr = reinterpret_cast<base64FunctionType>(pxDriver.getMain());     
189        for (unsigned i = 0; i != inputFiles.size(); ++i) {
190            base64(fn_ptr, inputFiles[i]);
191        }
[5232]192    }
[5601]193    else{
194        memAlignBuffering = true;
195        ParabixDriver pxDriver("base64");
196        base64PipelineGen(pxDriver);
197        fn_ptr = reinterpret_cast<base64FunctionType>(pxDriver.getMain());
[5232]198
[5601]199        fileCount = 0;
200        const unsigned n = inputFiles.size();
201        resultStrs.resize(n);
202        filesizes.resize(n);
203
204        const unsigned numOfThreads = Threads;
205        pthread_t threads[numOfThreads];
206
207        for(unsigned long i = 0; i < numOfThreads; ++i){
208            const int rc = pthread_create(&threads[i], NULL, Base64ThreadFunction, (void *)i);
209            if (rc) {
210                llvm::report_fatal_error("Failed to create thread: code " + std::to_string(rc));
211            }
212        }
213
214        for(unsigned i = 0; i < numOfThreads; ++i) {
215            void * status = nullptr;
216            const int rc = pthread_join(threads[i], &status);
217            if (rc) {
218                llvm::report_fatal_error("Failed to join thread: code " + std::to_string(rc));
219            }
220        }
221
222        for (unsigned i=0; i<resultStrs.size(); i++){
223            unsigned paddingSize = (filesizes[i] % 3) ? (4 - (filesizes[i] % 3)) : 0;
224            fwrite(resultStrs[i], filesizes[i] * 4/3 + paddingSize, 1, stdout);
225        }
226    }   
227
[5232]228    return 0;
229}
230
Note: See TracBrowser for help on using the repository browser.