1. 创建namesrv服务
拉取镜像
docker pull rocketmqinc/rocketmq
创建namesrv数据存储路径
mkdir -p /opt/app/rocketmq/data/namesrv/logs /opt/app/rocketmq/data/namesrv/store
构建namesrv容器
docker run -d \
--restart=always \
--name rmqnamesrv \
-p 9876:9876 \
-v /opt/app/rocketmq/data/namesrv/logs:/root/logs \
-v /opt/app/rocketmq/data/namesrv/store:/root/store \
-e "MAX_POSSIBLE_HEAP=100000000" \
rocketmqinc/rocketmq \
sh mqnamesrv
参数 | 说明 |
---|---|
-d | 以守护进程的方式启动 |
- -restart=always | docker重启时候容器自动重启 |
- -name rmqnamesrv | 把容器的名字设置为rmqnamesrv |
-p 9876:9876 | 把容器内的端口9876挂载到宿主机9876上面 |
-v /docker/rocketmq/data/namesrv/logs:/root/logs | 把容器内的/root/logs日志目录挂载到宿主机的 /docker/rocketmq/data/namesrv/logs目录 |
-v /docker/rocketmq/data/namesrv/store:/root/store | 把容器内的/root/store数据存储目录挂载到宿主机的 /docker/rocketmq/data/namesrv目录 |
rmqnamesrv | 容器的名字 |
-e “MAX_POSSIBLE_HEAP=100000000” | 设置容器的最大堆内存为100000000 |
rocketmqinc/rocketmq | 使用的镜像名称 |
sh mqnamesrv | 启动namesrv服务 |
2. 创建broker节点
创建broker数据存储路径
mkdir -p /opt/app/rocketmq/data/broker/logs /opt/app/rocketmq/data/broker/store /docker/rocketmq/conf
创建配置文件
vi /opt/app/rocketmq/conf/broker.conf
# 所属集群名称,如果节点较多可以配置多个
brokerClusterName = oemCluster
#broker名称,master和slave使用相同的名称,表明他们的主从关系
brokerName = broker-oem
#0表示Master,大于0表示不同的slave
brokerId = 0
#表示几点做消息删除动作,默认是凌晨4点
deleteWhen = 04
#在磁盘上保留消息的时长,单位是小时
fileReservedTime = 48
#有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制;
brokerRole = ASYNC_MASTER
#刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要;
flushDiskType = ASYNC_FLUSH
# 设置broker节点所在服务器的ip地址
brokerIP1 = 10.1.248.199
## if acl is open,the flag will be true
aclEnable=true
权限文件/conf/plain_acl.yml:
vim /opt/app/rocketmq/conf/plain_acl.yml
globalWhiteRemoteAddresses:
accounts:
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress:
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: SUB
topicPerms:
- topicA=DENY
- topicB=PUB|SUB
- topicC=SUB
groupPerms:
# the group should convert to retry topic
- groupA=DENY
- groupB=PUB|SUB
- groupC=SUB
- accessKey: admin
secretKey: hua@123
whiteRemoteAddress:
# if it is admin, it could access all resources
admin: true
字段 | 取值 | 含义 |
---|---|---|
globalWhiteRemoteAddresses | ;192.168..*;192.168.0.1 | 全局IP白名单 |
accessKey | 字符串 | Access Key |
secretKey | 字符串 | Secret Key |
whiteRemoteAddress | ;192.168..*;192.168.0.1 | 用户IP白名单 |
admin | true;false | 是否管理员账户 |
defaultTopicPerm | DENY;PUB;SUB;PUB|SUB | 默认的Topic权限 |
defaultGroupPerm | DENY;PUB;SUB;PUB|SUB | 默认的ConsumerGroup权限 |
topicPerms | topic=权限 | 各个Topic的权限 |
groupPerms | group=权限 | 各个ConsumerGroup的权限 |
构建broker容器
docker run -d \
--restart=always \
--name rmqbroker \
--link rmqnamesrv:namesrv \
-p 10911:10911 \
-p 10909:10909 \
-v /opt/app/rocketmq/data/broker/logs:/root/logs \
-v /opt/app/rocketmq/data/broker/store:/root/store \
-v /opt/app/rocketmq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf \
-v /opt/app/rocketmq/conf/plain_acl.yml:/opt/rocketmq-4.4.0/conf/plain_acl.yml \
-e "NAMESRV_ADDR=namesrv:9876" \
-e "MAX_POSSIBLE_HEAP=200000000" \
rocketmqinc/rocketmq \
sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
参数 | 说明 |
---|---|
-d | 以守护进程的方式启动 |
–restart=always | docker重启时候镜像自动重启 |
- -name rmqbroker | 把容器的名字设置为rmqbroker |
- --link rmqnamesrv:namesrv | 和rmqnamesrv容器通信 |
-p 10911:10911 | 把容器的非vip通道端口挂载到宿主机 |
-p 10909:10909 | 把容器的vip通道端口挂载到宿主机 |
-e “NAMESRV_ADDR=namesrv:9876” | 指定namesrv的地址为本机namesrv的ip地址:9876 |
-e “MAX_POSSIBLE_HEAP=200000000” | 指定broker服务的最大堆内存 |
rocketmqinc/rocketmq | 使用的镜像名称 |
sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf | 指定配置文件启动broker节点 |
3. 创建rockermq-console服务
docker pull pangliang/rocketmq-console-ng
构建rockermq-console容器
docker run -d \
--restart=always \
--name rmqadmin \
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=10.1.248.199:9876 \
-Dcom.rocketmq.sendMessageWithVIPChannel=false" \
-p 8099:8080 \
pangliang/rocketmq-console-ng
参数 | 说明 |
---|---|
-d | 以守护进程的方式启动 |
- -restart=always | docker重启时候镜像自动重启 |
- -name rmqadmin | 把容器的名字设置为rmqadmin |
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=10.1.248.199:9876 | 设置namesrv服务的ip地址 |
-Dcom.rocketmq.sendMessageWithVIPChannel=false" | 不使用vip通道发送消息 |
–p 8099:8080 | 把容器内的端口8080挂载到宿主机上的8099端口 |
需要关闭防火墙或者开放namesrv和broker端口
如果不设置,控制台服务将无法访问namesrv服务
异常信息如下
org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to failed
关闭防火墙
systemctl stop firewalld.service
开放指定端口
firewall-cmd --permanent --zone=public --add-port=9876/tcp
firewall-cmd --permanent --zone=public --add-port=10911/tcp
# 立即生效
firewall-cmd --reload
访问控制台
网页访问http://10.1.248.199:8099/查看控制台信息
4. 附录
1. RocketMQ集群部署结构
1) Name Server(域名服务)
Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
2) Broker(代理)
roker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的Broker Name,不同的Broker Id来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。
每个Broker与Name Server集群中的所有节点建立长连接,定时(每隔30s)注册Topic信息到所有Name Server。Name Server定时(每隔10s)扫描所有存活broker的连接,如果Name Server超过2分钟没有收到心跳,则Name Server断开与Broker的连接。
3) Producer
roducer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
Producer每隔30s(由ClientConfig的pollNameServerInterval)从Name server获取所有topic队列的最新情况,这意味着如果Broker不可用,Producer最多30s能够感知,在此期间内发往Broker的所有消息都会失败。
Producer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s中扫描所有存活的连接,如果Broker在2分钟内没有收到心跳数据,则关闭与Producer的连接。
4) Consumer
Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。
Consumer每隔30s从Name server获取topic的最新队列情况,这意味着Broker不可用时,Consumer最多最需要30s才能感知。
Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval决定)向所有关联的broker发送心跳,Broker每隔10s扫描所有存活的连接,若某个连接2分钟内没有发送心跳数据,则关闭连接;并向该Consumer Group的所有Consumer发出通知,Group内的Consumer重新分配队列,然后继续消费。
当Consumer得到master宕机通知后,转向slave消费,slave不能保证master的消息100%都同步过来了,因此会有少量的消息丢失。但是一旦master恢复,未同步过去的消息会被最终消费掉。
消费者对列是消费者连接之后(或者之前有连接过)才创建的。我们将原生的消费者标识由 {IP}@{消费者group}扩展为 {IP}@{消费者group}{topic}{tag},(例如xxx.xxx.xxx.xxx@mqtest_producer-group_2m2sTest_tag-zyk)。任何一个元素不同,都认为是不同的消费端,每个消费端会拥有一份自己消费对列(默认是broker对列数量*broker数量)。新挂载的消费者对列中拥有commitlog中的所有数据。
5)RocketMQ常用命令
需要切换到bin目录下,即:
[root@mq-master01 ~]# cd /data/rocketmq/bin
[root@mq-master01 bin]#
获取所有可用命令:
[root@mq-master01 bin]# sh mqadmin
查看帮助:
# sh mqadmin <command> -h
查询Producer的网络连接情况:
# sh mqadmin producerConnection -n localhost:9876 -g <producer-group> -t <producer-topic>
查询Consumer的网络连接情况:
# sh mqadmin consumerConnection -n localhost:9876 -g <consumer-group>
查询Consumer的消费状态:
# sh mqadmin consumerProgress -n localhost:9876 -g <consumer-group>
查询消息是否发送成功
获取指定Topic:
# sh mqadmin topicList -n localhost:9876 | grep <topicName>
查看Topic状态:
# sh mqadmin topicStatus -n localhost:9876 -t <topicName>
根据offset获取消息:
# sh sh mqadmin queryMsgByOffset -n localhost:9876 -b <broker-name> -i <queueId> -o <offset> -t <topicName>
根据offsetMsgId查询消息:
# sh sh mqadmin queryMsgById -n localhost:9876 -i <offsetMsgId>
查询消息是否被消费成功
查询消息详情:
# sh mqadmin queryMsgById -i {MsgId} -n {NameServerAddr}
查看Consumer Group订阅了哪些TOPIC:
# sh mqadmin consumerProgress -g <ConsumerGroup> -n <NameServerAddr>
查询TOPIC被哪些Consumer Group订阅了
没有查询特定TOPIC订阅情况,只能查询所有后再过滤:
# sh mqadmin statsAll -n <NameServerAddr> | grep <TOPIC>
返回结果:#Topic #Consumer Group #InTPS #OutTPS #InMsg24Hour #OutMsg24Hour
关闭nameserver和所有的broker:
# sh mqshutdown namesrv
# sh mqshutdown broker
查看所有消费组group:
# sh mqadmin consumerProgress -n 192.168.23.159:9876
查看指定消费组(kevinGroupConsumer)下的所有topic数据堆积情况:
# sh mqadmin consumerProgress -n 192.168.23.159:9876 -g kevinGroupConsumer
查看所有topic :
# sh mqadmin topicList -n 192.168.23.159:9876
查看topic信息列表详情统计
# sh mqadmin topicstatus -n 192.168.23.159:9876 -t myTopicTest1
新增topic
# sh mqadmin updateTopic –n 10.45.47.168 –c DefaultCluster –t ZTEExample
删除topic
# sh mqadmin deleteTopic –n 10.45.47.168:9876 –c DefaultCluster –t ZTEExample