読者です 読者をやめる 読者になる 読者になる

Ruby Rev + MessagePack による分散プログラム

ネットワークプログラミングで面倒なのが、通信相手にバイト列しか送れないためプロトコルをシリアライズ方法から考えないといけないかったり、複数のクライアントに対応しようとするとスレッドやらイベント駆動やらを考えないといけないところで、なかなか本質的なロジックの実装に手を付けられずもどかしくなります。
そのあたりの面倒な部分をまとめて良きに計らってくれるRPCライブラリを作っておくと良さそうです。


とはいえ、RPCレイヤーはネットワークプログラミングの中でも速度と信頼性に大きく影響する部分でもあります。実装を丸々隠蔽されてしまうと、後になってカスタマイズできずに困ったことになるかもしれません。
最初のプロトタイピングとして簡単に使えるが、カスタマイズも効くような柔軟性が欲しいところです。


そこでRubyの高速なイベント駆動IOライブラリであるRevと、バイナリベースのシリアライズ形式であるMessagePackを使って、プロトタイピングにも使えるがカスタマイズもできて高速に動作する汎用サーバー を書いてみました。

Rev is a high performance event library for Ruby built on top of libev.
Rev@RubyForge

MessagePackは性能を重視したバイナリベースのシリアライズ形式です。
MessagePack@SourceForge

$ gem install rev
$ gem install msgpack
require 'rubygems'
require 'msgpack'
require 'rev'

# MessagePackサーバー
class MessagePackSocket < Rev::TCPSocket
  def initialize(*args)
    @buffer = ''
    @nread = 0
    @mpac = MessagePack::Unpacker.new
    super
  end

  def on_read(data)
    @buffer << data

    while true
      @nread = @mpac.execute(@buffer, @nread)

      if @mpac.finished?
        msg = @mpac.data

        @mpac.reset
        @buffer.slice!(0, @nread)
        @nread = 0

        on_message(msg)
      end

      break if @buffer.length <= @nread
    end
  end

  def send_message(msg)
    write msg.to_msgpack
  end
end

# MessagePackSocketを継承してサーバーを実装
class MessageEchoServer < MessagePackSocket
  def initialize(*args)
    super
  end

  def on_connect
    puts "#{remote_addr}:#{remote_port} connected"
  end

  def on_close
    puts "closed"
  end

  def on_message(msg)
    puts "receive message: #{msg.inspect}"
    send_message msg  # そのまま送り返す
  end
end

# サーバーを起動
host = "0.0.0.0"
port = 9090

server = Rev::TCPServer.new(host, port, MessageEchoServer)
server.attach(Rev::Loop.default)

Rev::Loop.default.run

新しいサーバーを実装したいときは、MessagePackSocketクラスを継承して、on_message(msg)メソッドを実装してやればOKです。


標準入力からYAMLを受け取って、MessagePackでシリアライズしてサーバーに送信するクライアントは↓こんな感じになります:

require 'rubygems'
require 'msgpack'
require 'yaml'
require 'socket'

sock = TCPSocket.new(*ARGV)

Thread.start {
  buffer = ''
  nread = 0
  mpac = MessagePack::Unpacker.new

  while true
    buffer << sock.sysread(1024)

    while true
      nread = mpac.execute(buffer, nread)

      if mpac.finished?
        msg = mpac.data

        mpac.reset
        buffer.slice!(0, nread)
        nread = 0

        puts "received: #{msg.inspect}"
      end

      break if buffer.length <= nread
    end
  end
}

while line = $stdin.gets
  sock.write YAML.load(line).to_msgpack
  sock.flush
end


試しに実行してみると↓こうなります:
サーバー:

$ ruby server.rb
127.0.0.1:58659 connected
receive message: [1, 2, 3]
receive message: {"a"=>true, "b"=>false}
closed

クライアント:

$ ruby client.rb localhost 9090
[1, 2, 3]               # キーボードから入力
received: [1, 2, 3]
{a: true, b: false}     # キーボードから入力
received: {"a"=>true, "b"=>false}