CodeAshen's blog CodeAshen's blog
首页
  • Spring Framework

    • 《剖析Spring5核心原理》
    • 《Spring源码轻松学》
  • Spring Boot

    • Spring Boot 2.0深度实践
  • Spring Cloud

    • Spring Cloud
    • Spring Cloud Alibaba
  • RabbitMQ
  • RocketMQ
  • Kafka
  • MySQL8.0详解
  • Redis从入门到高可用
  • Elastic Stack
  • 操作系统
  • 计算机网络
  • 数据结构与算法
  • 云原生
  • Devops
  • 前端
  • 实用工具
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
  • Reference
GitHub (opens new window)

CodeAshen

后端界的小学生
首页
  • Spring Framework

    • 《剖析Spring5核心原理》
    • 《Spring源码轻松学》
  • Spring Boot

    • Spring Boot 2.0深度实践
  • Spring Cloud

    • Spring Cloud
    • Spring Cloud Alibaba
  • RabbitMQ
  • RocketMQ
  • Kafka
  • MySQL8.0详解
  • Redis从入门到高可用
  • Elastic Stack
  • 操作系统
  • 计算机网络
  • 数据结构与算法
  • 云原生
  • Devops
  • 前端
  • 实用工具
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
  • Reference
GitHub (opens new window)
  • RabbitMQ

    • 第01章-RabbitMQ导学
    • 第02章-入门RabbitMQ核心概念
    • 第03章-RabbitMQ高级特性
      • 1.1 什么是生产端的可靠性投递
      • 1.2 互联网大厂的解决方案
      • 3.1 Confirm 确认消息
      • 3.2 Return 返回消息
    • 第04章-RabbitMQ高级整合
    • 第05章-高可靠RabbitMQ集群架构
  • RocketMQ

  • Kafka

  • Nginx

  • 中间件
  • RabbitMQ
CodeAshen
2023-02-10
目录

第03章-RabbitMQ高级特性

本章主要内容为,RabbitMQ 的高级特性和实际场景应用,包括消息如何保障 100% 的投递成功 ?幂等性概念详解,在海量订单产生的业务高峰期,如何避免消息的重复消费问题?Confirm 确认消息、Return 返回消息,自定义消费者,消息的 ACK 与重回队列,消息的限流,TTL 消息,死信队列等 ...

# 一、保证消息 100% 投递成功

# 1.1 什么是生产端的可靠性投递

  1. 保证消息的成功发出
  2. 保证 MQ 节点的成功接收
  3. 发送端收到 MQ 节点(Broker)确认应答
  4. 完善的消息补偿机制

# 1.2 互联网大厂的解决方案

  1. 消息信息落库,对消息状态进行标记

    image-20200714213841486

    方案步骤:
    1     业务数据落库(BIZ DB),消息信息落库(MSG DB)并设置投递状态为 0(待投递)
    2~3   producer 投递消息到 Broker,Broker 收到消息返回 ACK 到 producer
    4     producer 收到 ACK 后将消息信息投递状态设置为 1(成功)
    5~6   分布式定时任务获取消息投递状态为 0 的消息进行重新投递,并设置 retry 次数
    7     若 retry 次数大于三次,就将消息投递状态设置为 2(失败)
    
    缺点:
    至少两次数据库 IO,高并发场景下并不合适
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
  2. 消息的延迟投递,做二次确认,回调检查

    image-20200714214331639

    方案步骤:
    0     业务数据落库(BIZ DB)
    1~2   上游服务(Upstream)投递消息 a(绿线)到 MQ,同时投递一条延迟消息 b(橙线)
    3~4   下游服务(Downstream)监听 a 类消息,消费成功就发送一条确认消息 c(蓝线)到 MQ
    5     补偿服务(Callback)监听 c 类消息,消费到消息就将消息信息落库(MSG DB),表示本次消息投递消费成功
    6     延迟消息 b 时间到,补偿服务监听到此消息,查询 MSG DB 中是否记录了成功消费的记录,若没有,就让上游服务重新投递
    
    优点:
    上下游服务只关注消息的投递和消费,消息的可靠性由补偿服务保证
    
    1
    2
    3
    4
    5
    6
    7
    8
    9

# 二、消息幂等性解决方案

消费端 — 幂等性保障

消费端实现幂等性,就意味着,我们的消息永远不会被消费多次,即使我们收到了多条一样的消息。

可以借鉴数据库乐观锁的机制,比如执行一条更新库存的SQL语句:

update t_reps set count = count -1, version = version + 1 where version = 1;

业界主流的幂等性操作:

  1. 唯一 ID + 指纹码 机制,利用数据库主键去重

    在进行操作之前,查询数据表中有没有对应的记录
    select count(1) from t_order where id = 唯一ID + 指纹码;
    如果没有,直接进行 insert 操作就行了;
    如果有了,为了保证幂等性就不进行操作了,消费端不进行消费了。
    
    1
    2
    3
    4

    好处:实现简单

    坏处:高并发下有数据库写入的性能瓶颈

    解决方案:根据 ID 进行分库分表,进行路由算法。将 IO 操作分担到多个数据库中,降低单库的压力。

  2. 利用 Redis 的原子性去实现

    使用 Redis 进行幂等操作,需要考虑的为问题:

    • 第一:我们是否需要进行数据库落库,如果落库的话,关键问题是数据库和缓存如何做到原子性?

    • 第二:如果不进行落库,如果都存到缓存中,如何设置定时同步的策略?

# 三、RabbitMQ 投递消息机制

# 3.1 Confirm 确认消息

理解 Confirm 确认机制:

  • 消息的确认,是指生产者投递消息后,如果 Broker 收到消息,则会给生产者一个应答
  • 生产者进行接受应答,用来确认这条消息是否正常发送到 Broker,这种方式也是消息可靠性投递的核心保障

image-20200925133931362

如何实现 Confirm 确认消息?

  1. 在 channel 上开启消息确认模式:channel.confirmSelect()

  2. 在 channel 上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等待后续处理

示例代码

# 3.2 Return 返回消息

Return Listener 用于处理一些不可路由的消息。

我们的消息生产者,通过指定一个 Exchange 和 RoutingKey,把消息送达到某一个队列中,然后我们的消费这监听队列,进行消息消费操作。

但是在某些情况下,我们在发送消息的时候,当前的 exchange 不存在或者指定的 RoutingKey 路由不到,这时候我们需要监听这种不可达的消息,就要使用 Return Listener。

image-20200925140009137

基础 API 中的配置项:

Mandatory:如果为 true,则监听器会收到路由不可达的消息,然后进行后续处理;如果为 false,则 Broker 自动删除该消息。

示例代码

# 四、自定义消费者

之前的代码中消费者进行消费的时候都是使用 while 循环,进行 consumer.nextDelivery 方法获取下一条消息,然后进行消费处理!

但是我们使用自定义的 Consumer 更加方便,解耦行更加强,也是实际开发中更常用的方式。

示例代码

# 五、消费端的限流策略

什么是消费段的限流?

假设一个场景,首先,我们的 RabbitMQ 服务器上有上万条未处理的消息,我们随便打开一个消费端,会出现以下情况:

巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据!可能导致消费端服务器性能下降,卡顿甚至卡死!

RabbitMQ 提供了一种 qos(服务质量保证)功能,即在 非自动确认消息 的前提下,如果一定数目的消息(通过基于 consumer 或者 channel 设置 Qos 的值)未被确认前,不能消费新的消息。

/**
 * prefetchSize:消费者所能接收未确认消息的总体大小的上限,单位为B,设置为0则表示没有上限
 * prefetchCount: 设置消费者客户端最大能“保持”的未确认的消息数(即预取个数),0则表示没有上限
 * global:true - channel级别上生效;false - consumer级别上生效
 */
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
1
2
3
4
5
6

示例代码

# 六、消息的 ACK 和重回队列机制

消费端手动 ACK 和 NACK

  • 消费端进行消费的时候,如果由于业务异常我们可以进行日志记录,然后进行补偿。

  • 如果由于服务器宕机等严重问题,我们就需要手动进行 ack,保障消费端消费成功。

消费端的重回队列

消费端的重回队列是为了对没有处理成功的消息,把消息重新退回给 Broker。

一般我们在实际应用中,都会关闭重回队列,也就是设置为 false。

示例代码

# 七、TTL 队列/消息

TTL 是 Time to Live 的缩写,也就是生存时间

  • RabbitMQ 支持消息的过期时间,在消息发送时可以指定

    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
            .deliveryMode(2)
            .contentEncoding("UTF-8")
            .expiration("10000")       // 设置消息的TTL为10s
            .build();
    String msg = "Hello RabbitMQ TTL Message";
    channel.basicPublish(exchange, routingKey, properties, msg.getBytes());
    
    1
    2
    3
    4
    5
    6
    7
  • RabbitMQ 支持队列的过期时间,从消息入队列开始,只要超过了队列的超时时间限制,那么消息就会自动删除

    Map<String, Object> arguments = new HashMap<>();
    arguments.put("x-message-ttl", 20000);
    channel.queueDeclare(queueName, true, false, false, arguments);
    
    1
    2
    3
  • 两者同时设置取较短过期时间

示例代码

注意:声明队列的时候,可以指定队列中消息的 TTL,也可以指定队列的 TTL,二者的不同的概念,对应参数分别是 x-message-ttl 和 x-expires

后者控制队列被自动删除前处于未使用状态的时间。未使用的意思是队列上没有任何的消费者,队列也没有被重新声明,并且在过期时间段内也未调用过 Basic.Get 命令。

# 八、死信队列

死信队列:DLX, Dead-Letter-Exchange

利用 DLX,当消息在一个队列中变成死信(dead message)之后,他能重新被 publish 到另一个 Exchange 中,这个 Exchange 就是 DLX。

ELX 也是一个正常的 Exchange,和一般的 Exchange 没有区别,他能在任何队列上被指定,实际上就是设置某个队列的属性。

当这个队列中有死信时,rabbitMQ 就会自动的将这个消息重新发布到设置的 Exchange 上去,进而被路由到另一个队列。

可以监听这个队列中的消息做相应的处理,这个特性可以弥补 RabbitMQ3.0 以前支持的 immediate 的功能。

消息变成死信有以下几种情况

  • 消息被拒绝(basic.reject/basic.nack),并且 requeue 为 false

  • 消息 TTL 过期

  • 当队列达到最大长度

死信队列设置

  • 首先需要设置死信队列的 Exchange 和 Queue,然后进行绑定:

    exchange: dlx.exchange
    queue: dlx.queue
    routingkey: #
    
    1
    2
    3
  • 然后正常声明交换机、队列、绑定,只不过需要在队列上加上一个参数即可

    arguments.put("x-dead-letter-exchange", "dlx.exchange");      // 交换机
    arguments.put("x-dead-letter-routing-key", "dlx.routingKey"); // 路由键(根据情况指定)
    
    1
    2

示例代码

编辑 (opens new window)
上次更新: 2023/06/04, 12:34:19
第02章-入门RabbitMQ核心概念
第04章-RabbitMQ高级整合

← 第02章-入门RabbitMQ核心概念 第04章-RabbitMQ高级整合→

最近更新
01
第01章-RabbitMQ导学
02-10
02
第02章-入门RabbitMQ核心概念
02-10
03
第04章-RabbitMQ高级整合
02-10
更多文章>
Theme by Vdoing | Copyright © 2020-2023 CodeAshen | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式