flink 内存模型 v1.10

  • Flink 内存主要指 TaskManager 运行时提供的内存资源。TaskManager 主要由几个内部组件构成: 负责和 JobManager 等进程通信的 actor 系统,负责在内存不足时将数据溢写到磁盘和读回的 IOManager,还有负责内存管理的 MemoryManager。其中 actor 系统和 MemoryManager 会要求大量的内存。相应地,Flink 将 TaskManager 的运行时内存分为 Network Buffers、MemoryManager 和 Free 三个区域(在 streaming 模式下只存在 Network Buffers 和 Free 两个区域,因为算子不需要缓存一次读入的大量数据)
    注意:
    ytm-表示 Memory per TaskManager Container,也就是taskmanager.memory.process.size的大小
  • Flink Memory :JVM heap, managed memory (managed by Flink) and other direct (or native) memory


    Flink Memory
  • Network Buffers: 一定数量的32KB大小的 buffer,主要用于数据的网络传输。在 TaskManager 启动的时候就会分配。可以通过 taskmanager.memory.network.fraction 来配置。(阅读官方文档配置
    计算出来有多少个MemorySegment
    long numPages = 0L;
        for (long sizeForType : budgetByType.values()) {
            numPages += sizeForType / pageSize;
        }
        return numPages;
  • Memory Manager : 这是一个由 MemoryManager 管理的,由众多MemorySegment组成的超大集合。Flink 中的算法(如 sort/shuffle/join)会向这个内存池申请 MemorySegment,将序列化后的数据存于其中,使用完后释放回内存池。默认情况下,Fraction of Total Flink Memory to be used as Managed Memory, taskmanager.memory.managed.fraction=0.4
  • Remaining JVM heap: 余下的堆内存留给用户代码以及 TaskManager 的数据结构使用的。因为这些数据结构一般都很小,所以基本上这些内存都是给用户代码使用的。从GC的角度来看,可以把这里看成的新生代,也就是说这里主要都是由用户代码生成的短期对象。

network相关参数

  1. taskmanager.memory.network.fraction:用于网络缓冲区的Flink Memory内存的比例(默认值:0.1),
  2. taskmanager.memory.network.min:网络缓冲区的最小内存大小(默认值:64 MB),
  3. taskmanager.memory.network.max:网络缓冲区的最大内存大小(默认值:1GB)

各个区域的功能如下:

  • Network Buffers 区: 网络模块用于网络传输的一组缓存块对象,单个缓存块对象默认是32KB大小。Flink 会根据 TaskManager 的最大内存来计算该区大小,默认范围是64MB至1GB。
  • Memory Manager 区: 用于为算子缓存运行时消息记录的大缓存池(比如 Sort、Join 这类耗费大量内存的操作),消息记录会被序列化之后存进这些缓存块对象。这部分区域默认占最大 heap 内存减去 Network - Buffers 后的70%,单个缓存块同样默认是32KB。
  • Free 区: 除去上述两个区域的内存剩余部分便是 Free heap,这个区域用于存放用户代码所产生的数据结构,比如用户定义的 State。
    目前 Memory Manager 的内存初始化方式有两种: 第一种是启动时即为 Network Buffers 区和 MemoryManager 区分配全部内存,这样 TaskManager 启动过程中会产生一次到多次的 full GC,导致 TaskManager 的启动慢一点,但是节省了后续执行作业时的 GC 时长。第二种方式是采用”懒分配”的方法,在内存紧张时再增量向操作系统申请内存,避免一下吃完所有的内存导致后续的其他操作内存不足,例如流计算作业的 StateBackend 保存在内存的 State 对象。
  • Network Buffers 和 MemoryManager 的存在会贯穿 TaskManager 的整个生命周期。它们管理的 Memory Segment 不断被重用,因此不会被 JVM 回收。经过若干次 GC 之后它们会进入老年代,变成常驻的对象。

flink启动参数 v1.7.2

flink run  -m yarn-cluster -yn 8 -ynm item_related_suggestion_to_es  -yjm 4096 -ytm 2048 -yqu realtime -c  com.zz.StreamingKafkaToEs  item_related_suggestion_to_es-1.0.0-jar-with-dependencies.jar
taskmanager启动参数
2019-09-30 12:41:33,730 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  - --------------------------------------------------------------------------------
2019-09-30 12:41:33,731 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -  Starting YARN TaskExecutor runner (Version: 1.7.2, Rev:ceba8af, Date:11.02.2019 @ 14:17:09 UTC)
2019-09-30 12:41:33,731 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -  OS current user: yarn
2019-09-30 12:41:34,100 WARN  org.apache.hadoop.util.NativeCodeLoader                       - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-09-30 12:41:34,192 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -  Current Hadoop/Kerberos user: xxx
2019-09-30 12:41:34,192 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.131-b11
2019-09-30 12:41:34,192 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -  Maximum heap size: 1250 MiBytes
2019-09-30 12:41:34,193 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -  JAVA_HOME: /usr
2019-09-30 12:41:34,194 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -  Hadoop version: 2.7.3.2.6.5.0-292
2019-09-30 12:41:34,195 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -  JVM Options:
2019-09-30 12:41:34,195 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -     -Xms1304m
2019-09-30 12:41:34,195 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -     -Xmx1304m
2019-09-30 12:41:34,195 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -     -XX:MaxDirectMemorySize=744m
2019-09-30 12:41:34,195 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -     -Dlog.file=/export10/hadoop/yarn/log/application_1558494256595_143860/container_e57_1558494256595_143860_01_017830/taskmanager.log
2019-09-30 12:41:34,195 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -     -Dlogback.configurationFile=file:./logback.xml
2019-09-30 12:41:34,195 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -     -Dlog4j.configuration=file:./log4j.properties
2019-09-30 12:41:34,195 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -  Program Arguments:
2019-09-30 12:41:34,196 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -     --configDir
2019-09-30 12:41:34,196 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner                  -     .
task manager启动参数刨析

堆内内存1304m+对外内存744m=申请的内存2048m

  • container 内存大小为 2048*0.25 = 512m <600m,所以container最小内存位600m
  • network buffer 内存大小,(2048-600)*0.1 = 144.8m
  • 堆内存大小为(2048-600) - 144.8m(networkbuffer) =1304m
  • 堆外内存大小 2048- 1304= 744m
job manager启动参数
2019-08-09 13:51:45,392 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Starting YarnSessionClusterEntrypoint (Version: 1.7.2, Rev:ceba8af, Date:11.02.2019 @ 14:17:09 UTC)
2019-08-09 13:51:45,392 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  OS current user: yarn
2019-08-09 13:51:45,770 WARN  org.apache.hadoop.util.NativeCodeLoader                       - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-08-09 13:51:45,818 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Current Hadoop/Kerberos user: xxx
2019-08-09 13:51:45,818 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.131-b11
2019-08-09 13:51:45,818 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Maximum heap size: 2731 MiBytes
2019-08-09 13:51:45,818 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JAVA_HOME: /usr
2019-08-09 13:51:45,820 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Hadoop version: 2.7.3.2.6.5.0-292
2019-08-09 13:51:45,820 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JVM Options:
2019-08-09 13:51:45,820 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Xmx3072m
2019-08-09 13:51:45,820 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dlog.file=/export10/hadoop/yarn/log/application_1558494256595_143860/container_e57_1558494256595_143860_01_000001/jobmanager.log
2019-08-09 13:51:45,820 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dlogback.configurationFile=file:logback.xml
2019-08-09 13:51:45,820 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dlog4j.configuration=file:log4j.properties
job manager启动参数刨析
  • 默认yarn container也会占用内存
    可以看到, jobmanager启动时, 去掉yarn container占用内存大小, 即为jobmanager大小。 container大小为25%**yjm(4096), 但是不得小于600MB, 所以剩余JM heap大小为(4096- 4096 * 0.25(container内存大小) = 3072m)

flink 定制化的序列化框架

image.png
  • 可以看出这种序列化方式存储密度是相当紧凑的。其中 int 占4字节,double 占8字节,POJO多个一个字节的header,PojoSerializer只负责将header序列化进去,并委托每个字段对应的serializer对字段进行序列化。
  • memory pool 内存池 memorySegment的数据结构,由两部分组成,一部分是存储key+pointer(完整二进制数据的指针以及定长的序列化后的key),第二部分是对象的二进制数据
    如下图:


    image.png
  • 使用内存池管理内存和使用二进制存储数据的的好处:
  1. 避免oom,所有的运行时数据结构和算法只能通过内存池申请内存,保证了其使用的内存大小是固定的,不会因为运行时数据结构和算法而发生OOM。在内存吃紧的情况下,算法(sort/join等)会高效地将一大批内存块写到磁盘,之后再读回来。因此,OutOfMemoryErrors可以有效地被避免
  2. 节省内存空间,Java 对象在存储上有很多额外的消耗,使用二进制可以避免
  3. 高效的二进制操作 & 缓存友好的计算,第一,交换定长块(key+pointer)更高效,不用交换真实的数据也不用移动其他key和pointer。第二,这样做是缓存友好的,因为key都是连续存储在内存中的,可以大大减少 cache miss(cpu读取L1,L2,L3高速缓存速度高于读取主内存速度几个数量级,使用key+pointer极大提高缓存L1,L2,L3命中率)
    注意:Flink 中,排序会先用 key 比大小,这样就可以直接用二进制的key比较而不需要反序列化出整个对象。因为key是定长的,如果key相同(或者没有提供二进制key),那就必须将真实的二进制数据反序列化出来,然后再做比较。之后,只需要交换key+pointer就可以达到排序的效果,真实的数据不用移动
  4. 引入堆外内存,可以避免频繁GC,高效IO操作,堆外内存在写磁盘或网络传输时是 zero-copy,而堆内存的话,至少需要 copy 一次,堆外内存是进程间共享的,即使JVM进程崩溃也不会丢失数据,理论上来说可以做故障恢复(Flink暂时没有)
    注意: 但是强大的东西总是会有其负面的一面,不然为何大家不都用堆外内存呢。
    堆内存的使用、监控、调试都要简单很多。堆外内存意味着更复杂更麻烦。
    Flink 有时需要分配短生命周期的 MemorySegment,这个申请在堆上会更廉价。
    有些操作在堆内存上会快一点点。

    基于 Flink 优秀的设计,实现堆外内存是很方便的。Flink 将原来的 MemorySegment 变成了抽象类,并生成了两个子类。HeapMemorySegment 和 HybridMemorySegment。从字面意思上也很容易理解,前者是用来分配堆内存的,后者是用来分配堆外内存和堆内存的,HeapMemorySegment 的性能要高于HybridMemorySegment,所以二者都保留
    小知识:JIT 即使编译器:在Java编程语言和环境中,即时编译器(JIT compiler,just-in-time compiler)是一个把Java的字节码(包括需要被解释的指令的程序)转换成可以直接发送给处理器的指令的程序。当你写好一个Java程序后,源语言的语句将由Java编译器编译成字节码,而不是编译成与某个特定的处理器硬件平台对应的指令代码(比如,Intel的Pentium微处理器或IBM的System/390处理器)。字节码是可以发送给任何平台并且能在那个平台上运行的独立于平台的代码。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容