RocketMQ集群以及SpringBoot集成

1. 中间件下载

官网上直接下载,比较简单就不赘述!

2. 简介

2.1. RocketMQ是什么

RocketMQ

RocketMQ 是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点,具体特性如下:

  • Producer、Consumer、队列都可以分布式。
  • Producer 向队列轮流发送消息,队列集合称为Topic,Consumer 如果做广播消费,则一个consumer实例消费这个Topic对应的所有队列,如果做集群消费,则多个Consumer 实例平均消费这个topic 对应的队列集合。
  • 能够保证严格的消息顺序
  • 提供丰富的消息拉取模式
  • 高效的订阅者水平扩展能力
  • 实时的消息订阅机制
  • 亿级消息堆积能力
  • 较少的依赖

2.2. 概念

2.2.1. 消息模型(Message Model)

RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。

2.2.2. 消息生产者(Producer)

负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。

2.2.3. 消息消费者(Consumer)

负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

2.2.4. 主题(Topic)

表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。

2.2.5. 名字服务(Name Server)

名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。

2.2.6. 拉取式消费(Pull Consumer)

Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。

2.2.7. 推动式消费(Push Consumer)

Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。

2.2.8. 生产者组(Producer Group)

同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。

2.2.9. 消费者组(Consumer Group)

同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。

2.2.10. 普通顺序消息(Normal Ordered Message)

普通顺序消费模式下,消费者通过同一个消费队列收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。

2.2.11. 严格顺序消息(Strictly Ordered Message)

严格顺序消息模式下,消费者收到的所有消息均是有顺序的。

2.3. 架构设计

2.3.1. 技术架构

技术架构图

2.3.2. 部署架构

部署架构图

3. 安装

由于RocketMQ安装比较简单,在单机这块就省略。

3.1. 单机

略!!!!

3.2. 集群

3.2.1. 集群的方式

  • 单 Master:这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用,不建议线上环境使用。

  • 多 Master 模式:一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master

    • 优点:配置简单,单个Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由与 RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢)。性能最高。

    • 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到受到影响。

  • 多 Master 多 Slave 模式,异步复制:每个 Master 配置一个 Slave,有多对Master-Slave,HA
    采用异步复制方式,主备有短暂消息延迟,毫秒级。

    • 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为Master 宕机后,消费者仍然可以从 Slave。

    • 缺点:Master 宕机,磁盘损坏情况,会丢失少量消息。

  • 多 Master 多 Slave 模式,同步双写:每个 Master 配置一个 Slave,有多对Master-Slave,HA
    采用同步双写方式,主备都写成功,向应用返回成功。

    • 优点:数据与服务都无单点,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高。

    • 缺点:性能比异步复制模式略低,大约低 10%左右,发送单个消息的 RT 会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能

后两种的方式比较复杂,涉及主从同步的问题,非必要的场景,建议采用多Master 这种方式。而我下面的例子也是以 多Master来部署的。

3.2.2. 部署

  • 服务器环境准备
序号 IP 角色 模式
A 192.168.244.128 nameServer1,brokerServer1 Master1
B 192.168.244.129 nameServer2,brokerServer2 Master2
  • 修改HOST

在 A、B 两服务器中将HOST文件修改,vi /etc/hosts

修改内容如下:

192.168.244.128 rocketmq-nameserver1
192.168.244.128 rocketmq-master1

192.168.244.129 rocketmq-nameserver2
192.168.244.129 rocketmq-master2
  • 上传文件

在A、B两台机器上传RockerMQ文件,并解压。我这里上传和解压路径在/usr/local

[root@centos8 rocketmq]$ pwd
/usr/local/rocketmq
[root@centos8 rocketmq]$ ll
total 40
drwxr-xr-x. 2 root root    83 Jun 24 02:49 benchmark
drwxr-xr-x. 3 root root  4096 Jun 24 02:02 bin
drwxr-xr-x. 6 root root   211 Jun  2 02:09 conf
drwxr-xr-x. 2 root root  4096 Jun 24 02:49 lib
-rw-r--r--. 1 root root 17336 Jun  2 02:09 LICENSE
-rw-r--r--. 1 root root  1338 Jun  2 02:09 NOTICE
-rw-r--r--. 1 root root  5069 Jun 24 02:02 README.md
[root@centos8 rocketmq]$ 

  • 创建存储路径

在A、B两台机器执行创建路径。

[root@centos8 rocketmq]$ mkdir /usr/local/rocketmq/store
[root@centos8 rocketmq]$ mkdir /usr/local/rocketmq/store/commitlog
[root@centos8 rocketmq]$ mkdir /usr/local/rocketmq/store/consumequeue
[root@centos8 rocketmq]$ mkdir /usr/local/rocketmq/store/index
  • 修改配置文件

这里贴一个标准配置文件,具体如下:


// 所属集群名字
brokerClusterName=rocketmq-cluster
//  broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a/broker-b ## 需要按机器A/B 区分
// 0 表示 Master,>0 表示 Slave
brokerId=0
//  nameServer地址,分号分割

namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
// 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
// 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
// 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
// Broker 对外服务的监听端口
listenPort=10911
// 删除文件时间点,默认凌晨 4点
deleteWhen=04
// 文件保留时间,默认 48 小时
fileReservedTime=120
// commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
//  ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
// destroyMapedFileIntervalForcibly=120000
// redeleteHangedFileInterval=120000
// 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
// 存储路径
storePathRootDir=/usr/local/rocketmq/store
// commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
// 消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
// 消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
// checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
// abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
// 限制的消息大小
maxMessageSize=65536
// flushCommitLogLeastPages=4
// flushConsumeQueueLeastPages=2
// flushCommitLogThoroughInterval=10000
// flushConsumeQueueThoroughInterval=60000
// Broker 的角色
//  - ASYNC_MASTER 异步复制Master
//  - SYNC_MASTER 同步双写Master
//  - SLAVE
brokerRole=ASYNC_MASTER
//  刷盘方式
//  - ASYNC_FLUSH 异步刷盘
//  - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
//  checkTransactionMessageEnable=false

//  发消息线程池数量
//  sendMessageThreadPoolNums=128
//  拉消息线程池数量
//  pullMessageThreadPoolNums=128

机器A

[root@centos8 rocketmq]$ vi/usr/local/rocketmq/conf/2m-noslave/broker-a.properties

其中brokerName=broker-a

在机器B

[root@centos8 rocketmq]$ vi/usr/local/rocketmq/conf/2m-noslave/broker-a.properties

其中brokerName=broker-b

  • 修改日志配置文件

不用说肯定也是A、B两台都要改

[root@centos8 rocketmq]$ mkdir -p /usr/local/rocketmq/logs
[root@centos8 rocketmq]$ cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml
  • 修改启动参数(A、B)

/usr/local/rocketmq/bin 路径下,找到runbroker.shrunserver.sh

我们将这两个文件的JAVA_OPT 参数修改下,不然默认情况下,JVM配置是 8G。如 JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"

修改后:

...
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"
....
  • 启动NameServer(A、B)
[root@centos8 rocketmq]$ cd /usr/local/rocketmq/bin
[root@centos8 rocketmq]$ nohup sh mqnamesrv &
机器A
机器B
  • 启动BrokerServer(A)
[root@centos8 rocketmq]$ cd /usr/local/rocketmq/bin
[root@centos8 rocketmq]$ nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties &
  • 启动BrokerServer(B)
[root@centos8 rocketmq]$ cd /usr/local/rocketmq/bin
[root@centos8 rocketmq]$ nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-b.properties &
  • rocketmq-console安装

rocketmq-console下载

这个包需要利用maven编译打包。我这里打包一个,放百度云盘上,供下载!

链接: https://pan.baidu.com/s/1hfvzJeyBG7TXnvPHtn3C5Q

提取码: atkq

最后执行jar文件

[root@centos8 rocketmq]$ java -jar rocketmq-console-ng-1.0.0.jar

页面的端口是 8082,刚开始启动有点慢,稍微等会!!

rocketmq-console界面
  • 数据清理
[root@centos8 rocketmq]$ cd /usr/local/rocketmq/bin
[root@centos8 rocketmq]$ sh mqshutdown broker
[root@centos8 rocketmq]$ sh mqshutdown namesrv

这里需要等待完全停止!

[root@centos8 rocketmq]$ rm -rf /usr/local/rocketmq/store
[root@centos8 rocketmq]$ mkdir /usr/local/rocketmq/store
[root@centos8 rocketmq]$ mkdir /usr/local/rocketmq/store/commitlog
[root@centos8 rocketmq]$ mkdir /usr/local/rocketmq/store/consumequeue
[root@centos8 rocketmq]$ mkdir /usr/local/rocketmq/store/index

最终按照以上步骤重启NameServer和BrokerServer即可!

4. SpringBoot集成

  • POM文件添加依赖
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>{换成相应版本}</version>
</dependency>
  • application.yml
######### 4.1. RocketMQ ##########
rocketmq:
  name-server: 192.168.244.128:9876;192.168.244.129:9876;
  producer:
    group: drunkard
    send-message-timeout: 30000
  • 生产者
@Slf4j
@RestController
public class RocketMqDemo {

    @Autowired
    RocketMQTemplate rocketMQTemplate;

    @GetMapping("send/{id}")
    public String send(@PathVariable("id") String id){
        UserVo userVo  = new UserVo(id,"侯征");
        log.warn(JSON.toJSONString(userVo));
        rocketMQTemplate.send("rocket-topic-01", MessageBuilder.withPayload(userVo).build());
        return "SUCESS";
    }
}
  • 消费者
@Slf4j
@Component
@RocketMQMessageListener(topic = "rocket-topic-01", consumerGroup = "my-rocket-topic-01")
public class UserConsumer implements RocketMQListener<UserVo> {

    @Override
    public void onMessage(UserVo message) {
        log.warn("接受到消息: {}",message.toString());
    }
}

5. 案例下载

觉得对你有帮助,请Star

Gitee下载,希望多多给Star。

Github下载,希望多多给Star。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343