Skip to content

Commit 029764c

Browse files
committed
Refactor Framework
1 parent 197e490 commit 029764c

File tree

3 files changed

+121
-86
lines changed

3 files changed

+121
-86
lines changed

example/framework.cpp

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,29 +9,35 @@ int main(){
99

1010
// Create a framework
1111
tf::Framework f;
12-
auto A = f.silent_emplace([&](){ std::cout << "TaskA\n"; });
12+
auto A = f.silent_emplace([&](){ std::cout << "TaskA\n"; }).name("A");
1313
auto B = f.silent_emplace([&](auto& subflow){
1414
std::cout << "TaskB\n";
15-
auto B1 = subflow.silent_emplace([&](){ std::cout << "TaskB1\n"; });
16-
auto B2 = subflow.silent_emplace([&](){ std::cout << "TaskB2\n"; });
17-
auto B3 = subflow.silent_emplace([&](){ std::cout << "TaskB3\n"; });
15+
auto B1 = subflow.silent_emplace([&](){ std::cout << "TaskB1\n"; }).name("B1");
16+
auto B2 = subflow.silent_emplace([&](){ std::cout << "TaskB2\n"; }).name("B2");
17+
auto B3 = subflow.silent_emplace([&](){ std::cout << "TaskB3\n"; }).name("B3");
1818
B1.precede(B3);
1919
B2.precede(B3);
20-
});
21-
auto C = f.silent_emplace([&](){ std::cout << "TaskC\n"; });
22-
auto D = f.silent_emplace([&](){ std::cout << "TaskD\n"; });
20+
}).name("B");
21+
auto C = f.silent_emplace([&](){ std::cout << "TaskC\n"; }).name("C");
22+
auto D = f.silent_emplace([&](){ std::cout << "TaskD\n"; }).name("D");
2323

2424
A.precede(B, C);
2525
B.precede(D);
2626
C.precede(D);
2727

2828
std::cout << "Run the framework once without callback\n";
29+
30+
std::cout << "Dump before execution:\n";
31+
f.dump(std::cout);
32+
std::cout << std::endl;
33+
2934
auto future = tf.run(f);
3035
future.get();
3136
std::cout << std::endl;
3237

33-
// TODO:
34-
// tf.dump_topologies(std::cout);
38+
std::cout << "Dump after execution:\n";
39+
tf.dump_topologies(std::cout);
40+
std::cout << std::endl;
3541

3642
std::cout << "Use wait_for_all to wait for the run to finish\n";
3743
tf.run(f);

taskflow/graph/basic_taskflow.hpp

Lines changed: 91 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -160,53 +160,77 @@ class BasicTaskflow : public FlowBuilder {
160160
// 2. add variable names to function signature
161161

162162
/**
163-
@brief silently runs the framework w/ callback to threads and returns immediately
163+
@brief silently runs the framework w/ a callback and returns immediately
164+
165+
@param framework a tf::Framework
164166
*/
165167
void silent_run(Framework& framework);
166168

167169
/**
168-
@brief silently runs the framework w/o callback to threads and returns immediately
170+
@brief silently runs the framework w/o any callback and returns immediately
171+
172+
@param framework a tf::Framework
173+
@param callable a callable object to be invoked after every execution
169174
*/
170175
template<typename C>
171176
void silent_run(Framework& framework, C&& callable);
172177

173178
/**
174-
@brief silently runs the framework N times w/ a callback to threads and returns immediately
179+
@brief silently runs the framework N times w/ a callback and returns immediately
180+
181+
@param framework a tf::Framework
182+
@param N a size_t to indicate number of repeatition
175183
*/
176184
void silent_run_n(Framework& framework, size_t N);
177185

178186
/**
179-
@brief silently runs the framework N times w/o a callback to threads and returns immediately
187+
@brief silently runs the framework N times w/o any callback and returns immediately
188+
189+
@param framework a tf::Framework
190+
@param N a size_t to indicate number of repeatition
191+
@param callable a callable object to be invoked after every execution
180192
*/
181193
template <typename C>
182194
void silent_run_n(Framework& framework, size_t N, C&& callable);
183195

184196

185197

186198
/**
187-
@brief runs the framework w/o callback to threads and returns immediately
199+
@brief runs the framework w/o any callback and returns immediately
200+
201+
@param framework a tf::Framework
188202
189203
@return a std::shared_future to access the execution status of the framework
190204
*/
191-
std::shared_future<void> run(Framework&);
205+
std::shared_future<void> run(Framework& framework);
192206

193207
/**
194-
@brief runs the framework w/ callback to threads and returns immediately
208+
@brief runs the framework w/ a callback and returns immediately
209+
210+
@param framework a tf::Framework
211+
@param callable a callable object to be invoked after every execution
195212
196213
@return a std::shared_future to access the execution status of the framework
197214
*/
198215
template<typename C>
199-
std::shared_future<void> run(Framework&, C&&);
216+
std::shared_future<void> run(Framework& framework, C&& callable);
200217

201218
/**
202-
@brief runs the framework for N times w/o a callback to threads and returns immediately
219+
@brief runs the framework for N times w/o any callback and returns immediately
220+
221+
@param framework a tf::Framework
222+
@param N a size_t to indicate number of repeatition
203223
204224
@return a std::shared_future to access the execution status of the framework
205225
*/
206-
std::shared_future<void> run_n(Framework&, size_t);
226+
std::shared_future<void> run_n(Framework& framework, size_t N);
207227

208228
/**
209-
@brief runs the framework for N times w/ a callback to threads and returns immediately
229+
@brief runs the framework for N times w/ a callback and returns immediately
230+
231+
@param framework a tf::Framework
232+
@param N a size_t to indicate number of repeatition
233+
@param callable a callable object to be invoked after every execution
210234
211235
@return a std::shared_future to access the execution status of the framework
212236
*/
@@ -301,104 +325,94 @@ std::shared_future<void> BasicTaskflow<E>::run_n(Framework& f, size_t repeat, C&
301325
}
302326
// case 2: this epoch should run
303327
else {
304-
305-
// Start from this moment
306-
tpg._sources.clear();
307-
f._dependents.clear();
308-
309-
// TODO: clear the subgraph if any
310-
// do we need to linearly scan the graph twice...?
311-
312-
// Store the dependents for recovery
313-
size_t target_depedents {0};
314-
for(auto& n: f._graph) {
315-
f._dependents.push_back(n.num_dependents());
316-
if(n.num_successors() == 0) {
317-
target_depedents ++;
318-
}
319-
}
320-
f._dependents.push_back(target_depedents);
321-
322328
// Set up target node's work
323329
tpg._target._work = [&f, c=std::function<void()>{std::forward<C>(c)}, this]() mutable {
324330

325-
//std::scoped_lock lock(f._mtx);
326-
327-
// Recover the number of dependent & reset subgraph in each node
331+
//// Must recover nodes' dependent after every execution
328332
size_t i=0;
329333
for(auto& n: f._graph) {
330334
n._dependents = f._dependents[i++];
331-
if(n._subgraph.has_value()) {
332-
n._subgraph.reset();
333-
}
334335
}
335336
f._topologies.front()->_target._dependents = f._dependents.back();
336337

337338
std::invoke(c);
338339

339340
// case 1: we still need to run the topology again
340341
if(--f._topologies.front()->_repeat != 0) {
342+
// Reset subgraph in each node
343+
std::for_each(f._graph.begin(), f._graph.end(), [](Node& n){
344+
if(n._subgraph.has_value()){ n._subgraph.reset(); }
345+
});
346+
341347
_schedule(f._topologies.front()->_sources);
342348
}
343349
// case 2: the final run of this topology
344350
// notice that there can be another new run request before we acquire the lock
345351
else {
346-
347-
std::promise<void> *pptr {nullptr};
348-
{
349-
// TODO: simply use f._mtx.lock()
350-
std::scoped_lock lock(f._mtx);
352+
f._mtx.lock();
351353

352-
// If there is another run
353-
if(f._topologies.size() > 1) {
354-
// Set the promise
355-
f._topologies.front()->_promise.set_value();
356-
357-
auto next_tpg = std::next(f._topologies.begin());
358-
c = std::move(std::get<0>((*next_tpg)->_target._work));
359-
f._topologies.front()->_repeat = (*next_tpg)->_repeat;
360-
361-
// TODO: replace swap with move?
362-
std::swap(f._topologies.front()->_promise, (*next_tpg)->_promise);
363-
f._topologies.erase(next_tpg);
364-
_schedule(f._topologies.front()->_sources);
365-
return ;
366-
}
367-
else {
368-
369-
// TODO: make a vector in framework to avoid this linear search ...
370-
// Remove the target from the successor list
371-
for(auto& n: f._graph) {
372-
if(n._successors.back() == &(f._topologies.front()->_target)) {
373-
n._successors.clear();
374-
}
375-
}
376-
377-
// Need to back up the promise first here becuz framework might be
378-
// destroy before taskflow leaves
379-
//auto &p = f._topologies.front()->_promise;
380-
pptr = &(f._topologies.front()->_promise);
381-
f._topologies.pop_front();
382-
383-
// Unlock the mutex here before the framework leaves
384-
}
354+
// If there is another run
355+
if(f._topologies.size() > 1) {
356+
// Reset subgraph in each node
357+
std::for_each(f._graph.begin(), f._graph.end(), [](Node& n){
358+
if(n._subgraph.has_value()){ n._subgraph.reset(); }
359+
});
360+
361+
// Set the promise
362+
f._topologies.front()->_promise.set_value();
363+
364+
auto next_tpg = std::next(f._topologies.begin());
365+
c = std::move(std::get<0>((*next_tpg)->_target._work));
366+
f._topologies.front()->_repeat = (*next_tpg)->_repeat;
367+
368+
// TODO: replace swap with move?
369+
f._topologies.front()->_promise = std::move((*next_tpg)->_promise);
370+
f._topologies.erase(next_tpg);
371+
f._mtx.unlock();
372+
_schedule(f._topologies.front()->_sources);
385373
}
386-
// We set the promise in the end in case framework leaves before taskflow
387-
pptr->set_value();
374+
else {
388375

376+
//// TODO: make a vector in framework to avoid this linear search ...
377+
//// Remove the target from the successor list
378+
379+
// Need to back up the promise first here becuz framework might be
380+
// destroy before taskflow leaves
381+
auto &p = f._topologies.front()->_promise;
382+
f._last_target = &(f._topologies.front()->_target);
383+
f._topologies.pop_front();
384+
f._mtx.unlock();
385+
// We set the promise in the end in case framework leaves before taskflow
386+
p.set_value();
387+
}
389388
}
390389
}; // End of target's callback
391390

392-
// Build precedence between nodes and target
391+
tpg._sources.clear();
392+
f._dependents.clear();
393+
394+
//// TODO: clear the subgraph if any
395+
//// do we need to linearly scan the graph twice...?
396+
397+
// Clear last execution data & Build precedence between nodes and target
393398
for(auto& n: f._graph) {
399+
if(!n._successors.empty() && n._successors.front() == f._last_target) {
400+
n._successors.erase(n._successors.begin());
401+
}
402+
if(n._subgraph.has_value()) {
403+
n._subgraph.reset();
404+
}
405+
394406
n._topology = &tpg;
395407
if(n.num_dependents() == 0) {
396408
tpg._sources.push_back(&n);
397409
}
398410
if(n.num_successors() == 0) {
399411
n.precede(tpg._target);
400412
}
413+
f._dependents.push_back(n._dependents);
401414
}
415+
f._dependents.push_back(tpg._target._dependents);
402416

403417
_schedule(tpg._sources);
404418
return tpg._future;
@@ -414,7 +428,7 @@ BasicTaskflow<E>::Closure::Closure(BasicTaskflow& t, Node& n) :
414428
// Operator ()
415429
template <template <typename...> typename E>
416430
void BasicTaskflow<E>::Closure::operator () () const {
417-
431+
418432
//assert(taskflow && node);
419433

420434
// Here we need to fetch the num_successors first to avoid the invalid memory

taskflow/graph/framework.hpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,29 @@ class Framework : public FlowBuilder {
3636
std::mutex _mtx;
3737
std::list<Topology*> _topologies;
3838
std::vector<size_t> _dependents;
39+
Node* _last_target {nullptr};
3940
};
4041

4142
// Constructor
4243
inline Framework::Framework() : FlowBuilder{_graph} {
4344
}
4445

4546
// Procedure: dump
47+
inline void Framework::dump(std::ostream& os) const {
48+
os << "digraph Topology {\n";
49+
for(const auto& n: _graph) {
50+
n.dump(os);
51+
}
52+
os << "}\n";
53+
}
54+
4655

56+
// Function: dump
57+
inline std::string Framework::dump() const {
58+
std::ostringstream os;
59+
dump(os);
60+
return os.str();
61+
}
4762

4863

4964
}; // end of namespace tf. ---------------------------------------------------

0 commit comments

Comments
 (0)