Skip to content

Commit 92d3d18

Browse files
author
TSUNG-WEI HUANG
committed
refactored subflow
1 parent 0400086 commit 92d3d18

7 files changed

Lines changed: 185 additions & 131 deletions

File tree

examples/subflow.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ int main(int argc, char* argv[]) {
7979
B.precede(D); // D runs after B
8080
C.precede(D); // D runs after C
8181

82-
executor.run(taskflow).get(); // block until finished
82+
executor.run_n(taskflow, 3).get(); // block until finished
8383

8484
// examine the graph
8585
taskflow.dump(std::cout);

taskflow/core/executor.hpp

Lines changed: 129 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -1075,8 +1075,6 @@ class Executor {
10751075
void _explore_task(Worker&, Node*&);
10761076
void _schedule(Worker&, Node*);
10771077
void _schedule(Node*);
1078-
void _schedule(Worker&, const std::vector<Node*>&, size_t);
1079-
void _schedule(const std::vector<Node*>&, size_t);
10801078
void _set_up_topology(Worker*, Topology*);
10811079
void _tear_down_topology(Worker&, Topology*);
10821080
void _tear_down_async(Node*);
@@ -1086,7 +1084,6 @@ class Executor {
10861084
void _decrement_topology();
10871085
void _invoke(Worker&, Node*);
10881086
void _invoke_static_task(Worker&, Node*);
1089-
void _detach_subflow_task(Worker&, Node*, Graph&);
10901087
void _invoke_condition_task(Worker&, Node*, SmallVector<int>&);
10911088
void _invoke_multi_condition_task(Worker&, Node*, SmallVector<int>&);
10921089
void _invoke_async_task(Worker&, Node*);
@@ -1099,11 +1096,17 @@ class Executor {
10991096
bool _invoke_subflow_task(Worker&, Node*);
11001097
bool _invoke_module_task(Worker&, Node*);
11011098

1102-
size_t _set_up_graph(Graph&, Node*, Topology*, int);
1099+
template <typename I>
1100+
I _set_up_graph(I first, I last, Node*, Topology*, int);
11031101

11041102
template <typename P>
11051103
void _corun_until(Worker&, P&&);
1104+
1105+
template <typename I>
1106+
void _schedule(Worker&, I first, I last);
11061107

1108+
template <typename I>
1109+
void _schedule(I first, I last);
11071110
};
11081111

11091112
#ifndef DOXYGEN_GENERATING_OUTPUT
@@ -1435,45 +1438,48 @@ inline void Executor::_schedule(Node* node) {
14351438
}
14361439

14371440
// Procedure: _schedule
1438-
inline void Executor::_schedule(Worker& worker, const std::vector<Node*>& nodes, size_t num_nodes) {
1441+
template <typename I>
1442+
void Executor::_schedule(Worker& worker, I first, I last) {
14391443

14401444
//assert(nodes.size() >= num_nodes);
14411445

1442-
if(num_nodes == 0) {
1446+
if(first == last) {
14431447
return;
14441448
}
14451449

14461450
// caller is a worker to this pool - starting at v3.5 we do not use
14471451
// any complicated notification mechanism as the experimental result
14481452
// has shown no significant advantage.
14491453
if(worker._executor == this) {
1450-
for(size_t i=0; i<num_nodes; ++i) {
1451-
worker._wsq.push(nodes[i], [&](){ _freelist.push(worker._id, nodes[i]); });
1454+
for(; first != last; ++first) {
1455+
worker._wsq.push(*first, [&](){ _freelist.push(worker._id, *first); });
14521456
_notifier.notify_one();
14531457
}
14541458
return;
14551459
}
14561460

1457-
for(size_t k=0; k<num_nodes; ++k) {
1458-
_freelist.push(nodes[k]);
1461+
size_t n = 0;
1462+
for(; first != last; ++first, ++n) {
1463+
_freelist.push(*first);
14591464
}
1460-
_notifier.notify_n(num_nodes);
1465+
_notifier.notify_n(n);
14611466
}
14621467

14631468
// Procedure: _schedule
1464-
inline void Executor::_schedule(const std::vector<Node*>& nodes, size_t num_nodes) {
1469+
template <typename I>
1470+
inline void Executor::_schedule(I first, I last) {
14651471

14661472
//assert(nodes.size() >= num_nodes);
14671473

1468-
if(num_nodes == 0) {
1474+
if(first == last) {
14691475
return;
14701476
}
1471-
1472-
for(size_t k=0; k<num_nodes; ++k) {
1473-
_freelist.push(nodes[k]);
1477+
1478+
size_t n = 0;
1479+
for(; first != last; ++first, ++n) {
1480+
_freelist.push(*first);
14741481
}
1475-
1476-
_notifier.notify_n(num_nodes);
1482+
_notifier.notify_n(n);
14771483
}
14781484

14791485
// Procedure: _invoke
@@ -1720,24 +1726,30 @@ inline bool Executor::_invoke_subflow_task(Worker& w, Node* node) {
17201726
//TF_EXECUTOR_EXCEPTION_HANDLER(w, node, {
17211727

17221728
if((node->_state.load(std::memory_order_relaxed) & Node::PREEMPTED) == 0) {
1723-
auto handle = std::get_if<Node::Subflow>(&node->_handle);
1724-
handle->subgraph._clear();
1725-
Subflow sf(*this, w, node, handle->subgraph);
17261729

1730+
auto& h = *std::get_if<Node::Subflow>(&node->_handle);
1731+
auto& g = h.subgraph;
1732+
1733+
// set up the subflow
1734+
Subflow sf(*this, w, node, g);
1735+
1736+
// invoke the subflow callable
17271737
_observer_prologue(w, node);
1728-
handle->work(sf);
1738+
h.work(sf);
17291739
_observer_epilogue(w, node);
17301740

17311741
// spawn the subflow if it is joinable and its graph is non-empty
1732-
if(sf._joinable && handle->subgraph.size()) {
1742+
// implicit join is faster than Subflow::join as it does not involve corun
1743+
if(sf.joinable() && g._nodes.size() > sf._tag) {
17331744

17341745
// signal the executor to preempt this node
17351746
node->_state.fetch_or(Node::PREEMPTED, std::memory_order_relaxed);
17361747

17371748
// set up and schedule the graph
1738-
auto N = _set_up_graph(handle->subgraph, node, node->_topology, 0);
1739-
node->_join_counter.fetch_add(N, std::memory_order_relaxed);
1740-
_schedule(w, handle->subgraph._nodes, N);
1749+
auto sbeg = g._nodes.begin() + sf._tag;
1750+
auto send = _set_up_graph(sbeg, g._nodes.end(), node, node->_topology, 0);
1751+
node->_join_counter.fetch_add(send - sbeg, std::memory_order_relaxed);
1752+
_schedule(w, sbeg, send);
17411753
return true;
17421754
}
17431755
//node->_process_exception();
@@ -1749,30 +1761,6 @@ inline bool Executor::_invoke_subflow_task(Worker& w, Node* node) {
17491761
return false;
17501762
}
17511763

1752-
// Procedure: _detach_subflow_task
1753-
inline void Executor::_detach_subflow_task(Worker& w, Node* p, Graph& g) {
1754-
1755-
// graph is empty and has no async tasks
1756-
if(g.empty() && p->_join_counter.load(std::memory_order_acquire) == 0) {
1757-
return;
1758-
}
1759-
1760-
auto N = _set_up_graph(g, nullptr, p->_topology, Node::DETACHED);
1761-
1762-
// need to fetch the node to a local vector since the graph will be
1763-
// merged to the parent taskflow with move semantics
1764-
std::vector<Node*> local(N);
1765-
std::copy(g._nodes.begin(), g._nodes.begin() + N, local.begin());
1766-
1767-
{
1768-
std::lock_guard<std::mutex> lock(p->_topology->_taskflow._mutex);
1769-
p->_topology->_taskflow._graph._merge(std::move(g));
1770-
}
1771-
1772-
p->_topology->_join_counter.fetch_add(N, std::memory_order_relaxed);
1773-
_schedule(w, local, N);
1774-
}
1775-
17761764
// Procedure: _corun_graph
17771765
inline void Executor::_corun_graph(Worker& w, Node* p, Graph& g) {
17781766

@@ -1783,9 +1771,9 @@ inline void Executor::_corun_graph(Worker& w, Node* p, Graph& g) {
17831771
return;
17841772
}
17851773

1786-
auto N = _set_up_graph(g, p, p->_topology, 0);
1787-
p->_join_counter.fetch_add(N, std::memory_order_relaxed);
1788-
_schedule(w, g._nodes, N);
1774+
auto send = _set_up_graph(g._nodes.begin(), g._nodes.end(), p, p->_topology, 0);
1775+
p->_join_counter.fetch_add(send - g._nodes.begin(), std::memory_order_relaxed);
1776+
_schedule(w, g._nodes.begin(), send);
17891777

17901778
_corun_until(w, [p] () -> bool {
17911779
return p->_join_counter.load(std::memory_order_acquire) == 0; }
@@ -1848,9 +1836,9 @@ inline bool Executor::_invoke_module_task(Worker& w, Node* node) {
18481836
// signal the executor to preempt this node
18491837
node->_state.fetch_or(Node::PREEMPTED, std::memory_order_relaxed);
18501838

1851-
auto N = _set_up_graph(m->graph, node, node->_topology, 0);
1852-
node->_join_counter.fetch_add(N, std::memory_order_relaxed);
1853-
_schedule(w, m->graph._nodes, N);
1839+
auto send = _set_up_graph(m->graph._nodes.begin(), m->graph._nodes.end(), node, node->_topology, 0);
1840+
node->_join_counter.fetch_add(send - m->graph._nodes.begin(), std::memory_order_relaxed);
1841+
_schedule(w, m->graph._nodes.begin(), send);
18541842
return true;
18551843
}
18561844
// second entry - already spawned
@@ -2080,34 +2068,80 @@ inline void Executor::wait_for_all() {
20802068
}
20812069

20822070
// Function: _set_up_topology
2083-
inline void Executor::_set_up_topology(Worker* worker, Topology* tpg) {
2071+
inline void Executor::_set_up_topology(Worker* w, Topology* tpg) {
20842072

20852073
// ---- under taskflow lock ----
2086-
tpg->_taskflow._graph._clear_detached();
2087-
tpg->_num_sources = _set_up_graph(tpg->_taskflow._graph, nullptr, tpg, 0);
2074+
auto& g = tpg->_taskflow._graph;
2075+
//g._clear_detached();
2076+
2077+
auto send = _set_up_graph(g._nodes.begin(), g._nodes.end(), nullptr, tpg, 0);
2078+
tpg->_num_sources = send - g._nodes.begin();
20882079
tpg->_join_counter.store(tpg->_num_sources, std::memory_order_relaxed);
20892080

2090-
worker ? _schedule(*worker, tpg->_taskflow._graph._nodes, tpg->_num_sources) :
2091-
_schedule( tpg->_taskflow._graph._nodes, tpg->_num_sources);
2081+
w ? _schedule(*w, g._nodes.begin(), send) : _schedule(g._nodes.begin(), send);
20922082
}
20932083

20942084
// Function: _set_up_graph
2095-
inline size_t Executor::_set_up_graph(
2096-
Graph& g, Node* parent, Topology* tpg, int state
2097-
) {
2098-
size_t num_sources = 0;
2099-
for(size_t i=0; i<g._nodes.size(); i++) {
2100-
auto node = g._nodes[i];
2085+
//inline size_t Executor::_set_up_graph(
2086+
// Graph& g, Node* parent, Topology* tpg, int state
2087+
//) {
2088+
// size_t num_sources = 0;
2089+
// for(size_t i=0; i<g._nodes.size(); i++) {
2090+
// auto node = g._nodes[i];
2091+
// node->_topology = tpg;
2092+
// node->_parent = parent;
2093+
// node->_state.store(state, std::memory_order_relaxed);
2094+
// node->_set_up_join_counter();
2095+
// node->_exception_ptr = nullptr;
2096+
// if(node->num_dependents() == 0) {
2097+
// std::swap(g._nodes[num_sources++], g._nodes[i]);
2098+
// }
2099+
// }
2100+
// return num_sources;
2101+
//}
2102+
2103+
// Function: _set_up_graph
2104+
template <typename I>
2105+
I Executor::_set_up_graph(I first, I last, Node* parent, Topology* tpg, int state) {
2106+
2107+
auto send = first;
2108+
for(; first != last; ++first) {
2109+
auto node = *first;
21012110
node->_topology = tpg;
21022111
node->_parent = parent;
21032112
node->_state.store(state, std::memory_order_relaxed);
21042113
node->_set_up_join_counter();
21052114
node->_exception_ptr = nullptr;
21062115
if(node->num_dependents() == 0) {
2107-
std::swap(g._nodes[num_sources++], g._nodes[i]);
2116+
std::iter_swap(send++, first);
2117+
}
2118+
2119+
// handle-specific clear
2120+
switch(node->_handle.index()) {
2121+
case Node::SUBFLOW: {
2122+
std::get_if<Node::Subflow>(&node->_handle)->subgraph.clear();
2123+
} break;
2124+
2125+
default:
2126+
break;
21082127
}
21092128
}
2110-
return num_sources;
2129+
return send;
2130+
2131+
2132+
//size_t num_sources = 0;
2133+
//for(size_t i=0; i<g._nodes.size(); i++) {
2134+
// auto node = g._nodes[i];
2135+
// node->_topology = tpg;
2136+
// node->_parent = parent;
2137+
// node->_state.store(state, std::memory_order_relaxed);
2138+
// node->_set_up_join_counter();
2139+
// node->_exception_ptr = nullptr;
2140+
// if(node->num_dependents() == 0) {
2141+
// std::swap(g._nodes[num_sources++], g._nodes[i]);
2142+
// }
2143+
//}
2144+
//return num_sources;
21112145
}
21122146

21132147
// Function: _tear_down_topology
@@ -2121,8 +2155,9 @@ inline void Executor::_tear_down_topology(Worker& worker, Topology* tpg) {
21212155
if(!tpg->_exception_ptr && !tpg->cancelled() && !tpg->_pred()) {
21222156
//assert(tpg->_join_counter == 0);
21232157
std::lock_guard<std::mutex> lock(f._mutex);
2158+
auto& g = tpg->_taskflow._graph;
21242159
tpg->_join_counter.store(tpg->_num_sources, std::memory_order_relaxed);
2125-
_schedule(worker, tpg->_taskflow._graph._nodes, tpg->_num_sources);
2160+
_schedule(worker, g._nodes.begin(), g._nodes.begin() + tpg->_num_sources);
21262161
}
21272162
// case 2: the final run of this topology
21282163
else {
@@ -2182,30 +2217,41 @@ inline void Subflow::join() {
21822217

21832218
// assert(this_worker().worker == &_worker);
21842219

2185-
if(!_joinable) {
2186-
TF_THROW("subflow not joinable");
2220+
if(!joinable()) {
2221+
TF_THROW("subflow already joined or detached");
21872222
}
21882223

2189-
// only the parent worker can join the subflow
2190-
_executor._corun_graph(_worker, _parent, _graph);
2191-
2192-
// if any exception is caught from subflow tasks, rethrow it
2193-
_parent->_process_exception();
2194-
2195-
_joinable = false;
2224+
if(auto sbeg = _graph._nodes.begin() + _tag; sbeg != _graph._nodes.end()) {
2225+
auto send = _executor._set_up_graph(
2226+
sbeg, _graph._nodes.end(), _parent, _parent->_topology, 0
2227+
);
2228+
_parent->_join_counter.fetch_add(send-sbeg, std::memory_order_relaxed);
2229+
_executor._schedule(_worker, sbeg, send);
2230+
_executor._corun_until(_worker, [p=_parent] () -> bool {
2231+
return p->_join_counter.load(std::memory_order_acquire) == 0; }
2232+
);
2233+
}
2234+
2235+
_tag |= JOINED_BIT;
21962236
}
21972237

21982238
inline void Subflow::detach() {
21992239

22002240
// assert(this_worker().worker == &_worker);
22012241

2202-
if(!_joinable) {
2242+
if(!joinable()) {
22032243
TF_THROW("subflow already joined or detached");
22042244
}
22052245

2206-
// only the parent worker can detach the subflow
2207-
_executor._detach_subflow_task(_worker, _parent, _graph);
2208-
_joinable = false;
2246+
if(auto sbeg = _graph._nodes.begin() + _tag; sbeg != _graph._nodes.end()) {
2247+
auto send = _executor._set_up_graph(
2248+
sbeg, _graph._nodes.end(), nullptr, _parent->_topology, Node::DETACHED
2249+
);
2250+
_parent->_topology->_join_counter.fetch_add(send - sbeg, std::memory_order_relaxed);
2251+
_executor._schedule(_worker, sbeg, send);
2252+
}
2253+
2254+
_tag |= JOINED_BIT;
22092255
}
22102256

22112257
// ############################################################################

0 commit comments

Comments
 (0)