06-RocketMQ高级特性
# 一、事务消息
# 1.1 事务消息概述
以A账户转账到B账户为例,解释RocketMQ事务机制,如下图所示:
- 生产端发送事务消息(half消息)到RocketMQ,并提供一个check回调函数
- 根据消息发送结果,执行本地事务,A账户减钱,根据结果发送Commit/Rollback消息
- 生产段发送Commit消息前,half消息对消费端不可见
- 生产段发送Commit消息后,RocketMQ将事务消息置为可见,消费端消费到消息,执行B账户加钱逻辑
- 为了防止如果Commit/Rollback消息发送失败或超时,RocketMQ有回调生产段check函数的机制,生产端检查本地事务执行结果,重新Commit或者Rollback
具体执行流程和设计理念参考官方文档:
# 1.2 事务消息示例
事务消息生产者使用 TransactionMQProducer
,指定本地事务和事务回查的对象 TransactionListener
,通过 sendMessageInTransaction
方法发送事务消息。
public class TransactionProducer {
public static final String TX_PRODUCER_GROUP = "test_tx_producer_group";
public static final String TX_TOPIC = "test_tx_topic";
public static void main(String[] args) throws MQClientException {
// Producer使用的线程池
ExecutorService executorService = new ThreadPoolExecutor(
2,
5,
100,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000),
r -> new Thread(r, "test_tx_producer_group-check-thread"),
new ThreadPoolExecutor.AbortPolicy()
);
// Producer的Listener对象,做两件事情:1-异步执行本地事务 2-供MQ做事务回查
TransactionListener transactionListener = new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.err.println("----------- 执行本地事务 -----------");
System.err.println("callArg: " + arg);
// tx.begin
// 数据库落库操作
// tx.commit
// return LocalTransactionState.COMMIT_MESSAGE;
return LocalTransactionState.UNKNOW; // 如果返回unknow,MQ会执行事务回查
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.err.println("----------- 本地事务回查 -----------");
// 回查逻辑
return LocalTransactionState.COMMIT_MESSAGE;
}
};
// 创建事务Producer并启动
TransactionMQProducer producer = new TransactionMQProducer(TX_PRODUCER_GROUP);
producer.setNamesrvAddr(Const.M2_S2_ASYNC);
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
// 发送事务消息
Message message = new Message(TX_TOPIC, "TagA", "TX-Key",
"Hello, Transaction Message".getBytes(StandardCharsets.UTF_8));
producer.sendMessageInTransaction(message, "我是回调参数,本地事务执行和事务回查,都能拿到我");
// producer.shutdown();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
消费者和普通消费者没什么区别,消费者示例:
public class TransactionConsumer {
public static final String TX_CONSUMER_GROUP = "test_tx_consumer_group";
public static final String TX_TOPIC = "test_tx_topic";
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(TX_CONSUMER_GROUP);
consumer.setNamesrvAddr(Const.M2_S2_ASYNC);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe(TX_TOPIC, "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
System.err.printf("topic: %s, tags: %s, keys: %s, body: %s\n",
msg.getTopic(), msg.getTags(), msg.getKeys(), new String(msg.getBody()));
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.err.println("tx consumer started...");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 二、顺序消息
# 1.1 顺序消息概述
- 顺序消息:指的是消息的消费顺序和生产顺序相同
- 全局顺序:在某个topic下,所有的消息都要保证顺序
- 局部顺序:只要保证每一组消息被顺序消费即可
参卡:
# 1.2 顺序消费示例
生产者:
public class OrderlyProducer {
public static final String ORDERLY_PRODUCER_GROUP = "test_orderly_producer_name";
public static final String ORDERLY_TOPIC = "test_orderly_producer_name";
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer(ORDERLY_PRODUCER_GROUP);
producer.setNamesrvAddr(Const.M2_S2_ASYNC);
producer.start();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
for (int i = 0; i < 10; i++) {
String tag = i % 2 == 0 ? "A" : "B";
String key = "key-" + i;
byte[] body = sdf.format(new Date()).getBytes(StandardCharsets.UTF_8);
Message message = new Message(ORDERLY_TOPIC, tag, key, body);
// 偶数投递到0队列,奇数投递到1队列
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int index = (Integer) arg % 2;
return mqs.get(index);
}
}, i);
System.out.println(sendResult);
}
producer.shutdown();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
消费者:
public class OrderlyConsumer {
public static final String ORDERLY_CONSUMER_GROUP = "test_orderly_consumer_name";
public static final String ORDERLY_TOPIC = "test_orderly_producer_name";
public OrderlyConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(ORDERLY_CONSUMER_GROUP);
consumer.setNamesrvAddr(Const.M2_S2_ASYNC);
// 设置第一次启动从队列头部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe(ORDERLY_TOPIC, "*");
consumer.registerMessageListener(new Listener());
consumer.start();
System.err.println("consumer started...");
}
// ********* 消费者监听对象,实现的是 MessageListenerOrderly 接口 *********
class Listener implements MessageListenerOrderly {
private Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(msg + ", content: " + new String(msg.getBody()));
try {
TimeUnit.SECONDS.sleep(random.nextInt(4) + 1);
} catch (Exception e) {
e.printStackTrace();
// return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; // 暂停当前队列片刻
}
}
return ConsumeOrderlyStatus.SUCCESS; // 消费成功
}
}
public static void main(String[] args) throws MQClientException {
new OrderlyConsumer();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# 三、消息过滤
# 3.1 消息过滤概述
RocketMQ消息过滤方式:
Tag过滤:ConsumeQueue中存储了消息tag的哈希值,可以在broker端就进行过滤,性能高。
SQL92表达式过滤:大制做法和tag方式一样,具体过滤流程不同,需要解析sql表达式进行过滤。每次过滤都去执行SQL表达式影响效率,使用了BloomFilter避免每次都执行。
要在broker上配置 enablePropertyFilter=true
Filter Server过滤:broker启动配置(4.3.x之后不再支持),FilterServer是一种比SQL更灵活的过滤方式,允许自定义JAVA函数,FilterServer使用需要引入Filter组件,可以在配置文件中设置,理解为RocketMQ的本地Consumer进程,从本机进行获取并过滤消息,FilterServer目的就是为了减小网络传输而节省带宽,从而提升性能。
参考:
# 3.2 消息过滤示例
# Tag过滤
// 生产者
public class TagFilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC"};
for (int i = 0; i < 60; i++) {
Message msg = new Message("TagFilterTest",
tags[i % tags.length], // 消息中指定一个tag
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 消费者
public class TagFilterConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException, IOException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// 指定订阅的topic和tag
consumer.subscribe("TagFilterTest", "TagA || TagC");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# SQL过滤
// 生产者
public class SqlFilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC"};
for (int i = 0; i < 10; i++) {
Message msg = new Message("SqlFilterTest",
tags[i % tags.length],
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
msg.putUserProperty("a", String.valueOf(i)); // 设置消息属性
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 消费者
public class SqlFilterConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// 要在broker上配置 enablePropertyFilter=true
// 指定订阅的主题和sql表达式
consumer.subscribe("SqlFilterTest",
MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB')) and (a is not null and a between 0 and 3)"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 四、RocketMQ使用建议
提升吞吐量、提高性能的设置方案:
- 提高Consumer处理能力,通过增加机器,启动多个Consumer实例,或者增加同一个Consumer的内部线程并行度。
- 批量消费(设置consumeMessageBatchMaxSize参数)
- topic下的队列个数应该与Consumer数量契合
- 生产者发送oneway消息,单次发送,适用于可靠性要求不高的场景
- 多生产者同时发送消息
- 文件系统使用ext4/io调度算法使用deadline算法
编辑 (opens new window)
上次更新: 2023/06/04, 12:34:19