読者です 読者をやめる 読者になる 読者になる

MessagePack-RPCのプロトコル仕様(ドラフト)と実装例

以前に書いた MessagePack RPC プロトコル は少し古くなってしまったので、ここでまとめ直しておこうと思います。


MessagePack-RPCのプロトコルは、非同期型の呼び出しができる点(非同期プロトコル)が大きな特徴となっています。複数のサーバが相互に通信し合ったり、通信と計算をオーバーラップさせて高速化を図ったりするような、高度なネットワークアプリケーションを実装しやすくなります。
RPCライブラリと言うよりも、シンプルな非同期メッセージングライブラリと言えます。


ここではMessagePack-RPCのプロトコル仕様と、以上の特徴を活かせるような実装例も合わせて紹介します。

基本仕様:MessagePack-RPC specification version 0.1

MessagePack-RPCのプロトコルは、メソッドの呼び出すRequestメッセージと、それに対するResponseメッセージからなります。Requestに対しては、Responseを返す必要があります。
またこれらに加えて、Notificationメッセージがあります。NotificationはRequestと似ていますが、Responseを返す必要がありません。

Requestメッセージ

Requestは、次の4つの要素からなる配列 [type, msgid, method, params] です:

type
必ず整数0です。このメッセージがRequestであることを示します。
msgid
シーケンス番号を示す最大 32bit の符号無し整数です。この手続き呼び出しに対応するResponseメッセージにも同じ値を入れます。
method
呼び出すメソッドを示す、最大 32bit の符号無し整数か、文字列です。
params
メソッドの引数を示す配列です。配列の要素は、任意のオブジェクトです。
Responseメッセージ

Responseは、次の4つの要素からなる配列 [type, msgid, error, result] です。

type
必ず整数1です。このがResponseメッセージであることを示します。
msgid
このResponseが対応しているRequestのシーケンス番号です。
error
メソッドが正常に実行された場合:nil メソッドがエラーだった場合:nil 以外の任意のオブジェクト
result
メソッドの返り値を示す、任意のオブジェクトです。エラーだった場合は普通は nil にするでしょうが、nil でなくても良いでしょう。
Notificationメッセージ

Requestは、次の3つの要素からなる配列 [type, method, params] です:

type
必ず整数2です。このメッセージがNotificationメッセージであることを示します。
method
呼び出すメソッドを示す、最大 32bit の符号無し整数か、文字列です。
params
メソッドの引数を示す配列です。配列の要素は、任意のオブジェクトです。
Responseの送信順序

Requestを受け取ったサーバーは、必ずしもRequestを受け取った順番通りにResponseを返す必要はありません。複数のRequestを受け取ったら、順不同で返すことができます。
言い換えれば、Requestを送ったあと、Responseを待たずに立て続けに別のRequestを送れるようにクライアントを実装した(=Requestのパイプライン化に対応した)場合は、どちらのRequestに対するResponseが先に返ってきても正常に動作するようにする必要があります。
と言っても、そもそもクライアントはRequestのパイプライン化に対応しなくても問題ありません。

非同期拡張仕様:MessagePack-RPC specification version 0.2

非同期呼び出しAPI

クライアントAPIがRequestメッセージのパイプライン化に対応したら、specification version 0.2 に昇格することにします。「このライブラリはMessagePack-RPC/0.2に対応しているよ」と書いてあると、「これを使えば非同期呼び出しができるんだなー」と目に見えて分かって便利かな、と思いました。

非同期呼び出しのAPIは、基本的には以下のようになると思います:

client = new Client(...);

Future f = client.call(...);  # ここで要求を送るか、送る準備をする

f.join();  # ここで応答を待つ
result = f.result();    # 返り値を取り出す

あるいは、複数の呼び出しを並列して行えるように:

client = new Client(...);

MultiFuture ff;
ff.add( client.call(...) );    # 2つの要求を並列して送る
ff.add( client.call(...) );

# 応答が1つ届くたびに処理を行いたい場合:
ff.each_result {|future|
    result = future.result();
}

# すべての応答を同時に待つ場合:
futures = ff.join_all();

コールバックしたい場合:

client = new Client(...);

Future f = client.call(...);  # ここで要求を送るか、送る準備をする

f.attach_callback( callback_function );    # 応答が返ってきたら、指定した関数が呼ばれる

実装例

真面目にRPCライブラリを実装しようと思うと、タイムアウト、コネクションプーリング、非同期などなど、考えるべき難しい問題がたくさんあって大変です。
そこでここでは、イベント駆動型のアーキテクチャでMessagePack-RPCを実装する例について少し紹介します。

イベントループ

イベント駆動型のアーキテクチャでは、プログラムはイベントループを中心にして駆動します。サーバの実装では、RPCライブラリがプログラム全体の駆動方法を決めることになるので、イベントループを直接叩いてタイマーなどイベントハンドラを追加できるように、機能は豊富にあったほうが便利そうです。
最近は各言語向けに高速なイベントループの実装があるようので、それを使うのが良さそうです:

Ruby
現状の実装では、Revを使っています。RevはlibevRubyバインディングです。ベンチマークテストによれば、libeventよりも高速に動作するケースがあるようです。
Python
TwistedTornadoが使えそうです。他にもあるかも
Java
NIOを直接使うか、Apache MINAあたりが使えそうです。
Perl
AnyEvent? AnyEvent::MPRPC - MessagePack RPC の Perl 実装
C++
mpio を使って実装を進めています。他に libevlibevent が使えそうです。
イベントハンドラ:MessagePackストリーム

イベントハンドラでは、ソケットからデータを読み込んで、オブジェクトを切り出していきます。
MessagePackのストリームデシリアライザを使えば、簡単に実装できるハズです。
静的型付け言語では、切り出されたオブジェクトを静的型に変換する必要があります。まずメッセージの先頭要素(つまりtype)だけを型変換して、REQUEST か RESPONSE か NOTIFY かを判定し、その後で後続の要素を静的型に変換すると良さそうです。

Session

コネクションプーリングや非同期呼び出しをサポートし、実装の詳細の隠蔽するために、セッションの概念を導入します。
MessagePackストリームは、Sessionを1つ持ちます。
Sessionは、0個か複数のMessagePackストリームを持ちます。(循環参照ができない言語では、こちらの参照は弱参照で。MessagePackストリームを閉じる=ソケットをcloseすると同時に、弱参照を削除する)
Sessionは、メッセージのシーケンス番号(msgid)からFuture(後述)を引く対応表を持ちます。
Sessionは、メソッドのディスパッチャ(nullable)を持ちます(サーバー用)。
Sessionは、宛先アドレス(nullable)とイベントループを持ちます(クライアント用)。


MessagePackストリームは、メッセージを受け取ったら、Sessionに渡します。
Sessionは、MessagePackストリームからメッセージを受け取ったら:

Responseなら
対応表からFutureを取り出して、Futureに結果とエラーをセットする。Futureにコールバック関数がattachされていれば、それを呼び出す
RequestかNotificationなら
メソッドと引数をディスパッチャに渡します。返り値の受けとり方については、RPC サーバの遅延リターン - steps to phantasien が参考になります。


クライアントは、Sessionのメソッドを呼び出して要求を送信します。
Sessionは、まず新しいmsgidとFutureを作成します。次に、msgidとFutureの対応を対応表に追加します。メッセージはMessagePackを使ってシリアライズします。
シリアライズしたデータは、MessagePackストリームの内の1つを使って送信します。もしSessionがMessagePackストリームを1つも持っていなかったら、新しいコネクションを確立し、イベントループに登録します。
ここで新しいコネクションを確立する操作は、イベントループを使って非同期になるように実装すると良さそうです。
ここで新しいコネクションを確立するとき、宛先アドレスがnullだったら、例外を投げます。

Client

Clientは、Sessionを継承します。
Sessionに加えて、後述のタイムアウト用の機能を実装します。

SessionPool

SessionPoolは、宛先アドレスからSessionを引く、連想配列を持ちます。
SessionPoolは、イベントループを持ちます。
複数のコンテキストでSessionを使い回せるようになるので、コネクションのプーリングを実現できます。

Server

Serverは、セッションプールを継承します。
新しいコネクションを受け付けたら、MessagePackストリームを作成して、イベントループに登録します。また、宛先アドレスがnullであるSessionを新しく作成し、そのSessionにMessagePackストリームを追加します。

Future

Futureは、まだ結果が返ってきていないRPC要求を表します。
メンバには、結果(nullable)、エラー(nullable)、イベントループ、コールバック関数(nullable)を持ちます。
結果を待ち受けて同期するには、結果かエラーがセットされるまで、イベントループを回します。
マルチスレッドで動作する場合は、相互排除が必要になります。

タイムアウトについて

Futureのメンバにカウンタを持たせます。
ClientやSessionPoolは、コンストラクタでイベントループにタイマーを登録し、一定間隔ごとにすべてのFutureのカウンタを1ずつ減らしていきます。
カウンタが0になったら、エラーに「タイムアウトエラー」をセットします。

リポジトリ

MessagePack-RPCのリポジトリは、http://github.com/msgpack/msgpack-rpc にあります。
現状のRuby版の実装は、確定していない仕様も実装していたりして若干読み辛いですが、メッセージングライブラリを実装するときの参考になるかもしれません。