読者です 読者をやめる 読者になる 読者になる

MessagePack-RPC for C++ テクニカルプレビュー

バイナリシリアライズ形式 MessagePack をプロトコルに利用したRPCライブラリ MessagePack-RPC の、C++版を開発しています。

以前に MessagePack-RPC for Ruby について 54行で実装する分散KVS140行で作る分散リアルタイム検索エンジンを紹介しましたが、そのC++版です。


大まかな設計はRuby版と同じで、Ruby版と同じような使い勝手で利用できます。

しかしRuby版とは異なり、C++版では完全にマルチスレッドに対応しています。具体的には、マルチコア時代の高並列性IOアーキテクチャ Wavy を利用しています:


複数のスレッドでイベントループを共有しており、マルチスレッドでイベントハンドラを次々に処理していきます。
単純なイベント駆動I/Oと比べると、並列性が高いという利点があります。イベントハンドラの中で処理が多少ブロックしても、他のイベントハンドラは並列して動作し続けます。
単純なスレッドプールと比べると、スレッド間でデータ(タスク)を受け渡す必要がないという利点があります。遅延とCPU負荷が若干小さくなります。


MessagePack-RPC for C++では、この並列イベント駆動I/Oライブラリの上にRPCライブラリを構築しています:


この設計は、並列性が無いことを除けばRuby版も同じです。
イベント駆動I/Oのイベントハンドラで、MessagePackのデシリアライザにデータを読み込んでいき、メッセージを切り出していきます。MessagePackにはイベント駆動型のI/Oに適したストリームデシリアライザを実装してあるので、それを利用しています。


ここで、TCPコネクションRPCセッションというインタフェースの裏に隠蔽されているところがポイントです。
1つのRPCセッションは、複数のコネクションを持っていることもあるし、1本も持っていないこともあります。
アプリケーションでは、TCPコネクションを確立したり、TCPコネクションが切れたら再接続するといった処理を書く必要はありません。それはMessagePack-RPCの中で自動的に(かつ非同期で)行います。必要になったときに自動的に接続し、切断されたら再接続します。コネクションプーリングにも対応しています。


このコネクションを隠蔽するあたりのコードをどのように設計するかが、各種RPCライブラリの腕の見せ所になってきます。タイムアウトやエラー処理などの信頼性や、性能や並列性もこのあたりの設計に大きく影響されます。(ネットワークプログラミングで一番難しいところをRPCライブラリでやってくれるので、アプリケーションは簡単に)
MessagePack-RPCでは、非同期プロトコル(サーバーは要求された順番通りに応答を返す必要が無く、早く処理できた要求から返して良い)であるという利点を活かせるように、すべての処理を非同期・ノンブロックにすることで大量のホストや要求を同時に扱えるようにしています。またC++版では、マルチコアCPUの性能を引き出すために、多くの処理を並列して処理できるようにインタフェースを設計しています。


MessagePack-RPCのクライアントAPIでは、以下の3つの呼び出しパターンに対応しています:

  • callパターン
  • send/joinパターン
  • callbackパターン


callパターンは、ふつうのメソッド呼び出しのように、呼び出したら結果が返ってくるまで待つパターンです:

コードが読みやすくなり、メモリ管理も楽という利点がありますが、並列性はありません。


send/joinパターンは、並列していくつもRPC要求をサーバーに送り、後で結果を待ち受けるパターンです:

サーバーからの応答を待っている間に、並列してRPC要求を送信できます。


callbackパターンは、RPC呼び出し時にコールバック関数を登録しておいて、応答が返ってきたタイミングで呼ばれるようにするパターンです:

send/joinパターンと比べると、結果を受信した後の処理を並列して実行できる利点があります。
若干(だいぶ?)コードが見にくくなる欠点がありますが、並列性は高いです。



と、このような機能を実装した MessagePack-RPC for C++ のテクニカルプレビュー版をリリースしました:msgpack-rpc-0.0.0.tar.gz
コンパイルするには、MessagePack for C++ のバージョン0.3.9以上が必要です:http://sourceforge.jp/projects/msgpack/releases/


インタフェースはまだ変わるかも知れませんが、↓こんな感じで使えます:

  • クライアント側
#include <msgpack/rpc/client.h>

namespace rpc {
    using namespace msgpack;
    using namespace msgpack::rpc;
}  // namespace rpc

int main(void) {
    // クライアントを初期化
    rpc::loop clilo;
    rpc::client cli("127.0.0.1", 8080, &clilo);  // ブロックしない

    // 引数を作成
    // MessagePackでシリアライズできる任意の型を渡せる
    std::vector<std::string> arg;
    arg.push_back("MessagePack-RPC");
    arg.push_back("test");

    // send/join型のRPC呼び出し
    rpc::auto_async as = s->send("echo", arg);   // ブロックしない
    as->join();   // ここで初めてブロックする

    rpc::object ret = as->result();
    rpc::auto_zone z = as->release_zone();

    // call型のRPC呼び出し
    ret = cli.call("echo", arg, &z);
}
  • サーバー側
#include <msgpack/rpc/server.h>

namespace rpc {
    using namespace msgpack;
    using namespace msgpack::rpc;
}  // namespace rpc

// msgpack::rpc::dispatcher を継承したクラスを実装
class myecho : public rpc::dispatcher {       
public:
    typedef rpc::request request;           
    typedef rpc::response response;       
    typedef rpc::auto_zone auto_zone;      

    // dispatchメンバ関数を実装
    void dispatch(
            request req,
            response* res,
            auto_zone z)
    {
        switch(req.method().as<int>()) {
        case 0:
            echo(req, res, z);
            return;
        default:
            throw rpc::type_error();
        }
    }

    void echo(request req, response* res, auto_zone z)
    {
        // req.params() に引数が入っている
        res->result(req.params());    // MessagePackでシリアライズ可能な任意のオブジェクトを返せる
    }
};

int main(void) {
    rpc::loop svrlo;    // 並列イベント駆動I/Oループ
    rpc::server svr(&svrlo);  // サーバーのコンストラクタにイベントループを渡す

    std::auto_ptr<rpc::dispatcher> dp(new myecho);    // dispatcherを作る
    svr.listen("0.0.0.0", 8080, dp.get());    // listenする

    svrlo.start(8);    // 8スレッドで開始
    svrlo.join();
}


ソースコードの中のtest/ディレクトリに、実際に動作するテストコードも入っています(make checkで実行)。

C++にしては使いやすいインタフェースだと思っていますが、どうでしょうか。