CodeAshen's blog CodeAshen's blog
首页
  • Spring Framework

    • 《剖析Spring5核心原理》
    • 《Spring源码轻松学》
  • Spring Boot

    • Spring Boot 2.0深度实践
  • Spring Cloud

    • Spring Cloud
    • Spring Cloud Alibaba
  • RabbitMQ
  • RocketMQ
  • Kafka
  • MySQL8.0详解
  • Redis从入门到高可用
  • Elastic Stack
  • 操作系统
  • 计算机网络
  • 数据结构与算法
  • 云原生
  • Devops
  • 前端
  • 实用工具
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
  • Reference
GitHub (opens new window)

CodeAshen

后端界的小学生
首页
  • Spring Framework

    • 《剖析Spring5核心原理》
    • 《Spring源码轻松学》
  • Spring Boot

    • Spring Boot 2.0深度实践
  • Spring Cloud

    • Spring Cloud
    • Spring Cloud Alibaba
  • RabbitMQ
  • RocketMQ
  • Kafka
  • MySQL8.0详解
  • Redis从入门到高可用
  • Elastic Stack
  • 操作系统
  • 计算机网络
  • 数据结构与算法
  • 云原生
  • Devops
  • 前端
  • 实用工具
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
  • Reference
GitHub (opens new window)
  • RabbitMQ

  • RocketMQ

    • 01-RocketMQ初探门径
      • 4.1 RocketMQ单节点搭建
      • 4.2 RocketMQ控制台
    • 02-RocketMQ急速入门
    • 03-RocketMQ生产者核心应用
    • 04-RocketMQ消费者核心应用
    • 05-RocketMQ核心原理解析
    • 06-RocketMQ高级特性
  • Kafka

  • Nginx

  • 中间件
  • RocketMQ
CodeAshen
2023-02-10
目录

01-RocketMQ初探门径

# 一、RocketMQ 整体介绍

  • RocketMQ 是一款分布式、队列模型的消息中间件
  • 支持集群模型、负载均衡、水平扩展能力
  • 亿级别的消息堆积能力
  • 采用零拷贝的原理、顺序写盘、随机读
  • 丰富的API使用(同步消息、异步消息、顺序消息、延迟消息、事务消息)
  • 代码优秀,底层通信框架采用Netty NIO框架
  • NameServer 代替 Zookeper
  • 强调集群无单点,可扩展,任意一点高可用,水平可扩展
  • 消息失败重试机制、消息可查询
  • 开源社区活跃、成熟度(经过双十一考验)

# 二、RocketMQ概念模型

  • **Producer:**消息生产者,负责生产消息,一般由业务系统负责产生消息。
  • **Consumer:**消息消费者,负责消费消息,一般是由后台系统负责异步消费。
  • **Push Consumer:**Consumer的一种,需要向Consumer对象注册监听。
  • **Pull Consumer:**Consumer的以中,需要主动请求Broker拉去消息。
  • **Producer Group:**生产者集合,一般用于发送一类消息。
  • **Consumer Group:**消费者集合,一般用于接受一类消息进行消费。
  • **Broker:**MQ消息服务(中转角色,用于消息存储与生产消息转发)。

参考官方文档:基本概念 (opens new window)

# 三、RocketMQ源码包结构

  • rocketmq-broker:主要的业务逻辑,消息收发,主从同步,pagecache
  • rocketmq-client:客户端接口,比如生产者和消费者
  • rocketmq-example:示例,比如生产者和消费者
  • rocketmq-common:公用数据结构等等
  • rocketmq-distribution:编译模块,编译输出等
  • rocketmq-fliter:进行Broker过滤的不感兴趣的消息传输,减小带宽压力
  • rocketmq-logappender、rocketmq-logging:日志相关
  • rocketmq-namesrv:Namesrv服务,用于服务协调
  • rocketmq-openmessaging:对外提供服务
  • rocketmq-remoting:远程调用接口,封装Netty底层通信
  • rocketmq-srvutil:提供一些公用的工具方法,比如解析命令行参数
  • rocketmq-store:消息存储
  • rocketmq-test、rocketmq-example:测试、示例
  • rocketmq-tools:管理工具,比如有名的mqadmin工具

# 四、RocketMQ环境搭建

# 4.1 RocketMQ单节点搭建

RocketMQ天然支持集群,这里我们使用单点为例,演示搭建过程

  1. hosts添加信息

    vi /etc/hosts
    
    # 配置以下信息
    192.168.36.123 rocketmq-nameserver1
    196.168.36.123 rocketmq-master1
    
    1
    2
    3
    4
    5
  2. 上传解压

    地址:

    # 上传 apache-rocketmq.tar.gz 文件至/usr/local
    tar -zxvf apache-rocketmq.tar.gz -C /usr/local
    # 建立软连接
    ln -s apache-rocketmq rocketmq
    ll /usr/local
    
    1
    2
    3
    4
    5
  3. 创建存储路径

    mkdir /usr/local/rocketmq/store
    mkdir /usr/local/rocketmq/store/commitlog
    mkdir /usr/local/rocketmq/store/consumequeue
    mkdir /usr/local/rocketmq/store/index
    
    1
    2
    3
    4
  4. RocketMQ配置文件

    配置文件在 ${rocketmq_home}/conf 下,有不同集群模式的配置,我们复制一个,在此基础上修改

    cd /usr/local/rocketmq/conf
    cp -r 2m-2s-async/ test-conf/
    vim test-conf/broker-a.properties
    
    1
    2
    3

    使用以下配置

    #所属集群名字
    brokerClusterName=rocketmq-cluster
    #broker 名字,注意此处不同的配置文件填写的不一样
    brokerName=broker-a
    #0 表示 Master,>0 表示 Slave
    brokerId=0
    #nameServer 地址,分号分割
    namesrvAddr=rocketmq-nameserver1:9876
    #在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数
    defaultTopicQueueNums=4
    #是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    #Broker 对外服务的监听端口
    listenPort=10911
    #删除文件时间点,默认凌晨 4 点
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=120
    #commitLog 每个文件的大小默认 1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    #存储路径
    storePathRootDir=/usr/local/rocketmq/store
    #commitLog 存储路径
    storePathCommitLog=/usr/local/rocketmq/store/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
    #消息索引存储路径
    storePathIndex=/usr/local/rocketmq/store/index
    #checkpoint 文件存储路径
    storeCheckpoint=/usr/local/rocketmq/store/checkpoint
    #abort 文件存储路径
    abortFile=/usr/local/rocketmq/store/abort
    #限制的消息大小
    maxMessageSize=65536
    
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    
    #Broker 的角色
    #- ASYNC_MASTER 异步复制 Master
    #- SYNC_MASTER 同步双写 Master
    #- SLAVE
    brokerRole=ASYNC_MASTER
    #刷盘方式
    #- ASYNC_FLUSH 异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH
    #checkTransactionMessageEnable=false
    #发消息线程池数量
    #sendMessageThreadPoolNums=128
    #拉消息线程池数量
    #pullMessageThreadPoolNums=128
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
  5. 修改日志配置文件

    mkdir -p /usr/local/rocketmq/logs
    cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml
    
    1
    2
  6. 修改启动脚本参数

    RocketMQ对内存要求较高,启动脚本配置的JVM内存很大,这里修改一下

    # 修改broker启动配置
    vim /usr/local/rocketmq/bin/runbroker.sh
    
    1
    2

    修改配置:JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m - XX:MaxPermSize=320m"

    # 修改nameserver启动配置
    vim /usr/local/rocketmq/bin/runserver.sh
    
    1
    2

    修改配置:JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m - XX:MaxPermSize=320m"

  7. 启动 NameServer 和 Broker

    cd /usr/local/rocketmq/bin
    # 先启动 NameServer
    nohup sh mqnamesrv &
    # 再启动 broker,指定配置文件,以及输出
    nohup sh mqbroker -c /usr/local/rocketmq/conf/test-conf/broker-a.properties >/dev/null 2>&1 &
    
    # jps命令查看java进程,验证
    jps
    
    1
    2
    3
    4
    5
    6
    7
    8
  8. 关闭进程和数据清理

    cd /usr/local/rocketmq/bin
    sh mqshutdown broker
    sh mqshutdown namesrv
    # --等待停止
    rm -rf /usr/local/rocketmq/store
    mkdir /usr/local/rocketmq/store
    mkdir /usr/local/rocketmq/store/commitlog
    mkdir /usr/local/rocketmq/store/consumequeue
    mkdir /usr/local/rocketmq/store/index
    # --按照上面步骤重启 NameServer 与 BrokerServer
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10

# 4.2 RocketMQ控制台

官方提供的RocketMQ控制台,是一个SpringBoot项目,github地址:rocketmq-externals (opens new window)

打包其中的 rocketmq-console 模块,mvn clean package -Dmaven.test.skip=true,在运行jar包即可

编辑 (opens new window)
上次更新: 2023/06/04, 12:34:19
第05章-高可靠RabbitMQ集群架构
02-RocketMQ急速入门

← 第05章-高可靠RabbitMQ集群架构 02-RocketMQ急速入门→

最近更新
01
第01章-RabbitMQ导学
02-10
02
第02章-入门RabbitMQ核心概念
02-10
03
第03章-RabbitMQ高级特性
02-10
更多文章>
Theme by Vdoing | Copyright © 2020-2023 CodeAshen | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式