Kyoto Tycoon に MessagePack-RPC をプラグインして Java から使う

Tokyo Cabinet を始めとする Tokyo シリーズの作者として知られる平林幹雄さんですが、Tokyo シリーズに続く新製品として、Kyoto シリーズがリリースされています。
Kyoto Tycoon(以下KT)は、ネットワーク経由で使えるデータベースサーバで、Tokyo Tyrantの後継製品に当たります*1


KT は HTTP ベースのプロトコルで操作することができますが、別のプロトコルを追加することもできます。
実際に memcached プロトコルのプラグインが標準でバンドルされています。(memcachedプロトコルをKTにプラグインする


と言うわけで、KT を MessagePack-RPC で使えるようにするプラグインを書いてみました。github からダウンロードできます。


MessagePack-RPC を使うと、通信を非同期化したり、他の MessagePack-RPC サーバと同時に通信したりするプログラムを簡単に書けます。そして何より、高速です。

コンパイルしたライブラリを KT の -plsv オプションに指定すると、MessagePack-RPC でアクセスできるようになります:

$ ktserver -plsv /usr/local/libexec/libktmsgpack.dylib -plex 'port=18801'


この MessagePack-RPC をプラグインした KT を、Javaから利用してみます。
MessagePack-RPC の Java 版は、最近のバージョンアップで使い勝手が大きく向上しています:

import java.util.List;
import org.msgpack.rpc.Client;
import org.msgpack.rpc.loop.EventLoop;

public class SimpleExample {
  // RPCのインタフェースを宣言
  public static interface KyotoTycoonClient {
    void set(String key, String value);
    String get(String key);
    List<String> match_prefix(String prefix);
  }

  public static void main(String[] args) throws Exception {
    EventLoop loop = EventLoop.defaultEventLoop();
    Client cli = new Client("127.0.0.1", 18801, loop);
    KyotoCabinetClient kt = cli.proxy(KyotoCabinetClient.class);  // ココがポイント

    // setとget
    kt.set("k1", "v1");
    kt.set("k2", "v2");
    String v1 = kt.get("k1");
    String v2 = kt.get("k2");

    // プレフィックスで検索
    List<String> match = kt.match_prefix("k");

    System.out.println(v1);  #=> "v1"
    System.out.println(v2);  #=> "v2"
    System.out.println(match);   #=> ["k1", "k2"]

    cli.close();
    loop.shutdown();
  }
}


KyotoTycoonClient kt = cli.proxy(KyotoTycoonClient.class); の行がポイントです。
proxy() メソッドにインタフェースを渡すと、そのインタフェースが自動的に実装されたオブジェクト*2が返ってきます。このオブジェクトのメソッドを呼ぶと、リモートサーバの機能を呼び出せます。


この KyotoTycoonClient インタフェースは、kt-msgpack パッケージの中に同梱しています:


インタフェースの定義で返り値を Future にすると、非同期型の呼び出しになります。非同期呼び出しを使うと、遅延を隠蔽して処理時間を短縮することができます。立て続けに複数の呼び出しを行うときに効果的です。
↓このプログラムは、入力データを処理しながら KT から次々にデータを取り出し、最後に結果を表示するプログラムです。

import java.util.Map;
import java.util.HashMap;
import org.msgpack.rpc.Client;
import org.msgpack.rpc.loop.EventLoop;
import org.msgpack.rpc.Future;

public class Test {
  // RPCのインタフェースを宣言
  public static interface KyotoTycoonClient { 
    void set(String key, String value);
    String get(String key);

    // 戻り値をFuture<T>にして、メソッド名の末尾に "Async "を付ける
    Future<String> getAsync(String key);
  }

  public static Future<String> getUserInfo(KyotoTycoonClient kt, String user) {
    return kt.getAsync(user+"/info");
  }

  public static void main(String[] args) throws Exception {
    EventLoop loop = EventLoop.defaultEventLoop();
    Client cli = new Client("127.0.0.1", 18801, loop);
    KyotoTycoonClient kt = cli.proxy(KyotoTycoonClient.class);

    // とりあえずデータをセットしておく
    kt.set("viver/info", "http://d.hatena.ne.jp/viver");
    kt.set("frsyuki/info", "http://github.com/frsyuki");
    kt.set("muga/info", "http://github.com/muga");
    kt.set("msgpack/info", "http://github.com/msgpack");

    // 入力データ:
    String[] input = new String[] { "viver", "frsyuki", "muga", "msgpack" };

    // KTから非同期にデータを取り出していく...
    Map<String, Future<String>> map = new HashMap<String, Future<String>>();
    for(String user : input) {
      Future<String> data = getUserInfo(c, user);
      map.put(user, data);
    }

    // 通信はこの間にバックグラウンドで進行...

    // 最後にデータを出力
    StringBuilder sb = new StringBuilder();
    for(Map.Entry<String, Future<String>> pair : map.entrySet()) {
      sb.append(pair.getKey());
      sb.append(":  ");
      sb.append(pair.getValue().get());
      sb.append("\n");
    }
    // 出力結果:
    //   msgpack:  http://github.com/msgpack
    //   muga:  http://github.com/muga
    //   frsyuki:  http://github.com/frsyuki
    //   viver:  http://d.hatena.ne.jp/viver

    System.out.println(sb.toString());

    cli.close();
    loop.shutdown();
  }
}

*1:今ならKTの説明会が無償^^; [http://fallabs.com/mikio/tech/promenade.cgi?id=113:title=Kyoto Tycoon普及大作戦]

*2:実体はjava.lang.reflect.Proxy。動的コード生成によってリフレクションを排除する実装も進んでいます。