1.同时支持流处理和批处理的计算引擎,只有两种选择:一个 是 Apache Spark,一个是 Apache Flink。Spark 的技术理念是基于批来模拟流的计算,Flink基于流计算来模拟批计算
2.Flink 是一个低延迟、高吞吐、统一的大数据计算引擎,Flink 的计算 平台可以实现毫秒级的延迟情况下,每秒钟处理上亿次的消息或者事件。同时 Flink 提供了一个 Exactly-once 的一致性语义。保证了数据的正确性
3.Flink提供了有状态的计算,支持状态管理,支持强一致性的数据语义,以及支持 Event Time,WaterMark 对消息乱序的处理
4.状态管理
经常要对数据进行统计, 如 Sum、Count、Min、Max,这些值是需要存储的。因为要不断更新,这些值或者变量就可以理 解为一种状态。Flink 提供了内置的状态管理,可以把这些状态存储在 Flink 内部,而不需要把它存储在外部系 统。link 会定期将这些状 态做 Checkpoint 持久化,把 Checkpoint 存储到一个分布式的持久化系统中,比如 HDFS。这样 的话,当 Flink 的任务出现任何故障时,它都会从最近的一次 Checkpoint 将整个流的状态进行 恢复,然后继续运行它的流处理
- 4.1 Flink 是如何做到在 Checkpoint 恢复过程中没有任何数据的丢失和数据的冗余?来保证精准计 算的?
Flink 利用了一套非常经典的 Chandy-Lamport 算法,它的核心思想是把这个流计 算看成一个流式的拓扑,定期从这个拓扑的头部 Source 点开始插入特殊的 Barriers,从上游开 始不断的向下游广播这个 Barriers。每一个节点收到所有的 Barriers,会将 State 做一次 Snapshot, 当每个节点都做完 Snapshot 之后,整个拓扑就算完整的做完了一次 Checkpoint。接下来不管出 现任何故障,都会从最近的 Checkpoint 进行恢复。Flink 利用这套经典的算法,保证了强一致性的语义
image.png
- 4.2 Flink 是如何解决乱序问题的?
Flink 提供了 Event Time 和 WaterMark 的一些先进技术 来解决乱序的问题。使得用户可以有序的处理这个消息
image.png
- 5.Flink在滴滴的应用
1、 轨迹数据:轨迹数据和订单数据往往是业务方特别关心的。同时因为每一个用户在打车以 后,都必须要实时的看到自己的轨迹,所以这些数据有强烈的实时需求。
2、 交易数据:滴滴的交易数据
3、 埋点数据:滴滴各个业务方的埋点数据,包括终端以及后端的所有业务数据,
4、 日志数据:整个的日志系统都有一些特别强烈的实时需求
image.png
-
6.Flink 与 Storm 两个框架对比:
image.png
- 7.IDEA快速搭建Flink
image.png
Flink程序的常规步骤:
1.获取一个执行环境(execution environment)
2.加载/创建初始数据;
3.指定数据相关的转换;
4.指定计算结果的存储位置;
5.触发程序执行
- Flink UML关系图
image.png
- Flink实操
基于两个集群进行的: Flink Session Cluster 以及一个 Kafka 集群。
一个 Flink 集群总是包含一个 JobManager 以及一个或多个 Flink TaskManager。JobManager 负责处理 Job 提交、 Job 监控以及资源管理。Flink TaskManager 运行 worker 进程, 负责实际任务 Tasks 的执行,而这些任务共同组成了一个 Flink Job。 在这篇文章中, 我们会先运行一个 TaskManager,接下来会扩容到多个 TaskManager。 另外,这里我们会专门使用一个 client 容器来提交 Flink Job, 后续还会使用该容器执行一些操作任务。需要注意的是,Flink 集群的运行并不需要依赖 client 容器, 我们这里引入只是为了使用方便。
这里的 Kafka 集群由一个 Zookeeper 服务端和一个 Kafka Broker 组成
image.png
一开始,我们会往 JobManager 提交一个名为 Flink 事件计数 的 Job,此外,我们还创建了两个 Kafka Topic:input 和 output
image.png
该 Job 负责从 input topic 消费点击事件ClickEvent
,每个点击事件都包含一个timestamp
和一个page
属性。 这些事件将按照page
属性进行分组,然后按照每 15s 窗口 windows 进行统计, 最终结果输出到 output topic 中。
总共有 6 种不同的 page 属性,针对特定 page,我们会按照每 15s 产生 1000 个点击事件的速率生成数据。 因此,针对特定 page,该 Flink job 应该能在每个窗口中输出 1000 个该 page 的点击数据
- Flink环境搭建
1>需要在自己的主机上提前安装好 docker (1.12+) 和 docker-compose (2.1+)
2>使用的配置文件位于 flink-playgrounds 仓库中, 首先检出该仓库并构建 docker 镜像:git clone https://github.com/apache/flink-playgrounds.git cd flink-playgrounds/operations-playground docker-compose build
接下来在开始运行之前先在 Docker 主机上创建检查点和保存点目录(这些卷由 jobmanager 和 taskmanager 挂载,如 docker-compose.yaml 中所指定的):
mkdir -p /Users/wudy.yu/flink/tmp/flink-checkpoints-directory/ mkdir -p /Users/wudy.yu/flink/tmp/flink-savepoints-directory
然后启动环境(
docker-compose up -d
):
image.png
接下来你可以执行如下命令来查看正在运行中的 Docker 容器:wudy.yu@wudyyudeMacBook-Pro operations-playground % docker-compose >ps Name Command State >Ports ------------------------------------------------------------------------------------------------------->----------------------- operations-playground_clickevent- /docker-entrypoint.sh java ... Up >6123/tcp, 8081/tcp generator_1 operations-playground_client_1 /docker-entrypoint.sh flin ... Up >6123/tcp, 8081/tcp operations-playground_jobmanager_1 /docker-entrypoint.sh jobm ... >Up 6123/tcp, 0.0.0.0:8081->8081/tcp operations-playground_kafka_1 start-kafka.sh Up >0.0.0.0:9092->9092/tcp, 0.0.0.0:9094->9094/tcp operations-playground_taskmanager_1 /docker-entrypoint.sh task ... >Up 6123/tcp, 8081/tcp operations-playground_zookeeper_1 /bin/sh -c /usr/sbin/sshd ... Up >2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
从上面的信息可以看出 client 容器已成功提交了 Flink Job (Exit 0), 同时包含数据生成器在内的所有集群组件都处于运行中状态 (Up)。
你可以执行如下命令停止 docker 环境:
docker-compose down -v
-
Flink WebUI 界面 #
观察Flink集群首先想到的就是 Flink WebUI 界面:打开浏览器并访问 http://localhost:8081,如果一切正常,你将会在界面上看到一个 TaskManager 和一个处于 “RUNNING” 状态的名为 Click Event Count 的 Job
image.png
-
日志 #
- JobManager
JobManager 日志可以通过 docker-compose 命令进行查看:
docker-compose logs -f jobmanager
JobManager 刚启动完成之时,你会看到很多关于 checkpoint completion (检查点完成)的日志
- TaskManager
TaskManager 日志也可以通过同样的方式进行查看:
docker-compose logs -f taskmanager
TaskManager 刚启动完成之时,你同样会看到很多关于 checkpoint completion (检查点完成)的日志
Flink CLI #
Flink CLI 相关命令可以在 client 容器内进行使用。 比如,想查看 Flink CLI 的
help
命令,可以通过如下方式进行查看:
docker-compose run --no-deps client flink --help
Flink REST API #
Flink REST API 可以通过本机的
localhost:8081
进行访问,也可以在 client 容器中通过jobmanager:8081
进行访问。 比如,通过如下命令可以获取所有正在运行中的 Job:
curl localhost:8081/jobs
wudy.yu@wudyyudeMacBook-Pro ~ % curl localhost:8081/jobs {"jobs": [{"id":"23023e23c9b6539c18bad615e10419dd","status":"RUNNING"}]}%
Kafka Topics #
可以运行如下命令查看 Kafka Topics 中的记录:
//input topic (1000 records/s) docker-compose exec kafka kafka-console-consumer.sh \ --bootstrap-server localhost:9092 --topic input //output topic (24 records/min) docker-compose exec kafka kafka-console-consumer.sh \ --bootstrap-server localhost:9092 --topic output
核心特性探索 #
CLI命令:
docker-compose run --no-deps client flink list
Creating operations-playground_client_run ... done Waiting for response... ------------------ Running/Restarting Jobs ------------------- 21.05.2023 13:04:01 : 23023e23c9b6539c18bad615e10419dd : Click Event >Count (RUNNING) -------------------------------------------------------------- No scheduled jobs.
一旦 Job 提交,Flink 会默认为其生成一个 JobID,后续对该 Job 的 所有操作(无论是通过 CLI 还是 REST API)都需要带上 JobID
Job 失败与恢复 #
在 Job (部分)失败的情况下,Flink 对事件处理依然能够提供精确一次的保障, 在本节中你将会观察到并能够在某种程度上验证这种行为
Step 1: 观察输出 #
如前文所述,事件以特定速率生成,刚好使得每个统计窗口都包含确切的 1000 条记录。 因此,你可以实时查看 output topic 的输出,确定失败恢复后所有的窗口依然输出正确的统计数字, 以此来验证 Flink 在 TaskManager 失败时能够成功恢复,而且不丢失数据、不产生数据重复。
为此,通过控制台命令消费 output topic,保持消费直到 Job 从失败中恢复 :--bootstrap-server localhost:9092 --topic output```
Step 2: 模拟失败 #
为了模拟部分失败故障,你可以 kill 掉一个 TaskManager,这种失败行为在生产环境中就相当于 TaskManager 进程挂掉、TaskManager 机器宕机或者从框架或用户代码中抛出的一个临时异常(例如,由于外部资源暂时不可用)而导致的失败。
docker-compose kill taskmanager
几秒钟后,JobManager 就会感知到 TaskManager 已失联,接下来它会 取消 Job 运行并且立即重新提交该 Job 以进行恢复。 当 Job 重启后,所有的任务都会处于 SCHEDULED 状态,如以下截图中紫色方格所示
image.png
注意:虽然 Job 的所有任务都处于 SCHEDULED 状态,但整个 Job 的状态却显示为 RUNNING
此时,由于 TaskManager 提供的 TaskSlots 资源不够用,Job 的所有任务都不能成功转为 RUNNING 状态,直到有新的 TaskManager 可用。在此之前,该 Job 将经历一个取消和重新提交 不断循环的过程。
与此同时,数据生成器 (data generator) 一直不断地往 input topic 中生成 ClickEvent 事件,在生产环境中也经常出现这种 Job 挂掉但源头还在不断产生数据的情况
Step 3: 失败恢复 #
一旦 TaskManager 重启成功,它将会重新连接到 JobManager。
docker-compose up -d taskmanager
当 TaskManager 注册成功后,JobManager 就会将处于SCHEDULED
状态的所有任务调度到该 TaskManager 的可用 TaskSlots 中运行,此时所有的任务将会从失败前最近一次成功的 checkpoint 进行恢复, 一旦恢复成功,它们的状态将转变为RUNNING
。
接下来该 Job 将快速处理 Kafka input 事件的全部积压(在 Job 中断期间累积的数据), 并以更快的速度(>24 条记录/分钟)产生输出,直到它追上 kafka 的 lag 延迟为止。 此时观察 output topic 输出, 你会看到在每一个时间窗口中都有按page
进行分组的记录,而且计数刚好是 1000。 由于我们使用的是 FlinkKafkaProducer “至少一次"模式,因此你可能会看到一些记录重复输出多次。
注意:在大部分生产环境中都需要一个资源管理器 (Kubernetes、Yarn)对 失败的 Job 进行自动重启
Job 升级与扩容 #
升级 Flink 作业一般都需要两步:第一,使用 Savepoint 优雅地停止 Flink Job。 Savepoint 是整个应用程序状态的一次快照(类似于 checkpoint ),该快照是在一个明确定义的、全局一致的时间点生成的。第二,从 Savepoint 恢复启动待升级的 Flink Job。 在此,“升级”包含如下几种含义:
- 配置升级(比如 Job 并行度修改)
- Job 拓扑升级(比如添加或者删除算子)
- Job 的用户自定义函数升级
在开始升级之前,你可能需要实时查看 Output topic 输出, 以便观察在升级过程中没有数据丢失或损坏--bootstrap-server localhost:9092 --topic output
Step 1: 停止 Job #
要优雅停止 Job,需要使用 JobID 通过 CLI 或 REST API 调用 “stop” 命令。 JobID 可以通过获取所有运行中的 Job 接口或 Flink WebUI 界面获取,拿到 JobID 后就可以继续停止作业了:
CLI命令:
docker-compose run --no-deps client flink stop <job-id>
预期输出Suspending job "<job-id>" with a savepoint. Suspended job "<job-id>" with a savepoint.
Savepoint 已保存在 state.savepoints.dir 指定的路径中,该配置在 flink-conf.yaml 中定义,flink-conf.yaml 挂载在本机的
/Users/wudy.yu/flink/tmp/flink-savepoints-directory/
目录下。 在下一步操作中我们会用到这个 Savepoint 路径,如果我们是通过 REST API 操作的, 那么 Savepoint 路径会随着响应结果一起返回,我们可以直接查看文件系统来确认 Savepoint 保存情况
ls -lia /tmp/flink-savepoints-directory
total 0 13957451 drwxr-xr-x 2 wudy.yu staff 64 5 20 23:04 . 13957440 drwxr-xr-x 6 wudy.yu staff 192 5 20 23:10 ..
Step 2a: 重启 Job (不作任何变更) #
现在你可以从这个 Savepoint 重新启动待升级的 Job,为了简单起见,不对该 Job 作任何变更就直接重启
CLI命令:docker-compose run --no-deps client flink run -s <savepoint-path> \ -d /opt/ClickCountJob.jar \ --bootstrap.servers kafka:9092 --checkpointing --event-time
预期输出
Starting execution of program Job has been submitted with JobID <job-id>
一旦该 Job 再次处于 RUNNING 状态,你将从 output Topic 中看到数据在快速输出, 因为刚启动的 Job 正在处理停止期间积压的大量数据。另外,你还会看到在升级期间 没有产生任何数据丢失:所有窗口都在输出 1000
Step 2b: 重启 Job (修改并行度) #
在从 Savepoint 重启 Job 之前,你还可以通过修改并行度来达到扩容 Job 的目的
docker-compose run --no-deps client flink run -p 3 -s <savepoint-path> \ -d /opt/ClickCountJob.jar \ --bootstrap.servers kafka:9092 --checkpointing --event-time
预期输出:
Starting execution of program Job has been submitted with JobID <job-id>
现在 Job 已重新提交,但由于我们提高了并行度所以导致 TaskSlots 不够用(1 个 TaskSlot 可用,总共需要 3 个),最终 Job 会重启失败。通过如下命令
docker-compose scale taskmanager=2
你可以向 Flink 集群添加第二个 TaskManager(为 Flink 集群提供 2 个 TaskSlots 资源), 它会自动向 JobManager 注册,TaskManager 注册完成后,Job 会再次处于 “RUNNING” 状态。
一旦 Job 再次运行起来,从 output Topic 的输出中你会看到在扩容期间数据依然没有丢失: 所有窗口的计数都正好是 1000
查询 Job 指标 #
可以通过 JobManager 提供的 REST API 来获取系统和用户指标
具体请求方式取决于我们想查询哪类指标,Job 相关的指标分类可通过jobs/<job-id>/metrics
获得,而要想查询某类指标的具体值则可以在请求地址后跟上get
参数
curl "localhost:8081/jobs/<jod-id>/metrics?get=lastCheckpointSize"
wudy.yu@wudyyudeMacBook-Pro ~ % curl >"localhost:8081/jobs/23023e23c9b6539c18bad615e10419dd/metrics?>get=lastCheckpointSize" [{"id":"lastCheckpointSize","value":"5411"}]%
REST API 不仅可以用于查询指标,还可以用于获取正在运行中的 Job 详细信息
curl localhost:8081/jobs/23023e23c9b6539c18bad615e10419dd
{
"jid": "23023e23c9b6539c18bad615e10419dd",
"name": "Click Event Count",
"isStoppable": false,
"state": "RUNNING",
"start-time": 1684674241279,
"end-time": -1,
"duration": 3787413,
"maxParallelism": -1,
"now": 1684678028692,
"timestamps": {
"INITIALIZING": 1684674241279,
"FINISHED": 0,
"FAILING": 0,
"CANCELLING": 0,
"RECONCILING": 0,
"RESTARTING": 1684676632478,
"RUNNING": 1684676633487,
"FAILED": 0,
"CREATED": 1684674243053,
"CANCELED": 0,
"SUSPENDED": 0
},
"vertices": [{
"id": "bc764cd8ddf7a0cff126f51c16239658",
"name": "Source: ClickEvent Source",
"maxParallelism": 128,
"parallelism": 1,
"status": "RUNNING",
"start-time": 1684676634197,
"end-time": -1,
"duration": 1394495,
"tasks": {
"RUNNING": 1,
"CANCELED": 0,
"FAILED": 0,
"DEPLOYING": 0,
"SCHEDULED": 0,
"CANCELING": 0,
"INITIALIZING": 0,
"CREATED": 0,
"FINISHED": 0,
"RECONCILING": 0
},
"metrics": {
"read-bytes": 0,
"read-bytes-complete": true,
"write-bytes": 0,
"write-bytes-complete": true,
"read-records": 0,
"read-records-complete": true,
"write-records": 0,
"write-records-complete": true,
"accumulated-backpressured-time": 0,
"accumulated-idle-time": 1313806,
"accumulated-busy-time": 0.0
}
}, {
"id": "0a448493b4782967b150582570326227",
"name": "ClickEvent Counter",
"maxParallelism": 128,
"parallelism": 1,
"status": "RUNNING",
"start-time": 1684676634249,
"end-time": -1,
"duration": 1394443,
"tasks": {
"RUNNING": 1,
"CANCELED": 0,
"FAILED": 0,
"DEPLOYING": 0,
"SCHEDULED": 0,
"CANCELING": 0,
"INITIALIZING": 0,
"CREATED": 0,
"FINISHED": 0,
"RECONCILING": 0
},
"metrics": {
"read-bytes": 49708,
"read-bytes-complete": true,
"write-bytes": 0,
"write-bytes-complete": true,
"read-records": 0,
"read-records-complete": true,
"write-records": 0,
"write-records-complete": true,
"accumulated-backpressured-time": 0,
"accumulated-idle-time": 1375570,
"accumulated-busy-time": 0.0
}
}, {
"id": "ea632d67b7d595e5b851708ae9ad79d6",
"name": "ClickEventStatistics Sink: Writer",
"maxParallelism": 128,
"parallelism": 1,
"status": "RUNNING",
"start-time": 1684676634295,
"end-time": -1,
"duration": 1394397,
"tasks": {
"RUNNING": 1,
"CANCELED": 0,
"FAILED": 0,
"DEPLOYING": 0,
"SCHEDULED": 0,
"CANCELING": 0,
"INITIALIZING": 0,
"CREATED": 0,
"FINISHED": 0,
"RECONCILING": 0
},
"metrics": {
"read-bytes": 49708,
"read-bytes-complete": true,
"write-bytes": 54936,
"write-bytes-complete": true,
"read-records": 0,
"read-records-complete": true,
"write-records": 0,
"write-records-complete": true,
"accumulated-backpressured-time": 0,
"accumulated-idle-time": 1327184,
"accumulated-busy-time": 0.0
}
}, {
"id": "6d2677a0ecc3fd8df0b72ec675edf8f4",
"name": "ClickEventStatistics Sink: Committer",
"maxParallelism": 128,
"parallelism": 1,
"status": "RUNNING",
"start-time": 1684676634298,
"end-time": -1,
"duration": 1394394,
"tasks": {
"RUNNING": 1,
"CANCELED": 0,
"FAILED": 0,
"DEPLOYING": 0,
"SCHEDULED": 0,
"CANCELING": 0,
"INITIALIZING": 0,
"CREATED": 0,
"FINISHED": 0,
"RECONCILING": 0
},
"metrics": {
"read-bytes": 104644,
"read-bytes-complete": true,
"write-bytes": 0,
"write-bytes-complete": true,
"read-records": 1308,
"read-records-complete": true,
"write-records": 0,
"write-records-complete": true,
"accumulated-backpressured-time": 0,
"accumulated-idle-time": 1380924,
"accumulated-busy-time": 0.0
}
}],
"status-counts": {
"RUNNING": 4,
"CANCELED": 0,
"FAILED": 0,
"DEPLOYING": 0,
"SCHEDULED": 0,
"CANCELING": 0,
"INITIALIZING": 0,
"CREATED": 0,
"FINISHED": 0,
"RECONCILING": 0
},
"plan": {
"jid": "23023e23c9b6539c18bad615e10419dd",
"name": "Click Event Count",
"type": "STREAMING",
"nodes": [{
"id": "6d2677a0ecc3fd8df0b72ec675edf8f4",
"parallelism": 1,
"operator": "",
"operator_strategy": "",
"description": "ClickEventStatistics Sink: Committer<br/>",
"inputs": [{
"num": 0,
"id": "ea632d67b7d595e5b851708ae9ad79d6",
"ship_strategy": "FORWARD",
"exchange": "pipelined_bounded"
}],
"optimizer_properties": {}
}, {
"id": "ea632d67b7d595e5b851708ae9ad79d6",
"parallelism": 1,
"operator": "",
"operator_strategy": "",
"description": "ClickEventStatistics Sink: Writer<br/>",
"inputs": [{
"num": 0,
"id": "0a448493b4782967b150582570326227",
"ship_strategy": "FORWARD",
"exchange": "pipelined_bounded"
}],
"optimizer_properties": {}
}, {
"id": "0a448493b4782967b150582570326227",
"parallelism": 1,
"operator": "",
"operator_strategy": "",
"description": "Window(TumblingEventTimeWindows(15000), EventTimeTrigger, CountingAggregator, ClickEventStatisticsCollector)<br/>",
"inputs": [{
"num": 0,
"id": "bc764cd8ddf7a0cff126f51c16239658",
"ship_strategy": "HASH",
"exchange": "pipelined_bounded"
}],
"optimizer_properties": {}
}, {
"id": "bc764cd8ddf7a0cff126f51c16239658",
"parallelism": 1,
"operator": "",
"operator_strategy": "",
"description": "Source: ClickEvent Source<br/>",
"optimizer_properties": {}
}]
}
} %
请查阅 REST API 参考,该参考上有完整的指标查询接口信息,包括如何查询不同种类的指标(例如 TaskManager 指标)
延伸拓展 #
你可能已经注意到了,Click Event Count 这个 Job 在启动时总是会带上
--checkpointing
和--event-time
两个参数, 如果我们去除这两个参数,那么 Job 的行为也会随之改变。
--checkpointing
参数开启了 checkpoint 配置,checkpoint 是 Flink 容错机制的重要保证。 如果你没有开启 checkpoint,那么在 Job 失败与恢复这一节中,你将会看到数据丢失现象发生。--event-time
参数开启了 Job 的 事件时间 机制,该机制会使用ClickEvent
自带的时间戳进行统计。 如果不指定该参数,Flink 将结合当前机器时间使用事件处理时间进行统计。如此一来,每个窗口计数将不再是准确的 1000 了。
Click Event Count 这个 Job 还有另外一个选项,该选项默认是关闭的,你可以在 client 容器的docker-compose.yaml
文件中添加该选项从而观察该 Job 在反压下的表现,该选项描述如下:--backpressure
将一个额外算子添加到 Job 中,该算子会在偶数分钟内产生严重的反压(比如:10:12 期间,而 10:13 期间不会)。这种现象可以通过多种网络指标观察到,比如:outputQueueLength
和outPoolUsage
指标,通过 WebUI 上的反压监控也可以观察到。