ページ

2009年5月8日

[翻訳]Hadoopで動くPythonのMapReduceプログラムを書く

面白かったので、「Writing An Hadoop MapReduce Program In Python」の訳です(久しぶりです)。

HadoopMapReduceプログラムをPythonで書く



このチュートリアルでは、Hadoopの簡単なMapRecudeプログラムをPythonを使って書く方法を説明します。

動機


HadoopフレームワークはJavaで書かれていますが、Hadoop上で動くプログラムはJavaで記述する必要はありません。PythonやC++(バージョン0.14.1以降)のような他の言語で開発することができます。しかし、Hadoopのホームページのドキュメントや最も重要なPythonのサンプルコードを読むと、PythonコードをJythonを使ってJavaのjarファイルに変換しないといけないように考えてしまいます。明らかに、この方法はきわめて不便で、もし、Jythonが提供していないPythonの機能を使っている場合はともて問題です。Jythonのアプローチのもう一つの問題は、Hadoopと連携させてPythonプログラムを書くオーバーヘットです。<HADOOP_INSTALL>/src/examples/python/WordCount.pyを見れば、言っていることがわかると思います。Jythonのアプローチと、Pipeと呼んでいる新しいC++のMapReduce APIのアプローチを少なくとももう一度見てください。その違いはとても興味深いです。

このチュートリアルの目的は、HadoopMapReduceプログラムをもっとPython的なやり方、つまり、おなじみの方法で書くことです。

やりたいこと


これから、Jythonを使ってJavaのjarファイルに変換せずにHadoopで動くPythonの簡単なMapReduceプログラムを書きます。
プログラムはWordCountのサンプルコードをまねて作ります。つまり、テキストファイルを読み込んで、単語の出現頻度を数えます。入力はテキストファイルで出力もテキストファイルです。出力するテキストファイルの各行は、単語と出現回数をタブ文字で区切って記述します。

注釈: このチュートリアルに書かれているテクニックを使えば、PerlやRubyなどのPython以外のプログラミング言語を使うこともできます。 「舞台裏でおこっていること」も書いているので、間違いがあれば教えてください。


準備


Hadoopのクラスタをセットアップして起動しておいてください。この文書ではHadoopのセットアップと起動方法は説明しません。Hadoopクラスタがない場合は、次のチュートリアルが構築するための手助けになるでしょう。チュートリアルはUbuntu向けに書かれていますが、他のLinux/Unix環境でも使えるでしょう。


PythonのMapReduceコード


次のPythonコードのトリックは、HadoopStreaming(wikiのエントリを見てください)を使ってMapとReduceのプログラム間PでSTDIN(標準入力)とSTDOUT(標準出力)を介してデータを受け渡しています。単純にPythonのsys.stdinから入力データを読み込んで、sys.stdoutに出力データを書き込んでいます。
他のことはHadoopStreamingが面倒を見てくれるので、やるべきことは以上です。不思議ですか?まあ、少なくとも、私もびっくりでした。

Map: mapper.py


次のコードを/home/hadoop/mapper.pyに保存してください。このプログラムはSTDIN(ヒョジュン入力)からデータを著見込んで、単語に分割します。そして、STDOUT(標準出力)に単語の出現回数(中間データ)を記述した行(複数行)を出力します。Mapスクリプトは単語の出現頻度の合計(中間データ)は計算しません。かわりに、入力途中に「<word>」が複数回出現しても「<word> 1」とすぐに出力します。そして、次に続くReduceが最終的な合計を計算します。もちろん、すきなようにプログラムの振る舞いを変えることができますが、説明するためにこのチュートリアルではそのままにしています。


#!/usr/bin/env python
 
import sys
 
# input comes from STDIN (standard input)
for line in sys.stdin:
    # 行頭と行末の空白を取り除く
    line = line.strip()
    # 行を単語に分割する
    words = line.split()
    # カウンターを上げる
    for word in words:
        # STDOUT (標準出力)に結果を書き込む;
        # ここで出力したものはReduce(つまりrecuder.py)での
        # 入力になる
        #
        # タブ文字での分割; 単語の出現回数は 1
        print '%s\t%s' % (word, 1)


Reduce: reducer.py


次のコードを/home/hadoop/reducer.pyに保存してください。このコードはmapper.pyの結果をSTDIN(標準入力)から読み込んで、書く単語の出現回数を合計して結果をSTDOUT(標準出力)に出力します。
ファイルは実行権限があることを確認してください(chmod +x /home/hadoop/reducer.pyを実行すれば実行権限をつけられます)。実行権限がないと、問題が発生します。


#!/usr/bin/env python
 
from operator import itemgetter
import sys
 
# 単語の出現回数のマップ
word2count = {}
 
# 入力はSTDIN
for line in sys.stdin:
    # 行頭と行末の空白文字を除去
    line = line.strip()
 
    # mapper.pyの出力をパースする
    word, count = line.split('\t', 1)
    # 回数を文字列から数字に変換
    try:
        count = int(count)
        word2count[word] = word2count.get(word, 0) + count
    except ValueError:
        # 回数が数字でなければ
        # こっそり、この行はなかったことにして捨て去る
        pass
 
# 単語をアスキーソートする
#
# この処理は必要ないが、オフィシャルHadoopの単語の
# カウントサンプルコードの最終出力に似せるために行う。
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))
 
# STDOUT (標準出力)に結果を書き込む
for word, count in sorted_word2count:
    print '%s\t%s'% (word, count)
    

コードをテストする (cat data | map | sort | reduce)


MapRecudeジョブでテストする前に手動てmapper.pyとreducer.pyをテストすることをお進めします。そうでなければ、問題なく終了するかもしれませんが、ジョブの結果のデータが全く出力されなかったり、期待した結果が得られないかもしれません。もし、失敗したらあなたか私が何か間違ったせいでしょう。
ここではMapとReduceスクリプトの機能テスト法のいくつかのアイデアがあります。

 # きわめて基本的なテスト
 hadoop@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hadoop/mapper.py
 foo     1
 foo     1
 quux    1
 labs    1
 foo     1
 bar     1
 quux    1  

 hadoop@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hadoop/mapper.py | sort | /home/hadoop/reducer.py
 bar     1
 foo     3
 labs    1
 quux    2
 # ebookの一つをテスト入力にする
 # (ebooksの入手方法は下記参照)
 hadoop@ubuntu:~$ cat /tmp/gutenberg/20417-8.txt | /home/hadoop/mapper.py
 The     1
 Project 1
 Gutenberg       1
 EBook   1
 of      1
 [...] 
 (わかった?)


HadoopでPythonコードを実行する


サンプルの入力データのダウンロード


サンプルとしてプロジェクトグッテンバーグの3つのebookを使います。


3つのebookをus-asciiエンコードでテキストファイルとしてダウンロードして、/tmp/gutenbergディレクトリなどの一時ディレクトリに解凍して保存してください。

 hadoop@ubuntu:~$ ls -l /tmp/gutenberg/
 total 3592
 -rw-r--r-- 1 hadoop hadoop  674425 2007-01-22 12:56 20417-8.txt
 -rw-r--r-- 1 hadoop hadoop 1423808 2006-08-03 16:36 7ldvc10.txt
 -rw-r--r-- 1 hadoop hadoop 1561677 2004-11-26 09:48 ulyss12.txt
 hadoop@ubuntu:~$

HDFSにローカルのサンプルデータをコピー


実際のMapReduceジョブを実行する前に、ローカルのファイルシステムからHadoopHDFSにファイルをコピーします。

 hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -copyFromLocal /tmp/gutenberg gutenberg
 hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls
 Found 1 items
 /user/hadoop/gutenberg  <dir>
 hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls gutenberg
 Found 3 items
 /user/hadoop/gutenberg/20417-8.txt      <r 1>   674425
 /user/hadoop/gutenberg/7ldvc10.txt      <r 1>   1423808
 /user/hadoop/gutenberg/ulyss12.txt      <r 1>   1561677

MapReduceジョブの実行


準備ができたので、PythonのMapReduceジョブをHadoopクラスタ上で実行できるようになりました。既に述べたように、HadoopStreamingを使ってMapとReduceコード間をSTDIN(標準入力)とSTDOUT(標準出力)を介してデータを受け渡します。

 hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar -mapper /home/hadoop/mapper.py -reducer /home/hadoop/reducer.py -input gutenberg/* -output gutenberg-output


もし、Reduceタスクの数を増やすなどHadoopの設定を変更したい場合は、-jobconfオプションを使えます。

 hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar -jobconf mapred.reduce.tasks=16 -mapper ...


mapred.map.tasksの重要事項: Hadoopはmapred.map.tasksでヒントを検討しないようになっています。ユーザが指定したmapred.reduce.tasksを使用しますが、それらを変更することはしません。mapred.map.tasksを指定できませんが、mapred.reduce.taskは指定できます。


ジョブはHDFSgutenbergディレクトリからすべてのファイルを読み込んで処理し

上記のコマンドのコンソールでの出力例:

 hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar -mapper /home/hadoop/mapper.py -reducer /home/hadoop/reducer.py -input gutenberg/* -output gutenberg-output
 additionalConfSpec_:null
 null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
 packageJobJar: [/usr/local/hadoop-datastore/hadoop-hadoop/hadoop-unjar54543/]
 [] /tmp/streamjob54544.jar tmpDir=null
 [...] INFO mapred.FileInputFormat: Total input paths to process : 7
 [...] INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop-datastore/hadoop-hadoop/mapred/local]
 [...] INFO streaming.StreamJob: Running job: job_200803031615_0021
 [...]
 [...] INFO streaming.StreamJob:  map 0%  reduce 0%
 [...] INFO streaming.StreamJob:  map 43%  reduce 0%
 [...] INFO streaming.StreamJob:  map 86%  reduce 0%
 [...] INFO streaming.StreamJob:  map 100%  reduce 0%
 [...] INFO streaming.StreamJob:  map 100%  reduce 33%
 [...] INFO streaming.StreamJob:  map 100%  reduce 70%
 [...] INFO streaming.StreamJob:  map 100%  reduce 77%
 [...] INFO streaming.StreamJob:  map 100%  reduce 100%
 [...] INFO streaming.StreamJob: Job complete: job_200803031615_0021
 [...] INFO streaming.StreamJob: Output: gutenberg-output  hadoop@ubuntu:/usr/local/hadoop$ 


上の出力からわかるように、Hadoopは統計と情報表示用に基本的なWebインターフェースを提供しています。Hadoopクラスタを実行時にhttp://localhost:50050/にブラウザでアクセスしてください。次の画像は、ジョブを実行後のHadoopのWebインターフェースのスクリーンショットです。

 

HDFSgutenberg-outputディレクトリに結果が正しく保存されているか確認してください:

 hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls gutenberg-output
 Found 1 items
 /user/hadoop/gutenberg-output/part-00000     <r 1>   903193  2007-09-21 13:00
 hadoop@ubuntu:/usr/local/hadoop$ 
You can then inspect the contents of the file with the dfs -cat command:
 hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -cat gutenberg-output/part-00000
 "(Lo)cra"       1
 "1490   1
 "1498," 1
 "35"    1
 "40,"   1
 "A      2
 "AS-IS".        2
 "A_     1
 "Absoluti       1
 [...]
 hadoop@ubuntu:/usr/local/hadoop$

この出力では、単語がクウォート(")で囲まれています。これは、Hadoopが挿入したものではありません。これらの単語は、Pythonコードが単語に分割した結果で、この場合は、ebookテキストの引用の開始部分です。自分でpart-00000ファイルを調べてみてください。


Pythonのイテレータとジェネレータを使ったMapperとReducerのコードの改善


上のMapperとReducerのサンプルコードは、初めてのMapReduceアプリケーションの書き方を説明しました。この記事の目的は、コードを簡潔に、Pythonの初心者でもわかりやすくすることです。しかし、実世界のアプリケーションではPythonのイテレータとジェネレータ(PDFでのもっとよい紹介記事)を使ってコードを最適化したいです。
一般的に、イテレータとジェネレータ(例えばPythonのyield文を使って、イテレータを作成するための機能)は、実際に必要になるまで配列の要素を作成しない利点があります。これは、タスクによっては計算時間の短縮やメモり効率改善につながります。

注意: 次のMapとReduceスクリプトはHadoop上で(MapReduceジョブのMappterとReducerとして)動作させたときだけ、正しく動作します。つまり、いくつかの機能はHadoopの機能に依存しているため、もう、「cat DATA | ./mapper.py | sort | ./reducer.py」を実行してテストできないことになります。

正確には、同じ単語が("foo"など)が連続して複数回現れたときだけ、("foo", 4)のように単語の出現回数の合計を計算します。大抵の場合、Hadoopが単純なPythonスクリプトより効率的にできるように、HadoopにMapとReduce間で(キー, バリュー)のペアにグルーピングさせます。

mapper.py


#!/usr/bin/env python
"""Pythonのイテレータとジェネレータを使った、より進んだMapper。"""
 
import sys
 
def read_input(file):
    for line in file:
        # 行を単語に分割
        yield line.split()
 
def main(separator='\t'):
    # 入力はSTDIN(標準入力)
    data = read_input(sys.stdin)
    for words in data:
        # 結果をSTDOUT(標準出力)に書き込む;
        # ここでの出力はReduceでの入力になる(reducer.pyの入力)
        #
        # タブ文字での分割; 単語の出現回数は 1
        for word in words:
            print '%s%s%d' % (word, separator, 1)
 
if __name__ == "__main__":
    main()
    
    

reducer.py


#!/usr/bin/env python
"""Pythonのイテレータとジェネレータを使った、より進んだReducer。"""
 
from itertools import groupby
from operator import itemgetter
import sys
 
def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)
 
def main(separator='\t'):
    # 入力はSTDIN(標準入力)
    data = read_mapper_output(sys.stdin, separator=separator)
    # 単語ごとに出現回数がグルーピングされ
    # 連続したキーとグループを返すイテレータを作成する:
    #   current_word - 単語を含む文字列 (キー)
    #   group - すべての["<current_word>", "<count>"]アイテムをyieldするイテレータ
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            # count was not a number, so silently discard this item
            pass
 
if __name__ == "__main__":
    main()

0 件のコメント: