@@ -160,53 +160,77 @@ class BasicTaskflow : public FlowBuilder {
160160 // 2. add variable names to function signature
161161
162162 /* *
163- @brief silently runs the framework w/ callback to threads and returns immediately
163+ @brief silently runs the framework w/o any callback and returns immediately
164+
165+ @param framework a tf::Framework
164166 */
165167 void silent_run (Framework& framework);
166168
167169 /* *
168- @brief silently runs the framework w/o callback to threads and returns immediately
170+ @brief silently runs the framework w/ a callback and returns immediately
171+
172+ @param framework a tf::Framework
173+ @param callable a callable object to be invoked after every execution
169174 */
170175 template <typename C>
171176 void silent_run (Framework& framework, C&& callable);
172177
173178 /* *
174- @brief silently runs the framework N times w/ a callback to threads and returns immediately
179+ @brief silently runs the framework N times w/o any callback and returns immediately
180+
181+ @param framework a tf::Framework
182+ @param N a size_t to indicate the number of repetitions
175183 */
176184 void silent_run_n (Framework& framework, size_t N);
177185
178186 /* *
179- @brief silently runs the framework N times w/o a callback to threads and returns immediately
187+ @brief silently runs the framework N times w/ a callback and returns immediately
188+
189+ @param framework a tf::Framework
190+ @param N a size_t to indicate the number of repetitions
191+ @param callable a callable object to be invoked after every execution
180192 */
181193 template <typename C>
182194 void silent_run_n (Framework& framework, size_t N, C&& callable);
183195
184196
185197
186198 /* *
187- @brief runs the framework w/o callback to threads and returns immediately
199+ @brief runs the framework w/o any callback and returns immediately
200+
201+ @param framework a tf::Framework
188202
189203 @return a std::shared_future to access the execution status of the framework
190204 */
191- std::shared_future<void > run (Framework&);
205+ std::shared_future<void > run (Framework& framework );
192206
193207 /* *
194- @brief runs the framework w/ callback to threads and returns immediately
208+ @brief runs the framework w/ a callback and returns immediately
209+
210+ @param framework a tf::Framework
211+ @param callable a callable object to be invoked after every execution
195212
196213 @return a std::shared_future to access the execution status of the framework
197214 */
198215 template <typename C>
199- std::shared_future<void > run (Framework&, C&&);
216+ std::shared_future<void > run (Framework& framework , C&& callable );
200217
201218 /* *
202- @brief runs the framework for N times w/o a callback to threads and returns immediately
219+ @brief runs the framework for N times w/o any callback and returns immediately
220+
221+ @param framework a tf::Framework
222+ @param N a size_t to indicate the number of repetitions
203223
204224 @return a std::shared_future to access the execution status of the framework
205225 */
206- std::shared_future<void > run_n (Framework&, size_t );
226+ std::shared_future<void > run_n (Framework& framework , size_t N );
207227
208228 /* *
209- @brief runs the framework for N times w/ a callback to threads and returns immediately
229+ @brief runs the framework for N times w/ a callback and returns immediately
230+
231+ @param framework a tf::Framework
232+ @param N a size_t to indicate the number of repetitions
233+ @param callable a callable object to be invoked after every execution
210234
211235 @return a std::shared_future to access the execution status of the framework
212236 */
@@ -301,104 +325,94 @@ std::shared_future<void> BasicTaskflow<E>::run_n(Framework& f, size_t repeat, C&
301325 }
302326 // case 2: this epoch should run
303327 else {
304-
305- // Start from this moment
306- tpg._sources .clear ();
307- f._dependents .clear ();
308-
309- // TODO: clear the subgraph if any
310- // do we need to linearly scan the graph twice...?
311-
312- // Store the dependents for recovery
313- size_t target_depedents {0 };
314- for (auto & n: f._graph ) {
315- f._dependents .push_back (n.num_dependents ());
316- if (n.num_successors () == 0 ) {
317- target_depedents ++;
318- }
319- }
320- f._dependents .push_back (target_depedents);
321-
322328 // Set up target node's work
323329 tpg._target ._work = [&f, c=std::function<void ()>{std::forward<C>(c)}, this ]() mutable {
324330
325- // std::scoped_lock lock(f._mtx);
326-
327- // Recover the number of dependent & reset subgraph in each node
331+ // // Must recover nodes' dependents after every execution
328332 size_t i=0 ;
329333 for (auto & n: f._graph ) {
330334 n._dependents = f._dependents [i++];
331- if (n._subgraph .has_value ()) {
332- n._subgraph .reset ();
333- }
334335 }
335336 f._topologies .front ()->_target ._dependents = f._dependents .back ();
336337
337338 std::invoke (c);
338339
339340 // case 1: we still need to run the topology again
340341 if (--f._topologies .front ()->_repeat != 0 ) {
342+ // Reset subgraph in each node
343+ std::for_each (f._graph .begin (), f._graph .end (), [](Node& n){
344+ if (n._subgraph .has_value ()){ n._subgraph .reset (); }
345+ });
346+
341347 _schedule (f._topologies .front ()->_sources );
342348 }
343349 // case 2: the final run of this topology
344350 // notice that there can be another new run request before we acquire the lock
345351 else {
346-
347- std::promise<void > *pptr {nullptr };
348- {
349- // TODO: simply use f._mtx.lock()
350- std::scoped_lock lock (f._mtx );
352+ f._mtx .lock ();
351353
352- // If there is another run
353- if (f._topologies .size () > 1 ) {
354- // Set the promise
355- f._topologies .front ()->_promise .set_value ();
356-
357- auto next_tpg = std::next (f._topologies .begin ());
358- c = std::move (std::get<0 >((*next_tpg)->_target ._work ));
359- f._topologies .front ()->_repeat = (*next_tpg)->_repeat ;
360-
361- // TODO: replace swap with move?
362- std::swap (f._topologies .front ()->_promise , (*next_tpg)->_promise );
363- f._topologies .erase (next_tpg);
364- _schedule (f._topologies .front ()->_sources );
365- return ;
366- }
367- else {
368-
369- // TODO: make a vector in framework to avoid this linear search ...
370- // Remove the target from the successor list
371- for (auto & n: f._graph ) {
372- if (n._successors .back () == &(f._topologies .front ()->_target )) {
373- n._successors .clear ();
374- }
375- }
376-
377- // Need to back up the promise first here becuz framework might be
378- // destroy before taskflow leaves
379- // auto &p = f._topologies.front()->_promise;
380- pptr = &(f._topologies .front ()->_promise );
381- f._topologies .pop_front ();
382-
383- // Unlock the mutex here before the framework leaves
384- }
354+ // If there is another run
355+ if (f._topologies .size () > 1 ) {
356+ // Reset subgraph in each node
357+ std::for_each (f._graph .begin (), f._graph .end (), [](Node& n){
358+ if (n._subgraph .has_value ()){ n._subgraph .reset (); }
359+ });
360+
361+ // Set the promise
362+ f._topologies .front ()->_promise .set_value ();
363+
364+ auto next_tpg = std::next (f._topologies .begin ());
365+ c = std::move (std::get<0 >((*next_tpg)->_target ._work ));
366+ f._topologies .front ()->_repeat = (*next_tpg)->_repeat ;
367+
368+ // TODO: replace swap with move?
369+ f._topologies .front ()->_promise = std::move ((*next_tpg)->_promise );
370+ f._topologies .erase (next_tpg);
371+ f._mtx .unlock ();
372+ _schedule (f._topologies .front ()->_sources );
385373 }
386- // We set the promise in the end in case framework leaves before taskflow
387- pptr->set_value ();
374+ else {
388375
376+ // // TODO: make a vector in framework to avoid this linear search ...
377+ // // Remove the target from the successor list
378+
379+ // Need to back up the promise first here because the framework might be
380+ // destroyed before the taskflow leaves
381+ auto &p = f._topologies .front ()->_promise ;
382+ f._last_target = &(f._topologies .front ()->_target );
383+ f._topologies .pop_front ();
384+ f._mtx .unlock ();
385+ // We set the promise in the end in case framework leaves before taskflow
386+ p.set_value ();
387+ }
389388 }
390389 }; // End of target's callback
391390
392- // Build precedence between nodes and target
391+ tpg._sources .clear ();
392+ f._dependents .clear ();
393+
394+ // // TODO: clear the subgraph if any
395+ // // do we need to linearly scan the graph twice...?
396+
397+ // Clear last execution data & Build precedence between nodes and target
393398 for (auto & n: f._graph ) {
399+ if (!n._successors .empty () && n._successors .front () == f._last_target ) {
400+ n._successors .erase (n._successors .begin ());
401+ }
402+ if (n._subgraph .has_value ()) {
403+ n._subgraph .reset ();
404+ }
405+
394406 n._topology = &tpg;
395407 if (n.num_dependents () == 0 ) {
396408 tpg._sources .push_back (&n);
397409 }
398410 if (n.num_successors () == 0 ) {
399411 n.precede (tpg._target );
400412 }
413+ f._dependents .push_back (n._dependents );
401414 }
415+ f._dependents .push_back (tpg._target ._dependents );
402416
403417 _schedule (tpg._sources );
404418 return tpg._future ;
@@ -414,7 +428,7 @@ BasicTaskflow<E>::Closure::Closure(BasicTaskflow& t, Node& n) :
414428// Operator ()
415429template <template <typename ...> typename E>
416430void BasicTaskflow<E>::Closure::operator () () const {
417-
431+
418432 // assert(taskflow && node);
419433
420434 // Here we need to fetch the num_successors first to avoid the invalid memory
0 commit comments