Changeset 5601


Ignore:
Timestamp:
Aug 6, 2017, 1:20:25 PM (3 months ago)
Author:
lindanl
Message:

base64: Add task parallel for multiple input files.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • icGREP/icgrep-devel/icgrep/base64.cpp

    r5597 r5601  
    2323#include <sys/stat.h>
    2424#include <fcntl.h>
     25#include <mutex>
    2526
    2627using namespace llvm;
     
    3334static cl::opt<bool> mMapBuffering("mmap-buffering", cl::desc("Enable mmap buffering."), cl::cat(base64Options));
    3435static cl::opt<bool> memAlignBuffering("memalign-buffering", cl::desc("Enable posix_memalign buffering."), cl::cat(base64Options));
     36static cl::opt<int> Threads("threads", cl::desc("Total number of threads."), cl::init(1));
    3537
    3638
     
    7779    pxDriver.makeKernelCall(radix64K, {Expanded3_4Out}, {Radix64out});
    7880   
    79     StreamSetBuffer * Base64out = pxDriver.addBuffer(make_unique<DynamicBuffer>(iBuilder, iBuilder->getStreamSetTy(1, 8), bufferSize));
    80     Kernel * base64K = pxDriver.addKernelInstance(make_unique<base64Kernel>(iBuilder));
    81     pxDriver.makeKernelCall(base64K, {Radix64out}, {Base64out});
    82    
    83     Kernel * outK = pxDriver.addKernelInstance(make_unique<StdOutKernel>(iBuilder, 8));
    84     pxDriver.makeKernelCall(outK, {Base64out}, {});
     81    if (memAlignBuffering){
     82        auto Base64out = pxDriver.addExternalBuffer(make_unique<ExternalBuffer>(iBuilder, iBuilder->getStreamSetTy(1, 8), outputStream));
     83        Kernel * base64K = pxDriver.addKernelInstance(make_unique<base64Kernel>(iBuilder));
     84        pxDriver.makeKernelCall(base64K, {Radix64out}, {Base64out});
     85    }
     86    else {
     87        StreamSetBuffer * Base64out = pxDriver.addBuffer(make_unique<DynamicBuffer>(iBuilder, iBuilder->getStreamSetTy(1, 8), bufferSize));
     88        Kernel * base64K = pxDriver.addKernelInstance(make_unique<base64Kernel>(iBuilder));
     89        pxDriver.makeKernelCall(base64K, {Radix64out}, {Base64out});
     90       
     91        Kernel * outK = pxDriver.addKernelInstance(make_unique<StdOutKernel>(iBuilder, 8));
     92        pxDriver.makeKernelCall(outK, {Base64out}, {});
     93    }
    8594   
    8695    pxDriver.generatePipelineIR();
     
    9099    pxDriver.finalizeObject();
    91100}
    92 
    93101
    94102typedef void (*base64FunctionType)(const uint32_t fd, char * outputBuffer);
     
    115123        fn_ptr(fd, static_cast<char*>(outputBuffer.get_address()));
    116124    } else if (memAlignBuffering) {
     125        unsigned inputSize = file_size(fd);
     126        unsigned paddingSize = (inputSize % 3) ? (4 - (inputSize % 3)) : 0;
     127        unsigned outputSize = inputSize * 4/3 + paddingSize;
     128
     129        char * outputBuffer;
     130        if (posix_memalign(reinterpret_cast<void **>(&outputBuffer), 32, inputSize * 2)) {
     131            throw std::bad_alloc();
     132        }
     133        fn_ptr(fd, outputBuffer);
     134        fwrite(outputBuffer, outputSize, 1, stdout);
     135        free(reinterpret_cast<void *>(outputBuffer));
     136    } else { /* No external output buffer */
     137        fn_ptr(fd, nullptr);
     138    }
     139    close(fd);
     140   
     141}
     142
     143std::mutex count_mutex;
     144size_t fileCount;
     145base64FunctionType fn_ptr;
     146
     147std::vector<char *> resultStrs;
     148std::vector<int> filesizes;
     149
     150void *Base64ThreadFunction(void *args)
     151{
     152    size_t fileIdx;
     153
     154    count_mutex.lock();
     155    fileIdx = fileCount;
     156    fileCount++;
     157    count_mutex.unlock();
     158
     159    while (fileIdx < inputFiles.size()) {
     160        const int fd = open(inputFiles[fileIdx].c_str(), O_RDONLY);
     161        if (LLVM_UNLIKELY(fd == -1)) {
     162            std::cerr << "Error: cannot open " << inputFiles[fileIdx] << " for processing. Skipped.\n";
     163            exit(-1);
     164        }
     165
    117166        char * outputBuffer;
    118167        if (posix_memalign(reinterpret_cast<void **>(&outputBuffer), 32, 2 * file_size(fd))) {
    119168            throw std::bad_alloc();
    120169        }
     170   
    121171        fn_ptr(fd, outputBuffer);
    122         free(reinterpret_cast<void *>(outputBuffer));
    123     } else { /* No external output buffer */
    124         fn_ptr(fd, nullptr);
    125     }
    126     close(fd);
    127    
     172        resultStrs[fileIdx] = outputBuffer;
     173        filesizes[fileIdx] = file_size(fd);
     174
     175        count_mutex.lock();
     176        fileIdx = fileCount;
     177        fileCount++;
     178        count_mutex.unlock();
     179    }
     180
     181    pthread_exit(nullptr);
    128182}
    129183
     
    131185    codegen::ParseCommandLineOptions(argc, argv, {&base64Options, codegen::codegen_flags()});
    132186
    133     ParabixDriver pxDriver("base64");
    134     base64PipelineGen(pxDriver);
    135     auto main = reinterpret_cast<base64FunctionType>(pxDriver.getMain());
    136 
    137     for (unsigned i = 0; i != inputFiles.size(); ++i) {
    138         base64(main, inputFiles[i]);
    139     }
     187    if (Threads == 1) {
     188        ParabixDriver pxDriver("base64");
     189        base64PipelineGen(pxDriver);
     190        fn_ptr = reinterpret_cast<base64FunctionType>(pxDriver.getMain());     
     191        for (unsigned i = 0; i != inputFiles.size(); ++i) {
     192            base64(fn_ptr, inputFiles[i]);
     193        }
     194    }
     195    else{
     196        memAlignBuffering = true;
     197        ParabixDriver pxDriver("base64");
     198        base64PipelineGen(pxDriver);
     199        fn_ptr = reinterpret_cast<base64FunctionType>(pxDriver.getMain());
     200
     201        fileCount = 0;
     202        const unsigned n = inputFiles.size();
     203        resultStrs.resize(n);
     204        filesizes.resize(n);
     205
     206        const unsigned numOfThreads = Threads;
     207        pthread_t threads[numOfThreads];
     208
     209        for(unsigned long i = 0; i < numOfThreads; ++i){
     210            const int rc = pthread_create(&threads[i], NULL, Base64ThreadFunction, (void *)i);
     211            if (rc) {
     212                llvm::report_fatal_error("Failed to create thread: code " + std::to_string(rc));
     213            }
     214        }
     215
     216        for(unsigned i = 0; i < numOfThreads; ++i) {
     217            void * status = nullptr;
     218            const int rc = pthread_join(threads[i], &status);
     219            if (rc) {
     220                llvm::report_fatal_error("Failed to join thread: code " + std::to_string(rc));
     221            }
     222        }
     223
     224        for (unsigned i=0; i<resultStrs.size(); i++){
     225            unsigned paddingSize = (filesizes[i] % 3) ? (4 - (filesizes[i] % 3)) : 0;
     226            fwrite(resultStrs[i], filesizes[i] * 4/3 + paddingSize, 1, stdout);
     227        }
     228    }   
    140229
    141230    return 0;
Note: See TracChangeset for help on using the changeset viewer.