@@ -74,19 +74,17 @@ class BasicSpeculativeThreadpool {
7474 std::deque<TaskType> _task_queue;
7575 std::vector<std::thread> _threads;
7676 std::vector<Worker*> _idlers;
77- std::unordered_set<std::thread::id> _worker_ids;
7877 std::unordered_map<std::thread::id, Worker*> _worker_local;
7978
80- // TODO
8179 const std::thread::id _owner {std::this_thread::get_id ()};
8280
8381 bool _exiting {false };
8482 bool _wait_for_all {false };
8583
86- typename std::unordered_map<std::thread::id, Worker*>::iterator
87- _lookahead (){
84+ auto _lookahead (){
85+ auto id = std::this_thread::get_id ();
8886 std::scoped_lock<std::mutex> lock (_mutex);
89- return _worker_local.find (std::this_thread::get_id () );
87+ return _worker_local.find (id );
9088 }
9189
9290}; // class BasicSpeculativeThreadpool. --------------------------------------
@@ -156,7 +154,8 @@ void BasicSpeculativeThreadpool<TaskType>::shutdown(){
156154
157155 _wait_for_all = false ;
158156 _exiting = false ;
159- // task queue might have tasks that are added dynamically...
157+
158+ // task queue might have tasks added by threads outside this pool...
160159 // while(not _task_queue.empty()) {
161160 // std::invoke(_task_queue.front());
162161 // _task_queue.pop_front();
@@ -174,17 +173,15 @@ void BasicSpeculativeThreadpool<TaskType>::spawn(unsigned N) {
174173
175174 for (size_t i=0 ; i<N; ++i){
176175 _threads.emplace_back ([this ]()->void {
176+
177177 Worker w;
178178 TaskType t;
179-
180- {
181- std::scoped_lock<std::mutex> lock (_mutex);
182- // TODO
183- // _worker_ids.insert(std::this_thread::get_id());
184- _worker_local.insert ({std::this_thread::get_id (), &w});
185- }
179+ auto id = std::this_thread::get_id ();
186180
187181 std::unique_lock<std::mutex> lock (_mutex);
182+
183+ _worker_local.insert ({id, &w});
184+
188185 while (!_exiting){
189186 if (_task_queue.empty ()){
190187 w.ready = false ;
@@ -198,21 +195,16 @@ void BasicSpeculativeThreadpool<TaskType>::spawn(unsigned N) {
198195 w.cv .wait (lock);
199196 }
200197
201- // TODO?
202198 t = std::move (w.task );
203199 }
204200 else {
205201 t = std::move (_task_queue.front ());
206202 _task_queue.pop_front ();
207-
208- // if(_task_queue.empty() && _wait_for_all) {
209- // _empty_cv.notify_one();
210- // }
211203 }
212204
213205 if (t){
214206 _mutex.unlock ();
215- // TODO:
207+ // speculation loop
216208 while (t) {
217209 t ();
218210 t = std::move (w.task );
@@ -222,7 +214,6 @@ void BasicSpeculativeThreadpool<TaskType>::spawn(unsigned N) {
222214 } // End of while --------------------------------------------------------------------------
223215 });
224216
225- _worker_ids.insert (_threads.back ().get_id ());
226217 } // End of For ---------------------------------------------------------------------------------
227218}
228219
@@ -249,19 +240,18 @@ auto BasicSpeculativeThreadpool<TaskType>::async(C&& c){
249240 }
250241 // have worker(s)
251242 else {
252- // std::scoped_lock<std::mutex> lock(_mutex);
253243 if constexpr (std::is_same_v<void , R>){
254- // all workers are busy.
255-
244+
245+ // speculation
256246 if (std::this_thread::get_id () != _owner){
257247 auto iter = _lookahead ();
258248 if (iter != _worker_local.end () and iter->second ->task == nullptr ){
259- iter->second ->task = std::move (
249+ iter->second ->task =
260250 [p = MoC (std::move (p)), c = std::forward<C>(c)]() mutable {
261251 c ();
262252 p.get ().set_value ();
263- }) ;
264- return ;
253+ };
254+ return fu ;
265255 }
266256 }
267257
@@ -288,14 +278,15 @@ auto BasicSpeculativeThreadpool<TaskType>::async(C&& c){
288278 }
289279 else {
290280
281+ // speculation
291282 if (std::this_thread::get_id () != _owner){
292283 auto iter = _lookahead ();
293284 if (iter != _worker_local.end () and iter->second ->task == nullptr ){
294- iter->second ->task = std::move (
285+ iter->second ->task =
295286 [p = MoC (std::move (p)), c = std::forward<C>(c)]() mutable {
296287 p.get ().set_value (c ());
297- }) ;
298- return ;
288+ };
289+ return fu ;
299290 }
300291 }
301292
@@ -304,7 +295,6 @@ auto BasicSpeculativeThreadpool<TaskType>::async(C&& c){
304295 _task_queue.emplace_back (
305296 [p = MoC (std::move (p)), c = std::forward<C>(c)]() mutable {
306297 p.get ().set_value (c ());
307- return ;
308298 }
309299 );
310300 }
@@ -314,7 +304,6 @@ auto BasicSpeculativeThreadpool<TaskType>::async(C&& c){
314304 w->ready = true ;
315305 w->task = [p = MoC (std::move (p)), c = std::forward<C>(c)]() mutable {
316306 p.get ().set_value (c ());
317- return ;
318307 };
319308 w->cv .notify_one ();
320309 }
@@ -336,6 +325,7 @@ void BasicSpeculativeThreadpool<TaskType>::silent_async(C&& c){
336325 return ;
337326 }
338327
328+ // speculation
339329 if (std::this_thread::get_id () != _owner){
340330 auto iter = _lookahead ();
341331 if (iter != _worker_local.end () and iter->second ->task == nullptr ){
@@ -344,16 +334,6 @@ void BasicSpeculativeThreadpool<TaskType>::silent_async(C&& c){
344334 }
345335 }
346336
347- // TODO
348- // std::scoped_lock<std::mutex> lock(_mutex);
349- // if(auto iter = _worker_local.find(std::this_thread::get_id());
350- // iter != _worker_local.end()){
351- // if(iter->second->task == nullptr){
352- // iter->second->task = std::move(t);
353- // return ;
354- // }
355- // }
356-
357337 std::scoped_lock<std::mutex> lock (_mutex);
358338 if (_idlers.empty ()){
359339 _task_queue.push_back (std::move (t));
0 commit comments