読者です 読者をやめる 読者になる 読者になる

Ruby で MessagePack-RPC

高速なオブジェクトシリアライズ形式 MessagePack をプロトコルに採用したRPCライブラリをリリースしました。
Ruby を使って簡単にRPCサーバーやクライアントを実装できます。

MessagePack-RPC プロトコルは既にkumofsやクラスタ管理ツール「clx」などで利用しており、高速なサーバーの実装にも便利ツールの実装にも、幅広く使えるシンプルなプロトコル仕様になっています。

clxを使うと複数のサーバーをグループに分けて、同じグループに属するサーバーに対して同じコマンドを実行できます。コマンドは並列して実行されるので、ファイル転送(rsync)のような時間のかかるコマンドでも快適に使えます。


clxのコアは汎用的なRPCサーバーで、RPC以外の機能はすべてモジュールとして実装されています。モジュールは起動時に登録できるほか、実行中でも追加でき、RPCを使ってネットワーク越しにモジュールを追加することもできます。


PFIインターンに行ってきました。


クライアントAPIはRPCの非同期呼び出しをサポートし、サーバーAPIはRPCの遅延リターンをサポートしています。本格的に非同期・イベント駆動なアプリケーションを書くことができるので、分散アプリケーションのモックアップを開発するのにもぴったりです。

まず全体のモックアップを作り、次にサーバー側だけ本番コードに書き直していくように、モックコードと本番コードでプロトコルを変更せずに段階的に開発を進めていくことができます。
あるいはRPCサーバーのテストコードを書くのにも便利でしょう。


I/O処理部分には高速なイベント駆動ライブラリの libev (のRubyバインディングrev) を利用しているので、実用的にも十分使える速度で動かもしれません:-p

プロトコルパーサにMessagePackを利用しているので、特に長い文字列を送受信する場合には高速に動作するはずです*1。そのような場合はCPUよりもネットワーク帯域がボトルネックになるはずなので、Rubyでも相当に高速なサーバーやクライアントを実装できるかもしれません。


他にも、例えばサーバーをRubyで書いてクライアントをperlで書くなど、多くの言語の得意分野を活用しながら1つの分散システムを構築したい場合にも便利に使えると思います。


↓こんな感じでコードを書けます:

  • クライアント側
require 'msgpack/rpc'

# localhost:5000 に接続
cli = MessagePack::RPC::Client.new("localhost", 5000)

# 5秒でタイムアウトさせる
cli.timeout = 5


# helloメソッドを呼び出して応答を待つ
puts cli.call(:hello)


# 非同期にhelloメソッドを呼び出して...
cli.callback(:hello) do |err, res|
  # ...応答が帰ってきたらこのブロックが呼ばれるようにする
  puts res
end


# 非同期にhelloメソッドを呼び出して...
req = cli.send(:hello)
# ...応答を待つ
puts req.join.result


# 非同期に並列して100回helloメソッドを呼び出して...
reqs = []
100.times do
  reqs.push cli.send(:hello)
end

# ...応答を待ち受ける
reqs.each do |req|
  puts req.join.result
end

cli.close
  • サーバー側
require 'msgpack/rpc'

# RPCメソッドを実装したクラス
class MyServer

  # helloメソッドを実装
  def hello
    return "Hello, World!"
  end

  # RPCの遅延リターン
  def async_hello
    # MessagePack::RPC::AsyncResult のインスタンスを返すと遅延リターンに
    as = MessagePack::RPC::AsyncResult.new

    Thread.new do
      # たとえばスレッドプールに投入して非同期でタスクを実行し...
      sleep 1
      # ...処理が終わったら結果を返す
      as.result "ok."
    end

    return as
  end
end

# 5000番ポートでlisten
svr = MessagePack::RPC::Server.new
svr.listen("0.0.0.0", 5000, MyServer.new)

# シグナルをキャッチしたら終了
Signal.trap(:TERM) { svr.stop }
Signal.trap(:INT)  { svr.stop }

svr.run


MessagePack for Ruby は gem でインストールできます:

gem install msgpack-rpc


以下のgemパッケージに依存しています。gem install msgpack-rpc で一緒にインストールされます:


最後に MessagePack-RPC を使った実用的なプログラムの例を載せておきます:
このRPCサーバーは、リモートからBrainfuckで書かれたプログラムを投げ込むと、実行して結果を返してくれます。なんとOok!にも対応しています。
RPCで Ruby, Brainfuck, Ook! の3つの言語を切り替えることができます。

  • サーバー側
require 'rubygems'
require 'msgpack/rpc'
require 'stringio'


# Ruby
def ruby_run(code)
  eval(code)
end

# Brainfuck
def brainfuck_run(code)
  code.gsub!(/\s/,'')

  mem = []
  ptr = 0
  stack = [0]
  addr = 0

  while code.length > addr
    case code[addr].chr
    when '>'
      ptr += 1
    when '<'
      ptr -= 1
    when '+'
      if mem[ptr].nil?
        mem[ptr] = 0
      end
    mem[ptr] += 1
    when '-'
      if mem[ptr].nil?
        mem[ptr] = 0
      end
    mem[ptr] -= 1
    #when ','
    #  mem[ptr] = STDIN.getc
    when '.'
      print mem[ptr].chr
    when '['
      if mem[ptr] == 0
        while code[addr].chr == ']'
          addr += 1
        end
      else
        stack.push(addr)
      end
    when ']'
      if mem[ptr] != 0
        addr = stack.pop - 1
      else
        stack.pop
      end
    end
    addr += 1
  end
end

# Ook!
def ook_run(code)
  elems = code.strip.split(/\s/)
  i=0; xa = elems.select {|x| (i+=1) % 2 == 1 }
  i=0; xb = elems.select {|x| (i+=1) % 2 == 0 }
  xs = xa.zip(xb)
  bf = ""
  xs.each {|cmd|
    case cmd
    when %w[Ook. Ook?]
      bf << ">"
    when %w[Ook? Ook.]
      bf << "<"
    when %w[Ook. Ook.]
      bf << "+"
    when %w[Ook! Ook!]
      bf << "-"
    when %w[Ook. Ook!]
      bf << ","
    when %w[Ook! Ook.]
      bf << "."
    when %w[Ook! Ook?]
      bf << "["
    when %w[Ook? Ook!]
      bf << "]"
    end
  }
  brainfuck_run(bf)
end


# RPCサーバー
class Service
  def initialize(default)
    setlang(default)
  end

  # 引数で渡されたコードを実行
  def run(cmd)
    puts "#{@lang_name}\n#{cmd}"

    # $stdoutをすり替え
    $stdout = StringIO.new
    begin
      # 実行!
      @lang.call(cmd)
      result = $stdout.string
    ensure
      $stdout.close
      $stdout = STDOUT
    end

    puts result
    return result
  end

  # 言語を切り替え
  def setlang(lang)
    lang.strip!
    puts "change lang #{lang}"
    @lang = method("#{lang}_run")
    @lang_name = lang
    lang
  end
end

if ARGV.length != 1
  puts "usage: #{$0} <port>"
  exit 1
end
port = ARGV.shift.to_i

service = Service.new('ruby')  # デフォルトはruby

svr = MessagePack::RPC::Server.new
svr.listen("0.0.0.0", port, service)
svr.run
  • クライアント側
require 'rubygems'
require 'msgpack/rpc'

# クライアントのラッパー
class Client < MessagePack::RPC::Client
  def run(cmd)
    puts call(:run, cmd)
  end
  def setlang(lang)
    puts call(:setlang, lang)
  end
end

cli = Client.new("127.0.0.1", 5000)
cli.timeout = 5


# Ruby で Hello World!
cli.setlang "ruby"
cli.run <<END
puts "Hello, World!"
END


# Brainfuck で Hello World!
cli.setlang "brainfuck"
cli.run <<END
+++++++++[>++++++++>+++++++++++>+++++<<<-]>.>++.+++++++..+++.>-.
------------.<++++++++.--------.+++.------.--------.>+.
END


# Ook! で Hello World!
cli.setlang "ook"
cli.run <<END
Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook.
Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook! Ook? Ook. Ook?
Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook.
Ook. Ook. Ook. Ook? Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook.
Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook.
Ook. Ook? Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook? Ook. Ook.
Ook? Ook. Ook? Ook. Ook? Ook. Ook? Ook. Ook! Ook! Ook? Ook!
Ook. Ook? Ook. Ook. Ook. Ook. Ook! Ook. Ook. Ook? Ook. Ook.
Ook! Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook.
Ook. Ook. Ook. Ook. Ook! Ook. Ook! Ook. Ook. Ook. Ook. Ook.
Ook. Ook. Ook! Ook. Ook. Ook? Ook. Ook. Ook. Ook. Ook! Ook.
Ook? Ook. Ook? Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook.
Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook.
Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook! Ook.
Ook. Ook? Ook! Ook. Ook. Ook. Ook. Ook. Ook. Ook. Ook! Ook.
Ook! Ook! Ook! Ook! Ook! Ook! Ook! Ook! Ook! Ook! Ook! Ook!
Ook! Ook. Ook! Ook! Ook! Ook! Ook! Ook! Ook! Ook! Ook! Ook!
Ook! Ook! Ook! Ook! Ook! Ook! Ook! Ook. Ook. Ook? Ook. Ook.
Ook! Ook. Ook. Ook? Ook! Ook.
END

*1:文字列のデシリアライズはゼロ・コピーで行われるので、長さが長大になっても非常に高速にデシリアライズできます。