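Flink-1.10 Source Code Notes: Distributed Snapshots

Flink Source Code: Distributed Snapshots

The barrier is one of the core concepts of Flink's distributed snapshots. Barriers are injected into the data stream and flow downstream as part of it. A barrier never overtakes other records and is never overtaken by them, so the stream stays strictly ordered. Each barrier carries the id of the snapshot it belongs to, and every change caused by records that arrive after the barrier is included in a later checkpoint. Within the stream, a CheckpointBarrier differs from ordinary data: it is one of Flink's internal events. Based on the configuration (env.enableCheckpointing(interval in milliseconds)), the JobManager periodically tells the data sources to insert a CheckpointBarrier into the stream. The CheckpointBarrier then flows through every downstream node together with the data.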
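To make the ordering guarantee concrete, here is a minimal sketch in plain Java with no Flink dependency. The `BarrierEpochs` class and the string encoding of the stream (`"barrier:<id>"` standing in for a CheckpointBarrier) are hypothetical and exist only for illustration. Because a barrier is never overtaken, it cleanly separates the records whose effects belong to checkpoint n from those that belong to a later one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration, not Flink code: a stream is a list of strings and
// an element of the form "barrier:<id>" stands for a CheckpointBarrier.
class BarrierEpochs {

    // Groups records by the checkpoint that will contain their effects.
    // Records seen before barrier n are keyed by n; records after the last
    // barrier are keyed by -1 (not yet covered by any checkpoint).
    static Map<Long, List<String>> splitByBarrier(List<String> stream) {
        Map<Long, List<String>> epochs = new LinkedHashMap<>();
        List<String> current = new ArrayList<>();
        for (String element : stream) {
            if (element.startsWith("barrier:")) {
                long id = Long.parseLong(element.substring("barrier:".length()));
                epochs.put(id, current);
                current = new ArrayList<>();
            } else {
                current.add(element);
            }
        }
        epochs.put(-1L, current);
        return epochs;
    }

    public static void main(String[] args) {
        List<String> stream = Arrays.asList("a", "b", "barrier:1", "c", "barrier:2", "d");
        // prints {1=[a, b], 2=[c], -1=[d]}
        System.out.println(splitByBarrier(stream));
    }
}
```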

1 The checkpoint coordinator triggers a checkpoint on all source nodes.

![1589876675197.png](https://upload-images.jianshu.io/upload_images/20250277-95f75c1139a2a3b7.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

2 The source nodes broadcast a barrier downstream. This barrier is the core of Flink's implementation of the Chandy-Lamport distributed snapshot algorithm: a downstream task performs its checkpoint only after it has received the barrier from all of its inputs.

![1589876827804.png](https://upload-images.jianshu.io/upload_images/20250277-f95d58ad07a65aa9.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

3 Once a task has finished backing up its state, it reports the location of the backup (the state handle) to the checkpoint coordinator.

![1589876897269.png](https://upload-images.jianshu.io/upload_images/20250277-b5a7d4c819564b95.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

4 After the downstream sink node has received the barriers from both upstream inputs, it takes its local snapshot. The figure shows the RocksDB incremental checkpoint flow: RocksDB first flushes all of its data to disk (large red triangle), then the Flink framework picks the files that have not been uploaded yet and persists them (small purple triangle).

![1589877665084.png](https://upload-images.jianshu.io/upload_images/20250277-3d9cc080ea73d5c8.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

5 Likewise, once the sink node has completed its own checkpoint, it returns its state handle to the coordinator.

![1589878581549.png](https://upload-images.jianshu.io/upload_images/20250277-4ef2eb56fdbbd8ea.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

6 When the checkpoint coordinator has collected the state handles of all tasks, it considers the checkpoint globally complete and writes a checkpoint metadata file to persistent storage.

![1589878708700.png](https://upload-images.jianshu.io/upload_images/20250277-a94a8d2857863ca6.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
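The acknowledgement bookkeeping in steps 3, 5 and 6 can be sketched in a few lines of plain Java. This is only an illustration under simplifying assumptions: `MiniCoordinator`, its method names and the string-typed state handles are hypothetical and are not Flink's CheckpointCoordinator API.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not Flink's CheckpointCoordinator: tracks which tasks
// have reported a state handle for a single checkpoint.
class MiniCoordinator {
    private final Set<String> expectedTasks;
    private final Map<String, String> stateHandles = new HashMap<>();

    MiniCoordinator(Set<String> expectedTasks) {
        this.expectedTasks = new HashSet<>(expectedTasks);
    }

    // Called when a task reports where it stored its state.
    // Returns true once every expected task has acknowledged.
    boolean acknowledge(String task, String stateHandle) {
        if (!expectedTasks.contains(task)) {
            throw new IllegalArgumentException("Unknown task: " + task);
        }
        stateHandles.put(task, stateHandle);
        return isComplete();
    }

    boolean isComplete() {
        return stateHandles.keySet().containsAll(expectedTasks);
    }

    // Stand-in for the checkpoint metadata file: only available once the
    // checkpoint is globally complete.
    Map<String, String> metadata() {
        if (!isComplete()) {
            throw new IllegalStateException("Checkpoint is not complete yet");
        }
        return new HashMap<>(stateHandles);
    }
}
```

The checkpoint counts as complete only when the last acknowledgement arrives, which is why a single slow or failed task holds up the whole checkpoint.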


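**EXACTLY_ONCE semantics of checkpoints**

To achieve exactly-once semantics, Flink uses an input buffer to hold the data received during the alignment phase and processes it only after alignment has completed. With at-least-once semantics the received data is not buffered and is processed right away, so after a restore some records may be processed more than once.

![1589879876575.png](https://upload-images.jianshu.io/upload_images/20250277-29a8783f871474f9.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)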
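The difference between the two modes can be shown with a small simulation. This is a sketch under stated assumptions, not Flink code: `AlignmentDemo`, the `{channel, payload}` event encoding and the `"|"` barrier marker are all hypothetical. The method returns the records already reflected in the operator state at the moment the snapshot is taken.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation of one operator with several input channels.
class AlignmentDemo {
    static final String BARRIER = "|";

    // Each event is {channel, payload}. With align == true, records arriving on
    // a channel that already delivered its barrier are held back (exactly-once);
    // with align == false they are processed immediately (at-least-once).
    static List<String> stateAtSnapshot(List<String[]> events, int numChannels, boolean align) {
        boolean[] barrierSeen = new boolean[numChannels];
        int barriers = 0;
        List<String> processed = new ArrayList<>();
        List<String> heldBack = new ArrayList<>();
        for (String[] event : events) {
            int channel = Integer.parseInt(event[0]);
            String payload = event[1];
            if (payload.equals(BARRIER)) {
                barrierSeen[channel] = true;
                if (++barriers == numChannels) {
                    // Snapshot is taken here; held-back records would be processed afterwards.
                    return processed;
                }
            } else if (align && barrierSeen[channel]) {
                heldBack.add(payload);
            } else {
                processed.add(payload);
            }
        }
        throw new IllegalStateException("Stream ended before all barriers arrived");
    }

    public static void main(String[] args) {
        List<String[]> events = Arrays.asList(
            new String[] {"0", "a"}, new String[] {"1", "x"},
            new String[] {"0", BARRIER}, new String[] {"0", "b"},
            new String[] {"1", "y"}, new String[] {"1", BARRIER});
        System.out.println(stateAtSnapshot(events, 2, true));   // prints [a, x, y]
        System.out.println(stateAtSnapshot(events, 2, false));  // prints [a, x, b, y]
    }
}
```

In the second case the snapshot already contains the effect of `b`, although `b` arrived after the barrier on channel 0. After a restore the upstream replays `b`, so it is processed twice.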


Note in particular that Flink's checkpoint mechanism only guarantees EXACTLY_ONCE for Flink's own computation. End-to-end EXACTLY_ONCE additionally requires support from the source and the sink.

## CheckpointBarrier

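id: corresponds to the checkpoint id and is strictly monotonically increasing. Later code compares id values to decide which checkpoint is newer: the larger the id, the newer the checkpoint.

timestamp: records when the checkpoint barrier was created. The run method of ScheduledTrigger passes the current system time as the checkpoint timestamp.

checkpointOptions: the options used when performing the checkpoint, including the checkpoint type and the preferred storage location.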

```java
public class CheckpointBarrier extends RuntimeEvent {

    private final long id;
    private final long timestamp;
    private final CheckpointOptions checkpointOptions;

    public CheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) {
        this.id = id;
        this.timestamp = timestamp;
        this.checkpointOptions = checkNotNull(checkpointOptions);
    }

    // ... remaining members omitted
}
```

## When barriers are propagated downstream

The broadcastCheckpointBarrier method of OperatorChain propagates the CheckpointBarrier that the task has received to the downstream tasks. It is invoked from performCheckpoint, the method that creates the snapshot.

In the code shown here, StreamTask.performCheckpoint calls subtaskCheckpointCoordinator.checkpointState, which in turn calls operatorChain.broadcastEvent to do the broadcast.

```java
// In StreamTask.performCheckpoint
subtaskCheckpointCoordinator.checkpointState(
    checkpointMetaData,
    checkpointOptions,
    checkpointMetrics,
    operatorChain,
    this::isCanceled);

// In subtaskCheckpointCoordinator.checkpointState
operatorChain.broadcastEvent(
    new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options),
    unalignedCheckpointEnabled);

// OperatorChain.broadcastEvent
public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
    for (RecordWriterOutput<?> streamOutput : streamOutputs) {
        streamOutput.broadcastEvent(event, isPriorityEvent);
    }
}
```

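## The InputProcessorUtil class

This class has an important method, createCheckpointBarrierHandler, which creates a different barrier handler depending on the semantics the user has configured.

createCheckpointBarrierHandler checks the checkpointing mode. For EXACTLY_ONCE it creates a CheckpointBarrierUnaligner if unaligned checkpoints are enabled and a CheckpointBarrierAligner otherwise. For AT_LEAST_ONCE it uses a CheckpointBarrierTracker. All three are CheckpointBarrierHandler implementations. They contain the logic that runs when a barrier arrives, and their main job is to deal with barrier alignment.

Call chain

StreamTask.beforeInvoke

```java
protected void beforeInvoke() throws Exception {
    ...
    // task specific initialization
    init();
    ...
}
```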

OneInputStreamTask.init

```java
public void init() throws Exception {
    ...
    if (numberOfInputs > 0) {
        // the checkpointed input gate is created here
        CheckpointedInputGate inputGate = createCheckpointedInputGate();
        DataOutput<IN> output = createDataOutput();
        StreamTaskInput<IN> input = createTaskInput(inputGate, output);
        inputProcessor = new StreamOneInputProcessor<>(
            input,
            output,
            operatorChain);
    }
    ...
}
```

OneInputStreamTask.createCheckpointedInputGate

```java
private CheckpointedInputGate createCheckpointedInputGate() {
    IndexedInputGate[] inputGates = getEnvironment().getAllInputGates();
    InputGate inputGate = InputGateUtil.createInputGate(inputGates);

    return InputProcessorUtil.createCheckpointedInputGate(
        this,
        configuration,
        getChannelStateWriter(),
        inputGate,
        getEnvironment().getMetricGroup().getIOMetricGroup(),
        getTaskNameWithSubtaskAndId());
}
```

InputProcessorUtil.createCheckpointedInputGate

```java
public static CheckpointedInputGate createCheckpointedInputGate(
        AbstractInvokable toNotifyOnCheckpoint,
        StreamConfig config,
        ChannelStateWriter channelStateWriter,
        InputGate inputGate,
        TaskIOMetricGroup taskIOMetricGroup,
        String taskName) {
    // the barrier handler is created here
    CheckpointBarrierHandler barrierHandler = createCheckpointBarrierHandler(
        config,
        IntStream.of(inputGate.getNumberOfInputChannels()),
        channelStateWriter,
        taskName,
        generateChannelIndexToInputGateMap(inputGate),
        generateInputGateToChannelIndexOffsetMap(inputGate),
        toNotifyOnCheckpoint);
    registerCheckpointMetrics(taskIOMetricGroup, barrierHandler);
    ...

    return new CheckpointedInputGate(inputGate, barrierHandler);
}
```

The createCheckpointBarrierHandler method

```java
// creates a different CheckpointBarrierHandler depending on the checkpointing mode
private static CheckpointBarrierHandler createCheckpointBarrierHandler(
        StreamConfig config,
        IntStream numberOfInputChannelsPerGate,
        ChannelStateWriter channelStateWriter,
        String taskName,
        InputGate[] channelIndexToInputGate,
        Map<InputGate, Integer> inputGateToChannelIndexOffset,
        AbstractInvokable toNotifyOnCheckpoint) {
    switch (config.getCheckpointMode()) {
        case EXACTLY_ONCE:
            if (config.isUnalignedCheckpointsEnabled()) {
                return new CheckpointBarrierUnaligner(
                    numberOfInputChannelsPerGate.toArray(),
                    channelStateWriter,
                    taskName,
                    toNotifyOnCheckpoint);
            }
            return new CheckpointBarrierAligner(
                taskName,
                channelIndexToInputGate,
                inputGateToChannelIndexOffset,
                toNotifyOnCheckpoint);
        case AT_LEAST_ONCE:
            return new CheckpointBarrierTracker(numberOfInputChannelsPerGate.sum(), toNotifyOnCheckpoint);
        default:
            throw new UnsupportedOperationException("Unrecognized Checkpointing Mode: " + config.getCheckpointMode());
    }
}
```
## CheckpointBarrierAligner: EXACTLY_ONCE with aligned checkpoints

CheckpointBarrierAligner implements the barrier alignment logic in its processBarrier method.

```java
@Override
public void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
    final long barrierId = receivedBarrier.getId();

    // fast path for single channel cases:
    // with only one input channel no alignment is needed, checkpoint right away
    if (totalNumberOfInputChannels == 1) {
        // resume consumption
        resumeConsumption(channelIndex);
        // a barrier id greater than the current checkpoint id means a new checkpoint
        if (barrierId > currentCheckpointId) {
            // new checkpoint
            currentCheckpointId = barrierId;
            notifyCheckpoint(receivedBarrier, latestAlignmentDurationNanos);
        }
        return;
    }

    // -- general code path for multiple input channels --
    // numBarriersReceived is the number of barriers received (= number of blocked/buffered
    // channels); important: a canceled checkpoint must always have 0 barriers
    if (numBarriersReceived > 0) {
        // this is only true if some alignment is already in progress and was not canceled
        if (barrierId == currentCheckpointId) {
            // regular case: a barrier of the same checkpoint arrived on another channel;
            // block this channel and increment numBarriersReceived
            onBarrier(channelIndex);
        }
        else if (barrierId > currentCheckpointId) {
            // we did not complete the current checkpoint, another started before;
            // the checkpoint currently being aligned has to be aborted
            LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
                    "Skipping current checkpoint.",
                taskName,
                barrierId,
                currentCheckpointId);

            // let the task know we are not completing this: abort checkpoint currentCheckpointId,
            // which eventually calls StreamTask.abortCheckpointOnBarrier
            notifyAbort(currentCheckpointId,
                new CheckpointException(
                    "Barrier id: " + barrierId,
                    CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED));

            // abort the current checkpoint: unblock the channels, reset the barrier count
            // and the alignment start time
            releaseBlocksAndResetBarriers();

            // begin aligning the new checkpoint
            beginNewAlignment(barrierId, channelIndex, receivedBarrier.getTimestamp());
        }
        else {
            // ignore trailing barrier from an earlier checkpoint (obsolete now), resume consumption
            resumeConsumption(channelIndex);
        }
    }
    else if (barrierId > currentCheckpointId) {
        // first barrier of a new checkpoint: no barrier has arrived yet and this one is newer,
        // so a new alignment starts
        beginNewAlignment(barrierId, channelIndex, receivedBarrier.getTimestamp());
    }
    else {
        // either the current checkpoint was canceled (numBarriers == 0) or
        // this barrier is from an old subsumed checkpoint; ignore it
        resumeConsumption(channelIndex);
    }

    // check if we have all barriers - since canceled checkpoints always have zero barriers
    // this can only happen on a non canceled checkpoint.
    // blocked channels + closed channels == total input channels means the barriers of all
    // channels have arrived and alignment is complete
    if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
        // actually trigger checkpoint
        if (LOG.isDebugEnabled()) {
            LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.",
                taskName,
                receivedBarrier.getId(),
                receivedBarrier.getTimestamp());
        }
        // alignment is over: unblock the channels and reset the barrier count
        releaseBlocksAndResetBarriers();
        // notify the task to take the checkpoint
        notifyCheckpoint(receivedBarrier, latestAlignmentDurationNanos);
    }
}
```
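The branching above is easier to follow as a stripped-down state machine. The sketch below is not Flink code: `MiniAligner` and its `log` field are hypothetical, and it deliberately omits the single-channel fast path, closed channels, and alignment timing. It keeps only the three decisions: count a barrier of the current checkpoint, subsume an unfinished checkpoint when a newer barrier arrives, and trigger once every channel has delivered its barrier.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of barrier alignment for one task.
class MiniAligner {
    private final int totalChannels;
    private final boolean[] blocked;
    private long currentCheckpointId = -1L;
    private int numBarriersReceived;
    // Records what the aligner decided, e.g. "abort 1" or "checkpoint 2".
    final List<String> log = new ArrayList<>();

    MiniAligner(int totalChannels) {
        this.totalChannels = totalChannels;
        this.blocked = new boolean[totalChannels];
    }

    void processBarrier(long barrierId, int channel) {
        if (numBarriersReceived > 0) {
            if (barrierId == currentCheckpointId) {
                block(channel);                           // regular case
            } else if (barrierId > currentCheckpointId) {
                log.add("abort " + currentCheckpointId);  // current one is subsumed
                release();
                begin(barrierId, channel);
            }
            // older barrier: ignored
        } else if (barrierId > currentCheckpointId) {
            begin(barrierId, channel);                    // first barrier of a new checkpoint
        }
        if (numBarriersReceived == totalChannels) {
            release();
            log.add("checkpoint " + currentCheckpointId);
        }
    }

    private void begin(long barrierId, int channel) {
        currentCheckpointId = barrierId;
        block(channel);
    }

    private void block(int channel) {
        if (!blocked[channel]) {
            blocked[channel] = true;
            numBarriersReceived++;
        }
    }

    private void release() {
        Arrays.fill(blocked, false);
        numBarriersReceived = 0;
    }
}
```

For example, with two channels, receiving barrier 1 on channel 0, then barrier 2 on channel 1, then barrier 2 on channel 0 leaves `log` as `[abort 1, checkpoint 2]`.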

The notifyAbort method, which aborts a checkpoint

```java
protected void notifyAbort(long checkpointId, CheckpointException cause) throws IOException {
    // this calls StreamTask.abortCheckpointOnBarrier
    toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, cause);
}

// StreamTask.abortCheckpointOnBarrier
@Override
public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws IOException {
    subtaskCheckpointCoordinator.abortCheckpointOnBarrier(checkpointId, cause, operatorChain);
}
```

Flow chart

image-20200525155534849.png

## CheckpointBarrierUnaligner: EXACTLY_ONCE with unaligned checkpoints

This class defines an inner class, ThreadSafeUnaligner, which implements the BufferReceivedListener and Closeable interfaces. ThreadSafeUnaligner encapsulates the state that is shared between the Netty threads and the task thread.

```java
private static class ThreadSafeUnaligner implements BufferReceivedListener, Closeable
```

The processBarrier method of CheckpointBarrierUnaligner ends by calling threadSafeUnaligner.notifyBarrierReceived.

```java
@Override
public void processBarrier(
        CheckpointBarrier receivedBarrier,
        int channelIndex) throws Exception {
    // id of the barrier that just arrived
    long barrierId = receivedBarrier.getId();

    // if the currently consumed checkpoint id is greater than the barrier id, or equal to it
    // while the number of consumed barriers is 0, the barrier is old or cancelled
    if (currentConsumedCheckpointId > barrierId || (currentConsumedCheckpointId == barrierId && numBarrierConsumed == 0)) {
        // ignore old and cancelled barriers
        return;
    }
    // barrier id > currently consumed checkpoint id: a newer checkpoint started before the
    // current one finished, so drop the current one and switch to the newest
    if (currentConsumedCheckpointId < barrierId) {
        currentConsumedCheckpointId = barrierId;
        numBarrierConsumed = 0;
        // mark every channel as still having in-flight buffers
        Arrays.fill(hasInflightBuffers, true);
    }
    // barrier id == currently consumed checkpoint id: a barrier of the current checkpoint
    if (currentConsumedCheckpointId == barrierId) {
        // this channel has delivered its barrier
        hasInflightBuffers[channelIndex] = false;
        // one more barrier consumed
        numBarrierConsumed++;
    }
    // processBarrier is called from task thread and can actually happen before notifyBarrierReceived on empty
    // buffer queues
    // to avoid replicating any logic, we simply call notifyBarrierReceived here as well
    threadSafeUnaligner.notifyBarrierReceived(receivedBarrier, channelInfos[channelIndex]);
}
```

The threadSafeUnaligner object is initialized when the CheckpointBarrierUnaligner is constructed:

```java
threadSafeUnaligner = new ThreadSafeUnaligner(totalNumChannels, checkNotNull(channelStateWriter), this);
```

Next comes threadSafeUnaligner.notifyBarrierReceived. When a barrier with a new id arrives, it calls handleNewCheckpoint.

```java
@Override
public synchronized void notifyBarrierReceived(CheckpointBarrier barrier, InputChannelInfo channelInfo) throws IOException {
    long barrierId = barrier.getId();

    // the barrier id is greater than the checkpoint id received so far:
    // a barrier of a newer checkpoint has arrived
    if (currentReceivedCheckpointId < barrierId) {
        // abort the checkpoint in progress (if any) and start the one for this barrier id
        handleNewCheckpoint(barrier);
        // trigger the checkpoint in the task thread
        handler.executeInTaskThread(() -> handler.notifyCheckpoint(barrier, 0), "notifyCheckpoint");
    }

    // flattened index of the channel
    int channelIndex = handler.getFlattenedChannelIndex(channelInfo);

    // the barrier belongs to the current checkpoint and this channel has not delivered
    // its barrier yet (storeNewBuffers is still true)
    if (barrierId == currentReceivedCheckpointId && storeNewBuffers[channelIndex]) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("{}: Received barrier from channel {} @ {}.", handler.taskName, channelIndex, barrierId);
        }

        // set to false: the barrier of this channel has been received
        storeNewBuffers[channelIndex] = false;

        // the number of barriers equals the number of open channels: all barriers are in
        if (++numBarriersReceived == numOpenChannels) {
            // complete the future to signal that all barriers have arrived
            allBarriersReceivedFuture.complete(null);
        }
    }
}
```

Now the handleNewCheckpoint method. Its main purpose is to abort the checkpoint currently in progress and start the new one.

```java
private synchronized void handleNewCheckpoint(CheckpointBarrier barrier) throws IOException {
    long barrierId = barrier.getId();
    if (!allBarriersReceivedFuture.isDone()) {
        // we did not complete the current checkpoint, another started before
        LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
                "Skipping current checkpoint.",
            handler.taskName,
            barrierId,
            currentReceivedCheckpointId);

        // let the task know we are not completing this
        long currentCheckpointId = currentReceivedCheckpointId;
        handler.executeInTaskThread(() ->
            // tell the task to abort the checkpoint that was in progress
            handler.notifyAbort(currentCheckpointId,
                new CheckpointException(  // reason for the abort
                    "Barrier id: " + barrierId,
                    CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED)),
            "notifyAbort");
    }

    // the current checkpoint id becomes the barrier id
    currentReceivedCheckpointId = barrierId;
    // every channel is again waiting for its barrier
    Arrays.fill(storeNewBuffers, true);
    // reset the number of barriers received
    numBarriersReceived = 0;

    allBarriersReceivedFuture = new CompletableFuture<>();
    // start writing channel state for the given checkpoint id;
    // this calls ChannelStateWriterImpl, which implements the ChannelStateWriter interface
    channelStateWriter.start(barrierId, barrier.getCheckpointOptions());
}
```
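The interplay of `storeNewBuffers`, `numBarriersReceived` and the future can be reproduced without Flink. The following is a minimal sketch, assuming a hypothetical `MiniUnaligner` class. It merges notifyBarrierReceived and handleNewCheckpoint into one method and leaves out channel state writing and the task-thread callbacks. It shows why a second notification for the same channel is harmless, which matters because the same barrier can be reported from both the Netty thread and processBarrier.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of the shared barrier bookkeeping.
class MiniUnaligner {
    private final int numOpenChannels;
    private final boolean[] storeNewBuffers;
    private int numBarriersReceived;
    private long currentReceivedCheckpointId = -1L;
    private CompletableFuture<Void> allBarriersReceived = CompletableFuture.completedFuture(null);

    MiniUnaligner(int numOpenChannels) {
        this.numOpenChannels = numOpenChannels;
        this.storeNewBuffers = new boolean[numOpenChannels];
    }

    // Returns true if an unfinished checkpoint was subsumed by this barrier.
    synchronized boolean notifyBarrierReceived(long barrierId, int channel) {
        boolean subsumed = false;
        if (currentReceivedCheckpointId < barrierId) {
            // New checkpoint: the previous one is aborted if its future is not done.
            subsumed = !allBarriersReceived.isDone();
            currentReceivedCheckpointId = barrierId;
            Arrays.fill(storeNewBuffers, true);
            numBarriersReceived = 0;
            allBarriersReceived = new CompletableFuture<>();
        }
        // Count each channel at most once per checkpoint.
        if (barrierId == currentReceivedCheckpointId && storeNewBuffers[channel]) {
            storeNewBuffers[channel] = false;
            if (++numBarriersReceived == numOpenChannels) {
                allBarriersReceived.complete(null);
            }
        }
        return subsumed;
    }

    synchronized CompletableFuture<Void> allBarriersReceivedFuture() {
        return allBarriersReceived;
    }
}
```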
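## CheckpointBarrierTracker: AT_LEAST_ONCE

CheckpointBarrierTracker defines an inner class, CheckpointBarrierCount, which records how many barriers of a checkpoint have arrived and whether that checkpoint has been aborted.

CheckpointBarrierTracker also maintains the pendingCheckpoints variable, which stores every checkpoint for which barriers have been received but whose checkpoint notification has not been triggered yet.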
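As a rough mental model of this bookkeeping, here is a minimal sketch in plain Java. It is not the Flink implementation: `MiniTracker`, `Count`, `latestPendingId` and `triggered` are hypothetical names, abort handling is left out, and how barriers of unknown checkpoints are treated is an assumption of the sketch. It only shows the idea of a deque of per-checkpoint counters in which a completed checkpoint also removes all older pending ones.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of barrier tracking without alignment.
class MiniTracker {
    private static final class Count {
        final long checkpointId;
        int barriers = 1; // created when the first barrier arrives

        Count(long checkpointId) {
            this.checkpointId = checkpointId;
        }
    }

    private final int totalChannels;
    private final ArrayDeque<Count> pending = new ArrayDeque<>();
    private long latestPendingId = -1L;
    // Ids of the checkpoints that were triggered, in order.
    final List<Long> triggered = new ArrayList<>();

    MiniTracker(int totalChannels) {
        this.totalChannels = totalChannels;
    }

    void processBarrier(long barrierId) {
        if (totalChannels == 1) {
            triggered.add(barrierId); // single channel: trigger immediately
            return;
        }
        Count match = null;
        int pos = 0;
        for (Count next : pending) {
            if (next.checkpointId == barrierId) {
                match = next;
                break;
            }
            pos++;
        }
        if (match != null) {
            if (++match.barriers == totalChannels) {
                // Remove this checkpoint and all older pending ones (subsumed).
                for (int i = 0; i <= pos; i++) {
                    pending.pollFirst();
                }
                triggered.add(barrierId);
            }
        } else if (barrierId > latestPendingId) {
            // First barrier of a newer checkpoint; older unknown ids are ignored.
            latestPendingId = barrierId;
            pending.addLast(new Count(barrierId));
        }
    }

    int pendingSize() {
        return pending.size();
    }
}
```

No channel is ever blocked here, which is exactly why this mode gives at-least-once rather than exactly-once guarantees.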

The CheckpointBarrierCount inner class

```java
private static final class CheckpointBarrierCount {

    // checkpoint id
    private final long checkpointId;
    // number of barriers received for this checkpoint
    private int barrierCount;

    private boolean aborted;

    CheckpointBarrierCount(long checkpointId) {
        this.checkpointId = checkpointId;
        this.barrierCount = 1;
    }

    public long checkpointId() {
        return checkpointId;
    }

    public int incrementBarrierCount() {
        return ++barrierCount;
    }

    // whether this checkpoint has been aborted
    public boolean isAborted() {
        return aborted;
    }

    // marks the checkpoint as aborted; returns true only for the first abort
    public boolean markAborted() {
        boolean firstAbort = !this.aborted;
        this.aborted = true;
        return firstAbort;
    }

    @Override
    public String toString() {
        return isAborted() ?
            String.format("checkpointID=%d - ABORTED", checkpointId) :
            String.format("checkpointID=%d, count=%d", checkpointId, barrierCount);
    }
}
```

The processBarrier method of CheckpointBarrierTracker defines how each incoming barrier is handled.

@Override
public void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
    final long barrierId = receivedBarrier.getId();

    // fast path for single channel trackers:
    // with only one input channel, the checkpoint can be triggered immediately
    if (totalNumberOfInputChannels == 1) {
        notifyCheckpoint(receivedBarrier, 0);
        return;
    }

    // general path for multiple input channels
    if (LOG.isDebugEnabled()) {
        LOG.debug("Received barrier for checkpoint {} from channel {}", barrierId, channelIndex);
    }

    // find the checkpoint barrier in the queue of pending barriers
    CheckpointBarrierCount barrierCount = null;
    int pos = 0;

    // scan pendingCheckpoints in order for the entry whose checkpointId equals barrierId
    // and record its position in pos; pendingCheckpoints holds the checkpoints that are
    // in progress (some barriers seen) but not yet triggered
    for (CheckpointBarrierCount next : pendingCheckpoints) {
        if (next.checkpointId == barrierId) {
            barrierCount = next;
            break;
        }
        pos++;
    }

    // an entry with the same id was found
    if (barrierCount != null) {
        // add one to the count to that barrier and check for completion
        int numBarriersNew = barrierCount.incrementBarrierCount();

        // the count equals the number of input channels, so barriers from
        // every input channel have arrived
        if (numBarriersNew == totalNumberOfInputChannels) {
            // checkpoint can be triggered (or is aborted and all barriers have been seen)
            // first, remove this checkpoint and all prior pending
            // checkpoints (which are now subsumed)
            for (int i = 0; i <= pos; i++) {
                // pollFirst() retrieves and removes the head of the deque (null if it is empty)
                pendingCheckpoints.pollFirst();
            }

            // notify the listener, unless the checkpoint was aborted
            if (!barrierCount.isAborted()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Received all barriers for checkpoint {}", barrierId);
                }

                // trigger the checkpoint
                notifyCheckpoint(receivedBarrier, 0);
            }
        }
    }
    else {
        // first barrier for that checkpoint ID
        // add it only if it is newer than the latest checkpoint.
        // if it is not newer than the latest checkpoint ID, then there cannot be a
        // successful checkpoint for that ID anyways
        // (barrier IDs only increase, so a barrier that is not newer arrived too late and is ignored)
        if (barrierId > latestPendingCheckpointID) {
            // mark the start of the checkpoint
            markCheckpointStart(receivedBarrier.getTimestamp());
            // latestPendingCheckpointID is the highest checkpoint ID seen so far
            latestPendingCheckpointID = barrierId;
            // append the new checkpoint to pendingCheckpoints
            pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));

            // make sure we do not track too many checkpoints:
            // once more than MAX_CHECKPOINTS_TO_TRACK are pending, drop the oldest one
            if (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {
                pendingCheckpoints.pollFirst();
            }
        }
    }
}

Execution flow diagram
image-20200525172333377.png
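
To make the counting logic easier to follow, here is a small self-contained model of the tracker that can be run outside Flink. It is a sketch under simplifying assumptions: `TrackerSketch`, `Count`, `triggered` and `pendingSize` are hypothetical names, the cap `MAX_TRACKED` is an assumed value for this sketch, and abort handling, metrics, and logging are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of the barrier-counting logic; not Flink code.
final class TrackerSketch {
    private static final int MAX_TRACKED = 50; // assumed cap for this sketch

    private static final class Count {
        final long checkpointId;
        int barriers = 1; // created on the first barrier
        Count(long checkpointId) { this.checkpointId = checkpointId; }
    }

    private final int numChannels;
    private final ArrayDeque<Count> pending = new ArrayDeque<>();
    private long latestPendingId = -1;

    final List<Long> triggered = new ArrayList<>(); // checkpoints that were notified

    TrackerSketch(int numChannels) { this.numChannels = numChannels; }

    void processBarrier(long barrierId) {
        if (numChannels == 1) {          // fast path: a single channel triggers at once
            triggered.add(barrierId);
            return;
        }
        Count found = null;
        int pos = 0;
        for (Count next : pending) {     // locate the entry and its position
            if (next.checkpointId == barrierId) { found = next; break; }
            pos++;
        }
        if (found != null) {
            if (++found.barriers == numChannels) {
                // drop this checkpoint and every older pending one
                for (int i = 0; i <= pos; i++) {
                    pending.pollFirst();
                }
                triggered.add(barrierId);
            }
        } else if (barrierId > latestPendingId) {
            latestPendingId = barrierId; // first barrier of a newer checkpoint
            pending.addLast(new Count(barrierId));
            if (pending.size() > MAX_TRACKED) {
                pending.pollFirst();
            }
        }
        // otherwise the barrier is late and is ignored
    }

    int pendingSize() { return pending.size(); }
}
```

With two channels and the barrier sequence 1, 2, 2, 1, only checkpoint 2 is triggered: completing checkpoint 2 also removes the older pending checkpoint 1, and the final barrier for checkpoint 1 is ignored because its id is not greater than the latest pending id.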

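The processBarrier method of CheckpointBarrierHandler is called from the pollNext method of CheckpointedInputGate.

CheckpointedInputGate is initialized in StreamTask's beforeInvoke method, which calls the init method; for single-input tasks that init method is implemented in OneInputStreamTask.

CheckpointedInputGate reads data from the upstream tasks and reacts to the barriers it receives.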

@Override
public Optional<BufferOrEvent> pollNext() throws Exception {

    while (true) {

        // read the next element from the input gate
        Optional<BufferOrEvent> next = inputGate.pollNext();

        // nothing is available right now
        if (!next.isPresent()) {
            return handleEmptyBuffer();
        }

        // unwrap the non-empty value held by the Optional
        BufferOrEvent bufferOrEvent = next.get();

        checkState(!barrierHandler.isBlocked(offsetChannelIndex(bufferOrEvent.getChannelIndex())));

        // data buffers are returned to the caller unchanged
        if (bufferOrEvent.isBuffer()) {
            return next;
        }
        // the event is a checkpoint barrier
        else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
            CheckpointBarrier checkpointBarrier = (CheckpointBarrier) bufferOrEvent.getEvent();
            // hand the barrier to barrierHandler.processBarrier
            barrierHandler.processBarrier(checkpointBarrier, offsetChannelIndex(bufferOrEvent.getChannelIndex()));
        }
        // the event is a checkpoint cancellation marker
        else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
            // hand the marker to barrierHandler.processCancellationBarrier
            barrierHandler.processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
        }
        else {
            // the event is an end-of-partition event
            if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
                // let barrierHandler.processEndOfPartition update its state
                barrierHandler.processEndOfPartition();
            }
            return next;
        }
    }
}
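
The shape of this loop is worth isolating: barriers and cancellation markers are consumed inside the loop and never reach the caller, while data buffers and all other events are returned. The following sketch models just that dispatch with hypothetical types (`GateSketch`, `Data`, `Barrier`, `Cancel`, `EndOfPartition`, `handlerCalls`); it uses `instanceof` rather than the exact `getClass()` comparison in the listing above, and it is not a substitute for the real `CheckpointedInputGate`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;

// Hypothetical, simplified model of the dispatch loop; not Flink code.
final class GateSketch {
    interface Element {}
    static final class Data implements Element {
        final String value;
        Data(String value) { this.value = value; }
    }
    static final class Barrier implements Element {
        final long id;
        Barrier(long id) { this.id = id; }
    }
    static final class Cancel implements Element {
        final long id;
        Cancel(long id) { this.id = id; }
    }
    static final class EndOfPartition implements Element {}

    private final Queue<Element> input = new ArrayDeque<>();
    // Records what the imaginary barrier handler was asked to do.
    final List<String> handlerCalls = new ArrayList<>();

    void offer(Element e) { input.add(e); }

    Optional<Element> pollNext() {
        while (true) {
            Element next = input.poll();
            if (next == null) {
                return Optional.empty();          // nothing available right now
            }
            if (next instanceof Data) {
                return Optional.of(next);         // data goes straight to the caller
            } else if (next instanceof Barrier) {
                handlerCalls.add("barrier " + ((Barrier) next).id);  // consumed, keep looping
            } else if (next instanceof Cancel) {
                handlerCalls.add("cancel " + ((Cancel) next).id);    // consumed, keep looping
            } else {
                if (next instanceof EndOfPartition) {
                    handlerCalls.add("endOfPartition");
                }
                return Optional.of(next);         // other events are still returned
            }
        }
    }
}
```

Offering a barrier, a data element, a cancellation marker, and an end-of-partition event in that order yields two non-empty results from `pollNext`: the data element first, then the end-of-partition event. The barrier and the cancellation marker show up only in `handlerCalls`.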

If you spot any mistakes, corrections are welcome!
