Skip to content

Commit 1b6735b

Browse files
updated work stealing queue
1 parent acd9c9d commit 1b6735b

4 files changed

Lines changed: 42 additions & 18 deletions

File tree

example/simple.cpp

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,8 @@
66
#include <taskflow/taskflow.hpp> // the only include you need
77
#include <new>
88

9-
struct keep_apart {
10-
alignas(64) std::atomic<int> cat;
11-
alignas(64) std::atomic<int> dog;
12-
};
13-
149
int main(){
1510

16-
std::cout << sizeof(keep_apart);
17-
1811
tf::Taskflow tf;
1912

2013
auto [A, B, C, D] = tf.silent_emplace( // the taskflow graph

taskflow/threadpool/privatized_threadpool.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
// 2018/12/06 - this class is left for probation
2+
//
13
// 2018/11/28 - modified by Chun-Xun Lin
24
//
35
// Added the method batch to insert a vector of tasks.

taskflow/threadpool/speculative_threadpool.hpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,10 @@ void SpeculativeThreadpool<Closure>::emplace(ArgsT&&... args) {
241241

242242
template <typename Closure>
243243
void SpeculativeThreadpool<Closure>::batch(std::vector<Closure>&& tasks){
244-
size_t consumed {0};
244+
245+
if(tasks.empty()) {
246+
return;
247+
}
245248

246249
//no worker thread available
247250
if(num_workers() == 0){
@@ -250,6 +253,8 @@ void SpeculativeThreadpool<Closure>::batch(std::vector<Closure>&& tasks){
250253
}
251254
return;
252255
}
256+
257+
size_t consumed {0};
253258

254259
// speculation
255260
if(std::this_thread::get_id() != _owner){

taskflow/threadpool/workstealing_threadpool.hpp

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ class WorkStealingThreadpool {
227227
struct Worker {
228228
std::condition_variable cv;
229229
WorkStealingQueue<Closure> queue;
230+
std::optional<Closure> cache;
230231
bool exit {false};
231232
bool ready {false};
232233
uint64_t seed;
@@ -256,7 +257,6 @@ class WorkStealingThreadpool {
256257
mutable std::mutex _mutex;
257258

258259
std::vector<Worker> _workers;
259-
//std::vector<Worker*> _idlers;
260260
std::vector<std::thread> _threads;
261261

262262
std::unordered_map<std::thread::id, unsigned> _worker_maps;
@@ -366,18 +366,27 @@ void WorkStealingThreadpool<Closure>::_spawn(unsigned N) {
366366
lock.lock();
367367
if(_queue.empty()) {
368368
w.ready = false;
369-
//_idlers.push_back(&w);
370369
_idlers.push(&w);
371370
while(!w.ready && !w.exit) {
372371
w.cv.wait(lock);
373372
}
374373
}
375374
lock.unlock();
375+
376+
if(w.cache) {
377+
std::swap(t, w.cache);
378+
}
376379
}
377380

378381
while(t) {
379382
(*t)();
380-
t = w.queue.pop();
383+
if(w.cache) {
384+
t = std::move(*w.cache);
385+
w.cache = std::nullopt;
386+
}
387+
else {
388+
t = w.queue.pop();
389+
}
381390
}
382391
} // End of while ------------------------------------------------------
383392

@@ -409,6 +418,7 @@ void WorkStealingThreadpool<Closure>::_balance_load(unsigned me) {
409418
if(auto idler = _idlers.steal(); idler) {
410419
(*idler)->ready = true;
411420
(*idler)->victim_hint = me;
421+
(*idler)->cache = _workers[me].queue.steal();
412422
(*idler)->cv.notify_one();
413423
factor += load_balancing_factor;
414424
}
@@ -473,7 +483,12 @@ void WorkStealingThreadpool<Closure>::emplace(ArgsT&&... args){
473483

474484
unsigned me = itr->second;
475485

476-
_workers[me].queue.push(Closure{std::forward<ArgsT>(args)...});
486+
if(_workers[me].cache) {
487+
_workers[me].queue.push(Closure{std::forward<ArgsT>(args)...});
488+
}
489+
else {
490+
_workers[me].cache.emplace(std::forward<ArgsT>(args)...);
491+
}
477492

478493
// load balancing
479494
_balance_load(me);
@@ -484,7 +499,7 @@ void WorkStealingThreadpool<Closure>::emplace(ArgsT&&... args){
484499

485500
if(auto idler = _idlers.steal(); idler) {
486501
(*idler)->ready = true;
487-
(*idler)->queue.push(Closure{std::forward<ArgsT>(args)...});
502+
(*idler)->cache.emplace(std::forward<ArgsT>(args)...);
488503
(*idler)->cv.notify_one();
489504
}
490505
else {
@@ -497,6 +512,10 @@ void WorkStealingThreadpool<Closure>::emplace(ArgsT&&... args){
497512
template <typename Closure>
498513
void WorkStealingThreadpool<Closure>::batch(std::vector<Closure>&& tasks) {
499514

515+
if(tasks.empty()) {
516+
return;
517+
}
518+
500519
//no worker thread available
501520
if(num_workers() == 0){
502521
for(auto &t: tasks){
@@ -512,9 +531,15 @@ void WorkStealingThreadpool<Closure>::batch(std::vector<Closure>&& tasks) {
512531
if(auto itr = _worker_maps.find(tid); itr != _worker_maps.end()){
513532

514533
unsigned me = itr->second;
534+
535+
size_t i = 0;
515536

516-
for(auto& t : tasks) {
517-
_workers[me].queue.push(std::move(t));
537+
if(!_workers[me].cache) {
538+
_workers[me].cache = std::move(tasks[i++]);
539+
}
540+
541+
for(; i<tasks.size(); ++i) {
542+
_workers[me].queue.push(std::move(tasks[i]));
518543
}
519544

520545
// load balancing
@@ -535,11 +560,10 @@ void WorkStealingThreadpool<Closure>::batch(std::vector<Closure>&& tasks) {
535560
while(!_queue.empty()) {
536561
if(auto idler = _idlers.steal(); idler) {
537562
(*idler)->ready = true;
563+
(*idler)->cache = _queue.steal();
538564
(*idler)->cv.notify_one();
539565
}
540-
else {
541-
break;
542-
}
566+
else break;
543567
}
544568
}
545569

0 commit comments

Comments
 (0)