ページ

2011年12月9日

Python3のconcurrentパッケージ

こんな経緯Python Advent Calendarに参加させられてしまいました。まずは、言い訳から。僕がPython3を触ったことがあるのは2,3年前で、それも数日だけです。それ以降全く触っていません。はい。Python3初心者です。笑いたければ笑ってください。そんな僕がPython3で何を書こうか、と言うことで、あのみんなが大好きなJavaからぱくったと名高いconcurrentパッケージについて書きます。
まず、まっとうなPythonプログラマならあのレガシーなJavaぐらい分かるよね。Javaが分かるんだったらJavaのjava.util.concurrentパッケージぐらいは知っていて、concurrentて常識だよね、と言うことで、concurrentについての説明は多分、省略です。

PythonのconcurrentパッケージはJavaのconcurrentパッケージのほんの一部です。って言うか、executer関連しかないじゃん・・・。concurrentパッケージの下にはfuturesしかいないし・・・。でも、今後増えそうな感じがして頼もしくはあります。Python3でGILがどうなっているか知りませんが、pypyやjythonなどもあるので、スレッド周りの安全性を実装系に依存しちゃうのはあまりうれしくありません。なので、スレッドの安全性は意識した方が良いと思うわけです。でも、そんな能書きはいいのでゆっくりとコードと戯れてみましょう。

まずは、concurrentを使わないコードです。もっと最初はスレッドを使わないモノが必要ですが、めんどいのでごめんなさい。
import threading
import time

def thread_func():
    ident = threading.currentThread().ident
    print("start")
    for i in range(4):
        print("waiting " + str(i))
        print("current thread: " + str(ident))
        time.sleep(0)
    print("done")

l = [threading.Thread(target = thread_func) for i in range(4)]
for thrd in l:
    thrd.start()
for thrd in l:
    thrd.join()

スレッドをぐるぐる回して、最後にjoinしてすべてのスレッドが終わるのを待っています。これを実行するとスレッドが切り替わっているのが分かります。ループ内でちょっと重い処理をして、お互いに依存関係がないときに、こんな感じのコードは便利です。

でも、今回はスレッドというかループ数が4だから良いけど、もっと多いと一瞬で破綻しちゃうかもしれません。LinuxとかMacはデフォルトのままだとスレッド一つにつきメモリを8M消費します。こんなところで大量にメモリを消費すると困っちゃいます。と言うことで、大体スレッドプールとかを実装するわけです。まあ、みんな同じことを考えます。と言うことでDRYって怒られそうです。怒られてしょぼくれているあなたにconcurrent.futuresのexecuterです。まずは、ThreadPoolExecuterです。名前からしてそれっぽいでしょ。実行して貰えれば、スレッドが最大二つ(max_worker=2)しか動かないです。そして、別のスレッドで動いているのが分かると思います。でも、workerが一つしかいないよ!って言う人は、もっと沢山submitしてあげれば大丈夫です。

import threading
import concurrent.futures
import time

def executer_func():
    ident = threading.currentThread().ident
    print("start")
    for i in range(4):
        print("waiting " + str(i))
        print("current thread: " + str(ident))
        time.sleep(0)
    print("done")
    return ident


with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executer:
    l = [executer.submit(executer_func) for i in range(4)]
    for future in l:
        print("=== result: " + str(future.result()))

ThreadPoolExecutorを使うとスレッド数を制御・プールできて使い回せます。スレッドが終了したら、未来のオブジェクト(future)から結果を取り出せます。えっ、PythonはGILがあるから本当に重い処理はコアを生かしきれないって?わがままだな。そんなにCPUを発熱させたければ、ProcessPoolExecuterです。
import threading
import os
import concurrent.futures
import time

def executer_func():
    ident = threading.currentThread().ident
    print("start")
    for i in range(4):
        print("waiting " + str(i))
        print("current thread: " + str(ident))
        time.sleep(0)
    print("done")
    return os.getpid()


with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executer:
    l = [executer.submit(executer_func) for i in range(4)]
    for future in l:
        print("=== result: " + str(future.result()))
ほら、これでスレッドじゃなくプロセスとして実行されます。CPUをこき使ってください。
でもなー、まあ、待たなくてもいいので、実行した結果がだけをもうちょっと処理したいんだけど・・・、って誰かが文句をたれています。Javaだと・・・、ってすぐにJavaを引き合いに出す人がいます。大丈夫です。タスクが終わったらコールバックを呼ぶことができます。Javaだと何かとオブジェクトを作ってめんどいですが、Pythonはそんなことは在りません。
import threading
import concurrent.futures
import time

def executer_func():
    ident = threading.currentThread().ident
    print("start")
    for i in range(4):
        print("waiting " + str(i))
        print("current thread: " + str(ident))
        time.sleep(0)
    print("done")
    return ident

def done_callback(future):
    print("=== result: " + str(future.result()))

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executer:
    for i in range(4):
        future = executer.submit(executer_func)
        future.add_done_callback(done_callback)

futureにadd_done_callbackで処理が終わったときに実行する処理を指定できます。
まあ、大体分かったけど、発行したタスクが全部がちゃんと終わってから何かをしたいって?じゃあ、waitすればいいじゃん。と言うことで、

import threading
import concurrent.futures
import time

def executer_func():
    ident = threading.currentThread().ident
    print("start")
    for i in range(4):
        print("waiting " + str(i))
        print("current thread: " + str(ident))
        time.sleep(0)
    print("done")
    return ident


with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executer:
    l = [executer.submit(executer_func) for i in range(4)]
    results = concurrent.futures.wait(l)
    for result in results.done:
        print("=== result: " + str(result.result()))
concurrent.futures.waitですべてが終わるのを待ってくれます。その後、処理が終わったものを取り出して(result.done)、結果を処理するよろし。

executerのmapについても書こうかと思ったのですが、pythonの組み込み関数のmapと大体同じで面白くないのでやめました。
終わったモノから順番になるべく速く処理するときはconcurrent.futures.as_completedで取り出せます。

と言うことで、concurrentでした。本音を言えば、ループをみてJITがスレッドにできるところは勝手にスレッドにしてくれて、何もしていないのにある日突然、アプリケーションが高速化してくれるのが理想です。インテルのスレッディングビルディングブロックみたいに・・・。まあ、そこまで行かなくてもヒント(アノテーションとか)を書いておけばJITが解釈しやすくなるとかでも・・・。

次はaodag先生です。
でわでわ

0 件のコメント: