Flink支持的数据类型

  • Flink流应用程序处理的是以数据对象表示的事件流。所以在Flink内部,我们需要能够处理这些对象。它们需要被序列化和反序列化,以便通过网络传送它们;或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点,Flink需要明确知道应用程序所处理的数据类型。Flink使用类型信息的概念来表示数据类型,并为每个数据类型生成特定的序列化器、反序列化器和比较器。
  • Flink还具有一个类型提取系统,该系统分析函数的输入和返回类型,以自动获取类型信息,从而获得序列化器和反序列化器。但是,在某些情况下,例如lambda函数或泛型类型,需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。
    Flink支持Java和Scala中所有常见数据类型。使用最广泛的类型有以下几种。
package com.atguigu.apiTest

import org.apache.flink.streaming.api.scala._

object TestDataType {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //基础数据类型: Int, Double, Long, String
    val dataStream1: DataStream[Long] = env.fromElements(1L, 2L, 3L, 4L)

    //Java/scala数组(tuples)
    val dataStream2: DataStream[(String, Int)] = env.fromElements(("Adam",17), ("Sarah", 23))

    //Scala样例类
    case class Pepole(name: String, age: Int)
    val dataStream3: DataStream[Product with Serializable] = env.fromElements(Pepole("Linda", 17),("sala", 23))

    //Arrays, Lists, Maps, Enums, 等等

    //Java对象
    val dataStream5: DataStream[Person] = env.fromElements(new Person())

  }

}


java对象类:

package com.atguigu.apiTest;

public class Person {
public String name;
public int age;
public Person() {}
public Person(String name, int age) {
        this.name = name;
        this.age = age;
        }

        }
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容