Skip to content

Commit e455550

Browse files
added parallel_for on index
1 parent e3eb1e6 commit e455550

7 files changed

Lines changed: 329 additions & 88 deletions

File tree

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ add_test(builder ${TF_UTEST_DIR}/taskflow -tc=Builder)
172172
add_test(dispatch ${TF_UTEST_DIR}/taskflow -tc=Dispatch)
173173
add_test(executor ${TF_UTEST_DIR}/taskflow -tc=Executor)
174174
add_test(parallel_for ${TF_UTEST_DIR}/taskflow -tc=ParallelFor)
175+
add_test(parallel_for_idx ${TF_UTEST_DIR}/taskflow -tc=ParallelForOnIndex)
175176
add_test(reduce ${TF_UTEST_DIR}/taskflow -tc=Reduce)
176177
add_test(reduce_min ${TF_UTEST_DIR}/taskflow -tc=ReduceMin)
177178
add_test(reduce_max ${TF_UTEST_DIR}/taskflow -tc=ReduceMax)

README.md

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,7 @@ Visit [documentation][wiki] to see the complete list.
479479
| placeholder | none | task | insert a node without any work; work can be assigned later |
480480
| linearize | task list | none | create a linear dependency in the given task list |
481481
| parallel_for | beg, end, callable, group | task pair | apply the callable in parallel and group-by-group to the result of dereferencing every iterator in the range |
482+
| parallel_for | beg, end, step, callable, group | task pair | apply the callable in parallel and group-by-group to an index-based range |
482483
| reduce | beg, end, res, bop | task pair | reduce a range of elements to a single result through a binary operator |
483484
| transform_reduce | beg, end, res, bop, uop | task pair | apply a unary operator to each element in the range and reduce them to a single result through a binary operator |
484485
| dispatch | none | future | dispatch the current graph and return a shared future to block on completion |
@@ -569,12 +570,42 @@ auto [S, T] = tf.parallel_for(
569570
[] (int i) {
570571
std::cout << "AB and CD run in parallel" << '\n';
571572
},
572-
2 // group to execute two tasks at a time
573+
2 // group two tasks at a time
573574
);
574575
```
575576

576577
By default, taskflow performs an even partition over worker threads
577-
if the group size is not specified.
578+
if the group size is not specified (or equal to 0).
579+
580+
In addition to the iterator-based range, parallel\_for has another overload for index-based loops.
581+
The first three arguments to this overload indicate the
582+
starting index, ending index (exclusive), and step size.
583+
584+
```cpp
585+
// [0, 10) with a step size of 2
586+
auto [S, T] = tf.parallel_for(
587+
0, 10, 2,
588+
[] (int i) {
589+
std::cout << "parallel_for on index " << i << std::endl;
590+
},
591+
2 // group two tasks at a time
592+
);
593+
// will print 0, 2, 4, 6, 8 (three groups, {0, 2}, {4, 6}, {8})
594+
```
595+
596+
You can also iterate in the opposite direction by reversing the starting index and the ending index
597+
with a negative step size.
598+
599+
```cpp
600+
// [10, 0) with a step size of -2
601+
auto [S, T] = tf.parallel_for(
602+
10, 0, -2,
603+
[] (int i) {
604+
std::cout << "parallel_for on index " << i << std::endl;
605+
}
606+
);
607+
// will print 10, 8, 6, 4, 2 (group size decided by taskflow)
608+
```
578609

579610
### *reduce/transform_reduce*
580611

example/parallel_for.cpp

Lines changed: 16 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -2,65 +2,37 @@
22
#include <cassert>
33
#include <numeric>
44

5-
// Function: fib
6-
int fib(int n) {
7-
if(n <= 2) return n;
8-
return (fib(n-1) + fib(n-2))%1024;
9-
}
10-
11-
// ----------------------------------------------------------------------------
12-
13-
// Procedure: sequential
14-
void sequential(int N) {
15-
auto tbeg = std::chrono::steady_clock::now();
16-
for(int i=0; i<N; ++i) {
17-
printf("fib[%d]=%d\n", i, fib(i));
18-
}
19-
auto tend = std::chrono::steady_clock::now();
20-
std::cout << "sequential version takes "
21-
<< std::chrono::duration_cast<std::chrono::milliseconds>(tend-tbeg).count()
22-
<< " ms\n";
23-
}
24-
25-
// Procedure: taskflow
26-
void taskflow(int N) {
5+
// Procedure: parallel_for_on_range
6+
void parallel_for_on_range(int N) {
277

288
std::vector<int> range(N);
299
std::iota(range.begin(), range.end(), 0);
3010

31-
auto tbeg = std::chrono::steady_clock::now();
3211
tf::Taskflow tf;
3312
tf.parallel_for(range, [&] (const int i) {
34-
printf("fib[%d]=%d\n", i, fib(i));
35-
}, 1);
13+
printf("parallel_for on container item: %d\n", i);
14+
});
3615
tf.wait_for_all();
16+
}
3717

38-
auto tend = std::chrono::steady_clock::now();
39-
std::cout << "taskflow version takes "
40-
<< std::chrono::duration_cast<std::chrono::milliseconds>(tend-tbeg).count()
41-
<< " ms\n";
18+
// Procedure: parallel_for_on_index
19+
void parallel_for_on_index(int N) {
20+
tf::Taskflow tf;
21+
22+
// [0, N) with step size 1
23+
tf.parallel_for(0, N, 1, [] (int i) {
24+
printf("parallel_for on index: %d\n", i);
25+
});
26+
tf.wait_for_all();
4227
}
4328

4429
// ----------------------------------------------------------------------------
4530

4631
// Function: main
4732
int main(int argc, char* argv[]) {
4833

49-
if(argc != 3) {
50-
std::cerr << "usage: ./parallel_for [baseline|taskflow] N\n";
51-
std::exit(EXIT_FAILURE);
52-
}
53-
54-
// Run methods
55-
if(std::string_view method(argv[1]); method == "baseline") {
56-
sequential(std::atoi(argv[2]));
57-
}
58-
else if(method == "taskflow") {
59-
taskflow(std::atoi(argv[2]));
60-
}
61-
else {
62-
std::cerr << "wrong method, shoud be [baseline|taskflow]\n";
63-
}
34+
parallel_for_on_range(10);
35+
parallel_for_on_index(10);
6436

6537
return 0;
6638
}

taskflow/error/error.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ struct Error : public std::error_category {
1212

1313
enum Code : int {
1414
SUCCESS = 0,
15+
FLOW_BUILDER,
1516
EXECUTOR
1617
};
1718

@@ -39,6 +40,10 @@ inline std::string Error::message(int code) const {
3940
return "success";
4041
break;
4142

43+
case FLOW_BUILDER:
44+
return "flow builder error";
45+
break;
46+
4247
case EXECUTOR:
4348
return "executor error";
4449
break;

taskflow/graph/flow_builder.hpp

Lines changed: 107 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,7 @@ class FlowBuilder {
2929
template <typename T, typename C, std::enable_if_t<is_iterable_v<T>, void>* = nullptr>
3030
auto parallel_for(T&, C&&, size_t = 0);
3131

32-
template <
33-
typename I,
34-
typename C,
35-
std::enable_if_t<std::is_arithmetic_v<I>, void>* = nullptr
36-
>
32+
template <typename I, typename C, std::enable_if_t<std::is_arithmetic_v<I>, void>* = nullptr >
3733
auto parallel_for(I, I, I, C&&, size_t = 0);
3834

3935
template <typename I, typename T, typename B>
@@ -71,6 +67,9 @@ class FlowBuilder {
7167

7268
template <typename L>
7369
void _linearize(L&);
70+
71+
template <typename I>
72+
size_t _estimate_chunk_size(I, I, I);
7473
};
7574

7675
// Constructor
@@ -178,40 +177,93 @@ auto FlowBuilder::parallel_for(T& t, C&& c, size_t group) {
178177
template <
179178
typename I,
180179
typename C,
181-
std::enable_if_t<std::is_arithmetic_v<I>, void>* = nullptr
180+
std::enable_if_t<std::is_arithmetic_v<I>, void>*
182181
>
183-
auto FlowBuilder::parallel_for(I beg, I end, I step, C&& c, size_t g) {
182+
auto FlowBuilder::parallel_for(I beg, I end, I s, C&& c, size_t g) {
184183

185-
if(g == 0) {
186-
auto N = (end - beg + step - 1) / step;
187-
auto w = std::max(unsigned{1}, std::thread::hardware_concurrency());
188-
g = (N + w - 1) / w;
189-
}
184+
using T = std::decay_t<I>;
190185

186+
if((s == 0 && beg != end) || (beg < end && s <= 0) || (beg > end && s >=0) ) {
187+
TF_THROW(Error::FLOW_BUILDER,
188+
"invalid range [", beg, ", ", end, ") with step size ", s
189+
);
190+
}
191+
191192
auto source = placeholder();
192193
auto target = placeholder();
193194

194-
std::cout << "g is " << g << std::endl;
195-
196-
while(beg < end) {
195+
if(g == 0) {
196+
g = _estimate_chunk_size(beg, end, s);
197+
}
197198

198-
auto e = beg + static_cast<I>(g) * step;
199+
// Integer indices
200+
if constexpr(std::is_integral_v<T>) {
199201

200-
std::cout << beg << " " << e << std::endl;
201-
202-
// Create a task
203-
auto task = silent_emplace([beg, e, step, c] () mutable {
204-
for(auto i=beg; i<e; i+=step) {
205-
c(i);
206-
}
207-
});
208-
source.precede(task);
209-
task.precede(target);
202+
auto offset = static_cast<T>(g) * s;
210203

211-
// adjust the pointer
212-
beg = e;
204+
// positive case
205+
if(beg < end) {
206+
while(beg != end) {
207+
auto e = std::min(beg + offset, end);
208+
auto task = silent_emplace([=] () mutable {
209+
for(auto i=beg; i<e; i+=s) {
210+
c(i);
211+
}
212+
});
213+
source.precede(task);
214+
task.precede(target);
215+
beg = e;
216+
}
217+
}
218+
// negative case
219+
else if(beg > end) {
220+
while(beg != end) {
221+
auto e = std::max(beg + offset, end);
222+
auto task = silent_emplace([=] () mutable {
223+
for(auto i=beg; i>e; i+=s) {
224+
c(i);
225+
}
226+
});
227+
source.precede(task);
228+
task.precede(target);
229+
beg = e;
230+
}
231+
}
213232
}
233+
// We enumerate the entire sequence to avoid floating error
234+
else if constexpr(std::is_floating_point_v<T>) {
235+
size_t N = 0;
236+
auto B = beg;
237+
for(auto i=beg; (beg<end ? i<end : i>end); i+=s, ++N) {
238+
if(N == g) {
239+
auto task = silent_emplace([=] () mutable {
240+
auto b = B;
241+
for(size_t n=0; n<N; ++n) {
242+
c(b);
243+
b += s;
244+
}
245+
});
246+
N = 0;
247+
B = i;
248+
source.precede(task);
249+
task.precede(target);
250+
}
251+
}
214252

253+
// the last pieces
254+
if(N != 0) {
255+
auto task = silent_emplace([=] () mutable {
256+
auto b = B;
257+
for(size_t n=0; n<N; ++n) {
258+
c(b);
259+
b += s;
260+
}
261+
});
262+
source.precede(task);
263+
task.precede(target);
264+
}
265+
}
266+
215267
return std::make_pair(source, target);
216268
}
217269

@@ -345,6 +397,33 @@ auto FlowBuilder::transform_reduce(I beg, I end, T& result, B&& bop, P&& pop, U&
345397
return std::make_pair(source, target);
346398
}
347399

400+
// Function: _estimate_chunk_size
401+
template <typename I>
402+
size_t FlowBuilder::_estimate_chunk_size(I beg, I end, I step) {
403+
404+
using T = std::decay_t<I>;
405+
406+
size_t w = std::max(unsigned{1}, std::thread::hardware_concurrency());
407+
size_t N = 0;
408+
409+
if constexpr(std::is_integral_v<T>) {
410+
if(beg <= end) {
411+
N = (end - beg + step - 1) / step;
412+
}
413+
else {
414+
N = (end - beg + step + 1) / step;
415+
}
416+
}
417+
else if constexpr(std::is_floating_point_v<T>) {
418+
N = std::ceil((end - beg) / step);
419+
}
420+
else {
421+
static_assert(dependent_false_v<T>, "can't deduce chunk size");
422+
}
423+
424+
return (N + w - 1) / w;
425+
}
426+
348427

349428
// Procedure: _linearize
350429
template <typename L>

taskflow/utility/utility.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <cassert>
2121
#include <optional>
2222
#include <variant>
23+
#include <cmath>
2324

2425
namespace tf {
2526

0 commit comments

Comments
 (0)