Flink-1.10 源码笔记 两阶段提交

Flink源码分析 -- 两阶段提交

先简单了解一下处理语义

关于消息的消费、处理语义可以分为三类:

1. at most once : 至多一次,表示一条消息不管后续处理成功与否只会被消费处理一次,那么就存在数据丢失可能

2. exactly once : 精确一次,表示一条消息从其消费到后续的处理成功,只会发生一次

3. at least once :至少一次,表示一条消息从消费到后续的处理成功,可能会发生多次

在我们程序处理中通常要求能够满足exectly once语义,保证数据的准确性,flink 通过checkpoint机制提供了Exactly-Once与At-Least-Once 两种不同的消费语义实现, 可以将程序处理的所有数据都保存在状态内部,当程序发生异常失败重启可以从最近一次成功checkpoint中恢复状态数据,通过checkpoint中barrier对齐机制来实现这两不同的语义,barrier对齐发生在一个处理节点需要接收上游不同处理节点的数据,由于不同的上游节点数据处理速度不一致,那么就会导致下游节点接收到 barrier的时间点也会不一致,这时候就需要使用barrier对齐机制:在同一checkpoint中,先到达的barrier是否需要等待其他处理节点barrier达到后在发送后续数据,barrier将数据流分为前后两个checkpoint(chk n,chk n+1)的概念,如果不等待那么就会导致chk n的阶段处理了chk n+1阶段的数据,但是在source端所记录的消费偏移量又一致,如果chk n成功之后,后续的任务处理失败,任务重启会消费chk n+1阶段数据,就会到致数据重复消息,如果barrier等待就不会出现这样情况,因此barrier需要对齐那么就是实现exectly once语义,否则实现的是at least once语义。由于状态是属于flink内部存储,所以flink 仅仅满足内部exectly once语义。

barrier对齐

两阶段提交(2PC)

在分布式系统中,可以使用两阶段提交来实现事务性从而保证数据的一致性,两阶段提交分为:预提交阶段与提交阶段,通常包含两个角色:协调者与执行者,协调者用于用于管理所有执行者的操作,执行者用于执行具体的提交操作,具体的操作流程:

如果在流程中部分预提交失败,那么协调者就会收到一条失败的反馈,则会发送一条rollback消息给所有执行者,执行回滚操作,保证数据一致性;但是如果在流程中,出现部分提交成功部分提交失败,那么就会造成数据的不一致,因此后面也提出了3PC或者通过其他补偿机制来保证数据最终一致性,

两阶段提交

Flink中的两阶段提交

2017年12月Apache Flink社区发布了1.4版本。该版本引入一个新的的功能:两阶段提交Sink,即TwoPhaseCommitSinkFunction。该SinkFunction提取并封装了两阶段提交协议中的公共逻辑,自此Flink搭配特定source和sink搭建精确一次处理语义( exactly-once semantics)应用成为了可能。作为一个抽象类TwoPhaseCommitSinkFunction提供了一个抽象层供用户自行实现特定方法来支持 exactly-once semantics。

在这之前Flink支持EXACTLY_ONCE仅是对于Flink应用内部来说,对外部系统(端到端)有比较强的限制

  • 外部系统写入支持幂等性

  • 外部系统支持以事务的方式写入

在Flink1.4之后引入了TwoPhaseCommitSinkFunction接口,并在kafka Producer的connector中实现了它,支持了外部Kafka Sink 的EXACTLY_ONCE语义

两阶段提交协议针对Flink的Sink。要求下游的系统支持事务,或者是幂等性。两阶段提交是指如下两个阶段:

  1. preCommit : 执行预提交, 在Sink进行snapshot操作的时候调用此方法。

  2. commit : 真正的提交操作。当系统中各个operator的checkpoint操作都成功之后,JobManager会通知各个operator checkpoint操作已完成。此时会调用该方法

    通过这两个阶段然后结合checkpoint 过程提供的hook,来实现两阶段提交过程

1. 一旦所有operator完成各自的pre-commit,它们会发起一个commit操作

2. 倘若有一个pre-commit失败,所有其他的pre-commit必须被终止,并且Flink会回滚到最近成功完成decheckpoint

3. 一旦pre-commit完成,必须要确保commit也要成功——operator和外部系统都需要对此进行保证。倘若commit失败(比如网络故障等),Flink应用就会崩溃,然后根据用户重启策略执行重启逻辑,之后再次重试commit。这个过程至关重要,因为倘若commit无法顺利执行,就可能出现数据丢失的情况

因此,所有opeartor必须对checkpoint最终结果达成共识:即所有operator都必须认定数据提交要么成功执行,要么被终止然后回滚。

当出现崩溃时,Flink会恢复最新已完成快照中应用状态。需要注意的是在某些极偶然的场景下,pre-commit阶段已成功完成而commit尚未开始(也就是operator尚未来得及被告知要开启commit),此时倘若发生崩溃Flink会将opeartor状态恢复到已完成pre-commit但尚未commit的状态。

在一个checkpoint状态中,对于已完成pre-commit的事务状态,我们必须保存足够多的信息,这样才能确保在重启后要么重新发起commit亦或是终止掉事务。本例中这部分信息就是临时文件所在的路径以及目标目录。

TwoPhaseCommitSinkFunction考虑了这种场景,因此当应用从checkpoint恢复之后TwoPhaseCommitSinkFunction总是会发起一个抢占式的commit。这种commit必须是幂等性的,虽然大部分情况下这都不是问题。本例中对应的这种场景就是:临时文件不在临时目录下,而是已经被移动到目标目录下。

Flink两阶段提交过程

提交过程中会出现两种情况 :
1.Pre-commit失败,将恢复到最近一次CheckPoint位置
2.一旦pre-commit完成,必须要确保commit也要成功

因此,所有opeartor必须对checkpoint最终结果达成共识:即所有operator都必须认定数据提交要么成功执行,要么被终止然后回滚

现在根据源码去刨析两阶段提交代码

TwoPhaseCommitSinkFunction

该类是实现两阶段提交Sink的父类,封装了两阶段提交的主要逻辑。

initializeState方法。该方法在CheckpointedFunction接口中定义,在集群中执行的时候调用,用于初始化状态后端。 该方法主要有以下逻辑:

  1. 获取状态存储变量state。

  2. 提交所有已经执行过preCommit的事务。

  3. 终止所有尚未preCommit的事务。

  4. 创建一个新事务。

TwoPhaseCommitSinkFunction类是一个抽象类,继承了RickSinkFunction,实现了CheckpointedFunction,和CheckpointListener接口

@PublicEvolving
public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
 extends RichSinkFunction<IN>
 implements CheckpointedFunction, CheckpointListener

initializeState方法, 主要作用 :

状态初始化方法,只会被调用一次,第一件事情是用来恢复上次checkpoint完成预提交的事务与下一次checkpoint开始的事务,对于上次checkpoint完成预提交说明该checkpoint已经完成,那么执行commit操作,下一次checkpoint开始的事务说明该checkpoint,那么执行abort操作,第二件事情是开启一个新的事务,给新的checkpoint使用

@Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // when we are restoring state with pendingCommitTransactions, we don't really know whether the
        // transactions were already committed, or whether there was a failure between
        // completing the checkpoint on the master, and notifying the writer here.

        // (the common case is actually that is was already committed, the window
        // between the commit on the master and the notification here is very small)

        // it is possible to not have any transactions at all if there was a failure before
        // the first completed checkpoint, or in case of a scale-out event, where some of the
        // new task do not have and transactions assigned to check)

        // we can have more than one transaction to check in case of a scale-in event, or
        // for the reasons discussed in the 'notifyCheckpointComplete()' method.

        //获取 operator的state
        state = context.getOperatorStateStore().getListState(stateDescriptor);

        // todo userContext 回收标志
        boolean recoveredUserContext = false;
        // 从上一个snapshot恢复完成的时候返回true,如果任务不支持snapshot,永远返回false
        // 如果状态是从前一次执行的快照恢复的,则返回true。对于无状态任务,这总是返回false
        // 指示是否找到要恢复的任何状态 -- true 需要恢复
        if (context.isRestored()) {
            LOG.info("{} - restoring state", name());
            //开始进行状态恢复
            for (State<TXN, CONTEXT> operatorState : state.get()) {
                // 获取上下文
                userContext = operatorState.getContext();
                // 获取待提交的事务
                // 在snapshotState方法调用preCommit之后,事务会被存储到该列表
                // 获取等待的 事务    -- get pre-Commit event
                List<TransactionHolder<TXN>> recoveredTransactions = operatorState.getPendingCommitTransactions();

                List<TXN> handledTransactions = new ArrayList<>(recoveredTransactions.size() + 1);

                //循环等待的事务, 提交事务
                for (TransactionHolder<TXN> recoveredTransaction : recoveredTransactions) {
                    // If this fails to succeed eventually, there is actually data loss
                    // 如果这种方法最终失败,实际上会导致数据丢失
                    // 恢复并提交这些之前在state中保存的事务,如果失败可能是打开事务时超时或者数据出现丢失
                    // todo 提交事务
                    recoverAndCommitInternal(recoveredTransaction);
                    handledTransactions.add(recoveredTransaction.handle);
                    LOG.info("{} committed recovered transaction {}", name(), recoveredTransaction);
                }

                {
                    // 获取到 未preCommit的事务
                    TXN transaction = operatorState.getPendingTransaction().handle;
                    // 恢复并终止该事务  -- 在失败后被协调器拒绝的事务中止   --调用 FlinkKafkaProduce的recoverAndAbort方法
                    recoverAndAbort(transaction);
                    handledTransactions.add(transaction);
                    LOG.info("{} aborted recovered transaction {}", name(), operatorState.getPendingTransaction());
                }

                if (userContext.isPresent()) {
                    //在恢复(每个)用户上下文后调用的子类的回调    -- 调用 FlinkKafkaProduce的finishRecoveringContext方法
                    // 回收 context
                    finishRecoveringContext(handledTransactions);
                    recoveredUserContext = true;
                }
            }
        }

        // if in restore we didn't get any userContext or we are initializing from scratch
        // 如果在恢复中,我们没有得到任何userContext或者我们正在从头开始初始化
        if (!recoveredUserContext) {
            LOG.info("{} - no state to restore", name());

            //清空 context
            userContext = initializeUserContext();
        }
        //清空 等待提交事务的列表
        this.pendingCommitTransactions.clear();

        // 开启一个新的事务
        currentTransactionHolder = beginTransactionInternal();
        LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);
    }

snapshotState方法与checkpoint同步周期性执行的方法,首先执行preCommit对本次checkpoint事务执行预提交操作,并且开启一个新的事务提供给下一次checkpoint使用,然后将这两个事务句柄存放在state中进行容错,preCommit提交的事务就是在失败后重启需要commit的事务,而新开启的事务就是在失败后重启需要放弃的事务;

@Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // this is like the pre-commit of a 2-phase-commit transaction
        // we are ready to commit and remember the transaction

        // 检查事务对象不能为空      --- 执行state snapshot时必须要有事务对象
        checkState(currentTransactionHolder != null, "bug: no transaction object when performing state snapshot");

        long checkpointId = context.getCheckpointId();
        LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'", name(), context.getCheckpointId(), currentTransactionHolder);

        // 调用preCommit方法    预提交  -- todo 最终会调用实现类的preCommit 如 FlinkKafkaProducer011
        preCommit(currentTransactionHolder.handle);
        // 在待提交事务列表(pendingCommitTransactions)中记录该事务
        // todo 将预提交事务 添加到待提交事务列表
        pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
        LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);

        // 开启新的事务
        currentTransactionHolder = beginTransactionInternal();
        LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);

        // 清空state,然后记录当前事务和待提交事务      --todo State is Pojo类 耦合 当前事务,待提交事务列表,上下文
        state.clear();
        state.add(new State<>(
            this.currentTransactionHolder,
            new ArrayList<>(pendingCommitTransactions.values()),
            userContext));
    }

notifyCheckpointComplete方法主要时checkpoint完成之后的回调方法,负责对预提交的事务执行commit操作。

    @Override
    public final void notifyCheckpointComplete(long checkpointId) throws Exception {
        // the following scenarios are possible here
        //
        //  (1) there is exactly one transaction from the latest checkpoint that
        //      was triggered and completed. That should be the common case.
        //      Simply commit that transaction in that case.
        //
        //  (2) there are multiple pending transactions because one previous
        //      checkpoint was skipped. That is a rare case, but can happen
        //      for example when:
        //
        //        - the master cannot persist the metadata of the last
        //          checkpoint (temporary outage in the storage system) but
        //          could persist a successive checkpoint (the one notified here)
        //
        //        - other tasks could not persist their status during
        //          the previous checkpoint, but did not trigger a failure because they
        //          could hold onto their state and could successfully persist it in
        //          a successive checkpoint (the one notified here)
        //
        //      In both cases, the prior checkpoint never reach a committed state, but
        //      this checkpoint is always expected to subsume the prior one and cover all
        //      changes since the last successful one. As a consequence, we need to commit
        //      all pending transactions.
        //
        //  (3) Multiple transactions are pending, but the checkpoint complete notification
        //      relates not to the latest. That is possible, because notification messages
        //      can be delayed (in an extreme case till arrive after a succeeding checkpoint
        //      was triggered) and because there can be concurrent overlapping checkpoints
        //      (a new one is started before the previous fully finished).
        //
        // ==> There should never be a case where we have no pending transaction here
        //

        // 获取所有待提交的事务            -- 使用LinkedHashMap存储事务, 存储有序
        Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
        Throwable firstError = null;

        while (pendingTransactionIterator.hasNext()) {
            Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
            Long pendingTransactionCheckpointId = entry.getKey();
            TransactionHolder<TXN> pendingTransaction = entry.getValue();
            // 只提交在checkpointId之前的事务
            if (pendingTransactionCheckpointId > checkpointId) {
                continue;
            }

            // {name} - checkpoint {checkpointId} complete, committing transaction {pendingTransaction} from checkpoint {pendingTransactionCheckpointId}
            LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
                name(), checkpointId, pendingTransaction, pendingTransactionCheckpointId);

            // todo 接近超时事件进行警告
            logWarningIfTimeoutAlmostReached(pendingTransaction);
            try {
                // 逐个提交之前preCommit过的事务
                commit(pendingTransaction.handle);
            } catch (Throwable t) {
                if (firstError == null) {
                    firstError = t;
                }
            }

            LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);
            // 将提交过的事务从待提交事务列表中清除
            pendingTransactionIterator.remove();
        }


        if (firstError != null) {
            throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",
                firstError);
        }
    }

在提交事务的时候调用commit方法, 会进入FlinkKafkaProducer011类中

FlinkKafkaProducer011继承了 TwoPhaseCommitSinkFunction类

public class FlinkKafkaProducer011<IN>
 extends TwoPhaseCommitSinkFunction<IN, FlinkKafkaProducer011.KafkaTransactionState, FlinkKafkaProducer011.KafkaTransactionContext>

FlinkKafkaProducer011的commit方法

    @Override
    protected void commit(KafkaTransactionState transaction) {
        if (transaction.isTransactional()) {
            try {
                //调用flink-connector-kafka-0.11项目下的FlinkKafkaProducer的commitTransaction方法
                transaction.producer.commitTransaction();
            } finally {
                //如果失败 回收 producer
                recycleTransactionalProducer(transaction.producer);
            }
        }
    }

进入FlinkKafkaProducer的commitTransaction方法

    @Override
    public void commitTransaction() throws ProducerFencedException {
        synchronized (producerClosingLock) {
            // 确保producer没有关闭
            ensureNotClosed();
            // 提交事务
            kafkaProducer.commitTransaction();
        }
    }

FlinkKafkaProducer

现在分析一下FlinkKafkaProducer类,是Flink对Kafka Producer的一个封装,其中以后用了producerClosingLock变量,用于对事务提交,回滚和关闭producer等操作加锁,在kafka2.3.0之前存在一个bug,关闭producer的线程和提交/终止事务的线程会发生死锁.在FlinkKafkaProducer对这些操作添加锁,避免此类问题,

FlinkKafkaProducer还有一个transactionId。创建的时候会从ProducerConfig配置中获取。

    // This lock and closed flag are introduced to workaround KAFKA-6635. Because the bug is only fixed in
    // Kafka 2.3.0, we need this workaround in 0.11 producer to avoid deadlock between a transaction
    // committing / aborting thread and a producer closing thread.
    // todo 这个锁和关闭标志被引入到KAFKA-6635的工作环境中。因为这个错误只在Kafka 2.3.0中得到了修复,
    //  所以我们需要在0.11生成器中使用这个解决方案来避免事务提交/中止线程和生成器关闭线程之间的死锁。
    private final Object producerClosingLock;

在调用是事务前的操作方法中,都会添加锁,防止出现死锁的情况

    // todo 初始化事务
    @Override
    public void initTransactions() {
        synchronized (producerClosingLock) {
            ensureNotClosed();
            kafkaProducer.initTransactions();
        }
    }

    // todo 开始事务
    @Override
    public void beginTransaction() throws ProducerFencedException {
        synchronized (producerClosingLock) {
            ensureNotClosed();
            kafkaProducer.beginTransaction();
        }
    }

    // todo 提交事务
    @Override
    public void commitTransaction() throws ProducerFencedException {
        synchronized (producerClosingLock) {
            // 确保producer没有关闭
            ensureNotClosed();
            // 提交事务
            kafkaProducer.commitTransaction();
        }
    }

    // todo 终止事务
    @Override
    public void abortTransaction() throws ProducerFencedException {
        synchronized (producerClosingLock) {
            ensureNotClosed();
            kafkaProducer.abortTransaction();
        }
    }

    // todo 发送sffset到事务
    @Override
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
        synchronized (producerClosingLock) {
            ensureNotClosed();
            kafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId);
        }
    }

    // todo 发送数据
    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        return kafkaProducer.send(record);
    }

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        return kafkaProducer.send(record, callback);
    }
    
    // todo 分发策略
    @Override
    public List<PartitionInfo> partitionsFor(String topic) {
        synchronized (producerClosingLock) {
            ensureNotClosed();
            return kafkaProducer.partitionsFor(topic);
        }
    }

    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        return kafkaProducer.metrics();
    }

    @Override
    public void close() {
        closed = true;
        synchronized (producerClosingLock) {
            kafkaProducer.close();
        }
    }

FlinkKafkaProducer011

FlinkKafkaProducer011实现了TwoPhaseCommitSinkFunction,实现了两阶段提交主要逻辑

public class FlinkKafkaProducer011<IN>
 extends TwoPhaseCommitSinkFunction<IN, FlinkKafkaProducer011.KafkaTransactionState, FlinkKafkaProducer011.KafkaTransactionContext> {

FlinkKafkaProducer011实现了1TwoPhaseCommitSinkFunction。实现了两阶段提交的主要逻辑

  1. beginTransaction:开启一个事务,在临时目录下创建一个临时文件,之后,写入数据到该文件中

  2. preCommit:在pre-commit阶段,flush缓存数据块到磁盘,然后关闭该文件,确保再不写入新数据到该文件。同时开启一个新事务执行属于下一个checkpoint的写入操作

  3. commit:在commit阶段,我们以原子性的方式将上一阶段的文件写入真正的文件目录下。注意:这会增加输出数据可见性的延时。通俗说就是用户想要看到最终数据需要等会,不是实时的。

  4. abort:一旦终止事务,我们离自己删除临时文件

//todo 开始事务
//todo 开始事务
    @Override
    protected KafkaTransactionState beginTransaction() throws FlinkKafka011Exception {
        switch (semantic) { // todo 匹配语义
            case EXACTLY_ONCE:
                // 创建一个支持事务的 生产者 ,   -- 类型 FlinkKafkaProducer
                // todo 每个新检查点都会创建一个FlinkKafkaProducer,这样不会与之前的事务出现冲突
                FlinkKafkaProducer<byte[], byte[]> producer = createTransactionalProducer();
                // 开启事务   -- 进入 FlinkKafkaProducer的beginTransaction方法
                producer.beginTransaction();
                // 创建一个 kafka事务状态(包含 事务id,生产者实例)
                return new KafkaTransactionState(producer.getTransactionalId(), producer);
            case AT_LEAST_ONCE:
            case NONE:
                // Do not create new producer on each beginTransaction() if it is not necessary
                // 获取当前的事务
                final KafkaTransactionState currentTransaction = currentTransaction();
                // 如果当前有事务,返回当前事务对应的kafka producer
                if (currentTransaction != null && currentTransaction.producer != null) {
                    return new KafkaTransactionState(currentTransaction.producer);
                }
                // initNonTransactionalProducer 会初始化一个非事务的producer
                // 返回一个 非事务的 producer
                return new KafkaTransactionState(initNonTransactionalProducer(true));
            default:
                throw new UnsupportedOperationException("Not implemented semantic");
        }
    }

    // todo 预提交
    @Override
    protected void preCommit(KafkaTransactionState transaction) throws FlinkKafka011Exception {
        switch (semantic) {
            //该语义需要 flush
            case EXACTLY_ONCE:
            case AT_LEAST_ONCE:
                flush(transaction);
                break;
                // 啥也不做
            case NONE:
                break;
            default:
                // 报错
                throw new UnsupportedOperationException("Not implemented semantic");
        }
        // 检查检查
        checkErroneous();
    }

    //提交事务
    @Override
    protected void commit(KafkaTransactionState transaction) {
        if (transaction.isTransactional()) {
            try {
                //调用flink-connector-kafka-0.11项目下的FlinkKafkaProducer的commitTransaction方法
                transaction.producer.commitTransaction();
            } finally {
                //如果失败 回收producer的transactionId 加入到(availableTransactionalIds),关闭producer
                recycleTransactionalProducer(transaction.producer);
            }
        }
    }

    // todo 用于从transactionalId恢复出对应的kafka producer,然后在提交任务
    @Override
    protected void recoverAndCommit(KafkaTransactionState transaction) {
        if (transaction.isTransactional()) {
            try (
                // 尝试根据已有的transactionId重新建立producer,然后提交任务
                FlinkKafkaProducer<byte[], byte[]> producer =
                    initTransactionalProducer(transaction.transactionalId, false)) {
                // 恢复事务
                producer.resumeTransaction(transaction.producerId, transaction.epoch);
                // 提交事务
                producer.commitTransaction();
            } catch (InvalidTxnStateException | ProducerFencedException ex) {
                // That means we have committed this transaction before.
                LOG.warn("Encountered error {} while recovering transaction {}. " +
                        "Presumably this transaction has been already committed before",
                    ex,
                    transaction);
            }
        }
    }

    // 终止事务
    @Override
    protected void abort(KafkaTransactionState transaction) {
        if (transaction.isTransactional()) {
            // 终止事务的提交
            transaction.producer.abortTransaction();
            // 回收transactionId,关闭producer
            recycleTransactionalProducer(transaction.producer);
        }
    }

如有错误 欢迎指正

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容