読者です 読者をやめる 読者になる 読者になる

MessagePack + Wavy で高速なRPCサーバーを書く「ccf」

先日、追記型オブジェクトストレージKastorを紹介しました。そこで「C++クラスタアプリケーションを書くためのフレームワーク(ccf; Cluster Communcation Framework)を実装中」などなど書きました。


最近C++でサーバープログラムを書くときには MessagePack(シリアライザ・デシリアライザ。プロトコルに利用)とmp::wavy(イベント駆動I/Oとスレッドプールを統合してうまく動かすライブラリ)という2つのライブラリを使うことが多い*1のですが、この2つを組み合わせるときにいつも似たようなコードを書いていたので、いっそまとめて1つのフレームワークのようにしたら便利そうだなーという動機でccfの開発を始めました。


…以下長々と書いていますが、論よりコード。このヘッダサーバーのサンプルクライアントのサンプル を見れば、何ができるのかは大体分かると思います。


一番特徴的なのはヘッダの中に入っているconnectionクラスでしょうか。これはイベントハンドラで、ファイルディスクリプタ1つに付き1つのインスタンスを作ります(ファイルディスクリプタがcloseされるタイミングでデストラクタが走ります)。
イベントハンドラをmp::wavyに登録すると、ファイルディスクリプタが読み込み可能になった時点で read_event() メンバ関数が呼ばれるようになります:

template <typename IMPL>
void connection<IMPL>::read_event()
{
    // バッファを確保
    m_pac.reserve_buffer(CONNECTION_BUFFER_RESERVATION_SIZE);

    // バッファにreadする
    ssize_t rl = ::read(fd(), m_pac.buffer(), m_pac.buffer_capacity());
    if(rl <= 0) {
        if(rl == 0) { throw mp::system_error(errno, "connection closed"); }
        if(errno == EAGAIN || errno == EINTR) { return; }
        else { throw mp::system_error(errno, "read error"); }
    }

    m_pac.buffer_consumed(rl);

    // デシリアライズする
    while(m_pac.execute()) {
        msgpack::object msg = m_pac.data();
        std::auto_ptr<msgpack::zone> z( m_pac.release_zone() );
        m_pac.reset();
        static_cast<IMPL*>(this)->process_message(msg, z);  // ここがポイント!
    }
}

(m_pacはMessagePackのストリームデシリアライザです:msgpack:cpp:doc.ja:ストリームデシリアライザ


connectionクラスはtemplateクラスで、CRTPを使っているのがポイントです。
最後に呼び出している process_message() 関数はconnectionクラスには実装されていません。connectionクラスを継承したクラスで実装します。このとき virtual を使わずに template と static_cast を使うことで、仮想関数呼び出しのオーバーヘッドを削減しています。


実際に届いたメッセージを処理する関数はサーバーのサンプルにあります:

class server : public connection<server> {
public:
    server(int fd) : connection<server>(fd) { }
    ~server() { }

    void process_message(msgpack::object msg, std::auto_ptr<msgpack::zone>& z)
    {
        std::cout << msg << std::endl;    // 届いたメッセージを表示

        msgpack::sbuffer sbuf;  // シリアライズ
        msgpack::pack(sbuf, msg);

        // そのまま送り返す(echoサーバー)
        ccf::net::send(fd(), sbuf.data(), sbuf.size(), &::free, sbuf.data());
        sbuf.release();
    }
};

このクラスでは実際にメッセージを処理するコードだけを書けばいいのがポイント! 例外も基底クラスで良きに処理してくれます。


ccf::net::sendは、非同期に(ブロックせずに)データを送信してくれるモジュールです。単純なバッファの他に、iovec(writev)やファイル(sendfile)も送れます。送信が完了すると登録しておいたコールバック関数を呼んでくれるので、そこでバッファを解放してやります。
これで MessagePackメッセージ版のechoサーバーができました。


このccfのサンプルコードをcloneしてコンパイルすると、このMessagePack echoサーバーとクライアントが作られます:

# サーバー側
$ ./src/logic/mpecho-server

# クライアント側
$ mpecho-client hello world
["./src/logic/mpecho-client", "hello", "world"]

$ mpecho-client this is test
["./src/logic/mpecho-client", "this", "is", "test"]


ccfはmp::wavyのほかに、ファイルディスクリプタをキーにして任意のオブジェクトをO(1)で引ける mp::sparse_array や、deleteをコールバック関数に変換する mp::object_delete 、イベント駆動I/O用の効率の良いバッファの実装である mp::stream_buffer など、個性的で高速なライブラリが同梱されているのが特徴です。

ccfのこれから

Ruby on Rails のように ccf /path/to/your/app とやると configure.in や Makefile.am のひな形が自動生成されて、src/logic/serverの下にソースコードを置くと自動的にMakefile.amが更新されて…という使い勝手を妄想しています^^;)

connectionクラスの上にはRPCセッションマネージャを実装する予定なのですが、うまくできた暁にはRPCスタブジェネレータも一緒に入れて ./script/generate rpc のような感じで使えたらいいなーと思っています。

ともあれ、共有ライブラリのような形にすることは無いと思います。ccfの大半がtemlpateになっていることも一因ですが、バイナリ互換性やソースコード互換性を保ったままバージョンアップしていく気が無いからです^^;) アプリケーションに応じてフレームワーク側も躊躇せず書き換えていく使い方を想定しています。
と言うのも完璧に汎用なフレームワークなんて作れないから、どうしたってアプリケーションごとにカスタマイズしたくなるよね、という発想です。

ccfのソースコードはgithubに置いてみました:http://github.com/frsyuki/ccf/tree/master

*1:と言うか毎回I/O部分からプロトコルまでぜんぶ自作することはないですよね…ライブラリに慣れると楽ちんなので、いつも使っています