什么是 MQ?
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。
MQ的好处
- 实现开发语言间的解耦,比如生产者是Java,消费者是.NET
- 实现高并发场景下的分布式线程池
- 将无需即时返回且耗时的操作进行异步处理,比如短信群发
有 Broker 的 MQ
这个流派通常有一台服务器作为 Broker,所有的消息都通过它中转。生产者把消息发送给它就结束自己的任务了,Broker 则把消息主动推送给消费者(或者消费者主动轮询)
无 Broker 的 MQ
无 Broker 的 MQ 的代表是 ZeroMQ。该作者非常睿智,他非常敏锐的意识到——MQ 是更高级的 Socket,它是解决通讯问题的。所以 ZeroMQ 被设计成了一个“库”而不是一个中间件,这种实现也可以达到——没有 Broker 的目的
节点之间通讯的消息都是发送到彼此的队列中,每个节点都既是生产者又是消费者。ZeroMQ 做的事情就是封装出一套类似于 Socket 的 API 可以完成发送数据,读取数据
ZeroMQ 其实就是一个跨语言的、重量级的 Actor 模型邮箱库。你可以把自己的程序想象成一个 Actor,ZeroMQ 就是提供邮箱功能的库;ZeroMQ 可以实现同一台机器的 RPC 通讯也可以实现不同机器的 TCP、UDP 通讯,如果你需要一个强大的、灵活、野蛮的通讯能力,别犹豫 ZeroMQ
附:Queue 和 Topic 的区别
- Queue:一个发布者发布消息,下面的接收者按队列顺序接收,比如发布了 10 个消息,两个接收者 A,B 那就是 A,B总共会收到 10 条消息,不重复。
- Topic:一个发布者发布消息,有两个接收者 A,B 来订阅,那么发布了 10 条消息,A,B各收到10 条消息。
类型 | Topic | Queue |
---|---|---|
概要 | Publish Subscribe Messaging 发布订阅消息 | Point-to-Point 点对点 |
有无状态 | Topic 数据默认不落地,是无状态的。 | Queue 数据默认会在 MQ 服务器上以文件形式保存,比如 ActiveMQ 一般保存在?$AMQ_HOME\data\kr-store\data ?下面。也可以配置成 DB 存储。 |
完整性保障 | 并不保证 Publisher 发布的每条数据,Subscriber 都能接受到。 | Queue 保证每条数据都能被 Receiver 接收。 |
消息是否会丢失 | 一般来说 Publisher 发布消息到某一个 Topic 时,只有正在监听该 Topic 地址的 Sub 能够接收到消息;如果没有 Sub 在监听,该 Topic 就丢失了。 | Sender 发送消息到目标 Queue,Receiver 可以异步接收这个 Queue 上的消息。Queue 上的消息如果暂时没有 Receiver 来取,也不会丢失。 |
消息发布接收策略 | 一对多的消息发布接收策略,监听同一个 Topic 地址的多个 Sub 都能收到 Publisher 发送的消息。Sub 接收完通知 MQ 服务器 | 一对一的消息发布接收策略,一个 Sender 发送的消息,只能有一个 Receiver 接收。Receiver 接收完后,通知 MQ 服务器已接收,MQ 服务器对 Queue 里的消息采取删除或其他操作。 |
RocketMQ 简介
概述
消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。主要具有以下优势:
- 削峰填谷:主要解决瞬时写压力大于应用服务能力导致消息丢失、系统奔溃等问题
- 系统解耦:解决不同重要程度、不同能力级别系统之间依赖导致一死全死
- 提升性能:当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统
- 蓄流压测:线上有些链路不好压测,可以通过堆积一定量消息再放开来压测
RocketMQ
Apache Alibaba RocketMQ 是一个消息中间件。消息中间件中有两个角色:消息生产者和消息消费者。RocketMQ 里同样有这两个概念,消息生产者负责创建消息并发送到 RocketMQ 服务器,RocketMQ 服务器会将消息持久化到磁盘,消息消费者从 RocketMQ 服务器拉取消息并提交给应用消费。
RocketMQ 特点
RocketMQ 是一款分布式、队列模型的消息中间件,具有以下特点:
- 支持严格的消息顺序
- 支持 Topic 与 Queue 两种模式
- 亿级消息堆积能力
- 比较友好的分布式特性
- 同时支持 Push 与 Pull 方式消费消息
- 历经多次天猫双十一海量消息考验
RocketMQ 优势
目前主流的 MQ 主要是 RocketMQ、kafka、RabbitMQ,其主要优势有:
- 支持事务型消息(消息发送和 DB 操作保持两方的最终一致性,RabbitMQ 和 Kafka 不支持)
- 支持结合 RocketMQ 的多个系统之间数据最终一致性(多方事务,二方事务是前提)
- 支持 18 个级别的延迟消息(RabbitMQ 和 Kafka 不支持)
- 支持指定次数和时间间隔的失败消息重发(Kafka 不支持,RabbitMQ 需要手动确认)
- 支持 Consumer 端 Tag 过滤,减少不必要的网络传输(RabbitMQ 和 Kafka 不支持)
- 支持重复消费(RabbitMQ 不支持,Kafka 支持)
消息队列对比参照表
基于 Docker 安装 RocketMQ
docker-compose.yml
** 注意:启动 RocketMQ Server + Broker + Console 至少需要 2G 内存 **
version: '3.3'
services:
rmqnamesrv:
image: foxiswho/rocketmq:server
container_name: rmqnamesrv
ports:
- 9876:9876
volumes:
- ./data/logs:/opt/logs
- ./data/store:/opt/store
networks:
rmq:
aliases:
- rmqnamesrv
rmqbroker:
image: foxiswho/rocketmq:broker
container_name: rmqbroker
ports:
- 10909:10909
- 10911:10911
volumes:
- ./data/logs:/opt/logs
- ./data/store:/opt/store
- ./data/brokerconf/broker.conf:/etc/rocketmq/broker.conf
environment:
NAMESRV_ADDR: "rmqnamesrv:9876"
JAVA_OPTS: " -Duser.home=/opt"
JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
command: mqbroker -c /etc/rocketmq/broker.conf
depends_on:
- rmqnamesrv
networks:
rmq:
aliases:
- rmqbroker
rmqconsole:
image: styletang/rocketmq-console-ng
container_name: rmqconsole
ports:
- 8080:8080
environment:
JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
depends_on:
- rmqnamesrv
networks:
rmq:
aliases:
- rmqconsole
networks:
rmq:
external:
name: my_net
broker.conf
RocketMQ Broker 需要一个配置文件,按照上面的 Compose 配置,我们需要在./data/brokerconf/
目录下创建一个名为broker.conf
的配置文件,内容如下:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 所属集群名字
brokerClusterName=DefaultCluster
# broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a,
# 在 broker-b.properties 使用: broker-b
brokerName=broker-a
# 0 表示 Master,> 0 表示 Slave
brokerId=0
# nameServer地址,分号分割
# namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
# 解决方式1 加上一句 producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP
# brokerIP1=192.168.0.253
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,false
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker 对外服务的监听端口
listenPort=10911
# 删除文件时间点,默认凌晨4点
deleteWhen=04
# 文件保留时间,默认48小时
fileReservedTime=120
# commitLog 每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
# destroyMapedFileIntervalForcibly=120000
# redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
# storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
# commitLog 存储路径
# storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
# 消费队列存储
# storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
# 消息索引存储路径
# storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
# checkpoint 文件存储路径
# storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
# abort 文件存储路径
# abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
# 限制的消息大小
maxMessageSize=65536
# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000
# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=ASYNC_MASTER
# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
# 发消息线程池数量
# sendMessageThreadPoolNums=128
# 拉消息线程池数量
# pullMessageThreadPoolNums=128
RocketMQ 控制台
访问 http://rmqIP:8080 登入控制台
RocketMQ的架构
Nameserver
Nameserver的开发旨在轻量级,多台Nameserver互相独立,彼此间互不通信,这样的设计,保证了单台Nameserver宕机,不影响Nameserver。nameserver不会有频繁的读写,所以性能开销非常小,稳定性很高。
- NameServer用来保存活跃的broker列表,包括Master和Slave。
- NameServer用来保存所有topic和该topic所有队列的列表。
- NameServer用来保存所有broker的Filter列表。