1. 从命名队列发送和接收消息
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) # 连接本地服务
channel = connection.channel() # 打开通道
channel.queue_declare(queue='hello') # 定义名为 hello 的队列
for i in range(10):
# 消息发送并不会直接发送队列,而是需要 exchange,routing_key 指定队列名称,body 为消息体
channel.basic_publish(exchange='', routing_key='hello', body='Hello World %d !' % i)
time.sleep(2)
print('send: Hello World!')
connection.close() # 关闭连接
接收程序:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello') # 如果 hello 队列不存在则创建,存在则不创建,多次运行也只会存在一个 hello 队列
def callback(ch, method, properties, body):
# 收到消息的回调函数
print('received %r' % body)
channel.basic_consume(callback, queue='hello', no_ack=True)
print('waitting for message...')
channel.start_consuming() # 开始等待消息
可以运行 1 个发送程序,3 个接收程序看效果。
2. 工作队列
1)消息接收确认
上面使用了 no_ack=True,消息发送后会立即删除。如果该消息任务是一个耗时比较长的,那么出现故障将会造成消息丢失。
增加消息确认:
ch.basic_ack(delivery_tag = method.delivery_tag)
未得到确认的消息会被再次发送。如果忘记使用消息确认,消息将长久存在内存中,不会释放。
2)消息持久化
若 rabbitmq 服务宕掉,那么在运行的消息会丢失。
配置持久化:
channel.queue_declare(queue='task_queue', durable=True)
消息持久化:
channel.basic_publish(exchange='',
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2 # 持久化
))
3)公平分配
rabbitmq 默认是公平分配的,但有的认为是耗时耗资源的,这会造成一个工作任务很沉重,而另一个很轻松。
channel.basic_qos(prefetch_count=1)
此时,每个工作任务只处理一个任务,直到完成才会处理下一个新任务。(如果所有工作任务都是耗时的,那么队列可能会被填满。)
官方示例:
# new_task.py
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
print(" [x] Sent %r" % message)
connection.close()
worker.py
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()