内存管理

TaskManager 的内存布局

Flink 内部并非直接将对象存储在堆上,而是将对象序列化到一个个预先分配的 MemorySegment 中。MemorySegment 是一段固定长度的内存(默认32KB),也是 Flink 中最小的内存分配单元。MemorySegment 提供了高效的读写方法,它的底层可以是堆上的 byte[], 也可以是堆外(off-heap)ByteBuffer。可以把 MemorySegment 看作 Java NIO 中的 ByteBuffer,Flink 还实现了 Java 的 java.io.DataOutput 和 java.io.DataInput 接口,分别是 AbstractPagedInputView 和 AbstractPagedOutputView, 可以通过一种逻辑视图的方式来操作连续的多块 MemorySegment。

在 Flink 中,TaskManager 负责任务的实际运行,通常一个 TaskManager 对应一个 JVM 进程(非 MiniCluster 模式)。抛开 JVM 内存模型,单从 TaskManager 内存的主要使用方式来看,TaskManager 的内存主要分为三个部分:

  • Network Buffers:一定数量的 MemorySegment, 主要用于网络传输。在 TaskManager 启动时分配, 通过 NetworkEnvironment 和 NetworkBufferPool 进行管理
  • Managed Memory:由 MemoryManager 管理的一组 MemorySegment 集合, 主要用于 Batch 模式下的 sorting, hashing, 和 cache 等。
  • Remaining JVM heap:余下的堆内存留给 TaskManager 的数据结构以及用户代码处理数据时使用。TaskManager 自身的数据结构并不会占用太多内存,因而主要都是供用户代码使用,用户代码创建的对象通常生命周期都较短

需要注意的是,上面所说的三部分的内存并非都是 JVM 堆上的内存,因为 MemorySegment 底层的内存可以在堆上,也可以在堆外(不由 JVM 管理)。对于 Network Buffers,这一部分内存就是在堆外(off-heap)进行分配的;对于 Managed Memory,这一部分内存可以配置在堆上,也可以配置在堆外。另外还需要注意的一点是,Managed Memory 主要是在 Batch 模式下使用,在 Streaming 模式下这一部分内存并不会预分配,因而空闲出来的内存其实都是可以给用户自定义函数使用的。

通过二进制数据管理对象

我们已经知道,Flink 是通过 MemorySegment 来管理数据对象的,因而对象首先需要被序列化保存到 MemorySegment 中。 Flink 实现了一套自己的序列化框架。这主要是出于以下考虑:首先,比较和操作二进制数据需要准确了解序列化的布局,针对二进制数据的操作来配置序列化的布局可以显著提升性能;其次,对于 Flink 应用而言,它所处理的数据对象类型通常是完全已知的,由于数据集对象的类型固定,对于数据集可以只保存一份对象 Schema 信息,可以进一步节省存储空间。

Flink 可以处理任意的 Java 或 Scala 对象,而不必实现特定的接口。对于 Java 实现的 Flink 程序,Flink 会通过反射框架获取用户自定义函数返回的类型;而对于 Scala 实现的 Flink 程序,则通过 Scala Compiler 分析用户自定义函数返回的类型。每一种数据类型都对应一个 TypeInfomation。

  • BasicTypeInfo: 基本类型(装箱的)或 String 类型
  • BasicArrayTypeInfo: 基本类型数组(装箱的)或 String 数组
  • WritableTypeInfo: 任意 Hadoop Writable 接口的实现类
  • TupleTypeInfo: 任意的 Flink Tuple 类型 (支持Tuple1 to Tuple25)
  • CaseClassTypeInfo: 任意的 Scala CaseClass (包括 Scala tuples)
  • PojoTypeInfo: 任意的 POJO (Java or Scala),Java对象的所有成员变量,要么是 public 修饰符定义,要么有 getter/setter 方法
  • GenericTypeInfo: 任意无法匹配之前几种类型的类

通过 TypeInfomation 可以获取到对应数据类型的序列化器 TypeSerializer。对于 BasicTypeInfo,Flink 提供了对应的序列化器;对于 WritableTypeInfo, Flink 会将序列化和反序列化操作委托给 Hadoop Writable 接口的 write() and readFields();对于 GenericTypeInfo, Flink 默认使用 “”Kyro 进行序列化“”;而 TupleTypeInfo、CaseClassTypeInfo 和 PojoTypeInfo 是一种组合类型,序列化时分别委托给成员的序列化器进行序列化即可。

对于可以用作 key 的数据类型,TypeInfomation 还可以生成 TypeComparator,用来直接在序列化后的二进制数据上进行 compare、hash 等操作

Flink 的类型和序列化系统也可以方便地进行扩展,用户可以提供自定义的序列化器和比较器,具体可以参考 Flink 官方提供的文档 Data Types & Serialization。

在批处理的场景下,诸如 group, sort, 和 join 等操作都需要访问大量的数据。借助于 MemorySegment 并直接操作二进制数据,Flink 可以高效地完成这些操作,避免了频繁地序列化/反序列化,并且这些操作是缓存友好的。具体可以参考 Flink 团队的文章Juggling with Bits and Bytes。

这种基于 MemorySegment 和二进制数据直接管理数据对象的方式可以带来如下好处:

- 保证内存安全:由于分配的 MemorySegment 的数量是固定的,因而可以准确地追踪 MemorySegment 的使用情况。在 Batch 模式下,如果 MemorySegment 资源不足,会将一批 MemorySegment 写入磁盘,需要时再重新读取。这样有效地减少了 OOM 的情况。
  • 减少了 GC 的压力:因为分配的 MemorySegment 是长生命周期的对象,数据都以二进制形式存放,且 MemorySegment 可以回收重用,所以 MemorySegment 会一直保留在老年代不会被 GC;而由用户代码生成的对象基本都是短生命周期的,Minor GC 可以快速回收这部分对象,尽可能减少 Major GC 的频率。此外,MemorySegment 还可以配置为使用堆外内存,进而避免 GC。
  • 节省内存空间:数据对象序列化后以二进制形式保存在 MemorySegment 中,减少了对象存储的开销。
  • 高效的二进制操作和缓存友好的计算:可以直接基于二进制数据进行比较等操作,避免了反复进行序列化于反序列;另外,二进制形式可以把相关的值,以及 hash 值,键值和指针等相邻地放进内存中,这使得数据结构可以对高速缓存更友好。

MemorySegment

前面已经介绍了,MemorySegment 是一段固定长度的内存,也是 Flink 中最小的内存分配单元。在早期版本的实现中,MemorySegment 使用的都是堆上的内存。尽管 Flink 的内存管理机制已经做了很多优化,但是 Flink 团队仍然加入了对堆外内存的支持。主要是考虑到以下几个方面:

启动很大堆内存(100s of GBytes heap memory)的 JVM 需要很长时间,GC 停留时间也会很长(分钟级)。使用堆外内存的话,JVM 只需要分配较少的堆内存(只需要分配 Remaining Heap 那一块)。
堆外内存在写磁盘或网络传输时是可以利用 zero-copy 特性,I/O 和网络传输的效率更高。
堆外内存是进程间共享的,也就是说,即使 JVM 进程崩溃也不会丢失数据。这可以用来做故障恢复。Flink暂时没有利用起这个,不过未来有可能会利用这个特性。

但是使用堆外内存同样存在一些潜在的问题:

堆内存可以很方便地进行监控和分析,相较而言堆外内存则更加难以控制;
Flink 有时可能需要短生命周期的 MemorySegment,在堆上申请开销会更小;
一些操作在堆内存上会更快一些

Flink 将原来的 MemorySegment 变成了抽象类,并提供了两个具体的子类:HeapMemorySegment 和 HybridMemorySegment。前者是用于分配堆内存,后者用来分配堆外内存和堆内存的。

sun.misc包含了低级(native硬件级别的原子操作)、不安全的操作集合。
Java无法直接访问到操作系统底层(如系统硬件等),为此Java使用native方法来扩展Java程序的功能。Unsafe类提供了硬件级别的原子操作,提供了一些绕开JVM的更底层功能,由此提高效率

MemorySegment 的实现:

public abstract class MemorySegment {
    protected final byte[] heapMemory; //堆内存引用
    protected long address; //堆外内存地址

    //基于堆内存创建MemorySegment
    MemorySegment(byte[] buffer, Object owner) {
        if (buffer == null) {
            throw new NullPointerException("buffer");
        }
        this.heapMemory = buffer;
        this.address = BYTE_ARRAY_BASE_OFFSET;
        this.size = buffer.length;
        this.addressLimit = this.address + this.size;
        this.owner = owner;
    }

    //基于堆外内存创建MemorySegment
    MemorySegment(long offHeapAddress, int size, Object owner) {
        if (offHeapAddress <= 0) {
            throw new IllegalArgumentException("negative pointer or size");
        }
        if (offHeapAddress >= Long.MAX_VALUE - Integer.MAX_VALUE) {
            // this is necessary to make sure the collapsed checks are safe against numeric overflows
            throw new IllegalArgumentException("Segment initialized with too large address: " + offHeapAddress
                    + " ; Max allowed address is " + (Long.MAX_VALUE - Integer.MAX_VALUE - 1));
        }
        this.heapMemory = null;
        this.address = offHeapAddress;
        this.addressLimit = this.address + size;
        this.size = size;
        this.owner = owner;
    }

    public boolean isOffHeap() {
        return heapMemory == null;
    }

    public final long getLong(int index) {
        final long pos = address + index;
        if (index >= 0 && pos <= addressLimit - 8) {
            //这是能够在一个实现中同时操作对内存和堆外内存的关键
            return UNSAFE.getLong(heapMemory, pos);
        }
        else if (address > addressLimit) {
            throw new IllegalStateException("segment has been freed");
        }
        else {
            // index is in fact invalid
            throw new IndexOutOfBoundsException();
        }
    }

    //......
}

堆外方式的MemorySegment

public final class HybridMemorySegment extends MemorySegment {
    private final ByteBuffer offHeapBuffer;

    //堆外内存初始化
    HybridMemorySegment(ByteBuffer buffer, Object owner) {
        super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner);
        this.offHeapBuffer = buffer;
    }

    //堆内内存初始化
    HybridMemorySegment(byte[] buffer, Object owner) {
        super(buffer, owner);
        this.offHeapBuffer = null;
    }

    //......
}

堆内方式的MemorySegment

public final class HeapMemorySegment extends MemorySegment {
    private byte[] memory;

    HeapMemorySegment(byte[] memory, Object owner) {
        super(Objects.requireNonNull(memory), owner);
        this.memory = memory;
    }

    //......
}

MemorySegment 的管理

在 TaskManager 的内存布局中我们说过,TaskManager 的内存主要分为三个部分,其中 Network Buffers 和 Managed Memory 都是一组 MemorySegment 的集合。下面就分别介绍下这两块内存是如何管理的。
Buffer 和 NetworkBufferPool

Buffer 接口是对池化的 MemorySegment 的包装,带有引用计数,类似与 Netty 的 ByteBuf。Buffer也使用两个指针分别表示写入的位置和读取的位置。Buffer 的具体实现实现类 NetworkBuffer 继承自 Netty 的 AbstractReferenceCountedByteBuf,这使得它很容易地集成了引用计数和读写指针的功能。同时,在非 Netty 场景下使用时,Buffer 也提供了 java.nio.ByteBuffer 的包装,但需要手动设置读写指针的位置。ReadOnlySlicedNetworkBuffer 则提供了只读模式的 buffer 的包装。

BufferBuilder 和 BufferConsumer 构成了写入和消费 buffer 的通用模式:通过 BufferBuilder 向底层的 MemorySegment 写入数据,再通过 BufferConsumer 生成只读的 Buffer,读取 BufferBuilder 写入的数据。这两个类都不是线程安全的,但可以实现一个线程写入,另一个线程读取的效果。

BufferPool 接口继承了 BufferProvider 和 BufferRecycler 接口,提供了申请以及回收 Buffer 的功能。LocalBufferPool 是 BufferPool 的具体实现,LocalBufferPool 中 Buffer 的数量是可以动态调整的。

BufferPoolFactory 接口是 BufferPool 的工厂,用于创建及销毁 BufferPool。NetworkBufferPool 是 BufferPoolFactory 的具体实现类。所以按照 BufferPoolFactory -> BufferPool -> Buffer 这样的结构进行组织。NetworkBufferPool 在初始化的时候创建一组 MemorySegment,这些 MemorySegment 会在所有的 LocalBufferPool 之间进行均匀分配。

class NetworkBufferPool implements BufferPoolFactory {
    //所有可用的MemorySegment,阻塞队列
    private final ArrayBlockingQueue<MemorySegment> availableMemorySegments;

    public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {
        this.totalNumberOfMemorySegments = numberOfSegmentsToAllocate;
        this.memorySegmentSize = segmentSize;
        final long sizeInLong = (long) segmentSize;

        try {
            this.availableMemorySegments = new ArrayBlockingQueue<>(numberOfSegmentsToAllocate);
        }
        catch (OutOfMemoryError err) {
            throw new OutOfMemoryError("Could not allocate buffer queue of length "
                    + numberOfSegmentsToAllocate + " - " + err.getMessage());
        }
        try {
            for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
                //NetworkBufferPool 使用的 MemorySegment 全是堆外内存
                availableMemorySegments.add(MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, null));
            }
        }
        catch (OutOfMemoryError err) {
            ......
        }
        .......
    }
}

MemoryManager

MemoryManager 是管理 Managed Memory 的类,这部分主要是在 Batch 模式下使用,在 Streaming 模式下这一块内存不会分配。MemoryManager 主要通过内部接口 MemoryPool 来管理所有的 MemorySegment。Managed Memory 和管理相比于 Network Buffers 的管理更为简单,因为不需要 Buffer 的那一层封装。直接来看下相关

public class MemoryManager {
    //管理所有的 MemorySegment
    private final MemoryPool memoryPool;


    public MemoryManager(long memorySize, int numberOfSlots, int pageSize,
                            MemoryType memoryType, boolean preAllocateMemory) {
        // sanity checks
        ......
        this.memoryType = memoryType;
        this.memorySize = memorySize;
        this.numberOfSlots = numberOfSlots;

        // assign page size and bit utilities
        this.pageSize = pageSize;
        this.roundingMask = ~((long) (pageSize - 1));

        final long numPagesLong = memorySize / pageSize;
        if (numPagesLong > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("The given number of memory bytes (" + memorySize
                    + ") corresponds to more than MAX_INT pages.");
        }
        //所有可用的 MemorySegment 数量
        this.totalNumPages = (int) numPagesLong;
        if (this.totalNumPages < 1) {
            throw new IllegalArgumentException("The given amount of memory amounted to less than one page.");
        }

        this.allocatedSegments = new HashMap<Object, Set<MemorySegment>>();
        this.isPreAllocated = preAllocateMemory;

        this.numNonAllocatedPages = preAllocateMemory ? 0 : this.totalNumPages;
        //是否需要预分配内存,Streaming 不会预分配
        final int memToAllocate = preAllocateMemory ? this.totalNumPages : 0;

        switch (memoryType) {
            case HEAP:
            //堆上
                this.memoryPool = new HybridHeapMemoryPool(memToAllocate, pageSize);
                break;
            case OFF_HEAP:
            //堆外
                if (!preAllocateMemory) {
                    LOG.warn("It is advisable to set 'taskmanager.memory.preallocate' to true when" +
                        " the memory type 'taskmanager.memory.off-heap' is set to true.");
                }
                this.memoryPool = new HybridOffHeapMemoryPool(memToAllocate, pageSize);
                break;
            default:
                throw new IllegalArgumentException("unrecognized memory type: " + memoryType);
        }
    }

    abstract static class MemoryPool {
        abstract int getNumberOfAvailableMemorySegments();
        abstract MemorySegment allocateNewSegment(Object owner);
        abstract MemorySegment requestSegmentFromPool(Object owner);
        abstract void returnSegmentToPool(MemorySegment segment);
        abstract void clear();
    }

    static final class HybridHeapMemoryPool extends MemoryPool {
        private final ArrayDeque<byte[]> availableMemory;

        HybridHeapMemoryPool(int numInitialSegments, int segmentSize) {
            this.availableMemory = new ArrayDeque<>(numInitialSegments);
            this.segmentSize = segmentSize;
            for (int i = 0; i < numInitialSegments; i++) {
                //堆上直接使用byte数组
                this.availableMemory.add(new byte[segmentSize]);
            }
        }
    }

    static final class HybridOffHeapMemoryPool extends MemoryPool {
        private final ArrayDeque<ByteBuffer> availableMemory;

        HybridOffHeapMemoryPool(int numInitialSegments, int segmentSize) {
            this.availableMemory = new ArrayDeque<>(numInitialSegments);
            this.segmentSize = segmentSize;
            //堆外使用 DirectByteBuffer
            for (int i = 0; i < numInitialSegments; i++) {
                this.availableMemory.add(ByteBuffer.allocateDirect(segmentSize));
            }
        }
    }
}

转自
https://blog.jrwang.me/2019/flink-source-code-memory-management/
http://wuchong.me/blog/2016/04/29/flink-internals-memory-manage/

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。