読者です 読者をやめる 読者になる 読者になる

並列イベント駆動I/Oフレームワーク「mpio」リリース

分散KVS kumofs のコードは、全体で約2万行です*1
そのうち、ネットワークI/Oやプロトコルに関するコードは約1万行*2で、全体の約半分を占めています。
ロジックは残りの半分*3だけで実装されています。


この実例から分かりますが、kumofsのような分散アプリケーションを開発するにはI/O周りの実装が大変で、とてつもなく大きな障壁になっています。*4
さらに今日では、性能を稼ぐためにマルチスレッド化が必須です。また、多数のクライアントを少ないリソースで効率よく相手にするには、非同期・イベント駆動型アーキテクチャも必要になります。さらに、究極的な性能を達成すべく GC を利用しない C++ においては、実装のみならず設計も大変です。
これに加えてソケットAPIの難解な挙動に対処にしなければならないため、C言語C++によるネットワークプログラミングは、vimの使いこなしなどと同様に、過酷な「修行」を必要とする分野だったと言えます。


mpio(エム・ピー・アイ・オー)は、以上のような問題の解決を目的とした、C++のライブラリです。
ソケットとに紐付けられたイベントハンドラ、バッファリング、非同期化されたwritevやsendfile、非同期connect、シグナル、タイマーなど、ネットワークプログラミングで共通して必要になる機能を簡単に利用することができます。
これらの機能は Wavyアーキテクチャ(後述)を基盤としており、完全にマルチスレッド化されています。イベントループの実装にはepollとkqueueを利用することができ、コンパイル時に自動的に選択されます。
以上の機能に加えて、マルチスレッドプログラミングで必要となる様々なユーティリティを実装しています。


以前まで Cagrakumofs の内部で利用し、開発を継続してきたmpioライブラリを、このたび独立したライブラリとしてリリースしました。
ここではmpioで実装している並列処理のアーキテクチャを紹介し、最後にサンプルコードを紹介します。
mpioソースコードmpio@githubDownloadsmpio-X.Y.Z.tar.gz)にあります。

Wavyアーキテクチャ

Wavyアーキテクチャは、1つのイベントループを複数のスレッドで共有し、続けざまに処理していくI/O方式です。
以前に マルチコア時代の高並列性IOアーキテクチャ WavyWEB+DB PRESS Vol.55 でも紹介していますが、別の図で紹介してみようと思います(クリックで拡大)

この図のように、複数のスレッドが順々にイベントループを回していきます。あるスレッドがイベントハンドラを実行している最中に、別のスレッドが イベント待ち→イベントハンドラを実行 を並列して行うことができます。
通常のイベント駆動型のアーキテクチャでは、イベントハンドラを実行している最中に イベント待ち→イベントハンドラを実行 を並列して行うことができないので、イベントハンドラがブロックしていると新しいイベントを受け付けられなくなってしまいます。Wavyでは、スレッドの本数が足りる限り並列してイベントを処理することができます。


mpioライブラリは、このWavyアーキテクチャを実装しています。ソケット1つに対してイベントハンドラを1つ割り当てて、ソケットにデータが届くとイベントハンドラが呼ばれます。また、タイマーやシグナルもイベントとして扱うことができ、普通の関数もイベントとして登録できます(=スレッドプールとして使えます*5)。
さらに実装上の工夫として、1つのソケットに立て続けに届いたメッセージも、並列して処理できるようになっています(クリックで拡大)



図のように旧バージョンの実装では、同じソケットに立て続けに届いたメッセージは並列して処理できませんでした。普通にイベント駆動型のアーキテクチャを実装するとこうなります。
新バージョン(現行バージョン)の実装では、これも並列して処理できます。メッセージに依存関係がなければ、並列して処理することで処理時間を短縮できます。

サンプルコード

シグナル

突然地味ですが、シグナルは極めて厄介な存在です。困ったことにマルチスレッドとシグナルハンドラの相性は最悪であり、シグナルハンドラの中でmutexを使うとデッドロックする可能性があります(参考:UNIX上でのC++ソフトウェア設計の定石 (2))。
mpioではちょっとした工夫*6を実装しており、シグナルを安全に扱うことができます。プログラムでは単にハンドラを登録するだけで、「非同期シグナルセーフ」などの厄介な問題を考えずにシグナルを扱えます:

#include <mp/wavy.h>         // mp::wavyを使う
#include <mp/functional.h>   // mp::bind, mp::functionを使う
#include <sys/types.h>
#include <unistd.h>
#include <stdlib.h>

// ^Cが押されるたびに呼ばれる
bool signal_handler(int* count)
{
    // 非同期シグナルセーフでない関数も呼べる
    std::cout << "count = " << *count << std::endl;
    (*count)++;
    return true;
}

int main(void)
{
    using namespace mp::placeholders;

    // イベントループを作成
    mp::wavy::loop lo;

    // シグナルハンドラを登録
    int count = 0;
    lo.add_signal(SIGINT, mp::bind(  // 関数に変数をbindする
                &signal_handler, &count));

    // 4スレッドでイベントループを開始
    lo.run(4);
}

このプログラムでは、SIGINTが届く(^Cを押す)たびにカウンタを1ずつインクリメントしていきます。


最後の lo.run(4) に注目してください。これで4本のスレッドが起動し、イベントループが動作します。
なお、mpioはもはやマルチスレッドに特化しています。シングルスレッドでは動作できない点に注意してください。2スレッド以上は必須です。

タイマー

シグナルと同様に、タイマーも簡単に扱うことができます。
ここではタイマーが3回発火したら lo->end() を呼び出して、イベントループを止めるようにしました:

bool timer_handler(int* count, mp::wavy::loop* lo)
{
    std::cout << "count = " << *count << std::endl;
    (*count)++;

    if(*count >= 3) {
        lo->end();    // タイマーが3回呼ばれたらプログラムを終了
        return false;
    } else {
        return true;
    }
}

int main(void)
{
    using namespace mp::placeholders;

    // イベントループを作成
    mp::wavy::loop lo;

    // タイマーを登録
    int count = 0;
    lo.add_timer(0.5, 1.0, mp::bind(  // 0.5秒後から1.0秒間隔で
                &timer_handler, &count, &lo));

    // 4スレッドでイベントループを開始
    lo.run(4);
}
非同期connect

connect(2)を非同期で行うことができます。タイムアウト付きのconnectは自前で実装すると非常に面倒ですが、mpioを使えば簡単に使えます。
シグナルやタイマーと同様に、connectが完了したらコールバック関数が呼び出されます:

// connectが完了したら呼ばれる
void connected(int fd, int err)
{
    if(fd < 0) {
        perror("connect error");
        return;
    }

    try {
        std::cout << "connected" << std::endl;
        // fdを使って何かする...

    } catch (...) {
        close(fd);
        throw;
    }
}

int main(void)
{
    mp::wavy::loop lo;

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    addr.sin_port = htons(9090);

    // 非同期connect
    lo.connect(PF_INET, SOCK_STREAM, 0,
            (struct sockaddr*)&addr, sizeof(addr),
            1.0, connected);  // タイムアウトは1.0秒

    lo.run(4);
}
イベント駆動read・非同期write

先のコードはソケットを接続した段階で終わっています。ソケットをイベントループに登録すると、データが届いたときにイベントハンドラが呼ばれるようになります。これでイベント駆動型のreadを行うことができます。
イベント駆動型のアーキテクチャでは、処理が長くブロックすると並列性が落ち、性能が出ません*7。そこでイベント駆動型のアーキテクチャでは、往々にしてwriteを非同期化することで性能の向上を図ります。mpioを使うと、writeを(writevやsendfileも)簡単に非同期化することができます:

// ソケットに紐付けられるイベントハンドラ
// mp::wavy::handlerを継承
class myhandler : public mp::wavy::handler {
public:
    myhandler(int fd, mp::wavy::loop* lo) :
        mp::wavy::handler(fd),
        m_loop(lo) { }

    struct mybuffer {
        char data[512];
    };

    // ソケットにデータが届いたら呼ばれる
    void on_read(mp::wavy::event& e)
    {
        // バッファにデータを読み込む
        std::auto_ptr<mybuffer> buf(new mybuffer());
        ssize_t len = read(fd(), buf->data, sizeof(buf->data));

        if(len <= 0) {
            std::cout << "closed" << std::endl;
            e.remove();
            return;
        }

        // 非同期write = ブロックしない
        // bufはwriteが完了したら自動的に解放される
        m_loop->write(fd(), buf->data, len, buf);
    }

private:
    mp::wavy::loop* m_loop;
};

// acceptしたら呼ばれる
void accepted(int fd, int err, mp::wavy::loop* lo)
{
    if(fd < 0) {
        perror("accept error");
        lo->end();
        return;
    }

    try {
        std::cout << "accepted" << std::endl;

        // イベントハンドラを登録する
        // (コンストラクタには可変長の引数を渡せる)
        lo->add_handler<myhandler>(fd, lo);

    } catch (...) {
        close(fd);
        throw;
    }
}

int main(void)
{
    using namespace mp::placeholders;

    mp::wavy::loop lo;

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(9090);  // 9090番ポートで待ち受け

    lo.listen(PF_INET, SOCK_STREAM, 0,
            (struct sockaddr*)&addr, sizeof(addr),
            mp::bind(&accepted, _1, _2, &lo));

    lo.run(4);
}

writeの実際の処理は mp::wavy::loop の内部に隠蔽されていますが、その実体は、マルチスレッド化されたイベント駆動型のアーキテクチャになっています。複数のホストに同時にデータを送ると、並列して処理されます。
writeを非同期化すると、バッファを解放するタイミングが難しくなります。これに対してmpioでは、writeメソッドにstd::auto_ptrやmp::shared_ptrを渡せるようになっています。writeが完了するとこれらの参照が解除され、メモリが解放されます。


サンプルコードの中で何度か出てきましたが、mp::bind や mp::function、mp::unordered_map など、コンパイラが先行実装している C++0x の機能を利用できるように抽象化するヘッダも含んでいます。

mpioのインストール

mpio@githubDownloads *8 の中から最新のパッケージをダウンロードし、インストールしてください:

$ ./configure
$ make
$ sudo make install

mpioライブラリを利用するには、mp::wavyを使う場合は libmpio をリンクしてください。他のユーティリティライブラリは、ヘッダをincludeするだけで利用できます。


mpioのリリース情報は、このブログで順次お知らせしていく予定です。

現状、ドキュメントはほとんどありません。kumofs や MessagePack もそうですが、ドキュメントを書いたよーというお知らせは常に募集しています^^;


※2010-04-13追記:
mpiolibevを使って同じプログラムを実装したサンプルコードをアップロードしました:

*1:git ls-files | grep -E '\.(cc|h)$' | xargs wc -l #=> 22,030行。2010-04-12現在

*2:git ls-files src/mp src/mpsrc src/rpc src/gate | grep -E '\.(cc|h)$' | xargs wc -l #=> 9,931行。

*3:kumofsは、I/Oとロジックを強く分離した形で実装しています。MessagePackの動的型付けの性質を利用することで、I/Oとロジックの疎結合化を実現。

*4:Erlangはいいですねぇ。

*5:普通は、I/Oイベント待ち(=epollやkqueue)とタスク待ち(=pthread_cond待ち)を混ぜて使うことはできません。epollにpipeを1つ喰わせておくなど、無駄の多い細工が必要です。mpioではもはやシングルスレッドを捨て、ロックやフラグを注意深く操作することで、pipeを使わずにI/Oイベント待ちとタスク待ちを混ぜることに成功しています。

*6:signalfd+epollまたはkqueueを使ってシグナルをハンドリングします。

*7:Wavyアーキテクチャでは多少長くブロックしても他の処理には影響は与えないのですが、長くブロックしすぎるとスレッドの数が足りなくなって応答速度が低下してしまいます。長くブロックする「可能性」のあるコードが入っていると、特に障害が発生した場合などに応答速度が低下し、プログラムの堅牢性が低下してしまいます。writeは、送信先のホストに障害が発生したときに長くブロックする可能性があります

*8:名前付きのリストとTagsのリストがありますが、上の方の名前付きのリストからダウンロードしてください。