@@ -310,117 +310,87 @@ std::shared_future<void> BasicTaskflow<E>::run_n(Framework& f, size_t repeat, C&
310310 auto &tpg = _topologies.emplace_front (f, repeat);
311311 f._topologies .push_back (&tpg);
312312
313- // PV 1/31 (twhuang): Lambda in C++17 is by default inline - no need for static
314- static const auto setup_topology = [](auto & f, auto & tpg) {
315-
313+ const auto setup_topology = [](auto & f, auto & tpg) {
314+ tpg._num_sinks = 0 ;
316315 for (auto & n: f._graph ) {
317-
318- // PR 1/31 (twhuang): I don't think we need check the emptiness
319- // of the successors over here
320- // Also, when do you clean up dynamic tasking nodes?
321- //
322- if (!n._successors .empty ()) {
323- for (size_t i=0 ; i<n._successors .size (); i++) {
324- if (n._successors [i] == f._last_target ) {
325- std::swap (n._successors [i], n._successors .back ());
326- n._successors .pop_back ();
327- break ;
328- }
329- }
330- }
331-
332316 // reset the target links
333317 n._topology = &tpg;
334318 if (n.num_dependents () == 0 ) {
335319 tpg._sources .push_back (&n);
336320 }
337321 if (n.num_successors () == 0 ) {
338- n. precede ( tpg._target );
322+ tpg._num_sinks ++;
339323 }
340324 }
341325 };
342326
343- // PV 1/31 (twhuang): single worker - we need to remove topologies?
344327
345328 // Iterative execution to avoid stack overflow
346329 if (num_workers () == 0 ) {
347330 // Clear last execution data & Build precedence between nodes and target
348331 setup_topology (f, tpg);
349332
350- tpg._target ._work = std::forward<C>(c);
351-
352- // PR 1/31 (twhuang): redundant tgt_predecessors
353- const int tgt_predecessor = tpg._target ._predecessors .size ();
354-
333+ const int tgt_predecessor = tpg._num_sinks ;
355334 for (size_t i=0 ; i<repeat; i++) {
356-
357335 _schedule (tpg._sources );
358-
359- // PR 1/31 (twhuang): why do we need to set the dependents again?
360- // Reset target
361- f._topologies .front ()->_target ._predecessors .resize (tgt_predecessor);
362- f._topologies .front ()->_target ._dependents = tgt_predecessor;
336+ f._topologies .front ()->_num_sinks = tgt_predecessor;
363337 }
364338
365- f._last_target = &tpg._target ;
366- tpg._promise .set_value ();
339+ std::invoke (c);
340+ auto &p = f._topologies .front ()->_promise ;
341+ f._topologies .pop_front ();
342+ p.set_value ();
367343 }
368344 else {
369345 // case 1: the previous execution is still running
370346 if (f._topologies .size () > 1 ) {
371- tpg._target . _work = std::forward<C>(c);
347+ tpg._work = std::forward<C>(c);
372348 }
373349 // case 2: this epoch should run
374350 else {
375351 setup_topology (f, tpg);
376352
377353 // Set up target node's work
378- tpg._target . _work = [&f, c=std::function<void ()>{std::forward<C>(c)},
379- tgt_predecessor = tpg._target . _predecessors . size ( ), this ]() mutable {
354+ tpg._work = [&f, c=std::function<void ()>{std::forward<C>(c)},
355+ tgt_predecessor = tpg._num_sinks . load (std::memory_order_relaxed ), this ]() mutable {
380356
381- std::invoke (c);
382-
383357 // PV 1/31 (twhuang): thread safety?
384358 // case 1: we still need to run the topology again
385359 if (--f._topologies .front ()->_repeat != 0 ) {
386-
387- // Reset target
388- f._topologies .front ()->_target ._predecessors .resize (tgt_predecessor);
389- f._topologies .front ()->_target ._dependents = tgt_predecessor;
390-
360+ f._topologies .front ()->_num_sinks = tgt_predecessor;
391361 _schedule (f._topologies .front ()->_sources );
392362 }
393363 // case 2: the final run of this topology
394364 // notice that there can be another new run request before we acquire the lock
395365 else {
366+ std::invoke (c);
367+
396368 f._mtx .lock ();
397-
369+
398370 // If there is another run
399371 if (f._topologies .size () > 1 ) {
400372
401373 // Set the promise
402374 f._topologies .front ()->_promise .set_value ();
403375
404376 auto next_tpg = std::next (f._topologies .begin ());
405- c = std::move (std::get<StaticWork>((*next_tpg)->_target ._work ));
377+ // c = std::move(std::get<StaticWork>((*next_tpg)->_target._work));
378+ c = std::move ((*next_tpg)->_work );
406379
407380 f._topologies .front ()->_repeat = (*next_tpg)->_repeat ;
408381 f._topologies .front ()->_promise = std::move ((*next_tpg)->_promise );
409382 f._topologies .erase (next_tpg);
410383
411384 f._mtx .unlock ();
412-
413- // Reset target
414- f._topologies .front ()->_target ._predecessors .resize (tgt_predecessor);
415- f._topologies .front ()->_target ._dependents = tgt_predecessor;
385+ // The graph should be exactly the same as previous dispatch
386+ f._topologies .front ()->_num_sinks = tgt_predecessor;
416387
417388 _schedule (f._topologies .front ()->_sources );
418389 }
419390 else {
420391 // Need to back up the promise first here becuz framework might be
421392 // destroy before taskflow leaves
422393 auto &p = f._topologies .front ()->_promise ;
423- f._last_target = &(f._topologies .front ()->_target );
424394 f._topologies .pop_front ();
425395 f._mtx .unlock ();
426396
@@ -461,11 +431,8 @@ void BasicTaskflow<E>::Closure::operator () () const {
461431 // subflow node type
462432 else {
463433
464- // PV 1/31 (twhuang): emplace is enough
465- //
466434 // Clear the subgraph before the task execution
467435 if (!node->_spawned ) {
468- node->_subgraph .reset ();
469436 node->_subgraph .emplace ();
470437 }
471438
@@ -483,7 +450,12 @@ void BasicTaskflow<E>::Closure::operator () () const {
483450 n->_topology = node->_topology ;
484451 n->_subtask = true ;
485452 if (n->num_successors () == 0 ) {
486- n->precede (fb.detached () ? node->_topology ->_target : *node);
453+ if (fb.detached ()) {
454+ node->_topology ->_num_sinks ++;
455+ }
456+ else {
457+ n->precede (*node);
458+ }
487459 }
488460 if (n->num_dependents () == 0 ) {
489461 src.emplace_back (&(*n));
@@ -506,9 +478,13 @@ void BasicTaskflow<E>::Closure::operator () () const {
506478 // Recover the runtime change due to dynamic tasking except the target & spawn tasks
507479 // This must be done before scheduling the successors, otherwise this might cause
508480 // race condition on the _dependents
509- if (num_successors && !node->_subtask ) {
510- while (!node->_predecessors .empty () && node->_predecessors .back ()->_subtask ) {
511- node->_predecessors .pop_back ();
481+ // if(num_successors && !node->_subtask) {
482+ if (!node->_subtask ) {
483+ // Only dynamic tasking needs to restore _predecessors
484+ if (node->_work .index () == 1 && !node->_subgraph ->empty ()) {
485+ while (!node->_predecessors .empty () && node->_predecessors .back ()->_subtask ) {
486+ node->_predecessors .pop_back ();
487+ }
512488 }
513489 node->_dependents = node->_predecessors .size ();
514490 node->_spawned = false ;
@@ -520,6 +496,21 @@ void BasicTaskflow<E>::Closure::operator () () const {
520496 taskflow->_schedule (*(node->_successors [i]));
521497 }
522498 }
499+
500+ // A node without any successor should check the termination of topology
501+ if (num_successors == 0 ) {
502+ if (--(node->_topology ->_num_sinks ) == 0 ) {
503+
504+ // This is the last executing node
505+ bool is_framework = node->_topology ->_handle .index () == 1 ;
506+ if (node->_topology ->_work != nullptr ) {
507+ std::invoke (node->_topology ->_work );
508+ }
509+ if (!is_framework) {
510+ node->_topology ->_promise .set_value ();
511+ }
512+ }
513+ }
523514}
524515
525516// ============================================================================
0 commit comments