Skip to content

Commit dcbb316

Browse files
updated executor
1 parent c6bee3a commit dcbb316

1 file changed

Lines changed: 22 additions & 13 deletions

File tree

taskflow/core/executor.hpp

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ class Executor {
234234

235235
PerThread& _per_thread() const;
236236

237-
bool _wait_for_tasks(unsigned, std::optional<Node*>&);
237+
bool _wait_for_task(unsigned, std::optional<Node*>&);
238238

239239
void _spawn(unsigned);
240240
void _exploit_task(unsigned, std::optional<Node*>&);
@@ -302,7 +302,7 @@ inline void Executor::_spawn(unsigned N) {
302302
_exploit_task(i, t);
303303

304304
// wait for tasks
305-
if(_wait_for_tasks(i, t) == false) {
305+
if(_wait_for_task(i, t) == false) {
306306
break;
307307
}
308308
}
@@ -403,6 +403,7 @@ inline void Executor::_exploit_task(unsigned i, std::optional<Node*>& t) {
403403

404404
if(t) {
405405
auto& worker = _workers[i];
406+
// TODO: study the memory order here
406407
if(_num_actives.fetch_add(1) == 0 && _num_thieves == 0) {
407408
_notifier.notify(false);
408409
}
@@ -423,13 +424,17 @@ inline void Executor::_exploit_task(unsigned i, std::optional<Node*>& t) {
423424
}
424425
}
425426

426-
// Function: _wait_for_tasks
427-
inline bool Executor::_wait_for_tasks(unsigned me, std::optional<Node*>& t) {
427+
// Function: _wait_for_task
428+
inline bool Executor::_wait_for_task(unsigned me, std::optional<Node*>& t) {
428429

430+
wait_for_task:
429431

430-
begin_steal:
432+
assert(!t);
431433

432434
++_num_thieves;
435+
436+
explore_task:
437+
433438
assert(_num_thieves <= _workers.size());
434439

435440
if(_explore_task(me, t); t) {
@@ -439,19 +444,23 @@ inline bool Executor::_wait_for_tasks(unsigned me, std::optional<Node*>& t) {
439444
return true;
440445
}
441446

442-
assert(!t);
443-
444447
_notifier.prepare_wait(&_waiters[me]);
445448

446449
//if(auto vtm = _find_victim(me); vtm != _workers.size()) {
447450
if(!_queue.empty()) {
451+
448452
_notifier.cancel_wait(&_waiters[me]);
449-
t = _queue.steal();
450-
if(auto N = _num_thieves.fetch_sub(1); t && N == 1) {
451-
_notifier.notify(false);
452-
}
453453
//t = (vtm == me) ? _queue.steal() : _workers[vtm].queue.steal();
454-
return true;
454+
455+
if(t = _queue.steal(); t) {
456+
if(auto N = _num_thieves.fetch_sub(1); N == 1) {
457+
_notifier.notify(false);
458+
}
459+
return true;
460+
}
461+
else {
462+
goto explore_task;
463+
}
455464
}
456465

457466
//if(size_t I = ++_num_idlers; _done && I == _workers.size()) {
@@ -473,7 +482,7 @@ inline bool Executor::_wait_for_tasks(unsigned me, std::optional<Node*>& t) {
473482

474483
if(_num_thieves.fetch_sub(1) == 1 && _num_actives) {
475484
_notifier.cancel_wait(&_waiters[me]);
476-
goto begin_steal;
485+
goto wait_for_task;
477486
}
478487

479488
//if(_num_actives && _num_thieves.load(std::memory_order_relaxed) == 0) {

0 commit comments

Comments
 (0)