@@ -42,7 +42,7 @@ class BasicSpeculativeThreadpool {
4242 struct Worker {
4343 std::condition_variable cv;
4444 TaskType task;
45- bool ready;
45+ bool ready { false } ;
4646 };
4747
4848 public:
@@ -83,10 +83,11 @@ class BasicSpeculativeThreadpool {
8383
8484 auto _lookahead (){
8585 auto id = std::this_thread::get_id ();
86- std::scoped_lock<std::mutex> lock (_mutex);
8786 return _worker_local.find (id);
8887 }
8988
89+ std::unique_ptr<Worker[]> _works;
90+
9091}; // class BasicSpeculativeThreadpool. --------------------------------------
9192
9293// Constructor
@@ -101,19 +102,19 @@ BasicSpeculativeThreadpool<Func>::~BasicSpeculativeThreadpool(){
101102 shutdown ();
102103}
103104
104-
105105// Function: is_owner
106106template < template <typename ...> class Func >
107107bool BasicSpeculativeThreadpool<Func>::is_owner() const {
108108 return std::this_thread::get_id () == _owner;
109109}
110110
111-
111+ // Function: num_tasks
112112template < template <typename ...> class Func >
113113size_t BasicSpeculativeThreadpool<Func>::num_tasks() const {
114114 return _task_queue.size ();
115115}
116116
117+ // Function: num_workers
117118template < template <typename ...> class Func >
118119size_t BasicSpeculativeThreadpool<Func>::num_workers() const {
119120 return _threads.size ();
@@ -123,7 +124,7 @@ size_t BasicSpeculativeThreadpool<Func>::num_workers() const {
123124template < template <typename ...> class Func >
124125void BasicSpeculativeThreadpool<Func>::shutdown(){
125126
126- if (not is_owner ()){
127+ if (! is_owner ()){
127128 throw std::runtime_error (" Worker thread cannot shut down the pool" );
128129 }
129130
@@ -167,21 +168,22 @@ template < template<typename...> class Func >
167168void BasicSpeculativeThreadpool<Func>::spawn(unsigned N) {
168169
169170 // TODO: is_owner
170- if (not is_owner ()){
171+ if (! is_owner ()){
171172 throw std::runtime_error (" Worker thread cannot spawn threads" );
172173 }
173174
175+ // Lock to synchronize all workers before creating _worker_locals
176+ std::scoped_lock<std::mutex> lock (_mutex);
177+ _works.reset (new Worker[N]);
178+
174179 for (size_t i=0 ; i<N; ++i){
175- _threads.emplace_back ([this ]()->void {
180+ _threads.emplace_back ([this , i=i ]()->void {
176181
177- Worker w;
178- TaskType t;
179- auto id = std::this_thread::get_id ();
182+ Worker& w = _works[i];
183+ TaskType t;
180184
181185 std::unique_lock<std::mutex> lock (_mutex);
182186
183- _worker_local.insert ({id, &w});
184-
185187 while (!_exiting){
186188 if (_task_queue.empty ()){
187189 w.ready = false ;
@@ -214,10 +216,12 @@ void BasicSpeculativeThreadpool<Func>::spawn(unsigned N) {
214216 } // End of while ------------------------------------------------------
215217 });
216218
217- } // End of For -------------------------------------------------------------
219+ _worker_local.insert ({_threads.back ().get_id (), &_works[i]});
220+ } // End of For ---------------------------------------------------------------------------------
218221}
219222
220223
224+ // Function: async
221225template < template <typename ...> class Func >
222226template <typename C>
223227auto BasicSpeculativeThreadpool<Func>::async(C&& c){
@@ -244,7 +248,7 @@ auto BasicSpeculativeThreadpool<Func>::async(C&& c){
244248 // speculation
245249 if (std::this_thread::get_id () != _owner){
246250 auto iter = _lookahead ();
247- if (iter != _worker_local.end () and iter->second ->task == nullptr ){
251+ if (iter != _worker_local.end () && iter->second ->task == nullptr ){
248252 iter->second ->task =
249253 [p = MoC (std::move (p)), c = std::forward<C>(c)]() mutable {
250254 c ();
@@ -280,7 +284,7 @@ auto BasicSpeculativeThreadpool<Func>::async(C&& c){
280284 // speculation
281285 if (std::this_thread::get_id () != _owner){
282286 auto iter = _lookahead ();
283- if (iter != _worker_local.end () and iter->second ->task == nullptr ){
287+ if (iter != _worker_local.end () && iter->second ->task == nullptr ){
284288 iter->second ->task =
285289 [p = MoC (std::move (p)), c = std::forward<C>(c)]() mutable {
286290 p.get ().set_value (c ());
@@ -327,7 +331,7 @@ void BasicSpeculativeThreadpool<Func>::silent_async(C&& c){
327331 // speculation
328332 if (std::this_thread::get_id () != _owner){
329333 auto iter = _lookahead ();
330- if (iter != _worker_local.end () and iter->second ->task == nullptr ){
334+ if (iter != _worker_local.end () && iter->second ->task == nullptr ){
331335 iter->second ->task = std::move (t);
332336 return ;
333337 }
0 commit comments