Skip to content

Commit d3657cd

Browse files
refactored
1 parent 46e13ac commit d3657cd

4 files changed

Lines changed: 35 additions & 40 deletions

File tree

example/framework.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ int main(){
4646

4747
std::cout << "Execute the framework 2 times without a callback\n";
4848
tf.run_n(f, 2).get();
49+
std::cout << "Dump after 2 executions:\n";
50+
tf.dump_topologies(std::cout);
4951
std::cout << std::endl;
5052

5153
std::cout << "Execute the framework 4 times with a callback\n";

taskflow/graph/basic_taskflow.hpp

Lines changed: 28 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -153,42 +153,36 @@ class BasicTaskflow : public FlowBuilder {
153153
*/
154154
std::string dump_topologies() const;
155155

156-
157-
158-
// TODO:
159-
// 1. add parameters
160-
// 2. add variable names to function signature
161-
162156
/**
163157
@brief silently runs the framework w/ a callback and returns immediately
164158
165-
@param framework a tf::Framework
159+
@param framework a tf::Framework object
166160
*/
167161
void silent_run(Framework& framework);
168162

169163
/**
170164
@brief silently runs the framework w/o any callback and returns immediately
171165
172-
@param framework a tf::Framework
173-
@param callable a callable object to be invoked after every execution
166+
@param framework a tf::Framework object
167+
@param callable a callable object to be invoked after every run
174168
*/
175169
template<typename C>
176170
void silent_run(Framework& framework, C&& callable);
177171

178172
/**
179173
@brief silently runs the framework N times w/ a callback and returns immediately
180174
181-
@param framework a tf::Framework
175+
@param framework a tf::Framework object
182176
@param N a size_t to indicate number of repeatition
183177
*/
184178
void silent_run_n(Framework& framework, size_t N);
185179

186180
/**
187181
@brief silently runs the framework N times w/o any callback and returns immediately
188182
189-
@param framework a tf::Framework
190-
@param N a size_t to indicate number of repeatition
191-
@param callable a callable object to be invoked after every execution
183+
@param framework a tf::Framework object
184+
@param N the number of runs
185+
@param callable a callable object to be invoked after every run
192186
*/
193187
template <typename C>
194188
void silent_run_n(Framework& framework, size_t N, C&& callable);
@@ -208,7 +202,7 @@ class BasicTaskflow : public FlowBuilder {
208202
@brief runs the framework w/ a callback and returns immediately
209203
210204
@param framework a tf::Framework
211-
@param callable a callable object to be invoked after every execution
205+
@param callable a callable object to be invoked after every run
212206
213207
@return a std::shared_future to access the execution status of the framework
214208
*/
@@ -219,7 +213,7 @@ class BasicTaskflow : public FlowBuilder {
219213
@brief runs the framework for N times w/o any callback and returns immediately
220214
221215
@param framework a tf::Framework
222-
@param N a size_t to indicate number of repeatition
216+
@param N number of runs
223217
224218
@return a std::shared_future to access the execution status of the framework
225219
*/
@@ -229,13 +223,13 @@ class BasicTaskflow : public FlowBuilder {
229223
@brief runs the framework for N times w/ a callback and returns immediately
230224
231225
@param framework a tf::Framework
232-
@param N a size_t to indicate number of repeatition
233-
@param callable a callable object to be invoked after every execution
226+
@param N number of runs
227+
@param callable a callable object to be invoked after every run
234228
235229
@return a std::shared_future to access the execution status of the framework
236230
*/
237231
template<typename C>
238-
std::shared_future<void> run_n(Framework&, size_t, C&&);
232+
std::shared_future<void> run_n(Framework& framework, size_t N, C&& callable);
239233

240234

241235
private:
@@ -321,14 +315,13 @@ std::shared_future<void> BasicTaskflow<E>::run_n(Framework& f, size_t repeat, C&
321315
// case 1: the previous execution is still running
322316
if(f._topologies.size() > 1) {
323317
tpg._target._work = std::forward<C>(c);
324-
return tpg._future;
325318
}
326319
// case 2: this epoch should run
327320
else {
328321
// Set up target node's work
329322
tpg._target._work = [&f, c=std::function<void()>{std::forward<C>(c)}, this]() mutable {
330323

331-
//// Must recover nodes' dependent after every execution
324+
// Must recover nodes' dependent after every execution
332325
size_t i=0;
333326
for(auto& n: f._graph) {
334327
n._dependents = f._dependents[i++];
@@ -339,6 +332,7 @@ std::shared_future<void> BasicTaskflow<E>::run_n(Framework& f, size_t repeat, C&
339332

340333
// case 1: we still need to run the topology again
341334
if(--f._topologies.front()->_repeat != 0) {
335+
342336
// Reset subgraph in each node
343337
std::for_each(f._graph.begin(), f._graph.end(), [](Node& n){
344338
if(n._subgraph.has_value()){ n._subgraph.reset(); }
@@ -355,6 +349,7 @@ std::shared_future<void> BasicTaskflow<E>::run_n(Framework& f, size_t repeat, C&
355349

356350
// If there is another run
357351
if(f._topologies.size() > 1) {
352+
358353
// Reset subgraph in each node
359354
std::for_each(f._graph.begin(), f._graph.end(), [](Node& n){
360355
if(n._subgraph.has_value()){ n._subgraph.reset(); }
@@ -365,21 +360,18 @@ std::shared_future<void> BasicTaskflow<E>::run_n(Framework& f, size_t repeat, C&
365360

366361
auto next_tpg = std::next(f._topologies.begin());
367362
c = std::move(std::get<0>((*next_tpg)->_target._work));
368-
f._topologies.front()->_repeat = (*next_tpg)->_repeat;
369363

370-
// TODO: replace swap with move?
364+
f._topologies.front()->_repeat = (*next_tpg)->_repeat;
371365
f._topologies.front()->_promise = std::move((*next_tpg)->_promise);
372366
f._topologies.erase(next_tpg);
367+
373368
if(num_workers()) {
374369
f._mtx.unlock();
375370
}
376371
_schedule(f._topologies.front()->_sources);
377372
}
378373
else {
379374

380-
//// TODO: make a vector in framework to avoid this linear search ...
381-
//// Remove the target from the successor list
382-
383375
// Need to back up the promise first here becuz framework might be
384376
// destroy before taskflow leaves
385377
auto &p = f._topologies.front()->_promise;
@@ -392,23 +384,27 @@ std::shared_future<void> BasicTaskflow<E>::run_n(Framework& f, size_t repeat, C&
392384
p.set_value();
393385
}
394386
}
395-
}; // End of target's callback
387+
}; // End of target's work ------------------------------------------------
396388

397389
tpg._sources.clear();
398390
f._dependents.clear();
399391

400-
//// TODO: clear the subgraph if any
401-
//// do we need to linearly scan the graph twice...?
402-
403392
// Clear last execution data & Build precedence between nodes and target
404393
for(auto& n: f._graph) {
394+
395+
// TODO: swap this with the last and then pop_back
396+
// Here we use "front" because users may add nodes to the graph
405397
if(!n._successors.empty() && n._successors.front() == f._last_target) {
406398
n._successors.erase(n._successors.begin());
407399
}
400+
// remove_if std::vector
401+
402+
// reset the dynamic tasking
408403
if(n._subgraph.has_value()) {
409404
n._subgraph.reset();
410405
}
411-
406+
407+
// reset the target links
412408
n._topology = &tpg;
413409
if(n.num_dependents() == 0) {
414410
tpg._sources.push_back(&n);
@@ -421,8 +417,9 @@ std::shared_future<void> BasicTaskflow<E>::run_n(Framework& f, size_t repeat, C&
421417
f._dependents.push_back(tpg._target._dependents);
422418

423419
_schedule(tpg._sources);
424-
return tpg._future;
425420
}
421+
422+
return tpg._future;
426423
}
427424

428425
// Constructor

taskflow/graph/framework.hpp

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,6 @@
22

33
#include "flow_builder.hpp"
44

5-
// TODO:
6-
// clear the graph only at the beginning of each run.
7-
85
namespace tf {
96

107
// TODO: document the class
@@ -21,10 +18,8 @@ class Framework : public FlowBuilder {
2118

2219
Framework();
2320

24-
// TODO
2521
void dump(std::ostream& ostream) const;
2622

27-
// TODO
2823
std::string dump() const;
2924

3025
protected:
@@ -45,7 +40,7 @@ inline Framework::Framework() : FlowBuilder{_graph} {
4540

4641
// Procedure: dump
4742
inline void Framework::dump(std::ostream& os) const {
48-
os << "digraph Topology {\n";
43+
os << "digraph Framework {\n";
4944
for(const auto& n: _graph) {
5045
n.dump(os);
5146
}

unittest/taskflow.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -911,9 +911,6 @@ TEST_CASE("DetachedSubflow" * doctest::timeout(300)) {
911911
// --------------------------------------------------------
912912
TEST_CASE("Framework" * doctest::timeout(300)) {
913913

914-
// TODO
915-
// fixed the number of workers and test it from 0 to 4
916-
917914
// Empty subflow test
918915
for(unsigned W=0; W<=4; ++W) {
919916

@@ -957,6 +954,10 @@ TEST_CASE("Framework" * doctest::timeout(300)) {
957954

958955
REQUIRE(count == 7000);
959956
}
957+
958+
959+
// TODO: test correctness when framework got changed between runs
960+
960961
}
961962

962963

0 commit comments

Comments
 (0)