RabbitMQ 是一個(gè)消息代理。它的核心原理非常簡單:接收和發(fā)送消息。你可以把它想像成一個(gè)郵局:你把信件放入郵箱,郵遞員就會(huì)把信件投遞到你的收件人處。在這個(gè)比喻中,RabbitMQ 就扮演著郵箱、郵局以及郵遞員的角色。
RabbitMQ 和郵局的主要區(qū)別是,它不是用來處理紙張的,它是用來接收、存儲(chǔ)和發(fā)送消息(message)這種二進(jìn)制數(shù)據(jù)的。
一般提到 RabbitMQ 和消息,都會(huì)用到一些專有名詞。
http://wiki.jikexueyuan.com/project/rabbitmq/images/1.png" alt="" />
http://wiki.jikexueyuan.com/project/rabbitmq/images/2.png" alt="" />
需要指出的是生產(chǎn)者、消費(fèi)者、代理需不要待在同一個(gè)設(shè)備上;事實(shí)上大多數(shù)應(yīng)用也確實(shí)不在會(huì)將他們放在一臺(tái)機(jī)器上。
(使用pika 0.9.5 Python客戶端)
我們的 “Hello world” 不會(huì)很復(fù)雜——僅僅發(fā)送一個(gè)消息,然后獲取它并輸出到屏幕。這樣以來我們需要兩個(gè)程序,一個(gè)用作發(fā)送消息,另一個(gè)接受消息并打印消息內(nèi)容
我們的大致的設(shè)計(jì)是這樣的:
http://wiki.jikexueyuan.com/project/rabbitmq/images/3.png" alt="" />
生產(chǎn)者(producer)把消息發(fā)送到一個(gè)名為 “hello” 的隊(duì)列中。消費(fèi)者(consumer)從這個(gè)隊(duì)列中獲取消息。
RabbitMQ 使用的是 AMQP 協(xié)議。要使用她你就必須需要一個(gè)使用同樣協(xié)議的庫。幾乎所有的編程語言都有可選擇的庫。python 也是一樣,可以從以下幾個(gè)庫中選擇:
在這一系列教程中,我們打算使用 pika。要安裝 pika,你可以使用 pip 這個(gè)包管理工具:
$ sudo pip install pika==0.9.5
安裝過程依賴于 pip 和 git-core 兩個(gè)包,你需要先安裝它們。
easy_install pip
pip install pika==0.9.5
http://wiki.jikexueyuan.com/project/rabbitmq/images/5.png" alt="" />
我們第一個(gè)程序 send.py 會(huì)發(fā)送一個(gè)消息到隊(duì)列中。首先要做的事情就是建立一個(gè)到 RabbitMQ 服務(wù)器的連接。
\#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
現(xiàn)在我們已經(jīng)連接上服務(wù)器了,那么,在發(fā)送消息之前我們需要確認(rèn)隊(duì)列是存在的。如果我們把消息發(fā)送到一個(gè)不存在的隊(duì)列,RabbitMQ 會(huì)丟棄這條消息。我門先創(chuàng)建一個(gè)名為 hello 的隊(duì)列,然后把消息發(fā)送到這個(gè)隊(duì)列中。
channel.queue_declare(queue='hello')
這時(shí)候我們就可以發(fā)送消息了,我們第一條消息只包含了 Hello World! 字符串,我們打算把它發(fā)送到我們的 hello 隊(duì)列。
在 RabbitMQ 中,消息是不能直接發(fā)送到隊(duì)列,它需要發(fā)送到交換機(jī)(exchange)中。我們不打算在這里深入討論它——你可以通過教程的第三部分了解更多。現(xiàn)在我們所需要了解的是如何使用默認(rèn)的交換機(jī)(exchange),它使用一個(gè)空字符串來標(biāo)識(shí)。交換機(jī)允許我們指定某條消息需要投遞到哪個(gè)隊(duì)列,routing_key 參數(shù)必須指定為隊(duì)列的名稱:
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print " [x] Sent 'Hello World!'"
在退出程序之前,我們需要確認(rèn)網(wǎng)絡(luò)緩沖已經(jīng)被刷寫、消息已經(jīng)投遞到 RabbitMQ。完成這些事情(正確的關(guān)閉連接)是很簡單的。
connection.close()
發(fā)送不成功!
如果這是你第一次使用 RabbitMQ,并且沒有看到 “Sent” 消息出現(xiàn)在屏幕上,你可能會(huì)抓耳撓腮不知所以。這也許是因?yàn)闆]有足夠的磁盤空間給代理使用所造成的(代理默認(rèn)需要 1Gb 的空閑空間),所以它才會(huì)拒絕接收消息。查看一下代理的日志確定并且減少必要的限制。配置文件文檔會(huì)告訴你如何更改磁盤空間限制(disk_free_limit)。
http://wiki.jikexueyuan.com/project/rabbitmq/images/6.png" alt="" />
我們的第二個(gè)程序 receive.py,將會(huì)從隊(duì)列中獲取消息并打印消息。
這次我們還是先要連接到 RabbitMQ 服務(wù)器。連接服務(wù)器的代碼和之前是一樣的。
下一步也和之前一樣,我們需要確認(rèn)隊(duì)列是存在的。使用 queue_declare 創(chuàng)建一個(gè)隊(duì)列——我們可以運(yùn)行這個(gè)命令很多次,但是只有一個(gè)隊(duì)列會(huì)被創(chuàng)建。
channel.queue_declare(queue='hello')
你也許要問: 為什么要重復(fù)聲明隊(duì)列呢 —— 我們已經(jīng)在前面的代碼中聲明過它了。如果我們確定了隊(duì)列是已經(jīng)存在的,那么我們可以不這么做,比如此前預(yù)先運(yùn)行了send.py 程序??墒俏覀儾⒉淮_定哪個(gè)程序會(huì)首先運(yùn)行。這種情況下,在程序中重復(fù)將隊(duì)列重復(fù)聲明一下是種值得推薦的做法。
你也許希望查看 RabbitMQ 中有哪些隊(duì)列、有多少消息在隊(duì)列中。此時(shí)你可以使用rabbitmqctl 工具(使用有權(quán)限的用戶):
bash
$ sudo rabbitmqctl list_queues
Listing queues ...
hello 0
...done.
(在 Windows 中不需要 sudo 命令)
從隊(duì)列中獲取消息相對(duì)來說稍顯復(fù)雜。需要為隊(duì)列定義一個(gè)回調(diào)(callback)函數(shù)。當(dāng)我們獲取到消息的時(shí)候,Pika 庫就會(huì)調(diào)用此回調(diào)函數(shù)。這個(gè)回調(diào)函數(shù)會(huì)將接收到的消息內(nèi)容輸出到屏幕上。
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
下一步,我們需要告訴 RabbitMQ 這個(gè)回調(diào)函數(shù)將會(huì)從名為 "hello" 的隊(duì)列中接收消息:
channel.basic_consume(callback,
queue='hello',
no_ack=True)
要成功運(yùn)行這些命令,我們必須保證隊(duì)列是存在的,我們的確可以確保它的存在——因?yàn)槲覀冎耙呀?jīng)使用 queue_declare 將其聲明過了。
no_ack 參數(shù)稍后會(huì)進(jìn)行介紹。
最后,我們輸入一個(gè)用來等待消息數(shù)據(jù)并且在需要的時(shí)候運(yùn)行回調(diào)函數(shù)的無限循環(huán)。
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()
send.py 的全部代碼:
\#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()
(send.py 源碼)
receive.py的全部代碼:
\#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
channel.basic_consume(callback,
queue='hello',
no_ack=True)
channel.start_consuming()
(receive.py source)
現(xiàn)在就可以在終端中運(yùn)行我們的程序了。首先,用 send.py 重續(xù)發(fā)送一條消息:
$ python send.py
[x] Sent 'Hello World!'
生產(chǎn)者(producer)程序 send.py 每次運(yùn)行之后就會(huì)停止?,F(xiàn)在我們就來接收消息:
$ python receive.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Hello World!'
成功了!我們已經(jīng)通過 RabbitMQ 發(fā)送第一條消息。你也許已經(jīng)注意到了,receive.py 程序并沒有退出。它一直在準(zhǔn)備獲取消息,你可以通過 Ctrl-C 來中止它。
試下在新的終端中再次運(yùn)行 send.py。
我們已經(jīng)學(xué)會(huì)如何發(fā)送消息到一個(gè)已知隊(duì)列中并接收消息。是時(shí)候移步到第二部分了,我們將會(huì)建立一個(gè)簡單的工作隊(duì)列(work queue)。