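Two-Phase Commit Protocol
In Flink, the two-phase commit protocol applies to sinks. It requires the downstream system to support transactions or idempotent writes. The two phases are:
- preCommit: the pre-commit phase. This method is called when the sink takes its snapshot.
- commit: the actual commit. Once the checkpoint has succeeded on every operator in the job, the JobManager notifies each operator that the checkpoint is complete, and this method is called at that point.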
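The two phases can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: TwoPhaseCommitSketch and its fields are hypothetical names, and the "external system" is a plain list rather than a real transactional store.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of a transactional sink; not Flink code. */
class TwoPhaseCommitSketch {
    /** Records that readers of the external system can see. */
    final List<String> visible = new ArrayList<>();
    /** Records written inside the currently open transaction. */
    private List<String> open = new ArrayList<>();
    /** Records that were pre-committed but not yet committed. */
    private List<String> preCommitted = null;

    void write(String record) {
        open.add(record);
    }

    /** Phase 1: seal the open transaction when the snapshot is taken. */
    void preCommit() {
        preCommitted = open;
        open = new ArrayList<>();
    }

    /** Phase 2: publish the sealed transaction after the checkpoint completes. */
    void commit() {
        if (preCommitted != null) {
            visible.addAll(preCommitted);
            preCommitted = null;
        }
    }

    /** Discard writes that were never pre-committed, as recovery would. */
    void abortOpen() {
        open = new ArrayList<>();
    }
}
```

In this model, records written after preCommit belong to the next transaction and stay invisible until a later commit.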
TwoPhaseCommitSinkFunction
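This class is the base class for sinks that implement two-phase commit. It encapsulates the core logic of the protocol.
The initializeState method is defined in the CheckpointedFunction interface. It is called when the parallel function instance is created during distributed execution and is used to initialize the function's state.
The method does the following:
- Obtains the state store variable state.
- Commits every transaction that has already been pre-committed.
- Aborts every transaction that has not yet been pre-committed.
- Begins a new transaction.
The code is as follows: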
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// when we are restoring state with pendingCommitTransactions, we don't really know whether the
// transactions were already committed, or whether there was a failure between
// completing the checkpoint on the master, and notifying the writer here.
// (the common case is actually that is was already committed, the window
// between the commit on the master and the notification here is very small)
// it is possible to not have any transactions at all if there was a failure before
// the first completed checkpoint, or in case of a scale-out event, where some of the
// new task do not have and transactions assigned to check)
// we can have more than one transaction to check in case of a scale-in event, or
// for the reasons discussed in the 'notifyCheckpointComplete()' method.
// obtain the state store
state = context.getOperatorStateStore().getListState(stateDescriptor);
boolean recoveredUserContext = false;
// isRestored() returns true if state was restored from the snapshot of a previous execution; it always returns false for stateless tasks
if (context.isRestored()) {
LOG.info("{} - restoring state", name());
for (State<TXN, CONTEXT> operatorState : state.get()) {
userContext = operatorState.getContext();
// get the transactions pending commit
// after snapshotState calls preCommit, the transaction is stored in this list
List<TransactionHolder<TXN>> recoveredTransactions = operatorState.getPendingCommitTransactions();
List<TXN> handledTransactions = new ArrayList<>(recoveredTransactions.size() + 1);
for (TransactionHolder<TXN> recoveredTransaction : recoveredTransactions) {
// If this fails to succeed eventually, there is actually data loss
// recover and commit the transactions previously saved in state
recoverAndCommitInternal(recoveredTransaction);
handledTransactions.add(recoveredTransaction.handle);
LOG.info("{} committed recovered transaction {}", name(), recoveredTransaction);
}
{
// get the transaction that had not yet been pre-committed
TXN transaction = operatorState.getPendingTransaction().handle;
// recover and abort that transaction
recoverAndAbort(transaction);
handledTransactions.add(transaction);
LOG.info("{} aborted recovered transaction {}", name(), operatorState.getPendingTransaction());
}
if (userContext.isPresent()) {
finishRecoveringContext(handledTransactions);
recoveredUserContext = true;
}
}
}
// if in restore we didn't get any userContext or we are initializing from scratch
if (!recoveredUserContext) {
LOG.info("{} - no state to restore", name());
userContext = initializeUserContext();
}
this.pendingCommitTransactions.clear();
// begin a new transaction
currentTransactionHolder = beginTransactionInternal();
LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);
}
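The recovery order in initializeState can be reduced to a few lines. The sketch below is a simplified model that handles a single restored state entry; RecoverySketch, restore, and the string transaction names are hypothetical and stand in for the real TransactionHolder objects.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the recovery steps; transactions are plain strings. */
class RecoverySketch {
    final List<String> committed = new ArrayList<>();
    final List<String> aborted = new ArrayList<>();
    String current;
    private int counter = 0;

    /** Handles one restored state entry and returns the transactions it touched. */
    List<String> restore(List<String> pendingCommit, String pending) {
        List<String> handled = new ArrayList<>(pendingCommit.size() + 1);
        for (String txn : pendingCommit) {
            // pre-committed before the failure: it must be committed now
            committed.add(txn);
            handled.add(txn);
        }
        // opened but never pre-committed: its writes are discarded
        aborted.add(pending);
        handled.add(pending);
        // finally begin a fresh transaction for new records
        current = "new-txn-" + (counter++);
        return handled;
    }
}
```

Committing first and aborting second matches the listing above: pre-committed transactions belong to a checkpoint that may already be complete, so dropping them would lose data.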
When preCommit is called: in the sink's snapshotState method, which is invoked when the sink saves its snapshot.
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// this is like the pre-commit of a 2-phase-commit transaction
// we are ready to commit and remember the transaction
// sanity check: a transaction must exist when the snapshot is taken
checkState(currentTransactionHolder != null, "bug: no transaction object when performing state snapshot");
long checkpointId = context.getCheckpointId();
LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'", name(), context.getCheckpointId(), currentTransactionHolder);
// call preCommit
preCommit(currentTransactionHolder.handle);
// record this transaction in the pending-commit map (pendingCommitTransactions)
pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);
// begin a new transaction
currentTransactionHolder = beginTransactionInternal();
LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);
// clear the state, then record the current transaction and the transactions pending commit
state.clear();
state.add(new State<>(
this.currentTransactionHolder,
new ArrayList<>(pendingCommitTransactions.values()),
userContext));
}
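When commit is called: in the notifyCheckpointComplete method. Once every operator has completed its checkpoint successfully, the JobManager notifies each operator that the checkpoint has finished, and this method is called at that point.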
@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
// the following scenarios are possible here
//
// (1) there is exactly one transaction from the latest checkpoint that
// was triggered and completed. That should be the common case.
// Simply commit that transaction in that case.
//
// (2) there are multiple pending transactions because one previous
// checkpoint was skipped. That is a rare case, but can happen
// for example when:
//
// - the master cannot persist the metadata of the last
// checkpoint (temporary outage in the storage system) but
// could persist a successive checkpoint (the one notified here)
//
// - other tasks could not persist their status during
// the previous checkpoint, but did not trigger a failure because they
// could hold onto their state and could successfully persist it in
// a successive checkpoint (the one notified here)
//
// In both cases, the prior checkpoint never reach a committed state, but
// this checkpoint is always expected to subsume the prior one and cover all
// changes since the last successful one. As a consequence, we need to commit
// all pending transactions.
//
// (3) Multiple transactions are pending, but the checkpoint complete notification
// relates not to the latest. That is possible, because notification messages
// can be delayed (in an extreme case till arrive after a succeeding checkpoint
// was triggered) and because there can be concurrent overlapping checkpoints
// (a new one is started before the previous fully finished).
//
// ==> There should never be a case where we have no pending transaction here
//
// iterate over all transactions pending commit
Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending");
Throwable firstError = null;
while (pendingTransactionIterator.hasNext()) {
Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
Long pendingTransactionCheckpointId = entry.getKey();
TransactionHolder<TXN> pendingTransaction = entry.getValue();
// only commit transactions from checkpoints at or before checkpointId
if (pendingTransactionCheckpointId > checkpointId) {
continue;
}
LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
name(), checkpointId, pendingTransaction, pendingTransactionCheckpointId);
logWarningIfTimeoutAlmostReached(pendingTransaction);
try {
// commit the previously pre-committed transactions one by one
commit(pendingTransaction.handle);
} catch (Throwable t) {
if (firstError == null) {
firstError = t;
}
}
LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);
// remove the committed transaction from the pending-commit map
pendingTransactionIterator.remove();
}
if (firstError != null) {
throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",
firstError);
}
}
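The interplay between snapshotState and notifyCheckpointComplete comes down to bookkeeping on a map keyed by checkpoint id. The following sketch models only that bookkeeping. PendingCommitSketch and its method names are hypothetical, and transactions are represented by strings.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the pending-commit bookkeeping; not Flink code. */
class PendingCommitSketch {
    /** Checkpoint id mapped to the pre-committed transaction, in insertion order. */
    final Map<Long, String> pendingCommitTransactions = new LinkedHashMap<>();
    final List<String> committed = new ArrayList<>();
    private String current = "txn-0";
    private int next = 1;

    /** Analogue of snapshotState: remember the current transaction, then open a new one. */
    void snapshot(long checkpointId) {
        pendingCommitTransactions.put(checkpointId, current);
        current = "txn-" + (next++);
    }

    /** Analogue of notifyCheckpointComplete: commit everything up to checkpointId. */
    void notifyComplete(long checkpointId) {
        Iterator<Map.Entry<Long, String>> it =
                pendingCommitTransactions.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> entry = it.next();
            if (entry.getKey() > checkpointId) {
                continue; // belongs to a later checkpoint, keep it pending
            }
            committed.add(entry.getValue());
            it.remove(); // safe removal while iterating
        }
    }
}
```

A notification for checkpoint 2 commits the transactions of checkpoints 1 and 2 together and leaves checkpoint 3 pending, which corresponds to scenarios (2) and (3) in the source comment.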
FlinkKafkaInternalProducer
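FlinkKafkaInternalProducer is Flink's wrapper around the Kafka producer.
It introduces the producerClosingLock variable, which guards transaction commit, transaction abort, and producer close operations. Before Kafka 2.3.0 there was a bug in which the thread closing the producer could deadlock with the thread committing or aborting a transaction. FlinkKafkaInternalProducer takes this lock manually around those operations to avoid the problem.
FlinkKafkaInternalProducer also holds a transactionalId, which is read from the ProducerConfig settings when the producer is created.
The main code is shown below: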
@Override
public void beginTransaction() throws ProducerFencedException {
synchronized (producerClosingLock) {
ensureNotClosed();
kafkaProducer.beginTransaction();
}
}
@Override
public void commitTransaction() throws ProducerFencedException {
synchronized (producerClosingLock) {
ensureNotClosed();
kafkaProducer.commitTransaction();
}
}
@Override
public void abortTransaction() throws ProducerFencedException {
synchronized (producerClosingLock) {
ensureNotClosed();
kafkaProducer.abortTransaction();
}
}
@Override
public void close() {
closed = true;
synchronized (producerClosingLock) {
kafkaProducer.close();
}
}
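Every transactional method acquires the lock before calling into the Kafka producer (close included), which prevents the deadlock described above.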
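The locking pattern can be shown without Kafka. In this sketch ClosingLockSketch is a hypothetical class, the delegate calls are replaced by a counter, and the body of ensureNotClosed is an assumption, since the original listing does not show it.

```java
/** Hypothetical wrapper showing the closing-lock pattern; no Kafka dependency. */
class ClosingLockSketch {
    private final Object closingLock = new Object();
    private volatile boolean closed = false;
    private int commits = 0;

    void commitTransaction() {
        synchronized (closingLock) {
            ensureNotClosed();
            commits++; // stand-in for the delegate's commitTransaction()
        }
    }

    void close() {
        closed = true; // set before taking the lock, as in the listing above
        synchronized (closingLock) {
            // stand-in for the delegate's close()
        }
    }

    int commits() {
        synchronized (closingLock) {
            return commits;
        }
    }

    // Assumed behavior: reject any call that arrives after close().
    private void ensureNotClosed() {
        if (closed) {
            throw new IllegalStateException("The producer has already been closed");
        }
    }
}
```

Because commit and close share one monitor, the delegate never sees a close call while a commit is still in flight.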
FlinkKafkaProducer
FlinkKafkaProducer extends TwoPhaseCommitSinkFunction and implements the main logic of two-phase commit for Kafka.
The beginTransaction method begins a new transaction.
protected FlinkKafkaProducer.KafkaTransactionState beginTransaction() throws FlinkKafkaException {
switch (semantic) {
case EXACTLY_ONCE:
// obtain a transactional Kafka producer, of type FlinkKafkaInternalProducer
FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer();
// begin the transaction on the Kafka producer
producer.beginTransaction();
// return the transaction state, which carries the transactional id
return new FlinkKafkaProducer.KafkaTransactionState(producer.getTransactionalId(), producer);
case AT_LEAST_ONCE:
case NONE:
// Do not create new producer on each beginTransaction() if it is not necessary
// get the current transaction
final FlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction();
// if a transaction exists, reuse the Kafka producer that belongs to it
if (currentTransaction != null && currentTransaction.producer != null) {
return new FlinkKafkaProducer.KafkaTransactionState(currentTransaction.producer);
}
// otherwise return a non-transactional Kafka producer
return new FlinkKafkaProducer.KafkaTransactionState(initNonTransactionalProducer(true));
default:
throw new UnsupportedOperationException("Not implemented semantic");
}
}
The preCommit method is shown below:
@Override
protected void preCommit(FlinkKafkaProducer.KafkaTransactionState transaction) throws FlinkKafkaException {
switch (semantic) {
case EXACTLY_ONCE:
case AT_LEAST_ONCE:
// EXACTLY_ONCE and AT_LEAST_ONCE require a flush
flush(transaction);
break;
// NONE does nothing
case NONE:
break;
default:
throw new UnsupportedOperationException("Not implemented semantic");
}
checkErroneous();
}
preCommit calls the Kafka producer's flush method, which ensures that every message in the producer's buffer has been sent to the Kafka brokers.
The source of flush is as follows:
private void flush(FlinkKafkaProducer.KafkaTransactionState transaction) throws FlinkKafkaException {
// call the Kafka producer's flush method to drain the send queue; flush blocks until it finishes
if (transaction.producer != null) {
transaction.producer.flush();
}
// read the number of pending records; after the flush it should be 0, otherwise throw an exception
long pendingRecordsCount = pendingRecords.get();
if (pendingRecordsCount != 0) {
throw new IllegalStateException("Pending record count must be zero at this point: " + pendingRecordsCount);
}
// if the flushed requests has errors, we should propagate it also and fail the checkpoint
checkErroneous();
}
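The zero check after the flush relies on a counter that tracks records still in flight. The listing does not show where pendingRecords is updated, so the sketch below assumes it is incremented on send and decremented when delivery is acknowledged. FlushSketch and its queue are hypothetical stand-ins for the producer.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical model of the pending-record counter; no Kafka dependency. */
class FlushSketch {
    private final AtomicLong pendingRecords = new AtomicLong();
    private final Queue<String> buffer = new ArrayDeque<>();
    int delivered = 0;

    /** Assumed: each send raises the in-flight count. */
    void send(String record) {
        pendingRecords.incrementAndGet();
        buffer.add(record);
    }

    /** Drains the buffer, then verifies that nothing is left in flight. */
    void flush() {
        while (!buffer.isEmpty()) {
            buffer.poll();
            delivered++;
            pendingRecords.decrementAndGet(); // stand-in for the send callback
        }
        long count = pendingRecords.get();
        if (count != 0) {
            throw new IllegalStateException(
                    "Pending record count must be zero at this point: " + count);
        }
    }

    long pending() {
        return pendingRecords.get();
    }
}
```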
The commit method follows. It calls the Kafka producer's commitTransaction method, then recycles the transactionalId for reuse and closes the Kafka producer.
@Override
protected void commit(FlinkKafkaProducer.KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
try {
// commit the transaction
transaction.producer.commitTransaction();
} finally {
// recycle the producer's transactionalId (add it back to availableTransactionalIds) and close the producer
recycleTransactionalProducer(transaction.producer);
}
}
}
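The try/finally in commit guarantees that the transactional id returns to the pool even when the commit throws. The sketch below models that idea with a hypothetical TransactionalIdPoolSketch. The pool type, the id format, and the error message are assumptions, and only the availableTransactionalIds name comes from the text above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical pool of reusable transactional ids; not Flink code. */
class TransactionalIdPoolSketch {
    final BlockingQueue<String> availableTransactionalIds = new LinkedBlockingQueue<>();

    TransactionalIdPoolSketch(int poolSize) {
        for (int i = 0; i < poolSize; i++) {
            availableTransactionalIds.add("id-" + i);
        }
    }

    /** Takes an id for a new transaction, failing if the pool is exhausted. */
    String begin() {
        String id = availableTransactionalIds.poll();
        if (id == null) {
            throw new IllegalStateException("Too many ongoing transactions");
        }
        return id;
    }

    /** Runs the commit and always returns the id to the pool afterwards. */
    void commit(String id, Runnable commitAction) {
        try {
            commitAction.run();
        } finally {
            availableTransactionalIds.add(id);
        }
    }
}
```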
The recoverAndCommit method recreates the Kafka producer for a given transactionalId and then commits the transaction.
@Override
protected void recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
try (
// try to recreate the producer from the existing transactionalId, then commit the transaction
FlinkKafkaInternalProducer<byte[], byte[]> producer =
initTransactionalProducer(transaction.transactionalId, false)) {
producer.resumeTransaction(transaction.producerId, transaction.epoch);
producer.commitTransaction();
} catch (InvalidTxnStateException | ProducerFencedException ex) {
// That means we have committed this transaction before.
LOG.warn("Encountered error {} while recovering transaction {}. " +
"Presumably this transaction has been already committed before",
ex,
transaction);
}
}
}
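recoverAndCommit has to be safe to call for a transaction that may already have been committed before the failure. The sketch below illustrates that tolerance with a hypothetical IdempotentRecoverySketch, where an IllegalStateException stands in for the Kafka exceptions caught in the listing above.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of a commit that tolerates replays during recovery. */
class IdempotentRecoverySketch {
    private final Set<String> committed = new HashSet<>();
    int warnings = 0;

    /** Strict commit: rejects a transaction that was already committed. */
    void commit(String txnId) {
        if (!committed.add(txnId)) {
            throw new IllegalStateException("already committed: " + txnId);
        }
    }

    /** Recovery path: a repeated commit is logged as a warning, not an error. */
    void recoverAndCommit(String txnId) {
        try {
            commit(txnId);
        } catch (IllegalStateException ex) {
            warnings++; // presumably committed before the failure
        }
    }

    boolean isCommitted(String txnId) {
        return committed.contains(txnId);
    }
}
```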
The abort method aborts the transaction: it abandons the commit, recycles the transactionalId, and closes the producer.
@Override
protected void abort(FlinkKafkaProducer.KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
// abort the transaction
transaction.producer.abortTransaction();
// recycle the transactionalId and close the producer
recycleTransactionalProducer(transaction.producer);
}
}
Appendix
SinkFunction
SinkFunction is the interface that user-defined sink logic in Flink implements.
The SinkFunction interface is shown below:
@Public
public interface SinkFunction<IN> extends Function, Serializable {
/** @deprecated Use {@link #invoke(Object, Context)}. */
@Deprecated
default void invoke(IN value) throws Exception {}
/**
* Writes the given value to the sink. This function is called for every record.
*
* <p>You have to override this method when implementing a {@code SinkFunction}, this is a
* {@code default} method for backward compatibility with the old-style method only.
*
* @param value The input record.
* @param context Additional context about the input record.
* @throws Exception This method may throw exceptions. Throwing an exception will cause the
* operation to fail and may trigger recovery.
*/
default void invoke(IN value, Context context) throws Exception {
invoke(value);
}
/**
* Context that {@link SinkFunction SinkFunctions } can use for getting additional data about an
* input record.
*
* <p>The context is only valid for the duration of a {@link SinkFunction#invoke(Object,
* Context)} call. Do not store the context and use afterwards!
*/
@Public // Interface might be extended in the future with additional methods.
interface Context {
/** Returns the current processing time. */
long currentProcessingTime();
/** Returns the current event-time watermark. */
long currentWatermark();
/**
* Returns the timestamp of the current input record or {@code null} if the element does not
* have an assigned timestamp.
*/
Long timestamp();
}
}
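The SinkFunction code is not complicated. It contains a single method, invoke(IN value, Context context) (the other invoke overload is deprecated and is not covered here). The first parameter is the element of the data stream that reaches the sink operator, and the second is a context object. The context wraps information that accompanies the element:
- currentProcessingTime: returns the current processing time.
- currentWatermark: returns the timestamp of the current watermark.
- timestamp: returns the timestamp attached to the current element (extracted by a timestamp extractor or assigned by the source). Returns null if the element has no assigned timestamp.
invoke is the key to implementing sink logic. For any logic that writes data out of Flink, we only need to implement the SinkFunction interface and put the output logic in the invoke method.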
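To show the shape of such an implementation without a Flink dependency, the sketch below declares MiniSinkFunction, a hypothetical stand-in that mirrors the two invoke methods and the Context of the interface above. CollectingSink and FixedContext are likewise made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in mirroring the shape of SinkFunction; not the Flink interface. */
interface MiniSinkFunction<IN> {
    default void invoke(IN value) throws Exception {}

    /** Delegates to the old-style method unless an implementation overrides it. */
    default void invoke(IN value, Context context) throws Exception {
        invoke(value);
    }

    interface Context {
        long currentProcessingTime();
        long currentWatermark();
        Long timestamp();
    }
}

/** A sink that appends each record, tagged with its timestamp, to a list. */
class CollectingSink implements MiniSinkFunction<String> {
    final List<String> out = new ArrayList<>();

    @Override
    public void invoke(String value, Context context) {
        Long ts = context.timestamp();
        out.add(value + "@" + (ts == null ? "no-ts" : ts.toString()));
    }
}

/** Test context with a fixed, possibly missing, record timestamp. */
class FixedContext implements MiniSinkFunction.Context {
    private final Long timestamp;

    FixedContext(Long timestamp) {
        this.timestamp = timestamp;
    }

    public long currentProcessingTime() { return 0L; }
    public long currentWatermark() { return Long.MIN_VALUE; }
    public Long timestamp() { return timestamp; }
}
```

The null check on timestamp() matters because the method returns a boxed Long that is null when the element carries no timestamp.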
RichSinkFunction
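Every RichFunction in Flink is an enhanced version of the corresponding plain function. Besides supporting custom startup and shutdown logic, a RichFunction can also obtain the RuntimeContext inside its methods. RichSinkFunction is no exception.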
PrintSinkFunction
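Next we look at PrintSinkFunction, the simplest sink implementation that ships with Flink. This sink prints the elements of the data stream one by one.
The important parts of PrintSinkFunction are shown below.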
private final PrintSinkOutputWriter<IN> writer;
/** Instantiates a print sink function that prints to standard out. */
public PrintSinkFunction() {
writer = new PrintSinkOutputWriter<>(false);
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
}
@Override
public void invoke(IN record) {
writer.write(record);
}
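The code above shows that PrintSinkFunction holds a PrintSinkOutputWriter, which prints elements to standard output or standard error. The open method initializes the writer with the subtask index and the parallelism, and invoke writes each element of the data stream to the writer.
Here is a brief look at the PrintSinkOutputWriter code, with the analysis given in the comments.
// the boolean stdErr decides whether output goes to standard output or standard error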
public PrintSinkOutputWriter(final boolean stdErr) {
this("", stdErr);
}
public void open(int subtaskIndex, int numParallelSubtasks) {
// get the target stream
// set the output stream
stream = target == STD_OUT ? System.out : System.err;
completedPrefix = sinkIdentifier;
// if the parallelism is greater than 1, append the 1-based subtask index to the prefix
if (numParallelSubtasks > 1) {
if (!completedPrefix.isEmpty()) {
completedPrefix += ":";
}
completedPrefix += (subtaskIndex + 1);
}
if (!completedPrefix.isEmpty()) {
completedPrefix += "> ";
}
}
public void write(IN record) {
// print to the output stream
stream.println(completedPrefix + record.toString());
}
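The prefix rules in open are easy to check in isolation. The sketch below extracts them into a pure function. PrintPrefixSketch and completedPrefix are hypothetical names, while the logic follows the listing above line by line.

```java
/** Hypothetical helper that reproduces the prefix rules of the open method. */
class PrintPrefixSketch {
    static String completedPrefix(
            String sinkIdentifier, int subtaskIndex, int numParallelSubtasks) {
        String prefix = sinkIdentifier;
        if (numParallelSubtasks > 1) {
            if (!prefix.isEmpty()) {
                prefix += ":";
            }
            prefix += (subtaskIndex + 1); // subtask numbers are printed 1-based
        }
        if (!prefix.isEmpty()) {
            prefix += "> ";
        }
        return prefix;
    }
}
```

With an empty identifier and a parallelism of 1 the prefix stays empty, so records are printed as-is. With a parallelism of 4, the subtask at index 2 prints lines that start with "3> ".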