Skip to content

Commit 4dd5d5f

Browse files
author
Tsung-Wei Huang
committed
revised tf::Semaphore
1 parent 5e17292 commit 4dd5d5f

12 files changed

Lines changed: 445 additions & 555 deletions

File tree

examples/limited_concurrency.cpp

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,28 +9,22 @@ void sl() {
99

1010
int main() {
1111

12-
tf::Executor executor(4);
13-
tf::Taskflow taskflow;
14-
1512
// define a critical region of 1 worker
1613
tf::Semaphore semaphore(1);
1714

18-
// create give tasks in taskflow
19-
std::vector<tf::Task> tasks {
20-
taskflow.emplace([](){ sl(); std::cout << "A" << std::endl; }),
21-
taskflow.emplace([](){ sl(); std::cout << "B" << std::endl; }),
22-
taskflow.emplace([](){ sl(); std::cout << "C" << std::endl; }),
23-
taskflow.emplace([](){ sl(); std::cout << "D" << std::endl; }),
24-
taskflow.emplace([](){ sl(); std::cout << "E" << std::endl; })
25-
};
26-
27-
for(auto & task : tasks) {
28-
task.acquire(semaphore);
29-
task.release(semaphore);
15+
tf::Taskflow taskflow;
16+
tf::Executor executor;
17+
18+
for(size_t i=0; i<100; i++) {
19+
taskflow.emplace([&, i](tf::Runtime& rt){
20+
rt.acquire(semaphore);
21+
std::cout << i << "-th " << "message " << "never " << "interleaves with others\n";
22+
rt.release(semaphore);
23+
});
3024
}
3125

32-
executor.run(taskflow);
33-
executor.wait_for_all();
26+
executor.run(taskflow).wait();
27+
3428

3529
return 0;
3630
}

taskflow/algorithm/critical.hpp

Lines changed: 0 additions & 78 deletions
This file was deleted.

taskflow/core/async.hpp

Lines changed: 8 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -112,26 +112,10 @@ template <typename P, typename F, typename... Tasks,
112112
tf::AsyncTask Executor::silent_dependent_async(
113113
P&& params, F&& func, Tasks&&... tasks
114114
){
115-
116-
_increment_topology();
117-
118-
size_t num_dependents = sizeof...(Tasks);
119-
120-
// create a task before scheduling the node to retain a shared ownership first
121-
AsyncTask task(animate(
122-
std::forward<P>(params), nullptr, nullptr, num_dependents,
123-
std::in_place_type_t<Node::DependentAsync>{}, std::forward<F>(func)
124-
));
125-
126-
if constexpr(sizeof...(Tasks) > 0) {
127-
(_process_async_dependent(task._node, tasks, num_dependents), ...);
128-
}
129-
130-
if(num_dependents == 0) {
131-
_schedule_async_task(task._node);
132-
}
133-
134-
return task;
115+
std::array<AsyncTask, sizeof...(Tasks)> array = { std::forward<Tasks>(tasks)... };
116+
return silent_dependent_async(
117+
std::forward<P>(params), std::forward<F>(func), array.begin(), array.end()
118+
);
135119
}
136120

137121
// Function: silent_dependent_async
@@ -187,31 +171,10 @@ template <typename P, typename F, typename... Tasks,
187171
std::enable_if_t<is_task_params_v<P> && all_same_v<AsyncTask, std::decay_t<Tasks>...>, void>*
188172
>
189173
auto Executor::dependent_async(P&& params, F&& func, Tasks&&... tasks) {
190-
191-
_increment_topology();
192-
193-
using R = std::invoke_result_t<std::decay_t<F>>;
194-
195-
std::packaged_task<R()> p(std::forward<F>(func));
196-
auto fu{p.get_future()};
197-
198-
size_t num_dependents = sizeof...(tasks);
199-
200-
AsyncTask task(animate(
201-
std::forward<P>(params), nullptr, nullptr, num_dependents,
202-
std::in_place_type_t<Node::DependentAsync>{},
203-
[p=make_moc(std::move(p))] () mutable { p.object(); }
204-
));
205-
206-
if constexpr(sizeof...(Tasks) > 0) {
207-
(_process_async_dependent(task._node, tasks, num_dependents), ...);
208-
}
209-
210-
if(num_dependents == 0) {
211-
_schedule_async_task(task._node);
212-
}
213-
214-
return std::make_pair(std::move(task), std::move(fu));
174+
std::array<AsyncTask, sizeof...(Tasks)> array = { std::forward<Tasks>(tasks)... };
175+
return dependent_async(
176+
std::forward<P>(params), std::forward<F>(func), array.begin(), array.end()
177+
);
215178
}
216179

217180
// Function: dependent_async

taskflow/core/executor.hpp

Lines changed: 74 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1544,16 +1544,6 @@ inline void Executor::_invoke(Worker& worker, Node* node) {
15441544
return;
15451545
}
15461546

1547-
// if acquiring semaphore(s) exists, acquire them first
1548-
if(node->_semaphores && !node->_semaphores->to_acquire.empty()) {
1549-
SmallVector<Node*> nodes;
1550-
if(!node->_acquire_all(nodes)) {
1551-
_schedule(worker, nodes);
1552-
return;
1553-
}
1554-
node->_state.fetch_or(Node::ACQUIRED, std::memory_order_release);
1555-
}
1556-
15571547
// condition task
15581548
//int cond = -1;
15591549

@@ -1616,11 +1606,6 @@ inline void Executor::_invoke(Worker& worker, Node* node) {
16161606

16171607
//invoke_successors:
16181608

1619-
// if releasing semaphores exist, release them
1620-
if(node->_semaphores && !node->_semaphores->to_release.empty()) {
1621-
_schedule(worker, node->_release_all());
1622-
}
1623-
16241609
// Reset the join counter to support the cyclic control flow.
16251610
// + We must do this before scheduling the successors to avoid race
16261611
// condition on _dependents.
@@ -2193,7 +2178,7 @@ inline void Executor::_tear_down_topology(Worker& worker, Topology* tpg) {
21932178
// case 2: the final run of this topology
21942179
else {
21952180

2196-
// TODO: if the topology is cancelled, need to release all semaphores
2181+
// invoke the callback after each run
21972182
if(tpg->_call != nullptr) {
21982183
tpg->_call();
21992184
}
@@ -2315,6 +2300,74 @@ inline void Runtime::corun_all() {
23152300
_parent->_process_exception();
23162301
}
23172302

2303+
// Function: acquire
2304+
template <typename... S,
2305+
std::enable_if_t<all_same_v<Semaphore, std::decay_t<S>...>, void>*
2306+
>
2307+
void Runtime::acquire(S&&... semaphores) {
2308+
constexpr size_t N = sizeof...(S);
2309+
std::array<Semaphore*, N> items { std::addressof(semaphores)... };
2310+
_executor._corun_until(_worker, [&](){
2311+
// Ideally, we should use a deadlock-avoidance algorithm but
2312+
// in practice the number of semaphores will not be too large and
2313+
// tf::Semaphore does not provide blocking method. Hence, we are
2314+
// mostly safe here. This is similar to the GCC try_lock implementation:
2315+
// https://github.com/gcc-mirror/gcc/blob/master/libstdc%2B%2B-v3/include/std/mutex
2316+
for(size_t i=0; i < N; i++) {
2317+
if(items[i]->try_acquire() == false) {
2318+
for(size_t j=0; j<i; j++) {
2319+
items[j]->release();
2320+
}
2321+
return false;
2322+
}
2323+
}
2324+
return true;
2325+
});
2326+
// TODO: exception?
2327+
}
2328+
2329+
// Function: acquire
2330+
template <typename I,
2331+
std::enable_if_t<std::is_same_v<deref_t<I>, Semaphore>, void>*
2332+
>
2333+
void Runtime::acquire(I begin, I end) {
2334+
_executor._corun_until(_worker, [begin, end](){
2335+
// Ideally, we should use a deadlock-avoidance algorithm but
2336+
// in practice the number of semaphores will not be too large and
2337+
// tf::Semaphore does not provide blocking method. Hence, we are
2338+
// mostly safe here. This is similar to the GCC try_lock implementation:
2339+
// https://github.com/gcc-mirror/gcc/blob/master/libstdc%2B%2B-v3/include/std/mutex
2340+
for(I ptr = begin; ptr != end; ptr++) {
2341+
if(ptr->try_acquire() == false) {
2342+
for(I ptr2 = begin; ptr2 != ptr; ptr2++) {
2343+
ptr2->release();
2344+
}
2345+
return false;
2346+
}
2347+
}
2348+
return true;
2349+
});
2350+
// TODO: exception?
2351+
}
2352+
2353+
// Function: release
2354+
template <typename... S,
2355+
std::enable_if_t<all_same_v<Semaphore, std::decay_t<S>...>, void>*
2356+
>
2357+
void Runtime::release(S&&... semaphores){
2358+
(semaphores.release(), ...);
2359+
}
2360+
2361+
// Function: release
2362+
template <typename I,
2363+
std::enable_if_t<std::is_same_v<deref_t<I>, Semaphore>, void>*
2364+
>
2365+
void Runtime::release(I begin, I end) {
2366+
for(I ptr = begin; ptr != end; ptr++) {
2367+
ptr->release();
2368+
}
2369+
}
2370+
23182371
// Destructor
23192372
inline Runtime::~Runtime() {
23202373
_executor._corun_until(_worker, [this] () -> bool {
@@ -2402,6 +2455,11 @@ auto Runtime::async(P&& params, F&& f) {
24022455
return _async(*_executor._this_worker(), std::forward<P>(params), std::forward<F>(f));
24032456
}
24042457

2458+
// ----------------------------------------------------------------------------
2459+
// Runtime: Semaphore series
2460+
// ----------------------------------------------------------------------------
2461+
2462+
24052463

24062464

24072465
} // end of namespace tf -----------------------------------------------------

0 commit comments

Comments
 (0)