読者です 読者をやめる 読者になる 読者になる

スレッド間通信のオーバーヘッドを比較する

pthread_系の関数は mutex か cond しか待てないが、select/poll/epoll はファイルディスクリプタしか待てないので、両方待ちたいときに困る。

解決方法はいろいろあると思いますが、私の思いつく範囲では以下の4つ。

selectで待ち、シグナルで割り込む
ファイルディスクリプタはselectで待つ。他のイベントはいったんキューに入れておき、シグナルを発生させてselectを中断させる。たしかlighttpdはこの方式だったはず。ただlighttpdはシングルスレッドなのでキューは使っていなかったような(うろ覚え)
selectで待ち、パイプで割り込む
selectで待つのだが、その中にpipe(2)で作ったパイプを1つ食わせておく。ファイルディスクリプタ以外のイベントはいったんキューに入れておき、パイプに1バイト書き込んでselectを中断させる。
selectで待ち、パイプにイベントを書き込む
同様にselectにpipe(2)を1つ仕込んでおく。ファイルディスクリプタ以外のイベントは、バイト列にシリアライズしてパイプに書き込む。「POSIX.1-2001 では、 PIPE_BUF バイト以下の write(2) は atomic に行われる」*1 ことが必須(MUST)になっているらしいので、たぶん安全。シリアライズできないデータは送れない。mp::iothreadsはこの方式。
ファイルディスクリプタ専用スレッドを用意する
基本はキューを作ってpthread_系関数で待つ。ファイルディスクリプタは別のスレッドで待ち、キューに入れて pthread_cond_signal で知らせる。ファイルディスクリプタのイベントは select待ち + pthread_cond待ち の2つが必要になるので、イベントの大半がファイルディスクリプタの場合はあまりやりたくない。


他に linux-2.6.22で追加されたeventfdやsignalfdsigqueue(2) も使えるかも。
とりあえずここではパイプを使ったスレッド間通信について、pthread_系と比べてどれくらいオーバーヘッドをがあるのかを比較してみました。

実装1 cond - キューを作って pthread_cond で待つ

比較対象として、普通にキューを使ってpthread_condで待つ方式。

#include "timer.h"
#include <deque>

typedef void* task_t;

typedef std::deque<task_t> queue_t;
static queue_t queue;

static unsigned int num;
static pthread_mutex_t mutex;
static pthread_cond_t cond;

static void* worker(void* arg)
{
    for(unsigned int i = 0; i < num; i++) {
        pthread_mutex_lock(&mutex);
        while(queue.empty()) {
            pthread_cond_wait(&cond, &mutex);
        }

        queue.pop_front();

        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

int main(int argc, char* argv[])
{
    num = atoi(argv[1]);

    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init(&cond, NULL);

    pthread_t th;
    pthread_create(&th, NULL, worker, NULL);

    timer_initialize();

    for(unsigned int i = 0; i < num; i++) {
        task_t task;
        pthread_mutex_lock(&mutex);
        queue.push_back(task);
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&mutex);
    }

    pthread_join(th, NULL);

    timer_show();
}

途中の timer_initialize() や timer_show() は、gettimeofday(2) を使って時間を計測するもの:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <sys/time.h>
#include <unistd.h>
#include <sys/uio.h>
#include <sys/types.h>

static double g_timer;

double gettimeofday_sec()
{
    struct timeval tv;
    gettimeofday(&tv, NULL);
    return tv.tv_sec + (double)tv.tv_usec  * 1e-6;
}

void timer_initialize()
{
    g_timer = gettimeofday_sec();
}

void timer_show()
{
    printf("%f\n", gettimeofday_sec() - g_timer);
}

実装2 pipe_cond - パイプに1バイト書き込む

一端キューに入れておき、パイプに1バイト書き込んでイベント待ちを中断さ、キューから取り出す方式。

#include "timer.h"
#include <vector>

typedef void* task_t;

typedef std::vector<task_t> queue_t;
static queue_t queue;

static unsigned int num;
static int pair[2];
static pthread_mutex_t mutex;

static void* worker(void* arg)
{
    queue_t cache;
    char trash[1024];

    unsigned int count = num;
    while(count > 0) {
        if( read(pair[0], trash, sizeof(trash)) <= 0 ) {
            perror("read");
            exit(1);
        }
        pthread_mutex_lock(&mutex);
        queue.swap(cache);
        pthread_mutex_unlock(&mutex);

        for(queue_t::iterator i(cache.begin()), iend(cache.end());
                i != iend; ++i) {
            --count;
        }

        cache.clear();
    }

    return NULL;
}

int main(int argc, char* argv[])
{
    num = atoi(argv[1]);

    pipe(pair);

    pthread_mutex_init(&mutex, NULL);

    pthread_t th;
    pthread_create(&th, NULL, worker, NULL);

    timer_initialize();

    for(unsigned int i = 0; i < num; i++) {
        task_t task;
        pthread_mutex_lock(&mutex);
        queue.push_back(task);
        pthread_mutex_unlock(&mutex);
        if( write(pair[1], "\0", 1) <= 0 ) {
            perror("write");
            exit(1);
        }
    }

    pthread_join(th, NULL);

    timer_show();
}

実装3 pipe - パイプにシリアライズしたイベントを書き込む

パイプにシリアライズしたイベントを書き込んで、イベント待ちを中断させる方式。

#include "timer.h"

typedef void* task_t;

static unsigned int num;
static int pair[2];

static void* worker(void* arg)
{
    char buf[sizeof(task_t)];
    for(unsigned int i = 0; i < num; i++) {
        if( read(pair[0], buf, sizeof(task_t)) <= 0 ) {
            perror("read");
            exit(1);
        }
    }
    return NULL;
}

int main(int argc, char* argv[])
{
    num = atoi(argv[1]);

    pipe(pair);

    pthread_t th;
    pthread_create(&th, NULL, worker, NULL);

    timer_initialize();

    for(unsigned int i = 0; i < num; i++) {
        task_t task;
        if( write(pair[1], &task, sizeof(task_t)) <= 0 ) {
            perror("write");
            exit(1);
        }
    }

    pthread_join(th, NULL);

    timer_show();
}

実装4 pipe_cached - パイプにシリアライズしたイベントを書き込む + 複数イベントを同時に読み込む

実装3と同じだが、一度に複数のイベントを読み込んでread(2)の回数を減らす方式。高負荷時に効果が出る。

#include "timer.h"

typedef void* task_t;

static unsigned int num;
static int pair[2];

static void* worker(void* arg)
{
    char buf[sizeof(task_t) * 1024];
    char* tail = buf;
    char* head = buf;
    size_t carry = 0;

    unsigned int count = num;
    while(count > 0) {
        ssize_t len = read(pair[0], tail, sizeof(buf)-carry);
        if(len <= 0) {
            perror("read");
            exit(1);
        }
        tail += len;

        while(tail - head >= sizeof(task_t)) {
            --count;
            head += sizeof(task_t);
        }

        carry = tail - head;
        memcpy(buf, head, carry);
        head = buf;
        tail = buf + carry;
    }

    return NULL;
}

int main(int argc, char* argv[])
{
    num = atoi(argv[1]);

    pipe(pair);

    pthread_t th;
    pthread_create(&th, NULL, worker, NULL);

    timer_initialize();

    for(unsigned int i = 0; i < num; i++) {
        task_t task;
        if( write(pair[1], &task, sizeof(task_t)) <= 0 ) {
            perror("write");
            exit(1);
        }
    }

    pthread_join(th, NULL);

    timer_show();
}

実装5 newthread - 毎回新しいスレッドを作る

意外と速いかったりして?

#include "timer.h"

static void* func(void* arg)
{
    return NULL;
}

int main(int argc, char* argv[])
{
    unsigned int num = atoi(argv[1]);

    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

    timer_initialize();

    for(unsigned int i = 0; i < num; i++) {
        pthread_t th;
        pthread_create(&th, &attr, func, NULL);
    }

    timer_show();
}

結果

num=10, 100, 1000 の場合について、それぞれ100回ずつ実行して平均した結果:

実装 num=10 num=100 num=1000
cond 0.000048 sec 0.000061 sec 0.000131 sec
pipe_cond 0.000074 sec 0.000125 sec 0.000700 sec
pipe 0.000066 sec 0.000157 sec 0.001084 sec
pipe_cached 0.000064 sec 0.000117 sec 0.000654 sec
newthread 0.000251 sec 0.001241 sec 0.011458 sec

環境は Linux vcore.local 2.6.22.9-vcore16 #1 SMP Sun Oct 14 22:13:32 JST 2007 x86_64 AMD Athlon(tm) 64 X2 Dual Core Processor 5000+ GNU/Linux


プログラム一式: thread-pipe-overhead.tar.gz

*1:http://www.linux.or.jp/JM/html/LDP_man-pages/man7/pipe.7.html