Skip to content

Commit 4d76b3e

Browse files
refactored - with bug
Merge remote-tracking branch 'refs/remotes/origin/dev' into dev
2 parents 06eab2f + fe59cf3 commit 4d76b3e

4 files changed

Lines changed: 127 additions & 19 deletions

File tree

CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,9 @@ target_link_libraries(multiple_dispatch Threads::Threads)
116116
add_executable(subsum example/subsum.cpp)
117117
target_link_libraries(subsum Threads::Threads)
118118

119+
add_executable(taskflow_subsum example/taskflow_subsum.cpp)
120+
target_link_libraries(taskflow_subsum Threads::Threads)
121+
119122

120123
# -----------------------------------------------------------------------------
121124
# Unittest

example/subsum.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,14 @@ void benchmark_subsum(){
9292
auto gen = [](){return rand()%(max_num-min_num) + min_num;};
9393
std::generate(std::begin(vec), std::end(vec), gen);
9494

95-
std::cout << "Proactive threadpool takes: "
95+
std::cout << "Simple threadpool elapesed: "
96+
<< subsum<tf::SimpleThreadpool>(vec) << " ms\n";
97+
98+
std::cout << "Proactive threadpool elapesed: "
9699
<< subsum<tf::ProactiveThreadpool>(vec) << " ms\n";
97100

98-
std::cout << "Simple threadpool takes: "
99-
<< subsum<tf::SimpleThreadpool>(vec) << " ms\n";
101+
std::cout << "Speculative threadpool elapesed: "
102+
<< subsum<tf::SpeculativeThreadpool>(vec) << " ms\n";
100103
}
101104

102105

example/taskflow_subsum.cpp

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
#include <climits>
2+
3+
#include <taskflow/taskflow.hpp> // the only include you need
4+
5+
constexpr auto min_num = -50;
6+
constexpr auto max_num = 50;
7+
constexpr auto tree_height = 20u;
8+
constexpr auto total = 1u << tree_height;
9+
10+
void update_max(std::atomic<int>& max_val, int const& value)
11+
{
12+
int old = max_val;
13+
while(old < value && !max_val.compare_exchange_weak(old, value));
14+
}
15+
16+
int maxCrossingSum(std::vector<int>& vec, int l, int m, int r){
17+
// Include elements on left of mid.
18+
auto sum = 0;
19+
auto left_sum = INT_MIN;
20+
for (auto i = m; i >= l; i--){
21+
sum = sum + vec[i];
22+
if (sum > left_sum)
23+
left_sum = sum;
24+
}
25+
26+
// Include elements on right of mid
27+
sum = 0;
28+
auto right_sum = INT_MIN;
29+
for (auto i = m+1; i <= r; i++)
30+
{
31+
sum = sum + vec[i];
32+
if (sum > right_sum)
33+
right_sum = sum;
34+
}
35+
36+
// Return sum of elements on left and right of mid
37+
return left_sum + right_sum;
38+
}
39+
40+
// This example is from https://www.geeksforgeeks.org/divide-and-conquer-maximum-sum-subarray/
41+
template<typename T>
42+
void maxSubArraySum(std::vector<int>& vec, int l, int r, std::atomic<int>& max_num, T& subflow)
43+
{
44+
// Base Case: Only one element
45+
if (l == r) {
46+
update_max(max_num, vec[l]);
47+
return ;
48+
}
49+
50+
// Find middle point
51+
int m = (l + r)/2;
52+
53+
subflow.silent_emplace(
54+
[l=l, r=r, m=m, &vec, &max_num](auto& subflow){
55+
maxSubArraySum(vec, l, m, max_num, subflow);
56+
}
57+
);
58+
59+
subflow.silent_emplace(
60+
[l=l, r=r, m=m, &vec, &max_num](auto& subflow){
61+
maxSubArraySum(vec, m+1, r, max_num, subflow);
62+
}
63+
);
64+
65+
update_max(max_num, maxCrossingSum(vec, l, m, r));
66+
}
67+
68+
69+
// Function: main
70+
int main(int argc, char* argv[]) {
71+
::srand(1);
72+
73+
std::vector<int> vec(total, 0);
74+
auto gen = [](){return rand()%(max_num-min_num) + min_num;};
75+
std::generate(std::begin(vec), std::end(vec), gen);
76+
std::atomic<int> result {INT_MIN};
77+
78+
auto start = std::chrono::high_resolution_clock::now();
79+
{
80+
tf::Taskflow tf(std::thread::hardware_concurrency());
81+
tf.silent_emplace(
82+
[&vec, &result](auto &subflow){
83+
maxSubArraySum(vec, 0, total-1, result, subflow);
84+
subflow.detach();
85+
}
86+
);
87+
tf.wait_for_all();
88+
}
89+
auto end = std::chrono::high_resolution_clock::now();
90+
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
91+
92+
std::cout << "Taskflow elapsed: " << elapsed.count() << " ms\n";
93+
//std::cout << result << '\n';
94+
95+
return 0;
96+
}
97+
98+

taskflow/threadpool/speculative_threadpool.hpp

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class BasicSpeculativeThreadpool {
4242
struct Worker{
4343
std::condition_variable cv;
4444
TaskType task;
45-
bool ready;
45+
bool ready {false};
4646
};
4747

4848
public:
@@ -83,10 +83,11 @@ class BasicSpeculativeThreadpool {
8383

8484
auto _lookahead(){
8585
auto id = std::this_thread::get_id();
86-
std::scoped_lock<std::mutex> lock(_mutex);
8786
return _worker_local.find(id);
8887
}
8988

89+
std::unique_ptr<Worker[]> _works;
90+
9091
}; // class BasicSpeculativeThreadpool. --------------------------------------
9192

9293
// Constructor
@@ -101,19 +102,19 @@ BasicSpeculativeThreadpool<Func>::~BasicSpeculativeThreadpool(){
101102
shutdown();
102103
}
103104

104-
105105
// Function: is_owner
106106
template < template<typename...> class Func >
107107
bool BasicSpeculativeThreadpool<Func>::is_owner() const {
108108
return std::this_thread::get_id() == _owner;
109109
}
110110

111-
111+
// Function: num_tasks
112112
template < template<typename...> class Func >
113113
size_t BasicSpeculativeThreadpool<Func>::num_tasks() const {
114114
return _task_queue.size();
115115
}
116116

117+
// Function: num_workers
117118
template < template<typename...> class Func >
118119
size_t BasicSpeculativeThreadpool<Func>::num_workers() const {
119120
return _threads.size();
@@ -123,7 +124,7 @@ size_t BasicSpeculativeThreadpool<Func>::num_workers() const {
123124
template < template<typename...> class Func >
124125
void BasicSpeculativeThreadpool<Func>::shutdown(){
125126

126-
if(not is_owner()){
127+
if(!is_owner()){
127128
throw std::runtime_error("Worker thread cannot shut down the pool");
128129
}
129130

@@ -167,21 +168,22 @@ template < template<typename...> class Func >
167168
void BasicSpeculativeThreadpool<Func>::spawn(unsigned N) {
168169

169170
// TODO: is_owner
170-
if(not is_owner()){
171+
if(! is_owner()){
171172
throw std::runtime_error("Worker thread cannot spawn threads");
172173
}
173174

175+
// Lock to synchronize all workers before creating _worker_locals
176+
std::scoped_lock<std::mutex> lock(_mutex);
177+
_works.reset(new Worker[N]);
178+
174179
for(size_t i=0; i<N; ++i){
175-
_threads.emplace_back([this]()->void{
180+
_threads.emplace_back([this, i=i]()->void{
176181

177-
Worker w;
178-
TaskType t;
179-
auto id = std::this_thread::get_id();
182+
Worker& w = _works[i];
183+
TaskType t;
180184

181185
std::unique_lock<std::mutex> lock(_mutex);
182186

183-
_worker_local.insert({id, &w});
184-
185187
while(!_exiting){
186188
if(_task_queue.empty()){
187189
w.ready = false;
@@ -214,10 +216,12 @@ void BasicSpeculativeThreadpool<Func>::spawn(unsigned N) {
214216
} // End of while ------------------------------------------------------
215217
});
216218

217-
} // End of For -------------------------------------------------------------
219+
_worker_local.insert({_threads.back().get_id(), &_works[i]});
220+
} // End of For ---------------------------------------------------------------------------------
218221
}
219222

220223

224+
// Function: async
221225
template < template<typename...> class Func >
222226
template <typename C>
223227
auto BasicSpeculativeThreadpool<Func>::async(C&& c){
@@ -244,7 +248,7 @@ auto BasicSpeculativeThreadpool<Func>::async(C&& c){
244248
// speculation
245249
if(std::this_thread::get_id() != _owner){
246250
auto iter = _lookahead();
247-
if(iter != _worker_local.end() and iter->second->task == nullptr){
251+
if(iter != _worker_local.end() && iter->second->task == nullptr){
248252
iter->second->task =
249253
[p = MoC(std::move(p)), c = std::forward<C>(c)]() mutable {
250254
c();
@@ -280,7 +284,7 @@ auto BasicSpeculativeThreadpool<Func>::async(C&& c){
280284
// speculation
281285
if(std::this_thread::get_id() != _owner){
282286
auto iter = _lookahead();
283-
if(iter != _worker_local.end() and iter->second->task == nullptr){
287+
if(iter != _worker_local.end() && iter->second->task == nullptr){
284288
iter->second->task =
285289
[p = MoC(std::move(p)), c = std::forward<C>(c)]() mutable {
286290
p.get().set_value(c());
@@ -327,7 +331,7 @@ void BasicSpeculativeThreadpool<Func>::silent_async(C&& c){
327331
// speculation
328332
if(std::this_thread::get_id() != _owner){
329333
auto iter = _lookahead();
330-
if(iter != _worker_local.end() and iter->second->task == nullptr){
334+
if(iter != _worker_local.end() && iter->second->task == nullptr){
331335
iter->second->task = std::move(t);
332336
return ;
333337
}

0 commit comments

Comments
 (0)