0. Docker下RocketMq的搭建
官方提供多种玩法:
- Single Node.
- Cluster with docker-compose.
- Cluster on Kubernetes.
这里采取docker-compose。
0.1 镜像
docker hub有来自官方的镜像,只需要docker pull rocketmqinc/rocketmq
即可。
自己build镜像可以参考rocketmq-docker/image-build。
0.2 docker-compose玩法
查看rocketmq-docker/templates/docker-compose目录,可以看到这里提供了相应的配置文件和目录结构。于是我得到下面的目录结构:
没什么稀奇的,就把对应的目录建好,把配置文件复制下来即可。
这里docker-compose.yml需要略加修改,修改后:
version: '2'
services:
#Service for nameserver
namesrv:
image: rocketmqinc/rocketmq
container_name: rmqnamesrv
ports:
- 9876:9876
volumes:
- ./data/namesrv/logs:/home/rocketmq/logs
- ./data/namesrv/store:/home/rocketmq/store
command: sh mqnamesrv
#Service for broker
broker:
image: rocketmqinc/rocketmq
container_name: rmqbroker
links:
- namesrv
ports:
- 10909:10909
- 10911:10911
environment:
- NAMESRV_ADDR=namesrv:9876
volumes:
- ./data/broker/logs:/home/rocketmq/logs
- ./data/broker/store:/home/rocketmq/store
- ./data/broker/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf
command: sh mqbroker -c ../conf/broker.conf
#Service for another broker -- broker1
broker1:
image: rocketmqinc/rocketmq
container_name: rmqbroker1
links:
- namesrv
ports:
- 10929:10909
- 10931:10911
environment:
- NAMESRV_ADDR=namesrv:9876
volumes:
- ./data1/broker/logs:/home/rocketmq/logs
- ./data1/broker/store:/home/rocketmq/store
- ./data1/broker/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf
command: sh mqbroker -c ../conf/broker.conf
好了,docker-compose up
运行一下,为了看运行状态信息,这里没加 -d
,可以看到:
可以看到1个nameserver 2个broker 已经正常运行。
查看官方的README.md文件,验证一下是否正常。
好的,试一下
1.docker ps|grep rmqbroker
:这一步没问题,试试下一步:
2.Use docker exec -it {container_id} ./mqadmin clusterList -n {nameserver_ip}:9876
首先先获取一下nameserver的容器IP。两个命令,先拿nameserver的容器ID, ````:
然后
docker inspect ac94da50b3ce | grep IP
拿IP:执行一下:
docker exec -it d18a3a37e9ab ./mqadmin clusterList -n 172.22.0.2:9876
报错了
Caused by: java.security.NoSuchAlgorithmException: Algorithm HmacSHA1 not available
:查了很久才找到这个贴:https://github.com/apache/rocketmq-externals/issues/276
镜像里面没指定JAVA_HOME。
看看镜像里面的jre目录,还挺多的:
好了,再改下docker-compose.yml,在broker的environment加上JAVA_HOME的配置:
- JAVA_HOME=/usr/lib/jvm/jre
完整配置如下:
version: '2'
services:
#Service for nameserver
namesrv:
image: rocketmqinc/rocketmq
container_name: rmqnamesrv
ports:
- 9876:9876
volumes:
- ./data/namesrv/logs:/home/rocketmq/logs
- ./data/namesrv/store:/home/rocketmq/store
command: sh mqnamesrv
#Service for broker
broker:
image: rocketmqinc/rocketmq
container_name: rmqbroker
links:
- namesrv
ports:
- 10909:10909
- 10911:10911
environment:
- NAMESRV_ADDR=namesrv:9876
- JAVA_HOME=/usr/lib/jvm/jre
volumes:
- ./data/broker/logs:/home/rocketmq/logs
- ./data/broker/store:/home/rocketmq/store
- ./data/broker/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf
command: sh mqbroker -c ../conf/broker.conf
#Service for another broker -- broker1
broker1:
image: rocketmqinc/rocketmq
container_name: rmqbroker1
links:
- namesrv
ports:
- 10929:10909
- 10931:10911
environment:
- NAMESRV_ADDR=namesrv:9876
- JAVA_HOME=/usr/lib/jvm/jre
volumes:
- ./data1/broker/logs:/home/rocketmq/logs
- ./data1/broker/store:/home/rocketmq/store
- ./data1/broker/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf
command: sh mqbroker -c ../conf/broker.conf
重启,再试一下:
可以了,Perfect!!!
=========================分割线=======================
以下内容copy自知乎专栏RocketMQ详解【侵删】
1.服务端组件介绍
RocketMQ服务端的组件有三个,NameServer,Broker,FilterServer(可选,部署于和Broker同一台机器)
Name Server
Name Server是RocketMQ的寻址服务。用于把Broker的路由信息做聚合。客户端依靠Name Server决定去获取对应topic的路由信息,从而决定对哪些Broker做连接。
Name Server是一个几乎无状态的结点,Name Server之间采取share-nothing的设计,互不通信。
对于一个Name Server集群列表,客户端连接Name Server的时候,只会选择随机连接一个结点,以做到负载均衡。
Name Server所有状态都从Broker上报而来,本身不存储任何状态,所有数据均在内存。
如果中途所有Name Server全都挂了,影响到路由信息的更新,不会影响和Broker的通信。Broker
Broker是处理消息存储,转发等处理的服务器。
Broker以group分开,每个group只允许一个master,若干个slave。
只有master才能进行写入操作,slave不允许。
slave从master中同步数据。同步策略取决于master的配置,可以采用同步双写,异步复制两种。
客户端消费可以从master和slave消费。在默认情况下,消费者都从master消费,在master挂后,客户端由于从Name Server中感知到Broker挂机,就会从slave消费。
Broker向所有的NameServer结点建立长连接,注册Topic信息。Filter Server(可选)
RocketMQ可以允许消费者上传一个Java类给Filter Server进行过滤。
Filter Server只能起在Broker所在的机器。
拉取消息的时候,消息先经过Filter Server,Filter Server靠上传的Java类过滤消息后才推给Consumer消费。
客户端完全可以消费消息的时候做过滤,不需要Filter Server。
FilterServer存在的目的是用Broker的CPU资源换取网卡资源。因为Broker的瓶颈往往在网卡,而且CPU资源很闲。在客户端过滤会导致无需使用的消息在占用网卡资源。
使用 Java 类上传作为过滤表达式是一个双刃剑,一方面方便了应用的过滤操作且节省网卡资源,另一方面也带来了服务器端的安全风险,这需要足够谨慎,消费端上传的class要保证过滤的代码足够安全——例如在过滤程序里尽可能不做申请大内存,创建线程等操作,避免 Broker 服务器资源泄漏。
2.核心概念及术语
角色
- Producer
生产者。发送消息的客户端角色。发送消息的时候需要指定Topic。 - Consumer
消费者。消费消息的客户端角色。通常是后台处理异步消费的系统。 RocketMQ中Consumer有两种实现:PushConsumer和PullConsumer。
1).PushConsumer
推送模式(虽然RocketMQ使用的是长轮询)的消费者。消息的能及时被消费。使用非常简单,内部已处理如线程池消费、流控、负载均衡、异常处理等等的各种场景。
2).PullConsumer
拉取模式的消费者。应用主动控制拉取的时机,怎么拉取,怎么消费等。主动权更高。但要自己处理各种场景。
概念术语
- Producer Group
标识发送同一类消息的Producer,通常发送逻辑一致。发送普通消息的时候,仅标识使用,并无特别用处。若事务消息,如果某条发送某条消息的producer-A宕机,使得事务消息一直处于PREPARED状态并超时,则broker会回查同一个group的其 他producer,确认这条消息应该commit还是rollback。但开源版本并不完全支持事务消息(阉割了事务回查的代码)。 - Consumer Group
标识一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致。同一个Consumer Group下的各个实例将共同消费topic的消息,起到负载均衡的作用。
消费进度以Consumer Group为粒度管理,不同Consumer Group之间消费进度彼此不受影响,即消息A被Consumer Group1消费过,也会再给Consumer Group2消费。 - Topic
标识一类消息的逻辑名字,消息的逻辑管理单位。无论消息生产还是消费,都需要指定Topic。 - Tag
RocketMQ支持给在发送的时候给topic打tag,同一个topic的消息虽然逻辑管理是一样的。但是消费topic1的时候,如果你订阅的时候指定的是tagA,那么tagB的消息将不会投递。 - Message Queue
简称Queue或Q。消息物理管理单位。一个Topic将有若干个Q。若Topic同时创建在不同的Broker,则不同的broker上都有若干Q,消息将物理地存储落在不同Broker结点上,具有水平扩展的能力。
无论生产者还是消费者,实际的生产和消费都是针对Q级别。例如Producer发送消息的时候,会预先选择(默认轮询)好该Topic下面的某一条Q地发送;Consumer消费的时候也会负载均衡地分配若干个Q,只拉取对应Q的消息。
每一条message queue均对应一个文件,这个文件存储了实际消息的索引信息。并且即使文件被删除,也能通过实际纯粹的消息文件(commit log)恢复回来。 - Offset
RocketMQ中,有很多offset的概念。但通常我们只关心暴露到客户端的offset。一般我们不特指的话,就是指逻辑Message Queue下面的offset。
可以认为一条逻辑的message queue是无限长的数组。一条消息进来下标就会涨1。下标就是offset。
一条message queue中的max offset表示消息的最大offset。注:这里从源码上看,max_offset并不是最新的那条消息的offset,而是表示最新消息的offset+1。
而min offset则标识现存在的最小offset。
由于消息存储一段时间后,消费会被物理地从磁盘删除,message queue的min offset也就对应增长。这意味着比min offset要小的那些消息已经不在broker上了,无法被消费。 - Consumer Offset
用于标记Consumer Group在一条逻辑Message Queue上,消息消费到哪里了。
消费者拉取消息的时候需要指定offset,broker不主动推送消息,而是接受到请求的时候把存储的对应offset的消息返回给客户端。这个offset在成功消费后会更新到内存,并定时持久化。在集群消费模式下,会同步持久化到broker。在广播模式下,会持久化到本地文件。
实例重启的时候会获取持久化的consumer offset,用以决定从哪里开始消费。 - 集群消费
消费者的一种消费模式。一个Consumer Group中的各个Consumer实例分摊去消费消息,即一条消息只会投递到一个Consumer Group下面的一个实例。
实际上,每个Consumer是平均分摊Message Queue的去做拉取消费。例如某个Topic有3条Q,其中一个Consumer Group 有 3 个实例(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中的1条Q。
而由Producer发送消息的时候是轮询所有的Q,所以消息会平均散落在不同的Q上,可以认为Q上的消息是平均的。那么实例也就平均地消费消息了。
这种模式下,消费进度的存储会持久化到Broker。 - 广播消费
消费者的一种消费模式。消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即即使这些 Consumer 属于同一个Consumer Group,消息也会被Consumer Group 中的每个Consumer都消费一次。
实际上,是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。
这种模式下,消费进度会存储持久化到实例本地。 - 顺序消息
消费消息的顺序要同发送消息的顺序一致。由于Consumer消费消息的时候是针对Message Queue顺序拉取并开始消费,且一条Message Queue只会给一个消费者(集群模式下),所以能够保证同一个消费者实例对于Q上消息的消费是顺序地开始消费(不一定顺序消费完成,因为消费可能并行)。
在RocketMQ中,顺序消费主要指的是都是Queue级别的局部顺序。这一类消息为满足顺序性,必须Producer单线程顺序发送,且发送到同一个队列,这样Consumer就可以按照Producer发送的顺序去消费消息。
生产者发送的时候可以用MessageQueueSelector为某一批消息(通常是有相同的唯一标示id)选择同一个Queue,则这一批消息的消费将是顺序消息(并由同一个consumer完成消息)。或者Message Queue的数量只有1,但这样消费的实例只能有一个,多出来的实例都会空跑。
- 普通顺序消息
顺序消息的一种,正常情况下可以保证完全的顺序消息,但是一旦发生异常,Broker宕机或重启,由于队列总数发生发化,消费者会触发负载均衡,而默认地负载均衡算法采取哈希取模平均,这样负载均衡分配到定位的队列会发化,使得队列可能分配到别的实例上,则会短暂地出现消息顺序不一致。
如果业务能容忍在集群异常情况(如某个 Broker 宕机或者重启)下,消息短暂的乱序,使用普通顺序方式比较合适。
- 严格顺序消息
顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式 Failover 特性,即 Broker集群中只要有一台机器不可用,则整个集群都不可用,服务可用性大大降低。
如果服务器部署为同步双写模式,此缺陷可通过备机自动切换为主避免,不过仍然会存在几分钟的服务不可用。(依赖同步双写,主备自动切换,自动切换功能目前并未实现)
3. 水平扩展及负载均衡详解
Broker端水平扩展
Broker负载均衡
Broker是以group为单位提供服务。一个group里面分master和slave,master和slave存储的数据一样,slave从master同步数据(同步双写或异步复制看配置)。
过nameserver暴露给客户端后,只是客户端关心(注册或发送)一个个的topic路由信息。路由信息中会细化为message queue的路由信息。而message queue会分布在不同的broker group。所以对于客户端来说,分布在不同broker group的message queue为成为一个服务集群,但客户端会把请求分摊到不同的queue。
而由于压力分摊到了不同的queue,不同的queue实际上分布在不同的Broker group,也就是说压力会分摊到不同的broker进程,这样消息的存储和转发均起到了负载均衡的作用。
Broker一旦需要横向扩展,只需要增加broker group,然后把对应的topic建上,客户端的message queue集合即会变大,这样对于broker的负载则由更多的broker group来进行分担。
并且由于每个group下面的topic的配置都是独立的,也就说可以让group1下面的那个topic的queue数量是4,其他group下的topic queue数量是2,这样group1则得到更大的负载。
commit log
虽然每个topic下面有很多message queue,但是message queue本身并不存储消息。真正的消息存储会写在CommitLog的文件,message queue只是存储CommitLog中对应的位置信息,方便通过message queue找到对应存储在CommitLog的消息。
不同的topic,message queue都是写到相同的CommitLog 文件,也就是说CommitLog完全的顺序写。
具体如下图:
Producer
Producer端,每个实例在发消息的时候,默认会轮询所有的message queue发送,以达到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下,如下图:
Consumer负载均衡
集群模式
在集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例即可。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条message queue。
而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。
默认的分配算法是AllocateMessageQueueAveragely,如下图:
还有另外一种平均的算法是AllocateMessageQueueAveragelyByCircle,也是平均分摊每一条queue,只是以环状轮流分queue的形式,如下图:
需要注意的是,集群模式下,queue都是只允许分配只一个实例,这是由于如果多个实例同时消费一个queue的消息,由于拉取哪些消息是consumer主动控制的,那样会导致同一个消息在不同的实例下被消费多次,所以算法上都是一个queue只分给一个consumer实例,一个consumer实例可以允许同时分到不同的queue。
通过增加consumer实例去分摊queue的消费,可以起到水平扩展的消费能力的作用。而有实例下线的时候,会重新触发负载均衡,这时候原来分配到的queue将分配到其他实例上继续消费。
但是如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用了。所以需要控制让queue的总数量大于等于consumer的数量。
广播模式
由于广播模式下要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就没有消息被分摊消费的说法。
在实现上,其中一个不同就是在consumer分配queue的时候,会所有consumer都分到所有的queue。
4.消息ACK机制及消费进度管理
https://zhuanlan.zhihu.com/p/25140744 中剖析过,consumer的每个实例是靠队列分配来决定如何消费消息的。那么消费进度具体是如何管理的,又是如何保证消息成功消费的(RocketMQ有保证消息肯定消费成功的特性(失败则重试)?
本文将详细解析消息具体是如何ack的,又是如何保证消费肯定成功的。