source: icGREP/icgrep-devel/icgrep/kernels/source_kernel.cpp @ 5864

Last change on this file since 5864 was 5856, checked in by nmedfort, 17 months ago

Revised pipeline structure to better control I/O rates

File size: 16.3 KB
Line 
1/*
2 *  Copyright (c) 2017 International Characters.
3 *  This software is licensed to the public under the Open Software License 3.0.
4 */
5#include "source_kernel.h"
6#include <kernels/kernel_builder.h>
7#include <kernels/streamset.h>
8#include <llvm/IR/Module.h>
9#include <sys/stat.h>
10#include <fcntl.h>
11#include <toolchain/toolchain.h>
12
13using namespace llvm;
14
15uint64_t file_size(const uint32_t fd) {
16    struct stat st;
17    if (LLVM_UNLIKELY(fstat(fd, &st) != 0)) {
18        st.st_size = 0;
19    }
20    return st.st_size;
21}
22
23namespace kernel {
24
25/// MMAP SOURCE KERNEL
26
27Function * MMapSourceKernel::linkFileSizeMethod(const std::unique_ptr<kernel::KernelBuilder> & kb) {
28    return kb->LinkFunction("file_size", &file_size);
29}
30
31void MMapSourceKernel::generateInitializeMethod(Function * const fileSizeMethod, const unsigned codeUnitWidth, const std::unique_ptr<KernelBuilder> & kb) {
32
33    BasicBlock * const emptyFile = kb->CreateBasicBlock("emptyFile");
34    BasicBlock * const nonEmptyFile = kb->CreateBasicBlock("NonEmptyFile");
35    BasicBlock * const exit = kb->CreateBasicBlock("Exit");
36    IntegerType * const sizeTy = kb->getSizeTy();
37    Value * const fd = kb->getScalarField("fileDescriptor");
38    assert (fileSizeMethod);
39    Value * fileSize = kb->CreateZExtOrTrunc(kb->CreateCall(fileSizeMethod, fd), sizeTy);
40    kb->CreateLikelyCondBr(kb->CreateIsNotNull(fileSize), nonEmptyFile, emptyFile);
41
42    kb->SetInsertPoint(nonEmptyFile);
43    PointerType * const codeUnitPtrTy = kb->getIntNTy(codeUnitWidth)->getPointerTo();
44    Value * const fileBuffer = kb->CreatePointerCast(kb->CreateFileSourceMMap(fd, fileSize), codeUnitPtrTy);
45    kb->setScalarField("buffer", fileBuffer);   
46    kb->setBaseAddress("sourceBuffer", fileBuffer);   
47    kb->CreateMAdvise(fileBuffer, fileSize, CBuilder::ADVICE_WILLNEED);
48    if (codeUnitWidth > 8) {
49        fileSize = kb->CreateUDiv(fileSize, kb->getSize(codeUnitWidth / 8));
50    }
51    kb->setBufferedSize("sourceBuffer", fileSize);
52    kb->setScalarField("fileSize", fileSize);
53    kb->setProducedItemCount("sourceBuffer", fileSize);
54    kb->setCapacity("sourceBuffer", fileSize);
55    kb->CreateBr(exit);
56
57    kb->SetInsertPoint(emptyFile);
58    kb->setTerminationSignal();
59    kb->CreateBr(exit);
60
61    kb->SetInsertPoint(exit);
62}
63
64void MMapSourceKernel::generateDoSegmentMethod(const unsigned codeUnitWidth, const std::unique_ptr<KernelBuilder> & kb) {
65
66    BasicBlock * dropPages = kb->CreateBasicBlock("dropPages");
67    BasicBlock * checkRemaining = kb->CreateBasicBlock("checkRemaining");
68    BasicBlock * setTermination = kb->CreateBasicBlock("setTermination");
69    BasicBlock * exit = kb->CreateBasicBlock("mmapSourceExit");
70
71    Value * const fileSize = kb->getScalarField("fileSize");
72    Constant * const pageSize = kb->getSize(getpagesize());
73
74    Value * consumed = kb->getConsumedItemCount("sourceBuffer");
75    consumed = kb->CreateMul(consumed, kb->getSize(codeUnitWidth / 8));
76    consumed = kb->CreateAnd(consumed, ConstantExpr::getNeg(pageSize));
77    Value * const consumedBuffer = kb->getRawOutputPointer("sourceBuffer", consumed);
78    Value * const readableBuffer = kb->getScalarField("buffer");
79    Value * const unnecessaryBytes = kb->CreatePtrDiff(consumedBuffer, readableBuffer);
80
81    // avoid calling madvise unless an actual page table change could occur
82    kb->CreateLikelyCondBr(kb->CreateIsNotNull(unnecessaryBytes), dropPages, checkRemaining);
83
84    kb->SetInsertPoint(dropPages);
85    // instruct the OS that it can safely drop any fully consumed pages
86    kb->CreateMAdvise(readableBuffer, unnecessaryBytes, CBuilder::ADVICE_DONTNEED);
87    kb->setScalarField("buffer", kb->CreateGEP(readableBuffer, unnecessaryBytes));
88    kb->CreateBr(checkRemaining);
89
90    // determine whether or not we've exhausted the file buffer
91    kb->SetInsertPoint(checkRemaining);
92    Value * const remaining = kb->CreateSub(fileSize, consumed);
93    Value * const lastPage = kb->CreateICmpULE(remaining, pageSize);
94    kb->CreateUnlikelyCondBr(lastPage, setTermination, exit);
95
96    kb->SetInsertPoint(setTermination);
97    kb->setTerminationSignal();
98    kb->CreateBr(exit);
99
100    // finally, set the "produced" count to reflect current position in the file
101    kb->SetInsertPoint(exit);
102}
103
104void MMapSourceKernel::unmapSourceBuffer(const std::unique_ptr<KernelBuilder> & kb) {
105    kb->CreateMUnmap(kb->getBaseAddress("sourceBuffer"), kb->getBufferedSize("sourceBuffer"));
106}
107
108/// READ SOURCE KERNEL
109
110void ReadSourceKernel::generateInitializeMethod(const unsigned codeUnitWidth, const unsigned stride, const std::unique_ptr<KernelBuilder> & b) {
111    const unsigned pageSize = getpagesize();
112    const auto bufferSize = std::max(pageSize * 8, codegen::SegmentSize * stride  * 4);
113    ConstantInt * const bufferItems = b->getSize(bufferSize);
114    const auto codeUnitSize = codeUnitWidth / 8;
115    ConstantInt * const bufferBytes = b->getSize(bufferSize * codeUnitSize);
116    PointerType * const codeUnitPtrTy = b->getIntNTy(codeUnitWidth)->getPointerTo();
117    Value * const buffer = b->CreatePointerCast(b->CreateCacheAlignedMalloc(bufferBytes), codeUnitPtrTy);
118    b->setBaseAddress("sourceBuffer", buffer);
119    b->setScalarField("buffer", buffer);
120    b->setCapacity("sourceBuffer", bufferItems);
121}
122
123void ReadSourceKernel::generateDoSegmentMethod(const unsigned codeUnitWidth, const unsigned stride, const std::unique_ptr<KernelBuilder> & b) {
124
125    const unsigned pageSize = getpagesize();
126    ConstantInt * const itemsToRead = b->getSize(std::max(pageSize, codegen::SegmentSize * stride * 2));
127    ConstantInt * const codeUnitBytes = b->getSize(codeUnitWidth / 8);
128    ConstantInt * const itemsPerSegment = b->getSize(codegen::SegmentSize * stride);
129
130    BasicBlock * const entry = b->GetInsertBlock();
131    BasicBlock * const checkData = b->CreateBasicBlock("CheckData");
132    BasicBlock * const moveData = b->CreateBasicBlock("MoveData");
133    BasicBlock * const prepareBuffer = b->CreateBasicBlock("PrepareBuffer");
134    BasicBlock * const readData = b->CreateBasicBlock("ReadData");
135    BasicBlock * const setTermination = b->CreateBasicBlock("SetTermination");
136    BasicBlock * const readExit = b->CreateBasicBlock("ReadExit");
137
138    // Do we have enough unread data to support one segment?
139    Value * const produced = b->getProducedItemCount("sourceBuffer");
140    Value * const buffered = b->getBufferedSize("sourceBuffer");
141    Value * const itemsPending = b->CreateAdd(produced, itemsPerSegment);
142
143    b->CreateLikelyCondBr(b->CreateICmpULT(itemsPending, buffered), readExit, checkData);
144
145    // Can we append to our existing buffer without impacting any subsequent kernel?
146    b->SetInsertPoint(checkData);
147    Value * const capacity = b->getCapacity("sourceBuffer");
148    Value * const readEnd = b->getRawOutputPointer("sourceBuffer", b->CreateAdd(buffered, itemsToRead));
149    Value * const baseBuffer = b->getScalarField("buffer");
150    Value * const bufferLimit = b->CreateGEP(baseBuffer, capacity);
151    b->CreateLikelyCondBr(b->CreateICmpULE(readEnd, bufferLimit), readData, moveData);
152
153    // First wait on any consumers to finish processing then check how much data has been consumed.
154    b->SetInsertPoint(moveData);
155    b->CreateConsumerWait();
156
157    // Then determine how much data has been consumed and how much needs to be copied back, noting
158    // that our "unproduced" data must be block aligned.
159    BasicBlock * const copyBack = b->CreateBasicBlock("CopyBack");
160    BasicBlock * const expandAndCopyBack = b->CreateBasicBlock("ExpandAndCopyBack");
161
162    const auto blockSize = b->getBitBlockWidth() / 8;
163    Constant * const blockSizeAlignmentMask = ConstantExpr::getNeg(b->getSize(blockSize));
164    Value * const consumed = b->getConsumedItemCount("sourceBuffer");
165    Value * const offset = b->CreateAnd(consumed, blockSizeAlignmentMask);
166    Value * const unreadData = b->getRawOutputPointer("sourceBuffer", offset);
167    Value * const remainingItems = b->CreateSub(buffered, offset);
168    Value * const remainingBytes = b->CreateMul(remainingItems, codeUnitBytes);
169
170    // Have we consumed enough data that we can safely copy back the unconsumed data without needing a temporary buffer?
171    Value * const canCopy = b->CreateICmpULT(b->CreateGEP(baseBuffer, remainingItems), b->getRawOutputPointer("sourceBuffer", offset));
172    b->CreateLikelyCondBr(canCopy, copyBack, expandAndCopyBack);
173
174    // If so, just copy the data ...
175    b->SetInsertPoint(copyBack);
176    b->CreateMemCpy(baseBuffer, unreadData, remainingBytes, blockSize);
177    b->CreateBr(prepareBuffer);
178
179    // Otherwise, allocate a buffer with twice the capacity and copy the unconsumed data back into it
180    b->SetInsertPoint(expandAndCopyBack);
181    Value * const expandedCapacity = b->CreateShl(capacity, 1);
182    Value * const expandedBytes = b->CreateMul(expandedCapacity, codeUnitBytes);
183    Value * const expandedBuffer = b->CreatePointerCast(b->CreateCacheAlignedMalloc(expandedBytes), unreadData->getType());
184    b->CreateMemCpy(expandedBuffer, unreadData, remainingBytes, blockSize);
185    b->setScalarField("buffer", expandedBuffer);
186    b->setCapacity("sourceBuffer", expandedCapacity);
187    b->CreateFree(baseBuffer);
188    b->CreateBr(prepareBuffer);
189
190    b->SetInsertPoint(prepareBuffer);
191    PHINode * newBaseBuffer = b->CreatePHI(baseBuffer->getType(), 2);
192    newBaseBuffer->addIncoming(baseBuffer, copyBack);
193    newBaseBuffer->addIncoming(expandedBuffer, expandAndCopyBack);
194    b->setBaseAddress("sourceBuffer", b->CreateGEP(newBaseBuffer, b->CreateNeg(offset)));
195    b->CreateBr(readData);
196
197    // Regardless of whether we're simply appending data or had to allocate a new buffer, read a new page
198    // of data into the input source buffer. If we fail to read a full page ...
199    b->SetInsertPoint(readData);
200    Value * const sourceBuffer = b->getRawOutputPointer("sourceBuffer", buffered);
201    Value * const fd = b->getScalarField("fileDescriptor");
202    Constant * const bytesToRead = ConstantExpr::getMul(itemsToRead, codeUnitBytes);
203    Value * const bytesRead = b->CreateReadCall(fd, sourceBuffer, bytesToRead);
204    Value * const itemsRead = b->CreateUDiv(bytesRead, codeUnitBytes);
205    Value * const itemsBuffered = b->CreateAdd(buffered, itemsRead);
206    b->setBufferedSize("sourceBuffer", itemsBuffered);
207    b->CreateUnlikelyCondBr(b->CreateICmpULT(itemsBuffered, itemsPending), setTermination, readExit);
208
209    // ... set the termination signal.
210    b->SetInsertPoint(setTermination);
211    Value * const bytesToZero = b->CreateMul(b->CreateSub(itemsPending, itemsBuffered), codeUnitBytes);
212    b->CreateMemZero(b->getRawOutputPointer("sourceBuffer", itemsBuffered), bytesToZero);
213    b->setTerminationSignal();
214    b->CreateBr(readExit);
215
216    readExit->moveAfter(setTermination);
217    b->SetInsertPoint(readExit);
218    PHINode * const itemsProduced = b->CreatePHI(itemsPending->getType(), 3);
219    itemsProduced->addIncoming(itemsPending, entry);
220    itemsProduced->addIncoming(itemsPending, readData);
221    itemsProduced->addIncoming(itemsBuffered, setTermination);
222    b->setProducedItemCount("sourceBuffer", itemsProduced);
223}
224
225void ReadSourceKernel::freeBuffer(const std::unique_ptr<KernelBuilder> & kb) {
226    kb->CreateFree(kb->getScalarField("buffer"));
227}
228
229/// Hybrid MMap/Read source kernel
230
231void FDSourceKernel::linkExternalMethods(const std::unique_ptr<kernel::KernelBuilder> & kb) {
232    mFileSizeFunction = MMapSourceKernel::linkFileSizeMethod(kb);
233}
234
235void FDSourceKernel::generateFinalizeMethod(const std::unique_ptr<KernelBuilder> & kb) {
236    BasicBlock * finalizeRead = kb->CreateBasicBlock("finalizeRead");
237    BasicBlock * finalizeMMap = kb->CreateBasicBlock("finalizeMMap");
238    BasicBlock * finalizeDone = kb->CreateBasicBlock("finalizeDone");
239    // if the fileDescriptor is 0, the file is stdin, use readSource kernel logic, otherwise use mmap logic.
240    kb->CreateCondBr(kb->CreateICmpEQ(kb->getScalarField("fileDescriptor"), kb->getInt32(STDIN_FILENO)), finalizeRead, finalizeMMap);
241    kb->SetInsertPoint(finalizeRead);
242    ReadSourceKernel::freeBuffer(kb);
243    kb->CreateBr(finalizeDone);
244    kb->SetInsertPoint(finalizeMMap);
245    MMapSourceKernel::unmapSourceBuffer(kb);
246    kb->CreateBr(finalizeDone);
247    kb->SetInsertPoint(finalizeDone);
248}
249
250void FDSourceKernel::generateInitializeMethod(const std::unique_ptr<KernelBuilder> & kb) {
251    BasicBlock * initializeRead = kb->CreateBasicBlock("initializeRead");
252    BasicBlock * initializeMMap = kb->CreateBasicBlock("initializeMMap");
253    BasicBlock * initializeDone = kb->CreateBasicBlock("initializeDone");
254    // if the fileDescriptor is 0, the file is stdin, use readSource kernel logic, otherwise use MMap logic.
255    kb->CreateCondBr(kb->CreateICmpEQ(kb->getScalarField("fileDescriptor"), kb->getInt32(STDIN_FILENO)), initializeRead, initializeMMap);
256    kb->SetInsertPoint(initializeRead);
257    ReadSourceKernel::generateInitializeMethod(mCodeUnitWidth, getStride(), kb);
258    kb->CreateBr(initializeDone);
259    kb->SetInsertPoint(initializeMMap);
260    MMapSourceKernel::generateInitializeMethod(mFileSizeFunction, mCodeUnitWidth, kb);
261    kb->CreateBr(initializeDone);
262    kb->SetInsertPoint(initializeDone);
263}
264
265void FDSourceKernel::generateDoSegmentMethod(const std::unique_ptr<KernelBuilder> & kb) {
266    BasicBlock * DoSegmentRead = kb->CreateBasicBlock("DoSegmentRead");
267    BasicBlock * DoSegmentMMap = kb->CreateBasicBlock("DoSegmentMMap");
268    BasicBlock * DoSegmentDone = kb->CreateBasicBlock("DoSegmentDone");
269    // if the fileDescriptor is 0, the file is stdin, use readSource kernel logic, otherwise use MMap logic.
270    kb->CreateCondBr(kb->CreateICmpEQ(kb->getScalarField("fileDescriptor"), kb->getInt32(STDIN_FILENO)), DoSegmentRead, DoSegmentMMap);
271    kb->SetInsertPoint(DoSegmentRead);
272    ReadSourceKernel::generateDoSegmentMethod(mCodeUnitWidth, getStride(), kb);
273    kb->CreateBr(DoSegmentDone);
274    kb->SetInsertPoint(DoSegmentMMap);
275    MMapSourceKernel::generateDoSegmentMethod(mCodeUnitWidth, kb);
276    kb->CreateBr(DoSegmentDone);
277    kb->SetInsertPoint(DoSegmentDone);
278}
279
280
281/// MEMORY SOURCE KERNEL
282
283void MemorySourceKernel::generateInitializeMethod(const std::unique_ptr<KernelBuilder> & kb) {
284    Value * const fileSource = kb->getScalarField("fileSource");
285    kb->setBaseAddress("sourceBuffer", fileSource);
286    Value * const fileSize = kb->getScalarField("fileSize");
287    Value * const fileItems = kb->CreateUDiv(fileSize, kb->getSize(mCodeUnitWidth / 8));
288    kb->setBufferedSize("sourceBuffer", fileItems);
289    kb->setCapacity("sourceBuffer", fileItems);
290    kb->setProducedItemCount("sourceBuffer", fileItems);
291    kb->setTerminationSignal();
292}
293
294void MemorySourceKernel::generateDoSegmentMethod(const std::unique_ptr<KernelBuilder> & kb) {
295
296}
297
298MMapSourceKernel::MMapSourceKernel(const std::unique_ptr<kernel::KernelBuilder> & b, const unsigned codeUnitWidth)
299: SegmentOrientedKernel("mmap_source@" + std::to_string(codeUnitWidth),
300{},
301{Binding{b->getStreamSetTy(1, codeUnitWidth), "sourceBuffer", FixedRate(), Deferred()}},
302{Binding{b->getInt32Ty(), "fileDescriptor"}},
303{Binding{b->getSizeTy(), "fileSize"}}, {Binding{b->getIntNTy(codeUnitWidth)->getPointerTo(), "buffer"}})
304, mCodeUnitWidth(codeUnitWidth)
305, mFileSizeFunction(nullptr) {
306    addAttribute(CanTerminateEarly());
307}
308
309
310ReadSourceKernel::ReadSourceKernel(const std::unique_ptr<kernel::KernelBuilder> & b, const unsigned codeUnitWidth)
311: SegmentOrientedKernel("read_source" + std::to_string(codegen::SegmentSize) + "@" + std::to_string(codeUnitWidth)
312, {}
313, {Binding{b->getStreamSetTy(1, codeUnitWidth), "sourceBuffer", FixedRate(), Deferred()}}
314, {Binding{b->getInt32Ty(), "fileDescriptor"}}
315, {}
316, {Binding{b->getIntNTy(codeUnitWidth)->getPointerTo(), "buffer"}})
317, mCodeUnitWidth(codeUnitWidth) {
318    addAttribute(CanTerminateEarly());
319}
320
321
322FDSourceKernel::FDSourceKernel(const std::unique_ptr<kernel::KernelBuilder> & kb, const unsigned codeUnitWidth)
323: SegmentOrientedKernel("FD_source@" + std::to_string(codeUnitWidth)
324, {}
325, {Binding{kb->getStreamSetTy(1, codeUnitWidth), "sourceBuffer", FixedRate(), Deferred()}}
326, {Binding{kb->getInt32Ty(), "fileDescriptor"}}
327, {}
328, {Binding{kb->getIntNTy(codeUnitWidth)->getPointerTo(), "buffer"}, Binding{kb->getSizeTy(), "fileSize"}})
329, mCodeUnitWidth(codeUnitWidth)
330, mFileSizeFunction(nullptr) {
331    addAttribute(CanTerminateEarly());
332}
333
334MemorySourceKernel::MemorySourceKernel(const std::unique_ptr<kernel::KernelBuilder> & kb, Type * const type, const unsigned codeUnitWidth)
335: SegmentOrientedKernel("memory_source",
336    {},
337    {Binding{kb->getStreamSetTy(1, codeUnitWidth), "sourceBuffer"}},
338    {Binding{cast<PointerType>(type), "fileSource"}, Binding{kb->getSizeTy(), "fileSize"}}, {}, {})
339, mCodeUnitWidth(codeUnitWidth) {
340    addAttribute(CanTerminateEarly());
341}
342
343}
Note: See TracBrowser for help on using the repository browser.