Skip to content

Commit eb4448b

Browse files
updated speculative threadpool
1 parent 319b727 commit eb4448b

6 files changed

Lines changed: 71 additions & 51 deletions

File tree

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ target_link_libraries(threadpool_test_tmp Threads::Threads)
145145
set_target_properties(threadpool_test_tmp PROPERTIES OUTPUT_NAME "threadpool")
146146
add_test(simple_threadpool ${TF_UTEST_DIR}/threadpool -tc=SimpleThreadpool)
147147
add_test(proactive_threadpool ${TF_UTEST_DIR}/threadpool -tc=ProactiveThreadpool)
148+
add_test(speculative_threadpool ${TF_UTEST_DIR}/threadpool -tc=SpeculativeThreadpool)
148149

149150
## threadpool_cxx14 unittest (contributed by Glen Fraser)
150151
add_executable(threadpool_cxx14_tmp unittest/threadpool_cxx14.cpp)

example/threadpool.cpp

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,14 @@ auto linear_insertions() {
5757
void benchmark_linear_insertions() {
5858

5959
std::cout << "==== Linear Insertions ====\n";
60+
61+
std::cout << "Speculative threadpool elapsed time: "
62+
<< linear_insertions<tf::SpeculativeThreadpool>() << " ms\n";
6063

61-
std::cout << "Proactive threadpool takes: "
64+
std::cout << "Proactive threadpool elapsed time: "
6265
<< linear_insertions<tf::ProactiveThreadpool>() << " ms\n";
6366

64-
std::cout << "Simple threadpool takes: "
67+
std::cout << "Simple threadpool elapsed time: "
6568
<< linear_insertions<tf::SimpleThreadpool>() << " ms\n";
6669
}
6770

@@ -93,11 +96,14 @@ auto empty_jobs() {
9396
void benchmark_empty_jobs() {
9497

9598
std::cout << "==== Empty Jobs ====\n";
99+
100+
std::cout << "Speculative threadpool elapsed time: "
101+
<< empty_jobs<tf::SpeculativeThreadpool>() << " ms\n";
96102

97-
std::cout << "Proactive threadpool takes: "
103+
std::cout << "Proactive threadpool elapsed time: "
98104
<< empty_jobs<tf::ProactiveThreadpool>() << " ms\n";
99105

100-
std::cout << "Simple threadpool takes: "
106+
std::cout << "Simple threadpool elapsed time: "
101107
<< empty_jobs<tf::SimpleThreadpool>() << " ms\n";
102108
}
103109

@@ -129,11 +135,14 @@ auto atomic_add() {
129135
void benchmark_atomic_add() {
130136

131137
std::cout << "==== Atomic Add ====\n";
138+
139+
std::cout << "Speculative threadpool elapsed time: "
140+
<< atomic_add<tf::SpeculativeThreadpool>() << " ms\n";
132141

133-
std::cout << "Proactive threadpool takes: "
142+
std::cout << "Proactive threadpool elapsed time: "
134143
<< atomic_add<tf::ProactiveThreadpool>() << " ms\n";
135144

136-
std::cout << "Simple threadpool takes: "
145+
std::cout << "Simple threadpool elapsed time: "
137146
<< atomic_add<tf::SimpleThreadpool>() << " ms\n";
138147
}
139148

taskflow/threadpool/proactive_threadpool.hpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,6 @@ auto BasicProactiveThreadpool<TaskType>::async(C&& c) {
297297
_task_queue.emplace_back(
298298
[p = MoC(std::move(p)), c = std::forward<C>(c)]() mutable {
299299
p.get().set_value(c());
300-
return;
301300
}
302301
);
303302
}
@@ -307,7 +306,6 @@ auto BasicProactiveThreadpool<TaskType>::async(C&& c) {
307306
w->ready = true;
308307
w->task = [p = MoC(std::move(p)), c = std::forward<C>(c)]() mutable {
309308
p.get().set_value(c());
310-
return;
311309
};
312310
w->cv.notify_one();
313311
}

taskflow/threadpool/speculative_threadpool.hpp

Lines changed: 21 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -74,19 +74,17 @@ class BasicSpeculativeThreadpool {
7474
std::deque<TaskType> _task_queue;
7575
std::vector<std::thread> _threads;
7676
std::vector<Worker*> _idlers;
77-
std::unordered_set<std::thread::id> _worker_ids;
7877
std::unordered_map<std::thread::id, Worker*> _worker_local;
7978

80-
// TODO
8179
const std::thread::id _owner {std::this_thread::get_id()};
8280

8381
bool _exiting {false};
8482
bool _wait_for_all {false};
8583

86-
typename std::unordered_map<std::thread::id, Worker*>::iterator
87-
_lookahead(){
84+
auto _lookahead(){
85+
auto id = std::this_thread::get_id();
8886
std::scoped_lock<std::mutex> lock(_mutex);
89-
return _worker_local.find(std::this_thread::get_id());
87+
return _worker_local.find(id);
9088
}
9189

9290
}; // class BasicSpeculativeThreadpool. --------------------------------------
@@ -156,7 +154,8 @@ void BasicSpeculativeThreadpool<TaskType>::shutdown(){
156154

157155
_wait_for_all = false;
158156
_exiting = false;
159-
// task queue might have tasks that are added dynamically...
157+
158+
// task queue might have tasks added by threads outside this pool...
160159
//while(not _task_queue.empty()) {
161160
// std::invoke(_task_queue.front());
162161
// _task_queue.pop_front();
@@ -174,17 +173,15 @@ void BasicSpeculativeThreadpool<TaskType>::spawn(unsigned N) {
174173

175174
for(size_t i=0; i<N; ++i){
176175
_threads.emplace_back([this]()->void{
176+
177177
Worker w;
178178
TaskType t;
179-
180-
{
181-
std::scoped_lock<std::mutex> lock(_mutex);
182-
// TODO
183-
//_worker_ids.insert(std::this_thread::get_id());
184-
_worker_local.insert({std::this_thread::get_id(), &w});
185-
}
179+
auto id = std::this_thread::get_id();
186180

187181
std::unique_lock<std::mutex> lock(_mutex);
182+
183+
_worker_local.insert({id, &w});
184+
188185
while(!_exiting){
189186
if(_task_queue.empty()){
190187
w.ready = false;
@@ -198,21 +195,16 @@ void BasicSpeculativeThreadpool<TaskType>::spawn(unsigned N) {
198195
w.cv.wait(lock);
199196
}
200197

201-
// TODO?
202198
t = std::move(w.task);
203199
}
204200
else{
205201
t = std::move(_task_queue.front());
206202
_task_queue.pop_front();
207-
208-
//if(_task_queue.empty() && _wait_for_all) {
209-
// _empty_cv.notify_one();
210-
//}
211203
}
212204

213205
if(t){
214206
_mutex.unlock();
215-
// TODO:
207+
// speculation loop
216208
while(t) {
217209
t();
218210
t = std::move(w.task);
@@ -222,7 +214,6 @@ void BasicSpeculativeThreadpool<TaskType>::spawn(unsigned N) {
222214
} // End of while --------------------------------------------------------------------------
223215
});
224216

225-
_worker_ids.insert(_threads.back().get_id());
226217
} // End of For ---------------------------------------------------------------------------------
227218
}
228219

@@ -249,19 +240,18 @@ auto BasicSpeculativeThreadpool<TaskType>::async(C&& c){
249240
}
250241
// have worker(s)
251242
else{
252-
//std::scoped_lock<std::mutex> lock(_mutex);
253243
if constexpr(std::is_same_v<void, R>){
254-
// all workers are busy.
255-
244+
245+
// speculation
256246
if(std::this_thread::get_id() != _owner){
257247
auto iter = _lookahead();
258248
if(iter != _worker_local.end() and iter->second->task == nullptr){
259-
iter->second->task = std::move(
249+
iter->second->task =
260250
[p = MoC(std::move(p)), c = std::forward<C>(c)]() mutable {
261251
c();
262252
p.get().set_value();
263-
});
264-
return ;
253+
};
254+
return fu;
265255
}
266256
}
267257

@@ -288,14 +278,15 @@ auto BasicSpeculativeThreadpool<TaskType>::async(C&& c){
288278
}
289279
else{
290280

281+
// speculation
291282
if(std::this_thread::get_id() != _owner){
292283
auto iter = _lookahead();
293284
if(iter != _worker_local.end() and iter->second->task == nullptr){
294-
iter->second->task = std::move(
285+
iter->second->task =
295286
[p = MoC(std::move(p)), c = std::forward<C>(c)]() mutable {
296287
p.get().set_value(c());
297-
});
298-
return ;
288+
};
289+
return fu;
299290
}
300291
}
301292

@@ -304,7 +295,6 @@ auto BasicSpeculativeThreadpool<TaskType>::async(C&& c){
304295
_task_queue.emplace_back(
305296
[p = MoC(std::move(p)), c = std::forward<C>(c)]() mutable {
306297
p.get().set_value(c());
307-
return;
308298
}
309299
);
310300
}
@@ -314,7 +304,6 @@ auto BasicSpeculativeThreadpool<TaskType>::async(C&& c){
314304
w->ready = true;
315305
w->task = [p = MoC(std::move(p)), c = std::forward<C>(c)]() mutable {
316306
p.get().set_value(c());
317-
return;
318307
};
319308
w->cv.notify_one();
320309
}
@@ -336,6 +325,7 @@ void BasicSpeculativeThreadpool<TaskType>::silent_async(C&& c){
336325
return;
337326
}
338327

328+
// speculation
339329
if(std::this_thread::get_id() != _owner){
340330
auto iter = _lookahead();
341331
if(iter != _worker_local.end() and iter->second->task == nullptr){
@@ -344,16 +334,6 @@ void BasicSpeculativeThreadpool<TaskType>::silent_async(C&& c){
344334
}
345335
}
346336

347-
// TODO
348-
//std::scoped_lock<std::mutex> lock(_mutex);
349-
//if(auto iter = _worker_local.find(std::this_thread::get_id());
350-
// iter != _worker_local.end()){
351-
// if(iter->second->task == nullptr){
352-
// iter->second->task = std::move(t);
353-
// return ;
354-
// }
355-
//}
356-
357337
std::scoped_lock<std::mutex> lock(_mutex);
358338
if(_idlers.empty()){
359339
_task_queue.push_back(std::move(t));

taskflow/threadpool/threadpool.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,6 @@
1919

2020
#include "simple_threadpool.hpp"
2121
#include "proactive_threadpool.hpp"
22+
#include "speculative_threadpool.hpp"
2223

2324

unittest/threadpool.cpp

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,13 +93,15 @@ template <typename T>
9393
void test_dynamic_tasking(T& threadpool) {
9494

9595
std::atomic<size_t> sum {0};
96+
std::atomic<size_t> cnt {0};
9697

9798
std::function<void(int)> insert;
9899
std::promise<int> promise;
99100
auto future = promise.get_future();
100101

101-
insert = [&threadpool, &insert, &sum, &promise] (int i) {
102+
insert = [&threadpool, &insert, &sum, &promise, &cnt] (int i) {
102103
if(i > 0) {
104+
++cnt;
103105
threadpool.silent_async([i=i-1, &insert] () {
104106
insert(i);
105107
});
@@ -122,7 +124,8 @@ void test_dynamic_tasking(T& threadpool) {
122124

123125
// synchronize until all tasks finish
124126
threadpool.wait_for_all();
125-
127+
128+
REQUIRE(cnt == 100 * threadpool.num_workers());
126129
REQUIRE(future.get() == 1);
127130
REQUIRE(sum == threadpool.num_workers());
128131
}
@@ -180,5 +183,33 @@ TEST_CASE("ProactiveThreadpool" * doctest::timeout(300)) {
180183
}
181184
}
182185

186+
// --------------------------------------------------------
187+
// Testcase: SpeculativeThreadpool
188+
// --------------------------------------------------------
189+
TEST_CASE("SpeculativeThreadpool" * doctest::timeout(300)) {
183190

191+
const size_t task_num = 100;
192+
193+
SUBCASE("PlaceTask"){
194+
for(unsigned i=0; i<=4; ++i) {
195+
tf::SpeculativeThreadpool tp(i);
196+
test_async(tp, task_num);
197+
test_silent_async(tp, task_num);
198+
}
199+
}
200+
201+
SUBCASE("WaitForAll"){
202+
for(unsigned i=0; i<=4; ++i) {
203+
tf::SpeculativeThreadpool tp(i);
204+
test_wait_for_all(tp);
205+
}
206+
}
207+
208+
SUBCASE("DynamicTasking") {
209+
for(unsigned i=0; i<=4; ++i) {
210+
tf::SpeculativeThreadpool tp(i);
211+
test_dynamic_tasking(tp);
212+
}
213+
}
214+
}
184215

0 commit comments

Comments
 (0)