ä¸æ¥ãä¸å pthread åå¼·ã
はじめに
pthread でキューを書いてみる - IT戦記ã«é¢ãã¦ãid:n-soda ããããã¨ã¦ãè²´éãªã¢ããã¤ã¹ã沢山é ããã®ã§ãã¹ã¦ç´ãã¦ããããã¨æãã¾ãã
本当にありがとうございます。
ææãä¼ã£ãç¹ã¾ã¨ã
再帰呼び出しの問題
ãã¥ã¼ãä¸æ¯ã®å ´åã« enQ() ã®å帰å¼ã³åºããã¦ããã®ãå¤ã§ãããã㯠while ã§æ¸ããã¯ãã§ããCã®å ´åãæ«å°¾å帰ã®åé¤ãããªãå¦çç³»ãã»ã¨ãã©ã§ããããã®ã±ã¼ã¹ã§ã¯ãdeQ() å´ã®ã¹ã¬ãã㨠enQ() å´ã®ã¹ã¬ããã§é度差ãããã ãã§å帰ãã¦ãã¾ãã¾ããããã¹ã¿ãã¯ãç¸å½ç¡é§ã«æ¶è²»ãã¾ããã¹ã¿ãã¯ãªã¼ãã¼ããã¼ã§æ»ãã§ãã¾ãå¯è½æ§ãããã¾ãã
http://d.hatena.ne.jp/amachang/20080611/1213165364#c1213677983
sleep(0) の問題
ãã¥ã¼ã®ç©ºãå¾ ã¡ã« pthread_cond_wait() ã使ãããsleep(0) ãã¦ãã®ã¯å¤ã§ããå½ç¶ deQ() å´ã§ã¯ããã¥ã¼ãä¸æ¯ã®ç¶æ ãã空ãããã«å¤ãã£ãããpthread_cond_signal() ãã¾ããããããã¨ãsleep(0) ã¨ã sched_yield() ã®å¼ã³åºãã¯ãããããä¸è¦ã«ãªãã¾ãã
http://d.hatena.ne.jp/amachang/20080611/1213165364#c1213677983
-1 の問題
空ãã®æå³ã§ -1 ã®ãããªç¹å¥ãªå¤ã使ãã®ã¯ä¸è¬æ§ã®ä½ãããã°ã©ãã³ã°ã¹ã¿ã¤ã«ã§ããä»åã®ä¾ã§ã¯ããã¨ãã°ãã¥ã¼ä¸ã®æå¹ãªãã¼ã¿æ°ã示ãå¤æ°ãå°å ¥ããããã®å¤æ° >= sããªããã¥ã¼ãä¸æ¯ã ã¨å¤æããã°æ¸ãã¯ãã§ãã
http://d.hatena.ne.jp/amachang/20080611/1213165364#c1213677983
変数名の問題
å¤æ°åãåããã«ããã¨æãã¾ããç¹ã«æ¡ä»¶å¤æ°ã¯ããã®å¤æ°ã示ãæ¡ä»¶ã®ååãã¤ããã¹ãã§ããä»åãªã c â non_emptyã(2)ã§æ¸ããæ¡ä»¶å¤æ°ânon_full ã¨åä»ããã¹ãã§ãããããã¨ããµã¤ã nâinãlâoutãsâsize ãããªãã§ããï¼
http://d.hatena.ne.jp/amachang/20080611/1213165364#c1213677983
static 変数の問題
ããã C ã ããã¨ãã£ã¦ãstatic å¤æ°ã«ããããããªãã¦ããã¥ã¼æ§é ä½ã«ãã¦ãåé¢æ°ã«ã¯æ示çã«æ§é ä½ã¸ã®ãã¤ã³ã¿ã渡ããããã¯ããæ¹ãããã¾ã©ãè¯ãã¨æãã¾ãã
http://d.hatena.ne.jp/amachang/20080611/1213165364#c1213677983
これらを踏まえてもう一度書き直してみました。
#include <assert.h>
#include <limits.h>
#include <memory.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/*
 * queue_t: a fixed-capacity FIFO of void* shared between threads.
 *
 * All fields are protected by `mutex`. `no_full` and `no_empty` are counters
 * (free slots / stored items), so `no_full + no_empty == size` at all times
 * while the mutex is not held by a thread in the middle of an update.
 */
typedef struct queue {
    unsigned int in;           // index of the slot the next enqueue writes
    unsigned int out;          // index of the slot the next dequeue reads
    unsigned int size;         // capacity: number of slots in buffer
    unsigned int no_full;      // free slots; 0 means the queue is full
    unsigned int no_empty;     // stored items; 0 means the queue is empty
    pthread_mutex_t mutex;     // guards every other field and buffer
    pthread_cond_t cond_full;  // producers wait here while the queue is full
    pthread_cond_t cond_empty; // consumers wait here while the queue is empty
    void *buffer[];            // `size` slots, allocated together with the struct
} queue_t;

/*
 * create_queue
 *
 * Allocates and initialises a queue_t with room for `size` pointers.
 * The result must be released with destroy_queue().
 *
 * Returns NULL when `size` is 0 (a zero-capacity ring would divide by zero
 * in the index wrap-around), when `size` does not fit the unsigned int
 * bookkeeping fields, when the allocation size would overflow, when memory
 * cannot be allocated, or when a mutex/condition variable cannot be
 * initialised.
 */
queue_t *create_queue(size_t size)
{
    queue_t *p_queue;

    if (size == 0 || size > UINT_MAX) {
        return NULL;
    }
    if (size > (SIZE_MAX - sizeof *p_queue) / sizeof p_queue->buffer[0]) {
        return NULL;
    }

    // calloc zeroes in, out, no_empty and every buffer slot.
    p_queue = calloc(1, sizeof *p_queue + size * sizeof p_queue->buffer[0]);
    if (p_queue == NULL) {
        return NULL;
    }

    p_queue->size = (unsigned int)size;
    p_queue->no_full = (unsigned int)size; // every slot is free at the start

    // Dynamically allocated objects are initialised with the *_init calls
    // only; on failure, undo whatever has been set up so far.
    if (pthread_mutex_init(&p_queue->mutex, NULL) != 0) {
        goto fail_free;
    }
    if (pthread_cond_init(&p_queue->cond_full, NULL) != 0) {
        goto fail_mutex;
    }
    if (pthread_cond_init(&p_queue->cond_empty, NULL) != 0) {
        goto fail_cond_full;
    }
    return p_queue;

fail_cond_full:
    pthread_cond_destroy(&p_queue->cond_full);
fail_mutex:
    pthread_mutex_destroy(&p_queue->mutex);
fail_free:
    free(p_queue);
    return NULL;
}

/*
 * destroy_queue
 *
 * Destroys the mutex and both condition variables of a queue obtained from
 * create_queue() and frees its memory. Passing NULL is a no-op.
 * No thread may be using the queue when this is called; items still stored
 * in the queue are not freed.
 */
void destroy_queue(queue_t *p_queue)
{
    int rv;

    if (p_queue == NULL) {
        return;
    }

    rv = pthread_mutex_destroy(&p_queue->mutex);
    assert(rv == 0);

    rv = pthread_cond_destroy(&p_queue->cond_full);
    assert(rv == 0);
    rv = pthread_cond_destroy(&p_queue->cond_empty);
    assert(rv == 0);
    (void)rv; // keeps NDEBUG builds free of "set but not used" warnings

    free(p_queue);
}

/*
 * enqueue
 *
 * Appends `data` to the queue. Blocks while the queue is full.
 */
void enqueue(queue_t *p_queue, void *data)
{
    pthread_mutex_lock(&p_queue->mutex);
    // -- critical section starts --

    // Re-check in a loop: a wake-up does not guarantee a slot is still free.
    while (!p_queue->no_full) {
        pthread_cond_wait(&p_queue->cond_full, &p_queue->mutex);
    }

    p_queue->buffer[p_queue->in] = data;

    // Advance the write index, wrapping around at the end of the buffer.
    p_queue->in++;
    p_queue->in %= p_queue->size;

    p_queue->no_full--;
    p_queue->no_empty++;

    // -- critical section ends --
    pthread_mutex_unlock(&p_queue->mutex);

    // One item was added, so wake one waiting consumer.
    pthread_cond_signal(&p_queue->cond_empty);
}

/*
 * dequeue
 *
 * Removes and returns the oldest item in the queue. Blocks while the queue
 * is empty.
 */
void *dequeue(queue_t *p_queue)
{
    void *result;

    pthread_mutex_lock(&p_queue->mutex);
    // -- critical section starts --

    // Re-check in a loop: a wake-up does not guarantee an item is still there.
    while (!p_queue->no_empty) {
        pthread_cond_wait(&p_queue->cond_empty, &p_queue->mutex);
    }

    result = p_queue->buffer[p_queue->out];

    // Advance the read index, wrapping around at the end of the buffer.
    p_queue->out++;
    p_queue->out %= p_queue->size;

    p_queue->no_full++;
    p_queue->no_empty--;

    // -- critical section ends --
    pthread_mutex_unlock(&p_queue->mutex);

    // One slot was freed, so wake one waiting producer.
    pthread_cond_signal(&p_queue->cond_full);

    return result;
}

/*
 * inspect_queue
 *
 * Prints the queue's address, capacity and current item count to stdout.
 * The mutex is held while the fields are read.
 */
void inspect_queue(queue_t *p_queue)
{
    pthread_mutex_lock(&p_queue->mutex);
    // %p requires a void* argument and the counters are unsigned.
    printf(
        "queue(%p) {\n"
        "  size: %u;\n"
        "  length: %u;\n"
        "}\n\n", (void *)p_queue, p_queue->size, p_queue->no_empty);
    pthread_mutex_unlock(&p_queue->mutex);
}

// Number of items each producer enqueues and each consumer dequeues.
#define N_ITEMS_PER_THREAD 100

/*
 * enqueue_thread_func
 *
 * Producer thread body: `p` is the queue_t*. Enqueues N_ITEMS_PER_THREAD
 * heap-allocated copies of "hoge"; ownership of each passes to the consumer.
 */
void *enqueue_thread_func(void *p)
{
    queue_t *p_queue = (queue_t *)p;
    int i = N_ITEMS_PER_THREAD;

    while (i--) {
        char *p_data = malloc(sizeof "hoge");
        if (p_data != NULL) {
            strcpy(p_data, "hoge");
        }
        // On allocation failure NULL is enqueued anyway: the consumers expect
        // a fixed number of items, and free(NULL) on their side is harmless.
        enqueue(p_queue, p_data);
    }
    return NULL;
}

/*
 * dequeue_thread_func
 *
 * Consumer thread body: `p` is the queue_t*. Dequeues N_ITEMS_PER_THREAD
 * items and frees each one.
 */
void *dequeue_thread_func(void *p)
{
    queue_t *p_queue = (queue_t *)p;
    int i = N_ITEMS_PER_THREAD;

    while (i--) {
        char *p_data = dequeue(p_queue);
        free(p_data);
    }
    return NULL;
}

// Producer and consumer counts must match, otherwise the total number of
// enqueued and dequeued items differs and some thread blocks forever.
#define N_ENQUEUE_THREADS 10
#define N_DEQUEUE_THREADS 10

// Reports a failed pthread call (rv is the error number it returned) and
// terminates; unlike assert() this still works when NDEBUG is defined.
static void die_if_error(int rv, const char *what)
{
    if (rv != 0) {
        fprintf(stderr, "%s: %s\n", what, strerror(rv));
        exit(EXIT_FAILURE);
    }
}

/*
 * main
 *
 * Starts N_ENQUEUE_THREADS producers and N_DEQUEUE_THREADS consumers on one
 * 10-slot queue, waits for all of them, then destroys the queue.
 */
int main(int argc, char **argv)
{
    queue_t *p_queue;
    pthread_t enqueue_threads[N_ENQUEUE_THREADS] = {0};
    pthread_t dequeue_threads[N_DEQUEUE_THREADS] = {0};
    int i;

    (void)argc;
    (void)argv;

    p_queue = create_queue(10);
    if (p_queue == NULL) {
        fprintf(stderr, "create_queue: failed\n");
        return EXIT_FAILURE;
    }

    for (i = 0; i < N_ENQUEUE_THREADS; i++) {
        die_if_error(pthread_create(enqueue_threads + i, NULL,
                                    enqueue_thread_func, p_queue),
                     "pthread_create");
    }
    for (i = 0; i < N_DEQUEUE_THREADS; i++) {
        die_if_error(pthread_create(dequeue_threads + i, NULL,
                                    dequeue_thread_func, p_queue),
                     "pthread_create");
    }

    for (i = 0; i < N_ENQUEUE_THREADS; i++) {
        die_if_error(pthread_join(enqueue_threads[i], NULL), "pthread_join");
    }
    for (i = 0; i < N_DEQUEUE_THREADS; i++) {
        die_if_error(pthread_join(dequeue_threads[i], NULL), "pthread_join");
    }

    destroy_queue(p_queue);
    return 0;
}
ä»åº¦ã¯ã©ãã§ãããã
ä»åº¦ã¯ dtrace 㧠lock, unlock, block ã®åæ°ãæ°ãã
$ sudo dtrace -n " > plockstat\$target::pthread_mutex_unlock:mutex-release, > plockstat\$target::pthread_mutex_lock:mutex-acquire, > plockstat\$target::pthread_mutex_lock:mutex-block, > plockstat\$target::_pthread_cond_wait:mutex-release > { @[probefunc,probename,arg0] = count() } >" -c ./a.out dtrace: description 'plockstat$target::pthread_mutex_unlock:mutex-release, plockstat$target::pthread_mutex_lock:mutex-acquire, plockstat$target::pthread_mutex_lock:mutex-block, plockstat$target::_pthread_cond_wait:mutex-release ' matched 4 probes dtrace: pid 21145 has exited pthread_mutex_lock mutex-acquire 2693110208 3 pthread_mutex_unlock mutex-release 2693110208 3 _pthread_cond_wait mutex-release 1048996 59 pthread_mutex_lock mutex-block 1048996 1966 pthread_mutex_unlock mutex-release 1048996 2000 pthread_mutex_lock mutex-acquire 1048996 2059 $
2000 å unlock ããã¦ãã 1048996 ã¨ããã¢ãã¬ã¹ã件㮠mutex ã®ã¢ãã¬ã¹ãªã®ã§
- 2059 å lock (wait ããã®å¾©å¸°ãå«ã¾ãã) ãã
- 59 å wait ã
- 2000 å unlock ãã
- 1966 å block ãããã¨ããããã¾ã
ã¡ããã¨åãã¦ããã§ããã
まとめ
皆様のアドバイスのおかげで、とても勉強になりました。
本当にありがとうございます。
ï¼è¿½è¨ï¼ããã«ã¢ããã¤ã¹ãé ããã®ã§
ããä¸åç´ãã¦ã¿ã¾ããï¼ãã£ãï¼
pthread でキューを作ってみる(再々挑戦、最終版) - IT戦記