第06章-RocketMQ消息队列
# 6.1 Spring实现异步调用
有时候业务需要异步调用其他系统,如异步添加积分、发送短信等,Spring有以下几种方式可以实现异步调用
- AsyncRestTemplate (参考文档 (opens new window))
- @Async注解(参考文档 (opens new window))
- WebClient(Spring5.0引入)(参考文档 (opens new window))
- MQ
本章介绍MQ
MQ适用场景
- 异步处理
- 流量削峰填谷
- 解耦微服务
MQ的选择参考:MQ选择手记 (opens new window)
# 6.2 Rocket MQ搭建
# 6.2.1 Rocket MQ搭建
基于4.8.0版本
参考:
- 本地搭建:搭建RocketMQ (opens new window)
- 生产环境部署:官方文档 (opens new window)
# 6.2.2 Rocket MQ控制台搭建
github地址:https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console
克隆项目到本地,打开rocketmq-console项目,选择性配置以下内容:
配置文件:
server.port=8080
# 默认localhost:9876
rocketmq.config.namesrvAddr=localhost:9876
2
3
pom文件中Rocket MQ版本:
<rocketmq.version>4.8.0</rocketmq.version>
打包运行
# 打包
mvn clean package -DskipTests
# 运行,可指定相关参数
java -jar -Dserver.port=17980 rocketmq-console-ng-2.0.0.jar
2
3
4
参考:
# 6.3 Rocket MQ术语
- Topic(主题):一类消息的集合, RocketMQ的基本订阅单位
- 消息模型
- Producer(生产者):生产消息
- Broker(消息代理):存储消息、转发消息
- Consumer(消费者):消费消息
- 部署结构
- Name server(名字服务):生产者/消费者通过名字服务查找各主题相应的Broker Ip列表
- Broker Server(代理服务器):消息中转角色,负责存储消息、转发消息
- 消费模式
- Pull Consumer(拉模式):应用调用Consumer的拉取信息方法从Broker Server拉取消息
- Push Consumer(推模式):Broker收到消息后主动推送给消费端,该模式实时性较高
- Group(组)
- Producer Group(生产者组):同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致
- Consumer Group(消费者组):同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致
- 消息传播模式
- Clustering(集群):相同Consumer Group的每个Consumer实例平均分摊消息
- Broadcasting(广播):相同Consumer Group的每个Consumer实例都接收全量的消息
- 消息类型:普通消息、顺序消息、定时/延时消息、事务消息
参考:Rocket MQ概念 (opens new window)
# 6.4 Spring消息编程模型
# 6.4.1 消息生产者
引依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
2
3
4
5
写配置
# rocketmq配置
rocketmq:
name-server: 127.0.0.1:9876
producer:
# 必须指定group
group: test-group
2
3
4
5
6
发送消息示例
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void addBonus() {
// 通过MQ发送增加积分消息
rocketMQTemplate.convertAndSend(
"add-bonus", // topic
new UserAddBonusMsgDto(share.getUserId(), 50)); // 消息内容
}
2
3
4
5
6
7
8
9
# 6.4.2 消息消费者
引依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
2
3
4
5
写配置
# rocketmq配置
rocketmq:
name-server: 127.0.0.1:9876
2
3
发送消息示例
/**
* Rocket MQ 消费者类
* 实现 RocketMQListener 接口标识这是一个消费者类,泛型指定为消息实体
* 注解 @RocketMQMessageListener 属性指定消费的一些信息,topic、group等
* 重写接口的 onMessage 消费方法
*/
@Service
@RocketMQMessageListener(topic = "add-bonus", consumerGroup = "consumer-group")
public class AddBonusMQListener implements RocketMQListener<UserAddBonusMsgDto> {
@Resource
private UserMapper userMapper;
@Override
public void onMessage(UserAddBonusMsgDto userAddBonusMsgDto) {
// 添加积分
Integer userId = userAddBonusMsgDto.getUserId();
User user = userMapper.selectByPrimaryKey(userId);
user.setBonus(user.getBonus() + userAddBonusMsgDto.getBonus());
userMapper.updateByPrimaryKey(user);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Spring Boot 对不同的消息队列都有不同的生产消费方式
如Rabbit MQ提供了 AmqpTemplate 和 @RabbitListener 注解,Kafka提供了 KafkaTemplate 和 @KafkaListener 注解。
# 6.5 分布式事务
# 6.5.1 Rocket MQ事务支持
Rocket MQ 事务支持,其中设计一些术语和概念
概念术语:
- 半消息(Half Message) 暂时无法消费的消息。生产者将消息发送到了MQ server,但这个消息会被标记为“暂不能投递”状态,先存储起来;消费者不会去消费这条消息。
- 消息回查(Message Status Check) 网络断开或生产者重启可能导致丢失事务消息的第二次确认。当MQ Server发现消息长时间处于半消息状态时,将向消息生产者发送请求,询问该消息的最终状态(提交或回滚)。
消息三态:
- Commit:提交事务消息,消费者可以消费此消息
- Rollback:回滚事务消息,broker会删除该消息,消费者不能消费
- UNKNOWN:broker需要回查确认该消息的状态
可以结合下面Rocket MQ事务设计流程理解:
- 生产者先发送一个半消息发送到MQ服务器。①
- 半消息发送成功后,生产者执行本地事务。②③
- 根据本地事务结果,将commit或rollback消息发送到MQ服务器。④
- 如果在本地事务执行过程中缺少commit/rollback消息或生产者处于等待状态,MQ服务器长时间处于半消息状态,没接收到commit或rollback消息,MQ服务器将向同一组中的每个生产者发送检查消息,以获取事务状态。⑤
- 生产者收到消息回查后,根据本地事务状态回复commit/rollback消息给MQ服务器。⑥⑦
- 最后commit的消息将传递给消费者进行消费,但是rollback的消息将被MQ服务器丢弃。
上述过程4、5步是在MQ未收到半消息是该commit或rollback的消息时,才会触发的,就是上面介绍的消息回查。
官方文档参考:Rocket MQ事务设计 (opens new window)
# 6.5.2 Rocket MQ事务案例
本案例中内容中心审核分享内容,审核通过则发送消息给用户中心,添加用户积分,内容中心本地事务更新分享的审核状态。代码如下:
编写业务类,发送MQ消息
@Service
@RequiredArgsConstructor(onConstructor_ = {@Autowired})
public class ShareService {
private final ShareMapper shareMapper;
private final RocketMQTemplate rocketMQTemplate;
private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
/**
* 审核分享内容
* 审核通过后即发送MQ添加积分,利用Rocket MQ的事务特性
*/
public Share auditById(Integer id, ShareAuditDTO auditDTO) {
Share share = shareMapper.selectByPrimaryKey(id);
if (share == null) {
throw new IllegalArgumentException("参数非法!该分享不存在!");
}
if (!Objects.equals(share.getAuditStatus(), AuditStatusEnum.NOT_YET.getName())) {
throw new IllegalArgumentException("参数非法!该分享已经审核过了!");
}
if (Objects.equals(auditDTO.getAuditStatusEnum(), AuditStatusEnum.PASS)) {
// 审核通过:发送添加积分MQ,MQ封装的本地事务中执行审核通过操作
String transactionId = UUID.randomUUID().toString();
// 发送带事务特性的MQ消息 参数:1-分组,2-topic,3-消息,4-额外参数
rocketMQTemplate.sendMessageInTransaction(
"tx-add-bonus-group",
"add-bonus",
MessageBuilder
.withPayload(new UserAddBonusMsgDto(share.getUserId(), 50))
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId) // 设置消息header
.setHeader("share_id", id)
.build(),
auditDTO // 额外参数
);
/*
【注意】审核通过状态的操作在MQ本地事务方法中,此处只执行发送消息逻辑,
auditDTO额外参数就是在本地事务方法中执行审核的参数,详情见 AddBonusTransactionListener 类
*/
} else {
// 审核不通过:直接调用审核方法
this.auditByIdInDB(id, auditDTO);
}
return share;
}
/*
* 修改审核状态
*/
@Transactional(rollbackFor = Exception.class)
public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {
Share share = Share.builder()
.id(id)
.auditStatus(auditDTO.getAuditStatusEnum().getName())
.reason(auditDTO.getReason())
.build();
shareMapper.updateByPrimaryKeySelective(share);
}
/*
* 修改审核状态,并记录rocketmq事务日志
*/
@Transactional(rollbackFor = Exception.class)
public void auditByIdWithRocketMQLog(Integer id, ShareAuditDTO auditDTO, String transactionId) {
// 审核状态
this.auditByIdInDB(id, auditDTO);
// 记录事务日志
RocketmqTransactionLog transactionLog = RocketmqTransactionLog.builder()
.transactionId(transactionId)
.log("审核分享")
.build();
rocketmqTransactionLogMapper.insert(transactionLog);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
Rocket MQ事务类,封装了本地事务方法,以及供事务回查的检查方法
/**
* Rocket MQ 本地事务监听器
* 封装了本地事务执行的方法,和事务回查的方法
*/
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
@Resource
private ShareService shareService;
@Resource
private RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
/**
* 执行本地事务的方法
* RocketMQ事务是先发送半消息,再执行本地事务,所以本方法中才真正执行审核操作
*
* @param msg MQ消息对象
* @param arg 发送MQ消息的额外参数
* @return 返回RocketMQ事务三态之一,COMMIT/ROLLBACK/UNKNOWN
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 取出消息header中的信息
MessageHeaders headers = msg.getHeaders();
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
Integer shareId = Integer.valueOf((String) headers.get("share_id"));
// 真正执行审核操作,并发送事务后续操作 commit/rollback
try {
shareService.auditByIdWithRocketMQLog(shareId, (ShareAuditDTO) arg, transactionId);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 检查本地事务的方法
* 检查事务日志表中是否有记录,来判断本地事务是否成功
*
* @param msg MQ消息对象
* @return 返回RocketMQ事务三态之一,COMMIT/ROLLBACK/UNKNOWN
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 从消息总获取事务id
MessageHeaders headers = msg.getHeaders();
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
// 查询事务日志记录
RocketmqTransactionLog transactionLog = rocketmqTransactionLogMapper.selectOne(
RocketmqTransactionLog.builder().transactionId(transactionId).build()
);
// 根据本地事务状态发送 commit/rollback 消息
if (transactionLog != null) {
return RocketMQLocalTransactionState.COMMIT;
}
return RocketMQLocalTransactionState.ROLLBACK;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
消费者端不用改变,和前面小结中的消费者代码一样。
总结本例中关键点:
- 发送消息时使用header传参,传参到
- 用额外参数arg传参,传到本地事务方法处,用来执行本地事务
- 用记日志的方法实现本地事务回查
# 6.6 Spring Cloud Stream概念
个用于构建消息驱动的微服务的框架。简单来说就是一个用于简化MQ通信的框架。
官方文档参考:Spring Cloud Stream主要概念 (opens new window)
应用通过inputs和outputs和Binder交互,Binder是一个将微服务和消息中间件继承的组件,途中的Middleware表示消息中间件,目前支持kafka、rabbitmq、rocketmq。
Spring Cloud Stream编程模型
- Destination Binder(目标绑定器):与消息中间件通信的组件
- Destination Bindings(目标绑定):Binding是连接应用程序跟消息中间件的桥梁,用于消息的消费和生产,由 binder创建
- Message(消息)
上图表示,微服务继承了Stream,Stream的Destination Binder创建了两个Binding,分别连接rabbitmq和kafka,左侧binding从rabbitmq消费消息,经过服务的处理,将消息发送到kafka。
# 6.7 Spring Cloud Stream使用
# 6.7.1 生产者
第一步:加依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
2
3
4
第二步:写注解
@SpringBootApplication
@EnableBinding(Source.class) // 整合Stream注解
public class ContentCenterApplication {}
2
3
第三步:写配置
spring:
cloud:
stream:
rocketmq:
binder:
name-server: localhost:9876
bindings:
output:
# 用来指定topic
destination: stream-test-topic
2
3
4
5
6
7
8
9
10
编码示例:
@Autowired
private Source source;
// 测试Stream发送MQ消息
public String testStream() {
Message<String> message = MessageBuilder.withPayload("消息体").build();
source.output().send(message); // 发送消息
return "success";
}
2
3
4
5
6
7
8
9
# 6.7.2 消费者
第一步:加依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
2
3
4
第二步:写注解
@SpringBootApplication
@EnableBinding(Sink.class) // 整合Stream注解
public class ContentCenterApplication {}
2
3
第三步:写配置
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
input:
# 对应生产者的topic
destination: stream-test-topic
# 一定要设置,非rocketmq可留空
group: binder-group
2
3
4
5
6
7
8
9
10
11
12
编码示例:
@Service
@Slf4j
public class TestStreamConsumer {
@StreamListener(Sink.INPUT)
public void receive(String messageBody) {
log.info("通过Stream收到消息,messageBody={}", messageBody);
}
}
2
3
4
5
6
7
8
9
# 6.8 自定义接口
# 6.8.1 自定义接口发送消息
- 编写自定义接口,使用@Output注解,标记这是一个 Output Binding
public interface MySource {
String MY_OUTPUT = "my-output";
/**
* Output注解标记这是一个 Output Binding
* 注解value和yml配置bindings下的名称一致
*/
@Output(MY_OUTPUT)
MessageChannel output();
}
2
3
4
5
6
7
8
9
10
- 启动类@EnableBinding注解上引用MySource
@SpringBootApplication
@EnableBinding({Source.class, MySource.class})
public class ContentCenterApplication {}
2
3
- 配置文件添加配置,bindings下配置 my-output,下面配置topic
spring:
cloud:
stream:
rocketmq:
binder:
name-server: localhost:9876
bindings:
output:
# 用来指定topic
destination: stream-test-topic
# 和 @Output注解的value一致
my-output:
destination: stream-my-topic
2
3
4
5
6
7
8
9
10
11
12
13
测试发送消息:
@Autowired
private MySource mySource;
/**
* 测试自定义Stream接口发送MQ消息
* @return
*/
@GetMapping("/stream2")
public String testStream2() {
Message<String> message = MessageBuilder.withPayload("消息体").build();
mySource.output().send(message);
return "success";
}
2
3
4
5
6
7
8
9
10
11
12
13
# 6.8.2 自定义接口消费消息
- 编写自定义接口,使用@Input注解,标记这是一个 Input Binding
public interface MySink {
String MY_INPUT = "my-input";
/**
* Input注解标记这是一个 Input Binding
* 注解value和yml配置bindings下的名称一致
*/
@Input(MY_INPUT)
SubscribableChannel input();
}
2
3
4
5
6
7
8
9
10
- 启动类@EnableBinding注解上引用MySource
@SpringBootApplication
@EnableBinding({Sink.class, MySink.class})
public class UserCenterApplication {}
2
3
- 配置文件添加配置,bindings下配置 my-output,下面配置topic
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
input:
# 对应生产者的topic
destination: stream-test-topic
# 一定要设置,非rocketmq可留空
group: binder-group
my-input:
destination: stream-my-topic
group: my-group
2
3
4
5
6
7
8
9
10
11
12
13
14
15
测试发送消息:
@Service
@Slf4j
public class MyTestStreamConsumer {
@StreamListener(MySink.MY_INPUT)
public void receive(String messageBody) {
log.info("通过自定义接口收到消息,messageBody={}", messageBody);
}
}
2
3
4
5
6
7
8
9
# 6.8.3 注解解析
- Source:Stream提供的发送消息的接口,和上述自定义的MySource实现一样
- Sink:Stream提供的接收消息的接口,和上述自定义的MySink实现一样
- Processor:Stream提供的用于收发消息的接口,继承了Source和Sink接口
yml配置文件中,bindings下的配置,显示binding的名称,再是topic名称,这里要结合[Spring Cloud Stream概念](#5.6 Spring Cloud Stream概念)理解。
Binder连接应用和MQ,创建Binding,通过Binding和MQ通信。
这里Source就表示创建的Output Binding,Sink就表示创建的Input Binding。
# 6.9 消息过滤
当不想消费MQ中的所有消息,想根据一定条件消费指定消息时,可以使用消息过滤。有以下几种方式:
- condition:通过@StreamListener注解的condition属性实现
- Tags:通过设置tags实现(Rocket MQ特有)
- SQL92:使用SQL语法过滤(Rocket MQ特有)
参考:
# 6.10 Spring Cloud Stream监控
使用actuator可以监控Stream健康状态,有以下端点可供监控
- /actuator/bindings
- /actuator/channels
- /actuator/health
# 6.11 Spring Cloud Stream异常处理
MQ消费者消费消息的时候,如果出现异常该如何处理。Stream提供了处理方式
参考:Stream异常处理 (opens new window)
# 6.12 Spring Cloud Stream + Rocket MQ 实现分布式事务
Stream本身并没有考虑分布式事务的问题,这里介绍的还是依赖于Rocket MQ的能力。
所以这里介绍的使用了Stream的代码,和之前Rocket MQ原生事务代码大同小异,只是稍加改造。
# 6.12.1 生产者改造
编写业务类,发送MQ消息,改造点就是使用Source发送消息,source发消息不支持额外参数,对象转为json后放到消息header里。
@Service
@RequiredArgsConstructor(onConstructor_ = {@Autowired})
public class ShareService {
private final ShareMapper shareMapper;
private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
private final Source source;
/**
* 审核分享内容
* 审核通过后即发送MQ添加积分,利用Rocket MQ的事务特性
*/
public Share auditById(Integer id, ShareAuditDTO auditDTO) {
Share share = shareMapper.selectByPrimaryKey(id);
if (share == null) {
throw new IllegalArgumentException("参数非法!该分享不存在!");
}
if (!Objects.equals(share.getAuditStatus(), AuditStatusEnum.NOT_YET.getName())) {
throw new IllegalArgumentException("参数非法!该分享已经审核过了!");
}
if (Objects.equals(auditDTO.getAuditStatusEnum(), AuditStatusEnum.PASS)) {
// 审核通过:发送添加积分MQ,MQ封装的本地事务中执行审核通过操作
String transactionId = UUID.randomUUID().toString(); // 事务id
// 【改造点1】使用stream发送消息
source.output().send(
MessageBuilder
.withPayload(new UserAddBonusMsgDto(share.getUserId(), 50))
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId) // 设置消息header
.setHeader("share_id", id)
.setHeader("dto", JSON.toJSONString(auditDTO))
.build()
);
// 发送带事务特性的MQ消息 参数:1-分组,2-topic,3-消息,4-额外参数
// rocketMQTemplate.sendMessageInTransaction(
// "tx-add-bonus-group",
// "add-bonus",
// MessageBuilder
// .withPayload(new UserAddBonusMsgDto(share.getUserId(), 50))
// .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId) // 设置消息header
// .setHeader("share_id", id)
// .build(),
// auditDTO // 额外参数
// );
/*
【注意】审核通过状态的操作在MQ本地事务方法中,此处只执行发送消息逻辑,
auditDTO额外参数就是在本地事务方法中执行审核的参数,详情见 AddBonusTransactionListener 类
*/
} else {
// 审核不通过:直接调用审核方法
this.auditByIdInDB(id, auditDTO);
}
return share;
}
/*
* 修改审核状态
*/
@Transactional(rollbackFor = Exception.class)
public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {
Share share = Share.builder()
.id(id)
.auditStatus(auditDTO.getAuditStatusEnum().getName())
.reason(auditDTO.getReason())
.build();
shareMapper.updateByPrimaryKeySelective(share);
}
/*
* 修改审核状态,并记录rocketmq事务日志
*/
@Transactional(rollbackFor = Exception.class)
public void auditByIdWithRocketMQLog(Integer id, ShareAuditDTO auditDTO, String transactionId) {
// 审核状态
this.auditByIdInDB(id, auditDTO);
// 记录事务日志
RocketmqTransactionLog transactionLog = RocketmqTransactionLog.builder()
.transactionId(transactionId)
.log("审核分享")
.build();
rocketmqTransactionLogMapper.insert(transactionLog);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
Rocket MQ事务类,封装了本地事务方法,以及供事务回查的检查方法,这里改造点就改变一下额外参数的接收方式就好了。
/**
* Rocket MQ 本地事务监听器
* 封装了本地事务执行的方法,和事务回查的方法
*/
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
@Resource
private ShareService shareService;
@Resource
private RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
/**
* 执行本地事务的方法
* RocketMQ事务是先发送半消息,再执行本地事务,所以本方法中才真正执行审核操作
*
* @param msg MQ消息对象
* @param arg 发送MQ消息的额外参数
* @return 返回RocketMQ事务三态之一,COMMIT/ROLLBACK/UNKNOWN
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 取出消息header中的信息
MessageHeaders headers = msg.getHeaders();
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
Integer shareId = Integer.valueOf((String) headers.get("share_id"));
String dtoString = (String) headers.get("dto");
ShareAuditDTO auditDTO = JSON.parseObject(dtoString, ShareAuditDTO.class);
// 真正执行审核操作,并发送事务后续操作 commit/rollback
try {
shareService.auditByIdWithRocketMQLog(shareId, auditDTO, transactionId);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 检查本地事务的方法
* 检查事务日志表中是否有记录,来判断本地事务是否成功
*
* @param msg MQ消息对象
* @return 返回RocketMQ事务三态之一,COMMIT/ROLLBACK/UNKNOWN
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 从消息总获取事务id
MessageHeaders headers = msg.getHeaders();
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
// 查询事务日志记录
RocketmqTransactionLog transactionLog = rocketmqTransactionLogMapper.selectOne(
RocketmqTransactionLog.builder().transactionId(transactionId).build()
);
// 根据本地事务状态发送 commit/rollback 消息
if (transactionLog != null) {
return RocketMQLocalTransactionState.COMMIT;
}
return RocketMQLocalTransactionState.ROLLBACK;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
配置文件:
spring:
cloud:
stream:
rocketmq:
binder:
name-server: localhost:9876
# 添加如下事务配置
bindings:
output:
producer:
transactional: true
group: tx-add-bonus-group
bindings:
output:
# 用来指定topic
destination: add-bonus
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 6.12.2 消费者改造
消费类
@Service
@Slf4j
public class AddBonusStreamConsumer {
@Resource
private UserMapper userMapper;
@Resource
private BonusEventLogMapper bonusEventLogMapper;
@StreamListener(Sink.INPUT)
public void receive(UserAddBonusMsgDto userAddBonusMsgDto) {
// 添加积分
Integer userId = userAddBonusMsgDto.getUserId();
User user = userMapper.selectByPrimaryKey(userId);
user.setBonus(user.getBonus() + userAddBonusMsgDto.getBonus());
userMapper.updateByPrimaryKey(user);
// 记录日志
bonusEventLogMapper.insert(
BonusEventLog.builder()
.userId(userId)
.value(userAddBonusMsgDto.getBonus())
.event("CONTRIBUTE")
.createTime(new Date())
.description("投稿添加积分")
.build()
);
log.info("积分添加完成");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
配置文件:
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876
bindings:
input:
# 对应生产者的topic
destination: add-bonus
# 一定要设置,非rocketmq可留空
group: binder-group
2
3
4
5
6
7
8
9
10
11
12
# 6.13 Spring Cloud Stream知识盘点
参考:[Stream异常处理](