Skip to content

Commit 4cc9626

Browse files
improved workstealing executor
1 parent e45f2a3 commit 4cc9626

7 files changed

Lines changed: 431 additions & 196 deletions

File tree

CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,8 @@ set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${TF_UTEST_DIR})
179179
add_executable(utility unittest/utility.cpp)
180180
target_link_libraries(utility ${PROJECT_NAME} Threads::Threads)
181181
target_include_directories(utility PRIVATE ${PROJECT_SOURCE_DIR}/doctest)
182-
add_test(passive_vector ${TF_UTEST_DIR}/utility -tc=passive_vector)
182+
add_test(passive_vector ${TF_UTEST_DIR}/utility -tc=PassiveVector)
183+
add_test(allocator ${TF_UTEST_DIR}/utility -tc=Allocator)
183184

184185
# unittest for taskflow
185186
add_executable(taskflow_test_tmp unittest/taskflow.cpp)

benchmark/mnist/main.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ int main(int argc, char *argv[]){
4747
num_threads = std::atoi(argv[1]);
4848
}
4949

50-
int rounds {5};
50+
int rounds {2};
5151

5252
std::cout << std::setw(12) << "# epochs"
5353
<< std::setw(12) << "OpenMP"
@@ -65,8 +65,8 @@ int main(int argc, char *argv[]){
6565
double tf_time {0.0};
6666

6767
for(int j=0; j<rounds; ++j) {
68-
omp_time += measure_time_omp(epoch, num_threads).count();
69-
tbb_time += measure_time_tbb(epoch, num_threads).count();
68+
//omp_time += measure_time_omp(epoch, num_threads).count();
69+
//tbb_time += measure_time_tbb(epoch, num_threads).count();
7070
tf_time += measure_time_taskflow(epoch, num_threads).count();
7171
}
7272

example/nested_subflow.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ void syncLog(std::string const& msg) {
88

99
void grow(tf::SubflowBuilder& subflow, uint64_t depth) {
1010
syncLog("Depth: " + std::to_string(depth));
11-
std::this_thread::sleep_for(std::chrono::milliseconds(100));
11+
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
1212
if(depth < 3) {
1313
subflow.silent_emplace(
1414
[depth](tf::SubflowBuilder& subsubflow){ grow(subsubflow, depth+1); },

taskflow/threadpool/notifier.hpp

Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
// 2019/02/09 - created by Tsung-Wei Huang
//   - modified the event count from Eigen

#pragma once

#include <algorithm>
#include <atomic>
#include <cassert>
#include <condition_variable>  // Waiter::cv is a std::condition_variable
#include <cstdint>             // uint64_t
#include <cstdio>
#include <cstdlib>
#include <deque>
#include <iostream>
#include <memory>
#include <mutex>               // Waiter::mu is a std::mutex
#include <numeric>
#include <optional>
#include <set>
#include <thread>
#include <vector>

// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <[email protected]>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

namespace tf {

// Notifier allows waiting for arbitrary predicates in non-blocking
// algorithms. Think of a condition variable, but the wait predicate does
// not need to be protected by a mutex. Usage:
//
// Waiting thread does:
//
//   if (predicate)
//     return act();
//   Notifier::Waiter& w = waiters[my_index];
//   ec.prepare_wait(&w);
//   if (predicate) {
//     ec.cancel_wait(&w);
//     return act();
//   }
//   ec.commit_wait(&w);
//
// Notifying thread does:
//
//   predicate = true;
//   ec.notify(true);
//
// notify is cheap if there are no waiting threads. prepare_wait/commit_wait
// are not cheap, but they are executed only if the preceding predicate check
// has failed.
//
// Algorithm outline:
// There are two main variables: predicate (managed by user) and _state.
// Operation closely resembles Dekker's mutual exclusion algorithm:
// https://en.wikipedia.org/wiki/Dekker%27s_algorithm
// The waiting thread sets _state then checks predicate; the notifying thread
// sets predicate then checks _state. Due to the seq_cst fences in between
// these operations it is guaranteed that either the waiter will see the
// predicate change and won't block, or the notifying thread will see the
// _state change and will unblock the waiter, or both. But it can't happen
// that both threads miss each other's changes, which would lead to deadlock.
class Notifier {

  public:

  // Per-thread wait slot. Each waiting thread owns exactly one Waiter for
  // its lifetime; Waiters are linked into an intrusive lock-free stack
  // through `next` while the owner is blocked.
  struct Waiter {
    std::atomic<Waiter*> next;   // next entry on the waiter stack (or nullptr)
    std::mutex mu;               // protects `state` for park/unpark handshake
    std::condition_variable cv;  // the owner blocks here in _park
    uint64_t epoch;              // _state snapshot taken in prepare_wait
    unsigned state;              // one of the enum values below
    enum {
      kNotSignaled,
      kWaiting,
      kSignaled,
    };
  };

  // Constructs a notifier over an externally owned pool of wait slots.
  // The referenced vector must outlive the Notifier and must not be resized
  // while in use (Waiter identity is its index into this vector).
  explicit Notifier(std::vector<Waiter>& waiters) : _waiters(waiters) {
    assert(waiters.size() < (1 << kWaiterBits) - 1);
    // Initialize epoch to something close to overflow to test overflow.
    _state = kStackMask | (kEpochMask - kEpochInc * waiters.size() * 2);
  }

  ~Notifier() {
    // Ensure there are no waiters.
    assert((_state.load() & (kStackMask | kWaiterMask)) == kStackMask);
  }

  // prepare_wait prepares for waiting.
  // After calling this function the thread must re-check the wait predicate
  // and call either cancel_wait or commit_wait passing the same Waiter object.
  void prepare_wait(Waiter* w) {
    w->epoch = _state.fetch_add(kWaiterInc, std::memory_order_relaxed);
    // Full barrier so the subsequent predicate re-check cannot be reordered
    // before the prewait-count increment (the Dekker handshake).
    std::atomic_thread_fence(std::memory_order_seq_cst);
  }

  // commit_wait commits waiting: moves this thread from the prewait counter
  // onto the waiter stack and blocks until notified.
  void commit_wait(Waiter* w) {
    w->state = Waiter::kNotSignaled;
    // Modification epoch of this waiter.
    uint64_t epoch =
        (w->epoch & kEpochMask) +
        (((w->epoch & kWaiterMask) >> kWaiterShift) << kEpochShift);
    uint64_t state = _state.load(std::memory_order_seq_cst);
    for (;;) {
      if (int64_t((state & kEpochMask) - epoch) < 0) {
        // The preceding waiter has not decided on its fate. Wait until it
        // calls either cancel_wait or commit_wait, or is notified.
        std::this_thread::yield();
        state = _state.load(std::memory_order_seq_cst);
        continue;
      }
      // We've already been notified.
      if (int64_t((state & kEpochMask) - epoch) > 0) return;
      // Remove this thread from prewait counter and add it to the waiter list.
      assert((state & kWaiterMask) != 0);
      uint64_t newstate = state - kWaiterInc + kEpochInc;
      newstate = (newstate & ~kStackMask) | (w - &_waiters[0]);
      if ((state & kStackMask) == kStackMask)
        w->next.store(nullptr, std::memory_order_relaxed);
      else
        w->next.store(&_waiters[state & kStackMask], std::memory_order_relaxed);
      if (_state.compare_exchange_weak(state, newstate,
                                       std::memory_order_release))
        break;
    }
    _park(w);
  }

  // cancel_wait cancels the effects of the previous prepare_wait call
  // (used when the predicate became true between prepare and commit).
  void cancel_wait(Waiter* w) {
    uint64_t epoch =
        (w->epoch & kEpochMask) +
        (((w->epoch & kWaiterMask) >> kWaiterShift) << kEpochShift);
    uint64_t state = _state.load(std::memory_order_relaxed);
    for (;;) {
      if (int64_t((state & kEpochMask) - epoch) < 0) {
        // The preceding waiter has not decided on its fate. Wait until it
        // calls either cancel_wait or commit_wait, or is notified.
        std::this_thread::yield();
        state = _state.load(std::memory_order_relaxed);
        continue;
      }
      // We've already been notified.
      if (int64_t((state & kEpochMask) - epoch) > 0) return;
      // Remove this thread from prewait counter.
      assert((state & kWaiterMask) != 0);
      if (_state.compare_exchange_weak(state, state - kWaiterInc + kEpochInc,
                                       std::memory_order_relaxed))
        return;
    }
  }

  // notify wakes one (all == false) or all (all == true) waiting threads.
  // Must be called after changing the associated wait predicate.
  void notify(bool all) {
    // Full barrier pairing with the fence in prepare_wait: the predicate
    // write must be visible before we inspect _state.
    std::atomic_thread_fence(std::memory_order_seq_cst);
    uint64_t state = _state.load(std::memory_order_acquire);
    for (;;) {
      // Easy case: no waiters.
      if ((state & kStackMask) == kStackMask && (state & kWaiterMask) == 0)
        return;
      uint64_t waiters = (state & kWaiterMask) >> kWaiterShift;
      uint64_t newstate;
      if (all) {
        // Reset prewait counter and empty wait list.
        newstate = (state & kEpochMask) + (kEpochInc * waiters) + kStackMask;
      } else if (waiters) {
        // There is a thread in pre-wait state, unblock it.
        newstate = state + kEpochInc - kWaiterInc;
      } else {
        // Pop a waiter from list and unpark it.
        Waiter* w = &_waiters[state & kStackMask];
        Waiter* wnext = w->next.load(std::memory_order_relaxed);
        uint64_t next = kStackMask;
        if (wnext != nullptr) next = wnext - &_waiters[0];
        // Note: we don't add kEpochInc here. ABA problem on the lock-free
        // stack can't happen because a waiter is re-pushed onto the stack
        // only after it was in the pre-wait state which inevitably leads to
        // epoch increment.
        newstate = (state & kEpochMask) + next;
      }
      if (_state.compare_exchange_weak(state, newstate,
                                       std::memory_order_acquire)) {
        if (!all && waiters) return;  // unblocked pre-wait thread
        if ((state & kStackMask) == kStackMask) return;
        Waiter* w = &_waiters[state & kStackMask];
        if (!all) w->next.store(nullptr, std::memory_order_relaxed);
        _unpark(w);
        return;
      }
    }
  }

  private:

  // _state layout:
  // - low kStackBits is a stack of waiters committed wait
  //   (index of the top waiter, or kStackMask if the stack is empty).
  // - next kWaiterBits is count of waiters in prewait state.
  // - next kEpochBits is modification counter.
  static const uint64_t kStackBits = 16;
  static const uint64_t kStackMask = (1ull << kStackBits) - 1;
  static const uint64_t kWaiterBits = 16;
  static const uint64_t kWaiterShift = 16;
  static const uint64_t kWaiterMask = ((1ull << kWaiterBits) - 1)
                                      << kWaiterShift;
  static const uint64_t kWaiterInc = 1ull << kWaiterBits;
  static const uint64_t kEpochBits = 32;
  static const uint64_t kEpochShift = 32;
  static const uint64_t kEpochMask = ((1ull << kEpochBits) - 1) << kEpochShift;
  static const uint64_t kEpochInc = 1ull << kEpochShift;

  std::atomic<uint64_t> _state;
  std::vector<Waiter>& _waiters;

  // Blocks the calling thread on its own Waiter until it is signaled.
  void _park(Waiter* w) {
    std::unique_lock<std::mutex> lock(w->mu);
    while (w->state != Waiter::kSignaled) {
      w->state = Waiter::kWaiting;
      w->cv.wait(lock);
    }
  }

  // Signals (and wakes if parked) every waiter on the given intrusive list.
  void _unpark(Waiter* waiters) {
    Waiter* next = nullptr;
    for (Waiter* w = waiters; w; w = next) {
      next = w->next.load(std::memory_order_relaxed);
      unsigned state;
      {
        std::unique_lock<std::mutex> lock(w->mu);
        state = w->state;
        w->state = Waiter::kSignaled;
      }
      // Avoid notifying if it wasn't waiting.
      if (state == Waiter::kWaiting) w->cv.notify_one();
    }
  }

  Notifier(const Notifier&) = delete;
  void operator=(const Notifier&) = delete;
};

}  // namespace tf -----------------------------------------------------------

0 commit comments

Comments
 (0)