並列メッセージングフレームワーク「MessagePack-RPC for C++」リリース
分散KVS kumofs のコードは、全体で約2万行です。
そのうち、ネットワークI/Oやプロトコルに関するコードは約1万行で、全体の約半分を占めています。
並列イベント駆動I/Oフレームワーク「mpio」リリース
ネットワークアプリケーションを実装する上で、もっとも大きな障壁は、ネットワークI/Oとプロトコルです。
では、それが両方ともフレームワークでサポートされ、コードを書く必要が無くなったらどうでしょうか?
54行で簡単な分散KVSを実装したり、140行で分散リアルタイム検索エンジンを実装することができます。すなわち、インデックス作成サーバ、検索サーバ、DBサーバなど、多数のサーバが連携し、スケールアウトの恩恵を得ることができるネットワークアプリケーションを、1台のホスト上で動作する並列アプリケーションとほぼ同じように書くことができます。
実装上の問題から解放されれば、並列性や耐障害性など、本質的なシステムの設計に集中することができます。
MessagePack-RPC for C++ は、このようにネットワークにまつわる複雑なコードの実装を一切不要にし、高速・高信頼な分散システムの量産を可能にすることを目指した、C++のメッセージングフレームワークです。
I/O戦略は、並列イベント駆動I/Oフレームワーク mpio を基盤としており、あらゆるメッセージに対する処理を自動的に並列化します(後述)。
また、その名前の通り、プロトコルには MessagePack を利用しています。MessagePackは多言語に対応した高速なシリアライズライブラリであり、異なる言語で実装されたプログラムの間でメッセージを交換することができます。
例えば、サーバをC++で実装し、管理ツールをRubyで実装することで、高い処理性能を実現しながら、管理タスクは効率的に自動化することができます。クライアントライブラリの実装に苦労する必要も無くなるでしょう。C++で書かれたサーバの機能を、Rubyから直接呼び出すことができます。
メッセージの交換には、同期・非同期・コールバックの3種類の方法をサポートしています。呼び出し処理、応答に対する処理、あるいはその他の計算処理をすべてオーバーラップさせ、並列に実行することで、処理時間を短縮することが可能です。
いずれもMessagePackのメモリ管理機構(zone)と統合されており、使い勝手と効率を高いレベルで両立しています。
MessagePack-RPC for C++ は、分散Key-valueストア kumofs の内部で利用していたRPCシステムをリファクタリングし、ライブラリとして使いようにインタフェースを設計し直したものです。実際のところ、kumofs時代のコードの跡形もありませんが、イベント駆動型でスケーラビリティの高い基本設計や、堅牢性の高いコネクションプーリングなどは健在です。
ここでは MessagePack-RPC for C++ の重要な要素である、並列性、Future、セッションプール そして 遅延リターン について、サンプルコードを交えながら具体的に紹介します。
MessagePack-RPC for C++ のリポジトリは msgpack-rpc@github に、ソースパッケージは SourceForge.net にあります。
並列性
MessagePack-RPC for C++ は、クライアント・サーバどちらの実装もイベント駆動型のI/O戦略を基盤としており、多数のホストと効率的に通信することができます。
また、1つのソケットに立て続けに届いたメッセージを含め、あらゆるメッセージに対する処理を並列に実行します:
この特性は、I/O戦略に Wavyアーキテクチャ を採用し、MessagePackのストリームデシリアライザを統合することで実現されています。MessagePack-RPC for C++ を利用することで、特別な工夫をすることなく、マルチコアCPUの性能を存分に引き出すことが可能です。
さらに、複数のサーバを段階的に連ね、縦横に並べることで、極めて高い並列処理性能を得ることができます:
MessagePack-RPC for C++ のFuture(後述)や遅延リターン(後述)などの特性を利用することで、このようなスケールアウト型のアーキテクチャを簡単に実現できます。
ここまでは特にサンプルコードはありません。普通に MessgePack-RPC for C++ を使ってプログラムを書けば、こうなります。MessagePack-RPCの目的は、高度なネットワークプログラミングの知識を定式化し、高速・高信頼な分散システムの量産を可能にすることにあります。
続いてサンプルコードを交えながら、Future、セッションプール、遅延リターンについて紹介します。
Future
Futureは、まだ値が返ってきていない、未来の値を表します。この値をオブジェクトとして扱えるようにすることで、非同期型のメッセージングを簡単に行えるようにしています。
例えば、以下のようにFutureを利用することで、2つの呼び出しを並列して行うことができます:
#include <msgpack/rpc/client.h> int main(void) { msgpack::rpc::client c("127.0.0.1", 9090); // 2つのFutureを作成 msgpack::rpc::future f1 = c.call("add", 1, 2); msgpack::rpc::future f2 = c.call("add", 2, 3); // 結果を待ち受ける int result1 = f1.get<int>(); int result2 = f2.get<int>(); }
また、Futureにはコールバック関数を設定できます。
コールバック型の呼び出しは、並列化にもっとも適しています。呼び出し処理だけでなく、返ってきた結果に対する処理も並列して実行することが可能です。
#include <msgpack/rpc/client.h> void myfunc(msgpack::rpc::future f) { /* 応答が返ってきたら呼ばれる */ } int main(void) { msgpack::rpc::client c("127.0.0.1", 8080); // 2つのFutureを作成し、コールバック関数を設定 msgpack::rpc::future f1 = c.call("add", 1, 2).attach_callback(myfunc); msgpack::rpc::future f2 = c.call("add", 2, 3).attach_callback(myfunc); c.get_loop()->start(4); // 4スレッドで起動 f1.join(); f2.join(); }
もちろん、同期型の呼び出しもできます:
#include <msgpack/rpc/client.h> int main(void) { msgpack::rpc::client c("127.0.0.1", 9090); int result = c.call("add", 1, 2).get<int>(); }
セッションプール
MessagePack-RPC for C++は、マトモに使えるメッセージングライブラリです。すなわち、タイムアウトやコネクションプーリングを真面目に実装しています。
さらに驚きを禁じ得ないことに、プールされたコネクションは複数のスレッドで共有することができます。
次のコードは合法であり、コネクションは1本しか張られません:
#include <msgpack/rpc/client.h> #include <msgpack/rpc/session_pool.h> #include <mp/pthread.h> void thread_1(msgpack::rpc::session_pool* sp) { // セッションプールからセッションを取り出す msgpack::rpc::session s = sp->get_session("127.0.0.1", 9090); int result = s.call("add", 1, 2).get<int>(); } void thread_2(msgpack::rpc::session_pool* sp) { // セッションプールからセッションを取り出す msgpack::rpc::session s = sp->get_session("127.0.0.1", 9090); int result = s.call("add", 2, 3).get<int>(); } int main(void) { // 共有セッションプール msgpack::rpc::session_pool sp; sp.start(4); // session_poolを4スレッドで起動 // スレッドを2つ起動 mp::pthread_thread t1; mp::pthread_thread t2; t1.run( mp::bind(thread_1, &sp) ); t2.run( mp::bind(thread_2, &sp) ); t1.join(); t2.join(); sp.end(); }
マルチスレッドで動作するイベント駆動型のアプリケーションでは、色々なスレッドが、各々のタイミングで、様々なホストと、勝手気ままに通信したくなります。このセッションプールを利用すれば、それらのケースで毎度毎度コネクションを張り直す必要はありません。
これによって遅延を短縮できるほかに、負荷の高い分散アプリケーションでありがちな「ポート番号が足りなくなる問題」を防ぐことができます。
遅延リターン
サーバ同士が互いに通信するP2P型のアプリケーションでは、クライアントから要求を受け取った後、別のサーバに要求を中継し、その応答を待ってからクライアントに応答を返したいことがあります。
このとき、中継先のサーバから応答を待つ間にスレッドをブロックさせておくと、クライアントの数に応じてスレッドの数を増やさなければならないため、効率が落ちます。
そこで MessagePack-RPC for C++ では、応答の返却をいったん遅延させ、別のイベントが到着したときに返却することができます:
#include <msgpack/rpc/server.h> #include <msgpack/rpc/client.h> using namespace mp::placeholders; namespace rpc { using namespace msgpack::rpc; } // サーバの実装 class myserver : public rpc::dispatcher { public: // 中継先のサーバから応答が返ってきたら呼ばれる static void callback(rpc::future f, rpc::request req) { req.result(f.get<int>()); // ここで応答を返す } // クライアントから要求を受け取ったら呼ばれる void dispatch(rpc::request req) { std::string method = req.method().as<std::string>(); msgpack::type::tuple<int, int> params(req.params()); // 別のサーバに処理を中継 rpc::session s = m_sp.get_session("127.0.0.1", 8080); // コールバック呼び出し rpc::future f = s.call(method, params.get<0>(), params.get<1>()); f.attach_callback( mp::bind(&callback, _1, req) ); // ここでは応答を返さない } rpc::server& listen(const std::string& host, uint16_t port) { m_svr.serve(this); m_svr.listen(host, port); return m_svr; } public: myserver() : m_sp(m_svr.get_loop()) { } // イベントループを共有 private: msgpack::rpc::server m_svr; msgpack::rpc::session_pool m_sp; }; int main(void) { myserver s; s.listen("0.0.0.0", 9090).start(4); // サーバを4スレッドで起動 // クライアントから要求を発行 msgpack::rpc::client c("127.0.0.1", 9090); int result = c.call("add", 1, 2).get<int>(); }
MessagePack-RPCを始めよう
MessagePack-RPC + Ruby は快適すぎて泣ける
http://twitter.com/frsyuki/statuses/12921999611
まぁ涙を拭けよ (´・ω・`)つ~ RT @frsyuki: なんか C++ で果てしなく苦労していた実装が、Ruby(とMessagePack-RPC)を使ったら超カンタンにできた…泣けた。
http://twitter.com/syuu1228/status/6366413523
MessagePack-RPC for Rubyでカスタム型を扱うのはこんな感じ。超簡単過ぎて泣けて来る。プロトタイピングの速度がやばい。 > http://gist.github.com/380979
http://twitter.com/kzk_mover/status/12954942320
というわけで、まずRuby版でプロトタイプを実装し、C++に移植していく流れがオススメです。
Ruby版はgemで簡単にインストールできます:
$ gem install msgpack-rpc
簡単なサーバの実装
require 'msgpack/rpc' class MyServer def initialize @kvs = {} end def set(key, val) @kvs[key] = val nil end def get(key) @kvs[key] end end svr = MessagePack::RPC::Server.new svr.listen "0.0.0.0", 9191, MyServer.new svr.run
簡単なクライアントの実装
require 'msgpack/rpc' cli = MessagePack::RPC::Client.new("127.0.0.1", 9191) cli.call(:set, "k1", "v1") cli.call(:set, "k2", "v2") p cli.call(:get, "k1") #=> "v1" p cli.call(:get, "k2") #=> "v2"
このように、Ruby版は泣けるほど簡単に使える一方で、非同期呼び出しや遅延リターンもサポートした、フル機能の実装です。
C++版とRuby版は同じプロトコルなので、クライアントはRubyのまま、サーバだけをC++で実装することもできます。
MessagePack-RPCの未来
IDLやコードジェネレータ、Java版などなど、活発に開発が進行中です。
ぜひ msgpack-rpc@github をWatch/Forkしてみてください。