Skip to content

Commit dcaf50c

Browse files
added nested_subflow
1 parent 7d3d149 commit dcaf50c

6 files changed

Lines changed: 72 additions & 21 deletions

File tree

CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,9 @@ target_link_libraries(simple ${PROJECT_NAME} Threads::Threads)
116116
add_executable(subflow ${TF_EXAMPLE_DIR}/subflow.cpp)
117117
target_link_libraries(subflow ${PROJECT_NAME} Threads::Threads)
118118

119+
add_executable(nested_subflow ${TF_EXAMPLE_DIR}/nested_subflow.cpp)
120+
target_link_libraries(nested_subflow ${PROJECT_NAME} Threads::Threads)
121+
119122
add_executable(debug ${TF_EXAMPLE_DIR}/debug.cpp)
120123
target_link_libraries(debug ${PROJECT_NAME} Threads::Threads)
121124

example/nested_subflow.cpp

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#include <chrono>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>

#include <taskflow/taskflow.hpp>
2+
3+
// Thread-safe logger: serializes writes to std::cout so lines emitted by
// concurrently running tasks are not interleaved character-by-character.
//
// @param msg  text to print; a trailing newline is appended
void syncLog(std::string const& msg) {
  // One function-local mutex shared by every caller of syncLog.
  static std::mutex logMutex;
  std::scoped_lock guard(logMutex);
  std::cout << msg << '\n';
}
8+
9+
void grow(tf::SubflowBuilder& subflow, uint64_t depth) {
10+
syncLog("Depth: " + std::to_string(depth));
11+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
12+
if(depth < 3) {
13+
subflow.silent_emplace(
14+
[depth](tf::SubflowBuilder& subsubflow){ grow(subsubflow, depth+1); },
15+
[depth](tf::SubflowBuilder& subsubflow){ grow(subsubflow, depth+1); });
16+
subflow.detach();
17+
}
18+
}
19+
20+
// Demo driver: builds a taskflow whose single root task recursively grows a
// tree of nested subflows (see grow), then blocks until execution finishes.
// Command-line arguments were unused, so they are omitted from the signature.
int main() {

  tf::Taskflow mainTaskFlow;

  // The root task receives a SubflowBuilder so it can spawn child tasks
  // dynamically; recursion starts at depth 0.
  mainTaskFlow.silent_emplace([](tf::SubflowBuilder& subflow){ grow(subflow, 0); });

  // Block until the taskflow has finished executing.
  mainTaskFlow.wait_for_all();

  return 0;
}

example/simple.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,6 @@ int main(){
2525

2626
return 0;
2727
}
28+
29+
30+

taskflow/graph/basic_taskflow.hpp

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -311,10 +311,16 @@ std::shared_future<void> BasicTaskflow<E>::run_n(Framework& f, size_t repeat, C&
311311

312312
auto &tpg = _topologies.emplace_front(f, repeat);
313313
f._topologies.push_back(&tpg);
314-
314+
315+
// PV 1/31 (twhuang): Lambda in C++17 is by default inline - no need for static
315316
static const auto setup_topology = [](auto& f, auto& tpg) {
317+
316318
for(auto& n: f._graph) {
317-
//// TODO: swap this with the last and then pop_back
319+
320+
// PR 1/31 (twhuang): I don't think we need to check the emptiness
321+
// of the successors over here
322+
// Also, when do you clean up dynamic tasking nodes?
323+
//
318324
if(!n._successors.empty()) {
319325
for(size_t i=0; i<n._successors.size(); i++) {
320326
if(n._successors[i] == f._last_target) {
@@ -336,18 +342,23 @@ std::shared_future<void> BasicTaskflow<E>::run_n(Framework& f, size_t repeat, C&
336342
}
337343
};
338344

345+
// PV 1/31 (twhuang): single worker - we need to remove topologies?
346+
339347
// Iterative execution to avoid stack overflow
340348
if(num_workers() == 0) {
341349
// Clear last execution data & Build precedence between nodes and target
342350
setup_topology(f, tpg);
343351

344352
tpg._target._work = std::forward<C>(c);
353+
354+
// PR 1/31 (twhuang): redundant tgt_predecessors
345355
const int tgt_predecessor = tpg._target._predecessors.size();
346356

347357
for(size_t i=0; i<repeat; i++) {
348358

349359
_schedule(tpg._sources);
350-
360+
361+
// PR 1/31 (twhuang): why do we need to set the dependents again?
351362
// Reset target
352363
f._topologies.front()->_target._predecessors.resize(tgt_predecessor);
353364
f._topologies.front()->_target._dependents = tgt_predecessor;
@@ -370,7 +381,8 @@ std::shared_future<void> BasicTaskflow<E>::run_n(Framework& f, size_t repeat, C&
370381
tgt_predecessor = tpg._target._predecessors.size(), this]() mutable {
371382

372383
std::invoke(c);
373-
384+
385+
// PV 1/31 (twhuang): thread safety?
374386
// case 1: we still need to run the topology again
375387
if(--f._topologies.front()->_repeat != 0) {
376388

@@ -450,7 +462,9 @@ void BasicTaskflow<E>::Closure::operator () () const {
450462
}
451463
// subflow node type
452464
else {
453-
465+
466+
// PV 1/31 (twhuang): emplace is enough
467+
//
454468
// Clear the subgraph before the task execution
455469
if(!node->_spawned) {
456470
node->_subgraph.reset();
@@ -485,8 +499,12 @@ void BasicTaskflow<E>::Closure::operator () () const {
485499
}
486500
}
487501
}
488-
} // End of DynamicWork -------------------------------------------------------------------------
489-
502+
} // End of DynamicWork -----------------------------------------------------
503+
504+
// PV 1/31 (twhuang): probably can add a bitwise state for each node?
505+
// Also I think the while loop can be improved.
506+
// Why do we need num_successors in if?
507+
//
490508
// Recover the runtime change due to dynamic tasking except the target & spawn tasks
491509
// This must be done before scheduling the successors, otherwise this might cause
492510
// race condition on the _dependents
@@ -498,7 +516,7 @@ void BasicTaskflow<E>::Closure::operator () () const {
498516
node->_spawned = false;
499517
}
500518

501-
// At this point, the node/node storage might be destructed.
519+
// At this point, the node storage might be destructed.
502520
for(size_t i=0; i<num_successors; ++i) {
503521
if(--(node->_successors[i]->_dependents) == 0) {
504522
taskflow->_schedule(*(node->_successors[i]));

taskflow/graph/graph.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@ inline Node::Node(C&& c) : _work {std::forward<C>(c)} {
7979
_topology = nullptr;
8080
}
8181

82-
8382
// Destructor
8483
inline Node::~Node() {
8584
if(_subgraph.has_value()) {

taskflow/threadpool/workstealing_threadpool.hpp

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,7 @@ class WorkStealingThreadpool {
376376
template <typename Closure>
377377
WorkStealingThreadpool<Closure>::WorkStealingThreadpool(unsigned N) : _workers {N} {
378378
_worker_maps.reserve(N);
379+
_idlers.reserve(N);
379380
_spawn(N);
380381
}
381382

@@ -508,24 +509,25 @@ void WorkStealingThreadpool<Closure>::_balance_load(unsigned me) {
508509

509510
// return if no idler - this might not be the right value
510511
// but it doesn't affect the correctness
511-
if(_idlers.empty() || n <= 1) {
512+
if(_idlers.empty() || n == 0) {
512513
return;
513514
}
514515

515516
// try with probability 1/n
516517
//if(_fast_modulo(_randomize(_workers[me].seed), n) == 0u) {
517518
// wake up my partner to help balance
518-
if(_mutex.try_lock()) {
519-
if(!_idlers.empty()) {
520-
Worker* w = _idlers.back();
521-
_idlers.pop_back();
522-
w->ready = true;
523-
w->cache = _workers[me].queue.pop();
524-
w->cv.notify_one();
525-
w->last_victim = me;
526-
}
527-
_mutex.unlock();
528-
}
519+
//if(_mutex.try_lock()) {
520+
std::scoped_lock lock(_mutex);
521+
if(!_idlers.empty()) {
522+
Worker* w = _idlers.back();
523+
_idlers.pop_back();
524+
w->ready = true;
525+
w->cache = _workers[me].queue.pop();
526+
w->cv.notify_one();
527+
w->last_victim = me;
528+
}
529+
//_mutex.unlock();
530+
//}
529531
//}
530532
}
531533

0 commit comments

Comments
 (0)