@@ -247,7 +247,10 @@ class BasicPrivatizedThreadpool {
247247
248248 std::deque<TaskType> _task_queue;
249249 std::vector<std::thread> _threads;
250+
251+ // TODO: do we need atomic variable here?
250252 std::atomic<size_t > _idle_workers {0 };
253+
251254 std::unordered_map<std::thread::id, size_t > _worker_map;
252255
253256 const std::thread::id _owner {std::this_thread::get_id ()};
@@ -256,6 +259,9 @@ class BasicPrivatizedThreadpool {
256259 bool _wait_for_all {false };
257260
258261 std::vector<std::unique_ptr<Worker>> _works;
262+
263+ // TODO: can we just use some hacky method to replace atomic
264+ // or make it relaxed
259265 std::atomic<size_t > _next_queue {0 };
260266
261267 size_t _nonempty_queue () const ;
@@ -370,6 +376,7 @@ void BasicPrivatizedThreadpool<Func>::shutdown(){
370376 _exiting = true ;
371377
372378 for (auto & w : _works){
379+ // TODO: can we replace this dummy task with state?
373380 w->queue .push_back ([](){});
374381 w->cv .notify_one ();
375382 }
@@ -425,48 +432,109 @@ void BasicPrivatizedThreadpool<Func>::spawn(unsigned N) {
425432 for (size_t i=0 ; i<N; ++i){
426433 _threads.emplace_back ([this , i=i+sz]() -> void {
427434
428- TaskType t {nullptr };
429- Worker& w = *(_works[i]);
430- uint32_t dice = i+1 ;
431- std::unique_lock<std::mutex> lock (_mutex);
432-
433- while (!_exiting){
434- if (!w.queue .pop_front (t)){
435- if (_steal (t, dice)){}
436- else if (!_task_queue.empty ()) {
437- // if(!_task_queue.empty()){
438- t = std::move (_task_queue.front ());
439- _task_queue.pop_front ();
440- }
441- else {
442- while (!w.queue .pop_front (t) && _task_queue.empty ()){
443- if (++_idle_workers == num_workers () && _wait_for_all){
444- // Last active thread checks if all queues are empty
445- if (auto ret = _nonempty_queue (); ret == num_workers ()){
446- _sync = true ;
447- _empty_cv.notify_one ();
448- }
449- else {
450- if (ret == i){
451- -- _idle_workers;
452- continue ;
453- }
454- _works[ret]->cv .notify_one ();
455- }
456- }
457- w.cv .wait (lock);
458- -- _idle_workers;
459- }
460- }
461- } // End of first if
462-
463- if (t){
464- _mutex.unlock ();
465- t ();
466- t = nullptr ;
467- _mutex.lock ();
468- }
469- } // End of while ------------------------------------------------------
435+ TaskType t {nullptr };
436+ Worker& w = *(_works[i]);
437+ uint32_t dice = i+1 ;
438+ std::unique_lock<std::mutex> lock (_mutex);
439+
440+ while (!_exiting){
441+
442+ // // TODO: assume exisint is atomic variable and defer lock
443+ // if(!w.queue.pop_front(t)) {
444+ // if(!_steal(t, dice)) {
445+ // lock.lock();
446+ // // ... as follows...
447+ // lock.unlock();
448+ // }
449+ // }
450+
451+
452+ /* // TODO:
453+ lock.unlock();
454+
455+ // step 1: check my own queue
456+ if(!w.queue.pop_front(t)) {
457+ if(!_steal(t, dice)) {
458+ lock.lock();
459+ if(_task_queue.empty()) {
460+ // Idle worker does not imply its queue is empty.
461+ if(++_idle_workers == num_workers() && _wait_for_all) {
462+ // Last active thread checks if all queues are empty
463+ // TODO: optional
464+ if(auto ret = _nonempty_queue(); ret == num_workers()){
465+ // TODO: here only one thread will do so
466+ _sync = true;
467+ _empty_cv.notify_one();
468+ }
469+ else{
470+ // if the nonempty queue is myself
471+ if(ret == i){
472+ --_idle_workers;
473+ continue;
474+ }
475+ _works[ret]->cv.notify_one();
476+ }
477+ }
478+ w.cv.wait(lock);
479+ --_idle_workers;
480+ }
481+ else {
482+ t = std::move(_task_queue.front());
483+ _task_queue.pop_front();
484+ }
485+ lock.unlock();
486+ }
487+ }
488+
489+ if(t) {
490+ t();
491+ t = nullptr;
492+ }
493+ lock.lock();
494+
495+
496+ // end of TODO
497+ */
498+
499+
500+ if (!w.queue .pop_front (t)){
501+ if (_steal (t, dice)){}
502+ else if (!_task_queue.empty ()) {
503+ // if(!_task_queue.empty()){
504+ t = std::move (_task_queue.front ());
505+ _task_queue.pop_front ();
506+ }
507+ else {
508+ // TODO: do we need another while loop here?
509+ while (!w.queue .pop_front (t) && _task_queue.empty ()){
510+ if (++_idle_workers == num_workers () && _wait_for_all){
511+ // Last active thread checks if all queues are empty
512+ if (auto ret = _nonempty_queue (); ret == num_workers ()){
513+ // TODO: here only one thread will do so
514+ _sync = true ;
515+ _empty_cv.notify_one ();
516+ }
517+ else {
518+ if (ret == i){
519+ -- _idle_workers;
520+ continue ;
521+ }
522+ _works[ret]->cv .notify_one ();
523+ }
524+ }
525+ w.cv .wait (lock);
526+ --_idle_workers;
527+ }
528+ }
529+ } // End of first if
530+
531+ if (t){
532+ _mutex.unlock ();
533+ t ();
534+ t = nullptr ;
535+ _mutex.lock ();
536+ }
537+ } // End of while ------------------------------------------------------
470538 });
471539
472540 _worker_map.insert ({_threads.back ().get_id (), i+sz});
@@ -576,6 +644,8 @@ void BasicPrivatizedThreadpool<Func>::silent_async(C&& c){
576644 }
577645 }
578646
647+ // owner thread or other threads
648+ // TODO: use random for load balancing?
579649 auto id = (++_next_queue)%_works.size ();
580650 if (!_works[id]->queue .push_back (t)){
581651 std::scoped_lock<std::mutex> lock (_mutex);
@@ -603,7 +673,8 @@ void BasicPrivatizedThreadpool<Func>::wait_for_all() {
603673 for (const auto & w : _works){
604674 w->cv .notify_one ();
605675 }
606-
676+
677+ // TODO: can we use a single wait_for_all?
607678 while (!_sync){
608679 _empty_cv.wait (lock);
609680 }
0 commit comments