面白かったので、「Writing An Hadoop MapReduce Program In Python」の訳です(久しぶりです)。
HadoopのMapReduceプログラムをPythonで書く
動機
HadoopフレームワークはJavaで書かれていますが、Hadoop上で動くプログラムはJavaで記述する必要はありません。PythonやC++(バージョン0.14.1以降)のような他の言語で開発することができます。しかし、Hadoopのホームページのドキュメントや最も重要なPythonのサンプルコードを読むと、PythonコードをJythonを使ってJavaのjarファイルに変換しないといけないように考えてしまいます。明らかに、この方法はきわめて不便で、もし、Jythonが提供していないPythonの機能を使っている場合はともて問題です。Jythonのアプローチのもう一つの問題は、Hadoopと連携させてPythonプログラムを書くオーバーヘットです。<HADOOP_INSTALL>/src/examples/python/WordCount.pyを見れば、言っていることがわかると思います。Jythonのアプローチと、Pipeと呼んでいる新しいC++のMapReduce APIのアプローチを少なくとももう一度見てください。その違いはとても興味深いです。
このチュートリアルの目的は、HadoopのMapReduceプログラムをもっとPython的なやり方、つまり、おなじみの方法で書くことです。
やりたいこと
これから、Jythonを使ってJavaのjarファイルに変換せずにHadoopで動くPythonの簡単なMapReduceプログラムを書きます。
プログラムはWordCountのサンプルコードをまねて作ります。つまり、テキストファイルを読み込んで、単語の出現頻度を数えます。入力はテキストファイルで出力もテキストファイルです。出力するテキストファイルの各行は、単語と出現回数をタブ文字で区切って記述します。
注釈: このチュートリアルに書かれているテクニックを使えば、PerlやRubyなどのPython以外のプログラミング言語を使うこともできます。 「舞台裏でおこっていること」も書いているので、間違いがあれば教えてください。
準備
Hadoopのクラスタをセットアップして起動しておいてください。この文書ではHadoopのセットアップと起動方法は説明しません。Hadoopクラスタがない場合は、次のチュートリアルが構築するための手助けになるでしょう。チュートリアルはUbuntu向けに書かれていますが、他のLinux/Unix環境でも使えるでしょう。
- Ubuntu LinuxでHadoopを動かす (単一ノードクラスター): Ubuntu Linux上のHadoop分散ファイルシステム(HDFS)を使って単一ノードのHadoopクラスターをセットアップする方法です。
- Ubuntu LinuxでHadoopを動かす (複数ノードクラスター): Ubuntu Linux上のHadoop分散ファイルシステム(HDFS)を使って複数ノードのHadoopクラスターをセットアップする方法です。
PythonのMapReduceコード
次のPythonコードのトリックは、HadoopStreaming(wikiのエントリを見てください)を使ってMapとReduceのプログラム間PでSTDIN(標準入力)とSTDOUT(標準出力)を介してデータを受け渡しています。単純にPythonのsys.stdinから入力データを読み込んで、sys.stdoutに出力データを書き込んでいます。
他のことはHadoopStreamingが面倒を見てくれるので、やるべきことは以上です。不思議ですか?まあ、少なくとも、私もびっくりでした。
Map: mapper.py
次のコードを/home/hadoop/mapper.pyに保存してください。このプログラムはSTDIN(ヒョジュン入力)からデータを著見込んで、単語に分割します。そして、STDOUT(標準出力)に単語の出現回数(中間データ)を記述した行(複数行)を出力します。Mapスクリプトは単語の出現頻度の合計(中間データ)は計算しません。かわりに、入力途中に「<word>」が複数回出現しても「<word> 1」とすぐに出力します。そして、次に続くReduceが最終的な合計を計算します。もちろん、すきなようにプログラムの振る舞いを変えることができますが、説明するためにこのチュートリアルではそのままにしています。
#!/usr/bin/env pythonimport sys# input comes from STDIN (standard input)for line in sys.stdin:# 行頭と行末の空白を取り除くline = line.strip()# 行を単語に分割するwords = line.split()# カウンターを上げるfor word in words:# STDOUT (標準出力)に結果を書き込む;# ここで出力したものはReduce(つまりrecuder.py)での# 入力になる## タブ文字での分割; 単語の出現回数は 1print '%s\t%s' % (word, 1)
Reduce: reducer.py
次のコードを/home/hadoop/reducer.pyに保存してください。このコードはmapper.pyの結果をSTDIN(標準入力)から読み込んで、書く単語の出現回数を合計して結果をSTDOUT(標準出力)に出力します。
ファイルは実行権限があることを確認してください(chmod +x /home/hadoop/reducer.pyを実行すれば実行権限をつけられます)。実行権限がないと、問題が発生します。
#!/usr/bin/env pythonfrom operator import itemgetterimport sys# 単語の出現回数のマップword2count = {}# 入力はSTDINfor line in sys.stdin:# 行頭と行末の空白文字を除去line = line.strip()# mapper.pyの出力をパースするword, count = line.split('\t', 1)# 回数を文字列から数字に変換try:count = int(count)word2count[word] = word2count.get(word, 0) + countexcept ValueError:# 回数が数字でなければ# こっそり、この行はなかったことにして捨て去るpass# 単語をアスキーソートする## この処理は必要ないが、オフィシャルHadoopの単語の# カウントサンプルコードの最終出力に似せるために行う。sorted_word2count = sorted(word2count.items(), key=itemgetter(0))# STDOUT (標準出力)に結果を書き込むfor word, count in sorted_word2count:print '%s\t%s'% (word, count)
コードをテストする (cat data | map | sort | reduce)
MapRecudeジョブでテストする前に手動てmapper.pyとreducer.pyをテストすることをお進めします。そうでなければ、問題なく終了するかもしれませんが、ジョブの結果のデータが全く出力されなかったり、期待した結果が得られないかもしれません。もし、失敗したらあなたか私が何か間違ったせいでしょう。
ここではMapとReduceスクリプトの機能テスト法のいくつかのアイデアがあります。
# きわめて基本的なテストhadoop@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hadoop/mapper.pyfoo 1foo 1quux 1labs 1foo 1bar 1quux 1hadoop@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hadoop/mapper.py | sort | /home/hadoop/reducer.pybar 1foo 3labs 1quux 2# ebookの一つをテスト入力にする# (ebooksの入手方法は下記参照)hadoop@ubuntu:~$ cat /tmp/gutenberg/20417-8.txt | /home/hadoop/mapper.pyThe 1Project 1Gutenberg 1EBook 1of 1[...](わかった?)
HadoopでPythonコードを実行する
サンプルの入力データのダウンロード
サンプルとしてプロジェクトグッテンバーグの3つのebookを使います。
3つのebookをus-asciiエンコードでテキストファイルとしてダウンロードして、/tmp/gutenbergディレクトリなどの一時ディレクトリに解凍して保存してください。
hadoop@ubuntu:~$ ls -l /tmp/gutenberg/total 3592-rw-r--r-- 1 hadoop hadoop 674425 2007-01-22 12:56 20417-8.txt-rw-r--r-- 1 hadoop hadoop 1423808 2006-08-03 16:36 7ldvc10.txt-rw-r--r-- 1 hadoop hadoop 1561677 2004-11-26 09:48 ulyss12.txthadoop@ubuntu:~$
HDFSにローカルのサンプルデータをコピー
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -copyFromLocal /tmp/gutenberg gutenberghadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -lsFound 1 items/user/hadoop/gutenberg <dir>hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls gutenbergFound 3 items/user/hadoop/gutenberg/20417-8.txt <r 1> 674425/user/hadoop/gutenberg/7ldvc10.txt <r 1> 1423808/user/hadoop/gutenberg/ulyss12.txt <r 1> 1561677
MapReduceジョブの実行
準備ができたので、PythonのMapReduceジョブをHadoopクラスタ上で実行できるようになりました。既に述べたように、HadoopStreamingを使ってMapとReduceコード間をSTDIN(標準入力)とSTDOUT(標準出力)を介してデータを受け渡します。
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar -mapper /home/hadoop/mapper.py -reducer /home/hadoop/reducer.py -input gutenberg/* -output gutenberg-output
もし、Reduceタスクの数を増やすなどHadoopの設定を変更したい場合は、-jobconfオプションを使えます。
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar -jobconf mapred.reduce.tasks=16 -mapper ...
mapred.map.tasksの重要事項: Hadoopはmapred.map.tasksでヒントを検討しないようになっています。ユーザが指定したmapred.reduce.tasksを使用しますが、それらを変更することはしません。mapred.map.tasksを指定できませんが、mapred.reduce.taskは指定できます。
ジョブはHDFSのgutenbergディレクトリからすべてのファイルを読み込んで処理し
上記のコマンドのコンソールでの出力例:
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar -mapper /home/hadoop/mapper.py -reducer /home/hadoop/reducer.py -input gutenberg/* -output gutenberg-outputadditionalConfSpec_:nullnull=@@@userJobConfProps_.get(stream.shipped.hadoopstreamingpackageJobJar: [/usr/local/hadoop-datastore/hadoop-hadoop/hadoop-unjar54543/][] /tmp/streamjob54544.jar tmpDir=null[...] INFO mapred.FileInputFormat: Total input paths to process : 7[...] INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop-datastore/hadoop-hadoop/mapred/local][...] INFO streaming.StreamJob: Running job: job_200803031615_0021[...][...] INFO streaming.StreamJob: map 0% reduce 0%[...] INFO streaming.StreamJob: map 43% reduce 0%[...] INFO streaming.StreamJob: map 86% reduce 0%[...] INFO streaming.StreamJob: map 100% reduce 0%[...] INFO streaming.StreamJob: map 100% reduce 33%[...] INFO streaming.StreamJob: map 100% reduce 70%[...] INFO streaming.StreamJob: map 100% reduce 77%[...] INFO streaming.StreamJob: map 100% reduce 100%[...] INFO streaming.StreamJob: Job complete: job_200803031615_0021[...] INFO streaming.StreamJob: Output: gutenberg-output hadoop@ubuntu:/usr/local/hadoop$
上の出力からわかるように、Hadoopは統計と情報表示用に基本的なWebインターフェースを提供しています。Hadoopクラスタを実行時にhttp://localhost:50050/にブラウザでアクセスしてください。次の画像は、ジョブを実行後のHadoopのWebインターフェースのスクリーンショットです。
HDFSのgutenberg-outputディレクトリに結果が正しく保存されているか確認してください:
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls gutenberg-outputFound 1 items/user/hadoop/gutenberg-output/part-00000 <r 1> 903193 2007-09-21 13:00hadoop@ubuntu:/usr/local/hadoop$You can then inspect the contents of the file with the dfs -cat command:hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -cat gutenberg-output/part-00000"(Lo)cra" 1"1490 1"1498," 1"35" 1"40," 1"A 2"AS-IS". 2"A_ 1"Absoluti 1[...]hadoop@ubuntu:/usr/local/hadoop$
この出力では、単語がクウォート(")で囲まれています。これは、Hadoopが挿入したものではありません。これらの単語は、Pythonコードが単語に分割した結果で、この場合は、ebookテキストの引用の開始部分です。自分でpart-00000ファイルを調べてみてください。
Pythonのイテレータとジェネレータを使ったMapperとReducerのコードの改善
上のMapperとReducerのサンプルコードは、初めてのMapReduceアプリケーションの書き方を説明しました。この記事の目的は、コードを簡潔に、Pythonの初心者でもわかりやすくすることです。しかし、実世界のアプリケーションではPythonのイテレータとジェネレータ(PDFでのもっとよい紹介記事)を使ってコードを最適化したいです。
一般的に、イテレータとジェネレータ(例えばPythonのyield文を使って、イテレータを作成するための機能)は、実際に必要になるまで配列の要素を作成しない利点があります。これは、タスクによっては計算時間の短縮やメモり効率改善につながります。
注意: 次のMapとReduceスクリプトはHadoop上で(MapReduceジョブのMappterとReducerとして)動作させたときだけ、正しく動作します。つまり、いくつかの機能はHadoopの機能に依存しているため、もう、「cat DATA | ./mapper.py | sort | ./reducer.py」を実行してテストできないことになります。
正確には、同じ単語が("foo"など)が連続して複数回現れたときだけ、("foo", 4)のように単語の出現回数の合計を計算します。大抵の場合、Hadoopが単純なPythonスクリプトより効率的にできるように、HadoopにMapとReduce間で(キー, バリュー)のペアにグルーピングさせます。
mapper.py
#!/usr/bin/env python"""Pythonのイテレータとジェネレータを使った、より進んだMapper。"""import sysdef read_input(file):for line in file:# 行を単語に分割yield line.split()def main(separator='\t'):# 入力はSTDIN(標準入力)data = read_input(sys.stdin)for words in data:# 結果をSTDOUT(標準出力)に書き込む;# ここでの出力はReduceでの入力になる(reducer.pyの入力)## タブ文字での分割; 単語の出現回数は 1for word in words:print '%s%s%d' % (word, separator, 1)if __name__ == "__main__":main()
reducer.py
#!/usr/bin/env python"""Pythonのイテレータとジェネレータを使った、より進んだReducer。"""from itertools import groupbyfrom operator import itemgetterimport sysdef read_mapper_output(file, separator='\t'):for line in file:yield line.rstrip().split(separator, 1)def main(separator='\t'):# 入力はSTDIN(標準入力)data = read_mapper_output(sys.stdin, separator=separator)# 単語ごとに出現回数がグルーピングされ# 連続したキーとグループを返すイテレータを作成する:# current_word - 単語を含む文字列 (キー)# group - すべての["<current_word>", "<count>"]アイテムをyieldするイテレータfor current_word, group in groupby(data, itemgetter(0)):try:total_count = sum(int(count) for current_word, count in group)print "%s%s%d" % (current_word, separator, total_count)except ValueError:# count was not a number, so silently discard this itempassif __name__ == "__main__":main()

0 コメント:
コメントを投稿