Skip to content

Commit 1bb59bf

Browse files
committed
Add framework composition implementation
1 parent 3c996b5 commit 1bb59bf

File tree

6 files changed

+336
-11
lines changed

6 files changed

+336
-11
lines changed

CMakeLists.txt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,11 @@ add_executable(framework ${TF_EXAMPLE_DIR}/framework.cpp)
154154
target_link_libraries(framework ${PROJECT_NAME} Threads::Threads)
155155

156156
add_executable(dataflow ${TF_EXAMPLE_DIR}/dataflow.cpp)
157-
target_link_libraries(dataflow ${PROJECT_NAME} Threads::Threads)
157+
target_link_libraries(dataflow ${PROJECT_NAME} Threads::Threads)
158+
159+
add_executable(composition ${TF_EXAMPLE_DIR}/composition.cpp)
160+
target_link_libraries(composition ${PROJECT_NAME} Threads::Threads)
161+
158162

159163
endif()
160164

example/composition.cpp

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
#include <taskflow/taskflow.hpp> // the only include you need
2+
3+
void composition_example_1(tf::Taskflow& tf) {
4+
std::cout << '\n';
5+
std::cout << "Composition example 1\n";
6+
7+
// f1 has two independent tasks
8+
tf::Framework f1;
9+
auto [f1A, f1B] = f1.name("F1")
10+
.emplace(
11+
[&](){ std::cout << "F1 TaskA\n"; },
12+
[&](){ std::cout << "F1 TaskB\n"; }
13+
);
14+
f1A.name("f1A");
15+
f1B.name("f1B");
16+
17+
// f2A ---
18+
// |----> f2C ----> f1_module_task
19+
// f2B ---
20+
tf::Framework f2;
21+
auto [f2A, f2B, f2C] = f2.name("F2")
22+
.emplace(
23+
[&](){ std::cout << " F2 TaskA\n"; },
24+
[&](){ std::cout << " F2 TaskB\n"; },
25+
[&](){ std::cout << " F2 TaskC\n"; }
26+
);
27+
f2A.name("f2A");
28+
f2B.name("f2B");
29+
f2C.name("f2C");
30+
31+
f2A.precede(f2C);
32+
f2B.precede(f2C);
33+
34+
auto f1_module_task = f2.composed_of(f1);
35+
f2C.precede(f1_module_task);
36+
37+
tf.run_n(f2, 3).get();
38+
}
39+
40+
41+
42+
void composition_example_2(tf::Taskflow& tf) {
43+
std::cout << '\n';
44+
std::cout << "Composition example 2\n";
45+
46+
// f1 has two independent tasks
47+
tf::Framework f1;
48+
auto [f1A, f1B] = f1.name("F1")
49+
.emplace(
50+
[&](){ std::cout << "F1 TaskA\n"; },
51+
[&](){ std::cout << "F1 TaskB\n"; }
52+
);
53+
f1A.name("f1A");
54+
f1B.name("f1B");
55+
56+
// f2A ---
57+
// |----> f2C
58+
// f2B ---
59+
// f1_module_task
60+
tf::Framework f2;
61+
auto [f2A, f2B, f2C] = f2.name("F2")
62+
.emplace(
63+
[&](){ std::cout << " F2 TaskA\n"; },
64+
[&](){ std::cout << " F2 TaskB\n"; },
65+
[&](){ std::cout << " F2 TaskC\n"; }
66+
);
67+
f2A.name("f2A");
68+
f2B.name("f2B");
69+
f2C.name("f2C");
70+
71+
f2A.precede(f2C);
72+
f2B.precede(f2C);
73+
f2.composed_of(f1);
74+
75+
// f3 has a module task (f2) and a regular task
76+
tf::Framework f3;
77+
f3.name("F3");
78+
f3.composed_of(f2);
79+
f3.emplace([](){ std::cout << " F3 TaskA\n"; }).name("f3A");
80+
81+
82+
// f4: f3_module_task -> f2_module_task
83+
tf::Framework f4;
84+
f4.name("F4");
85+
auto f3_module_task = f4.composed_of(f3);
86+
auto f2_module_task = f4.composed_of(f2);
87+
f3_module_task.precede(f2_module_task);
88+
89+
tf.run_until(f4, [iter = 1] () mutable { std::cout << '\n'; return iter-- == 0; }, [](){
90+
std::cout << "First run_until finished\n";
91+
}).get();
92+
tf.run_until(f4, [iter = 2] () mutable { std::cout << '\n'; return iter-- == 0; }, [](){
93+
std::cout << "Second run_until finished\n";
94+
});
95+
tf.run_until(f4, [iter = 3] () mutable { std::cout << '\n'; return iter-- == 0; }, [](){
96+
std::cout << "Third run_until finished\n";
97+
}).get();
98+
}
99+
100+
int main(){
101+
tf::Taskflow taskflow;
102+
composition_example_1(taskflow);
103+
composition_example_2(taskflow);
104+
}
105+
106+
107+

taskflow/graph/basic_taskflow.hpp

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,7 @@ class BasicTaskflow : public FlowBuilder {
247247

248248
void _schedule(Node&);
249249
void _schedule(PassiveVector<Node*>&);
250+
void _set_module_node(Node*, Framework*);
250251
};
251252

252253
// ============================================================================
@@ -391,8 +392,17 @@ void BasicTaskflow<E>::Closure::operator () () const {
391392
// regular node type
392393
// The default node work type. We only need to execute the callback if any.
393394
if(auto index=node->_work.index(); index == 0) {
394-
if(auto &f = std::get<StaticWork>(node->_work); f != nullptr){
395-
std::invoke(f);
395+
if(node->is_module()) {
396+
bool first_time = !node->is_spawned();
397+
std::invoke(std::get<StaticWork>(node->_work));
398+
if(first_time) {
399+
return ;
400+
}
401+
}
402+
else {
403+
if(auto &f = std::get<StaticWork>(node->_work); f != nullptr){
404+
std::invoke(f);
405+
}
396406
}
397407
}
398408
// subflow node type
@@ -451,7 +461,7 @@ void BasicTaskflow<E>::Closure::operator () () const {
451461
}
452462
}
453463
node->_num_dependents = node->_dependents.size();
454-
node->clear_status();
464+
node->unset_spawned();
455465
}
456466

457467
// At this point, the node storage might be destructed.
@@ -621,6 +631,10 @@ void BasicTaskflow<E>::wait_for_topologies() {
621631
// Each task node has two types of tasks - regular and subflow.
622632
template <template <typename...> typename E>
623633
void BasicTaskflow<E>::_schedule(Node& node) {
634+
if(node.is_module() && !node.is_spawned()) {
635+
_set_module_node(&node, node._module);
636+
assert(node._work.index() == 0);
637+
}
624638
_executor->emplace(*this, node);
625639
}
626640

@@ -633,11 +647,51 @@ void BasicTaskflow<E>::_schedule(PassiveVector<Node*>& nodes) {
633647
std::vector<Closure> closures;
634648
closures.reserve(nodes.size());
635649
for(auto src : nodes) {
650+
if(src->is_module() && !src->is_spawned()) {
651+
assert(src->_module != nullptr);
652+
_set_module_node(src, src->_module);
653+
assert(src->_work.index() == 0);
654+
}
636655
closures.emplace_back(*this, *src);
637656
}
638657
_executor->batch(closures);
639658
}
640659

660+
661+
template <template <typename...> typename E>
662+
void BasicTaskflow<E>::_set_module_node(Node* n, Framework* f) {
663+
664+
n->_work = [node=n, this, tgt {PassiveVector<Node*>()}] () mutable {
665+
666+
// second time to enter this context
667+
if(node->is_spawned()) {
668+
node->_dependents.resize(node->_dependents.size()-tgt.size());
669+
for(auto& t: tgt) {
670+
t->_successors.clear();
671+
}
672+
return ;
673+
}
674+
// first time to enter this context
675+
node->set_spawned();
676+
677+
PassiveVector<Node*> src;
678+
679+
for(auto &n: node->_module->_graph) {
680+
n._topology = node->_topology;
681+
if(n.num_dependents() == 0) {
682+
src.push_back(&n);
683+
}
684+
if(n.num_successors() == 0) {
685+
n.precede(*node);
686+
tgt.push_back(&n);
687+
}
688+
}
689+
690+
_schedule(src);
691+
};
692+
}
693+
694+
641695
// Function: dump_topologies
642696
template <template <typename...> typename E>
643697
std::string BasicTaskflow<E>::dump_topologies() const {

0 commit comments

Comments
 (0)