@@ -389,7 +389,8 @@ class Executor {
389389 void _invoke_dynamic_task (Worker&, Node*);
390390 void _invoke_dynamic_task_internal (Worker&, Node*, Graph&, bool );
391391 void _invoke_dynamic_task_external (Node*, Graph&, bool );
392- void _invoke_condition_task (Worker&, Node*, int &);
392+ void _invoke_condition_task (Worker&, Node*, SmallVector<int >&);
393+ void _invoke_multi_condition_task (Worker&, Node*, SmallVector<int >&);
393394 void _invoke_module_task (Worker&, Node*);
394395 void _invoke_async_task (Worker&, Node*);
395396 void _invoke_silent_async_task (Worker&, Node*);
@@ -796,7 +797,8 @@ inline void Executor::_invoke(Worker& worker, Node* node) {
796797 const auto num_successors = node->num_successors ();
797798
798799 // condition task
799- int cond = -1 ;
800+ // int cond = -1;
801+ SmallVector<int > conds = { -1 };
800802
801803 // switch is faster than nested if-else due to jump table
802804 switch (node->_handle .index ()) {
@@ -814,7 +816,13 @@ inline void Executor::_invoke(Worker& worker, Node* node) {
814816
815817 // condition task
816818 case Node::CONDITION: {
817- _invoke_condition_task (worker, node, cond);
819+ _invoke_condition_task (worker, node, conds);
820+ }
821+ break ;
822+
823+ // multi-condition task
824+ case Node::MULTI_CONDITION: {
825+ _invoke_multi_condition_task (worker, node, conds);
818826 }
819827 break ;
820828
@@ -852,7 +860,7 @@ inline void Executor::_invoke(Worker& worker, Node* node) {
852860 }
853861 break ;
854862
855- // monostate
863+ // monostate (placeholder)
856864 default :
857865 break ;
858866 }
@@ -865,8 +873,8 @@ inline void Executor::_invoke(Worker& worker, Node* node) {
865873 // We MUST recover the dependency since the graph may have cycles.
866874 // This must be done before scheduling the successors, otherwise this might cause
867875 // race condition on the _dependents
868- // if(node->_has_state(Node::BRANCHED )) {
869- if ((node->_state .load (std::memory_order_relaxed) & Node::BRANCHED )) {
876+ // if(node->_has_state(Node::CONDITIONED )) {
877+ if ((node->_state .load (std::memory_order_relaxed) & Node::CONDITIONED )) {
870878 node->_join_counter = node->num_strong_dependents ();
871879 }
872880 else {
@@ -879,23 +887,53 @@ inline void Executor::_invoke(Worker& worker, Node* node) {
879887
880888 // At this point, the node storage might be destructed (to be verified)
881889 // case 1: non-condition task
882- if (node->_handle .index () != Node::CONDITION) {
883- for (size_t i=0 ; i<num_successors; ++i) {
884- if (--(node->_successors [i]->_join_counter ) == 0 ) {
885- j.fetch_add (1 );
886- _schedule (node->_successors [i]);
890+ switch (node->_handle .index ()) {
891+
892+ // condition and multi-condition tasks
893+ case Node::CONDITION:
894+ case Node::MULTI_CONDITION: {
895+ for (auto cond : conds) {
896+ if (cond >= 0 && static_cast <size_t >(cond) < num_successors) {
897+ auto s = node->_successors [cond];
898+ // zeroing the join counter seems redundant but it keeps invariant
899+ s->_join_counter .store (0 , std::memory_order_relaxed);
900+ j.fetch_add (1 );
901+ _schedule (s);
902+ }
887903 }
888904 }
889- }
890- // case 2: condition task
891- else {
892- if (cond >= 0 && static_cast <size_t >(cond) < num_successors) {
893- auto s = node->_successors [cond];
894- s->_join_counter .store (0 ); // seems redundant but just for invariant
895- j.fetch_add (1 );
896- _schedule (s);
905+ break ;
906+
907+ // non-condition task
908+ default : {
909+ for (size_t i=0 ; i<num_successors; ++i) {
910+ if (--(node->_successors [i]->_join_counter ) == 0 ) {
911+ j.fetch_add (1 );
912+ _schedule (node->_successors [i]);
913+ }
914+ }
897915 }
916+ break ;
898917 }
918+
919+ // // case 1: non-condition task
920+ // if(node->_handle.index() != Node::CONDITION) {
921+ // for(size_t i=0; i<num_successors; ++i) {
922+ // if(--(node->_successors[i]->_join_counter) == 0) {
923+ // j.fetch_add(1);
924+ // _schedule(node->_successors[i]);
925+ // }
926+ // }
927+ // }
928+ // // case 2: condition task
929+ // else {
930+ // if(cond >= 0 && static_cast<size_t>(cond) < num_successors) {
931+ // auto s = node->_successors[cond];
932+ // s->_join_counter.store(0); // seems redundant but just for invariant
933+ // j.fetch_add(1);
934+ // _schedule(s);
935+ // }
936+ // }
899937
900938 // tear_down the invoke
901939 _tear_down_invoke (node, false );
@@ -1075,10 +1113,19 @@ inline void Executor::_invoke_dynamic_task_internal(
10751113
10761114// Procedure: _invoke_condition_task
10771115inline void Executor::_invoke_condition_task (
1078- Worker& worker, Node* node, int & cond
1116+ Worker& worker, Node* node, SmallVector<int >& conds
1117+ ) {
1118+ _observer_prologue (worker, node);
1119+ conds[0 ] = std::get_if<Node::Condition>(&node->_handle )->work ();
1120+ _observer_epilogue (worker, node);
1121+ }
1122+
1123+ // Procedure: _invoke_multi_condition_task
1124+ inline void Executor::_invoke_multi_condition_task (
1125+ Worker& worker, Node* node, SmallVector<int >& conds
10791126) {
10801127 _observer_prologue (worker, node);
1081- cond = std::get_if<Node::Condition >(&node->_handle )->work ();
1128+ conds = std::get_if<Node::MultiCondition >(&node->_handle )->work ();
10821129 _observer_epilogue (worker, node);
10831130}
10841131
@@ -1302,7 +1349,7 @@ inline void Executor::_tear_down_topology(Topology* tpg) {
13021349 // case 2: the final run of this topology
13031350 else {
13041351
1305- // TODO: if the topology is cancelled, need to release all constraints
1352+ // TODO: if the topology is cancelled, need to release all semaphores
13061353
13071354 if (tpg->_call != nullptr ) {
13081355 tpg->_call ();
@@ -1352,8 +1399,8 @@ inline void Executor::_tear_down_topology(Topology* tpg) {
13521399 _decrement_topology_and_notify ();
13531400
13541401 // remove the taskflow if it is managed by the executor
1355- // TODO: we may need to synchronize on wait (which means the following
1356- // code should the moved before set_value
1402+ // TODO: in the future, we may need to synchronize on wait
1403+ // (which means the following code should the moved before set_value)
13571404 if (s) {
13581405 std::scoped_lock<std::mutex> lock (_taskflow_mutex);
13591406 _taskflows.erase (*s);
0 commit comments