第03章-RabbitMQ高级特性
本章主要内容为,RabbitMQ 的高级特性和实际场景应用,包括消息如何保障 100% 的投递成功 ?幂等性概念详解,在海量订单产生的业务高峰期,如何避免消息的重复消费问题?Confirm 确认消息、Return 返回消息,自定义消费者,消息的 ACK 与重回队列,消息的限流,TTL 消息,死信队列等 ...
# 一、保证消息 100% 投递成功
# 1.1 什么是生产端的可靠性投递
- 保证消息的成功发出
- 保证 MQ 节点的成功接收
- 发送端收到 MQ 节点(Broker)确认应答
- 完善的消息补偿机制
# 1.2 互联网大厂的解决方案
消息信息落库,对消息状态进行标记
方案步骤: 1 业务数据落库(BIZ DB),消息信息落库(MSG DB)并设置投递状态为 0(待投递) 2~3 producer 投递消息到 Broker,Broker 收到消息返回 ACK 到 producer 4 producer 收到 ACK 后将消息信息投递状态设置为 1(成功) 5~6 分布式定时任务获取消息投递状态为 0 的消息进行重新投递,并设置 retry 次数 7 若 retry 次数大于三次,就将消息投递状态设置为 2(失败) 缺点: 至少两次数据库 IO,高并发场景下并不合适
1
2
3
4
5
6
7
8
9消息的延迟投递,做二次确认,回调检查
方案步骤: 0 业务数据落库(BIZ DB) 1~2 上游服务(Upstream)投递消息 a(绿线)到 MQ,同时投递一条延迟消息 b(橙线) 3~4 下游服务(Downstream)监听 a 类消息,消费成功就发送一条确认消息 c(蓝线)到 MQ 5 补偿服务(Callback)监听 c 类消息,消费到消息就将消息信息落库(MSG DB),表示本次消息投递消费成功 6 延迟消息 b 时间到,补偿服务监听到此消息,查询 MSG DB 中是否记录了成功消费的记录,若没有,就让上游服务重新投递 优点: 上下游服务只关注消息的投递和消费,消息的可靠性由补偿服务保证
1
2
3
4
5
6
7
8
9
# 二、消息幂等性解决方案
消费端 — 幂等性保障
消费端实现幂等性,就意味着,我们的消息永远不会被消费多次,即使我们收到了多条一样的消息。
可以借鉴数据库乐观锁的机制,比如执行一条更新库存的SQL语句:
update t_reps set count = count -1, version = version + 1 where version = 1;
业界主流的幂等性操作:
唯一 ID + 指纹码 机制,利用数据库主键去重
在进行操作之前,查询数据表中有没有对应的记录 select count(1) from t_order where id = 唯一ID + 指纹码; 如果没有,直接进行 insert 操作就行了; 如果有了,为了保证幂等性就不进行操作了,消费端不进行消费了。
1
2
3
4好处:实现简单
坏处:高并发下有数据库写入的性能瓶颈
解决方案:根据 ID 进行分库分表,进行路由算法。将 IO 操作分担到多个数据库中,降低单库的压力。
利用 Redis 的原子性去实现
使用 Redis 进行幂等操作,需要考虑的为问题:
第一:我们是否需要进行数据库落库,如果落库的话,关键问题是数据库和缓存如何做到原子性?
第二:如果不进行落库,如果都存到缓存中,如何设置定时同步的策略?
# 三、RabbitMQ 投递消息机制
# 3.1 Confirm 确认消息
理解 Confirm 确认机制:
- 消息的确认,是指生产者投递消息后,如果 Broker 收到消息,则会给生产者一个应答
- 生产者进行接受应答,用来确认这条消息是否正常发送到 Broker,这种方式也是消息可靠性投递的核心保障
如何实现 Confirm 确认消息?
在 channel 上开启消息确认模式:
channel.confirmSelect()
在 channel 上添加监听:
addConfirmListener
,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等待后续处理
# 3.2 Return 返回消息
Return Listener 用于处理一些不可路由的消息。
我们的消息生产者,通过指定一个 Exchange 和 RoutingKey,把消息送达到某一个队列中,然后我们的消费这监听队列,进行消息消费操作。
但是在某些情况下,我们在发送消息的时候,当前的 exchange 不存在或者指定的 RoutingKey 路由不到,这时候我们需要监听这种不可达的消息,就要使用 Return Listener。
基础 API 中的配置项:
Mandatory:如果为 true,则监听器会收到路由不可达的消息,然后进行后续处理;如果为 false,则 Broker 自动删除该消息。
# 四、自定义消费者
之前的代码中消费者进行消费的时候都是使用 while 循环,进行 consumer.nextDelivery 方法获取下一条消息,然后进行消费处理!
但是我们使用自定义的 Consumer 更加方便,解耦行更加强,也是实际开发中更常用的方式。
# 五、消费端的限流策略
什么是消费段的限流?
假设一个场景,首先,我们的 RabbitMQ 服务器上有上万条未处理的消息,我们随便打开一个消费端,会出现以下情况:
巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据!可能导致消费端服务器性能下降,卡顿甚至卡死!
RabbitMQ 提供了一种 qos(服务质量保证)功能,即在 非自动确认消息 的前提下,如果一定数目的消息(通过基于 consumer 或者 channel 设置 Qos 的值)未被确认前,不能消费新的消息。
/**
* prefetchSize:消费者所能接收未确认消息的总体大小的上限,单位为B,设置为0则表示没有上限
* prefetchCount: 设置消费者客户端最大能“保持”的未确认的消息数(即预取个数),0则表示没有上限
* global:true - channel级别上生效;false - consumer级别上生效
*/
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
2
3
4
5
6
# 六、消息的 ACK 和重回队列机制
消费端手动 ACK 和 NACK
消费端进行消费的时候,如果由于业务异常我们可以进行日志记录,然后进行补偿。
如果由于服务器宕机等严重问题,我们就需要手动进行 ack,保障消费端消费成功。
消费端的重回队列
消费端的重回队列是为了对没有处理成功的消息,把消息重新退回给 Broker。
一般我们在实际应用中,都会关闭重回队列,也就是设置为 false。
# 七、TTL 队列/消息
TTL 是 Time to Live 的缩写,也就是生存时间
RabbitMQ 支持消息的过期时间,在消息发送时可以指定
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) .contentEncoding("UTF-8") .expiration("10000") // 设置消息的TTL为10s .build(); String msg = "Hello RabbitMQ TTL Message"; channel.basicPublish(exchange, routingKey, properties, msg.getBytes());
1
2
3
4
5
6
7RabbitMQ 支持队列的过期时间,从消息入队列开始,只要超过了队列的超时时间限制,那么消息就会自动删除
Map<String, Object> arguments = new HashMap<>(); arguments.put("x-message-ttl", 20000); channel.queueDeclare(queueName, true, false, false, arguments);
1
2
3两者同时设置取较短过期时间
注意:声明队列的时候,可以指定队列中消息的 TTL,也可以指定队列的 TTL,二者的不同的概念,对应参数分别是
x-message-ttl
和x-expires
后者控制队列被自动删除前处于未使用状态的时间。未使用的意思是队列上没有任何的消费者,队列也没有被重新声明,并且在过期时间段内也未调用过 Basic.Get 命令。
# 八、死信队列
死信队列:DLX, Dead-Letter-Exchange
利用 DLX,当消息在一个队列中变成死信(dead message)之后,他能重新被 publish 到另一个 Exchange 中,这个 Exchange 就是 DLX。
ELX 也是一个正常的 Exchange,和一般的 Exchange 没有区别,他能在任何队列上被指定,实际上就是设置某个队列的属性。
当这个队列中有死信时,rabbitMQ 就会自动的将这个消息重新发布到设置的 Exchange 上去,进而被路由到另一个队列。
可以监听这个队列中的消息做相应的处理,这个特性可以弥补 RabbitMQ3.0 以前支持的 immediate 的功能。
消息变成死信有以下几种情况
消息被拒绝(basic.reject/basic.nack),并且 requeue 为 false
消息 TTL 过期
当队列达到最大长度
死信队列设置
首先需要设置死信队列的 Exchange 和 Queue,然后进行绑定:
exchange: dlx.exchange queue: dlx.queue routingkey: #
1
2
3然后正常声明交换机、队列、绑定,只不过需要在队列上加上一个参数即可
arguments.put("x-dead-letter-exchange", "dlx.exchange"); // 交换机 arguments.put("x-dead-letter-routing-key", "dlx.routingKey"); // 路由键(根据情况指定)
1
2