@@ -139,8 +139,6 @@ class Topology;
139139class Task ;
140140class FlowBuilder ;
141141class SubflowBuilder ;
142-
143- template <typename Threadpool>
144142class BasicTaskflow ;
145143
146144using Graph = std::forward_list<Node>;
@@ -152,8 +150,7 @@ class Node {
152150
153151 friend class Task ;
154152 friend class Topology ;
155-
156- template <typename Executor> friend class BasicTaskflow ;
153+ friend class BasicTaskflow ;
157154
158155 using StaticWork = std::function<void ()>;
159156 using DynamicWork = std::function<void (SubflowBuilder&)>;
@@ -274,7 +271,7 @@ inline void Node::_dump(std::ostream& os) const {
274271// class: Topology
275272class Topology {
276273
277- template < typename Executor> friend class BasicTaskflow ;
274+ friend class BasicTaskflow ;
278275
279276 public:
280277
@@ -357,8 +354,7 @@ inline std::string Topology::dump() const {
357354class Task {
358355
359356 friend class FlowBuilder ;
360-
361- template <typename Executor> friend class BasicTaskflow ;
357+ friend class BasicTaskflow ;
362358
363359 public:
364360
@@ -1050,14 +1046,39 @@ inline auto FlowBuilder::silent_emplace(C&& c) {
10501046// ============================================================================
10511047// Taskflow Definition
10521048// ============================================================================
1053-
1049+
10541050// Class: BasicTaskflow
1055- template <typename Executor>
10561051class BasicTaskflow : public FlowBuilder {
1052+
1053+ struct Work { // callable unit stored by the executor: one node of one taskflow
10571054
1055+ Work () = default ;
1056+ Work (const Work&) = delete ;
1057+ // Move construction takes over both pointers and nulls the source; noexcept because only raw pointers are touched.
1058+ Work (Work&& rhs) noexcept : taskflow {rhs.taskflow }, node {rhs.node } {
1059+ rhs.taskflow = nullptr ;
1060+ rhs.node = nullptr ;
1061+ }
1062+
1063+ Work (BasicTaskflow& t, Node& n) : taskflow{&t}, node {&n} {}
1064+ // Non-owning pointers; both are null for a default-constructed or moved-from Work.
1065+ BasicTaskflow* taskflow {nullptr };
1066+ Node* node {nullptr };
1067+ // Runs the node; defined out of class after BasicTaskflow.
1068+ void operator () ();
1069+ // Move assignment mirrors the move constructor and cannot throw.
1070+ Work& operator = (Work&& rhs) noexcept {
1071+ taskflow = rhs.taskflow ;
1072+ node = rhs.node ;
1073+ rhs.taskflow = nullptr ;
1074+ rhs.node = nullptr ;
1075+ return *this ;
1076+ }
1077+ };
1078+
10581079 public:
10591080
1060- using StaticWork = typename Node::StaticWork;
1081+ using StaticWork = typename Node::StaticWork;
10611082 using DynamicWork = typename Node::DynamicWork;
10621083
10631084 explicit BasicTaskflow ();
@@ -1070,7 +1091,6 @@ class BasicTaskflow : public FlowBuilder {
10701091 void silent_dispatch ();
10711092 void wait_for_all ();
10721093 void wait_for_topologies ();
1073- void num_workers (size_t );
10741094
10751095 size_t num_nodes () const ;
10761096 size_t num_workers () const ;
@@ -1081,7 +1101,7 @@ class BasicTaskflow : public FlowBuilder {
10811101
10821102 private:
10831103
1084- Executor _executor;
1104+ SimpleThreadpool2<Work> _executor;
10851105
10861106 Graph _graph;
10871107
@@ -1090,56 +1110,112 @@ class BasicTaskflow : public FlowBuilder {
10901110 void _schedule (Node&);
10911111};
10921112
1113+ // Operator (): runs one node's work, then schedules each successor whose dependent count reaches zero. Marked inline like the other out-of-class BasicTaskflow definitions so that including this file from several translation units does not produce duplicate definitions.
1114+ inline void BasicTaskflow::Work::operator () () {
1115+
1116+ assert (taskflow && node);
1117+
1118+ // Here we need to fetch the num_successors first to avoid the invalid memory
1119+ // access caused by topology clear.
1120+ const auto num_successors = node->num_successors ();
1121+
1122+ // regular node type
1123+ // The default node work type. We only need to execute the callback if any.
1124+ if (auto index=node->_work .index (); index == 0 ) {
1125+ if (auto &f = std::get<StaticWork>(node->_work ); f != nullptr ){
1126+ std::invoke (f);
1127+ }
1128+ }
1129+ // subflow node type
1130+ // The first time we enter into the subflow context, "subnodes" must be empty.
1131+ // After executing the user's callback on subflow, there will be at least one
1132+ // node used as "super source". On the second entry (after a joined subflow
1133+ // finishes) empty_graph is false. NOTE(review): the callback is still invoked then - confirm intended.
1134+ else {
1135+ assert (std::holds_alternative<DynamicWork>(node->_work ));
1136+
1137+ SubflowBuilder fb (node->_subgraph , taskflow->num_workers ());
1138+
1139+ bool empty_graph = node->_subgraph .empty ();
1140+
1141+ std::invoke (std::get<DynamicWork>(node->_work ), fb);
1142+
1143+ // Need to create a subflow: prepend a no-op super source S and wire the new nodes to it
1144+ if (empty_graph) {
1145+
1146+ auto & S = node->_subgraph .emplace_front ([](){});
1147+
1148+ S._topology = node->_topology ;
1149+
1150+ for (auto i = std::next (node->_subgraph .begin ()); i != node->_subgraph .end (); ++i) {
1151+
1152+ i->_topology = node->_topology ;
1153+
1154+ if (i->num_successors () == 0 ) {
1155+ i->precede (fb.detached () ? node->_topology ->_target : *node);
1156+ }
1157+
1158+ if (i->num_dependents () == 0 ) {
1159+ S.precede (*i);
1160+ }
1161+ }
1162+
1163+ // this is for the case where subflow graph might be empty
1164+ if (!fb.detached ()) {
1165+ S.precede (*node);
1166+ }
1167+
1168+ taskflow->_schedule (S);
1169+
1170+ if (!fb.detached ()) {
1171+ return ;
1172+ }
1173+ }
1174+ }
1175+
1176+ // num_successors was captured before the work ran (see above); release each successor and schedule the ones that become ready.
1177+ for (size_t i=0 ; i<num_successors; ++i) {
1178+ if (--(node->_successors [i]->_dependents ) == 0 ) {
1179+ taskflow->_schedule (*(node->_successors [i]));
1180+ }
1181+ }
1182+ }
1183+
10931184// Constructor
1094- template <typename Executor>
1095- BasicTaskflow<Executor>::BasicTaskflow() :
1185+ inline BasicTaskflow::BasicTaskflow () : // default: executor sized by std::thread::hardware_concurrency()
10961186 FlowBuilder {_graph, std::thread::hardware_concurrency ()}, // NOTE(review): _graph is a member, so it is not yet constructed when this base initializer runs - presumably FlowBuilder only stores the reference; confirm
10971187 _executor {std::thread::hardware_concurrency ()} {
10981188}
10991189
11001190// Constructor
1101- template <typename Executor>
1102- BasicTaskflow<Executor>::BasicTaskflow(unsigned N) :
1191+ inline BasicTaskflow::BasicTaskflow (unsigned N) : // N is forwarded to the executor only
11031192 FlowBuilder {_graph, std::thread::hardware_concurrency ()}, // NOTE(review): passes hardware_concurrency() rather than N - confirm this is intended
11041193 _executor {N} {
11051194}
11061195
11071196// Destructor
1108- template <typename Executor>
1109- BasicTaskflow<Executor>::~BasicTaskflow () {
1197+ inline BasicTaskflow::~BasicTaskflow () {
11101198 wait_for_topologies (); // calls get() on every topology's future before the members are destroyed
11111199}
11121200
1113- // Procedure: num_workers
1114- template <typename Executor>
1115- void BasicTaskflow<Executor>::num_workers(size_t W) {
1116- _executor.shutdown ();
1117- _executor.spawn (W);
1118- _partition_factor = W;
1119- }
1120-
11211201// Function: num_nodes
1122- template <typename Executor>
1123- size_t BasicTaskflow<Executor>::num_nodes() const {
1202+ inline size_t BasicTaskflow::num_nodes () const {
11241203 // return _nodes.size();
11251204 return std::distance (_graph.begin (), _graph.end ()); // Graph is a std::forward_list, which has no size(); this walks the whole list
11261205}
11271206
11281207// Function: num_workers
1129- template <typename Executor>
1130- size_t BasicTaskflow<Executor>::num_workers() const {
1208+ inline size_t BasicTaskflow::num_workers () const {
11311209 return _executor.num_workers (); // reported by the executor; this class keeps no separate count
11321210}
11331211
11341212// Function: num_topologies
1135- template <typename Executor>
1136- size_t BasicTaskflow<Executor>::num_topologies() const {
1213+ inline size_t BasicTaskflow::num_topologies () const {
11371214 return std::distance (_topologies.begin (), _topologies.end ()); // counted by iterating _topologies from begin to end
11381215}
11391216
11401217// Procedure: silent_dispatch
1141- template <typename Executor>
1142- void BasicTaskflow<Executor>::silent_dispatch() {
1218+ inline void BasicTaskflow::silent_dispatch () {
11431219
11441220 if (_graph.empty ()) return ;
11451221
@@ -1150,8 +1226,7 @@ void BasicTaskflow<Executor>::silent_dispatch() {
11501226}
11511227
11521228// Procedure: dispatch
1153- template <typename Executor>
1154- std::shared_future<void > BasicTaskflow<Executor>::dispatch() {
1229+ inline std::shared_future<void > BasicTaskflow::dispatch () {
11551230
11561231 if (_graph.empty ()) {
11571232 return std::async (std::launch::deferred, [](){}).share ();
@@ -1166,17 +1241,15 @@ std::shared_future<void> BasicTaskflow<Executor>::dispatch() {
11661241}
11671242
11681243// Procedure: wait_for_all
1169- template <typename Executor>
1170- void BasicTaskflow<Executor>::wait_for_all() {
1244+ inline void BasicTaskflow::wait_for_all () {
11711245 if (!_graph.empty ()) { // silent_dispatch() also returns early on an empty graph, so this guard is redundant but harmless
11721246 silent_dispatch ();
11731247 }
11741248 wait_for_topologies (); // then wait on every entry in _topologies
11751249}
11761250
11771251// Procedure: wait_for_topologies
1178- template <typename Executor>
1179- void BasicTaskflow<Executor>::wait_for_topologies() {
1252+ inline void BasicTaskflow::wait_for_topologies () {
11801253 for (auto & t: _topologies){
11811254 t._future .get ();
11821255 }
@@ -1186,81 +1259,12 @@ void BasicTaskflow<Executor>::wait_for_topologies() {
11861259// Procedure: _schedule
11871260// The main procedure to schedule a given task node.
11881261// Each task node has two types of tasks - regular and subflow.
1189- template <typename Executor>
1190- void BasicTaskflow<Executor>::_schedule(Node& node) {
1191-
1192- _executor.silent_async ([this , &node](){
1193-
1194- // Here we need to fetch the num_successors first to avoid the invalid memory
1195- // access caused by topology clear.
1196- const auto num_successors = node.num_successors ();
1197-
1198- // regular node type
1199- // The default node work type. We only need to execute the callback if any.
1200- if (auto index=node._work .index (); index == 0 ) {
1201- if (auto &f = std::get<StaticWork>(node._work ); f != nullptr ){
1202- std::invoke (f);
1203- }
1204- }
1205- // subflow node type
1206- // The first time we enter into the subflow context, "subnodes" must be empty.
1207- // After executing the user's callback on subflow, there will be at least one
1208- // node node used as "super source". The second time we enter this context we
1209- // don't have to reexecute the work again.
1210- else {
1211- assert (std::holds_alternative<DynamicWork>(node._work ));
1212-
1213- SubflowBuilder fb (node._subgraph , num_workers ());
1214-
1215- bool empty_graph = node._subgraph .empty ();
1216-
1217- std::invoke (std::get<DynamicWork>(node._work ), fb);
1218-
1219- // Need to create a subflow
1220- if (empty_graph) {
1221-
1222- auto & S = node._subgraph .emplace_front ([](){});
1223-
1224- S._topology = node._topology ;
1225-
1226- for (auto i = std::next (node._subgraph .begin ()); i != node._subgraph .end (); ++i) {
1227-
1228- i->_topology = node._topology ;
1229-
1230- if (i->num_successors () == 0 ) {
1231- i->precede (fb.detached () ? node._topology ->_target : node);
1232- }
1233-
1234- if (i->num_dependents () == 0 ) {
1235- S.precede (*i);
1236- }
1237- }
1238-
1239- // this is for the case where subflow graph might be empty
1240- if (!fb.detached ()) {
1241- S.precede (node);
1242- }
1243-
1244- _schedule (S);
1245-
1246- if (!fb.detached ()) {
1247- return ;
1248- }
1249- }
1250- }
1251-
1252- // At this point, the node/node storage might be destructed.
1253- for (size_t i=0 ; i<num_successors; ++i) {
1254- if (--(node._successors [i]->_dependents ) == 0 ) {
1255- _schedule (*(node._successors [i]));
1256- }
1257- }
1258- });
1262+ inline void BasicTaskflow::_schedule (Node& node) {
1263+ _executor.emplace (*this , node); // presumably builds a Work{*this, node} inside the executor (verify against SimpleThreadpool2::emplace); Work::operator() then performs the execution
12591264}
12601265
12611266// Function: dump_topologies
1262- template <typename Executor>
1263- std::string BasicTaskflow<Executor>::dump_topologies() const {
1267+ inline std::string BasicTaskflow::dump_topologies () const {
12641268
12651269 std::ostringstream os;
12661270
@@ -1273,8 +1277,7 @@ std::string BasicTaskflow<Executor>::dump_topologies() const {
12731277
12741278// Function: dump
12751279// Dumps the taskflow in graphviz. The result can be viewed at http://www.webgraphviz.com/.
1276- template <typename Executor>
1277- std::string BasicTaskflow<Executor>::dump() const {
1280+ inline std::string BasicTaskflow::dump () const {
12781281
12791282 std::ostringstream os;
12801283
@@ -1291,7 +1294,7 @@ std::string BasicTaskflow<Executor>::dump() const {
12911294
12921295// -----------------------------------------------------------------------------
12931296
1294- using Taskflow = BasicTaskflow<SimpleThreadpool> ;
1297+ using Taskflow = BasicTaskflow;
12951298
12961299}; // end of namespace tf. ---------------------------------------------------
12971300
0 commit comments