source: icGREP/icgrep-devel/icgrep/grep_engine.cpp @ 5761

Last change on this file since 5761 was 5761, checked in by nmedfort, 21 months ago

Cache signature is now written into .kernel bitcode file. Minor bug fix and revision of GrepEngine::DoGrepThreadMethod.

File size: 18.7 KB
Line 
1/*
2 *  Copyright (c) 2017 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 *  icgrep is a trademark of International Characters.
5 */
6
7#include "grep_engine.h"
8#include "grep_interface.h"
9#include <llvm/IR/Module.h>
10#include <boost/filesystem.hpp>
11#include <UCD/resolve_properties.h>
12#include <kernels/charclasses.h>
13#include <kernels/cc_kernel.h>
14#include <kernels/grep_kernel.h>
15#include <kernels/linebreak_kernel.h>
16#include <kernels/streams_merge.h>
17#include <kernels/source_kernel.h>
18#include <kernels/s2p_kernel.h>
19#include <kernels/scanmatchgen.h>
20#include <kernels/streamset.h>
21#include <kernels/until_n.h>
22#include <kernels/kernel_builder.h>
23#include <pablo/pablo_kernel.h>
24#include <re/re_cc.h>
25#include <re/re_toolchain.h>
26#include <toolchain/toolchain.h>
27#include <re/re_name_resolve.h>   
28#include <re/re_collect_unicodesets.h>
29#include <re/re_multiplex.h>
30#include <toolchain/toolchain.h>
31#include <toolchain/cpudriver.h>
32#include <iostream>
33#include <cc/multiplex_CCs.h>
34#include <llvm/Support/raw_ostream.h>
35#include <util/aligned_allocator.h>
36#include <sys/stat.h>
37#include <fcntl.h>
38#include <errno.h>
39#include <llvm/ADT/STLExtras.h> // for make_unique
40#include <llvm/Support/CommandLine.h>
41#include <llvm/Support/Debug.h>
42
43using namespace parabix;
44using namespace llvm;
45static cl::opt<int> Threads("t", cl::desc("Total number of threads."), cl::init(2));
46
47namespace grep {
48
49// Grep Engine construction and initialization.
50   
51GrepEngine::GrepEngine() :
52    mGrepDriver(nullptr),
53    mNextFileToGrep(0),
54    mNextFileToPrint(0),
55    grepMatchFound(false),
56    mMoveMatchesToEOL(true),
57    mEngineThread(pthread_self()) {}
58   
59GrepEngine::~GrepEngine() {
60    delete mGrepDriver;
61}
62   
63QuietModeEngine::QuietModeEngine() : GrepEngine() {
64    mMoveMatchesToEOL = false;
65}
66
67MatchOnlyEngine::MatchOnlyEngine(bool showFilesWithoutMatch) :
68    GrepEngine(), mRequiredCount(showFilesWithoutMatch) {
69    mFileSuffix = NullFlag ? std::string("\0", 1) : "\n";
70    mMoveMatchesToEOL = false;
71}
72
73CountOnlyEngine::CountOnlyEngine() : GrepEngine() {
74    mFileSuffix = ":";
75}
76
77EmitMatchesEngine::EmitMatchesEngine() : GrepEngine() {
78    mFileSuffix = InitialTabFlag ? "\t:" : ":";
79    if (LineRegexpFlag) mMoveMatchesToEOL = false;
80}
81
82void GrepEngine::initFileResult(std::vector<std::string> & filenames) {
83    const unsigned n = filenames.size();
84    mResultStrs.resize(n);
85    mFileStatus.resize(n);
86    for (unsigned i = 0; i < n; i++) {
87        mResultStrs[i] = make_unique<std::stringstream>();
88        mFileStatus[i] = FileStatus::Pending;
89    }
90    inputFiles = filenames;
91}
92   
93
94// Code Generation
95//
96// All engines share a common pipeline to compute a stream of Matches from a given input Bytestream.
97
98std::pair<StreamSetBuffer *, StreamSetBuffer *> GrepEngine::grepPipeline(std::vector<re::RE *> & REs, StreamSetBuffer * ByteStream) {
99    auto & idb = mGrepDriver->getBuilder();
100    const unsigned segmentSize = codegen::SegmentSize;
101    const unsigned bufferSegments = codegen::BufferSegments * codegen::ThreadNum;
102    const unsigned encodingBits = 8;
103   
104    StreamSetBuffer * BasisBits = mGrepDriver->addBuffer<CircularBuffer>(idb, idb->getStreamSetTy(encodingBits, 1), segmentSize * bufferSegments);
105    kernel::Kernel * s2pk = mGrepDriver->addKernelInstance<kernel::S2PKernel>(idb);
106    mGrepDriver->makeKernelCall(s2pk, {ByteStream}, {BasisBits});
107   
108    StreamSetBuffer * LineBreakStream = mGrepDriver->addBuffer<CircularBuffer>(idb, idb->getStreamSetTy(1, 1), segmentSize * bufferSegments);
109    kernel::Kernel * linebreakK = mGrepDriver->addKernelInstance<kernel::LineBreakKernelBuilder>(idb, encodingBits);
110    mGrepDriver->makeKernelCall(linebreakK, {BasisBits}, {LineBreakStream});
111   
112    kernel::Kernel * requiredStreamsK = mGrepDriver->addKernelInstance<kernel::RequiredStreams_UTF8>(idb);
113    StreamSetBuffer * RequiredStreams = mGrepDriver->addBuffer<CircularBuffer>(idb, idb->getStreamSetTy(4, 1), segmentSize * bufferSegments);
114    mGrepDriver->makeKernelCall(requiredStreamsK, {BasisBits}, {RequiredStreams});
115   
116    const auto n = REs.size();
117   
118    std::vector<std::vector<re::CC *>> charclasses(n);
119   
120    for (unsigned i = 0; i < n; i++) {
121        REs[i] = re::resolveNames(REs[i]);
122        const auto UnicodeSets = re::collectUnicodeSets(REs[i]);
123        std::vector<std::vector<unsigned>> exclusiveSetIDs;
124        doMultiplexCCs(UnicodeSets, exclusiveSetIDs, charclasses[i]);
125        REs[i] = multiplex(REs[i], UnicodeSets, exclusiveSetIDs);
126    }
127   
128    std::vector<StreamSetBuffer *> MatchResultsBufs(n);
129   
130    for(unsigned i = 0; i < n; ++i){
131        const auto numOfCharacterClasses = charclasses[i].size();
132        StreamSetBuffer * CharClasses = mGrepDriver->addBuffer<CircularBuffer>(idb, idb->getStreamSetTy(numOfCharacterClasses), segmentSize * bufferSegments);
133        kernel::Kernel * ccK = mGrepDriver->addKernelInstance<kernel::CharClassesKernel>(idb, std::move(charclasses[i]));
134        mGrepDriver->makeKernelCall(ccK, {BasisBits}, {CharClasses});
135        StreamSetBuffer * MatchResults = mGrepDriver->addBuffer<CircularBuffer>(idb, idb->getStreamSetTy(1, 1), segmentSize * bufferSegments);
136        kernel::Kernel * icgrepK = mGrepDriver->addKernelInstance<kernel::ICGrepKernel>(idb, REs[i], numOfCharacterClasses);
137        mGrepDriver->makeKernelCall(icgrepK, {CharClasses, LineBreakStream, RequiredStreams}, {MatchResults});
138        MatchResultsBufs[i] = MatchResults;
139    }
140    StreamSetBuffer * MergedResults = MatchResultsBufs[0];
141    if (REs.size() > 1) {
142        MergedResults = mGrepDriver->addBuffer<CircularBuffer>(idb, idb->getStreamSetTy(1, 1), segmentSize * bufferSegments);
143        kernel::Kernel * streamsMergeK = mGrepDriver->addKernelInstance<kernel::StreamsMerge>(idb, 1, REs.size());
144        mGrepDriver->makeKernelCall(streamsMergeK, MatchResultsBufs, {MergedResults});
145    }
146    StreamSetBuffer * Matches = MergedResults;
147   
148    if (mMoveMatchesToEOL) {
149        StreamSetBuffer * OriginalMatches = Matches;
150        kernel::Kernel * matchedLinesK = mGrepDriver->addKernelInstance<kernel::MatchedLinesKernel>(idb);
151        Matches = mGrepDriver->addBuffer<CircularBuffer>(idb, idb->getStreamSetTy(1, 1), segmentSize * bufferSegments);
152        mGrepDriver->makeKernelCall(matchedLinesK, {OriginalMatches, LineBreakStream}, {Matches});
153    }
154   
155    if (InvertMatchFlag) {
156        kernel::Kernel * invertK = mGrepDriver->addKernelInstance<kernel::InvertMatchesKernel>(idb);
157        StreamSetBuffer * OriginalMatches = Matches;
158        Matches = mGrepDriver->addBuffer<CircularBuffer>(idb, idb->getStreamSetTy(1, 1), segmentSize * bufferSegments);
159        mGrepDriver->makeKernelCall(invertK, {OriginalMatches, LineBreakStream}, {Matches});
160    }
161    if (MaxCountFlag > 0) {
162        kernel::Kernel * untilK = mGrepDriver->addKernelInstance<kernel::UntilNkernel>(idb);
163        untilK->setInitialArguments({idb->getSize(MaxCountFlag)});
164        StreamSetBuffer * AllMatches = Matches;
165        Matches = mGrepDriver->addBuffer<CircularBuffer>(idb, idb->getStreamSetTy(1, 1), segmentSize * bufferSegments);
166        mGrepDriver->makeKernelCall(untilK, {AllMatches}, {Matches});
167    }
168    return std::pair<StreamSetBuffer *, StreamSetBuffer *>(LineBreakStream, Matches);
169}
170
171// The QuietMode, MatchOnly and CountOnly engines share a common code generation main function,
172// which returns a count of the matches found (possibly subject to a MaxCount).
173//
174
175void GrepEngine::grepCodeGen(std::vector<re::RE *> REs) {
176   
177    assert (mGrepDriver == nullptr);
178    mGrepDriver = new ParabixDriver("engine");
179    auto & idb = mGrepDriver->getBuilder();
180    Module * M = idb->getModule();
181   
182    const auto segmentSize = codegen::SegmentSize;
183    const auto bufferSegments = codegen::BufferSegments * codegen::ThreadNum;
184
185    const unsigned encodingBits = 8;
186   
187    Function * mainFunc = cast<Function>(M->getOrInsertFunction("Main", idb->getInt64Ty(), idb->getInt32Ty(), nullptr));
188    mainFunc->setCallingConv(CallingConv::C);
189    idb->SetInsertPoint(BasicBlock::Create(M->getContext(), "entry", mainFunc, 0));
190    auto args = mainFunc->arg_begin();
191   
192    Value * const fileDescriptor = &*(args++);
193    fileDescriptor->setName("fileDescriptor");
194   
195    StreamSetBuffer * ByteStream = mGrepDriver->addBuffer<SourceBuffer>(idb, idb->getStreamSetTy(1, encodingBits));
196    kernel::Kernel * sourceK = mGrepDriver->addKernelInstance<kernel::FDSourceKernel>(idb, segmentSize * bufferSegments);
197    sourceK->setInitialArguments({fileDescriptor});
198    mGrepDriver->makeKernelCall(sourceK, {}, {ByteStream});
199   
200    StreamSetBuffer * LineBreakStream;
201    StreamSetBuffer * Matches;
202    std::tie(LineBreakStream, Matches) = grepPipeline(REs, ByteStream);
203   
204    kernel::Kernel * matchCountK = mGrepDriver->addKernelInstance<kernel::PopcountKernel>(idb);
205    mGrepDriver->makeKernelCall(matchCountK, {Matches}, {});
206    mGrepDriver->generatePipelineIR();
207    idb->setKernel(matchCountK);
208    Value * matchedLineCount = idb->getAccumulator("countResult");
209    matchedLineCount = idb->CreateZExt(matchedLineCount, idb->getInt64Ty());
210    mGrepDriver->deallocateBuffers();
211    idb->CreateRet(matchedLineCount);
212    mGrepDriver->finalizeObject();
213}
214
215//
216// The EmitMatches engine uses an EmitMatchesAccumulator object to concatenate together
217// matched lines.
218
219class EmitMatch : public MatchAccumulator {
220    friend class EmitMatchesEngine;
221public:
222    EmitMatch(std::string linePrefix, std::stringstream * strm) : mLinePrefix(linePrefix), mLineCount(0), mTerminated(true), mResultStr(strm) {}
223    void accumulate_match(const size_t lineNum, char * line_start, char * line_end) override;
224    void finalize_match(char * buffer_end) override;
225protected:
226    std::string mLinePrefix;
227    size_t mLineCount;
228    bool mTerminated;
229    std::stringstream* mResultStr;
230};
231
232//
233//  Default Report Match:  lines are emitted with whatever line terminators are found in the
234//  input.  However, if the final line is not terminated, a new line is appended.
235//
236void EmitMatch::accumulate_match (const size_t lineNum, char * line_start, char * line_end) {
237    if (WithFilenameFlag) {
238        *mResultStr << mLinePrefix;
239    }
240    if (LineNumberFlag) {
241        // Internally line numbers are counted from 0.  For display, adjust
242        // the line number so that lines are numbered from 1.
243        if (InitialTabFlag) {
244            *mResultStr << lineNum+1 << "\t:";
245        }
246        else {
247            *mResultStr << lineNum+1 << ":";
248        }
249    }
250    size_t bytes = line_end - line_start + 1;
251    mResultStr->write(line_start, bytes);
252    mLineCount++;
253    unsigned last_byte = *line_end;
254    mTerminated = (last_byte >= 0x0A) && (last_byte <= 0x0D);
255    if (LLVM_UNLIKELY(!mTerminated)) {
256        if (last_byte == 0x85) {  //  Possible NEL terminator.
257            mTerminated = (bytes >= 2) && (static_cast<unsigned>(line_end[-1]) == 0xC2);
258        }
259        else {
260            // Possible LS or PS terminators.
261            mTerminated = (bytes >= 3) && (static_cast<unsigned>(line_end[-2]) == 0xE2)
262                                       && (static_cast<unsigned>(line_end[-1]) == 0x80)
263                                       && ((last_byte == 0xA8) || (last_byte == 0xA9));
264        }
265    }
266}
267
268void EmitMatch::finalize_match(char * buffer_end) {
269    if (!mTerminated) *mResultStr << "\n";
270}
271
272void EmitMatchesEngine::grepCodeGen(std::vector<re::RE *> REs) {
273    assert (mGrepDriver == nullptr);
274    mGrepDriver = new ParabixDriver("engine");
275    auto & idb = mGrepDriver->getBuilder();
276    Module * M = idb->getModule();
277   
278    const auto segmentSize = codegen::SegmentSize;
279    const auto bufferSegments = codegen::BufferSegments * codegen::ThreadNum;
280    const unsigned encodingBits = 8;
281   
282    Function * mainFunc = cast<Function>(M->getOrInsertFunction("Main", idb->getInt64Ty(), idb->getInt32Ty(), idb->getIntAddrTy(), nullptr));
283    mainFunc->setCallingConv(CallingConv::C);
284    idb->SetInsertPoint(BasicBlock::Create(M->getContext(), "entry", mainFunc, 0));
285    auto args = mainFunc->arg_begin();
286   
287    Value * const fileDescriptor = &*(args++);
288    fileDescriptor->setName("fileDescriptor");
289    Value * match_accumulator = &*(args++);
290    match_accumulator->setName("match_accumulator");
291   
292    StreamSetBuffer * ByteStream = mGrepDriver->addBuffer<SourceBuffer>(idb, idb->getStreamSetTy(1, encodingBits));
293    kernel::Kernel * sourceK = mGrepDriver->addKernelInstance<kernel::FDSourceKernel>(idb, segmentSize * bufferSegments);
294    sourceK->setInitialArguments({fileDescriptor});
295    mGrepDriver->makeKernelCall(sourceK, {}, {ByteStream});
296   
297    StreamSetBuffer * LineBreakStream;
298    StreamSetBuffer * Matches;
299    std::tie(LineBreakStream, Matches) = grepPipeline(REs, ByteStream);
300   
301    kernel::Kernel * scanMatchK = mGrepDriver->addKernelInstance<kernel::ScanMatchKernel>(idb);
302    scanMatchK->setInitialArguments({match_accumulator});
303    mGrepDriver->makeKernelCall(scanMatchK, {Matches, LineBreakStream, ByteStream}, {});
304    mGrepDriver->LinkFunction(*scanMatchK, "accumulate_match_wrapper", &accumulate_match_wrapper);
305    mGrepDriver->LinkFunction(*scanMatchK, "finalize_match_wrapper", &finalize_match_wrapper);
306   
307    mGrepDriver->generatePipelineIR();
308    mGrepDriver->deallocateBuffers();
309    idb->CreateRet(idb->getInt64(0));
310    mGrepDriver->finalizeObject();
311}
312
313
314//
315//  The doGrep methods apply a GrepEngine to a single file, processing the results
316//  differently based on the engine type.
317   
318uint64_t GrepEngine::doGrep(const std::string & fileName, const uint32_t fileIdx) {
319    typedef uint64_t (*GrepFunctionType)(int32_t fileDescriptor);
320    auto f = reinterpret_cast<GrepFunctionType>(mGrepDriver->getMain());
321   
322    int32_t fileDescriptor = openFile(fileName, mResultStrs[fileIdx].get());
323    if (fileDescriptor == -1) return 0;
324   
325    uint64_t grepResult = f(fileDescriptor);
326    close(fileDescriptor);
327    return grepResult;
328}
329
330uint64_t CountOnlyEngine::doGrep(const std::string & fileName, const uint32_t fileIdx) {
331    uint64_t grepResult = GrepEngine::doGrep(fileName, fileIdx);
332    if (WithFilenameFlag) *mResultStrs[fileIdx] << linePrefix(fileName);
333    *mResultStrs[fileIdx] << grepResult << "\n";
334    return grepResult;
335}
336
337std::string GrepEngine::linePrefix(std::string fileName) {
338    if (fileName == "-") {
339        return LabelFlag + mFileSuffix;
340    }
341    else {
342        return fileName + mFileSuffix;
343    }
344}
345   
346uint64_t MatchOnlyEngine::doGrep(const std::string & fileName, const uint32_t fileIdx) {
347    uint64_t grepResult = GrepEngine::doGrep(fileName, fileIdx);
348    if (grepResult == mRequiredCount) {
349       *mResultStrs[fileIdx] << linePrefix(fileName);
350    }
351    return grepResult;
352}
353
354uint64_t EmitMatchesEngine::doGrep(const std::string & fileName, const uint32_t fileIdx) {
355    typedef uint64_t (*GrepFunctionType)(int32_t fileDescriptor, intptr_t accum_addr);
356    auto f = reinterpret_cast<GrepFunctionType>(mGrepDriver->getMain());
357   
358    int32_t fileDescriptor = openFile(fileName, mResultStrs[fileIdx].get());
359    if (fileDescriptor == -1) return 0;
360    EmitMatch accum(linePrefix(fileName), mResultStrs[fileIdx].get());
361    f(fileDescriptor, reinterpret_cast<intptr_t>(&accum));
362    close(fileDescriptor);
363    if (accum.mLineCount > 0) grepMatchFound = true;
364    return accum.mLineCount;
365}
366
367// Open a file and return its file desciptor.
368int32_t GrepEngine::openFile(const std::string & fileName, std::stringstream * msgstrm) {
369    if (fileName == "-") {
370        return STDIN_FILENO;
371    }
372    else {
373        struct stat sb;
374        int32_t fileDescriptor = open(fileName.c_str(), O_RDONLY);
375        if (LLVM_UNLIKELY(fileDescriptor == -1)) {
376            if (!NoMessagesFlag) {
377                if (errno == EACCES) {
378                    *msgstrm << "icgrep: " << fileName << ": Permission denied.\n";
379                }
380                else if (errno == ENOENT) {
381                    *msgstrm << "icgrep: " << fileName << ": No such file.\n";
382                }
383                else {
384                    *msgstrm << "icgrep: " << fileName << ": Failed.\n";
385                }
386            }
387            return fileDescriptor;
388        }
389        if (stat(fileName.c_str(), &sb) == 0 && S_ISDIR(sb.st_mode)) {
390            if (!NoMessagesFlag) {
391                *msgstrm << "icgrep: " << fileName << ": Is a directory.\n";
392            }
393            close(fileDescriptor);
394            return -1;
395        }
396        return fileDescriptor;
397    }
398}
399
400// The process of searching a group of files may use a sequential or a task
401// parallel approach.
402   
403void * DoGrepThreadFunction(void *args) {
404    return reinterpret_cast<GrepEngine *>(args)->DoGrepThreadMethod();
405}
406
407bool GrepEngine::searchAllFiles() {
408    const unsigned numOfThreads = Threads; // <- convert the command line value into an integer to allow stack allocation
409    pthread_t threads[numOfThreads];
410   
411    for(unsigned long i = 1; i < numOfThreads; ++i) {
412        const int rc = pthread_create(&threads[i], nullptr, DoGrepThreadFunction, (void *)this);
413        if (rc) {
414            llvm::report_fatal_error("Failed to create thread: code " + std::to_string(rc));
415        }
416    }
417    // Main thread also does the work;
418   
419    DoGrepThreadMethod();
420    for(unsigned i = 1; i < numOfThreads; ++i) {
421        void * status = nullptr;
422        const int rc = pthread_join(threads[i], &status);
423        if (rc) {
424            llvm::report_fatal_error("Failed to join thread: code " + std::to_string(rc));
425        }
426    }
427    return grepMatchFound;
428}
429
430
431// DoGrep thread function.
432void * GrepEngine::DoGrepThreadMethod() {
433
434    auto fileIdx = mNextFileToGrep++;
435    while (fileIdx < inputFiles.size()) {
436        const size_t grepResult = doGrep(inputFiles[fileIdx], fileIdx);
437        mFileStatus[fileIdx] = FileStatus::GrepComplete;
438        if (grepResult > 0) {
439            grepMatchFound = true;
440        }
441        if (QuietMode && grepMatchFound) {
442            if (pthread_self() != mEngineThread) {
443                pthread_exit(nullptr);
444            }
445            return nullptr;
446        }
447        fileIdx = mNextFileToGrep++;
448    }
449
450    auto printIdx = mNextFileToPrint++;
451    while (printIdx < inputFiles.size()) {
452        const bool readyToPrint = ((printIdx == 0) || (mFileStatus[printIdx - 1] == FileStatus::PrintComplete)) && (mFileStatus[printIdx] == FileStatus::GrepComplete);
453        if (readyToPrint) {
454            const auto output = mResultStrs[printIdx]->str();
455            if (!output.empty()) {
456                mWriteMutex.lock();
457                std::cout << output;
458                mWriteMutex.unlock();
459            }
460            mFileStatus[printIdx] = FileStatus::PrintComplete;
461            printIdx = mNextFileToPrint++;
462        } else {
463            mCacheMutex.lock();
464            mGrepDriver->performIncrementalCacheCleanupStep();
465            mCacheMutex.unlock();
466        }
467        pthread_yield();
468    }
469
470    if (pthread_self() != mEngineThread) {
471        pthread_exit(nullptr);
472    } else {
473        return nullptr;
474    }
475}
476
477}
478
Note: See TracBrowser for help on using the repository browser.