CodeAshen's blog CodeAshen's blog
首页
  • Spring Framework

    • 《剖析Spring5核心原理》
    • 《Spring源码轻松学》
  • Spring Boot

    • Spring Boot 2.0深度实践
  • Spring Cloud

    • Spring Cloud
    • Spring Cloud Alibaba
  • RabbitMQ
  • RocketMQ
  • Kafka
  • MySQL8.0详解
  • Redis从入门到高可用
  • Elastic Stack
  • 操作系统
  • 计算机网络
  • 数据结构与算法
  • 云原生
  • Devops
  • 前端
  • 实用工具
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
  • Reference
GitHub (opens new window)

CodeAshen

后端界的小学生
首页
  • Spring Framework

    • 《剖析Spring5核心原理》
    • 《Spring源码轻松学》
  • Spring Boot

    • Spring Boot 2.0深度实践
  • Spring Cloud

    • Spring Cloud
    • Spring Cloud Alibaba
  • RabbitMQ
  • RocketMQ
  • Kafka
  • MySQL8.0详解
  • Redis从入门到高可用
  • Elastic Stack
  • 操作系统
  • 计算机网络
  • 数据结构与算法
  • 云原生
  • Devops
  • 前端
  • 实用工具
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
  • Reference
GitHub (opens new window)
  • RabbitMQ

  • RocketMQ

    • 01-RocketMQ初探门径
    • 02-RocketMQ急速入门
    • 03-RocketMQ生产者核心应用
    • 04-RocketMQ消费者核心应用
    • 05-RocketMQ核心原理解析
    • 06-RocketMQ高级特性
      • 1.1 事务消息概述
      • 1.2 事务消息示例
      • 1.1 顺序消息概述
      • 1.2 顺序消费示例
      • 3.1 消息过滤概述
      • 3.2 消息过滤示例
        • Tag过滤
        • SQL过滤
  • Kafka

  • Nginx

  • 中间件
  • RocketMQ
CodeAshen
2023-02-10
目录

06-RocketMQ高级特性

# 一、事务消息

# 1.1 事务消息概述

以A账户转账到B账户为例,解释RocketMQ事务机制,如下图所示:

image-20210623153427606

  1. 生产端发送事务消息(half消息)到RocketMQ,并提供一个check回调函数
  2. 根据消息发送结果,执行本地事务,A账户减钱,根据结果发送Commit/Rollback消息
  3. 生产段发送Commit消息前,half消息对消费端不可见
  4. 生产段发送Commit消息后,RocketMQ将事务消息置为可见,消费端消费到消息,执行B账户加钱逻辑
  5. 为了防止如果Commit/Rollback消息发送失败或超时,RocketMQ有回调生产段check函数的机制,生产端检查本地事务执行结果,重新Commit或者Rollback

具体执行流程和设计理念参考官方文档:

  • RocketMQ设计-事务消息 (opens new window)

# 1.2 事务消息示例

事务消息生产者使用 TransactionMQProducer,指定本地事务和事务回查的对象 TransactionListener,通过 sendMessageInTransaction 方法发送事务消息。

public class TransactionProducer {
    public static final String TX_PRODUCER_GROUP = "test_tx_producer_group";
    public static final String TX_TOPIC = "test_tx_topic";
    
    public static void main(String[] args) throws MQClientException {
        // Producer使用的线程池
        ExecutorService executorService = new ThreadPoolExecutor(
                2,
                5,
                100,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000),
                r -> new Thread(r, "test_tx_producer_group-check-thread"),
                new ThreadPoolExecutor.AbortPolicy()
        );
        
        // Producer的Listener对象,做两件事情:1-异步执行本地事务 2-供MQ做事务回查
        TransactionListener transactionListener = new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                System.err.println("----------- 执行本地事务 -----------");
                System.err.println("callArg: " + arg);
                // tx.begin
                // 数据库落库操作
                // tx.commit
                // return LocalTransactionState.COMMIT_MESSAGE;
                return LocalTransactionState.UNKNOW;    // 如果返回unknow,MQ会执行事务回查
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.err.println("----------- 本地事务回查 -----------");
                // 回查逻辑
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        };

        // 创建事务Producer并启动
        TransactionMQProducer producer = new TransactionMQProducer(TX_PRODUCER_GROUP);
        producer.setNamesrvAddr(Const.M2_S2_ASYNC);
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();
        
        // 发送事务消息
        Message message = new Message(TX_TOPIC, "TagA", "TX-Key", 
                "Hello, Transaction Message".getBytes(StandardCharsets.UTF_8));
        producer.sendMessageInTransaction(message, "我是回调参数,本地事务执行和事务回查,都能拿到我");

        // producer.shutdown();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

消费者和普通消费者没什么区别,消费者示例:

public class TransactionConsumer {
    public static final String TX_CONSUMER_GROUP = "test_tx_consumer_group";
    public static final String TX_TOPIC = "test_tx_topic";
    
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(TX_CONSUMER_GROUP);
        consumer.setNamesrvAddr(Const.M2_S2_ASYNC);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.subscribe(TX_TOPIC, "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt msg : msgs) {
                        System.err.printf("topic: %s, tags: %s, keys: %s, body: %s\n",
                                msg.getTopic(), msg.getTags(), msg.getKeys(), new String(msg.getBody()));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        
        consumer.start();
        System.err.println("tx consumer started...");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

# 二、顺序消息

# 1.1 顺序消息概述

  • 顺序消息:指的是消息的消费顺序和生产顺序相同
  • 全局顺序:在某个topic下,所有的消息都要保证顺序
  • 局部顺序:只要保证每一组消息被顺序消费即可

参卡:

  • RocketMQ概念-顺序消息 (opens new window)
  • RocketMQ特性-顺序消息 (opens new window)

# 1.2 顺序消费示例

生产者:

public class OrderlyProducer {

    public static final String ORDERLY_PRODUCER_GROUP = "test_orderly_producer_name";
    public static final String ORDERLY_TOPIC = "test_orderly_producer_name";

    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer(ORDERLY_PRODUCER_GROUP);
        producer.setNamesrvAddr(Const.M2_S2_ASYNC);
        producer.start();

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        for (int i = 0; i < 10; i++) {
            String tag = i % 2 == 0 ? "A" : "B";
            String key = "key-" + i;
            byte[] body = sdf.format(new Date()).getBytes(StandardCharsets.UTF_8);
            Message message = new Message(ORDERLY_TOPIC, tag, key, body);
            
            // 偶数投递到0队列,奇数投递到1队列
            SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    int index = (Integer) arg % 2;
                    return mqs.get(index);
                }
            }, i);

            System.out.println(sendResult);
        }

        producer.shutdown();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

消费者:

public class OrderlyConsumer {

    public static final String ORDERLY_CONSUMER_GROUP = "test_orderly_consumer_name";
    public static final String ORDERLY_TOPIC = "test_orderly_producer_name";
    
    public OrderlyConsumer() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(ORDERLY_CONSUMER_GROUP);
        consumer.setNamesrvAddr(Const.M2_S2_ASYNC);
        // 设置第一次启动从队列头部开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe(ORDERLY_TOPIC, "*");
        consumer.registerMessageListener(new Listener());
        consumer.start();
        System.err.println("consumer started...");
    }

    // ********* 消费者监听对象,实现的是 MessageListenerOrderly 接口 *********
    class Listener implements MessageListenerOrderly {

        private Random random = new Random();

        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            for (MessageExt msg : msgs) {
                System.out.println(msg + ", content: " + new String(msg.getBody()));
                try {
                    TimeUnit.SECONDS.sleep(random.nextInt(4) + 1);
                } catch (Exception e) {
                    e.printStackTrace();
                    // return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;  // 暂停当前队列片刻
                }
            }

            return ConsumeOrderlyStatus.SUCCESS;  // 消费成功
        }
    }

    public static void main(String[] args) throws MQClientException {
        new OrderlyConsumer();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

# 三、消息过滤

# 3.1 消息过滤概述

RocketMQ消息过滤方式:

  • Tag过滤:ConsumeQueue中存储了消息tag的哈希值,可以在broker端就进行过滤,性能高。

  • SQL92表达式过滤:大制做法和tag方式一样,具体过滤流程不同,需要解析sql表达式进行过滤。每次过滤都去执行SQL表达式影响效率,使用了BloomFilter避免每次都执行。

    要在broker上配置 enablePropertyFilter=true

  • Filter Server过滤:broker启动配置(4.3.x之后不再支持),FilterServer是一种比SQL更灵活的过滤方式,允许自定义JAVA函数,FilterServer使用需要引入Filter组件,可以在配置文件中设置,理解为RocketMQ的本地Consumer进程,从本机进行获取并过滤消息,FilterServer目的就是为了减小网络传输而节省带宽,从而提升性能。

参考:

  • RocketMQ设计-消息过滤 (opens new window)

# 3.2 消息过滤示例

# Tag过滤

// 生产者
public class TagFilterProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC"};

        for (int i = 0; i < 60; i++) {
            Message msg = new Message("TagFilterTest",
                tags[i % tags.length],  // 消息中指定一个tag
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }

        producer.shutdown();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 消费者
public class TagFilterConsumer {
    public static void main(String[] args) throws InterruptedException, MQClientException, IOException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        // 指定订阅的topic和tag
        consumer.subscribe("TagFilterTest", "TagA || TagC");  
        
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# SQL过滤

// 生产者
public class SqlFilterProducer {
    public static void main(String[] args) throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC"};

        for (int i = 0; i < 10; i++) {
            Message msg = new Message("SqlFilterTest",
                tags[i % tags.length],
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            msg.putUserProperty("a", String.valueOf(i));  // 设置消息属性

            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }

        producer.shutdown();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 消费者
public class SqlFilterConsumer {
    public static void main(String[] args) throws Exception {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

        // 要在broker上配置 enablePropertyFilter=true
        // 指定订阅的主题和sql表达式
        consumer.subscribe("SqlFilterTest",
            MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB')) and (a is not null and a between 0 and 3)"));

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

# 四、RocketMQ使用建议

提升吞吐量、提高性能的设置方案:

  • 提高Consumer处理能力,通过增加机器,启动多个Consumer实例,或者增加同一个Consumer的内部线程并行度。
  • 批量消费(设置consumeMessageBatchMaxSize参数)
  • topic下的队列个数应该与Consumer数量契合
  • 生产者发送oneway消息,单次发送,适用于可靠性要求不高的场景
  • 多生产者同时发送消息
  • 文件系统使用ext4/io调度算法使用deadline算法
编辑 (opens new window)
上次更新: 2023/06/04, 12:34:19
05-RocketMQ核心原理解析
初识Kafka

← 05-RocketMQ核心原理解析 初识Kafka→

最近更新
01
第01章-RabbitMQ导学
02-10
02
第02章-入门RabbitMQ核心概念
02-10
03
第03章-RabbitMQ高级特性
02-10
更多文章>
Theme by Vdoing | Copyright © 2020-2023 CodeAshen | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式