本篇文章為大家展示了RabbitMQ延遲隊列怎么利用Python實現,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
創新互聯基于成都重慶香港及美國等地區分布式IDC機房數據中心構建的電信大帶寬,聯通大帶寬,移動大帶寬,多線BGP大帶寬租用,是為眾多客戶提供專業服務器托管報價,主機托管價格性價比高,為金融證券行業成都天府聯通服務器托管,ai人工智能服務器托管提供bgp線路100M獨享,G口帶寬及機柜租用的專業成都idc公司。延遲隊列的基礎原理Time To Live(TTL)
RabbitMQ可以針對Queue設置x-expires 或者 針對Message設置 x-message-ttl,來控制消息的生存時間,如果超時(兩者同時設置以最先到期的時間為準),則消息變為dead letter(死信)
RabbitMQ消息的過期時間有兩種方法設置。
通過隊列(Queue)的屬性設置,隊列中所有的消息都有相同的過期時間。(本次延遲隊列采用的方案)對消息單獨設置,每條消息TTL可以不同。
如果同時使用,則消息的過期時間以兩者之間TTL較小的那個數值為準。消息在隊列的生存時間一旦超過設置的TTL值,就成為死信(dead letter)
Dead Letter Exchanges(DLX)
RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個參數,如果隊列內出現了dead letter,則按照這兩個參數重新路由轉發到指定的隊列。
x-dead-letter-exchange:出現死信(dead letter)之后將dead letter重新發送到指定exchange
x-dead-letter-routing-key:出現死信(dead letter)之后將dead letter重新按照指定的routing-key發送
隊列中出現死信(dead letter)的情況有:
消息或者隊列的TTL過期。(延遲隊列利用的特性)
隊列達到較大長度
消息被消費端拒絕(basic.reject or basic.nack)并且requeue=false
綜合上面兩個特性,將隊列設置TTL規則,隊列TTL過期后消息會變成死信,然后利用DLX特性將其轉發到另外的交換機和隊列就可以被重新消費,達到延遲消費效果。
延遲隊列設計及實現(Python)
從上面描述,延遲隊列的實現大致分為兩步:
產生死信,有兩種方式Per-Message TTL和 Queue TTL,因為我的需求中是所有的消息延遲處理時間相同,所以本實現中采用 Queue TTL設置隊列的TTL,如果需要將隊列中的消息設置不同的延遲處理時間,則設置Per-Message TTL(官方文檔)
設置死信的轉發規則,Dead Letter Exchanges設置方法(官方文檔)
完整代碼如下:
""" Created on Fri Aug 3 17:00:44 2018 @author: Bge """ import pika,json,logging class RabbitMQClient: def __init__(self, conn_str='amqp://user:pwd@host:port/%2F'): self.exchange_type = "direct" self.connection_string = conn_str self.connection = pika.BlockingConnection(pika.URLParameters(self.connection_string)) self.channel = self.connection.channel() self._declare_retry_queue() #RetryQueue and RetryExchange logging.debug("connection established") def close_connection(self): self.connection.close() logging.debug("connection closed") def declare_exchange(self, exchange): self.channel.exchange_declare(exchange=exchange, exchange_type=self.exchange_type, durable=True) def declare_queue(self, queue): self.channel.queue_declare(queue=queue, durable=True,) def declare_delay_queue(self, queue,DLX='RetryExchange',TTL=60000): """ 創建延遲隊列 :param TTL: ttl的單位是us,ttl=60000 表示 60s :param queue: :param DLX:死信轉發的exchange :return: """ arguments={} if DLX: #設置死信轉發的exchange arguments[ 'x-dead-letter-exchange']=DLX if TTL: arguments['x-message-ttl']=TTL print(arguments) self.channel.queue_declare(queue=queue, durable=True, arguments=arguments) def _declare_retry_queue(self): """ 創建異常交換器和隊列,用于存放沒有正常處理的消息。 :return: """ self.channel.exchange_declare(exchange='RetryExchange', exchange_type='fanout', durable=True) self.channel.queue_declare(queue='RetryQueue', durable=True) self.channel.queue_bind('RetryQueue', 'RetryExchange','RetryQueue') def publish_message(self,routing_key, msg,exchange='',delay=0,TTL=None): """ 發送消息到指定的交換器 :param exchange: RabbitMQ交換器 :param msg: 消息實體,是一個序列化的JSON字符串 :return: """ if delay==0: self.declare_queue(routing_key) else: self.declare_delay_queue(routing_key,TTL=TTL) if exchange!='': self.declare_exchange(exchange) self.channel.basic_publish(exchange=exchange, routing_key=routing_key, body=msg, properties=pika.BasicProperties( delivery_mode=2, type=exchange )) self.close_connection() print("message send out to %s" % exchange) logging.debug("message send out to %s" % exchange) def start_consume(self,callback,queue='#',delay=1): """ 啟動消費者,開始消費RabbitMQ中的消息 :return: """ if delay==1: queue='RetryQueue' else: self.declare_queue(queue) self.channel.basic_qos(prefetch_count=1) try: self.channel.basic_consume( # 消費消息 callback, # 如果收到消息,就調用callback函數來處理消息 queue=queue, # 你要從那個隊列里收消息 ) self.channel.start_consuming() except KeyboardInterrupt: self.stop_consuming() def stop_consuming(self): self.channel.stop_consuming() self.close_connection() def message_handle_successfully(channel, method): """ 如果消息處理正常完成,必須調用此方法, 否則RabbitMQ會認為消息處理不成功,重新將消息放回待執行隊列中 :param channel: 回調函數的channel參數 :param method: 回調函數的method參數 :return: """ channel.basic_ack(delivery_tag=method.delivery_tag) def message_handle_failed(channel, method): """ 如果消息處理失敗,應該調用此方法,會自動將消息放入異常隊列 :param channel: 回調函數的channel參數 :param method: 回調函數的method參數 :return: """ channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
發布消息代碼如下:
from MQ.RabbitMQ import RabbitMQClient print("start program") client = RabbitMQClient() msg1 = '{"key":"value"}' client.publish_message('test-delay',msg1,delay=1,TTL=10000) print("message send out")
消費者代碼如下:
from MQ.RabbitMQ import RabbitMQClient import json print("start program") client = RabbitMQClient() def callback(ch, method, properties, body): msg = body.decode() print(msg) # 如果處理成功,則調用此消息回復ack,表示消息成功處理完成。 RabbitMQClient.message_handle_successfully(ch, method) queue_name = "RetryQueue" client.start_consume(callback,queue_name,delay=0)
上述內容就是RabbitMQ延遲隊列怎么利用Python實現,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注創新互聯行業資訊頻道。
網站欄目:RabbitMQ延遲隊列怎么利用Python實現-創新互聯
文章分享:http://m.newbst.com/article14/dpecge.html
成都網站建設公司_創新互聯,為您提供品牌網站建設、Google、網站營銷、虛擬主機、做網站、網站收錄
聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯