@@ -1075,8 +1075,6 @@ class Executor {
10751075 void _explore_task (Worker&, Node*&);
10761076 void _schedule (Worker&, Node*);
10771077 void _schedule (Node*);
1078- void _schedule (Worker&, const std::vector<Node*>&, size_t );
1079- void _schedule (const std::vector<Node*>&, size_t );
10801078 void _set_up_topology (Worker*, Topology*);
10811079 void _tear_down_topology (Worker&, Topology*);
10821080 void _tear_down_async (Node*);
@@ -1086,7 +1084,6 @@ class Executor {
10861084 void _decrement_topology ();
10871085 void _invoke (Worker&, Node*);
10881086 void _invoke_static_task (Worker&, Node*);
1089- void _detach_subflow_task (Worker&, Node*, Graph&);
10901087 void _invoke_condition_task (Worker&, Node*, SmallVector<int >&);
10911088 void _invoke_multi_condition_task (Worker&, Node*, SmallVector<int >&);
10921089 void _invoke_async_task (Worker&, Node*);
@@ -1099,11 +1096,17 @@ class Executor {
10991096 bool _invoke_subflow_task (Worker&, Node*);
11001097 bool _invoke_module_task (Worker&, Node*);
11011098
1102- size_t _set_up_graph (Graph&, Node*, Topology*, int );
1099+ template <typename I>
1100+ I _set_up_graph (I first, I second, Node*, Topology*, int );
11031101
11041102 template <typename P>
11051103 void _corun_until (Worker&, P&&);
1104+
1105+ template <typename I>
1106+ void _schedule (Worker&, I first, I last);
11061107
1108+ template <typename I>
1109+ void _schedule (I first, I last);
11071110};
11081111
11091112#ifndef DOXYGEN_GENERATING_OUTPUT
@@ -1435,45 +1438,48 @@ inline void Executor::_schedule(Node* node) {
14351438}
14361439
14371440// Procedure: _schedule
1438- inline void Executor::_schedule (Worker& worker, const std::vector<Node*>& nodes, size_t num_nodes) {
1441+ template <typename I>
1442+ void Executor::_schedule (Worker& worker, I first, I last) {
14391443
14401444 // assert(nodes.size() >= num_nodes);
14411445
1442- if (num_nodes == 0 ) {
1446+ if (first == last ) {
14431447 return ;
14441448 }
14451449
14461450 // caller is a worker to this pool - starting at v3.5 we do not use
14471451 // any complicated notification mechanism as the experimental result
14481452 // has shown no significant advantage.
14491453 if (worker._executor == this ) {
1450- for (size_t i= 0 ; i<num_nodes ; ++i ) {
1451- worker._wsq .push (nodes[i] , [&](){ _freelist.push (worker._id , nodes[i] ); });
1454+ for (; first != last ; ++first ) {
1455+ worker._wsq .push (*first , [&](){ _freelist.push (worker._id , *first ); });
14521456 _notifier.notify_one ();
14531457 }
14541458 return ;
14551459 }
14561460
1457- for (size_t k=0 ; k<num_nodes; ++k) {
1458- _freelist.push (nodes[k]);
1461+ size_t n = 0 ;
1462+ for (; first != last; ++first, ++n) {
1463+ _freelist.push (*first);
14591464 }
1460- _notifier.notify_n (num_nodes );
1465+ _notifier.notify_n (n );
14611466}
14621467
14631468// Procedure: _schedule
1464- inline void Executor::_schedule (const std::vector<Node*>& nodes, size_t num_nodes) {
1469+ template <typename I>
1470+ inline void Executor::_schedule (I first, I last) {
14651471
14661472 // assert(nodes.size() >= num_nodes);
14671473
1468- if (num_nodes == 0 ) {
1474+ if (first == last ) {
14691475 return ;
14701476 }
1471-
1472- for (size_t k=0 ; k<num_nodes; ++k) {
1473- _freelist.push (nodes[k]);
1477+
1478+ size_t n = 0 ;
1479+ for (; first != last; ++first, ++n) {
1480+ _freelist.push (*first);
14741481 }
1475-
1476- _notifier.notify_n (num_nodes);
1482+ _notifier.notify_n (n);
14771483}
14781484
14791485// Procedure: _invoke
@@ -1720,24 +1726,30 @@ inline bool Executor::_invoke_subflow_task(Worker& w, Node* node) {
17201726 // TF_EXECUTOR_EXCEPTION_HANDLER(w, node, {
17211727
17221728 if ((node->_state .load (std::memory_order_relaxed) & Node::PREEMPTED) == 0 ) {
1723- auto handle = std::get_if<Node::Subflow>(&node->_handle );
1724- handle->subgraph ._clear ();
1725- Subflow sf (*this , w, node, handle->subgraph );
17261729
1730+ auto & h = *std::get_if<Node::Subflow>(&node->_handle );
1731+ auto & g = h.subgraph ;
1732+
1733+ // set up the subflow
1734+ Subflow sf (*this , w, node, g);
1735+
1736+ // invoke the subflow callable
17271737 _observer_prologue (w, node);
1728- handle-> work (sf);
1738+ h. work (sf);
17291739 _observer_epilogue (w, node);
17301740
17311741 // spawn the subflow if it is joinable and its graph is non-empty
1732- if (sf._joinable && handle->subgraph .size ()) {
1742+ // implicit join is faster than Subflow::join as it does not involve corun
1743+ if (sf.joinable () && g._nodes .size () > sf._tag ) {
17331744
17341745 // signal the executor to preempt this node
17351746 node->_state .fetch_or (Node::PREEMPTED, std::memory_order_relaxed);
17361747
17371748 // set up and schedule the graph
1738- auto N = _set_up_graph (handle->subgraph , node, node->_topology , 0 );
1739- node->_join_counter .fetch_add (N, std::memory_order_relaxed);
1740- _schedule (w, handle->subgraph ._nodes , N);
1749+ auto sbeg = g._nodes .begin () + sf._tag ;
1750+ auto send = _set_up_graph (sbeg, g._nodes .end (), node, node->_topology , 0 );
1751+ node->_join_counter .fetch_add (send - sbeg, std::memory_order_relaxed);
1752+ _schedule (w, sbeg, send);
17411753 return true ;
17421754 }
17431755 // node->_process_exception();
@@ -1749,30 +1761,6 @@ inline bool Executor::_invoke_subflow_task(Worker& w, Node* node) {
17491761 return false ;
17501762}
17511763
1752- // Procedure: _detach_subflow_task
1753- inline void Executor::_detach_subflow_task (Worker& w, Node* p, Graph& g) {
1754-
1755- // graph is empty and has no async tasks
1756- if (g.empty () && p->_join_counter .load (std::memory_order_acquire) == 0 ) {
1757- return ;
1758- }
1759-
1760- auto N = _set_up_graph (g, nullptr , p->_topology , Node::DETACHED);
1761-
1762- // need to fetch the node to a local vector since the graph will be
1763- // merged to the parent taskflow with move semantics
1764- std::vector<Node*> local (N);
1765- std::copy (g._nodes .begin (), g._nodes .begin () + N, local.begin ());
1766-
1767- {
1768- std::lock_guard<std::mutex> lock (p->_topology ->_taskflow ._mutex );
1769- p->_topology ->_taskflow ._graph ._merge (std::move (g));
1770- }
1771-
1772- p->_topology ->_join_counter .fetch_add (N, std::memory_order_relaxed);
1773- _schedule (w, local, N);
1774- }
1775-
17761764// Procedure: _corun_graph
17771765inline void Executor::_corun_graph (Worker& w, Node* p, Graph& g) {
17781766
@@ -1783,9 +1771,9 @@ inline void Executor::_corun_graph(Worker& w, Node* p, Graph& g) {
17831771 return ;
17841772 }
17851773
1786- auto N = _set_up_graph (g, p, p->_topology , 0 );
1787- p->_join_counter .fetch_add (N , std::memory_order_relaxed);
1788- _schedule (w, g._nodes , N );
1774+ auto send = _set_up_graph (g. _nodes . begin (), g. _nodes . end () , p, p->_topology , 0 );
1775+ p->_join_counter .fetch_add (send - g. _nodes . begin () , std::memory_order_relaxed);
1776+ _schedule (w, g._nodes . begin (), send );
17891777
17901778 _corun_until (w, [p] () -> bool {
17911779 return p->_join_counter .load (std::memory_order_acquire) == 0 ; }
@@ -1848,9 +1836,9 @@ inline bool Executor::_invoke_module_task(Worker& w, Node* node) {
18481836 // signal the executor to preempt this node
18491837 node->_state .fetch_or (Node::PREEMPTED, std::memory_order_relaxed);
18501838
1851- auto N = _set_up_graph (m->graph , node, node->_topology , 0 );
1852- node->_join_counter .fetch_add (N , std::memory_order_relaxed);
1853- _schedule (w, m->graph ._nodes , N );
1839+ auto send = _set_up_graph (m->graph . _nodes . begin (), m-> graph . _nodes . end () , node, node->_topology , 0 );
1840+ node->_join_counter .fetch_add (send - m-> graph . _nodes . begin () , std::memory_order_relaxed);
1841+ _schedule (w, m->graph ._nodes . begin (), send );
18541842 return true ;
18551843 }
18561844 // second entry - already spawned
@@ -2080,34 +2068,80 @@ inline void Executor::wait_for_all() {
20802068}
20812069
20822070// Function: _set_up_topology
2083- inline void Executor::_set_up_topology (Worker* worker , Topology* tpg) {
2071+ inline void Executor::_set_up_topology (Worker* w , Topology* tpg) {
20842072
20852073 // ---- under taskflow lock ----
2086- tpg->_taskflow ._graph ._clear_detached ();
2087- tpg->_num_sources = _set_up_graph (tpg->_taskflow ._graph , nullptr , tpg, 0 );
2074+ auto & g = tpg->_taskflow ._graph ;
2075+ // g._clear_detached();
2076+
2077+ auto send = _set_up_graph (g._nodes .begin (), g._nodes .end (), nullptr , tpg, 0 );
2078+ tpg->_num_sources = send - g._nodes .begin ();
20882079 tpg->_join_counter .store (tpg->_num_sources , std::memory_order_relaxed);
20892080
2090- worker ? _schedule (*worker, tpg->_taskflow ._graph ._nodes , tpg->_num_sources ) :
2091- _schedule ( tpg->_taskflow ._graph ._nodes , tpg->_num_sources );
2081+ w ? _schedule (*w, g._nodes .begin (), send) : _schedule (g._nodes .begin (), send);
20922082}
20932083
20942084// Function: _set_up_graph
2095- inline size_t Executor::_set_up_graph (
2096- Graph& g, Node* parent, Topology* tpg, int state
2097- ) {
2098- size_t num_sources = 0 ;
2099- for (size_t i=0 ; i<g._nodes .size (); i++) {
2100- auto node = g._nodes [i];
2085+ // inline size_t Executor::_set_up_graph(
2086+ // Graph& g, Node* parent, Topology* tpg, int state
2087+ // ) {
2088+ // size_t num_sources = 0;
2089+ // for(size_t i=0; i<g._nodes.size(); i++) {
2090+ // auto node = g._nodes[i];
2091+ // node->_topology = tpg;
2092+ // node->_parent = parent;
2093+ // node->_state.store(state, std::memory_order_relaxed);
2094+ // node->_set_up_join_counter();
2095+ // node->_exception_ptr = nullptr;
2096+ // if(node->num_dependents() == 0) {
2097+ // std::swap(g._nodes[num_sources++], g._nodes[i]);
2098+ // }
2099+ // }
2100+ // return num_sources;
2101+ // }
2102+
2103+ // Function: _set_up_graph
2104+ template <typename I>
2105+ I Executor::_set_up_graph (I first, I last, Node* parent, Topology* tpg, int state) {
2106+
2107+ auto send = first;
2108+ for (; first != last; ++first) {
2109+ auto node = *first;
21012110 node->_topology = tpg;
21022111 node->_parent = parent;
21032112 node->_state .store (state, std::memory_order_relaxed);
21042113 node->_set_up_join_counter ();
21052114 node->_exception_ptr = nullptr ;
21062115 if (node->num_dependents () == 0 ) {
2107- std::swap (g._nodes [num_sources++], g._nodes [i]);
2116+ std::iter_swap (send++, first);
2117+ }
2118+
2119+ // handle-specific clear
2120+ switch (node->_handle .index ()) {
2121+ case Node::SUBFLOW: {
2122+ std::get_if<Node::Subflow>(&node->_handle )->subgraph .clear ();
2123+ } break ;
2124+
2125+ default :
2126+ break ;
21082127 }
21092128 }
2110- return num_sources;
2129+ return send;
2130+
2131+
2132+ // size_t num_sources = 0;
2133+ // for(size_t i=0; i<g._nodes.size(); i++) {
2134+ // auto node = g._nodes[i];
2135+ // node->_topology = tpg;
2136+ // node->_parent = parent;
2137+ // node->_state.store(state, std::memory_order_relaxed);
2138+ // node->_set_up_join_counter();
2139+ // node->_exception_ptr = nullptr;
2140+ // if(node->num_dependents() == 0) {
2141+ // std::swap(g._nodes[num_sources++], g._nodes[i]);
2142+ // }
2143+ // }
2144+ // return num_sources;
21112145}
21122146
21132147// Function: _tear_down_topology
@@ -2121,8 +2155,9 @@ inline void Executor::_tear_down_topology(Worker& worker, Topology* tpg) {
21212155 if (!tpg->_exception_ptr && !tpg->cancelled () && !tpg->_pred ()) {
21222156 // assert(tpg->_join_counter == 0);
21232157 std::lock_guard<std::mutex> lock (f._mutex );
2158+ auto & g = tpg->_taskflow ._graph ;
21242159 tpg->_join_counter .store (tpg->_num_sources , std::memory_order_relaxed);
2125- _schedule (worker, tpg-> _taskflow . _graph . _nodes , tpg->_num_sources );
2160+ _schedule (worker, g. _nodes . begin (), g. _nodes . begin () + tpg->_num_sources );
21262161 }
21272162 // case 2: the final run of this topology
21282163 else {
@@ -2182,30 +2217,41 @@ inline void Subflow::join() {
21822217
21832218 // assert(this_worker().worker == &_worker);
21842219
2185- if (!_joinable ) {
2186- TF_THROW (" subflow not joinable " );
2220+ if (!joinable () ) {
2221+ TF_THROW (" subflow already joined or detached " );
21872222 }
21882223
2189- // only the parent worker can join the subflow
2190- _executor._corun_graph (_worker, _parent, _graph);
2191-
2192- // if any exception is caught from subflow tasks, rethrow it
2193- _parent->_process_exception ();
2194-
2195- _joinable = false ;
2224+ if (auto sbeg = _graph._nodes .begin () + _tag; sbeg != _graph._nodes .end ()) {
2225+ auto send = _executor._set_up_graph (
2226+ sbeg, _graph._nodes .end (), _parent, _parent->_topology , 0
2227+ );
2228+ _parent->_join_counter .fetch_add (send-sbeg, std::memory_order_relaxed);
2229+ _executor._schedule (_worker, sbeg, send);
2230+ _executor._corun_until (_worker, [p=_parent] () -> bool {
2231+ return p->_join_counter .load (std::memory_order_acquire) == 0 ; }
2232+ );
2233+ }
2234+
2235+ _tag |= JOINED_BIT;
21962236}
21972237
21982238inline void Subflow::detach () {
21992239
22002240 // assert(this_worker().worker == &_worker);
22012241
2202- if (!_joinable ) {
2242+ if (!joinable () ) {
22032243 TF_THROW (" subflow already joined or detached" );
22042244 }
22052245
2206- // only the parent worker can detach the subflow
2207- _executor._detach_subflow_task (_worker, _parent, _graph);
2208- _joinable = false ;
2246+ if (auto sbeg = _graph._nodes .begin () + _tag; sbeg != _graph._nodes .end ()) {
2247+ auto send = _executor._set_up_graph (
2248+ sbeg, _graph._nodes .end (), nullptr , _parent->_topology , Node::DETACHED
2249+ );
2250+ _parent->_topology ->_join_counter .fetch_add (send - sbeg, std::memory_order_relaxed);
2251+ _executor._schedule (_worker, sbeg, send);
2252+ }
2253+
2254+ _tag |= JOINED_BIT;
22092255}
22102256
22112257// ############################################################################
0 commit comments