世界的过客

每个人获得的东西都恰好是他值得获得的东西

MQ-RabbitMQ

从一个简单的例子开始。

首先是生产者

import sys
import pika

credentials = pika.PlainCredentials("guest", "guest")
conn_params = pika.ConnectionsParameters("localhost",  
    credentials= credentials)
conn_broker = pika.BlockingConnection(conn_params) # 到此为止,建立了到代理服务器的链接

channel = conn_broker.channel() # 获取信道
channel.exchange_declare(exchange="hello-exchange", 
         type="direct", passive=False, durable=True, 
         auto_delete=False) # 声明交换器

msg = sys.argv[1]
msg_props = pika.BasicProperties()
msg_props.content_type= text/plain". # 创建纯文本消息

channel.basic_publish(body=msg, exchange="hello-exchange", 
                     properties=msg_props,
                     routing_key="hola") # 发布消息
conn_broker.close()

消费者:

import sys
import pika

credentials = pika.PlainCredentials("guest", "guest")
conn_params = pika.ConnectionsParameters("localhost",
                   credentials= credentials)
conn_broker = pika.BlockingConnection(conn_params) # 到此为止,建立了到代理服务器的链接

channel = conn_broker.channel() # 获取信道
channel.exchange_declare(exchange="hello-exchange", 
               type="direct", passive=False, durable=True, 
               auto_delete=False) # 声明交换器

channel.queue_declare(queue="hello-queue") # 声明队列
channel.queue_bind(queue="hello-queue", exchange="hello-exchange",
               routing_key="hola") # 通过routing_key将队列和交换器绑定起来

def msg_consumer(channel, method, header, body):
    channel.basic_ack(delivery_tag=method.delivery_tag)
    if body.decode("utf8") == "quit": # 例子来源于rabbitmq实战,但是body在 python3中是byte类型
        channel.basic_cancel(consumer_tag="hello-consumer")
        channel.stop_consuming()
    else:
        print(body.decode("utf8"))
channel.basic_consume(queue=queue_name, 
        on_message_callback= msg_consumer, auto_ack=False)
channel.start_consuming()

生产者和消费者

生产者创建消息=》发布到代理服务器(RabbitMQ)

消息

消息分为: payload和label

payload是你要传输的内容,label描述了有效载荷.

AMQP用标签表述消息, rabbitmq根据标签把消息发送给感兴趣的接受放. 其方式是fire-and-forget

接受者得到payload, 标签在过程中不会发送

持久化消息

delivery mode 设置为2, 但是必须是持久化的交换器和持久化的队列, 三者加在一起才能真正具有持久化.

信道(channel)

信道事建立在“真实” TCP链接内的虚拟链接, 信道复用TCP链接,可以有无数个信道,但是会共用一个TCP链接.

AMQP三要素

交换器(exchange)

生产者把消息发布到交换器上

交换器根据routing-key来决定消息投递到哪个队列

队列通过routing-key绑定到交换器.

交换器类型: topic, fanout, direct, headers

direct: 如果路由键匹配,就投递到对应的队列

fanout: 广播到所有绑定到该交换器的队列上

topic: 多个来源的消息到同一个队列上

持久化

durable设置为True

队列(queue)

消息到达队列,并被消费者接受

创建队列

queue.declare, 如果消费者在一条信道上订阅了某个队列,无法再声明队列了, 必须先取消订阅, 将 信道设置为“传输模式”.

queue.declare有两个参数:

  • exclusive, 为true, 队列私有, 只能创建的使用,一个队列一个消费者

  • auto-delete , 最后一个消费者取消订阅后, 队列自动移除.

交换器是topic情况下:queue.bind_queue通过指定队列名, 交换器名, 和绑定键名, 将队列绑定到交换器.

持久化

durable设置为True

绑定(bind_key)

绑定key决定了交换器将符合绑定key规则的消息都发送到这个队列里.

vhost虚拟主机

虚拟主机实现了自身的交换器,队列,和绑定, 并且带有权限控制

vhost只能通过命令行来创建删除

创建

# rabbitmqctl add_vhost[vhost_name]

删除

# rabbitmqctl delete_vhost[vhost_name]

显示

# rabbitmqctl list_vhosts

多个消费者订阅到同一队列,如何分发消息

rabbitmq拥有多个消费者的时候, 队列收到的消息将以(round-robin)的方式发送给消费者, 每个消息只会发送给一个消费者.

ACK机制

消费者确认消息会basic.ack显式发送确认.或者auto-ack为true. 如果没有ack,链接断开,那么该消息会被发给下一个 消费者, 确保消息被正确处理.

如果没有返回ack,那么rabbitmq不会发送下一条消息给消费者.

Reject机制

如果消费者容易出问题,但是消息又不想删除, 两种做法,第一断开链接,第二 basic.reject(版本大于2.0).

如果reject设置了requeue参数为true, 该消息会被发给下一个消费者.