聊聊flink Table Schema的定义

本文主要研究一下flink Table Schema的定义

实例

定义字段及类型

.withSchema(
  new Schema()
    .field("MyField1", Types.INT)     // required: specify the fields of the table (in this order)
    .field("MyField2", Types.STRING)
    .field("MyField3", Types.BOOLEAN)
)
  • 通过field定义字段名及字段类型

定义字段属性

.withSchema(
  new Schema()
    .field("MyField1", Types.SQL_TIMESTAMP)
      .proctime()      // optional: declares this field as a processing-time attribute
    .field("MyField2", Types.SQL_TIMESTAMP)
      .rowtime(...)    // optional: declares this field as a event-time attribute
    .field("MyField3", Types.BOOLEAN)
      .from("mf3")     // optional: original field in the input that is referenced/aliased by this field
)
  • 通过proctime定义processing-time,通过rowtime定义event-time,通过from定义引用或别名

定义Rowtime属性

// Converts an existing LONG or SQL_TIMESTAMP field in the input into the rowtime attribute.
.rowtime(
  new Rowtime()
    .timestampsFromField("ts_field")    // required: original field name in the input
)

// Converts the assigned timestamps from a DataStream API record into the rowtime attribute
// and thus preserves the assigned timestamps from the source.
// This requires a source that assigns timestamps (e.g., Kafka 0.10+).
.rowtime(
  new Rowtime()
    .timestampsFromSource()
)

// Sets a custom timestamp extractor to be used for the rowtime attribute.
// The extractor must extend `org.apache.flink.table.sources.tsextractors.TimestampExtractor`.
.rowtime(
  new Rowtime()
    .timestampsFromExtractor(...)
)
  • 通过timestampsFromField、timestampsFromSource、timestampsFromExtractor定义rowtime

定义watermark strategies

// Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
// observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
// are not late.
.rowtime(
  new Rowtime()
    .watermarksPeriodicAscending()
)

// Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
// Emits watermarks which are the maximum observed timestamp minus the specified delay.
.rowtime(
  new Rowtime()
    .watermarksPeriodicBounded(2000)    // delay in milliseconds
)

// Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
// underlying DataStream API and thus preserves the assigned watermarks from the source.
.rowtime(
  new Rowtime()
    .watermarksFromSource()
)
  • 通过watermarksPeriodicAscending、watermarksPeriodicBounded、watermarksFromSource定义watermark strategies

StreamTableEnvironment.connect

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/StreamTableEnvironment.scala

abstract class StreamTableEnvironment(
    private[flink] val execEnv: StreamExecutionEnvironment,
    config: TableConfig)
  extends TableEnvironment(config) {

  //......

  def connect(connectorDescriptor: ConnectorDescriptor): StreamTableDescriptor = {
    new StreamTableDescriptor(this, connectorDescriptor)
  }

  //......
}
  • StreamTableEnvironment的connect方法创建StreamTableDescriptor

StreamTableDescriptor

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/descriptors/StreamTableDescriptor.scala

class StreamTableDescriptor(
    tableEnv: StreamTableEnvironment,
    connectorDescriptor: ConnectorDescriptor)
  extends ConnectTableDescriptor[StreamTableDescriptor](
    tableEnv,
    connectorDescriptor)
  with StreamableDescriptor[StreamTableDescriptor] {

  //......
}
  • StreamTableDescriptor继承了ConnectTableDescriptor

ConnectTableDescriptor

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/descriptors/ConnectTableDescriptor.scala

abstract class ConnectTableDescriptor[D <: ConnectTableDescriptor[D]](
    private val tableEnv: TableEnvironment,
    private val connectorDescriptor: ConnectorDescriptor)
  extends TableDescriptor
  with SchematicDescriptor[D]
  with RegistrableDescriptor { this: D =>

  private var formatDescriptor: Option[FormatDescriptor] = None
  private var schemaDescriptor: Option[Schema] = None

  /**
    * Searches for the specified table source, configures it accordingly, and registers it as
    * a table under the given name.
    *
    * @param name table name to be registered in the table environment
    */
  override def registerTableSource(name: String): Unit = {
    val tableSource = TableFactoryUtil.findAndCreateTableSource(tableEnv, this)
    tableEnv.registerTableSource(name, tableSource)
  }

  /**
    * Searches for the specified table sink, configures it accordingly, and registers it as
    * a table under the given name.
    *
    * @param name table name to be registered in the table environment
    */
  override def registerTableSink(name: String): Unit = {
    val tableSink = TableFactoryUtil.findAndCreateTableSink(tableEnv, this)
    tableEnv.registerTableSink(name, tableSink)
  }

  /**
    * Searches for the specified table source and sink, configures them accordingly, and registers
    * them as a table under the given name.
    *
    * @param name table name to be registered in the table environment
    */
  override def registerTableSourceAndSink(name: String): Unit = {
    registerTableSource(name)
    registerTableSink(name)
  }

  /**
    * Specifies the format that defines how to read data from a connector.
    */
  override def withFormat(format: FormatDescriptor): D = {
    formatDescriptor = Some(format)
    this
  }

  /**
    * Specifies the resulting table schema.
    */
  override def withSchema(schema: Schema): D = {
    schemaDescriptor = Some(schema)
    this
  }

  // ----------------------------------------------------------------------------------------------

  /**
    * Converts this descriptor into a set of properties.
    */
  override def toProperties: util.Map[String, String] = {
    val properties = new DescriptorProperties()

    // this performs only basic validation
    // more validation can only happen within a factory
    if (connectorDescriptor.isFormatNeeded && formatDescriptor.isEmpty) {
      throw new ValidationException(
        s"The connector '$connectorDescriptor' requires a format description.")
    } else if (!connectorDescriptor.isFormatNeeded && formatDescriptor.isDefined) {
      throw new ValidationException(
        s"The connector '$connectorDescriptor' does not require a format description " +
          s"but '${formatDescriptor.get}' found.")
    }

    properties.putProperties(connectorDescriptor.toProperties)
    formatDescriptor.foreach(d => properties.putProperties(d.toProperties))
    schemaDescriptor.foreach(d => properties.putProperties(d.toProperties))

    properties.asMap()
  }
}
  • ConnectTableDescriptor提供了withSchema方法,返回Schema

Schema

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/descriptors/Schema.scala

class Schema extends Descriptor {

  // maps a field name to a list of properties that describe type, origin, and the time attribute
  private val tableSchema = mutable.LinkedHashMap[String, mutable.LinkedHashMap[String, String]]()

  private var lastField: Option[String] = None

  def schema(schema: TableSchema): Schema = {
    tableSchema.clear()
    lastField = None
    schema.getFieldNames.zip(schema.getFieldTypes).foreach { case (n, t) =>
      field(n, t)
    }
    this
  }

  def field(fieldName: String, fieldType: TypeInformation[_]): Schema = {
    field(fieldName, TypeStringUtils.writeTypeInfo(fieldType))
    this
  }

  def field(fieldName: String, fieldType: String): Schema = {
    if (tableSchema.contains(fieldName)) {
      throw new ValidationException(s"Duplicate field name $fieldName.")
    }

    val fieldProperties = mutable.LinkedHashMap[String, String]()
    fieldProperties += (SCHEMA_TYPE -> fieldType)

    tableSchema += (fieldName -> fieldProperties)

    lastField = Some(fieldName)
    this
  }

  def from(originFieldName: String): Schema = {
    lastField match {
      case None => throw new ValidationException("No field previously defined. Use field() before.")
      case Some(f) =>
        tableSchema(f) += (SCHEMA_FROM -> originFieldName)
        lastField = None
    }
    this
  }

  def proctime(): Schema = {
    lastField match {
      case None => throw new ValidationException("No field defined previously. Use field() before.")
      case Some(f) =>
        tableSchema(f) += (SCHEMA_PROCTIME -> "true")
        lastField = None
    }
    this
  }

  def rowtime(rowtime: Rowtime): Schema = {
    lastField match {
      case None => throw new ValidationException("No field defined previously. Use field() before.")
      case Some(f) =>
        tableSchema(f) ++= rowtime.toProperties.asScala
        lastField = None
    }
    this
  }

  final override def toProperties: util.Map[String, String] = {
    val properties = new DescriptorProperties()
    properties.putIndexedVariableProperties(
      SCHEMA,
      tableSchema.toSeq.map { case (name, props) =>
        (Map(SCHEMA_NAME -> name) ++ props).asJava
      }.asJava
    )
    properties.asMap()
  }
}
  • Schem提供了field、from、proctime、rowtime方法用于定义Schema的相关属性

Rowtime

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/descriptors/Rowtime.scala

class Rowtime extends Descriptor {

  private var timestampExtractor: Option[TimestampExtractor] = None
  private var watermarkStrategy: Option[WatermarkStrategy] = None

  def timestampsFromField(fieldName: String): Rowtime = {
    timestampExtractor = Some(new ExistingField(fieldName))
    this
  }

  def timestampsFromSource(): Rowtime = {
    timestampExtractor = Some(new StreamRecordTimestamp)
    this
  }

  def timestampsFromExtractor(extractor: TimestampExtractor): Rowtime = {
    timestampExtractor = Some(extractor)
    this
  }

  def watermarksPeriodicAscending(): Rowtime = {
    watermarkStrategy = Some(new AscendingTimestamps)
    this
  }

  def watermarksPeriodicBounded(delay: Long): Rowtime = {
    watermarkStrategy = Some(new BoundedOutOfOrderTimestamps(delay))
    this
  }

  def watermarksFromSource(): Rowtime = {
    watermarkStrategy = Some(PreserveWatermarks.INSTANCE)
    this
  }

  def watermarksFromStrategy(strategy: WatermarkStrategy): Rowtime = {
    watermarkStrategy = Some(strategy)
    this
  }

  final override def toProperties: java.util.Map[String, String] = {
    val properties = new DescriptorProperties()

    timestampExtractor.foreach(normalizeTimestampExtractor(_)
      .foreach(e => properties.putString(e._1, e._2)))
    watermarkStrategy.foreach(normalizeWatermarkStrategy(_)
      .foreach(e => properties.putString(e._1, e._2)))

    properties.asMap()
  }
}
  • Rowtime提供了timestampsFromField、timestampsFromSource、timestampsFromExtractor方法用于定义timestamps;提供了watermarksPeriodicAscending、watermarksPeriodicBounded、watermarksFromSource、watermarksFromStrategy方法用于定义watermark strategies

小结

  • StreamTableEnvironment的connect方法创建StreamTableDescriptor;StreamTableDescriptor继承了ConnectTableDescriptor;ConnectTableDescriptor提供了withSchema方法,返回Schema
  • Schem提供了field、from、proctime、rowtime方法用于定义Schema的相关属性;通过proctime定义processing-time,通过rowtime定义event-time,通过from定义引用或别名
  • Rowtime提供了timestampsFromField、timestampsFromSource、timestampsFromExtractor方法用于定义timestamps;提供了watermarksPeriodicAscending、watermarksPeriodicBounded、watermarksFromSource、watermarksFromStrategy方法用于定义watermark strategies

doc

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,002评论 6 509
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,777评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,341评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,085评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,110评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,868评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,528评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,422评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,938评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,067评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,199评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,877评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,540评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,079评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,192评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,514评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,190评论 2 357

推荐阅读更多精彩内容