零、本文纲要
一、RocketMQ基础
- MQ特点
- RocketMQ安装
- 测试RocketMQ
二、RocketMQ集群搭建
- 角色
- 集群特点
- 集群模式
- 双Master双Slave模式-同步双写
三、mqadmin管理工具
四、RocketMQ监控平台
一、RocketMQ基础
1. MQ特点
优点: 应用解耦 / 流量削峰 / 数据分发
缺点: 可用性降低 / 复杂度提高 / 一致性问题
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
开发语言 | Java | Erlang | Java | Scala |
单机吞吐量 | 万级 | 万级 | 10万级 | 10万级 |
时效性 | ms | us | ms | ms以内 |
可用性 | 高(主从) | 高(主从) | 非常高(分布式架构) | 非常高(分布式架构) |
功能特性 | 成熟的产品,有较多的文档,各种协议支持较好。 | 基于Erlang开发,并发能力很强,性能极好,延时性低,管理界面友好。 | MQ功能比较完善,扩展性佳。 | 只支持主要的MQ功能,像一些消息查询、消息回溯等功能没有,为了大数据准备的。 |
2. RocketMQ安装
- ① 下载安装包/解压
官方下载网址,版本为4.9.3
解压到指定目录下:unzip rocketmq-all-4.9.3-bin-release.zip -d /usr/local
- ② 启动 NameServer
后台启动:nohup sh bin/mqnamesrv &
查看启动日志:tail -f ~/logs/rocketmqlogs/namesrv.log
注意:如果遇到nohup: 忽略输入并把输出追加到"nohup.out"
表示默认输出内容到当前目录下的nohup.out文件中【并不是报错】。
也可以自定义,将输出日志到我们指定的文件,如下:
先创建目录:mkdir -p /root/logs/rocketmqlogs/
,
再创建文件:touch /root/logs/rocketmqlogs/namesrv.log
,
最后再使用nohup sh bin/mqnamesrv > ~/logs/rocketmqlogs/namesrv.log 2>&1 &
命令。
(此处是RocketMQ启动运行即可生成的日志文件,仅作示范用。)
补充 Linux 命令内容
nohup Command [ Arg … ] [ & ]
Command:要执行的命令;
Arg:一些参数,可以指定输出文件;
&:让命令在后台执行,终端退出后命令仍旧执行。
解读我们的命令:nohup sh bin/mqnamesrv > ~/logs/rocketmqlogs/namesrv.log 2>&1 &
第一处>
表示标准输出重定向,此处省略了前面的 1;
~/logs/rocketmqlogs/namesrv.log
表示重定向的目标文件;
2>&1
表示将标准错误 2 重定向到标准输出 &1;
最后标准输出 &1 再被重定向输入到 namesrv.log 文件中。
- ③ 启动 Broker
后台启动:nohup sh bin/mqbroker -n localhost:9876 &
查看启动日志:tail -f ~/logs/rocketmqlogs/broker.log
注意:
runbroker.sh默认JVM环境配置内存较大,此处可以按需做调整,默认配置如下:
# 修改前
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
# 修改后
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
runserver.sh默认JVM环境配置内存也较大,此处可以按需做调整,默认配置如下:
# 修改前
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
# 修改后
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
- ④ 关闭RocketMQ
关闭 NameServer:sh bin/mqshutdown namesrv
关闭 Broker:sh bin/mqshutdown broker
3. 测试RocketMQ
窗口一:
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
窗口二:
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
关闭:
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
二、RocketMQ集群搭建
1. 角色
Producer(消息发送者) / Consumer(消息接收者)
Broker(暂存和传输消息) / NameServer(管理Broker) / Topic(区分消息的种类) / Message Queue(Topic的分区,存储消息的物理地址)
2. 集群特点
- ① NameServer
无状态节点,直接启动多个即为集群,节点间无需信息同步;
- ② Broker
分主从,一主多从,不可一从多主;
主从关系指定使用相同的BrokerName,不同BrokerId,0表示主,非0为从;
每个Broker节点与NameServer集群所有节点建立长连接,定时注册Topic到所有NameServer;
- ③ Producer
无状态,直接启动多个即为集群,节点间无需信息同步;
与NameServer集群中的一个节点(随机)建立长连接,定期从NameServer取Topic路由信息;
向提供Topic服务的Master建立长连接,定时发送心跳;
- ④ Consumer
与NameServer集群中的一个节点(随机)建立长连接,定期从NameServer取Topic路由信息;
向提供Topic服务的Master、Slave建立长连接,定时发送心跳;
既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定;
3. 集群模式
单Master模式 / 多Master模式 / 多Master多Slave模式(同步/异步)
4. 双Master双Slave模式-同步双写
- ① 环境准备
序号 | IP-Addr | 角色 | 架构模式 |
---|---|---|---|
1 | 192.168.253.128 | nameserver、brokerserver | Master1、Slave2 |
2 | 192.168.253.130 | nameserver、brokerserver | Master2、Slave1 |
主机1:broker-a的master、broker-b的slave
主机2:broker-b的master、broker-a的slave
- ② 配置Host
命令:vim /etc/hosts
# nameserver
192.168.253.128 rocketmq-nameserver1
192.168.253.130 rocketmq-nameserver2
# broker
192.168.253.128 rocketmq-master1
192.168.253.128 rocketmq-slave2
192.168.253.130 rocketmq-master2
192.168.253.130 rocketmq-slave1
重启网卡:systemctl restart network
- ③ 开放防火墙
# 开放name server默认端口
firewall-cmd --remove-port=9876/tcp --permanent
# 开放master默认端口
firewall-cmd --remove-port=10911/tcp --permanent
# 开放slave默认端口 (当前集群模式可不开启)
firewall-cmd --remove-port=11011/tcp --permanent
# 重启防火墙
firewall-cmd --reload
- ④ 配置环境变量
命令:vim /etc/profile
#set rocketmq
ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq-all-4.4.0-bin-release
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PATH
重新加载使之生效:source /etc/profile
- ⑤ 自定义消息存储路径
mkdir -p /usr/local/rocketmq-all-4.4.0-bin-release/store
mkdir -p /usr/local/rocketmq-all-4.4.0-bin-release/store/commitlog
mkdir -p /usr/local/rocketmq-all-4.4.0-bin-release/store/consumequeue
mkdir -p /usr/local/rocketmq-all-4.4.0-bin-release/store/index
- ⑥ 编写Broker配置文件
通过查看conf/目录不难看出官方默认提供了各种类型的基础配置文件,可以直接使用。如下:
├── conf
├── 2m-2s-async #双主双从(异步)
├── 2m-2s-sync #双主双从(同步)
├── 2m-noslave #双主
Ⅰ master的配置文件
注意修改brokerName、listenPort
主机1:broker-a的master、broker-b的slave
主机2:broker-b的master、broker-a的slave
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq-all-4.4.0-bin-release/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq-all-4.4.0-bin-release/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq-all-4.4.0-bin-release/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq-all-4.4.0-bin-release/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq-all-4.4.0-bin-release/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq-all-4.4.0-bin-release/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
Ⅱ broker的配置文件
注意修改brokerName、listenPort
主机1:broker-a的master、broker-b的slave
主机2:broker-b的master、broker-a的slave
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq-all-4.4.0-bin-release/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq-all-4.4.0-bin-release/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq-all-4.4.0-bin-release/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq-all-4.4.0-bin-release/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq-all-4.4.0-bin-release/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq-all-4.4.0-bin-release/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
- ⑦ 启动集群服务
Ⅰ 启动NameServer集群
分别启动两个主机的NameServer服务,如下:
cd /usr/local/rocketmq-all-4.4.0-bin-release/bin
nohup sh mqnamesrv &
Ⅱ 启动Broker集群
分别启动两个主机的BrokerServer服务,如下:
cd /usr/local/rocketmq-all-4.4.0-bin-release/bin
nohup sh mqbroker -c /usr/local/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a.properties &
cd /usr/local/rocketmq-all-4.4.0-bin-release/bin
nohup sh mqbroker -c /usr/local/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b-s.properties &
cd /usr/local/rocketmq-all-4.4.0-bin-release/bin
nohup sh mqbroker -c /usr/local/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b.properties &
cd /usr/local/rocketmq-all-4.4.0-bin-release/bin
nohup sh mqbroker -c /usr/local/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a-s.properties &
三、mqadmin管理工具
执行命令方法:./mqadmin {command} {args}
具体的内容可见官方文档。
由于日常比较少使用,可以使用的时候再查询文档。另外官方也提供了控制台工具,后文会介绍。
四、RocketMQ监控平台
官方拓展开源项目网址,在对应网址内下载rocketmq-console/rocketmq-dashboard。
rocketmq-dashboard开源项目网址,下载开源项目代码,如下:
然后使用mvn工具打包如下:
mvn clean package -Dmaven.test.skip=true
然后上传至对应主机运行,如下:
运行jar包,如下:
java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar
访问RocketMQ-Dashboard,默认为http://192.168.253.128:8080
。
五、结尾
以上即为RocketMQ-基础使用(一)的基础内容,感谢阅读。