容错机制
在这一节,我们要讨论一下Spark Streaming的容错机制。
背景知识
为了能够更好地理解Spark Streaming的容错机制,我们先来看下Spark RDD的基本容错机制。
- 任何一个RDD都是一个不可变的,可重计算的,分布式的数据集。每一个RDD都记录了确定的血缘关系(一些列转换操作的依赖关系)来进行容错。
- 如果任何RDD分区因为worder节点错误导致丢失,那么这个分区就可以通过原始输入数据集来恢复,恢复的过程就是靠RDD记录的血缘关系(一些列转换操作的依赖关系)。
- 假设RDD的transformaction是不变的,那么无论集群发生任何错误,最终生成的RDD中的数据都是一样的。
Spark的处理数据来自于容错的文件系统(HDFS,S3等),因此从容错数据生成的RDD都是容错的。然而Spark Streaming从网络接受数据的情况并不是这样。为了同样能够让生成的RDD具有容错特性,接受的数据会生成一个副本并分布在不同的集群节点上。这就会有两种类型的数据需要从错误中恢复。
- Data received and replicated - 这种数据会在失败后不会丢失,应为它会有一个副本保存在另外一个节点上,就像分身术一样,并且两个都是本体。
- Failure of the Driver Node - 如果正在运行Spark Streaming的driver节点失败了,那么很显然SparkContext也就没哟了,所有执行器以及暂存在执行器内存中的数据也就丢失了。
有了这些基础知识之外,我们接下来可以详细了解一下Spark Streaming的容错机制了。
定义
流处理系统通常会关心每一条数据会被处理几次。一个系统在所有的操作情况下(不管是错误,还是其他),可以提供三种类型的保障机制。
- At most once - 每一条数据都会被执行一次或者不执行。
- At least once - 每一条记录都会被执行一次或者多次。这种比第一种有更强的容错能力,能够保证数据不丢失。但是可能需要多次重复计算。
- Exactly once - 每一条记录都会恰好执行一次,并且没有数据丢失没有数据被执行多次。这明显是这三种中最好的一个保障机制。
基本概念
在任何流系统中,处理数据都会分为三步。
- Receiving the data - 使用接收器或者其他方式从源接收数据。
- Transforming the data - 接收到的数据进行transform操作,通过DStream或者RDD上定义的各种transformaction。
- Pushing out the data - 最终,经过处理的数据,要输出到外部系统。比如文件系统或者数据库。
如果一个流处理应用可以提供一个end-to-end的exactly-once guarantees,那么每一步都要提供一个exactly-once guarantees。也就是说,每一条记录都只能接受一次,转换一个,输出一次。下面,让我们来看一下Spark Streaming对这三步提供的一个机制。
- Receiving the data - 不同的源能够提供不同的担保机制,下面我们会说
- Transforming the data - 所有接收到的数据都会被处理一次,这要感谢RDD提供的guarantees。即使处理过程中发生了错误,只要输入数据还能被访问,那么最终结果永远都是一样。
- Pushing out the data - 输出操作只保证了at-least once,因为这取决于你使用了什么类型的操作和输出到的外部系统是否提供了什么机制(比如食事务控制)。但是用户可以实现自己的事务控制方法,来保证exactly-once。这会在这一节的后面讨论到。
数据接收机制
不同的输入数据源能够提供从at-least once到exactly once的不同保证机制。
文件源
如果待输入数据已经持久化到高可用文件系统中。那么Spark Streaming就可以从任何错误值恢复,并重新处理所有数据。这种方式提供了exactly once这种最高级别的保证机制,保证了任务错误情况下数据都能够被处理并且只处理一次。
基于接收器的源
对于基于接收器的数据源来说,容错机制同时取决于错误发生的情况和使用了何种接收器。为了方便讨论,我们把接收器分为两类。
- 可靠型接收器 - 这一类接收器会在确保接收到的数据已经备份之后和数据源通信告知数据已经接收。如果接收器出现错误,数据源就不会收到来自于接收器对于已经缓存(但并未备份)数据的确认消息。因此,在接收器重启后,数据源会重新发送数据到接收器来保证数据不会丢失。
- 非可靠型接收器 - 这种接收器没有与数据源的确认机制,所以有可能因为driver活着worker的问题出现错误后发生数据丢失的情况。
下面看看使用不同种类的接收器在具体错误发生的情况下产生的结果。如果worker节点发生故障,使用可靠型接收器不会发生数据丢失,而使用非可靠型接收器会导致还未备份的数据丢失。如果driver节点发生故障,那么所有接收到和已经备份到内存中的数据都会丢失,这将影响带有状态信息转换操作的结果。
为了避免丢失接收到的数据,Spark 1.2开始引入了write ahead logs机制来吧接收到的数据备份到可靠地存储系统中。启用write ahead logs机制并使用可靠型接收器可以保证零数据丢失。使用这种方案,提供了at-least once guarantee机制。
Deployment Scenario | Worker Failure | Driver Failure |
---|---|---|
Spark 1.1 or earlier, OR Spark 1.2 or later without write ahead logs | Buffered data lost with unreliable receivers Zero data loss with reliable receivers At-least once semantics | Buffered data lost with unreliable receivers Past data lost with all receivers Undefined semantics |
Spark 1.2 or later with write ahead logs | Zero data loss with reliable receivers At-least once semantics | Zero data loss with reliable receivers and files At-least once semantics |
输出机制
输出操作(例如foreachRDD)都提供了at-least once机制,这意味着数据可能会不止一次地输出到外部系统。当你使用saveAs***Files方法的时候就可能会发生重复写入相同数据。你需要一些额外的工作来保证exactly-once。在这里提供两种解决方案。
- 等价更新 - 即为多次写入同样的数据到相同位置,这样并不会影响最终结果。
- 事务控制 - 每一次的更新数据操作都会启动事务,用是无奈保证exactly once。
- 使用batch time和partition index来创建一个id,使用这个id来确保数据的唯一性
- 启动事务并使用这个id来更新外部系统数据,如果这个id不存在则提交更新,如果这个id已经存在那么则放弃更新。
dstream.foreachRDD { (rdd, time) =>
rdd.foreachPartition { partitionIterator =>
val partitionId = TaskContext.get.partitionId()
val uniqueId = generateUniqueId(time.milliseconds, partitionId)
// use this uniqueId to transactionally commit the data in partitionIterator
}
}