1+ // 2018/09/21 - modified by Tsung-Wei and Chun-Xun
2+ // - refactored the code
3+ //
4+ // TODO:
5+ // - Problems can occur when external threads insert tasks during spawn.
6+ //
17// 2018/09/12 - created by Tsung-Wei Huang and Chun-Xun Lin
28//
3- // Speculative threadpool is similar to proactive threadpool except
4- // each thread will speculatively move a new task to its local worker
5- // data structure to reduce extract hit to the task queue.
6- // This can save time from locking the mutex during dynamic tasking.
9+ // Implemented PrivatizedThreadpool using the data structure inspired
10+ // by Eigen CXX/Threadpool.
711
812#pragma once
913
@@ -198,7 +202,7 @@ bool RunQueue<T, N>::empty() const {
198202}
199203
200204// Class: BasicPrivatizedThreadpool
201- template < template <typename ...> class Func >
205+ /* template < template<typename...> class Func >
202206class BasicPrivatizedThreadpool {
203207
204208 using TaskType = Func<void()>;
@@ -241,7 +245,7 @@ class BasicPrivatizedThreadpool {
241245 // TODO: do we need atomic variable here?
242246 std::atomic<size_t> _idle_workers {0};
243247
244- std::unordered_map<std::thread::id, size_t > _worker_map ;
248+ std::unordered_map<std::thread::id, size_t> _worker_maps ;
245249
246250 const std::thread::id _owner {std::this_thread::get_id()};
247251
@@ -254,7 +258,7 @@ class BasicPrivatizedThreadpool {
254258 // or make it relaxed
255259 std::atomic<size_t> _next_queue {0};
256260
257- size_t _nonempty_queue () const ;
261+ size_t _nonempty_worker_queue () const;
258262
259263 bool _sync {false};
260264
@@ -265,9 +269,9 @@ class BasicPrivatizedThreadpool {
265269}; // class BasicPrivatizedThreadpool. --------------------------------------
266270
267271
268- // Function: _nonempty_queue
272+ // Function: _nonempty_worker_queue
269273template < template<typename...> class Func >
270- size_t BasicPrivatizedThreadpool<Func>::_nonempty_queue () const {
274+ size_t BasicPrivatizedThreadpool<Func>::_nonempty_worker_queue () const {
271275 for(size_t i=0;i <_works.size(); ++i){
272276 if(!_works[i]->queue.empty()){
273277 return i;
@@ -378,7 +382,7 @@ void BasicPrivatizedThreadpool<Func>::shutdown(){
378382 _threads.clear();
379383
380384 _works.clear();
381- _worker_map .clear ();
385+ _worker_maps .clear();
382386
383387 _wait_for_all = false;
384388 _exiting = false;
@@ -405,7 +409,7 @@ void BasicPrivatizedThreadpool<Func>::spawn(unsigned N) {
405409
406410 const size_t sz = _threads.size();
407411
408- // Lock to synchronize all workers before creating _worker_maps
412+ // Lock to synchronize all workers before creating _worker_maps
409413 std::scoped_lock<std::mutex> lock(_mutex);
410414
411415 _coprimes.clear();
@@ -440,7 +444,7 @@ void BasicPrivatizedThreadpool<Func>::spawn(unsigned N) {
440444 //while(!w.queue.pop_front(t) && _task_queue.empty()){
441445 if(++_idle_workers == num_workers() && _wait_for_all){
442446 // Last active thread checks if all queues are empty
443- if (auto ret = _nonempty_queue (); ret == num_workers ()){
447+ if(auto ret = _nonempty_worker_queue (); ret == num_workers()){
444448 // TODO: here only one thread will do so
445449 _sync = true;
446450 _empty_cv.notify_one();
@@ -468,7 +472,7 @@ void BasicPrivatizedThreadpool<Func>::spawn(unsigned N) {
468472 } // End of while ------------------------------------------------------
469473 });
470474
471- _worker_map .insert ({_threads.back ().get_id (), i+sz});
475+ _worker_maps .insert({_threads.back().get_id(), i+sz});
472476 } // End of For ---------------------------------------------------------------------------------
473477
474478}
@@ -529,8 +533,8 @@ void BasicPrivatizedThreadpool<Func>::silent_async(C&& c){
529533
530534 if(std::this_thread::get_id() != _owner){
531535 auto tid = std::this_thread::get_id();
532- if (_worker_map .find (tid) != _worker_map .end ()){
533- if (!_works[_worker_map .at (tid)]->queue .push_front (t)){
536+ if(_worker_maps .find(tid) != _worker_maps .end()){
537+ if(!_works[_worker_maps .at(tid)]->queue.push_front(t)){
534538 std::scoped_lock<std::mutex> lock(_mutex);
535539 _task_queue.push_back(std::move(t));
536540 }
@@ -575,13 +579,10 @@ void BasicPrivatizedThreadpool<Func>::wait_for_all() {
575579
576580 _sync = false;
577581 _wait_for_all = false;
578- }
582+ } */
579583
580584
581585
582- /* A different implementation
583-
584-
585586template < template <typename ...> class Func >
586587class BasicPrivatizedThreadpool {
587588
@@ -619,44 +620,42 @@ class BasicPrivatizedThreadpool {
619620 auto async (C&&);
620621
621622 private:
623+
624+ const std::thread::id _owner {std::this_thread::get_id ()};
622625
623626 mutable std::mutex _mutex;
624627
625628 std::condition_variable _empty_cv;
626629
627630 std::deque<TaskType> _task_queue;
628631 std::vector<std::thread> _threads;
632+ std::vector<size_t > _coprimes;
633+
634+ std::unordered_map<std::thread::id, size_t > _worker_maps;
635+ std::vector<std::unique_ptr<Worker>> _workers;
629636
630637 // TODO: do we need atomic variable here?
631- //std::atomic<size_t> _idle_workers {0};
632- size_t _idle_workers {0};
638+ std::atomic<bool > _allow_steal {true };
633639
634- std::unordered_map<std::thread::id, size_t> _worker_map;
640+ size_t _idle_workers {0 };
641+ size_t _next_queue {0 };
635642
636- const std::thread::id _owner {std::this_thread::get_id()};
637-
638643 bool _wait_for_all {false };
639644
640- std::vector<std::unique_ptr<Worker>> _workers;
641645
642- // TODO: can we just use some hacky method to replace atomic
643- // or make it relaxed
644- std::atomic<size_t> _next_queue {0};
646+ std::optional<size_t > _nonempty_worker_queue () const ;
645647
646- std::optional<size_t> _nonempty_queue() const;
647-
648- std::vector<size_t> _coprimes;
649648 void _xorshift32 (uint32_t &);
650649 bool _steal (TaskType&, uint32_t &);
651650
652- std::atomic<bool> _allow_steal {true};
651+
653652
654653}; // class BasicPrivatizedThreadpool. --------------------------------------
655654
656655
657- // Function: _nonempty_queue
656+ // Function: _nonempty_worker_queue
658657template < template <typename ...> class Func >
659- std::optional<size_t> BasicPrivatizedThreadpool<Func>::_nonempty_queue () const {
658+ std::optional<size_t > BasicPrivatizedThreadpool<Func>::_nonempty_worker_queue () const {
660659 for (size_t i=0 ;i <_workers.size (); ++i){
661660 if (!_workers[i]->queue .empty ()){
662661 return i;
@@ -742,7 +741,7 @@ void BasicPrivatizedThreadpool<Func>::shutdown(){
742741 std::unique_lock<std::mutex> lock (_mutex);
743742 // If all workers are idle && all queues are empty, then master
744743 // can directly wake up workers without waiting for notified
745- if(_idle_workers != num_workers() || _nonempty_queue ().has_value()){
744+ if (_idle_workers != num_workers () || _nonempty_worker_queue ().has_value ()){
746745 _wait_for_all = true ;
747746
748747 // Wake up all workers in case their queues are not empty
@@ -757,7 +756,6 @@ void BasicPrivatizedThreadpool<Func>::shutdown(){
757756
758757 // Notify workers to exit
759758 for (auto & w : _workers){
760- // TODO: can we replace this dummy task with state?
761759 w->state = Worker::EXIT;
762760 w->cv .notify_one ();
763761 }
@@ -770,7 +768,7 @@ void BasicPrivatizedThreadpool<Func>::shutdown(){
770768 _threads.clear ();
771769
772770 _workers.clear ();
773- _worker_map .clear();
771+ _worker_maps .clear ();
774772}
775773
776774// Function: spawn
@@ -791,8 +789,8 @@ void BasicPrivatizedThreadpool<Func>::spawn(unsigned N) {
791789
792790 const size_t sz = _threads.size ();
793791
794- // Lock to synchronize all workers before creating _worker_maps
795- std::scoped_lock<std::mutex> lock(_mutex);
792+ // Lock to synchronize all workers before creating _worker_maps
793+ std::scoped_lock lock (_mutex);
796794
797795 _coprimes.clear ();
798796 for (size_t i=1 ; i<=sz+N; i++){
@@ -811,24 +809,20 @@ void BasicPrivatizedThreadpool<Func>::spawn(unsigned N) {
811809 TaskType t {nullptr };
812810 Worker& w = *(_workers[i]);
813811 uint32_t seed = i+1 ;
814- std::unique_lock<std::mutex> lock(_mutex, std::defer_lock);
812+ std::unique_lock lock (_mutex, std::defer_lock);
815813
816814 while (w.state != Worker::EXIT){
817- //// TODO: assume exiting is atomic variable and defer lock
818815 if (!w.queue .pop_front (t)) {
819816 if (!_allow_steal.load (std::memory_order_relaxed) || !_steal (t, seed)) {
820817 lock.lock ();
821818 if (!_task_queue.empty ()) {
822819 t = std::move (_task_queue.front ());
823820 _task_queue.pop_front ();
824821 }
825- // ... as follows...
826822 else {
827- //if(++_idle_workers == num_workers() && _wait_for_all) {
828823 if (++_idle_workers == num_workers ()){
829824 // Last active thread checks if all queues are empty
830- // TODO: optional
831- if(auto ret = _nonempty_queue(); ret.has_value()){
825+ if (auto ret = _nonempty_worker_queue (); ret.has_value ()){
832826 // if the nonempty queue is mine, continue to process tasks in queue
833827 if (*ret == i){
834828 --_idle_workers;
@@ -839,17 +833,16 @@ void BasicPrivatizedThreadpool<Func>::spawn(unsigned N) {
839833 _workers[*ret]->cv .notify_one ();
840834 }
841835 else {
842- // TODO: here only one thread will do so
836+ // here only one thread will do so
843837 // if all workers are idle && all queues are empty && master is waiting
844838 // notify the master by last thread
845839 if (_wait_for_all){
846840 _wait_for_all = false ;
847841 _empty_cv.notify_one ();
848842 }
849843 }
850- } // End of ++_idle_workers
844+ }
851845
852- w.state = Worker::ALIVE;
853846 while (w.state == Worker::ALIVE && w.queue .empty ()){
854847 w.cv .wait (lock);
855848 }
@@ -866,7 +859,7 @@ void BasicPrivatizedThreadpool<Func>::spawn(unsigned N) {
866859 } // End of while ------------------------------------------------------
867860 });
868861
869- _worker_map .insert({_threads.back().get_id(), i+sz});
862+ _worker_maps .insert ({_threads.back ().get_id (), i+sz});
870863 } // End of For ---------------------------------------------------------------------------------
871864 _allow_steal = true ;
872865}
@@ -926,7 +919,7 @@ void BasicPrivatizedThreadpool<Func>::silent_async(C&& c){
926919 }
927920
928921 if (auto tid = std::this_thread::get_id (); tid != _owner){
929- if(auto itr = _worker_map .find(tid); itr != _worker_map .end()){
922+ if (auto itr = _worker_maps .find (tid); itr != _worker_maps .end ()){
930923 if (!_workers[itr->second ]->queue .push_front (t)){
931924 std::scoped_lock<std::mutex> lock (_mutex);
932925 _task_queue.push_back (std::move (t));
@@ -936,15 +929,15 @@ void BasicPrivatizedThreadpool<Func>::silent_async(C&& c){
936929 }
937930
938931 // owner thread or other threads
939- // TODO: use random for load balancing?
940- auto id = (_next_queue.fetch_add(1, std::memory_order_relaxed)+1)%_workers.size();
932+ auto id = (++_next_queue) % _workers. size ();
933+
941934 if (!_workers[id]->queue .push_back (t)){
942- std::scoped_lock<std::mutex> lock(_mutex);
935+ std::scoped_lock lock (_mutex);
943936 _task_queue.push_back (std::move (t));
944937 }
945938 else {
946939 // Lock to make sure the worker will be notified
947- std::scoped_lock<std::mutex> lock(_mutex);
940+ std::scoped_lock lock (_mutex);
948941 }
949942 _workers[id]->cv .notify_one ();
950943}
@@ -958,12 +951,14 @@ void BasicPrivatizedThreadpool<Func>::wait_for_all() {
958951 throw std::runtime_error (" Worker thread cannot wait for all" );
959952 }
960953
961- if(num_workers() == 0) return ;
954+ if (num_workers () == 0 ) {
955+ return ;
956+ }
962957
963- std::unique_lock<std::mutex> lock(_mutex);
958+ std::unique_lock lock (_mutex);
964959 // If all workers are idle && all queues are empty,
965960 // then wait_for_all is done.
966- if(_idle_workers == num_workers() && !_nonempty_queue().has_value ()){
961+ if (_idle_workers == num_workers () && !_nonempty_worker_queue ()){
967962 return ;
968963 }
969964
@@ -978,8 +973,6 @@ void BasicPrivatizedThreadpool<Func>::wait_for_all() {
978973 }
979974}
980975
981- */
982-
983976
984977}; // namespace tf -----------------------------------------------------------
985978
0 commit comments