ページ

2011年2月28日

python websocket client 0.4.1リリース

誰も使わないだろうと思っていたのにバグ報告がきて、なおかつ恥ずかしいバグだったので、過去を亡き者にするためにpython websocket client 0.4.1をリリースをしました。pypiにもいるので、pip install websocket-clientでもインストールできます。

どれくらい恥ずかしいバグかというと、カスタムHTTPヘッダー(追加のHTTPヘッダー)をGET %s HTTP/1.1の前に書いてしまっていたのです。これじゃHTTPじゃなくなります。恥ずかしすぎます。でも、新しいリリースで過去をなかったモノにします。

うーん、次のリリースはwebsocketのプロトコルが変わった時だと思っていたので、悲しいですね。

2011年2月9日

やっぱりpycurl

昨日はurlgrabberだと言っておきながら、最新版はpycurlに依存しているし、urllibにこだわるほど思い入れもないし、やっぱり、pycurlだぜ、と言うことで。

インストールはいつも通りpip install pycurlだけでできます。コードは、

import pycurl

def body_callback(buf):
    print buf,

curl = pycurl.Curl()
curl.setopt(curl.URL, "http://localhost:8088")
curl.setopt(curl.WRITEFUNCTION, body_callback)
curl.perform()

curl.setopt(curl.URL, "http://localhost:8088")
curl.perform()

curl.close()

簡単ですね。何も考えなくてもkeep aliveがデフォルトで有効になっているので、最初のリクエスト(curl.perform())と次のリクエストは同じコネクションが使い回せます。その他、オプションはCのlibcurlと同じです。
リモートから取得したデータは登録したコールバック関数で処理します。きっとメモリ管理とか、Cの名残でしょうね。コールバック関数のbufは受け取ったデータが細切れできます。大きなデータを受信すると複数回にわけてコールバック関数が呼ばれます。
pycurlには他にもいろいろ便利な機能がたくさんありますが、それはコマンドラインのcurlと同じですね。

でわでわ

2011年2月8日

urllib2でkeep-alive

別にurllib2じゃなくてもよかったんです。Pythonでkeep-aliveでhttpクライアントさえ動いてくれれば、何でもいいのです。pythonでkeep-aliveでhttpクライアントをちょめちょめしたいとgoogleに聞くと、最近はやりのSではじまるサイトをお告げで示してくれました。Pythonでurllib2でkeep-aliveを使うには、urlgrabber使いなよ、と言っています。仕方なく、この人と心中することにしました。後でもうちょっと読むと、pycurlと言う人もいます。urlgrabberはpure pythonなので、pureな自分に最適なんだと言い聞かせています。

さて、urlgrabberのtar ballをダウンロードしてインストールします。ただ、ダウンロードしたそのものだと、僕のpython2.7だとちゃんと動いてくれないので、この記述のようにコードを変更します。

import urllib2
import time
from keepalive import HTTPHandler
keepalive_handler = HTTPHandler()
opener = urllib2.build_opener(keepalive_handler)
urllib2.install_opener(opener)

fo = urllib2.urlopen('http://localhost:8088/')
print fo.read()

print "sleeping"
time.sleep(10)

fo = urllib2.urlopen('http://localhost:8088/iya')
print fo.read()


とあるサーバでチェックしたらちゃんとkeep-aliveしています。と言うことで、テストコードが完成です。

でわでわ。
----

使ったのは0.2.1でしたが、最新版の3.9.1だとpycurlに依存しているんですね。
pycurlを直接使うのが一番。urlgrabberは忘れましょう

2011年2月4日

gevent_request_profilerを試してみたよ

gevent_request_profilerを試してみました。wsgiアプリケーションのhttpサーバをgeventを使って動かしていると、時々何かがブロックしているかも?って言うときがあります。また、ブロックしなくても、各処理にどれくらいの時間がかかっているのかプロファイルしたいことがよくあります。グリーンスレッドだと、何かが長時間ブロックするすると、他の処理がすべて待たされるので深刻です。普通のプロファイラだと、グリーンスレッドの本当の実行時間を教えてくれません。それをためのプロファイラです。まあ、greenletがどういう風に処理を実行しているのか垣間見れるのも面白いです。

さて、インストールですが、githubからチェックアウトして、インストールしました。そのたと、次のもっとも単純そうなコードを実行します。

from gevent import monkey
monkey.patch_all()

from tellapart.frontend import gevent_profiler
profiler = gevent_profiler.Profiler(request_profiling_pct=1.0)

from gevent import wsgi

def app(environ, start_response):
    path = environ["PATH_INFO"]
    if path == "/":
        start_response("200 OK", [("Content-Type", "text/html")])
        return "Hello, Profiler"

    start_response("404 Not Found", [])
    return []

if __name__ == "__main__":
    server = wsgi.WSGIServer(("", 5000), app)#, handler_class=WebSocketHandler)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        server.stop()
profilerのオブジェクトを作るときにgevent.wsgiをいじくったりするので、profilerのオブジェクトを作った後にgevent.wsgiをロードしないといけません。実行結果は、
>>> Begin RequestProfile
  Path: /
  Start time: 2011-02-04 05:35:31.300684 UTC
  End time: 2011-02-04 05:35:31.302821 UTC
  Elapsed wall time: 2.1 ms
  Elapsed CPU time: 0.8 ms
  Number of spans: 1
  Longest span CPU time: 0.8 ms
Span(
  greenlet_id=1,
  start_time=1296797731300.7,
  end_time=1296797731301.5,
  duration=0.8 ms,
  finished=True,
  fn_name=gevent.wsgi.WSGIServer.handle,
  stack_trace=  File "/Users/hiro/.virtualenvs/main/lib/python2.7/site-packages/tellapart_gevent_profiler-0.5-py2.7.egg/tellapart/frontend/gevent_profiler.py", line 274, in run
    ProfilingGreenlet._HUB._record_execution_span(self)
  File "/Users/hiro/.virtualenvs/main/lib/python2.7/site-packages/tellapart_gevent_profiler-0.5-py2.7.egg/tellapart/frontend/gevent_profiler.py", line 200, in _record_execution_span
    traceback.extract_stack(), finished)
)
<<< End RequestProfile

スレッドが一つだけなので、単純です。処理時間も見えるので素敵です。次に、グリーンスレッドを作って実行してみます。
import gevent
from gevent import monkey
monkey.patch_all()

from tellapart.frontend import gevent_profiler
profiler = gevent_profiler.Profiler(request_profiling_pct=1.0)

from gevent import wsgi

def loop():
    x = 0
    for i in range(100*10000):
        x += i
    #gevent.sleep(0)
    for i in range(100*10000):
        x += i
        

def app(environ, start_response):
    path = environ["PATH_INFO"]
    if path == "/":
        gevent.spawn(loop)
        gevent.joinall([gevent.spawn(loop) for _ in range(3)])
        start_response("200 OK", [("Content-Type", "text/html")])
        return "Hello, Profiler"

    start_response("404 Not Found", [])
    return []

if __name__ == "__main__":
    server = wsgi.WSGIServer(("", 5000), app)#, handler_class=WebSocketHandler)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        server.stop()

実行した結果は

>>> Begin RequestProfile
  Path: /
  Start time: 2011-02-04 08:17:11.701029 UTC
  End time: 2011-02-04 08:17:14.698614 UTC
  Elapsed wall time: 2997.6 ms
  Elapsed CPU time: 2989.4 ms
  Number of spans: 6
  Longest span CPU time: 1749.7 ms
Span(
  greenlet_id=1,
  start_time=1296807431701.0,
  end_time=1296807431706.3,
  duration=5.3 ms,
  finished=False,
  fn_name=gevent.wsgi.WSGIServer.handle,
  [snip]
)
Span(
  greenlet_id=2,
  start_time=1296807431712.1,
  end_time=1296807433461.7,
  duration=1749.7 ms,
  finished=True,
  fn_name=__main__.loop,
  [snip]
)
Span(
  greenlet_id=3,
  start_time=1296807433462.9,
  end_time=1296807433829.8,
  duration=366.9 ms,
  finished=True,
  fn_name=__main__.loop,
  [snip]
)
Span(
  greenlet_id=4,
  start_time=1296807433830.0,
  end_time=1296807434345.0,
  duration=515.0 ms,
  finished=True,
  fn_name=__main__.loop,
  [snip]
)
Span(
  greenlet_id=5,
  start_time=1296807434345.3,
  end_time=1296807434687.5,
  duration=342.3 ms,
  finished=True,
  fn_name=__main__.loop,
  [snip]
)
Span(
  greenlet_id=1,
  start_time=1296807434688.3,
  end_time=1296807434698.4,
  duration=10.1 ms,
  finished=True,
  fn_name=gevent.wsgi.WSGIServer.handle,
  [snip]
<<< End RequestProfile
スレッドが順番に実行されているのがわかります。スタックトレースは長いので削ってあります。loopメソッドの中は一気に処理が進むので、gevent.sleepを実行して、CPUを他のスレッドに渡してあげます(gevent.sleepのコメントを外す)。その結果が次です。

>>> Begin RequestProfile
  Path: /
  Start time: 2011-02-04 08:18:17.436797 UTC
  End time: 2011-02-04 08:18:18.885329 UTC
  Elapsed wall time: 1448.5 ms
  Elapsed CPU time: 1440.1 ms
  Number of spans: 10
  Longest span CPU time: 216.0 ms
Span(
  greenlet_id=1,
  start_time=1296807497436.8,
  end_time=1296807497444.1,
  duration=7.3 ms,
  finished=False,
  fn_name=gevent.wsgi.WSGIServer.handle,
  [snip]
)
Span(
  greenlet_id=2,
  start_time=1296807497449.6,
  end_time=1296807497654.6,
  duration=205.0 ms,
  finished=False,
  fn_name=__main__.loop,
  [snip] 
)
Span(
  greenlet_id=3,
  start_time=1296807497654.9,
  end_time=1296807497832.5,
  duration=177.6 ms,
  finished=False,
  fn_name=__main__.loop,
    [snip]
)
Span(
  greenlet_id=4,
  start_time=1296807497833.1,
  end_time=1296807498049.1,
  duration=216.0 ms,
  finished=False,
  fn_name=__main__.loop,
  [snip]
)
Span(
  greenlet_id=5,
  start_time=1296807498049.5,
  end_time=1296807498217.7,
  duration=168.2 ms,
  finished=False,
  fn_name=__main__.loop,
  [snip]
)
Span(
  greenlet_id=2,
  start_time=1296807498218.2,
  end_time=1296807498383.1,
  duration=164.9 ms,
  finished=True,
  fn_name=__main__.loop,
  [snip]
)
Span(
  greenlet_id=5,
  start_time=1296807498383.3,
  end_time=1296807498548.7,
  duration=165.4 ms,
  finished=True,
  fn_name=__main__.loop,
  [snip]
)
Span(
  greenlet_id=4,
  start_time=1296807498548.9,
  end_time=1296807498714.2,
  duration=165.3 ms,
  finished=True,
  fn_name=__main__.loop,
  [snip]
)
Span(
  greenlet_id=3,
  start_time=1296807498714.4,
  end_time=1296807498878.7,
  duration=164.3 ms,
  finished=True,
  fn_name=__main__.loop,
  [snip]
)
Span(
  greenlet_id=1,
  start_time=1296807498879.0,
  end_time=1296807498885.1,
  duration=6.2 ms,
  finished=True,
  fn_name=gevent.wsgi.WSGIServer.handle,
  [snip]
)
<<< End RequestProfile
finishedがFalseになっているものは、gevent.sleepでスレッドが切り替わったものです。それぞれの処理の実行時間が分かります。グラフィカルに見えるように処理すればもっとかっこいいのですが・・・。stack_traceのところをみれば、コードのどこでスレッドが切り替わったのかも分かります。wsgiのリクエストだけプロファイルできますが、なかなかいい感じかも。

2011年2月2日

RabbitMQのクライアントのfailover対応はどうなっているのか?

今年はうさぎ年なので、RabbitMQで遊んでいる今日この頃です。RabbitMQ自体は、クラスタリングに対応していて、クラスタ内でメッセージをルーティングして配信してくれます。ただ、AMQPのプロトコル自体にはクラスタについていの規定はありません。AMQPを利用しているサーバなりクライアントなりが独自に実装すべきもののようです。クライアントの実装としてJavaとPythonでは若干状況は異なる(開発元が違う)のですが、そのメモ。

まず、Pythonです。Pythonはamqlibを使っています。AMQPを利用しているサーバに対して使えます。でも、接続先は一つだけです。接続先が落ちている場合はつながりません。その場合、接続先を自分で列挙して順番にトライするようです。最初の接続先はそれで十分だと思いますが、実行途中でサーバがダウンした場合は、自動で他のサーバを見つけてくれたりはしません。サンプルコードのようにしか見えませんが、その辺を透過的に扱おうとしているのが、このコードです。つまり、failoverのことはクライアントでがんばれ、と。

Javaの場合は、rabbitmq.comからJavaのクライアントが提供されています。最初の接続については、複数のAddressを渡して順番に接続を試みることができるので、Pythonよりはよくできています。でも、一旦接続が確立すると、それだけを使い続けます。途中でサーバがダウンしたら、また最初からやり直しです。残念ながrこのあたりはPythonと大差ありません。AMQPのクライアントライブラリじゃなくってRabbitMQのクライアントライブラリなので期待しましたが、残念ですね。

ざっとメーリングリスト(あと、これ?)を読んだ限りでは、このあたりの事情は2008年頃からあまり変化がないようです。MSラブなのにWindowsのことは僕はよく分かりませんが、LinuxではVirtual IPを使えばいいんじゃない、と書いてあったりもします。それもソリューションの一つかもしれませんが、ちょっと敷居が高いです。
たとえばAPサーバが6台あれば、6台すべてにRabbitMQを入れてクラスタ化して、各APはlocalhostのMQを見るとかっていうのは現実的なんだろうか?これだと、failしても影響を局所化できそうですが、それ以外のコストが上がりそうです。

サーバの実装に比べてクライアントの実装がちょっと薄すぎるような気がします。まあ、failoverできれば何でもいいんですが。