Skip to content

Commit 16b8a41

Browse files
updated executors
1 parent 9650dfa commit 16b8a41

11 files changed

Lines changed: 4146 additions & 0 deletions

File tree

example/executor_profiler.cpp

Lines changed: 366 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,366 @@
1+
// 2019/03/29 modified by Tsung-Wei Huang
2+
// - added bounded workstealing
3+
//
4+
// 2019/02/15 modified by Tsung-Wei Huang
5+
// - modified batch_insertion
6+
//
7+
// 2018/12/08 modified by Tsung-Wei Huang
8+
// - refactored the output format
9+
//
10+
// 2018/12/07 modified by Tsung-Wei Huang
11+
// - refactored the output format
12+
//
13+
// 2018/12/06 modified by Tsung-Wei Huang
14+
// - added nested insertions test
15+
//
16+
// 2018/12/04 modified by Tsung-Wei Huang
17+
// - replace privatized executor with work stealing executor
18+
//
19+
// 2018/10/04 modified by Tsung-Wei Huang
20+
// - removed binary_tree
21+
// - removed modulo_insertions
22+
// - adopted to the new executor implementation
23+
//
24+
// 2018/09/19 modified by Tsung-Wei Huang
25+
// - added binary_tree benchmark
26+
// - added modulo_insertions benchmark
27+
// - refactored benchmark calls
28+
//
29+
// 2018/08/31 contributed by Guannan
30+
//
31+
// Examples to test different executor implementations:
32+
// - SimpleExecutor
33+
// - ProactiveExecutor
34+
// - SpeculativeExecutor
35+
// - PrivatizedExecutor
36+
37+
#include <taskflow/executor/executor.hpp>
38+
#include <chrono>
39+
#include <cmath>
40+
#include <random>
41+
#include <numeric>
42+
#include <climits>
43+
#include <iomanip>
44+
45+
constexpr int WIDTH = 12;
46+
47+
using Closure = std::function<void()>;
48+
49+
// Procedure: benchmark
50+
#define BENCHMARK(TITLE, F) \
51+
std::cout \
52+
<< std::setw(WIDTH) << TITLE << std::flush \
53+
<< std::setw(WIDTH) << F<tf::SimpleExecutor<Closure>>() << std::flush \
54+
<< std::setw(WIDTH) << F<tf::ProactiveExecutor<Closure>>() << std::flush \
55+
<< std::setw(WIDTH) << F<tf::SpeculativeExecutor<Closure>>() << std::flush \
56+
<< std::setw(WIDTH) << F<tf::WorkStealingExecutor<Closure>>() << std::flush \
57+
<< std::setw(WIDTH) << F<tf::EigenWorkStealingExecutor<Closure>>() << std::flush \
58+
<< std::endl;
59+
60+
// ============================================================================
61+
// Divide and conquer to solve max subarray sum problem
62+
// https://www.geeksforgeeks.org/divide-and-conquer-maximum-sum-subarray/
63+
// ============================================================================
64+
65+
constexpr auto tree_height = 20u;
66+
constexpr auto total_nodes = 1u << tree_height;
67+
68+
void update_max(std::atomic<int>& max_val, const int value) {
69+
int old = max_val;
70+
while(old < value && !max_val.compare_exchange_weak(old, value));
71+
}
72+
73+
int max_cross_sum(const std::vector<int>& vec, int l, int m, int r){
74+
// Include elements on left of mid.
75+
auto sum = 0;
76+
auto left_sum = INT_MIN;
77+
for (auto i = m; i >= l; i--){
78+
sum = sum + vec[i];
79+
if (sum > left_sum)
80+
left_sum = sum;
81+
}
82+
83+
// Include elements on right of mid
84+
sum = 0;
85+
auto right_sum = INT_MIN;
86+
for (auto i = m+1; i <= r; i++)
87+
{
88+
sum = sum + vec[i];
89+
if (sum > right_sum)
90+
right_sum = sum;
91+
}
92+
93+
// Return sum of elements on left and right of mid
94+
return left_sum + right_sum;
95+
}
96+
97+
template<typename T>
98+
void max_subsum(
99+
const std::vector<int>& vec,
100+
int l, int r,
101+
std::atomic<int>& max_num,
102+
T& tp,
103+
std::atomic<size_t>& counter,
104+
std::promise<void>& promise
105+
) {
106+
// Base Case: Only one element
107+
if (l == r) {
108+
update_max(max_num, vec[l]);
109+
if(++counter == total_nodes*2-1){
110+
promise.set_value();
111+
}
112+
return ;
113+
}
114+
115+
// Find middle point
116+
int m = (l + r)/2;
117+
118+
tp.emplace([&, l=l, m=m] () {
119+
max_subsum(vec, l, m, max_num, tp, counter, promise);
120+
});
121+
122+
tp.emplace([&, m=m, r=r] () {
123+
max_subsum(vec, m+1, r, max_num, tp, counter, promise);
124+
});
125+
126+
update_max(max_num, max_cross_sum(vec, l, m, r));
127+
128+
if(++counter == total_nodes*2-1){
129+
promise.set_value();
130+
}
131+
}
132+
133+
template<typename T>
134+
auto subsum(){
135+
136+
std::vector<int> vec(total_nodes);
137+
std::iota(vec.begin(), vec.end(), -50);
138+
139+
std::atomic<int> result {INT_MIN};
140+
std::atomic<size_t> counter{0};
141+
std::promise<void> promise;
142+
auto future = promise.get_future();
143+
144+
auto start = std::chrono::high_resolution_clock::now();
145+
T tp(std::thread::hardware_concurrency());
146+
max_subsum(vec, 0, total_nodes-1, result, tp, counter, promise);
147+
future.get();
148+
auto end = std::chrono::high_resolution_clock::now();
149+
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
150+
return elapsed.count();
151+
}
152+
153+
// ============================================================================
154+
// Dynamic tasking through linear insertions
155+
// ============================================================================
156+
157+
// Procedure: linear_insertions
158+
template <typename T>
159+
auto linear_insertions() {
160+
161+
const int num_threads = std::thread::hardware_concurrency();
162+
const int num_tasks = 2000000;
163+
164+
auto beg = std::chrono::high_resolution_clock::now();
165+
166+
T executor(num_threads);
167+
168+
std::atomic<int> sum {0};
169+
170+
std::function<void(int)> insert;
171+
std::promise<int> promise;
172+
auto future = promise.get_future();
173+
174+
insert = [&executor, &insert, &sum, &promise] (int i) {
175+
if(i > 0) {
176+
executor.emplace([i=i-1, &insert] () {
177+
insert(i);
178+
});
179+
}
180+
else {
181+
if(size_t s = ++sum; s == executor.num_workers()) {
182+
promise.set_value(1);
183+
}
184+
}
185+
};
186+
187+
for(int i=0; i<num_threads; i++){
188+
insert(num_tasks / num_threads);
189+
}
190+
191+
// synchronize until all tasks finish
192+
assert(future.get() == 1);
193+
assert(sum == num_threads);
194+
auto end = std::chrono::high_resolution_clock::now();
195+
196+
return std::chrono::duration_cast<std::chrono::milliseconds>(end - beg).count();
197+
}
198+
199+
// ============================================================================
200+
// Insertions with atomic summation
201+
// ============================================================================
202+
203+
// Function: atomic_add
204+
template <typename T>
205+
auto atomic_add() {
206+
207+
const int num_threads = std::thread::hardware_concurrency();
208+
const int num_tasks = 1000000;
209+
210+
std::atomic<int> counter(0);
211+
auto beg = std::chrono::high_resolution_clock::now();
212+
213+
std::promise<void> promise;
214+
auto future = promise.get_future();
215+
216+
T executor(num_threads);
217+
for(size_t i=0; i<num_tasks; i++){
218+
executor.emplace([&](){
219+
if(counter.fetch_add(1, std::memory_order_relaxed) + 1 == num_tasks) {
220+
promise.set_value();
221+
}
222+
});
223+
}
224+
225+
future.get();
226+
227+
assert(counter == num_tasks);
228+
229+
auto end = std::chrono::high_resolution_clock::now();
230+
return std::chrono::duration_cast<std::chrono::milliseconds>(end - beg).count();
231+
}
232+
233+
// ============================================================================
234+
// skewed insertions
235+
// ============================================================================
236+
237+
// Function: nested_insertions
238+
template <typename T>
239+
auto nested_insertions() {
240+
241+
const int num_threads = std::thread::hardware_concurrency();
242+
const int num_tasks = 32;
243+
244+
auto beg = std::chrono::high_resolution_clock::now();
245+
246+
std::atomic<int64_t> counter(0);
247+
248+
std::promise<void> promise;
249+
auto future = promise.get_future();
250+
251+
auto increment = [&] () {
252+
int64_t sum = 0;
253+
for(int i=0; i<5; ++i) {
254+
sum = (sum + 1)*num_tasks;
255+
}
256+
if(++counter == sum) {
257+
promise.set_value();
258+
}
259+
};
260+
261+
T executor(num_threads);
262+
263+
executor.emplace([&] () {
264+
std::this_thread::sleep_for(std::chrono::microseconds(100));
265+
for(int i=0; i<num_tasks; ++i) {
266+
increment();
267+
executor.emplace([&] () {
268+
for(int i=0; i<num_tasks; ++i) {
269+
increment();
270+
executor.emplace([&] () {
271+
for(int i=0; i<num_tasks; ++i) {
272+
increment();
273+
executor.emplace([&] () {
274+
for(int i=0; i<num_tasks; ++i) {
275+
increment();
276+
executor.emplace([&] () {
277+
for(int i=0; i<num_tasks; ++i) {
278+
increment();
279+
}
280+
});
281+
}
282+
});
283+
}
284+
});
285+
}
286+
});
287+
}
288+
});
289+
290+
future.get();
291+
292+
auto end = std::chrono::high_resolution_clock::now();
293+
return std::chrono::duration_cast<std::chrono::milliseconds>(end - beg).count();
294+
}
295+
296+
// ============================================================================
297+
// batch insertion
298+
// ============================================================================
299+
300+
// Function: batch_insertions
301+
template <typename T>
302+
auto batch_insertions() {
303+
304+
const int num_threads = std::thread::hardware_concurrency();
305+
const int num_batches = 512;
306+
const int num_tasks = 512;
307+
308+
auto beg = std::chrono::high_resolution_clock::now();
309+
310+
std::atomic<int> counter(0);
311+
std::vector<std::function<void()>> tasks (num_tasks);
312+
313+
std::promise<void> promise;
314+
auto future = promise.get_future();
315+
316+
for(auto & task : tasks) {
317+
task = [&] () {
318+
if(++counter == 2*num_tasks*num_batches) {
319+
promise.set_value();
320+
}
321+
};
322+
}
323+
324+
T executor(num_threads);
325+
326+
// master to insert a batch
327+
for(size_t i=0; i<num_batches; ++i) {
328+
auto copy = tasks;
329+
executor.batch(copy);
330+
}
331+
332+
for(size_t i=0; i<num_batches; i++){
333+
executor.emplace([&](){
334+
auto copy = tasks;
335+
executor.batch(copy);
336+
});
337+
}
338+
339+
future.get();
340+
341+
auto end = std::chrono::high_resolution_clock::now();
342+
return std::chrono::duration_cast<std::chrono::milliseconds>(end - beg).count();
343+
}
344+
345+
// Function: main
346+
int main(int argc, char* argv[]) {
347+
348+
std::cout << std::setw(WIDTH) << "workload"
349+
<< std::setw(WIDTH) << "simple"
350+
<< std::setw(WIDTH) << "pro"
351+
<< std::setw(WIDTH) << "spec"
352+
<< std::setw(WIDTH) << "steal"
353+
<< std::setw(WIDTH) << "eigen"
354+
<< std::endl;
355+
356+
BENCHMARK("Atomic", atomic_add);
357+
BENCHMARK("Linear", linear_insertions);
358+
BENCHMARK("D&C", subsum);
359+
BENCHMARK("Batch", batch_insertions);
360+
BENCHMARK("Nested", nested_insertions);
361+
362+
return 0;
363+
}
364+
365+
366+

0 commit comments

Comments
 (0)