第7章 吞吐量优先的使用场景

本章介绍在大流量场景下,提高RocketMQ 集群吞吐量的一些方法,有些方法当服务器出异常时会增大丢消息的概率,用户需要根据业务需求酌情使用。

7.1 在Broker 端进行消息过滤

在Broker 端进行消息过滤,可以减少元效消息发送到Consumer ,少占用网络带宽从而提高吞吐量。Broker 端有三种方式进行消息过滤。

7.1.1 消息的Tag 和Key

对一个应用来说,尽可能只用一个Topic ,不同的消息子类型用Tag 来标识(每条消息只能有一个Tag),服务器端基于Tag 进行过滤,并不需要读取消息体的内容,所以效率很高。发送消息设置了Tag 以后,消费方在订阅消息时,才可以利用Tag 在Broker 端做消息过滤。
其次是消息的Key 。对发送的消息设置好Key ,以后可以根据这个Key 来查找消息。所以这个Key 一般用消息在业务层面的唯一标识码来表示,这样后续查询消息异常,消息丢失等都很方便。Broker 会创建专门的索引文件,来存储Key 到消息的映射,由于是哈希索引,应尽量使Key 唯一,避免潜在的哈希冲突。

Tag 和Key 的主要差别是使用场景不同,Tag 用在Consumer 的代码中,用来进行服务端消息过滤, Key 主要用于通过命令行查询消息。

7.1.2 通过Tag 进行过滤

用Tag 方式进行过滤的方法是传入感兴趣的Tag 标签,Tag 标签是一个普通字符串,是在创建Message 的时候添加的, 一个Message 只能有一个Tag。使用Tag 方式过滤非常高效, Broker 端可以在ConsumeQueue 中做这种过滤、,只从CommitLog 里读取过滤后被命中的消息。看一下ConsumerQueue 的存储格式,如图7-1 所示。

图7- 1 ConsumerQueue 的存储格式

Consume Queue 的第三部分存储的是Tag 对应的hash code ,是一个定长的字符串,通过Tag 过滤的过程就是对比定长的hashcode 。经过hash code 对比,符合要求的消息被从CommitLog 读取出来,不用担心Hash 冲突问题,消息在被消费前,会对比完整的Message Tag 字符串,消除Hash 冲突造成的误读。

7.1.3 用SQL 表达式的方式进行过滤

使用Tag 方式过滤虽然高效,但是支持的逻辑比较简单,在构造Message的时候,还可以通过putUserProperty 函数来增加多个自定义的属性,基于这些属性可以做复杂的过滤逻辑。

我们用类似SQL 表达式的方式对消息进行过滤。支持如下语法:

  • 数字对比, 比如>、>=、<、<= 、BETWEEN 、=;
  • 字符串对比,比如= 、<>、IN;
  • IS NULL or IS NOT NULL;
  • 逻辑符号AND 、OR 、NOT 。
    支持的数据类型:
  • 数字型,比如123 、3.1415;
  • 字符型,比如' abe’ 、注意必须用单引号;
  • NULL ,这个特殊字符;
  • 布尔型, TRUEorFALSE 。

SQL 表达式方式的过滤需要Broker 先读出消息里的属性内容, 然后做SQL 计算,增大磁盘压力,没有Tag 方式高效。

7.1.4 Filter Server 方式过滤

Filter Server 是一种比SQL 表达式更灵活的过滤方式,允许用户自定义Java 函数,根据Java 函数的逻辑对消息进行过滤。
要使用Filter Server , 首先要在启动Broker 前在配置文件里加上filterServerNums= 3 这样的配置, Broker 在启动的时候, 就会在本机启动3 个FilterServer 进程。Filter Server 类似一个RocketMQ 的Consumer 进程,它从本机Broker 获取消息,然后根据用户上传过来的Java 函数进行过滤,过滤后的消息再传给远端的Consumer 。这种方式会占用很多Broker 机器的CPU 资源,要根据实际情况谨慎使用。上传的java 代码也要经过检查,不能有申请大内存、创建线程等这样的操作,否则容易造成Broker 服务器宕机。

7.2 提高Consumer 处理能力

当Consumer 的处理速度眼不上消息的产生速度,会造成越来越多的消息积压,这个时候首先查看消费逻辑本身有没有优化空间,除此之外还有三种方法可以提高Consumer 的处理能力。

  1. 提高消费并行度
    在同一个ConsumerGroup 下(Clustering方式),可以通过增加Consumer实例的数量来提高并行度,通过加机器,或者在已有机器中启动多个Consumer进程都可以增加Consumer 实例数。注意总的Consumer 数量不要超过Topic 下Read Queue 数量,超过的Consumer 实例接收不到消息。此外,通过提高单个Consumer 实例中的并行处理的线程数可以在同一个Consumer 内增加并行度来提高吞吐量(设置方法是修改consumeThreadMin 和consumeThreadMax) 。
  2. 以批量方式进行消费
    某些业务场景下,多条消息同时处理的时间会大大小于逐个处理的时间总和,比如消费消息中涉及update 某个数据库, 一次update IO 条的时间会大大小于十次update 1条数据的时间。这时可以通过批量方式消费来提高消费的吞吐量。实现方法是设置Consumer 的consumeMessageBatchMaxSize 这个参数,默认是1 ,如果设置为N,在消息多的时候每次收到的是个长度为N 的消息链表。
  3. 检测延时情况,跳过非重要消息
    Consumer 在消费的过程中, 如果发现由于某种原因发生严重的消息堆积,短时间无法消除堆积,这个时候可以选择丢弃不重要的消息,使Consumer 尽快追上Producer 的进度。

7 .3 Consumer 的负载均衡

上一节中讲到,想要提高Consumer 的处理速度,可以启动多个Consumer并发处理,这个时候就涉及如何在多个Consumer 之间负载均衡的问题,接下来结合源码分析Consumer 的负载均衡实现。
要做负载均衡,必须知道一些全局信息,也就是一个ConsumerGroup 里到底有多少个Consumer , 知道了全局信息,才可以根据某种算法来分配,比如简单地平均分到各个Consumer。在RocketMQ 中,负载均衡或者消息分配是在Consumer 端代码中完成的, Consumer从Broker 处获得全局信息,然后自己做负载均衡,只处理分给自己的那部分消息。

7.3.1 DefaultMQPushConsumer 的负载均衡

DefaultMQPushConsumer 的负载均衡过程不需要使用者操心,客户端程序会自动处理,每个DefultMQPushConsumer 启动后,会马上会触发一个doRebalance 动作;而且在同一个ConsumerGroup 里加入新的DefaultMQPushConsumer时,各个Consumer 都会被触发doRebalance 动作。
如图7-2 所示,具体的负载均衡算法有五种,默认用的是第一种AllocateMessageQueueAveragely 。负载均衡的结果与Topic 的Message Queue数量,以及ConsumerGroup 里的Consumer 的数量有关。负载均衡的分配粒度只到Message Queue ,把Topic 下的所有Message Queue 分配到不同的Consumer中,所以Message Queue 和Consumer 的数量关系,或者整除关系影响负载均衡结果。

图7 - 2 RocketMQ 客户端负载均衡策略

以AllocateMessageQueueAveragely 策略为例,如果创建Topic 的时候,把Message Queue 数设为3 , 当Consumer 数量为2 的时候,有一个Consumer 需要处理Topic 三分之二的消息,另一个处理三分之一的消息;当Consumer 数量为4 的时候,有一个Consumer 无法收到消息,其他3 个Consumer 各处理Topic 三分之一的消息。可见Message Queue 数量设置过小不利于做负载均衡,通常情况下,应把一个Topic 的Message Queue 数设置为16 。

7.3.2 DefaultMQPullConsumer 的负载均衡

Pull Consumer 可以看到所有的Message Queue,而且从哪个Message Queue 读取消息,读消息时的Offset 都由使用者控制,使用者可以实现任何特殊方式的负载均衡。
DefaultMQPullConsumer 有两个辅助方法可以帮助实现负载均衡,一个是registerMessageQueueListener 函数,registerMessageQueueListener 函数在有新的Consumer 加人或退出时被触发。另一个辅助工具是MQPullConsumerScheduleService 类,使用这个Class类似使用DefaultMQPush Consumer ,但是它把Pull 消息的主动性留给了使用者。

7.4 提高Producer 的发送速度

发送一条消息出去要经过三步,一是客户端发送请求到服务器,二是服务器处理该请求, 三是服务器向客户端返回应答, 一次消息的发送耗时是上述三个步骤的总和。在一些对速度要求高,但是可靠性要求不高的场景下,比如日志收集类应用,可以采用Oneway 方式发送, Oneway 方式只发送请求不等待应答,即将数据写人客户端的Socket 缓冲区就返回,不等待对方返回结果,用这种方式发送消息的耗时可以缩短到微秒级。

另一种提高发送速度的方法是增加Producer 的并发量,使用多个Producer同时发送,我们不用担心多Producer 同时写会降低消息写磁盘的效率,RocketMQ 引人了一个并发窗口,在窗口内消息可以并发地写人DirectMem 中,然后异步地将连续一段无空洞的数据刷人文件系统当中。顺序写CommitLog 可让RocketMQ 无论在HDD 还是SSD 磁盘情况下都能保持较高的写人性能。目前在阿里内部经过调优的服务器上,写人性能达到90万+的TPS ,我们可以参考这个数据进行系统优化。

在Linux 操作系统层级进行调优,推荐使用EXT4 文件系统,IO 调度算法使用deadline 算法。

IO 调度算法也推荐调整为deadline 。deadline 算法大致思想如下:实现四个队列,其中两个处理正常的read 和write 操作,另外两个处理超时的read 和write 操作。正常的read 和write 队列中,元素按扇区号排序,进行正常的IO 合并处理以提高吞吐量。因为IO 请求可能会集中在某些磁盘位置,这样会导致新来的请求一直被合并,可能会有其他磁盘位置的IO 请求被饿死。超时的read 和write 的队列中,元素按请求创建时间排序,如果有超时的请求出现,就放进这两个队列,调度算法保证超时(达到最终期限时间)的队列中的IO 请求会优先被处理。

7.5 系统性能调优的一般流程

这里讨论的系统是指能完成某项功能的软硬件整体,比如我们用RocketMQ ,加上自己写的Producer 、Consumer 程序,部署到一台服务器上,组成一个消息处理系统。本节介绍对这类系统进行调优的基本流程,供读者参考。
首先是搭建测试环境, 查看硬件利用率。把测试系统搭建好以后,要想办法模拟实际使用时的情况,并且逐步增大请求量,同时检测系统的TPS 。在请求量增大到一定程度时·,系统的QPS 达到峰值,这个时候维持这种请求量,保持系统在峰值状态下运行。然后查看此时系统的硬件使用情况。

7.6 本章小结

本章重点关注性能,关注在大消息量的情况下,如何提高RocketMQ 的吞吐量。首先介绍了消息过滤,在服务端进行消息过滤可以减少无效消息传输造成的带宽浪费, Tag 是最常用的一种高效过滤方式,此外还可以用SQL 表达式、FilterServer 来过滤消息。
另一个提高吞吐量的方法是增加集群的机器数量,提高并发性,要根据实际场景增加Broker 、Consumer 或Producer 角色的机器数量。


后面的章节不做介绍了,后面是一堆的源码解析,有兴趣的可以找下对应的书籍。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容