2019年10月10日木曜日

Intel MKLのようなネイティブな数学ライブラリでSpark機械学習アルゴリズムを加速する

  • このエントリーをはてなブックマークに追加

Intel MKLのようなネイティブな数学ライブラリでSpark機械学習アルゴリズムを加速する

最近、仕事上でSpark MLlibのSVD(特異値分解)でMovieLensの推薦を実装するアプリの性能調査をしております。思った以上遅くて困っていました。SparkのDriver, Executorのメモリ、Executor数、OverHeadなどいろんなSparkパラメーターを調整して試してきましたが、なかなか改善できていません。

そして更に調査したところ、そもそもSpark MllibのSVDの実装が古いらしく、性能が出ない情報がネット上で結構上がっています。Workaroundとしては、SVDの代わりにAlternating Least Squares(ALS)などを使うか、あるいはネイティブな数学演算ライブラリを使って高速化するか などがあります。

今回はHDP上でIntel MKLを利用するための設定方法をご紹介します。

まず、ClouderaのBlogを参照しています。CDHのクラスタの場合はMKLのParcelsファイルが提供されているため、Cloudera Managerから簡単にインストールすることができます。
Using Native Math Libraries to Accelerate Spark Machine Learning Applications
https://docs.cloudera.com/documentation/guru-howto/data_science/topics/ght_native_math_libs_to_accelerate_spark_ml.html#using_native_math_libs_with_spark_machine_learning

Native Math Libraries for Spark ML

Spark MLlibは、Breeze線形代数パッケージを使用しています。Breezeは最適化な数値処理を行うにはnetlib-javaというライブラリに依存する。 netlib-javaは、低レベルのBLAS、LAPACK、およびARPACKライ​​ブラリのラッパーです。しかし、ライセンス関連の制限でCDHのSpark,あるいはApache CommunityのSparkに既定でnetlib-javaのネイティブなプロキシが含まれていません。特に手動で何も設定していない場合、netlib-javaはF2JというJavaベースのライブラリを使います。

ネイティブな数学ライブラリを使っているか、JavaベースのF2Jを使っているか Spark-shellで確認できます。

scala> import com.github.fommil.netlib.BLAS
import com.github.fommil.netlib.BLAS

scala> println(BLAS.getInstance().getClass().getName())
com.github.fommil.netlib.F2jBLAS

com.github.fommil.netlib.F2jBLASが表示されている場合は、F2jを使っていることを確認できます。

Intel MKLのインストールと設定

Intel MKL(Intel’s Math Kernel Library)を利用することで、(Alternating Least Squares (ALS) algorithm),Latent Dirichlet Allocation (LDA), Primary Component Analysis (PCA), Singular Value Decomposition (SVD) といったアルゴリズムを使ったモデル訓練を高速化することができます。

Intel MKLインストール

各Sparkノードで実施
https://software.intel.com/en-us/articles/installing-intel-free-libs-and-python-yum-repo

# curl -O https://yum.repos.intel.com/mkl/setup/intel-mkl.repo > /etc/yum.repos.d/intel-mkl.repo
# sudo yum -y install intel-mkl-2019.3-062
intel-mkl-<VERSION>.<UPDATE>-<BUILD_NUM>の形式で指定

image.png
MKLのインストール先は/opt/intel/mkl になります。

mklWrapperのダウンロード

mklWrapper.jarとmkl_wrapper.soを以下のリンクからダウンロードする
https://github.com/Intel-bigdata/mkl_wrapper_for_non_CDH

Copy mkl_wrapper.jar to /opt/intel/mkl/wrapper/mkl_wrapper.jar
Copy mkl_wrapper.so to /opt/intel/mkl_wrapper/mkl_wrapper.so

Spark Configuration

# echo "/opt/intel/mkl/lib/intel64" > /etc/ld.so.conf.d/mkl_blas.conf
# ldconfig

AmbariでSpark-default設定を追加

propety value
spark.driver.extraClassPath /opt/intel/mkl/wrapper/mkl_wrapper.jar
spark.executor.extraClassPath /opt/intel/mkl/wrapper/mkl_wrapper.jar
spark.driver.extraJavaOptions -Dcom.github.fommil.netlib.BLAS=com.intel.mkl.MKLBLAS -Dcom.github.fommil.netlib.LAPACK=com.intel.mkl.MKLLAPACK
spark.executor.extraJavaOptions -Dcom.github.fommil.netlib.BLAS=com.intel.mkl.MKLBLAS -Dcom.github.fommil.netlib.LAPACK=com.intel.mkl.MKLLAPACK
spark.driverEnv.MKL_VERBOSE 1
spark.executorEnv.MKL_VERBOSE 1

実際の設定 Custom Spark2-defaults
image.png

spark.executor.extraJavaOptionsは Advanced spark2-defaultsにあります。すでにあった値の後ろに追加する
image.png

確認

image.png

com.intel.mkl.MKLBLASが表示されることを確認できます。

そして再度SVDを実行してみたところ、前に比べて30%〜35%の時間短縮することができました。
アルゴリズムによって加速する度合いが違いますので、ALSの場合は4.3倍も速いという結果が出ています。ぜひ試してみてください。

HDP3 Hive Warehouse connectorを使ってSparkからHiveテーブルにアクセスする

  • このエントリーをはてなブックマークに追加

HDP3 Hive Warehouse connectorを使ってSparkからHiveテーブルにアクセスする

HDP3の環境でSparkからHiveテーブルにアクセスする方法をご紹介します。

HDP以前のバージョンはSpark HiveContext/SparkSession を使ってHiveテーブルにアクセスしていますが、HDP3はHortonworksが開発したHive Warehouse Connector(HWC)を使ってアクセスすることができます。
以下の図の通り、HDP3でSparkとHiveそれぞれMetadataを持っています。お互いへのアクセスはHWC経由になります。
image.png

Hive Warehouse Connector

Hive LLAPを使ってSparkのDataFrameをHiveテーブルにWrite, HiveテーブルデータをDataFrameにReadするためのライブラリになっています。
Hive LLAPを有効にする必要があります。

HWCは以下のアプリケーションをサポートしています。

  • Spark-shell
  • Pyspark
  • Spark-submit

使い方としてはこちらをご参照ください。
HiveWarehouseSession API operations
https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.0.1/integrating-hive/content/hive_hivewarehousesession_api_operations.html

HWC使うための設定

設定手順は以下のリンクに詳しく書いてあります。
https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.0.1/integrating-hive/content/hive_configure_a_spark_hive_connection.html

基本的にAmbariでCustom spark-2-defaultsに以下のプロパティを設定する

Property Description Comments
spark.sql.hive.hiveserver2.jdbc.url URL for HiveServer2 Interactive In Ambari, copy the value from Services > Hive > Summary > HIVESERVER2 INTERACTIVE JDBC URL.
spark.datasource.hive.warehouse.metastoreUri URI for metastore Copy the value from hive.metastore.uris. For example, thrift://mycluster-1.com:9083.
spark.datasource.hive.warehouse.load.staging.dir HDFS temp directory for batch writes to Hive For example, /tmp.
spark.hadoop.hive.llap.daemon.service.hosts Application name for LLAP service Copy value from Advanced hive-interactive-site > hive.llap.daemon.service.hosts.
spark.hadoop.hive.zookeeper.quorum Zookeeper hosts used by LLAP Copy value from Advanced hive-sitehive.zookeeper.quorum.

実際の設定画面です
image.png

で、注意する必要があるのが、spark.sql.hive.hiveserver2.jdbc.urlです。
手順で直接Ambari上のHIVESERVER2 INTERACTIVE JDBC URLをコピーするってかいてありますが、実行時にPermission Errorのエラーが表示される場合があります。
例えばPysparkの場合

pyspark --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.1.0.0-78.jar --py-files /usr/hdp/current/hive_warehouse_connector/pyspark_hwc-1.0.0.3.1.0.0-78.zip


Error
py4j.protocol.Py4JJavaError: An error occurred while calling o72.executeQuery.
: java.lang.RuntimeException: java.io.IOException: shadehive.org.apache.hive.service.cli.HiveSQLException: java.io.IOException: org.apache.hadoop.hive.ql.metadata.HiveException: Failed to compile query: org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException: Permission denied: user [anonymous] does not have [USE] privilege on [default]
    at com.hortonworks.spark.sql.hive.llap.HiveWarehouseDataSourceReader.readSchema(HiveWarehouseDataSourceReader.java:130)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation$.apply(DataSourceV2Relation.scala:56)

image.png

解決策2つがあります。
1, HIVESERVER2 INTERACTIVE JDBC URLの後ろにユーザーを指定する。例えばuser=hive

jdbc:hive2://hdp-srv2.demotest.com:2181,hdp-srv1.demotest.com:2181,hdp-srv4.demotest.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2-interactive;user=hive

2, コードの中にHiveWarehouseSession作成時にユーザーを指定する。例:.userPassword(‘hive’,‘hive’)
ソースコードからヒントを得ました。
https://github.com/hortonworks/hive-warehouse-connector/blob/HDP-3.1.0.158/python/pyspark_llap/sql/session.py

from pyspark_llap import HiveWarehouseSession
hive = HiveWarehouseSession.session(spark).userPassword('hive','hive').build()
hive.setDatabase("default")
hive.executeQuery("select * from test_table").show()
hive.showTables().show()
hive.showDatabases().show()

Zeppelin Spark Interpreterから接続する際の設定

ZeppelinからHiveにアクセスする場合もZeppelin側での設定が必要です。
詳細の設定はこちら
https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.0.1/integrating-hive/content/hive_zeppelin_configuration_hivewarehouseconnector.html

設定例
image.png

ただし、実際にコード実行したら、以下のように pyspark_llapモジュールが見つかりません というエラーが出力されます。
image.png

調べたところ、Zeppelin側でspark.submit.pyfilesの設定がうまく動作しないバグがあるらしい。
回避策として、コートの中に直接モジュールファイルをインポートする。

image.png

これでspark-shell, pyspark, zeppelinから使えるようになります。