Skip to content

Commit dc64219

Browse files
committed
updated multi-condition task
1 parent 6d32183 commit dc64219

12 files changed

Lines changed: 1333 additions & 825 deletions

File tree

examples/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ list(APPEND TF_EXAMPLES
66
subflow
77
fibonacci
88
condition
9+
multi_condition
910
switch_case
1011
do_while
1112
while

examples/multi_condition.cpp

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// This program demonstrates how to use multi-condition task
2+
// to jump to multiple successor tasks
3+
//
4+
// A ----> B
5+
// |
6+
// |---> C
7+
// |
8+
// |---> D
9+
//
10+
#include <taskflow/taskflow.hpp>
11+
12+
int main() {
13+
14+
tf::Executor executor;
15+
tf::Taskflow taskflow("Multi-Conditional Tasking Demo");
16+
17+
auto A = taskflow.emplace([&]() -> tf::SmallVector<int> {
18+
std::cout << "A\n";
19+
return {0, 2};
20+
}).name("A");
21+
auto B = taskflow.emplace([&](){ std::cout << "B\n"; }).name("B");
22+
auto C = taskflow.emplace([&](){ std::cout << "C\n"; }).name("C");
23+
auto D = taskflow.emplace([&](){ std::cout << "D\n"; }).name("D");
24+
25+
A.precede(B, C, D);
26+
27+
// visualizes the taskflow
28+
taskflow.dump(std::cout);
29+
30+
// executes the taskflow
31+
executor.run(taskflow).wait();
32+
33+
return 0;
34+
}
35+

taskflow/core/executor.hpp

Lines changed: 71 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,8 @@ class Executor {
389389
void _invoke_dynamic_task(Worker&, Node*);
390390
void _invoke_dynamic_task_internal(Worker&, Node*, Graph&, bool);
391391
void _invoke_dynamic_task_external(Node*, Graph&, bool);
392-
void _invoke_condition_task(Worker&, Node*, int&);
392+
void _invoke_condition_task(Worker&, Node*, SmallVector<int>&);
393+
void _invoke_multi_condition_task(Worker&, Node*, SmallVector<int>&);
393394
void _invoke_module_task(Worker&, Node*);
394395
void _invoke_async_task(Worker&, Node*);
395396
void _invoke_silent_async_task(Worker&, Node*);
@@ -796,7 +797,8 @@ inline void Executor::_invoke(Worker& worker, Node* node) {
796797
const auto num_successors = node->num_successors();
797798

798799
// condition task
799-
int cond = -1;
800+
//int cond = -1;
801+
SmallVector<int> conds = { -1 };
800802

801803
// switch is faster than nested if-else due to jump table
802804
switch(node->_handle.index()) {
@@ -814,7 +816,13 @@ inline void Executor::_invoke(Worker& worker, Node* node) {
814816

815817
// condition task
816818
case Node::CONDITION: {
817-
_invoke_condition_task(worker, node, cond);
819+
_invoke_condition_task(worker, node, conds);
820+
}
821+
break;
822+
823+
// multi-condition task
824+
case Node::MULTI_CONDITION: {
825+
_invoke_multi_condition_task(worker, node, conds);
818826
}
819827
break;
820828

@@ -852,7 +860,7 @@ inline void Executor::_invoke(Worker& worker, Node* node) {
852860
}
853861
break;
854862

855-
// monostate
863+
// monostate (placeholder)
856864
default:
857865
break;
858866
}
@@ -865,8 +873,8 @@ inline void Executor::_invoke(Worker& worker, Node* node) {
865873
// We MUST recover the dependency since the graph may have cycles.
866874
// This must be done before scheduling the successors, otherwise this might cause
867875
// race condition on the _dependents
868-
//if(node->_has_state(Node::BRANCHED)) {
869-
if((node->_state.load(std::memory_order_relaxed) & Node::BRANCHED)) {
876+
//if(node->_has_state(Node::CONDITIONED)) {
877+
if((node->_state.load(std::memory_order_relaxed) & Node::CONDITIONED)) {
870878
node->_join_counter = node->num_strong_dependents();
871879
}
872880
else {
@@ -879,23 +887,53 @@ inline void Executor::_invoke(Worker& worker, Node* node) {
879887

880888
// At this point, the node storage might be destructed (to be verified)
881889
// case 1: non-condition task
882-
if(node->_handle.index() != Node::CONDITION) {
883-
for(size_t i=0; i<num_successors; ++i) {
884-
if(--(node->_successors[i]->_join_counter) == 0) {
885-
j.fetch_add(1);
886-
_schedule(node->_successors[i]);
890+
switch(node->_handle.index()) {
891+
892+
// condition and multi-condition tasks
893+
case Node::CONDITION:
894+
case Node::MULTI_CONDITION: {
895+
for(auto cond : conds) {
896+
if(cond >= 0 && static_cast<size_t>(cond) < num_successors) {
897+
auto s = node->_successors[cond];
898+
// zeroing the join counter seems redundant but it keeps invariant
899+
s->_join_counter.store(0, std::memory_order_relaxed);
900+
j.fetch_add(1);
901+
_schedule(s);
902+
}
887903
}
888904
}
889-
}
890-
// case 2: condition task
891-
else {
892-
if(cond >= 0 && static_cast<size_t>(cond) < num_successors) {
893-
auto s = node->_successors[cond];
894-
s->_join_counter.store(0); // seems redundant but just for invariant
895-
j.fetch_add(1);
896-
_schedule(s);
905+
break;
906+
907+
// non-condition task
908+
default: {
909+
for(size_t i=0; i<num_successors; ++i) {
910+
if(--(node->_successors[i]->_join_counter) == 0) {
911+
j.fetch_add(1);
912+
_schedule(node->_successors[i]);
913+
}
914+
}
897915
}
916+
break;
898917
}
918+
919+
//// case 1: non-condition task
920+
//if(node->_handle.index() != Node::CONDITION) {
921+
// for(size_t i=0; i<num_successors; ++i) {
922+
// if(--(node->_successors[i]->_join_counter) == 0) {
923+
// j.fetch_add(1);
924+
// _schedule(node->_successors[i]);
925+
// }
926+
// }
927+
//}
928+
//// case 2: condition task
929+
//else {
930+
// if(cond >= 0 && static_cast<size_t>(cond) < num_successors) {
931+
// auto s = node->_successors[cond];
932+
// s->_join_counter.store(0); // seems redundant but just for invariant
933+
// j.fetch_add(1);
934+
// _schedule(s);
935+
// }
936+
//}
899937

900938
// tear_down the invoke
901939
_tear_down_invoke(node, false);
@@ -1075,10 +1113,19 @@ inline void Executor::_invoke_dynamic_task_internal(
10751113

10761114
// Procedure: _invoke_condition_task
10771115
inline void Executor::_invoke_condition_task(
1078-
Worker& worker, Node* node, int& cond
1116+
Worker& worker, Node* node, SmallVector<int>& conds
1117+
) {
1118+
_observer_prologue(worker, node);
1119+
conds[0] = std::get_if<Node::Condition>(&node->_handle)->work();
1120+
_observer_epilogue(worker, node);
1121+
}
1122+
1123+
// Procedure: _invoke_multi_condition_task
1124+
inline void Executor::_invoke_multi_condition_task(
1125+
Worker& worker, Node* node, SmallVector<int>& conds
10791126
) {
10801127
_observer_prologue(worker, node);
1081-
cond = std::get_if<Node::Condition>(&node->_handle)->work();
1128+
conds = std::get_if<Node::MultiCondition>(&node->_handle)->work();
10821129
_observer_epilogue(worker, node);
10831130
}
10841131

@@ -1302,7 +1349,7 @@ inline void Executor::_tear_down_topology(Topology* tpg) {
13021349
// case 2: the final run of this topology
13031350
else {
13041351

1305-
// TODO: if the topology is cancelled, need to release all constraints
1352+
// TODO: if the topology is cancelled, need to release all semaphores
13061353

13071354
if(tpg->_call != nullptr) {
13081355
tpg->_call();
@@ -1352,8 +1399,8 @@ inline void Executor::_tear_down_topology(Topology* tpg) {
13521399
_decrement_topology_and_notify();
13531400

13541401
// remove the taskflow if it is managed by the executor
1355-
// TODO: we may need to synchronize on wait (which means the following
1356-
// code should the moved before set_value
1402+
// TODO: in the future, we may need to synchronize on wait
1403+
// (which means the following code should the moved before set_value)
13571404
if(s) {
13581405
std::scoped_lock<std::mutex> lock(_taskflow_mutex);
13591406
_taskflows.erase(*s);

taskflow/core/flow_builder.hpp

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,10 @@ class FlowBuilder {
8585
tf::Taskflow taskflow;
8686
8787
auto [init, cond, yes, no] = taskflow.emplace(
88-
[] () { },
89-
[] () { return 0; },
90-
[] () { std::cout << "yes\n"; },
91-
[] () { std::cout << "no\n"; }
88+
[] () { },
89+
[] () { return 0; },
90+
[] () { std::cout << "yes\n"; },
91+
[] () { std::cout << "no\n"; }
9292
);
9393
9494
// executes yes if cond returns 0, or no if cond returns 1
@@ -102,6 +102,42 @@ class FlowBuilder {
102102
std::enable_if_t<is_condition_task_v<C>, void>* = nullptr
103103
>
104104
Task emplace(C&& callable);
105+
106+
/**
107+
@brief creates a multi-condition task
108+
109+
@tparam C callable type constructible from
110+
std::function<tf::SmallVector<int>()>
111+
112+
@param callable callable to construct a multi-condition task
113+
114+
@return a tf::Task handle
115+
116+
The following example creates a multi-condition task that selectively
117+
jumps to two successor tasks.
118+
119+
@code{.cpp}
120+
tf::Taskflow taskflow;
121+
122+
auto [init, cond, branch1, branch2, branch3] = taskflow.emplace(
123+
[] () { },
124+
[] () { return tf::SmallVector{0, 2}; },
125+
[] () { std::cout << "branch1\n"; },
126+
[] () { std::cout << "branch2\n"; },
127+
[] () { std::cout << "branch3\n"; }
128+
);
129+
130+
// executes branch1 and branch3 when cond returns 0 and 2
131+
cond.precede(branch1, branch2, branch3);
132+
cond.succeed(init);
133+
@endcode
134+
135+
Please refer to @ref ConditionalTasking for details.
136+
*/
137+
template <typename C,
138+
std::enable_if_t<is_multi_condition_task_v<C>, void>* = nullptr
139+
>
140+
Task emplace(C&& callable);
105141

106142
/**
107143
@brief creates multiple tasks from a list of callable objects
@@ -547,6 +583,14 @@ Task FlowBuilder::emplace(C&& c) {
547583
));
548584
}
549585

586+
// Function: emplace
587+
template <typename C, std::enable_if_t<is_multi_condition_task_v<C>, void>*>
588+
Task FlowBuilder::emplace(C&& c) {
589+
return Task(_graph.emplace_back(
590+
std::in_place_type_t<Node::MultiCondition>{}, std::forward<C>(c)
591+
));
592+
}
593+
550594
// Function: emplace
551595
template <typename... C, std::enable_if_t<(sizeof...(C)>1), void>*>
552596
auto FlowBuilder::emplace(C&&... cs) {

0 commit comments

Comments
 (0)