Skip to content

Commit 344afc0

Browse files
committed
Remove target node from Topology
1 parent 90a09ed commit 344afc0

5 files changed

Lines changed: 63 additions & 83 deletions

File tree

example/framework.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ int main(){
5151
std::cout << std::endl;
5252

5353
std::cout << "Execute the framework 4 times with a callback\n";
54-
tf.run_n(f, 4, [i=0] () mutable { std::cout << "-> run #" << ++i << " finished\n"; }).get();
54+
tf.run_n(f, 4, [] () mutable { std::cout << "The framework finishes\n"; }).get();
5555
std::cout << std::endl;
5656

5757
std::cout << "Silently run the framework\n";

example/framework_dynamic_tasking.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,13 +82,16 @@ void tf_traversal(std::vector<Node*>& src, Node nodes[], size_t num_nodes) {
8282

8383
tf::Taskflow tf(4);
8484
tf::Framework framework;
85-
for(size_t i=0; i<src.size(); i++) {
86-
framework.silent_emplace([i=i, &src](auto& subflow){ traverse(src[i], subflow); });
87-
}
88-
tf.silent_run_n(framework, 100, [&, iteration=0]() mutable {
85+
// Add a target to verify the traversal and reset nodes in each iteration
86+
auto target = framework.silent_emplace([&](){
8987
validate(nodes, num_nodes);
9088
reset(nodes, num_nodes);
9189
});
90+
for(size_t i=0; i<src.size(); i++) {
91+
framework.silent_emplace([i=i, &src](auto& subflow){ traverse(src[i], subflow); }).precede(target);
92+
}
93+
tf.silent_run_n(framework, 100);
94+
9295
tf.wait_for_all(); // block until finished
9396

9497
auto end = std::chrono::system_clock::now();

taskflow/graph/basic_taskflow.hpp

Lines changed: 48 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -310,117 +310,87 @@ std::shared_future<void> BasicTaskflow<E>::run_n(Framework& f, size_t repeat, C&
310310
auto &tpg = _topologies.emplace_front(f, repeat);
311311
f._topologies.push_back(&tpg);
312312

313-
// PV 1/31 (twhuang): Lambda in C++17 is by default inline - no need for static
314-
static const auto setup_topology = [](auto& f, auto& tpg) {
315-
313+
const auto setup_topology = [](auto& f, auto& tpg) {
314+
tpg._num_sinks = 0;
316315
for(auto& n: f._graph) {
317-
318-
// PR 1/31 (twhuang): I don't think we need check the emptiness
319-
// of the successors over here
320-
// Also, when do you clean up dynamic tasking nodes?
321-
//
322-
if(!n._successors.empty()) {
323-
for(size_t i=0; i<n._successors.size(); i++) {
324-
if(n._successors[i] == f._last_target) {
325-
std::swap(n._successors[i], n._successors.back());
326-
n._successors.pop_back();
327-
break;
328-
}
329-
}
330-
}
331-
332316
// reset the target links
333317
n._topology = &tpg;
334318
if(n.num_dependents() == 0) {
335319
tpg._sources.push_back(&n);
336320
}
337321
if(n.num_successors() == 0) {
338-
n.precede(tpg._target);
322+
tpg._num_sinks ++;
339323
}
340324
}
341325
};
342326

343-
// PV 1/31 (twhuang): single worker - we need to remove topologies?
344327

345328
// Iterative execution to avoid stack overflow
346329
if(num_workers() == 0) {
347330
// Clear last execution data & Build precedence between nodes and target
348331
setup_topology(f, tpg);
349332

350-
tpg._target._work = std::forward<C>(c);
351-
352-
// PR 1/31 (twhuang): redundant tgt_predecessors
353-
const int tgt_predecessor = tpg._target._predecessors.size();
354-
333+
const int tgt_predecessor = tpg._num_sinks;
355334
for(size_t i=0; i<repeat; i++) {
356-
357335
_schedule(tpg._sources);
358-
359-
// PR 1/31 (twhuang): why do we need to set the dependents again?
360-
// Reset target
361-
f._topologies.front()->_target._predecessors.resize(tgt_predecessor);
362-
f._topologies.front()->_target._dependents = tgt_predecessor;
336+
f._topologies.front()->_num_sinks = tgt_predecessor;
363337
}
364338

365-
f._last_target = &tpg._target;
366-
tpg._promise.set_value();
339+
std::invoke(c);
340+
auto &p = f._topologies.front()->_promise;
341+
f._topologies.pop_front();
342+
p.set_value();
367343
}
368344
else {
369345
// case 1: the previous execution is still running
370346
if(f._topologies.size() > 1) {
371-
tpg._target._work = std::forward<C>(c);
347+
tpg._work = std::forward<C>(c);
372348
}
373349
// case 2: this epoch should run
374350
else {
375351
setup_topology(f, tpg);
376352

377353
//Set up target node's work
378-
tpg._target._work = [&f, c=std::function<void()>{std::forward<C>(c)},
379-
tgt_predecessor = tpg._target._predecessors.size(), this]() mutable {
354+
tpg._work = [&f, c=std::function<void()>{std::forward<C>(c)},
355+
tgt_predecessor = tpg._num_sinks.load(std::memory_order_relaxed), this]() mutable {
380356

381-
std::invoke(c);
382-
383357
// PV 1/31 (twhuang): thread safety?
384358
// case 1: we still need to run the topology again
385359
if(--f._topologies.front()->_repeat != 0) {
386-
387-
// Reset target
388-
f._topologies.front()->_target._predecessors.resize(tgt_predecessor);
389-
f._topologies.front()->_target._dependents = tgt_predecessor;
390-
360+
f._topologies.front()->_num_sinks = tgt_predecessor;
391361
_schedule(f._topologies.front()->_sources);
392362
}
393363
// case 2: the final run of this topology
394364
// notice that there can be another new run request before we acquire the lock
395365
else {
366+
std::invoke(c);
367+
396368
f._mtx.lock();
397-
369+
398370
// If there is another run
399371
if(f._topologies.size() > 1) {
400372

401373
// Set the promise
402374
f._topologies.front()->_promise.set_value();
403375

404376
auto next_tpg = std::next(f._topologies.begin());
405-
c = std::move(std::get<StaticWork>((*next_tpg)->_target._work));
377+
//c = std::move(std::get<StaticWork>((*next_tpg)->_target._work));
378+
c = std::move((*next_tpg)->_work);
406379

407380
f._topologies.front()->_repeat = (*next_tpg)->_repeat;
408381
f._topologies.front()->_promise = std::move((*next_tpg)->_promise);
409382
f._topologies.erase(next_tpg);
410383

411384
f._mtx.unlock();
412-
413-
// Reset target
414-
f._topologies.front()->_target._predecessors.resize(tgt_predecessor);
415-
f._topologies.front()->_target._dependents = tgt_predecessor;
385+
// The graph should be exactly the same as previous dispatch
386+
f._topologies.front()->_num_sinks = tgt_predecessor;
416387

417388
_schedule(f._topologies.front()->_sources);
418389
}
419390
else {
420391
// Need to back up the promise first here becuz framework might be
421392
// destroy before taskflow leaves
422393
auto &p = f._topologies.front()->_promise;
423-
f._last_target = &(f._topologies.front()->_target);
424394
f._topologies.pop_front();
425395
f._mtx.unlock();
426396

@@ -461,11 +431,8 @@ void BasicTaskflow<E>::Closure::operator () () const {
461431
// subflow node type
462432
else {
463433

464-
// PV 1/31 (twhuang): emplace is enough
465-
//
466434
// Clear the subgraph before the task execution
467435
if(!node->_spawned) {
468-
node->_subgraph.reset();
469436
node->_subgraph.emplace();
470437
}
471438

@@ -483,7 +450,12 @@ void BasicTaskflow<E>::Closure::operator () () const {
483450
n->_topology = node->_topology;
484451
n->_subtask = true;
485452
if(n->num_successors() == 0) {
486-
n->precede(fb.detached() ? node->_topology->_target : *node);
453+
if(fb.detached()) {
454+
node->_topology->_num_sinks ++;
455+
}
456+
else {
457+
n->precede(*node);
458+
}
487459
}
488460
if(n->num_dependents() == 0) {
489461
src.emplace_back(&(*n));
@@ -506,9 +478,13 @@ void BasicTaskflow<E>::Closure::operator () () const {
506478
// Recover the runtime change due to dynamic tasking except the target & spawn tasks
507479
// This must be done before scheduling the successors, otherwise this might cause
508480
// race condition on the _dependents
509-
if(num_successors && !node->_subtask) {
510-
while(!node->_predecessors.empty() && node->_predecessors.back()->_subtask) {
511-
node->_predecessors.pop_back();
481+
//if(num_successors && !node->_subtask) {
482+
if(!node->_subtask) {
483+
// Only dynamic tasking needs to restore _predecessors
484+
if(node->_work.index() == 1 && !node->_subgraph->empty()) {
485+
while(!node->_predecessors.empty() && node->_predecessors.back()->_subtask) {
486+
node->_predecessors.pop_back();
487+
}
512488
}
513489
node->_dependents = node->_predecessors.size();
514490
node->_spawned = false;
@@ -520,6 +496,21 @@ void BasicTaskflow<E>::Closure::operator () () const {
520496
taskflow->_schedule(*(node->_successors[i]));
521497
}
522498
}
499+
500+
// A node without any successor should check the termination of topology
501+
if(num_successors == 0) {
502+
if(--(node->_topology->_num_sinks) == 0) {
503+
504+
// This is the last executing node
505+
bool is_framework = node->_topology->_handle.index() == 1;
506+
if(node->_topology->_work != nullptr) {
507+
std::invoke(node->_topology->_work);
508+
}
509+
if(!is_framework) {
510+
node->_topology->_promise.set_value();
511+
}
512+
}
513+
}
523514
}
524515

525516
// ============================================================================

taskflow/graph/framework.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ class Framework : public FlowBuilder {
3838

3939
std::mutex _mtx;
4040
std::list<Topology*> _topologies;
41-
Node* _last_target {nullptr};
4241
};
4342

4443
// Constructor

taskflow/graph/topology.hpp

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,14 @@ class Topology {
3030
std::variant<Graph, Framework*> _handle;
3131

3232
std::promise <void> _promise;
33-
size_t _repeat;
33+
size_t _repeat {0};
3434

3535
std::shared_future<void> _future;
3636

3737
std::vector<Node*> _sources;
3838

39-
Node _target;
39+
std::atomic<int> _num_sinks {0};
40+
std::function<void()> _work {nullptr};
4041
};
4142

4243

@@ -51,15 +52,9 @@ inline Topology::Topology(Framework& f, size_t repeat): _handle(&f), _repeat(rep
5152
// Constructor
5253
inline Topology::Topology(Graph&& t) :
5354
_handle(std::move(t)) {
54-
55-
_target._topology = this;
5655

5756
_future = _promise.get_future().share();
5857

59-
_target._work = [this] () mutable {
60-
this->_promise.set_value();
61-
};
62-
6358
// Build the super source and super target.
6459
for(auto& node : std::get<Graph>(_handle)) {
6560

@@ -70,7 +65,7 @@ inline Topology::Topology(Graph&& t) :
7065
}
7166

7267
if(node.num_successors() == 0) {
73-
node.precede(_target);
68+
_num_sinks ++;
7469
}
7570
}
7671
}
@@ -81,14 +76,9 @@ template <typename C>
8176
inline Topology::Topology(Graph&& t, C&& c) :
8277
_handle(std::move(t)) {
8378

84-
_target._topology = this;
85-
8679
_future = _promise.get_future().share();
8780

88-
_target._work = [this, c{std::forward<C>(c)}] () mutable {
89-
this->_promise.set_value();
90-
c();
91-
};
81+
_work = std::forward<C>(c);
9282

9383
// Build the super source and super target.
9484
for(auto& node : std::get<Graph>(_handle)) {
@@ -100,18 +90,15 @@ inline Topology::Topology(Graph&& t, C&& c) :
10090
}
10191

10292
if(node.num_successors() == 0) {
103-
node.precede(_target);
93+
_num_sinks ++;
10494
}
10595
}
10696
}
10797

10898
// Procedure: dump
10999
inline void Topology::dump(std::ostream& os) const {
110-
111-
assert(!(_target._subgraph));
112100

113-
os << "digraph Topology {\n"
114-
<< _target.dump();
101+
os << "digraph Topology {\n";
115102

116103
std::visit(Functors{
117104
[&] (const Graph& graph) {

0 commit comments

Comments
 (0)