RabbitMQ使用参考 YS
2020-02-27 191浏览
- 1.RabbitMQ 使用参考 邹业盛 2016-01-10 16:55 更新 1. 安装 从网站http://www.rabbitmq.com/install-generic-unix.html下载到二进制 源码, 进入 sbin 目录, 直接运行 server 即可. 默认服务监听在 5672 端口上(带上 SSL 默认在 5671 上). 2. 基本概念 RabbitMQ , 是一个使用 erlang 编写的 AMQP (高级消息队列协议) 的服务实现. 简单来说, 就是一个功能强大的消息队列服务. 通常我们谈到队列服务, 会有三个概念, 发消息者 , 队列 , 收消息者 . ( 消息 本来也应 该算是一个独立的概念, 但是简单处理之下, 它可能并没有太多的内涵) 流程上是, 发消息者 把消息放到 队列 中去, 然后 收消息者 从 队列 中取出消息. RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在 发消息者 和 队列 之间, 加入了 交换器 (Exchange) . 这样 发消息者 和 队列 就没有直接联系, 转而变成 发消息者 把 消息给 交换器 , 交换器 根据调度策略再把消息再给 队列 . 当然, 多一层抽象会增加复杂度, 但是同时, 功能上也更灵活. 事实上, 很多时候面对具体 场景时, 在这种"四段式"的结构下, 你可选择的方案不止一种的. 不过也不必过于担心, 在一些自我规定的"原则"之下, "正确"的方案也不会那么纠结. 总结一下 4 + 1 个概念, 或者说, 五种角色: Producing , 生产者, 产生消息的角色. Exchange , 交换器, 在得到生产者的消息后, 把消息扔到队列的角色. Queue , 队列, 消息暂时呆的地方. Consuming , 消费者, 把消息从队列中取出的角色.
- 2.消息 Message , RabbitMQ 中的消息有自己的一系列属性, 某些属性对信息流有直接 影响. 在使用过程中, 我们通常还会关注如下的机制: 持久化 , 服务重启时, 是否能恢复队列中的数据. 调度策略 , 交换器如何把消息给到哪些队列, 是每个队列给一条, 或者把一条消息给多个 队列. 分配策略 , 队列面对消费者时, 如何把消息吐出去, 来一个消费者就把消息全给它, 还是 只给一条. 状态反馈 , 当消息从某一个队列中被提出后, 这个消息的生命周期就此结束, 还是说需要 一个具体的信号以明确标识消息已被正确处理. 上面这些内容, 初看之下好像情况有些复杂了, 不过在具体使用过程中, 这些东西都是很 自然地需要考虑的. 当一套服务跑起来之后, 这些细枝末节自然消失在无形之中. 3. 基本形式 当服务启在 5672 端口之后, 我们就可以开始使用 RabbitMQ 了. 根据前面的内容, 我们需要站在两个角度(消息的提供方, 和消息的使用方), 去分别考虑 五种角色的情况. 当然, 在使用时其实只是两个角度, 每边四种角色的情况. 因为消息的 提供方不关心使用方, 反之, 消息的使用方也不关心消息的提供方. 这种关系上的无依赖 本身是"队列服务"的一个最大使用意义所在, 用于业务间的分离(不管是分了好, 还是必 须分). 我们先看如何产生消息, 即把消息放到队列当中, 等待下一步的处理. (之后的代码, 使用 Python , 相应的 AMQP 协议实现的模块是 pika ) # -*-coding:utf-8 -*import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange='first', type='fanout') channel.queue_declare(queue='hello') channel.queue_bind(exchange='first', queue='hello') channel.basic_publish(exchange='first', routing_key='', body='Hello World!') 上面代码的细节先不用管它, 但是直观看到做的事有: 获取连接.
- 3.从连接上获取一个 channel , 类似于数据库访问在连接上获取一个 cursor . 声明一个 exchange . (只会创建一次) 声明一个 queue . (只会创建一次) 把 queue 绑定到 exchange 上. 向指定的 exchange 发送一条消息. 消息发出之后, 可以使用 rabbitmqctl 这个工具查看服务的一些当前状态, 比如队列情 况: $ ./rabbitmqctl list_queues Listing queues ... hello 3 ...done. 然后是另一边, 从队列取出消息: # -*-coding:utf-8 -*import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print body channel.basic_consume(callback, queue='hello', no_ack=True) channel.start_consuming() 前面部分和之前的一样, 获取连接, 拿到 channel . 这里声明 queue 是在做重复的事 (之前 Producing 的代码已经做过声明了). 但是 Producing 和 Consuming 的代 码你并不知道哪一个会先执行, 所以为了确保需要的 queue 是存在的, 使用时总先声明 一下是好的方式. 接下来就是定义了一个异步回调, 标明在获取到消息之后要执行的处理函数. 最后, 开始接收服务器的消息. 和前面一样, 看一下代码做的事: 获取连接.
- 4.从连接上拿到 channel . 声明需要的 queue . 定义一个从指定 queue 获取消息的回调处理. 开始接收消息. 两边的代码都完成了, 可以先把取出消息的代码跑起来, 然后再重复运行产生消息的代码, 就能看到效果. 这里我们可以先反思一下我们的思维. 从流程上来说, 之前我们是先考虑如何产生消息, 然后是如何获取消息. 我们按这个顺序来编写了两段代码. 但是我们在使用时, 顺序反过 来是一种更直观的方式. 即先是有服务跑起来, 守着等消息. 然后才是不定时有消息产生 出来. 这一前一后在思维上是有一些微妙的不同的. 如果从 C/S 结构上来看, Consuming 的角色更像是 Server , 而 Producing 的角色更像是 Client . 为什么在这里讲这个呢, 是因为稍后会依次介绍整个流程中的细节, 比如 exchange 的 调度策略, 多个 Producing , 多个 Consuming , 多个 Queue 的情况下, 我们如何 去实现期望的行为. 当系统中的元素与角色有些多的时候, 我们需要一个比较明确的思维 方式来保持自己的清醒. 基本的流程就是前面的两段代码. 接下来会依次介绍提到过的, 我们关心的几个机制. 持久化 调度策略 分配策略 状态反馈 然后, 会有几个实例分析, 用以演示一些典型的使用模式. 4. 持久化 考虑这样的场景, 当消息被暂存到队列后, 在没有被提取的情况下, RabbitMQ 服务停 掉了怎么办. # -*-coding:utf-8 -*import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange='first', type='fanout') channel.queue_declare(queue='hello')
- 5.channel.queue_bind(exchange='first', queue='hello') channel.basic_publish(exchange='first', routing_key='', body='Hello World!') 上 面 的 代 码 , 我 们 创 建 了 一 条 内 容 为 Hello World! 的 消 息 , 通 过 命 令 行 工 具 : $ ./rabbitmqctl list_queues Listing queues ... hello 1 ...done. $ ./rabbitmqctl list_exchanges Listing exchanges ... direct amq.direct amq.fanout amq.headers direct fanout headers amq.match headers amq.rabbitmq.log topic amq.rabbitmq.trace topic amq.topic topic first fanout ...done. 可 以 查 到 , 在 名 为 hello 的 队 列 中 , 有 1 条 消 息 . 有 一 个 类 型 为 fanout , 名 为 first 的交换器. 此 时 通 过 Ctrl-C 或 ./rabbitmqctl stop 把 R a b b i t M Q 服 务 停 掉 , 再 重 启 . 交 换 器 , 队列, 消息都是不会恢复的. 所以, 默认情况下, 消息, 队列, 交换器 都不具有持久化的性质. 如果我们需要持久化功 能, 那么在声明的时候就需要配置好. 交 换 器 和 队 列 的 持 久 化 性 质 , 在 声 明 时 通 过 一 个 durable 参 数 即 可 实 现 : channel.exchange_declare(exchange='first', type='fanout', durable=True) channel.queue_declare(queue='hello', durable=True) 这 样 , 在 服 务 重 启 之 后 , first 和 hello 都 会 恢 复 . 但 是 hello 中 的 消 息 不 会 , 还 需 要 额外配置. 这是 消息 的属性的相关内容: channel.basic_publish(exchange='first', routing_key='', body='Hello World!', properties=pika.BasicProperties(
- 6.delivery_mode = 2, )) 通 过 properties , 把 此 条 消 息 , 仅 仅 是 此 条 消 息 配 置 成 需 要 持 久 化 的 . 这 样 , 在 服 务 重 启之后, 队列中的这种消息就可以恢复. 这里注意一下, 消息的持久化并不是一个很强的约束, 涉及数据落地的时机, 及系统层面 的 fsync 等问题, 不要认为消息完全不会丢. 如果要尽可能高地提高消息的持久化的有 效性, 还需要配置其它的一些机制, 比如后面会谈到的 状态反馈 中的 confirm mode. 交换器 , 队列, 消息 这三者的持久化问题都介绍过了. 前两者是一经声明, 则其性质无法 再被更改, 即你不能先声明一个非持久化的队列, 再声明一个持久化的同名队列, 企图修 改它, 这是不行的. 你重复声明时, 相关参数需要一致. 当然, 你可以删除它们再重新声 明: channel.queue_delete(queue='hello') channel.exchange_delete(exchange='first') 5. 调度策略 我们考虑交换器 Exchange 和队列 Queue 的关系. Exchange 在得到消息后会依据 规则把消息投到一个或多个队列当中. 在调度策略方面, 有两个需要了解的地方, 一是交换器的类型(前面我们用的是 fanout), 二是交换器和队列的绑定关系. 在绑定了的前提下, 我们再谈不同类型的交换器的规则. 绑定动作本身也会影响交换器的行为. 交换器的类型, 内置的有四种, 分别是: fanout direct topic headers 下面一一介绍. 5.1. fanout
- 7.故名思义, fanout 类型的交换器, 其行为是把消息转发给所有绑定的队列上, 就是一 个"广播"行为. # -*-coding:'>coding: