CodeAshen's blog CodeAshen's blog
首页
  • Spring Framework

    • 《剖析Spring5核心原理》
    • 《Spring源码轻松学》
  • Spring Boot

    • Spring Boot 2.0深度实践
  • Spring Cloud

    • Spring Cloud
    • Spring Cloud Alibaba
  • RabbitMQ
  • RocketMQ
  • Kafka
  • MySQL8.0详解
  • Redis从入门到高可用
  • Elastic Stack
  • 操作系统
  • 计算机网络
  • 数据结构与算法
  • 云原生
  • Devops
  • 前端
  • 实用工具
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
  • Reference
GitHub (opens new window)

CodeAshen

后端界的小学生
首页
  • Spring Framework

    • 《剖析Spring5核心原理》
    • 《Spring源码轻松学》
  • Spring Boot

    • Spring Boot 2.0深度实践
  • Spring Cloud

    • Spring Cloud
    • Spring Cloud Alibaba
  • RabbitMQ
  • RocketMQ
  • Kafka
  • MySQL8.0详解
  • Redis从入门到高可用
  • Elastic Stack
  • 操作系统
  • 计算机网络
  • 数据结构与算法
  • 云原生
  • Devops
  • 前端
  • 实用工具
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
  • Reference
GitHub (opens new window)
  • RabbitMQ

  • RocketMQ

    • 01-RocketMQ初探门径
    • 02-RocketMQ急速入门
      • 2.1 消费者示例
      • 2.2 Broker重试机制
      • 3.1 RocketMQ集群模式介绍
      • 3.2 Rocket主从搭建
    • 03-RocketMQ生产者核心应用
    • 04-RocketMQ消费者核心应用
    • 05-RocketMQ核心原理解析
    • 06-RocketMQ高级特性
  • Kafka

  • Nginx

  • 中间件
  • RocketMQ
CodeAshen
2023-02-10
目录

02-RocketMQ急速入门

# 一、生产者使用

  1. 创建生产者对象 DefaultMQProducer

    有了生产者对象才能发消息,在RocketMQ里必须配置生产者组名称,并且一个应用程序中只能起一个组名称,如果有两个组名称相同的生产者会报错

  2. 设置 NamesrvAddr

  3. 启动消息生产者服务

  4. 创建消息并发送

public class Producer {
	public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        // 创建producer对象
        DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
        // 设置 NamesrvAddr
        producer.setNamesrvAddr(Const.NAMESRV_ADDR_SINGLE);
        // 开启生产者
        producer.start();

        for (int i = 0; i < 5; i++) {
            // 创建消息
            Message message = new Message("test_quick_topic",
                    "TagA",
                    "key_" + i,
                    ("Hello RocketMQ " + i).getBytes());

            // 发送消息得到结果
            SendResult result = producer.send(message);
            System.err.println("消息发出:" + result);
        }
        
        // 关闭生产者
        producer.shutdown();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

Topic和队列是一对多的关系; 默认一个Topic下会有4个队列;

# 二、消费者使用

# 2.1 消费者示例

  1. 创建消费者对象 DefaultMQPushConsumer

    其实并不是MQ主动推给消费者消息,内部使用了长轮询机制

  2. 设置 NamesrvAddr 及消费位置 ConsumeFromWhere

    消费者每次启动都可以指定消费位点,当服务已经启动过一次之后,消费位点就存在NameServer和Broker里了,后续启动后就不会从设置的位置进行消费了

  3. 进行主题订阅 subscribe

  4. 注册监听并消费 registerMessageListener

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        // 创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name");
        // 设置 NameServer 地址
        consumer.setNamesrvAddr(Const.NAMESRV_ADDR_SINGLE);
        // 设置消费位点
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // 指定订阅topic和订阅tag的表达式
        consumer.subscribe("test_quick_topic", "*");
        
        // 监听消息,指定消费行为
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                MessageExt me = msgs.get(0);
                try {
                    String topic = me.getTopic();
                    String tags = me.getTags();
                    String keys = me.getKeys();
                    String body = new String(me.getBody());
                    System.err.printf("topic: %s, tags: %s, keys: %s, body: %s\n", 
                            topic, tags, keys, body);
                } catch (Exception e) {
                    e.printStackTrace();
                    // 返回稍后重试
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }

                // 返回消费成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 开启消费者
        consumer.start();
        System.err.println("Consumer start...");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

# 2.2 Broker重试机制

注册监听的时候,定义了消息消费的具体业务逻辑,并返回返回值到Broker,消费成功返回成功,消费失败返回reconsume_later,稍后重试。 重试消费时间是定时任务,间隔越来越久,1s,5s,10s...

# 查看消费失败,broker的重试配置
cat /usr/local/rocketmq/logs/rocketmqlogs/broker.log | grep messageDelayLevel
2021-06-14 21:46:01 INFO main - messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
1
2
3

# 三、RocketMQ集群

# 3.1 RocketMQ集群模式介绍

下图表示一个双主双从的RocketMQ集群。

  • 所有相同名称的生产者组成生产者组
  • 所有相同名称的消费者组成消费者组
  • 生产者组作用:
    1. 多个生产者并行发消息,提高效率
    2. 事务消息的情况下,broker会主动进行rpc回调生产者,检查生产者,进行check操作。这种情况下一个生产者连不上,可以连另一个。

image-20210615003022580

RocketMQ有四种集群模式

  • 单点模式:一旦宕机,无法对外提供服务,在测试环境的时候,由于资源的限制,可以选择单机,执行或者学习rocketmq的功能。
  • 主从模式:主节点进行消息读写首发,从节点不能接收消息。从节点用于高可用,防止消息丢失,主节点挂了从节点可以继续对消费者提供服务。但是RocketMQ不支持主从切换,redis cluster支持。
  • 双主模式:性能较好,因为消息无需同步到从节点。只有master没有salve;rocketmq每个master之间是互不通信,所以,如果某个mater宕机,该台机器上的数据是无法被消费的,只有等到master重新启动才可以被消费端消费。如果宕机的master磁盘有问题,且无法恢复,会导致master的数据丢失,即消息丢失。
  • 双主双从、多主多从:这种模式即每个master至少会有slave(master和slave的集群名称和接单名称一样,只是节点的id不同,而且master的id必须为0,slave的id大于0,可以是一台也可以多台slave,1,2,3,4,5标示不同的slave)。这种方式虽然需要的资源比较多,但是可以保证在master挂掉的时候,不影响消费者的消费,可以减小对消费端的影响。

比较推荐的是带上从节点的集群模式,其中主从节点间的关系如下:

  • 主从模式环境构建可以保障消息的即时性与可靠性。
  • 投递一条消息后,关闭主节点,从节点继续可以提供消费者数据进行消费,但是不能接收消息。
  • 主节点上线后进行消费进度offset同步

参考:

  • RocketMQ架构设计 (opens new window)
  • RocketMQ主从消费机制 (opens new window)

# 3.2 Rocket主从搭建

参考 RocketMQ环境搭建,以上述为例,在另一台服务器上搭建从节点broker。

只需要在 ${rocketmq_home}/conf/test-conf 中对应配置的 broker-a.properties 内容复制到 broker-a-s.properties 中,然后修改两个配置项目

#1 表示 Master,>0 表示 Slave
brokerId=1

#Broker 的角色
#- ASYNC_MASTER 异步复制 Master
#- SYNC_MASTER 同步双写 Master
#- SLAVE
brokerRole=SLAVE
1
2
3
4
5
6
7
8

最后关闭broker,后重启两个服务器上的主从broker即可。

编辑 (opens new window)
上次更新: 2023/06/04, 12:34:19
01-RocketMQ初探门径
03-RocketMQ生产者核心应用

← 01-RocketMQ初探门径 03-RocketMQ生产者核心应用→

最近更新
01
第01章-RabbitMQ导学
02-10
02
第02章-入门RabbitMQ核心概念
02-10
03
第03章-RabbitMQ高级特性
02-10
更多文章>
Theme by Vdoing | Copyright © 2020-2023 CodeAshen | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式