@@ -227,6 +227,7 @@ class WorkStealingThreadpool {
227227 struct Worker {
228228 std::condition_variable cv;
229229 WorkStealingQueue<Closure> queue;
230+ std::optional<Closure> cache;
230231 bool exit {false };
231232 bool ready {false };
232233 uint64_t seed;
@@ -256,7 +257,6 @@ class WorkStealingThreadpool {
256257 mutable std::mutex _mutex;
257258
258259 std::vector<Worker> _workers;
259- // std::vector<Worker*> _idlers;
260260 std::vector<std::thread> _threads;
261261
262262 std::unordered_map<std::thread::id, unsigned > _worker_maps;
@@ -366,18 +366,27 @@ void WorkStealingThreadpool<Closure>::_spawn(unsigned N) {
366366 lock.lock ();
367367 if (_queue.empty ()) {
368368 w.ready = false ;
369- // _idlers.push_back(&w);
370369 _idlers.push (&w);
371370 while (!w.ready && !w.exit ) {
372371 w.cv .wait (lock);
373372 }
374373 }
375374 lock.unlock ();
375+
376+ if (w.cache ) {
377+ std::swap (t, w.cache );
378+ }
376379 }
377380
378381 while (t) {
379382 (*t)();
380- t = w.queue .pop ();
383+ if (w.cache ) {
384+ t = std::move (*w.cache );
385+ w.cache = std::nullopt ;
386+ }
387+ else {
388+ t = w.queue .pop ();
389+ }
381390 }
382391 } // End of while ------------------------------------------------------
383392
@@ -409,6 +418,7 @@ void WorkStealingThreadpool<Closure>::_balance_load(unsigned me) {
409418 if (auto idler = _idlers.steal (); idler) {
410419 (*idler)->ready = true ;
411420 (*idler)->victim_hint = me;
421+ (*idler)->cache = _workers[me].queue .steal ();
412422 (*idler)->cv .notify_one ();
413423 factor += load_balancing_factor;
414424 }
@@ -473,7 +483,12 @@ void WorkStealingThreadpool<Closure>::emplace(ArgsT&&... args){
473483
474484 unsigned me = itr->second ;
475485
476- _workers[me].queue .push (Closure{std::forward<ArgsT>(args)...});
486+ if (_workers[me].cache ) {
487+ _workers[me].queue .push (Closure{std::forward<ArgsT>(args)...});
488+ }
489+ else {
490+ _workers[me].cache .emplace (std::forward<ArgsT>(args)...);
491+ }
477492
478493 // load balancing
479494 _balance_load (me);
@@ -484,7 +499,7 @@ void WorkStealingThreadpool<Closure>::emplace(ArgsT&&... args){
484499
485500 if (auto idler = _idlers.steal (); idler) {
486501 (*idler)->ready = true ;
487- (*idler)->queue . push (Closure{ std::forward<ArgsT>(args)...} );
502+ (*idler)->cache . emplace ( std::forward<ArgsT>(args)...);
488503 (*idler)->cv .notify_one ();
489504 }
490505 else {
@@ -497,6 +512,10 @@ void WorkStealingThreadpool<Closure>::emplace(ArgsT&&... args){
497512template <typename Closure>
498513void WorkStealingThreadpool<Closure>::batch(std::vector<Closure>&& tasks) {
499514
515+ if (tasks.empty ()) {
516+ return ;
517+ }
518+
500519 // no worker thread available
501520 if (num_workers () == 0 ){
502521 for (auto &t: tasks){
@@ -512,9 +531,15 @@ void WorkStealingThreadpool<Closure>::batch(std::vector<Closure>&& tasks) {
512531 if (auto itr = _worker_maps.find (tid); itr != _worker_maps.end ()){
513532
514533 unsigned me = itr->second ;
534+
535+ size_t i = 0 ;
515536
516- for (auto & t : tasks) {
517- _workers[me].queue .push (std::move (t));
537+ if (!_workers[me].cache ) {
538+ _workers[me].cache = std::move (tasks[i++]);
539+ }
540+
541+ for (; i<tasks.size (); ++i) {
542+ _workers[me].queue .push (std::move (tasks[i]));
518543 }
519544
520545 // load balancing
@@ -535,11 +560,10 @@ void WorkStealingThreadpool<Closure>::batch(std::vector<Closure>&& tasks) {
535560 while (!_queue.empty ()) {
536561 if (auto idler = _idlers.steal (); idler) {
537562 (*idler)->ready = true ;
563+ (*idler)->cache = _queue.steal ();
538564 (*idler)->cv .notify_one ();
539565 }
540- else {
541- break ;
542- }
566+ else break ;
543567 }
544568}
545569
0 commit comments