MapReduce on Tyrant
先日、隅田川の屋形船で花見と洒落込んだのですが、その日はまだ一分咲きも行ってなくて悲しい思いをしたmikioです。今回はTokyo Tyrant(TT)に格納したデータを対象としてMapReduceのモデルに基づく計算をする方法について述べます。
MapReduceとは
Googleが使っているという分散処理の計算モデルおよびその実装のことだそうですが、詳しいことはググってください。Googleによる出自の論文やApacheプロジェクトによるHadoopなどのオープンソース実装にあたるのもよいでしょう(私は両者とも詳しく見ていませんが)。
今回の趣旨は、CouchDBがMapReduceと称してJavaScriptで実現しているデータ集計方法をTTとTCとLuaでやってみようじゃないかということです。簡単に言えば、以下の処理を実装します。
- ユーザから計算開始が指示されると、TTは、DB内の各々のレコードに対してmap関数を適用する
- map関数の引数には各レコードのキーと値が渡される。
- map関数は何らかの処理を行ってmapemit関数を呼び出す
- mapemit関数の引数にはキーと値が渡される
- TTは、mapemitに渡された値をキー毎にまとめて保持する
- 一連のmap関数が終わると、TTは、キー毎にまとめた値の集合の各々に対して、reduce関数を適用する
- reduce関数の引数にはキーとそれに対応する値の配列が渡される
- reduce関数は何らかの処理を行って、出力をユーザに返すかDB内に保存する
データベースやファイルシステムの中にある巨大なレコード群の中から計算対象を絞り込む「map」フェーズと、そうして得られた写像に対して集計処理を行う「reduce」フェーズに分けるのがミソらしいのですが、実はmapとreduceの間には、mapで出力したレコードをキー毎に分配してまとめる「shuffle&sort」フェーズが隠れています。分散処理の面倒な部分をshuffle&sortフェーズに集めて、かつそれをシステム側で暗黙裡に処理することで、アプリケーションプログラマが特に意識しなくても効率的な分散処理ができるようにしているらしいのです。つまり、アプリケーションプログラマはレコード群の集計処理をmap関数とreduce関数に分けて実装するという制約を受け入れるだけで、いつの間にか分散処理ができてしまうのです。言われてみるとその通りな気がしてくるわけですが、発見して実用化した人はすごいですよね。
TTの場合
MapReduceは巨大なレコード群をゴニョゴニョする仕組みですから、前提として巨大なレコード群を管理するデータベースなりファイルシステムなりが必要です。HadoopではHDFSやhBaseが、CouchDBではドキュメント指向のデータベースがそのようなストレージ層を担当しています。TTではもちろんTokyo Cabinet(TC)を用います。
さらに、map関数とreduce関数をアプリケーションプログラマに任意に記述してもらい、それをサーバ側で実行する仕組みも必要となります。Hadoopは主にJavaで、CouchDBは主にJavaScriptでそれを行います。TTではLuaを用います。
TT+TC+LuaがHadoopやCouchDBに比べて利点があるとは正直まったく思っていません。むしろTTのMapReduce機能は今回の記事のためのネタとして作ったようなものです。ただ、既にTTでデータを管理している場合には、サクっと集計処理を書きたくなった時に思い出していただけると便利な機能かもしれません。
実際に書いてみる
MapReduceのHello Worldはワードカウントだそうなので、TTでもワードカウントをやってみましょう。すなわち、TTに格納されている全レコードをテキストとみなし、その中に各英単語が何回現れるかを数えるのです。まず、TCとTTの最新版をインストールして、それから以下のLuaファイルを準備してください。
function wordcount() function mapper(key, value, mapemit) for word in string.gmatch(string.lower(value), "%w+") do mapemit(word, 1) end return true end local res = "" function reducer(key, values) res = res .. key .. "\t" .. #values .. "\n" return true end if not _mapreduce(mapper, reducer) then res = nil end return res end
とてもシンプルに記述できていますよね。mapper(map用関数)とreducer(reduce用関数)を予め定義しておき、それを_mapreduceというビルトイン関数に渡してMapReduceを実行するという意図が読み取れると思います。
mapperに着目してみます。これには各レコードのキーと値が渡されて、レコードのひとつひとつ毎に呼び出されます。第3引数のmapemitは、処理結果を出力するための関数です。今回はワードカウントをしたいので、元の文字列から単語を切り出して、その各々をmapemitに渡しています。単語が出現する度に1を加算するという処理を後でやるだけなので、mapemitが期待するキーにはその単語の文字列、値には1を指定します。戻り値は特にエラーがなければ真を返します。
reducerに着目してみます。mapperがmapemitに渡したデータがキー毎にまとめられて、各々のキーとそれに対応する値のリストがreducerに渡ってきます。値はひとつではなくて複数の値のリストが配列として渡されるところがポイントです。実際の処理ですが、今回は数を数えたいだけなので、配列の要素数をキーと一緒に出力するだけです。いま「出力する」と簡単に言いましたが、計算結果をどうするかというのも結構重要な課題です。Luaでは関数が定義時の環境のローカル変数を利用できるクロージャになるので、reducerでは呼び出し側で定義されている出力用バッファ(res)にデータを連結することで出力を生成しています。戻り値は特にエラーがなければ真を返します。
最後に_mapreduceを呼んでいます。第1引数と第2引数はmap用関数とreduce用関数です。mapの対象レコードが予め絞り込まれている時にそのキーを第3引数として指定することができますが、今は全体を処理したいので未指定にしておきます。_mapreduceは何らかのエラー(mapperやreducerが一度でも偽を返した場合)があれば偽を返し、そうでなければ真を返します。特にエラーがなければreducerが生成したバッファをクライアントに返して処理を終了します。
実行してみる
上記で準備したファイルをmapreducetest.luaなどとして保存してください。その上で、以下のようにTTのサーバを立ち上げます。
$ ttserver -ext mapreducetest.lua casket.tch
この状態ではまだ処理対象のレコードが1件もないので、先にそれを登録します。以下のコマンドを実行してください。
$ tcrmgr put localhost 1 "This is a pen." $ tcrmgr put localhost 2 "Is this your bag?" $ tcrmgr put localhost 3 "Your pen is good."
いよいよMapReduceによるワードカウントを実行します。Lua拡張で定義されたwordcount関数を呼び出すのです。
$ tcrmgr ext localhost wordcount
ちゃんと以下のような出力が出ましたよね? 単語が辞書順で提示され、出現数もきちんと集計できています。
a 1 bag 1 good 1 is 3 pen 2 this 2 your 2
他のLua拡張の関数と同様に、一度定義した関数はいつでも何度でも呼び出せますので、既にTTはワードカウント機能を備えたデータストレージに進化していると言えます。同じ要領で、古いレコードから順にexpireさせることもできますし、Wikiの自動リンクを生成することもできますし、全文検索用の転置インデックスを作成することもできます。
MapReduceによる演算結果は必ずしも結果を直接クライアントに返す必要はなく、結果をデータベースやファイルに保存してもよいし、保存しておいた結果データを対象にして再度MapReduceにかけることもできます。MapReduceの1回の処理でできることはかなり強く制約されますが、処理を繰り返すことで様々な用途に応用できるようになります。
まとめ
Tokyo TyrantとTokyo CabinetとLuaを用いて、MapReduceモデルに基づく計算を行う方法について説明しました。何だか難しそうな概念だと思いきや、意外に簡単にできるものだと感じていただければ幸いです。TTもTCもLuaもどちらかと言えば低水準すぎてユーザフレンドリなソフトウェアではないと思われることが多いかもしれませんが、組み合わせると結構高水準なことができるものなのです。
とはいえ、MapReduceは分散処理のための計算モデルなので、ここまで見てきたような1台しか使わない使用例では全く面白くありませんね。複数ノードのTTを使ったMapReduce機能は目下開発中ですので、近いうちにここでまた紹介できると思います。