MessagePack + Wavy で高速なRPCサーバーを書く「ccf」
先日、追記型オブジェクトストレージKastorを紹介しました。そこで「C++でクラスタアプリケーションを書くためのフレームワーク(ccf; Cluster Communcation Framework)を実装中」などなど書きました。
最近C++でサーバープログラムを書くときには MessagePack(シリアライザ・デシリアライザ。プロトコルに利用)とmp::wavy(イベント駆動I/Oとスレッドプールを統合してうまく動かすライブラリ)という2つのライブラリを使うことが多い*1のですが、この2つを組み合わせるときにいつも似たようなコードを書いていたので、いっそまとめて1つのフレームワークのようにしたら便利そうだなーという動機でccfの開発を始めました。
…以下長々と書いていますが、論よりコード。このヘッダ と サーバーのサンプル と クライアントのサンプル を見れば、何ができるのかは大体分かると思います。
一番特徴的なのはヘッダの中に入っているconnectionクラスでしょうか。これはイベントハンドラで、ファイルディスクリプタ1つに付き1つのインスタンスを作ります(ファイルディスクリプタがcloseされるタイミングでデストラクタが走ります)。
イベントハンドラをmp::wavyに登録すると、ファイルディスクリプタが読み込み可能になった時点で read_event() メンバ関数が呼ばれるようになります:
template <typename IMPL> void connection<IMPL>::read_event() { // バッファを確保 m_pac.reserve_buffer(CONNECTION_BUFFER_RESERVATION_SIZE); // バッファにreadする ssize_t rl = ::read(fd(), m_pac.buffer(), m_pac.buffer_capacity()); if(rl <= 0) { if(rl == 0) { throw mp::system_error(errno, "connection closed"); } if(errno == EAGAIN || errno == EINTR) { return; } else { throw mp::system_error(errno, "read error"); } } m_pac.buffer_consumed(rl); // デシリアライズする while(m_pac.execute()) { msgpack::object msg = m_pac.data(); std::auto_ptr<msgpack::zone> z( m_pac.release_zone() ); m_pac.reset(); static_cast<IMPL*>(this)->process_message(msg, z); // ここがポイント! } }
(m_pacはMessagePackのストリームデシリアライザです:msgpack:cpp:doc.ja:ストリームデシリアライザ)
connectionクラスはtemplateクラスで、CRTPを使っているのがポイントです。
最後に呼び出している process_message() 関数はconnectionクラスには実装されていません。connectionクラスを継承したクラスで実装します。このとき virtual を使わずに template と static_cast を使うことで、仮想関数呼び出しのオーバーヘッドを削減しています。
実際に届いたメッセージを処理する関数はサーバーのサンプルにあります:
class server : public connection<server> { public: server(int fd) : connection<server>(fd) { } ~server() { } void process_message(msgpack::object msg, std::auto_ptr<msgpack::zone>& z) { std::cout << msg << std::endl; // 届いたメッセージを表示 msgpack::sbuffer sbuf; // シリアライズ msgpack::pack(sbuf, msg); // そのまま送り返す(echoサーバー) ccf::net::send(fd(), sbuf.data(), sbuf.size(), &::free, sbuf.data()); sbuf.release(); } };
このクラスでは実際にメッセージを処理するコードだけを書けばいいのがポイント! 例外も基底クラスで良きに処理してくれます。
ccf::net::sendは、非同期に(ブロックせずに)データを送信してくれるモジュールです。単純なバッファの他に、iovec(writev)やファイル(sendfile)も送れます。送信が完了すると登録しておいたコールバック関数を呼んでくれるので、そこでバッファを解放してやります。
これで MessagePackメッセージ版のechoサーバーができました。
このccfのサンプルコードをcloneしてコンパイルすると、このMessagePack echoサーバーとクライアントが作られます:
# サーバー側 $ ./src/logic/mpecho-server # クライアント側 $ mpecho-client hello world ["./src/logic/mpecho-client", "hello", "world"] $ mpecho-client this is test ["./src/logic/mpecho-client", "this", "is", "test"]
ccfはmp::wavyのほかに、ファイルディスクリプタをキーにして任意のオブジェクトをO(1)で引ける mp::sparse_array
ccfのこれから
Ruby on Rails のように ccf /path/to/your/app とやると configure.in や Makefile.am のひな形が自動生成されて、src/logic/serverの下にソースコードを置くと自動的にMakefile.amが更新されて…という使い勝手を妄想しています^^;)
connectionクラスの上にはRPCセッションマネージャを実装する予定なのですが、うまくできた暁にはRPCスタブジェネレータも一緒に入れて ./script/generate rpc のような感じで使えたらいいなーと思っています。
ともあれ、共有ライブラリのような形にすることは無いと思います。ccfの大半がtemlpateになっていることも一因ですが、バイナリ互換性やソースコード互換性を保ったままバージョンアップしていく気が無いからです^^;) アプリケーションに応じてフレームワーク側も躊躇せず書き換えていく使い方を想定しています。
と言うのも完璧に汎用なフレームワークなんて作れないから、どうしたってアプリケーションごとにカスタマイズしたくなるよね、という発想です。
ccfのソースコードはgithubに置いてみました:http://github.com/frsyuki/ccf/tree/master
*1:と言うか毎回I/O部分からプロトコルまでぜんぶ自作することはないですよね…ライブラリに慣れると楽ちんなので、いつも使っています