MapReduceのもう一つの使い方

MapReduceというと集計に使うモノという人が多いと思う。
なんとなれば、MapReduce=Hadoop=ワードカウントの図式になっているからだ。
実際、Hadoopを触ってみようという人のほとんどはexampleとしてワードカウントを使うはず。その辺に落ちてるシェークスピアのログでHadoopのexampleを動かした人もおおいはず。

ところが実際に業務バッチ的な処理を行うときに、MapReduceの効果的な使い方は別にもある。個人的は、「本当のMapReduceの使いかた」はこっちだと思う。なんということはなくて「組み合わせ計算を高速に行う」だ。ある種の計算では、順序処理でギブアップしてしまうケースの一つに組み合わせの計算がある

Node{
	List<Node> nodeList
	value(){
		hasNodeList() ? nodeList.traverse(value()) : process() -> upperNode()
		}
}

みたいなモデルにおいて

process(ALL.Node.value());
を実行するようなケースだ。

これはRDBMSでは破壊的に相性が悪い。

N階層マルチJoinになる。全部メモリーに乗り切れば力技でやれるけど、
限界を超えた瞬間、DBはモノリスのように黙る。小さなデータで試して、「隊長!できました!超おKっす!完璧っす!」って持ってくる人いるけど僕が悪かったので、勘弁してください。お願いします。

データ構造的には大きなグラフを形成していると思えばいい。グラフ構造というと引いてしまう人もいると思うけど。話題のソーシャルなんかは大抵はグラフ構造なので、今後は避けて通れないと思う。

MapReduceするとどうなるか?って話だけど・・・

Mapper{
	List<Node> list
	list.map(process() -> Tuple<Key, UpperNode>)
	emit list<Tuple>
	}

ばらまいたNodeで一斉に計算して、その上位のNodeのデータを投げて
NodeのリストつくりReducerにわたす。

Reducer{
	SortedList<Tuple> list
	list.fold(summerize() -> Node)
	emit Node
}

ReducerはもらったNodeリスト(MapReduceのお約束でこれはソートされている)を受け取って、Node単位で集計する。

あとはMap→Reduceの繰り返しになる

尚、MapにReducerから処理を渡す時はListになる。モデル的にこんな感じで進む。但し実装上はMapperからReducerの転送時に転送量を考慮する必要がある。

このモデルでは大抵はいわゆるStripesパターンを使う
http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.169.6896&rep=rep1&type=pdf
の3章あたりが参考。

このMapreduceの特徴は、ポイントは2つあって、一つは並列処理ができるということ。前述の場合は、process(ALL.Node.value())は
シーケンスに処理されることが多い。Nodeを一個づつ処理する感じですね。これでは遅いです。Mapreduceでは基本的に並列で処理ができます。

・・・・・・・・・・とまぁ、ここまでは普通に教科書的な話。

もうひとつは
実際に業務系で利用する場合に、一定のNodeで処理すべき計算が計算可能かどうか判断する必要があるということです。

例えば、Nodeの計算が参照している先のNodeの特定の値を集計する場合を考えてみよう。処理を行うべきNodeの計算自体は、参照先のNodeからのデータを参照して演算を行う。このとき、参照先のNodeの計算が終了していることが望ましい。

仮に、参照先のNodeの計算がまだ完了していない段階で自身の計算を実行してもそれは途中経過でしかない。すべてのノードの計算の結果が完了するまで繰り返し自身の計算を繰り返すことになる。だから「自身の計算が可能かどうかの判断分岐」を記述しておく必要がある。

勿論、繰り返し計算しても問題がない場合もある。特にグラフの参照が循環している可能性が高い場合は、逆に繰り返し計算をしても問題がないようにデザインしておく必要がある。そのような場合は、値が一定になった段階で収束したという風に考えることが多い。
(GoogleのPageRankの計算なんかは代表例)

いずれにしろ、ある程度構造がみえるモノであれば、これは無駄な計算になる。注目すべき点は、この「ある程度構造が見えるモノ」であるかどうかの判断だ。総じていうと、割とデータの連係がランダムジャンプに近いWeb系のデータと違って、基幹系のデータ構造は、構造が整理されているものが多い。

かつ、業務系の処理では繰り返し計算ができないようなケースも多い、というかほとんどです。例えば、在庫引当の連鎖処理とかが良い例だ。
在庫ノードの計算順序で引当処理の内容が変わるようではさすがに問題になる。

繰り返しの計算ができないような場合(すなわち計算結果が変わってしまうような)は計算開始条件を入れる必要がある。

Mapper{
	List<Node> list
	list.map(canProcess()? process() -> Tuple<Key, UpperNode> : skip())
	emit list<Tuple>
	}

という風に、計算開始条件を設定することが必要になる。
計算が完了した段階で、自身をEndNodeと設定して、上位に伝播させる。
これにより同一のノードでの計算は一回で済む。(ただし、ご推察のとおり、特定にNodeに結線が集中するようなグラフ構造の場合はそのNodeが確実にボトルネックになる。ここは要注意。この場合はそもそも並列性が低いのでMapReduce自体の採用も再検討した方がよい)

canProcess()が開始条件になるけど、一番確実なのは、下位のNode単位で計算結果が存在するか確認しておくということだろう

canProcess(nodeList.traverse(isEnded)))
みたいな感じでも良いし、Nodeから結線にすべてフラグを設定しておいてもよい。

注意点は2点あって
ひとつは「計算が完了したかどうか」の定義を明確にしておくこと。これは計算をする必要がない、ということも明示的に計算完了と同じにしておかないといつまでたっても上位の計算が始まらないということになってしまう。

ふたつめは本当に開始条件を必要があるのかは、吟味した方が良いということ各ノードのデータに独立して計算てきる場合は、条件は入れない方がよいことが多いこれはコード自体はシンプルにして、できる限り力技で処理をした方がよいということを意味している。

経験的には基幹処理系のグラフデータモデルでは開始条件が割と明確に
各ノードに割り当てる事が可能であるので、この手法は割と有効ですね。

ソーシャルグラフのような、一定のグラフ構造が想定できないような場合は開始条件は入れることができないので、むしろ計算がノード順序に依存しないロジックを策定した方がよい。