Skip to content

Commit cb9713b

Browse files
remove topology objectpool
1 parent 736aef9 commit cb9713b

File tree

2 files changed

+25
-35
lines changed

2 files changed

+25
-35
lines changed

taskflow/core/executor.hpp

Lines changed: 23 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ class Executor {
231231
void _invoke_static_work(unsigned, Node*);
232232
void _invoke_dynamic_work(unsigned, Node*, Subflow&);
233233
void _init_module_node(Node*);
234-
void _tear_down_topology(Topology*);
234+
void _tear_down_topology(Topology&);
235235
};
236236

237237
// Constructor
@@ -682,7 +682,7 @@ inline void Executor::_invoke(unsigned me, Node* node) {
682682
if(num_successors == 0) {
683683
if(--(node->_topology->_num_sinks) == 0) {
684684
if(_workers.size() > 0) { // finishing this topology
685-
_tear_down_topology(node->_topology);
685+
_tear_down_topology(*node->_topology);
686686
}
687687
}
688688
}
@@ -742,24 +742,22 @@ std::future<void> Executor::run_until(Taskflow& f, P&& pred) {
742742
}
743743

744744
// Function: _tear_down_topology
745-
inline void Executor::_tear_down_topology(Topology* tpg) {
745+
inline void Executor::_tear_down_topology(Topology& tpg) {
746746

747-
auto &f = tpg->_taskflow;
747+
auto &f = tpg._taskflow;
748748

749-
assert(tpg == f._topologies.front());
749+
//assert(&tpg == &(f._topologies.front()));
750750

751751
// case 1: we still need to run the topology again
752-
if(!std::invoke(tpg->_pred)) {
753-
tpg->_recover_num_sinks();
754-
_schedule(tpg->_sources);
752+
if(!std::invoke(tpg._pred)) {
753+
tpg._recover_num_sinks();
754+
_schedule(tpg._sources);
755755
}
756756
// case 2: the final run of this topology
757757
else {
758-
759-
auto& pool = per_thread_object_pool<Topology>();
760758

761-
if(tpg->_call != nullptr) {
762-
std::invoke(tpg->_call);
759+
if(tpg._call != nullptr) {
760+
std::invoke(tpg._call);
763761
}
764762

765763
f._mtx.lock();
@@ -768,8 +766,7 @@ inline void Executor::_tear_down_topology(Topology* tpg) {
768766
if(f._topologies.size() > 1) {
769767

770768
// Set the promise
771-
tpg->_promise.set_value();
772-
pool.recycle(tpg);
769+
tpg._promise.set_value();
773770
f._topologies.pop_front();
774771
f._mtx.unlock();
775772

@@ -778,16 +775,15 @@ inline void Executor::_tear_down_topology(Topology* tpg) {
778775
_num_topologies--;
779776
}
780777

781-
f._topologies.front()->_bind(f._graph);
782-
_schedule(f._topologies.front()->_sources);
778+
f._topologies.front()._bind(f._graph);
779+
_schedule(f._topologies.front()._sources);
783780
}
784781
else {
785782
assert(f._topologies.size() == 1);
786783
// Need to back up the promise first here becuz taskflow might be
787784
// destroy before taskflow leaves
788-
auto p {std::move(tpg->_promise)};
789-
790-
pool.recycle(tpg);
785+
auto p {std::move(tpg._promise)};
786+
791787
f._topologies.pop_front();
792788

793789
f._mtx.unlock();
@@ -813,29 +809,25 @@ std::future<void> Executor::run_until(Taskflow& f, P&& pred, C&& c) {
813809
if(std::invoke(pred)) {
814810
return std::async(std::launch::deferred, [](){});
815811
}
816-
817-
auto& pool = per_thread_object_pool<Topology>();
818812

819813
// Speicla case of zero workers needs
820814
// - iterative execution to avoid stack overflow
821815
// - avoid execution of last_work
822816
if(_workers.size() == 0) {
823817

824-
auto tpg = pool.get(f, std::forward<P>(pred), std::forward<C>(c));
818+
Topology tpg(f, std::forward<P>(pred), std::forward<C>(c));
825819

826820
// Clear last execution data & Build precedence between nodes and target
827-
tpg->_bind(f._graph);
821+
tpg._bind(f._graph);
828822

829823
do {
830-
_schedule(tpg->_sources);
831-
tpg->_recover_num_sinks();
832-
} while(!std::invoke(tpg->_pred));
824+
_schedule(tpg._sources);
825+
tpg._recover_num_sinks();
826+
} while(!std::invoke(tpg._pred));
833827

834-
if(tpg->_call != nullptr) {
835-
std::invoke(tpg->_call);
828+
if(tpg._call != nullptr) {
829+
std::invoke(tpg._call);
836830
}
837-
838-
pool.recycle(tpg);
839831

840832
return std::async(std::launch::deferred, [](){});
841833
}
@@ -855,9 +847,7 @@ std::future<void> Executor::run_until(Taskflow& f, P&& pred, C&& c) {
855847
std::scoped_lock lock(f._mtx);
856848

857849
// create a topology for this run
858-
//tpg = &(f._topologies.emplace_back(f, std::forward<P>(pred), std::forward<C>(c)));
859-
tpg = pool.get(f, std::forward<P>(pred), std::forward<C>(c));
860-
f._topologies.push_back(tpg);
850+
tpg = &(f._topologies.emplace_back(f, std::forward<P>(pred), std::forward<C>(c)));
861851
future = tpg->_promise.get_future();
862852

863853
if(f._topologies.size() == 1) {

taskflow/core/taskflow.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,9 @@ class Taskflow : public FlowBuilder {
8383

8484
std::mutex _mtx;
8585

86-
//std::list<Topology> _topologies;
86+
std::list<Topology> _topologies;
8787

88-
std::deque<Topology*> _topologies;
88+
//std::deque<Topology*> _topologies;
8989
};
9090

9191
// Constructor

0 commit comments

Comments
 (0)