一、Flink TaskManager内存模型

- Task堆上内存:用户代码执行过程中产生的Java对象,如中间数据、算子结果数据、缓存数据等。
- Task堆外内存:用户代码中显式调用内存分配接口创建的内存,比如用户代码直接与文件系统或网络交互时用到的缓冲区(DMA技术,如内存文件映射、内存网络映射等)。
- 托管内存:流处理中,算子状态数据如果存储到RocksDB,表示RocksDB使用的内存。
- 网络缓冲内存:Task之间数据交换时使用的内存(包括线程间的数据交换还是跨网络的数据交换)。
内存模型中,除了托管内存和网络缓冲区内存是为了特殊目的存在,并且需要高处理性能以外,其余内存基本属于JVM进程的标配。托管内存和网络内存缓冲区都使用MemorySegment作为底层数据结构。
二、网络缓冲区相关数据结构
1. MemorySegment
MemorySegment,是Flink管理的内存抽象的最小分配单元。默认情况下,一个MemorySegment对应着一个32KB大小的内存块。这块内存既可以是堆上内存(Java的byte数组),也可以是堆外内存(基于Netty的DirectByteBuffer)。
MemorySegment同时也提供了对二进制数据进行读取和写入的方法。对于Java基本数据类型,如short、int、long等,MemorySegment 内置了方法,可以直接返回或者写入数据,对于其他数据类型,读取二进制数组byte[]后进行反序列化,序列化为二进制数组byte[]后写入。

HeapMemorySegment用来分配堆上内存,HybridMemorySegment用 来分配堆外内存和堆上内存。实际上在2017年之后的Flink中,并没有使用HeapMemorySegment,而是使用HybridMemorySegment这个类来同 时实现堆上和堆外内存的分配
2. Buffer和NetworkBuffer
Task算子处理数据完毕,将结果交给下游的时候,使用的抽象或 者说内存对象是Buffer。Buffer接口是网络层面上传输数据和事件的统一抽象,其实现类是NetworkBuffer。Flink在各个TaskManager之间传递数据时,使用的是这一层的抽象。1个NetworkBuffer中包装了1个 MemorySegment。

3. BufferPool、LocalBufferPool和NetworkBufferPool
Buffer资源池在Flink中叫作BufferPool。BufferPool用来管理Buffer,包含Buffer的申请、释放、销毁、可用Buffer通知等,其实现类是LocalBufferPool,每个Task拥有自己的LocalBufferPool。

为了方便对 BufferPool 的管 理 , Flink 设计了 BufferPoolFactory,提供BufferPool的创建和销毁,其唯一的实现类是NetworkBufferPool。
每个TaskManager只有一个NetworkBufferPool , 同一个TaskManager上的Task共享NetworkBufferPool。
NetworkBufferPool持有该TaskManager在进行数据传递时所能够使用的所有内存,每个Task的 LocalBufferPool 所需要的内存都是从 NetworkBufferPool 申请而来的。
三、网络缓冲区内存的管理
网络缓冲区(NetworkBuffer)是网络交换数据的包装,当结果分区(ResultParition)开始向下游Task发送数据的时候,需要向LocalBufferPool申请Buffer资源,将数据写入MemorySegment。
1. 内存申请
LocalBufferPool的大小是动态的,在最小MemorySegment数量与最大MemorySegment数量之间浮动。使用NetworkBufferPool创建LocalBufferPool时, 如果该TaskManager的内存无法满足所有Task所需的最小 MemorySegment的数量总和,则会发生错误。
(1)Buffer的申请
结果分区(ResultParition)申请Buffer进行数据写入,代码如下:

LocalBufferPool首先从自身持有的MemorySegment中分配可用 的,如果没有可用的,则从TaskManager的NetworkBufferPool中申 请,如果没有,则阻塞等待可用的MemorySegment,代码如下:

(2)MemorySegment的申请
申请Buffer本质上来说就是申请MemorySegment,如果在 LocalBufferPool中,则申请新的堆外内存MemorySegment,代码如下:

2. 内存释放
Buffer使用了引用计数机制来判断什么时候可以释放Buffer到可用资源池。每创建一个BufferConsumer,就会对Buffer的引用计数 +1,每个Buffer被消费完,就会对Buffer的引用计数-1,当Buffer引用计数为0的时候就可以回收了。
(1)Buffer回收
前边介绍过Buffer的主要实现类是NetworkBuffer,同时继承了 AbstractReferenceCountedByteBuf。当Buffer被消费一次后,就会对 Buffer的引用计数-1,代码如下:

Buffer 回收之后 ,并不会释放MemorySegment , 此时 MemorySegment仍然在LocalBufferPool的资源池中,除非TaskManager 级别内存不足,才会释放回TaskManager持有的全局资源池。
释放MemorySegment的时候,同样要根据MemorySegment的类型来进行,并且要在不低于保留内存的情况下,将内存释放回内存段中,变为可用内存,后续申请MemorySegment的时候,可以重复利用该内存片段。
(2)MemorySegment释放
当NetworkBufferPool关闭的时候进行内存的释放,交还给操作系统。