Skip to content

Commit c3ee85a

Browse files
committed
Add unit test for observer
1 parent 791a5f8 commit c3ee85a

3 files changed

Lines changed: 111 additions & 0 deletions

File tree

taskflow/executor/observer.hpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,13 @@ class ExecutorObserver : public ExecutorObserverInterface {
8585
*/
8686
inline void clear();
8787

88+
89+
/**
90+
@brief get the number of total tasks in the observer
91+
@return number of total tasks
92+
*/
93+
inline int num_tasks() const;
94+
8895
private:
8996

9097
inline void set_up(unsigned num_workers) override final;
@@ -165,6 +172,15 @@ inline std::string ExecutorObserver::dump() const {
165172
return oss.str();
166173
}
167174

175+
// Function: num_tasks
176+
inline int ExecutorObserver::num_tasks() const {
177+
return std::accumulate(_begs.begin(), _begs.end(), 0,
178+
[](int sum, const std::vector<std::chrono::time_point<std::chrono::steady_clock>>& vec){
179+
return sum + vec.size();
180+
}
181+
);
182+
}
183+
168184

169185
} // end of namespace tf -------------------------------------------
170186

taskflow/executor/proactive.hpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,8 +225,15 @@ void ProactiveExecutor<Closure>::_spawn(unsigned N) {
225225
// shutdown cannot have task
226226
if(w.task) {
227227
lock.unlock();
228+
if(_observer) {
229+
_observer->on_entry(me);
230+
}
228231
(*w.task)();
229232
w.task = std::nullopt;
233+
234+
if(_observer) {
235+
_observer->on_exit(me);
236+
}
230237
lock.lock();
231238
}
232239
}

unittest/taskflow.cpp

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1190,12 +1190,100 @@ TEST_CASE("Composition-3" * doctest::timeout(300)) {
11901190
}
11911191

11921192

1193+
// --------------------------------------------------------
1194+
// Testcase: Observer
1195+
// --------------------------------------------------------
1196+
template <typename T>
1197+
void ObserverTest() {
1198+
for(unsigned w=0; w<=8; ++w) {
1199+
T taskflow {w};
1200+
auto observer = taskflow.share_executor()->template make_observer<tf::ExecutorObserver>();
1201+
1202+
tf::Framework frameworkA;
1203+
std::vector<tf::Task> tasks;
1204+
// Static tasking
1205+
for(auto i=0; i < 1024; i ++) {
1206+
tasks.emplace_back(frameworkA.emplace([](){}));
1207+
}
11931208

1209+
// Randomly specify dependency
1210+
for(auto i=0; i < 1024; i ++) {
1211+
for(auto j=i+1; j < 1024; j++) {
1212+
if(rand()%2 == 0) {
1213+
tasks[i].precede(tasks[j]);
1214+
}
1215+
}
1216+
}
11941217

1218+
taskflow.run_n(frameworkA, 16).get();
1219+
1220+
if(w == 0) {
1221+
REQUIRE(observer->num_tasks() == 0);
1222+
}
1223+
else {
1224+
REQUIRE(observer->num_tasks() == 1024*16);
1225+
}
1226+
1227+
observer->clear();
1228+
REQUIRE(observer->num_tasks() == 0);
1229+
tasks.clear();
1230+
1231+
// Dynamic tasking
1232+
tf::Framework frameworkB;
1233+
std::atomic<int> num_tasks {0};
1234+
// Static tasking
1235+
for(auto i=0; i < 1024; i ++) {
1236+
tasks.emplace_back(frameworkB.emplace([&](auto &subflow){
1237+
num_tasks ++;
1238+
auto num_spawn = rand() % 10 + 1;
1239+
// Randomly spawn tasks
1240+
for(auto i=0; i<num_spawn; i++) {
1241+
subflow.emplace([&](){ num_tasks ++; });
1242+
}
1243+
if(rand() % 2) {
1244+
subflow.detach();
1245+
}
1246+
else {
1247+
// In join mode, this task will be visited twice
1248+
num_tasks ++;
1249+
}
1250+
}));
1251+
}
11951252

1253+
// Randomly specify dependency
1254+
for(auto i=0; i < 1024; i ++) {
1255+
for(auto j=i+1; j < 1024; j++) {
1256+
if(rand()%2 == 0) {
1257+
tasks[i].precede(tasks[j]);
1258+
}
1259+
}
1260+
}
11961261

1262+
taskflow.run_n(frameworkB, 16).get();
11971263

1264+
if(w == 0) {
1265+
REQUIRE(observer->num_tasks() == 0);
1266+
}
1267+
else {
1268+
REQUIRE(observer->num_tasks() == num_tasks);
1269+
}
1270+
}
1271+
}
11981272

1273+
TEST_CASE("Observer" * doctest::timeout(300)) {
1274+
SUBCASE("Simple Executor") {
1275+
ObserverTest<tf::BasicTaskflow<tf::SimpleExecutor>>();
1276+
}
1277+
SUBCASE("Proactive Executor") {
1278+
ObserverTest<tf::BasicTaskflow<tf::ProactiveExecutor>>();
1279+
}
1280+
SUBCASE("Speculative Executor") {
1281+
ObserverTest<tf::BasicTaskflow<tf::SpeculativeExecutor>>();
1282+
}
1283+
SUBCASE("WorkStealing Executor") {
1284+
ObserverTest<tf::BasicTaskflow<tf::WorkStealingExecutor>>();
1285+
}
1286+
}
11991287

12001288

12011289

0 commit comments

Comments
 (0)