Skip to content

Commit 736aef9

Browse files
updated topology pool
1 parent 7c71186 commit 736aef9

2 files changed

Lines changed: 35 additions & 25 deletions

File tree

taskflow/core/executor.hpp

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ class Executor {
231231
void _invoke_static_work(unsigned, Node*);
232232
void _invoke_dynamic_work(unsigned, Node*, Subflow&);
233233
void _init_module_node(Node*);
234-
void _tear_down_topology(Topology&);
234+
void _tear_down_topology(Topology*);
235235
};
236236

237237
// Constructor
@@ -682,7 +682,7 @@ inline void Executor::_invoke(unsigned me, Node* node) {
682682
if(num_successors == 0) {
683683
if(--(node->_topology->_num_sinks) == 0) {
684684
if(_workers.size() > 0) { // finishing this topology
685-
_tear_down_topology(*node->_topology);
685+
_tear_down_topology(node->_topology);
686686
}
687687
}
688688
}
@@ -742,22 +742,24 @@ std::future<void> Executor::run_until(Taskflow& f, P&& pred) {
742742
}
743743

744744
// Function: _tear_down_topology
745-
inline void Executor::_tear_down_topology(Topology& tpg) {
745+
inline void Executor::_tear_down_topology(Topology* tpg) {
746746

747-
auto &f = tpg._taskflow;
747+
auto &f = tpg->_taskflow;
748748

749-
//assert(&tpg == &(f._topologies.front()));
749+
assert(tpg == f._topologies.front());
750750

751751
// case 1: we still need to run the topology again
752-
if(!std::invoke(tpg._pred)) {
753-
tpg._recover_num_sinks();
754-
_schedule(tpg._sources);
752+
if(!std::invoke(tpg->_pred)) {
753+
tpg->_recover_num_sinks();
754+
_schedule(tpg->_sources);
755755
}
756756
// case 2: the final run of this topology
757757
else {
758+
759+
auto& pool = per_thread_object_pool<Topology>();
758760

759-
if(tpg._call != nullptr) {
760-
std::invoke(tpg._call);
761+
if(tpg->_call != nullptr) {
762+
std::invoke(tpg->_call);
761763
}
762764

763765
f._mtx.lock();
@@ -766,7 +768,8 @@ inline void Executor::_tear_down_topology(Topology& tpg) {
766768
if(f._topologies.size() > 1) {
767769

768770
// Set the promise
769-
tpg._promise.set_value();
771+
tpg->_promise.set_value();
772+
pool.recycle(tpg);
770773
f._topologies.pop_front();
771774
f._mtx.unlock();
772775

@@ -775,15 +778,16 @@ inline void Executor::_tear_down_topology(Topology& tpg) {
775778
_num_topologies--;
776779
}
777780

778-
f._topologies.front()._bind(f._graph);
779-
_schedule(f._topologies.front()._sources);
781+
f._topologies.front()->_bind(f._graph);
782+
_schedule(f._topologies.front()->_sources);
780783
}
781784
else {
782785
assert(f._topologies.size() == 1);
783786
// Need to back up the promise first here becuz taskflow might be
784787
// destroy before taskflow leaves
785-
auto p {std::move(tpg._promise)};
786-
788+
auto p {std::move(tpg->_promise)};
789+
790+
pool.recycle(tpg);
787791
f._topologies.pop_front();
788792

789793
f._mtx.unlock();
@@ -809,25 +813,29 @@ std::future<void> Executor::run_until(Taskflow& f, P&& pred, C&& c) {
809813
if(std::invoke(pred)) {
810814
return std::async(std::launch::deferred, [](){});
811815
}
816+
817+
auto& pool = per_thread_object_pool<Topology>();
812818

813819
// Speicla case of zero workers needs
814820
// - iterative execution to avoid stack overflow
815821
// - avoid execution of last_work
816822
if(_workers.size() == 0) {
817823

818-
Topology tpg(f, std::forward<P>(pred), std::forward<C>(c));
824+
auto tpg = pool.get(f, std::forward<P>(pred), std::forward<C>(c));
819825

820826
// Clear last execution data & Build precedence between nodes and target
821-
tpg._bind(f._graph);
827+
tpg->_bind(f._graph);
822828

823829
do {
824-
_schedule(tpg._sources);
825-
tpg._recover_num_sinks();
826-
} while(!std::invoke(tpg._pred));
830+
_schedule(tpg->_sources);
831+
tpg->_recover_num_sinks();
832+
} while(!std::invoke(tpg->_pred));
827833

828-
if(tpg._call != nullptr) {
829-
std::invoke(tpg._call);
834+
if(tpg->_call != nullptr) {
835+
std::invoke(tpg->_call);
830836
}
837+
838+
pool.recycle(tpg);
831839

832840
return std::async(std::launch::deferred, [](){});
833841
}
@@ -847,7 +855,9 @@ std::future<void> Executor::run_until(Taskflow& f, P&& pred, C&& c) {
847855
std::scoped_lock lock(f._mtx);
848856

849857
// create a topology for this run
850-
tpg = &(f._topologies.emplace_back(f, std::forward<P>(pred), std::forward<C>(c)));
858+
//tpg = &(f._topologies.emplace_back(f, std::forward<P>(pred), std::forward<C>(c)));
859+
tpg = pool.get(f, std::forward<P>(pred), std::forward<C>(c));
860+
f._topologies.push_back(tpg);
851861
future = tpg->_promise.get_future();
852862

853863
if(f._topologies.size() == 1) {

taskflow/core/taskflow.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,9 @@ class Taskflow : public FlowBuilder {
8383

8484
std::mutex _mtx;
8585

86-
std::list<Topology> _topologies;
86+
//std::list<Topology> _topologies;
8787

88-
//std::deque<Topology*> _topologies;
88+
std::deque<Topology*> _topologies;
8989
};
9090

9191
// Constructor

0 commit comments

Comments
 (0)