在线观看不卡亚洲电影_亚洲妓女99综合网_91青青青亚洲娱乐在线观看_日韩无码高清综合久久

鍍金池/ 教程/ 大數(shù)據(jù)/ 工作隊(duì)列
介紹
路由(Routing)
為什么需要主題交換機(jī)?
發(fā)布/訂閱
工作隊(duì)列
遠(yuǎn)程過(guò)程調(diào)用(RPC)

工作隊(duì)列

(使用pika 0.9.5 Python客戶端)

http://wiki.jikexueyuan.com/project/rabbitmq/images/7.png" alt="" />

在第一篇教程中,我們已經(jīng)寫了一個(gè)從已知隊(duì)列中發(fā)送和獲取消息的程序。在這篇教程中,我們將創(chuàng)建一個(gè)工作隊(duì)列(Work Queue),它會(huì)發(fā)送一些耗時(shí)的任務(wù)給多個(gè)工作者(Worker)。

工作隊(duì)列(又稱:任務(wù)隊(duì)列——Task Queues)是為了避免等待一些占用大量資源、時(shí)間的操作。當(dāng)我們把任務(wù)(Task)當(dāng)作消息發(fā)送到隊(duì)列中,一個(gè)運(yùn)行在后臺(tái)的工作者(worker)進(jìn)程就會(huì)取出任務(wù)然后處理。當(dāng)你運(yùn)行多個(gè)工作者(workers),任務(wù)就會(huì)在它們之間共享。

這個(gè)概念在網(wǎng)絡(luò)應(yīng)用中是非常有用的,它可以在短暫的 HTTP 請(qǐng)求中處理一些復(fù)雜的任務(wù)。

準(zhǔn)備

之前的教程中,我們發(fā)送了一個(gè)包含 “Hello World!” 的字符串消息?,F(xiàn)在,我們將發(fā)送一些字符串,把這些字符串當(dāng)作復(fù)雜的任務(wù)。我們沒(méi)有真實(shí)的例子,例如圖片縮放、pdf 文件轉(zhuǎn)換。所以使用 time.sleep() 函數(shù)來(lái)模擬這種情況。我們?cè)谧址屑由宵c(diǎn)號(hào)(.)來(lái)表示任務(wù)的復(fù)雜程度,一個(gè)點(diǎn)(.)將會(huì)耗時(shí)1秒鐘。比如 "Hello..." 就會(huì)耗時(shí)3秒鐘。

我們對(duì)之前教程的 send.py 做些簡(jiǎn)單的調(diào)整,以便可以發(fā)送隨意的消息。這個(gè)程序會(huì)按照計(jì)劃發(fā)送任務(wù)到我們的工作隊(duì)列中。我們把它命名為 new_task.py:

import sys

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
print " [x] Sent %r" % (message,)  

我們的舊腳本(receive.py)同樣需要做一些改動(dòng):它需要為消息體中每一個(gè)點(diǎn)號(hào)(.)模擬1秒鐘的操作。它會(huì)從隊(duì)列中獲取消息并執(zhí)行,我們把它命名為 worker.py:

import time

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"  

循環(huán)調(diào)度

使用工作隊(duì)列的一個(gè)好處就是它能夠并行的處理隊(duì)列。如果堆積了很多任務(wù),我們只需要添加更多的工作者(workers)就可以了,擴(kuò)展很簡(jiǎn)單。

首先,我們先同時(shí)運(yùn)行兩個(gè) worker.py 腳本,它們都會(huì)從隊(duì)列中獲取消息,到底是不是這樣呢?我們看看。

你需要打開(kāi)三個(gè)終端,兩個(gè)用來(lái)運(yùn)行 worker.py 腳本,這兩個(gè)終端就是我們的兩個(gè)消費(fèi)者(consumers)—— C1 和 C2。

shell1$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C  
shell2$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C  

第三個(gè)終端,我們用來(lái)發(fā)布新任務(wù)。你可以發(fā)送一些消息給消費(fèi)者(consumers):

shell3$ python new_task.py First message.
shell3$ python new_task.py Second message..
shell3$ python new_task.py Third message...
shell3$ python new_task.py Fourth message....
shell3$ python new_task.py Fifth message.....  

看看到底發(fā)送了什么給我們的工作者(workers):

shell1$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'  
shell2$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'  

默認(rèn)來(lái)說(shuō),RabbitMQ 會(huì)按順序得把消息發(fā)送給每個(gè)消費(fèi)者(consumer)。平均每個(gè)消費(fèi)者都會(huì)收到同等數(shù)量得消息。這種發(fā)送消息得方式叫做——輪詢(round-robin)。試著添加三個(gè)或更多得工作者(workers)。

消息確認(rèn)

當(dāng)處理一個(gè)比較耗時(shí)得任務(wù)的時(shí)候,你也許想知道消費(fèi)者(consumers)是否運(yùn)行到一半就掛掉。當(dāng)前的代碼中,當(dāng)消息被 RabbitMQ 發(fā)送給消費(fèi)者(consumers)之后,馬上就會(huì)在內(nèi)存中移除。這種情況,你只要把一個(gè)工作者(worker)停止,正在處理的消息就會(huì)丟失。同時(shí),所有發(fā)送到這個(gè)工作者的還沒(méi)有處理的消息都會(huì)丟失。

我們不想丟失任何任務(wù)消息。如果一個(gè)工作者(worker)掛掉了,我們希望任務(wù)會(huì)重新發(fā)送給其他的工作者(worker)。

為了防止消息丟失,RabbitMQ 提供了消息響應(yīng)(acknowledgments)。消費(fèi)者會(huì)通過(guò)一個(gè) ack(響應(yīng)),告訴 RabbitMQ 已經(jīng)收到并處理了某條消息,然后RabbitMQ 就會(huì)釋放并刪除這條消息。

如果消費(fèi)者(consumer)掛掉了,沒(méi)有發(fā)送響應(yīng),RabbitMQ 就會(huì)認(rèn)為消息沒(méi)有被完全處理,然后重新發(fā)送給其他消費(fèi)者(consumer)。這樣,及時(shí)工作者(workers)偶爾的掛掉,也不會(huì)丟失消息。

消息是沒(méi)有超時(shí)這個(gè)概念的;當(dāng)工作者與它斷開(kāi)連的時(shí)候,RabbitMQ 會(huì)重新發(fā)送消息。這樣在處理一個(gè)耗時(shí)非常長(zhǎng)的消息任務(wù)的時(shí)候就不會(huì)出問(wèn)題了。

消息響應(yīng)默認(rèn)是開(kāi)啟的。之前的例子中我們可以使用 no_ack=True 標(biāo)識(shí)把它關(guān)閉。是時(shí)候移除這個(gè)標(biāo)識(shí)了,當(dāng)工作者(worker)完成了任務(wù),就發(fā)送一個(gè)響應(yīng)。

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello')  

運(yùn)行上面的代碼,我們發(fā)現(xiàn)即使使用 CTRL+C 殺掉了一個(gè)工作者(worker)進(jìn)程,消息也不會(huì)丟失。當(dāng)工作者(worker)掛掉這后,所有沒(méi)有響應(yīng)的消息都會(huì)重新發(fā)送。

忘記確認(rèn)

一個(gè)很容易犯的錯(cuò)誤就是忘了 basic_ack,后果很嚴(yán)重。消息在你的程序退出之后就會(huì)重新發(fā)送,如果它不能夠釋放沒(méi)響應(yīng)的消息,RabbitMQ 就會(huì)占用越來(lái)越多的內(nèi)存。

為了排除這種錯(cuò)誤,你可以使用 rabbitmqctl 命令,輸出 messages_unacknowledged 字段:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

消息持久化

如果你沒(méi)有特意告訴 RabbitMQ,那么在它退出或者崩潰的時(shí)候,將會(huì)丟失所有隊(duì)列和消息。為了確保信息不會(huì)丟失,有兩個(gè)事情是需要注意的:我們必須把“隊(duì)列”和“消息”設(shè)為持久化。

首先,為了不讓隊(duì)列消失,需要把隊(duì)列聲明為持久化(durable):

channel.queue_declare(queue='hello', durable=True)  

盡管這行代碼本身是正確的,但是仍然不會(huì)正確運(yùn)行。因?yàn)槲覀円呀?jīng)定義過(guò)一個(gè)叫hello 的非持久化隊(duì)列。RabbitMq 不允許你使用不同的參數(shù)重新定義一個(gè)隊(duì)列,它會(huì)返回一個(gè)錯(cuò)誤。但我們現(xiàn)在使用一個(gè)快捷的解決方法——用不同的名字,例如task_queue。

channel.queue_declare(queue='task_queue', durable=True)  

這個(gè)queue_declare必須在生產(chǎn)者(producer)和消費(fèi)者(consumer)對(duì)應(yīng)的代碼中修改。

這時(shí)候,我們就可以確保在 RabbitMq 重啟之后 queue_declare 隊(duì)列不會(huì)丟失。另外,我們需要把我們的消息也要設(shè)為持久化——將 delivery_mode 的屬性設(shè)為2。

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))  

注意:消息持久化

將消息設(shè)為持久化并不能完全保證不會(huì)丟失。以上代碼只是告訴了 RabbitMq 要把消息存到硬盤,但從 RabbitMq 收到消息到保存之間還是有一個(gè)很小的間隔時(shí)間。因?yàn)?RabbitMq 并不是所有的消息都使用 fsync(2) ——它有可能只是保存到緩存中,并不一定會(huì)寫到硬盤中。并不能保證真正的持久化,但已經(jīng)足夠應(yīng)付我們的簡(jiǎn)單工作隊(duì)列。如果你一定要保證持久化,你需要改寫你的代碼來(lái)支持事務(wù)(transaction)。

公平調(diào)度

你應(yīng)該已經(jīng)發(fā)現(xiàn),它仍舊沒(méi)有按照我們期望的那樣進(jìn)行分發(fā)。比如有兩個(gè)工作者(workers),處理奇數(shù)消息的比較繁忙,處理偶數(shù)消息的比較輕松。然而RabbitMQ 并不知道這些,它仍然一如既往的派發(fā)消息。

這時(shí)因?yàn)?RabbitMQ 只管分發(fā)進(jìn)入隊(duì)列的消息,不會(huì)關(guān)心有多少消費(fèi)者(consumer)沒(méi)有作出響應(yīng)。它盲目的把第 n-th 條消息發(fā)給第 n-th 個(gè)消費(fèi)者。

http://wiki.jikexueyuan.com/project/rabbitmq/images/8.png" alt="" />

我們可以使用 basic.qos 方法,并設(shè)置 prefetch_count=1。這樣是告訴RabbitMQ,再同一時(shí)刻,不要發(fā)送超過(guò)1條消息給一個(gè)工作者(worker),直到它已經(jīng)處理了上一條消息并且作出了響應(yīng)。這樣,RabbitMQ 就會(huì)把消息分發(fā)給下一個(gè)空閑的工作者(worker)。

channel.basic_qos(prefetch_count=1)  

關(guān)于隊(duì)列大小

如果所有的工作者都處理繁忙狀態(tài),你的隊(duì)列就會(huì)被填滿。你需要留意這個(gè)問(wèn)題,要么添加更多的工作者(workers),要么使用其他策略。

整合代碼

new_task.py 的完整代碼:

\#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print " [x] Sent %r" % (message,)
connection.close()  

(new_task.py源碼)

我們的worker:

\#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()  

(worker.py source)

使用消息響應(yīng)和 prefetch_count 你就可以搭建起一個(gè)工作隊(duì)列了。這些持久化的選項(xiàng)使得在 RabbitMQ 重啟之后仍然能夠恢復(fù)。

現(xiàn)在我們可以移步教程3學(xué)習(xí)如何發(fā)送相同的消息給多個(gè)消費(fèi)者(consumers)。