140行で作る分散リアルタイム検索エンジン(Twitter Streaming API対応)

マトモに使えるRPCライブラリ MessagePack-RPC for Ruby のバージョン 0.2.0 をリリースしました!
新たにコネクションプーリングの機能を追加しました。一度接続したコネクションを共有して使い回すことができます。コネクションを何度も張り直す負荷と遅延を削減でき、リソースの消費も抑えられます。
また、不意に切断されたコネクションを自動的に再接続する機能を導入し、信頼性を向上させています。


これを使って何か作ってみようと言うことで、twitterのリアルタイム検索エンジンを作ってみました。日本語を検索できないなど機能は貧弱ですが、プログラム全体がわずか140行に収まっています(クローラ27行、インデクサ48行、クラスタ管理ノード37行、検索クライアント28行)。
新しいつぶやきを受信するたびに、リアルタイムで転置インデックスを作成していきます。インデックスを作成するノードを複数用意して負荷を分散させられるのがポイントです。



転置インデックスを保存するストレージにはmemcachedプロトコルを使っています。memcached自体のほかに、Tokyo Tyrant(最新版が必要^^)も使えます。
Managerノードでクラスタ全体の構成情報を集約管理しているので、設定ファイルいらずで動的にクラスタを構築できます。


実際に Tokyo Tyrant を使って、10万件のつぶやきについて転置インデックスを作成してみました。
例えば"google"というキーワードを検索してみると、このようになります:

docid=6382473204        Tech update - Google Officially Launching Chrome Extensions Next Week: A couple weeks ago, Google unveiled its Chro... http://bit.ly/5QQOIP
(...中略...)
docid=6397668504        How to Manage a Group Project in Google Wave [Google Wave] http://ff.im/-cvEzA
179 entries.


ソースコードはgithubにあります:http://github.com/frsyuki/simple-realtime-search
特徴的なクローラのコードを見て見ると、このようになっています:

require 'msgpack/rpc'
require 'tweetstream'    # Twitter Streaming API のライブラリ

# 引数を解析...
mgr_host, mgr_port = (ARGV.shift || "").split(':')
user = ARGV.shift
pass = ARGV.shift
if !ARGV.empty? || mgr_port.nil? || pass.nil?
  puts "Usage: #{$0} <manager address:port> <twitter user> <twitter pass>"
  exit 1
end

# Managerからクラスタの構成情報を取得
mgr = MessagePack::RPC::Client.new(mgr_host, mgr_port.to_i)
db_addr = mgr.call(:getDBServer)    # memcachedのアドレスを取得
nodes = mgr.call(:getNodes)    # Indexerノードのアドレス(複数)を取得
mgr.close

# RPCセッションプール(新機能!)のインスタンスを作成
sp = MessagePack::RPC::SessionPool.new
round_robin = 0

# 新しいエントリを受信するごとに...
TweetStream::Client.new(user, pass).sample do |stat|
  # ラウンドロビンでIndexerノードを選択
  host, port = nodes[ (round_robin+=1) % nodes.size ]

  # RPCセッションプールから相手先のIndexerノードを取得して、インデクシング要求を発行
  sp.get_session(host, port).call(:indexText, stat[:id], stat[:text])

  # 一度接続したコネクションは使い回されるので、送信元ポートを使い尽くすこともなく、安全・高速
  # connectやデータの送信は非同期・Non-Blocking

  puts "#{round_robin}\tindexing on #{host}:#{port}: (id=#{stat[:id]}) #{stat[:text]}"
end


特に変わったことはしておらず、新しいエントリを受信するごとに、ラウンドロビンで選択したIndexerノードに要求を投げているだけです。


分散アプリケーションって意外と簡単に書けるんだなーという気がしてきますね!

裏側で MessagePack-RPC ライブラリが良きに計らってくれるのがポイントです。


中でも「RPCセッション」が便利で、「コネクション」という概念が隠蔽されています。
TCPを使って信頼性の高いネットワークプログラムを書こうとすると、プーリングしているコネクションが不意に切れたときにどうするか(どうやってコンテキストを維持したまま再接続するか)、connectのタイムアウトを待っている間に長時間スレッドが止まるのが許容できないなどの問題に悩まされます。
そこで抽象度の高い「コネクション」という概念を隠蔽し、さらに高次の信頼性を確保するレイヤーを再構築する必要があります。これをいかに小さいオーバーヘッドで確実に実装するかが各種RPCライブラリの腕の見せ所になるわけですが…MessagePack-RPC for Ruby を使えば何も考えなくていいですよーという話になります^^;


実際に全部のノードを1台のホスト上で動かすには、↓このように実行します:

# 最新の MessagePack-RPC と、各種gemをインストール
gem install msgpack-rpc
gem install system_timer
gem install memcache-client

# クラスタ起動
memcached -p 11211 &
./rrse-manager 5000 127.0.0.1:11211 &
./rsse-indexer  127.0.0.1:5000 127.0.0.1:6001 &
./rsse-indexer  127.0.0.1:5000 127.0.0.1:6002 &

# クロール開始
./rsse-crawler  127.0.0.1:5000 twitter-user-name password

# キーワードを検索してみる
./rrse-searcher 127.0.0.1:5000 keyword


マトモに使えるRPCライブラリ MessagePack-RPC for Ruby でした。