Skip to content

Commit ee81511

Browse files
updated privatized_threadpool
1 parent dcf7506 commit ee81511

1 file changed

Lines changed: 52 additions & 59 deletions

File tree

taskflow/threadpool/privatized_threadpool.hpp

Lines changed: 52 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1+
// 2018/09/21 - modified by Tsung-Wei and Chun-Xun
2+
// - refactored the code
3+
//
4+
// TODO:
5+
// - Problems can occur when external threads insert tasks during spawn.
6+
//
17
// 2018/09/12 - created by Tsung-Wei Huang and Chun-Xun Lin
28
//
3-
// Speculative threadpool is similar to proactive threadpool except
4-
// each thread will speculatively move a new task to its local worker
5-
// data structure to reduce extract hit to the task queue.
6-
// This can save time from locking the mutex during dynamic tasking.
9+
// Implemented PrivatizedThreadpool using the data structre inspired
10+
// Eigen CXX/Threadpool.
711

812
#pragma once
913

@@ -198,7 +202,7 @@ bool RunQueue<T, N>::empty() const {
198202
}
199203

200204
// Class: BasicPrivatizedThreadpool
201-
template < template<typename...> class Func >
205+
/*template < template<typename...> class Func >
202206
class BasicPrivatizedThreadpool {
203207
204208
using TaskType = Func<void()>;
@@ -241,7 +245,7 @@ class BasicPrivatizedThreadpool {
241245
// TODO: do we need atomic variable here?
242246
std::atomic<size_t> _idle_workers {0};
243247
244-
std::unordered_map<std::thread::id, size_t> _worker_map;
248+
std::unordered_map<std::thread::id, size_t> _worker_maps;
245249
246250
const std::thread::id _owner {std::this_thread::get_id()};
247251
@@ -254,7 +258,7 @@ class BasicPrivatizedThreadpool {
254258
// or make it relaxed
255259
std::atomic<size_t> _next_queue {0};
256260
257-
size_t _nonempty_queue() const;
261+
size_t _nonempty_worker_queue() const;
258262
259263
bool _sync {false};
260264
@@ -265,9 +269,9 @@ class BasicPrivatizedThreadpool {
265269
}; // class BasicPrivatizedThreadpool. --------------------------------------
266270
267271
268-
// Function: _nonempty_queue
272+
// Function: _nonempty_worker_queue
269273
template < template<typename...> class Func >
270-
size_t BasicPrivatizedThreadpool<Func>::_nonempty_queue() const {
274+
size_t BasicPrivatizedThreadpool<Func>::_nonempty_worker_queue() const {
271275
for(size_t i=0;i <_works.size(); ++i){
272276
if(!_works[i]->queue.empty()){
273277
return i;
@@ -378,7 +382,7 @@ void BasicPrivatizedThreadpool<Func>::shutdown(){
378382
_threads.clear();
379383
380384
_works.clear();
381-
_worker_map.clear();
385+
_worker_maps.clear();
382386
383387
_wait_for_all = false;
384388
_exiting = false;
@@ -405,7 +409,7 @@ void BasicPrivatizedThreadpool<Func>::spawn(unsigned N) {
405409
406410
const size_t sz = _threads.size();
407411
408-
// Lock to synchronize all workers before creating _worker_maps
412+
// Lock to synchronize all workers before creating _worker_mapss
409413
std::scoped_lock<std::mutex> lock(_mutex);
410414
411415
_coprimes.clear();
@@ -440,7 +444,7 @@ void BasicPrivatizedThreadpool<Func>::spawn(unsigned N) {
440444
//while(!w.queue.pop_front(t) && _task_queue.empty()){
441445
if(++_idle_workers == num_workers() && _wait_for_all){
442446
// Last active thread checks if all queues are empty
443-
if(auto ret = _nonempty_queue(); ret == num_workers()){
447+
if(auto ret = _nonempty_worker_queue(); ret == num_workers()){
444448
// TODO: here only one thread will do so
445449
_sync = true;
446450
_empty_cv.notify_one();
@@ -468,7 +472,7 @@ void BasicPrivatizedThreadpool<Func>::spawn(unsigned N) {
468472
} // End of while ------------------------------------------------------
469473
});
470474
471-
_worker_map.insert({_threads.back().get_id(), i+sz});
475+
_worker_maps.insert({_threads.back().get_id(), i+sz});
472476
} // End of For ---------------------------------------------------------------------------------
473477
474478
}
@@ -529,8 +533,8 @@ void BasicPrivatizedThreadpool<Func>::silent_async(C&& c){
529533
530534
if(std::this_thread::get_id() != _owner){
531535
auto tid = std::this_thread::get_id();
532-
if(_worker_map.find(tid) != _worker_map.end()){
533-
if(!_works[_worker_map.at(tid)]->queue.push_front(t)){
536+
if(_worker_maps.find(tid) != _worker_maps.end()){
537+
if(!_works[_worker_maps.at(tid)]->queue.push_front(t)){
534538
std::scoped_lock<std::mutex> lock(_mutex);
535539
_task_queue.push_back(std::move(t));
536540
}
@@ -575,13 +579,10 @@ void BasicPrivatizedThreadpool<Func>::wait_for_all() {
575579
576580
_sync = false;
577581
_wait_for_all = false;
578-
}
582+
} */
579583

580584

581585

582-
/* A different implementation
583-
584-
585586
template < template<typename...> class Func >
586587
class BasicPrivatizedThreadpool {
587588

@@ -619,44 +620,42 @@ class BasicPrivatizedThreadpool {
619620
auto async(C&&);
620621

621622
private:
623+
624+
const std::thread::id _owner {std::this_thread::get_id()};
622625

623626
mutable std::mutex _mutex;
624627

625628
std::condition_variable _empty_cv;
626629

627630
std::deque<TaskType> _task_queue;
628631
std::vector<std::thread> _threads;
632+
std::vector<size_t> _coprimes;
633+
634+
std::unordered_map<std::thread::id, size_t> _worker_maps;
635+
std::vector<std::unique_ptr<Worker>> _workers;
629636

630637
// TODO: do we need atomic variable here?
631-
//std::atomic<size_t> _idle_workers {0};
632-
size_t _idle_workers {0};
638+
std::atomic<bool> _allow_steal {true};
633639

634-
std::unordered_map<std::thread::id, size_t> _worker_map;
640+
size_t _idle_workers {0};
641+
size_t _next_queue {0};
635642

636-
const std::thread::id _owner {std::this_thread::get_id()};
637-
638643
bool _wait_for_all {false};
639644

640-
std::vector<std::unique_ptr<Worker>> _workers;
641645

642-
// TODO: can we just use some hacky method to replace atomic
643-
// or make it relaxed
644-
std::atomic<size_t> _next_queue {0};
646+
std::optional<size_t> _nonempty_worker_queue() const;
645647

646-
std::optional<size_t> _nonempty_queue() const;
647-
648-
std::vector<size_t> _coprimes;
649648
void _xorshift32(uint32_t&);
650649
bool _steal(TaskType&, uint32_t&);
651650

652-
std::atomic<bool> _allow_steal {true};
651+
653652

654653
}; // class BasicPrivatizedThreadpool. --------------------------------------
655654

656655

657-
// Function: _nonempty_queue
656+
// Function: _nonempty_worker_queue
658657
template < template<typename...> class Func >
659-
std::optional<size_t> BasicPrivatizedThreadpool<Func>::_nonempty_queue() const {
658+
std::optional<size_t> BasicPrivatizedThreadpool<Func>::_nonempty_worker_queue() const {
660659
for(size_t i=0;i <_workers.size(); ++i){
661660
if(!_workers[i]->queue.empty()){
662661
return i;
@@ -742,7 +741,7 @@ void BasicPrivatizedThreadpool<Func>::shutdown(){
742741
std::unique_lock<std::mutex> lock(_mutex);
743742
// If all workers are idle && all queues are empty, then master
744743
// can directly wake up workers without waiting for notified
745-
if(_idle_workers != num_workers() || _nonempty_queue().has_value()){
744+
if(_idle_workers != num_workers() || _nonempty_worker_queue().has_value()){
746745
_wait_for_all = true;
747746

748747
// Wake up all workers in case their queues are not empty
@@ -757,7 +756,6 @@ void BasicPrivatizedThreadpool<Func>::shutdown(){
757756

758757
// Notify workers to exit
759758
for(auto& w : _workers){
760-
// TODO: can we replace this dummy task with state?
761759
w->state = Worker::EXIT;
762760
w->cv.notify_one();
763761
}
@@ -770,7 +768,7 @@ void BasicPrivatizedThreadpool<Func>::shutdown(){
770768
_threads.clear();
771769

772770
_workers.clear();
773-
_worker_map.clear();
771+
_worker_maps.clear();
774772
}
775773

776774
// Function: spawn
@@ -791,8 +789,8 @@ void BasicPrivatizedThreadpool<Func>::spawn(unsigned N) {
791789

792790
const size_t sz = _threads.size();
793791

794-
// Lock to synchronize all workers before creating _worker_maps
795-
std::scoped_lock<std::mutex> lock(_mutex);
792+
// Lock to synchronize all workers before creating _worker_mapss
793+
std::scoped_lock lock(_mutex);
796794

797795
_coprimes.clear();
798796
for(size_t i=1; i<=sz+N; i++){
@@ -811,24 +809,20 @@ void BasicPrivatizedThreadpool<Func>::spawn(unsigned N) {
811809
TaskType t {nullptr};
812810
Worker& w = *(_workers[i]);
813811
uint32_t seed = i+1;
814-
std::unique_lock<std::mutex> lock(_mutex, std::defer_lock);
812+
std::unique_lock lock(_mutex, std::defer_lock);
815813

816814
while(w.state != Worker::EXIT){
817-
//// TODO: assume exiting is atomic variable and defer lock
818815
if(!w.queue.pop_front(t)) {
819816
if(!_allow_steal.load(std::memory_order_relaxed) || !_steal(t, seed)) {
820817
lock.lock();
821818
if(!_task_queue.empty()) {
822819
t = std::move(_task_queue.front());
823820
_task_queue.pop_front();
824821
}
825-
// ... as follows...
826822
else{
827-
//if(++_idle_workers == num_workers() && _wait_for_all) {
828823
if(++_idle_workers == num_workers()){
829824
// Last active thread checks if all queues are empty
830-
// TODO: optional
831-
if(auto ret = _nonempty_queue(); ret.has_value()){
825+
if(auto ret = _nonempty_worker_queue(); ret.has_value()){
832826
// if the nonempty queue is mine, continue to process tasks in queue
833827
if(*ret == i){
834828
--_idle_workers;
@@ -839,17 +833,16 @@ void BasicPrivatizedThreadpool<Func>::spawn(unsigned N) {
839833
_workers[*ret]->cv.notify_one();
840834
}
841835
else{
842-
// TODO: here only one thread will do so
836+
// here only one thread will do so
843837
// if all workers are idle && all queues are empty && master is waiting
844838
// notify the master by last thread
845839
if(_wait_for_all){
846840
_wait_for_all = false;
847841
_empty_cv.notify_one();
848842
}
849843
}
850-
} // End of ++_idle_workers
844+
}
851845

852-
w.state = Worker::ALIVE;
853846
while(w.state == Worker::ALIVE && w.queue.empty()){
854847
w.cv.wait(lock);
855848
}
@@ -866,7 +859,7 @@ void BasicPrivatizedThreadpool<Func>::spawn(unsigned N) {
866859
} // End of while ------------------------------------------------------
867860
});
868861

869-
_worker_map.insert({_threads.back().get_id(), i+sz});
862+
_worker_maps.insert({_threads.back().get_id(), i+sz});
870863
} // End of For ---------------------------------------------------------------------------------
871864
_allow_steal = true;
872865
}
@@ -926,7 +919,7 @@ void BasicPrivatizedThreadpool<Func>::silent_async(C&& c){
926919
}
927920

928921
if(auto tid = std::this_thread::get_id(); tid != _owner){
929-
if(auto itr = _worker_map.find(tid); itr != _worker_map.end()){
922+
if(auto itr = _worker_maps.find(tid); itr != _worker_maps.end()){
930923
if(!_workers[itr->second]->queue.push_front(t)){
931924
std::scoped_lock<std::mutex> lock(_mutex);
932925
_task_queue.push_back(std::move(t));
@@ -936,15 +929,15 @@ void BasicPrivatizedThreadpool<Func>::silent_async(C&& c){
936929
}
937930

938931
// owner thread or other threads
939-
// TODO: use random for load balancing?
940-
auto id = (_next_queue.fetch_add(1, std::memory_order_relaxed)+1)%_workers.size();
932+
auto id = (++_next_queue) % _workers.size();
933+
941934
if(!_workers[id]->queue.push_back(t)){
942-
std::scoped_lock<std::mutex> lock(_mutex);
935+
std::scoped_lock lock(_mutex);
943936
_task_queue.push_back(std::move(t));
944937
}
945938
else{
946939
// Lock to make sure the worker will be notified
947-
std::scoped_lock<std::mutex> lock(_mutex);
940+
std::scoped_lock lock(_mutex);
948941
}
949942
_workers[id]->cv.notify_one();
950943
}
@@ -958,12 +951,14 @@ void BasicPrivatizedThreadpool<Func>::wait_for_all() {
958951
throw std::runtime_error("Worker thread cannot wait for all");
959952
}
960953

961-
if(num_workers() == 0) return ;
954+
if(num_workers() == 0) {
955+
return ;
956+
}
962957

963-
std::unique_lock<std::mutex> lock(_mutex);
958+
std::unique_lock lock(_mutex);
964959
// If all workers are idle && all queues are empty,
965960
// then wait_for_all is done.
966-
if(_idle_workers == num_workers() && !_nonempty_queue().has_value()){
961+
if(_idle_workers == num_workers() && !_nonempty_worker_queue()){
967962
return ;
968963
}
969964

@@ -978,8 +973,6 @@ void BasicPrivatizedThreadpool<Func>::wait_for_all() {
978973
}
979974
}
980975

981-
*/
982-
983976

984977
}; // namespace tf -----------------------------------------------------------
985978

0 commit comments

Comments
 (0)