Skip to content

Commit 172e06d

Browse files
update taskflow with fixed work
1 parent b93349f commit 172e06d

2 files changed

Lines changed: 120 additions & 129 deletions

File tree

example/taskflow.cpp

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,26 +8,14 @@
88
#include <random>
99
#include <climits>
1010

11-
using tf_simple_t = tf::BasicTaskflow<tf::SimpleThreadpool>;
12-
using tf_proactive_t = tf::BasicTaskflow<tf::ProactiveThreadpool>;
13-
using tf_speculative_t = tf::BasicTaskflow<tf::SpeculativeThreadpool>;
14-
using tf_privatized_t = tf::BasicTaskflow<tf::PrivatizedThreadpool>;
11+
using tf_simple_t = tf::Taskflow;
1512

1613
// Procedure: benchmark
1714
#define BENCHMARK(TITLE, F) \
1815
std::cout << "========== " << TITLE << " ==========\n"; \
1916
\
20-
std::cout << "Taskflow [simple ] elapsed time: " \
17+
std::cout << "Taskflow elapsed time: " \
2118
<< F<tf_simple_t>() << " ms\n"; \
22-
\
23-
std::cout << "Taskflow [proactive ] elapsed time: " \
24-
<< F<tf_proactive_t>() << " ms\n"; \
25-
\
26-
std::cout << "Taskflow [speculative] elapsed time: " \
27-
<< F<tf_speculative_t>() << " ms\n"; \
28-
\
29-
std::cout << "Taskflow [privatized ] elapsed time: " \
30-
<< F<tf_privatized_t>() << " ms\n"; \
3119

3220
// ============================================================================
3321
// Dynamic Stem

taskflow/taskflow.hpp

Lines changed: 118 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,6 @@ class Topology;
139139
class Task;
140140
class FlowBuilder;
141141
class SubflowBuilder;
142-
143-
template <typename Threadpool>
144142
class BasicTaskflow;
145143

146144
using Graph = std::forward_list<Node>;
@@ -152,8 +150,7 @@ class Node {
152150

153151
friend class Task;
154152
friend class Topology;
155-
156-
template <typename Executor> friend class BasicTaskflow;
153+
friend class BasicTaskflow;
157154

158155
using StaticWork = std::function<void()>;
159156
using DynamicWork = std::function<void(SubflowBuilder&)>;
@@ -274,7 +271,7 @@ inline void Node::_dump(std::ostream& os) const {
274271
// class: Topology
275272
class Topology {
276273

277-
template <typename Executor> friend class BasicTaskflow;
274+
friend class BasicTaskflow;
278275

279276
public:
280277

@@ -357,8 +354,7 @@ inline std::string Topology::dump() const {
357354
class Task {
358355

359356
friend class FlowBuilder;
360-
361-
template <typename Executor> friend class BasicTaskflow;
357+
friend class BasicTaskflow;
362358

363359
public:
364360

@@ -1050,14 +1046,39 @@ inline auto FlowBuilder::silent_emplace(C&& c) {
10501046
// ============================================================================
10511047
// Taskflow Definition
10521048
// ============================================================================
1053-
1049+
10541050
// Class: BasicTaskflow
1055-
template <typename Executor>
10561051
class BasicTaskflow : public FlowBuilder {
1052+
1053+
struct Work {
10571054

1055+
Work() = default;
1056+
Work(const Work&) = delete;
1057+
1058+
Work(Work&& rhs) : taskflow {rhs.taskflow}, node {rhs.node} {
1059+
rhs.taskflow = nullptr;
1060+
rhs.node = nullptr;
1061+
}
1062+
1063+
Work(BasicTaskflow& t, Node& n) : taskflow{&t}, node {&n} {}
1064+
1065+
BasicTaskflow* taskflow {nullptr};
1066+
Node* node {nullptr};
1067+
1068+
void operator () ();
1069+
1070+
Work& operator = (Work&& rhs) {
1071+
taskflow = rhs.taskflow;
1072+
node = rhs.node;
1073+
rhs.taskflow = nullptr;
1074+
rhs.node = nullptr;
1075+
return *this;
1076+
}
1077+
};
1078+
10581079
public:
10591080

1060-
using StaticWork = typename Node::StaticWork;
1081+
using StaticWork = typename Node::StaticWork;
10611082
using DynamicWork = typename Node::DynamicWork;
10621083

10631084
explicit BasicTaskflow();
@@ -1070,7 +1091,6 @@ class BasicTaskflow : public FlowBuilder {
10701091
void silent_dispatch();
10711092
void wait_for_all();
10721093
void wait_for_topologies();
1073-
void num_workers(size_t);
10741094

10751095
size_t num_nodes() const;
10761096
size_t num_workers() const;
@@ -1081,7 +1101,7 @@ class BasicTaskflow : public FlowBuilder {
10811101

10821102
private:
10831103

1084-
Executor _executor;
1104+
SimpleThreadpool2<Work> _executor;
10851105

10861106
Graph _graph;
10871107

@@ -1090,56 +1110,112 @@ class BasicTaskflow : public FlowBuilder {
10901110
void _schedule(Node&);
10911111
};
10921112

1113+
// Operator
1114+
// Runs the node bound to this work item on behalf of its taskflow, then
// decrements the dependent counter of each successor and schedules every
// successor whose counter reaches zero.
//
// Declared inline: this definition sits in a header next to the other
// BasicTaskflow members, which are all inline. A non-inline, non-template
// definition would be emitted by every translation unit that includes the
// header and collide at link time.
inline void BasicTaskflow::Work::operator () () {

  // A default-constructed or moved-from Work must never be executed.
  assert(taskflow && node);

  // Here we need to fetch the num_successors first to avoid the invalid memory
  // access caused by topology clear.
  const auto num_successors = node->num_successors();

  // regular node type
  // The default node work type. We only need to execute the callback if any.
  if(auto index=node->_work.index(); index == 0) {
    if(auto &f = std::get<StaticWork>(node->_work); f != nullptr){
      std::invoke(f);
    }
  }
  // subflow node type
  // The first time we enter into the subflow context, the subgraph must be
  // empty. After executing the user's callback on the subflow, there will be
  // at least one node used as "super source".
  // NOTE(review): the original comment says the work need not be re-executed
  // on the second entry, yet the callable below is invoked on every entry;
  // only the subflow wiring is skipped. Confirm which one is intended.
  else {
    assert(std::holds_alternative<DynamicWork>(node->_work));

    SubflowBuilder fb(node->_subgraph, taskflow->num_workers());

    // Sampled before the callback runs, since the callback may add nodes.
    bool empty_graph = node->_subgraph.empty();

    std::invoke(std::get<DynamicWork>(node->_work), fb);

    // Need to create a subflow
    if(empty_graph) {

      // S is the "super source": an empty task placed at the front of the
      // subgraph that precedes every sub-node without dependents.
      auto& S = node->_subgraph.emplace_front([](){});

      S._topology = node->_topology;

      // Visit every sub-node except S itself (S sits at begin()).
      for(auto i = std::next(node->_subgraph.begin()); i != node->_subgraph.end(); ++i) {

        i->_topology = node->_topology;

        // Sink sub-nodes lead to the topology target when the subflow is
        // detached, otherwise back to this node.
        if(i->num_successors() == 0) {
          i->precede(fb.detached() ? node->_topology->_target : *node);
        }

        // Source sub-nodes are released by the super source.
        if(i->num_dependents() == 0) {
          S.precede(*i);
        }
      }

      // this is for the case where subflow graph might be empty
      if(!fb.detached()) {
        S.precede(*node);
      }

      taskflow->_schedule(S);

      // Joined subflow: stop here without touching the successors; the
      // sub-nodes were linked to precede this node above.
      if(!fb.detached()) {
        return;
      }
    }
  }

  // NOTE(review): the original comment states that the node storage might be
  // destructed at this point, which is why num_successors was cached above;
  // the loop still reads node->_successors, so confirm the node's lifetime.
  for(size_t i=0; i<num_successors; ++i) {
    if(--(node->_successors[i]->_dependents) == 0) {
      taskflow->_schedule(*(node->_successors[i]));
    }
  }
}
1183+
10931184
// Constructor
1094-
template <typename Executor>
1095-
BasicTaskflow<Executor>::BasicTaskflow() :
1185+
inline BasicTaskflow::BasicTaskflow() :
10961186
FlowBuilder {_graph, std::thread::hardware_concurrency()},
10971187
_executor {std::thread::hardware_concurrency()} {
10981188
}
10991189

11001190
// Constructor
1101-
template <typename Executor>
1102-
BasicTaskflow<Executor>::BasicTaskflow(unsigned N) :
1191+
inline BasicTaskflow::BasicTaskflow(unsigned N) :
11031192
FlowBuilder {_graph, std::thread::hardware_concurrency()},
11041193
_executor {N} {
11051194
}
11061195

11071196
// Destructor
1108-
template <typename Executor>
1109-
BasicTaskflow<Executor>::~BasicTaskflow() {
1197+
// Waits on the topologies via wait_for_topologies() before the members are
// torn down.
inline BasicTaskflow::~BasicTaskflow() {
  this->wait_for_topologies();
}
11121200

1113-
// Procedure: num_workers
1114-
template <typename Executor>
1115-
void BasicTaskflow<Executor>::num_workers(size_t W) {
1116-
_executor.shutdown();
1117-
_executor.spawn(W);
1118-
_partition_factor = W;
1119-
}
1120-
11211201
// Function: num_nodes
1122-
template <typename Executor>
1123-
size_t BasicTaskflow<Executor>::num_nodes() const {
1202+
inline size_t BasicTaskflow::num_nodes() const {
11241203
//return _nodes.size();
11251204
return std::distance(_graph.begin(), _graph.end());
11261205
}
11271206

11281207
// Function: num_workers
1129-
template <typename Executor>
1130-
size_t BasicTaskflow<Executor>::num_workers() const {
1208+
inline size_t BasicTaskflow::num_workers() const {
11311209
return _executor.num_workers();
11321210
}
11331211

11341212
// Function: num_topologies
1135-
template <typename Executor>
1136-
size_t BasicTaskflow<Executor>::num_topologies() const {
1213+
inline size_t BasicTaskflow::num_topologies() const {
11371214
return std::distance(_topologies.begin(), _topologies.end());
11381215
}
11391216

11401217
// Procedure: silent_dispatch
1141-
template <typename Executor>
1142-
void BasicTaskflow<Executor>::silent_dispatch() {
1218+
inline void BasicTaskflow::silent_dispatch() {
11431219

11441220
if(_graph.empty()) return;
11451221

@@ -1150,8 +1226,7 @@ void BasicTaskflow<Executor>::silent_dispatch() {
11501226
}
11511227

11521228
// Procedure: dispatch
1153-
template <typename Executor>
1154-
std::shared_future<void> BasicTaskflow<Executor>::dispatch() {
1229+
inline std::shared_future<void> BasicTaskflow::dispatch() {
11551230

11561231
if(_graph.empty()) {
11571232
return std::async(std::launch::deferred, [](){}).share();
@@ -1166,17 +1241,15 @@ std::shared_future<void> BasicTaskflow<Executor>::dispatch() {
11661241
}
11671242

11681243
// Procedure: wait_for_all
1169-
template <typename Executor>
1170-
void BasicTaskflow<Executor>::wait_for_all() {
1244+
inline void BasicTaskflow::wait_for_all() {
11711245
if(!_graph.empty()) {
11721246
silent_dispatch();
11731247
}
11741248
wait_for_topologies();
11751249
}
11761250

11771251
// Procedure: wait_for_topologies
1178-
template <typename Executor>
1179-
void BasicTaskflow<Executor>::wait_for_topologies() {
1252+
inline void BasicTaskflow::wait_for_topologies() {
11801253
for(auto& t: _topologies){
11811254
t._future.get();
11821255
}
@@ -1186,81 +1259,12 @@ void BasicTaskflow<Executor>::wait_for_topologies() {
11861259
// Procedure: _schedule
11871260
// The main procedure to schedule a given task node.
11881261
// Each task node has two types of tasks - regular and subflow.
1189-
template <typename Executor>
1190-
void BasicTaskflow<Executor>::_schedule(Node& node) {
1191-
1192-
_executor.silent_async([this, &node](){
1193-
1194-
// Here we need to fetch the num_successors first to avoid the invalid memory
1195-
// access caused by topology clear.
1196-
const auto num_successors = node.num_successors();
1197-
1198-
// regular node type
1199-
// The default node work type. We only need to execute the callback if any.
1200-
if(auto index=node._work.index(); index == 0) {
1201-
if(auto &f = std::get<StaticWork>(node._work); f != nullptr){
1202-
std::invoke(f);
1203-
}
1204-
}
1205-
// subflow node type
1206-
// The first time we enter into the subflow context, "subnodes" must be empty.
1207-
// After executing the user's callback on subflow, there will be at least one
1208-
// node node used as "super source". The second time we enter this context we
1209-
// don't have to reexecute the work again.
1210-
else {
1211-
assert(std::holds_alternative<DynamicWork>(node._work));
1212-
1213-
SubflowBuilder fb(node._subgraph, num_workers());
1214-
1215-
bool empty_graph = node._subgraph.empty();
1216-
1217-
std::invoke(std::get<DynamicWork>(node._work), fb);
1218-
1219-
// Need to create a subflow
1220-
if(empty_graph) {
1221-
1222-
auto& S = node._subgraph.emplace_front([](){});
1223-
1224-
S._topology = node._topology;
1225-
1226-
for(auto i = std::next(node._subgraph.begin()); i != node._subgraph.end(); ++i) {
1227-
1228-
i->_topology = node._topology;
1229-
1230-
if(i->num_successors() == 0) {
1231-
i->precede(fb.detached() ? node._topology->_target : node);
1232-
}
1233-
1234-
if(i->num_dependents() == 0) {
1235-
S.precede(*i);
1236-
}
1237-
}
1238-
1239-
// this is for the case where subflow graph might be empty
1240-
if(!fb.detached()) {
1241-
S.precede(node);
1242-
}
1243-
1244-
_schedule(S);
1245-
1246-
if(!fb.detached()) {
1247-
return;
1248-
}
1249-
}
1250-
}
1251-
1252-
// At this point, the node/node storage might be destructed.
1253-
for(size_t i=0; i<num_successors; ++i) {
1254-
if(--(node._successors[i]->_dependents) == 0) {
1255-
_schedule(*(node._successors[i]));
1256-
}
1257-
}
1258-
});
1262+
// Hands the node to the executor together with this taskflow.
// Presumably the executor builds a Work{*this, node} from these arguments;
// verify against SimpleThreadpool2::emplace.
inline void BasicTaskflow::_schedule(Node& node) {
  auto& self = *this;
  _executor.emplace(self, node);
}
12601265

12611266
// Function: dump_topologies
1262-
template <typename Executor>
1263-
std::string BasicTaskflow<Executor>::dump_topologies() const {
1267+
inline std::string BasicTaskflow::dump_topologies() const {
12641268

12651269
std::ostringstream os;
12661270

@@ -1273,8 +1277,7 @@ std::string BasicTaskflow<Executor>::dump_topologies() const {
12731277

12741278
// Function: dump
12751279
// Dumps the taskflow in graphviz. The result can be viewed at http://www.webgraphviz.com/.
1276-
template <typename Executor>
1277-
std::string BasicTaskflow<Executor>::dump() const {
1280+
inline std::string BasicTaskflow::dump() const {
12781281

12791282
std::ostringstream os;
12801283

@@ -1291,7 +1294,7 @@ std::string BasicTaskflow<Executor>::dump() const {
12911294

12921295
//-----------------------------------------------------------------------------
12931296

1294-
using Taskflow = BasicTaskflow<SimpleThreadpool>;
1297+
using Taskflow = BasicTaskflow;
12951298

12961299
}; // end of namespace tf. ---------------------------------------------------
12971300

0 commit comments

Comments
 (0)