@@ -1544,16 +1544,6 @@ inline void Executor::_invoke(Worker& worker, Node* node) {
15441544 return ;
15451545 }
15461546
1547- // if acquiring semaphore(s) exists, acquire them first
1548- if (node->_semaphores && !node->_semaphores ->to_acquire .empty ()) {
1549- SmallVector<Node*> nodes;
1550- if (!node->_acquire_all (nodes)) {
1551- _schedule (worker, nodes);
1552- return ;
1553- }
1554- node->_state .fetch_or (Node::ACQUIRED, std::memory_order_release);
1555- }
1556-
15571547 // condition task
15581548 // int cond = -1;
15591549
@@ -1616,11 +1606,6 @@ inline void Executor::_invoke(Worker& worker, Node* node) {
16161606
16171607 // invoke_successors:
16181608
1619- // if releasing semaphores exist, release them
1620- if (node->_semaphores && !node->_semaphores ->to_release .empty ()) {
1621- _schedule (worker, node->_release_all ());
1622- }
1623-
16241609 // Reset the join counter to support the cyclic control flow.
16251610 // + We must do this before scheduling the successors to avoid race
16261611 // condition on _dependents.
@@ -2193,7 +2178,7 @@ inline void Executor::_tear_down_topology(Worker& worker, Topology* tpg) {
21932178 // case 2: the final run of this topology
21942179 else {
21952180
2196- // TODO: if the topology is cancelled, need to release all semaphores
2181+ // invoke the callback after each run
21972182 if (tpg->_call != nullptr ) {
21982183 tpg->_call ();
21992184 }
@@ -2315,6 +2300,74 @@ inline void Runtime::corun_all() {
23152300 _parent->_process_exception ();
23162301}
23172302
2303+ // Function: acquire
2304+ template <typename ... S,
2305+ std::enable_if_t <all_same_v<Semaphore, std::decay_t <S>...>, void >*
2306+ >
2307+ void Runtime::acquire (S&&... semaphores) {
2308+ constexpr size_t N = sizeof ...(S);
2309+ std::array<Semaphore*, N> items { std::addressof (semaphores)... };
2310+ _executor._corun_until (_worker, [&](){
2311+ // Ideally, we should use a deadlock-avoidance algorithm but
2312+ // in practice the number of semaphores will not be too large and
2313+ // tf::Semaphore does not provide blocking method. Hence, we are
2314+ // mostly safe here. This is similar to the GCC try_lock implementation:
2315+ // https://github.com/gcc-mirror/gcc/blob/master/libstdc%2B%2B-v3/include/std/mutex
2316+ for (size_t i=0 ; i < N; i++) {
2317+ if (items[i]->try_acquire () == false ) {
2318+ for (size_t j=0 ; j<i; j++) {
2319+ items[j]->release ();
2320+ }
2321+ return false ;
2322+ }
2323+ }
2324+ return true ;
2325+ });
2326+ // TODO: exception?
2327+ }
2328+
2329+ // Function:: acquire
2330+ template <typename I,
2331+ std::enable_if_t <std::is_same_v<deref_t <I>, Semaphore>, void >*
2332+ >
2333+ void Runtime::acquire (I begin, I end) {
2334+ _executor._corun_until (_worker, [begin, end](){
2335+ // Ideally, we should use a deadlock-avoidance algorithm but
2336+ // in practice the number of semaphores will not be too large and
2337+ // tf::Semaphore does not provide blocking method. Hence, we are
2338+ // mostly safe here. This is similar to the GCC try_lock implementation:
2339+ // https://github.com/gcc-mirror/gcc/blob/master/libstdc%2B%2B-v3/include/std/mutex
2340+ for (I ptr = begin; ptr != end; ptr++) {
2341+ if (ptr->try_acquire () == false ) {
2342+ for (I ptr2 = begin; ptr2 != ptr; ptr2++) {
2343+ ptr2->release ();
2344+ }
2345+ return false ;
2346+ }
2347+ }
2348+ return true ;
2349+ });
2350+ // TODO: exception?
2351+ }
2352+
2353+ // Function: release
2354+ template <typename ... S,
2355+ std::enable_if_t <all_same_v<Semaphore, std::decay_t <S>...>, void >*
2356+ >
2357+ void Runtime::release (S&&... semaphores){
2358+ (semaphores.release (), ...);
2359+ }
2360+
2361+ // Function:: release
2362+ template <typename I,
2363+ std::enable_if_t <std::is_same_v<deref_t <I>, Semaphore>, void >*
2364+ >
2365+ void Runtime::release (I begin, I end) {
2366+ for (I ptr = begin; ptr != end; ptr++) {
2367+ ptr->release ();
2368+ }
2369+ }
2370+
23182371// Destructor
23192372inline Runtime::~Runtime () {
23202373 _executor._corun_until (_worker, [this ] () -> bool {
@@ -2402,6 +2455,11 @@ auto Runtime::async(P&& params, F&& f) {
24022455 return _async (*_executor._this_worker (), std::forward<P>(params), std::forward<F>(f));
24032456}
24042457
2458+ // ----------------------------------------------------------------------------
2459+ // Runtime: Semaphore series
2460+ // ----------------------------------------------------------------------------
2461+
2462+
24052463
24062464
24072465} // end of namespace tf -----------------------------------------------------
0 commit comments