MessagePack-RPCのプロトコル仕様(ドラフト)と実装例
以前に書いた MessagePack RPC プロトコル は少し古くなってしまったので、ここでまとめ直しておこうと思います。
MessagePack-RPCのプロトコルは、非同期型の呼び出しができる点(非同期プロトコル)が大きな特徴となっています。複数のサーバが相互に通信し合ったり、通信と計算をオーバーラップさせて高速化を図ったりするような、高度なネットワークアプリケーションを実装しやすくなります。
RPCライブラリと言うよりも、シンプルな非同期メッセージングライブラリと言えます。
ここではMessagePack-RPCのプロトコル仕様と、以上の特徴を活かせるような実装例も合わせて紹介します。
基本仕様:MessagePack-RPC specification version 0.1
MessagePack-RPCのプロトコルは、メソッドの呼び出すRequestメッセージと、それに対するResponseメッセージからなります。Requestに対しては、Responseを返す必要があります。
またこれらに加えて、Notificationメッセージがあります。NotificationはRequestと似ていますが、Responseを返す必要がありません。
Requestメッセージ
Requestは、次の4つの要素からなる配列 [type, msgid, method, params] です:
- type
- 必ず整数0です。このメッセージがRequestであることを示します。
- msgid
- シーケンス番号を示す最大 32bit の符号無し整数です。この手続き呼び出しに対応するResponseメッセージにも同じ値を入れます。
- method
- 呼び出すメソッドを示す、最大 32bit の符号無し整数か、文字列です。
- params
- メソッドの引数を示す配列です。配列の要素は、任意のオブジェクトです。
Responseメッセージ
Responseは、次の4つの要素からなる配列 [type, msgid, error, result] です。
- type
- 必ず整数1です。このがResponseメッセージであることを示します。
- msgid
- このResponseが対応しているRequestのシーケンス番号です。
- error
- メソッドが正常に実行された場合:nil メソッドがエラーだった場合:nil 以外の任意のオブジェクト
- result
- メソッドの返り値を示す、任意のオブジェクトです。エラーだった場合は普通は nil にするでしょうが、nil でなくても良いでしょう。
Notificationメッセージ
Requestは、次の3つの要素からなる配列 [type, method, params] です:
- type
- 必ず整数2です。このメッセージがNotificationメッセージであることを示します。
- method
- 呼び出すメソッドを示す、最大 32bit の符号無し整数か、文字列です。
- params
- メソッドの引数を示す配列です。配列の要素は、任意のオブジェクトです。
Responseの送信順序
Requestを受け取ったサーバーは、必ずしもRequestを受け取った順番通りにResponseを返す必要はありません。複数のRequestを受け取ったら、順不同で返すことができます。
言い換えれば、Requestを送ったあと、Responseを待たずに立て続けに別のRequestを送れるようにクライアントを実装した(=Requestのパイプライン化に対応した)場合は、どちらのRequestに対するResponseが先に返ってきても正常に動作するようにする必要があります。
と言っても、そもそもクライアントはRequestのパイプライン化に対応しなくても問題ありません。
非同期拡張仕様:MessagePack-RPC specification version 0.2
非同期呼び出しAPI
クライアントAPIがRequestメッセージのパイプライン化に対応したら、specification version 0.2 に昇格することにします。「このライブラリはMessagePack-RPC/0.2に対応しているよ」と書いてあると、「これを使えば非同期呼び出しができるんだなー」と目に見えて分かって便利かな、と思いました。
非同期呼び出しのAPIは、基本的には以下のようになると思います:
client = new Client(...); Future f = client.call(...); # ここで要求を送るか、送る準備をする f.join(); # ここで応答を待つ result = f.result(); # 返り値を取り出す
あるいは、複数の呼び出しを並列して行えるように:
client = new Client(...); MultiFuture ff; ff.add( client.call(...) ); # 2つの要求を並列して送る ff.add( client.call(...) ); # 応答が1つ届くたびに処理を行いたい場合: ff.each_result {|future| result = future.result(); } # すべての応答を同時に待つ場合: futures = ff.join_all();
コールバックしたい場合:
client = new Client(...); Future f = client.call(...); # ここで要求を送るか、送る準備をする f.attach_callback( callback_function ); # 応答が返ってきたら、指定した関数が呼ばれる
実装例
真面目にRPCライブラリを実装しようと思うと、タイムアウト、コネクションプーリング、非同期などなど、考えるべき難しい問題がたくさんあって大変です。
そこでここでは、イベント駆動型のアーキテクチャでMessagePack-RPCを実装する例について少し紹介します。
イベントループ
イベント駆動型のアーキテクチャでは、プログラムはイベントループを中心にして駆動します。サーバの実装では、RPCライブラリがプログラム全体の駆動方法を決めることになるので、イベントループを直接叩いてタイマーなどイベントハンドラを追加できるように、機能は豊富にあったほうが便利そうです。
最近は各言語向けに高速なイベントループの実装があるようので、それを使うのが良さそうです:
イベントハンドラ:MessagePackストリーム
イベントハンドラでは、ソケットからデータを読み込んで、オブジェクトを切り出していきます。
MessagePackのストリームデシリアライザを使えば、簡単に実装できるハズです。
静的型付け言語では、切り出されたオブジェクトを静的型に変換する必要があります。まずメッセージの先頭要素(つまりtype)だけを型変換して、REQUEST か RESPONSE か NOTIFY かを判定し、その後で後続の要素を静的型に変換すると良さそうです。
Session
コネクションプーリングや非同期呼び出しをサポートし、実装の詳細の隠蔽するために、セッションの概念を導入します。
MessagePackストリームは、Sessionを1つ持ちます。
Sessionは、0個か複数のMessagePackストリームを持ちます。(循環参照ができない言語では、こちらの参照は弱参照で。MessagePackストリームを閉じる=ソケットをcloseすると同時に、弱参照を削除する)
Sessionは、メッセージのシーケンス番号(msgid)からFuture(後述)を引く対応表を持ちます。
Sessionは、メソッドのディスパッチャ(nullable)を持ちます(サーバー用)。
Sessionは、宛先アドレス(nullable)とイベントループを持ちます(クライアント用)。
MessagePackストリームは、メッセージを受け取ったら、Sessionに渡します。
Sessionは、MessagePackストリームからメッセージを受け取ったら:
- Responseなら
- 対応表からFutureを取り出して、Futureに結果とエラーをセットする。Futureにコールバック関数がattachされていれば、それを呼び出す
- RequestかNotificationなら
- メソッドと引数をディスパッチャに渡します。返り値の受けとり方については、RPC サーバの遅延リターン - steps to phantasien が参考になります。
クライアントは、Sessionのメソッドを呼び出して要求を送信します。
Sessionは、まず新しいmsgidとFutureを作成します。次に、msgidとFutureの対応を対応表に追加します。メッセージはMessagePackを使ってシリアライズします。
シリアライズしたデータは、MessagePackストリームの内の1つを使って送信します。もしSessionがMessagePackストリームを1つも持っていなかったら、新しいコネクションを確立し、イベントループに登録します。
ここで新しいコネクションを確立する操作は、イベントループを使って非同期になるように実装すると良さそうです。
ここで新しいコネクションを確立するとき、宛先アドレスがnullだったら、例外を投げます。
Client
Clientは、Sessionを継承します。
Sessionに加えて、後述のタイムアウト用の機能を実装します。
SessionPool
SessionPoolは、宛先アドレスからSessionを引く、連想配列を持ちます。
SessionPoolは、イベントループを持ちます。
複数のコンテキストでSessionを使い回せるようになるので、コネクションのプーリングを実現できます。
Server
Serverは、セッションプールを継承します。
新しいコネクションを受け付けたら、MessagePackストリームを作成して、イベントループに登録します。また、宛先アドレスがnullであるSessionを新しく作成し、そのSessionにMessagePackストリームを追加します。
Future
Futureは、まだ結果が返ってきていないRPC要求を表します。
メンバには、結果(nullable)、エラー(nullable)、イベントループ、コールバック関数(nullable)を持ちます。
結果を待ち受けて同期するには、結果かエラーがセットされるまで、イベントループを回します。
マルチスレッドで動作する場合は、相互排除が必要になります。
タイムアウトについて
Futureのメンバにカウンタを持たせます。
ClientやSessionPoolは、コンストラクタでイベントループにタイマーを登録し、一定間隔ごとにすべてのFutureのカウンタを1ずつ減らしていきます。
カウンタが0になったら、エラーに「タイムアウトエラー」をセットします。
リポジトリ
MessagePack-RPCのリポジトリは、http://github.com/msgpack/msgpack-rpc にあります。
現状のRuby版の実装は、確定していない仕様も実装していたりして若干読み辛いですが、メッセージングライブラリを実装するときの参考になるかもしれません。