RocketMQ同步复制性能优化【实战笔记】

目录
一、问题描述
    1.压测日志
    2.客户端日志
二、解决发送失败情况
三、解决发送TPS过低情况
四、原因分析
    1.刷盘流程回顾
    2.主从复制回顾
    3.流程模拟
    4.原因总结
一、问题描述

早些时候写过性能测试和性能优化文章,主要基于异步刷盘/异步复制;由于业务需要需要搭建异步刷盘/同步复制集群;同时对性能进行压测。

压测结果显示集群几乎无法使用,TPS居然是个位数,客户端也在报错。

1.压测日志
日志截图.jpg
2.客户端日志
2019-09-19 19:22:38,038 ERROR RocketmqClient - [BENCHMARK_PRODUCER] Send Exception
org.apache.rocketmq.client.exception.MQBrokerException: CODE: 2  DESC: [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 209ms, size of queue: 9
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
  at org.apache.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:671)
  at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:467)
  at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:449)
  at 
二、解决发送失败情况

经排查,将transientStorePoolEnable关闭(默认为false);压测显示最高TPS有1.9万。

brokerRole=SYNC_MASTER
#transientStorePoolEnable=true
发送情况.jpg
三、解决发送TPS过低情况

最高TPS只有1.9万,依然过低,与预期相差甚远,我们预期压测应该可以到7到8万这样可以满足业务发展需要。再次检查broker端参数配置,没有发现有参数导致性能如此过低。
回顾性能调优的几个方面:系统调优、集群调优、JVM调优。
系统调优与集群调优都已经做过了,唯一没有优化的JVM调优,堆内存设置默认的8G。
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"

将JVM堆内存提高4倍后,压测效果明显提升,基本可以达到预期7万多的TPS。

四、原因分析

1.为什么在异步刷盘/同步复制时开启堆外内存池transientStorePoolEnable后,集群压测几乎无法进行?
2.为什么在异步刷盘/同步复制时调大JVM堆内存后,性能明显提升呢?提升了的倍数几乎是堆内存增大的倍数。

1.刷盘流程回顾

消息追加回顾。参见:https://www.jianshu.com/p/5aa591a72fb4
刷盘回顾。参见:https://www.jianshu.com/p/62f876fda84c

异步刷盘未开启堆外缓存示意图
异步刷盘未开启堆外内存池.jpg
异步刷盘开启堆外缓存示意图
异步刷盘开启堆外内存池.jpg

小结:异步刷盘未开启transientStorePoolEnable时,消息追加到mappedByteBuffer中,异步线程刷调用mappedByteBuffer.force落盘;异步刷盘开启transientStorePoolEnable时,消息写入wrtieBuffer中,异步线程将消息提交到fileChannel,然后异步线程调用fileChannel.force落盘。

2.主从复制回顾

主从复制参见:https://www.jianshu.com/p/8fd0cf5f2ce7

负责向Slave写入数据:HAConnection#WriteSocketService

//查找待拉取偏移量之后所有的可读消息
SelectMappedBufferResult selectResult =                        HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
// ...
SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
// ...
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
byteBuffer.position(pos);
int size = readPosition - pos; //计算距离最大可读位置的大小
ByteBuffer byteBufferNew = byteBuffer.slice();
byteBufferNew.limit(size);
return new SelectMappedBufferResult()

小结:主从复制使用mappedByteBuffer向Slave同步数据。

3.流程模拟
开启堆外内存池流程
@Test
public void test01(){
    // 堆外内存池transientStorePoolEnable开启后,消息追加操作
    try {
        File file = new File("/Users/yongliang/logs/temp.log");
        FileChannel fileChannel = new RandomAccessFile(file, "rw").getChannel();
        String data = "beautiful girl!";
        // mmap 文件映射操作
        MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, data.length());
        // 堆外内存transientStorePoolEnable开启
        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(data.length());
        // ----------------消息追加开始-----------------------
        // 注意此时使用堆内内存分配
        ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(data.length());
        msgStoreItemMemory.put(data.getBytes());
        // 开启transientStorePoolEnable消息写入了ByteBuffer
        byteBuffer.put(msgStoreItemMemory.array(),0,data.length());
        // ----------------消息追加结束-----------------------

        // ----------------消息提交开始-----------------------
        byteBuffer.position(0);
        byteBuffer.limit(data.length());
        fileChannel.write(byteBuffer);
        // ----------------消息提交结束-----------------------

        // --------主从复制从mappedByteBuffer获取消息开始----------
        mappedByteBuffer.position(0);
        mappedByteBuffer.limit(data.length());
        Charset charset = Charset.forName("UTF-8");
        CharsetDecoder decoder = charset.newDecoder();
        CharBuffer charBuffer = decoder.decode(mappedByteBuffer.asReadOnlyBuffer());
        System.out.println(charBuffer.toString());
        // --------主从复制从mappedByteBuffer获取消息结束----------
    } catch (Exception e) {
        e.printStackTrace();
    }
}

小结:模拟开启堆外内存池transientStorePoolEnable的消息追加及主从复制流程。

未开启堆外内存池流程
@Test
public void test02(){
    try {
        File file = new File("/Users/yongliang/logs/temp1.log");
        FileChannel fileChannel = new RandomAccessFile(file, "rw").getChannel();
        String data = "beautiful girl!";
        // mmap 文件映射操作
        MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, data.length());

        // ----------------消息追加开始-----------------------
        // 注意消息组装使用堆内内存分配
        ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(data.length());
        msgStoreItemMemory.put(data.getBytes());
        mappedByteBuffer.put(msgStoreItemMemory.array(),0,data.length());
        // ----------------消息追加结束-----------------------

        // --------主从复制从mappedByteBuffer获取消息开始----------
        mappedByteBuffer.position(0);
        mappedByteBuffer.limit(data.length());
        Charset charset = Charset.forName("UTF-8");
        CharsetDecoder decoder = charset.newDecoder();
        CharBuffer charBuffer = decoder.decode(mappedByteBuffer.asReadOnlyBuffer());
        System.out.println(charBuffer.toString());
        // --------主从复制从mappedByteBuffer获取消息结束----------
    } catch (Exception e) {
        e.printStackTrace();
    }
}

小结:模拟未开启堆外内存池transientStorePoolEnable的消息追加及主从复制流程。

4、原因总结

1.为什么在异步刷盘/同步复制时开启堆外内存transientStorePoolEnable后,集群压测几乎无法进行?
解释:
1>主从同步复制使用mappedByteBuffer;
2>开启堆外内存池transientStorePoolEnable后数据先落到WriteBuffer,再通过异步提交线程提交到FileChannel,再通过mmap将数据映射到mappedByteBuffer;
3>未开启堆外内存池transientStorePoolEnable数据直接写入到mappedByteBuffe;
由于开启堆外内存数据映射到mappedByteBuffer比直接写入mappedByteBuffer多了很多步骤,再加上发送队列处理事件默认只有200毫秒(waitTimeMillsInSendQueue=200),造成集群不能正常压测的原因。

2.为什么在异步刷盘/同步复制时调大JVM堆内存后,性能明显提升呢?提升了的倍数几乎是对内存增大的倍数。
解释:
从模拟流程中可以看出,在组装消息时使用堆内存,提高堆内存显著提高写入Tps的原因所在。

// 注意消息组装使用堆内内存分配
ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(data.length());


作者老梁,哈啰出行高级技术专家,参与了《RocketMQ技术内幕》审稿工作。专注后端中间件方向,已陆续发表RocketMQ系列、Kafka系列、gRPC系列、Sentinel系列、Java NIO系列。其中RocketMQ系列已发表40余篇。源码、实战、原理、调优期待与你一起学习。


公众号.jpg

个人微信.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,734评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,931评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,133评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,532评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,585评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,462评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,262评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,153评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,587评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,792评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,919评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,635评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,237评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,855评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,983评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,048评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,864评论 2 354

推荐阅读更多精彩内容