02-RocketMQ急速入门
# 一、生产者使用
创建生产者对象 DefaultMQProducer
有了生产者对象才能发消息,在RocketMQ里必须配置生产者组名称,并且一个应用程序中只能起一个组名称,如果有两个组名称相同的生产者会报错
设置 NamesrvAddr
启动消息生产者服务
创建消息并发送
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
// 创建producer对象
DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
// 设置 NamesrvAddr
producer.setNamesrvAddr(Const.NAMESRV_ADDR_SINGLE);
// 开启生产者
producer.start();
for (int i = 0; i < 5; i++) {
// 创建消息
Message message = new Message("test_quick_topic",
"TagA",
"key_" + i,
("Hello RocketMQ " + i).getBytes());
// 发送消息得到结果
SendResult result = producer.send(message);
System.err.println("消息发出:" + result);
}
// 关闭生产者
producer.shutdown();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Topic和队列是一对多的关系; 默认一个Topic下会有4个队列;
# 二、消费者使用
# 2.1 消费者示例
创建消费者对象 DefaultMQPushConsumer
其实并不是MQ主动推给消费者消息,内部使用了长轮询机制
设置 NamesrvAddr 及消费位置 ConsumeFromWhere
消费者每次启动都可以指定消费位点,当服务已经启动过一次之后,消费位点就存在NameServer和Broker里了,后续启动后就不会从设置的位置进行消费了
进行主题订阅 subscribe
注册监听并消费 registerMessageListener
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name");
// 设置 NameServer 地址
consumer.setNamesrvAddr(Const.NAMESRV_ADDR_SINGLE);
// 设置消费位点
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 指定订阅topic和订阅tag的表达式
consumer.subscribe("test_quick_topic", "*");
// 监听消息,指定消费行为
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt me = msgs.get(0);
try {
String topic = me.getTopic();
String tags = me.getTags();
String keys = me.getKeys();
String body = new String(me.getBody());
System.err.printf("topic: %s, tags: %s, keys: %s, body: %s\n",
topic, tags, keys, body);
} catch (Exception e) {
e.printStackTrace();
// 返回稍后重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
// 返回消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 开启消费者
consumer.start();
System.err.println("Consumer start...");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 2.2 Broker重试机制
注册监听的时候,定义了消息消费的具体业务逻辑,并返回返回值到Broker,消费成功返回成功,消费失败返回reconsume_later,稍后重试。 重试消费时间是定时任务,间隔越来越久,1s,5s,10s...
# 查看消费失败,broker的重试配置
cat /usr/local/rocketmq/logs/rocketmqlogs/broker.log | grep messageDelayLevel
2021-06-14 21:46:01 INFO main - messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
1
2
3
2
3
# 三、RocketMQ集群
# 3.1 RocketMQ集群模式介绍
下图表示一个双主双从的RocketMQ集群。
- 所有相同名称的生产者组成生产者组
- 所有相同名称的消费者组成消费者组
- 生产者组作用:
- 多个生产者并行发消息,提高效率
- 事务消息的情况下,broker会主动进行rpc回调生产者,检查生产者,进行check操作。这种情况下一个生产者连不上,可以连另一个。
RocketMQ有四种集群模式
- 单点模式:一旦宕机,无法对外提供服务,在测试环境的时候,由于资源的限制,可以选择单机,执行或者学习rocketmq的功能。
- 主从模式:主节点进行消息读写首发,从节点不能接收消息。从节点用于高可用,防止消息丢失,主节点挂了从节点可以继续对消费者提供服务。但是RocketMQ不支持主从切换,redis cluster支持。
- 双主模式:性能较好,因为消息无需同步到从节点。只有master没有salve;rocketmq每个master之间是互不通信,所以,如果某个mater宕机,该台机器上的数据是无法被消费的,只有等到master重新启动才可以被消费端消费。如果宕机的master磁盘有问题,且无法恢复,会导致master的数据丢失,即消息丢失。
- 双主双从、多主多从:这种模式即每个master至少会有slave(master和slave的集群名称和接单名称一样,只是节点的id不同,而且master的id必须为0,slave的id大于0,可以是一台也可以多台slave,1,2,3,4,5标示不同的slave)。这种方式虽然需要的资源比较多,但是可以保证在master挂掉的时候,不影响消费者的消费,可以减小对消费端的影响。
比较推荐的是带上从节点的集群模式,其中主从节点间的关系如下:
- 主从模式环境构建可以保障消息的即时性与可靠性。
- 投递一条消息后,关闭主节点,从节点继续可以提供消费者数据进行消费,但是不能接收消息。
- 主节点上线后进行消费进度offset同步
参考:
# 3.2 Rocket主从搭建
参考 RocketMQ环境搭建,以上述为例,在另一台服务器上搭建从节点broker。
只需要在 ${rocketmq_home}/conf/test-conf
中对应配置的 broker-a.properties
内容复制到 broker-a-s.properties
中,然后修改两个配置项目
#1 表示 Master,>0 表示 Slave
brokerId=1
#Broker 的角色
#- ASYNC_MASTER 异步复制 Master
#- SYNC_MASTER 同步双写 Master
#- SLAVE
brokerRole=SLAVE
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
最后关闭broker,后重启两个服务器上的主从broker即可。
编辑 (opens new window)
上次更新: 2023/06/04, 12:34:19