Skip to content

Commit 4eaa573

Browse files
committed
Add run_until
1 parent e7bf1fb commit 4eaa573

4 files changed

Lines changed: 80 additions & 20 deletions

File tree

example/framework.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,13 @@ int main(){
6767
std::cout << "Silently run the framework with callback\n";
6868
tf.silent_run_n(f, 1, []() { std::cout << "The framework finishes\n"; });
6969
tf.wait_for_all();
70+
std::cout << std::endl;
71+
72+
std::cout << "Run the framework until a counter (init value=3) becomes zero\n";
73+
tf.run_until(f, [counter=3]() mutable {
74+
std::cout << "Counter = " << counter << std::endl;
75+
return counter -- == 0;
76+
}).get();
7077

7178
return 0;
7279
}

taskflow/graph/basic_taskflow.hpp

Lines changed: 58 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,32 @@ class BasicTaskflow : public FlowBuilder {
230230
std::shared_future<void> run_n(Framework& framework, size_t N, C&& callable);
231231

232232

233+
234+
/**
235+
@brief runs the framework w/o a callback until the predicate becomes true and returns immediately
236+
237+
@param framework a tf::Framework
238+
@param P predicate (a callable object returns true or false)
239+
240+
@return a std::shared_future to access the execution status of the framework
241+
*/
242+
template<typename P>
243+
std::shared_future<void> run_until(Framework& framework, P&& predicate);
244+
245+
246+
/**
247+
@brief runs the framework w/ a callback until the predicate becomes true and returns immediately
248+
249+
@param framework a tf::Framework
250+
@param P predicate (a callable object returns true or false)
251+
@param callable a callable object to be invoked after every run
252+
253+
@return a std::shared_future to access the execution status of the framework
254+
*/
255+
template<typename P, typename C>
256+
std::shared_future<void> run_until(Framework& framework, P&& predicate, C&& callable);
257+
258+
233259
private:
234260

235261
Graph _graph;
@@ -298,16 +324,32 @@ std::shared_future<void> BasicTaskflow<E>::run_n(Framework& f, size_t repeat) {
298324
template <template <typename...> typename E>
299325
template <typename C>
300326
std::shared_future<void> BasicTaskflow<E>::run_n(Framework& f, size_t repeat, C&& c) {
327+
return run_until(f, [repeat]() mutable { return repeat-- == 0; }, std::forward<C>(c));
328+
}
301329

302-
static_assert(std::is_invocable<C>::value);
303330

304-
if(repeat == 0) {
331+
// Function: run_until
332+
template <template <typename...> typename E>
333+
template <typename P>
334+
std::shared_future<void> BasicTaskflow<E>::run_until(Framework& f, P&& predicate) {
335+
return run_until(f, std::forward<P>(predicate), [](){});
336+
}
337+
338+
// Function: run_until
339+
template <template <typename...> typename E>
340+
template <typename P, typename C>
341+
std::shared_future<void> BasicTaskflow<E>::run_until(Framework& f, P&& predicate, C&& c) {
342+
343+
// Predicate must return a boolean value
344+
static_assert(std::is_invocable<C>::value && std::is_same_v<bool, std::invoke_result_t<P>>);
345+
346+
if(std::invoke(predicate)) {
305347
return std::async(std::launch::deferred, [](){}).share();
306348
}
307349

308350
std::scoped_lock lock(f._mtx);
309351

310-
auto &tpg = _topologies.emplace_front(f, repeat);
352+
auto &tpg = _topologies.emplace_front(f, std::forward<P>(predicate));
311353
f._topologies.push_back(&tpg);
312354

313355
const auto setup_topology = [](auto& f, auto& tpg) {
@@ -324,17 +366,17 @@ std::shared_future<void> BasicTaskflow<E>::run_n(Framework& f, size_t repeat, C&
324366
}
325367
};
326368

327-
328369
// Iterative execution to avoid stack overflow
329370
if(num_workers() == 0) {
330371
// Clear last execution data & Build precedence between nodes and target
331372
setup_topology(f, tpg);
332373

333374
const int tgt_predecessor = tpg._num_sinks;
334-
for(size_t i=0; i<repeat; i++) {
375+
do {
376+
//for(size_t i=0; i<repeat; i++) {
335377
_schedule(tpg._sources);
336378
f._topologies.front()->_num_sinks = tgt_predecessor;
337-
}
379+
} while(!std::invoke(tpg._predicate));
338380

339381
std::invoke(c);
340382
auto &p = f._topologies.front()->_promise;
@@ -356,7 +398,8 @@ std::shared_future<void> BasicTaskflow<E>::run_n(Framework& f, size_t repeat, C&
356398

357399
// PV 1/31 (twhuang): thread safety?
358400
// case 1: we still need to run the topology again
359-
if(--f._topologies.front()->_repeat != 0) {
401+
if(!std::invoke(f._topologies.front()->_predicate)) {
402+
//if(--f._topologies.front()->_repeat != 0) {
360403
f._topologies.front()->_num_sinks = tgt_predecessor;
361404
_schedule(f._topologies.front()->_sources);
362405
}
@@ -377,7 +420,7 @@ std::shared_future<void> BasicTaskflow<E>::run_n(Framework& f, size_t repeat, C&
377420
//c = std::move(std::get<StaticWork>((*next_tpg)->_target._work));
378421
c = std::move((*next_tpg)->_work);
379422

380-
f._topologies.front()->_repeat = (*next_tpg)->_repeat;
423+
f._topologies.front()->_predicate = std::move((*next_tpg)->_predicate);
381424
f._topologies.front()->_promise = std::move((*next_tpg)->_promise);
382425
f._topologies.erase(next_tpg);
383426

@@ -432,7 +475,7 @@ void BasicTaskflow<E>::Closure::operator () () const {
432475
else {
433476

434477
// Clear the subgraph before the task execution
435-
if(!node->_spawned) {
478+
if(!node->is_spawned()) {
436479
node->_subgraph.emplace();
437480
}
438481

@@ -441,14 +484,14 @@ void BasicTaskflow<E>::Closure::operator () () const {
441484
std::invoke(std::get<DynamicWork>(node->_work), fb);
442485

443486
// Need to create a subflow if first time & subgraph is not empty
444-
if(!node->_spawned) {
445-
node->_spawned = true;
487+
if(!node->is_spawned()) {
488+
node->set_spawned();
446489
if(!node->_subgraph->empty()) {
447490
// For storing the source nodes
448491
std::vector<Node*> src;
449492
for(auto n = node->_subgraph->begin(); n != node->_subgraph->end(); ++n) {
450493
n->_topology = node->_topology;
451-
n->_subtask = true;
494+
n->set_subtask();
452495
if(n->num_successors() == 0) {
453496
if(fb.detached()) {
454497
node->_topology->_num_sinks ++;
@@ -479,15 +522,15 @@ void BasicTaskflow<E>::Closure::operator () () const {
479522
// This must be done before scheduling the successors, otherwise this might cause
480523
// race condition on the _dependents
481524
//if(num_successors && !node->_subtask) {
482-
if(!node->_subtask) {
525+
if(!node->is_subtask()) {
483526
// Only dynamic tasking needs to restore _predecessors
484527
if(node->_work.index() == 1 && !node->_subgraph->empty()) {
485-
while(!node->_predecessors.empty() && node->_predecessors.back()->_subtask) {
528+
while(!node->_predecessors.empty() && node->_predecessors.back()->is_subtask()) {
486529
node->_predecessors.pop_back();
487530
}
488531
}
489532
node->_dependents = node->_predecessors.size();
490-
node->_spawned = false;
533+
node->clear_status();
491534
}
492535

493536
// At this point, the node storage might be destructed.

taskflow/graph/graph.hpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "../utility/traits.hpp"
55
#include "../utility/allocator.hpp"
66
#include "../utility/passive_vector.hpp"
7+
#include <bitset>
78

89
namespace tf {
910

@@ -50,6 +51,13 @@ class Node {
5051

5152
std::string dump() const;
5253

54+
// Status-related functions
55+
bool is_spawned() const { return _status & 0x1; }
56+
bool is_subtask() const { return _status & 0x2; }
57+
void set_spawned() { _status |= 0x1; }
58+
void set_subtask() { _status |= 0x2; }
59+
void clear_status() { _status &= 0x0; }
60+
5361
private:
5462

5563
std::string _name;
@@ -64,8 +72,7 @@ class Node {
6472

6573
Topology* _topology;
6674

67-
bool _spawned {false};
68-
bool _subtask {false};
75+
size_t _status {0};
6976
};
7077

7178
// Constructor

taskflow/graph/topology.hpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ class Topology {
2020
template <typename C>
2121
Topology(Graph&&, C&&);
2222

23-
Topology(Framework&, size_t);
23+
//Topology(Framework&, size_t);
24+
Topology(Framework&, std::function<bool()>);
2425

2526
std::string dump() const;
2627
void dump(std::ostream&) const;
@@ -30,7 +31,8 @@ class Topology {
3031
std::variant<Graph, Framework*> _handle;
3132

3233
std::promise <void> _promise;
33-
size_t _repeat {0};
34+
//size_t _repeat {0};
35+
std::function<bool()> _predicate {nullptr};
3436

3537
std::shared_future<void> _future;
3638

@@ -42,7 +44,8 @@ class Topology {
4244

4345

4446
// Constructor
45-
inline Topology::Topology(Framework& f, size_t repeat): _handle(&f), _repeat(repeat) {
47+
//inline Topology::Topology(Framework& f, size_t repeat): _handle(&f), _repeat(repeat) {
48+
inline Topology::Topology(Framework& f, std::function<bool()> p): _handle(&f), _predicate(p) {
4649
_future = _promise.get_future().share();
4750
}
4851

0 commit comments

Comments
 (0)