CodeAshen's blog CodeAshen's blog
首页
  • Spring Framework

    • 《剖析Spring5核心原理》
    • 《Spring源码轻松学》
  • Spring Boot

    • Spring Boot 2.0深度实践
  • Spring Cloud

    • Spring Cloud
    • Spring Cloud Alibaba
  • RabbitMQ
  • RocketMQ
  • Kafka
  • MySQL8.0详解
  • Redis从入门到高可用
  • Elastic Stack
  • 操作系统
  • 计算机网络
  • 数据结构与算法
  • 云原生
  • Devops
  • 前端
  • 实用工具
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
  • Reference
GitHub (opens new window)

CodeAshen

后端界的小学生
首页
  • Spring Framework

    • 《剖析Spring5核心原理》
    • 《Spring源码轻松学》
  • Spring Boot

    • Spring Boot 2.0深度实践
  • Spring Cloud

    • Spring Cloud
    • Spring Cloud Alibaba
  • RabbitMQ
  • RocketMQ
  • Kafka
  • MySQL8.0详解
  • Redis从入门到高可用
  • Elastic Stack
  • 操作系统
  • 计算机网络
  • 数据结构与算法
  • 云原生
  • Devops
  • 前端
  • 实用工具
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
  • Reference
GitHub (opens new window)
  • 剖析Spring5核心原理

  • Spring源码轻松学

  • Spring Boot 2.0深度实践

  • Spring-Cloud

  • Spring-Cloud-Alibaba

    • 第01章-简介
    • 第02章-Nacos服务发现
    • 第03章-Ribbon负载均衡
    • 第04章-Feign声明式HTTP客户端
    • 第05章-Sentinel集群容错
    • 第06章-RocketMQ消息队列
      • 6.1 Spring实现异步调用
      • 6.2 Rocket MQ搭建
        • 6.2.1 Rocket MQ搭建
      • 6.2.2 Rocket MQ控制台搭建
      • 6.3 Rocket MQ术语
      • 6.4 Spring消息编程模型
        • 6.4.1 消息生产者
        • 6.4.2 消息消费者
      • 6.5 分布式事务
        • 6.5.1 Rocket MQ事务支持
        • 6.5.2 Rocket MQ事务案例
      • 6.6 Spring Cloud Stream概念
      • 6.7 Spring Cloud Stream使用
        • 6.7.1 生产者
        • 6.7.2 消费者
      • 6.8 自定义接口
        • 6.8.1 自定义接口发送消息
        • 6.8.2 自定义接口消费消息
        • 6.8.3 注解解析
      • 6.9 消息过滤
      • 6.10 Spring Cloud Stream监控
      • 6.11 Spring Cloud Stream异常处理
      • 6.12 Spring Cloud Stream + Rocket MQ 实现分布式事务
        • 6.12.1 生产者改造
        • 6.12.2 消费者改造
      • 6.13 Spring Cloud Stream知识盘点
    • 第07章-Spring Cloud Gateway网关
    • 第08章-认证授权
    • 第09章-Nacos配置管理
    • 第10章-Sletuh调用链
  • Spring
  • Spring-Cloud-Alibaba
CodeAshen
2023-02-10
目录

第06章-RocketMQ消息队列

# 6.1 Spring实现异步调用

有时候业务需要异步调用其他系统,如异步添加积分、发送短信等,Spring有以下几种方式可以实现异步调用

  • AsyncRestTemplate (参考文档 (opens new window))
  • @Async注解(参考文档 (opens new window))
  • WebClient(Spring5.0引入)(参考文档 (opens new window))
  • MQ

本章介绍MQ

image-20210224173205642

MQ适用场景

  • 异步处理
  • 流量削峰填谷
  • 解耦微服务

MQ的选择参考:MQ选择手记 (opens new window)

# 6.2 Rocket MQ搭建

# 6.2.1 Rocket MQ搭建

基于4.8.0版本

参考:

  • 本地搭建:搭建RocketMQ (opens new window)
  • 生产环境部署:官方文档 (opens new window)

# 6.2.2 Rocket MQ控制台搭建

github地址:https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console

克隆项目到本地,打开rocketmq-console项目,选择性配置以下内容:

配置文件:

server.port=8080
# 默认localhost:9876
rocketmq.config.namesrvAddr=localhost:9876
1
2
3

pom文件中Rocket MQ版本:

<rocketmq.version>4.8.0</rocketmq.version>
1

打包运行

# 打包
mvn clean package -DskipTests
# 运行,可指定相关参数
java -jar -Dserver.port=17980 rocketmq-console-ng-2.0.0.jar
1
2
3
4

参考:

  • github地址 (opens new window)
  • 搭建示例 (opens new window)

# 6.3 Rocket MQ术语

  • Topic(主题):一类消息的集合, RocketMQ的基本订阅单位
  • 消息模型
    • Producer(生产者):生产消息
    • Broker(消息代理):存储消息、转发消息
    • Consumer(消费者):消费消息
  • 部署结构
    • Name server(名字服务):生产者/消费者通过名字服务查找各主题相应的Broker Ip列表
    • Broker Server(代理服务器):消息中转角色,负责存储消息、转发消息
  • 消费模式
    • Pull Consumer(拉模式):应用调用Consumer的拉取信息方法从Broker Server拉取消息
    • Push Consumer(推模式):Broker收到消息后主动推送给消费端,该模式实时性较高
  • Group(组)
    • Producer Group(生产者组):同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致
    • Consumer Group(消费者组):同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致
  • 消息传播模式
    • Clustering(集群):相同Consumer Group的每个Consumer实例平均分摊消息
    • Broadcasting(广播):相同Consumer Group的每个Consumer实例都接收全量的消息
  • 消息类型:普通消息、顺序消息、定时/延时消息、事务消息

参考:Rocket MQ概念 (opens new window)

# 6.4 Spring消息编程模型

# 6.4.1 消息生产者

引依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>
1
2
3
4
5

写配置

# rocketmq配置
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    # 必须指定group
    group: test-group
1
2
3
4
5
6

发送消息示例

@Autowired
private RocketMQTemplate rocketMQTemplate;

public void addBonus() {
    // 通过MQ发送增加积分消息
    rocketMQTemplate.convertAndSend(
        "add-bonus",  // topic
        new UserAddBonusMsgDto(share.getUserId(), 50)); // 消息内容
}
1
2
3
4
5
6
7
8
9

# 6.4.2 消息消费者

引依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>
1
2
3
4
5

写配置

# rocketmq配置
rocketmq:
  name-server: 127.0.0.1:9876
1
2
3

发送消息示例

/**
 * Rocket MQ 消费者类
 * 实现 RocketMQListener 接口标识这是一个消费者类,泛型指定为消息实体
 * 注解 @RocketMQMessageListener 属性指定消费的一些信息,topic、group等
 * 重写接口的 onMessage 消费方法
 */
@Service
@RocketMQMessageListener(topic = "add-bonus", consumerGroup = "consumer-group")
public class AddBonusMQListener implements RocketMQListener<UserAddBonusMsgDto> {
    
    @Resource
    private UserMapper userMapper;

    @Override
    public void onMessage(UserAddBonusMsgDto userAddBonusMsgDto) {
        // 添加积分
        Integer userId = userAddBonusMsgDto.getUserId();
        User user = userMapper.selectByPrimaryKey(userId);
        user.setBonus(user.getBonus() + userAddBonusMsgDto.getBonus());
        userMapper.updateByPrimaryKey(user);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

Spring Boot 对不同的消息队列都有不同的生产消费方式

如Rabbit MQ提供了 AmqpTemplate 和 @RabbitListener 注解,Kafka提供了 KafkaTemplate 和 @KafkaListener 注解。

官方文档参考:Spring Boot特性—消息传递 (opens new window)

# 6.5 分布式事务

# 6.5.1 Rocket MQ事务支持

Rocket MQ 事务支持,其中设计一些术语和概念

概念术语:

  • 半消息(Half Message) 暂时无法消费的消息。生产者将消息发送到了MQ server,但这个消息会被标记为“暂不能投递”状态,先存储起来;消费者不会去消费这条消息。
  • 消息回查(Message Status Check) 网络断开或生产者重启可能导致丢失事务消息的第二次确认。当MQ Server发现消息长时间处于半消息状态时,将向消息生产者发送请求,询问该消息的最终状态(提交或回滚)。

消息三态:

  • Commit:提交事务消息,消费者可以消费此消息
  • Rollback:回滚事务消息,broker会删除该消息,消费者不能消费
  • UNKNOWN:broker需要回查确认该消息的状态

可以结合下面Rocket MQ事务设计流程理解:

image-20210225134238584

  1. 生产者先发送一个半消息发送到MQ服务器。①
  2. 半消息发送成功后,生产者执行本地事务。②③
  3. 根据本地事务结果,将commit或rollback消息发送到MQ服务器。④
  4. 如果在本地事务执行过程中缺少commit/rollback消息或生产者处于等待状态,MQ服务器长时间处于半消息状态,没接收到commit或rollback消息,MQ服务器将向同一组中的每个生产者发送检查消息,以获取事务状态。⑤
  5. 生产者收到消息回查后,根据本地事务状态回复commit/rollback消息给MQ服务器。⑥⑦
  6. 最后commit的消息将传递给消费者进行消费,但是rollback的消息将被MQ服务器丢弃。

上述过程4、5步是在MQ未收到半消息是该commit或rollback的消息时,才会触发的,就是上面介绍的消息回查。

官方文档参考:Rocket MQ事务设计 (opens new window)

# 6.5.2 Rocket MQ事务案例

本案例中内容中心审核分享内容,审核通过则发送消息给用户中心,添加用户积分,内容中心本地事务更新分享的审核状态。代码如下:

编写业务类,发送MQ消息

@Service
@RequiredArgsConstructor(onConstructor_ = {@Autowired})
public class ShareService {

    private final ShareMapper shareMapper;
    private final RocketMQTemplate rocketMQTemplate;
    private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    /**
     * 审核分享内容
     * 审核通过后即发送MQ添加积分,利用Rocket MQ的事务特性
     */
    public Share auditById(Integer id, ShareAuditDTO auditDTO) {
        Share share = shareMapper.selectByPrimaryKey(id);
        if (share == null) {
            throw new IllegalArgumentException("参数非法!该分享不存在!");
        }
        if (!Objects.equals(share.getAuditStatus(), AuditStatusEnum.NOT_YET.getName())) {
            throw new IllegalArgumentException("参数非法!该分享已经审核过了!");
        }
        
        if (Objects.equals(auditDTO.getAuditStatusEnum(), AuditStatusEnum.PASS)) {
            // 审核通过:发送添加积分MQ,MQ封装的本地事务中执行审核通过操作
            
            String transactionId = UUID.randomUUID().toString();
            // 发送带事务特性的MQ消息  参数:1-分组,2-topic,3-消息,4-额外参数
            rocketMQTemplate.sendMessageInTransaction(
                    "tx-add-bonus-group",
                    "add-bonus",
                    MessageBuilder
                            .withPayload(new UserAddBonusMsgDto(share.getUserId(), 50))
                            .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)  // 设置消息header
                            .setHeader("share_id", id)
                            .build(),
                    auditDTO   // 额外参数
            );
            
            /*
            【注意】审核通过状态的操作在MQ本地事务方法中,此处只执行发送消息逻辑,
            auditDTO额外参数就是在本地事务方法中执行审核的参数,详情见 AddBonusTransactionListener 类
             */
        } else {
            // 审核不通过:直接调用审核方法
            this.auditByIdInDB(id, auditDTO);
        }
        
        return share;
    }
    
    /*
     * 修改审核状态
     */
    @Transactional(rollbackFor = Exception.class)
    public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {
        Share share = Share.builder()
                .id(id)
                .auditStatus(auditDTO.getAuditStatusEnum().getName())
                .reason(auditDTO.getReason())
                .build();
        shareMapper.updateByPrimaryKeySelective(share);
    }

    /*
     * 修改审核状态,并记录rocketmq事务日志
     */
    @Transactional(rollbackFor = Exception.class)
    public void auditByIdWithRocketMQLog(Integer id, ShareAuditDTO auditDTO, String transactionId) {
        // 审核状态
        this.auditByIdInDB(id, auditDTO);
        // 记录事务日志
        RocketmqTransactionLog transactionLog = RocketmqTransactionLog.builder()
                .transactionId(transactionId)
                .log("审核分享")
                .build();
        rocketmqTransactionLogMapper.insert(transactionLog);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77

Rocket MQ事务类,封装了本地事务方法,以及供事务回查的检查方法

/**
 * Rocket MQ 本地事务监听器
 * 封装了本地事务执行的方法,和事务回查的方法
 */
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {

    @Resource
    private ShareService shareService;
    @Resource
    private RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    /**
     * 执行本地事务的方法
     * RocketMQ事务是先发送半消息,再执行本地事务,所以本方法中才真正执行审核操作
     *
     * @param msg MQ消息对象
     * @param arg 发送MQ消息的额外参数
     * @return 返回RocketMQ事务三态之一,COMMIT/ROLLBACK/UNKNOWN
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 取出消息header中的信息
        MessageHeaders headers = msg.getHeaders();
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        Integer shareId = Integer.valueOf((String) headers.get("share_id"));
        // 真正执行审核操作,并发送事务后续操作 commit/rollback
        try {
            shareService.auditByIdWithRocketMQLog(shareId, (ShareAuditDTO) arg, transactionId);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * 检查本地事务的方法
     * 检查事务日志表中是否有记录,来判断本地事务是否成功
     *
     * @param msg MQ消息对象
     * @return 返回RocketMQ事务三态之一,COMMIT/ROLLBACK/UNKNOWN
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // 从消息总获取事务id
        MessageHeaders headers = msg.getHeaders();
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        // 查询事务日志记录
        RocketmqTransactionLog transactionLog = rocketmqTransactionLogMapper.selectOne(
                RocketmqTransactionLog.builder().transactionId(transactionId).build()
        );
        // 根据本地事务状态发送 commit/rollback 消息
        if (transactionLog != null) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

消费者端不用改变,和前面小结中的消费者代码一样。

总结本例中关键点:

  • 发送消息时使用header传参,传参到
  • 用额外参数arg传参,传到本地事务方法处,用来执行本地事务
  • 用记日志的方法实现本地事务回查

# 6.6 Spring Cloud Stream概念

个用于构建消息驱动的微服务的框架。简单来说就是一个用于简化MQ通信的框架。

官方文档参考:Spring Cloud Stream主要概念 (opens new window)

SCSt with binder

应用通过inputs和outputs和Binder交互,Binder是一个将微服务和消息中间件继承的组件,途中的Middleware表示消息中间件,目前支持kafka、rabbitmq、rocketmq。

Spring Cloud Stream编程模型

SCSt overview

  • Destination Binder(目标绑定器):与消息中间件通信的组件
  • Destination Bindings(目标绑定):Binding是连接应用程序跟消息中间件的桥梁,用于消息的消费和生产,由 binder创建
  • Message(消息)

上图表示,微服务继承了Stream,Stream的Destination Binder创建了两个Binding,分别连接rabbitmq和kafka,左侧binding从rabbitmq消费消息,经过服务的处理,将消息发送到kafka。

# 6.7 Spring Cloud Stream使用

# 6.7.1 生产者

第一步:加依赖

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
1
2
3
4

第二步:写注解

@SpringBootApplication
@EnableBinding(Source.class)  // 整合Stream注解
public class ContentCenterApplication {}
1
2
3

第三步:写配置

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: localhost:9876
      bindings:
        output:
          # 用来指定topic
          destination: stream-test-topic
1
2
3
4
5
6
7
8
9
10

编码示例:

@Autowired
private Source source;

// 测试Stream发送MQ消息
public String testStream() {
    Message<String> message = MessageBuilder.withPayload("消息体").build();
    source.output().send(message);  // 发送消息
    return "success";
}
1
2
3
4
5
6
7
8
9

# 6.7.2 消费者

第一步:加依赖

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
1
2
3
4

第二步:写注解

@SpringBootApplication
@EnableBinding(Sink.class)  // 整合Stream注解
public class ContentCenterApplication {}
1
2
3

第三步:写配置

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
      bindings: 
        input: 
          # 对应生产者的topic
          destination: stream-test-topic
          # 一定要设置,非rocketmq可留空
          group: binder-group
1
2
3
4
5
6
7
8
9
10
11
12

编码示例:

@Service
@Slf4j
public class TestStreamConsumer {
    
    @StreamListener(Sink.INPUT)
    public void receive(String messageBody) {
        log.info("通过Stream收到消息,messageBody={}", messageBody);
    }
}
1
2
3
4
5
6
7
8
9

# 6.8 自定义接口

# 6.8.1 自定义接口发送消息

  1. 编写自定义接口,使用@Output注解,标记这是一个 Output Binding
public interface MySource {
    String MY_OUTPUT = "my-output";
    
    /**
     * Output注解标记这是一个 Output Binding
     * 注解value和yml配置bindings下的名称一致
     */
    @Output(MY_OUTPUT)
    MessageChannel output();
}
1
2
3
4
5
6
7
8
9
10
  1. 启动类@EnableBinding注解上引用MySource
@SpringBootApplication
@EnableBinding({Source.class, MySource.class})
public class ContentCenterApplication {}
1
2
3
  1. 配置文件添加配置,bindings下配置 my-output,下面配置topic
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: localhost:9876
      bindings:
        output:
          # 用来指定topic
          destination: stream-test-topic
        # 和 @Output注解的value一致
        my-output:
          destination: stream-my-topic
1
2
3
4
5
6
7
8
9
10
11
12
13

测试发送消息:

@Autowired
private MySource mySource;

/**
  * 测试自定义Stream接口发送MQ消息
  * @return
  */
@GetMapping("/stream2")
public String testStream2() {
    Message<String> message = MessageBuilder.withPayload("消息体").build();
    mySource.output().send(message);
    return "success";
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# 6.8.2 自定义接口消费消息

  1. 编写自定义接口,使用@Input注解,标记这是一个 Input Binding
public interface MySink {
    String MY_INPUT = "my-input";

    /**
     * Input注解标记这是一个 Input Binding
     * 注解value和yml配置bindings下的名称一致
     */
    @Input(MY_INPUT)
    SubscribableChannel input();
}
1
2
3
4
5
6
7
8
9
10
  1. 启动类@EnableBinding注解上引用MySource
@SpringBootApplication
@EnableBinding({Sink.class, MySink.class})
public class UserCenterApplication {}
1
2
3
  1. 配置文件添加配置,bindings下配置 my-output,下面配置topic
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
      bindings: 
        input: 
          # 对应生产者的topic
          destination: stream-test-topic
          # 一定要设置,非rocketmq可留空
          group: binder-group
        my-input:
          destination: stream-my-topic
          group: my-group
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

测试发送消息:

@Service
@Slf4j
public class MyTestStreamConsumer {
    
    @StreamListener(MySink.MY_INPUT)
    public void receive(String messageBody) {
        log.info("通过自定义接口收到消息,messageBody={}", messageBody);
    }
}
1
2
3
4
5
6
7
8
9

# 6.8.3 注解解析

  • Source:Stream提供的发送消息的接口,和上述自定义的MySource实现一样
  • Sink:Stream提供的接收消息的接口,和上述自定义的MySink实现一样
  • Processor:Stream提供的用于收发消息的接口,继承了Source和Sink接口

yml配置文件中,bindings下的配置,显示binding的名称,再是topic名称,这里要结合[Spring Cloud Stream概念](#5.6 Spring Cloud Stream概念)理解。

Binder连接应用和MQ,创建Binding,通过Binding和MQ通信。

这里Source就表示创建的Output Binding,Sink就表示创建的Input Binding。

# 6.9 消息过滤

当不想消费MQ中的所有消息,想根据一定条件消费指定消息时,可以使用消息过滤。有以下几种方式:

  • condition:通过@StreamListener注解的condition属性实现
  • Tags:通过设置tags实现(Rocket MQ特有)
  • SQL92:使用SQL语法过滤(Rocket MQ特有)

参考:

  • 消息过滤手记 (opens new window)
  • 官方文档 (opens new window)

# 6.10 Spring Cloud Stream监控

使用actuator可以监控Stream健康状态,有以下端点可供监控

  • /actuator/bindings
  • /actuator/channels
  • /actuator/health

# 6.11 Spring Cloud Stream异常处理

MQ消费者消费消息的时候,如果出现异常该如何处理。Stream提供了处理方式

参考:Stream异常处理 (opens new window)

# 6.12 Spring Cloud Stream + Rocket MQ 实现分布式事务

Stream本身并没有考虑分布式事务的问题,这里介绍的还是依赖于Rocket MQ的能力。

所以这里介绍的使用了Stream的代码,和之前Rocket MQ原生事务代码大同小异,只是稍加改造。

# 6.12.1 生产者改造

编写业务类,发送MQ消息,改造点就是使用Source发送消息,source发消息不支持额外参数,对象转为json后放到消息header里。

@Service
@RequiredArgsConstructor(onConstructor_ = {@Autowired})
public class ShareService {

    private final ShareMapper shareMapper;
    private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
    
    private final Source source;

    /**
     * 审核分享内容
     * 审核通过后即发送MQ添加积分,利用Rocket MQ的事务特性
     */
    public Share auditById(Integer id, ShareAuditDTO auditDTO) {
        Share share = shareMapper.selectByPrimaryKey(id);
        if (share == null) {
            throw new IllegalArgumentException("参数非法!该分享不存在!");
        }
        if (!Objects.equals(share.getAuditStatus(), AuditStatusEnum.NOT_YET.getName())) {
            throw new IllegalArgumentException("参数非法!该分享已经审核过了!");
        }
        
        if (Objects.equals(auditDTO.getAuditStatusEnum(), AuditStatusEnum.PASS)) {
            // 审核通过:发送添加积分MQ,MQ封装的本地事务中执行审核通过操作
            String transactionId = UUID.randomUUID().toString();    // 事务id
            
            // 【改造点1】使用stream发送消息
            source.output().send(
                    MessageBuilder
                            .withPayload(new UserAddBonusMsgDto(share.getUserId(), 50))
                            .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)  // 设置消息header
                            .setHeader("share_id", id)
                            .setHeader("dto", JSON.toJSONString(auditDTO))
                            .build()
            );
            
            // 发送带事务特性的MQ消息  参数:1-分组,2-topic,3-消息,4-额外参数
            // rocketMQTemplate.sendMessageInTransaction(
            //         "tx-add-bonus-group",
            //         "add-bonus",
            //         MessageBuilder
            //                 .withPayload(new UserAddBonusMsgDto(share.getUserId(), 50))
            //                 .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)  // 设置消息header
            //                 .setHeader("share_id", id)
            //                 .build(),
            //         auditDTO   // 额外参数
            // );
            
            /*
            【注意】审核通过状态的操作在MQ本地事务方法中,此处只执行发送消息逻辑,
            auditDTO额外参数就是在本地事务方法中执行审核的参数,详情见 AddBonusTransactionListener 类
             */
        } else {
            // 审核不通过:直接调用审核方法
            this.auditByIdInDB(id, auditDTO);
        }
        
        return share;
    }
    
    /*
     * 修改审核状态
     */
    @Transactional(rollbackFor = Exception.class)
    public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {
        Share share = Share.builder()
                .id(id)
                .auditStatus(auditDTO.getAuditStatusEnum().getName())
                .reason(auditDTO.getReason())
                .build();
        shareMapper.updateByPrimaryKeySelective(share);
    }

    /*
     * 修改审核状态,并记录rocketmq事务日志
     */
    @Transactional(rollbackFor = Exception.class)
    public void auditByIdWithRocketMQLog(Integer id, ShareAuditDTO auditDTO, String transactionId) {
        // 审核状态
        this.auditByIdInDB(id, auditDTO);
        // 记录事务日志
        RocketmqTransactionLog transactionLog = RocketmqTransactionLog.builder()
                .transactionId(transactionId)
                .log("审核分享")
                .build();
        rocketmqTransactionLogMapper.insert(transactionLog);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88

Rocket MQ事务类,封装了本地事务方法,以及供事务回查的检查方法,这里改造点就改变一下额外参数的接收方式就好了。

/**
 * Rocket MQ 本地事务监听器
 * 封装了本地事务执行的方法,和事务回查的方法
 */
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {

    @Resource
    private ShareService shareService;
    @Resource
    private RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    /**
     * 执行本地事务的方法
     * RocketMQ事务是先发送半消息,再执行本地事务,所以本方法中才真正执行审核操作
     *
     * @param msg MQ消息对象
     * @param arg 发送MQ消息的额外参数
     * @return 返回RocketMQ事务三态之一,COMMIT/ROLLBACK/UNKNOWN
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 取出消息header中的信息
        MessageHeaders headers = msg.getHeaders();
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        Integer shareId = Integer.valueOf((String) headers.get("share_id"));
        String dtoString = (String) headers.get("dto");
        ShareAuditDTO auditDTO = JSON.parseObject(dtoString, ShareAuditDTO.class);

        // 真正执行审核操作,并发送事务后续操作 commit/rollback
        try {
            shareService.auditByIdWithRocketMQLog(shareId, auditDTO, transactionId);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * 检查本地事务的方法
     * 检查事务日志表中是否有记录,来判断本地事务是否成功
     *
     * @param msg MQ消息对象
     * @return 返回RocketMQ事务三态之一,COMMIT/ROLLBACK/UNKNOWN
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // 从消息总获取事务id
        MessageHeaders headers = msg.getHeaders();
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        // 查询事务日志记录
        RocketmqTransactionLog transactionLog = rocketmqTransactionLogMapper.selectOne(
                RocketmqTransactionLog.builder().transactionId(transactionId).build()
        );
        // 根据本地事务状态发送 commit/rollback 消息
        if (transactionLog != null) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

配置文件:

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: localhost:9876
        # 添加如下事务配置
        bindings:
          output: 
            producer:
              transactional: true
              group: tx-add-bonus-group
      bindings:
        output:
          # 用来指定topic
          destination: add-bonus
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 6.12.2 消费者改造

消费类

@Service
@Slf4j
public class AddBonusStreamConsumer {

    @Resource
    private UserMapper userMapper;
    @Resource
    private BonusEventLogMapper bonusEventLogMapper;
    
    @StreamListener(Sink.INPUT)
    public void receive(UserAddBonusMsgDto userAddBonusMsgDto) {
        // 添加积分
        Integer userId = userAddBonusMsgDto.getUserId();
        User user = userMapper.selectByPrimaryKey(userId);
        user.setBonus(user.getBonus() + userAddBonusMsgDto.getBonus());
        userMapper.updateByPrimaryKey(user);

        // 记录日志
        bonusEventLogMapper.insert(
                BonusEventLog.builder()
                        .userId(userId)
                        .value(userAddBonusMsgDto.getBonus())
                        .event("CONTRIBUTE")
                        .createTime(new Date())
                        .description("投稿添加积分")
                        .build()
        );

        log.info("积分添加完成");
    }
    
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

配置文件:

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
      bindings: 
        input: 
          # 对应生产者的topic
          destination: add-bonus
          # 一定要设置,非rocketmq可留空
          group: binder-group
1
2
3
4
5
6
7
8
9
10
11
12

# 6.13 Spring Cloud Stream知识盘点

参考:[Stream异常处理](

编辑 (opens new window)
上次更新: 2023/06/04, 12:34:19
第05章-Sentinel集群容错
第07章-Spring Cloud Gateway网关

← 第05章-Sentinel集群容错 第07章-Spring Cloud Gateway网关→

最近更新
01
第01章-RabbitMQ导学
02-10
02
第02章-入门RabbitMQ核心概念
02-10
03
第03章-RabbitMQ高级特性
02-10
更多文章>
Theme by Vdoing | Copyright © 2020-2023 CodeAshen | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式