Hadoop MapReduce デザインパターンの3章まで読んでみた。

Hadoop MapReduce デザインパターン ―MapReduceによる大規模テキストデータ処理

Hadoop MapReduce デザインパターン ―MapReduceによる大規模テキストデータ処理

いわゆる子象本。小象でも小僧でも無いよw

監修者のブログの
Hadoop MapReduce デザインパターン - 急がば回れ、選ぶなら近道
で詳しく紹介されています。

原著はこちら
Data-Intensive Text Processing With MapReduce (Synthesis Lectures on Human Language Technologies)

また原文のPDFをこちらから入手できます。
Jimmy Lin » Data-Intensive Text Processing with MapReduce

3章までの目次はこうなってます。

1章 イントロダクション
2章 MapReduceの基礎
3章 MapReduceアルゴリズムの設計


1章はさらっと読んで、2章は象本読んでいればある程度わかる内容ですがまた違った書き方なのでちゃんと読んだほうがいいでしょう。

1,2章に関してはこちらも参考になると思います。
第1回MapReduce本読書会 - 科学と非科学の迷宮


そして3章がメインです。


3章に関してはこちらも参考になると思います。



3.1 ローカル集約 は読めばなんとなくわかると思います。
3.2 pairsとstripes はpairsはわかると思いますがstripesがちょっとわかりにくい気がします。


なのでここら辺を書きたいと思います。


まず題材となっているのは共起です。

共起(collocation)とは、自然言語処理の分野において、任意の文において、ある単語とある単語が同時に出現することです。

共起行列はユニークな単語の数がnとした場合のn x nの正方行列です。行列の要素m_{ij} にはある文脈内での単語w_{i}と単語w_{j}に対する共起回数が入ります。

文脈を1行と考えて一番単純な下記のケースを考えてみます。

aaa bbb

この場合はこんな感じになります。話を単純化するために対角線上は無視します。また対称性をもちm_{ij}m_{ji}は同じ値になるものとします。

aaa bbb
aaa 1
bbb 1

別の例として

aaa bbb ccc
aaa ccc

だったら

aaa bbb ccc
aaa 1 2
bbb 1 1
ccc 2 1

です。

MapReduceで解く問題は入力が文章で出力が共起行列です。

入力

aaa bbb

出力

aaa bbb 1
bbb aaa 1

って感じですね。出力も行列表示できたほうが見やすいですが、そこは本質ではないので、単語 単語 共起回数 という表示形式にします。


これをpairsで解く場合の疑似コードはこちら

なんだかイマイチ読みにくい気がしますが、それはおいといてJavaで書くとこんな感じ。

ちなみにこのコードに関しては
MapReduce デザインパターン (2) - めもめも
を参考にさせていただきました。m( )m

	public static class PairsMapper extends MapReduceBase implements
			Mapper<LongWritable, Text, TextPair, IntWritable> {

		private final static IntWritable one = new IntWritable(1);

		@Override
		public void map(LongWritable key, Text value,
				OutputCollector<TextPair, IntWritable> output, Reporter reporter)
				throws IOException {
			String line = value.toString();
			String[] words = line.split("\\s+");

			for (String first : words) {
				for (String second : words) {
					if (!first.equals(second)) {
						output.collect(new TextPair(first, second), one);
					}
				}
			}
		}
	}

	public static class PairsReducer extends MapReduceBase implements
			Reducer<TextPair, IntWritable, TextPair, IntWritable> {

		@Override
		public void reduce(TextPair key, Iterator<IntWritable> values,
				OutputCollector<TextPair, IntWritable> output, Reporter reporter)
				throws IOException {
			int sum = 0;
			while (values.hasNext()) {
				sum += values.next().get();
			}
			output.collect(key, new IntWritable(sum));
		}
	}

mapの出力はキーがaaaとbbbで、値が1ですね。reduceはそれを合計してます。単純ですね。


一方stripesの疑似コードはこんな感じ

いやあよくわからないですねw

MapReduce デザインパターン (3) - めもめもを参考にJavaで実装するとこんな感じ。もはやあっているのか全く自信がありませんw

	public static class StripesMapper extends MapReduceBase implements
			Mapper<LongWritable, Text, Text, MapWritable> {

		@Override
		public void map(LongWritable key, Text value,
				OutputCollector<Text, MapWritable> output, Reporter reporter)
				throws IOException {
			String line = value.toString();
			String[] words = line.split("\\s+");
			MapWritable countMap = new MapWritable();

			for (String first : words) {
				for (String second : words) {
					if (!first.equals(second)) {
						IntWritable countWritable = (IntWritable)countMap.get( new Text( second ) );
						int count = (countWritable == null) ? 1 : countWritable.get() + 1;
						countMap.put(new Text(second), new IntWritable(count));
					}
				}
				output.collect(new Text(first), countMap);
				countMap.clear();
			}
		}
	}

	public static class StripesReducer extends MapReduceBase implements
			Reducer<Text, MapWritable, TextPair, IntWritable> {

		@Override
		public void reduce(Text key, Iterator<MapWritable> values,
				OutputCollector<TextPair, IntWritable> output, Reporter reporter)
				throws IOException {
			
			MapWritable totalMap = new MapWritable();
			while (values.hasNext()) {
				MapWritable countMap = values.next();
				for(Writable word : countMap.keySet()) {
					IntWritable totalWritable = (IntWritable)totalMap.get( word );
					int total = (totalWritable == null) ? 0 : totalWritable.get();
					IntWritable countWritable = (IntWritable)countMap.get(word);
					int count = countWritable.get();
					totalMap.put(word, new IntWritable(total + count));
				}
			}
			
			for(Writable word : totalMap.keySet()) {
				output.collect(new TextPair(key, (Text)word), (IntWritable)totalMap.get(word));
			}
		}
	}


例えば

aaa bbb ccc
aaa ccc

が来た場合を考えてみます。
1行目が来た時点でのMapperの出力はキーがaaa, 値がbbbと1, cccと1のMap(countMap)です。キーがbbbのときとcccのときとあわせて考えると以下の3レコードが出力されます。{}でMapを表しているつもりです。

aaa, {bbb:1, ccc:1}
bbb, {aaa:1, ccc:1}
ccc, {aaa:1, bbb:1}

2行目が来た時点でのMapperの出力はこんな感じ。

aaa, {ccc:1}
ccc, {aaa:1}

なのでReducerに来る頃にはキーと値はこんな感じになってます。[]でリストを表しているつもりです。

aaa, [{bbb:1, ccc:1}, {ccc:1}]
bbb, [{aaa:1, ccc:1}]
ccc, [{aaa:1, bbb:1}, {aaa:1}]

Reducer側のtotalMapはReduceにわたってくる値(countMapのIterator)を合計したものになります。

aaa, [{bbb:1, ccc:1}, {ccc:1}]がReducerにわたってきたら(keyがaaaでvaluesが [{bbb:1, ccc:1}, {ccc:1}])

{bbb:1, ccc:2}

です。

よってReducerの出力は

aaa bbb 1
aaa ccc 2

となります。


最終的な出力は下記のようになります。

aaa ccc 2
aaa bbb 1
bbb ccc 1
bbb aaa 1
ccc bbb 1
ccc aaa 2


で、pairsとstripesの比較ですが、

pairsのmap出力のパターンの数は1行の単語数がnなら\frac{n(n-1)}{2}ですがstripesはnなのでソート処理の負荷は少ないです。

そのようなこともあり一般的にはstripesのほうがpairsよりも高速です。

一方でpairsには中間データをメモリに持たないというメリットがあります。

ま、くわしくは本読んでくださいw

ソースは下記に上げてます。
https://github.com/wyukawa/HadoopMapReduceDesignPattern


ちなみに3章の残りですが、


3.3 相対頻度の計算 に関しては
MapReduce デザインパターン (4) - めもめも
を見るといいかも。

partitioner作ってセカンダリソートするんだと思います。たぶん。


3.4 セカンダリソート に関しては象本読んだほうがいいと思います。具体例が無いのでこれだけで理解するのは難しい気がします。


3.5 リレーショナルな結合 も同様な理由で象本のほうがいいかも。

ただし象本もreduce側結合しかサンプルコードがありません。

map側結合のサンプルコードが読みたいなら
Google Code Archive - Long-term storage for Google Code Project Hosting.にあるJoinWithDeptNameUsingMapSideJoin.javaを見る方がいいかも。

memory-backed結合はよくわからんけど、Hiveのjoinの最適化 - wyukawa’s blogのMap Joinのことを言っている気がします。


長くなりましたが、いじょ