1.MQ简介
1.1.项目工程弊端
image.png
image.png
image.png
image.png
1.2.消息队列
image.png
image.png
1.3.MQ的作用
image.png
image.png
image.png
image.png
image.png
1.4.MQ的优缺点
-
系统可用性降低,两台服务器靠MQ通信,MQ挂了,两台服务器都挂了。可以通过集群解决image.png
- 系统复杂度提高
-
异步消息机制带来的缺点。消息顺序性:期望按照顺序执行,期望MQ处理消息按照一定顺序处理。MQ里存放的顺序未必就是你要的顺序。消息丢失:消息发给MQ,MQ宕机。消息一致性:发两个消息,内容是一样,让AB都去执行,A是ok,B挂了,订单生成了,运单没生成。同步是请求方知道消息执行的结果,异步可不知道执行结果image.png
1.5.MQ产品介绍
image.png
2.环境搭建
2.1.基础概念
image.png
- 经济人(消息服务器)和消费者之间交互有两种,一是消费者通过拉取获得消息(不常用)。二是有一个监听器,一直在监听经纪人,经纪人有消息主动推送给消费者
- 全部都是集群
- 经纪人不仅要接收消息发送消息,经纪人(消息服务器)还要做消息持久化,存起来不会丢,过滤消息,不是所有的消息过来都是可用的
- 生产者、消息服务器、消费者三者如何互相找到对方?经纪人把地址注册到命名服务器中,生产者发送消息的时候连接命名服务器,获取经纪人的ip,消费者接受消息的时候拉取经纪人ip.
- 命名服务器是如何感知生产者、消费者、经纪人事实存在的?心跳机制。每一个服务器每30秒向命名服务器发送请求,如果有一台机器挂了,心跳指令停了,命名服务器如果在一定时间内没有接收到信息,就认为这台服务器挂了就下线了。
- 主题是对消息的分类,标题是进一步对消息分类,一个消息可以有多个tag
-
通过不同的topic创建队列image.png
2.2.搭建
image.png
- 需要jdk版本高于1.8
- 使用一个上传文件的小工具将jdk和mq上传到linux
- yum -y install lrzsz
- rz(弹出对话框 选择需要上传的工具 勾选发送文件到ascii)上传jdk
- 安装jdk cp jdk-8u171-linux-x64.tar.gz /opt
- tar -zxvf jdk-8u171-linux-x64.tar.gz
- vi /etc/profile 配置环境变量
# set java environment
JAVA_HOME=/opt/jdk1.8.0_171
CLASSPATH=.:$JAVA_HOME/lib.tools.jar
PATH=$JAVA_HOME/bin:$PATH
export JAVA_HOME CLASSPATH PATH
然后source /etc/profile
- 安装 mq sftp上传 put F:\study\resources\rocketMQ\rocketmq-all-4.5.2-bin-release.zip
- unzip: command not found解决yum install -y unzip zip
- unzip rocketmq-all-4.5.2-bin-release.zip
- mv rocketmq-all-4.5.2-bin-release rocketmq
- cp rocketmq /opt
- benchmark里面包含的是一些写好的测试命令
- cd /opt/rocketmq/bin
- sh mqnamesrv 报错修改jvm内存 vim runserver.sh
- JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
- sh mqbroker 直接启动时报错 因为他设置需要的内存太大 修改vim runbroker.sh
- JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
- sh mqbroker -n localhost:9876 配置命名服务器地址,注册到哪个命名服务器(启动的时候一定要带 -n参数)
- 命名服务器和消息服务器都启动成功,跑测试程序,验证往里面放消息和往外面拿消息是否ok
- export NAMESRV_ADDR=localhost:9876 告诉计算机连哪个nameserver
- sh tools.sh org.apache.rocketmq.example.quickstart.Producer调用的是benchmark里面包含的一些测试命令 在发消息到消息服务器
- sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
- 修改内存参照https://blog.csdn.net/jiangyu1013/article/details/81486374
-
小结图片.png图片.png
3.消息发送(重点)
image.png
3.1.消息发送与接收开发流程
image.png
3.2.生产者一对一
- 阿里云9876 10911开通端口
- 修改配置文件vim broker.conf (namesrvAddr = 公网ip:9876 brokerIP1 = 公网ip)
- 启动sh mqbroker -n 公网ip:9876 -c ../conf/broker.conf
- java代码用公网ip
-
虚拟机 java代码写linux的ip,关闭防火墙
图片.pngimage.pngimage.png
3.3.消费者一对一
image.png
接收消息的代码启动就会一直运行,一直在监听
3.3.单生产者对多消费者
- 生产者生产的消息要够多,消费者要至少开2-3个
- 先启动两个消费者 再启动一个生产者
-
生产者里面写循环,发送多个消息image.png
-
消费者idea设置一下,启动多个,先启动消费者image.png
-
生产者发的10个消息,默认是被平均分给消费者的(每个消费者接受部分数据),内部有个负载均衡,在消费者这边做的设定image.png
- 广播模式下,每一个消费者都接收完整的数据
3.4.多生产者对多消费者
- 多个生产者提供的多个消息可以被任意一个消费者消费
-
生产者启动两次,第一次发送的消息前缀是生产者1,第二次是生产者2,消费者启动,可以把生产者启动两次发送的消息都接收到image.pngimage.png
- 先启动两个消费者,在分别启动生产者1和生产者2,默认模式,两个消费者先平均消费生产者1,再消费生产者2
3.5.消息类别
-
同步消息
image.png -
异步消息
image.png -
单向消息
image.png -
同步消息,之前写的demo都是同步消息image.png
-
异步消息image.png
-
单向消息image.png
3.6.延时消息
image.png
image.png
- 从1s开始 下标是0 代码参数传的是下标 传3表示30秒
3.7.批量消息
image.png
image.png
图片.png
注意:属性是后面消息过滤需要用的
3.8.消息过滤
- 主题topic过滤,前面一直在用
-
分类Tag过滤image.pngimage.pngimage.png
-
sql过滤/属性过滤/语法过滤image.pngimage.pngimage.png
3.9.错乱的消息顺序
- 某个topic内部有若干个消息队列
- 消息是进入消息服务器中某个topic中的某个消息队列中
-
3个用户订单进度不同image.png
业务是有一定的顺序,这样进队列是有问题的。前置工作没做完,后置工作已经启动了
-
希望按照固定的顺序进入消息服务器,出来也是同样的顺序image.png
- 消息可以选择进入指定的队列
-
错误现象演示,发的消息在三个队列中,放进去是1-10,取得时候乱序
image.pngimage.png
3.10.错乱的消息顺序--生产者解决,把同一个id的不同消息放进同一个队列
image.png
image.png
image.png
3.11.错乱的消息顺序--消费者同一个id的不同消息用一个线程接收
image.png
image.png
image.png
- consumer.registerMessageListener(new MessageListenerOrderly() {}
3.12.事务消息
image.png
image.png
image.png
image.png
image.png
image.png
图片.png
4.集群搭建
4.1.集群介绍
image.png
- 多个broker提供服务,每个broker消息不同步
- 多个master多个slave。为每个主机挂上对应的从机。读写都是对master进行操作,从机仅仅是在主机读消息负载比较大的时候才负责读消息。
- 命名服务器也可以搭建成集群
- brokerId=0代表主 1代表从
- brokerName设定相同的brokerName表示一组
-
slave也需要连每一个命名服务器image.pngimage.png
- 每个broker对应一个topic
4.2.双主双从集群搭建-基础环境设置
image.png
ip3的从机是ip5,ip4的从机是ip6
- 两台机器需要安装jdk rocketmq进行配置,确保能正常启动
- 两台机器分别进行网络配置(阿里云服务器不用做)
cd /etc
vim hosts
#nameserver
172.28.16.191 rocketmq-nameserver1
172.28.16.192 rocketmq-nameserver2
#broker
172.28.16.191 rocketmq-master1
172.28.16.191 rocketmq-slave2
172.28.16.192 rocketmq-master2
172.28.16.192 rocketmq-slave1
systemctl restart network
- 查看两台机器的防火墙
firewall-cmd --state
systemctl stop firewalld.service
- rocketmq的运行环境添加到环境变量中(阿里云服务器也可以不用做)
#rocketmq environment
ROCKETMQ_HOME=/opt/rocketmq
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PATH
source /etc/profile
4.3.双主双从集群搭建-主消息服务器配置
- 创建目录 两台机子rocketmq目录下创建
mkdir store
cd store
mkdir commitlog
mkdir consumequeue
mkdir index
cd ..
ll
- 修改配置文件
cd conf
cd 2m-2s-sync/
vim broker-a.properties
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/opt/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/opt/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/opt/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/opt/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/opt/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/opt/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
4.3.双主双从集群搭建-从消息服务器配置
vim broker-b-s.properties
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/opt/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/opt/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/opt/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/opt/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/opt/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/opt/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#发消息线程池数量
sendMessageThreadPoolNums=128
#拉消息线程池数量
pullMessageThreadPoolNums=128
4.4.双主双从集群搭建-小结
- 机器A配置broker-a.properties[brokerName=broker-a brokerId=0 listenPort=10911]
- 机器A配置broker-b-s.properties[brokerName=broker-b brokerId=1 listenPort=11011]
- 机器B配置broker-b.properties[brokerName=broker-b brokerId=0 listenPort=10911]
- 机器B配置broker-a-s.properties[brokerName=broker-a brokerId=1 listenPort=11011]
4.5.启动
两台机子分别后台启动
nohup sh mqnamesrv &
jps
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a.properties &
jps
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b-s.properties &
jps
nohup sh mqnamesrv &
jps
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b.properties &
jps
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a-s.properties &
jps
broker-b-s.properties、broker-a-s.properties无法启动,cat nohup.out查看日志发现配置的文件目录被锁定。因为之前主从设置的所有文件目录都是一样的。解决办法(两台机子对应都要修改):再新建一套和store类似的目录,b-s配置文件目录重新配置
mkdir store-slave
cd store-slave/
mkdir commitlog
mkdir consumequeue
mkdir index
#存储路径
storePathRootDir=/opt/rocketmq/store-slave
#commitLog 存储路径
storePathCommitLog=/opt/rocketmq/store-slave/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/opt/rocketmq/store-slave/consumequeue
#消息索引存储路径
storePathIndex=/opt/rocketmq/store-slave/index
#checkpoint 文件存储路径
storeCheckpoint=/opt/rocketmq/store-slave/checkpoint
#abort 文件存储路径
abortFile=/opt/rocketmq/store-slave/abort
4.6.测试
生产者连一台机器,消费者连一台机器图片.png
4.7.小结
图片.png
图片.png
图片.png
图片.png
图片.png
图片.png
图片.png
4.8.两台阿里云服务器搭建
第一台机器 116.62.133.122
#端口规划:
9876 NameServer1
10911 BrokerA-master
11011 BrokerB-slave
第二台机器 120.55.53.39
#端口规划:
9876 NameServer2
10911 BrokerB-master
11011 BrokerA-slave
配置文件
namesrvAddr=116.62.133.122:9876;120.55.53.39:9876
第一台机器两个配置文件加这句
brokerIP1=116.62.133.122
第二台机器两个配置文件加这句
brokerIP1=120.55.53.39
启动
nohup sh mqnamesrv &
jps
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a.properties &
jps
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b-s.properties &
jps
nohup sh mqnamesrv &
jps
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b.properties &
jps
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a-s.properties &
jps
关闭 要先停止broker,其次停止nameserver。
sh mqshutdown namesrv
sh mqshutdown broker
参考:https://zhuanlan.zhihu.com/p/355530526
4.9.集群监控平台
图片.png
-
一个maven项目,导入到idea,刷新maven图片.png图片.png
-
代码图片.png
-
页面图片.png
-
阿里云建议端口号全部开启图片.png
5.高级特性(重点)
5.1.消息的存储
-
基础结构图片.png
- ACK相当于是确认字符,相当于返回给你一个OK
- 如果broker宕机了,消息丢失。如何保证消息不丢失?存起来
-
升级版图片.png
-
最终版:数据库存最终是在硬盘上存,以磁盘文件的形式进行存储,离不开文件系统。能否简单一点,消息服务器直接和文件系统打交道,跳过了数据库。图片.png
5.2.高效存储
- 默认使用固态硬盘
-
硬盘内部数据读写:理想状态下,数据存硬盘占用连续的空间图片.png图片.png图片.png
- 一条信息被迫分成三部分存储,先找到A,读完告诉你B在哪。。。这种方式叫随机写
-
我们更愿意看到的方式是按照顺序连在一起存储,速度最快图片.png
- 磁盘碎片存在的情况下,想实现顺序写几乎是不可能的。因为顺序写是在现用内容的尾部追加内容。rocketmq内部采用的就是顺序写
-
先开辟空间,先占用,写数据的时候,直接在这个空间进行写数据图片.png
在大的硬盘存储的结构基础之上模拟了一块顺序写。
- rocketmq在数据传输继续改良
- linux系统发送数据的时候,硬盘数据先复制成内核态,内核态的数据不区分应用的,所有的app的数据在使用之前都是内核态,在使用的时候再转换成对应的应用程序的那种状态,叫用户态。
-
用户态数据,才开始传输,复制到网络驱动内核,复制到网卡,传输到对面的机器,成为内存数据图片.png
-
用户态可以跳过图片.png
- 所说的四次复制指本机的四次,不包含网卡到内存数据
-
虚拟机可能刚开始启动的时候只有4-5g,但是mq一启动,就奔着20个g去了。为了达成存储传输性能提高,用到一些顺序写、零拷贝技术,这都需要占用磁盘空间。虚拟机至少预留50-100g图片.png图片.png
5.3.消息存储结构
图片.png
-
消费逻辑队列,记录每个队列的使用情况。有多少个消息队列,就有多少个消费逻辑队列。二者一一绑定。消息在消息队列中的位置
图片.png - 索引区就是快速查找队列,每一个消息队列都要创建自己的索引。
- commitlog 主消息存储(占用空间大) consumequeue 用户消费队列(占用空间小) index(占用空间小)
5.4.刷盘机制
- 消息存储最终存到磁盘上,也就是文件系统,这个过程叫刷盘
-
刷盘机制一:同步刷盘图片.png
-
刷盘机制二:异步刷盘 步骤1 3 7正常执行,执行到一定量的消息再进行4 5操作图片.png
-
二者区别图片.png
5.5.高可用
图片.png
- 命名服务器可以起若干台,每个服务器是无状态的,相互之间不进行通信,每一个记录对外的服务是通过别人上报实现的。某一个挂了,不会影响服务。因为注册的时候,broker会注册到每一个命名服务器上,所有命名服务器都知道你的存在。
- master既负责生产又负责消费,只有生产压力大的时候,会自动切换slave来负责消费
- master又承担读,又承担写,过一会只写,不读,slave能否获得master的数据呢
-
读写如何区分,生产者写数据到消息服务器,消息服务器读数据到消费者
图片.png
5.6.负载均衡
-
生产者集群开始工作,同一个组并且同一个topic,会把第一个消息发给第一个队列,第二个消息会发给第二个队列,平均分。接着往下面的broker平均分图片.png
-
大家都接收了一遍,第二轮开始再往第一个broker的第一个队列发图片.png
-
消费者负载均衡:平均分配,先查询有多少个队列,再查询有多少个集群图片.png
问题:brokerA宕机,消费者一台机器不工作
-
消费者负载均衡:循环平均分配图片.png
可以避免某一个broker宕机,消费者的某台机子不至于没事干
- 广播模式不参与负载均衡的讨论
5.7.消息重试
图片.png
- 顺序消息重试
- 正常情况:消息发送给消费者,消费者收到给回执。消息队列就知道这个消息消费完了
-
异常情况:消息发送给消费者,消费者收到给回执。消息队列没有收到回执,消息队列接着发图片.png
-
该机制好处是可以保证消息一定被消费,弊端是首消息没有被消费的时候,后面的消息是永远不可能被消费的图片.png
-
无序消息重试图片.png
第一次消息重试时间10s,第二次是30s。默认重试16次,如果这个时间段内都没有发送成功的话,可以设定不再重试
-
死信队列图片.png图片.png图片.png
- 运行的时候要监控有没有阻塞,有没有死信队列。有死信队列,通过手工的方式去维护,让他消费掉
5.8.消息重复消费
图片.png
- broker闪断了一下网络,3步骤失败,生产者不知道broker接到了消息,生产者只能再发一次消息
-
生产者宕机。3执行了,生产者收不到,宕机恢复以后继续发送消息
图片.png图片.png
6.补充说明
6.1.Topic
- Topic是生产者在发送消息和消费者在拉取消息的类别。Topic与生产者和消费者之间的关系非常松散。具体来说,一个Topic可能有0个,一个或多个生产者向它发送消息;相反,一个生产者可以发送不同类型Topic的消息。类似的,消费者组可以订阅一个或多个主题,只要该组的实例保持其订阅一致即可。
- Topic在Google翻译中解释为话题。我们可以理解为第一级消息类型,类比于书的标题。在应用系统中,一个Topic标识为一类消息类型,比如交易信息。
6.2.Tag
- 标签,换句话的意思就是子主题,为用户提供了额外的灵活性。有了标签,来自同一业务模块的具有不同目的的消息可以具有相同的主题和不同的标记。标签有助于保持代码的清晰和连贯,同时标签也方便RocketMQ提供的查询功能。
- 对于Topic的一种细分,另外就是便于查询
- Tag在Google翻译中解释为标签。我们可以理解为第二级消息类型,类比于书的目录,方便检索使用消息。在应用系统中,一个Tag标识为一类消息中的二级分类,比如交易信息下的交易创建、交易完成。
6.3.GroupName
- 和现实世界中一样,RocketMQ中也有组的概念。代表具有相同角色的生产者组合或消费者组合,称为生产者组或消费者组。
- 作用是在集群HA的情况下,一个生产者down之后,本地事务回滚后,可以继续联系该组下的另外一个生产者实例,不至于导致业务走不下去。在消费者组中,可以实现消息消费的负载均衡和消息容错目标。
- 另外,有了GroupName,在集群下,动态扩展容量很方便。只需要在新加的机器中,配置相同的GroupName。启动后,就立即能加入到所在的群组中,参与消息生产或消费。
6.4.topic和tag区别
图片.png
图片.png