Skip to content

Commit 5218dfb

Browse files
author
TSUNG-WEI HUANG
committed
fixed subflow exception bug
1 parent e61f43d commit 5218dfb

5 files changed

Lines changed: 373 additions & 169 deletions

File tree

examples/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ list(APPEND TF_EXAMPLES
3636
limited_concurrency
3737
cancel
3838
exception
39+
subflow_exception
3940
)
4041

4142
foreach(example IN LISTS TF_EXAMPLES)

examples/subflow_exception.cpp

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// This program demonstrates the exception in subflow.
2+
3+
#include <taskflow/taskflow.hpp>
4+
5+
int main() {
6+
7+
tf::Executor executor;
8+
tf::Taskflow taskflow;
9+
10+
taskflow.emplace([](tf::Subflow& sf) {
11+
tf::Task A = sf.emplace([]() { throw std::runtime_error("exception on A"); });
12+
tf::Task B = sf.emplace([]() { std::cout << "Task B\n"; });
13+
A.precede(B);
14+
sf.join();
15+
});
16+
17+
try
18+
{
19+
executor.run(taskflow).get();
20+
}
21+
catch (const std::runtime_error& re)
22+
{
23+
std::cout << re.what() << std::endl;
24+
}
25+
26+
return 0;
27+
}

taskflow/core/executor.hpp

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1711,15 +1711,16 @@ inline void Executor::_observer_epilogue(Worker& worker, Node* node) {
17111711
// Procedure: _process_exception
17121712
inline void Executor::_process_exception(Worker&, Node* node) {
17131713

1714-
constexpr static auto flag = Topology::EXCEPTION | Topology::CANCELLED;
1714+
constexpr static auto nflag = Node::EXCEPTION | Node::CANCELLED;
1715+
constexpr static auto tflag = Topology::EXCEPTION | Topology::CANCELLED;
17151716

17161717
//std::cout << "processing exception from " << node->_name << std::endl;
17171718

17181719
// the exception occurs under a blocking corun
17191720
if(auto anchor = node->_anchor(); anchor) {
17201721
//std::cout << "\tfind anchor: " << anchor->_name << '\n';
17211722
// multiple tasks may throw, and we only take the first thrown exception
1722-
if((anchor->_state.fetch_or(Node::EXCEPTION, std::memory_order_relaxed) & Node::EXCEPTION) == 0) {
1723+
if((anchor->_state.fetch_or(nflag, std::memory_order_relaxed) & Node::EXCEPTION) == 0) {
17231724
//std::cout << node->_name << " stores exception in anchor " << anchor->_name << std::endl;
17241725
anchor->_exception_ptr = std::current_exception();
17251726
}
@@ -1728,7 +1729,7 @@ inline void Executor::_process_exception(Worker&, Node* node) {
17281729
else if(auto tpg = node->_topology; tpg) {
17291730
//std::cout << "\tno anchor - go to topology\n";
17301731
// multiple tasks may throw, and we only take the first thrown exception
1731-
if((tpg->_state.fetch_or(flag, std::memory_order_relaxed) & Topology::EXCEPTION) == 0) {
1732+
if((tpg->_state.fetch_or(tflag, std::memory_order_relaxed) & Topology::EXCEPTION) == 0) {
17321733
//std::cout << "store exception in topology " << std::endl;
17331734
tpg->_exception_ptr = std::current_exception();
17341735
}
@@ -2325,7 +2326,6 @@ void Runtime::acquire(S&&... semaphores) {
23252326
_executor._corun_until(_worker, [&](){
23262327
return tf::try_acquire(std::forward<S>(semaphores)...);
23272328
});
2328-
// TODO: exception?
23292329
}
23302330

23312331
// Function:: acquire
@@ -2336,7 +2336,6 @@ void Runtime::acquire(I first, I last) {
23362336
_executor._corun_until(_worker, [=](){
23372337
return tf::try_acquire(first, last);
23382338
});
2339-
// TODO: exception?
23402339
}
23412340

23422341
// Function: release

taskflow/core/graph.hpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -651,6 +651,7 @@ class Node {
651651
constexpr static int EXCEPTION = 4;
652652
constexpr static int PREEMPTED = 8;
653653
constexpr static int ANCHOR = 16;
654+
constexpr static int CANCELLED = 32;
654655

655656
using Placeholder = std::monostate;
656657

@@ -1052,8 +1053,8 @@ inline bool Node::_is_preempted() const {
10521053
// Function: _is_cancelled
10531054
// we currently only support cancellation of taskflow (no async task)
10541055
inline bool Node::_is_cancelled() const {
1055-
return _topology &&
1056-
(_topology->_state.load(std::memory_order_relaxed) & Topology::CANCELLED);
1056+
return (_topology && (_topology->_state.load(std::memory_order_relaxed) & Topology::CANCELLED)) ||
1057+
(_parent && (_parent->_state.load(std::memory_order_relaxed) & Node::CANCELLED));
10571058
}
10581059

10591060
// Procedure: _set_up_join_counter

0 commit comments

Comments
 (0)