Skip to content

Commit a66f0fd

Browse files
updated semaphore
1 parent 3fbd4a6 commit a66f0fd

7 files changed

Lines changed: 233 additions & 70 deletions

File tree

examples/simple.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,4 @@ int main(){
3131

3232
return 0;
3333
}
34+

taskflow/core/atomic_notifier.hpp

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@ class AtomicNotifier {
4242
static_assert(sizeof(uint64_t) == 8, "bad platform");
4343
static_assert(sizeof(std::atomic<uint64_t>) == 8, "bad platform");
4444

45-
//static constexpr size_t kEpochOffset = kIsLittleEndian ? 1 : 0;
46-
4745
// _state stores the epoch in the most significant 32 bits and the
4846
// waiter count in the least significant 32 bits.
4947
std::atomic<uint64_t> _state;
@@ -144,8 +142,6 @@ class AtomicNotifierV2 {
144142
static_assert(sizeof(uint64_t) == 8, "bad platform");
145143
static_assert(sizeof(std::atomic<uint64_t>) == 8, "bad platform");
146144

147-
//static constexpr size_t kEpochOffset = kIsLittleEndian ? 1 : 0;
148-
149145
// _state stores the epoch in the most significant 32 bits and the
150146
// waiter count in the least significant 32 bits.
151147
std::atomic<uint64_t> _state;

taskflow/core/executor.hpp

Lines changed: 8 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1699,8 +1699,7 @@ inline void Executor::_tear_down_invoke(Worker& worker, Node* node) {
16991699
_tear_down_topology(worker, node->_topology);
17001700
}
17011701
}
1702-
// Here we asssume the parent is in a busy loop (e.g., corun) waiting for
1703-
// its join counter to become 0.
1702+
// The parent is in a corun loop waiting for its join counter to reach 0.
17041703
else {
17051704
//parent->_join_counter.fetch_sub(1, std::memory_order_acq_rel);
17061705
parent->_join_counter.fetch_sub(1, std::memory_order_release);
@@ -2305,68 +2304,34 @@ inline void Runtime::corun_all() {
23052304
template <typename... S,
23062305
std::enable_if_t<all_same_v<Semaphore, std::decay_t<S>...>, void>*
23072306
>
2308-
void Runtime::acquire(S&&... semaphores) {
2309-
constexpr size_t N = sizeof...(S);
2310-
std::array<Semaphore*, N> items { std::addressof(semaphores)... };
2311-
_executor._corun_until(_worker, [&](){
2312-
// Ideally, we should use a better deadlock-avoidance algorithm but
2313-
// in practice the number of semaphores will not be too large and
2314-
// tf::Semaphore does not provide blocking method. Hence, we are
2315-
// mostly safe here. This is similar to the GCC try_lock implementation:
2316-
// https://github.com/gcc-mirror/gcc/blob/master/libstdc%2B%2B-v3/include/std/mutex
2317-
for(size_t i=0; i < N; i++) {
2318-
if(items[i]->try_acquire() == false) {
2319-
for(size_t j=0; j<i; j++) {
2320-
items[j]->release();
2321-
}
2322-
return false;
2323-
}
2324-
}
2325-
return true;
2326-
});
2307+
void Runtime::acquire(S&... semaphores) {
2308+
_executor._corun_until(_worker, [&](){ return tf::try_acquire(semaphores...); });
23272309
// TODO: exception?
23282310
}
23292311

23302312
// Function:: acquire
23312313
template <typename I,
23322314
std::enable_if_t<std::is_same_v<deref_t<I>, Semaphore>, void>*
23332315
>
2334-
void Runtime::acquire(I begin, I end) {
2335-
_executor._corun_until(_worker, [begin, end](){
2336-
// Ideally, we should use a better deadlock-avoidance algorithm but
2337-
// in practice the number of semaphores will not be too large and
2338-
// tf::Semaphore does not provide blocking method. Hence, we are
2339-
// mostly safe here. This is similar to the GCC try_lock implementation:
2340-
// https://github.com/gcc-mirror/gcc/blob/master/libstdc%2B%2B-v3/include/std/mutex
2341-
for(I ptr = begin; ptr != end; ptr++) {
2342-
if(ptr->try_acquire() == false) {
2343-
for(I ptr2 = begin; ptr2 != ptr; ptr2++) {
2344-
ptr2->release();
2345-
}
2346-
return false;
2347-
}
2348-
}
2349-
return true;
2350-
});
2316+
void Runtime::acquire(I first, I last) {
2317+
_executor._corun_until(_worker, [=](){ return tf::try_acquire(first, last); });
23512318
// TODO: exception?
23522319
}
23532320

23542321
// Function: release
23552322
template <typename... S,
23562323
std::enable_if_t<all_same_v<Semaphore, std::decay_t<S>...>, void>*
23572324
>
2358-
void Runtime::release(S&&... semaphores){
2359-
(semaphores.release(), ...);
2325+
void Runtime::release(S&... semaphores){
2326+
tf::release(semaphores...);
23602327
}
23612328

23622329
// Function:: release
23632330
template <typename I,
23642331
std::enable_if_t<std::is_same_v<deref_t<I>, Semaphore>, void>*
23652332
>
23662333
void Runtime::release(I begin, I end) {
2367-
for(I ptr = begin; ptr != end; ptr++) {
2368-
ptr->release();
2369-
}
2334+
tf::release(begin, end);
23702335
}
23712336

23722337
// Destructor

taskflow/core/graph.hpp

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@
22

33
#include "../utility/traits.hpp"
44
#include "../utility/iterator.hpp"
5+
6+
#ifdef TF_ENABLE_TASK_POOL
57
#include "../utility/object_pool.hpp"
8+
#endif
9+
610
#include "../utility/os.hpp"
711
#include "../utility/math.hpp"
812
#include "../utility/small_vector.hpp"
@@ -491,7 +495,7 @@ class Runtime {
491495
template <typename... S,
492496
std::enable_if_t<all_same_v<Semaphore, std::decay_t<S>...>, void>* = nullptr
493497
>
494-
void acquire(S&&... semaphores);
498+
void acquire(S&... semaphores);
495499

496500
/**
497501
@brief acquires the given range of semaphores with a deadlock avoidance algorithm
@@ -550,7 +554,7 @@ class Runtime {
550554
template <typename... S,
551555
std::enable_if_t<all_same_v<Semaphore, std::decay_t<S>...>, void>* = nullptr
552556
>
553-
void release(S&&... semaphores);
557+
void release(S&... semaphores);
554558

555559
/**
556560
@brief releases the given range of semaphores
@@ -709,7 +713,9 @@ class Node {
709713
FINISHED = 2
710714
};
711715

716+
#ifdef TF_ENABLE_TASK_POOL
712717
TF_ENABLE_POOLABLE_ON_THIS;
718+
#endif
713719

714720
// state bit flag
715721
constexpr static int CONDITIONED = 1;
@@ -886,23 +892,31 @@ class Node {
886892
/**
887893
@private
888894
*/
895+
#ifdef TF_ENABLE_TASK_POOL
889896
inline ObjectPool<Node> _task_pool;
897+
#endif
890898

891899
/**
892900
@private
893901
*/
894902
template <typename... ArgsT>
895903
TF_FORCE_INLINE Node* animate(ArgsT&&... args) {
896-
//return new Node(std::forward<ArgsT>(args)...);
904+
#ifdef TF_ENABLE_TASK_POOL
897905
return _task_pool.animate(std::forward<ArgsT>(args)...);
906+
#else
907+
return new Node(std::forward<ArgsT>(args)...);
908+
#endif
898909
}
899910

900911
/**
901912
@private
902913
*/
903914
TF_FORCE_INLINE void recycle(Node* ptr) {
904-
//delete ptr;
915+
#ifdef TF_ENABLE_TASK_POOL
905916
_task_pool.recycle(ptr);
917+
#else
918+
delete ptr;
919+
#endif
906920
}
907921

908922
// ----------------------------------------------------------------------------

taskflow/core/semaphore.hpp

Lines changed: 83 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,15 @@ class Semaphore {
7070
public:
7171

7272
/**
73-
@brief constructs a semaphore with the given counter
73+
@brief constructs a default semaphore with count equal to zero
74+
75+
Application can use tf::Semaphore::reset to reset the counter of
76+
the semaphore later.
77+
*/
78+
Semaphore() : _count(0) { };
79+
80+
/**
81+
@brief constructs a semaphore with the given count
7482
7583
A semaphore creates a constraint that limits the maximum concurrency,
7684
i.e., the number of workers, in a set of tasks.
@@ -83,9 +91,11 @@ class Semaphore {
8391
}
8492

8593
/**
86-
@brief queries the counter value (not thread-safe during the run)
94+
@brief queries the current value of the associated counter
8795
8896
@param memory_order the memory order of this load (default std::memory_order_relaxed)
97+
98+
Queries the current value of the associated counter.
8999
*/
90100
size_t count(std::memory_order memory_order = std::memory_order_relaxed) const {
91101
return _count.load(memory_order);
@@ -96,6 +106,8 @@ class Semaphore {
96106
97107
@return @c true if it decremented the internal counter, otherwise @c false
98108
109+
Tries to atomically decrement the internal counter by @c 1. If the operation succeeds,
110+
returns @c true, otherwise @c false.
99111
*/
100112
bool try_acquire() {
101113
auto old = _count.load(std::memory_order_acquire);
@@ -112,7 +124,9 @@ class Semaphore {
112124
113125
@param n the value by which the internal counter will be incremented
114126
@return @c true if it decremented the internal counter, otherwise @c false
115-
127+
128+
The release operation always succeeds as it simply increments
129+
the counter of this semaphore.
116130
*/
117131
void release(size_t n = 1) {
118132
_count.fetch_add(n, std::memory_order_release);
@@ -123,6 +137,11 @@ class Semaphore {
123137
124138
@param count the new count value
125139
@param memory_order memory order to which this operation will be applied
140+
141+
@note
142+
Calling tf::Semaphore::reset will immediately change the underlying
143+
counter to the given @c count value, regardless other threads acquiring
144+
or releasing the semaphore.
126145
*/
127146
void reset(size_t count, std::memory_order memory_order = std::memory_order_relaxed) {
128147
_count.store(count, memory_order);
@@ -135,15 +154,28 @@ class Semaphore {
135154

136155
/**
137156
@brief tries to acquire all semaphores in the specified range
157+
158+
@tparam I iterator type
159+
@param first iterator to the beginning (inclusive)
160+
@param last iterator to the end (exclusive)
161+
162+
Tries to acquire all semaphores in the specified range.
163+
164+
@return @c true if all semaphores are acquired, otherwise @c false
138165
*/
139166
template <typename I,
140167
std::enable_if_t<std::is_same_v<deref_t<I>, Semaphore>, void> * = nullptr
141168
>
142-
bool try_acquire(I begin, I end) {
143-
I ptr = begin;
144-
for(; ptr != end; ptr++) {
169+
bool try_acquire(I first, I last) {
170+
// Ideally, we should use a better deadlock-avoidance algorithm but
171+
// in practice the number of semaphores is small and
172+
// tf::Semaphore does not provide blocking require. Hence, we are
173+
// mostly safe here. This is similar to the GCC try_lock implementation:
174+
// https://github.com/gcc-mirror/gcc/blob/master/libstdc%2B%2B-v3/include/std/mutex
175+
I ptr = first;
176+
for(; ptr != last; ptr++) {
145177
if(ptr->try_acquire() == false) {
146-
for(I ptr2 = begin; ptr2 != ptr; ptr2++) {
178+
for(I ptr2 = first; ptr2 != ptr; ptr2++) {
147179
ptr2->release();
148180
}
149181
return false;
@@ -154,11 +186,22 @@ bool try_acquire(I begin, I end) {
154186

155187
/**
156188
@brief tries to acquire all semaphores
189+
190+
@param semaphores semaphores to acquire
191+
192+
Tries to acquire all the semaphores.
193+
194+
@return @c true if all semaphores are acquired, otherwise @c false
157195
*/
158196
template<typename... S,
159197
std::enable_if_t<all_same_v<Semaphore, std::decay_t<S>...>, void>* = nullptr
160198
>
161-
bool try_acquire(S&&... semaphores) {
199+
bool try_acquire(S&... semaphores) {
200+
// Ideally, we should use a better deadlock-avoidance algorithm but
201+
// in practice the number of semaphores is small and
202+
// tf::Semaphore does not provide blocking require. Hence, we are
203+
// mostly safe here. This is similar to the GCC try_lock implementation:
204+
// https://github.com/gcc-mirror/gcc/blob/master/libstdc%2B%2B-v3/include/std/mutex
162205
constexpr size_t N = sizeof...(S);
163206
std::array<Semaphore*, N> items { std::addressof(semaphores)... };
164207
size_t i = 0;
@@ -173,5 +216,37 @@ bool try_acquire(S&&... semaphores) {
173216
return true;
174217
}
175218

219+
/**
220+
@brief tries to acquire all semaphores in the specified range
221+
222+
@tparam I iterator type
223+
@param first iterator to the beginning (inclusive)
224+
@param last iterator to the end (exclusive)
225+
226+
Releases all the semaphores in the given range.
227+
*/
228+
template <typename I,
229+
std::enable_if_t<std::is_same_v<deref_t<I>, Semaphore>, void> * = nullptr
230+
>
231+
void release(I first, I last) {
232+
std::for_each(first, last, [](tf::Semaphore& semaphore){
233+
semaphore.release();
234+
});
235+
}
236+
237+
/**
238+
@brief tries to acquire all semaphores
239+
240+
@param semaphores semaphores to release
241+
242+
Releases all the semaphores.
243+
*/
244+
template<typename... S,
245+
std::enable_if_t<all_same_v<Semaphore, std::decay_t<S>...>, void>* = nullptr
246+
>
247+
void release(S&... semaphores) {
248+
(semaphores.release(), ...);
249+
}
250+
176251
} // end of namespace tf. ---------------------------------------------------
177252

taskflow/taskflow.hpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,17 @@
3333
// TF_VERSION / 100000 is the major version
3434

3535
// current version: 3.8.0
36-
#define TF_VERSION 300700
36+
#define TF_VERSION 300800
3737

3838
#define TF_MAJOR_VERSION TF_VERSION/100000
3939
#define TF_MINOR_VERSION TF_VERSION/100%1000
4040
#define TF_PATCH_VERSION TF_VERSION%100
4141

42+
// Macros to fine-tune the performance of Taskflow at compile time
43+
//
44+
// + TF_ENABLE_TASK_POOL : enable task pool optimization
45+
// + TF_ENABLE_ATOMIC_NOTIFIER: enable atomic notifier (required C++20)
46+
4247
/**
4348
@brief taskflow namespace
4449
*/

0 commit comments

Comments
 (0)