Skip to content

Commit f3ceb1f

Browse files
refactored privatized threadpool
1 parent cfc8359 commit f3ceb1f

4 files changed

Lines changed: 163 additions & 43 deletions

File tree

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ set_target_properties(threadpool_test_tmp PROPERTIES OUTPUT_NAME "threadpool")
148148
add_test(simple_threadpool ${TF_UTEST_DIR}/threadpool -tc=SimpleThreadpool)
149149
add_test(proactive_threadpool ${TF_UTEST_DIR}/threadpool -tc=ProactiveThreadpool)
150150
add_test(speculative_threadpool ${TF_UTEST_DIR}/threadpool -tc=SpeculativeThreadpool)
151+
add_test(privatized_threadpool ${TF_UTEST_DIR}/threadpool -tc=PrivatizedThreadpool)
151152

152153
## threadpool_cxx14 unittest (contributed by Glen Fraser)
153154
add_executable(threadpool_cxx14_tmp unittest/threadpool_cxx14.cpp)

taskflow/threadpool/privatized_threadpool.hpp

Lines changed: 114 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,10 @@ class BasicPrivatizedThreadpool {
247247

248248
std::deque<TaskType> _task_queue;
249249
std::vector<std::thread> _threads;
250+
251+
// TODO: do we need an atomic variable here?
250252
std::atomic<size_t> _idle_workers {0};
253+
251254
std::unordered_map<std::thread::id, size_t> _worker_map;
252255

253256
const std::thread::id _owner {std::this_thread::get_id()};
@@ -256,6 +259,9 @@ class BasicPrivatizedThreadpool {
256259
bool _wait_for_all {false};
257260

258261
std::vector<std::unique_ptr<Worker>> _works;
262+
263+
// TODO: can we replace this atomic with some cheaper (hacky) scheme,
264+
// or at least make its accesses use relaxed memory ordering?
259265
std::atomic<size_t> _next_queue {0};
260266

261267
size_t _nonempty_queue() const;
@@ -370,6 +376,7 @@ void BasicPrivatizedThreadpool<Func>::shutdown(){
370376
_exiting = true;
371377

372378
for(auto& w : _works){
379+
// TODO: can we replace this dummy task with state?
373380
w->queue.push_back([](){});
374381
w->cv.notify_one();
375382
}
@@ -425,48 +432,109 @@ void BasicPrivatizedThreadpool<Func>::spawn(unsigned N) {
425432
for(size_t i=0; i<N; ++i){
426433
_threads.emplace_back([this, i=i+sz]() -> void {
427434

428-
TaskType t {nullptr};
429-
Worker& w = *(_works[i]);
430-
uint32_t dice = i+1;
431-
std::unique_lock<std::mutex> lock(_mutex);
432-
433-
while(!_exiting){
434-
if(!w.queue.pop_front(t)){
435-
if(_steal(t, dice)){}
436-
else if(!_task_queue.empty()) {
437-
//if(!_task_queue.empty()){
438-
t = std::move(_task_queue.front());
439-
_task_queue.pop_front();
440-
}
441-
else {
442-
while(!w.queue.pop_front(t) && _task_queue.empty()){
443-
if(++_idle_workers == num_workers() && _wait_for_all){
444-
// Last active thread checks if all queues are empty
445-
if(auto ret = _nonempty_queue(); ret == num_workers()){
446-
_sync = true;
447-
_empty_cv.notify_one();
448-
}
449-
else{
450-
if(ret == i){
451-
-- _idle_workers;
452-
continue;
453-
}
454-
_works[ret]->cv.notify_one();
455-
}
456-
}
457-
w.cv.wait(lock);
458-
-- _idle_workers;
459-
}
460-
}
461-
} // End of first if
462-
463-
if(t){
464-
_mutex.unlock();
465-
t();
466-
t = nullptr;
467-
_mutex.lock();
468-
}
469-
} // End of while ------------------------------------------------------
435+
TaskType t {nullptr};
436+
Worker& w = *(_works[i]);
437+
uint32_t dice = i+1;
438+
std::unique_lock<std::mutex> lock(_mutex);
439+
440+
while(!_exiting){
441+
442+
//// TODO: assume _exiting is an atomic variable and defer the lock
443+
//if(!w.queue.pop_front(t)) {
444+
// if(!_steal(t, dice)) {
445+
// lock.lock();
446+
// // ... as follows...
447+
// lock.unlock();
448+
// }
449+
//}
450+
451+
452+
/*// TODO:
453+
lock.unlock();
454+
455+
// step 1: check my own queue
456+
if(!w.queue.pop_front(t)) {
457+
if(!_steal(t, dice)) {
458+
lock.lock();
459+
if(_task_queue.empty()) {
460+
// An idle worker does not imply that its queue is empty.
461+
if(++_idle_workers == num_workers() && _wait_for_all) {
462+
// Last active thread checks if all queues are empty
463+
// TODO: optional
464+
if(auto ret = _nonempty_queue(); ret == num_workers()){
465+
// TODO: here only one thread will do so
466+
_sync = true;
467+
_empty_cv.notify_one();
468+
}
469+
else{
470+
// if the nonempty queue is myself
471+
if(ret == i){
472+
--_idle_workers;
473+
continue;
474+
}
475+
_works[ret]->cv.notify_one();
476+
}
477+
}
478+
w.cv.wait(lock);
479+
--_idle_workers;
480+
}
481+
else {
482+
t = std::move(_task_queue.front());
483+
_task_queue.pop_front();
484+
}
485+
lock.unlock();
486+
}
487+
}
488+
489+
if(t) {
490+
t();
491+
t = nullptr;
492+
}
493+
lock.lock();
494+
495+
496+
// end of TODO
497+
*/
498+
499+
500+
if(!w.queue.pop_front(t)){
501+
if(_steal(t, dice)){}
502+
else if(!_task_queue.empty()) {
503+
//if(!_task_queue.empty()){
504+
t = std::move(_task_queue.front());
505+
_task_queue.pop_front();
506+
}
507+
else {
508+
// TODO: do we need another while loop here?
509+
while(!w.queue.pop_front(t) && _task_queue.empty()){
510+
if(++_idle_workers == num_workers() && _wait_for_all){
511+
// Last active thread checks if all queues are empty
512+
if(auto ret = _nonempty_queue(); ret == num_workers()){
513+
// TODO: here only one thread will do so
514+
_sync = true;
515+
_empty_cv.notify_one();
516+
}
517+
else{
518+
if(ret == i){
519+
-- _idle_workers;
520+
continue;
521+
}
522+
_works[ret]->cv.notify_one();
523+
}
524+
}
525+
w.cv.wait(lock);
526+
--_idle_workers;
527+
}
528+
}
529+
} // End of first if
530+
531+
if(t){
532+
_mutex.unlock();
533+
t();
534+
t = nullptr;
535+
_mutex.lock();
536+
}
537+
} // End of while ------------------------------------------------------
470538
});
471539

472540
_worker_map.insert({_threads.back().get_id(), i+sz});
@@ -576,6 +644,8 @@ void BasicPrivatizedThreadpool<Func>::silent_async(C&& c){
576644
}
577645
}
578646

647+
// owner thread or other threads
648+
// TODO: use random queue selection (instead of round-robin) for load balancing?
579649
auto id = (++_next_queue)%_works.size();
580650
if(!_works[id]->queue.push_back(t)){
581651
std::scoped_lock<std::mutex> lock(_mutex);
@@ -603,7 +673,8 @@ void BasicPrivatizedThreadpool<Func>::wait_for_all() {
603673
for(const auto& w : _works){
604674
w->cv.notify_one();
605675
}
606-
676+
677+
// TODO: can we use a single wait_for_all?
607678
while(!_sync){
608679
_empty_cv.wait(lock);
609680
}

taskflow/threadpool/threadpool.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@
2020
#include "simple_threadpool.hpp"
2121
#include "proactive_threadpool.hpp"
2222
#include "speculative_threadpool.hpp"
23+
#include "privatized_threadpool.hpp"
2324

2425

2526
namespace tf {
2627

2728
using ProactiveThreadpool = proactive_threadpool::BasicProactiveThreadpool<std::function>;
2829
using SpeculativeThreadpool = speculative_threadpool::BasicSpeculativeThreadpool<std::function>;
30+
using PrivatizedThreadpool = privatized_threadpool::BasicPrivatizedThreadpool<std::function>;
2931

3032
}; // namespace tf. ----------------------------------------------------------
3133

unittest/threadpool.cpp

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,3 +355,49 @@ TEST_CASE("SpeculativeThreadpool" * doctest::timeout(300)) {
355355
}
356356
}
357357

358+
// --------------------------------------------------------
359+
// Testcase: PrivatizedThreadpool
360+
// --------------------------------------------------------
361+
TEST_CASE("PrivatizedThreadpool" * doctest::timeout(300)) { // doctest test case for tf::PrivatizedThreadpool; the timeout decorator is given in seconds
362+
363+
const size_t num_tasks = 100; // task count passed to test_async / test_silent_async in the "PlaceTask" subcase
364+
365+
SUBCASE("Ownership") { // runs the test_ownership helper on each pool
366+
for(unsigned i=0; i<=4; ++i) { // i takes the values 0..4 inclusive; a fresh pool is built per iteration
367+
tf::PrivatizedThreadpool tp(i); // NOTE(review): i is presumably the number of worker threads — confirm against the constructor
368+
test_ownership(tp); // helper defined outside this view
369+
}
370+
}
371+
372+
SUBCASE("PlaceTask"){ // exercises both task-submission helpers on the same pool instance
373+
for(unsigned i=0; i<=4; ++i) { // same 0..4 sweep as the other subcases
374+
tf::PrivatizedThreadpool tp(i);
375+
test_async(tp, num_tasks); // helper defined outside this view; receives the pool and num_tasks
376+
test_silent_async(tp, num_tasks); // run after test_async, reusing tp
377+
}
378+
}
379+
380+
SUBCASE("WaitForAll"){ // runs the test_wait_for_all helper on each pool
381+
for(unsigned i=0; i<=4; ++i) {
382+
tf::PrivatizedThreadpool tp(i);
383+
test_wait_for_all(tp); // helper defined outside this view
384+
}
385+
}
386+
387+
SUBCASE("SpawnShutdown") { // runs the test_spawn_shutdown helper on each pool
388+
for(unsigned i=0; i<=4; ++i) {
389+
tf::PrivatizedThreadpool tp(i);
390+
test_spawn_shutdown(tp); // helper defined outside this view
391+
}
392+
}
393+
394+
SUBCASE("DynamicTasking") { // runs the test_dynamic_tasking helper on each pool
395+
for(unsigned i=0; i<=4; ++i) {
396+
tf::PrivatizedThreadpool tp(i);
397+
test_dynamic_tasking(tp); // helper defined outside this view
398+
}
399+
}
400+
} // end of TEST_CASE "PrivatizedThreadpool"
401+
402+
403+

0 commit comments

Comments
 (0)