ページ

2011年1月14日

初めてのRabbitMQその1

メッセージングキューがなぜか続きます。今回はSpringSourceに捨てられたプロダクト、RabbitMQです。昨日のZeroMQとは違うんです。ちゃんとした(?)AMQPなんです。AMQPの標準に準拠した高可用性、高スケーラビリティ、ポータブルなメッセージングシステムらしいです。ただ、このうたい文句の検証は大変なのでやりませんん。小人さん、お願いします。

僕の手元にはMacしかないので、Mac上で遊んでみます。まずは、RabbitMQのインストールはhomebrewから(MacPortsやめました)。brew install rabbitmqです。インストールが終わるまではお茶を飲みながらゆっくり待ちましょう。それから、Python用のモジュールをインストールします。amqpなのでamqplibです。pip install amqplibで終わりです、

それではrabbitmqを起動します。rabbitmq-serverで起動します。運用環境ではRabbitMQのユーザとパスワードを設定してguestでのログインはできなくするのがお作法らしいですが、テスト用なので余分な作業はしません。これでしばらく待てばRabbitMQが起動します。うさぎ年らしくていいですね。(意味不明)

まあ今回も、一斉配信ですね。amqplibのdemoのコードをスリムにしただけです。
import amqplib.client_0_8 as amqp

def callback(msg):
    for key, val in msg.properties.items():
        print '%s: %s' % (key, str(val))
    for key, val in msg.delivery_info.items():
        print '> %s: %s' % (key, str(val))

    print ''
    print msg.body
    print '-------'
    msg.channel.basic_ack(msg.delivery_tag)

    #
    # Cancel this callback
    #
    if msg.body == 'quit':
        msg.channel.basic_cancel(msg.consumer_tag)


def main():
    conn = amqp.Connection()
    ch = conn.channel()
    ch.access_request('/data', active=True, read=True)

    ch.exchange_declare('myfan', 'fanout', auto_delete=True)
    qname, _, _ = ch.queue_declare()
    ch.queue_bind(qname, 'myfan')
    ch.basic_consume(qname, callback=callback)

    #
    # Loop as long as the channel has callbacks registered
    #
    while ch.callbacks:
        ch.wait()

    ch.close()
    conn.close()

if __name__ == '__main__':
    main()

この人を起動して待ち受けます。送信側は

import sys
import amqplib.client_0_8 as amqp

def main():
    msg_body = ' '.join(sys.argv[1:])

    conn = amqp.Connection()
    ch = conn.channel()
    ch.access_request('/data', active=True, write=True)

    ch.exchange_declare('myfan', 'fanout', auto_delete=True)

    msg = amqp.Message(msg_body, content_type='text/plain', application_headers={'foo': 7, 'bar': 'baz'})

    ch.basic_publish(msg, 'myfan')

    ch.close()
    conn.close()

if __name__ == '__main__':
    main()

短いです。コマンドラインの引数に送信するメッセージを書いて起動します。
ZeroMQが相手ノードを指定して自分でルーティングルートを制御するのに対して、チャネルのキーに対して制御しているみたいです。
うーん、RabbitMQはZeroMQと違ってサーバが必要になりますが、コードは書きやすそう。少なくともノードの制御を気にいしなくていいのがいい。

amqp.Connectionで接続先のサーバを指定できますが(デフォルトはlocalhost)、サーバが落ちていたり、接続中にサーバが落ちたらどうやって別のサーバにつなぎ替えるんだろう?自分でやるのかな・・・

0 件のコメント: