目录
一、一致性定义
Flink通过插入barrier将流分为逻辑上的批,用来保存状态。因此一个checkpointid可以理解为一批数据的状态改变。当checkpoint 被global commited之后,标志着完成了该快照。当发生failover时,会从最近的快照开始恢复,因此处于global commited状态的数据不会被重算,可以认为是一致的。由于Flink采用异步快照机制,当完成local commit的checkpoint处于pending状态,此时若发生重算,则需要丢弃pending checkpoint的数据。若收到完成全局快照的notify,则状态变成global commited。
处于global commited的数据可以认为是一致的,因为不会发生重算,产生不同的side effect。
处于pending checkpoint的数据,不能向外部直接提交,因为重算时可能会有不同的结果。
针对重算的pending checkpoint是否相同,Flink目前实现端到端一致性有两种方式,分别是Idempotent Sink和Transaction Sink。
对于幂等的业务逻辑,在replay时借助upsert可以达到最终一致,在replay时产生的key的集合可以完全覆盖之前的数据,从达到状态最终一致。
对于不确定性逻辑,比如由于窗口、流交汇顺序等原因等情况导致pending checkpoint数据重算时不一致(重算时key不能完全覆盖之前pending时产生的key set,或者有增加等非幂等操作),只能输出global commited的数据,pending checkpoint则暂时缓存。
对于pending checkpoint数据的缓存方式,flink目前有两种方式
采用wal日志保存pending checkpoint,等待pending checkpoint变成global commited之后向外部输出。
直接向外部系统输出,需要外部系统支持事务(FlinkKafkaproducer011),或者可以模拟事务提交(HDFS的原子命名)
二、Idempotent Sink
若数据有一下两条属性,可以直接向外部系统输出,重算时,利用幂等性达到最终一致性
三、Transactional Sink
3.1 实现原理
为了保证数据不会丢失,Transactional Sink 需要缓存pending checkpoints,只有处于global commit的Sink可以对下游系统可见。
因为可能在收到Job manager发送的之前notify就发生crash,或者收到notify在执行commit之前就crash。为了保证数据不丢失,在restore之后,需要重新提交pengding checkpoint,同时abort尚未完成快照的ck4 。
虽然ck3在进行快照时ck2、ck3都尚处于pending状态,但如果fail recover时载入的ck3的状态,其pending checkpoints事实上已经是处于global commit状态。需要确保一定能成功提交,负责可能会造成数据丢失,因此在初始化调用initializeState ()方法时,需要重新提交pending checkpoints。然而若pending checkpoints已经在notify时提交成功了,就会造成了对checkpint的重复提交,针对这一问题Flink给出了三种解决方案
GenericWriteAheadSink:采用了外部存储去记录事实上执行commit成功的checkpointID,避免了重复提交.
TwopahseCommitSinkFunction 依赖于事务commit的幂等保证不会重复commit。
BucketingFileSink 利用原子命名实现对global commit状态的checkpointID提交。
3.2 write ahead log
3.2.1 实现
抽象类GenericWriteAheadSink实现了缓存数据。在收到上游的消息时,会将消息存储在state中(保存于taskowned目录下),收到全局一致快照完成的notify后,调用sendValues(Iterable<IN> values, long checkpointId, long timestamp)方法向下游系统发送global commit的消息。
3.1中提到在restrore时会提交pending checkpoints,若之前调用过notifycheckpointComplete方法,就会导致重复提交,GenericWriteAheadSink使用CheckpointCommitter记录实际提交成功的checkpointid,将commit成功的checkpointid信息存储于独立的state的外存,这样保证在作业重启时,依旧可以知道已经commit的checkpoints,在提交pending checkpoints时,会剔除已经提交成功的checkpoints。
//GenericWriteAheadSink在notifyCheckpointComplete时提交pendingcheckpoints,同时更新CheckpointCommitter中已提交的checkpointID信息
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
super.notifyCheckpointComplete(checkpointId);
synchronized (pendingCheckpoints) {
Iterator<PendingCheckpoint> pendingCheckpointIt = pendingCheckpoints.iterator();
while (pendingCheckpointIt.hasNext()) {
PendingCheckpoint pendingCheckpoint = pendingCheckpointIt.next();
long pastCheckpointId = pendingCheckpoint.checkpointId;
int subtaskId = pendingCheckpoint.subtaskId;
long timestamp = pendingCheckpoint.timestamp;
StreamStateHandle streamHandle = pendingCheckpoint.stateHandle;
if (pastCheckpointId <= checkpointId) {
try {
if (!committer.isCheckpointCommitted(subtaskId, pastCheckpointId)) {
try (FSDataInputStream in = streamHandle.openInputStream()) {
//调用sendValues发送wal中的records
boolean success = sendValues(
new ReusingMutableToRegularIteratorWrapper<>(
new InputViewIterator<>(
new DataInputViewStreamWrapper(
in),
serializer),
serializer),
pastCheckpointId,
timestamp);
if (success) {
// in case the checkpoint was successfully committed,
// discard its state from the backend and mark it for removal
// in case it failed, we retry on the next checkpoint
//sendValues成功,则commiter记录成功的CheckpointId
committer.commitCheckpoint(subtaskId, pastCheckpointId);
streamHandle.discardState();
pendingCheckpointIt.remove();
}
}
} else {
streamHandle.discardState();
pendingCheckpointIt.remove();
}
} catch (Exception e) {
// we have to break here to prevent a new (later) checkpoint
// from being committed before this one
LOG.error("Could not commit checkpoint.", e);
break;
}
}
}
}
}
在作业重启恢复时,会在open方法里调用cleanRestoredHandles(),借助CheckpointCommitter剔除已经提交的checkpointid。避免了重复提交。
/**
* Called at {@link #open()} to clean-up the pending handle list.
* It iterates over all restored pending handles, checks which ones are already
* committed to the outside storage system and removes them from the list.
*/
private void cleanRestoredHandles() throws Exception {
synchronized (pendingCheckpoints) {
Iterator<PendingCheckpoint> pendingCheckpointIt = pendingCheckpoints.iterator();
while (pendingCheckpointIt.hasNext()) {
PendingCheckpoint pendingCheckpoint = pendingCheckpointIt.next();
if (committer.isCheckpointCommitted(pendingCheckpoint.subtaskId, pendingCheckpoint.checkpointId)) {
pendingCheckpoint.stateHandle.discardState();
pendingCheckpointIt.remove();
}
}
}
}
Cassandra的connector CassandraRowWriteAheadSink、CassandraTupleWriteAheadSink继承了GenericWriteAheadSink,实现了对exactly once 写的支持。
CassandraCommitter继承了CheckpointCommitter,在Cassandra中保存commitedCheckpointID信息。
3.2.2 优缺点
write ahead log不能实现真正的exactly once 传输,下列两种情况会导致数据重复
sendValues()在向外部commit数据时crash,若外部系统不支持原子写,则恢复后replay会造成数据重复。
sendValues()方法成功完成,但程序在CheckpointCommitter在被调用前或者CheckpointCommitter commit checkpointid失败,Sink在recovery时会重复提交。
要避免重复写,需要外部系统支持upsert操作的key-value存储或关系型数据库,后续的commit会覆盖之前commit失败的数据,达到最终一致性。与幂等性Sink不同的是,WAL sink向外部系统提交的是处于global commit的数据,消除了不确定性带来的影响。
性能的影响,占用内存buffer数据,sendValues()时阻塞Sink,降低影响吞吐。
3.3 TwopahseCommitSinkFunction
3.3.1 实现
二段提交是阻塞式的协议,参与者在完成pre commit后,需要阻塞等待其他参与者完成pre commit,直到收到协调者发送commit后,才能进行提交。Flink通过将pre commit 完成的事务挂起(pending transaction),减少了二段提交带来的阻塞。Transactional Sink 的实现都是基于二段提交,TwopahseCommitSinkFunction是适用于下游系统支持事务的抽象类。可以将消息直接输出到下游系统,但只有commit后才能对下游系统可见。
WAL将pending checkpoints保存在Flink的state中,由于Flink的采用了local state(存储计算结合),尚未global commited的checkpoints状态会在故障恢复后丢弃,pending checkpoint因此不需要做任何abort操作,随着状态重启会自行丢弃。
但是对于TwopahseCommitSinkFunction则不同,消息会在pre commit时写入外部系统。对于使用TXNID标识事务的若未提交的事务会对事务造成影响,则需要在初始化abort之前的事务。
实现TwopahseCommitSinkFunction,需要编写泛型IN,TXN,CONTEXT.
IN:指定输入的类型
TXN是事务的标识符,可以由TXN唯一指定一个事务,在重启时可以利用TXN恢复fail之前尚处于pending状态的事务,
CONTEXT可选项,定义userContext,确保只会在subtask第一次调用被初始化。在FlinkKafkaProducer011中用来记录subtask被分配的txid,一旦txid被指定就不会在发生改变,用来abort尚处于pending状态的事务(详见4.2)。
默认需要重写的方法:
/**
* This method must be the only place to call {@link #beginTransaction()} to ensure that the
* {@link TransactionHolder} is created at the same time.
*/
private TransactionHolder<TXN> beginTransactionInternal() throws Exception {
return new TransactionHolder<>(beginTransaction(), clock.millis());
}
/**
* Write value within a transaction.
*/
protected abstract void invoke(TXN transaction, IN value, Context context) throws Exception;
/**
* Pre commit previously created transaction. Pre commit must make all of the necessary steps to prepare the
* transaction for a commit that might happen in the future. After this point the transaction might still be
* aborted, but underlying implementation must ensure that commit calls on already pre committed transactions
* will always succeed.
*
* <p>Usually implementation involves flushing the data.
*/
protected abstract void preCommit(TXN transaction) throws Exception;
/**
* Commit a pre-committed transaction. If this method fail, Flink application will be
* restarted and {@link TwoPhaseCommitSinkFunction#recoverAndCommit(Object)} will be called again for the
* same transaction.
*/
protected abstract void commit(TXN transaction);
/**
* Abort a transaction.
*/
protected abstract void abort(TXN transaction);
上图是需要重写的5个方法在一个事务的生命周期中默认的执行顺序,
在snapshot ck3时,调用precommit方法,将ck3对应的消息flush进外部系统,确保事务处于可以commit的状态(预写数据完全写入外部存储),随后将ck3加入pending Transaction,并开启下一个事务ck4,最后执行ck3的快照.
在收到notify ck3的消息后,提交ck3
若发生crash,从ck3恢复,则利用TXN恢复ck3状态中处于pending的checkpoints ck2 ck3,并commit。随后恢复ck4,并abort ck4(若不abort ck4,则之前预写进外部系统的数据不能清除)
3.3.2 优缺点
要使用 TwopahseCommitSinkFunction对外部系统具有如下要求
外部系统必须提供事务支持或者能够可以Sink去模拟事务(BucketingSink的原子命名模拟保证了提交的原子性),在事务Committed之前不能对下游系统可见
在快照间隔内事务不能timeout,否则无法以事务的方式提交输出。
事务必须在收到job manager发送的global commited的消息后,才能commited。在fail recovery的时候,若恢复时间较长(载入大状态),若事务关闭(事务timeout),该数据会丢失。
在fail recovery后,事务需要支持恢复之前pending的事务,并进行提交。(一些外部系统能够使用transaction id去commit或者abort之前的事务)
事务的提交必须是幂等的,因为在恢复时,会重新提交一遍pendingtransaction,因此需要对同一个事务的commit是幂等的。
可以看到外部系统不但要支持事务,同时也要能根据事务id去恢复之前的事务。
好处是实现了将消息以批的方式原子写入外部系统,实现了append only系统的exactly once
3.4 BucketingFileSink
BuckettingFileSink是实现HDFS实现exacly once的Sink,通过使用原子命名的方式,将处于pending checkpoint移动到commited的路径下
[图片上传失败...(image-3f6f44-1566631735896)]
四、FlinkKafkaProducer011
FlinkKafkaProducer011继承了TwopahseCommitSinkFunction实现exacly once。但是Kafka存在以下三点问题,导致不能直接使用TwopahseCommitSinkFunction默认的实现。
4.1 Kafka不支持恢复事务
kafka producer原生的api方式恢复时,Flink中在利用反射在获取事务的pid和epoch,作为TXN存储在状态中,恢复时,也利用反射修改producer的pid和eopch。实现事务的恢复。
4.2 Kafka 处于open的事务会阻塞后续事务
受限于Kafka是append only的存储,若Topic之前有事务处于open状态,没有关闭,则后续的事务即使commit成功,也不能对下游系统可见。因此超时时间需要大于快照间隔,且若状态较大时,还需要考虑恢复时从HDFS载入远程状态的时间;设置过长的timeout又会导致Kafka消息会被replay之前尚未结束的事务阻塞。
故kafka在fail recover时除了abort currentTransaction,还需要abort所有可能处于open状态的事务.
发生fail时,从ck1的状态恢复,但是从实际上kafka中open的事务有ck2,ck3,ck4。
正常的系统等待之前的事务超时就可以,但是对于kafka,需要去abort之前仍然处于open 的事务。
为了解决这一问题,Kafka使用3.3.1中提到的userContext存储txid,即在subTask时就初始化了txid的pool,在fail recover时调用initializeState完成后,就会abort txnid pool中的事务ID,结束仍然处在open的事务。
4.3 producer不能复用问题
https://issues.apache.org/jira/browse/FLINK-8132 修复了因为复用producer导致的数据重复的bug,但带来的影响是每次快照都会新建一个producer,导致频繁的TCP创建。
bug简述
Faulty scenario with producer pool of 2.
1. started transaction 1 with producerA, written record 42
2. checkpoint 1 triggered, pre committing txn1, started txn2 with producerB, written record 43
3. checkpoint 1 completed, committing txn1, returning producerA to the pool
4. checkpoint 2 triggered , committing txn2, started txn3 with producerA, written record 44
5. crash....
6. recover to checkpoint 1, txn1 from producerA found to "pendingCommitTransactions", attempting to recoverAndCommit(txn1)
7. unfortunately txn1 and txn3 from the same producers are identical from KafkaBroker perspective and thus txn3 is being committed
result is that both records 42 and 44 are committed.
由于txn1和txn3复用producerA,导致从ck1恢复时提交txn1会误提交txn3
现有解决方案:
每次开启新的事务都会new producer,分配新的producer id和epoch,这样在initializeState ()时调用commit方法不会误提交后序的数据,导致脏写。
可以考虑的修改方向:
产生这个问题是由于使用kafka同一个txnid事务无法保证commit幂等性,故需要保证一个事务只能commit一次。
仿照Cassandra sink中CassandraCheckpointCommiter的实现方式,将commit成功的checkpoint写入kafka,因为kafka支持跨topic的事务,可以在一个topic中存储已经commit成功的checkpointID。由于在一个事务中,可以确保commit checkpoint和记录已提交checkpointID的元信息在一个原子操作中,避免了3.2.2所述checkpointcommiter失效的场景。
在上例中,ck1在收到notify时,将producerA执行commit操作,并将commitcheckpointid ck1记录在kafka topic
在从ck1恢复时,读取kafka topic可知 txn1被成功剔除,不会被二次commit。
值得注意的是,当且只有在commit成功后,proucerA才会被放入txn pool被ck2开启的事务txn3复用,所以不会发生ck1在notify前crash,需要commit txn1进而误提交txn3的情况。
五、总结
为了保证对外部系统的exacly once 传输,核心问题是实现pending checkpoints的原子性提交,官方目前对三个Sink实现了exacly once 传输。
HDFS rolling sink 依赖BuckettingFileSink实现,Cassandra sink继承GenericWriteAheadSink实现,Kafka producer继承TwopahseCommitSinkFunction。
5.1 现有方案的对比
5.2 通用解决方案
目前官方的每种解决方案都有其局限性。
对于下游系统不支持事务恢复,事务存在timeout的情况,引用3中指出一种方案,可以将precommit的pending checkoints之前commit到外部数据库,但是使用屏蔽位屏蔽使得其对外部系统不可见,直到global commit后才对外部系统可见,避免了Sink在fail后,之前事务无法resume的问题,对于失效的pendings checkpoints,则由外部系统实现垃圾回收。
参考资料:
- https://issues.apache.org/jira/browse/FLINK-8132 指出复用producer导致的数据重复commit问题
2.《steam processing with Apache flink》chapter 8. Reading from and Writing to External Systems
3. 端到端一致性,流系统Spark/Flink/Kafka/DataFlow对比总结(压箱宝具呕血之作)
4. Flink 状态管理:Carbone P, Ewen, Stephan, Fóra, Gyula, et al. State management in Apache Flink?[J]. Proceedings of the Vldb Endowment, 2017, 10(12):1718-1729.
5.https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
6.kafka事务实现 http://matt33.com/2018/11/04/kafka-transaction/