
Commit e7aa0e3 (parent f8990db)

added tf::ScalablePipeline(lines)

27 files changed: 792 additions & 4696 deletions

README.md

Lines changed: 22 additions & 41 deletions

````diff
@@ -240,26 +240,6 @@ tf::Task cudaflow = taskflow.emplace([&](tf::cudaFlow& cf) {
 
 <p align="center"><img src="doxygen/images/saxpy_1_cudaflow.svg"></p>
 
-Taskflow also supports SYCL, a general-purpose heterogeneous programming model,
-to program GPU tasks in a single-source C++ environment using the task graph-based
-approach.
-
-```cpp
-tf::Task syclflow = taskflow.emplace_on([&](tf::syclFlow& sf){
-  tf::syclTask h2d_x = cf.copy(dx, hx.data(), N).name("h2d_x");
-  tf::syclTask h2d_y = cf.copy(dy, hy.data(), N).name("h2d_y");
-  tf::syclTask d2h_x = cf.copy(hx.data(), dx, N).name("d2h_x");
-  tf::syclTask d2h_y = cf.copy(hy.data(), dy, N).name("d2h_y");
-  tf::syclTask saxpy = sf.parallel_for(sycl::range<1>(N),
-    [=] (sycl::id<1> id) {
-      dx[id] = 2.0f * dx[id] + dy[id];
-    }
-  ).name("saxpy");
-  saxpy.succeed(h2d_x, h2d_y)
-       .precede(d2h_x, d2h_y);
-}, sycl_queue).name("syclFlow");
-```
-
 ## Compose Task Graphs
 
 Taskflow is composable.
@@ -371,6 +351,28 @@ tf::cudaTask cuda3 = cudaflow.sort( // sort a range of items on GPU
 );
 ```
 
+Additionally, %Taskflow provides composable graph building blocks for you to
+efficiently implement common parallel algorithms, such as parallel pipeline.
+
+@code{.cpp}
+// create a pipeline to propagate five tokens through three serial stages
+tf::Pipeline pl(num_parallel_lines,
+  tf::Pipe{tf::PipeType::SERIAL, [](tf::Pipeflow& pf) {
+    if(pf.token() == 5) {
+      pf.stop();
+    }
+  }},
+  tf::Pipe{tf::PipeType::SERIAL, [](tf::Pipeflow& pf) {
+    printf("stage 2: input buffer[%zu] = %d\n", pf.line(), buffer[pf.line()]);
+  }},
+  tf::Pipe{tf::PipeType::SERIAL, [](tf::Pipeflow& pf) {
+    printf("stage 3: input buffer[%zu] = %d\n", pf.line(), buffer[pf.line()]);
+  }}
+);
+taskflow.composed_of(pl)
+executor.run(taskflow).wait();
+@endcode
+
 
 # Supported Compilers
 
@@ -424,8 +426,6 @@ You are completely free to re-distribute your work derived from Taskflow.
 * * *
 
 [Tsung-Wei Huang]: https://tsung-wei-huang.github.io/
-[Chun-Xun Lin]: https://github.com/clin99
-[Martin Wong]: https://ece.illinois.edu/directory/profile/mdfwong
 [GitHub releases]: https://github.com/taskflow/taskflow/releases
 [GitHub issues]: https://github.com/taskflow/taskflow/issues
 [GitHub insights]: https://github.com/taskflow/taskflow/pulse
@@ -434,15 +434,7 @@ You are completely free to re-distribute your work derived from Taskflow.
 [Project Website]: https://taskflow.github.io/
 [cppcon20 talk]: https://www.youtube.com/watch?v=MX15huP5DsM
 [contributors]: https://taskflow.github.io/taskflow/contributors.html
-[OpenMP Tasking]: https://www.openmp.org/spec-html/5.0/openmpsu99.html
-[TBB FlowGraph]: https://www.threadingbuildingblocks.org/tutorial-intel-tbb-flow-graph
-[OpenTimer]: https://github.com/OpenTimer/OpenTimer
-[DtCraft]: https://github.com/tsung-wei-huang/DtCraft
 [totalgee]: https://github.com/totalgee
-[damienhocking]: https://github.com/damienhocking
-[ForgeMistress]: https://github.com/ForgeMistress
-[Patrik Huber]: https://github.com/patrikhuber
-[KingDuckZ]: https://github.com/KingDuckZ
 [NSF]: https://www.nsf.gov/
 [UIUC]: https://illinois.edu/
 [CSL]: https://csl.illinois.edu/
@@ -452,18 +444,7 @@ You are completely free to re-distribute your work derived from Taskflow.
 [cookbook]: https://taskflow.github.io/taskflow/pages.html
 [references]: https://taskflow.github.io/taskflow/References.html
 [PayMe]: https://www.paypal.me/twhuang/10
-[C++17]: https://en.wikipedia.org/wiki/C%2B%2B17
-[C++14]: https://en.wikipedia.org/wiki/C%2B%2B14
 [email me]: mailto:[email protected]
 [Cpp Conference 2018]: https://github.com/CppCon/CppCon2018
-[IPDPS19]: https://tsung-wei-huang.github.io/papers/ipdps19.pdf
 [TPDS21]: https://tsung-wei-huang.github.io/papers/tpds21-taskflow.pdf
-[cuda-zone]: https://developer.nvidia.com/cuda-zone
-[nvcc]: https://developer.nvidia.com/cuda-llvm-compiler
-[cudaGraph]: https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__GRAPH.html
-[Firestorm]: https://github.com/ForgeMistress/Firestorm
-[Shiva]: https://shiva.gitbook.io/project/shiva
-[PID Framework]: http://pid.lirmm.net/pid-framework/index.html
-[NovusCore]: https://github.com/novuscore/NovusCore
-[SA-PCB]: https://github.com/choltz95/SA-PCB
````

benchmarks/graph_pipeline/gold.cpp

Lines changed: 4 additions & 11 deletions

```diff
@@ -1,7 +1,5 @@
 #include "levelgraph.hpp"
 #include <fstream>
-//#include "matrix_calculation.hpp"
-
 
 
 int pipe_helper(
@@ -600,17 +598,12 @@ std::chrono::microseconds measure_time_gold(
       graph_pipeline_gold_16_pipes(graph);
       end = std::chrono::high_resolution_clock::now();
     break;
+
+    default:
+      throw std::runtime_error("can support only up to 16 pipes");
+    break;
   }
 
-  //std::ofstream outputfile;
-  //outputfile.open("./build/benchmarks/tf_time.csv", std::ofstream::app);
-  //outputfile << num_threads << ','
-  //           << num_lines << ','
-  //           << pipes << ','
-  //           << size << ','
-  //           << elapsed.count()/1e3 << '\n';
-
-  //outputfile.close();
   return std::chrono::duration_cast<std::chrono::microseconds>(end - beg);
 }
```

benchmarks/graph_pipeline/omp.cpp

Lines changed: 4 additions & 15 deletions

```diff
@@ -29,10 +29,6 @@ void last_pipe_helper(LevelGraph& graph, const size_t i) {
   else {
     graph.node_at(lev, len).set_value(retval);
   }
-  //std::ofstream outputfile;
-  //outputfile.open("./omp_result_.txt", std::ofstream::app);
-  //outputfile << graph.node_at(lev, len).get_value() << '\n';
-  //outputfile.close();
 }
 
 
@@ -1413,18 +1409,11 @@ std::chrono::microseconds measure_time_omp(
       graph_pipeline_omp_16_pipes(graph);
       end = std::chrono::high_resolution_clock::now();
     break;
+
+    default:
+      throw std::runtime_error("can support only up to 16 pipes");
+    break;
   }
-
-  //std::ofstream outputfile;
-  //outputfile.open("./omp_time.csv", std::ofstream::app);
-  //outputfile << num_threads << ','
-  //           << num_lines << ','
-  //           << pipes << ','
-  //           << graph.graph_size() << ','
-  //           << (std::chrono::duration_cast<std::chrono::microseconds>(end - beg).count())/1e3
-  //           << '\n';
-
-  //outputfile.close();
   return std::chrono::duration_cast<std::chrono::microseconds>(end - beg);
 }
 
```

benchmarks/graph_pipeline/taskflow.cpp

Lines changed: 4 additions & 17 deletions

```diff
@@ -3,8 +3,6 @@
 #include <taskflow/algorithm/pipeline.hpp>
 //#include "matrix_calculation.hpp"
 
-
-
 struct Input {
   size_t lev;
   size_t len;
@@ -68,11 +66,6 @@ struct FilterFinal {
   else {
     graph.node_at(lev, len).set_value(val);
   }
-
-  //std::ofstream outputfile;
-  //outputfile.open("./tf_result_.txt", std::ofstream::app);
-  //outputfile << graph.node_at(lev, len).get_value() << '\n';
-  //outputfile.close();
  }
};

@@ -597,17 +590,11 @@ std::chrono::microseconds measure_time_taskflow(
     case 16:
       elapsed = graph_pipeline_taskflow_16_pipes(graph, num_lines, num_threads);
     break;
-  }
 
-  //std::ofstream outputfile;
-  //outputfile.open("./tf_time.csv", std::ofstream::app);
-  //outputfile << num_threads << ','
-  //           << num_lines << ','
-  //           << pipes << ','
-  //           << graph.graph_size() << ','
-  //           << elapsed.count()/1e3 << '\n';
-
-  //outputfile.close();
+    default:
+      throw std::runtime_error("can support only up to 16 pipes");
+    break;
+  }
   return elapsed;
 }
 
```

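Every benchmark driver in this commit receives the same hardening: the `switch` over the requested pipe count gains a `default:` case that throws rather than silently falling through when more than 16 pipes are requested. A minimal sketch of that dispatch pattern (the function name and case bodies are illustrative stand-ins, not the benchmark code):

```cpp
#include <stdexcept>

// Dispatch to a fixed set of generated pipeline functions and reject
// any pipe count the benchmark does not provide, as the commit does.
int run_benchmark(unsigned pipes) {
  switch (pipes) {
    case 1:  return 1;    // stand-in for graph_pipeline_*_1_pipes(...)
    case 16: return 16;   // stand-in for graph_pipeline_*_16_pipes(...)
    default:
      throw std::runtime_error("can support only up to 16 pipes");
  }
}
```

Without the `default:` case, an unsupported pipe count would leave the timing variables unset and the benchmark would return garbage measurements.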
benchmarks/graph_pipeline/tbb.cpp

Lines changed: 4 additions & 16 deletions

```diff
@@ -491,23 +491,11 @@ std::chrono::microseconds measure_time_tbb(
       graph_pipeline_tbb_16_pipes(graph, num_lines);
       end = std::chrono::high_resolution_clock::now();
     break;
+
+    default:
+      throw std::runtime_error("can support only up to 16 pipes");
+    break;
   }
 
-  //std::ofstream outputfile;
-  //outputfile.open("./tbb_result.txt", std::ofstream::app);
-  //for (auto r:result) {
-  //  outputfile << r << '\n';
-  //}
-
-  //std::ofstream outputfile;
-  //outputfile.open("./tbb_time.csv", std::ofstream::app);
-  //outputfile << num_threads << ','
-  //           << num_lines << ','
-  //           << pipes << ','
-  //           << graph.graph_size() << ','
-  //           << (std::chrono::duration_cast<std::chrono::microseconds>(end - beg).count())/1e3
-  //           << '\n';
-  //outputfile.close();
-
   return std::chrono::duration_cast<std::chrono::microseconds>(end - beg);
 }
```

benchmarks/linear_pipeline/taskflow.cpp

Lines changed: 4 additions & 0 deletions

```diff
@@ -510,6 +510,10 @@ std::chrono::microseconds measure_time_taskflow(
     case 16:
       elapsed = parallel_pipeline_taskflow_16_pipes(pipes, num_lines, num_threads, size);
     break;
+
+    default:
+      throw std::runtime_error("can support only up to 16 pipes");
+    break;
   }
 
   //std::ofstream outputfile;
```
benchmarks/linear_pipeline/tbb.cpp

Lines changed: 4 additions & 0 deletions

```diff
@@ -717,6 +717,10 @@ std::chrono::microseconds measure_time_tbb(
       parallel_pipeline_tbb_16_pipes(pipes, num_lines, size);
       end = std::chrono::high_resolution_clock::now();
     break;
+
+    default:
+      throw std::runtime_error("can support only up to 16 pipes");
+    break;
   }
 
   //std::ofstream outputfile;
```

docs/ParallelPipeline.html

Lines changed: 1 addition & 1 deletion

```diff
@@ -67,7 +67,7 @@ <h3>Contents</h3>
 <li><a href="#ParallelPipelineLearnMore">Learn More about Taskflow Pipeline</a></li>
 </ul>
 </div>
-<p>Taskflow provides a <em>task-parallel</em> pipeline programming framework for you to create a <em>pipeline scheduling framework</em> to implement pipeline algorithms. Pipeline parallelism refers to a parallel execution of multiple data tokens through a linear chain of pipes or stages. Each stage processes the data token sent from the previous stage, applies the given callable to that data token, and then sends the result to the next stage. Multiple data tokens can be processed simultaneously across different stages.</p><section id="ParallelPipelineIncludeHeaderFile"><h2><a href="#ParallelPipelineIncludeHeaderFile">Include the Header</a></h2><p>You need to include the header file, <code>taskflow/algorithm/pipeline.hpp</code>, for creating a pipeline scheduling framework.</p><pre class="m-code"><span class="cp">#include</span> <span class="cpf">&lt;taskflow/algorithm/pipeline.hpp&gt;</span><span class="cp"></span></pre></section><section id="UnderstandPipelineScheduling"><h2><a href="#UnderstandPipelineScheduling">Understand the Pipeline Scheduling Framework</a></h2><p>A <a href="classtf_1_1Pipeline.html" class="m-doc">tf::<wbr />Pipeline</a> object is a <em>composable</em> graph to create a <em>pipeline scheduling framework</em> through a module task in a taskflow (see <a href="ComposableTasking.html" class="m-doc">Composable Tasking</a>). Unlike the conventional pipeline programming frameworks (e.g., Intel TBB Parallel <a href="classtf_1_1Pipeline.html" class="m-doc">Pipeline</a>), Taskflow&#x27;s pipeline algorithm does not provide any data abstraction, which often restricts users from optimizing data layouts in their applications, but a flexible framework for users to customize their application data atop an efficient pipeline scheduling framework.</p><div class="m-graph"><svg style="width: 22.250rem; height: 22.688rem;" viewBox="0.00 0.00 356.00 363.08">
+<p>Taskflow provides a <em>task-parallel</em> pipeline programming framework for you to create a <em>pipeline scheduling framework</em>. Pipeline parallelism refers to a parallel execution of multiple data tokens through a linear chain of pipes or stages. Each stage processes the data token sent from the previous stage, applies the given callable to that data token, and then sends the result to the next stage. Multiple data tokens can be processed simultaneously across different stages.</p><section id="ParallelPipelineIncludeHeaderFile"><h2><a href="#ParallelPipelineIncludeHeaderFile">Include the Header</a></h2><p>You need to include the header file, <code>taskflow/algorithm/pipeline.hpp</code>, for creating a pipeline scheduling framework.</p><pre class="m-code"><span class="cp">#include</span> <span class="cpf">&lt;taskflow/algorithm/pipeline.hpp&gt;</span><span class="cp"></span></pre></section><section id="UnderstandPipelineScheduling"><h2><a href="#UnderstandPipelineScheduling">Understand the Pipeline Scheduling Framework</a></h2><p>A <a href="classtf_1_1Pipeline.html" class="m-doc">tf::<wbr />Pipeline</a> object is a <em>composable</em> graph to create a <em>pipeline scheduling framework</em> through a module task in a taskflow (see <a href="ComposableTasking.html" class="m-doc">Composable Tasking</a>). Unlike the conventional pipeline programming frameworks (e.g., Intel TBB Parallel <a href="classtf_1_1Pipeline.html" class="m-doc">Pipeline</a>), Taskflow&#x27;s pipeline algorithm does not provide any data abstraction, which often restricts users from optimizing data layouts in their applications, but a flexible framework for users to customize their application data atop an efficient pipeline scheduling framework.</p><div class="m-graph"><svg style="width: 22.250rem; height: 22.688rem;" viewBox="0.00 0.00 356.00 363.08">
 <g transform="scale(1 1) rotate(0) translate(4 359.0782)">
 <title>Taskflow</title>
 <g class="m-cluster">
```
