Trying Out Hadoop on CentOS
Installation
- Install Hadoop on CentOS. As of 2011/11/25, the latest Hadoop version is 0.23.0.
- Here we install CDH3 (Cloudera's Distribution including Apache Hadoop v3), which provides packages for the major Linux distributions. The Hadoop version bundled with CDH3 is 0.20 (0.20.2-cdh3u2 as of this writing).
- Besides Hadoop itself, also install its companions: Hive, Pig, and HBase.
Installing the JDK
- Hadoop runs on Java, so the JDK is required. Skip this if it is already installed.
- Download the JDK
- Install the JDK
$ wget http://download.oracle.com/otn-pub/java/jdk/7u1-b08/jdk-7u1-linux-x64.rpm
$ sudo rpm -ivh jdk-7u1-linux-x64.rpm
Preparing...            ########################################### [100%]
   1:jdk                ########################################### [100%]
Unpacking JAR files...
        rt.jar...
        jsse.jar...
        charsets.jar...
        tools.jar...
        localedata.jar...
Setting environment variables
- Add the following line to /etc/profile:
export JAVA_HOME=/usr/java/default
- Apply the change and verify it.
$ source /etc/profile
$ echo $JAVA_HOME
/usr/java/default
Registering the CDH3 repo
$ wget http://archive.cloudera.com/redhat/cdh/cloudera-cdh3.repo
$ sudo mv cloudera-cdh3.repo /etc/yum.repos.d/
$ sudo yum update yum
$ yum search hadoop
Loaded plugins: downloadonly, fastestmirror
cloudera-cdh3                                                                                                    81/81
===================================================== Matched: hadoop ======================================================
flume.noarch : Flume is a reliable, scalable, and manageable distributed log collection application for collecting data such
             : as logs and delivering it to data stores such as Hadoop's HDFS.
flume-master.noarch : The flume master daemon is the central administration and data path control point for flume nodes.
flume-node.noarch : The flume node daemon is a core element of flume's data path and is responsible for generating,
             : processing, and delivering data.
hadoop-0.20.noarch : Hadoop is a software platform for processing vast amounts of data
hadoop-0.20-conf-pseudo.noarch : Hadoop installation in pseudo-distributed mode
hadoop-0.20-datanode.noarch : Hadoop Data Node
hadoop-0.20-debuginfo.i386 : Debug information for package hadoop-0.20
hadoop-0.20-debuginfo.x86_64 : Debug information for package hadoop-0.20
hadoop-0.20-doc.noarch : Hadoop Documentation
hadoop-0.20-fuse.i386 : Mountable HDFS
hadoop-0.20-fuse.x86_64 : Mountable HDFS
hadoop-0.20-jobtracker.noarch : Hadoop Job Tracker
hadoop-0.20-libhdfs.i386 : Hadoop Filesystem Library
hadoop-0.20-libhdfs.x86_64 : Hadoop Filesystem Library
hadoop-0.20-namenode.noarch : The Hadoop namenode manages the block locations of HDFS files
hadoop-0.20-native.i386 : Native libraries for Hadoop Compression
hadoop-0.20-native.x86_64 : Native libraries for Hadoop Compression
hadoop-0.20-pipes.i386 : Hadoop Pipes Library
hadoop-0.20-pipes.x86_64 : Hadoop Pipes Library
hadoop-0.20-sbin.i386 : Binaries for secured Hadoop clusters
hadoop-0.20-sbin.x86_64 : Binaries for secured Hadoop clusters
hadoop-0.20-secondarynamenode.noarch : Hadoop Secondary namenode
hadoop-0.20-source.noarch : Source code for Hadoop
hadoop-0.20-tasktracker.noarch : Hadoop Task Tracker
hadoop-hbase.noarch : HBase is the Hadoop database. Use it when you need random, realtime read/write access to your Big
             : Data. This project's goal is the hosting of very large tables -- billions of rows X millions of
             : columns -- atop clusters of commodity hardware.
hadoop-hbase-doc.noarch : Hbase Documentation
hadoop-hbase-master.noarch : The Hadoop HBase master Server.
hadoop-hbase-regionserver.noarch : The Hadoop HBase RegionServer server.
hadoop-hbase-thrift.noarch : The Hadoop HBase Thrift Interface
hadoop-hive.noarch : Hive is a data warehouse infrastructure built on top of Hadoop
hadoop-hive-metastore.noarch : Shared metadata repository for Hive.
hadoop-hive-server.noarch : Provides a Hive Thrift service.
hadoop-pig.noarch : Pig is a platform for analyzing large data sets
hadoop-zookeeper.noarch : A high-performance coordination service for distributed applications.
hadoop-zookeeper-server.noarch : The Hadoop Zookeeper server
hue.noarch : The hue metapackage
hue-common.i386 : A browser-based desktop interface for Hadoop
hue-common.x86_64 : A browser-based desktop interface for Hadoop
hue-filebrowser.noarch : A UI for the Hadoop Distributed File System (HDFS)
hue-jobbrowser.noarch : A UI for viewing Hadoop map-reduce jobs
hue-jobsub.noarch : A UI for designing and submitting map-reduce jobs to Hadoop
hue-plugins.noarch : Hadoop plugins for Hue
hue-shell.i386 : A shell for console based Hadoop applications
hue-shell.x86_64 : A shell for console based Hadoop applications
mahout.noarch : A set of Java libraries for scalable machine learning.
oozie.noarch : Oozie is a system that runs workflows of Hadoop jobs.
sqoop.noarch : Sqoop allows easy imports and exports of data sets between databases and the Hadoop Distributed File System
             : (HDFS).
yum install
- For now, install just the Hadoop core package.
$ su
$ yum install hadoop-0.20 -y
- Install the configuration for running everything on a single machine (pseudo-distributed mode).
$ yum install hadoop-0.20-conf-pseudo -y
- Running the command above pulls in dependent packages, which are needed to start the daemon processes.
- Install Hive, Pig, and HBase.
$ yum install hadoop-hive -y
$ yum install hadoop-pig -y
$ yum install hadoop-hbase -y
Starting Hadoop
- Start the daemon processes provided by the packages installed as dependencies.
$ /etc/init.d/hadoop-0.20-datanode start
$ /etc/init.d/hadoop-0.20-namenode start
$ /etc/init.d/hadoop-0.20-tasktracker start
$ /etc/init.d/hadoop-0.20-jobtracker start
$ /etc/init.d/hadoop-0.20-secondarynamenode start  # not needed at first
HDFS (Hadoop Distributed File System)
- In a word, HDFS is a file system for managing files on Hadoop efficiently and safely.
- Using the Map/Reduce mechanism, computation on the data is carried out on distributed servers and the results are collected over the network.
- It consists of two kinds of servers: NameNode and DataNode.
- The NameNode stores file metadata (which server and directory each file lives in, what access permissions are set on it, and so on).
- The DataNodes store the data files themselves. (Files are split into fixed-size chunks called blocks; see the sketch after this list.)
- Configuration directory: /etc/hadoop-0.20/conf.pseudo/
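To make the NameNode/DataNode split concrete, here is a toy Python sketch of the idea. It is purely conceptual, not how HDFS is actually implemented; the 64 MB block size is just the 0.20-era default, and all names are my own.

#!/usr/bin/env python
# Conceptual toy model of HDFS block placement (illustration only, not real HDFS code)
BLOCK_SIZE = 64 * 1024 * 1024  # default HDFS block size in Hadoop 0.20

namenode_metadata = {}  # NameNode: file path -> list of (block_id, datanode)
datanodes = {"datanode1": {}, "datanode2": {}, "datanode3": {}}  # DataNode: block_id -> bytes

def put(path, data):
    """Split data into blocks, store them on DataNodes, record the layout on the NameNode."""
    blocks = []
    for offset in range(0, len(data), BLOCK_SIZE):
        block_id = "%s_blk_%d" % (path, offset // BLOCK_SIZE)
        # naive round-robin placement; real HDFS also replicates each block
        node = sorted(datanodes)[(offset // BLOCK_SIZE) % len(datanodes)]
        datanodes[node][block_id] = data[offset:offset + BLOCK_SIZE]
        blocks.append((block_id, node))
    namenode_metadata[path] = blocks

def get(path):
    """Ask the NameNode where the blocks are, then read them back from the DataNodes."""
    return "".join(datanodes[node][block_id] for block_id, node in namenode_metadata[path])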
Setting up an alias
- HDFS commands get long, so it is convenient to add the following line as an alias in $HOME/.bashrc or similar.
alias hdfs='/usr/bin/hadoop dfs'
Creating a directory on HDFS
$ hdfs -mkdir HDFS_TEST01
$ hdfs -ls
Found 1 items
drwxr-xr-x   - yuta supergroup          0 2011-12-04 22:36 /user/yuta/HDFS_TEST01
Copying a file to HDFS
- Put a locally created file onto HDFS.
$ cat hdfs_input/input
hadoop1
hadoop2
hadoop3
hadoop4
hadoop5
$ hdfs -put hdfs_input/input HDFS_TEST01
Checking the file on HDFS
$ hdfs -cat HDFS_TEST01/input
hadoop1
hadoop2
hadoop3
hadoop4
hadoop5
Retrieving a file from HDFS
$ hdfs -get HDFS_TEST01/input output
$ cat output/input
hadoop1
hadoop2
hadoop3
hadoop4
hadoop5
Map/Reduce
- Strictly speaking, Map/Reduce breaks down into Map/Shuffle/Reduce phases. The Shuffle phase happens automatically inside the framework.
- Think of Map as building a hash. For example, when counting words in an English text, split the text into words and assign each word the value 1, holding the data as {key, value} pairs.
- Shuffle sorts by key and gathers the values under each unique key so that keys are not duplicated. At the shuffle stage only this grouping happens; no Reduce processing is done yet.
- Reduce then shapes the values for each key. In the word count example, it adds up the shuffled values for each key (see the sketch below).
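To make the three phases concrete, here is a minimal local simulation in plain Python (no Hadoop involved; the example text and variable names are my own):

#!/usr/bin/env python
from itertools import groupby
from operator import itemgetter

text = "hadoop hive pig hadoop hbase hadoop"

# Map: emit a (word, 1) pair for every word
mapped = [(word, 1) for word in text.split()]

# Shuffle: sort by key and gather the values under each unique key
shuffled = [(key, [v for _, v in group])
            for key, group in groupby(sorted(mapped, key=itemgetter(0)), key=itemgetter(0))]

# Reduce: sum the gathered values for each key
reduced = [(key, sum(values)) for key, values in shuffled]

print(reduced)  # [('hadoop', 3), ('hbase', 1), ('hive', 1), ('pig', 1)]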
Estimating pi
- Run the pi calculation that ships with the Map/Reduce examples. The 5 is the number of maps (and 2000 is the number of samples per map).
$ hadoop jar /usr/lib/hadoop-0.20/hadoop-examples.jar pi 5 2000
Number of Maps  = 5
Samples per Map = 2000
Wrote input for Map #0
Wrote input for Map #1
Wrote input for Map #2
Wrote input for Map #3
Wrote input for Map #4
Starting Job
11/12/04 23:08:40 INFO mapred.FileInputFormat: Total input paths to process : 5
11/12/04 23:08:41 INFO mapred.JobClient: Running job: job_201112042137_0001
11/12/04 23:08:42 INFO mapred.JobClient:  map 0% reduce 0%
11/12/04 23:09:00 INFO mapred.JobClient:  map 20% reduce 0%
11/12/04 23:09:01 INFO mapred.JobClient:  map 40% reduce 0%
11/12/04 23:09:14 INFO mapred.JobClient:  map 60% reduce 0%
11/12/04 23:09:17 INFO mapred.JobClient:  map 80% reduce 0%
11/12/04 23:09:23 INFO mapred.JobClient:  map 100% reduce 0%
11/12/04 23:09:45 INFO mapred.JobClient:  map 100% reduce 100%
11/12/04 23:09:51 INFO mapred.JobClient: Job complete: job_201112042137_0001
11/12/04 23:09:51 INFO mapred.JobClient: Counters: 23
11/12/04 23:09:51 INFO mapred.JobClient:   Job Counters
11/12/04 23:09:51 INFO mapred.JobClient:     Launched reduce tasks=1
11/12/04 23:09:51 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=67682
11/12/04 23:09:51 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
11/12/04 23:09:51 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
11/12/04 23:09:51 INFO mapred.JobClient:     Launched map tasks=5
11/12/04 23:09:51 INFO mapred.JobClient:     Data-local map tasks=5
11/12/04 23:09:51 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=46591
11/12/04 23:09:51 INFO mapred.JobClient:   FileSystemCounters
11/12/04 23:09:51 INFO mapred.JobClient:     FILE_BYTES_READ=116
11/12/04 23:09:51 INFO mapred.JobClient:     HDFS_BYTES_READ=1170
11/12/04 23:09:51 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=324508
11/12/04 23:09:51 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=215
11/12/04 23:09:51 INFO mapred.JobClient:   Map-Reduce Framework
11/12/04 23:09:51 INFO mapred.JobClient:     Reduce input groups=2
11/12/04 23:09:51 INFO mapred.JobClient:     Combine output records=0
11/12/04 23:09:51 INFO mapred.JobClient:     Map input records=5
11/12/04 23:09:51 INFO mapred.JobClient:     Reduce shuffle bytes=140
11/12/04 23:09:51 INFO mapred.JobClient:     Reduce output records=0
11/12/04 23:09:51 INFO mapred.JobClient:     Spilled Records=20
11/12/04 23:09:51 INFO mapred.JobClient:     Map output bytes=90
11/12/04 23:09:51 INFO mapred.JobClient:     Map input bytes=120
11/12/04 23:09:51 INFO mapred.JobClient:     Combine input records=0
11/12/04 23:09:51 INFO mapred.JobClient:     Map output records=10
11/12/04 23:09:51 INFO mapred.JobClient:     SPLIT_RAW_BYTES=580
11/12/04 23:09:51 INFO mapred.JobClient:     Reduce input records=10
Job Finished in 72.311 seconds
Estimated value of Pi is 3.14080000000000000000
It took 72.311 seconds and this is the precision we get....
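For reference, the example estimates pi by (quasi-)Monte Carlo sampling: each map task scatters points over a square and counts how many fall inside the inscribed circle, and the reduce step combines the counts. Below is a simplified local sketch of the idea in Python; the actual Hadoop example uses a Halton sequence rather than random sampling, so this is only an approximation of what it does.

#!/usr/bin/env python
import random

def map_task(num_samples):
    """Count how many random points in the unit square fall inside the quarter circle."""
    inside = 0
    for _ in range(num_samples):
        x, y = random.random(), random.random()
        if x * x + y * y <= 1.0:
            inside += 1
    return inside

def reduce_task(counts, samples_per_map):
    """Combine the per-map counts into a single pi estimate."""
    total_inside = sum(counts)
    total_samples = samples_per_map * len(counts)
    return 4.0 * total_inside / total_samples

counts = [map_task(2000) for _ in range(5)]  # 5 maps, 2000 samples each
print(reduce_task(counts, 2000))             # roughly 3.14, with similarly low precision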
Writing your own Map/Reduce
- Below, Hadoop Streaming (a Hadoop extension) is used so the processing can be written as programs that communicate over standard input/output.
- We write a word count ourselves, using Python.
- Two scripts are prepared: map.py and reduce.py.
Creating map.py/reduce.py
- map.py
#!/usr/bin/env python
import sys

# Read text from standard input and emit "word<TAB>1" for every word
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print '%s\t%s' % (word, 1)
- reduce.py
#!/usr/bin/env python
from operator import itemgetter
import sys

# Aggregate "word<TAB>count" lines from the mapper into a dict
word2count = {}

for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    try:
        count = int(count)
        word2count[word] = word2count.get(word, 0) + count
    except ValueError:
        pass

# Sort by word and print "word<TAB>total count"
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))
for word, count in sorted_word2count:
    print '%s\t%s' % (word, count)
- Test map.py. Each key gets a value assigned to it.
$ echo 'c c++ java python perl php javascript java python' | /home/yuta/work/dev/hadoop/map_reduce/map.py
c       1
c++     1
java    1
python  1
perl    1
php     1
javascript      1
java    1
python  1
- Test reduce.py. The values are aggregated for each key.
$ echo 'c c++ java python perl php javascript java python' | /home/yuta/work/dev/hadoop/map_reduce/map.py | sort | /home/yuta/work/dev/hadoop/map_reduce/reduce.py
c       1
c++     1
java    2
javascript      1
perl    1
php     1
python  2
Copying text data onto HDFS
- Copy some texts from Project Gutenberg and run against them.
$ mkdir gutenberg
$ cd gutenberg
$ wget http://www.gutenberg.org/cache/epub/20417/pg20417.txt
$ wget http://www.gutenberg.org/cache/epub/5000/pg5000.txt
$ wget http://www.gutenberg.org/cache/epub/4300/pg4300.txt
$ cd ..
$ hdfs -copyFromLocal gutenberg gutenberg
$ hdfs -ls gutenberg
Found 3 items
-rw-r--r--   1 yuta supergroup     674566 2011-12-04 23:56 /user/yuta/gutenberg/pg20417.txt
-rw-r--r--   1 yuta supergroup    1573150 2011-12-04 23:56 /user/yuta/gutenberg/pg4300.txt
-rw-r--r--   1 yuta supergroup    1423801 2011-12-04 23:56 /user/yuta/gutenberg/pg5000.txt
Running map.py/reduce.py with Hadoop
$ hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u2.jar -mapper /home/yuta/work/dev/hadoop/map_reduce/map.py -reducer /home/yuta/work/dev/hadoop/map_reduce/reduce.py -input gutenberg/* -output gutenberg-output
packageJobJar: [/var/lib/hadoop-0.20/cache/yuta/hadoop-unjar3531972439212126610/] [] /tmp/streamjob4995151306966509636.jar tmpDir=null
11/12/04 23:57:14 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
11/12/04 23:57:14 WARN snappy.LoadSnappy: Snappy native library not loaded
11/12/04 23:57:14 INFO mapred.FileInputFormat: Total input paths to process : 3
11/12/04 23:57:15 INFO streaming.StreamJob: getLocalDirs(): [/var/lib/hadoop-0.20/cache/yuta/mapred/local]
11/12/04 23:57:15 INFO streaming.StreamJob: Running job: job_201112042137_0003
11/12/04 23:57:15 INFO streaming.StreamJob: To kill this job, run:
11/12/04 23:57:15 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job -Dmapred.job.tracker=localhost:8021 -kill job_201112042137_0003
11/12/04 23:57:15 INFO streaming.StreamJob: Tracking URL: http://localhost.localdomain:50030/jobdetails.jsp?jobid=job_201112042137_0003
11/12/04 23:57:16 INFO streaming.StreamJob:  map 0%  reduce 0%
11/12/04 23:58:20 INFO streaming.StreamJob:  map 100%  reduce 100%
11/12/04 23:58:20 INFO streaming.StreamJob: To kill this job, run:
11/12/04 23:58:20 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job -Dmapred.job.tracker=localhost:8021 -kill job_201112042137_0003
11/12/04 23:58:20 INFO streaming.StreamJob: Tracking URL: http://localhost.localdomain:50030/jobdetails.jsp?jobid=job_201112042137_0003
11/12/04 23:58:20 ERROR streaming.StreamJob: Job not successful. Error: NA
11/12/04 23:58:20 INFO streaming.StreamJob: killJob...
Streaming Command Failed!
The job failed.... After looking into it, it seems the -file option needs to be added.
- Try again with the -file option.
$ hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u2.jar -mapper /home/yuta/work/dev/hadoop/map_reduce/map.py -reducer /home/yuta/work/dev/hadoop/map_reduce/reduce.py -input gutenberg/* -output gutenberg-output -file /home/yuta/work/dev/hadoop/map_reduce/map.py -file /home/yuta/work/dev/hadoop/map_reduce/reduce.py
packageJobJar: [/home/yuta/work/dev/hadoop/map_reduce/map.py, /home/yuta/work/dev/hadoop/map_reduce/reduce.py, /var/lib/hadoop-0.20/cache/yuta/hadoop-unjar8731079524882793743/] [] /tmp/streamjob400379298028508245.jar tmpDir=null
11/12/05 00:26:29 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
11/12/05 00:26:29 WARN snappy.LoadSnappy: Snappy native library not loaded
11/12/05 00:26:29 INFO mapred.FileInputFormat: Total input paths to process : 3
11/12/05 00:26:29 INFO streaming.StreamJob: getLocalDirs(): [/var/lib/hadoop-0.20/cache/yuta/mapred/local]
11/12/05 00:26:29 INFO streaming.StreamJob: Running job: job_201112042137_0007
11/12/05 00:26:29 INFO streaming.StreamJob: To kill this job, run:
11/12/05 00:26:29 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job -Dmapred.job.tracker=localhost:8021 -kill job_201112042137_0007
11/12/05 00:26:29 INFO streaming.StreamJob: Tracking URL: http://localhost.localdomain:50030/jobdetails.jsp?jobid=job_201112042137_0007
11/12/05 00:26:31 INFO streaming.StreamJob:  map 0%  reduce 0%
11/12/05 00:26:47 INFO streaming.StreamJob:  map 33%  reduce 0%
11/12/05 00:26:51 INFO streaming.StreamJob:  map 67%  reduce 0%
11/12/05 00:27:02 INFO streaming.StreamJob:  map 100%  reduce 0%
11/12/05 00:27:11 INFO streaming.StreamJob:  map 100%  reduce 100%
11/12/05 00:27:18 INFO streaming.StreamJob: Job complete: job_201112042137_0007
11/12/05 00:27:18 INFO streaming.StreamJob: Output: gutenberg-output
There are still warnings, but this time it succeeded.
- Check the output file.
$ hdfs -cat gutenberg-output/part-00000 | head -n 20
"(Lo)cra"       1
"1490   1
"1498," 1
"35"    1
"40,"   1
"A      2
"AS-IS".        1
"A_     1
"Absoluti       1
"Alack! 1
"Alack!"        1
"Alla   1
"Allegorical    1
"Alpha  1
"Alpha,"        1
"Alpine-glow"   1
"An     2
"And    3
"Antoni 1
"At     1
Map/Reduce succeeded and the word occurrence counts have been recorded.
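As the output above shows, the tokens still carry quotes and punctuation, because map.py splits only on whitespace. If cleaner counts are wanted, the mapper could tokenize with a regular expression instead; here is a rough sketch of such a mapper (my own addition, not part of the original scripts):

#!/usr/bin/env python
import re
import sys

# Split on non-letter characters and lowercase, so "And, "and" and and, all count as "and"
WORD_RE = re.compile(r"[A-Za-z']+")

for line in sys.stdin:
    for word in WORD_RE.findall(line):
        print('%s\t%s' % (word.lower(), 1))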
Optimizing Map/Reduce
- map.py
#!/usr/bin/env python
import sys

def read_input(file):
    # Yield the words of each line read from standard input
    for line in file:
        yield line.split()

def main(separator='\t'):
    data = read_input(sys.stdin)
    for words in data:
        for word in words:
            print '%s%s%d' % (word, separator, 1)

if __name__ == "__main__":
    main()
- reduce.py
#!/usr/bin/env python
from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    # Yield (word, count) pairs from the mapper's "word<TAB>count" lines
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    data = read_mapper_output(sys.stdin, separator=separator)
    # groupby merges consecutive records with the same word; this works because
    # the shuffle phase hands the reducer its input sorted by key
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            # count was not a number, so silently discard this item
            pass

if __name__ == "__main__":
    main()
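The main difference from the first version is that both scripts stream their input with generators instead of building a whole dict in memory, and the reducer relies on itertools.groupby. One caveat: groupby only merges consecutive equal keys, so it depends on the input already being sorted by key, which Hadoop's shuffle guarantees (and which the sort in the local test pipeline emulates). A small illustration of that assumption:

#!/usr/bin/env python
from itertools import groupby
from operator import itemgetter

pairs = [('java', 1), ('python', 1), ('java', 1)]

# Unsorted input: groupby sees two separate 'java' groups
print([(k, sum(c for _, c in g)) for k, g in groupby(pairs, itemgetter(0))])
# [('java', 1), ('python', 1), ('java', 1)]

# Sorted input (what the shuffle phase provides): the counts are merged correctly
print([(k, sum(c for _, c in g)) for k, g in groupby(sorted(pairs), itemgetter(0))])
# [('java', 2), ('python', 1)]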
Hadoop errors
Name node is in safe mode. Resources are low on NN. Safe mode must be turned off manually.
Some error seems to have switched Hadoop into safe mode. It can be turned off with the following commands.
$ hadoop dfsadmin -safemode leave
Safe mode is OFF
$ hadoop dfsadmin -safemode get
Safe mode is OFF
WARN hdfs.DFSClient: DataStreamer Exception: org.apache.hadoop.ipc.RemoteException: java.io.IOException: could only be replicated to 0 nodes, instead of 1
In my case this error went away after accessing http://localhost:50070/ (the NameNode web UI).
Things to study next
I plan to write up the following when I have time.
- Distributed grep
- Distributed sort
- Distributed cache