Hadoop Streamingで分散処理をPHPでやってみた

「何番煎じか分からないけど集合知プログラミングをPHPでやってみたシリーズ」で扱っている集合知プログラミングは、とかく計算量が多くなりがちで、うまくアルゴリズムを作らないとメモリ不足に陥りがちです。

現に前回の記事では、その越えられない壁を体験してしまったので、「どげんかせんといかん。」という事で、最近ちょっと興味のある、Googleのバックエンドでも使われている"MapReduce"に関して少し調べてみました。

"MapReduce"に関しては、「"Googleを支える技術"読め。以上。」でもいいんですが、id:naoyaさんが書かれている記事が非常に分かりやすかったので、その記事のリンクを貼っておきます。
→MapReduce - naoyaのはてなダイアリー

"Googleを支える技術"もせっかくだからAmazonのリンクを貼っておきます。

Googleを支える技術 ?巨大システムの内側の世界 (WEB+DB PRESSプラスシリーズ)

Googleを支える技術 ?巨大システムの内側の世界 (WEB+DB PRESSプラスシリーズ)

何はともあれ体験してみる

MapReduceのオープンソースの実装であるHadoopというものがある事は知っていて、WEB+DB PRESS vol.47 & vol.48でid:naoyaさんが書かれていた記事も読んではいたものの、「やっぱり触ってみないと分かんないトコあるよね〜。」という事で、実際に触ってみる事に。

CodeZineさんの記事を参考にさせてもらいながら、Hadoopをインストールして、単語数カウントのサンプルを体験…。

もうほとんどそのままなぞっただけなので、ここでは実行結果は省略…。環境整えるだけだったら、ものすごく簡単でした。

でもJavaとかそんなバリバリ書けないよ(´・ω・`)

HadoopはJavaによる実装なので、Map関数・Reduce関数はJavaで書くのが普通です。が…「PHPerなんだし、やっぱりPHPで書きたいよね。」という訳で、やっと本題。

Hadoopには、標準入出力を使って任意の言語でMap関数・Reduce関数を書く事ができるようになる、"HadoopStreaming"という拡張があります。

こちらも詳細は、id:naoyaさんの記事に頼る事に…。
→Hadoop Streaming - naoyaのはてなダイアリー

つまりは、STDINを受け取って、keyとvalueをゴニョゴニョして整形して返すプログラムであれば、どんな言語でもMapReduceできちゃうって訳ですな。

という訳でMap/Reduce関数をPHPで書いてみた

対象の処理は、例によって空白区切りでの単語数のカウントです。まずはMap関数から。

#!/usr/bin/php
<?php
while ( !feof(STDIN) ) {
    $line = trim(fgets(STDIN));
    foreach ( preg_split('/\s+/', $line) as $word ) {
        if ( $word !== '' ) {
            echo "${word}\t1\n";
        }
    }
}
?>

続いてReduce関数。

#!/usr/bin/php
<?php
$count = array();
while ( !feof(STDIN) ) {
    $line = trim(fgets(STDIN));
    if ( $line !== '' ) {
        list($key, $value) = preg_split('/\t/', $line);
        $count[$key]++;
    }
}

foreach ( $count as $key => $value ) {
    echo "${key}\t${value}\n";
}
?>

いざ分散!

準備ができたところで、実際に動かしてみましょう。

サンプルに使う文書は、お約束の"hoge"とか"fuga"とかでも良かったんですが、せっかく分散環境な訳だし、多少文字数の多い物をサンプルにしてみる事にしました。

という訳で、青空文庫から"我輩は猫である"のテキストを持ってきて、事前にMeCabで分かち書きしたものを用意して入力文書にしてみました。*1

$ bin/hadoop jar contrib/streaming/hadoop-0.18.3-streaming.jar \
-input inputs/wagahaiwa_nekodearu_wakati.txt -output outputs \
-mapper 'php /home/hadoop/bin/mapreduce/php/wordcount/map.php' \
-reducer 'php /home/hadoop/bin/mapreduce/php/wordcount/reduce.php'

実行時の出力内容は省略しますが、タイムスタンプを見てみたら実行に掛かった時間は38秒…。

まぁ、元のファイルが1.4MBytes程度の小さなファイルなので、Hadoopのオーバーヘッドの方が大きくて、時間的恩恵を受けるのは無理ってもんですな…。*2

ちなみに、最初ちょっとはまってしまったんですが、mapperとreducerで渡すファイル名はフルパスで書いておかないとダメなようです。あるいは、"-file {ファイル名}"というオプションを使うと、各ノードにファイルを転送してから処理してくれるので、その場合は、mapper/reducerはファイル名だけでもいけるっぽいです。

で、ちゃんと結果が得られているか見てみましょう。Hadoopの実行結果は分散ファイルシステム上に保存されるので、一旦、通常のファイルシステム上に書き出してから、内容を見てみます。*3

$ dfsget outputs/part-00000 ~/outputs/wagahaiwa_nekodearu_wordcount.txt
$ sort -nrk 2 ~/outputs/wagahaiwa_nekodearu_wordcount.txt | head -20
の      9333
《      9214
》      9213
。      7487
、      6808
に      6805
て      6706
は      6570
ã‚’      6067
と      5629
が      5530
た      4118
で      3867
」      3614
「      3249
し      2648
ã‚‚      2609
だ      2456
ない    2413
から    2102

見事なまでに、"てにをは"が上位を占めていますね。(笑) 今回は単純な分かち書きを元にしましたが、名詞だけ抜き出したりすると、もっと面白い結果が得られるかもしれないですね。

しかし実は落とし穴が…

今回、ミニマムな環境で実験しましたが、入力ファイルがもっと超巨大なものだったとしてもそのまま応用が利くのが素晴らしいですね。

…と、非常に素晴らしいHadoopStreamingなんですが、実は一点落とし穴があって、id:naoyaさんも指摘されているんですが、HadoopStreamingはReducerへの入力が構造化されていないという問題点があります。その辺りについて、次回もうちょっと触れてみたいと思います。

という訳で、もうちょっとだけ続くんじゃ…。

*1:入力文書は事前にDFS上にアップしておく必要があります。

*2:実際、直接パイプで処理させた方が数倍早い…。

*3:dfscatもできるので、DFS上のファイルを直接参照する事もできます。