Cpp-Taskflow  2.2.0
executor.hpp
1 // 2019/08/03 - modified by Tsung-Wei Huang
2 // - made executor thread-safe
3 //
4 // 2019/07/26 - modified by Chun-Xun Lin
5 // - Combine explore_task & wait_for_task
6 // - Remove CAS operations
7 // - Update _num_thieves after pre_wait
8 // - TODO: Check will underutilization happen?
9 // - TODO: Find out upper bound (does cycle exist?)
10 // - TODO: Does performance drop due to the busy looping after pre_wait?
11 //
12 // 2019/07/25 - modified by Tsung-Wei Huang & Chun-Xun Lin
13 // - fixed the potential underutilization
14 // - use CAS in both last thief & active worker to make the notification less aggressive
15 //
16 // 2019/06/18 - modified by Tsung-Wei Huang
17 // - fixed the cache to enable continuity
18 // - TODO: do we need a special optimization for 0 workers?
19 //
20 // 2019/06/11 - modified by Tsung-Wei Huang
21 // - fixed the bug in calling observer while the user
22 // may clear the data
23 // - added object pool for nodes
24 //
25 // 2019/05/17 - modified by Chun-Xun Lin
26 // - moved topology to taskflow
27 //
28 // 2019/05/14 - modified by Tsung-Wei Huang
29 // - isolated the executor from the taskflow
30 //
31 // 2019/04/09 - modified by Tsung-Wei Huang
32 // - removed silent_dispatch method
33 //
34 // 2019/03/12 - modified by Chun-Xun Lin
35 // - added taskflow
36 //
37 // 2019/02/11 - modified by Tsung-Wei Huang
38 // - refactored run_until
39 // - added allocator to topologies
40 // - changed to list for topologies
41 //
42 // 2019/02/10 - modified by Chun-Xun Lin
43 // - added run_n to execute taskflow
44 // - finished first peer-review with TW
45 //
46 // 2018/07 - 2019/02/09 - missing logs
47 //
48 // 2018/06/30 - created by Tsung-Wei Huang
49 // - added BasicTaskflow template
50 
51 // TODO items:
52 // 1. come up with a better way to remove the "joined" links
53 // during the execution of a static node (1st layer)
54 //
55 
56 #pragma once
57 
#include <iostream>
#include <vector>
#include <cstdlib>
#include <cstdio>
#include <random>
#include <atomic>
#include <memory>
#include <deque>
#include <optional>
#include <thread>
#include <algorithm>
#include <set>
#include <numeric>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <stack>

#include "spmc_queue.hpp"
#include "notifier.hpp"
#include "observer.hpp"
#include "taskflow.hpp"
77 
78 namespace tf {
79 
88 class Executor {
89 
90  struct Worker {
91  std::mt19937 rdgen { std::random_device{}() };
93  std::optional<Node*> cache;
94  };
95 
96  struct PerThread {
97  Executor* pool {nullptr};
98  int worker_id {-1};
99  };
100 
101  public:
102 
106  explicit Executor(unsigned n = std::thread::hardware_concurrency());
107 
111  ~Executor();
112 
120  std::future<void> run(Taskflow& taskflow);
121 
130  template<typename C>
131  std::future<void> run(Taskflow& taskflow, C&& callable);
132 
141  std::future<void> run_n(Taskflow& taskflow, size_t N);
142 
152  template<typename C>
153  std::future<void> run_n(Taskflow& taskflow, size_t N, C&& callable);
154 
164  template<typename P>
165  std::future<void> run_until(Taskflow& taskflow, P&& pred);
166 
177  template<typename P, typename C>
178  std::future<void> run_until(Taskflow& taskflow, P&& pred, C&& callable);
179 
183  void wait_for_all();
184 
190  size_t num_workers() const;
191 
205  template<typename Observer, typename... Args>
206  Observer* make_observer(Args&&... args);
207 
211  void remove_observer();
212 
213  private:
214 
215  std::condition_variable _topology_cv;
216  std::mutex _topology_mutex;
217  std::mutex _queue_mutex;
218 
219  unsigned _num_topologies {0};
220 
221  // scheduler field
222  std::vector<Worker> _workers;
224  std::vector<std::thread> _threads;
225 
227 
228  std::atomic<size_t> _num_actives {0};
229  std::atomic<size_t> _num_thieves {0};
230  std::atomic<bool> _done {0};
231 
232  Notifier _notifier;
233 
235 
236  unsigned _find_victim(unsigned);
237 
238  PerThread& _per_thread() const;
239 
240  bool _wait_for_task(unsigned, std::optional<Node*>&);
241 
242  void _spawn(unsigned);
243  void _exploit_task(unsigned, std::optional<Node*>&);
244  void _explore_task(unsigned, std::optional<Node*>&);
245  void _schedule(Node*, bool);
246  void _schedule(PassiveVector<Node*>&);
247  void _schedule_unsync(Node*, std::stack<Node*>&) const;
248  void _schedule_unsync(PassiveVector<Node*>&, std::stack<Node*>&) const;
249  void _invoke(unsigned, Node*);
250  void _invoke_unsync(Node*, std::stack<Node*>&) const;
251  void _invoke_static_work(unsigned, Node*);
252  void _invoke_dynamic_work(unsigned, Node*, Subflow&);
253  void _init_module_node(Node*);
254  void _init_module_node_unsync(Node*, std::stack<Node*>&) const;
255  void _tear_down_topology(Topology*);
256  void _increment_topology();
257  void _decrement_topology();
258  void _decrement_topology_and_notify();
259 };
260 
261 // Constructor
262 inline Executor::Executor(unsigned N) :
263  _workers {N},
264  _waiters {N},
265  _notifier {_waiters} {
266  _spawn(N);
267 }
268 
269 // Destructor
271 
272  // wait for all topologies to complete
273  wait_for_all();
274 
275  // shut down the scheduler
276  _done = true;
277  _notifier.notify(true);
278 
279  for(auto& t : _threads){
280  t.join();
281  }
282 }
283 
284 // Function: num_workers
285 inline size_t Executor::num_workers() const {
286  return _workers.size();
287 }
288 
289 // Function: _per_thread
290 inline Executor::PerThread& Executor::_per_thread() const {
291  thread_local PerThread pt;
292  return pt;
293 }
294 
295 // Procedure: _spawn
296 inline void Executor::_spawn(unsigned N) {
297 
298  // Lock to synchronize all workers before creating _worker_maps
299  for(unsigned i=0; i<N; ++i) {
300  _threads.emplace_back([this, i] () -> void {
301 
302  PerThread& pt = _per_thread();
303  pt.pool = this;
304  pt.worker_id = i;
305 
306  std::optional<Node*> t;
307 
308  // must use 1 as condition instead of !done
309  while(1) {
310 
311  // execute the tasks.
312  _exploit_task(i, t);
313 
314  // wait for tasks
315  if(_wait_for_task(i, t) == false) {
316  break;
317  }
318  }
319 
320  });
321  }
322 }
323 
324 // Function: _find_victim
325 inline unsigned Executor::_find_victim(unsigned thief) {
326 
327  /*unsigned l = 0;
328  unsigned r = _workers.size() - 1;
329  unsigned vtm = std::uniform_int_distribution<unsigned>{l, r}(
330  _workers[thief].rdgen
331  );
332 
333  // try to look for a task from other workers
334  for(unsigned i=0; i<_workers.size(); ++i){
335 
336  if((thief == vtm && !_queue.empty()) ||
337  (thief != vtm && !_workers[vtm].queue.empty())) {
338  return vtm;
339  }
340 
341  if(++vtm; vtm == _workers.size()) {
342  vtm = 0;
343  }
344  } */
345 
346  // try to look for a task from other workers
347  for(unsigned vtm=0; vtm<_workers.size(); ++vtm){
348  if((thief == vtm && !_queue.empty()) ||
349  (thief != vtm && !_workers[vtm].queue.empty())) {
350  return vtm;
351  }
352  }
353 
354  return _workers.size();
355 }
356 
357 // Function: _explore_task
358 inline void Executor::_explore_task(unsigned thief, std::optional<Node*>& t) {
359 
360  //assert(_workers[thief].queue.empty());
361  assert(!t);
362 
363  const unsigned l = 0;
364  const unsigned r = _workers.size() - 1;
365 
366  const size_t F = (_workers.size() + 1) << 1;
367  const size_t Y = 100;
368 
369  size_t f = 0;
370  size_t y = 0;
371 
372  // explore
373  while(!_done) {
374 
375  unsigned vtm = std::uniform_int_distribution<unsigned>{l, r}(
376  _workers[thief].rdgen
377  );
378 
379  t = (vtm == thief) ? _queue.steal() : _workers[vtm].queue.steal();
380 
381  if(t) {
382  break;
383  }
384 
385  if(f++ > F) {
386  if(std::this_thread::yield(); y++ > Y) {
387  break;
388  }
389  }
390 
391  /*if(auto vtm = _find_victim(thief); vtm != _workers.size()) {
392  t = (vtm == thief) ? _queue.steal() : _workers[vtm].queue.steal();
393  // successful thief
394  if(t) {
395  break;
396  }
397  }
398  else {
399  if(f++ > F) {
400  if(std::this_thread::yield(); y++ > Y) {
401  break;
402  }
403  }
404  }*/
405  }
406 
407 }
408 
409 // Procedure: _exploit_task
410 inline void Executor::_exploit_task(unsigned i, std::optional<Node*>& t) {
411 
412  assert(!_workers[i].cache);
413 
414  if(t) {
415  auto& worker = _workers[i];
416  if(_num_actives.fetch_add(1) == 0 && _num_thieves == 0) {
417  _notifier.notify(false);
418  }
419  do {
420  _invoke(i, *t);
421 
422  if(worker.cache) {
423  t = *worker.cache;
424  worker.cache = std::nullopt;
425  }
426  else {
427  t = worker.queue.pop();
428  }
429 
430  } while(t);
431 
432  --_num_actives;
433  }
434 }
435 
436 // Function: _wait_for_task
437 inline bool Executor::_wait_for_task(unsigned me, std::optional<Node*>& t) {
438 
439  wait_for_task:
440 
441  assert(!t);
442 
443  ++_num_thieves;
444 
445  explore_task:
446 
447  if(_explore_task(me, t); t) {
448  if(auto N = _num_thieves.fetch_sub(1); N == 1) {
449  _notifier.notify(false);
450  }
451  return true;
452  }
453 
454  _notifier.prepare_wait(&_waiters[me]);
455 
456  //if(auto vtm = _find_victim(me); vtm != _workers.size()) {
457  if(!_queue.empty()) {
458 
459  _notifier.cancel_wait(&_waiters[me]);
460  //t = (vtm == me) ? _queue.steal() : _workers[vtm].queue.steal();
461 
462  if(t = _queue.steal(); t) {
463  if(auto N = _num_thieves.fetch_sub(1); N == 1) {
464  _notifier.notify(false);
465  }
466  return true;
467  }
468  else {
469  goto explore_task;
470  }
471  }
472 
473  if(_done) {
474  _notifier.cancel_wait(&_waiters[me]);
475  _notifier.notify(true);
476  --_num_thieves;
477  return false;
478  }
479 
480  if(_num_thieves.fetch_sub(1) == 1 && _num_actives) {
481  _notifier.cancel_wait(&_waiters[me]);
482  goto wait_for_task;
483  }
484 
485  // Now I really need to relinguish my self to others
486  _notifier.commit_wait(&_waiters[me]);
487 
488  return true;
489 }
490 
491 // Function: make_observer
492 template<typename Observer, typename... Args>
493 Observer* Executor::make_observer(Args&&... args) {
494  // use a local variable to mimic the constructor
495  auto tmp = std::make_unique<Observer>(std::forward<Args>(args)...);
496  tmp->set_up(_workers.size());
497  _observer = std::move(tmp);
498  return static_cast<Observer*>(_observer.get());
499 }
500 
501 // Procedure: remove_observer
503  _observer.reset();
504 }
505 
506 // Procedure: _schedule_unsync
507 inline void Executor::_schedule_unsync(
508  Node* node,
509  std::stack<Node*>& stack
510 ) const {
511 
512  // module node need another initialization
513  if(node->_module != nullptr && !node->_module->empty() && !node->is_spawned()) {
514  _init_module_node_unsync(node, stack);
515  }
516 
517  stack.push(node);
518 }
519 
520 // Procedure: _schedule_unsync
521 inline void Executor::_schedule_unsync(
522  PassiveVector<Node*>& nodes,
523  std::stack<Node*>& stack
524 ) const {
525 
526  // here we guarantee to run by a thread so no need to cache the
527  // size from nodes
528  for(auto node : nodes) {
529  if(node->_module != nullptr && !node->_module->empty() && !node->is_spawned()) {
530  _init_module_node_unsync(node, stack);
531  }
532  stack.push(node);
533  }
534 }
535 
536 // Procedure: _schedule
537 // The main procedure to schedule a give task node.
538 // Each task node has two types of tasks - regular and subflow.
539 inline void Executor::_schedule(Node* node, bool bypass) {
540 
541  assert(_workers.size() != 0);
542 
543  // module node need another initialization
544  if(node->_module != nullptr && !node->_module->empty() && !node->is_spawned()) {
545  _init_module_node(node);
546  }
547 
548  // caller is a worker to this pool
549  if(auto& pt = _per_thread(); pt.pool == this) {
550  if(!bypass) {
551  _workers[pt.worker_id].queue.push(node);
552  }
553  else {
554  assert(!_workers[pt.worker_id].cache);
555  _workers[pt.worker_id].cache = node;
556  }
557  return;
558  }
559 
560  // other threads
561  {
562  std::scoped_lock lock(_queue_mutex);
563  _queue.push(node);
564  }
565 
566  _notifier.notify(false);
567 }
568 
569 // Procedure: _schedule
570 // The main procedure to schedule a set of task nodes.
571 // Each task node has two types of tasks - regular and subflow.
572 inline void Executor::_schedule(PassiveVector<Node*>& nodes) {
573 
574  assert(_workers.size() != 0);
575 
576  // We need to cacth the node count to avoid accessing the nodes
577  // vector while the parent topology is removed!
578  const auto num_nodes = nodes.size();
579 
580  if(num_nodes == 0) {
581  return;
582  }
583 
584  for(auto node : nodes) {
585  if(node->_module != nullptr && !node->_module->empty() && !node->is_spawned()) {
586  _init_module_node(node);
587  }
588  }
589 
590  // worker thread
591  if(auto& pt = _per_thread(); pt.pool == this) {
592  for(size_t i=0; i<num_nodes; ++i) {
593  _workers[pt.worker_id].queue.push(nodes[i]);
594  }
595  return;
596  }
597 
598  // other threads
599  {
600  std::scoped_lock lock(_queue_mutex);
601  for(size_t k=0; k<num_nodes; ++k) {
602  _queue.push(nodes[k]);
603  }
604  }
605 
606  _notifier.notify(false);
607 }
608 
609 // Procedure: _init_module_node
610 inline void Executor::_init_module_node(Node* node) {
611 
612  node->_work = [node=node, this, tgt{PassiveVector<Node*>()}] () mutable {
613 
614  // second time to enter this context
615  if(node->is_spawned()) {
616  node->_dependents.resize(node->_dependents.size()-tgt.size());
617  for(auto& t: tgt) {
618  t->_successors.clear();
619  }
620  return ;
621  }
622 
623  // first time to enter this context
624  node->set_spawned();
625 
626  PassiveVector<Node*> src;
627 
628  for(auto& n: node->_module->_graph.nodes()) {
629  n->_topology = node->_topology;
630  if(n->num_dependents() == 0) {
631  src.push_back(n.get());
632  }
633  if(n->num_successors() == 0) {
634  n->precede(*node);
635  tgt.push_back(n.get());
636  }
637  }
638 
639  _schedule(src);
640  };
641 }
642 
643 // Procedure: _init_module_node_unsync
644 inline void Executor::_init_module_node_unsync(
645  Node* node,
646  std::stack<Node*>& stack
647 ) const {
648 
649  node->_work = [this, node=node, &stack, tgt{PassiveVector<Node*>()}] () mutable {
650 
651  // second time to enter this context
652  if(node->is_spawned()) {
653  node->_dependents.resize(node->_dependents.size()-tgt.size());
654  for(auto& t: tgt) {
655  t->_successors.clear();
656  }
657  return ;
658  }
659 
660  // first time to enter this context
661  node->set_spawned();
662 
663  PassiveVector<Node*> src;
664 
665  for(auto& n: node->_module->_graph.nodes()) {
666  n->_topology = node->_topology;
667  if(n->num_dependents() == 0) {
668  src.push_back(n.get());
669  }
670  if(n->num_successors() == 0) {
671  n->precede(*node);
672  tgt.push_back(n.get());
673  }
674  }
675 
676  _schedule_unsync(src, stack);
677  };
678 }
679 
680 // Procedure: _invoke
681 inline void Executor::_invoke(unsigned me, Node* node) {
682 
683  assert(_workers.size() != 0);
684 
685  // Here we need to fetch the num_successors first to avoid the invalid memory
686  // access caused by topology clear.
687  const auto num_successors = node->num_successors();
688 
689  // static task
690  // The default node work type. We only need to execute the callback if any.
691  if(auto index=node->_work.index(); index == 1) {
692  if(node->_module != nullptr) {
693  bool first_time = !node->is_spawned();
694  _invoke_static_work(me, node);
695  if(first_time) {
696  return ;
697  }
698  }
699  else {
700  _invoke_static_work(me, node);
701  }
702  }
703  // dynamic task
704  else if (index == 2){
705 
706  // Clear the subgraph before the task execution
707  if(!node->is_spawned()) {
708  if(node->_subgraph) {
709  node->_subgraph->clear();
710  }
711  else {
712  node->_subgraph.emplace();
713  }
714  }
715 
716  Subflow fb(*(node->_subgraph));
717 
718  _invoke_dynamic_work(me, node, fb);
719 
720  // Need to create a subflow if first time & subgraph is not empty
721  if(!node->is_spawned()) {
722  node->set_spawned();
723  if(!node->_subgraph->empty()) {
724  // For storing the source nodes
725  PassiveVector<Node*> src;
726  for(auto& n: node->_subgraph->nodes()) {
727  n->_topology = node->_topology;
728  n->set_subtask();
729  if(n->num_successors() == 0) {
730  if(fb.detached()) {
731  node->_topology->_num_sinks++;
732  }
733  else {
734  n->precede(*node);
735  }
736  }
737  if(n->num_dependents() == 0) {
738  src.push_back(n.get());
739  }
740  }
741 
742  _schedule(src);
743 
744  if(fb.joined()) {
745  return;
746  }
747  }
748  }
749  } // End of DynamicWork -----------------------------------------------------
750 
751  // Recover the runtime change due to dynamic tasking except the target & spawn tasks
752  // This must be done before scheduling the successors, otherwise this might cause
753  // race condition on the _dependents
754  //if(num_successors && !node->_subtask) {
755  if(!node->is_subtask()) {
756  // Only dynamic tasking needs to restore _dependents
757  // TODO:
758  if(node->_work.index() == 2 && !node->_subgraph->empty()) {
759  while(!node->_dependents.empty() && node->_dependents.back()->is_subtask()) {
760  node->_dependents.pop_back();
761  }
762  }
763  node->_num_dependents = static_cast<int>(node->_dependents.size());
764  node->unset_spawned();
765  }
766 
767  // At this point, the node storage might be destructed.
768  Node* cache {nullptr};
769 
770  for(size_t i=0; i<num_successors; ++i) {
771  if(--(node->_successors[i]->_num_dependents) == 0) {
772  if(cache) {
773  _schedule(cache, false);
774  }
775  cache = node->_successors[i];
776  }
777  }
778 
779  if(cache) {
780  _schedule(cache, true);
781  }
782 
783  // A node without any successor should check the termination of topology
784  if(num_successors == 0) {
785  if(--(node->_topology->_num_sinks) == 0) {
786  _tear_down_topology(node->_topology);
787  }
788  }
789 }
790 
791 // Procedure: _invoke_static_work
792 inline void Executor::_invoke_static_work(unsigned me, Node* node) {
793  if(_observer) {
794  _observer->on_entry(me, TaskView(node));
795  std::invoke(std::get<Node::StaticWork>(node->_work));
796  _observer->on_exit(me, TaskView(node));
797  }
798  else {
799  std::invoke(std::get<Node::StaticWork>(node->_work));
800  }
801 }
802 
803 // Procedure: _invoke_dynamic_work
804 inline void Executor::_invoke_dynamic_work(unsigned me, Node* node, Subflow& sf) {
805  if(_observer) {
806  _observer->on_entry(me, TaskView(node));
807  std::invoke(std::get<Node::DynamicWork>(node->_work), sf);
808  _observer->on_exit(me, TaskView(node));
809  }
810  else {
811  std::invoke(std::get<Node::DynamicWork>(node->_work), sf);
812  }
813 }
814 
815 // Procedure: _invoke_unsync
816 inline void Executor::_invoke_unsync(Node* node, std::stack<Node*>& stack) const {
817 
818  const auto num_successors = node->num_successors();
819 
820  // static task
821  // The default node work type. We only need to execute the callback if any.
822  if(auto index=node->_work.index(); index == 1) {
823  if(node->_module != nullptr) {
824  bool first_time = !node->is_spawned();
825  std::invoke(std::get<Node::StaticWork>(node->_work));
826  if(first_time) {
827  return ;
828  }
829  }
830  else {
831  std::invoke(std::get<Node::StaticWork>(node->_work));
832  }
833  }
834  // dynamic task
835  else if (index == 2){
836 
837  // Clear the subgraph before the task execution
838  if(!node->is_spawned()) {
839  if(node->_subgraph) {
840  node->_subgraph->clear();
841  }
842  else {
843  node->_subgraph.emplace();
844  }
845  }
846 
847  Subflow fb(*(node->_subgraph));
848 
849  std::invoke(std::get<Node::DynamicWork>(node->_work), fb);
850 
851  // Need to create a subflow if first time & subgraph is not empty
852  if(!node->is_spawned()) {
853  node->set_spawned();
854  if(!node->_subgraph->empty()) {
855  // For storing the source nodes
856  PassiveVector<Node*> src;
857  for(auto& n: node->_subgraph->nodes()) {
858  n->_topology = node->_topology;
859  n->set_subtask();
860  if(n->num_successors() == 0) {
861  if(fb.detached()) {
862  node->_topology->_num_sinks++;
863  }
864  else {
865  n->precede(*node);
866  }
867  }
868  if(n->num_dependents() == 0) {
869  src.push_back(n.get());
870  }
871  }
872 
873  _schedule_unsync(src, stack);
874 
875  if(fb.joined()) {
876  return;
877  }
878  }
879  }
880  } // End of DynamicWork -----------------------------------------------------
881 
882  // Recover the runtime change due to dynamic tasking except the target & spawn tasks
883  // This must be done before scheduling the successors, otherwise this might cause
884  // race condition on the _dependents
885  //if(num_successors && !node->_subtask) {
886  if(!node->is_subtask()) {
887  // Only dynamic tasking needs to restore _dependents
888  // TODO:
889  if(node->_work.index() == 2 && !node->_subgraph->empty()) {
890  while(!node->_dependents.empty() && node->_dependents.back()->is_subtask()) {
891  node->_dependents.pop_back();
892  }
893  }
894  node->_num_dependents = static_cast<int>(node->_dependents.size());
895  node->unset_spawned();
896  }
897 
898  // At this point, the node storage might be destructed.
899  for(size_t i=0; i<num_successors; ++i) {
900  if(--(node->_successors[i]->_num_dependents) == 0) {
901  _schedule_unsync(node->_successors[i], stack);
902  }
903  }
904 
905  // A node without any successor should check the termination of topology
906  if(num_successors == 0) {
907  --(node->_topology->_num_sinks);
908  }
909 }
910 
911 // Function: run
913  return run_n(f, 1, [](){});
914 }
915 
916 // Function: run
917 template <typename C>
919  static_assert(std::is_invocable<C>::value);
920  return run_n(f, 1, std::forward<C>(c));
921 }
922 
923 // Function: run_n
924 inline std::future<void> Executor::run_n(Taskflow& f, size_t repeat) {
925  return run_n(f, repeat, [](){});
926 }
927 
928 // Function: run_n
929 template <typename C>
930 std::future<void> Executor::run_n(Taskflow& f, size_t repeat, C&& c) {
931  return run_until(f, [repeat]() mutable { return repeat-- == 0; }, std::forward<C>(c));
932 }
933 
934 // Function: run_until
935 template<typename P>
937  return run_until(f, std::forward<P>(pred), [](){});
938 }
939 
940 // Function: _tear_down_topology
941 inline void Executor::_tear_down_topology(Topology* tpg) {
942 
943  auto &f = tpg->_taskflow;
944 
945  //assert(&tpg == &(f._topologies.front()));
946 
947  // case 1: we still need to run the topology again
948  if(!std::invoke(tpg->_pred)) {
949  tpg->_recover_num_sinks();
950  _schedule(tpg->_sources);
951  }
952  // case 2: the final run of this topology
953  else {
954 
955  if(tpg->_call != nullptr) {
956  std::invoke(tpg->_call);
957  }
958 
959  f._mtx.lock();
960 
961  // If there is another run (interleave between lock)
962  if(f._topologies.size() > 1) {
963 
964  // Set the promise
965  tpg->_promise.set_value();
966  f._topologies.pop_front();
967  f._mtx.unlock();
968 
969  // decrement the topology but since this is not the last we don't notify
970  _decrement_topology();
971 
972  f._topologies.front()._bind(f._graph);
973  _schedule(f._topologies.front()._sources);
974  }
975  else {
976  assert(f._topologies.size() == 1);
977 
978  // Need to back up the promise first here becuz taskflow might be
979  // destroy before taskflow leaves
980  auto p {std::move(tpg->_promise)};
981 
982  f._topologies.pop_front();
983 
984  f._mtx.unlock();
985 
986  // We set the promise in the end in case taskflow leaves before taskflow
987  p.set_value();
988 
989  _decrement_topology_and_notify();
990  }
991  }
992 }
993 
994 // Function: run_until
995 template <typename P, typename C>
997 
998  // Predicate must return a boolean value
999  static_assert(std::is_invocable_v<C> && std::is_invocable_v<P>);
1000 
1001  _increment_topology();
1002 
1003  // Special case of predicate
1004  if(std::invoke(pred)) {
1005  std::promise<void> promise;
1006  promise.set_value();
1007  _decrement_topology_and_notify();
1008  return promise.get_future();
1009  }
1010 
1011  // Special case of zero workers requires:
1012  // - iterative execution to avoid stack overflow
1013  // - avoid execution of last_work
1014  if(_workers.size() == 0 || f.empty()) {
1015 
1016  Topology tpg(f, std::forward<P>(pred), std::forward<C>(c));
1017 
1018  // Clear last execution data & Build precedence between nodes and target
1019  tpg._bind(f._graph);
1020 
1021  std::stack<Node*> stack;
1022 
1023  do {
1024  _schedule_unsync(tpg._sources, stack);
1025  while(!stack.empty()) {
1026  auto node = stack.top();
1027  stack.pop();
1028  _invoke_unsync(node, stack);
1029  }
1030  tpg._recover_num_sinks();
1031  } while(!std::invoke(tpg._pred));
1032 
1033  if(tpg._call != nullptr) {
1034  std::invoke(tpg._call);
1035  }
1036 
1037  tpg._promise.set_value();
1038 
1039  _decrement_topology_and_notify();
1040 
1041  return tpg._promise.get_future();
1042  }
1043 
1044  // Multi-threaded execution.
1045  bool run_now {false};
1046  Topology* tpg;
1047  std::future<void> future;
1048 
1049  {
1050  std::scoped_lock lock(f._mtx);
1051 
1052  // create a topology for this run
1053  tpg = &(f._topologies.emplace_back(f, std::forward<P>(pred), std::forward<C>(c)));
1054  future = tpg->_promise.get_future();
1055 
1056  if(f._topologies.size() == 1) {
1057  run_now = true;
1058  //tpg->_bind(f._graph);
1059  //_schedule(tpg->_sources);
1060  }
1061  }
1062 
1063  // Notice here calling schedule may cause the topology to be removed sonner
1064  // before the function leaves.
1065  if(run_now) {
1066  tpg->_bind(f._graph);
1067  _schedule(tpg->_sources);
1068  }
1069 
1070  return future;
1071 }
1072 
1073 // Procedure: _increment_topology
1074 inline void Executor::_increment_topology() {
1075  std::scoped_lock lock(_topology_mutex);
1076  ++_num_topologies;
1077 }
1078 
1079 // Procedure: _decrement_topology_and_notify
1080 inline void Executor::_decrement_topology_and_notify() {
1081  std::scoped_lock lock(_topology_mutex);
1082  if(--_num_topologies == 0) {
1083  _topology_cv.notify_all();
1084  }
1085 }
1086 
1087 // Procedure: _decrement_topology
1088 inline void Executor::_decrement_topology() {
1089  std::scoped_lock lock(_topology_mutex);
1090  --_num_topologies;
1091 }
1092 
1093 // Procedure: wait_for_all
1094 inline void Executor::wait_for_all() {
1095  std::unique_lock lock(_topology_mutex);
1096  _topology_cv.wait(lock, [&](){ return _num_topologies == 0; });
1097 }
1098 
1099 } // end of namespace tf -----------------------------------------------------
1100 
1101 
std::future< void > run(Taskflow &taskflow)
runs the taskflow once
Definition: executor.hpp:912
void remove_observer()
removes the associated observer
Definition: executor.hpp:502
bool empty() const noexcept
queries if the queue is empty at the time of this call
Definition: spmc_queue.hpp:172
std::future< void > run_until(Taskflow &taskflow, P &&pred)
runs the taskflow multiple times until the predicate becomes true and then invokes a callback ...
Definition: executor.hpp:936
~Executor()
destructs the executor
Definition: executor.hpp:270
T yield(T... args)
void push(O &&item)
inserts an item to the queue
Definition: spmc_queue.hpp:189
Definition: taskflow.hpp:5
T hardware_concurrency(T... args)
bool detached() const
queries if the subflow will be detached from its parent task
Definition: flow_builder.hpp:869
Observer * make_observer(Args &&... args)
constructs an observer to inspect the activities of worker threads
Definition: executor.hpp:493
the class to create a task dependency graph
Definition: core/taskflow.hpp:15
A constant wrapper class to a task node, mainly used in the tf::ExecutorObserver interface.
Definition: task.hpp:370
bool empty() const
queries the emptiness of the taskflow
Definition: core/taskflow.hpp:122
bool joined() const
queries if the subflow will join its parent task
Definition: flow_builder.hpp:874
Lock-free unbounded single-producer multiple-consumer queue.
Definition: spmc_queue.hpp:29
The executor class to run a taskflow graph.
Definition: executor.hpp:88
size_t num_workers() const
queries the number of worker threads (can be zero)
Definition: executor.hpp:285
std::optional< T > steal()
steals an item from the queue
Definition: spmc_queue.hpp:239
Executor(unsigned n=std::thread::hardware_concurrency())
constructs the executor with N worker threads
Definition: executor.hpp:262
The building blocks of dynamic tasking.
Definition: flow_builder.hpp:817
std::future< void > run_n(Taskflow &taskflow, size_t N)
runs the taskflow for N times
Definition: executor.hpp:924
void wait_for_all()
wait for all pending graphs to complete
Definition: executor.hpp:1094