1 | /* |
---|
2 | * Copyright (c) 2016 International Characters. |
---|
3 | * This software is licensed to the public under the Open Software License 3.0. |
---|
4 | */ |
---|
5 | |
---|
6 | #include "pipeline.h" |
---|
7 | #include <toolchain/toolchain.h> |
---|
8 | #include <kernels/kernel.h> |
---|
9 | #include <kernels/streamset.h> |
---|
10 | #include <llvm/IR/Module.h> |
---|
11 | #include <boost/container/flat_set.hpp> |
---|
12 | #include <boost/container/flat_map.hpp> |
---|
13 | #include <kernels/kernel_builder.h> |
---|
14 | |
---|
15 | using namespace kernel; |
---|
16 | using namespace parabix; |
---|
17 | using namespace llvm; |
---|
18 | |
---|
19 | using Port = Kernel::Port; |
---|
20 | |
---|
21 | template <typename Value> |
---|
22 | using StreamSetBufferMap = boost::container::flat_map<const StreamSetBuffer *, Value>; |
---|
23 | |
---|
24 | template <typename Value> |
---|
25 | using FlatSet = boost::container::flat_set<Value>; |
---|
26 | |
---|
27 | Function * makeThreadFunction(const std::unique_ptr<kernel::KernelBuilder> & b, const std::string & name) { |
---|
28 | Function * const f = Function::Create(FunctionType::get(b->getVoidTy(), {b->getVoidPtrTy()}, false), Function::InternalLinkage, name, b->getModule()); |
---|
29 | f->setCallingConv(CallingConv::C); |
---|
30 | f->arg_begin()->setName("input"); |
---|
31 | return f; |
---|
32 | } |
---|
33 | |
---|
// Forward declaration: generatePipelineLoop calls this once per kernel, immediately before it emits that kernel's doSegment call.
34 | void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * kernel); |
---|
35 | |
---|
36 | /** ------------------------------------------------------------------------------------------------------------- * |
---|
37 | * @brief generateSegmentParallelPipeline |
---|
38 | * |
---|
39 | * Given a computation expressed as a logical pipeline of K kernels k0, k_1, ...k_(K-1) |
---|
40 | * operating over an input stream set S, a segment-parallel implementation divides the input |
---|
41 | * into segments and coordinates a set of T <= K threads to each process one segment at a time. |
---|
42 | * Let S_0, S_1, ... S_N be the segments of S. Segments are assigned to threads in a round-robin |
---|
43 | * fashion such that processing of segment S_i by the full pipeline is carried out by thread i mod T. |
---|
44 | ** ------------------------------------------------------------------------------------------------------------- */ |
---|
45 | void generateSegmentParallelPipeline(const std::unique_ptr<KernelBuilder> & iBuilder, const std::vector<Kernel *> & kernels) { |
---|
46 | |
---|
47 | const unsigned n = kernels.size(); |
---|
48 | Module * const m = iBuilder->getModule(); |
---|
49 | IntegerType * const sizeTy = iBuilder->getSizeTy(); |
---|
50 | PointerType * const voidPtrTy = iBuilder->getVoidPtrTy(); |
---|
51 | Constant * nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy); |
---|
52 | std::vector<Type *> structTypes; |
---|
53 | |
---|
54 | codegen::BufferSegments = std::max(codegen::BufferSegments, codegen::ThreadNum); |
---|
55 | |
---|
56 | Value * instance[n]; |
---|
57 | for (unsigned i = 0; i < n; ++i) { |
---|
58 | instance[i] = kernels[i]->getInstance(); |
---|
59 | structTypes.push_back(instance[i]->getType()); |
---|
60 | } |
---|
61 | StructType * const sharedStructType = StructType::get(m->getContext(), structTypes); |
---|
62 | StructType * const threadStructType = StructType::get(sharedStructType->getPointerTo(), sizeTy, nullptr); |
---|
63 | |
---|
64 | Function * const threadFunc = makeThreadFunction(iBuilder, "segment"); |
---|
65 | |
---|
66 | // ------------------------------------------------------------------------------------------------------------------------- |
---|
67 | // MAKE SEGMENT PARALLEL PIPELINE THREAD |
---|
68 | // ------------------------------------------------------------------------------------------------------------------------- |
---|
69 | const auto ip = iBuilder->saveIP(); |
---|
70 | |
---|
71 | // Create the basic blocks for the thread function. |
---|
72 | BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc); |
---|
73 | iBuilder->SetInsertPoint(entryBlock); |
---|
74 | Value * const input = &threadFunc->getArgumentList().front(); |
---|
75 | Value * const threadStruct = iBuilder->CreatePointerCast(input, threadStructType->getPointerTo()); |
---|
76 | Value * const sharedStatePtr = iBuilder->CreateLoad(iBuilder->CreateGEP(threadStruct, {iBuilder->getInt32(0), iBuilder->getInt32(0)})); |
---|
77 | for (unsigned k = 0; k < n; ++k) { |
---|
78 | Value * ptr = iBuilder->CreateLoad(iBuilder->CreateGEP(sharedStatePtr, {iBuilder->getInt32(0), iBuilder->getInt32(k)})); |
---|
79 | kernels[k]->setInstance(ptr); |
---|
80 | } |
---|
81 | Value * const segOffset = iBuilder->CreateLoad(iBuilder->CreateGEP(threadStruct, {iBuilder->getInt32(0), iBuilder->getInt32(1)})); |
---|
82 | |
---|
83 | BasicBlock * segmentLoop = BasicBlock::Create(iBuilder->getContext(), "segmentLoop", threadFunc); |
---|
84 | iBuilder->CreateBr(segmentLoop); |
---|
85 | |
---|
86 | iBuilder->SetInsertPoint(segmentLoop); |
---|
87 | PHINode * const segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 2, "segNo"); |
---|
88 | segNo->addIncoming(segOffset, entryBlock); |
---|
89 | |
---|
90 | Value * terminated = iBuilder->getFalse(); |
---|
91 | Value * const nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1)); |
---|
92 | |
---|
93 | BasicBlock * segmentLoopBody = nullptr; |
---|
94 | BasicBlock * const exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc); |
---|
95 | |
---|
96 | StreamSetBufferMap<Value *> producedPos; |
---|
97 | StreamSetBufferMap<Value *> consumedPos; |
---|
98 | |
---|
99 | Value * cycleCountStart = nullptr; |
---|
100 | Value * cycleCountEnd = nullptr; |
---|
101 | if (DebugOptionIsSet(codegen::EnableCycleCounter)) { |
---|
102 | cycleCountStart = iBuilder->CreateReadCycleCounter(); |
---|
103 | } |
---|
104 | |
---|
105 | for (unsigned k = 0; k < n; ++k) { |
---|
106 | |
---|
107 | const auto & kernel = kernels[k]; |
---|
108 | |
---|
109 | BasicBlock * const segmentWait = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Wait", threadFunc); |
---|
110 | |
---|
111 | BasicBlock * segmentYield = segmentWait; |
---|
112 | iBuilder->CreateBr(segmentWait); |
---|
113 | |
---|
114 | segmentLoopBody = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Do", threadFunc); |
---|
115 | |
---|
116 | iBuilder->SetInsertPoint(segmentWait); |
---|
117 | const unsigned waitIdx = codegen::DebugOptionIsSet(codegen::SerializeThreads) ? (n - 1) : k; |
---|
118 | |
---|
119 | iBuilder->setKernel(kernels[waitIdx]); |
---|
120 | Value * const processedSegmentCount = iBuilder->acquireLogicalSegmentNo(); |
---|
121 | iBuilder->setKernel(kernel); |
---|
122 | |
---|
123 | assert (processedSegmentCount->getType() == segNo->getType()); |
---|
124 | Value * const ready = iBuilder->CreateICmpEQ(segNo, processedSegmentCount); |
---|
125 | |
---|
126 | if (kernel->hasNoTerminateAttribute()) { |
---|
127 | iBuilder->CreateCondBr(ready, segmentLoopBody, segmentYield); |
---|
128 | } else { // If the kernel was terminated in a previous segment then the pipeline is done. |
---|
129 | BasicBlock * completionTest = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Completed", threadFunc, 0); |
---|
130 | BasicBlock * exitBlock = BasicBlock::Create(iBuilder->getContext(), kernel->getName() + "Exit", threadFunc, 0); |
---|
131 | iBuilder->CreateCondBr(ready, completionTest, segmentYield); |
---|
132 | |
---|
133 | iBuilder->SetInsertPoint(completionTest); |
---|
134 | Value * terminationSignal = iBuilder->getTerminationSignal(); |
---|
135 | iBuilder->CreateCondBr(terminationSignal, exitBlock, segmentLoopBody); |
---|
136 | iBuilder->SetInsertPoint(exitBlock); |
---|
137 | // Ensure that the next thread will also exit. |
---|
138 | iBuilder->releaseLogicalSegmentNo(nextSegNo); |
---|
139 | iBuilder->CreateBr(exitThreadBlock); |
---|
140 | } |
---|
141 | |
---|
142 | // Execute the kernel segment |
---|
143 | iBuilder->SetInsertPoint(segmentLoopBody); |
---|
144 | const auto & inputs = kernel->getStreamInputs(); |
---|
145 | std::vector<Value *> args = {kernel->getInstance(), terminated}; |
---|
146 | for (unsigned i = 0; i < inputs.size(); ++i) { |
---|
147 | const auto f = producedPos.find(kernel->getStreamSetInputBuffer(i)); |
---|
148 | assert (f != producedPos.end()); |
---|
149 | args.push_back(f->second); |
---|
150 | } |
---|
151 | |
---|
152 | iBuilder->setKernel(kernel); |
---|
153 | iBuilder->createDoSegmentCall(args); |
---|
154 | if (!kernel->hasNoTerminateAttribute()) { |
---|
155 | terminated = iBuilder->CreateOr(terminated, iBuilder->getTerminationSignal()); |
---|
156 | } |
---|
157 | |
---|
158 | const auto & outputs = kernel->getStreamOutputs(); |
---|
159 | for (unsigned i = 0; i < outputs.size(); ++i) { |
---|
160 | Value * const produced = iBuilder->getProducedItemCount(outputs[i].getName()); // terminated |
---|
161 | const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i); |
---|
162 | assert (producedPos.count(buf) == 0); |
---|
163 | producedPos.emplace(buf, produced); |
---|
164 | } |
---|
165 | for (unsigned i = 0; i < inputs.size(); ++i) { |
---|
166 | Value * const processedItemCount = iBuilder->getProcessedItemCount(inputs[i].getName()); |
---|
167 | const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i); |
---|
168 | auto f = consumedPos.find(buf); |
---|
169 | if (f == consumedPos.end()) { |
---|
170 | consumedPos.emplace(buf, processedItemCount); |
---|
171 | } else { |
---|
172 | Value * lesser = iBuilder->CreateICmpULT(processedItemCount, f->second); |
---|
173 | f->second = iBuilder->CreateSelect(lesser, processedItemCount, f->second); |
---|
174 | } |
---|
175 | } |
---|
176 | if (DebugOptionIsSet(codegen::EnableCycleCounter)) { |
---|
177 | cycleCountEnd = iBuilder->CreateReadCycleCounter(); |
---|
178 | Value * counterPtr = iBuilder->getCycleCountPtr(); |
---|
179 | iBuilder->CreateStore(iBuilder->CreateAdd(iBuilder->CreateLoad(counterPtr), iBuilder->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr); |
---|
180 | cycleCountStart = cycleCountEnd; |
---|
181 | } |
---|
182 | |
---|
183 | iBuilder->releaseLogicalSegmentNo(nextSegNo); |
---|
184 | } |
---|
185 | |
---|
186 | assert (segmentLoopBody); |
---|
187 | exitThreadBlock->moveAfter(segmentLoopBody); |
---|
188 | |
---|
189 | for (const auto consumed : consumedPos) { |
---|
190 | const StreamSetBuffer * const buf = consumed.first; |
---|
191 | Kernel * const k = buf->getProducer(); |
---|
192 | const auto & outputs = k->getStreamSetOutputBuffers(); |
---|
193 | for (unsigned i = 0; i < outputs.size(); ++i) { |
---|
194 | if (outputs[i] == buf) { |
---|
195 | const auto binding = k->getStreamOutput(i); |
---|
196 | if (LLVM_UNLIKELY(binding.getRate().isDerived())) { |
---|
197 | continue; |
---|
198 | } |
---|
199 | iBuilder->setKernel(k); |
---|
200 | iBuilder->setConsumedItemCount(binding.getName(), consumed.second); |
---|
201 | break; |
---|
202 | } |
---|
203 | } |
---|
204 | } |
---|
205 | |
---|
206 | segNo->addIncoming(iBuilder->CreateAdd(segNo, iBuilder->getSize(codegen::ThreadNum)), segmentLoopBody); |
---|
207 | iBuilder->CreateCondBr(terminated, exitThreadBlock, segmentLoop); |
---|
208 | |
---|
209 | iBuilder->SetInsertPoint(exitThreadBlock); |
---|
210 | |
---|
211 | // only call pthread_exit() within spawned threads; otherwise it'll be equivalent to calling exit() within the process |
---|
212 | BasicBlock * const exitThread = BasicBlock::Create(iBuilder->getContext(), "ExitThread", threadFunc); |
---|
213 | BasicBlock * const exitFunction = BasicBlock::Create(iBuilder->getContext(), "ExitProcessFunction", threadFunc); |
---|
214 | |
---|
215 | Value * const exitCond = iBuilder->CreateICmpEQ(segOffset, ConstantInt::getNullValue(segOffset->getType())); |
---|
216 | iBuilder->CreateCondBr(exitCond, exitFunction, exitThread); |
---|
217 | iBuilder->SetInsertPoint(exitThread); |
---|
218 | iBuilder->CreatePThreadExitCall(nullVoidPtrVal); |
---|
219 | iBuilder->CreateBr(exitFunction); |
---|
220 | iBuilder->SetInsertPoint(exitFunction); |
---|
221 | iBuilder->CreateRetVoid(); |
---|
222 | |
---|
223 | // ------------------------------------------------------------------------------------------------------------------------- |
---|
224 | iBuilder->restoreIP(ip); |
---|
225 | |
---|
226 | for (unsigned i = 0; i < n; ++i) { |
---|
227 | kernels[i]->setInstance(instance[i]); |
---|
228 | } |
---|
229 | |
---|
230 | // ------------------------------------------------------------------------------------------------------------------------- |
---|
231 | // MAKE SEGMENT PARALLEL PIPELINE DRIVER |
---|
232 | // ------------------------------------------------------------------------------------------------------------------------- |
---|
233 | const unsigned threads = codegen::ThreadNum - 1; |
---|
234 | assert (codegen::ThreadNum > 1); |
---|
235 | Type * const pthreadsTy = ArrayType::get(sizeTy, threads); |
---|
236 | AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy); |
---|
237 | Value * threadIdPtr[threads]; |
---|
238 | |
---|
239 | for (unsigned i = 0; i < threads; ++i) { |
---|
240 | threadIdPtr[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)}); |
---|
241 | } |
---|
242 | |
---|
243 | for (unsigned i = 0; i < n; ++i) { |
---|
244 | iBuilder->setKernel(kernels[i]); |
---|
245 | iBuilder->releaseLogicalSegmentNo(iBuilder->getSize(0)); |
---|
246 | } |
---|
247 | |
---|
248 | AllocaInst * const sharedStruct = iBuilder->CreateCacheAlignedAlloca(sharedStructType); |
---|
249 | for (unsigned i = 0; i < n; ++i) { |
---|
250 | Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)}); |
---|
251 | iBuilder->CreateStore(kernels[i]->getInstance(), ptr); |
---|
252 | } |
---|
253 | |
---|
254 | // use the process thread to handle the initial segment function after spawning (n - 1) threads to handle the subsequent offsets |
---|
255 | for (unsigned i = 0; i < threads; ++i) { |
---|
256 | AllocaInst * const threadState = iBuilder->CreateAlloca(threadStructType); |
---|
257 | iBuilder->CreateStore(sharedStruct, iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(0)})); |
---|
258 | iBuilder->CreateStore(iBuilder->getSize(i + 1), iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(1)})); |
---|
259 | iBuilder->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, threadFunc, threadState); |
---|
260 | } |
---|
261 | |
---|
262 | AllocaInst * const threadState = iBuilder->CreateAlloca(threadStructType); |
---|
263 | iBuilder->CreateStore(sharedStruct, iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(0)})); |
---|
264 | iBuilder->CreateStore(iBuilder->getSize(0), iBuilder->CreateGEP(threadState, {iBuilder->getInt32(0), iBuilder->getInt32(1)})); |
---|
265 | iBuilder->CreateCall(threadFunc, iBuilder->CreatePointerCast(threadState, voidPtrTy)); |
---|
266 | |
---|
267 | AllocaInst * const status = iBuilder->CreateAlloca(voidPtrTy); |
---|
268 | for (unsigned i = 0; i < threads; ++i) { |
---|
269 | Value * threadId = iBuilder->CreateLoad(threadIdPtr[i]); |
---|
270 | iBuilder->CreatePThreadJoinCall(threadId, status); |
---|
271 | } |
---|
272 | |
---|
273 | if (DebugOptionIsSet(codegen::EnableCycleCounter)) { |
---|
274 | for (unsigned k = 0; k < kernels.size(); k++) { |
---|
275 | auto & kernel = kernels[k]; |
---|
276 | iBuilder->setKernel(kernel); |
---|
277 | const auto & inputs = kernel->getStreamInputs(); |
---|
278 | const auto & outputs = kernel->getStreamOutputs(); |
---|
279 | Value * items = nullptr; |
---|
280 | if (inputs.empty()) { |
---|
281 | items = iBuilder->getProducedItemCount(outputs[0].getName()); |
---|
282 | } else { |
---|
283 | items = iBuilder->getProcessedItemCount(inputs[0].getName()); |
---|
284 | } |
---|
285 | Value * fItems = iBuilder->CreateUIToFP(items, iBuilder->getDoubleTy()); |
---|
286 | Value * cycles = iBuilder->CreateLoad(iBuilder->getCycleCountPtr()); |
---|
287 | Value * fCycles = iBuilder->CreateUIToFP(cycles, iBuilder->getDoubleTy()); |
---|
288 | std::string formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles, %6.2f cycles per item.\n"; |
---|
289 | Value * stringPtr = iBuilder->CreatePointerCast(iBuilder->GetString(formatString), iBuilder->getInt8PtrTy()); |
---|
290 | iBuilder->CreateCall(iBuilder->GetDprintf(), {iBuilder->getInt32(2), stringPtr, fItems, fCycles, iBuilder->CreateFDiv(fCycles, fItems)}); |
---|
291 | } |
---|
292 | } |
---|
293 | |
---|
294 | } |
---|
295 | |
---|
296 | |
---|
297 | /** ------------------------------------------------------------------------------------------------------------- * |
---|
298 | * @brief generateParallelPipeline |
---|
299 | ** ------------------------------------------------------------------------------------------------------------- */ |
---|
300 | void generateParallelPipeline(const std::unique_ptr<KernelBuilder> & iBuilder, const std::vector<Kernel *> &kernels) { |
---|
301 | |
---|
302 | Module * const m = iBuilder->getModule(); |
---|
303 | IntegerType * const sizeTy = iBuilder->getSizeTy(); |
---|
304 | PointerType * const voidPtrTy = iBuilder->getVoidPtrTy(); |
---|
305 | ConstantInt * bufferSegments = ConstantInt::get(sizeTy, codegen::BufferSegments - 1); |
---|
306 | ConstantInt * segmentItems = ConstantInt::get(sizeTy, codegen::SegmentSize * iBuilder->getBitBlockWidth()); |
---|
307 | Constant * const nullVoidPtrVal = ConstantPointerNull::getNullValue(voidPtrTy); |
---|
308 | |
---|
309 | const unsigned n = kernels.size(); |
---|
310 | |
---|
311 | Type * const pthreadsTy = ArrayType::get(sizeTy, n); |
---|
312 | AllocaInst * const pthreads = iBuilder->CreateAlloca(pthreadsTy); |
---|
313 | Value * threadIdPtr[n]; |
---|
314 | for (unsigned i = 0; i < n; ++i) { |
---|
315 | threadIdPtr[i] = iBuilder->CreateGEP(pthreads, {iBuilder->getInt32(0), iBuilder->getInt32(i)}); |
---|
316 | } |
---|
317 | |
---|
318 | Value * instance[n]; |
---|
319 | Type * structTypes[n]; |
---|
320 | for (unsigned i = 0; i < n; ++i) { |
---|
321 | instance[i] = kernels[i]->getInstance(); |
---|
322 | structTypes[i] = instance[i]->getType(); |
---|
323 | } |
---|
324 | |
---|
325 | Type * const sharedStructType = StructType::get(m->getContext(), ArrayRef<Type *>{structTypes, n}); |
---|
326 | |
---|
327 | |
---|
328 | AllocaInst * sharedStruct = iBuilder->CreateAlloca(sharedStructType); |
---|
329 | for (unsigned i = 0; i < n; ++i) { |
---|
330 | Value * ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(i)}); |
---|
331 | iBuilder->CreateStore(instance[i], ptr); |
---|
332 | } |
---|
333 | |
---|
334 | for (auto & kernel : kernels) { |
---|
335 | iBuilder->setKernel(kernel); |
---|
336 | iBuilder->releaseLogicalSegmentNo(iBuilder->getSize(0)); |
---|
337 | } |
---|
338 | |
---|
339 | // GENERATE THE PRODUCING AND CONSUMING KERNEL MAPS |
---|
340 | StreamSetBufferMap<unsigned> producingKernel; |
---|
341 | StreamSetBufferMap<std::vector<unsigned>> consumingKernels; |
---|
342 | for (unsigned id = 0; id < n; ++id) { |
---|
343 | const auto & kernel = kernels[id]; |
---|
344 | const auto & inputs = kernel->getStreamInputs(); |
---|
345 | const auto & outputs = kernel->getStreamOutputs(); |
---|
346 | // add any outputs from this kernel to the producing kernel map |
---|
347 | for (unsigned j = 0; j < outputs.size(); ++j) { |
---|
348 | const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(j); |
---|
349 | if (LLVM_UNLIKELY(producingKernel.count(buf) != 0)) { |
---|
350 | report_fatal_error(kernel->getName() + " redefines stream set " + outputs[j].getName()); |
---|
351 | } |
---|
352 | producingKernel.emplace(buf, id); |
---|
353 | } |
---|
354 | // and any inputs to the consuming kernels list |
---|
355 | for (unsigned j = 0; j < inputs.size(); ++j) { |
---|
356 | const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(j); |
---|
357 | auto f = consumingKernels.find(buf); |
---|
358 | if (f == consumingKernels.end()) { |
---|
359 | if (LLVM_UNLIKELY(producingKernel.count(buf) == 0)) { |
---|
360 | report_fatal_error(kernel->getName() + " uses stream set " + inputs[j].getName() + " prior to its definition"); |
---|
361 | } |
---|
362 | consumingKernels.emplace(buf, std::vector<unsigned>{ id }); |
---|
363 | } else { |
---|
364 | f->second.push_back(id); |
---|
365 | } |
---|
366 | } |
---|
367 | } |
---|
368 | |
---|
369 | const auto ip = iBuilder->saveIP(); |
---|
370 | |
---|
371 | // GENERATE UNIQUE PIPELINE PARALLEL THREAD FUNCTION FOR EACH KERNEL |
---|
372 | FlatSet<unsigned> kernelSet; |
---|
373 | kernelSet.reserve(n); |
---|
374 | |
---|
375 | Function * thread_functions[n]; |
---|
376 | Value * producerSegNo[n]; |
---|
377 | for (unsigned id = 0; id < n; id++) { |
---|
378 | const auto & kernel = kernels[id]; |
---|
379 | |
---|
380 | iBuilder->setKernel(kernel); |
---|
381 | |
---|
382 | const auto & inputs = kernel->getStreamInputs(); |
---|
383 | |
---|
384 | Function * const threadFunc = makeThreadFunction(iBuilder, "ppt:" + kernel->getName()); |
---|
385 | |
---|
386 | // Create the basic blocks for the thread function. |
---|
387 | BasicBlock * entryBlock = BasicBlock::Create(iBuilder->getContext(), "entry", threadFunc); |
---|
388 | BasicBlock * outputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "outputCheck", threadFunc); |
---|
389 | BasicBlock * inputCheckBlock = BasicBlock::Create(iBuilder->getContext(), "inputCheck", threadFunc); |
---|
390 | BasicBlock * doSegmentBlock = BasicBlock::Create(iBuilder->getContext(), "doSegment", threadFunc); |
---|
391 | BasicBlock * exitThreadBlock = BasicBlock::Create(iBuilder->getContext(), "exitThread", threadFunc); |
---|
392 | |
---|
393 | iBuilder->SetInsertPoint(entryBlock); |
---|
394 | |
---|
395 | Value * sharedStruct = iBuilder->CreateBitCast(&threadFunc->getArgumentList().front(), sharedStructType->getPointerTo()); |
---|
396 | |
---|
397 | for (unsigned k = 0; k < n; k++) { |
---|
398 | Value * const ptr = iBuilder->CreateGEP(sharedStruct, {iBuilder->getInt32(0), iBuilder->getInt32(k)}); |
---|
399 | kernels[k]->setInstance(iBuilder->CreateLoad(ptr)); |
---|
400 | } |
---|
401 | |
---|
402 | iBuilder->CreateBr(outputCheckBlock); |
---|
403 | |
---|
404 | // Check whether the output buffers are ready for more data |
---|
405 | iBuilder->SetInsertPoint(outputCheckBlock); |
---|
406 | PHINode * segNo = iBuilder->CreatePHI(iBuilder->getSizeTy(), 3, "segNo"); |
---|
407 | segNo->addIncoming(iBuilder->getSize(0), entryBlock); |
---|
408 | segNo->addIncoming(segNo, outputCheckBlock); |
---|
409 | |
---|
410 | Value * outputWaitCond = iBuilder->getTrue(); |
---|
411 | for (const StreamSetBuffer * buf : kernel->getStreamSetOutputBuffers()) { |
---|
412 | const auto & list = consumingKernels[buf]; |
---|
413 | assert(std::is_sorted(list.begin(), list.end())); |
---|
414 | kernelSet.insert(list.begin(), list.end()); |
---|
415 | } |
---|
416 | for (unsigned k : kernelSet) { |
---|
417 | iBuilder->setKernel(kernels[k]); |
---|
418 | Value * consumerSegNo = iBuilder->acquireLogicalSegmentNo(); |
---|
419 | assert (consumerSegNo->getType() == segNo->getType()); |
---|
420 | Value * consumedSegNo = iBuilder->CreateAdd(consumerSegNo, bufferSegments); |
---|
421 | outputWaitCond = iBuilder->CreateAnd(outputWaitCond, iBuilder->CreateICmpULE(segNo, consumedSegNo)); |
---|
422 | } |
---|
423 | kernelSet.clear(); |
---|
424 | iBuilder->setKernel(kernel); |
---|
425 | iBuilder->CreateCondBr(outputWaitCond, inputCheckBlock, outputCheckBlock); |
---|
426 | |
---|
427 | // Check whether the input buffers have enough data for this kernel to begin |
---|
428 | iBuilder->SetInsertPoint(inputCheckBlock); |
---|
429 | for (const StreamSetBuffer * buf : kernel->getStreamSetInputBuffers()) { |
---|
430 | kernelSet.insert(producingKernel[buf]); |
---|
431 | } |
---|
432 | |
---|
433 | Value * inputWaitCond = iBuilder->getTrue(); |
---|
434 | for (unsigned k : kernelSet) { |
---|
435 | iBuilder->setKernel(kernels[k]); |
---|
436 | producerSegNo[k] = iBuilder->acquireLogicalSegmentNo(); |
---|
437 | assert (producerSegNo[k]->getType() == segNo->getType()); |
---|
438 | inputWaitCond = iBuilder->CreateAnd(inputWaitCond, iBuilder->CreateICmpULT(segNo, producerSegNo[k])); |
---|
439 | } |
---|
440 | iBuilder->setKernel(kernel); |
---|
441 | iBuilder->CreateCondBr(inputWaitCond, doSegmentBlock, inputCheckBlock); |
---|
442 | |
---|
443 | // Process the segment |
---|
444 | iBuilder->SetInsertPoint(doSegmentBlock); |
---|
445 | |
---|
446 | Value * const nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1)); |
---|
447 | Value * terminated = nullptr; |
---|
448 | if (kernelSet.empty()) { |
---|
449 | // if this kernel has no input streams, the kernel itself must decide when it terminates. |
---|
450 | terminated = iBuilder->getTerminationSignal(); |
---|
451 | } else { |
---|
452 | // ... otherwise the kernel terminates only when it exhausts all of its input streams |
---|
453 | terminated = iBuilder->getTrue(); |
---|
454 | for (unsigned k : kernelSet) { |
---|
455 | iBuilder->setKernel(kernels[k]); |
---|
456 | terminated = iBuilder->CreateAnd(terminated, iBuilder->getTerminationSignal()); |
---|
457 | terminated = iBuilder->CreateAnd(terminated, iBuilder->CreateICmpEQ(nextSegNo, producerSegNo[k])); |
---|
458 | } |
---|
459 | kernelSet.clear(); |
---|
460 | iBuilder->setKernel(kernel); |
---|
461 | } |
---|
462 | |
---|
463 | std::vector<Value *> args = {kernel->getInstance(), terminated}; |
---|
464 | args.insert(args.end(), inputs.size(), iBuilder->CreateMul(segmentItems, segNo)); |
---|
465 | |
---|
466 | iBuilder->createDoSegmentCall(args); |
---|
467 | segNo->addIncoming(nextSegNo, doSegmentBlock); |
---|
468 | iBuilder->releaseLogicalSegmentNo(nextSegNo); |
---|
469 | |
---|
470 | iBuilder->CreateCondBr(terminated, exitThreadBlock, outputCheckBlock); |
---|
471 | |
---|
472 | iBuilder->SetInsertPoint(exitThreadBlock); |
---|
473 | |
---|
474 | iBuilder->CreatePThreadExitCall(nullVoidPtrVal); |
---|
475 | |
---|
476 | iBuilder->CreateRetVoid(); |
---|
477 | |
---|
478 | thread_functions[id] = threadFunc; |
---|
479 | } |
---|
480 | |
---|
481 | iBuilder->restoreIP(ip); |
---|
482 | |
---|
483 | for (unsigned i = 0; i < n; ++i) { |
---|
484 | kernels[i]->setInstance(instance[i]); |
---|
485 | } |
---|
486 | |
---|
487 | for (unsigned i = 0; i < n; ++i) { |
---|
488 | iBuilder->CreatePThreadCreateCall(threadIdPtr[i], nullVoidPtrVal, thread_functions[i], sharedStruct); |
---|
489 | } |
---|
490 | |
---|
491 | AllocaInst * const status = iBuilder->CreateAlloca(voidPtrTy); |
---|
492 | for (unsigned i = 0; i < n; ++i) { |
---|
493 | Value * threadId = iBuilder->CreateLoad(threadIdPtr[i]); |
---|
494 | iBuilder->CreatePThreadJoinCall(threadId, status); |
---|
495 | } |
---|
496 | } |
---|
497 | |
---|
498 | /** ------------------------------------------------------------------------------------------------------------- * |
---|
499 | * @brief generatePipelineLoop |
---|
500 | ** ------------------------------------------------------------------------------------------------------------- */ |
---|
501 | void generatePipelineLoop(const std::unique_ptr<KernelBuilder> & iBuilder, const std::vector<Kernel *> & kernels) { |
---|
502 | |
---|
503 | BasicBlock * entryBlock = iBuilder->GetInsertBlock(); |
---|
504 | Function * main = entryBlock->getParent(); |
---|
505 | |
---|
506 | // Create the basic blocks for the loop. |
---|
507 | BasicBlock * pipelineLoop = BasicBlock::Create(iBuilder->getContext(), "pipelineLoop", main); |
---|
508 | BasicBlock * pipelineExit = BasicBlock::Create(iBuilder->getContext(), "pipelineExit", main); |
---|
509 | |
---|
510 | StreamSetBufferMap<Value *> producedPos; |
---|
511 | StreamSetBufferMap<Value *> consumedPos; |
---|
512 | |
---|
513 | iBuilder->CreateBr(pipelineLoop); |
---|
514 | iBuilder->SetInsertPoint(pipelineLoop); |
---|
515 | |
---|
516 | Value * cycleCountStart = nullptr; |
---|
517 | Value * cycleCountEnd = nullptr; |
---|
518 | if (DebugOptionIsSet(codegen::EnableCycleCounter)) { |
---|
519 | cycleCountStart = iBuilder->CreateReadCycleCounter(); |
---|
520 | } |
---|
521 | Value * terminated = iBuilder->getFalse(); |
---|
522 | |
---|
523 | for (Kernel * const kernel : kernels) { |
---|
524 | |
---|
525 | iBuilder->setKernel(kernel); |
---|
526 | const auto & inputs = kernel->getStreamInputs(); |
---|
527 | const auto & outputs = kernel->getStreamOutputs(); |
---|
528 | |
---|
529 | std::vector<Value *> args = {kernel->getInstance(), terminated}; |
---|
530 | |
---|
531 | for (unsigned i = 0; i < inputs.size(); ++i) { |
---|
532 | const auto f = producedPos.find(kernel->getStreamSetInputBuffer(i)); |
---|
533 | if (LLVM_UNLIKELY(f == producedPos.end())) { |
---|
534 | report_fatal_error(kernel->getName() + " uses stream set " + inputs[i].getName() + " prior to its definition"); |
---|
535 | } |
---|
536 | args.push_back(f->second); |
---|
537 | } |
---|
538 | |
---|
539 | applyOutputBufferExpansions(iBuilder, kernel); |
---|
540 | |
---|
541 | iBuilder->createDoSegmentCall(args); |
---|
542 | |
---|
543 | if (!kernel->hasNoTerminateAttribute()) { |
---|
544 | Value * terminatedSignal = iBuilder->getTerminationSignal(); |
---|
545 | terminated = iBuilder->CreateOr(terminated, terminatedSignal); |
---|
546 | } |
---|
547 | for (unsigned i = 0; i < outputs.size(); ++i) { |
---|
548 | Value * const produced = iBuilder->getProducedItemCount(outputs[i].getName()); // , terminated |
---|
549 | const StreamSetBuffer * const buf = kernel->getStreamSetOutputBuffer(i); |
---|
550 | assert (producedPos.count(buf) == 0); |
---|
551 | producedPos.emplace(buf, produced); |
---|
552 | } |
---|
553 | |
---|
554 | for (unsigned i = 0; i < inputs.size(); ++i) { |
---|
555 | Value * const processed = iBuilder->getProcessedItemCount(inputs[i].getName()); |
---|
556 | const StreamSetBuffer * const buf = kernel->getStreamSetInputBuffer(i); |
---|
557 | auto f = consumedPos.find(buf); |
---|
558 | if (f == consumedPos.end()) { |
---|
559 | consumedPos.emplace(buf, processed); |
---|
560 | } else { |
---|
561 | Value * lesser = iBuilder->CreateICmpULT(processed, f->second); |
---|
562 | f->second = iBuilder->CreateSelect(lesser, processed, f->second); |
---|
563 | } |
---|
564 | } |
---|
565 | if (DebugOptionIsSet(codegen::EnableCycleCounter)) { |
---|
566 | cycleCountEnd = iBuilder->CreateReadCycleCounter(); |
---|
567 | Value * counterPtr = iBuilder->getCycleCountPtr(); |
---|
568 | iBuilder->CreateStore(iBuilder->CreateAdd(iBuilder->CreateLoad(counterPtr), iBuilder->CreateSub(cycleCountEnd, cycleCountStart)), counterPtr); |
---|
569 | cycleCountStart = cycleCountEnd; |
---|
570 | } |
---|
571 | |
---|
572 | Value * const segNo = iBuilder->acquireLogicalSegmentNo(); |
---|
573 | Value * nextSegNo = iBuilder->CreateAdd(segNo, iBuilder->getSize(1)); |
---|
574 | iBuilder->releaseLogicalSegmentNo(nextSegNo); |
---|
575 | } |
---|
576 | |
---|
577 | for (const auto consumed : consumedPos) { |
---|
578 | const StreamSetBuffer * const buf = consumed.first; |
---|
579 | Kernel * const k = buf->getProducer(); |
---|
580 | const auto & outputs = k->getStreamSetOutputBuffers(); |
---|
581 | for (unsigned i = 0; i < outputs.size(); ++i) { |
---|
582 | if (outputs[i] == buf) { |
---|
583 | const auto binding = k->getStreamOutput(i); |
---|
584 | if (LLVM_UNLIKELY(binding.getRate().isDerived())) { |
---|
585 | continue; |
---|
586 | } |
---|
587 | iBuilder->setKernel(k); |
---|
588 | iBuilder->setConsumedItemCount(binding.getName(), consumed.second); |
---|
589 | break; |
---|
590 | } |
---|
591 | } |
---|
592 | } |
---|
593 | |
---|
594 | iBuilder->CreateCondBr(terminated, pipelineExit, pipelineLoop); |
---|
595 | |
---|
596 | iBuilder->SetInsertPoint(pipelineExit); |
---|
597 | |
---|
598 | if (DebugOptionIsSet(codegen::EnableCycleCounter)) { |
---|
599 | for (unsigned k = 0; k < kernels.size(); k++) { |
---|
600 | auto & kernel = kernels[k]; |
---|
601 | iBuilder->setKernel(kernel); |
---|
602 | const auto & inputs = kernel->getStreamInputs(); |
---|
603 | const auto & outputs = kernel->getStreamOutputs(); |
---|
604 | Value * items = nullptr; |
---|
605 | if (inputs.empty()) { |
---|
606 | items = iBuilder->getProducedItemCount(outputs[0].getName()); |
---|
607 | } else { |
---|
608 | items = iBuilder->getProcessedItemCount(inputs[0].getName()); |
---|
609 | } |
---|
610 | Value * fItems = iBuilder->CreateUIToFP(items, iBuilder->getDoubleTy()); |
---|
611 | Value * cycles = iBuilder->CreateLoad(iBuilder->getCycleCountPtr()); |
---|
612 | Value * fCycles = iBuilder->CreateUIToFP(cycles, iBuilder->getDoubleTy()); |
---|
613 | std::string formatString = kernel->getName() + ": %7.2e items processed; %7.2e CPU cycles, %6.2f cycles per item.\n"; |
---|
614 | Value * stringPtr = iBuilder->CreatePointerCast(iBuilder->GetString(formatString), iBuilder->getInt8PtrTy()); |
---|
615 | iBuilder->CreateCall(iBuilder->GetDprintf(), {iBuilder->getInt32(2), stringPtr, fItems, fCycles, iBuilder->CreateFDiv(fCycles, fItems)}); |
---|
616 | } |
---|
617 | } |
---|
618 | } |
---|
619 | |
---|
620 | void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const std::string & name, DynamicBuffer * const db, const uint64_t l) { |
---|
621 | |
---|
622 | BasicBlock * const doExpand = b->CreateBasicBlock(name + "Expand"); |
---|
623 | BasicBlock * const nextBlock = b->GetInsertBlock()->getNextNode(); |
---|
624 | doExpand->moveAfter(b->GetInsertBlock()); |
---|
625 | BasicBlock * const bufferReady = b->CreateBasicBlock(name + "Ready"); |
---|
626 | bufferReady->moveAfter(doExpand); |
---|
627 | if (nextBlock) nextBlock->moveAfter(bufferReady); |
---|
628 | |
---|
629 | Value * const handle = db->getStreamSetHandle(); |
---|
630 | |
---|
631 | Value * const produced = b->getProducedItemCount(name); |
---|
632 | Value * const consumed = b->getConsumedItemCount(name); |
---|
633 | Value * const required = b->CreateAdd(b->CreateSub(produced, consumed), b->getSize(2 * l)); |
---|
634 | |
---|
635 | b->CreateCondBr(b->CreateICmpUGT(required, db->getCapacity(b.get(), handle)), doExpand, bufferReady); |
---|
636 | |
---|
637 | b->SetInsertPoint(doExpand); |
---|
638 | db->doubleCapacity(b.get(), handle); |
---|
639 | // Ensure that capacity is sufficient by successive doubling, if necessary. |
---|
640 | b->CreateCondBr(b->CreateICmpUGT(required, db->getBufferedSize(b.get(), handle)), doExpand, bufferReady); |
---|
641 | |
---|
642 | b->SetInsertPoint(bufferReady); |
---|
643 | } |
---|
644 | |
---|
645 | inline const Binding & getBinding(const Kernel * k, const std::string & name) { |
---|
646 | Port port; unsigned index; |
---|
647 | std::tie(port, index) = k->getStreamPort(name); |
---|
648 | if (port == Port::Input) { |
---|
649 | return k->getStreamInput(index); |
---|
650 | } else { |
---|
651 | return k->getStreamOutput(index); |
---|
652 | } |
---|
653 | } |
---|
654 | |
---|
655 | void applyOutputBufferExpansions(const std::unique_ptr<KernelBuilder> & b, const Kernel * k) { |
---|
656 | const auto & outputs = k->getStreamSetOutputBuffers(); |
---|
657 | for (unsigned i = 0; i < outputs.size(); i++) { |
---|
658 | if (isa<DynamicBuffer>(outputs[i])) { |
---|
659 | const ProcessingRate & rate = k->getStreamOutput(i).getRate(); |
---|
660 | if (rate.isFixed() || rate.isBounded()) { |
---|
661 | const auto & name = k->getStreamOutput(i).getName(); |
---|
662 | const auto l = rate.getUpperBound() * k->getKernelStride(); |
---|
663 | applyOutputBufferExpansions(b, name, cast<DynamicBuffer>(outputs[i]), l); |
---|
664 | } else if (rate.isExactlyRelative()) { |
---|
665 | const auto binding = getBinding(k, rate.getReference()); |
---|
666 | const ProcessingRate & ref = binding.getRate(); |
---|
667 | if (rate.isFixed() || rate.isBounded()) { |
---|
668 | const auto & name = k->getStreamOutput(i).getName(); |
---|
669 | const auto l = (ref.getUpperBound() * rate.getNumerator() * k->getKernelStride() + rate.getDenominator() - 1) / rate.getDenominator(); |
---|
670 | applyOutputBufferExpansions(b, name, cast<DynamicBuffer>(outputs[i]), l); |
---|
671 | } |
---|
672 | } |
---|
673 | } |
---|
674 | } |
---|
675 | } |
---|