Storm缺点:
1. 并不能保证exactly-once(精确一次),
2. 能保证低延迟,但不能保证高吞吐(即便是它能够保证的正确性级别高,其开销也相当大)
3. Storm Trident是对Storm的延伸,它的底层流处理引擎就是基于微批处理方法来进行计算的,从而实现了exactly-once语义,
但是在延迟性方面付出了很大的代价.
Spark Streamming缺点:
Spark通过间歇性的批处理作业来模拟流处理,会导致开发和运维相互交错,任何延迟都可能导致不一致
吞吐量高但是延迟也高
解决办法:
实现低延迟和高吞吐想到一种替代方法:将连续时间中的流数据分割成一系列微小的批量作业.
Flink特性
1. sparkStreaming特性(高吞吐,(保证精确一致性)
2. storm (低延迟)
3. 操作简单,时间正确,语义化窗口
4. 高度容错的状态管理:为防止状态在计算过程中因为系统异常而出现丢失,Flink周期性的通过分布式快照技术,
Checkpoints 实现状态的持久化维护,使得在系统停机或者异常情况下都能计算出正确的结果
Flink介绍:
1. 官方描述:Apache Flink是为分布式,高性能,随时可以用以及准确的流处理应用程序打造的开源流处理框架
2. Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态的计算.
3. Flink被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算.
批处理与流处理
1. 批处理特点:
1. 有界,持久,大量,批处理非常适合需要访问全套记录才能完成的计算工作,一般用于离线统计,
2. 流处理特点:
1. 无界,实时,流处理方式无需针对整个数据集执行操作,而是对通过系统传输的每个数据项执行操作,一般用于实时统计.
3. 在spark生态体系中,对批处理和流处理采用了不同的技术框架,批处理sparkSQL实现,流处理由SparkStreaming实现,这也是大部分框架采用的策略
FLink实现:
Flink是可以同时实现批处理和流处理(Flink将批处理(即处理有限的静态数据)视作一种特殊的流处理
它是一个分布式系统,能够接受数据流程序并在一台或多态机器上以容错方式执行,
Flink能够自动地确保在发生机器故障或者其他错误时计算能持续进行,或者在修复Bug或进行版本升级后有计划地再执行一次.
Flink本质上使用容错性数据流,这使得开发人员可以分析持续生成且永远不结束的数据(流处理)
消息处理层和流处理层
概述:
Flink项目的架构主要有两个主要组成部分: 消息传输层和有Flink提供的流处理层
1. 消息处理层: 从各种数据源(生产者)采集连续时间产生的数据,并传输给订阅了这些数据的应用程序和服务(消费者)
2. 流处理层: 3个用途: 1. 持续的将数据在应用程序和系统间移动 2. 聚合并处理事件; 3. 在本地维持应用程序的状态
窗口
概述:
窗口是一种机制,它用于将许多时间按照实际或者其他特征分组,从而将每一组作为整体进行分析.
时间窗口:
概述:
时间窗口是最简单和最有用的一种窗口,它支持滚动和滑动,
1. 一分钟滚动窗口的定义:
stream.timeWindow(Time.minutes(1))
2. 每半分钟滑动一次的一分钟滑动窗口
stream.timeWindow(Time.minutes(1),Time.seconds(30)
计数窗口:
概述:
Flink支持的另一种常见窗口叫做计数窗口,采用计数窗口时,分组依据不再试时间戳,而是元素的数量,
1. 描述:滑动窗口由4个元素组成的计数窗口,并且每两个元素滑动一次,滚动和滑动的计数窗口定义如下:
stream.countWindow(4)
stream.countWindos(4,2)
缺点:
虽然计数窗口有用,但是其定义不如时间窗口谨慎,因此要谨慎使用,时间不会停止,而且时间窗口总会"关闭",
但就计数窗口而言,假设其定义的元素数量为100,而某个key对应的元素永远达不到100个,name窗口就永远不会关闭,
该窗口占用的的内存也就浪费了,
会话窗口:
概述:
会话指的是活动阶段,其前后都是非活动阶段,例如用户与网站进行一系列交互(活动阶段)之后,关闭浏览器或者不在交互(非活动阶段).
会话需要有自己的处理机制,因此他们通常没有固定的持续时间(有数30s就结束,有些则长达1小时),或者没有固定的交互次数,
Flik中实现:
在Flink中,会话窗口由超时时间设定,即希望等待多久才认为会话已经结束
example:
以下代码表示: 如果用户处于非活动状态长达5min,则认为会话结束
stream.window(SessionWindow.withGap(Time.minutes(5))
触发器:
概述:
除了窗口之外,Flink还提供触发机制。触发器控制生成结果的时间,即和时聚合窗口内容并将结果返回给客户。每一个默认窗口都有一个
触发器。
例如:
采用事件时间的时间窗口将在收到水印时被触发。对于用户来说,除了收到水印时生成完整、准确的结果之外,也可以实现自定义的触发器
窗口的实现:
概述:
在Flink内部,所有类型的窗口都由同一种机制实现。虽然实现细节对于普通用户来说并不重新,但是仍然需要注意以下两点:
1. 开窗机制与检查点机制完全分离。这意味着窗口时长不依赖与检查点间隔。事实上,窗口完全可以没有"时长"
2. 高级用户可以直接用基本的开窗机制定义更复杂的窗口形式(如某种时间窗口,它可以基于计数结果或某一条记录的值生成中间结果)
基于JVM实现独立的内存管理
概述:
内存管理是所有计算框架需要重点考虑的部分:
针对内存管理,Flink实现了自身管理内存的机制,尽可能减少JVM GC对系统的影响,另外,Flink通过序列化/
反序列化方法将所有的数据对象转换成二进制在内存中存储,降低数据存储的大小的同时,能够更加有效地对
内存空间进行利用,降低GC带来的性能下降或者任务异常的风险,因此Flink较其他分布式处理的框架会显得
更加稳定,不会因为JVM GC等问题而影响整个应用的运行
水印
概述:
支持事件事件对于流处理架构而言至关重要,因为事件事件能保证结果正确,并使流处理架构拥有重新处理数据的能力,
question:
当计算基于事件时间时,如何判断所有事件是否都到达,以及何时计算和输出窗口的结果呢?换言之,如何追踪事件时间,并知晓输入数据
已经流到某个事件时间了呢? 为了追踪事件时间,需要依靠由数据驱动的时钟,而不是系统时钟
Flink水印的实现:
Flink通过水印来推进事件时间,水印是嵌在流中的常规记录,计算程序通过水印获知某个事件点已到。对于上述一分钟滚动窗口,假设水印时间
为10:01:00(或者其他时间,)那么收到水印的窗口就知道不会再由早于该时间的记录出现,因为所有时间戳小于或等于改时间的时间都已经达到。
此时,窗口可以安全地计算并给出结果(总和)。水印使事件时间与处理时间完全无关。迟到的水印(从处理时间的角度而言),并不会影响结果的
正确性,而只会影响受到结果的速度.
水印时如何生成的:
在Flink中,水印由应用程序开发生成,这通常需要对相应的领域有一定的了解,完美的水印永远不会错: 时间戳小于水印标记时间的事件不会再
出现.在特殊情况下(),最近一次事件的事件戳就可能是完美的水印.
启发式水印则相反,它只估计时间,因此有可能出错,即迟到的事件(其时间戳小于水印标记时间),晚于水印触发.针对启发式水印,Flink提供了
处理迟到元素的机制.
how:
设定水印通常需要用到领域知识,举例来说,如果知道事件的迟到时间不会超过5s,就可以将水印标记时间设为收到的最大时间戳减去5s,
另一种做法是采用一个Flink作业监控事件流,学习时间的迟到规律,并以此构建水印生成模型.
WaterMark:
概述:
大部分的时候我们希望数据是有序进入流处理系统中的,此时会出现一个问题,一旦出现乱序,如果只根据eventTime决定window的运行,
我们不能明确数据是否全部到位,但又不能无限期的等下去,此时必须要有个机制来保证一个特定的时间后,必须触发window去进行计算,
这个特别的机制就是watermark.
intro1:
WaterMark 是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性,数据本身携带者对应的WaterMark .
Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark机制结合window来实现
数据流中的WaterMark用于表示timestamp小于WaterMark的数据,都已经达到了,因此,window的执行是由Watermark触发的,
Watermark 可以理解成一个延迟触发机制,我们可以设置Watermark的延时时长t,每次系统会校验已将达到最大的数据中的maxEventTime,
然后认定eventTime小于maxEventTime-t的所有数据都已经达到,如果有窗口的停止时间等于maxEventTime-1 ,那么这个窗口被触发执行
intro2:
当Flink接受到每一套数据时,都会产生一条Watermark,这条Watermark就等于当前所有到达数据中的maxEventTime-延迟时长,也就是说,
Watermark是由数据携带的,一旦户数携带的Watermark比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口执行。
由于Watermark是由数据携带的
因此,如果行动过程中无法获取新的数据,那么没有被触发的窗口将永远不被触发
代码实现 :
概述:
Watermark是Flink为了处理eventTime窗口计算提出的一种机制,本质上也是一种时间戳,由Flink source或者自定义的watermark生成器按照
需求定期或者按条件生成一种系统event,与普通数据流event一样流转到对应的下游operation,接收到Watermark数据operation一次不断调整自己
管理的window event time clock
2. TimestampAssigner 和Watermark
首先,eventTime计算意味着Flink必须有一个地方用于抽取每条消息中自带的时间戳, 所以TimestampAssigner的实现类都要具体实现
extractTimestamp()方法,用来抽取当前元素的eventTime,这个eventTIme会用来决定元素落到下游的哪个或者哪几个window中进行计算,
其次,在数据进入weindow前,需要有一个Watermarker生成当前的event time 对应的水位线,Flink支持两种后置的Watermark: Periodic(周期)
和Punctuated,一种是定期产生Watermark(即使没有消息产生),一种是在满足特定情况下触发.
class TimestampExtractor extends AssignerWithPeriodicWatermarks[String] with
Serializable {
var currentTimestamp: Long = 0L
//最大允许乱序事件是5s
val maxDelayTime = 5000L
var watermark: Watermark = null
override def getCurrentWatermark: Watermark = {
watermark =
new Watermark(currentTimestamp - maxDelayTime)
print(watermark)
watermark
}
override def extractTimestamp(t: String, l: Long): Long = {
val timeStamp = t.split(",")(0).toLong
currentTimestamp = Math.max(timeStamp, currentTimestamp)
println("timestamp:")
currentTimestamp
}
}
eg:
以滑动10s为间隔,水印为currenttime -5
[10:20)
在13s 输入a,watermark 8
[20:30)
在21s 输入a ,watermark 16< end window time (20) 不触发计算
在22s 输入06s 的a, 因为上一次为触发计算, 所以归到[10:20)中(a,2)
在26s 输入a, watermark 21 > end window time(20) 触发计算 ->print(a,2)
基于Watermark的window触发
条件:
基于Event Time的事件处理,Flink默认的事件触发条件Wie:
对于out-of-order及正常的数据而言:
1. watermark的时间戳>= window endTime
终决定window触发,还是由数据本身的Event Time所属的window中的window_end_time决定。例如[5:10)中最大的数据是6s,即watermark>6 就会触发窗口执行
2. window有事件发生
3. 对于late element 太多的数据而言(设置了lateness选项,默认为0)
有状态的计算:
状态概述:
所谓状态就是计算过程中产生的中间计算结果,每次计算新的数据进入到流式系统中,都是基于
中间状态结果的基础上进行运算,最终和产生正确的统计结果.
概述:
流式计算分为无状态和有状态两种情况,
1. 无状态的计算:
观察每个独立时间,并根据最后一个事件输出结果.
2. 有状态的计算:
基于有状态的计算的方式最大的优势是不需要将原始数据重新重外部存储中拿出来,从而进行全量计算,
因为这种计算方式的大家可能是非常高的,从另一个角度讲,用户无需通过调度和协调各种批量计算的工具,
这些操作全部都可以基流式计算完成,可以极大地减轻系统对其他框架的依赖,减少数据计算过程中的时间损耗以及硬件损耗.
例如:
流处理应用程序从传感器接受温度读数,并在温度超过90度时发出警告,
有状态的计算则会基于多个事件输出结果.
比较有无状态的概念:
无状态流处理分别接受每条记录,然后根据最新输入的记录生成输出记录,
有状态流处理会维护状态(根据每条输入记录进行更新),并给予最新输入的记录和当前的状态值生成输出记录
比较有无状态的区别:
无状态流处理与有状态流处理的区别。输入记录由黑条表示,无状态流处理每次只转换一条输入记录,并且根据最新的输入记录输出结果,
有状态流处理维护所有已处理记录的状态值,并根据每条新输入的记录更新状态,因此输出记录反映的是综合考虑多个事件之后的结果.
检查点: 保证Exactly-once
概述:
Flink如何保证exactly-once呢? 它使用一种被称为"检查点"的特性,在出现故障时将系统重置会正确状态,
Flink检查点的核心作用是确保状态正确,即使遇到程序中断,也要正确。
example:
记住这一基本点之后,我们用一个例子来看检查点是如何运行的,Flink为用户提供了用来定义状态的工具,
例如: 这个Scala程序按照输入记录的第一个字段(一个字符串)进行分组并维护第二个字段的计数状态).
code:
val stream:DataStream[(String,Int)]=...
val counts:DataStream[(String,Int)] = stream
.keyBy(record=>record._1)
.mapWithStage((in:(String,Int),count:Optino[Int])=>
count match{
case Some(c)=>((in._1,c+in._2),Some(c+in._2))
case None =>((in._1,in._2),Some(in._2))
})
intro:
keyBy算子: 用来将记录按照第一个元素(一个字符串)进行分组,根据该key将数据进行重新分区,然后将记录再发送给下一个算子
有状态的map算子(mapWithStage): map算子在接收到每个元素后,将输入记录的第二个字段的数据加到现有总数中,再讲更新过的元素
发送出去.
保存点: 状态版本控制
概述:
检查点由Flink自动生成,用来在故障发生时重新处理记录,从而修正状态。Flink用户还可以通过另一个特征有意识地管理状态版本,
这个特性叫作保存点(savepoint)
内容:
保存点与检查点的工作方式完全相同,只不过它有用户通过Flink命令行工具或者Web控制台手动触发,而不由Flink自动触发。和检查点一样
,保存点也被保存在稳定存储中。用户可以从保存点重启作业,而不用从头开始。保存点可以被视为作业在某一个特定时间点的快照(该时间点
即为保存点被触发的时间点)
概述2:
对保存点另一种解释是,它在明确的时间点保存应用程序状态的版本。这和用版本控制系统保存应用程序的版本很相似。
最简单的例子是在不修改应用程序代码的情况下,每隔固定的时间拍快照,即照原样保存应用程序状态的版本。
端到端的一致性:
实现方法:
1. 在sink环节缓冲所有输出,并在sink收到检查点记录时,将输出"原子提交"到存储系统。这种方法保证输出存储系统中只存在有一致性
保障的结果,并且不会出现重复的数据。从本质上说,输出存储系统会参与Flink的检查点操作。要做到这一点,输出存储系统需要具备"原子提交"能力
2. 急切地将数据写入输出存储系统,同事牢记这些数据可能是"脏"的,而且需要在发生故障时重新处理。如果发生故障,就需要将输出、输入
和Flink作业全部回滚,从而将"脏"数据覆盖,并将已经写入输出的"脏"数据删除。
Flink流批处理
概述:
1. DataStream API: 检查点、状态管理、水印、窗口和触发器
2. DataSet API: 用于调度和恢复的回溯法,特殊内存数据结构,以及查询优化
3. 通用流处理引擎
引擎:
同样的后端(流处理引擎)被用来处理有限数据和无限数据。在流处理引擎之上,Flink有以下机制:
1. 检查点机制和状态机制: 用于实现容错、有状态的处理;
2. 水印机制: 用于实现事件时钟
3. 窗口和触发器: 用于限制计算范围,并定义呈现结果的时间.
启动Event Time
概述:
由于在大多数现实世界的用例中,消息达到无序,这对构建实时流处理系统造成很大的问题,
EventTime是事件在现实世界中发生的时间,ProcessingTime是Flink系统处理该事件的时间
如何构建基于EventTIme的系统:
要启用EventTime处理,我们需要一个时间戳提取器,从消息中提取事件时间信息,请记住,消息是格式值,时间戳。
改extractTimestamp方法获取时间戳部分并将其作为一个长期。现在忽略getCurrentWatermark方法,
code:
class TimestampExtractor extends AssignerWithPeriodicWatermarks[String] with
Serializable {
override def getCurrentWatermark: Watermark = {
new Watermark(System.currentTimeMillis())
}
override def extractTimestamp(t: String, l: Long): Long = {
t.split(",")(1).toLong
}
}
step1:
我们现在需要设置这个时间戳提取器,并将TimeCharactersistic设置为EventTime。其余的代码与ProcessingTime的情况保持一致