Skip to content

Commit 45521c7

Browse files
updated executor
1 parent 3ce3593 commit 45521c7

1 file changed

Lines changed: 16 additions & 13 deletions

File tree

taskflow/core/executor.hpp

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -509,7 +509,11 @@ inline void Executor::_schedule(Node* node) {
509509
// Each task node has two types of tasks - regular and subflow.
510510
inline void Executor::_schedule(PassiveVector<Node*>& nodes) {
511511

512-
if(nodes.empty()) {
512+
// We need to cacth the node count to avoid accessing the nodes
513+
// vector while the parent topology is removed!
514+
const auto num_nodes = nodes.size();
515+
516+
if(num_nodes == 0) {
513517
return;
514518
}
515519

@@ -530,20 +534,20 @@ inline void Executor::_schedule(PassiveVector<Node*>& nodes) {
530534
auto& pt = _per_thread();
531535

532536
if(pt.pool == this) {
533-
for(size_t i=0; i<nodes.size(); ++i) {
537+
for(size_t i=0; i<num_nodes; ++i) {
534538
_workers[pt.worker_id].queue.push(nodes[i]);
535539
}
536540
return;
537541
}
538542

539543
{
540544
//std::scoped_lock lock(_mutex);
541-
for(size_t k=0; k<nodes.size(); ++k) {
545+
for(size_t k=0; k<num_nodes; ++k) {
542546
_queue.push(nodes[k]);
543547
}
544548
}
545549

546-
size_t N = std::max(size_t{1}, std::min(_num_idlers.load(), nodes.size()));
550+
size_t N = std::max(size_t{1}, std::min(_num_idlers.load(), num_nodes));
547551

548552
if(N >= _workers.size()) {
549553
_notifier.notify(true);
@@ -778,7 +782,6 @@ inline void Executor::_sync_topology(Topology& tpg) {
778782
}
779783
}
780784

781-
782785
// Function: run_until
783786
template <typename P, typename C>
784787
std::future<void> Executor::run_until(Taskflow& f, P&& pred, C&& c) {
@@ -819,7 +822,7 @@ std::future<void> Executor::run_until(Taskflow& f, P&& pred, C&& c) {
819822
}
820823

821824
// Multi-threaded execution.
822-
//bool run_now {false};
825+
bool run_now {false};
823826
Topology* tpg;
824827
std::future<void> future;
825828

@@ -832,19 +835,19 @@ std::future<void> Executor::run_until(Taskflow& f, P&& pred, C&& c) {
832835

833836
// TODO: if we do this without lock protection, we got segfault...?
834837
if(f._topologies.size() == 1) {
835-
//run_now = true;
836-
tpg->_bind(f._graph);
837-
_schedule(tpg->_sources);
838+
run_now = true;
839+
//tpg->_bind(f._graph);
840+
//_schedule(tpg->_sources);
838841
}
839842
}
840843

841844
// Notice here calling schedule may cause the topology to be removed sonner
842845
// before the function leaves.
843-
//if(run_now) {
846+
if(run_now) {
844847
// TODO: seg fault
845-
// tplg->_bind(f._graph)
846-
// _schedule(tpg->_sources);
847-
//}
848+
tpg->_bind(f._graph);
849+
_schedule(tpg->_sources);
850+
}
848851

849852
return future;
850853
}

0 commit comments

Comments
 (0)