两阶段提交(2PC)与其在Flink exactly once中的应用

前言

简书快正式从小黑屋里出来了,所以是时候重启更新了。这段时间积攒了不少要写的东西,逐个击破吧。

两阶段提交(two-phase commit, 2PC)是最基础的分布式一致性协议,应用广泛。本文来介绍它的相关细节以及它在Flink中的典型应用场景。

2PC简介

先介绍两个前置概念。在分布式系统中,为了让每个节点都能够感知到其他节点的事务执行状况,需要引入一个中心节点来统一处理所有节点的执行逻辑,这个中心节点叫做协调者(coordinator),被中心节点调度的其他业务节点叫做参与者(participant)。

接下来正式介绍2PC。顾名思义,2PC将分布式事务分成了两个阶段,两个阶段分别为提交请求(投票)和提交(执行)。协调者根据参与者的响应来决定是否需要真正地执行事务,具体流程如下。

提交请求(投票)阶段

  1. 协调者向所有参与者发送prepare请求与事务内容,询问是否可以准备事务提交,并等待参与者的响应。
  2. 参与者执行事务中包含的操作,并记录undo日志(用于回滚)和redo日志(用于重放),但不真正提交。
  3. 参与者向协调者返回事务操作的执行结果,执行成功返回yes,否则返回no。

提交(执行)阶段

分为成功与失败两种情况。

  • 若所有参与者都返回yes,说明事务可以提交:
  1. 协调者向所有参与者发送commit请求。
  2. 参与者收到commit请求后,将事务真正地提交上去,并释放占用的事务资源,并向协调者返回ack。
  3. 协调者收到所有参与者的ack消息,事务成功完成。
  • 若有参与者返回no或者超时未返回,说明事务中断,需要回滚:
  1. 协调者向所有参与者发送rollback请求。
  2. 参与者收到rollback请求后,根据undo日志回滚到事务执行前的状态,释放占用的事务资源,并向协调者返回ack。
  3. 协调者收到所有参与者的ack消息,事务回滚完成。

下图分别示出这两种情况。

提交成功
提交失败

2PC的优缺点

2PC的优点在于原理非常简单,容易理解及实现。缺点主要有3个,列举如下:

  • 协调者存在单点问题。如果协调者挂了,整个2PC逻辑就彻底不能运行。
  • 执行过程是完全同步的。各参与者在等待其他参与者响应的过程中都处于阻塞状态,大并发下有性能问题。
  • 仍然存在不一致风险。如果由于网络异常等意外导致只有部分参与者收到了commit请求,就会造成部分参与者提交了事务而其他参与者未提交的情况。

不过,现在人们在分布式一致性领域做了很多工作,以ZooKeeper为代表的分布式协调框架也数不胜数,2PC有了这些的加持,可靠性大大提升了,也就能够真正用在要求高的生产环境中了。下面看看2PC与Flink是怎么扯上关系的。

Flink基于2PC的事务性写入

2PC的最常见应用场景其实是关系型数据库,比如MySQL InnoDB存储引擎的XA事务系统。但我对MySQL研究不是很深入,也就不多废话了,详情参见MySQL官方文档

Flink作为流式处理引擎,自然也提供了对exactly once语义的保证。在之前关于Spark Streaming exactly once的文章中曾经提到过,端到端的exactly once语义,是输入、处理逻辑、输出三部分协同作用的结果。Flink内部依托检查点机制和轻量级分布式快照算法ABS保证exactly once。而要实现精确一次的输出逻辑,则需要施加以下两种限制之一:幂等性写入(idempotent write)、事务性写入(transactional write)。

在Spark Streaming中,要实现事务性写入完全靠用户自己,框架本身并没有提供任何实现。但是在Flink中提供了基于2PC的SinkFunction,名为TwoPhaseCommitSinkFunction,帮助我们做了一些基础的工作。

TwoPhaseCommitSinkFunction的继承关系

Flink官方推荐所有需要保证exactly once的Sink逻辑都继承该抽象类。它定义了如下4个抽象方法,需要子类实现。

    protected abstract TXN beginTransaction() throws Exception;

    protected abstract void preCommit(TXN transaction) throws Exception;

    protected abstract void commit(TXN transaction);

    protected abstract void abort(TXN transaction);
  • beginTransaction():开始一个事务,返回事务信息的句柄。
  • preCommit():预提交(即提交请求)阶段的逻辑。
  • commit():正式提交阶段的逻辑。
  • abort():取消事务。

下面以Flink与Kafka的集成来说明2PC的具体流程。注意这里的Kafka版本必须是0.11及以上,因为只有0.11+的版本才支持幂等producer以及事务性,从而2PC才有存在的意义。Kafka内部事务性的机制如下框图所示。这就是另外一个大的话题了,本文不表。

开始事务

粗略看看FlinkKafkaProducer011类实现的beginTransaction()方法。

    @Override
    protected KafkaTransactionState beginTransaction() throws FlinkKafka011Exception {
        switch (semantic) {
            case EXACTLY_ONCE:
                FlinkKafkaProducer<byte[], byte[]> producer = createTransactionalProducer();
                producer.beginTransaction();
                return new KafkaTransactionState(producer.getTransactionalId(), producer);
            case AT_LEAST_ONCE:
            case NONE:
                // Do not create new producer on each beginTransaction() if it is not necessary
                final KafkaTransactionState currentTransaction = currentTransaction();
                if (currentTransaction != null && currentTransaction.producer != null) {
                    return new KafkaTransactionState(currentTransaction.producer);
                }
                return new KafkaTransactionState(initNonTransactionalProducer(true));
            default:
                throw new UnsupportedOperationException("Not implemented semantic");
        }
    }

可见,当要求exactly once语义时,会调用createTransactionalProducer()生成包含事务ID的producer。

预提交阶段

FlinkKafkaProducer011.preCommit()方法的实现很简单。其中的flush()方法实际上是代理了KafkaProducer.flush()方法。

    @Override
    protected void preCommit(KafkaTransactionState transaction) throws FlinkKafka011Exception {
        switch (semantic) {
            case EXACTLY_ONCE:
            case AT_LEAST_ONCE:
                flush(transaction);
                break;
            case NONE:
                break;
            default:
                throw new UnsupportedOperationException("Not implemented semantic");
        }
        checkErroneous();
    }

那么preCommit()方法是在哪里使用的呢?答案是TwoPhaseCommitSinkFunction.snapshotState()方法。从前面的类图可以得知,TwoPhaseCommitSinkFunction也继承了CheckpointedFunction接口,所以2PC是与检查点机制一同发挥作用的。

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // this is like the pre-commit of a 2-phase-commit transaction
        // we are ready to commit and remember the transaction

        checkState(currentTransactionHolder != null, "bug: no transaction object when performing state snapshot");

        long checkpointId = context.getCheckpointId();
        LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'", name(), context.getCheckpointId(), currentTransactionHolder);

        preCommit(currentTransactionHolder.handle);
        pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
        LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);

        currentTransactionHolder = beginTransactionInternal();
        LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);

        state.clear();
        state.add(new State<>(
            this.currentTransactionHolder,
            new ArrayList<>(pendingCommitTransactions.values()),
            userContext));
    }

结合Flink检查点的原理,可以用下图来形象地表示预提交阶段的流程。

图来自Flink官方博客:https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html

一旦开启了checkpoint功能,JobManager就在数据流中源源不断地打入屏障(barrier),作为检查点的界限。屏障随着算子链向下游传递,每到达一个算子都会触发将状态快照写入状态后端的动作。当屏障到达Kafka sink后,通过KafkaProducer.flush()方法刷写消息数据,但还未真正提交。接下来还是需要通过检查点来触发提交阶段。

提交阶段

FlinkKafkaProducer011.commit()方法实际上是代理了KafkaProducer.commitTransaction()方法,正式向Kafka提交事务。

    @Override
    protected void commit(KafkaTransactionState transaction) {
        if (transaction.isTransactional()) {
            try {
                transaction.producer.commitTransaction();
            } finally {
                recycleTransactionalProducer(transaction.producer);
            }
        }
    }

该方法的调用点位于TwoPhaseCommitSinkFunction.notifyCheckpointComplete()方法中。顾名思义,当所有检查点都成功完成之后,会回调这个方法。

    @Override
    public final void notifyCheckpointComplete(long checkpointId) throws Exception {
        Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
        checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending");
        Throwable firstError = null;

        while (pendingTransactionIterator.hasNext()) {
            Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
            Long pendingTransactionCheckpointId = entry.getKey();
            TransactionHolder<TXN> pendingTransaction = entry.getValue();
            if (pendingTransactionCheckpointId > checkpointId) {
                continue;
            }

            LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
                name(), checkpointId, pendingTransaction, pendingTransactionCheckpointId);

            logWarningIfTimeoutAlmostReached(pendingTransaction);
            try {
                commit(pendingTransaction.handle);
            } catch (Throwable t) {
                if (firstError == null) {
                    firstError = t;
                }
            }

            LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);

            pendingTransactionIterator.remove();
        }

        if (firstError != null) {
            throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",
                firstError);
        }
    }

该方法每次从正在等待提交的事务句柄中取出一个,校验它的检查点ID,并调用commit()方法提交之。这阶段的流程可以用下图来表示。

可见,只有在所有检查点都成功完成这个前提下,写入才会成功。这符合前文所述2PC的流程,其中JobManager为协调者,各个算子为参与者(不过只有sink一个参与者会执行提交)。一旦有检查点失败,notifyCheckpointComplete()方法就不会执行。如果重试也不成功的话,最终会调用abort()方法回滚事务。

    @Override
    protected void abort(KafkaTransactionState transaction) {
        if (transaction.isTransactional()) {
            transaction.producer.abortTransaction();
            recycleTransactionalProducer(transaction.producer);
        }
    }

The End

晚安(好像有点早啊喂

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,634评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,951评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,427评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,770评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,835评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,799评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,768评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,544评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,979评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,271评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,427评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,121评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,756评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,375评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,579评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,410评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,315评论 2 352

推荐阅读更多精彩内容