スレッドプールの実装方法について
March 03, 2014 at 08:58 PM | categories: unix, programming |スレッドプール(thread pool)を実装するには、暇なときはthreadを寝かせておいて必要なときに起こす、というイベント通知の仕組みが必要になる。 UnixでC/C++で実装するときはpthreadの条件変数を使うのが普通だと思われるが、適当なファイルディスクリプタをopenしておいてread等でブロックさせる方法でも実装できそう。
どのようなやり方が一般的なのか、いくつか有名どころのOSSの実装を調べてみた。
libuvの場合
https://github.com/joyent/libuv
単純にpthread_cond_wait
をつかっている 1
static void worker(void* arg) { struct uv__work* w; QUEUE* q; (void) arg; for (;;) { uv_mutex_lock(&mutex); while (QUEUE_EMPTY(&wq)) uv_cond_wait(&cond, &mutex); q = QUEUE_HEAD(&wq); if (q == &exit_message) uv_cond_signal(&cond); else { QUEUE_REMOVE(q); QUEUE_INIT(q); /* Signal uv_cancel() that the work req is executing. */ } uv_mutex_unlock(&mutex); if (q == &exit_message) break; w = QUEUE_DATA(q, struct uv__work, wq); w->work(w); uv_mutex_lock(&w->loop->wq_mutex); w->work = NULL; /* Signal uv_cancel() that the work req is done executing. */ QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq); uv_async_send(&w->loop->wq_async); uv_mutex_unlock(&w->loop->wq_mutex); } }
Boost.Asioの場合
http://www.boost.org/doc/libs/1_55_0/doc/html/boost_asio.html
Boost.Asio
にスレッドプールそのものは提供されてないが以下のようにして簡単に実装することができる
#include <thread> #include <functional> #include <boost/asio.hpp> int main ( int argc, char* argv[] ) { asio::io_service io_service; asio::io_service::work work(io_service); std::vector<std::thread> threadPool; for(size_t t = 0; t < std::thread::hardware_concurrency(); t++){ threadPool.push_back(thread(std::bind(&asio::io_service::run, &io_service))); } io_service.post(std::bind(an_expensive_calculation, 42)); io_service.post(std::bind(a_long_running_task, 123)); //Do some things with the main thread io_service.stop(); for(std::thread& t : threadPool) { t.join(); } }
http://stackoverflow.com/questions/14265676/using-boostasio-thread-pool-for-general-purpose-tasks
長くなるのでコードは省略するが、io_service::post
するとunixの場合は最終的にはtask_io_service::wake_one_idle_thread_and_unlock
からpthread_cond_signal
が呼ばれる。
memcachedの場合
https://github.com/memcached/memcached
libevent
のイベント通知機能を利用して実装している。それぞれのthread初期化の際にpipeをつくって、そのfdをlibeventに渡す。 2 libevent内部でそのfdをepoll
なりkqueue
なりでブロックして待つ。
// // memcached.c // typedef struct { pthread_t thread_id; /* unique ID of this thread */ struct event_base *base; /* libevent handle this thread uses */ struct event notify_event; /* listen event for notify pipe */ int notify_receive_fd; /* receiving end of notify pipe */ int notify_send_fd; /* sending end of notify pipe */ struct thread_stats stats; /* Stats generated by this thread */ struct conn_queue *new_conn_queue; /* queue of new connections to handle */ cache_t *suffix_cache; /* suffix cache */ uint8_t item_lock_type; /* use fine-grained or global item lock */ } LIBEVENT_THREAD; // // thread.c // void thread_init(int nthreads, struct event_base *main_base) { // // 中略 // threads = calloc(nthreads, sizeof(LIBEVENT_THREAD)); // // さらに中略 // for (i = 0; i < nthreads; i++) { int fds[2]; if (pipe(fds)) { perror("Can't create notify pipe"); exit(1); } threads[i].notify_receive_fd = fds[0]; threads[i].notify_send_fd = fds[1]; setup_thread(&threads[i]); /* Reserve three fds for the libevent base, and two for the pipe */ stats.reserved_fds += 5; } /* Create threads after we've done all the libevent setup. */ for (i = 0; i < nthreads; i++) { create_worker(worker_libevent, &threads[i]); } /* Wait for all the threads to set themselves up before returning. */ pthread_mutex_lock(&init_lock); wait_for_thread_registration(nthreads); pthread_mutex_unlock(&init_lock); }
pthread_cond_waitの実装
脱線するが、pthread_cond_waitがどのようにsleepにはいってるのか気になったので調べた。
https://sourceware.org/git/?p=glibc.git;a=tree;f=nptl;hb=HEAD
pthread_cond_wait
のソースコードはglibc
のnptl
以下にある。
__pthread_cond_wait
がlll_futex_wait
を呼んでおり、これは以下のように実装されている。(以下はx86_64のもの)
#define lll_futex_wait(futex, val, private) \ lll_futex_timed_wait(futex, val, NULL, private) #define lll_futex_timed_wait(futex, val, timeout, private) \ ({ \ register const struct timespec *__to __asm ("r10") = timeout; \ int __status; \ register __typeof (val) _val __asm ("edx") = (val); \ __asm __volatile ("syscall" \ : "=a" (__status) \ : "0" (SYS_futex), "D" (futex), \ "S" (__lll_private_flag (FUTEX_WAIT, private)), \ "d" (_val), "r" (__to) \ : "memory", "cc", "r11", "cx"); \ __status; \ })
上記アセンブラは大体以下のような意味3
futex(futex, FUTEX_WAIT, val, timeout, NULL, 0); // 便宜上、上記コードの引数の変数名をそのままつかっているが、 // 1つめのfutexはシステムコールのfutexで、 // 2つめは引数のpthread_cond_tの__futexメンバ変数のアドレス
futex() システムコールは、 指定したアドレスの値が変更されるのをプログラムが待つ手段や 特定のアドレスに対して待機中のプロセスを wake (起床) させる手段を提供する
futex(2) http://linuxjm.sourceforge.jp/html/LDP_man-pages/man2/futex.2.html
とのこと。
まとめ
- pthread_cond_waitをつかったもののほうが普通は高速なはず
- memcachedのようなやりかただとユーザプロセス側でスレッドプール管理のための排他制御がほとんど不要になるので多少実装が簡単か
blog comments powered by Disqus
About Me
mojavy |
Recent posts
95/5 Mbps とは
(August 30, 2015 at 04:22 PM)組み込み用プログラミング言語のパフォーマンス比較
(April 21, 2015 at 01:10 AM)最近読んだ本
(April 05, 2015 at 01:23 PM)Phabricatorを使ったワークフローについて
(March 02, 2015 at 08:55 PM)dnsimpleでダイナミックDNSをつかう
(December 23, 2014 at 08:02 PM)www2014のアドテク関連のResearch Trackメモ
(October 06, 2014 at 09:05 PM)flappymacs がMELPAに登録されました
(July 16, 2014 at 01:07 AM)EmacsでFlappy Birdっぽいもの書きました
(July 10, 2014 at 08:01 PM)
Recent Popular posts
Popular posts
Categories
- C (rss) (3)
- R (rss) (1)
- adtech (rss) (1)
- advent calendar (rss) (2)
- algorithms (rss) (2)
- android (rss) (2)
- aws (rss) (1)
- blog (rss) (2)
- blogofile (rss) (3)
- books (rss) (1)
- c++ (rss) (1)
- chef (rss) (4)
- common lisp (rss) (10)
- debian (rss) (2)
- dns (rss) (1)
- elasticsearch (rss) (1)
- elf (rss) (1)
- elisp (rss) (1)
- emacs (rss) (5)
- english (rss) (1)
- game (rss) (2)
- gearman (rss) (1)
- git (rss) (1)
- github (rss) (1)
- gitlab (rss) (1)
- golang (rss) (2)
- history (rss) (1)
- impress.js (rss) (1)
- internet (rss) (1)
- ios (rss) (3)
- jekyll (rss) (1)
- jenkins (rss) (1)
- linux (rss) (4)
- lisp (rss) (2)
- ltsv (rss) (1)
- lua (rss) (1)
- mac (rss) (3)
- mach-o (rss) (1)
- memo (rss) (2)
- mustache (rss) (1)
- note (rss) (1)
- objective-c (rss) (4)
- os (rss) (1)
- osx (rss) (2)
- others (rss) (1)
- paco (rss) (1)
- pdf (rss) (1)
- php (rss) (2)
- postfix (rss) (1)
- programming (rss) (12)
- project management (rss) (1)
- python (rss) (5)
- quicklinks (rss) (6)
- raspberry pi (rss) (2)
- redmine (rss) (1)
- reveal.js (rss) (1)
- ruby (rss) (10)
- sbcl (rss) (2)
- security (rss) (1)
- shell (rss) (2)
- smtp (rss) (1)
- solr (rss) (1)
- statistics (rss) (2)
- tips (rss) (10)
- tmux (rss) (3)
- toml (rss) (1)
- tools (rss) (1)
- twitter (rss) (1)
- ubuntu (rss) (1)
- unix (rss) (5)
- v8 (rss) (1)
- web (rss) (7)
- xcode (rss) (1)
- zeromq (rss) (2)
Archives
- August 2015 (1)
- April 2015 (2)
- March 2015 (1)
- December 2014 (1)
- October 2014 (1)
- July 2014 (3)
- March 2014 (6)
- February 2014 (4)
- November 2013 (3)
- October 2013 (4)
- September 2013 (2)
- July 2013 (2)
- June 2013 (2)
- May 2013 (1)
- April 2013 (6)
- March 2013 (3)
- February 2013 (8)
- January 2013 (5)
- December 2012 (1)
- November 2012 (6)
- October 2012 (7)
- August 2012 (1)
- July 2012 (9)
- June 2012 (1)
- April 2012 (1)
- December 2011 (2)
- November 2011 (2)