Skip to content

Commit 7f27687

Browse files
author
clin99
committed
Successfully merge
1 parent 0086dcf commit 7f27687

3 files changed

Lines changed: 365 additions & 0 deletions

File tree

CMakeLists.txt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,13 @@ message(STATUS "Build type: " ${CMAKE_BUILD_TYPE})
1818
message(STATUS "CMAKE_CXX_COMPILER: " ${CMAKE_CXX_COMPILER})
1919
message(STATUS "CMAKE_CXX_FLAGS: " ${CMAKE_CXX_FLAGS})
2020

21+
find_package(OpenMP)
22+
if (OPENMP_FOUND)
23+
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${OpenMP_C_FLAGS}")
24+
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${OpenMP_CXX_FLAGS}")
25+
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${OpenMP_EXE_LINKER_FLAGS}")
26+
endif()
27+
2128
# The version number
2229
set (TASKFLOW_MAJOR_VERSION "0")
2330
set (TASKFLOW_MINOR_VERSION "1")
@@ -38,6 +45,10 @@ set (CMAKE_RUNTIME_OUTPUT_DIRECTORY ${PROJECT_SOURCE_DIR}/example)
3845
add_executable(simple example/simple.cpp)
3946
target_link_libraries(simple ${CMAKE_THREAD_LIBS_INIT})
4047

48+
set (CMAKE_RUNTIME_OUTPUT_DIRECTORY ${PROJECT_SOURCE_DIR}/example)
49+
add_executable(matrix example/matrix.cpp)
50+
target_link_libraries(matrix ${CMAKE_THREAD_LIBS_INIT})
51+
4152
set (CMAKE_RUNTIME_OUTPUT_DIRECTORY ${PROJECT_SOURCE_DIR}/unittest)
4253
add_executable(taskflow unittest/taskflow.cpp)
4354
target_link_libraries(taskflow ${CMAKE_THREAD_LIBS_INIT})

example/matrix.cpp

Lines changed: 330 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,330 @@
1+
// Cubist programming assignment.
2+
//
3+
// Author: Tsung-Wei Huang
4+
//
5+
// This program is accomplished by my self, without any advice or help from
6+
// other individuals. This is my own products.
7+
//
8+
// Dependency: taskflow.hpp
9+
// taskflow.hpp is a c++ DAG-based task scheduler. It has been used in my open-source
10+
// projects DtCraft and OpenTimer.
11+
// Check my github for more details: https://github.com/twhuang-uiuc
12+
13+
#include <iostream>
14+
#include <cstdlib>
15+
#include <cstdio>
16+
#include <thread>
17+
#include <cmath>
18+
#include <random>
19+
#include <vector>
20+
#include <fstream>
21+
#include <cstring>
22+
#include <taskflow.hpp>
23+
24+
// ----------------------------------------------------------------------------
25+
// Utility section
26+
// ----------------------------------------------------------------------------
27+
28+
using matrix_t = std::vector<std::vector<float>>;
29+
30+
// Function: random_matrix
31+
// Generate a matrix between 0 and 1
32+
matrix_t random_matrix(size_t rows, size_t cols) {
33+
34+
matrix_t mat;
35+
36+
mat.resize(rows);
37+
for(size_t r=0; r<rows; ++r) {
38+
mat[r].resize(cols);
39+
for(size_t c=0; c<cols; ++c) {
40+
mat[r][c] = ::rand() / static_cast<float>(RAND_MAX);
41+
}
42+
}
43+
44+
return mat;
45+
}
46+
47+
// Procedure: save_matrix
48+
// save a give matrix to a path
49+
void save_matrix(const std::string& path, const matrix_t& mat) {
50+
51+
std::cout << std::string("saving matrix ") + path.c_str() + "\n";
52+
53+
std::ofstream ofs(path);
54+
55+
if(!ofs.good() || mat.empty()) {
56+
throw std::invalid_argument("failed to save matrix");
57+
}
58+
59+
ofs << mat.size() << ' ' << mat[0].size() << '\n';
60+
61+
for(size_t r=0; r<mat.size(); ++r) {
62+
for(size_t c=0; c<mat[r].size(); ++c) {
63+
ofs << mat[r][c] << ' ';
64+
}
65+
ofs << '\n';
66+
}
67+
}
68+
69+
// Function: load_matrix
70+
matrix_t load_matrix(const std::string& path) {
71+
72+
std::cout << std::string("loading matrix ") + path.c_str() + "\n";
73+
74+
std::ifstream ifs(path);
75+
76+
if(!ifs.good()) {
77+
throw std::invalid_argument("failed to load matrix");
78+
}
79+
80+
size_t rows, cols;
81+
82+
ifs >> rows >> cols;
83+
84+
matrix_t mat;
85+
mat.resize(rows);
86+
for(size_t r=0; r<rows; ++r) {
87+
mat[r].resize(cols);
88+
}
89+
90+
for(size_t r=0; r<rows; ++r) {
91+
for(size_t c=0; c<cols; ++c) {
92+
ifs >> mat[r][c];
93+
}
94+
}
95+
96+
return mat;
97+
}
98+
99+
// Dummy caculation
100+
matrix_t operator + (const matrix_t& a, auto b) {
101+
matrix_t res = a;
102+
return res;
103+
}
104+
105+
106+
// ----------------------------------------------------------------------------
107+
// Task section
108+
// ----------------------------------------------------------------------------
109+
110+
// Procedure: generate_test
111+
void generate_test(size_t N) {
112+
// generate test data
113+
auto a = random_matrix(N, N);
114+
save_matrix("a.csv", a);
115+
auto b = random_matrix(N, N);
116+
save_matrix("b.csv", b);
117+
}
118+
119+
// Procedure: function1
120+
matrix_t func1(const matrix_t& x, auto&& j) {
121+
std::cout << "computing1 ...\n";
122+
matrix_t dummy = x;
123+
std::this_thread::sleep_for(std::chrono::seconds(2));
124+
return dummy;
125+
}
126+
127+
// Procedure: function2
128+
auto func2(const matrix_t& a, auto&& b) {
129+
std::cout << "computing2 ...\n";
130+
matrix_t dummy = a;
131+
std::this_thread::sleep_for(std::chrono::seconds(2));
132+
return dummy;
133+
}
134+
135+
// Procedure: sequential
136+
void sequential(size_t N) {
137+
138+
generate_test(N);
139+
140+
auto tbeg = std::chrono::steady_clock::now();
141+
142+
auto a = load_matrix("a.csv");
143+
for(int j=1; j<=5; ++j) {
144+
auto tmp = func1(a, j);
145+
save_matrix(std::string("a") + std::to_string(j) + ".csv", tmp);
146+
}
147+
148+
auto b = load_matrix("b.csv");
149+
for(int j=1; j<=5; ++j) {
150+
auto tmp = func2(b, j);
151+
save_matrix(std::string("b") + std::to_string(j) + ".csv", tmp);
152+
}
153+
154+
for(int j=1; j<=5; ++j) {
155+
auto a = load_matrix(std::string("a") + std::to_string(j) + ".csv");
156+
auto b = load_matrix(std::string("b") + std::to_string(j) + ".csv");
157+
auto c = func2(a, b);
158+
save_matrix(std::string("c") + std::to_string(j) + ".csv", c);
159+
}
160+
161+
auto tend = std::chrono::steady_clock::now();
162+
163+
std::cout << "sequential version takes "
164+
<< std::chrono::duration_cast<std::chrono::seconds>(tend-tbeg).count()
165+
<< " seconds\n";
166+
}
167+
168+
// Procedure: naive_parallel
169+
void naive_parallel(size_t N, size_t num_threads = std::thread::hardware_concurrency()) {
170+
171+
generate_test(N);
172+
173+
auto tbeg = std::chrono::steady_clock::now();
174+
175+
auto a = load_matrix("a.csv");
176+
#pragma omp parallel for num_threads(num_threads)
177+
for(int j=1; j<=5; ++j) {
178+
auto tmp = func1(a, j);
179+
save_matrix(std::string("a") + std::to_string(j) + ".csv", tmp);
180+
}
181+
182+
auto b = load_matrix("b.csv");
183+
#pragma omp parallel for num_threads(num_threads)
184+
for(int j=1; j<=5; ++j) {
185+
auto tmp = func2(b, j);
186+
save_matrix(std::string("b") + std::to_string(j) + ".csv", tmp);
187+
}
188+
189+
#pragma omp parallel for num_threads(num_threads)
190+
for(int j=1; j<=5; ++j) {
191+
auto a = load_matrix(std::string("a") + std::to_string(j) + ".csv");
192+
auto b = load_matrix(std::string("b") + std::to_string(j) + ".csv");
193+
auto c = func2(a, b);
194+
save_matrix(std::string("c") + std::to_string(j) + ".csv", c);
195+
}
196+
197+
auto tend = std::chrono::steady_clock::now();
198+
199+
std::cout << "naive parallel version takes "
200+
<< std::chrono::duration_cast<std::chrono::seconds>(tend-tbeg).count()
201+
<< " seconds\n";
202+
}
203+
204+
// Procedure: parallel
205+
void parallel(size_t N, size_t num_threads = std::thread::hardware_concurrency()) {
206+
207+
generate_test(N);
208+
209+
auto tbeg = std::chrono::steady_clock::now();
210+
211+
tf::Taskflow<int> tf(num_threads);
212+
213+
// Parallelize the following tasks.
214+
// auto a = load_matrix("a.csv");
215+
// for(int j=1; j<=5; ++j) {
216+
// auto tmp = func1(a, j);
217+
// save_matrix(std::string("a") + std::to_string(j) + ".csv", tmp);
218+
// }
219+
// auto b = load_matrix("b.csv");
220+
// for(int j=1; j<=5; ++j) {
221+
// auto tmp = func2(b, j);
222+
// save_matrix(std::string("b") + std::to_string(j) + ".csv", tmp);
223+
// }
224+
matrix_t a;
225+
auto load_a = tf.silent_emplace([&] () { a = load_matrix("a.csv"); });
226+
auto save_a1 = tf.silent_emplace([&] () { save_matrix("a1.csv", func1(a, 1)); });
227+
auto save_a2 = tf.silent_emplace([&] () { save_matrix("a2.csv", func1(a, 2)); });
228+
auto save_a3 = tf.silent_emplace([&] () { save_matrix("a3.csv", func1(a, 3)); });
229+
auto save_a4 = tf.silent_emplace([&] () { save_matrix("a4.csv", func1(a, 4)); });
230+
auto save_a5 = tf.silent_emplace([&] () { save_matrix("a5.csv", func1(a, 5)); });
231+
232+
tf.broadcast(load_a, {save_a1, save_a2, save_a3, save_a4, save_a5});
233+
234+
matrix_t b;
235+
auto load_b = tf.silent_emplace([&] () { b = load_matrix("b.csv"); });
236+
auto save_b1 = tf.silent_emplace([&] () { save_matrix("b1.csv", func1(b, 1)); });
237+
auto save_b2 = tf.silent_emplace([&] () { save_matrix("b2.csv", func1(b, 2)); });
238+
auto save_b3 = tf.silent_emplace([&] () { save_matrix("b3.csv", func1(b, 3)); });
239+
auto save_b4 = tf.silent_emplace([&] () { save_matrix("b4.csv", func1(b, 4)); });
240+
auto save_b5 = tf.silent_emplace([&] () { save_matrix("b5.csv", func1(b, 5)); });
241+
242+
tf.broadcast(load_b, {save_b1, save_b2, save_b3, save_b4, save_b5});
243+
244+
// Synchronize
245+
auto sync = tf.silent_emplace([&]() {std::cout << "a[1:5].csv and b[1:5].csv written\n";});
246+
247+
tf.gather({save_a1, save_a2, save_a3, save_a4, save_a5,
248+
save_b1, save_b2, save_b3, save_b4, save_b5}, sync);
249+
250+
// Parallelize the following
251+
// for(int j=1; j<=5; ++j) {
252+
// auto a = load_matrix(std::string("a") + std::to_string(j) + ".csv");
253+
// auto b = load_matrix(std::string("b") + std::to_string(j) + ".csv");
254+
// auto c = func2(a, b);
255+
// save_matrix(std::string("c") + std::to_string(j) + ".csv", c);
256+
// }
257+
matrix_t a1, a2, a3, a4, a5, b1, b2, b3, b4, b5;
258+
auto load_a1 = tf.silent_emplace([&](){ a1 = load_matrix("a1.csv"); });
259+
auto load_a2 = tf.silent_emplace([&](){ a2 = load_matrix("a2.csv"); });
260+
auto load_a3 = tf.silent_emplace([&](){ a3 = load_matrix("a3.csv"); });
261+
auto load_a4 = tf.silent_emplace([&](){ a4 = load_matrix("a4.csv"); });
262+
auto load_a5 = tf.silent_emplace([&](){ a5 = load_matrix("a5.csv"); });
263+
auto load_b1 = tf.silent_emplace([&](){ a1 = load_matrix("b1.csv"); });
264+
auto load_b2 = tf.silent_emplace([&](){ a2 = load_matrix("b2.csv"); });
265+
auto load_b3 = tf.silent_emplace([&](){ a3 = load_matrix("b3.csv"); });
266+
auto load_b4 = tf.silent_emplace([&](){ a4 = load_matrix("b4.csv"); });
267+
auto load_b5 = tf.silent_emplace([&](){ a5 = load_matrix("b5.csv"); });
268+
auto save_c1 = tf.silent_emplace([&](){ save_matrix("c1.csv", func2(a1, b1)); });
269+
auto save_c2 = tf.silent_emplace([&](){ save_matrix("c2.csv", func2(a2, b2)); });
270+
auto save_c3 = tf.silent_emplace([&](){ save_matrix("c3.csv", func2(a3, b3)); });
271+
auto save_c4 = tf.silent_emplace([&](){ save_matrix("c4.csv", func2(a4, b4)); });
272+
auto save_c5 = tf.silent_emplace([&](){ save_matrix("c5.csv", func2(a5, b5)); });
273+
274+
tf.broadcast(sync, {load_a1, load_a2, load_a3, load_a4, load_a5,
275+
load_b1, load_b2, load_b3, load_b4, load_b5});
276+
277+
tf.precede(load_a1, save_c1)
278+
.precede(load_b1, save_c1)
279+
.precede(load_a2, save_c2)
280+
.precede(load_b2, save_c2)
281+
.precede(load_a3, save_c3)
282+
.precede(load_b3, save_c3)
283+
.precede(load_a4, save_c4)
284+
.precede(load_b4, save_c4)
285+
.precede(load_a5, save_c5)
286+
.precede(load_b5, save_c5)
287+
.wait_for_all();
288+
289+
auto tend = std::chrono::steady_clock::now();
290+
std::cout << "parallel version takes "
291+
<< std::chrono::duration_cast<std::chrono::seconds>(tend-tbeg).count()
292+
<< " seconds\n";
293+
}
294+
295+
// ------------------------------------------------------------------------------------------------
296+
297+
// Function: main
298+
int main(int argc, char* argv[]) {
299+
300+
if(argc != 3) {
301+
std::cerr << "usage: ./cubist N [seq|naive|taskflow]\n";
302+
std::exit(EXIT_FAILURE);
303+
}
304+
305+
if(std::strcmp(argv[2], "seq") == 0) {
306+
sequential(std::stoi(argv[1]));
307+
}
308+
else if(std::strcmp(argv[2], "naive") == 0) {
309+
naive_parallel(std::stoi(argv[1]));
310+
}
311+
else if(std::strcmp(argv[2], "taskflow") == 0) {
312+
parallel(std::stof(argv[1]));
313+
}
314+
else {
315+
std::cerr << "wrong method\n";
316+
}
317+
318+
return 0;
319+
}
320+
321+
322+
323+
324+
325+
326+
327+
328+
329+
330+

taskflow.hpp

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,30 @@ class Taskflow {
353353
template <typename L>
354354
void _gather(const L&, const KeyT&);
355355
};
356+
357+
358+
template<typename KeyT>
359+
Taskflow<KeyT>::Topology::Topology(std::unordered_map<KeyT, Task>&& t) :
360+
tasks(std::move(t)) {
361+
362+
std::promise<void> promise;
363+
364+
future = promise.get_future().share();
365+
target.work = [p=MoveOnCopy{std::move(promise)}] () mutable { p.get().set_value(); };
366+
367+
source.precede(target);
368+
369+
// Build the super source and super target.
370+
for(auto& kvp : tasks) {
371+
if(kvp.second.dependents == 0) {
372+
source.precede(kvp.second);
373+
}
374+
if(kvp.second.successors.size() == 0) {
375+
kvp.second.precede(target);
376+
}
377+
}
378+
}
379+
356380

357381
template <typename KeyT>
358382
template <typename C>

0 commit comments

Comments
 (0)