Skip to content

Commit 11a3634

Browse files
updated workstealing
1 parent 037e33b commit 11a3634

2 files changed

Lines changed: 88 additions & 17 deletions

File tree

example/threadpool.cpp

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
// 2018/12/06 modified by Tsung-Wei Huang
2+
// - added nested insertions test
3+
//
14
// 2018/12/04 modified by Tsung-Wei Huang
25
// - replace privatized threadpool with work stealing threadpool
36
//
@@ -21,6 +24,7 @@
2124

2225
#include <taskflow/threadpool/threadpool.hpp>
2326
#include <chrono>
27+
#include <cmath>
2428
#include <random>
2529
#include <numeric>
2630
#include <climits>
@@ -214,6 +218,69 @@ auto atomic_add() {
214218
return std::chrono::duration_cast<std::chrono::milliseconds>(end - beg).count();
215219
}
216220

221+
// ============================================================================
222+
// nested insertions
223+
// ============================================================================
224+
225+
// Function: nested_insertions
// Benchmarks inserting tasks from inside running tasks. A root task loops
// num_tasks times; each iteration bumps a shared counter and emplaces a child
// task that does the same, down to five levels (the innermost level only
// bumps the counter). The caller blocks until the counter reaches the total
// number of iterations over all levels.
//
// T must be constructible from a thread count and expose emplace(callable),
// callable from within a task that the pool is currently running.
//
// Returns the elapsed wall-clock time in milliseconds.
template <typename T>
auto nested_insertions() {

  const int num_threads = static_cast<int>(std::thread::hardware_concurrency());
  const int num_tasks = 32;
  const int num_levels = 5;

  auto beg = std::chrono::high_resolution_clock::now();

  // Total number of increments over all levels:
  //   num_tasks + num_tasks^2 + ... + num_tasks^num_levels
  // Computed once here rather than inside every increment() call, so the hot
  // path is a single atomic increment and comparison.
  int64_t expected_total = 0;
  for(int level=0; level<num_levels; ++level) {
    expected_total = (expected_total + 1)*num_tasks;
  }

  std::atomic<int64_t> counter(0);

  std::promise<void> promise;
  auto future = promise.get_future();

  // Exactly one call observes the final count, so set_value() fires once.
  auto increment = [&] () {
    if(++counter == expected_total) {
      promise.set_value();
    }
  };

  T threadpool(num_threads);

  // Every closure captures by reference; all referenced objects live in this
  // frame, which stays alive until future.get() returns below.
  threadpool.emplace([&] () {
    std::this_thread::sleep_for(std::chrono::microseconds(100));
    for(int a=0; a<num_tasks; ++a) {
      increment();
      threadpool.emplace([&] () {
        for(int b=0; b<num_tasks; ++b) {
          increment();
          threadpool.emplace([&] () {
            for(int c=0; c<num_tasks; ++c) {
              increment();
              threadpool.emplace([&] () {
                for(int d=0; d<num_tasks; ++d) {
                  increment();
                  threadpool.emplace([&] () {
                    for(int e=0; e<num_tasks; ++e) {
                      increment();
                    }
                  });
                }
              });
            }
          });
        }
      });
    }
  });

  // Block until the last increment has been performed.
  future.get();

  auto end = std::chrono::high_resolution_clock::now();
  return std::chrono::duration_cast<std::chrono::milliseconds>(end - beg).count();
}
283+
217284
// ============================================================================
218285
// batch insertion
219286
// ============================================================================
@@ -270,6 +337,7 @@ int main(int argc, char* argv[]) {
270337
BENCHMARK("Linear Insertions", linear_insertions);
271338
BENCHMARK("Divide and Conquer", subsum);
272339
BENCHMARK("Batch Insertions", batch_insertions);
340+
BENCHMARK("Nested Insertions", nested_insertions);
273341

274342
return 0;
275343
}

taskflow/threadpool/workstealing_threadpool.hpp

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
// 2018/12/06 - modified by Tsung-Wei Huang
2+
// - refactored the code
3+
// - added load balancing strategy
4+
// - removed the storage alignment in WorkStealingQueue
5+
//
16
// 2018/12/03 - created by Tsung-Wei Huang
27
// - added WorkStealingQueue class
38

@@ -234,7 +239,6 @@ class WorkStealingThreadpool {
234239
bool ready {false};
235240
uint64_t seed;
236241
unsigned last_victim;
237-
unsigned last_partner;
238242
};
239243

240244
public:
@@ -255,7 +259,7 @@ class WorkStealingThreadpool {
255259
private:
256260

257261
const std::thread::id _owner {std::this_thread::get_id()};
258-
const int load_balancing_factor {4};
262+
const unsigned _load_balance_prob {61};
259263

260264
mutable std::mutex _mutex;
261265

@@ -340,7 +344,6 @@ void WorkStealingThreadpool<Closure>::_spawn(unsigned N) {
340344
std::optional<Closure> t;
341345
Worker& w = (_workers[i]);
342346
w.last_victim = (i + 1) % N;
343-
w.last_partner = w.last_victim;
344347
w.seed = i + 1;
345348

346349
std::unique_lock lock(_mutex, std::defer_lock);
@@ -414,19 +417,21 @@ void WorkStealingThreadpool<Closure>::_balance_load(unsigned me) {
414417
return;
415418
}
416419

417-
auto n = _workers[me].queue.size();
418-
auto p = _fast_modulo(_randomize(_workers[me].seed), n + 1);
419-
420-
// Load balancing with probability 1/(n+1)
421-
if(p == n) {
420+
//auto n = _workers[me].queue.size();
421+
auto p = _fast_modulo(_randomize(_workers[me].seed), _load_balance_prob);
422+
423+
// Load balancing with probability 1/_load_balance_prob
424+
if(p == 0) {
422425
// wake up my partner to help balance
423-
std::scoped_lock lock(_mutex);
424-
if(!_idlers.empty()) {
425-
Worker* w = _idlers.back();
426-
_idlers.pop_back();
427-
w->ready = true;
428-
w->cv.notify_one();
429-
w->last_victim = me;
426+
if(_mutex.try_lock()) {
427+
if(!_idlers.empty()) {
428+
Worker* w = _idlers.back();
429+
_idlers.pop_back();
430+
w->ready = true;
431+
w->cv.notify_one();
432+
w->last_victim = me;
433+
}
434+
_mutex.unlock();
430435
}
431436
}
432437
}
@@ -494,7 +499,6 @@ void WorkStealingThreadpool<Closure>::emplace(ArgsT&&... args){
494499
// bfs load balancing
495500
else {
496501
_workers[me].queue.push(Closure{std::forward<ArgsT>(args)...});
497-
//_balance_load(me);
498502
}
499503
return;
500504
}
@@ -546,7 +550,6 @@ void WorkStealingThreadpool<Closure>::batch(std::vector<Closure>&& tasks) {
546550

547551
for(; i<tasks.size(); ++i) {
548552
_workers[me].queue.push(std::move(tasks[i]));
549-
//_balance_load(me);
550553
}
551554

552555
return;

0 commit comments

Comments
 (0)