基于Apache Flink的流处理

第1章 状态化流处理概述。

1.1 传统的数据处理框架

事务型处理

企业在日常业务运营过程中会用到各类基于web的应用,通常是业务系统,比如订单、客户系统等等,通常一个应用对于1个或多个数据库,应用通过执行远程数据库系统的事务来读取或更新状态。

分析型处理

存储于不同事务类型数据系统中的数据,可以为企业提供业务运营相关的分析见解,通常是将数据从业务系统的数据库中复制到数仓,然后再进行分析和查询,这个过程称为ETL。

ETL:向数仓拷贝数据的过程,提取-转换-加载(Extract-Transform-Load)。

1.2 历史演变

Lambda架构

Lambda架构是一种用于处理大规模实时数据的架构模式,它结合了批处理和流处理的优势。在Lambda架构中,数据流被同时发送到批处理层和提速层,然后将它们的结果合并以提供全面的分析视图。

Lambda架构的核心组件包括:

  • 批处理层(Batch Layer):批处理层负责处理大规模的历史数据。它使用批处理作业来处理存储在持久化数据存储中的数据,生成批处理视图。这些批处理作业可以使用分布式计算框架(如Hadoop MapReduce或Spark)来执行。
  • 提速层(Speed Layer):速度层负责处理实时数据流。它使用流处理引擎(如Apache Flink、Apache Storm或Spark Streaming)来处理数据流,并生成实时视图。速度层通常采用近似实时的处理方式,以保证低延迟和高吞吐量。
  • 服务层(Serving Layer):服务层负责合并批处理层和速度层的结果,提供一致的查询接口。它将批处理视图和实时视图进行合并,并将结果存储在可查询的数据存储系统中(如HBase或Cassandra)。这样,用户可以通过查询接口实时地获取数据分析结果。

Lambda架构的优势在于它能够处理大规模的实时数据,并提供实时和全面的数据分析视图。批处理层处理历史数据,可以进行复杂的分析和计算,而提速层处理实时数据流,可以提供近似实时的结果。通过将批处理层和提速层的结果合并,Lambda架构能够提供一致和全面的数据分析能力。

1.3 状态化流处理

状态化流处理是一种设计模式:

  • 在状态化流处理中,"状态化"指的是将数据处理过程中的状态显式地管理和维护。传统的流处理模型是无状态的,每个事件都独立地处理,没有记忆或跟踪之前的事件。而在状态化流处理中,系统会维护一个状态,用于存储和更新事件流的上下文信息。
  • 状态化流处理适用于需要考虑事件之间的关系和上下文的场景。通过维护状态,系统可以跟踪和处理事件流中的状态变化,从而更好地理解和处理数据。
  • 状态可以是简单的变量,也可以是更复杂的数据结构,取决于具体的应用场景。状态可以用于聚合、过滤、转换等操作,以及用于实时计算、模式检测、窗口聚合等高级分析。
  • 通过状态化流处理,我们可以实现更复杂和有状态的数据处理逻辑,从而能够处理更丰富的实时数据分析和应用场景。

通过定期将应用状态的一致性检查点写入远程持久化存储实现状态维护,事件日志负责存储事件流并将其分布式化,由于日志是追加形式,故事件的顺序不会因向消费者发布而改变。在出现故障时,Flink以此来进行失败恢复,此外还可应用于应用更新、Bug修复、结果修正、集群迁移或针对不同版本应用执行A/B测试。

1.4 有状态的流处理应用

  • 事件驱动型,例如实时推荐、异常检测、欺诈识别。
  • 数据管道,例如实时计算。
  • 流式分析,例如实时监控系统可以对传感器数据流进行分析,以检测异常情况或趋势变化;实时广告投放系统可以根据用户行为实时调整广告内容。

1.5 Flink特性

  • 提供精确1次( exactly-once )的状态一致性保障。
  • 在每秒处理数百万条事件的同时保持毫秒级延迟。基于Flink应用可以扩展到数千核心之上。
  • 支持高可用性配置(无单点失效), Kubernetes、YARN、Apache Mesos紧密集成,快速故障恢复,动态扩缩容作业等。基于上述特点,它可以 24 小时运行流式应用,几乎无须停机。
  • 允许在不丢失应用状态的前提下更新作业的程序代码,或进行跨Flink集群的作业迁移。
  • 提供了详细、可自由定制的系统及应用指标( metrics )集合,用于提前定位和响应问题。

第2章 流处理基础

2.1 DataFlow图

有向图,每个节点都称为算子,表示计算;边表示数据依赖关系。没有输入端的算子称为数据源,数据源可以从TCP套接字、文件、Kafka主题或传感器数据接口中获取数据,将原始数据转换成适合后续处理的格式;没有输出端的算子称为数据汇,其写入的目标可以是文件、数据库、消息队列或监控接口等。

数据并行:统一操作的多个任务执行在不同的数据子集上。
任务并行:不同算子的任务(基于相同或不同的数据)并行计算。

2.2 数据交换策略

  • 转发策略,发送端和接收端一对一地进行数据传输。
  • 广播策略,每个数据项都会发往下游算子的全部并行任务,该策略会把数据复制多份且涉及网络通信,因此代价比较高。
  • 基于键值的策略,根据某一键的值属性对数据分区,保证键相同的数据项交由同一任务处理。
  • 随机策略,随机均匀分配。

2.3 流处理中的基本概念

延迟和吞吐

延迟指处理一个事件所需的时间,吞吐是衡量系统处理能力(处理速率)的指标。

窗口操作

是指创建一些称为 “ 桶 ” 的有限事件集合将流数据划分为有限大小的时间段或数据块,对桶内的数据进行运算的方式。常见的窗口类型有滑动窗口、会话窗口。

处理时间

当前算子的本地时钟时间。

事件时间

数据流中事件实际发生的时间,它以附加在数据流中事件的时间戳为依据。

水位线

全局进度指标,表示我们确信不会再有延迟事件到来的某个时间点。

状态和一致性模型

有状态算子要考虑到以下几点:

  • 状态管理;
  • 状态划分;
  • 状态恢复。
任务故障

对于流中的每个事件,任务都要执行以下步骤:

  • 接收事件并存在本地缓冲区;
  • 选择性地更新内部状态;
  • 产生输出记录;

其中任一步骤都有可能发生故障,需要结果保障处理。

结果保障
  • 至多一次
  • 至少一次
  • 精确一次
  • 端到端的精确一次

第3章 Apache Flink架构

3.1 Flink组件

Flink各组件之间的交互过程
  • 作业管理器(JobManager):负责申请资源,任务分发,任务调度执行,checkpoint的协调执行;可以搭建HA,双master
  • 资源管理器(ResourceManager):负责管理任务管理器的插槽 slot
  • 任务管理器(TaskManager):负责任务的执行,基于dataflow划分出的task;与jobmanager保持心跳,汇报任务状态
  • 分发器(Dispatcher):为应用提交提供了rest接口;当一个应用被提交执行时,分发器就会启动并且将应用移交给一个jobmanager

3.2 应用部署

框架模式
  • Flink 应用打包成 JAR 文件;
  • 通过客户端提交到运行的服务,如:Dispatch、JobManager、YARN;
  • 运行的服务接收 Flink 应用,并确保其执行
库模式
  • Flink 会绑定到一个特定的容器镜像(Docker)中;
  • 镜像中包含着运行 JobManager 以及 ResourceManager 代码;
  • 容器启动后会自动加载 JobManager 和 ResourceManager,并将绑定的作业提交执行;
  • 另一个和作业无关的镜像负责部署 TaskManager;
  • 容器通过镜像启动后会自动运行 TaskManager,TaskManager 向 ResourceManager 注册;
  • 外部资源管理框架好处:负责镜像启动,并在发生故障时候容器能够重启

3.3 任务执行

算子、任务及处理槽
  • 算子并行度是指在并行计算中,一个算子(操作)被分成多个并行任务执行的数量。并行度的大小取决于可用的计算资源、问题的性质以及并行算法的设计。
  • 将任务以切片的形式调度至处理槽中有一个好处: TaskManager 中的多个任务可以在同一进程内高效地执行数据交换而无须访问网络。然而,任务过于集中也会使TaskManager负载变高,继而可能导致性能下降。
  • TaskManager 会在同 JVM 进程内以多线程的方式执行任务。和独立进程相比,线程更加轻量并且通信开销更低,但无法严格地将任务彼此隔离。因此只要有 个任务运行异常,就有可能“杀死” TaskManager 进程,导致它上面运行的所有任务都停止。如果将每个 TaskManager 成只有 个处理槽,则可以限制应用在 TaskManag 级别进行隔离,即每个 TaskManager 只运行单个应用的任务。

3.4 高可用性设置

流式应用通常都会设计成 7 × 24 小时运行,因此对于它很重要的点是:即便内部进程发生故障时也不能终止运行。为了从故障中恢复,系统首先要重启故障进程,随后需要重启应用并恢复其状态。

  • TaskManager 故障
    如果任务管理器发生故障,整个系统的可用处理槽会对应减少。这时作业管理器会向资源管理器申请更多的处理槽,只有申请成功后应用才会重启,因此应用的重启策略决定了任务管理器的故障解决策略。
  • JobManager 故障
    任务管理器用于控制流式应用执行以及保存过程中的元数据,因此任务管理器发生故障将导致流式应用无法继续处理数据。为了解决该问题,Flink提供了高可用模式,支持在原 JobManager 消失的情况下将作业的管理职责及元数据迁移到另一个JobManager。


    Flink高可用模式

    Flink 中的高可用模式是基于 ZooKeeper 来完成的,它在 Flink 中主要用于“领导”选举以及持久且高可用的数据存储。 这种模式下,JobManager 会将 JobGraph 以及全部所需的元数据(例如应用的 JAR 文件)写入远程持久化存储系统中。此外,JobManager 还会将存储位置的路径地址写入 ZooKeeper 的数据存储。
    JobManager 发生故障时,其下应用的所有任务都会自动取消。新接手工作JobManager 会执行以下步骤:
    1. 向 ZooKeeper 请求存储位置,以获取最新检查点在远程存储的状态句柄。
    2. ResourceManager 申请处理槽来继续执行应用
    3. 重启应用并利用最近 次检查点重置任务状态。
    如果是在容器环境(如 Kubernetes )中以库模式部署运行应用,容器编排服务通常会自动重启故障的 JobManager 容器。当运行在 YARN Mesos 上面时, Flink 的其余进程会触发 JobManager 进程重启。Flink 没有针对独立集群模式提供重启故障进程的工具,因此有必要运行一些后备 JobManager 来接管故障进程的工作,对于 TaskManager 也是同样。

3.5 Flink中的数据传输

TaskManager之间的数据传输
  • 在算子处理完数据后,为了不造成太大的网络压力,不会马上发送,会先收集到缓冲区中,以批次形式发送
  • 每个TaskManager都有网络缓冲池(每个缓冲默认 32KB大小),用于不同机器数据传输
  • 如果接收端和发送端位于同一台机器内,序列化先放入缓冲区,缓冲区完毕放到队列中,接收任务获取数据再反序列化
  • 如果发送端和接收端不在同一个机器,放入缓存后,先发送到TaskManager的网络缓冲池中,再进行发送

通过网络连接逐条发送记录不但低效,还会导致很多额外开销。若想充分利用网络连接带宽,就需要对数据进行缓冲。在流处理环境下,缓冲的一个明显缺点是会增加延迟,因为记录首先要收集到缓冲区中而不会立即发送。

基于信用值的流量控制

Flink实现了一个基于信用值的流量控制机制,它的工作原理如下:接收任务会给发送任务授予一定的信用值,其实就是保留一些用来接收它数据的网络缓冲。一旦发送端收到信用通知,就会在信用值所限定的范围 尽可能多传输缓冲数据,并会附带上积压量(已经填满准备传输的网络缓冲数目)大小。接收端使用保留的缓冲来处理收到的数据,同时依据各发送端的积压量信息来计算所有相连的发送端在下一轮的信用优先级。

由于发送端可以在接收端有足够资源时立即传输数据,所以基于信用值量控制有效降低延迟。 此外,信用值的授予是根据各发送端 数据积压量来完成的,因此该机制还能在出现数据倾斜( data skew )时有效分配网络资源。

任务链接
满足任务链接条件的算子流水线

任务链接的前提条件是多个算子必须有相同的并行度且通过本地转发通道相连,Flink 在默认情况下会开启任务链接。

在任务链接模式下,多个算子的函数被“融合”到同个任务中,在同个线程内执行。函数生成的记录只需通过简单方法调用就可以分别发往各自的下游函数,因此函数之间的记录传输基本上不存在序列化及通信开销。

3.6 事件时间处理

在事件时间模式下, Flink 流式应用处理的所有记录都必须包含时间戳和水位线。

一般情况下只要保证流记录的时间戳会随着数据流的前进大致递增即可。

水位线拥有两个基本属性:1、必须单调递增;2、和记录的时间戳存在联系。水位线的意义之一在于它允许应用控制结果的完整性和延迟。

3.7 算子状态

算子状态

算子状态的作用域是某个算子任务,这意味着所有在同一个并行任务之内的记录都能访问到相同的状态。算子状态不能通过其他任务访问,无论该任务是否来自相同算子。
原语有三类:

  • 列表状态
  • 联合列表状态
  • 广播状态
键值分区状态

原语有三类:

  • 单值状态
  • 列表状态
  • 映射状态
状态后端

一个可插拔组件,主要负责两件事:本地状态管理和将状态、以检查点的形式写入远程存储。

3.8 算子扩缩容

键控状态

带有键值分区状态的算子在扩缩容时会根据新的任务数量对键值重新分区。
为了降低状态在不同任务之间迁移的必要成本, Flink 不会对单独的键值实施再分配,而是会把所有键值分为不同的键值组。每个键值组都包含了部分键值, Flink 以此为单位把键值分配给不同任务。

算子状态

列表状态:把所有状态的列表条目收集起来,均匀分配给新的任务
联合列表状态:将状态列表的全部条目广播到全部的任务,由任务决定去留
广播状态:把状态直接拷贝到新的任务上

3.9 Flink检查点算法

基于Chandy-Lamport分布式快照算法来实现。该算法不会暂停整个应用,而是会把任务处理和检查点分离,这样在部分任务持久化状态过程中,其他任务还可以继续执行。
检查点的原理步骤:

  1. 由JobManager向Source数据源任务生成一个新的检查点编号,Source算子接收到信息后,暂停发出记录,利用状态后端触发生成本地状态检查点,状态后端保存完检查点后通知任务,随后任务向JobManager发送确认信息,随后恢复正常工作,然后生成特殊的CheckPoint Barrier记录,以广播的形式发送到下游任务。
  2. 当下游Transform算子接收到新的检查点分割符号,会暂停处理并且缓存当前流的数据,等待接收其他分区的检查点分隔符,所有分隔符到达后,通知状态后端生成检查点,保存通知JobManager后,向下游发送检查点分隔符CheckPoint Barrier后,继续处理数据。
  3. Sink算子接收到分隔符后依次等待分隔符到齐后,生成快照并且写入检查点,向JobManager确认。
  4. 当JobManager确认已接受所有应用任务返回检查点确认消息后,将此次检查点标记为完成。

第4章 设置Apache Flink开发环境

Java语言示例见 https://github.com/streaming-with-flink/examples-java


第5章 DataStream API(1.7版本)

5.1 Hello, Flink!

//针对传感器数据流每5秒计算一次平均温度
public static void main(String[] args) throws Exception {
    
    // 设置执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 在应用中使用事件时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    // 设置生成水位线的时间间隔,系统默认为200毫秒
    env.getConfig().setAutoWatermarkInterval(1000L);

    // 从流式数据源中创建 DataStream<SensorReading>对象
    DataStream<SensorReading> sensorData = env
        // 设置数据源,这些数据流的来惊可以是消息队列或文件,也可以是实时生成的
        .addSource(new SensorSource())
        // 负责分配事件时间所需的时间戳和水位线
        .assignTimestampsAndWatermarks(new SensorTimeAssigner());

    DataStream<SensorReading> avgTemp = sensorData
        // 使用内联 lambda 函数把华氏温度转为摄氏温度
        .map( r -> new SensorReading(r.id, r.timestamp, (r.temperature - 32) * (5.0 / 9.0)))
        // 按照传感器 id 组织数据
        .keyBy(r -> r.id)
        // 将读数按5秒的滚动窗口分组
        .timeWindow(Time.seconds(1))
        // 使用用户自定义函数计算平均温度
        .apply(new TemperatureAverager());

    // 将结果流打印到标准输出
    avgTemp.print();

    /**
     * 以上代码构建执行计划,构建完的计划会被转成 JobGrap 并根据执行环境类型的不同提交至本地或远程的 JobManager,
     * 只有在调用 execute()方法时,系统才会触发程序执行。
     * 如果是提交至远程,除 JobGraph 之外,我们还要同时提供包含应用所需全部类和依赖的 JAR 包。
     */
    env.execute("Compute average sensor temperature");
}

5.2 转换操作

基本转换
  • map
  • filter
  • flatMap
基于KeyedStream的转换
  • keyBy
  • 滚动聚合sum()、min()、max()、minBy()、maxBy()
  • reduce
多流转换
  • union
  • connect、coMap、coFlatMap
  • split、select
分发转换
  • 随机 shuffle
  • 轮流 rebalance
  • 重调 rescale
  • 广播 broadcast
  • 全局 global
  • 自定义 partitionCustom

5.3 富函数

open、close

  • DataStream API 中所有的转换函数都有对应的富函数。
  • 富函数可以在处理第一条数据之前进行初始化操作,获取到一些上下文信息。

第6章 基于时间和窗口的算子

6.1 配置时间特性

在定义分布式流处理应用程序中的时间算子操作之前,我们先了解“时间”的含义,当你指定了一个窗口用于收集每一分钟的bucket中产生的事件时,如何确定每个bucket中具体包含了哪些事件呢?在DataStream API,你可以在创建窗口的时候使用时间特性去告知Flink如何定义时间,时间特性是StreamExecutionEnvironment的一个属性,包括了几种时间类型:

处理时间(Processing Time)

处理时间是指执行相应算子操作的机器的系统时间。当流式程序按处理时间运行时,所有基于时间的操作(如时间窗口)将使用运行相应算子的计算机的系统时钟。每小时处理时间窗口将包括系统时钟指示整小时的时间之间到达特定算子的所有记录。例如,如果应用程序在9:15 am开始运行,则第一个每小时处理时间窗口将包括在9:15 am和10:00 am之间处理的事件,下一个窗口将包括在10:00 am和11:00 am之间处理的事件,以此类推。处理时间是最简单的时间概念,不需要流和机器之间的协调,无需依赖水位线,它提供了最佳的性能和最低的延迟。但是,在分布式和异步环境中,处理时间不能提供结果确定性,因为它容易受到记录到达系统(例如从消息队列写入)的速度以及数据在上下游算子之间的处理速度的影响。

事件时间(Event Time)

Event Time指的是数据流中每个元素或者每个事件自带的时间属性,一般是指事件发生的时间,系统的逻辑时间由水位线去定义。正如我们在“时间戳”章节中了解到的,时间戳要么在进入数据处理管道之前就存在于数据中,要么由源函数生成,在事件时间中,时间的进度取决于数据,而不取决于任何时钟。当水位线声明某个时间间隔内的所有时间戳都已接收到了,事件时间窗口将触发计算。理想情况下,事件时间窗口会产生确定性结果,即使事件发生顺序混乱,窗口结果将不依赖于读取或处理流的速度。

注入时间(Ingest Time)

将源算子操作的处理时间指定为每个接入记录的事件时间戳,并自动生成水位线。它是EventTime和ProcessingTime的混合体。事件的接入时间是它进入流处理器的时间。与事件时间相比,接入时间并没有提供太多的实际价值,因为它不能提供确定的结果,并且具有与事件时间相近的性能。

6.2 分配时间戳和生成水位线

DataStream API提供了TimestampAssigner接口,以便在元素被接入到流应用程序后从元素中提取时间戳。通常,时间戳分配程序是在源函数之后立即调用的,因为大多数分配程序在生成水位线时都对元素的时间戳顺序作了假设性猜想。由于元素通常是并行摄入的,所以任何导致Flink跨并行流分区重新分配元素的操作,都会打乱元素的时间戳顺序,例如并行性更改、KeyBy()或其他引起重新分配的操作。
最好的做法是分配时间戳,并在尽可能靠近源的地方甚至在SourceFunction内生成水位线。根据应用场景,在分配时间戳之前,如果这些操作没有引起元素的重新分配,可以对输入流应用执行过滤或转换操作。
为了确保事件时间操作按预期运行,应该在任何依赖于事件时间的转换之前调用分配器,例如, 在第一个事件时间窗口之前。

周期性水位线分配器

周期性地分配水位线意味着我们指示系统以固定的机器时间间隔检查事件时间的进度,默认时间间隔设置是200毫秒。

//  设置时间间隔为1000毫秒
env.getConfig().setAutoWatermarkInterval(1000L)

实际上,每隔5秒,Flink就会调用AssignerWithPeriodicWatermarks的getCurrentWatermark()方法。如果该方法返回非空值的时间戳大于前一个水位线的时间戳,则生成新的水位线。否则,如果方法返回一个空值,或者返回的水位线的时间戳小于最后发出的水位线的时间戳,则不会生成新水位线。

定点水位线分配器

有时输入流包含特殊的元组或标记,用于指示流的进度。对于这种情况,或者当可以根据输入元素的其他属性定义水位线时,Flink提供了AssignerWithPunctuatedWatermarks接口。它定义了checkAndGetNextWatermark()方法,该方法在extractTimestamp()之后为每个事件调用。该方法可以决定是否生成新的水位线。如果方法返回的非空水位线大于最新发出的水位线,则发出新水位线。

水位线、延迟及完整性问题
  • 现实中没有完美的水位线,需要进行有根据的猜测,假设性设定数据整体之间的延迟,从而在应用程序中生成水位线。需要使用关于源、网络和分区的等因素来估计处理进度和输入记录延迟的上限,也就是对迟到数据的容忍度。估计就意味着有出错的空间,在这种情况下,生成的水位线可能是不准确的,往往会造成不必要数据延迟或应用程序延迟变大。
  • 如果生成松散的水位线(水位线远远落后于处理过的记录的时间戳),则会增加生成结果的延迟,但是可以更大程度上保证了结果完整性。此外,状态的大小通常会增加,因为应用程序需要缓冲更多的数据,直到触发计算为止。在执行计算时,我们基本可以确定所有相关的数据都是可用的。
  • 另一方面,如果你生成了非常紧密的水位线,也就是设置了一个很小的迟到时间,这些水位线可能比一些后续记录的时间戳更大,基于时间的计算可能在所有相关数据到达之前执行,这样做可能会产生不完整或不准确的结果,但是好处是可以降低结果的延迟。
  • 与构建的批处理应用程序不同,在基于所有数据都可用的前提条件下,延迟/结果完整性是权衡流处理应用程序的基本特征,流处理应用程序处理的是接收到的无界数据。水位线是一种功能强大的解决方式,可以根据时间控制应用程序的行为。除了水位线之外,Flink还有许多特性来调整基于时间的操作的确切行为,如process函数和窗口触发器,并提供了处理迟到数据的不同方法。

6.3 处理函数

DataStream API提供了一系列底层转换,即process 函数,这些函数可以访问记录的时间戳和水位线,并注册将来某个特定时间触发的定时器。此外,process函数还支持将记录发送到多个输出流。process函数通常用于构建事件驱动的应用程序,并实现可能不适用于预定义窗口和转换的自定义逻辑。例如,Flink的SQL支持的大多数算子都是基于process函数实现的。

目前,Flink提供八种不同的process函数:ProcessFunction、KeyedProcessFunction、CoProcessFunction、ProcessJoinFunction、BroadcastProcessFunction、KeyedBroadcastProcessFunction、ProcessWindowFunction和ProcessAllWindowFunction。

时间服务和计时器

KeyedProcessFunction是一个非常通用的函数,可以应用于KeyedStream。对流的每个记录调用该函数,并返回零条、一条或多条记录。所有process函数都实现RichFunction接口,提供open()、close()和getRuntimeContext()方法。另外,KeyedProcessFunction还提供了以下两个方法:processElement()、onTimer()

当定时器触发时,将调用onTimer()回调函数。processElement()和onTimer()方法是同步的,以防止对状态的并发访问和操作。

每个key可以有多个定时器(Timers),但每个时间戳只能有一个定时器。默认情况下,KeyedProcessFunction会保留heap上的优先级队列中的所有定时器(Timers)的时间戳。但是,你可以配置RocksDB状态后端来存储定时器(Timer)。定时器与函数的任何其他状态一起被存入检查点。如果应用程序需要从故障中恢复,则在应用程序重新启动时过期的所有处理时间定时器将在应用程序恢复时立即触发。

6.4 窗口算子

定义窗口算子

窗口算子可以应用于key类型流或none-key类型流。key类型窗口上的窗口算子是并行计算的,而非key类型窗口是在单线程处理的。

要创建窗口算子,你需要指定两个窗口组件:

  • 确定输入流的元素如何分组到窗口中的窗口分配器,窗口分配器生成一个WindowedStream(非key类型数据流生成AllWindowedStream)。
  • 应用于WindowedStream(或AllWindowedStream)上,并处理分配给窗口的元素的窗口函数。
内置窗口分配器

Flink的内置窗口分配器创建类型为TimeWindow的窗口。此窗口类型实质上表示两个时间戳之间的时间间隔,左闭右开。此类型窗口包括定义窗口边界、检查窗口是否相交以及合并重叠窗口的方法。常见的内置窗口分配器有以下几种:


滚动窗口分配器将元素放入大小固定且互不重叠的窗口中

渭动窗口分配器将元素放入大小固定且可能重叠的窗口中

会话窗口分配器将元素放入大小不同的活动的非重叠窗口中
在窗口上应用函数

窗口函数定义了对窗口中的数据元素执行的计算逻辑。有两种类型的函数可以用于窗口函数:

  • 增量聚合函数(Incremental aggregation functions):在元素被添加到窗口并保持和更新单个值为窗口状态时直接应用增量聚合函数。这些函数通常非常节省空间,并最终产生聚合值作为结果。ReduceFunction和AggregateFunction都是增量聚合函数。
  • 全量窗口函数(Full window functions):收集窗口的所有元素,并在对所有收集的元素求值时遍历元素列表。全量窗口函数通常需要更多的内存,可以完成比增量聚合函数更复杂的逻辑。ProcessWindowFunction是一个全量窗口函数。
自定义函数
配置了增量聚合及全量窗口函数的窗口算子
触发器

触发器定义何时计算窗口并输出窗口的结果。触发器可以根据特定于时间或特定数据条件(如元素计数或某些接收到的元素值)中的处理情况决定是否触发。例如,当处理时间或水位线超过窗口结束边界的时间戳时,将触发前面讨论的时间窗口的默认触发器。

每次调用触发器时,它都会生成一个TriggerResult来确定应该对窗口执行什么操作。TriggerResult可以取以下值之一:

  • CONTINUE(跳过)
  • FIRE(触发)
  • PURGE(清除)
  • FIRE_AND_PURGE(触发并清除):首先计算窗口(FIRE),然后删除所有状态和元数据(PURGE)。
移除器

在Flink的窗口机制中,移除器是一个可选组件。它可以在窗口函数执行之前或之后从窗口中删除元素。

在将窗口函数应用于窗口内容之前和之后分别调用evictBefore()和evictAfter()方法。这两个方法都有一个Iterable参数(服务于添加到窗口的所有元素)、窗口中的元素数量(大小)参数、窗口对象和一个EvictorContext参数。通过调用可从Iterable获得的Iterator对象上的remove()方法,可以从窗口中删除元素。

6.5 基于时间的双流 Join

基于间隔的Join
基于间隔的关联(Interval Join)

interval join连接来自两个具有公共key的流的事件,这两个流之间的时间戳间隔不超过指定的时间间隔。

 input1
.keyBy(…)
.between(<lower-bound>, <upper-bound>) // 下界和上界定义为负的和正的时间间隔,例如,between(Time.hour(-1), Time.minute(15)).
.process(ProcessJoinFunction) // process pairs of matched events,join事件双方都被传递到ProcessJoinFunction中
基于窗口的Join

两个输入流的元素都被分配到公共窗口,并在窗口完成时联接(或分组)。

6.6 处理迟到数据

DataStream API提供了处理延迟事件的不同选项:

  • 延迟事件可以简单地删除。
  • 延迟事件可以重定向到单独的流。
  • 计算结果可以根据延迟事件进行更新,并且必须输出更新。
重定向迟到事件

延迟事件还可以使用侧输出流特性重定向到另一个DataStream,可以使用常规的接收函数处理或发出延迟事件。根据业务需求,后期数据稍后可以通过定期的回填过程集成到流应用程序的结果中。

基于迟到事件更新结果

延迟事件在它们应该完成的计算之后到达算子。因此,算子输出的结果是不完整或不准确的。另一种策略是重新计算不完整的结果并输出更新,而不是删除或重定向延迟事件。但是,为了能够重新计算和更新结果,需要考虑一些问题。

  • 支持重新计算和更新已输出结果的算子需要在发出第一个结果后保留计算所需的所有状态。但是,由于算子通常不可能永远保留所有状态,所以需要在某个时候清除状态。一旦清除了某个结果的状态,就不能再更新该结果,只能删除或重定向延迟事件。

  • 除了保持状态外,下游算子或跟随算子的外部系统还需要能够处理这些更新。例如,将结果和key值窗口算子的更新写入key值存储的接收器算子可以通过使用upsert写操作用最新更新结果覆盖不准确的结果来实现这一点。对于某些用例,可能还需要区分第一个结果和由于延迟事件而导致的更新。

窗口算子API提供了一个方法来显式声明你期望的迟到元素。在使用事件时间窗口时,可以指定允许迟到的时间。允许迟到的窗口算子不会在水位线通过窗口的结束时间戳后删除窗口及其状态。相反,算子将继续维护包括迟到时间段内的完整窗口。当一个迟到元素在允许的迟到周期内到达时,它就像一个正常到达的元素一样被处理并传递给触发器。当水位线通过窗口的结束时间戳和延迟间隔时,窗口最终被删除,随后的所有迟到元素被丢弃。


第7章 有状态算子和应用

有状态算子及用户函数都是流应用中常见组成部分。事实上,由于数据会随着时间以流式到来,大多数复杂一些的操作都需要存储部分数据或者中间结果,很多Flink内置的DataStream算子、数据源以及数据汇都是有状态的,它们需要对数据记录进行缓冲或者对中间结果以及元数据加以维护。

7.1 实现有状态函数

键值分区状态只能由作用在KeyedStream上面的函数使用,这个可以通过DataStream.keyBy()方法来得到一个KeyedStream。KeyedStream会根据指定键值进行分区并记住键值的定义,作用在KeyedStream上的算子可以访问它的键值定义上下文信息。Flink为键值分区状态提供了多种数据类型,每个类型对应了一种状态结构,用户可以根据自定义函数与状态的交互方式或性能选择不同的状态类型:

  • ValueState[T]:用于保存类型为T的单个值。可用方法有value()、update();
  • ListState[T]:用list结构保存多个类型为T的元素,常用方法get()、add()、addAll()、update()等,但是它不支持删除单个元素,我们可以使用update()方法更新整个列表,使用给定的列表值替换已有值;
  • MapState[K,V]:用于保存一组键到值的映射;
  • ReducingState[T]:提供了和ListState[T]相同的方法(除了addAll、update),但是你需要传递一个聚合函数ReduceFunction用来对存入的数据进行聚合;
  • AggregatingState[I,O]:和ReducingState[T]行为类似,但它使用了更加通用的AggregatingFunction来聚合状态内部的值。

总结

  • 当我们创建一个状态对象时,我们需要利用RichFunction中的RuntimeContext在Flink运行时中注册一个StateDescriptor;
  • 每个状态类型都有自己特定的StateDescriptor,入参中要写入状态名称与类型,ReducingState和AggregatingState的描述符还需要接收一个ReducingFunction或AggregatingFunction对象,以此来对加入的值进行聚合;
  • 状态名称的作用域是整个算子,我们可以通过在函数中注册多个状态描述符来创建多个状态对象;
  • 状态类型可以通过Class或TypeInformation对象指定,因为Flink要为状态创建合适的序列化器,所有类型指定是强制的。
  • 通常情况下,状态引用对象要在RichFunction的open()方法中初始化;
  • 我们一般会将状态引用对象声明为函数类的普通成员变量;
  • 对于函数类得外部传入的参数,我们也是以普通成员变量的方式通过构造函数传入;
  • 状态引用对象只提供用于访问状态的接口而不会存储状态本身,具体保存工作交由状态后端完成。

7.2 为有状态的应用开启故障恢复

启动周期性检查点功能

7.3 确保有状态应用的可维护性

flink利用保存点机制来对应用及其状态进行维护,但是需要初始版本应用的全部有状态算子都指定好两个参数,才可以在未来正常使用,这两个参数是算子唯一标识和最大并行度
算子的唯一标识和最大并行度会被固定在保存点上,不可更改.一旦修改只能丢弃从头开始运行

指定算子唯一标识,uid方法

为使用键值分区状态的算子定义最大并行度

7.4 有状态应用的性能及鲁棒性

选择状态后端
  1. MemoryStateBackend

MemoryStateBackend 是将状态维护在 Java 堆上的一个内部状态后端。键值状态和窗口算子使用哈希表来存储数据(values)和定时器(timers)。当应用程序 checkpoint 时,此后端会在将状态发给 JobManager 之前快照下状态,JobManager 也将状态存储在 Java 堆上。默认情况下,MemoryStateBackend 配置成支持异步快照。异步快照可以避免阻塞数据流的处理,从而避免反压的发生。当然,使用 new MemoryStateBackend(MAX_MEM_STATE_SIZE, false)也可以禁用该特点

默认情况下,每一个状态的大小限制为 5 MB。可以通过 MemoryStateBackend 的构造函数增加这个大小。状态大小受到 akka 帧大小的限制(maxStateSize <= akka.framesize 默认 10 M),所以无论怎么调整状态大小配置,都不能大于 akka 的帧大小。也可以通过 akka.framesize 调整 akka 帧大小。
状态的总大小不能超过 JobManager 的内存。

  1. FsStateBackend

FsStateBackend需要配置的主要是文件系统,如 URL(类型,地址,路径)。

当选择使用 FsStateBackend时,正在进行的数据会被存在TaskManager的内存中。在checkpoint时,此后端会将状态快照写入配置的文件系统和目录的文件中,同时会在JobManager的内存中(在高可用场景下会存在 Zookeeper 中)存储极少的元数据。容量限制上,单 TaskManager 上 State 总量不超过它的内存,总大小不超过配置的文件系统容量。

默认情况下,FsStateBackend 配置成提供异步快照,以避免在状态 checkpoint 时阻塞数据流的处理。该特性可以实例化 FsStateBackend 时传入false的布尔标志来禁用掉,例如:new FsStateBackend(path, false)

  1. RocksDBStateBackend

RocksDBStateBackend 的配置也需要一个文件系统(类型,地址,路径)。

RocksDB 是一种嵌入式的本地数据库。RocksDBStateBackend 将处理中的数据使用 RocksDB 存储在本地磁盘上。在 checkpoint 时,整个 RocksDB 数据库会被存储到配置的文件系统中,或者在超大状态作业时可以将增量的数据存储到配置的文件系统中。同时 Flink 会将极少的元数据存储在 JobManager 的内存中,或者在 Zookeeper 中(对于高可用的情况)。RocksDB 默认也是配置成异步快照的模式。

RocksDB是一个 key/value 的内存存储系统,和其他的 key/value 一样,先将状态放到内存中,如果内存快满时,则写入到磁盘中,但需要注意RocksDB不支持同步的 Checkpoint,构造方法中没有同步快照这个选项。不过RocksDB支持增量的 Checkpoint,也是目前唯一增量 Checkpoint 的 Backend,意味着并不需要把所有 sst 文件上传到 Checkpoint 目录,仅需要上传新生成的 sst 文件即可。它的 Checkpoint 存储在外部文件系统(本地或HDFS),其容量限制只要单个 TaskManager 上 State 总量不超过它的内存+磁盘,单Key最大2G,总大小不超过配置的文件系统容量即可。

7.5 更新有状态应用


第8章 读写外部系统

8.1 应用的一致性保证

Flink的检查点和恢复机制定期的会保存应用程序状态的一致性检查点。在故障的情况下,应用程序的状态将会从最近一次完成的检查点恢复,并继续处理。尽管如此,可以使用检查点来重置应用程序的状态无法完全达到令人满意的一致性保证。相反,source和sink的连接器需要和Flink的检查点和恢复机制进行集成才能提供有意义的一致性保证。

为了给应用程序提供恰好处理一次语义的状态一致性保证,应用程序的source连接器需要能够将source的读位置重置到之前保存的检查点位置。当处理一次检查点时,source操作符将会把source的读位置持久化,并在恢复的时候从这些读位置开始重新读取。支持读位置的检查点的source连接器一般来说是基于文件的存储系统,如:文件流或者Kafka source(检查点会持久化某个正在消费的topic的读偏移量)。如果一个应用程序从一个无法存储和重置读位置的source连接器摄入数据,那么当任务出现故障的时候,数据就会丢失。也就是说我们只能提供at-most-once)的一致性保证。

Fink的检查点和恢复机制和可以重置读位置的source连接器结合使用,可以保证应用程序不会丢失任何数据。尽管如此,应用程序可能会发出两次计算结果,因为从上一次检查点恢复的应用程序所计算的结果将会被重新发送一次(一些结果已经发送出去了,这时任务故障,然后从上一次检查点恢复,这些结果将被重新计算一次然后发送出去)。所以,可重置读位置的source和Flink的恢复机制不足以提供端到端的恰好处理一次语义,即使应用程序的状态是恰好处理一次一致性级别。

一个志在提供端到端恰好处理一次语义一致性的应用程序需要特殊的sink连接器。sink连接器可以在不同的情况下使用两种技术来达到恰好处理一次一致性语义:幂等性写入和事务性写入。

幂等性写入

一个幂等操作无论执行多少次都会返回同样的结果。例如,重复的向hashmap中插入同样的key-value对就是幂等操作,因为头一次插入操作之后所有的插入操作都不会改变这个hashmap,因为hashmap已经包含这个key-value对了。另一方面,append操作就不是幂等操作了,因为多次append同一个元素将会导致列表每次都会添加一个元素。在流处理程序中,幂等写入操作是很有意思的,因为幂等写入操作可以执行多次但不改变结果。所以它们可以在某种程度上缓和Flink检查点机制带来的重播计算结果的效应。

需要注意的是,依赖于幂等性sink来达到exactly-once语义的应用程序,必须保证在从检查点恢复以后,它将会覆盖之前已经写入的结果。例如,一个包含有sink操作的应用在sink到一个key-value存储时必须保证它能够确定的计算出将要更新的key值。同时,从Flink程序sink到的key-value存储中读取数据的应用,在Flink从检查点恢复的过程中,可能会看到不想看到的结果。当重播开始时,之前已经发出的计算结果可能会被更早的结果所覆盖(因为在恢复过程中)。所以,一个消费Flink程序输出数据的应用,可能会观察到时间回退,例如读到了比之前小的计数。也就是说,当流处理程序处于恢复过程中时,流处理程序的结果将处于不稳定的状态,因为一些结果被覆盖掉,而另一些结果还没有被覆盖。一旦重播完成,也就是说应用程序已经通过了之前出故障的点,结果将会继续保持一致性。

事务性写入

第二种实现端到端的恰好处理一次一致性语义的方法基于事务性写入。其思想是只将最近一次成功保存的检查点之前的计算结果写入到外部系统中去。这样就保证了在任务故障的情况下,端到端恰好处理一次语义。应用将被重置到最近一次的检查点,而在这个检查点之后并没有向外部系统发出任何计算结果。通过只有当检查点保存完成以后再写入数据这种方法,事务性的方法将不会遭受幂等性写入所遭受的重播不一致的问题。尽管如此,事务性写入却带来了延迟,因为只有在检查点完成以后,我们才能看到计算结果。

Flink提供了两种构建模块来实现事务性sink连接器:write-ahead-log(WAL,预写式日志)sink和两阶段提交sink。WAL式sink将会把所有计算结果写入到应用程序的状态中,等接到检查点完成的通知,才会将计算结果发送到sink系统。因为sink操作会把数据都缓存在状态后段,所以WAL可以使用在任何外部sink系统上。尽管如此,WAL还是无法提供刀枪不入的恰好处理一次语义的保证,再加上由于要缓存数据带来的状态后段的状态大小的问题,WAL模型并不十分完美。

与之形成对比的,2PC sink需要sink系统提供事务的支持或者可以模拟出事务特性的模块。对于每一个检查点,sink开始一个事务,然后将所有的接收到的数据都添加到事务中,并将这些数据写入到sink系统,但并没有提交(commit)它们。当事务接收到检查点完成的通知时,事务将被commit,数据将被真正的写入sink系统。这项机制主要依赖于一次sink可以在检查点完成之前开始事务,并在应用程序从一次故障中恢复以后再commit的能力。

2PC协议依赖于Flink的检查点机制。检查点屏障是开始一个新的事务的通知,所有操作符自己的检查点成功的通知是它们可以commit的投票,而作业管理器通知一个检查点成功的消息是commit事务的指令。于WAL sink形成对比的是,2PC sinks依赖于sink系统和sink本身的实现可以实现恰好处理一次语义。更多的,2PC sink不断的将数据写入到sink系统中,而WAL写模型就会有之前所述的问题。

不可重置的源 可重置的源
any sink at-most-once at-least-once
幂等性sink at-most-once exactly-once(当从任务失败中恢复时,存在暂时的不一致性)
预写式日志sink at-most-once at-least-once
2PC sink at-most-once exactly-once

第9章 搭建Flink运行流式应用


第10章 Flink和流式应用运维


第11章 还有什么

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,525评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,203评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,862评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,728评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,743评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,590评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,330评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,244评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,693评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,885评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,001评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,723评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,343评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,919评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,042评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,191评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,955评论 2 355

推荐阅读更多精彩内容