Skip to content

Commit f254d69

Browse files
committed
updated sequential/parallel run tests
1 parent dea5a35 commit f254d69

3 files changed

Lines changed: 72 additions & 52 deletions

File tree

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ set_target_properties(taskflow_test_tmp PROPERTIES OUTPUT_NAME "taskflow")
260260
add_test(builder ${TF_UTEST_DIR}/taskflow -tc=Builder)
261261
add_test(creation ${TF_UTEST_DIR}/taskflow -tc=Creation)
262262
add_test(dispatch ${TF_UTEST_DIR}/taskflow -tc=Dispatch)
263-
add_test(multiple_runs ${TF_UTEST_DIR}/taskflow -tc=MultipleRuns)
263+
add_test(sequential_runs ${TF_UTEST_DIR}/taskflow -tc=SequentialRuns)
264264
add_test(parallel_runs ${TF_UTEST_DIR}/taskflow -tc=ParallelRuns)
265265
add_test(parallel_for ${TF_UTEST_DIR}/taskflow -tc=ParallelFor)
266266
add_test(parallel_for_idx ${TF_UTEST_DIR}/taskflow -tc=ParallelForOnIndex)

taskflow/core/executor.hpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1002,8 +1002,10 @@ std::future<void> Executor::run_until(Taskflow& f, P&& pred, C&& c) {
10021002

10031003
// Special case of predicate
10041004
if(std::invoke(pred)) {
1005+
std::promise<void> promise;
1006+
promise.set_value();
10051007
_decrement_topology_and_notify();
1006-
return std::async(std::launch::deferred, [](){});
1008+
return promise.get_future();
10071009
}
10081010

10091011
// Special case of zero workers requires:
@@ -1031,10 +1033,12 @@ std::future<void> Executor::run_until(Taskflow& f, P&& pred, C&& c) {
10311033
if(tpg._call != nullptr) {
10321034
std::invoke(tpg._call);
10331035
}
1036+
1037+
tpg._promise.set_value();
10341038

10351039
_decrement_topology_and_notify();
10361040

1037-
return std::async(std::launch::deferred, [](){});
1041+
return tpg._promise.get_future();
10381042
}
10391043

10401044
// Multi-threaded execution.

unittest/taskflow.cpp

Lines changed: 65 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,9 @@ TEST_CASE("Builder" * doctest::timeout(300)) {
9797
SUBCASE("LinearCounter"){
9898
for(size_t i=0;i<num_tasks;i++){
9999
tasks.emplace_back(
100-
taskflow.emplace([&counter, i]() { REQUIRE(counter == i); counter += 1;})
100+
taskflow.emplace([&counter, i]() {
101+
REQUIRE(counter == i); counter += 1;}
102+
)
101103
);
102104
if(i>0){
103105
taskflow.precede(tasks[i-1], tasks[i]);
@@ -124,7 +126,9 @@ TEST_CASE("Builder" * doctest::timeout(300)) {
124126
SUBCASE("Gather"){
125127
auto dst = taskflow.emplace([&]() { REQUIRE(counter == num_tasks - 1);});
126128
for(size_t i=1;i<num_tasks;i++){
127-
silent_tasks.emplace_back(taskflow.emplace([&counter]() {counter += 1;}));
129+
silent_tasks.emplace_back(
130+
taskflow.emplace([&counter]() {counter += 1;})
131+
);
128132
}
129133
dst.gather(silent_tasks);
130134
executor.run(taskflow).get();
@@ -135,7 +139,9 @@ TEST_CASE("Builder" * doctest::timeout(300)) {
135139
SUBCASE("MapReduce"){
136140
auto src = taskflow.emplace([&counter]() {counter = 0;});
137141
for(size_t i=0;i<num_tasks;i++){
138-
silent_tasks.emplace_back(taskflow.emplace([&counter]() {counter += 1;}));
142+
silent_tasks.emplace_back(
143+
taskflow.emplace([&counter]() {counter += 1;})
144+
);
139145
}
140146
taskflow.broadcast(src, silent_tasks);
141147
auto dst = taskflow.emplace(
@@ -149,7 +155,9 @@ TEST_CASE("Builder" * doctest::timeout(300)) {
149155
SUBCASE("Linearize"){
150156
for(size_t i=0;i<num_tasks;i++){
151157
silent_tasks.emplace_back(
152-
taskflow.emplace([&counter, i]() { REQUIRE(counter == i); counter += 1;})
158+
taskflow.emplace([&counter, i]() {
159+
REQUIRE(counter == i); counter += 1;}
160+
)
153161
);
154162
}
155163
taskflow.linearize(silent_tasks);
@@ -162,7 +170,9 @@ TEST_CASE("Builder" * doctest::timeout(300)) {
162170
auto src = taskflow.emplace([&counter]() {counter = 0;});
163171
for(size_t i=0;i<num_tasks;i++){
164172
silent_tasks.emplace_back(
165-
taskflow.emplace([&counter, i]() { REQUIRE(counter == i); counter += 1; })
173+
taskflow.emplace([&counter, i]() {
174+
REQUIRE(counter == i); counter += 1; }
175+
)
166176
);
167177
}
168178
taskflow.broadcast(src, silent_tasks);
@@ -215,9 +225,9 @@ TEST_CASE("Creation" * doctest::timeout(300)) {
215225
}
216226

217227
// --------------------------------------------------------
218-
// Testcase: Run
228+
// Testcase: SequentialRun
219229
// --------------------------------------------------------
220-
TEST_CASE("Run" * doctest::timeout(300)) {
230+
TEST_CASE("SequentialRun" * doctest::timeout(300)) {
221231

222232
using namespace std::chrono_literals;
223233

@@ -233,13 +243,15 @@ TEST_CASE("Run" * doctest::timeout(300)) {
233243
std::vector<tf::Task> silent_tasks;
234244

235245
for(size_t i=0;i<num_tasks;i++){
236-
silent_tasks.emplace_back(taskflow.emplace([&counter]() {counter += 1;}));
246+
silent_tasks.emplace_back(
247+
taskflow.emplace([&counter]() {counter += 1;})
248+
);
237249
}
238250

239251
SUBCASE("RunOnce"){
240252
auto fu = executor.run(taskflow);
241253
REQUIRE(taskflow.num_nodes() == num_tasks);
242-
REQUIRE(fu.wait_for(1s) == std::future_status::ready);
254+
fu.get();
243255
REQUIRE(counter == num_tasks);
244256
}
245257

@@ -249,7 +261,7 @@ TEST_CASE("Run" * doctest::timeout(300)) {
249261
REQUIRE(counter == num_tasks);
250262
}
251263

252-
SUBCASE("RunVariants") {
264+
SUBCASE("RunWithFuture") {
253265
// Empty subflow test
254266
for(unsigned W=0; W<=4; ++W) {
255267

@@ -271,7 +283,7 @@ TEST_CASE("Run" * doctest::timeout(300)) {
271283
C.precede(D);
272284

273285
tf::Executor executor(W);
274-
std::list<std::shared_future<void>> fu_list;
286+
std::list<std::future<void>> fu_list;
275287
for(size_t i=0; i<500; i++) {
276288
if(i == 499) {
277289
executor.run(f).get(); // Synchronize the first 500 runs
@@ -281,20 +293,26 @@ TEST_CASE("Run" * doctest::timeout(300)) {
281293
fu_list.push_back(executor.run(f));
282294
}
283295
else {
284-
fu_list.push_back(executor.run(f, [&, i=i](){ REQUIRE(count == (i+1)*7); }));
296+
fu_list.push_back(executor.run(f, [&, i=i](){
297+
REQUIRE(count == (i+1)*7); })
298+
);
285299
}
286300
}
301+
302+
executor.wait_for_all();
287303

288304
for(auto& fu: fu_list) {
289-
REQUIRE(fu.wait_for(std::chrono::seconds(0)) == std::future_status::ready);
305+
REQUIRE(fu.valid());
306+
REQUIRE(fu.wait_for(std::chrono::seconds(1)) == std::future_status::ready);
290307
}
291308

292-
executor.wait_for_all();
293-
294309
REQUIRE(count == 7000);
295310
}
311+
}
312+
313+
SUBCASE("RunWithChange") {
296314

297-
// TODO: test correctness when taskflow got changed between runs
315+
// test correctness when taskflow got changed between runs
298316
for(unsigned W=0; W<=4; ++W) {
299317

300318
std::atomic<size_t> count {0};
@@ -329,7 +347,9 @@ TEST_CASE("Run" * doctest::timeout(300)) {
329347
executor.wait_for_all();
330348
REQUIRE(count == 210);
331349
}
350+
}
332351

352+
SUBCASE("RunWithPred") {
333353
// Test run_until
334354
for(unsigned W=0; W<=4; ++W) {
335355

@@ -376,43 +396,39 @@ TEST_CASE("Run" * doctest::timeout(300)) {
376396
).get();
377397
}
378398
}
379-
}
380399

381-
// --------------------------------------------------------
382-
// Testcase: MultipleRuns
383-
// --------------------------------------------------------
384-
TEST_CASE("MultipleRuns" * doctest::timeout(300)) {
385-
386-
for(size_t W=0; W<=8; ++W) {
387-
tf::Executor executor(W);
388-
std::atomic<size_t> counter(0);
400+
SUBCASE("MultipleRuns") {
401+
for(size_t W=0; W<=8; ++W) {
402+
tf::Executor executor(W);
403+
std::atomic<size_t> counter(0);
389404

390-
tf::Taskflow tf1, tf2, tf3, tf4;
405+
tf::Taskflow tf1, tf2, tf3, tf4;
391406

392-
for(size_t n=0; n<16; ++n) {
393-
tf1.emplace([&](){counter.fetch_add(1, std::memory_order_relaxed);});
394-
}
395-
396-
for(size_t n=0; n<1024; ++n) {
397-
tf2.emplace([&](){counter.fetch_add(1, std::memory_order_relaxed);});
398-
}
399-
400-
for(size_t n=0; n<32; ++n) {
401-
tf3.emplace([&](){counter.fetch_add(1, std::memory_order_relaxed);});
402-
}
403-
404-
for(size_t n=0; n<128; ++n) {
405-
tf4.emplace([&](){counter.fetch_add(1, std::memory_order_relaxed);});
406-
}
407-
408-
for(int i=0; i<200; ++i) {
409-
executor.run(tf1);
410-
executor.run(tf2);
411-
executor.run(tf3);
412-
executor.run(tf4);
407+
for(size_t n=0; n<16; ++n) {
408+
tf1.emplace([&](){counter.fetch_add(1, std::memory_order_relaxed);});
409+
}
410+
411+
for(size_t n=0; n<1024; ++n) {
412+
tf2.emplace([&](){counter.fetch_add(1, std::memory_order_relaxed);});
413+
}
414+
415+
for(size_t n=0; n<32; ++n) {
416+
tf3.emplace([&](){counter.fetch_add(1, std::memory_order_relaxed);});
417+
}
418+
419+
for(size_t n=0; n<128; ++n) {
420+
tf4.emplace([&](){counter.fetch_add(1, std::memory_order_relaxed);});
421+
}
422+
423+
for(int i=0; i<200; ++i) {
424+
executor.run(tf1);
425+
executor.run(tf2);
426+
executor.run(tf3);
427+
executor.run(tf4);
428+
}
429+
executor.wait_for_all();
430+
REQUIRE(counter == 240000);
413431
}
414-
executor.wait_for_all();
415-
REQUIRE(counter == 240000);
416432
}
417433
}
418434

0 commit comments

Comments
 (0)