Skip to content

Commit 18eba9f

Browse files
refactored privatized_threadpool
1 parent 87ce7e4 commit 18eba9f

4 files changed

Lines changed: 54 additions & 46 deletions

File tree

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ add_test(WorkerQueue.TriThread ${TF_UTEST_DIR}/threadpool -tc=WorkerQueue.TriTh
141141
add_test(simple_threadpool ${TF_UTEST_DIR}/threadpool -tc=SimpleThreadpool)
142142
add_test(proactive_threadpool ${TF_UTEST_DIR}/threadpool -tc=ProactiveThreadpool)
143143
add_test(speculative_threadpool ${TF_UTEST_DIR}/threadpool -tc=SpeculativeThreadpool)
144-
add_test(privatized_threadpool ${TF_UTEST_DIR}/threadpool -tc=PrivatizedThreadpool)
144+
add_test(privatized_threadpool ${TF_UTEST_DIR}/threadpool -tc=PrivatizedThreadpool)
145145

146146
## threadpool_cxx14 unittest (contributed by Glen Fraser)
147147
add_executable(threadpool_cxx14_tmp unittest/threadpool_cxx14.cpp)

doc/faq.md

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,16 @@ If you cannot find a solution here, please post an issue [here][Github issues].
99

1010
---
1111

12-
## General Questions
12+
# General Questions
1313

14-
### Q: How do I use Cpp-Taskflow in my projects?
14+
## Q: How do I use Cpp-Taskflow in my projects?
1515

1616
Cpp-Taskflow is a header-only library with zero dependencies.
1717
The only thing you need is a [C++17][C++17] compiler.
1818
To use Cpp-Taskflow, simply drop the folder
1919
[taskflow](../taskflow) to your project and include [taskflow.hpp](../taskflow/taskflow.hpp).
2020

21-
### Q: What is the difference between static tasking and dynamic tasking?
21+
## Q: What is the difference between static tasking and dynamic tasking?
2222

2323
Static tasking refers to those tasks created before execution,
2424
while dynamic tasking refers to those tasks created during the execution of static tasks
@@ -30,21 +30,21 @@ Dynamic tasks created by the same task node are grouped together to a subflow.
3030
| ![](../image/static_graph.png) | ![](../image/dynamic_graph.png) |
3131

3232

33-
### Q: How many tasks can Cpp-Taskflow handle?
33+
## Q: How many tasks can Cpp-Taskflow handle?
3434

3535
Cpp-Taskflow is a very lightweight and efficient tasking library.
3636
It has been applied in many academic and industry projects to scale up their existing workload.
3737
A research project [OpenTimer][OpenTimer] has used Cpp-Taskflow to deal with hundreds of millions of tasks.
3838

39-
### Q: What are the differences between Cpp-Taskflow and other tasking libraries?
39+
## Q: What are the differences between Cpp-Taskflow and other tasking libraries?
4040

4141
From our humble opinion, Cpp-Taskflow is superior in its tasking API, interface, and performance.
4242
In most cases, users can quickly master Cpp-Taskflow to create large and complex dependency graphs
4343
in just a few minutes.
4444
The performance scales very well and is comparable to hard-coded multi-threading.
4545
Of course, the judge is always left for users -:)
4646

47-
### Q: What is the weird hex value, like 0x7fc39d402ab0, in the dumped graph?
47+
## Q: What is the weird hex value, like 0x7fc39d402ab0, in the dumped graph?
4848

4949
Each task has a method `name(const std::string&)` for user to assign a human readable string
5050
to ease the debugging process.
@@ -53,14 +53,14 @@ its address value in the memory is used instead.
5353

5454
---
5555

56-
## Compilation Issues
56+
# Compilation Issues
5757

58-
### Q: I can't get Cpp-Taskflow compiled in my project!
58+
## Q: I can't get Cpp-Taskflow compiled in my project!
5959

6060
Please make sure your compile supports the latest version of [C++17][C++17].
6161
Make sure your project meets the System Requirements described at [README][README].
6262

63-
### Q: Clang can't compile due to the use of std::variant.
63+
## Q: Clang can't compile due to the use of std::variant.
6464

6565
Cpp-Taskflow uses `std::variant` to enable uniform interface between static and dynamic tasking.
6666
However it has been reported in
@@ -81,9 +81,9 @@ For clang users, you will need to use this patch in `taskflow.hpp` as follows:
8181

8282
---
8383

84-
## Programming Questions
84+
# Programming Questions
8585

86-
### Q: What is the difference between Cpp-Taskflow threads and workers?
86+
## Q: What is the difference between Cpp-Taskflow threads and workers?
8787

8888
The master thread owns the thread pool and can spawn workers to run tasks
8989
or shutdown the pool.
@@ -96,18 +96,19 @@ tf::Taskflow(N); // N workers, N+1 threads in the program.
9696
9797
If there is no worker threads in the pool, the master thread will do all the works by itself.
9898
99-
### Q: Is taskflow thread-safe?
99+
## Q: Is taskflow thread-safe?
100+
100101
No, the taskflow object is not thread-safe. You can't create tasks from multiple threads
101102
at the same time.
102103
103-
### Q: My program hangs and never returns after dispatching a taskflow graph. What's wrong?
104+
## Q: My program hangs and never returns after dispatching a taskflow graph. What's wrong?
104105
105106
When the program hangs forever it is very likely your taskflow graph has a cycle.
106107
Try the `dump` method to debug the graph before dispatching your taskflow graph.
107108
If there is no cycle, make sure you are using `future.get()` in the right way,
108109
i.e., not blocking your control flow.
109110
110-
### Q: In the following example where B spawns a joined subflow of two tasks B1 and B2, do they run concurrently with task A?
111+
## Q: In the following example where B spawns a joined subflow of two tasks B1 and B2, do they run concurrently with task A?
111112
112113
<p>
113114
<img src="../image/dynamic_graph.png" width="60%">
@@ -119,7 +120,7 @@ This graph may looks strange because B seems to run twice!
119120
However, Cpp-Taskflow will schedule B only once to create its subflow.
120121
Whether this subflow joins or detaches from B only affects the future object returned from B.
121122
122-
### Q: How can I parallelize multiple runs on the same function with different arguments?
123+
## Q: How can I parallelize multiple runs on the same function with different arguments?
123124
124125
Many people have been asking how to apply Taskflow's `parallel_for` method
125126
to parallelize a sequential loop over an index sequence.
@@ -142,7 +143,7 @@ for(int i=0; i<N; ++i) {
142143
tf.wait_for_all();
143144
```
144145
145-
### Q: What is a Task?
146+
## Q: What is a Task?
146147
147148
A `Task` is a lightweight handle associated with an internal graph node
148149
for users to build task dependencies.

taskflow/taskflow.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1095,6 +1095,7 @@ class Taskflow : public FlowBuilder {
10951095
//SimpleThreadpool<Closure> _executor;
10961096
//ProactiveThreadpool<Closure> _executor;
10971097
SpeculativeThreadpool<Closure> _executor;
1098+
//PrivatizedThreadpool<Closure> _executor;
10981099

10991100
Graph _graph;
11001101

taskflow/threadpool/privatized_threadpool.hpp

Lines changed: 35 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,21 @@
1+
// 2018/10/05 - modified by Chun-Xun
2+
// - adopted the new threadpool model
3+
//
14
// 2018/09/21 - modified by Tsung-Wei and Chun-Xun
25
// - refactored the code
36
//
4-
// TODO:
5-
// - Problems can occur when external threads insert tasks during spawn.
6-
//
77
// 2018/09/12 - created by Tsung-Wei Huang and Chun-Xun Lin
88
//
99
// Implemented PrivatizedThreadpool using the data structre inspired
1010
// Eigen CXX/Threadpool.
1111

12+
// TODO
13+
// - double check whether we can use std::forward<Args>(args)... in enqueue/dequeue
14+
// - can we replace lock with CAS
15+
// - refactored the WorkQueue class ...
16+
// - add more example to threadpool and use std::future to mimic the control flow
17+
// - atomic add problem (extremely slow)
18+
1219
#pragma once
1320

1421
#include <iostream>
@@ -30,10 +37,13 @@ namespace tf {
3037

3138
// ----------------------------------------------------------------------------
3239
// Privatized queue of worker. The lock-free queue is inspired by
33-
// http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
40+
// http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
3441
template<typename T, size_t buffer_size>
3542
class PrivatizedTaskQueue {
36-
public:
43+
44+
public:
45+
46+
// TODO replaced the new with std::array
3747
PrivatizedTaskQueue()
3848
: buffer_(new cell_t [buffer_size])
3949
, buffer_mask_(buffer_size - 1)
@@ -106,7 +116,8 @@ class PrivatizedTaskQueue {
106116
dequeue_pos_.load(std::memory_order_relaxed);
107117
};
108118

109-
private:
119+
private:
120+
110121
struct cell_t {
111122
std::atomic<size_t> sequence_;
112123
T data_;
@@ -123,12 +134,10 @@ class PrivatizedTaskQueue {
123134
cacheline_pad_t pad2_;
124135
std::atomic<size_t> dequeue_pos_;
125136
cacheline_pad_t pad3_;
126-
127-
PrivatizedTaskQueue(PrivatizedTaskQueue const&);
128-
void operator = (PrivatizedTaskQueue const&);
129137
};
130138

131139

140+
// ----------------------------------------------------------------------------
132141

133142
// Class: PrivatizedThreadpool
134143
template <typename Task>
@@ -159,13 +168,12 @@ class PrivatizedThreadpool {
159168
const std::thread::id _owner {std::this_thread::get_id()};
160169

161170
mutable std::mutex _mutex;
162-
std::condition_variable _empty_cv;
163171

164172
std::vector<Task> _tasks;
165173
std::vector<std::thread> _threads;
174+
std::vector<Worker> _workers;
166175

167176
std::unordered_map<std::thread::id, size_t> _worker_maps;
168-
std::vector<Worker> _workers;
169177

170178
size_t _num_idlers {0};
171179
size_t _next_queue {0};
@@ -254,22 +262,15 @@ size_t PrivatizedThreadpool<Task>::num_workers() const {
254262
template <typename Task>
255263
void PrivatizedThreadpool<Task>::_shutdown(){
256264

257-
if(!is_owner()){
258-
throw std::runtime_error("Worker thread cannot shut down the pool");
259-
}
260-
261-
if(_threads.empty()) {
262-
return;
263-
}
265+
assert(is_owner());
264266

265267
{
266-
std::scoped_lock<std::mutex> lock(_mutex);
267-
// Notify workers to exit
268+
std::scoped_lock lock(_mutex);
268269
for(auto& w : _workers){
269270
w.exit = true;
270271
w.cv.notify_one();
271272
}
272-
} // Release lock
273+
}
273274

274275
for(auto& t : _threads){
275276
t.join();
@@ -284,9 +285,7 @@ void PrivatizedThreadpool<Task>::_shutdown(){
284285
template <typename Task>
285286
void PrivatizedThreadpool<Task>::_spawn(unsigned N) {
286287

287-
if(! is_owner()){
288-
throw std::runtime_error("Worker thread cannot spawn threads");
289-
}
288+
assert(is_owner());
290289

291290
// Lock to synchronize all workers before creating _worker_mapss
292291
std::scoped_lock lock(_mutex);
@@ -329,10 +328,15 @@ void PrivatizedThreadpool<Task>::_spawn(unsigned N) {
329328
lock.unlock();
330329
} // End of pop_front
331330

332-
while(t){
331+
while(t) {
333332
(*t)();
334-
std::swap(t, w.cache);
335-
w.cache = std::nullopt;
333+
if(w.cache) {
334+
t = std::move(w.cache);
335+
w.cache = std::nullopt;
336+
}
337+
else {
338+
t = std::nullopt;
339+
}
336340
}
337341
} // End of while ------------------------------------------------------
338342
});
@@ -353,15 +357,17 @@ void PrivatizedThreadpool<Task>::emplace(ArgsT&&... args){
353357
}
354358

355359
Task t {std::forward<ArgsT>(args)...};
356-
360+
361+
// caller is not the owner
357362
if(auto tid = std::this_thread::get_id(); tid != _owner){
363+
// the caller is the worker of the threadpool
358364
if(auto itr = _worker_maps.find(tid); itr != _worker_maps.end()){
359365
if(!_workers[itr->second].cache.has_value()){
360366
_workers[itr->second].cache = std::move(t);
361367
return ;
362368
}
363369
if(!_workers[itr->second].queue.enqueue(t)){
364-
std::scoped_lock<std::mutex> lock(_mutex);
370+
std::scoped_lock lock(_mutex);
365371
_tasks.push_back(std::move(t));
366372
}
367373
return ;

0 commit comments

Comments
 (0)