Kyoto Tycoon に MessagePack-RPC をプラグインして Java から使う
Tokyo Cabinet を始めとする Tokyo シリーズの作者として知られる平林幹雄さんですが、Tokyo シリーズに続く新製品として、Kyoto シリーズがリリースされています。
Kyoto Tycoon(以下KT)は、ネットワーク経由で使えるデータベースサーバで、Tokyo Tyrantの後継製品に当たります*1。
KT は HTTP ベースのプロトコルで操作することができますが、別のプロトコルを追加することもできます。
実際に memcached プロトコルのプラグインが標準でバンドルされています。(memcachedプロトコルをKTにプラグインする)
と言うわけで、KT を MessagePack-RPC で使えるようにするプラグインを書いてみました。github からダウンロードできます。
MessagePack-RPC を使うと、通信を非同期化したり、他の MessagePack-RPC サーバと同時に通信したりするプログラムを簡単に書けます。そして何より、高速です。
コンパイルしたライブラリを KT の -plsv オプションに指定すると、MessagePack-RPC でアクセスできるようになります:
$ ktserver -plsv /usr/local/libexec/libktmsgpack.dylib -plex 'port=18801'
この MessagePack-RPC をプラグインした KT を、Javaから利用してみます。
MessagePack-RPC の Java 版は、最近のバージョンアップで使い勝手が大きく向上しています:
import java.util.List; import org.msgpack.rpc.Client; import org.msgpack.rpc.loop.EventLoop; public class SimpleExample { // RPCのインタフェースを宣言 public static interface KyotoTycoonClient { void set(String key, String value); String get(String key); List<String> match_prefix(String prefix); } public static void main(String[] args) throws Exception { EventLoop loop = EventLoop.defaultEventLoop(); Client cli = new Client("127.0.0.1", 18801, loop); KyotoCabinetClient kt = cli.proxy(KyotoCabinetClient.class); // ココがポイント // setとget kt.set("k1", "v1"); kt.set("k2", "v2"); String v1 = kt.get("k1"); String v2 = kt.get("k2"); // プレフィックスで検索 List<String> match = kt.match_prefix("k"); System.out.println(v1); #=> "v1" System.out.println(v2); #=> "v2" System.out.println(match); #=> ["k1", "k2"] cli.close(); loop.shutdown(); } }
KyotoTycoonClient kt = cli.proxy(KyotoTycoonClient.class); の行がポイントです。
proxy() メソッドにインタフェースを渡すと、そのインタフェースが自動的に実装されたオブジェクト*2が返ってきます。このオブジェクトのメソッドを呼ぶと、リモートサーバの機能を呼び出せます。
この KyotoTycoonClient インタフェースは、kt-msgpack パッケージの中に同梱しています:
インタフェースの定義で返り値を Future
↓このプログラムは、入力データを処理しながら KT から次々にデータを取り出し、最後に結果を表示するプログラムです。
import java.util.Map; import java.util.HashMap; import org.msgpack.rpc.Client; import org.msgpack.rpc.loop.EventLoop; import org.msgpack.rpc.Future; public class Test { // RPCのインタフェースを宣言 public static interface KyotoTycoonClient { void set(String key, String value); String get(String key); // 戻り値をFuture<T>にして、メソッド名の末尾に "Async "を付ける Future<String> getAsync(String key); } public static Future<String> getUserInfo(KyotoTycoonClient kt, String user) { return kt.getAsync(user+"/info"); } public static void main(String[] args) throws Exception { EventLoop loop = EventLoop.defaultEventLoop(); Client cli = new Client("127.0.0.1", 18801, loop); KyotoTycoonClient kt = cli.proxy(KyotoTycoonClient.class); // とりあえずデータをセットしておく kt.set("viver/info", "http://d.hatena.ne.jp/viver"); kt.set("frsyuki/info", "http://github.com/frsyuki"); kt.set("muga/info", "http://github.com/muga"); kt.set("msgpack/info", "http://github.com/msgpack"); // 入力データ: String[] input = new String[] { "viver", "frsyuki", "muga", "msgpack" }; // KTから非同期にデータを取り出していく... Map<String, Future<String>> map = new HashMap<String, Future<String>>(); for(String user : input) { Future<String> data = getUserInfo(c, user); map.put(user, data); } // 通信はこの間にバックグラウンドで進行... // 最後にデータを出力 StringBuilder sb = new StringBuilder(); for(Map.Entry<String, Future<String>> pair : map.entrySet()) { sb.append(pair.getKey()); sb.append(": "); sb.append(pair.getValue().get()); sb.append("\n"); } // 出力結果: // msgpack: http://github.com/msgpack // muga: http://github.com/muga // frsyuki: http://github.com/frsyuki // viver: http://d.hatena.ne.jp/viver System.out.println(sb.toString()); cli.close(); loop.shutdown(); } }