夢とガラクタの集積場

落ちこぼれ三流エンジニアである管理人の夢想=『夢』と、潰えた夢=『ガラクタ』の集積場です。

Apache Sparkってどんなものか見てみる(その1

こんにちは。

Kafkaを試している最中で微妙ですが、最近使えるのかなぁ、と情報を集めているのが「Apache Spark」です。

MapReduceと同じく分散並行処理を行う基盤なのですが、MapReduceよりも数十倍速いとかの情報があります。
・・・んな阿呆な、とも思ったのですが、内部で保持しているRDDという仕組みが面白いこともあり、
とりあえず資料や論文を読んでみることにしました。

まず見てみた資料は「Overview of Spark」(http://spark.incubator.apache.org/talks/overview.pdf)です。
というわけで、読んだ結果をまとめてみます。

Sparkとは?

高速でインタラクティブな言語統合クラスタコンピューティング基盤

Sparkプロジェクトのゴールは?

1.インタラクティブなアルゴリズム(機械学習、グラフ描画)
2.インタラクティブなデータマイニング

  • よりプログラムしやすくすること

1.Scalaへの統合
2.Scalaインタプリタを用いてより動的に実行しやすくする

Sparkを開発したモチベーション

現在出回っているクラスタコンピューティングモデルは「ストレージから取得→結果をストレージに出力」となっており、非循環的。
#但し、ストレージに存在するためにどこでタスクを実行するか、やエラー復旧は自動でやりやすくなっていはいる。

だが、上記のモデルはデータセットを繰り返し使うような以下のモデルに適用するには非効率。
なぜなら、現状のフレームワークはクエリ実行ごとに毎回ストレージからデータをロードしているため。
1.インタラクティブなアルゴリズム(機械学習、グラフ描画)
2.インタラクティブなデータマイニング(R、Excel、Python等)

上記問題への解:Resilient Distributed Datasets(RDDs)

繰り返し利用するデータについてはメモリ上に保持することが可能な機構。
MapReduceが保持していた以下の優れた特性はそのまま引き継いでいる。
・耐障害性、データ局所性、スケーラビリティ

その上で、広いアプリケーションをサポートしている。

Sparkのプログラミングモデル

Resilient Distributed Datasets(RDDs)は以下の性質を持つ。
・イミュータブルで分割されたオブジェクトのコレクション
・並列処理(map、filter、groupBy、join)をストレージ上のデータに適用した結果生成
・再利用するためにメモリ上にキャッシュされる

RDDsに対して実行可能なアクション
・Count、reduce、collect、save・・・

  • 実行例:ログマイニング

ログファイルからエラーメッセージをメモリ上にロードし、様々なパターンに対してインタラクティブな検索を可能とする。
以下のような処理を1TBのデータに対して行う際に、ストレージ上のデータを用いた場合には170秒かかるが、RDDを用いた方式だと5〜7秒で実行可能。

lines = spark.textFile(“hdfs://...”) // ログファイルをロード(Base RDD)
errors = lines.filter(_.startsWith(“ERROR”)) // ERRORの行で始まるデータを抽出(Transformed RDD)
messages = errors.map(_.split(‘\t’)(2)) // タブ文字で分割する
cachedMsgs = messages.cache() // タブ文字で分割したメッセージをキャッシュ

cachedMsgs.filter(_.contains(“foo”)).count // キャッシュしたメッセージの中で"foo"を含む数を取得
cachedMsgs.filter(_.contains(“bar”)).count // キャッシュしたメッセージの中で"bar"を含む数を取得

RDDの耐障害性

RDDはデータの欠損が発生した際の再構築に使用するためのデータの経緯(?)を保持している。
どのファイルからどう生成したかが残っているため、障害発生時にも再構築が可能。

  • 実行例:ロジスティック回帰

ゴール:2つのデータ群をうまく区切る線を見つけること

Sparkで実行した際のコードは以下の通り。

val data = spark.textFile(...).map(readPoint).cache() // データをロードし、メモリ上にキャッシュ
var w = Vector.random(D) // ランダムなVectorを生成

for (i <- 1 to ITERATIONS) { // イテレーション回数だけ回帰処理を実施
  val gradient = data.map(p => (1 / (1 + exp(-p.y*(w dot p.x))) - 1) * p.y * p.x).reduce(_ + _)
  w -= gradient
}

println("Final w: " + w)

イテレーション数を変動させた場合の実行結果としては以下。
Hadoopと比してSparkは初回のロードこそ重いが、それ以降のイテレーション速度が向上するため、回帰するまで処理する・・・
というケースにおいては非常に高速になる。

Sparkのアプリケーション例

・インメモリに対するHiveによるデータマイニング(Conviva)
・予測分析(Quantifind)
・市街のトラフィック予測(Mobile Millennium)
・TwitterのSpan判定(Monarch)
・行列因子分解による協調フィルタリング

Sparkのアプリケーション例:Conviva GeoReport

以下の性質により多数のキーの統合処理を行う際に40倍の性能が出た。
・フィルタリングされたレコードや未使用カラムの再読み込みを回避
・解凍処理が繰り返されないように回避
・デシリアライズ化されたオブジェクトをメモリ上に保持

Spark上に構築されたフレームワーク
  • Pregel on Spark (Bagel)

・Googleメッセージ送受信からグラフを描画
・200ラインのコードで記述

  • Hive on Spark (Shark)

・Apache Hiveと互換性保持
・3000ラインのコードで記述

実現方法

リソースマネージメント基盤Apache Mesosの上で動作する。
Hadoopや他のアプリと同じように動作する。
HDFS等のHadoopのinput sourceを同様に読むことができる。
Scala compilerには特に手を入れていない。

Sparkスケジューラ

Dryadのような非循環グラフを用いて処理
ステージごとのパイプラインを構築して実行
キャッシュを有効にするために再利用/局所性を持った処理を構築
シャッフルを回避するためにパーティションを用いた構成

インタラクティブなSpark

以下の変更をScalaインタプリタに行うことでSparkをコマンドラインからインタラクティブに実行することが可能。
・読み込んだクラスの依存性解決を可能にするよう修正
・生成されたクラスをネットワーク越しに分配するよう修正

メモリが十分に確保できない場合のふるまい

メモリが足りない場合でもキャッシュができた分高速化の恩恵を受けることができる。

障害発生時の復旧時間

障害が発生したイテレーションでは時間がかかるが、成功した場合の1.5倍ほどで復旧。
他のイテレーションにはほぼ影響は出ない。

・・・と、こんな感じでした。
端的に言うとデータをロードした結果を必要な分メモリ上に確保したまま処理が可能なため、処理時間を高速化できるというわけですね。
ただ、メモリ上に確保したRDDは障害発生時も問題なく復旧するなどの要素を持っているそうです。
RDDの動作は肝になりそうなので、実際に動かす場合には事前に構造や仕組みをおさえておく必要がありそうですね。

あとはApache Mesosの上で動くという形のため、Hadoopと比べて1段階インストールしたり実行するために導入するものが多くなります。
このあたりは構築する際に大変そうな点・・・となってきますね。

ともあれ、高速化する理由、特徴ともにとりあえず概要はわかりました。
今後もちょくちょく資料を読んでまとめ、使うための下準備をしていこうと思います。