Skip to content

Commit 1aa65aa

Browse files
version 2.0 initial check-in
1 parent c061dec commit 1aa65aa

File tree

5 files changed

+655
-259
lines changed

5 files changed

+655
-259
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -736,6 +736,7 @@ Cpp-Taskflow is being actively developed and contributed by the following people
736736
- [Nan Xiao](https://github.com/NanXiao) fixed compilation error of unittest on the Arch platform.
737737
- [Vladyslav](https://github.com/innermous) fixed comment errors in README.md and examples.
738738
- [vblanco20-1](https://github.com/vblanco20-1) fixed compilation error on Microsoft Visual Studio.
739+
- [Glen Fraser](https://github.com/totalgee) helped edit a standalone and C++14-compatible threadpool for taskflow.
739740

740741
Please [let me know][email me] if I forgot someone!
741742

taskflow/taskflow.hpp

Lines changed: 2 additions & 255 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040
#include <cassert>
4141
#include <optional>
4242

43+
#include <threadpool/threadpool.hpp>
44+
4345
// Clang mis-interprets variant's get as a non-friend of variant and cannot
4446
// get compiled correctly. We use the patch:
4547
// https://gcc.gnu.org/viewcvs/gcc?view=revision&revision=258854
@@ -124,261 +126,6 @@ struct is_iterable<T, std::void_t<decltype(std::declval<T>().begin()),
124126
template <typename T>
125127
inline constexpr bool is_iterable_v = is_iterable<T>::value;
126128

127-
//------------------------------------------------------------------------------
128-
// Utility
129-
//------------------------------------------------------------------------------
130-
131-
// Struct: MoC
132-
template <typename T>
133-
struct MoC {
134-
135-
MoC(T&& rhs) : object(std::move(rhs)) {}
136-
MoC(const MoC& other) : object(std::move(other.object)) {}
137-
138-
T& get() { return object; }
139-
140-
mutable T object;
141-
};
142-
143-
template <typename T>
144-
MoC(T&&) -> MoC<T>;
145-
146-
template <typename T, typename C, typename... Args>
147-
void set_promise_on_invoke(std::promise<T>& p, C&& c, Args&&... args) {
148-
p.set_value(std::invoke(c, args...));
149-
}
150-
151-
template <typename C, typename... Args>
152-
void set_promise_on_invoke(std::promise<void>&p, C&& c, Args&&... args) {
153-
std::invoke(c, args...);
154-
p.set_value();
155-
}
156-
157-
//------------------------------------------------------------------------------
158-
// Threadpool definition
159-
//------------------------------------------------------------------------------
160-
161-
// Class: Threadpool
162-
class Threadpool {
163-
164-
enum class Signal {
165-
STANDARD,
166-
SHUTDOWN
167-
};
168-
169-
public:
170-
171-
inline Threadpool(unsigned);
172-
inline ~Threadpool();
173-
174-
template <typename C>
175-
auto async(C&&, Signal = Signal::STANDARD);
176-
177-
template <typename C>
178-
auto silent_async(C&&, Signal = Signal::STANDARD);
179-
180-
inline void shutdown();
181-
inline void spawn(unsigned);
182-
183-
inline size_t num_tasks() const;
184-
inline size_t num_workers() const;
185-
186-
inline bool is_worker() const;
187-
188-
private:
189-
190-
mutable std::mutex _mutex;
191-
192-
std::condition_variable _worker_signal;
193-
std::deque<std::function<Signal()>> _task_queue;
194-
std::vector<std::thread> _threads;
195-
std::unordered_set<std::thread::id> _worker_ids;
196-
};
197-
198-
// Constructor
199-
inline Threadpool::Threadpool(unsigned N) {
200-
spawn(N);
201-
}
202-
203-
// Destructor
204-
inline Threadpool::~Threadpool() {
205-
shutdown();
206-
}
207-
208-
// Function: num_tasks
209-
// Return the number of "unfinished" tasks.
210-
// Notice that this value is not necessary equal to the size of the task_queue
211-
// since the task can be popped out from the task queue while
212-
// not yet finished.
213-
inline size_t Threadpool::num_tasks() const {
214-
return _task_queue.size();
215-
}
216-
217-
inline size_t Threadpool::num_workers() const {
218-
return _threads.size();
219-
}
220-
221-
inline bool Threadpool::is_worker() const {
222-
std::scoped_lock<std::mutex> lock(_mutex);
223-
return _worker_ids.find(std::this_thread::get_id()) != _worker_ids.end();
224-
}
225-
226-
// Procedure: spawn
227-
// The procedure spawns "n" threads monitoring the task queue and executing each task.
228-
// After the task is finished, the thread reacts to the returned signal.
229-
inline void Threadpool::spawn(unsigned N) {
230-
231-
if(is_worker()) {
232-
throw std::runtime_error("Worker thread cannot spawn threads");
233-
}
234-
235-
for(size_t i=0; i<N; ++i) {
236-
237-
_threads.emplace_back([this] () -> void {
238-
239-
{ // Acquire lock
240-
std::scoped_lock<std::mutex> lock(_mutex);
241-
_worker_ids.insert(std::this_thread::get_id());
242-
}
243-
244-
bool stop {false};
245-
246-
while(!stop) {
247-
decltype(_task_queue)::value_type task;
248-
249-
{ // Acquire lock. --------------------------------
250-
std::unique_lock<std::mutex> lock(_mutex);
251-
_worker_signal.wait(lock, [this] () { return _task_queue.size() != 0; });
252-
task = std::move(_task_queue.front());
253-
_task_queue.pop_front();
254-
} // Release lock. --------------------------------
255-
256-
// Execute the task and react to the returned signal.
257-
switch(task()) {
258-
case Signal::SHUTDOWN:
259-
stop = true;
260-
break;
261-
262-
default:
263-
break;
264-
};
265-
266-
} // End of worker loop.
267-
268-
{ // Acquire lock
269-
std::scoped_lock<std::mutex> lock(_mutex);
270-
_worker_ids.erase(std::this_thread::get_id());
271-
}
272-
273-
});
274-
}
275-
}
276-
277-
// Function: silent_async
278-
// Insert a task without giving future.
279-
template <typename C>
280-
auto Threadpool::silent_async(C&& c, Signal sig) {
281-
282-
// No worker, do this right away.
283-
if(num_workers() == 0) {
284-
c();
285-
}
286-
// Dispatch this to a thread.
287-
else {
288-
{
289-
std::unique_lock lock(_mutex);
290-
_task_queue.emplace_back(
291-
[c=std::forward<C>(c), ret=sig] () mutable {
292-
c();
293-
return ret;
294-
}
295-
);
296-
}
297-
_worker_signal.notify_one();
298-
}
299-
}
300-
301-
// Function: async
302-
// Insert a callable task and return a future representing the task.
303-
template<typename C>
304-
auto Threadpool::async(C&& c, Signal sig) {
305-
306-
using R = std::invoke_result_t<C>;
307-
308-
std::promise<R> p;
309-
auto fu = p.get_future();
310-
311-
// No worker, do this immediately.
312-
if(_threads.empty()) {
313-
if constexpr(std::is_same_v<void, R>) {
314-
c();
315-
p.set_value();
316-
}
317-
else {
318-
p.set_value(c());
319-
}
320-
}
321-
// Schedule a thread to do this.
322-
else {
323-
{
324-
std::unique_lock lock(_mutex);
325-
326-
if constexpr(std::is_same_v<void, R>) {
327-
_task_queue.emplace_back(
328-
[p = MoC(std::move(p)), c = std::forward<C>(c), ret = sig]() mutable {
329-
c();
330-
p.get().set_value();
331-
return ret;
332-
}
333-
);
334-
}
335-
else {
336-
_task_queue.emplace_back(
337-
[p = MoC(std::move(p)), c = std::forward<C>(c), ret = sig]() mutable {
338-
p.get().set_value(c());
339-
return ret;
340-
}
341-
);
342-
}
343-
344-
// This can cause MSVS not to compile ...
345-
/*_task_queue.emplace_back(
346-
[p=MoC(std::move(p)), c=std::forward<C>(c), ret=sig] () mutable {
347-
if constexpr(std::is_same_v<void, R>) {
348-
c();
349-
p.get().set_value();
350-
}
351-
else {
352-
p.get().set_value(c());
353-
}
354-
return ret;
355-
}
356-
);*/
357-
}
358-
_worker_signal.notify_one();
359-
}
360-
return fu;
361-
}
362-
363-
// Procedure: shutdown
364-
// Remove a given number of workers. Notice that only the master can call this procedure.
365-
inline void Threadpool::shutdown() {
366-
367-
if(is_worker()) {
368-
throw std::runtime_error("Worker thread cannot shut down the thread pool");
369-
}
370-
371-
for(size_t i=0; i<_threads.size(); ++i) {
372-
silent_async([](){}, Signal::SHUTDOWN);
373-
}
374-
375-
for(auto& t : _threads) {
376-
t.join();
377-
}
378-
379-
_threads.clear();
380-
}
381-
382129
//-----------------------------------------------------------------------------
383130
// Taskflow definition
384131
//-----------------------------------------------------------------------------

0 commit comments

Comments
 (0)