(使用Python 客戶端 —— pika 0.9.8)
上一篇教程里,我們改進(jìn)了我們的日志系統(tǒng)。我們使用直連交換機(jī)替代了扇型交換機(jī),從只能盲目的廣播消息改進(jìn)為有可能選擇性的接收日志。
盡管直連交換機(jī)能夠改善我們的系統(tǒng),但是它也有它的限制 —— 沒(méi)辦法基于多個(gè)標(biāo)準(zhǔn)執(zhí)行路由操作。
在我們的日志系統(tǒng)中,我們不只希望訂閱基于嚴(yán)重程度的日志,同時(shí)還希望訂閱基于發(fā)送來(lái)源的日志。Unix 工具 syslog 就是同時(shí)基于嚴(yán)重程度 -severity (info/warn/crit...) 和 設(shè)備 -facility (auth/cron/kern...) 來(lái)路由日志的。
如果這樣的話,將會(huì)給予我們非常大的靈活性,我們既可以監(jiān)聽(tīng)來(lái)源于 “cron” 的嚴(yán)重程度為 “critical errors” 的日志,也可以監(jiān)聽(tīng)來(lái)源于 “kern” 的所有日志。
為了實(shí)現(xiàn)這個(gè)目的,接下來(lái)我們學(xué)習(xí)如何使用另一種更復(fù)雜的交換機(jī) —— 主題交換機(jī)。
發(fā)送到主題交換機(jī)(topic exchange)的消息不可以攜帶隨意什么樣子的路由鍵(routing_key),它的路由鍵必須是一個(gè)由.分隔開(kāi)的詞語(yǔ)列表。這些單詞隨便是什么都可以,但是最好是跟攜帶它們的消息有關(guān)系的詞匯。以下是幾個(gè)推薦的例子:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。詞語(yǔ)的個(gè)數(shù)可以隨意,但是不要超過(guò) 255 字節(jié)。
綁定鍵也必須擁有同樣的格式。主題交換機(jī)背后的邏輯跟直連交換機(jī)很相似 —— 一個(gè)攜帶著特定路由鍵的消息會(huì)被主題交換機(jī)投遞給綁定鍵與之想匹配的隊(duì)列。但是它的綁定鍵和路由鍵有兩個(gè)特殊應(yīng)用方式:
下邊用圖說(shuō)明:
http://wiki.jikexueyuan.com/project/rabbitmq/images/15.png" alt="" />
這個(gè)例子里,我們發(fā)送的所有消息都是用來(lái)描述小動(dòng)物的。發(fā)送的消息所攜帶的路由鍵是由三個(gè)單詞所組成的,這三個(gè)單詞被兩個(gè).分割開(kāi)。路由鍵里的第一個(gè)單詞描述的是動(dòng)物的手腳的利索程度,第二個(gè)單詞是動(dòng)物的顏色,第三個(gè)是動(dòng)物的種類。所以它看起來(lái)是這樣的:
我們創(chuàng)建了三個(gè)綁定:Q1 的綁定鍵為 .orange.,Q2 的綁定鍵為 ..rabbit 和 lazy.# 。
這三個(gè)綁定鍵被可以總結(jié)為:
一個(gè)攜帶有 quick.orange.rabbit 的消息將會(huì)被分別投遞給這兩個(gè)隊(duì)列。攜帶著 lazy.orange.elephant 的消息同樣也會(huì)給兩個(gè)隊(duì)列都投遞過(guò)去。另一方面攜帶有 quick.orange.fox 的消息會(huì)投遞給第一個(gè)隊(duì)列,攜帶有 lazy.brown.fox 的消息會(huì)投遞給第二個(gè)隊(duì)列。攜帶有 lazy.pink.rabbit 的消息只會(huì)被投遞給第二個(gè)隊(duì)列一次,即使它同時(shí)匹配第二個(gè)隊(duì)列的兩個(gè)綁定。攜帶著 quick.brown.fox 的消息不會(huì)投遞給任何一個(gè)隊(duì)列。
如果我們違反約定,發(fā)送了一個(gè)攜帶有一個(gè)單詞或者四個(gè)單詞("orange" or "quick.orange.male.rabbit")的消息時(shí),發(fā)送的消息不會(huì)投遞給任何一個(gè)隊(duì)列,而且會(huì)丟失掉。
但是另一方面,即使 "lazy.orange.male.rabbit" 有四個(gè)單詞,他還是會(huì)匹配最后一個(gè)綁定,并且被投遞到第二個(gè)隊(duì)列中。
主題交換機(jī)是很強(qiáng)大的,它可以表現(xiàn)出跟其他交換機(jī)類似的行為
當(dāng)一個(gè)隊(duì)列的綁定鍵為 "#"(井號(hào)) 的時(shí)候,這個(gè)隊(duì)列將會(huì)無(wú)視消息的路由鍵,接收所有的消息。
當(dāng) * (星號(hào)) 和 # (井號(hào)) 這兩個(gè)特殊字符都未在綁定鍵中出現(xiàn)的時(shí)候,此時(shí)主題交換機(jī)就擁有的直連交換機(jī)的行為。
接下來(lái)我們會(huì)將主題交換機(jī)應(yīng)用到我們的日志系統(tǒng)中。在開(kāi)始工作前,我們假設(shè)日志的路由鍵由兩個(gè)單詞組成,路由鍵看起來(lái)是這樣的:
代碼跟上一篇教程差不多。
emit_log_topic.py 的代碼:
\#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print " [x] Sent %r:%r" % (routing_key, message)
connection.close()
receive_logs_topic.py 的代碼:
\#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] %r:%r" % (method.routing_key, body,)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
執(zhí)行下邊命令 接收所有日志:
python receive_logs_topic.py "#"
執(zhí)行下邊命令 接收來(lái)自 ”kern“ 設(shè)備的日志:
python receive_logs_topic.py "kern.*"
執(zhí)行下邊命令 只接收嚴(yán)重程度為 ”critical“ 的日志:
python receive_logs_topic.py "*.critical"
執(zhí)行下邊命令 建立多個(gè)綁定:
python receive_logs_topic.py "kern.*" "*.critical"
執(zhí)行下邊命令 發(fā)送路由鍵為 "kern.critical" 的日志:
python emit_log_topic.py "kern.critical" "A critical kernel error"
執(zhí)行上邊命令試試看效果吧。另外,上邊代碼不會(huì)對(duì)路由鍵和綁定鍵做任何假設(shè),所以你可以在命令中使用超過(guò)兩個(gè)路由鍵參數(shù)。
(完整代碼參見(jiàn) emit_logs_topic.py and receive_logs_topic.py)