rocketmq实践笔记

1 架构原理

 1.1 应用场景


只支持发布订阅模式。

大数据量的消息堆积能力,最终数据是持久化到磁盘上,理论上无限大。

用于业务消息的收发,日志消息的收发请用kafka生态。

1.2 namesrv

namesrv节点是无状态的,用于对整个rocketmq集群的状态保持,包括多少主节点、从节点、topic路由(topic分布在哪些节点上),周期性的健康检查等等。

与broker节点保持长连接。

心跳:间隔30s,心跳超时时间10s,这两个参数无法改变。

一旦连接断开,namesrv会立即感知,但不会立马通知生产者,消费者。需要生产者消费者自身定时获取最新的rocketmq元数据信息

 1.3  broker

实际收发消息的节点。

broker节点分主节点(brokerid=0)、从节点(brokerid=其他)。简单理解,从节点是主节点的拷贝,复制。

具体如何复制,有两种集群模式异步复制、同步双写。

异步复制:从节点从主节点异步复制commitlog,所以存在小概率的数据丢失风险

同步双写:为了解决异步复制存在的缺陷,同步双写模式是生产者同步往主、从节点写数据,从而保证主从节点的数据是完全一致的。但性能略低于异步复制。适用于高可靠性场景,例如电商

 1.4  生产者、消费者

生产者负责生产消息,发消息。

消费者者负责订阅消息,消费消息。

生产者连接到namesrv,定时同步,获取rocketmq集群元数据。

与broker建立长连接,broker每隔10秒,检查连接,如果超过2分钟没有心跳,关闭连接。

消费模式:广播消费、集群消费。广播消费是指每个消费者都会收到同样的消息。集群消费模式则是按照一定分配策略,每个消费者负责消费不同的队列。


 1.5 消息队列

如果每个topic由管理员手动在cluster创建,会在每个broker上默认创建8个队列。可以认为队列是一个无线长度的数组,offset是下标,消费者通过offset来访问消息。

问题:队列数量怎么设定,会对吞吐量造成影响吗?


2 数据持久化过程

2.1 发消息

producer-》broker

broker是一个java程序,数据最先到jvm堆上-》页缓存(物理内存,堆外内存)-》本地磁盘

2.2 收消息

NIO 零拷贝机制,无需拷贝到堆内存。

 先从页缓存中找,如果找不到再从本地磁盘load

2.3 消息清理

默认:broker会定期删除过期的消息,默认有效期3天,每天凌晨4点执行这个操作。

3天前的消息会从队列尾部删除。

可在broker配置文件中修改

3 如何保证高可用

 3.1 影响可靠性的几种情况

broker不可用、宿主机宕机、宿主机损坏、磁盘损坏等

主节点完全损坏数据不可恢复的情况下,怎么处理? 

同步双写模式(v4.2),slave节点无法自动进行故障转移,此时就要线上新增主节点、从节点,等3天后,就可以将发生故障节点的从节点下线。


3.2 负载均衡

topic要保证分配到所有broker上,这里可以利用多个broker的负载均衡能力。默认为每个topic创建8个队列。假设有3个主broker,就有24个队列。

RocketMQ用一个叫ClientID的概念,来唯一标记一个客户端实例,一个客户端实例对于Broker而言会开辟一个Netty的客户端实例。 而ClientID是由ClientIP+InstanceName构成

发消息时,默认是采用轮询的策略往每个可用的队列发。

如果要发生顺序消息。则在发送的时候,可以根据业务ID,固定选择一个队列。

顺序消息存在的缺陷:当指定队列所在的broker不可用时,由于队列总数发生变化,hash取模后的定位队列发生变化,会导致短暂的乱序。

当一个消费者组里上线下线节点的时候,怎么重新分配队列呢?

在消费者触发

1、每个客户端在启动时会去namesrv上获取元数据,topic所在的broker、有多少队列

2、获取该组里的其他节点,知道组里的其他兄弟姐妹

3、除非rebalance,重新分配各自负责的队列,

节点下线怎么办?什么时候触发?触发条件都有哪些?

1、20s定时reblance

2、所有consumer收到broker的consume变化通知,例如上线、下线

3、每次client启动时。

心跳间隔要短些,1秒 2秒。这里心跳超时就很关键了,不能太长,也不能过于短。

broker多久会把已经下线的consume踢掉呢?前面讲到是2分钟,太滞后了?

实际测试情况,版本号v4.3:

起一个生产者线程;2个consumer,中途停掉一个consumer,查看另一个consumer是否已经接手所有topic 队列。

默认配置下consumer上下线很快就能触发reblance,这个配置并不是2分钟。所以放心使用吧,不用担心failover太长。

3.3 failover故障转移能力

slave节点挂掉,无影响;

master挂掉的时候,该节点负责的topic的写能力会受影响,读没有影响。slave无法自动升级为master节点,也没这个必要,简单粗暴。

 3.4 重试机制

可以在producer初始化的时候配置多样的重试策略,可向同一个broker发起多次重试,broker失败后重试其他broker。初始化配置如下:

```

DefaultMQProducer producer =new DefaultMQProducer(group);

producer.setNamesrvAddr(nameServer);

producer.setInstanceName(instanceName);

producer.setCompressMsgBodyOverHowmuch(compressMsgBodyOverHowmuch);

producer.setMaxMessageSize(maxMessageSize);

producer.setRetryAnotherBrokerWhenNotStoreOK(retryAnotherBrokerWhenNotStoreOk);

producer.setRetryTimesWhenSendAsyncFailed(retryTimesWhenSendAsyncFailed);

producer.setRetryTimesWhenSendFailed(retryTimesWhenSendFailed);

// 使用故障延迟机制,会对获取的MQ进行可用性验证

producer.setSendLatencyFaultEnable(true);

producer.setSendMsgTimeout(sendMsgTimeout);

//设置到broker的心跳

producer.setHeartbeatBrokerInterval(3000);

//从namesrv获取topic路由

producer.setPollNameServerInterval(3000);

```

3.5 一些健康检查的时间

生产者与消费者会延迟(默认30s心跳),发往该broker的消息失败,此时要使用重试机制。

心跳间隔可以在producer或consumer初始化的时候指定heartbeat参数。

4 最佳实践

4.1 配置优化

autoCreateTopicEnable=false    生产环境禁用自动创建topic ,由管理员手工在cluster的每个节点上创建,why?

storePathRootDir=/rocketmq/store  设置消息内容的根目录,如果是docker容器化,则需要把此路径外挂到外部存储

storePathCommitLog=/rocketmq/store/commitlog 设置commitlog日志的根目录

,如果是docker容器化,则需要把此路径外挂到外部存储

maxMessageSize  500000  500k

sendMessageThreadPoolNums=128  # 发消息线程池 ,

pullMessageThreadPoolNums=128  拉消息线程池

useReentrantLockWhenPutMessage=true#否则报flow control异常 

 4.2 DOCKER容器化 部署

集群容器化部署请参考:

https://github.com/wuzuquan/docker-file/tree/master/rocketmq

注意RocketMQ只有一种模式,即发布订阅模式。

1、多master  slave模式,异步复制 ,当master宕机时,会丢失极少量的消息

2、多master slave模式,同步双写,性能稍低,电商场景推荐使用此模式。


docker build -f dockerfile-broker -t 11.4.76.193/redis/rocketmq-broker:4.3.0 .

docker build -f dockerfile-namesrv -t 11.4.76.193/redis/rocketmq-namesvr:4.3.0 .

编写docker-compose,编排 部署。

在docker环境下的坑: host模式下, brokerip1异常,往namesrv注册的ip地址变为docker虚拟网络的网关ip。发消息会返回:SLAVE_NOT_AVAILABLE

云生产环境,只能使用托管模式,host模式slave复制失效

rocketmq消息日志:

conf目录下有几个logback*.xml文件,将文件中的${user.home}/logs/rocketmqlogs/broker_default.log修改为目标日志路径即可

否则默认在root目录下,一会就撑满,或者把日志级别调高

测试,在namesrv节点上:

添加环境变量:export NAMESRV_ADDR=localhost:9876

测试生产者:sh tools.sh org.apache.rocketmq.example.quickstart.Producer

测试消费者:sh tools.sh org.apache.rocketmq.example.quickstart.Consumer

 4.3 客户端

客户端配置:

heartbeatBrokerInterval 5000,默认30s,向broker发生心跳

pollNameServerInterval 3000,轮询namesrv的时间

 4.4 生产者发消息

1、一个应用使用一个topic,topic由管理员创建

2、设置keys字段,用于定位排查。keys要保持唯一性

3、无论发生成功还是失败,打印消息操作日志 :sendresult、key

4、对于不可丢失的消息,要有重试机制,可在producer初始化时统一配置

5、对于可靠性要求不高的应用,可以用oneway方式异步发生

6、顺序消息:简单的讲,每个topic都有N个队列,保证把消息发生到同一个队列就是顺序消息了。但是顺序消息无法利用failover特性,当队列所在的master挂掉,如果主备切换有一小段时间不可用,顺序消息会发送失败。

顺序消息的一种,正常情况下可以保证完全的顺序消息,但是一旦发生通信异常,Broker 重启,由于队列 总数发生变化,哈希取模后定位的队列会变化,产生短暂的消息顺序不一致。 如果业务能容忍在集群异常情况(如某个 Broker 宕机或者重启)下,消息短暂的乱序,使用普通顺序方 式比较合适

 4.5 消费者

1、消费过程要做到幂等性(消费端去重机制)

根据keys字段进行去重,消费过的可以存入redis或db中

2、批量方式消费

consumer.setConsumeMessageBatchMaxSize(50);

consumer.setConsumeThreadMin(2); 

consumer.setConsumeThreadMax(5);

3、必须要有消费者组,由管理员后台创建

每个组有自己独立的offset,可以独立的消费同一个topic

4、从哪里开始消费?

comsumefromwhere配置指定:

CONSUME_FROM_LAST_OFFSET 从上一次消费开始

CONSUME_FROM_FIRST_OFFSET ,从头开始消费

CONSUME_FROM_TIMESTAMP ,从指定时间开始


 6  v4.3新特性:事务消息

在之前版本中,事务消息曾经被阉割过,应该是不成熟。4.3版本又重新推出此功能乃一大利好,而且不依赖于其他外部组件。

什么叫事务消息:简单的讲,业务逻辑与发消息两个操作,要么同时成功,要么同时失败。

对于电商交易过程中消息强一致性需求,可以采用此种消息模式。

也可以借助事务消息实现弱一致性的分布式事务,尤其是上游业务无需关注下游业务是否成功,来决定是否需要回滚的场景。

事务消息代码示例参考官方github,或这里:

https://github.com/wuzuquan/microservice/blob/master/core/src/test/java/com/xmair/core/TransactionMQTest.java

事务消息源码分析:https://blog.csdn.net/prestigeding/article/details/81263833

prepare主题消息:RMQ_SYS_TRANS_HALF_TOPIC

commit or rollback:RMQ_SYS_TRANS_OP_HALF_TOPIC(对client不可见)

事务消息发生过程:

1、发送方:先往half队列发prepare消息

2、发送方:执行本地事务

3、发送方:如果commit,就发生commit消息,下发给订阅者;如果rollback就发生rollback消息,删除prepare,不下发。

4、rocketmq:如果没有接收到任何信息,可能超时啦,出了各种异常,咋办?回查事务状态,有可能发送方实例已经宕机,需要回查同一个生产者组的其他实例来获取状态。具体怎么获取?参考rocketmq的事务消息示例代码即可。

5、consumer段消息成功机制保障

一些配置:

检测频率在哪里设置,在broker.conf中设置transactionCheckInterval,太长了,如果要结合分布式事务,不可能给与这么长的超时时间,1-2秒较合适。

检测的时候只会对超时的prepare消息检测,超时时间在哪里设置?

transactionTimeOut,默认3秒,根据具体情况设定。

重试几次?transactionCheckMax,如果超过这个检测次数,丢弃消息,rollback。默认是5次,太多了。2-3次即可。这里有个难以解决的问题,假设producer业务已操作成功,但是所有producer都死了,没法回查,消息被超时rollback。导致下游业务接收不到消息。

6 从哪里读取消息

总结下主从切换流程:

1、当Master和Slave都正常的情况下,默认从Master处读取消息,若开启了slaveReadEnable ,且Master处积压了超过40%内存总量的未消费消息,那么会从Slave=1的Broker处读取消息。

2、当Master宕机时,长时间未向Namesrv发送心跳,Namesrv清空下线的BrokerData,Consumer从Namesrv获取的TopicRouteData里已经没有了Master的BrokerData,因此Consumer从自身的brokerAddr集合里找不到Master的BrokerAddr了,因此就按顺序向第一位Slave发送消息拉取请求。默认配置下slaveReadEnable = false,因此Slave在从CommitLog读取到消息后,设置其suggest brokerId = 0 ,也就是建议其下次从Master处读取消息。

3、在Master为恢复前,都是读取不到其brokerAddr的,因此每次都是从Slave处拉取消息,虽然其每次都建议Consumer从Master处读取。

4、当Master恢复了,从新向Namesrv发送了心跳,注册了Broker信息。Consumer获取的最新TopicRouteData里包含了Master的brokerAddr,那么下次就会听从建议从Master处读取消息,从Slave切换回Master。

7 linux内核优化

http://rocketmq.apache.org/docs/system-config/

#sysctl.conf

echo vm.max_map_count=655360 >> /etc/sysctl.conf

echo vm.overcommit_memory=1 >> /etc/sysctl.conf

echo net.core.somaxconn= 16384 >> /etc/sysctl.conf

echo vm.swappiness=0 >> /etc/sysctl.conf

echo vm.min_free_kbytes=5000 >>/etc/sysctl.conf

8 运维监控

安装官方console工具

9  常用命令

cd  /ROCKETMQ/bin


创建topic:

sh mqadmin updateTopic -c DefaultCluster -n localhost:9876 -t testtopic

查看topic列表:

sh mqadmin topicList -n 11.4.74.48:9876

查看指定topic路由信息:

sh mqadmin topicRoute -n 11.4.74.48:9876 -t testtopic1

查看指定topic状态:

sh mqadmin topicStatus -n 11.4.74.48:9876 -t testtopic1

查看指定topic的消息:

sh mqadmin queryMsgByOffset -b broker-b -i 1  -o 0 -t NewUpt_PsgEventTopic -n localhost:9876

创建消费者组:

sh mqadmin updateSubGroup -n localhost:9876 -c DefaultCluster -g push-consumer-group1

查看消费者组:

sh mqadmin consumerProgress -n localhost:9876

查看集群信息:

sh mqadmin clusterList -n 11.4.74.48:9876

查看broker状态

sh mqadmin brokerStatus -b 11.4.74.44:10911 -n 11.4.74.48:9876

创建生产者组:

创建订阅组:

sh mqadmin updateSubGroup -c DefaultCluster -g subgroup1 -n 11.4.74.48:9876果是对集群扩容,则可以通过指定新的broker地址在扩容的机器上创建一份新的订阅组信息

根据key查消息:

sh mqadmin queryMsgByKey -k test100 -t testtopic1  -n 11.4.74.48:9876

根据unique key查:(非msgid )

sh mqadmin queryMsgByUniqueKey  -i  0B0D308C16C418B4AAC236B77FFB03E3  -n 11.4.74.48:9876 -t testtopic1

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容