Flink Program Guide (7) -- 容错 Fault Tolerance(DataStream API编程指导 -- For Java)

转载于: https://www.cnblogs.com/lanyun0520/p/5777452.html


本文翻译自StreamGuide的Fault Tolerance

----------------------------------------------------------

Flink的容错机制会在错误出现时恢复程序并继续执行,这些容错机制包括设备硬件失效、网络失效、临时程序失效等等。


一、流容错

Flink使用检查点机制来在流Job失效后对其进行恢复。该检查点机制需要一个可以再次请求前面数据的persistent(或durable)的数据源(Apache Kafka便是如此一个数据源的示例)。


检查点机制将数据源和数据sink中的进展、窗口状态以及用户定义的状态(见于Working with state)一致地(consistently)存储起来以提供exactly once的处理语义。有关检查点存储位置(如JobManager、文件系统、数据库)依赖配置的state backend


文档streaming fault tolerance详细描述了Flink流容错机制中的技术。


我们可以通过StreamExecutionEnvironment调用enableCheckPointing(n)方法来启用检查点机制,其中参数n为检查点间隔,以毫秒计。


有关检查点机制的其他参数包括:

·       重试次数:setNumberOfExecutionRetries()方法定义了在失效后job会重新启动多少次。在检查点机制已启用但该值没有设置时,job通常会无限次重启。

·       恰好执行一次 VS. 至少执行一次:你可以向enableCheckPointing(n)方法传递一个mode,该mode包括两个保证级别。其中恰好执行一次适用于绝大多数应用,而至少执行一次则可能更适合一些对要求执行时间极短的应用(持续要求几毫秒)。

·       并行检查点数量:默认地,系统不会再一个检查点正在进行时触发另一个检查点,这保证整个执行拓扑不会花太多时间在检查点上而导致流数据处理停滞。Flink允许多个重叠检查点的情况存在,这对与在有一定延迟的流水线并行情景中(例如由于外部调用服务需要时间响应而导致延迟),仍然想要非常频繁地运行检查点来在失效后仅需要很少量重运行的需求十分有用。

·       检查点超时:定义一个超时时间,如果运行中的检查点到该事件点仍未完成,则将它中止。


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 ms

env.

enableCheckpointing(1000);

// advanced options:

// set mode to exactly-once (this is the default)

env.

getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// checkpoints have to complete within one minute, or are discarded

env.

getCheckpointConfig().setCheckpointTimeout(60000);

// allow only one checkpoint to be in progress at the same time

env.

getCheckpointConfig().setMaxConcurrentCheckpoints(1);


1.1 数据源和Sink的容错保证

Flink仅在数据源参加了快照(snapshotting)机制时才可以保证在更新用户定义状态恰好执行一次。下表列出了对应绑定的connector的Flink更新状态的保证级别。有关各个connector的容错保证级别的细节,请见每个connnector的文档。

SourceGuaranteesNotes

Apache Kafkaexactly once根据你使用的版本,选择合适的Kafka的connector

AWS Kinesis Streamsexactly once 

RabbitMQat most once (v 0.10) / exactly once (v 1.0) 

Twitter Streaming APIat most once 

Collectionsexactly once 

Filesexactly once 

Socketsat most once 


为了保证端到端的恰好执行一次的数据传递(以及恰好执行一次的状态语义),数据sink需要参与检查点机制。下表Flink与绑定的sink的传递保证(假设是恰好执行一次状态更新):

SinkGuaranteesNotes

HDFS rolling sinkexactly once其实现依赖于Hadoop版本

Elasticsearchat least once 

Kafka producerat least once 

Cassandra sinkat least once / exactly once仅对于幂等的(idempotent)更新是恰好执行一次的保证

AWS Kinesis Streamsat least once 

File sinksat least once 

Socket sinksat least once 

Standard outputat least once 

Redis sinkat least once 


二、重启策略

Flink支持不同的重启策略,它们控制着job在失效情况下如何重启。集群可以用一个默认重启策略来启动,该策略总是在没有job的重启策略定义时使用。当一个拥有重启策略的job提交之后,该策略将会重写集群的默认设置。


默认地重启策略是通过Flink的配置文件flink-conf.yaml设置的。配置参数restart-strategy定义了启用什么策略。在每次默认情景下,会使用不重启的策略。有关改配置支持什么值,请见下面的可用重启策略表格。


每个重启策略都自带它们的参数集合来控制它们的行为。这些值同样在配置文件中有所设置。每个重启策略的描述包含了更多有关对应配置值的信息。

重启策略restart-strategy的值

Fixed delayfixed-delay

Failure ratefailure-rate

No restartnone


除了定义一个默认的重启策略,我们也可以为每个Flink的job定义各自的重启策略。重启测略可以通过在ExecutionEnvironment中调用setRestartStrategy方法来设置。注意,该方法同样适用于StreamExecutionEnvironment


下例中展示了我们如何为我们的job的重启策略设置一个固定延迟,在该例中,失效发生时系统将尝试将job重启3次,并且每次重启尝试的间隔为10秒。


ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

env.

setRestartStrategy(RestartStrategies.fixedDelayRestart(3// number of restart attempts 

Time.

of(10, TimeUnit.SECONDS// delay

));


2.1 固定延迟重启策略

固定延迟重启策略会以一个给定的次数尝试重启job。如果超过了最大重试次数,该job将判定为最终失败。在两次连续的重启尝试之间,重启策略会等待一段固定的时间。


在配置文件flink-conf.yaml中设置以下参数时,该策略将会作为默认策略启用。

restart-strategy : fixed-delay

配置参数描述默认值

restart-strategy.fixed-delay.attemptsNumber of restart attempts1

restart-strategy.fixed-delay.delayDelay between two consecutive restart attemptsakka.ask.timeout

restart-strategy.fixed-delay.attempts: 3

restart-strategy.fixed-delay.delay: 10s


固定延迟策略同样可以使用代码设置:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

env.

setRestartStrategy(RestartStrategies.fixedDelayRestart(3// number of restart attempts 

Time.

of(10, TimeUnit.SECONDS// delay

));


2.1.1 重启尝试(Restart Attemps)

Flink认为job失败的重启次数是可以通过restart-strategy.fixed-delay.attempts配置的,默认值为1


2.1.2 重试延迟(Retry Delays)

重试的执行可以配置为有延迟的。延迟重试意味着在一次执行失败后,重新执行不会立即启动,而是要等待一个延迟后再启动。

延迟重试对于程序与外部系统交互有一定帮助,例如连接或者待定的会话需要在到达超时时间之后才可以尝试重新执行。

该值的默认值为akka.ask.timeout


2.2 失败比率重启策略

失败比率重启策略会在job失败后重启它,但是当failure rate(即平均每秒的失败次数)超出后,job将被判定为最终失败。在两次连续的重启尝试之间,重启策略会等待一段固定时间。


在配置文件flink-conf.yaml中设置以下参数时,该策略将会作为默认策略启用。

Restart-strategy: failure-rate

配置参数描述默认值

restart-strategy.failure-rate.max-failures-per-interval在判定一个job彻底失败前的给定时间内最大重启次数1

restart-strategy.failure-rate.failure-rate-interval测量failure rate的时间区间长度1 minute

restart-strategy.failure-rate.delay两次重启尝试之间等待的时间akka.ask.timeout

restart-strategy.failure-rate.max-failures-per-interval: 3

restart-strategy.failure-rate.failure-rate-interval: 5 min

restart-strategy.failure-rate.delay: 10 s


失败比率重启策略同样可以通过代码设置:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

env.

setRestartStrategy(RestartStrategies.failureRateRestart(3// max failures per interval

Time.

of(5, TimeUnit.MINUTES), //time interval for measuring failure rate

Time.

of(10, TimeUnit.SECONDS// delay

));


2.3 不重启策略

在该策略中,job失效将直接判定为最终失效,不会尝试重启。

restart-strategy: none


不重启策略同样可以通过代码设置:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

env.

setRestartStrategy(RestartStrategies.noRestart());

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,122评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,070评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,491评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,636评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,676评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,541评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,292评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,211评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,655评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,846评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,965评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,684评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,295评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,894评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,012评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,126评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,914评论 2 355