Ruby Rev + MessagePack による分散プログラム
ネットワークプログラミングで面倒なのが、通信相手にバイト列しか送れないためプロトコルをシリアライズ方法から考えないといけないかったり、複数のクライアントに対応しようとするとスレッドやらイベント駆動やらを考えないといけないところで、なかなか本質的なロジックの実装に手を付けられずもどかしくなります。
そのあたりの面倒な部分をまとめて良きに計らってくれるRPCライブラリを作っておくと良さそうです。
とはいえ、RPCレイヤーはネットワークプログラミングの中でも速度と信頼性に大きく影響する部分でもあります。実装を丸々隠蔽されてしまうと、後になってカスタマイズできずに困ったことになるかもしれません。
最初のプロトタイピングとして簡単に使えるが、カスタマイズも効くような柔軟性が欲しいところです。
そこでRubyの高速なイベント駆動IOライブラリであるRevと、バイナリベースのシリアライズ形式であるMessagePackを使って、プロトタイピングにも使えるがカスタマイズもできて高速に動作する汎用サーバー を書いてみました。
Rev is a high performance event library for Ruby built on top of libev.
Rev@RubyForge
MessagePackは性能を重視したバイナリベースのシリアライズ形式です。
MessagePack@SourceForge
$ gem install rev $ gem install msgpack
require 'rubygems' require 'msgpack' require 'rev' # MessagePackサーバー class MessagePackSocket < Rev::TCPSocket def initialize(*args) @buffer = '' @nread = 0 @mpac = MessagePack::Unpacker.new super end def on_read(data) @buffer << data while true @nread = @mpac.execute(@buffer, @nread) if @mpac.finished? msg = @mpac.data @mpac.reset @buffer.slice!(0, @nread) @nread = 0 on_message(msg) end break if @buffer.length <= @nread end end def send_message(msg) write msg.to_msgpack end end # MessagePackSocketを継承してサーバーを実装 class MessageEchoServer < MessagePackSocket def initialize(*args) super end def on_connect puts "#{remote_addr}:#{remote_port} connected" end def on_close puts "closed" end def on_message(msg) puts "receive message: #{msg.inspect}" send_message msg # そのまま送り返す end end # サーバーを起動 host = "0.0.0.0" port = 9090 server = Rev::TCPServer.new(host, port, MessageEchoServer) server.attach(Rev::Loop.default) Rev::Loop.default.run
新しいサーバーを実装したいときは、MessagePackSocketクラスを継承して、on_message(msg)メソッドを実装してやればOKです。
標準入力からYAMLを受け取って、MessagePackでシリアライズしてサーバーに送信するクライアントは↓こんな感じになります:
require 'rubygems' require 'msgpack' require 'yaml' require 'socket' sock = TCPSocket.new(*ARGV) Thread.start { buffer = '' nread = 0 mpac = MessagePack::Unpacker.new while true buffer << sock.sysread(1024) while true nread = mpac.execute(buffer, nread) if mpac.finished? msg = mpac.data mpac.reset buffer.slice!(0, nread) nread = 0 puts "received: #{msg.inspect}" end break if buffer.length <= nread end end } while line = $stdin.gets sock.write YAML.load(line).to_msgpack sock.flush end
試しに実行してみると↓こうなります:
サーバー:
$ ruby server.rb 127.0.0.1:58659 connected receive message: [1, 2, 3] receive message: {"a"=>true, "b"=>false} closed
クライアント:
$ ruby client.rb localhost 9090 [1, 2, 3] # キーボードから入力 received: [1, 2, 3] {a: true, b: false} # キーボードから入力 received: {"a"=>true, "b"=>false}