RocketMQ介绍
Apache RocketMQ是一个分布式消息传递和流媒体平台,具有低延迟、高性能和可靠性、万亿级别的容量和灵活的可伸缩性。
GitHub:https://github.com/apache/rocketmq
官网:http://rocketmq.apache.org/
本文版本:rocketmq-4.2.0-SNAPSHOT
下载地址:http://mirror.bit.edu.cn/apache/rocketmq/4.2.0/rocketmq-all-4.2.0-bin-release.zip
它提供了各种特性:(翻译自GitHub)
发布/订阅消息传递模型
定期消息传递
按时间或偏移量进行消息回溯
日志中心流
大数据集成
在同一队列中可靠的FIFO和严格的有序消息传递
有效的拉伸消费模式
在一个队列中有百万级的消息积累容量
多种消息传递协议,如JMS和OpenMessaging
灵活的分布式扩展部署体系结构
快速批量消息交换系统。
各种消息过滤机制,如SQL和标记
用于隔离测试和云隔离集群的Docker映像。
用于配置、度量和监视的功能丰富的管理仪表板
选择的理由
强调集群无单点,可扩展,任意一点高可用,水平可扩展
方便集群配置,而且容易扩展(横向和纵向),通过slave的方式每一点都可以实现高可用
支持上万个队列,顺序消息
顺序消费是实现在同一队列的,如果高并发的情况就需要队列的支持,rocketmq可以满足上万个队列同时存在
任性定制你的消息过滤
rocketmq提供了两种类型的消息过滤,也可以说三种可以通过topic进行消息过滤、可以通过tag进行消息过滤、还可以通过filter的方式任意定制过滤
消息的可靠性(无Buffer,持久化,容错,回溯消费)
消息无buffer就不用担心buffer回满的情况,rocketmq的所有消息都是持久化的,生产者本身可以进行错误重试,发布者也会按照时间阶梯的方式进行消息重发,消息回溯说的是可以按照指定的时间进行消息的重新消费,既可以向前也可以向后(前提条件是要注意消息的擦除时间)
海量消息堆积能力,消息堆积后,写入低延迟
针对于provider需要配合部署方式,对于consumer,如果是集群方式一旦master返现消息堆积会向consumer下发一个重定向指令,此时consumer就可以从slave进行数据消费了
分布式事务
我个人感觉rocketmq对这一块说的不是很清晰,而且官方也说现在这块存在缺陷(会令系统pagecache过多),所以线上建议还是少用为好
消息失败重试机制
针对provider的重试,当消息发送到选定的broker时如果出现失败会自动选择其他的broker进行重发,默认重试三次,当然重试次数要在消息发送的超时时间范围内。
针对consumer的重试,如果消息因为各种原因没有消费成功,会自动加入到重试队列,一般情况如果是因为网络等问题连续重试也是照样失败,所以rocketmq也是采用阶梯重试的方式。
定时消费
除了上面的配置,在发送消息是也可以针对message设置setDelayTimeLevel
活跃的开源社区
现在rocketmq成为了apache的一款开源产品,活跃度也是不容怀疑的
成熟度(经过双十一考验)
针对本身的成熟度,我们看看这么多年的双十一就可想而知了
相关概念
NameServer
这里我们可以理解成类似于zk的一个注册中心,而且rocketmq最初也是基于zk作为注册中心的,现在相当于为rocketmq自定义了一个注册中心,代码不超过1000行。RocketMQ 有多种配置方式可以令客户端找到 Name Server, 然后通过 Name Server 再找到 Broker,分别如下,优先级由高到低,高优先级会覆盖低优先级。客户端提供http和ip:端口号的两种方式,推荐使用http的方式可以实现nameserver的热部署。
Push Consumer
Consumer 的一种,应用通常通过 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立刻回调 Listener 接口方法,类似于activemq的方式
Pull Consume
Consumer 的一种,应用通常主动调用 Consumer 的拉消息方法从 Broker 拉消息,主动权由应用控制
Producer Group
一类producer的集合名称,这类producer通常发送一类消息,且发送逻辑一致
Consumer Group
同上,consumer的集合名称
Broker
消息中转的角色,负责存储消息(实际的存储是调用的store组件完成的),转发消息,一般也成为server,同jms中的provider
Message Filter
可以实现高级的自定义的消息过滤
Master/Slave
集群的主从关系,broker的name相同,brokerid=0的为主master,大于0的为从slave,可以一主多从,但一从只能有一主
RocketMQ角色介绍
RocketMQ由四部分构成:Producer、Consumer、Broker和NameServer
启动顺序:NameServer->Broker
为了消除单点故障,增加可靠性或增大吞吐量,可以在多台机器上部署多个nameserver和broker,并且为每个broker部署1个或多个slave,rocketmq架构图如图1.1所示。
Topic & message queue:一个分布式消息队列中间件部署好以后,可以给很多个业务提供服务,同一个业务也有不同类型的消息要投递,这些不同类型的消息以不同的 Topic 名称来区分。所以发送和接收消息前,先创建topic,针对某个 Topic 发送和接收消息。有了 Topic 以后,还需要解决性能问题 。 如果一个Topic 要发送和接收的数据量非常大, 需要能支持增加并行处理的机器来提高处理速度,这时候一个 Topic 可以根据需求设置一个或多个 Message Queue, Message Queue 类似分区或 Partition 。Topic有了多个 Message Queue 后,消息可以并行地向各个Message Queue 发送,消费者也可以并行地从多个 Message Queue 读取消息并消费 。
集群部署方式
单Master模式
只有一个 Master节点
优点:配置简单,方便部署
缺点:这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用,不建议线上环境使用
多Master模式
一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master
优点:配置简单,单个Master 宕机或重启维护对应用无影响,在磁盘配置为RAID10 时,即使机器宕机不可恢复情况下,由与 RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢)。性能最高。多 Master 多 Slave 模式,异步复制
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到受到影响
多Master多Slave模式(异步复制)---本文稍后以这种方式部署集群为例
每个 Master 配置一个 Slave,有多对Master-Slave, HA,采用异步复制方式,主备有短暂消息延迟,毫秒级。
优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为Master 宕机后,消费者仍然可以从 Slave消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。
缺点: Master 宕机,磁盘损坏情况,会丢失少量消息。
多Master多Slave模式(同步双写)---文中会说明补充此集群配置,线上使用的话,推荐使用此模式集群
每个 Master 配置一个 Slave,有多对Master-Slave, HA采用同步双写方式,主备都写成功,向应用返回成功。
优点:数据与服务都无单点, Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高
缺点:性能比异步复制模式略低,大约低 10%左右,发送单个消息的 RT会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能
Rocketmq双主从异步复制集群部署(文中包含同步双写集群的配置说明)
一、预装环境:
rhel 6.9(其实redhat & centos 6~7都是通用的)
jdk-1.8
git,maven(非必须)
二、集群结构:
三、下载解压
下载地址:http://mirror.bit.edu.cn/apache/rocketmq/4.2.0/rocketmq-all-4.2.0-bin-release.zip
mkdir -p /opt/apps_install/rocketmq-4.2.0
cd /opt/apps_install/rocketmq-4.2.0
unzip rocketmq-all-4.2.0-bin-release.zip
ln -s /opt/apps_install/rocketmq-4.2.0 /opt/apps/rocketmq
四、创建存储路径
10.16.13.90(rocketmq-nameserver-1):
mkdir -p /data/rocketmq/store/{rootdir-a-m,commitlog-a-m,rootdir-b-s,commitlog-b-s}
10.16.13.91(rocketmq-nameserver-2):
mkdir -p /data/rocketmq/store/{rootdir-b-m,commitlog-b-m,rootdir-a-s,commitlog-a-s}
五、配置hosts和环境变量
两台服务器修改/etc/hosts,加入下面两行
10.16.13.90 rocketmq-nameserver-1
10.16.13.91 rocketmq-nameserver-2
修改/etc/profile,加入
export ROCKETMQ_HOME=/opt/apps/rocketmq
export PATH=$ROCKETMQ_HOME/bin:$PATH
source /etc/profile
六、修改配置文件
国际惯例,修改之前先备份~以防意外
cd /opt/apps/rocketmq;cp -r conf/ conf.default/
因为本文采用双主双从异步复制,默认的rocketmq已经为我们配置了相应配置目录
cd /opt/apps/rocketmq/conf/2m-2s-async
10.16.13.90 rocketmq-nameserver-1 角色:broker-a-master & broker-b-slave
vim broker-a.properties
#所属集群名字
brokerClusterName=sns-rocket-mq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=rocketmq-nameserver-1
#brokerId 0 表示 Master,>0 表示 Slave
brokerId=0
# Broker 对外服务的监听端口
listenPort=10911
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver-1:9876;rocketmq-nameserver-2:9876
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=72
#Broker role有3种:SYNC MASTER、ASYNC MASTER、SLAVE。关键词SYNC和ASYNC表示Master和Slave之间同步消息的机制,SYNC即同步更新,指当Slave和Master消息同步完成后,再返回发送成功的状态。ASYNC即异步更新,master与slave有短暂消息延迟,毫秒级。本文在此使用了异步复制集群模式,线上环境推荐使用同步双写模式,即SYNC_MASTER
brokerRole=ASYNC_MASTER
# 刷盘方式 ASYNC_FLUSH 异步刷盘
flushDiskType=ASYNC_FLUSH
#存储路径
storePathRootDir=/data/rocketmq/store/rootdir-a-m
storePathCommitLog=/data/rocketmq/store/commitlog-a-m
# 是否允许 Broker 自动创建Topic
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组
autoCreateSubscriptionGroup=true
vim broker-b-s.properties
brokerClusterName=sns-rocket-mq-cluster
brokerName=rocketmq-nameserver-2
listenPort=10921
namesrvAddr=rocketmq-nameserver-1:9876;rocketmq-nameserver-2:9876
brokerId=1
deleteWhen=04
fileReservedTime=72
brokerRole=SLAVE
storePathRootDir=/data/rocketmq/store/rootdir-b-s
storePathCommitLog=/data/rocketmq/store/commitlog-b-s
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
flushDiskType=ASYNC_FLUSH
10.16.13.91 rocketmq-nameserver-2 角色:broker-b-master & broker-a-slave
vim broker-b.properties
brokerClusterName=sns-rocket-mq-cluster
brokerName=rocketmq-nameserver-2
brokerIP1=10.16.13.91
brokerId=0
listenPort=10911
namesrvAddr=rocketmq-nameserver-1:9876;rocketmq-nameserver-2:9876
deleteWhen=04
fileReservedTime=72
## Broker role有3种:SYNC MASTER、ASYNC MASTER、SLAVE。关键词SYNC和ASYNC表示Master和Slave之间同步消息的机制,SYNC即同步更新,指当Slave和Master消息同步完成后,再返回发送成功的状态。ASYNC即异步更新,master与slave有短暂消息延迟,毫秒级。本文在此使用了异步复制集群模式ASYNC_MASTER,线上环境推荐使用同步双写模式,即SYNC_MASTER。
brokerRole=ASYNC_MASTER
storePathRootDir=/data/rocketmq/store/rootdir-b-m
storePathCommitLog=/data/rocketmq/store/commitlog-b-m
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
flushDiskType=ASYNC_FLUSH
vim broker-a-s.properties
brokerClusterName=sns-rocket-mq-cluster
brokerName=rocketmq-nameserver-1
listenPort=10921
namesrvAddr=rocketmq-nameserver-1:9876;rocketmq-nameserver-2:9876
brokerId=1
deleteWhen=04
fileReservedTime=72
brokerRole=SLAVE
storePathRootDir=/data/rocketmq/store/rootdir-a-s
storePathCommitLog=/data/rocketmq/store/commitlog-a-s
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
flushDiskType=ASYNC_FLUSH
修改日志配置文件(两台一样)
mkdir -p /opt/apps/rocketmq/logs
cd /opt/apps/rocketmq/conf
sed -i 's#${user.home}#/opt/apps/rocketmq#g' *.xml
七、修改启动脚本参数
继续国际惯例,修改之前先备份,两台服务器相同操作
cd /opt/apps/rocketmq/bin
cp runbroker.sh runbroker.sh.default
调一下JVM,包括nameserver 和 broker。限于自己机器的配置,参数调小一下。但Rocketmq最少的堆是1g,否则无法启动。两台机器执行相同的操作。
JAVA_OPT="${JAVA_OPT} -server -Xms16g -Xmx16g -Xmn8g -XX:MetaspaceSize=512m -XX:MaxMetaspaceSize=1g"
八、服务启动
要先启动namerserver,再启broker,两台机器执行相同的操作。
这里为了未来方便运维,最好是写好相应的服务启动脚本,养成一个好习惯比拥有一个好记性更友爱不是么~
① 启动nameserver,两台机器操作相同
mkdir -p /opt/scripts/rocketmq
cd /opt/scripts/rocketmq
vim start_rocketmq_nameserver.sh
#!/bin/bash
source /etc/profile
nohup sh /opt/apps/rocketmq/bin/mqnamesrv > /data/rocketmq/store/mqnamesrv.log 2>&1 &
② 启动broker。当然两台机器不一样,这里一个一个来
10.16.13.90 rocketmq-nameserver-1
vim start_broker_a_master.sh
#!/bin/bash
nohup sh /opt/apps/rocketmq/bin/mqbroker -c /opt/apps/rocketmq/conf/2m-2s-async/broker-a.properties > /data/rocketmq/store/broker-a-m.log 2>&1 &
vim start_broker_b_slave.sh
#!/bin/bash
nohup sh /opt/apps/rocketmq/bin/mqbroker -c /opt/apps/rocketmq/conf/2m-2s-async/broker-b-s.properties > /data/rocketmq/store/broker-b-s.log 2>&1 &
10.16.13.91 rocketmq-nameserver-2
vim start_broker_b_master.sh
#!/bin/bash
nohup sh /opt/apps/rocketmq/bin/mqbroker -c /opt/apps/rocketmq/conf/2m-2s-async/broker-b.properties > /data/rocketmq/store/broker-b-m.log 2>&1 &
vim start_broker_a_slave.sh
#!/bin/bash
nohup sh /opt/apps/rocketmq/bin/mqbroker -c /opt/apps/rocketmq/conf/2m-2s-async/broker-a-s.properties > /data/rocketmq/store/broker-a-s.log 2>&1 &
这里再啰嗦一遍,先启动nameserver,再启动broker,两台机器的启动顺序:启动nameserver---启动broker master---启动broker slave,done
都启动完毕之后可以jps看一下
[@bx_13_90 /opt/scripts/rocketmq]# jps
60161 BrokerStartup
147505 Jps
59300 NamesrvStartup
60760 rocketmq-console-ng-1.0.0.jar
124248 BrokerStartup
查看服务启动后的机器状态:
[@bx_13_90 /opt/scripts/rocketmq]# mqadmin clusterList --namesrvAddr=10.16.13.90:9876
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0
#Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE
sns-rocket-mq-cluster rocketmq-nameserver-1 0 10.16.13.90:10911 V4_2_0_SNAPSHOT 0.00(0,0ms) 0.00(0,0ms) 0 425360.18 0.0000
sns-rocket-mq-cluster rocketmq-nameserver-1 1 10.16.13.91:10921 V4_2_0_SNAPSHOT 0.00(0,0ms) 0.00(0,0ms) 0 425360.18 0.0000
sns-rocket-mq-cluster rocketmq-nameserver-2 0 10.16.13.91:10911 V4_2_0_SNAPSHOT 0.00(0,0ms) 0.00(0,0ms) 0 425360.18 0.0000
sns-rocket-mq-cluster rocketmq-nameserver-2 1 10.16.13.90:10921 V4_2_0_SNAPSHOT 0.00(0,0ms) 0.00(0,0ms) 0 425360.18 0.0000
九、服务关闭
关闭nameserver:
/opt/apps/rocketmq/bin/mqshutdown namesrv
关闭broker:
/opt/apps/rocketmq/bin/mqshutdown broker
十、部署rocketmq-console
部分开了天眼的童鞋可能看到上面jps时,有一个rocketmq-console-ng-1.0.0.jar,这里就具体说一下rocketmq的管控台,可以随时观测和修改rocketmq的状态和一些配置
这个管控台在GitHub上也有项目,GitHub地址:https://github.com/apache/rocketmq-externals,再次感谢国际开源组织gay站的无私贡献,让众基可以做伸手党,而且现在管控台还能显示简体中文了~具体的介绍GitHub上有说,这里限于篇幅就不多啰嗦了
步入正题,如何部署rocketmq管控台
这里还是福利一波吧,不用看GitHub如何编译部署blablabla,笔者直接送上编译好的jar包,绿色无毒请大家安心下载,如果链接有问题不能下载,可以发我邮件向我要(fantasymango@163.com)
链接: https://pan.baidu.com/s/1Y4fzVc2r30jtIVpvYZocWA 密码: vj2y
mkdir -p /opt/apps_install/rocketmq-console/
把下载好的jar包放在这里
启动脚本start.sh
#!/bin/bash
nohup java -jar ./rocketmq-console-ng-1.0.0.jar --server.port=8080 --rocketmq.config.namesrvAddr=10.16.13.90:9876;10.16.13.91:9876 &
访问ip:8080即可
Tips:
Broker 重启对客户端的影响
Broker 重启可能会导致正在发往这台机器的的消息发送失败,RocketMQ提供了一种优雅关闭Broker的方法,通过执行以下命令会清除Broker的写权限,过40s后,所有客户端都会更新Broker路由信息,此时再关闭Broker就不会发生发送消息失败的情况,因为所有消息都发往了其他 Broker。
# sh mqadmin wipeWritePerm -b brokerName -n namesrvAddr
Master 与Slave的关系
RocketMQ的开源版本,Master宕机,Slave不能切换为Master,这里的Slave不可写,但可读,类似于 Mysql 主备方式。