要篇内容介绍Executor的容错,容错方式有WAL、消息重放、其他
- 首先介绍WAL的方法,就是保存数据前,先把数据写日志。从ReceiverSupervisorImpl的pushAndReportBlock的方法开始看,代码如下
def pushAndReportBlock(
receivedBlock: ReceivedBlock,
metadataOption: Option[Any],
blockIdOption: Option[StreamBlockId]
) {
val blockId = blockIdOption.getOrElse(nextBlockId)
val time = System.currentTimeMillis
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
val numRecords = blockStoreResult.numRecords
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
logDebug(s"Reported block $blockId")
}
调用receivedBlockHandler的storeBlock方法,receivedBlockHandler决定了采用哪种方式来存储数据,代码如下
private val receivedBlockHandler: ReceivedBlockHandler = {
if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
if (checkpointDirOption.isEmpty) {
throw new SparkException(
"Cannot enable receiver write-ahead log without checkpoint directory set. " +
"Please use streamingContext.checkpoint() to set the checkpoint directory. " +
"See documentation for more details.")
}
new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
} else {
new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
}
}
如果开启WAL的方式,会将数据保存到checkpoint目录,如果checkpoint目录没有配置,就抛出异常。
先看WriteAheadLogBasedBlockHandler,开启WAL后,采用BlockManager存储数据时就不需要复本了,否则和WAL同时做容错就是重复性工作,降低了系统的性能。
再看BlockManagerBasedBlockHandler,就是将数据交给BlockManager存储,根据用户定义的存储级别来存储,系统一般默认存储级别为MEMORY_AND_DISK_SER_2,如果对数据安全性要求不高也可以不要复本。
- 消息重放就是一种非常高效的方式,采用kafka的Direct API接口读取数据时首先计算offset的位置,如果job异常,根据消费的offset位置重新指定kafka的offset,从失败的位置读取。kafka直接做为文件存储系统,就像hdfs一样,具体怎么使用以后的章节还会介绍。