73 #include "spmc_queue.hpp" 74 #include "notifier.hpp" 75 #include "observer.hpp" 76 #include "taskflow.hpp" 93 std::optional<Node*> cache;
177 template<
typename P,
typename C>
205 template<
typename Observer,
typename... Args>
219 unsigned _num_topologies {0};
236 unsigned _find_victim(
unsigned);
238 PerThread& _per_thread()
const;
240 bool _wait_for_task(
unsigned, std::optional<Node*>&);
242 void _spawn(
unsigned);
243 void _exploit_task(
unsigned, std::optional<Node*>&);
244 void _explore_task(
unsigned, std::optional<Node*>&);
245 void _schedule(Node*,
bool);
246 void _schedule(PassiveVector<Node*>&);
249 void _invoke(
unsigned, Node*);
251 void _invoke_static_work(
unsigned, Node*);
252 void _invoke_dynamic_work(
unsigned, Node*,
Subflow&);
253 void _init_module_node(Node*);
255 void _tear_down_topology(Topology*);
256 void _increment_topology();
257 void _decrement_topology();
258 void _decrement_topology_and_notify();
265 _notifier {_waiters} {
277 _notifier.notify(
true);
279 for(
auto& t : _threads){
286 return _workers.size();
290 inline Executor::PerThread& Executor::_per_thread()
const {
291 thread_local PerThread pt;
296 inline void Executor::_spawn(
unsigned N) {
299 for(
unsigned i=0; i<N; ++i) {
300 _threads.emplace_back([
this, i] () ->
void {
302 PerThread& pt = _per_thread();
306 std::optional<Node*> t;
315 if(_wait_for_task(i, t) ==
false) {
325 inline unsigned Executor::_find_victim(
unsigned thief) {
347 for(
unsigned vtm=0; vtm<_workers.size(); ++vtm){
348 if((thief == vtm && !_queue.
empty()) ||
349 (thief != vtm && !_workers[vtm].queue.empty())) {
354 return _workers.size();
358 inline void Executor::_explore_task(
unsigned thief, std::optional<Node*>& t) {
363 const unsigned l = 0;
364 const unsigned r = _workers.size() - 1;
366 const size_t F = (_workers.size() + 1) << 1;
367 const size_t Y = 100;
376 _workers[thief].rdgen
379 t = (vtm == thief) ? _queue.
steal() : _workers[vtm].queue.steal();
410 inline void Executor::_exploit_task(
unsigned i, std::optional<Node*>& t) {
412 assert(!_workers[i].cache);
415 auto& worker = _workers[i];
416 if(_num_actives.fetch_add(1) == 0 && _num_thieves == 0) {
417 _notifier.notify(
false);
424 worker.cache = std::nullopt;
427 t = worker.queue.pop();
437 inline bool Executor::_wait_for_task(
unsigned me, std::optional<Node*>& t) {
447 if(_explore_task(me, t); t) {
448 if(
auto N = _num_thieves.fetch_sub(1); N == 1) {
449 _notifier.notify(
false);
454 _notifier.prepare_wait(&_waiters[me]);
457 if(!_queue.
empty()) {
459 _notifier.cancel_wait(&_waiters[me]);
462 if(t = _queue.
steal(); t) {
463 if(
auto N = _num_thieves.fetch_sub(1); N == 1) {
464 _notifier.notify(
false);
474 _notifier.cancel_wait(&_waiters[me]);
475 _notifier.notify(
true);
480 if(_num_thieves.fetch_sub(1) == 1 && _num_actives) {
481 _notifier.cancel_wait(&_waiters[me]);
486 _notifier.commit_wait(&_waiters[me]);
492 template<
typename Observer,
typename... Args>
495 auto tmp = std::make_unique<Observer>(std::forward<Args>(args)...);
496 tmp->set_up(_workers.size());
497 _observer = std::move(tmp);
498 return static_cast<Observer*
>(_observer.get());
507 inline void Executor::_schedule_unsync(
513 if(node->_module !=
nullptr && !node->_module->empty() && !node->is_spawned()) {
514 _init_module_node_unsync(node, stack);
521 inline void Executor::_schedule_unsync(
522 PassiveVector<Node*>& nodes,
528 for(
auto node : nodes) {
529 if(node->_module !=
nullptr && !node->_module->empty() && !node->is_spawned()) {
530 _init_module_node_unsync(node, stack);
539 inline void Executor::_schedule(Node* node,
bool bypass) {
541 assert(_workers.size() != 0);
544 if(node->_module !=
nullptr && !node->_module->empty() && !node->is_spawned()) {
545 _init_module_node(node);
549 if(
auto& pt = _per_thread(); pt.pool ==
this) {
551 _workers[pt.worker_id].queue.push(node);
554 assert(!_workers[pt.worker_id].cache);
555 _workers[pt.worker_id].cache = node;
562 std::scoped_lock lock(_queue_mutex);
566 _notifier.notify(
false);
572 inline void Executor::_schedule(PassiveVector<Node*>& nodes) {
574 assert(_workers.size() != 0);
578 const auto num_nodes = nodes.size();
584 for(
auto node : nodes) {
585 if(node->_module !=
nullptr && !node->_module->empty() && !node->is_spawned()) {
586 _init_module_node(node);
591 if(
auto& pt = _per_thread(); pt.pool ==
this) {
592 for(
size_t i=0; i<num_nodes; ++i) {
593 _workers[pt.worker_id].queue.push(nodes[i]);
600 std::scoped_lock lock(_queue_mutex);
601 for(
size_t k=0; k<num_nodes; ++k) {
602 _queue.
push(nodes[k]);
606 _notifier.notify(
false);
610 inline void Executor::_init_module_node(Node* node) {
612 node->_work = [node=node,
this, tgt{PassiveVector<Node*>()}] ()
mutable {
615 if(node->is_spawned()) {
616 node->_dependents.resize(node->_dependents.size()-tgt.size());
618 t->_successors.clear();
626 PassiveVector<Node*> src;
628 for(
auto& n: node->_module->_graph.nodes()) {
629 n->_topology = node->_topology;
630 if(n->num_dependents() == 0) {
631 src.push_back(n.get());
633 if(n->num_successors() == 0) {
635 tgt.push_back(n.get());
644 inline void Executor::_init_module_node_unsync(
649 node->_work = [
this, node=node, &stack, tgt{PassiveVector<Node*>()}] ()
mutable {
652 if(node->is_spawned()) {
653 node->_dependents.resize(node->_dependents.size()-tgt.size());
655 t->_successors.clear();
663 PassiveVector<Node*> src;
665 for(
auto& n: node->_module->_graph.nodes()) {
666 n->_topology = node->_topology;
667 if(n->num_dependents() == 0) {
668 src.push_back(n.get());
670 if(n->num_successors() == 0) {
672 tgt.push_back(n.get());
676 _schedule_unsync(src, stack);
681 inline void Executor::_invoke(
unsigned me, Node* node) {
683 assert(_workers.size() != 0);
687 const auto num_successors = node->num_successors();
691 if(
auto index=node->_work.index(); index == 1) {
692 if(node->_module !=
nullptr) {
693 bool first_time = !node->is_spawned();
694 _invoke_static_work(me, node);
700 _invoke_static_work(me, node);
704 else if (index == 2){
707 if(!node->is_spawned()) {
708 if(node->_subgraph) {
709 node->_subgraph->clear();
712 node->_subgraph.emplace();
716 Subflow fb(*(node->_subgraph));
718 _invoke_dynamic_work(me, node, fb);
721 if(!node->is_spawned()) {
723 if(!node->_subgraph->empty()) {
725 PassiveVector<Node*> src;
726 for(
auto& n: node->_subgraph->nodes()) {
727 n->_topology = node->_topology;
729 if(n->num_successors() == 0) {
731 node->_topology->_num_sinks++;
737 if(n->num_dependents() == 0) {
738 src.push_back(n.get());
755 if(!node->is_subtask()) {
758 if(node->_work.index() == 2 && !node->_subgraph->empty()) {
759 while(!node->_dependents.empty() && node->_dependents.back()->is_subtask()) {
760 node->_dependents.pop_back();
763 node->_num_dependents =
static_cast<int>(node->_dependents.size());
764 node->unset_spawned();
768 Node* cache {
nullptr};
770 for(
size_t i=0; i<num_successors; ++i) {
771 if(--(node->_successors[i]->_num_dependents) == 0) {
773 _schedule(cache,
false);
775 cache = node->_successors[i];
780 _schedule(cache,
true);
784 if(num_successors == 0) {
785 if(--(node->_topology->_num_sinks) == 0) {
786 _tear_down_topology(node->_topology);
792 inline void Executor::_invoke_static_work(
unsigned me, Node* node) {
794 _observer->on_entry(me,
TaskView(node));
795 std::invoke(std::get<Node::StaticWork>(node->_work));
796 _observer->on_exit(me,
TaskView(node));
799 std::invoke(std::get<Node::StaticWork>(node->_work));
804 inline void Executor::_invoke_dynamic_work(
unsigned me, Node* node,
Subflow& sf) {
806 _observer->on_entry(me,
TaskView(node));
807 std::invoke(std::get<Node::DynamicWork>(node->_work), sf);
808 _observer->on_exit(me,
TaskView(node));
811 std::invoke(std::get<Node::DynamicWork>(node->_work), sf);
816 inline void Executor::_invoke_unsync(Node* node,
std::stack<Node*>& stack)
const {
818 const auto num_successors = node->num_successors();
822 if(
auto index=node->_work.index(); index == 1) {
823 if(node->_module !=
nullptr) {
824 bool first_time = !node->is_spawned();
825 std::invoke(std::get<Node::StaticWork>(node->_work));
831 std::invoke(std::get<Node::StaticWork>(node->_work));
835 else if (index == 2){
838 if(!node->is_spawned()) {
839 if(node->_subgraph) {
840 node->_subgraph->clear();
843 node->_subgraph.emplace();
847 Subflow fb(*(node->_subgraph));
849 std::invoke(std::get<Node::DynamicWork>(node->_work), fb);
852 if(!node->is_spawned()) {
854 if(!node->_subgraph->empty()) {
856 PassiveVector<Node*> src;
857 for(
auto& n: node->_subgraph->nodes()) {
858 n->_topology = node->_topology;
860 if(n->num_successors() == 0) {
862 node->_topology->_num_sinks++;
868 if(n->num_dependents() == 0) {
869 src.push_back(n.get());
873 _schedule_unsync(src, stack);
886 if(!node->is_subtask()) {
889 if(node->_work.index() == 2 && !node->_subgraph->empty()) {
890 while(!node->_dependents.empty() && node->_dependents.back()->is_subtask()) {
891 node->_dependents.pop_back();
894 node->_num_dependents =
static_cast<int>(node->_dependents.size());
895 node->unset_spawned();
899 for(
size_t i=0; i<num_successors; ++i) {
900 if(--(node->_successors[i]->_num_dependents) == 0) {
901 _schedule_unsync(node->_successors[i], stack);
906 if(num_successors == 0) {
907 --(node->_topology->_num_sinks);
913 return run_n(f, 1, [](){});
917 template <
typename C>
919 static_assert(std::is_invocable<C>::value);
920 return run_n(f, 1, std::forward<C>(c));
925 return run_n(f, repeat, [](){});
929 template <
typename C>
931 return run_until(f, [repeat]()
mutable {
return repeat-- == 0; }, std::forward<C>(c));
937 return run_until(f, std::forward<P>(pred), [](){});
941 inline void Executor::_tear_down_topology(Topology* tpg) {
943 auto &f = tpg->_taskflow;
948 if(!std::invoke(tpg->_pred)) {
949 tpg->_recover_num_sinks();
950 _schedule(tpg->_sources);
955 if(tpg->_call !=
nullptr) {
956 std::invoke(tpg->_call);
962 if(f._topologies.size() > 1) {
965 tpg->_promise.set_value();
966 f._topologies.pop_front();
970 _decrement_topology();
972 f._topologies.front()._bind(f._graph);
973 _schedule(f._topologies.front()._sources);
976 assert(f._topologies.size() == 1);
980 auto p {std::move(tpg->_promise)};
982 f._topologies.pop_front();
989 _decrement_topology_and_notify();
995 template <
typename P,
typename C>
999 static_assert(std::is_invocable_v<C> && std::is_invocable_v<P>);
1001 _increment_topology();
1004 if(std::invoke(pred)) {
1006 promise.set_value();
1007 _decrement_topology_and_notify();
1008 return promise.get_future();
1014 if(_workers.size() == 0 || f.
empty()) {
1016 Topology tpg(f, std::forward<P>(pred), std::forward<C>(c));
1019 tpg._bind(f._graph);
1024 _schedule_unsync(tpg._sources, stack);
1025 while(!stack.empty()) {
1026 auto node = stack.top();
1028 _invoke_unsync(node, stack);
1030 tpg._recover_num_sinks();
1031 }
while(!std::invoke(tpg._pred));
1033 if(tpg._call !=
nullptr) {
1034 std::invoke(tpg._call);
1037 tpg._promise.set_value();
1039 _decrement_topology_and_notify();
1041 return tpg._promise.get_future();
1045 bool run_now {
false};
1050 std::scoped_lock lock(f._mtx);
1053 tpg = &(f._topologies.emplace_back(f, std::forward<P>(pred), std::forward<C>(c)));
1054 future = tpg->_promise.get_future();
1056 if(f._topologies.size() == 1) {
1066 tpg->_bind(f._graph);
1067 _schedule(tpg->_sources);
1074 inline void Executor::_increment_topology() {
1075 std::scoped_lock lock(_topology_mutex);
1080 inline void Executor::_decrement_topology_and_notify() {
1081 std::scoped_lock lock(_topology_mutex);
1082 if(--_num_topologies == 0) {
1083 _topology_cv.notify_all();
1088 inline void Executor::_decrement_topology() {
1089 std::scoped_lock lock(_topology_mutex);
1096 _topology_cv.wait(lock, [&](){
return _num_topologies == 0; });
std::future< void > run(Taskflow &taskflow)
runs the taskflow once
Definition: executor.hpp:912
void remove_observer()
removes the associated observer
Definition: executor.hpp:502
bool empty() const noexcept
queries if the queue is empty at the time of this call
Definition: spmc_queue.hpp:172
std::future< void > run_until(Taskflow &taskflow, P &&pred)
runs the taskflow multiple times until the predicate becomes true and then invokes a callback ...
Definition: executor.hpp:936
~Executor()
destructs the executor
Definition: executor.hpp:270
void push(O &&item)
inserts an item to the queue
Definition: spmc_queue.hpp:189
Definition: taskflow.hpp:5
T hardware_concurrency(T... args)
bool detached() const
queries if the subflow will be detached from its parent task
Definition: flow_builder.hpp:869
Observer * make_observer(Args &&... args)
constructs an observer to inspect the activities of worker threads
Definition: executor.hpp:493
the class to create a task dependency graph
Definition: core/taskflow.hpp:15
A constant wrapper class to a task node, mainly used in the tf::ExecutorObserver interface.
Definition: task.hpp:370
bool empty() const
queries the emptiness of the taskflow
Definition: core/taskflow.hpp:122
bool joined() const
queries if the subflow will join its parent task
Definition: flow_builder.hpp:874
Lock-free unbounded single-producer multiple-consumer queue.
Definition: spmc_queue.hpp:29
The executor class to run a taskflow graph.
Definition: executor.hpp:88
size_t num_workers() const
queries the number of worker threads (can be zero)
Definition: executor.hpp:285
std::optional< T > steal()
steals an item from the queue
Definition: spmc_queue.hpp:239
Executor(unsigned n=std::thread::hardware_concurrency())
constructs the executor with N worker threads
Definition: executor.hpp:262
The building blocks of dynamic tasking.
Definition: flow_builder.hpp:817
std::future< void > run_n(Taskflow &taskflow, size_t N)
runs the taskflow for N times
Definition: executor.hpp:924
void wait_for_all()
waits for all pending graphs to complete
Definition: executor.hpp:1094