非同期IOをpthreadでエミュレーションする - libpio

マルチスレッドを使って非同期IOをエミュレーションし、どんな種類のファイルディスクリプタでも、任意のIOを非同期でできるようにするライブラリ「libpio」(Parallel I/O Libraryのつもり)を作ってみました。このコンセプトがうまくいくのかどうか、テストの段階です。
Linux AIOやPOSIX AIOはaccept()ができなかったりして、プログラム全体を一つのイベントループで実装できなくて非常に困るのですが、libpioならシングルスレッド&イベントループ1つで非同期IOサーバーを書けます。
今のところLinuxだけで動きます。


libpio-0.0.1.tar.gz


※2007/10/29追記:kqueueでも実装できました。…なぜかちゃんと動かないのですが…。使う側のコードは変更せずに、epollとkqueueを切り替えられます。
libpio-0.0.2.tar.gz


3レーン2スレッド

実装は、極めて微妙な排他処理で成り立っています。3本の「レーン」の中で2本のスレッド(IOスレッドとイベントスレッド)を走らせることで、非同期IOを行います。


イベントスレッドはレーン0、IOスレッドはレーン1から動き始めます。イベントスレッドはmain()関数と同じスレッドで、プログラムの全体で2本のスレッドが動くことになります。
各レーンには、mutexロック、requestキュー、resultキューがあり、「スレッドがレーンnで動いている」とは、「スレッドがレーンnのmutexロックをロックしている」ことを意味します。

IOスレッドは、登録されているIOリクエストでイベントが起こるまで待ちます(今のところepollを使用。おそらくkqueueやselectにも移植可)。初期状態ではIOリクエストはIOスレッドに一つも登録されていないので、待ちぼうけることになります。*1


この状態(イベントスレッドはレーン0、IOスレッドはレーン1でイベント待ち)から、main()関数のスレッド(イベントスレッド)でIOリクエストを発行すると、イベントスレッドは自分が今動いているレーン(レーン0)のrequestキューにそのIOリクエストを追加します。そして、「スイッチ」します。
「スイッチ」は、今動いているレーン以外のレーン(2つある)のmutexロックをロックしようと試み(ロックできるまで繰り返す)、ロックできたら、元動いていたレーンのmutexロックを解除します。レーン3本に対してスレッドは2本なので、1回か2回の試行でロックできます。
IOスレッドがレーン1で動いていて、今イベントスレッドはレーン0で動いているので、レーン2にスイッチすることになります。レーン0のリクエストキューにはIOリクエストが入っています。
イベントスレッドはスイッチが完了すると、IOスレッドのイベント待ちを中断させます。そして、元動いていたレーンでもスイッチ先でもないレーン(つまりレーン1。IOスレッドが動いているレーン)をロックします。このロックはブロックします。


IOスレッドはイベント待ちが中断されると、他のスレッドに「スイッチ」します。イベントスレッドがレーン2で動いていて、今IOスレッドはレーン1で動いているので、レーン0にスイッチすることになります。スイッチが完了すると、スイッチした先のrequestキューを回収し、再度イベント待ちに入ります。
IOスレッドがレーン0にスイッチしたので、イベントスレッドはレーン1のロック(先ほどブロックしていたロック)を取得します。レーン1をロックしたので、レーン2のロックは解除します。イベントスレッドはレーン1で動くことになります。ロックが完了すると、スイッチ先のresultキューを読み込んで、結果が入っていたらそれを回収します。まだこの時点では入っていないので、もう一度元動いていたスレッドでもスイッチ先でもないレーン(レーン0 = IOスレッドが動いているレーン)をロックします。このロックはブロックします。
これでIOリクエストの登録までの流れは終わりです。


IOスレッドは、登録されたIOリクエストの中でイベントが発生すると、登録された内容でIOを行います。IOが完了したら、今動いているレーンのresultキューに結果を追加し、「スイッチ」します。ここでイベントスレッドがレーン1で動いていて、今IOスレッドはレーン0で動いているので、レーン2にスイッチすることになります。レーン0のresultキューにはIOリクエストの結果が入っています。
IOスレッドがレーン2にスイッチしたので、イベントスレッドはレーン0のロックを取得します。レーン0のresultキューにはIOリクエストの結果が入っているので、これを取り出します。



…実際にはもう少し込み入っていますが(何と言っても1つ間違うとデッドロックなので)、大方は以上の流れで1つのIOが完了します。何度もロックが行われますが、ブロックするロックはイベントスレッドがIOスレッドのIO完了を待つロックだけで、そもそもこれはどうしようもない待ち時間です。
IOスレッドで行われるIOは、イベントスレッドで別のことをやっている間も平行して進行するところがポイントです。つまり非同期IOになります。
IOの粒度が大きいほど、IOスレッドの「スイッチ」回数が減り、イベントスレッドが別のことをやっていられる時間も長くなるので、性能が向上するはずです。


使い方

仕組み反して使い方は簡単で、↓こんな感じです。標準入力からデータを読み込み、標準出力に出力するだけのプログラムです。

#include <libpio.h>   /* libpio */
#include <libpio/op/rw.h>  /* libpio_read, libpio_write_all */
#include <stdio.h>

#define BUFFER_SIZE (128*1024)

int main(void)
{
        struct libpio_context_t ctx;

        /* 初期化 */
        if( libpio_init(&ctx) < 0 ) {
                perror("libpio_init()");
                exit(1);
        }

        struct libpiocb iocb;
        struct libpio_vector* vec;  /* 結果が入る配列へのポインタ。初期化は不要 */
        void* buf;
        unsigned int i;

        /* 標準入力(0)から読み込む */
        buf = malloc(BUFFER_SIZE);
        libpio_read(&iocb, NULL, 0, buf, BUFFER_SIZE);  /* iocbに値をセット */
        libpio_push(&ctx, &iocb);                       /* iocbをキューに追加 */

        while(1) {
                vec = libpio_submit_wait(&ctx);  /* キューに追加されているIOを開始して、一つ以上のIOが終わるまで待つ */
                for(i=0; i < vec->length; ++i) {
                        if( vec->iocbs[i].user.ptr == NULL ) {
                                // readが終わった
                                if( vec->iocbs[i].nbytes == 0 ) { /* EOFまたはエラー */
                                        fprintf(stderr, "libpio_op_read(): %s",
                                                strerror(vec->iocbs[i].error));
                                        return 0;
                                }
                                /* 標準出力(1)にすべて書き出す */
                                libpio_write_all(&iocb, (void*)1, 1, vec->iocbs[i].buf.rbuf,vec->iocbs[i].nbytes);
                                libpio_push(&ctx, &iocb);
                                /* さらに標準入力(0)から読み込む */
                                buf = malloc(BUFFER_SIZE);
                                libpio_read(&iocb, NULL, 0, buf, BUFFER_SIZE);
                                libpio_push(&ctx, &iocb);
                        } else {
                                // writeが終わった
                                if( vec->iocbs[i].nbytes == 0 ) { /* EOFまたはエラー */
                                        fprintf(stderr, "libpio_op_write_all(): %s",
                                                strerror(vec->iocbs[i].error));
                                        return 0;
                                }
                                free(vec->iocbs[i].buf.rbuf);
                        }
                }
        }

        return 0;
}

何度もmalloc()/free()するなんとも効率の悪い例ですが、ぱっと思いつかなかったので…


struct libpiocbの定義は↓こうなっています。

union libpiocb_data_int {
        int fd;
        int32_t  i32;
        uint32_t u32;
};
union libpiocb_data {
        void* ptr;
        void* rbuf;
        const void* wbuf;
        struct {
                union libpiocb_data_int i1;
                union libpiocb_data_int i2;
        } ipack;
        uint64_t u64;
        off_t offset;
};

enum libpio_next_state {
        LIBPIO_NEXT_END,
        LIBPIO_NEXT_CONTINUE,
        LIBPIO_NEXT_CHANGE_EVENT,
};

struct libpiocb {
        int fd;
        int events;
        enum libpio_next_state (*operation)(struct libpiocb* req, struct libpiocb* nextcb);
        size_t count;
        size_t nbytes;
        int error;
        union libpiocb_data buf;
        union libpiocb_data din;
        union libpiocb_data dout;
        union libpiocb_data user;
};


実際のIO処理は、libpiocb構造体の中にoperationという関数ポインタがあることから分かるとおり、ここに任意のIOを行う関数をセットできます。
libpio自体の関数は、基本的に↓この4つだけです。

extern int libpio_init(struct libpio_context_t* ctx);
extern void libpio_exit(struct libpio_context_t* ctx);
extern int libpio_push(struct libpio_context_t* ctx, const struct libpiocb* cb);                                                      
extern struct libpio_vector* libpio_submit_wait(struct libpio_context_t* ctx);

それぞれ初期化関数、終了関数、IO要求を追加する関数、IOを開始して終わるのを待つ関数です。(libpio_push()した段階ではIOは開始されないので注意)


では先のテストプログラムで使ったlibpio_read()の定義はと言うと、↓こうなっています。

static inline void libpio_read(struct libpiocb* iocb, void* user, int fd, void* buf, size_t count)
{
        memset(iocb, 0, sizeof(*iocb));
        iocb->fd = fd; 
        iocb->count = count;
        iocb->events = LIBPIO_EVENT_READ;
        iocb->operation = libpio_op_read;
        iocb->buf.rbuf = buf;
        iocb->user.ptr = user; 
}

enum libpio_next_state libpio_op_read(struct libpiocb* req, struct libpiocb* nextcb)
{
        ssize_t sz = read(req->fd, ((char*)req->buf.rbuf) + req->nbytes, req->count - req->nbytes);
        if( sz < 0 ) {
                if( errno == EAGAIN || errno == EINTR ) {
                        return LIBPIO_NEXT_CONTINUE;
                } else {
                        req->error = errno;
                        return LIBPIO_NEXT_END;
                }
        } else {
                req->nbytes = sz;
                return LIBPIO_NEXT_END;
        }
}

libpio_read()でoperationにlibpio_op_readをセットしています。



operationは独自に作ることも可能で、そのときにはstruct libpiocbのfdフィールドに監視したいファイルディスクリプタを入れ、eventsフィールドに監視したいイベントの種類を入れます。その他のフィールドは自由に使えます(特に4つのunion libpiocb_dataは煮るなり焼くなり)。

まだテストしていないのでダメなのですが、sendfileは↓こんな感じで作れるはずです。

static inline void libpio_sendfile(struct libpiocb* iocb, void* user, int out_fd, int in_fd, off_t *offset, size_t count)
{
        memset(iocb, 0, sizeof(*iocb));
        iocb->fd = in_fd;
        iocb->count = count;
        iocb->events = LIBPIO_EVENT_READ;
        iocb->operation = libpio_op_sendfile;
        iocb->buf.ipack.i1.fd = out_fd;
        iocb->din.ptr = (void*)offset;
        iocb->user.ptr = user;
}

enum libpio_next_state libpio_op_splice(struct libpiocb* req, struct libpiocb* nextcb)
{
        if( req->events == LIBPIO_EVENT_READ ) {
                memcpy(nextcb, req, sizeof(struct libpiocb));
                nextcb->fd = req->buf.ipack.i1.fd;
                nextcb->buf.ipack.i1.fd = req->fd;
                nextcb->events = LIBPIO_EVENT_WRITE;
                return LIBPIO_NEXT_CHANGE_EVENT;
        } else {
                long sz = splice(req->buf.ipack.i1.fd, req->din.ptr, req->fd, req->dout.ptr,
                                req->count - req->nbytes, req->buf.ipack.i2.u32);
                if( sz < 0 ) {
                        if( errno == EAGAIN || errno == EINTR ) {
                                return LIBPIO_NEXT_CONTINUE;
                        } else {
                                req->error = errno;
                                return LIBPIO_NEXT_END;
                        }
                } else if( sz == 0 ) {
                        return LIBPIO_NEXT_END;
                } else {
                        req->nbytes += sz;
                        if( req->nbytes >= req->count )
                                return LIBPIO_NEXT_END;
                        else
                                return LIBPIO_NEXT_CONTINUE;
                }
        }
}

operationはIOスレッドで動作するのでイベントスレッドと変数を共有できない(ロックを使わないと共有できない)ところで注意が必要ですが、operationの中に2つのIOを入れてみたり、いろいろとIOの粒度を上げる手段があるはずです。
うまくイベントスレッド側と処理を分担できれば、マルチCPU/マルチコア環境では普通のシングルスレッドサーバーより性能が上がるはず。


ソースコードオープンソースなので、ぜひ試してみてください。(試したらぜひブログなどなどでレポートを!)

libpio-0.0.1.tar.gz
※2007/10/29追記:libpio-0.0.2.tar.gz

*1:実際にはイベントスレッドからIOスレッドのイベント待ちを中断させるために、pipeが1つ登録されています。