Changeset 5735


Ignore:
Timestamp:
Nov 14, 2017, 10:17:19 AM (4 days ago)
Author:
cameron
Message:

Incremental object caching; multithreaded print

Location:
icGREP/icgrep-devel/icgrep
Files:
9 edited

Legend:

Unmodified
Added
Removed
  • icGREP/icgrep-devel/icgrep/grep_engine.cpp

    r5732 r5735  
    3939#include <llvm/ADT/STLExtras.h> // for make_unique
    4040#include <llvm/Support/CommandLine.h>
     41#include <llvm/Support/Debug.h>
    4142
    4243using namespace parabix;
     
    5051GrepEngine::GrepEngine() :
    5152    mGrepDriver(nullptr),
     53    mNextFileToGrep(0),
     54    mNextFileToPrint(0),
    5255    grepMatchFound(false),
    53     fileCount(0),
    54     mMoveMatchesToEOL(true) {}
     56    mMoveMatchesToEOL(true),
     57    mEngineThread(pthread_self()) {}
    5558   
    5659GrepEngine::~GrepEngine() {
     
    8083    const unsigned n = filenames.size();
    8184    mResultStrs.resize(n);
     85    mFileStatus.resize(n);
    8286    for (unsigned i = 0; i < n; i++) {
    8387        mResultStrs[i] = make_unique<std::stringstream>();
     88        mFileStatus[i] = FileStatus::Pending;
    8489    }
    8590    inputFiles = filenames;
     
    417422// The process of searching a group of files may use a sequential or a task
    418423// parallel approach.
     424   
     425void * DoGrepThreadFunction(void *args) {
     426    reinterpret_cast<GrepEngine *>(args)->DoGrepThreadMethod();
     427}
    419428
    420429bool GrepEngine::searchAllFiles() {
    421     if (Threads <= 1) {
    422         for (unsigned i = 0; i != inputFiles.size(); ++i) {
    423             size_t grepResult = doGrep(inputFiles[i], i);
    424             if (grepResult > 0) {
    425                 grepMatchFound = true;
    426                 if (QuietMode) break;
    427             }
    428         }
    429     } else if (Threads > 1) {
    430         const unsigned numOfThreads = Threads; // <- convert the command line value into an integer to allow stack allocation
    431         pthread_t threads[numOfThreads];
     430    const unsigned numOfThreads = Threads; // <- convert the command line value into an integer to allow stack allocation
     431    pthread_t threads[numOfThreads];
     432   
     433    for(unsigned long i = 1; i < numOfThreads; ++i) {
     434        const int rc = pthread_create(&threads[i], nullptr, DoGrepThreadFunction, (void *)this);
     435        if (rc) {
     436            llvm::report_fatal_error("Failed to create thread: code " + std::to_string(rc));
     437        }
     438    }
     439    // Main thread also does the work;
     440   
     441    DoGrepThreadMethod();
     442    for(unsigned i = 1; i < numOfThreads; ++i) {
     443        void * status = nullptr;
     444        const int rc = pthread_join(threads[i], &status);
     445        if (rc) {
     446            llvm::report_fatal_error("Failed to join thread: code " + std::to_string(rc));
     447        }
     448    }
     449    return grepMatchFound;
     450}
     451
     452
     453// DoGrep thread function.
     454void * GrepEngine::DoGrepThreadMethod() {
     455    size_t fileIdx;
     456
     457    count_mutex.lock();
     458    fileIdx = mNextFileToGrep;
     459    if (fileIdx < inputFiles.size()) {
     460        mFileStatus[fileIdx] = FileStatus::InGrep;
     461        mNextFileToGrep++;
     462    }
     463    count_mutex.unlock();
     464
     465    while (fileIdx < inputFiles.size()) {
     466        size_t grepResult = doGrep(inputFiles[fileIdx], fileIdx);
    432467       
    433         for(unsigned long i = 0; i < numOfThreads; ++i) {
    434             const int rc = pthread_create(&threads[i], nullptr, DoGrepThreadFunction, (void *)this);
    435             if (rc) {
    436                 llvm::report_fatal_error("Failed to create thread: code " + std::to_string(rc));
    437             }
    438         }
    439         for(unsigned i = 0; i < numOfThreads; ++i) {
    440             void * status = nullptr;
    441             const int rc = pthread_join(threads[i], &status);
    442             if (rc) {
    443                 llvm::report_fatal_error("Failed to join thread: code " + std::to_string(rc));
    444             }
    445         }
    446     }
    447     return grepMatchFound;
    448 }
    449 
    450 
    451 // DoGrep thread function.
    452 void * GrepEngine::DoGrepThreadFunction(void *args) {
    453     size_t fileIdx;
    454     grep::GrepEngine * grepEngine = (grep::GrepEngine *)args;
    455 
    456     grepEngine->count_mutex.lock();
    457     fileIdx = grepEngine->fileCount;
    458     grepEngine->fileCount++;
    459     grepEngine->count_mutex.unlock();
    460 
    461     while (fileIdx < grepEngine->inputFiles.size()) {
    462         size_t grepResult = grepEngine->doGrep(grepEngine->inputFiles[fileIdx], fileIdx);
    463        
    464         grepEngine->count_mutex.lock();
    465         if (grepResult > 0) grepEngine->grepMatchFound = true;
    466         fileIdx = grepEngine->fileCount;
    467         grepEngine->fileCount++;
    468         grepEngine->count_mutex.unlock();
    469         if (QuietMode && grepEngine->grepMatchFound) pthread_exit(nullptr);
    470     }
    471     pthread_exit(nullptr);
    472 }
    473    
    474 void GrepEngine::writeMatches() {
    475     for (unsigned i = 0; i < inputFiles.size(); ++i) {
    476         std::cout << mResultStrs[i]->str();
    477     }
    478 }
    479 
    480 }
    481 
     468        count_mutex.lock();
     469        mFileStatus[fileIdx] = FileStatus::GrepComplete;
     470        if (grepResult > 0) grepMatchFound = true;
     471        fileIdx = mNextFileToGrep;
     472        if (fileIdx < inputFiles.size()) {
     473            mFileStatus[fileIdx] = FileStatus::InGrep;
     474            mNextFileToGrep++;
     475        }
     476        count_mutex.unlock();
     477        if (QuietMode && grepMatchFound) {
     478            if (pthread_self() != mEngineThread) pthread_exit(nullptr);
     479            return nullptr;
     480        }
     481    }
     482    count_mutex.lock();
     483    fileIdx = mNextFileToPrint;
     484    bool readyToPrint = ((fileIdx == 0) || (mFileStatus[fileIdx-1] == FileStatus::PrintComplete)) && (mFileStatus[fileIdx] == FileStatus::GrepComplete);
     485    if (fileIdx < inputFiles.size() && readyToPrint) {
     486        mFileStatus[fileIdx] = FileStatus::Printing;
     487        mNextFileToPrint++;
     488    }
     489    count_mutex.unlock();
     490    while (fileIdx < inputFiles.size()) {
     491        if (readyToPrint) {
     492            std::cout << mResultStrs[fileIdx]->str();
     493        }
     494        else if (pthread_self() == mEngineThread) {
     495            mGrepDriver->performIncrementalCacheCleanupStep();
     496        }
     497        count_mutex.lock();
     498        if (readyToPrint) mFileStatus[fileIdx] = FileStatus::PrintComplete;
     499        fileIdx = mNextFileToPrint;
     500        readyToPrint = (mFileStatus[fileIdx-1] == FileStatus::PrintComplete) && (mFileStatus[fileIdx] == FileStatus::GrepComplete);
     501        if (fileIdx < inputFiles.size() && readyToPrint) {
     502            mFileStatus[fileIdx] = FileStatus::Printing;
     503            mNextFileToPrint++;
     504        }
     505        count_mutex.unlock();
     506    }
     507    if (pthread_self() != mEngineThread) {
     508        pthread_exit(nullptr);
     509    }
     510    else {
     511        return nullptr;
     512    }
     513}
     514}
     515
  • icGREP/icgrep-devel/icgrep/grep_engine.h

    r5707 r5735  
    3131    virtual void grepCodeGen(std::vector<re::RE *> REs);
    3232    bool searchAllFiles();
    33     void writeMatches();
    34    
     33    void * DoGrepThreadMethod();
     34
    3535protected:
    3636    std::pair<parabix::StreamSetBuffer *, parabix::StreamSetBuffer *> grepPipeline(std::vector<re::RE *> & REs, parabix::StreamSetBuffer * ByteStream);
    3737
    38     static void * DoGrepThreadFunction(void *args);
    3938    virtual uint64_t doGrep(const std::string & fileName, const uint32_t fileIdx);
    4039    std::string linePrefix(std::string fileName);
     
    4342    Driver * mGrepDriver;
    4443
     44    enum class FileStatus {Pending, InGrep, GrepComplete, Printing, PrintComplete};
     45    std::mutex count_mutex;
     46    size_t mNextFileToGrep;
     47    size_t mNextFileToPrint;
    4548    std::vector<std::string> inputFiles;
    4649    std::vector<std::unique_ptr<std::stringstream>> mResultStrs;
     50    std::vector<FileStatus> mFileStatus;
     51    bool grepMatchFound;
    4752   
    4853    std::string mFileSuffix;
    49    
    50     bool grepMatchFound;
    51     std::mutex count_mutex;
    52     size_t fileCount;
    5354    bool mMoveMatchesToEOL;
     55    pthread_t mEngineThread;
    5456};
    5557
  • icGREP/icgrep-devel/icgrep/icgrep.cpp

    r5706 r5735  
    203203    bool matchFound = grepEngine->searchAllFiles();
    204204   
    205     grepEngine->writeMatches();
    206    
    207205    delete(grepEngine);
    208206   
  • icGREP/icgrep-devel/icgrep/toolchain/cpudriver.cpp

    r5733 r5735  
    215215        }
    216216        mEngine->finalizeObject();
    217         if (mCache) mCache->cleanUpObjectCacheFiles();
    218217    } catch (const std::exception & e) {
    219218        report_fatal_error(module->getName() + ": " + e.what());
     
    228227void * ParabixDriver::getMain() {
    229228    return mEngine->getPointerToNamedFunction("Main");
     229}
     230
     231void ParabixDriver::performIncrementalCacheCleanupStep() {
     232    mCache->performIncrementalCacheCleanupStep();
    230233}
    231234
  • icGREP/icgrep-devel/icgrep/toolchain/cpudriver.h

    r5630 r5735  
    2525
    2626    void * getMain() override; // "main" exists until the driver is deleted
     27   
     28    void performIncrementalCacheCleanupStep() override;
    2729
    2830private:
  • icGREP/icgrep-devel/icgrep/toolchain/driver.h

    r5544 r5735  
    4747   
    4848    virtual void * getMain() = 0; // "main" exists until the driver is deleted
     49   
     50    virtual void performIncrementalCacheCleanupStep() = 0;
    4951
    5052protected:
  • icGREP/icgrep-devel/icgrep/toolchain/object_cache.cpp

    r5732 r5735  
    88#include <llvm/Support/FileSystem.h>
    99#include <llvm/Support/Path.h>
     10#include <llvm/Support/Debug.h>
    1011#include <llvm/IR/Module.h>
    1112#include <sys/file.h>
     
    6869const static auto SIGNATURE = "signature";
    6970
    70 const static boost::uintmax_t CACHE_SIZE_LIMIT = 50 * 1024 * 1024;
    71 
    7271const MDString * getSignature(const llvm::Module * const M) {
    7372    NamedMDNode * const sig = M->getNamedMetadata(SIGNATURE);
     
    111110                    }
    112111                } else {
     112                   
    113113                    report_fatal_error("signature file expected but not found: " + moduleId);
    114114                }               
     
    131131                    kernel->prepareCachedKernel(idb);                   
    132132                    mCachedObject.emplace(moduleId, std::make_pair(m, std::move(objectBuffer.get())));
    133                     // update the modified time of the object file
     133                    // update the modified time of the .kernel, .o and .sig files
     134                    time_t access_time = time(0);
     135                    boost::filesystem::last_write_time(objectName.c_str(), access_time);
    134136                    sys::path::replace_extension(objectName, ".o");
    135                     boost::filesystem::last_write_time(objectName.c_str(), time(0));
     137                    boost::filesystem::last_write_time(objectName.c_str(), access_time);
     138                    if (kernel->hasSignature()) {
     139                        sys::path::replace_extension(objectName, ".sig");
     140                        boost::filesystem::last_write_time(objectName.c_str(), access_time);
     141                    }
    136142                    return true;
    137143                }
     
    165171        objectName.append(".o");
    166172
    167         if (LLVM_LIKELY(!mCachePath.empty())) {
    168             sys::fs::create_directories(Twine(mCachePath));
    169         }
    170 
    171173        // Write the object code
    172174        std::error_code EC;
     
    199201}
    200202
    201 void ParabixObjectCache::cleanUpObjectCacheFiles() {
    202 
    203     using namespace boost::filesystem;
    204     using ObjectFile = std::pair<std::time_t, path>;
    205 
    206     path cachePath(mCachePath.str());
    207     if (LLVM_LIKELY(is_directory(cachePath))) {
    208         std::vector<ObjectFile> files;
    209         for(const directory_entry & entry : boost::make_iterator_range(directory_iterator(cachePath), {})) {
    210             const auto path = entry.path();;
    211             if (LLVM_LIKELY(is_regular_file(path) && path.has_extension() && path.extension().compare(".o") == 0)) {
    212                 files.emplace_back(last_write_time(path), path.filename());
    213             }
    214         }
    215         // sort the files in decending order of last modified (datetime) then file name
    216         std::sort(files.begin(), files.end(), std::greater<ObjectFile>());
    217         boost::uintmax_t cacheSize = 0;
    218         for(const ObjectFile & entry : files) {
    219             auto objectPath = cachePath / std::get<1>(entry);
    220             if (LLVM_LIKELY(exists(objectPath))) {
    221                 const auto size = file_size(objectPath);
    222                 if ((cacheSize + size) < CACHE_SIZE_LIMIT) {
    223                     cacheSize += size;
    224                 } else {
    225                     remove(objectPath);
    226                     objectPath.replace_extension("sig");
    227                     remove(objectPath);
    228                     objectPath.replace_extension("kernel");
    229                     remove(objectPath);
    230                 }
     203void ParabixObjectCache::performIncrementalCacheCleanupStep() {
     204    if (mCacheCleanupIterator != boost::filesystem::directory_iterator()) {
     205        auto & e = mCacheCleanupIterator->path();
     206        mCacheCleanupIterator++;
     207        // Simple clean-up policy: files that haven't been touched by the
     208        // driver in MaxCacheEntryHours are deleted.
     209        // TODO: possibly incrementally manage by size and/or total file count.
     210        // TODO: possibly determine total filecount and set items per clean up step based on
     211        // filecount
     212        if (boost::filesystem::is_regular_file(e)) {
     213            auto age = std::time(nullptr) - boost::filesystem::last_write_time(e);
     214            if (age > mCacheEntryMaxHours * 3600 /* secs/hour*/ ) {
     215                boost::filesystem::remove(e);
     216                errs() << e.string() << " removed.\n";
    231217            }
    232218        }
     
    243229}
    244230
    245 inline ParabixObjectCache::Path ParabixObjectCache::getDefaultPath() {
     231inline std::string ParabixObjectCache::getDefaultPath() {
    246232    // $HOME/.cache/parabix/
    247233    Path cachePath;
    248234#if LLVM_VERSION_INTEGER < LLVM_3_7_0
    249     sys::path::user_cache_directory(cachePath, "parabix", PARABIX_VERSION);
     235    sys::path::user_cache_directory(cachePath, "parabix");
    250236#else
    251237    sys::path::home_directory(cachePath);
    252     sys::path::append(cachePath, ".cache", "parabix", PARABIX_VERSION);
     238    sys::path::append(cachePath, ".cache", "parabix");
    253239#endif
    254     return cachePath;
    255 }
    256 
    257 ParabixObjectCache::ParabixObjectCache()
    258 : mCachePath(getDefaultPath()) {
    259 
     240    return cachePath.str();
    260241}
    261242
    262243ParabixObjectCache::ParabixObjectCache(const std::string dir)
    263244: mCachePath(dir) {
    264 
    265 }
     245    boost::filesystem::path p(mCachePath.str());
     246    if (LLVM_LIKELY(!mCachePath.empty())) {
     247        sys::fs::create_directories(Twine(mCachePath));
     248    }
     249    boost::filesystem::directory_iterator it(p);
     250    mCacheCleanupIterator = it;
     251    mCacheEntryMaxHours = CACHE_ENTRY_MAX_HOURS;
     252}
     253
     254ParabixObjectCache::ParabixObjectCache()
     255: ParabixObjectCache(getDefaultPath()) {
     256}
     257
     258
  • icGREP/icgrep-devel/icgrep/toolchain/object_cache.h

    r5630 r5735  
    1212#include <llvm/ADT/StringRef.h>
    1313#include <boost/container/flat_map.hpp>
     14#include <boost/filesystem.hpp>
    1415#include <vector>
    1516#include <string>
     
    3435//
    3536
     37unsigned const CACHE_ENTRY_MAX_HOURS (24 * 15);
     38
    3639class ParabixObjectCache final : public llvm::ObjectCache {
    3740    using Path = llvm::SmallString<128>;
     
    4043    using ModuleCache = Map<std::string, std::pair<llvm::Module *, std::unique_ptr<llvm::MemoryBuffer>>>;
    4144public:
     45    ParabixObjectCache(const std::string dir);
    4246    ParabixObjectCache();
    43     ParabixObjectCache(const std::string dir);
    4447    bool loadCachedObjectFile(const std::unique_ptr<kernel::KernelBuilder> & idb, kernel::Kernel * const kernel);
    4548    void notifyObjectCompiled(const llvm::Module * M, llvm::MemoryBufferRef Obj) override;
    46     void cleanUpObjectCacheFiles();
    4749    std::unique_ptr<llvm::MemoryBuffer> getObject(const llvm::Module * M) override;
     50    void performIncrementalCacheCleanupStep();
    4851protected:
    49     static Path getDefaultPath();
     52    static std::string getDefaultPath();
    5053private:
    5154    ModuleCache         mCachedObject;
    5255    const Path          mCachePath;
     56    boost::filesystem::directory_iterator mCacheCleanupIterator;
     57    unsigned            mCacheEntryMaxHours;
    5358};
    5459
  • icGREP/icgrep-devel/icgrep/toolchain/toolchain.cpp

    r5734 r5735  
    177177        default: report_fatal_error(std::string(1, OptLevelOption) + " is an invalid optimization level.");
    178178    }
    179     #ifndef CUDA_ENABLED
     179#ifndef CUDA_ENABLED
    180180    if (NVPTX) {
    181181        report_fatal_error("CUDA compiler is not supported.");
    182182    }
    183     #endif
     183#endif
    184184}
    185185
Note: See TracChangeset for help on using the changeset viewer.