第04章-RabbitMQ高级整合
# 一、RabbitMQ 整合 SpringAMQP
# 1.1 RabbitAdmin
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
2
3
4
5
6
注意:autoStartup 必须设置为 true,否则 Spring 容器不会加载 RabbitAdmin 类
RabbitAdmin 底层实现就是从 Spring 容器中获取 Exchange、Banding、RoutingKey 以及 Queue 的 @Bean 声明。然后使用 RabbitTemplate 的 execute 方法执行对应的声明、修改、删除等一系列 rabbitMq 基础功能操作。
例如:添加一个交换机、删除一个绑定、清空一个队列里的消息等
示例代码 #testAdmin方法
# 1.2 SpringAMQP RabbitMQ 声明式配置
在 RabbitMQ 基础 API 中声明 Exchange、Queue、Binding,通过 Channel 创建
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
2
3
使用 SpringAMQP 去生命,就需要使用 SpringAMQP 的如下模式,即声明 @Bean 方式,由 Spring 容器创建
// 配置交换机
@Bean
public TopicExchange exchange001() {
return new TopicExchange("topic001", true, false);
}
// 配置队列
@Bean
public Queue queue001() {
return new Queue("queue001", true);
}
// 配置绑定
@Bean
public Binding binding001() {
return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
示例代码 SpringAMQP RabbitMQ 声明式配置
# 1.3 SpringAMQP 消息模板组件 RabbitTemplate
RabbitTemplate,即消息模板,我们在与 SpringAMQP 整合的时候进行发送消息的关键类
该类提供了丰富的发送消息的方法,包括可靠性投递消息方法、回调监听消息接口 ConfirmCallback、返回值确认接口 ReturnCallBack 等等,只需要注入到 Spring 容器中,然后直接使用。
在于 Spring 整合的时候需要实例化,但是在于 SpringBoot 整合时,在配置文件里添加配置即可。
// Spring配置 RabbitTemplate
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
2
3
4
5
6
发送消息 示例代码 各种 send 方法
# 1.4 SpringAMQP 消息容器 SimpleMessageListenerContainer
简单消息监听容器,我们可以对他进行很多的设置,对于消费者的配置项,这个类都可以满足。
监听队列(多个队列)、自动启动、自动声明功能
设置事务特性、事务管理器、事务特性、事务容器(并发)、是否开启事务、回滚消息等
设置消息确认和自动确认模式、是否重回队列、异常捕获 handler 函数
设置消费者标签生成策略、是否独占模式、消费者属性等
设置具体的监听器、消息转换器等
示例代码 #messageContainer 方法
注意:SimpleMessageListenerContainer 可以进行动态设置,比如在运行中的应用可以动态修改其消费者数量的大小、接收消息的模式等
很多基于 RabbitMQ 的子定制化后端管控台在进行动态设置的时候,也是根据这一特性进行设置。
思考:SimpleMessageListenerContainer 为什么能动态感知配置变更?
# 1.5 SpringAMQP 消息监听适配器 MessageListenerAdapter
另一种设置监听器的方式
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
// 省略container各项配置...
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); // 传入消息委派对象
// 省略adapter各项配置...
container.setMessageListener(adapter);
2
3
4
5
通过 MessageListenerAdapter 的代码我们可以看出如下核心属性:
defaultListenerMethod 默认监听方法名称:用于设置监听方法名称
Delegate 委派对象:实际真实的委托对象,用于处理消息
queueOrTagMethodName 队列标识与方法名称组成的映射: 可以一一进行队列与方法名称的匹配,队列和方法名称绑定,即指定队列里的消息会被绑定的方法所接受处理
示例代码 #messageContainer 方法中使用适配器设置监听器部分
# 1.6 SpringAMQP 消息转换器 MessageConverter
我们在消息发送的时候,正常情况下消息体为二进制的数据方式进行传输,我们希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到 MessageConverter
使用方式
自定义常用转换器:MessageConverter,一般来讲都需要实现这个接口
重写下面两个方法:
toMessage : Java对象转换为Message fromMessage : Message对象转换为Java对象
1
2
具体实现
Json 转换器:Jackson2JsonMessageConverter,可以进行 java 对象的转换功能
DefaultJackson2JavaTypeMapper 映射器:可以进行 java 对象的映射关系
自定义二进制转换器:比如图片类型、pdf、ppt、流媒体
示例代码:
配置转换器 #messageContainer 配置转换器部分
# 二、RabbitMQ 整合 Spring Boot
# 2.1 消息生产者
publisher-confirms,实现一个监听器用于监听 Broker 端给我们返回的确认请求:RabbitTemplate.ConfirmCallBack
publisher-returns,保证消息对 Broker 端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达消息进行后续处理,保证消息的路由成功:RabbitTemplate.ReturnCallBack
注意一点,在发送消息的时候对 template 进行配置 mandatory=true 保证监听有效
生产端还可以配置其他属性。比如发送重试,超时时间、次数、间隔等
步骤
消息发送方 RabbitSender
发送消息测试 SendTest
# 2.2 消息消费者
首先配置手动 ack,这样可以保证消息的可靠性送达,或者在消费端消费失败的时候可以重回到队列,根据业务记录日志等处理
可以设置消费端的监听个数和最大监听个数,用于控制消费端的并发情况
消费端监听 @RabbitListener 注解,@RabbitListener 是一个组合注解,里面可以注解配置 @QueueBinding、@Queue、@Exchange 直接通过这个组合注解一次性搞定消费端交换机、队列、绑定、路由、并配置监听功能等
代码(可以配合上面的生产者测试)
消息消费者 RabbitReceiver
# 三、RabbitMQ 整合 Spring Cloud Stream
Spring Cloud Stream 整体架构核心概念图:
下图所示,对于中间的应用,消息的接收和发送可以使用不同的消息中间件
如下图,Spring Cloud Stream 在核心应用和MQ中间加入绑定层,作为一个协调者的角色,通过其代理实现消息的通信
Spring Cloud Stream 在生产端消费端管道前都加了一层插件(下图绿色部分),插件可以用于接受各种不同的消息,并且支持消息中间件的替换(如可将中间的 RabbitMQ 替换成 Kafka)
Barista 接口:Barista 接口是定义来作为后面类的参数,这一接口定义消息通道类型和通道名称,通道的名称是作为配置用,通道类型则决定了 app 会使用这一通道进行发送消息还是从中接收消息
@Output:输出注解,用于定义发送消息接口
@Input:输入注解,用于定义消息的消费者接口
@StreamListener:用于定义监听方法的注解
使用 Spring Cloud Stream 非常简单,只需要使用好这三个注解即可,在实现高性能消息生产和消费场景非常适用,但是使用 Spring Cloud Stream 框架有一个非常大的问题就是不能实现消息的可靠性投递,也就是没办法保证消息 100% 的可靠性,会存在少量的消息丢失问题。
这个原因是因为 Spring Cloud Stream 框架为了兼顾 Kafka,所以在实际中作中使用它的目的就是针对高性能的消息通信,这点就是当前版本 Spring Cloud Stream 的定位。