@@ -230,6 +230,32 @@ class BasicTaskflow : public FlowBuilder {
230230 std::shared_future<void > run_n (Framework& framework, size_t N, C&& callable);
231231
232232
233+
234+ /* *
235+ @brief runs the framework w/o a callback until the predicate becomes true and returns immediately
236+
237+ @param framework a tf::Framework
238+ @param P predicate (a callable object returns true or false)
239+
240+ @return a std::shared_future to access the execution status of the framework
241+ */
242+ template <typename P>
243+ std::shared_future<void > run_until (Framework& framework, P&& predicate);
244+
245+
246+ /* *
247+ @brief runs the framework w/ a callback until the predicate becomes true and returns immediately
248+
249+ @param framework a tf::Framework
250+ @param P predicate (a callable object returns true or false)
251+ @param callable a callable object to be invoked after every run
252+
253+ @return a std::shared_future to access the execution status of the framework
254+ */
255+ template <typename P, typename C>
256+ std::shared_future<void > run_until (Framework& framework, P&& predicate, C&& callable);
257+
258+
233259 private:
234260
235261 Graph _graph;
@@ -298,16 +324,32 @@ std::shared_future<void> BasicTaskflow<E>::run_n(Framework& f, size_t repeat) {
298324template <template <typename ...> typename E>
299325template <typename C>
300326std::shared_future<void > BasicTaskflow<E>::run_n(Framework& f, size_t repeat, C&& c) {
327+ return run_until (f, [repeat]() mutable { return repeat-- == 0 ; }, std::forward<C>(c));
328+ }
301329
302- static_assert (std::is_invocable<C>::value);
303330
304- if (repeat == 0 ) {
331+ // Function: run_until
332+ template <template <typename ...> typename E>
333+ template <typename P>
334+ std::shared_future<void > BasicTaskflow<E>::run_until(Framework& f, P&& predicate) {
335+ return run_until (f, std::forward<P>(predicate), [](){});
336+ }
337+
338+ // Function: run_until
339+ template <template <typename ...> typename E>
340+ template <typename P, typename C>
341+ std::shared_future<void > BasicTaskflow<E>::run_until(Framework& f, P&& predicate, C&& c) {
342+
343+ // Predicate must return a boolean value
344+ static_assert (std::is_invocable<C>::value && std::is_same_v<bool , std::invoke_result_t <P>>);
345+
346+ if (std::invoke (predicate)) {
305347 return std::async (std::launch::deferred, [](){}).share ();
306348 }
307349
308350 std::scoped_lock lock (f._mtx );
309351
310- auto &tpg = _topologies.emplace_front (f, repeat );
352+ auto &tpg = _topologies.emplace_front (f, std::forward<P>(predicate) );
311353 f._topologies .push_back (&tpg);
312354
313355 const auto setup_topology = [](auto & f, auto & tpg) {
@@ -324,17 +366,17 @@ std::shared_future<void> BasicTaskflow<E>::run_n(Framework& f, size_t repeat, C&
324366 }
325367 };
326368
327-
328369 // Iterative execution to avoid stack overflow
329370 if (num_workers () == 0 ) {
330371 // Clear last execution data & Build precedence between nodes and target
331372 setup_topology (f, tpg);
332373
333374 const int tgt_predecessor = tpg._num_sinks ;
334- for (size_t i=0 ; i<repeat; i++) {
375+ do {
376+ // for(size_t i=0; i<repeat; i++) {
335377 _schedule (tpg._sources );
336378 f._topologies .front ()->_num_sinks = tgt_predecessor;
337- }
379+ } while (! std::invoke (tpg. _predicate ));
338380
339381 std::invoke (c);
340382 auto &p = f._topologies .front ()->_promise ;
@@ -356,7 +398,8 @@ std::shared_future<void> BasicTaskflow<E>::run_n(Framework& f, size_t repeat, C&
356398
357399 // PV 1/31 (twhuang): thread safety?
358400 // case 1: we still need to run the topology again
359- if (--f._topologies .front ()->_repeat != 0 ) {
401+ if (!std::invoke (f._topologies .front ()->_predicate )) {
402+ // if(--f._topologies.front()->_repeat != 0) {
360403 f._topologies .front ()->_num_sinks = tgt_predecessor;
361404 _schedule (f._topologies .front ()->_sources );
362405 }
@@ -377,7 +420,7 @@ std::shared_future<void> BasicTaskflow<E>::run_n(Framework& f, size_t repeat, C&
377420 // c = std::move(std::get<StaticWork>((*next_tpg)->_target._work));
378421 c = std::move ((*next_tpg)->_work );
379422
380- f._topologies .front ()->_repeat = ( *next_tpg)->_repeat ;
423+ f._topologies .front ()->_predicate = std::move (( *next_tpg)->_predicate ) ;
381424 f._topologies .front ()->_promise = std::move ((*next_tpg)->_promise );
382425 f._topologies .erase (next_tpg);
383426
@@ -432,7 +475,7 @@ void BasicTaskflow<E>::Closure::operator () () const {
432475 else {
433476
434477 // Clear the subgraph before the task execution
435- if (!node->_spawned ) {
478+ if (!node->is_spawned () ) {
436479 node->_subgraph .emplace ();
437480 }
438481
@@ -441,14 +484,14 @@ void BasicTaskflow<E>::Closure::operator () () const {
441484 std::invoke (std::get<DynamicWork>(node->_work ), fb);
442485
443486 // Need to create a subflow if first time & subgraph is not empty
444- if (!node->_spawned ) {
445- node->_spawned = true ;
487+ if (!node->is_spawned () ) {
488+ node->set_spawned () ;
446489 if (!node->_subgraph ->empty ()) {
447490 // For storing the source nodes
448491 std::vector<Node*> src;
449492 for (auto n = node->_subgraph ->begin (); n != node->_subgraph ->end (); ++n) {
450493 n->_topology = node->_topology ;
451- n->_subtask = true ;
494+ n->set_subtask () ;
452495 if (n->num_successors () == 0 ) {
453496 if (fb.detached ()) {
454497 node->_topology ->_num_sinks ++;
@@ -479,15 +522,15 @@ void BasicTaskflow<E>::Closure::operator () () const {
479522 // This must be done before scheduling the successors, otherwise this might cause
480523 // race condition on the _dependents
481524 // if(num_successors && !node->_subtask) {
482- if (!node->_subtask ) {
525+ if (!node->is_subtask () ) {
483526 // Only dynamic tasking needs to restore _predecessors
484527 if (node->_work .index () == 1 && !node->_subgraph ->empty ()) {
485- while (!node->_predecessors .empty () && node->_predecessors .back ()->_subtask ) {
528+ while (!node->_predecessors .empty () && node->_predecessors .back ()->is_subtask () ) {
486529 node->_predecessors .pop_back ();
487530 }
488531 }
489532 node->_dependents = node->_predecessors .size ();
490- node->_spawned = false ;
533+ node->clear_status () ;
491534 }
492535
493536 // At this point, the node storage might be destructed.
0 commit comments