Savepoints

Overview

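Savepoints are externally stored checkpoints that you can use to stop and resume, or to update, your Flink programs. They use Flink's checkpointing mechanism to create a (non-incremental) snapshot of the state of your streaming program and write the checkpoint data and metadata out to an external file system.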

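This page covers all steps involved in triggering, restoring from, and disposing of savepoints. For more details on how Flink handles state and failures in general, see the State in Streaming Programs page.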

Attention: To allow upgrades of both your programs and your Flink version, read the following section on assigning IDs to your operators carefully.

Assigning Operator IDs

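To be able to upgrade your programs in the future, it is highly recommended that you adjust them as described in this section. The main required change is to specify operator IDs manually via the uid(String) method. These IDs are used to scope the state of each operator.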

DataStream<String> stream = env
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id"); // ID for the mapper

// Stateless printing sink
stream.print(); // Auto-generated ID

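If you do not specify the IDs manually, they are generated automatically. As long as these IDs do not change, the program can be restored from a savepoint automatically. The generated IDs depend on the structure of your program and are sensitive to program changes. It is therefore highly recommended to assign the IDs manually.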
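
To make the fragility of generated IDs concrete, here is a toy sketch in plain Java. It is not Flink's actual ID generation: the class OperatorIdSketch and its position-based naming scheme are assumptions invented for this illustration. It only mimics the property described above, namely that a generated ID depends on the program structure while a manually assigned ID does not.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only, NOT Flink's real ID generation. An auto-generated ID is
// derived from the operator's position and all of its predecessors, so it
// changes whenever the structure of the pipeline changes.
class OperatorIdSketch {

    /** Returns one ID per operator; explicitUids.get(i) is null if no uid was set. */
    static List<String> resolveIds(List<String> operatorNames, List<String> explicitUids) {
        List<String> ids = new ArrayList<>();
        StringBuilder structure = new StringBuilder();
        for (int i = 0; i < operatorNames.size(); i++) {
            // Record position and name of every operator seen so far.
            structure.append('/').append(i).append(':').append(operatorNames.get(i));
            String uid = explicitUids.get(i);
            ids.add(uid != null ? uid : "auto" + structure);
        }
        return ids;
    }

    public static void main(String[] args) {
        // Same mapper, before and after a filter is inserted in front of it.
        System.out.println(resolveIds(
            Arrays.asList("source", "map"), Arrays.asList(null, null)));
        System.out.println(resolveIds(
            Arrays.asList("source", "filter", "map"), Arrays.asList(null, null, null)));
        // With explicit uids the mapper keeps its ID despite the new filter.
        System.out.println(resolveIds(
            Arrays.asList("source", "filter", "map"),
            Arrays.asList("source-id", null, "mapper-id")));
    }
}
```

In this model the mapper's generated ID differs between the first two pipelines, so state saved under the old ID could no longer be matched, whereas "mapper-id" stays stable.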

Savepoint State

You can think of a savepoint as holding a map from Operator ID to State for each stateful operator:

Operator ID | State
------------+------------------------
source-id   | State of StatefulSource
mapper-id   | State of StatefulMapper

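In the example above, the print sink is stateless and therefore not part of the savepoint. By default, Flink tries to map each entry of the savepoint back to the new program.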

Operations

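You can use the command line client to trigger savepoints, cancel a job with a savepoint, resume from savepoints, and dispose of savepoints.
With Flink >= 1.2.0 it is also possible to resume from savepoints using the web UI.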

Triggering Savepoints

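When a savepoint is triggered, a new savepoint directory is created under the target directory. Both the data and the metadata are stored in that directory. For example, with the FsStateBackend or the RocksDBStateBackend: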

# Savepoint target directory
/savepoints/
# Savepoint directory
/savepoints/savepoint-:shortjobid-:savepointid/
# Savepoint file contains the checkpoint meta data
/savepoints/savepoint-:shortjobid-:savepointid/_metadata
# Savepoint state
/savepoints/savepoint-:shortjobid-:savepointid/...

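Note: Although it looks as if savepoints could be moved, this is currently not possible, because the _metadata file contains absolute paths. Follow FLINK-5778 for progress on lifting this restriction.

Note that if you use the MemoryStateBackend, both the metadata and the savepoint state are stored in the _metadata file. Since this file is self-contained, you can move it and store it at any location.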
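
The directory layout listed above can be sketched as a small path helper. This is only an illustration: SavepointLayoutSketch is a hypothetical class, not a Flink API, and the two IDs are passed in as opaque strings because the text does not say how Flink derives them.

```java
// Hypothetical helper mirroring the directory listing above; not part of Flink.
class SavepointLayoutSketch {

    /** Builds <targetDir>/savepoint-<shortJobId>-<savepointId>/ */
    static String savepointDir(String targetDir, String shortJobId, String savepointId) {
        String base = targetDir.endsWith("/") ? targetDir : targetDir + "/";
        return base + "savepoint-" + shortJobId + "-" + savepointId + "/";
    }

    /** The metadata file lives directly inside the savepoint directory. */
    static String metadataFile(String savepointDir) {
        return savepointDir + "_metadata";
    }

    public static void main(String[] args) {
        // "abc123" and "def456" are made-up placeholder IDs.
        String dir = savepointDir("/savepoints", "abc123", "def456");
        System.out.println(dir);
        System.out.println(metadataFile(dir));
    }
}
```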

Trigger a Savepoint

$ bin/flink savepoint :jobId [:savepointDirectory]

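This triggers a savepoint for the job with ID :jobId and returns the path of the created savepoint. You need this path to restore from and to dispose of the savepoint.
You can optionally specify a target file system directory in which to store the savepoint. The directory must be accessible by the JobManager.
If you do not specify a target directory, a default directory must be configured. Otherwise, triggering the savepoint (or cancelling the job with a savepoint) will fail.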
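
If you want to issue the trigger command from a Java program, the argument list can be assembled as in the following sketch. TriggerSavepointSketch is a hypothetical helper, and it assumes the process is started from the Flink installation directory so that bin/flink resolves. It only builds the command shown above and does not parse Flink's output.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; builds the arguments for the CLI call shown above.
class TriggerSavepointSketch {

    /** Builds: bin/flink savepoint :jobId [:savepointDirectory] */
    static List<String> triggerCommand(String jobId, String savepointDirectory) {
        List<String> command = new ArrayList<>();
        command.add("bin/flink");
        command.add("savepoint");
        command.add(jobId);
        if (savepointDirectory != null) {
            // Optional argument; when omitted, a default directory must be configured.
            command.add(savepointDirectory);
        }
        return command;
    }

    public static void main(String[] args) {
        // "<jobId>" is a placeholder, not a real job ID.
        List<String> command = triggerCommand("<jobId>", "hdfs:///flink/savepoints");
        System.out.println(String.join(" ", command));
        // To actually run it from the Flink installation directory:
        // new ProcessBuilder(command).inheritIO().start().waitFor();
    }
}
```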

Resuming from Savepoints

$ bin/flink run -s :savepointPath [:runArgs]

This submits a job and specifies the savepoint to resume from. You can give the path to either the savepoint's directory or its _metadata file.

Allowing Non-Restored State

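By default, the resume operation tries to map all state in the savepoint back to the program you are restoring with. If you have dropped an operator, you can allow its state to be skipped instead of mapped to the new program by using the --allowNonRestoredState (short: -n) option: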

$ bin/flink run -s :savepointPath -n [:runArgs]
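
The effect of this option can be modelled in a few lines of plain Java. This is a simplified sketch of the behaviour described above, not Flink's implementation: RestoreSketch is a hypothetical class, and state is represented as a plain string.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Simplified model of the restore check; not Flink's implementation.
class RestoreSketch {

    /**
     * Maps savepoint entries (operator ID -> state) onto the operators of the new job.
     * Fails if the savepoint holds state for an operator the job no longer contains,
     * unless allowNonRestoredState is set, in which case that state is skipped.
     */
    static Map<String, String> restore(Map<String, String> savepoint,
                                       Set<String> jobOperatorIds,
                                       boolean allowNonRestoredState) {
        Map<String, String> restored = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : savepoint.entrySet()) {
            if (jobOperatorIds.contains(entry.getKey())) {
                restored.put(entry.getKey(), entry.getValue());
            } else if (!allowNonRestoredState) {
                throw new IllegalStateException("Savepoint state for operator '"
                    + entry.getKey() + "' cannot be mapped to the new job");
            }
            // Otherwise the state of the removed operator is dropped.
        }
        // Job operators without an entry here simply start without state.
        return restored;
    }

    public static void main(String[] args) {
        Map<String, String> savepoint = new LinkedHashMap<>();
        savepoint.put("source-id", "State of StatefulSource");
        savepoint.put("mapper-id", "State of StatefulMapper");

        // The new job no longer contains the mapper.
        Set<String> job = new HashSet<>(Arrays.asList("source-id"));
        System.out.println(restore(savepoint, job, true).keySet());
    }
}
```

Calling restore(savepoint, job, false) with the same inputs throws, which corresponds to running the job without -n.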

Disposing Savepoints

$ bin/flink savepoint -d :savepointPath

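This disposes of the savepoint stored in :savepointPath.
Note that you can also delete a savepoint manually via regular file system operations without affecting other savepoints or checkpoints (recall that each savepoint is self-contained). Up to Flink 1.2, this was a more tedious task that had to be performed with the savepoint command above.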
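
As an illustration of deleting a savepoint with regular file system operations, the following sketch removes a savepoint directory recursively using java.nio.file. SavepointCleanupSketch is a hypothetical helper and assumes the savepoint lives on a local or mounted file system; a savepoint on HDFS or another remote store would need that system's own client instead.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper for local (or mounted) file systems only.
class SavepointCleanupSketch {

    /** Recursively deletes one savepoint directory and nothing outside of it. */
    static void deleteSavepoint(Path savepointDir) throws IOException {
        if (!Files.exists(savepointDir)) {
            return; // nothing to do
        }
        List<Path> entries;
        try (Stream<Path> paths = Files.walk(savepointDir)) {
            // Reverse order puts files before the directories that contain them.
            entries = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path entry : entries) {
            Files.delete(entry);
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: SavepointCleanupSketch <savepointDir>");
            return;
        }
        deleteSavepoint(Paths.get(args[0]));
    }
}
```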

Configuration

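You can configure a default savepoint target directory via the state.savepoints.dir key. When a savepoint is triggered, this directory is used to store it. You can override the default by specifying a target directory with the trigger command.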

# Default savepoint target directory
state.savepoints.dir: hdfs:///flink/savepoints

If you neither configure a default nor specify a target directory, the savepoint operation will fail.
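
The precedence between the two settings can be summarised in a short sketch. SavepointTargetSketch is a hypothetical class written for this page, not Flink code; null stands for "not specified".

```java
// Hypothetical model of how the target directory is chosen; not Flink code.
class SavepointTargetSketch {

    /**
     * @param customTarget      directory passed to the trigger command, or null
     * @param configuredDefault value of state.savepoints.dir, or null if unset
     */
    static String resolveTargetDirectory(String customTarget, String configuredDefault) {
        if (customTarget != null) {
            return customTarget; // an explicit argument overrides the default
        }
        if (configuredDefault != null) {
            return configuredDefault;
        }
        throw new IllegalStateException(
            "No savepoint target directory: pass one to the trigger command "
                + "or configure state.savepoints.dir");
    }

    public static void main(String[] args) {
        System.out.println(resolveTargetDirectory(null, "hdfs:///flink/savepoints"));
        System.out.println(resolveTargetDirectory("/savepoints", "hdfs:///flink/savepoints"));
    }
}
```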

F.A.Q

Should I assign IDs to all operators in my job?

As a rule of thumb, yes. Strictly speaking, it is sufficient to assign IDs via the uid method only to the stateful operators in your job. The savepoint contains state only for these operators; stateless operators are not part of the savepoint.

In practice, it is recommended to assign IDs to all operators, because some of Flink’s built-in operators, such as the Window operator, are also stateful, and it is not obvious which built-in operators are stateful and which are not. If you are absolutely certain that an operator is stateless, you can skip the uid method.

What happens if I add a new operator that requires state to my job?

When you add a new operator to your job, it is initialized without any state. Savepoints contain the state of each stateful operator, and stateless operators are simply not part of the savepoint. The new operator therefore behaves similarly to a stateless operator.

What happens if I delete an operator that has state from my job?

By default, a savepoint restore will try to match all state back to the restored job. If you restore from a savepoint that contains state for an operator that has been deleted, this will therefore fail.

You can allow non-restored state by passing the --allowNonRestoredState (short: -n) option to the run command:

$ bin/flink run -s :savepointPath -n [:runArgs]

What happens if I reorder stateful operators in my job?

If you assigned IDs to these operators, they will be restored as usual.

If you did not assign IDs, the auto-generated IDs of the stateful operators will most likely change after the reordering. As a result, you would not be able to restore from a previous savepoint.

What happens if I add or delete or reorder operators that have no state in my job?

If you assigned IDs to your stateful operators, the stateless operators will not influence the savepoint restore.

If you did not assign IDs, the auto-generated IDs of the stateful operators will most likely change after such a change. As a result, you would not be able to restore from a previous savepoint.

What happens when I change the parallelism of my program when restoring?

If the savepoint was triggered with Flink >= 1.2.0 and without using a deprecated state API such as Checkpointed, you can simply restore the program from the savepoint and specify a new parallelism.

If you are resuming from a savepoint triggered with Flink < 1.2.0, or one that uses now-deprecated APIs, you first have to migrate your job and savepoint to Flink >= 1.2.0 before you can change the parallelism. See the upgrading jobs and Flink versions guide.
