Skip to content

Commit cfabb78

Browse files
isolate simple_threadpool
1 parent cd4fb2f commit cfabb78

8 files changed

Lines changed: 437 additions & 399 deletions

File tree

CMakeLists.txt

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -118,30 +118,32 @@ enable_testing()
118118

119119
message(STATUS "Building unit tests ...")
120120
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${PROJECT_SOURCE_DIR}/unittest)
121+
set(TF_UTEST_DIR ${PROJECT_SOURCE_DIR}/unittest)
121122
add_executable(taskflow unittest/taskflow.cpp)
122123
target_link_libraries(taskflow Threads::Threads)
123124

124125
# unittest for taskflow
125-
add_test(builder ${PROJECT_SOURCE_DIR}/unittest/taskflow -tc=Taskflow.Builder)
126-
add_test(dispatch ${PROJECT_SOURCE_DIR}/unittest/taskflow -tc=Taskflow.Dispatch)
127-
add_test(parallel_for ${PROJECT_SOURCE_DIR}/unittest/taskflow -tc=Taskflow.ParallelFor)
128-
add_test(reduce ${PROJECT_SOURCE_DIR}/unittest/taskflow -tc=Taskflow.Reduce)
129-
add_test(reduce_min ${PROJECT_SOURCE_DIR}/unittest/taskflow -tc=Taskflow.ReduceMin)
130-
add_test(reduce_max ${PROJECT_SOURCE_DIR}/unittest/taskflow -tc=Taskflow.ReduceMax)
131-
add_test(joined_subflow ${PROJECT_SOURCE_DIR}/unittest/taskflow -tc=Taskflow.JoinedSubflow)
132-
add_test(detached_subflow ${PROJECT_SOURCE_DIR}/unittest/taskflow -tc=Taskflow.DetachedSubflow)
126+
add_test(builder ${TF_UTEST_DIR}/taskflow -tc=Taskflow.Builder)
127+
add_test(dispatch ${TF_UTEST_DIR}/taskflow -tc=Taskflow.Dispatch)
128+
add_test(parallel_for ${TF_UTEST_DIR}/taskflow -tc=Taskflow.ParallelFor)
129+
add_test(reduce ${TF_UTEST_DIR}/taskflow -tc=Taskflow.Reduce)
130+
add_test(reduce_min ${TF_UTEST_DIR}/taskflow -tc=Taskflow.ReduceMin)
131+
add_test(reduce_max ${TF_UTEST_DIR}/taskflow -tc=Taskflow.ReduceMax)
132+
add_test(joined_subflow ${TF_UTEST_DIR}/taskflow -tc=Taskflow.JoinedSubflow)
133+
add_test(detached_subflow ${TF_UTEST_DIR}/taskflow -tc=Taskflow.DetachedSubflow)
133134

134135
# unittest for threadpool
135-
add_executable(threadpool_test unittest/threadpool.cpp)
136-
target_link_libraries(threadpool_test Threads::Threads)
137-
138-
add_test(proactivethreadpool ${PROJECT_SOURCE_DIR}/unittest/threadpool_test -tc=Threadpool.ProactiveThreadpool)
136+
add_executable(threadpool_test_tmp unittest/threadpool.cpp)
137+
target_link_libraries(threadpool_test_tmp Threads::Threads)
138+
set_target_properties(threadpool_test_tmp PROPERTIES OUTPUT_NAME "threadpool")
139+
add_test(simple_threadpool ${TF_UTEST_DIR}/threadpool -tc=SimpleThreadpool)
140+
add_test(proactive_threadpool ${TF_UTEST_DIR}/threadpool -tc=ProactiveThreadpool)
139141

140142
## threadpool_cxx14 unittest (contributed by Glen Fraser)
141143
add_executable(threadpool_cxx14_tmp unittest/threadpool_cxx14.cpp)
142144
set_target_properties(threadpool_cxx14_tmp PROPERTIES CXX_STANDARD 14)
143145
target_link_libraries(threadpool_cxx14_tmp Threads::Threads)
144146
set_target_properties(threadpool_cxx14_tmp PROPERTIES OUTPUT_NAME "threadpool_cxx14")
145-
add_test(threadpool_cxx14_basic ${PROJECT_SOURCE_DIR}/unittest/threadpool_cxx14 -tc=Threadpool.Basic)
146-
add_test(threadpool_cxx14_wait_for_all ${PROJECT_SOURCE_DIR}/unittest/threadpool_cxx14 -tc=Threadpool.WaitForAll)
147+
add_test(threadpool_cxx14_basic ${TF_UTEST_DIR}/threadpool_cxx14 -tc=Threadpool.Basic)
148+
add_test(threadpool_cxx14_wait_for_all ${TF_UTEST_DIR}/threadpool_cxx14 -tc=Threadpool.WaitForAll)
147149

example/matrix.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ void cppthread(const std::vector<size_t>& D) {
150150

151151
auto tbeg = std::chrono::steady_clock::now();
152152

153-
tf::Threadpool tpl(std::thread::hardware_concurrency());
153+
tf::SimpleThreadpool tpl(std::thread::hardware_concurrency());
154154

155155
std::cout << "Generating matrix As ...\n";
156156
std::vector<matrix_t> As(D.size());

example/threadpool.cpp

Lines changed: 33 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,84 +1,86 @@
1-
//2018/8/31 contributed by Guannan
2-
// Examples to test throughput of various theadpools
1+
// 2018/8/31 contributed by Guannan
2+
//
3+
// Examples to test different threadpool implementations:
4+
// - SimpleThreadpool
5+
// - ProactiveThreadpool
36

47
#include <taskflow/threadpool/threadpool.hpp>
5-
#include <taskflow/threadpool/proactive_threadpool.hpp>
68
#include <chrono>
7-
#include <atomic>
8-
#include <thread>
9+
#include <random>
910

11+
// Procedure: benchmark_empty_jobs
1012
void benchmark_empty_jobs() {
1113

12-
std::cout << "Testing threadpool throughput on empty jobs..." << std::endl;
14+
std::cout << "Benchmarking threadpool throughput on empty jobs ...\n";
1315

1416
unsigned thread_num = 4;
1517
unsigned int task_num = 10000000;
1618

1719
auto start = std::chrono::high_resolution_clock::now();
1820

19-
tf::ProactiveThreadpool pool(thread_num);
21+
tf::ProactiveThreadpool proactive(thread_num);
2022
for(size_t i=0; i<task_num; i++){
21-
pool.silent_async([](){});
23+
proactive.silent_async([](){});
2224
}
23-
pool.shutdown();
25+
proactive.shutdown();
2426

2527
auto end = std::chrono::high_resolution_clock::now();
26-
std::chrono::duration<double> elapsed = end - start;
27-
std::cout << "ProactiveThreadpool elapsed time: " << elapsed.count() << std::endl;
28+
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
29+
std::cout << "ProactiveThreadpool elapsed time: " << elapsed.count() << " ms\n";
2830

2931
start = std::chrono::high_resolution_clock::now();
3032

31-
tf::Threadpool tf_pool(thread_num);
33+
tf::SimpleThreadpool simple(thread_num);
3234
for(size_t i=0; i<task_num; i++){
33-
tf_pool.silent_async([](){});
35+
simple.silent_async([](){});
3436
}
35-
tf_pool.shutdown();
37+
simple.shutdown();
3638

3739
end = std::chrono::high_resolution_clock::now();
38-
elapsed = end - start;
39-
std::cout << "Basic elapsed time: " << elapsed.count() << std::endl;
40-
std::cout << std::endl;
40+
elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
41+
std::cout << "SimpleThreadpool elapsed time: " << elapsed.count() << " ms\n";
4142
}
4243

43-
44+
// Procedure: benchmark_atomic_add
4445
void benchmark_atomic_add() {
4546

46-
std::cout << "Testing threadpool throughput on atomic add..." << std::endl;
47+
std::cout << "Benchmarking threadpool throughput on atomic add ...\n";
4748

4849
unsigned thread_num = 4;
4950
unsigned int task_num = 10000000;
5051

5152
std::atomic<int> counter(0);
5253
auto start = std::chrono::high_resolution_clock::now();
5354

54-
tf::ProactiveThreadpool pool(thread_num);
55+
tf::ProactiveThreadpool proactive(thread_num);
5556
for(size_t i=0; i<task_num; i++){
56-
pool.silent_async([&counter](){ counter++; });
57+
proactive.silent_async([&counter](){ counter++; });
5758
}
58-
pool.shutdown();
59+
proactive.shutdown();
5960

6061
auto end = std::chrono::high_resolution_clock::now();
61-
std::chrono::duration<double> elapsed = end - start;
62-
std::cout << "ProactiveThreadpool elapsed time: " << elapsed.count() << std::endl;
62+
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
63+
std::cout << "ProactiveThreadpool elapsed time: " << elapsed.count() << " ms\n";
6364

6465
counter = 0;
6566
start = std::chrono::high_resolution_clock::now();
66-
tf::Threadpool tf_pool(thread_num);
67+
tf::SimpleThreadpool simple(thread_num);
6768

6869
for(size_t i=0; i<task_num; i++){
69-
tf_pool.silent_async([&counter](){ counter++; });
70+
simple.silent_async([&counter](){ counter++; });
7071
}
71-
tf_pool.shutdown();
72+
simple.shutdown();
7273

7374
end = std::chrono::high_resolution_clock::now();
74-
elapsed = end - start;
75-
std::cout << "Basic elapsed time: " << elapsed.count() << std::endl;
76-
std::cout << std::endl;
75+
elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
76+
std::cout << "SimpleThreadpool elapsed time: " << elapsed.count() << " ms\n";
7777
}
7878

79+
// Function: main
7980
int main(int argc, char* argv[]) {
8081

8182
benchmark_empty_jobs();
8283
benchmark_atomic_add();
83-
84+
85+
return 0;
8486
}

taskflow/taskflow.hpp

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// MIT License
22
//
3-
// Copyright (c) 2018 Tsung-Wei Huang, Chun-Xun Lin, and Martin Wong
3+
// Copyright (c) 2018 Tsung-Wei Huang, Chun-Xun Lin, Guannan Guo, and Martin Wong
44
//
55
// Permission is hereby granted, free of charge, to any person obtaining a copy
66
// of this software and associated documentation files (the "Software"), to deal
@@ -45,8 +45,8 @@
4545
// ============================================================================
4646
// version
4747
#define TASKFLOW_VERSION_MAJOR 2
48-
#define TASKFLOW_VERSION_MINOR 0
49-
#define TASKFLOW_VERSION_PATCH 2
48+
#define TASKFLOW_VERSION_MINOR 1
49+
#define TASKFLOW_VERSION_PATCH 0
5050
// ============================================================================
5151

5252
// Clang mis-interprets variant's get as a non-friend of variant and cannot
@@ -194,8 +194,19 @@ inline constexpr bool is_iterable_v = is_iterable<T>::value;
194194
// Taskflow definition
195195
//-----------------------------------------------------------------------------
196196

197-
// Forward declaration
197+
// Struct: MoC
198+
template <typename T>
199+
struct MoC {
198200

201+
MoC(T&& rhs) : object(std::move(rhs)) {}
202+
MoC(const MoC& other) : object(std::move(other.object)) {}
203+
204+
T& get() { return object; }
205+
206+
mutable T object;
207+
};
208+
209+
// Forward declaration
199210
template <template<typename, typename...> class FuncType>
200211
class BasicNode;
201212

@@ -1607,7 +1618,7 @@ std::string BasicTaskflow<Traits>::dump() const {
16071618
// Taskflow traits
16081619
struct TaskflowTraits {
16091620
using NodeType = BasicNode<std::function>;
1610-
using ThreadpoolType = Threadpool;
1621+
using ThreadpoolType = SimpleThreadpool;
16111622
};
16121623

16131624
using Taskflow = BasicTaskflow<TaskflowTraits>;

taskflow/threadpool/proactive_threadpool.hpp

Lines changed: 28 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
//
2-
// contributed by Guannan
1+
// 2018/09/03 - contributed by Guannan Guo
32
//
43
// ProactiveThreadpool schedules independent jobs in a greedy manner.
54
// Whenever a job is inserted into the threadpool, the threadpool will check if there
@@ -23,12 +22,28 @@
2322

2423
namespace tf {
2524

26-
2725
// template < template<typename...> class FuncType >
28-
2926
class ProactiveThreadpool {
3027

31-
using UnitTask = std::function<void()>;
28+
using TaskType = std::function<void()>;
29+
30+
template <typename T>
31+
struct MoC {
32+
33+
MoC(T&& rhs): object(std::move(rhs)) {}
34+
MoC(const MoC& other) : object(std::move(other.object)) {}
35+
36+
T& get() {return object; }
37+
38+
mutable T object;
39+
};
40+
41+
42+
struct Worker{
43+
std::condition_variable cv;
44+
TaskType task;
45+
bool ready;
46+
};
3247

3348
public:
3449

@@ -89,7 +104,7 @@ class ProactiveThreadpool {
89104
}
90105

91106
Worker w;
92-
UnitTask t;
107+
TaskType t;
93108
std::unique_lock<std::mutex> lock(_mutex);
94109
while(!_exiting){
95110
if(_task_queue.empty()){
@@ -128,7 +143,7 @@ class ProactiveThreadpool {
128143
template <typename C>
129144
void silent_async(C&& c){
130145

131-
UnitTask t {std::forward<C>(c)};
146+
TaskType t {std::forward<C>(c)};
132147

133148
//no worker thread available
134149
if(num_workers() == 0){
@@ -175,7 +190,7 @@ class ProactiveThreadpool {
175190
// all workers are busy.
176191
if(_workers.empty()){
177192
_task_queue.emplace_back(
178-
[p = MoveOnCopy(std::move(p)), c = std::forward<C>(c)]() mutable {
193+
[p = MoC(std::move(p)), c = std::forward<C>(c)]() mutable {
179194
c();
180195
p.get().set_value();
181196
}
@@ -186,7 +201,7 @@ class ProactiveThreadpool {
186201
Worker* w = _workers.back();
187202
_workers.pop_back();
188203
w->ready = true;
189-
w->task = [p = MoveOnCopy(std::move(p)), c = std::forward<C>(c)]() mutable {
204+
w->task = [p = MoC(std::move(p)), c = std::forward<C>(c)]() mutable {
190205
c();
191206
p.get().set_value();
192207
};
@@ -197,7 +212,7 @@ class ProactiveThreadpool {
197212

198213
if(_workers.empty()){
199214
_task_queue.emplace_back(
200-
[p = MoveOnCopy(std::move(p)), c = std::forward<C>(c)]() mutable {
215+
[p = MoC(std::move(p)), c = std::forward<C>(c)]() mutable {
201216
p.get().set_value(c());
202217
return;
203218
}
@@ -207,7 +222,7 @@ class ProactiveThreadpool {
207222
Worker* w = _workers.back();
208223
_workers.pop_back();
209224
w->ready = true;
210-
w->task = [p = MoveOnCopy(std::move(p)), c = std::forward<C>(c)]() mutable {
225+
w->task = [p = MoC(std::move(p)), c = std::forward<C>(c)]() mutable {
211226
p.get().set_value(c());
212227
return;
213228
};
@@ -234,31 +249,13 @@ class ProactiveThreadpool {
234249

235250

236251
private:
237-
238-
template <typename T>
239-
struct MoveOnCopy{
240-
241-
MoveOnCopy(T&& rhs): object(std::move(rhs)) {}
242-
MoveOnCopy(const MoveOnCopy& other) : object(std::move(other.object)) {}
243-
244-
T& get() {return object; }
245-
246-
mutable T object;
247-
};
248-
249-
250-
struct Worker{
251-
std::condition_variable cv;
252-
UnitTask task;
253-
bool ready;
254-
};
255252

256253
mutable std::mutex _mutex;
257254

258255
std::condition_variable _empty;
259256
std::condition_variable _complete;
260257

261-
std::deque<UnitTask> _task_queue;
258+
std::deque<TaskType> _task_queue;
262259
std::vector<std::thread> _threads;
263260
std::vector<Worker*> _workers;
264261
std::unordered_set<std::thread::id> _worker_ids;
@@ -270,8 +267,7 @@ class ProactiveThreadpool {
270267
};
271268

272269

273-
274-
};
270+
}; // namespace tf. ----------------------------------------------------------
275271

276272

277273

0 commit comments

Comments
 (0)