Flink Table API&SQL 的流概念

动态表

这部分即将完成,在此之前请参考: 关于动态表介绍的博客
****即将完成的是:****
  Stream->Table
  Table->Stream
  update changed/retraction

时间属性

Flink可以基于不同的时间概念来处理流数据:
  1、处理时间:是指正在执行的操作所对应的物理机的系统时间也称挂钟时间。
  2、事件时间:是指流数据的处理是基于附加在每条记录上的时间戳来进行的,当事件发生是时间戳可以进行编码。
  3、摄入时间:事件进入Flink的时间,可以当做是与事件时间类似。

更多关于Flink时间处理的信息,请参考Event Time和Watermarks

Table程序要求为streaming environment指定一个响应的时间特性:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default

// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // default

// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

基于时间的操作如:Table API和SQL查询中的 window 都需要时间概念及其原始信息,因此,表可以提供一个逻辑时间属性用于指示时间并在表程序中访问对应的时间戳。

时间属性可以是每个表模式中的一部分,它们可以在从DataStream中创建表是就创建,也可以在使用TableSource时就已经预定义。一旦时间属性在开始就定义了的话,它就可以被当做一个表属性来引用并应用于基于时间的操作中。

只要时间属性还未被修改并且仅仅是从查询的一部分转发到另一部分,那么它还是一个有效的时间属性。时间属性跟常规时间戳一样,可以用于计算。一旦一个时间属性被用于计算,它会具体化并成为一个常规时间戳。常规时间戳跟Flink的时间和水印系统没有关系,所以不会被用来做基于时间的操作。

Processing Time

处理时间允许标程序产生一个基于本地主机时间的结果,它是最简单的时间概念,但是不提供决策。他既不要求时间戳提取也不产生水印。

这里有两种方法来定义处理时间属性:

在DataStream 到 Table的转换中定义

处理时间属性是在模式定义时使用.proctime属性来定义的,时间属性只能通过在物理模式的基础上新增一个逻辑字段来扩展,因此,它可以在模式定义的最后来定义。

DataStream<Tuple2<String, String>> stream = ...;

//声明一个额外的逻辑字段来作为processing time属性
Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");

WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
val stream: DataStream[(String, String)] = ...

// 声明一个额外的逻辑字段来作为processing time属性
val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.proctime)

val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
使用TableSource来定义

处理时间属性可以通过一个实现了DefinedProctimeAttribute接口的TableSource来定义,逻辑时间属性会被添加到由TableSource返回的类型来定义的物理模式中。

// 定义一个具有处理时间属性的table Source
public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {

  @Override
  public TypeInformation<Row> getReturnType() {
    String[] names = new String[] {"Username" , "Data"};
    TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
    return Types.ROW(names, types);
  }

  @Override
  public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    // 创建stream 
    DataStream<Row> stream = ...;
    return stream;
  }

  @Override
  public String getProctimeAttribute() {
    // 这个名称的属性会被追加到第三个字段中
    return "UserActionTime";
  }
}

// 注册一个table source
tEnv.registerTableSource("UserActions", new UserActionSource());

WindowedTable windowedTable = tEnv
  .scan("UserActions")
  .window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
// 定义一个具有处理时间属性的table Source
class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {

  override def getReturnType = {
    val names = Array[String]("Username" , "Data")
    val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
    Types.ROW(names, types)
  }

  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    // 创建 stream
    val stream = ...
    stream
  }

  override def getProctimeAttribute = {
    // 这个名称的属性会被追加到第三个字段中 
    "UserActionTime"
  }
}

// 注册 table Source
tEnv.registerTableSource("UserActions", new UserActionSource)

val windowedTable = tEnv
  .scan("UserActions")
  .window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)

Event time

事件时间允许标程序产生基于包含在每条记录中的时间的结果,即使是无序或者晚到事件也会产生一致性结果,它还确保从持久存储器读取记录时表程序的可重播结果。

此外,事件时间允许在批环境和流环境中使用统一的语法,流环境中的时间属性可以是批环境中的常规字段。

为了处理流中无序事件及准时和延迟事件的差异,Flink需要抽取事件的时间戳,确保某些进度准时。

事件时间属性可以在DataStream到Table的转换中或者使用TableSource来定义。

在DataStream到Table的转换中定义

事件时间可以在模式定义中通过.rowtime属性来定义

Timestamp和watermark必须在进行转换的DataStream中就已经指定好

当DataStream转换为Table时有两种方式来定义时间属性:
  1、通过添加一个额外的逻辑属性来扩展物理模式
  2、用一个逻辑属性来替换物理属性(因为在时间戳提取之后就不再需要它)

// Option 1:

// extract timestamp and assign watermarks based on knowledge of the stream
DataStream<Tuple3<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// declare an additional logical field as an event time attribute
Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime");


// Option 2:

// extract timestamp from first field, and assign watermarks based on knowledge of the stream
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// the first field has been used for timestamp extraction, and is no longer necessary
// replace first field with a logical event time attribute
Table table = tEnv.fromDataStream(stream, "UserActionTime.rowtime, Username, Data");

// Usage:

WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
// Option 1:

// extract timestamp and assign watermarks based on knowledge of the stream
val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...)

// declare an additional logical field as an event time attribute
val table = tEnv.fromDataStream(stream, 'Username, 'Data, 'UserActionTime.rowtime)


// Option 2:

// extract timestamp from first field, and assign watermarks based on knowledge of the stream
val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)

// the first field has been used for timestamp extraction, and is no longer necessary
// replace first field with a logical event time attribute
val table = tEnv.fromDataStream(stream, 'UserActionTime.rowtime, 'Username, 'Data)

// Usage:

val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
使用TableSource来定义

事件时间可以通过实现了DefineRowtimeAttribute接口的TableSource来定义,这个逻辑时间属性被添加到由TableSource返回的类型定义的物理模式中。

Timestamp和Watermark需要分配给由getDataStream()方法返回的stream中:

// define a table source with a rowtime attribute
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttribute {

  @Override
  public TypeInformation<Row> getReturnType() {
    String[] names = new String[] {"Username" , "Data"};
    TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
    return Types.ROW(names, types);
  }

  @Override
  public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    // create stream 
    // ...
    // extract timestamp and assign watermarks based on knowledge of the stream
    DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
    return stream;
  }

  @Override
  public String getRowtimeAttribute() {
    // field with this name will be appended as a third field 
    return "UserActionTime";
  }
}

// register the table source
tEnv.registerTableSource("UserActions", new UserActionSource());

WindowedTable windowedTable = tEnv
  .scan("UserActions")
  .window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
// define a table source with a rowtime attribute
class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttribute {

  override def getReturnType = {
    val names = Array[String]("Username" , "Data")
    val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
    Types.ROW(names, types)
  }

  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    // create stream 
    // ...
    // extract timestamp and assign watermarks based on knowledge of the stream
    val stream = inputStream.assignTimestampsAndWatermarks(...)
    stream
  }

  override def getRowtimeAttribute = {
    // field with this name will be appended as a third field
    "UserActionTime"
  }
}

// register the table source
tEnv.registerTableSource("UserActions", new UserActionSource)

val windowedTable = tEnv
  .scan("UserActions")
  .window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)

查询配置

在流处理中,计算是不断发生的,并且由很多情况下,需要更新先前发射的结果。有很多方式来执行查询计算和更新发射结果,这些并不会影响查询的语义,但是可能会产生近似结果。

Flink的Table API和SQL接口使用QueryConfig来控制计算、发射的结果以及更新发射结果。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,492评论 6 513
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,048评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,927评论 0 358
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,293评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,309评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,024评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,638评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,546评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,073评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,188评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,321评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,998评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,678评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,186评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,303评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,663评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,330评论 2 358

推荐阅读更多精彩内容