ページ

2010年12月22日

WebSocketのサーバ間でメッセージをリレーさせるサンプル

どうもです。なぜかPythonのアドベントカレンダーに参加することになっていましたおおたにです。お題が、PythonのWebフレームワークです。えっ、Webフレームワーク?僕はそんなにWebフレームワーク詳しくないし・・・。数年前ならまだしも、最近は便利に使えれば後はそれほど気にしなくなっていました。でも、Webフレームワーク・・・。ということで、WebSocketとFlaskを絡めてサンプルコードを書いたら、どこにもimport flaskってやっていない罠が・・・。ということで、前置き(言い訳)が長くなりました、WebSocketのサーバを複数立てたときに、サーバ間のデータのやりとりをどうするのかというお話です。

まず、WebSocketはHTML5関連で脚光を浴びている機能なので、知っている人も多いでしょう。簡単にいってしまえば、最初にHTTPプロトコルを偽装して、その後データのだだ漏れができるものです。Chromeでは去年サポートされ、Safariも今の最新版でサポートされています。iOS4.2.1でもサポートされているのでiPhoneからでも使えます。IE9では、「何それ?」の悲しい状態ですが、プロトコルの不安定さから仕方ないですね。Firefoxは4からサポートすると期待させておきながら、こんな残念な記事が今朝聞こえてきました。

WebSocketでデータの送受信をすると、Cometなどのロングポーリングよりはインフラに優しい作りになります。でも、ロングポーリングでも同じですが、アプリケーションサーバを複数立てたとき、アプリケーションサーバ間でデータをやりとりする必要がでてきます。リアルタイム性が大きなメリットなので、データベースとかは介せません。そこで有力なのがXMPPだと信じていますが、Google Waveが消えたのと、ブログの一つのエントリじゃ収まらないので、XMPPは忘れます。自前で作るとしたらハブサーバを作って、アプリケーションサーバとハブサーバの間でデータの垂れ流し(リレー)をさせるのが良さそうです。そのときのプロトコルもWebSocketで中身はJSONとか、MessagePackとかもいいかな、と思ったのですが、サンプルコードはライブラリを調べる時間も含めて30分以内に作らないといけないというローカルルールにより、行ベースで自分でソケットを開きます。



で、今回のサンプルは全部geventベースで書いています。非同期処理が同期処理のように書けるのがすごいメリットです。WSGIサーバにもなるので、FlaskでもDjangoでも使えます。で、なぜ、WebSocketを非同期にこだわるかはむか〜し、どこかで書いた気がするので省略。でも、今回のサンプルはある程度ブロックするので効率は悪いです。サンプルコードはここにあります。

まず、ハブサーバのコードです。

from gevent.server import StreamServer

class BCServer:
    def __init__(self):
        self.clients = []

    def received(self, socket, addr):
        fileobj = socket.makefile()
        self.clients.append(fileobj)
        try:
            while True:
                line = fileobj.readline()
                if not line:
                    break
                if line.startswith("quit:"):
                    break
                if line.startswith("msg:"):
                    msg = line.split(":", 1)[1]
                    for fo in self.clients:
                        try:
                            fo.write(msg)
                            fo.flush()
                        except Exception, e:
                            print p
        except Exception, e:
            print e
        finally:
            self.clients.remove(fileobj)


if __name__ == "__main__":
    bcs = BCServer()
    server = StreamServer(("0.0.0.0", 3000), bcs.received)
    server.serve_forever()



geventのsocketが標準ライブラリのsocketと全く同じようにあつかえます。メッセージを垂れ流しているだけなので、単純です。Webアプリケーションの方は、以前のコードとほとんど同じなので、ポイントだけ。全部読みたければ、これ

chat_clients = set()

def read_server(fileobj):
    while True:
        data = fileobj.readline()
        if not data:
            print "end read_server"
            break
        for client in chat_clients:
            client.send(data)

def handle_chat(ws):
    chat_clients.add(ws)
    while True:
        message = ws.wait()
        if message is None:
            break
        fileobj.write("msg:" + message + "\n")
        fileobj.flush()

    chat_clients.remove(ws)


fileobjはハブサーバに接続した後にsocket.makefileしたものです。これだけでメッセージをリレーしてくれます。えっ?信じられないって?それじゃ、動いている画像です。

2010年12月16日

PythonでHTML数値文字参照を文字列に変換

ちょっと前にHTML数値文字参照を文字列に変換したくなりましたが、適当なライブラリはここまでは面倒は見てくれませんでした。HTML数値文字参照、っていう名称が正しいかしませんが、ሴみたいなやつです。これを文字列に変換したいのです。3分間グーグル様に問い合わせても教えて貰えないので、自前で変換することになってしまいました。

import re

def unescape(s):
    pattern = re.compile('(&#x([0-9a-fA-F]{4});)')
    return pattern.sub(lambda x: unichr(int(x.group(2), 16)), s)

それじゃ、その逆っていうことで、

def escape(s):
    return ''.join(("&#x" + hex(ord(c))[2:] + ";"  for c in s))

これで「ほげ」を変換すると、ほげとなります。でもASCIIコードまでエスケープされるのはいやだな〜、ということで、

def escape(s):
    pattern =  = re.compile(u'([\u00ff-\uffff])')
    return pattern.sub(lambda x: "&#x" + hex(ord(x.group(1)))[2:] + ";", s)

これで、「ほげtest」はきっと、u'ほげtest'となるはず。範囲が00ffからでいいかどうかと、maxがffffでいいかはどうかはあるんですがね。

でわでわ

2010年12月14日

MongoDBのメモリ使用量 もうちょっとまじめに計測

昨日の続きです。MapReduce使ったときにメモリをどれくらい使っているのかを、もうちょっとまじめに計測してみました。まずは、データを作るところ。

import pymongo
conn = pymongo.Connection()
db = conn.my_test
table = db.items

def insert():
    for i in range(1, 1000000):
        oid = table.save({"key": i, "value": 1000 % i})

です。割と小さめのデータを100万件放り込んでいます。これを

def map_reduce(max_key):
    # map reduce
    from pymongo.code import Code

    map_code = Code("""
function() {
  emit(this.key, this);
}
"""
)

    reduce_code = Code("""
function(key, values) {
  return {"count": values.length, title: values[0]["title"], url: values[0]["url"]};
}
""")
    result = table.map_reduce(map_code, reduce_code,
                              query = {"key": {"$lte": max_key}})
    return result

で10,100, 1000,10000, 100000, 250000, 500000, 750000, 1000000とmap reduceの対象になるレコードを増やしていって、map reduce処理後のMongoDBのメモリの使用量(RSSの値)をプロットしたものが、この図です。

250000のあたりが少し落ちているのは誤差でしょう。
左の方の10000ぐらいまではほとんど重なっていてわかりませんね。map reduceの処理には80Mぐらいのメモリが必要らしいです。
そこから先はほぼ、比例的にメモリの使用量が増えていきます。map reduceでqueryを制御しないと沢山メモリを食べてくれます。

メモリの消費量が多いのが悪いこととはいえないのですが、レンタルサーバでMongoDBがメモリを食いつぶして落っこちるのは悲しい限りです。レンタルサーバでは大体一時間に16000件のデータが登録されます。今回測定したコードだと20000件のデータでは大体、90Mか100Mぐらいのメモリ使用量でした。実際に動いているデータ構造とサンプルデータ(ちょっと小さめ)で実行すると、メモリの使用量が250Mぐらいでした。300Mぐらいであればすぐに消費してくれそうです。クエリの結果がほぼメモリにすべてのってmap reduceが実行されているのでしょう。

ということで、他のプロセスがメモリも消費することを考えると、メモリがちょっとばかしたりません。490円で動いていますが、やっぱり倍の値段を払わないとだめそうです。

MongoDBはメモリを沢山食います。RDBを使っておけば楽だったかもしれません。

でわでわ。

2010年12月13日

MongoDBのメモリの使用量

とあるところで、MongoDBを使って遊んでいます。一秒間に数個のレコードをインサートし続けています。一時間に一回、集計用に一時間分のデータをMongoDBのMap Reduceを使って処理しています。そのときのメモリの使用量(psで出力された結果のRSSの部分)を一分おきにとって、グラフ化したものがこのグラフです。

最初の小山はあんまり意味がないでしょう。200M byte付近から300M byteぐらいに使用量が急激に上がっているタイミングがMap Reduceで集計している時です。データをとるためにMongoDBを再起動したり、べつのことをちょっとだけしているので、1時間の完全なデータではないですが、50分ぐらいのデータにはなっていると思います。

グラフからインサートを延々と繰り返してもさほどメモリの使用量は増えていません。データの参照もそれほど複雑なことをしたり、負荷があるわけでもありません。

グラフの右の方でまたメモリの使用量が上がっています。大体40Mbyteぐらい増えています。これもMapReduceで集計しているタイミングです。Map Reduceは基本的に、集計対象とするデータ量に応じてメモリをかなり消費するようです。

悲しいことは、これを動かしてるサーバのメモリがとっても少ないので、メモリが足りなくなってしまうことです。

2010年12月9日

WebSocketとプロキシ

プロキシ配下でWebSocketを使おうとしても、通信ができないと思っていました。わざわざ80番ポートをわざわざ使うように変更したぐらいなので、なぜだろうと不思議でした。この記事によると、バグはあるとしても通るはずだと。WebSocketのプロトコルにアップグレードする前にGETのコンテンツ領域(?)でデータを送信し始めるのはHTTPの仕様上よくないし、それをハンドリングでいないプロキシはいるだろう、そのせいで通信ができないということでした。なるほどです。

さて、会社のプロキシもPOSTの時にContent-Lengthがなかったら、POSTされないとか変なことがあったので、GETでデータの垂れ流しなんてもってのほかです。というか、PROXY経由でSubversionをつかおうとしたとき、GET/POST/HEAD以外のメソッドが全然使えない設定になっていた過去があるぐらいです。こっそり直したのですが。

さて、WebSocketの通信は最初CONNECTをするらしいです。会社の環境だとまず、ここで失敗します。HTTPSじゃないとCONNECTしないようにしているみたい。

そんなわけで、プロキシを迂回する経路ができたのでした。

2010年12月6日

Pythonで文字コード判定

昔のトラウマのせいか、文字コードの自動判定はあまり信用していません。できることなら、あまりやりたくないものです。でも、今遊んでいる、じゃなかった作っているものでできれば文字コードを自動判定したくなることもあります。扱っているのはWebページでHTMLです。なので、大抵の場合はHTMLの中に文字コードが書いているので、おそらくそれは信用してもいいでしょう。
HTMLの中に文字コードが書いていない場合は、ちょっと困ります。HTTPヘッダーに文字コードが書かれていればそれを信用するかどうかが迷うところです。あんまり信用したくありません。でも、大きなところではYahoo!Japanのニュースのブログ/意見は文字コードはHTMLにかいていません。HTTPヘッダーにはちゃんと指定してあります。でも、これはYahoo!Japanだからかな。
で仕方なく文字コードの自動判定に落ち着く訳です。Pythonで文字コードの自動判定って何がいいのか、Googleさんに聞くと、Universal Encoding Detectorにしなさいとおっしゃいます。feedparser作っているとこですね。他のはpykfなど日本語に特化したものとかありましたが、日本語だけに特化しているのはちょっとさけたかったので、お告げに従いました。pip install chardetでインストールして、試してみます。手もとにあるちょっといやんなサイトのリストをまわしてみるとちゃんと判定してくれています。こんな感じ。
import chardet
import urllib2

my_urls = [...省略...]
for url in my_urls:
    res = urllib2.urlopen(url)
    print chardet.detect(res.read())

先ほどのYahoo!の場合だと{'confidence': 0.98999999999999999, 'encoding': 'EUC-JP'}と出力されます。確かにあってます。
IBMの文字コードの自動判定するライブラリと比べると、多分100%はあり得ないと思っているので、候補をリストで出してくれたり、フォールバック先を指定できたりするとうれしいかな。フォールバック先はconfidenceの値をみて、閾値以下だと自分でフォールバック先のエンコーディングを使用すればいいのかな?