Asakusa on Spark

Asakusa on Spark

AsakusaがSpark上で動くようになりました。
Asakusa on Spark (Developer Preview) — Asakusa Framework Developer Preview 0.2.2 documentation

すでに実際に本番に利用しています。
ノーチラス・テクノロジーズがさくらインターネットにAsakusa Frameworkで開発した大規模データの高速処理基盤を導入し、顧客単位での精度の高い原価計算を実現高速処理基盤はApache Spark™で構築 | NAUTILUS

OSSとしての公開を行いましたので、内容や位置づけをまとめておきます。例によってノーチラスは社内でいろんな意見は当然出ていますが、今回は概ね一致している感じです。

  • パフォーマンス

概ね「業務バッチ処理という観点で見れば、すべからくHadoopMapReduceより、Sparkのほうが高速に処理を終えることができる」というのが結論になっています。ノーチラスの持っているユースケースでは、ほぼすべてのケースでパフォーマンス改善が見られました。おおよそ3倍から5倍の程度の実行時間の短縮が達成されています。これは一度のバッチで処理するデータサイズが、概ね数百GByte以下程度で、かつ、かなりの程度で複雑な処理になっているケースでのパフォーマンスになっています。さすがに非常に単純な処理(例:単純集計一発だけ)で、一度に処理するデータサイズが概ね500Gbyte以上のケースでは、MapReduceに軍配があがると思いますが、そのケースですらSparkもそれなりのパフォーマンスが出しつつあります。

なぜSparkにパフォーマンス優位があるか、というと、これは割と単純にHadoopMapReduceのオーバーヘッドが大きく、これをSparkでは取り除いているということにつきるでしょう。巷では(というかSparkの公式サイトでは)オンメモリー処理によりパフォーマンスが出ているといわれているが多いのですが、実際はそうでもありません。もっともIOコストのかかるノード間のシャッフルデータ転送時のディスク強制書き出しはHadoopMapReduceと同じであり、同じようにコストがかかります。Sparkでいうところの、オンメモリー処理というのは、繰り返し処理のときに明示的にキャッシュが使えるということだと思いますが、現実の処理では同じデータの繰り返し処理が中心でできているバッチがそうそうあるわけではなく、繰り返し処理で明示キャッシュが使えるからといってパフォーマンスが一般に10倍とかになるわけがありません。冷静に考えれば普通におかしいとわかる話なのですが、トレンディーな技術や会社が、VCからお金を集めるときにはよくあるストレッチの話なので、まぁ仕方がない話ですわね。

課題になっていたHadoopMapReduceのオーバーヘッドとは何かというと、自分の見るところでは大きく二つあって、一つはすべての処理を無理矢理Map・Reduceの形にしなければならない、という点です。これはよく言われるところではありますが、HadoopMapReduceではすべての処理をMap・Reduceの形に変形し、かつこの順序で実行する場合はどうしても無駄が発生します。もちろん、これはMap/Reduceの形にすることによって並列分散処理が実行しやすい、というメリットの裏返しですが、いろいろな処理をしていくようになってくると、さすがにデメリットが目立ちます。二つ目はMap・Reduceのタスク処理が実態としてはそれぞれが独立したjvmアプリケーションになっている点です。Map・Reduceのタスクが行われるたびにアプリケーションの起動・終了が行われるわけで、jvmの再利用オプションがあるとはいえ、このオーバーヘッドはかなり大きい。

上記の二点は大規模データに対する単純処理であれば、それほどコストになりませんが、DAGベースで1000ステージを超えるようなケースであれば、下手をすると総コストの5割がこのオーバーヘッドになることがあります。Sparkでは、上記の課題をきれいにとりはらっています。すなわち、処理を無理にMap・Reduceの形での順序実行をしているわけでもなく、またジョブ実行自体を普通に一つのアプリケーションとして管理しています。したがって、単純にHadoopMapReduceからSparkに変更するだけで、オーバーヘッドのコストが削減されることになってしまっています。

  • HadoopMapReduceの終焉

かなりの大規模なデータ処理のセグメントを除いて、現状のSparkはHadoopMapReduceと比較する限りにおいては、ほぼ一方的に優れていると言って良いでしょう。今後はHadoopMapReduceで動いているほぼ大部分の業務系の処理はSparkに(可能であれば)移行すると思います。Hadoopが登場した時分では、大規模データ(特にweblogやlifelog)の一斉集計が主要用途でしたが、現状では、データに対する操作は単純なGroupByではなく、さまざまな操作を要求されつつあります。このような状況で、パフォーマンスで見れば、HadoopMapReduceで動かし続けるデメリットは余りにも大きい。MapReduceに対する改善はすでに小規模なもののみとなっており、Sparkから比べて処理時間が3〜5倍かかることを鑑みても、HadoopMapReduce上でのアプリケーションに継続投資を行うことはあまりにも効率が悪い。もちろん大量データの単純集計であれば、まだまだHadoopMapReduceに分があるので、そのような仕組みはそのまま運用していればよい話で、無理やりSparkに移行する必要もないでしょう。

特に、我々がフォーカスしているような、特に企業の業務系のバッチ処理ではHadoopMapReduceを利用するメリットはほぼゼロです。データサイズや処理の複雑性から見ても、Sparkに軍配があがります。仮に、HadoopMapReduceからSparkに移行できないとすれば、それはあまりに大量の処理を裸のMapReduceで書きすぎたということになるのではないでしょうか? 4年前のAsakusaのリリース当初から、MapReduceを裸で書くことはアセンブラで処理を記述することとそれほど大差はなく、ほどなくメンテできなくなるだろうし、デメリットがどんどん大きくなりますよと、わりと声を大にして言ってきたが、現実となりつつある感もします。HadoopMapReduceは、ほぼいわゆる「レガシー」資産になりつつあります。

  • Sparkは無敵なのか?

ではSparkはそれほど無敵なのか?ということですが、当たり前ですが、無敵ではないです。(むしろHadoopが隙だらけだったという方が正しい)

・設定やチューニングが面倒
現状ではたいていの場合は、パフォーマンスが出ずに玉砕することが多いでしょう。加えて安定して稼動させるには経験が必要です。これはもうHadoopMapReduceのメリット(設定が分散処理基盤のわりには簡単)とデメリット(パフォーマンスの上限が割りと簡単に出てしまう)の交換に見えます。Sparkではパフォーマンスを出すために、アレやコレやパラメータを設定して、パフォーマンスを出すために試行錯誤する必要があります。これをアプリケーションごとに行う必要があります。さらにYarnで実行させるには、Yarnの設定も重要になる、ということで、これは確かに面倒です。広範囲にわたって整合性のとれた設定が必要であり、これはなかなか難易度が高いです。徐々にノウハウが共有化されていますが、できることが増えるということのデメリットはゼロにはならないでしょう。とはいえ、これはまさに時間の問題ですね。

・基本的なアーキテクチャの問題。
これはいろいろ意見がありますね。

ひとつは各処理の論理的な出力ポートがひとつであることです。ほぼ似たようなミドルのTezは複数取れますね。処理をブランチさせるようなフロー制御を行う場合は、基本的に無理が発生します。実際、Asakusaの実行基盤のターゲットとするときも問題になりました。Asakusaではあの手この手で解決しているが、普通に実装するのはかなり無理筋になります。

ふたつめはShuffle時点の強制書き出し。言うまでもなく、これはいちいちコストがかかるので。とはいえ、賛否両論です。特に処理の終盤で、前半戦で利用したデータをもう一度使うような場合があれば、書き出しは有効です。放っておけばそのままメモリーを占有してしまうからね。また、改めて言うまでもないですが、ノード・フェイルにも当然強い。そもそもRDDの発想からすると、書き出しは発想の根本にあるものなので、反対するやつは使うなという話にもなるようです。(んじゃー、サイトに堂々とin memory 処理とか書くんじゃねーよ、とか思いますが・・。実際、早とちりしている人も多数いますし)

とはいえ、どこで何を使うかは処理全体をコンパイルする時点である程度目星はつくので、ちゃんと見渡せる仕組みがあれば、Shuffleをいちいち書き出すのは無駄であることは間違いないでしょう。必要なときだけ書ければ十分だと思います。今後のメモリーの容量は太陽系の大きさ(比喩)まで広がる可能性があるので、plannerが賢くなった、しっかりした分散処理の管理基盤がでてきれば(まだないけど)たぶんSparkはわりとあっさり負けます。・・・当分先だと思いますが、具体的にいうとRack Scale Architectureベースのメニーコア前提の、割とかっちりした処理基盤が出たときには、かなり簡単に水を空けられるよなあ・・・とか思います。もちろん、その場合はHadoopは文字通り象なみのスピードの扱いになるとは思いますが。

さて、今回はAsakusaの開発陣の頑張りもあって、かなり大幅なアーキテクチャの変更が行われています。従前のAsakusaのコンセプトを維持しつつ、コンパイラがほぼ全面的に書きなおしになっています。従来のAsakusaはAsakusaDSLから最適なMapReduceプログラムを生成する仕組みになっていましたが、新しいコンパイラは一旦、DAGの中間構造を生成し、そのDAGの中間データからSparkに最適なバイトコードを生成する仕組みになっています。つまり、今後Spark以外の「より高速なDAGの実行エンジン」が出てきたときには、Sparkのバイトコード生成部分のみを入れ替えるだけで、新たな実行環境に対応することが可能になっています。現状、ノード分散環境化での実行形式の標準がDAGになりつつあるのは、周知の通りであり、DAGの実行エンジンはTezやFlinkを見るまでもなく、今後もいろいろ出てくるだろうとみています。Asakusaはそのエンジンに対応しやすくするために、今回根本のアーキテクチャを再構築しています。

これは、Asakusaの意味合いの変化をもたらすと思っています。ユーザーはAsakusaDSLで処理を記述しておけば、今後、より高速な処理エンジンが開発されてきたとしても、Asakusaが対応しさえすれば、そのメリットを享受できるということになります。特にソースコード・テストデータを「まったく変更することなし」に新たな高速環境に移行できるというメリットは非常に大きい。個人的には、テストデータ・テスト環境をそのまま持ってこれるのは、大きいと思っています。これは業務系アプリケーションの投資可搬性を大幅に向上させることになります。Asakusaにより業務システム投資サイクルと分散環境のライフサイクルのギャップを埋めることが可能になると思っています。

  • 今後

まぁこんな感じです。我々としては、Better HadoopとしてSparkを利用している、というのと、Sparkに対応するのと同時に、その先も見ながらAsakusaのアーキテクチャを予定通りに変えました、というところです。正直ベースで、Spark、かなり速いです。予想よりもいいですね・・・

Sparkをはじめ、今後の分散環境は高速化がますます進むでしょう。いろいろとできることが増えると思います。ベースデータレイヤーとしてのHDFS-APIは鉄板だと思いますので、HDFS互換にデータ溜めておいて、処理基盤だけほいほい取り替えるとパフォーマンスが勝手にあがるという感じになるでしょう。とはいえ、アプリケーションレイヤーから見るとシステム投資のライフサイクルと実行基盤のライフサイクルのギャップが進行しますが、その辺を埋めるのがAsakusaって感じになるといいなぁと思っています。


・・・って気がついたら一年ぐらい放置してたので、ちょっと反省してます。某プロジェクトにどっぷりだったので・・・
今後はできれば2ヶ月に1回ぐらいは更新したいと思います。すみません。