Ignore:
Timestamp:
Nov 14, 2017, 10:17:19 AM (15 months ago)
Author:
cameron
Message:

Incremental object caching; multithreaded print

File:
1 edited

Legend:

Unmodified
Added
Removed
  • icGREP/icgrep-devel/icgrep/grep_engine.cpp

    r5732 r5735  
    3939#include <llvm/ADT/STLExtras.h> // for make_unique
    4040#include <llvm/Support/CommandLine.h>
     41#include <llvm/Support/Debug.h>
    4142
    4243using namespace parabix;
     
    5051GrepEngine::GrepEngine() :
    5152    mGrepDriver(nullptr),
     53    mNextFileToGrep(0),
     54    mNextFileToPrint(0),
    5255    grepMatchFound(false),
    53     fileCount(0),
    54     mMoveMatchesToEOL(true) {}
     56    mMoveMatchesToEOL(true),
     57    mEngineThread(pthread_self()) {}
    5558   
    5659GrepEngine::~GrepEngine() {
     
    8083    const unsigned n = filenames.size();
    8184    mResultStrs.resize(n);
     85    mFileStatus.resize(n);
    8286    for (unsigned i = 0; i < n; i++) {
    8387        mResultStrs[i] = make_unique<std::stringstream>();
     88        mFileStatus[i] = FileStatus::Pending;
    8489    }
    8590    inputFiles = filenames;
     
    417422// The process of searching a group of files may use a sequential or a task
    418423// parallel approach.
     424   
     425void * DoGrepThreadFunction(void *args) {
     426    reinterpret_cast<GrepEngine *>(args)->DoGrepThreadMethod();
     427}
    419428
    420429bool GrepEngine::searchAllFiles() {
    421     if (Threads <= 1) {
    422         for (unsigned i = 0; i != inputFiles.size(); ++i) {
    423             size_t grepResult = doGrep(inputFiles[i], i);
    424             if (grepResult > 0) {
    425                 grepMatchFound = true;
    426                 if (QuietMode) break;
    427             }
    428         }
    429     } else if (Threads > 1) {
    430         const unsigned numOfThreads = Threads; // <- convert the command line value into an integer to allow stack allocation
    431         pthread_t threads[numOfThreads];
     430    const unsigned numOfThreads = Threads; // <- convert the command line value into an integer to allow stack allocation
     431    pthread_t threads[numOfThreads];
     432   
     433    for(unsigned long i = 1; i < numOfThreads; ++i) {
     434        const int rc = pthread_create(&threads[i], nullptr, DoGrepThreadFunction, (void *)this);
     435        if (rc) {
     436            llvm::report_fatal_error("Failed to create thread: code " + std::to_string(rc));
     437        }
     438    }
     439    // Main thread also does the work;
     440   
     441    DoGrepThreadMethod();
     442    for(unsigned i = 1; i < numOfThreads; ++i) {
     443        void * status = nullptr;
     444        const int rc = pthread_join(threads[i], &status);
     445        if (rc) {
     446            llvm::report_fatal_error("Failed to join thread: code " + std::to_string(rc));
     447        }
     448    }
     449    return grepMatchFound;
     450}
     451
     452
     453// DoGrep thread function.
     454void * GrepEngine::DoGrepThreadMethod() {
     455    size_t fileIdx;
     456
     457    count_mutex.lock();
     458    fileIdx = mNextFileToGrep;
     459    if (fileIdx < inputFiles.size()) {
     460        mFileStatus[fileIdx] = FileStatus::InGrep;
     461        mNextFileToGrep++;
     462    }
     463    count_mutex.unlock();
     464
     465    while (fileIdx < inputFiles.size()) {
     466        size_t grepResult = doGrep(inputFiles[fileIdx], fileIdx);
    432467       
    433         for(unsigned long i = 0; i < numOfThreads; ++i) {
    434             const int rc = pthread_create(&threads[i], nullptr, DoGrepThreadFunction, (void *)this);
    435             if (rc) {
    436                 llvm::report_fatal_error("Failed to create thread: code " + std::to_string(rc));
    437             }
    438         }
    439         for(unsigned i = 0; i < numOfThreads; ++i) {
    440             void * status = nullptr;
    441             const int rc = pthread_join(threads[i], &status);
    442             if (rc) {
    443                 llvm::report_fatal_error("Failed to join thread: code " + std::to_string(rc));
    444             }
    445         }
    446     }
    447     return grepMatchFound;
    448 }
    449 
    450 
    451 // DoGrep thread function.
    452 void * GrepEngine::DoGrepThreadFunction(void *args) {
    453     size_t fileIdx;
    454     grep::GrepEngine * grepEngine = (grep::GrepEngine *)args;
    455 
    456     grepEngine->count_mutex.lock();
    457     fileIdx = grepEngine->fileCount;
    458     grepEngine->fileCount++;
    459     grepEngine->count_mutex.unlock();
    460 
    461     while (fileIdx < grepEngine->inputFiles.size()) {
    462         size_t grepResult = grepEngine->doGrep(grepEngine->inputFiles[fileIdx], fileIdx);
    463        
    464         grepEngine->count_mutex.lock();
    465         if (grepResult > 0) grepEngine->grepMatchFound = true;
    466         fileIdx = grepEngine->fileCount;
    467         grepEngine->fileCount++;
    468         grepEngine->count_mutex.unlock();
    469         if (QuietMode && grepEngine->grepMatchFound) pthread_exit(nullptr);
    470     }
    471     pthread_exit(nullptr);
    472 }
    473    
    474 void GrepEngine::writeMatches() {
    475     for (unsigned i = 0; i < inputFiles.size(); ++i) {
    476         std::cout << mResultStrs[i]->str();
    477     }
    478 }
    479 
    480 }
    481 
     468        count_mutex.lock();
     469        mFileStatus[fileIdx] = FileStatus::GrepComplete;
     470        if (grepResult > 0) grepMatchFound = true;
     471        fileIdx = mNextFileToGrep;
     472        if (fileIdx < inputFiles.size()) {
     473            mFileStatus[fileIdx] = FileStatus::InGrep;
     474            mNextFileToGrep++;
     475        }
     476        count_mutex.unlock();
     477        if (QuietMode && grepMatchFound) {
     478            if (pthread_self() != mEngineThread) pthread_exit(nullptr);
     479            return nullptr;
     480        }
     481    }
     482    count_mutex.lock();
     483    fileIdx = mNextFileToPrint;
     484    bool readyToPrint = ((fileIdx == 0) || (mFileStatus[fileIdx-1] == FileStatus::PrintComplete)) && (mFileStatus[fileIdx] == FileStatus::GrepComplete);
     485    if (fileIdx < inputFiles.size() && readyToPrint) {
     486        mFileStatus[fileIdx] = FileStatus::Printing;
     487        mNextFileToPrint++;
     488    }
     489    count_mutex.unlock();
     490    while (fileIdx < inputFiles.size()) {
     491        if (readyToPrint) {
     492            std::cout << mResultStrs[fileIdx]->str();
     493        }
     494        else if (pthread_self() == mEngineThread) {
     495            mGrepDriver->performIncrementalCacheCleanupStep();
     496        }
     497        count_mutex.lock();
     498        if (readyToPrint) mFileStatus[fileIdx] = FileStatus::PrintComplete;
     499        fileIdx = mNextFileToPrint;
     500        readyToPrint = (mFileStatus[fileIdx-1] == FileStatus::PrintComplete) && (mFileStatus[fileIdx] == FileStatus::GrepComplete);
     501        if (fileIdx < inputFiles.size() && readyToPrint) {
     502            mFileStatus[fileIdx] = FileStatus::Printing;
     503            mNextFileToPrint++;
     504        }
     505        count_mutex.unlock();
     506    }
     507    if (pthread_self() != mEngineThread) {
     508        pthread_exit(nullptr);
     509    }
     510    else {
     511        return nullptr;
     512    }
     513}
     514}
     515
Note: See TracChangeset for help on using the changeset viewer.