www.zhblog.net

Pika - Python RabbitMQ

1. 从命名队列发送和接收消息

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) # 连接本地服务
channel = connection.channel() # 打开通道
channel.queue_declare(queue='hello') # 定义名为 hello 的队列
for i in range(10):
    # 消息发送并不会直接发送队列,而是需要 exchange,routing_key 指定队列名称,body 为消息体
    channel.basic_publish(exchange='', routing_key='hello', body='Hello World %d !' % i)
    time.sleep(2)
print('send: Hello World!')
connection.close() # 关闭连接

接收程序:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello') # 如果 hello 队列不存在则创建,存在则不创建,多次运行也只会存在一个 hello 队列


def callback(ch, method, properties, body):
    # 收到消息的回调函数
    print('received %r' % body)


channel.basic_consume(callback, queue='hello', no_ack=True)

print('waitting for message...')

channel.start_consuming() # 开始等待消息


可以运行 1 个发送程序,3 个接收程序看效果。


2. 工作队列

1)消息接收确认

上面使用了 no_ack=True,消息发送后会立即删除。如果该消息任务是一个耗时比较长的,那么出现故障将会造成消息丢失。

增加消息确认:

ch.basic_ack(delivery_tag = method.delivery_tag)

未得到确认的消息会被再次发送。如果忘记使用消息确认,消息将长久存在内存中,不会释放。


2)消息持久化

若 rabbitmq 服务宕掉,那么在运行的消息会丢失。

配置持久化:

channel.queue_declare(queue='task_queue', durable=True)

消息持久化:

channel.basic_publish(exchange='', 
                      routing_key="task_queue", 
                      body=message, 
                      properties=pika.BasicProperties(
                         delivery_mode = 2 # 持久化
                      ))


3)公平分配

rabbitmq 默认是公平分配的,但有的认为是耗时耗资源的,这会造成一个工作任务很沉重,而另一个很轻松。

channel.basic_qos(prefetch_count=1)

此时,每个工作任务只处理一个任务,直到完成才会处理下一个新任务。(如果所有工作任务都是耗时的,那么队列可能会被填满。)


官方示例:

# new_task.py

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()

worker.py

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

展开阅读全文

评论

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 心情