|
40 | 40 | #include <cassert> |
41 | 41 | #include <optional> |
42 | 42 |
|
| 43 | +#include <threadpool/threadpool.hpp> |
| 44 | + |
43 | 45 | // Clang mis-interprets variant's get as a non-friend of variant and cannot |
44 | 46 | // get compiled correctly. We use the patch: |
45 | 47 | // https://gcc.gnu.org/viewcvs/gcc?view=revision&revision=258854 |
@@ -124,261 +126,6 @@ struct is_iterable<T, std::void_t<decltype(std::declval<T>().begin()), |
124 | 126 | template <typename T> |
125 | 127 | inline constexpr bool is_iterable_v = is_iterable<T>::value; |
126 | 128 |
|
127 | | -//------------------------------------------------------------------------------ |
128 | | -// Utility |
129 | | -//------------------------------------------------------------------------------ |
130 | | - |
131 | | -// Struct: MoC |
132 | | -template <typename T> |
133 | | -struct MoC { |
134 | | - |
135 | | - MoC(T&& rhs) : object(std::move(rhs)) {} |
136 | | - MoC(const MoC& other) : object(std::move(other.object)) {} |
137 | | - |
138 | | - T& get() { return object; } |
139 | | - |
140 | | - mutable T object; |
141 | | -}; |
142 | | - |
143 | | -template <typename T> |
144 | | -MoC(T&&) -> MoC<T>; |
145 | | - |
146 | | -template <typename T, typename C, typename... Args> |
147 | | -void set_promise_on_invoke(std::promise<T>& p, C&& c, Args&&... args) { |
148 | | - p.set_value(std::invoke(c, args...)); |
149 | | -} |
150 | | - |
151 | | -template <typename C, typename... Args> |
152 | | -void set_promise_on_invoke(std::promise<void>&p, C&& c, Args&&... args) { |
153 | | - std::invoke(c, args...); |
154 | | - p.set_value(); |
155 | | -} |
156 | | - |
157 | | -//------------------------------------------------------------------------------ |
158 | | -// Threadpool definition |
159 | | -//------------------------------------------------------------------------------ |
160 | | - |
161 | | -// Class: Threadpool |
162 | | -class Threadpool { |
163 | | - |
164 | | - enum class Signal { |
165 | | - STANDARD, |
166 | | - SHUTDOWN |
167 | | - }; |
168 | | - |
169 | | - public: |
170 | | - |
171 | | - inline Threadpool(unsigned); |
172 | | - inline ~Threadpool(); |
173 | | - |
174 | | - template <typename C> |
175 | | - auto async(C&&, Signal = Signal::STANDARD); |
176 | | - |
177 | | - template <typename C> |
178 | | - auto silent_async(C&&, Signal = Signal::STANDARD); |
179 | | - |
180 | | - inline void shutdown(); |
181 | | - inline void spawn(unsigned); |
182 | | - |
183 | | - inline size_t num_tasks() const; |
184 | | - inline size_t num_workers() const; |
185 | | - |
186 | | - inline bool is_worker() const; |
187 | | - |
188 | | - private: |
189 | | - |
190 | | - mutable std::mutex _mutex; |
191 | | - |
192 | | - std::condition_variable _worker_signal; |
193 | | - std::deque<std::function<Signal()>> _task_queue; |
194 | | - std::vector<std::thread> _threads; |
195 | | - std::unordered_set<std::thread::id> _worker_ids; |
196 | | -}; |
197 | | - |
198 | | -// Constructor |
199 | | -inline Threadpool::Threadpool(unsigned N) { |
200 | | - spawn(N); |
201 | | -} |
202 | | - |
203 | | -// Destructor |
204 | | -inline Threadpool::~Threadpool() { |
205 | | - shutdown(); |
206 | | -} |
207 | | - |
208 | | -// Function: num_tasks |
209 | | -// Return the number of "unfinished" tasks. |
210 | | -// Notice that this value is not necessary equal to the size of the task_queue |
211 | | -// since the task can be popped out from the task queue while |
212 | | -// not yet finished. |
213 | | -inline size_t Threadpool::num_tasks() const { |
214 | | - return _task_queue.size(); |
215 | | -} |
216 | | - |
217 | | -inline size_t Threadpool::num_workers() const { |
218 | | - return _threads.size(); |
219 | | -} |
220 | | - |
221 | | -inline bool Threadpool::is_worker() const { |
222 | | - std::scoped_lock<std::mutex> lock(_mutex); |
223 | | - return _worker_ids.find(std::this_thread::get_id()) != _worker_ids.end(); |
224 | | -} |
225 | | - |
226 | | -// Procedure: spawn |
227 | | -// The procedure spawns "n" threads monitoring the task queue and executing each task. |
228 | | -// After the task is finished, the thread reacts to the returned signal. |
229 | | -inline void Threadpool::spawn(unsigned N) { |
230 | | - |
231 | | - if(is_worker()) { |
232 | | - throw std::runtime_error("Worker thread cannot spawn threads"); |
233 | | - } |
234 | | - |
235 | | - for(size_t i=0; i<N; ++i) { |
236 | | - |
237 | | - _threads.emplace_back([this] () -> void { |
238 | | - |
239 | | - { // Acquire lock |
240 | | - std::scoped_lock<std::mutex> lock(_mutex); |
241 | | - _worker_ids.insert(std::this_thread::get_id()); |
242 | | - } |
243 | | - |
244 | | - bool stop {false}; |
245 | | - |
246 | | - while(!stop) { |
247 | | - decltype(_task_queue)::value_type task; |
248 | | - |
249 | | - { // Acquire lock. -------------------------------- |
250 | | - std::unique_lock<std::mutex> lock(_mutex); |
251 | | - _worker_signal.wait(lock, [this] () { return _task_queue.size() != 0; }); |
252 | | - task = std::move(_task_queue.front()); |
253 | | - _task_queue.pop_front(); |
254 | | - } // Release lock. -------------------------------- |
255 | | - |
256 | | - // Execute the task and react to the returned signal. |
257 | | - switch(task()) { |
258 | | - case Signal::SHUTDOWN: |
259 | | - stop = true; |
260 | | - break; |
261 | | - |
262 | | - default: |
263 | | - break; |
264 | | - }; |
265 | | - |
266 | | - } // End of worker loop. |
267 | | - |
268 | | - { // Acquire lock |
269 | | - std::scoped_lock<std::mutex> lock(_mutex); |
270 | | - _worker_ids.erase(std::this_thread::get_id()); |
271 | | - } |
272 | | - |
273 | | - }); |
274 | | - } |
275 | | -} |
276 | | - |
277 | | -// Function: silent_async |
278 | | -// Insert a task without giving future. |
279 | | -template <typename C> |
280 | | -auto Threadpool::silent_async(C&& c, Signal sig) { |
281 | | - |
282 | | - // No worker, do this right away. |
283 | | - if(num_workers() == 0) { |
284 | | - c(); |
285 | | - } |
286 | | - // Dispatch this to a thread. |
287 | | - else { |
288 | | - { |
289 | | - std::unique_lock lock(_mutex); |
290 | | - _task_queue.emplace_back( |
291 | | - [c=std::forward<C>(c), ret=sig] () mutable { |
292 | | - c(); |
293 | | - return ret; |
294 | | - } |
295 | | - ); |
296 | | - } |
297 | | - _worker_signal.notify_one(); |
298 | | - } |
299 | | -} |
300 | | - |
301 | | -// Function: async |
302 | | -// Insert a callable task and return a future representing the task. |
303 | | -template<typename C> |
304 | | -auto Threadpool::async(C&& c, Signal sig) { |
305 | | - |
306 | | - using R = std::invoke_result_t<C>; |
307 | | - |
308 | | - std::promise<R> p; |
309 | | - auto fu = p.get_future(); |
310 | | - |
311 | | - // No worker, do this immediately. |
312 | | - if(_threads.empty()) { |
313 | | - if constexpr(std::is_same_v<void, R>) { |
314 | | - c(); |
315 | | - p.set_value(); |
316 | | - } |
317 | | - else { |
318 | | - p.set_value(c()); |
319 | | - } |
320 | | - } |
321 | | - // Schedule a thread to do this. |
322 | | - else { |
323 | | - { |
324 | | - std::unique_lock lock(_mutex); |
325 | | - |
326 | | - if constexpr(std::is_same_v<void, R>) { |
327 | | - _task_queue.emplace_back( |
328 | | - [p = MoC(std::move(p)), c = std::forward<C>(c), ret = sig]() mutable { |
329 | | - c(); |
330 | | - p.get().set_value(); |
331 | | - return ret; |
332 | | - } |
333 | | - ); |
334 | | - } |
335 | | - else { |
336 | | - _task_queue.emplace_back( |
337 | | - [p = MoC(std::move(p)), c = std::forward<C>(c), ret = sig]() mutable { |
338 | | - p.get().set_value(c()); |
339 | | - return ret; |
340 | | - } |
341 | | - ); |
342 | | - } |
343 | | - |
344 | | - // This can cause MSVS not to compile ... |
345 | | - /*_task_queue.emplace_back( |
346 | | - [p=MoC(std::move(p)), c=std::forward<C>(c), ret=sig] () mutable { |
347 | | - if constexpr(std::is_same_v<void, R>) { |
348 | | - c(); |
349 | | - p.get().set_value(); |
350 | | - } |
351 | | - else { |
352 | | - p.get().set_value(c()); |
353 | | - } |
354 | | - return ret; |
355 | | - } |
356 | | - );*/ |
357 | | - } |
358 | | - _worker_signal.notify_one(); |
359 | | - } |
360 | | - return fu; |
361 | | -} |
362 | | - |
363 | | -// Procedure: shutdown |
364 | | -// Remove a given number of workers. Notice that only the master can call this procedure. |
365 | | -inline void Threadpool::shutdown() { |
366 | | - |
367 | | - if(is_worker()) { |
368 | | - throw std::runtime_error("Worker thread cannot shut down the thread pool"); |
369 | | - } |
370 | | - |
371 | | - for(size_t i=0; i<_threads.size(); ++i) { |
372 | | - silent_async([](){}, Signal::SHUTDOWN); |
373 | | - } |
374 | | - |
375 | | - for(auto& t : _threads) { |
376 | | - t.join(); |
377 | | - } |
378 | | - |
379 | | - _threads.clear(); |
380 | | -} |
381 | | - |
382 | 129 | //----------------------------------------------------------------------------- |
383 | 130 | // Taskflow definition |
384 | 131 | //----------------------------------------------------------------------------- |
|
0 commit comments