@@ -270,9 +270,6 @@ class Executor {
270270
271271 private:
272272
273- // inline static thread_local PerThread _per_thread;
274- // inline static thread_local Worker* _this_worker {nullptr};
275-
276273 std::condition_variable _topology_cv;
277274 std::mutex _taskflow_mutex;
278275 std::mutex _topology_mutex;
@@ -365,9 +362,6 @@ inline Executor::~Executor() {
365362 for (auto & t : _threads){
366363 t.join ();
367364 }
368-
369- // flush the default observer
370- // _flush_tfprof();
371365}
372366
373367// Function: num_workers
@@ -440,7 +434,6 @@ void Executor::silent_async(F&& f, ArgsT&&... args) {
440434
441435// Function: this_worker_id
442436inline int Executor::this_worker_id () const {
443- // auto worker = _per_thread.worker;
444437 Worker* worker = this_worker ().worker ;
445438 return worker ? static_cast <int >(worker->_id ) : -1 ;
446439}
@@ -456,7 +449,6 @@ inline void Executor::_spawn(size_t N) {
456449
457450 _threads.emplace_back ([this ] (Worker& w) -> void {
458451
459- // _per_thread.worker = &w;
460452 this_worker ().worker = &w;
461453
462454 Node* t = nullptr ;
@@ -641,7 +633,6 @@ inline void Executor::_schedule(Node* node) {
641633 node->_state .fetch_or (Node::READY, std::memory_order_release);
642634
643635 // caller is a worker to this pool
644- // auto worker = _per_thread.worker;
645636 auto worker = this_worker ().worker ;
646637
647638 if (worker != nullptr && worker->_executor == this ) {
@@ -679,7 +670,6 @@ inline void Executor::_schedule(const std::vector<Node*>& nodes) {
679670 }
680671
681672 // worker thread
682- // auto worker = _per_thread.worker;
683673 auto worker = this_worker ().worker ;
684674
685675 if (worker != nullptr && worker->_executor == this ) {
@@ -921,7 +911,6 @@ inline void Executor::_invoke_dynamic_task(Worker& w, Node* node) {
921911// Procedure: _invoke_dynamic_task_external
922912inline void Executor::_invoke_dynamic_task_external (Node*p, Graph& g, bool detach) {
923913
924- // auto worker = _per_thread.worker;
925914 auto worker = this_worker ().worker ;
926915
927916 // assert(worker && worker->_executor == this);
@@ -1206,7 +1195,6 @@ inline void Executor::_set_up_topology(Topology* tpg) {
12061195 for (auto node : tpg->_taskflow ._graph ._nodes ) {
12071196
12081197 node->_topology = tpg;
1209- // node->_clear_state();
12101198 node->_state .store (0 , std::memory_order_relaxed);
12111199
12121200 if (node->num_dependents () == 0 ) {
@@ -1230,10 +1218,9 @@ inline void Executor::_tear_down_topology(Topology* tpg) {
12301218 // case 1: we still need to run the topology again
12311219 if (!tpg->_is_cancelled && !tpg->_pred ()) {
12321220 // assert(tpg->_join_counter == 0);
1233- f._mutex . lock ( );
1221+ std::lock_guard<std::mutex> lock ( f._mutex );
12341222 tpg->_join_counter = tpg->_sources .size ();
12351223 _schedule (tpg->_sources );
1236- f._mutex .unlock ();
12371224 }
12381225 // case 2: the final run of this topology
12391226 else {
@@ -1244,11 +1231,8 @@ inline void Executor::_tear_down_topology(Topology* tpg) {
12441231 tpg->_call ();
12451232 }
12461233
1247- f._mutex .lock ();
1248-
12491234 // If there is another run (interleave between lock)
1250- if (f._topologies .size () > 1 ) {
1251-
1235+ if (std::unique_lock<std::mutex> lock (f._mutex ); f._topologies .size ()>1 ) {
12521236 // assert(tpg->_join_counter == 0);
12531237
12541238 // Set the promise
@@ -1259,12 +1243,10 @@ inline void Executor::_tear_down_topology(Topology* tpg) {
12591243 // decrement the topology but since this is not the last we don't notify
12601244 _decrement_topology ();
12611245
1262- _set_up_topology (tpg);
1263-
12641246 // set up topology needs to be under the lock or it can
12651247 // introduce memory order error with pop
1266- f. _mutex . unlock ( );
1267- }
1248+ _set_up_topology (tpg );
1249+ }
12681250 else {
12691251 // assert(f._topologies.size() == 1);
12701252
@@ -1283,7 +1265,8 @@ inline void Executor::_tear_down_topology(Topology* tpg) {
12831265 // Now we remove the topology from this taskflow
12841266 f._topologies .pop ();
12851267
1286- f._mutex .unlock ();
1268+ // f._mutex.unlock();
1269+ lock.unlock ();
12871270
12881271 // We set the promise in the end in case taskflow leaves the scope.
12891272 // After set_value, the caller will return from wait
@@ -1298,7 +1281,6 @@ inline void Executor::_tear_down_topology(Topology* tpg) {
12981281 std::scoped_lock<std::mutex> lock (_taskflow_mutex);
12991282 _taskflows.erase (*s);
13001283 }
1301-
13021284 }
13031285 }
13041286}
0 commit comments