Flink源码分析 -- 两阶段提交
先简单了解一下处理语义
关于消息的消费、处理语义可以分为三类:
1. at most once : 至多一次,表示一条消息不管后续处理成功与否只会被消费处理一次,那么就存在数据丢失可能
2. exactly once : 精确一次,表示一条消息从其消费到后续的处理成功,只会发生一次
3. at least once :至少一次,表示一条消息从消费到后续的处理成功,可能会发生多次
在我们程序处理中通常要求能够满足exectly once语义,保证数据的准确性,flink 通过checkpoint机制提供了Exactly-Once与At-Least-Once 两种不同的消费语义实现, 可以将程序处理的所有数据都保存在状态内部,当程序发生异常失败重启可以从最近一次成功checkpoint中恢复状态数据,通过checkpoint中barrier对齐机制来实现这两不同的语义,barrier对齐发生在一个处理节点需要接收上游不同处理节点的数据,由于不同的上游节点数据处理速度不一致,那么就会导致下游节点接收到 barrier的时间点也会不一致,这时候就需要使用barrier对齐机制:在同一checkpoint中,先到达的barrier是否需要等待其他处理节点barrier达到后在发送后续数据,barrier将数据流分为前后两个checkpoint(chk n,chk n+1)的概念,如果不等待那么就会导致chk n的阶段处理了chk n+1阶段的数据,但是在source端所记录的消费偏移量又一致,如果chk n成功之后,后续的任务处理失败,任务重启会消费chk n+1阶段数据,就会到致数据重复消息,如果barrier等待就不会出现这样情况,因此barrier需要对齐那么就是实现exectly once语义,否则实现的是at least once语义。由于状态是属于flink内部存储,所以flink 仅仅满足内部exectly once语义。
两阶段提交(2PC)
在分布式系统中,可以使用两阶段提交来实现事务性从而保证数据的一致性,两阶段提交分为:预提交阶段与提交阶段,通常包含两个角色:协调者与执行者,协调者用于用于管理所有执行者的操作,执行者用于执行具体的提交操作,具体的操作流程:
如果在流程中部分预提交失败,那么协调者就会收到一条失败的反馈,则会发送一条rollback消息给所有执行者,执行回滚操作,保证数据一致性;但是如果在流程中,出现部分提交成功部分提交失败,那么就会造成数据的不一致,因此后面也提出了3PC或者通过其他补偿机制来保证数据最终一致性,
Flink中的两阶段提交
2017年12月Apache Flink社区发布了1.4版本。该版本引入一个新的的功能:两阶段提交Sink,即TwoPhaseCommitSinkFunction。该SinkFunction提取并封装了两阶段提交协议中的公共逻辑,自此Flink搭配特定source和sink搭建精确一次处理语义( exactly-once semantics)应用成为了可能。作为一个抽象类TwoPhaseCommitSinkFunction提供了一个抽象层供用户自行实现特定方法来支持 exactly-once semantics。
在这之前Flink支持EXACTLY_ONCE仅是对于Flink应用内部来说,对外部系统(端到端)有比较强的限制
外部系统写入支持幂等性
外部系统支持以事务的方式写入
在Flink1.4之后引入了TwoPhaseCommitSinkFunction接口,并在kafka Producer的connector中实现了它,支持了外部Kafka Sink 的EXACTLY_ONCE语义
两阶段提交协议针对Flink的Sink。要求下游的系统支持事务,或者是幂等性。两阶段提交是指如下两个阶段:
preCommit : 执行预提交, 在Sink进行snapshot操作的时候调用此方法。
-
commit : 真正的提交操作。当系统中各个operator的checkpoint操作都成功之后,JobManager会通知各个operator checkpoint操作已完成。此时会调用该方法
通过这两个阶段然后结合checkpoint 过程提供的hook,来实现两阶段提交过程
1. 一旦所有operator完成各自的pre-commit,它们会发起一个commit操作
2. 倘若有一个pre-commit失败,所有其他的pre-commit必须被终止,并且Flink会回滚到最近成功完成decheckpoint
3. 一旦pre-commit完成,必须要确保commit也要成功——operator和外部系统都需要对此进行保证。倘若commit失败(比如网络故障等),Flink应用就会崩溃,然后根据用户重启策略执行重启逻辑,之后再次重试commit。这个过程至关重要,因为倘若commit无法顺利执行,就可能出现数据丢失的情况
因此,所有opeartor必须对checkpoint最终结果达成共识:即所有operator都必须认定数据提交要么成功执行,要么被终止然后回滚。
当出现崩溃时,Flink会恢复最新已完成快照中应用状态。需要注意的是在某些极偶然的场景下,pre-commit阶段已成功完成而commit尚未开始(也就是operator尚未来得及被告知要开启commit),此时倘若发生崩溃Flink会将opeartor状态恢复到已完成pre-commit但尚未commit的状态。
在一个checkpoint状态中,对于已完成pre-commit的事务状态,我们必须保存足够多的信息,这样才能确保在重启后要么重新发起commit亦或是终止掉事务。本例中这部分信息就是临时文件所在的路径以及目标目录。
TwoPhaseCommitSinkFunction考虑了这种场景,因此当应用从checkpoint恢复之后TwoPhaseCommitSinkFunction总是会发起一个抢占式的commit。这种commit必须是幂等性的,虽然大部分情况下这都不是问题。本例中对应的这种场景就是:临时文件不在临时目录下,而是已经被移动到目标目录下。
提交过程中会出现两种情况 :
1.Pre-commit失败,将恢复到最近一次CheckPoint位置
2.一旦pre-commit完成,必须要确保commit也要成功
因此,所有opeartor必须对checkpoint最终结果达成共识:即所有operator都必须认定数据提交要么成功执行,要么被终止然后回滚
现在根据源码去刨析两阶段提交代码
TwoPhaseCommitSinkFunction
该类是实现两阶段提交Sink的父类,封装了两阶段提交的主要逻辑。
initializeState方法。该方法在CheckpointedFunction接口中定义,在集群中执行的时候调用,用于初始化状态后端。 该方法主要有以下逻辑:
获取状态存储变量state。
提交所有已经执行过preCommit的事务。
终止所有尚未preCommit的事务。
创建一个新事务。
TwoPhaseCommitSinkFunction类是一个抽象类,继承了RickSinkFunction,实现了CheckpointedFunction,和CheckpointListener接口
@PublicEvolving
public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
extends RichSinkFunction<IN>
implements CheckpointedFunction, CheckpointListener
initializeState方法, 主要作用 :
状态初始化方法,只会被调用一次,第一件事情是用来恢复上次checkpoint完成预提交的事务与下一次checkpoint开始的事务,对于上次checkpoint完成预提交说明该checkpoint已经完成,那么执行commit操作,下一次checkpoint开始的事务说明该checkpoint,那么执行abort操作,第二件事情是开启一个新的事务,给新的checkpoint使用
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// when we are restoring state with pendingCommitTransactions, we don't really know whether the
// transactions were already committed, or whether there was a failure between
// completing the checkpoint on the master, and notifying the writer here.
// (the common case is actually that is was already committed, the window
// between the commit on the master and the notification here is very small)
// it is possible to not have any transactions at all if there was a failure before
// the first completed checkpoint, or in case of a scale-out event, where some of the
// new task do not have and transactions assigned to check)
// we can have more than one transaction to check in case of a scale-in event, or
// for the reasons discussed in the 'notifyCheckpointComplete()' method.
//获取 operator的state
state = context.getOperatorStateStore().getListState(stateDescriptor);
// todo userContext 回收标志
boolean recoveredUserContext = false;
// 从上一个snapshot恢复完成的时候返回true,如果任务不支持snapshot,永远返回false
// 如果状态是从前一次执行的快照恢复的,则返回true。对于无状态任务,这总是返回false
// 指示是否找到要恢复的任何状态 -- true 需要恢复
if (context.isRestored()) {
LOG.info("{} - restoring state", name());
//开始进行状态恢复
for (State<TXN, CONTEXT> operatorState : state.get()) {
// 获取上下文
userContext = operatorState.getContext();
// 获取待提交的事务
// 在snapshotState方法调用preCommit之后,事务会被存储到该列表
// 获取等待的 事务 -- get pre-Commit event
List<TransactionHolder<TXN>> recoveredTransactions = operatorState.getPendingCommitTransactions();
List<TXN> handledTransactions = new ArrayList<>(recoveredTransactions.size() + 1);
//循环等待的事务, 提交事务
for (TransactionHolder<TXN> recoveredTransaction : recoveredTransactions) {
// If this fails to succeed eventually, there is actually data loss
// 如果这种方法最终失败,实际上会导致数据丢失
// 恢复并提交这些之前在state中保存的事务,如果失败可能是打开事务时超时或者数据出现丢失
// todo 提交事务
recoverAndCommitInternal(recoveredTransaction);
handledTransactions.add(recoveredTransaction.handle);
LOG.info("{} committed recovered transaction {}", name(), recoveredTransaction);
}
{
// 获取到 未preCommit的事务
TXN transaction = operatorState.getPendingTransaction().handle;
// 恢复并终止该事务 -- 在失败后被协调器拒绝的事务中止 --调用 FlinkKafkaProduce的recoverAndAbort方法
recoverAndAbort(transaction);
handledTransactions.add(transaction);
LOG.info("{} aborted recovered transaction {}", name(), operatorState.getPendingTransaction());
}
if (userContext.isPresent()) {
//在恢复(每个)用户上下文后调用的子类的回调 -- 调用 FlinkKafkaProduce的finishRecoveringContext方法
// 回收 context
finishRecoveringContext(handledTransactions);
recoveredUserContext = true;
}
}
}
// if in restore we didn't get any userContext or we are initializing from scratch
// 如果在恢复中,我们没有得到任何userContext或者我们正在从头开始初始化
if (!recoveredUserContext) {
LOG.info("{} - no state to restore", name());
//清空 context
userContext = initializeUserContext();
}
//清空 等待提交事务的列表
this.pendingCommitTransactions.clear();
// 开启一个新的事务
currentTransactionHolder = beginTransactionInternal();
LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);
}
snapshotState方法与checkpoint同步周期性执行的方法,首先执行preCommit对本次checkpoint事务执行预提交操作,并且开启一个新的事务提供给下一次checkpoint使用,然后将这两个事务句柄存放在state中进行容错,preCommit提交的事务就是在失败后重启需要commit的事务,而新开启的事务就是在失败后重启需要放弃的事务;
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// this is like the pre-commit of a 2-phase-commit transaction
// we are ready to commit and remember the transaction
// 检查事务对象不能为空 --- 执行state snapshot时必须要有事务对象
checkState(currentTransactionHolder != null, "bug: no transaction object when performing state snapshot");
long checkpointId = context.getCheckpointId();
LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'", name(), context.getCheckpointId(), currentTransactionHolder);
// 调用preCommit方法 预提交 -- todo 最终会调用实现类的preCommit 如 FlinkKafkaProducer011
preCommit(currentTransactionHolder.handle);
// 在待提交事务列表(pendingCommitTransactions)中记录该事务
// todo 将预提交事务 添加到待提交事务列表
pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);
// 开启新的事务
currentTransactionHolder = beginTransactionInternal();
LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);
// 清空state,然后记录当前事务和待提交事务 --todo State is Pojo类 耦合 当前事务,待提交事务列表,上下文
state.clear();
state.add(new State<>(
this.currentTransactionHolder,
new ArrayList<>(pendingCommitTransactions.values()),
userContext));
}
notifyCheckpointComplete方法主要时checkpoint完成之后的回调方法,负责对预提交的事务执行commit操作。
@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
// the following scenarios are possible here
//
// (1) there is exactly one transaction from the latest checkpoint that
// was triggered and completed. That should be the common case.
// Simply commit that transaction in that case.
//
// (2) there are multiple pending transactions because one previous
// checkpoint was skipped. That is a rare case, but can happen
// for example when:
//
// - the master cannot persist the metadata of the last
// checkpoint (temporary outage in the storage system) but
// could persist a successive checkpoint (the one notified here)
//
// - other tasks could not persist their status during
// the previous checkpoint, but did not trigger a failure because they
// could hold onto their state and could successfully persist it in
// a successive checkpoint (the one notified here)
//
// In both cases, the prior checkpoint never reach a committed state, but
// this checkpoint is always expected to subsume the prior one and cover all
// changes since the last successful one. As a consequence, we need to commit
// all pending transactions.
//
// (3) Multiple transactions are pending, but the checkpoint complete notification
// relates not to the latest. That is possible, because notification messages
// can be delayed (in an extreme case till arrive after a succeeding checkpoint
// was triggered) and because there can be concurrent overlapping checkpoints
// (a new one is started before the previous fully finished).
//
// ==> There should never be a case where we have no pending transaction here
//
// 获取所有待提交的事务 -- 使用LinkedHashMap存储事务, 存储有序
Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
Throwable firstError = null;
while (pendingTransactionIterator.hasNext()) {
Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
Long pendingTransactionCheckpointId = entry.getKey();
TransactionHolder<TXN> pendingTransaction = entry.getValue();
// 只提交在checkpointId之前的事务
if (pendingTransactionCheckpointId > checkpointId) {
continue;
}
// {name} - checkpoint {checkpointId} complete, committing transaction {pendingTransaction} from checkpoint {pendingTransactionCheckpointId}
LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
name(), checkpointId, pendingTransaction, pendingTransactionCheckpointId);
// todo 接近超时事件进行警告
logWarningIfTimeoutAlmostReached(pendingTransaction);
try {
// 逐个提交之前preCommit过的事务
commit(pendingTransaction.handle);
} catch (Throwable t) {
if (firstError == null) {
firstError = t;
}
}
LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);
// 将提交过的事务从待提交事务列表中清除
pendingTransactionIterator.remove();
}
if (firstError != null) {
throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",
firstError);
}
}
在提交事务的时候调用commit方法, 会进入FlinkKafkaProducer011类中
FlinkKafkaProducer011继承了 TwoPhaseCommitSinkFunction类
public class FlinkKafkaProducer011<IN>
extends TwoPhaseCommitSinkFunction<IN, FlinkKafkaProducer011.KafkaTransactionState, FlinkKafkaProducer011.KafkaTransactionContext>
FlinkKafkaProducer011的commit方法
@Override
protected void commit(KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
try {
//调用flink-connector-kafka-0.11项目下的FlinkKafkaProducer的commitTransaction方法
transaction.producer.commitTransaction();
} finally {
//如果失败 回收 producer
recycleTransactionalProducer(transaction.producer);
}
}
}
进入FlinkKafkaProducer的commitTransaction方法
@Override
public void commitTransaction() throws ProducerFencedException {
synchronized (producerClosingLock) {
// 确保producer没有关闭
ensureNotClosed();
// 提交事务
kafkaProducer.commitTransaction();
}
}
FlinkKafkaProducer
现在分析一下FlinkKafkaProducer类,是Flink对Kafka Producer的一个封装,其中以后用了producerClosingLock变量,用于对事务提交,回滚和关闭producer等操作加锁,在kafka2.3.0之前存在一个bug,关闭producer的线程和提交/终止事务的线程会发生死锁.在FlinkKafkaProducer对这些操作添加锁,避免此类问题,
FlinkKafkaProducer还有一个transactionId。创建的时候会从ProducerConfig配置中获取。
// This lock and closed flag are introduced to workaround KAFKA-6635. Because the bug is only fixed in
// Kafka 2.3.0, we need this workaround in 0.11 producer to avoid deadlock between a transaction
// committing / aborting thread and a producer closing thread.
// todo 这个锁和关闭标志被引入到KAFKA-6635的工作环境中。因为这个错误只在Kafka 2.3.0中得到了修复,
// 所以我们需要在0.11生成器中使用这个解决方案来避免事务提交/中止线程和生成器关闭线程之间的死锁。
private final Object producerClosingLock;
在调用是事务前的操作方法中,都会添加锁,防止出现死锁的情况
// todo 初始化事务
@Override
public void initTransactions() {
synchronized (producerClosingLock) {
ensureNotClosed();
kafkaProducer.initTransactions();
}
}
// todo 开始事务
@Override
public void beginTransaction() throws ProducerFencedException {
synchronized (producerClosingLock) {
ensureNotClosed();
kafkaProducer.beginTransaction();
}
}
// todo 提交事务
@Override
public void commitTransaction() throws ProducerFencedException {
synchronized (producerClosingLock) {
// 确保producer没有关闭
ensureNotClosed();
// 提交事务
kafkaProducer.commitTransaction();
}
}
// todo 终止事务
@Override
public void abortTransaction() throws ProducerFencedException {
synchronized (producerClosingLock) {
ensureNotClosed();
kafkaProducer.abortTransaction();
}
}
// todo 发送sffset到事务
@Override
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
synchronized (producerClosingLock) {
ensureNotClosed();
kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
}
}
// todo 发送数据
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return kafkaProducer.send(record);
}
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
return kafkaProducer.send(record, callback);
}
// todo 分发策略
@Override
public List<PartitionInfo> partitionsFor(String topic) {
synchronized (producerClosingLock) {
ensureNotClosed();
return kafkaProducer.partitionsFor(topic);
}
}
@Override
public Map<MetricName, ? extends Metric> metrics() {
return kafkaProducer.metrics();
}
@Override
public void close() {
closed = true;
synchronized (producerClosingLock) {
kafkaProducer.close();
}
}
FlinkKafkaProducer011
FlinkKafkaProducer011实现了TwoPhaseCommitSinkFunction,实现了两阶段提交主要逻辑
public class FlinkKafkaProducer011<IN>
extends TwoPhaseCommitSinkFunction<IN, FlinkKafkaProducer011.KafkaTransactionState, FlinkKafkaProducer011.KafkaTransactionContext> {
FlinkKafkaProducer011实现了1TwoPhaseCommitSinkFunction。实现了两阶段提交的主要逻辑
beginTransaction:开启一个事务,在临时目录下创建一个临时文件,之后,写入数据到该文件中
preCommit:在pre-commit阶段,flush缓存数据块到磁盘,然后关闭该文件,确保再不写入新数据到该文件。同时开启一个新事务执行属于下一个checkpoint的写入操作
commit:在commit阶段,我们以原子性的方式将上一阶段的文件写入真正的文件目录下。注意:这会增加输出数据可见性的延时。通俗说就是用户想要看到最终数据需要等会,不是实时的。
abort:一旦终止事务,我们离自己删除临时文件
//todo 开始事务
//todo 开始事务
@Override
protected KafkaTransactionState beginTransaction() throws FlinkKafka011Exception {
switch (semantic) { // todo 匹配语义
case EXACTLY_ONCE:
// 创建一个支持事务的 生产者 , -- 类型 FlinkKafkaProducer
// todo 每个新检查点都会创建一个FlinkKafkaProducer,这样不会与之前的事务出现冲突
FlinkKafkaProducer<byte[], byte[]> producer = createTransactionalProducer();
// 开启事务 -- 进入 FlinkKafkaProducer的beginTransaction方法
producer.beginTransaction();
// 创建一个 kafka事务状态(包含 事务id,生产者实例)
return new KafkaTransactionState(producer.getTransactionalId(), producer);
case AT_LEAST_ONCE:
case NONE:
// Do not create new producer on each beginTransaction() if it is not necessary
// 获取当前的事务
final KafkaTransactionState currentTransaction = currentTransaction();
// 如果当前有事务,返回当前事务对应的kafka producer
if (currentTransaction != null && currentTransaction.producer != null) {
return new KafkaTransactionState(currentTransaction.producer);
}
// initNonTransactionalProducer 会初始化一个非事务的producer
// 返回一个 非事务的 producer
return new KafkaTransactionState(initNonTransactionalProducer(true));
default:
throw new UnsupportedOperationException("Not implemented semantic");
}
}
// todo 预提交
@Override
protected void preCommit(KafkaTransactionState transaction) throws FlinkKafka011Exception {
switch (semantic) {
//该语义需要 flush
case EXACTLY_ONCE:
case AT_LEAST_ONCE:
flush(transaction);
break;
// 啥也不做
case NONE:
break;
default:
// 报错
throw new UnsupportedOperationException("Not implemented semantic");
}
// 检查检查
checkErroneous();
}
//提交事务
@Override
protected void commit(KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
try {
//调用flink-connector-kafka-0.11项目下的FlinkKafkaProducer的commitTransaction方法
transaction.producer.commitTransaction();
} finally {
//如果失败 回收producer的transactionId 加入到(availableTransactionalIds),关闭producer
recycleTransactionalProducer(transaction.producer);
}
}
}
// todo 用于从transactionalId恢复出对应的kafka producer,然后在提交任务
@Override
protected void recoverAndCommit(KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
try (
// 尝试根据已有的transactionId重新建立producer,然后提交任务
FlinkKafkaProducer<byte[], byte[]> producer =
initTransactionalProducer(transaction.transactionalId, false)) {
// 恢复事务
producer.resumeTransaction(transaction.producerId, transaction.epoch);
// 提交事务
producer.commitTransaction();
} catch (InvalidTxnStateException | ProducerFencedException ex) {
// That means we have committed this transaction before.
LOG.warn("Encountered error {} while recovering transaction {}. " +
"Presumably this transaction has been already committed before",
ex,
transaction);
}
}
}
// 终止事务
@Override
protected void abort(KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
// 终止事务的提交
transaction.producer.abortTransaction();
// 回收transactionId,关闭producer
recycleTransactionalProducer(transaction.producer);
}
}
如有错误 欢迎指正