Flink-1.10 源码笔记 两阶段提交

Flink源码分析 -- 两阶段提交

先简单了解一下处理语义

关于消息的消费、处理语义可以分为三类:

1. at most once : 至多一次,表示一条消息不管后续处理成功与否只会被消费处理一次,那么就存在数据丢失可能

2. exactly once : 精确一次,表示一条消息从其消费到后续的处理成功,只会发生一次

3. at least once :至少一次,表示一条消息从消费到后续的处理成功,可能会发生多次

在我们程序处理中通常要求能够满足exectly once语义,保证数据的准确性,flink 通过checkpoint机制提供了Exactly-Once与At-Least-Once 两种不同的消费语义实现, 可以将程序处理的所有数据都保存在状态内部,当程序发生异常失败重启可以从最近一次成功checkpoint中恢复状态数据,通过checkpoint中barrier对齐机制来实现这两不同的语义,barrier对齐发生在一个处理节点需要接收上游不同处理节点的数据,由于不同的上游节点数据处理速度不一致,那么就会导致下游节点接收到 barrier的时间点也会不一致,这时候就需要使用barrier对齐机制:在同一checkpoint中,先到达的barrier是否需要等待其他处理节点barrier达到后在发送后续数据,barrier将数据流分为前后两个checkpoint(chk n,chk n+1)的概念,如果不等待那么就会导致chk n的阶段处理了chk n+1阶段的数据,但是在source端所记录的消费偏移量又一致,如果chk n成功之后,后续的任务处理失败,任务重启会消费chk n+1阶段数据,就会到致数据重复消息,如果barrier等待就不会出现这样情况,因此barrier需要对齐那么就是实现exectly once语义,否则实现的是at least once语义。由于状态是属于flink内部存储,所以flink 仅仅满足内部exectly once语义。

barrier对齐

两阶段提交(2PC)

在分布式系统中,可以使用两阶段提交来实现事务性从而保证数据的一致性,两阶段提交分为:预提交阶段与提交阶段,通常包含两个角色:协调者与执行者,协调者用于用于管理所有执行者的操作,执行者用于执行具体的提交操作,具体的操作流程:

如果在流程中部分预提交失败,那么协调者就会收到一条失败的反馈,则会发送一条rollback消息给所有执行者,执行回滚操作,保证数据一致性;但是如果在流程中,出现部分提交成功部分提交失败,那么就会造成数据的不一致,因此后面也提出了3PC或者通过其他补偿机制来保证数据最终一致性,

两阶段提交

Flink中的两阶段提交

2017年12月Apache Flink社区发布了1.4版本。该版本引入一个新的的功能:两阶段提交Sink,即TwoPhaseCommitSinkFunction。该SinkFunction提取并封装了两阶段提交协议中的公共逻辑,自此Flink搭配特定source和sink搭建精确一次处理语义( exactly-once semantics)应用成为了可能。作为一个抽象类TwoPhaseCommitSinkFunction提供了一个抽象层供用户自行实现特定方法来支持 exactly-once semantics。

在这之前Flink支持EXACTLY_ONCE仅是对于Flink应用内部来说,对外部系统(端到端)有比较强的限制

  • 外部系统写入支持幂等性

  • 外部系统支持以事务的方式写入

在Flink1.4之后引入了TwoPhaseCommitSinkFunction接口,并在kafka Producer的connector中实现了它,支持了外部Kafka Sink 的EXACTLY_ONCE语义

两阶段提交协议针对Flink的Sink。要求下游的系统支持事务,或者是幂等性。两阶段提交是指如下两个阶段:

  1. preCommit : 执行预提交, 在Sink进行snapshot操作的时候调用此方法。

  2. commit : 真正的提交操作。当系统中各个operator的checkpoint操作都成功之后,JobManager会通知各个operator checkpoint操作已完成。此时会调用该方法

    通过这两个阶段然后结合checkpoint 过程提供的hook,来实现两阶段提交过程

1. 一旦所有operator完成各自的pre-commit,它们会发起一个commit操作

2. 倘若有一个pre-commit失败,所有其他的pre-commit必须被终止,并且Flink会回滚到最近成功完成decheckpoint

3. 一旦pre-commit完成,必须要确保commit也要成功——operator和外部系统都需要对此进行保证。倘若commit失败(比如网络故障等),Flink应用就会崩溃,然后根据用户重启策略执行重启逻辑,之后再次重试commit。这个过程至关重要,因为倘若commit无法顺利执行,就可能出现数据丢失的情况

因此,所有opeartor必须对checkpoint最终结果达成共识:即所有operator都必须认定数据提交要么成功执行,要么被终止然后回滚。

当出现崩溃时,Flink会恢复最新已完成快照中应用状态。需要注意的是在某些极偶然的场景下,pre-commit阶段已成功完成而commit尚未开始(也就是operator尚未来得及被告知要开启commit),此时倘若发生崩溃Flink会将opeartor状态恢复到已完成pre-commit但尚未commit的状态。

在一个checkpoint状态中,对于已完成pre-commit的事务状态,我们必须保存足够多的信息,这样才能确保在重启后要么重新发起commit亦或是终止掉事务。本例中这部分信息就是临时文件所在的路径以及目标目录。

TwoPhaseCommitSinkFunction考虑了这种场景,因此当应用从checkpoint恢复之后TwoPhaseCommitSinkFunction总是会发起一个抢占式的commit。这种commit必须是幂等性的,虽然大部分情况下这都不是问题。本例中对应的这种场景就是:临时文件不在临时目录下,而是已经被移动到目标目录下。

Flink两阶段提交过程

提交过程中会出现两种情况 :
1.Pre-commit失败,将恢复到最近一次CheckPoint位置
2.一旦pre-commit完成,必须要确保commit也要成功

因此,所有opeartor必须对checkpoint最终结果达成共识:即所有operator都必须认定数据提交要么成功执行,要么被终止然后回滚

现在根据源码去刨析两阶段提交代码

TwoPhaseCommitSinkFunction

该类是实现两阶段提交Sink的父类,封装了两阶段提交的主要逻辑。

initializeState方法。该方法在CheckpointedFunction接口中定义,在集群中执行的时候调用,用于初始化状态后端。 该方法主要有以下逻辑:

  1. 获取状态存储变量state。

  2. 提交所有已经执行过preCommit的事务。

  3. 终止所有尚未preCommit的事务。

  4. 创建一个新事务。

TwoPhaseCommitSinkFunction类是一个抽象类,继承了RickSinkFunction,实现了CheckpointedFunction,和CheckpointListener接口

@PublicEvolving
public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
 extends RichSinkFunction<IN>
 implements CheckpointedFunction, CheckpointListener

initializeState方法, 主要作用 :

状态初始化方法,只会被调用一次,第一件事情是用来恢复上次checkpoint完成预提交的事务与下一次checkpoint开始的事务,对于上次checkpoint完成预提交说明该checkpoint已经完成,那么执行commit操作,下一次checkpoint开始的事务说明该checkpoint,那么执行abort操作,第二件事情是开启一个新的事务,给新的checkpoint使用

@Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // when we are restoring state with pendingCommitTransactions, we don't really know whether the
        // transactions were already committed, or whether there was a failure between
        // completing the checkpoint on the master, and notifying the writer here.

        // (the common case is actually that is was already committed, the window
        // between the commit on the master and the notification here is very small)

        // it is possible to not have any transactions at all if there was a failure before
        // the first completed checkpoint, or in case of a scale-out event, where some of the
        // new task do not have and transactions assigned to check)

        // we can have more than one transaction to check in case of a scale-in event, or
        // for the reasons discussed in the 'notifyCheckpointComplete()' method.

        //获取 operator的state
        state = context.getOperatorStateStore().getListState(stateDescriptor);

        // todo userContext 回收标志
        boolean recoveredUserContext = false;
        // 从上一个snapshot恢复完成的时候返回true,如果任务不支持snapshot,永远返回false
        // 如果状态是从前一次执行的快照恢复的,则返回true。对于无状态任务,这总是返回false
        // 指示是否找到要恢复的任何状态 -- true 需要恢复
        if (context.isRestored()) {
            LOG.info("{} - restoring state", name());
            //开始进行状态恢复
            for (State<TXN, CONTEXT> operatorState : state.get()) {
                // 获取上下文
                userContext = operatorState.getContext();
                // 获取待提交的事务
                // 在snapshotState方法调用preCommit之后,事务会被存储到该列表
                // 获取等待的 事务    -- get pre-Commit event
                List<TransactionHolder<TXN>> recoveredTransactions = operatorState.getPendingCommitTransactions();

                List<TXN> handledTransactions = new ArrayList<>(recoveredTransactions.size() + 1);

                //循环等待的事务, 提交事务
                for (TransactionHolder<TXN> recoveredTransaction : recoveredTransactions) {
                    // If this fails to succeed eventually, there is actually data loss
                    // 如果这种方法最终失败,实际上会导致数据丢失
                    // 恢复并提交这些之前在state中保存的事务,如果失败可能是打开事务时超时或者数据出现丢失
                    // todo 提交事务
                    recoverAndCommitInternal(recoveredTransaction);
                    handledTransactions.add(recoveredTransaction.handle);
                    LOG.info("{} committed recovered transaction {}", name(), recoveredTransaction);
                }

                {
                    // 获取到 未preCommit的事务
                    TXN transaction = operatorState.getPendingTransaction().handle;
                    // 恢复并终止该事务  -- 在失败后被协调器拒绝的事务中止   --调用 FlinkKafkaProduce的recoverAndAbort方法
                    recoverAndAbort(transaction);
                    handledTransactions.add(transaction);
                    LOG.info("{} aborted recovered transaction {}", name(), operatorState.getPendingTransaction());
                }

                if (userContext.isPresent()) {
                    //在恢复(每个)用户上下文后调用的子类的回调    -- 调用 FlinkKafkaProduce的finishRecoveringContext方法
                    // 回收 context
                    finishRecoveringContext(handledTransactions);
                    recoveredUserContext = true;
                }
            }
        }

        // if in restore we didn't get any userContext or we are initializing from scratch
        // 如果在恢复中,我们没有得到任何userContext或者我们正在从头开始初始化
        if (!recoveredUserContext) {
            LOG.info("{} - no state to restore", name());

            //清空 context
            userContext = initializeUserContext();
        }
        //清空 等待提交事务的列表
        this.pendingCommitTransactions.clear();

        // 开启一个新的事务
        currentTransactionHolder = beginTransactionInternal();
        LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);
    }

snapshotState方法与checkpoint同步周期性执行的方法,首先执行preCommit对本次checkpoint事务执行预提交操作,并且开启一个新的事务提供给下一次checkpoint使用,然后将这两个事务句柄存放在state中进行容错,preCommit提交的事务就是在失败后重启需要commit的事务,而新开启的事务就是在失败后重启需要放弃的事务;

@Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // this is like the pre-commit of a 2-phase-commit transaction
        // we are ready to commit and remember the transaction

        // 检查事务对象不能为空      --- 执行state snapshot时必须要有事务对象
        checkState(currentTransactionHolder != null, "bug: no transaction object when performing state snapshot");

        long checkpointId = context.getCheckpointId();
        LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'", name(), context.getCheckpointId(), currentTransactionHolder);

        // 调用preCommit方法    预提交  -- todo 最终会调用实现类的preCommit 如 FlinkKafkaProducer011
        preCommit(currentTransactionHolder.handle);
        // 在待提交事务列表(pendingCommitTransactions)中记录该事务
        // todo 将预提交事务 添加到待提交事务列表
        pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
        LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);

        // 开启新的事务
        currentTransactionHolder = beginTransactionInternal();
        LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);

        // 清空state,然后记录当前事务和待提交事务      --todo State is Pojo类 耦合 当前事务,待提交事务列表,上下文
        state.clear();
        state.add(new State<>(
            this.currentTransactionHolder,
            new ArrayList<>(pendingCommitTransactions.values()),
            userContext));
    }

notifyCheckpointComplete方法主要时checkpoint完成之后的回调方法,负责对预提交的事务执行commit操作。

    @Override
    public final void notifyCheckpointComplete(long checkpointId) throws Exception {
        // the following scenarios are possible here
        //
        //  (1) there is exactly one transaction from the latest checkpoint that
        //      was triggered and completed. That should be the common case.
        //      Simply commit that transaction in that case.
        //
        //  (2) there are multiple pending transactions because one previous
        //      checkpoint was skipped. That is a rare case, but can happen
        //      for example when:
        //
        //        - the master cannot persist the metadata of the last
        //          checkpoint (temporary outage in the storage system) but
        //          could persist a successive checkpoint (the one notified here)
        //
        //        - other tasks could not persist their status during
        //          the previous checkpoint, but did not trigger a failure because they
        //          could hold onto their state and could successfully persist it in
        //          a successive checkpoint (the one notified here)
        //
        //      In both cases, the prior checkpoint never reach a committed state, but
        //      this checkpoint is always expected to subsume the prior one and cover all
        //      changes since the last successful one. As a consequence, we need to commit
        //      all pending transactions.
        //
        //  (3) Multiple transactions are pending, but the checkpoint complete notification
        //      relates not to the latest. That is possible, because notification messages
        //      can be delayed (in an extreme case till arrive after a succeeding checkpoint
        //      was triggered) and because there can be concurrent overlapping checkpoints
        //      (a new one is started before the previous fully finished).
        //
        // ==> There should never be a case where we have no pending transaction here
        //

        // 获取所有待提交的事务            -- 使用LinkedHashMap存储事务, 存储有序
        Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
        Throwable firstError = null;

        while (pendingTransactionIterator.hasNext()) {
            Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
            Long pendingTransactionCheckpointId = entry.getKey();
            TransactionHolder<TXN> pendingTransaction = entry.getValue();
            // 只提交在checkpointId之前的事务
            if (pendingTransactionCheckpointId > checkpointId) {
                continue;
            }

            // {name} - checkpoint {checkpointId} complete, committing transaction {pendingTransaction} from checkpoint {pendingTransactionCheckpointId}
            LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
                name(), checkpointId, pendingTransaction, pendingTransactionCheckpointId);

            // todo 接近超时事件进行警告
            logWarningIfTimeoutAlmostReached(pendingTransaction);
            try {
                // 逐个提交之前preCommit过的事务
                commit(pendingTransaction.handle);
            } catch (Throwable t) {
                if (firstError == null) {
                    firstError = t;
                }
            }

            LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);
            // 将提交过的事务从待提交事务列表中清除
            pendingTransactionIterator.remove();
        }


        if (firstError != null) {
            throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",
                firstError);
        }
    }

在提交事务的时候调用commit方法, 会进入FlinkKafkaProducer011类中

FlinkKafkaProducer011继承了 TwoPhaseCommitSinkFunction类

public class FlinkKafkaProducer011<IN>
 extends TwoPhaseCommitSinkFunction<IN, FlinkKafkaProducer011.KafkaTransactionState, FlinkKafkaProducer011.KafkaTransactionContext>

FlinkKafkaProducer011的commit方法

    @Override
    protected void commit(KafkaTransactionState transaction) {
        if (transaction.isTransactional()) {
            try {
                //调用flink-connector-kafka-0.11项目下的FlinkKafkaProducer的commitTransaction方法
                transaction.producer.commitTransaction();
            } finally {
                //如果失败 回收 producer
                recycleTransactionalProducer(transaction.producer);
            }
        }
    }

进入FlinkKafkaProducer的commitTransaction方法

    @Override
    public void commitTransaction() throws ProducerFencedException {
        synchronized (producerClosingLock) {
            // 确保producer没有关闭
            ensureNotClosed();
            // 提交事务
            kafkaProducer.commitTransaction();
        }
    }

FlinkKafkaProducer

现在分析一下FlinkKafkaProducer类,是Flink对Kafka Producer的一个封装,其中以后用了producerClosingLock变量,用于对事务提交,回滚和关闭producer等操作加锁,在kafka2.3.0之前存在一个bug,关闭producer的线程和提交/终止事务的线程会发生死锁.在FlinkKafkaProducer对这些操作添加锁,避免此类问题,

FlinkKafkaProducer还有一个transactionId。创建的时候会从ProducerConfig配置中获取。

    // This lock and closed flag are introduced to workaround KAFKA-6635. Because the bug is only fixed in
    // Kafka 2.3.0, we need this workaround in 0.11 producer to avoid deadlock between a transaction
    // committing / aborting thread and a producer closing thread.
    // todo 这个锁和关闭标志被引入到KAFKA-6635的工作环境中。因为这个错误只在Kafka 2.3.0中得到了修复,
    //  所以我们需要在0.11生成器中使用这个解决方案来避免事务提交/中止线程和生成器关闭线程之间的死锁。
    private final Object producerClosingLock;

在调用是事务前的操作方法中,都会添加锁,防止出现死锁的情况

    // todo 初始化事务
    @Override
    public void initTransactions() {
        synchronized (producerClosingLock) {
            ensureNotClosed();
            kafkaProducer.initTransactions();
        }
    }

    // todo 开始事务
    @Override
    public void beginTransaction() throws ProducerFencedException {
        synchronized (producerClosingLock) {
            ensureNotClosed();
            kafkaProducer.beginTransaction();
        }
    }

    // todo 提交事务
    @Override
    public void commitTransaction() throws ProducerFencedException {
        synchronized (producerClosingLock) {
            // 确保producer没有关闭
            ensureNotClosed();
            // 提交事务
            kafkaProducer.commitTransaction();
        }
    }

    // todo 终止事务
    @Override
    public void abortTransaction() throws ProducerFencedException {
        synchronized (producerClosingLock) {
            ensureNotClosed();
            kafkaProducer.abortTransaction();
        }
    }

    // todo 发送sffset到事务
    @Override
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
        synchronized (producerClosingLock) {
            ensureNotClosed();
            kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
        }
    }

    // todo 发送数据
    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        return kafkaProducer.send(record);
    }

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        return kafkaProducer.send(record, callback);
    }
    
    // todo 分发策略
    @Override
    public List<PartitionInfo> partitionsFor(String topic) {
        synchronized (producerClosingLock) {
            ensureNotClosed();
            return kafkaProducer.partitionsFor(topic);
        }
    }

    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        return kafkaProducer.metrics();
    }

    @Override
    public void close() {
        closed = true;
        synchronized (producerClosingLock) {
            kafkaProducer.close();
        }
    }

FlinkKafkaProducer011

FlinkKafkaProducer011实现了TwoPhaseCommitSinkFunction,实现了两阶段提交主要逻辑

public class FlinkKafkaProducer011<IN>
 extends TwoPhaseCommitSinkFunction<IN, FlinkKafkaProducer011.KafkaTransactionState, FlinkKafkaProducer011.KafkaTransactionContext> {

FlinkKafkaProducer011实现了1TwoPhaseCommitSinkFunction。实现了两阶段提交的主要逻辑

  1. beginTransaction:开启一个事务,在临时目录下创建一个临时文件,之后,写入数据到该文件中

  2. preCommit:在pre-commit阶段,flush缓存数据块到磁盘,然后关闭该文件,确保再不写入新数据到该文件。同时开启一个新事务执行属于下一个checkpoint的写入操作

  3. commit:在commit阶段,我们以原子性的方式将上一阶段的文件写入真正的文件目录下。注意:这会增加输出数据可见性的延时。通俗说就是用户想要看到最终数据需要等会,不是实时的。

  4. abort:一旦终止事务,我们离自己删除临时文件

//todo 开始事务
//todo 开始事务
    @Override
    protected KafkaTransactionState beginTransaction() throws FlinkKafka011Exception {
        switch (semantic) { // todo 匹配语义
            case EXACTLY_ONCE:
                // 创建一个支持事务的 生产者 ,   -- 类型 FlinkKafkaProducer
                // todo 每个新检查点都会创建一个FlinkKafkaProducer,这样不会与之前的事务出现冲突
                FlinkKafkaProducer<byte[], byte[]> producer = createTransactionalProducer();
                // 开启事务   -- 进入 FlinkKafkaProducer的beginTransaction方法
                producer.beginTransaction();
                // 创建一个 kafka事务状态(包含 事务id,生产者实例)
                return new KafkaTransactionState(producer.getTransactionalId(), producer);
            case AT_LEAST_ONCE:
            case NONE:
                // Do not create new producer on each beginTransaction() if it is not necessary
                // 获取当前的事务
                final KafkaTransactionState currentTransaction = currentTransaction();
                // 如果当前有事务,返回当前事务对应的kafka producer
                if (currentTransaction != null && currentTransaction.producer != null) {
                    return new KafkaTransactionState(currentTransaction.producer);
                }
                // initNonTransactionalProducer 会初始化一个非事务的producer
                // 返回一个 非事务的 producer
                return new KafkaTransactionState(initNonTransactionalProducer(true));
            default:
                throw new UnsupportedOperationException("Not implemented semantic");
        }
    }

    // todo 预提交
    @Override
    protected void preCommit(KafkaTransactionState transaction) throws FlinkKafka011Exception {
        switch (semantic) {
            //该语义需要 flush
            case EXACTLY_ONCE:
            case AT_LEAST_ONCE:
                flush(transaction);
                break;
                // 啥也不做
            case NONE:
                break;
            default:
                // 报错
                throw new UnsupportedOperationException("Not implemented semantic");
        }
        // 检查检查
        checkErroneous();
    }

    //提交事务
    @Override
    protected void commit(KafkaTransactionState transaction) {
        if (transaction.isTransactional()) {
            try {
                //调用flink-connector-kafka-0.11项目下的FlinkKafkaProducer的commitTransaction方法
                transaction.producer.commitTransaction();
            } finally {
                //如果失败 回收producer的transactionId 加入到(availableTransactionalIds),关闭producer
                recycleTransactionalProducer(transaction.producer);
            }
        }
    }

    // todo 用于从transactionalId恢复出对应的kafka producer,然后在提交任务
    @Override
    protected void recoverAndCommit(KafkaTransactionState transaction) {
        if (transaction.isTransactional()) {
            try (
                // 尝试根据已有的transactionId重新建立producer,然后提交任务
                FlinkKafkaProducer<byte[], byte[]> producer =
                    initTransactionalProducer(transaction.transactionalId, false)) {
                // 恢复事务
                producer.resumeTransaction(transaction.producerId, transaction.epoch);
                // 提交事务
                producer.commitTransaction();
            } catch (InvalidTxnStateException | ProducerFencedException ex) {
                // That means we have committed this transaction before.
                LOG.warn("Encountered error {} while recovering transaction {}. " +
                        "Presumably this transaction has been already committed before",
                    ex,
                    transaction);
            }
        }
    }

    // 终止事务
    @Override
    protected void abort(KafkaTransactionState transaction) {
        if (transaction.isTransactional()) {
            // 终止事务的提交
            transaction.producer.abortTransaction();
            // 回收transactionId,关闭producer
            recycleTransactionalProducer(transaction.producer);
        }
    }

如有错误 欢迎指正

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
禁止转载,如需转载请通过简信或评论联系作者。

推荐阅读更多精彩内容