Flink序列化框架分析

1.Flink的TypeInformation类

TypeInformation是flink中所有类型的基类,其作为生产序列化器和比较的一个工具。它包括了类型的一些基本属性,并可以通过它来生产序列化器(serializer),特殊情况下还可以生成类型比较器。(Flink中的比较器不仅仅是定义大小顺序,更是处理keys的基本辅助工具)

  • 基本类型:所有Java基本数据类型和对应的装箱类型,加上void,String,Date,BigDecimal和BigInteger
  • 基本数组和对象数组
  • 复合类型:
    • Flink Java Tuples (Flink Java API的一部分): 最多25个成员,不支持null成员
    • Scala case 类 (包括 Scala tuples): 最多25个成员, 不支持null成员
    • Row: 包含任意多个字段的元组并且支持null成员
    • POJOs: 遵循类bean模式的类
  • 辅助类型 (Option, Either, Lists, Maps, …)
  • 泛型: Flink自身不会序列化泛型,而是借助Kryo进行序列化.

POJO类非常有意思,因为POJO类可以支持复杂类型的创建,并且在定义keys时可以使用成员的名字:dataSet.join(another).where("name").equalTo("personName")。同时,POJO类对于运行时(runtime)是透明的,这使得Flink可以非常高效地处理它们。

1.1 POJO类型的规则

在满足如下条件时,Flink会将这种数据类型识别成POJO类型(并允许以成员名引用字段):

  • 该类是public的并且是独立的(即没有非静态的内部类)
  • 该类有一个public的无参构造方法
  • 该类(及该类的父类)的所有成员要么是public的,要么是拥有按照标准java bean命名规则命名的public getter和 public setter方法。

1.2 创建一个TypeInformation对象或序列化器###

创建一个TypeInformation对象时如下:

在Scala中,Flink使用在编译时运行的宏,在宏可供调用时去捕获所有泛型信息。

// 重要: 为了能够访问'createTypeInformation' 的宏方法,这个import是必须的
import org.apache.flink.streaming.api.scala._

val stringInfo: TypeInformation[String] = createTypeInformation[String]

val tupleInfo: TypeInformation[(String, Double)] = createTypeInformation[(String, Double)]

你也可以在Java使用相同的方法作为备选。

为了创建一个序列化器(TypeSerializer),只需要在TypeInformation 对象上调用typeInfo.createSerializer(config)方法。

config参数的类型是ExecutionConfig,它保留了程序的注册的自定义序列化器的相关信息。在可能用到TypeSerializer的地方,尽量传入程序的ExecutionConfig,你可以调用DataStream 或 DataSet的 getExecutionConfig()方法获取ExecutionConfig。一些内部方法(如:MapFunction)中,你可以通过将该方法变成一个Rich Function,然后调用getRuntimeContext().getExecutionConfig()获取ExecutionConfig.

2 基本类型实现示例

以String为例:

//BasicTypeInfo.java
public static final BasicTypeInfo<String> STRING_TYPE_INFO = new BasicTypeInfo<>(String.class, new Class<?>[]{}, StringSerializer.INSTANCE, StringComparator.class);

StringSerializer如下

//StringSerializer.java
public final class StringSerializer extends TypeSerializerSingleton<String> {

    private static final long serialVersionUID = 1L;
    
    public static final StringSerializer INSTANCE = new StringSerializer();
    
    private static final String EMPTY = "";

    @Override
    public boolean isImmutableType() {
        return true;
    }

    @Override
    public String createInstance() {
        return EMPTY;
    }

    @Override
    public String copy(String from) {
        return from;
    }
    
    @Override
    public String copy(String from, String reuse) {
        return from;
    }

    @Override
    public int getLength() {
        return -1;
    }

    @Override
    public void serialize(String record, DataOutputView target) throws IOException {
        StringValue.writeString(record, target);
    }

    @Override
    public String deserialize(DataInputView source) throws IOException {
        return StringValue.readString(source);
    }
    
    @Override
    public String deserialize(String record, DataInputView source) throws IOException {
        return deserialize(source);
    }

    @Override
    public void copy(DataInputView source, DataOutputView target) throws IOException {
        StringValue.copyString(source, target);
    }

    @Override
    public boolean canEqual(Object obj) {
        return obj instanceof StringSerializer;
    }

    @Override
    protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
        return super.isCompatibleSerializationFormatIdentifier(identifier)
                || identifier.equals(StringValue.class.getCanonicalName());
    }
}

上面代码中出现的StringValue是真正进行input以及output序列化过程操作,基本类型都有相应的方法,后面会单独说明下多字段Record序列化形式。
StringComparator如下

public final class StringComparator extends BasicTypeComparator<String> {

    private static final long serialVersionUID = 1L;
    
    private static final int HIGH_BIT = 0x1 << 7;
    
    private static final int HIGH_BIT2 = 0x1 << 13;
    
    private static final int HIGH_BIT2_MASK = 0x3 << 6;
    
    
    public StringComparator(boolean ascending) {
        super(ascending);
    }

    @Override
    public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
        String s1 = StringValue.readString(firstSource);
        String s2 = StringValue.readString(secondSource);
        int comp = s1.compareTo(s2); 
        return ascendingComparison ? comp : -comp;
    }


    @Override
    public boolean supportsNormalizedKey() {
        return true;
    }


    @Override
    public boolean supportsSerializationWithKeyNormalization() {
        return false;
    }

    @Override
    public int getNormalizeKeyLen() {
        return Integer.MAX_VALUE;
    }

    @Override
    public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
        return true;
    }


    @Override
    public void putNormalizedKey(String record, MemorySegment target, int offset, int len) {;
        final int limit = offset + len;
        final int end = record.length();
        int pos = 0;
        
        while (pos < end && offset < limit) {
            char c = record.charAt(pos++);
            if (c < HIGH_BIT) {
                target.put(offset++, (byte) c);
            }
            else if (c < HIGH_BIT2) {
                target.put(offset++, (byte) ((c >>> 7) | HIGH_BIT));
                if (offset < limit) {
                    target.put(offset++, (byte) c);
                }
            }
            else {
                target.put(offset++, (byte) ((c >>> 10) | HIGH_BIT2_MASK));
                if (offset < limit) {
                    target.put(offset++, (byte) (c >>> 2));
                }
                if (offset < limit) {
                    target.put(offset++, (byte) c);
                }
            }
        }
        while (offset < limit) {
            target.put(offset++, (byte) 0);
        }
    }


    @Override
    public StringComparator duplicate() {
        return new StringComparator(ascendingComparison);
    }
}

3 多字段Record示例

在开始这部分原理分析之前可以先看个示例代码

//RecordTest.java
public void testAddField() {
  try {
    // Add a value to an empty record
    Record record = new Record();
    assertTrue(record.getNumFields() == 0);
    record.addField(this.origVal1);
    assertTrue(record.getNumFields() == 1);
    assertTrue(origVal1.getValue().equals(record.getField(0, StringValue.class).getValue()));
    
    // Add 100 random integers to the record
    record = new Record();
    for (int i = 0; i < 100; i++) {
      IntValue orig = new IntValue(this.rand.nextInt());
      record.addField(orig);
      IntValue rec = record.getField(i, IntValue.class);
      
      assertTrue(record.getNumFields() == i + 1);
      assertTrue(orig.getValue() == rec.getValue());
    }
    
    // Add 3 values of different type to the record
    record = new Record(this.origVal1, this.origVal2);
    record.addField(this.origVal3);
    
    assertTrue(record.getNumFields() == 3);
    
    StringValue recVal1 = record.getField(0, StringValue.class);
    DoubleValue recVal2 = record.getField(1, DoubleValue.class);
    IntValue recVal3 = record.getField(2, IntValue.class);
    
    assertTrue("The value of the first field has changed", recVal1.equals(this.origVal1));
    assertTrue("The value of the second field changed", recVal2.equals(this.origVal2));
    assertTrue("The value of the third field has changed", recVal3.equals(this.origVal3));
  } catch (Throwable t) {
    Assert.fail("Test failed due to an exception: " + t.getMessage());
  }
}

Record代表多个数值的记录,其可以包含多个字段(可空并不体现在该记录中),内部有一个bitmap标记字段是否被赋值。为了数据交换方便,Record中的数据都以bytes方式存储,字段在访问时才被进行反序列化。当字段被修改时首先是放在cache中,并在下次序列化时合入或者显式调用updateBinaryRepresenation()方法。
Notes:

  • 该record必须是一个可变的对象,这样才可以被多个自定义方法使用来提升性能(后面单独分析)。该record是一个比较中的对象,为了减少对每个字段的序列化、反序列化操作,其保存了比较大的状态,需要有多个指针以及数组,从而要占用相对比较大的内存空间,在64位的JVM中要占用超过200bytes。
  • 该类是非线程安全的

4 存放Record的数据结构

针对上面提出的存放数据结构的疑问,这里继续深入分析下。

  • 将record放在一个迭代器中,当前存在一个叫BlockResettableMutableObjectIterator,其包含如下一些方法,读写都是在这个迭代器中进行。
    Record迭代器.png

其中以无参数next()方法为示例走读存储或者读取流程,代码如下:

public T next() throws IOException {
        // check for the left over element
        if (this.readPhase) {
            return getNextRecord();
        } else {
            // writing phase. check for leftover first
            T result = null;
            if (this.leftOverReturned) {
                // get next record
                if ((result = this.input.next()) != null) {
                    if (writeNextRecord(result)) {
                        return result;
                    } else {
                        // did not fit into memory, keep as leftover
                        this.leftOverRecord = this.serializer.copy(result);
                        this.leftOverReturned = false;
                        this.fullWriteBuffer = true;
                        return null;
                    }
                } else {
                    this.noMoreBlocks = true;
                    return null;
                }
            } else if (this.fullWriteBuffer) {
                return null;
            } else {
                this.leftOverReturned = true;
                return this.leftOverRecord;
            }
        }
    }

通过源码可以看出,在方法执行时根据标记判断是读取还是写入流程,同时方法对应getNextRecord和writeNextRecord两个方法,都在抽象类AbstractBlockResettableIterator中,两个方法源码如下:

protected T getNextRecord() throws IOException {
        if (this.numRecordsReturned < this.numRecordsInBuffer) {
            this.numRecordsReturned++;
            return this.serializer.deserialize(this.readView);
        } else {
            return null;
        }
    }
protected boolean writeNextRecord(T record) throws IOException {
        try {
            this.serializer.serialize(record, this.collectingView);
            this.numRecordsInBuffer++;
            return true;
        } catch (EOFException eofex) {
            return false;
        }
    }

其中存放数据是基于Flink内存管理部分进行申请以及维护大小等,相关初始化源码如下:

 memoryManager.allocatePages(ownerTask, emptySegments, numPages);
        
 this.collectingView = new SimpleCollectingOutputView(this.fullSegments, 
                        new ListMemorySegmentSource(this.emptySegments), memoryManager.getPageSize());
 this.readView = new RandomAccessInputView(this.fullSegments, memoryManager.getPageSize());

5 Flink 如何直接操作二进制数据

Flink 提供了如 group、sort、join 等操作,这些操作都需要访问海量数据。这里,我们以sort为例,这是一个在 Flink 中使用非常频繁的操作。
首先,Flink 会从 MemoryManager 中申请一批 MemorySegment,我们把这批 MemorySegment 称作 sort buffer,用来存放排序的数据。

sort示例.png

我们会把 sort buffer 分成两块区域。一个区域是用来存放所有对象完整的二进制数据。另一个区域用来存放指向完整二进制数据的指针以及定长的序列化后的key(key+pointer)。如果需要序列化的key是个变长类型,如String,则会取其前缀序列化。如上图所示,当一个对象要加到 sort buffer 中时,它的二进制数据会被加到第一个区域,指针(可能还有key)会被加到第二个区域。

将实际的数据和指针加定长key分开存放有两个目的。第一,交换定长块(key+pointer)更高效,不用交换真实的数据也不用移动其他key和pointer。第二,这样做是缓存友好的,因为key都是连续存储在内存中的,可以大大减少 cache miss(后面会详细解释)。

排序的关键是比大小和交换。Flink 中,会先用 key 比大小,这样就可以直接用二进制的key比较而不需要反序列化出整个对象。因为key是定长的,所以如果key相同(或者没有提供二进制key),那就必须将真实的二进制数据反序列化出来,然后再做比较。之后,只需要交换key+pointer就可以达到排序的效果,真实的数据不用移动。

sort指针.png

最后,访问排序后的数据,可以沿着排好序的key+pointer区域顺序访问,通过pointer找到对应的真实数据,并写到内存或外部(更多细节可以看这篇文章 Joins in Flink)。

5.1 缓存友好的数据结构和算法

随着磁盘IO和网络IO越来越快,CPU逐渐成为了大数据领域的瓶颈。从 L1/L2/L3 缓存读取数据的速度比从主内存读取数据的速度快好几个量级。通过性能分析可以发现,CPU时间中的很大一部分都是浪费在等待数据从主内存过来上。如果这些数据可以从 L1/L2/L3 缓存过来,那么这些等待时间可以极大地降低,并且所有的算法会因此而受益。

在上面讨论中我们谈到的,Flink 通过定制的序列化框架将算法中需要操作的数据(如sort中的key)连续存储,而完整数据存储在其他地方。因为对于完整的数据来说,key+pointer更容易装进缓存,这大大提高了缓存命中率,从而提高了基础算法的效率。这对于上层应用是完全透明的,可以充分享受缓存友好带来的性能提升。

References

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,948评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,371评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,490评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,521评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,627评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,842评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,997评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,741评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,203评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,534评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,673评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,339评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,955评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,770评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,000评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,394评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,562评论 2 349

推荐阅读更多精彩内容