Kafka事务及幂等

消息系统的用户从更严格的幂等生产者语义中获益良多,即每个消息写将被精确地持久化一次,没有重复和数据丢失——即使在客户端重试或代理失败的情况下也是如此。这些更强的语义不仅使编写应用程序更容易,而且扩展了可以使用给定消息传递系统的应用程序的空间。

然而,幂等Producer并不为跨多个主题分区的写提供保证。为此,需要更强的事务保证。能够自动写入多个主题分区。
在原子性上,我们指的是跨topicpartition将一组消息作为一个单元提交的能力:要么提交所有消息,要么不提交

流处理应用程序是“consu -transform- production”任务的管道,当流的重复处理不可接受时,绝对需要事务保证。因此,将事务保证添加到Kafka(一个流平台)使其不仅对流处理更有用,而且对其他各种应用程序也更有用。

在本文档中,我们提出了将事务引入Kafka的建议。我们将只关注面临变化的用户:客户端API的变化,我们将引入的新配置,以及保证的总结。我们还概述了基本的数据流,它总结了我们将在事务中引入的所有新rpc。设计细节在单独的文档中给出。

简单介绍一下Transaction和Streams

在上一节中,我们提到事务的主要动机是在Kafka流中只启用一次处理。我们有必要再深入研究一下这个用例,

回想一下,使用Kafka Streams的数据转换通常通过多个stream processors进行,每个处理器由Kafka主题连接。这个设置被称为流拓扑,基本上是一个DAG,其中流处理器是节点,连接Kafka主题是顶点。这种模式是所有流架构的典型模式。您可以在这里阅读更多关于Kafka streams架构的内容。

因此,Kafka流的事务本质上将包含输入消息、本地状态存储的更新和输出消息。在事务中包含输入偏移量会促使将“sendOffsets”API添加到生产者接口,如下所述。进一步的细节将在单独的KIP中呈现。

Public Interfaces

Producer API changes
生产者将获得5个新方法(initTransactions、beginTransaction、sendoffset、commitTransaction、abortTransaction),并更新send方法以抛出一个新的异常。详情如下:

public interface Producer<K,V> extends Closeable {
   
  /**
   * Needs to be called before any of the other transaction methods. Assumes that
   * the transactional.id is specified in the producer configuration.
   *
   * This method does the following:
   *   1. Ensures any transactions initiated by previous instances of the producer
   *      are completed. If the previous instance had failed with a transaction in
   *      progress, it will be aborted. If the last transaction had begun completion,
   *      but not yet finished, this method awaits its completion.
   *   2. Gets the internal producer id and epoch, used in all future transactional
   *      messages issued by the producer.
   *
   * @throws IllegalStateException if the TransactionalId for the producer is not set
   *         in the configuration.
   */
  void initTransactions() throws IllegalStateException;
   
  /**
   * Should be called before the start of each new transaction.
   *
   * @throws ProducerFencedException if another producer is with the same
   *         transactional.id is active.
   */
  void beginTransaction() throws ProducerFencedException;
   
  /**
   * Sends a list of consumed offsets to the consumer group coordinator, and also marks
   * those offsets as part of the current transaction. These offsets will be considered
   * consumed only if the transaction is committed successfully.
   *
   * This method should be used when you need to batch consumed and produced messages
   * together, typically in a consume-transform-produce pattern.
   *
   * @throws ProducerFencedException if another producer is with the same
   *         transactional.id is active.
   */
  void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                String consumerGroupId) throws ProducerFencedException;
   
  /**
   * Commits the ongoing transaction.
   *
   * @throws ProducerFencedException if another producer is with the same
   *         transactional.id is active.
   */
  void commitTransaction() throws ProducerFencedException;
   
  /**
   * Aborts the ongoing transaction.
   *
   * @throws ProducerFencedException if another producer is with the same
   *         transactional.id is active.
 
 
   */
  void abortTransaction() throws ProducerFencedException;
 
 
  /**
   * Send the given record asynchronously and return a future which will eventually contain the response information.
   *
   * @param record The record to send
   * @return A future which will eventually contain the response information
   *
   */
  public Future<RecordMetadata> send(ProducerRecord<K, V> record);
 
  /**
   * Send a record and invoke the given callback when the record has been acknowledged by the server
   */
  public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
}

The OutOfOrderSequence Exception
如果代理检测到数据丢失,生产者将引发OutOfOrderSequenceException。换句话说,如果它接收到的序列号大于它所期望的序列号。这个异常将在将来返回并传递给回调(如果有的话)。这是一个致命的异常,以后对产生器方法(如sendbeginTransactioncommitTransaction等)的调用将引发IlegalStateException

public class KafkaTransactionsExample {
  
  public static void main(String args[]) {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
 
 
    // Note that the ‘transactional.id’ configuration _must_ be specified in the
    // producer config in order to use transactions.
    KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
 
    // We need to initialize transactions once per producer instance. To use transactions,
    // it is assumed that the application id is specified in the config with the key
    // transactional.id.
    //
    // This method will recover or abort transactions initiated by previous instances of a
    // producer with the same app id. Any other transactional messages will report an error
    // if initialization was not performed.
    //
    // The response indicates success or failure. Some failures are irrecoverable and will
    // require a new producer  instance. See the documentation for TransactionMetadata for a
    // list of error codes.
    producer.initTransactions();
     
    while(true) {
      ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
      if (!records.isEmpty()) {
        // Start a new transaction. This will begin the process of batching the consumed
        // records as well
        // as an records produced as a result of processing the input records.
        //
        // We need to check the response to make sure that this producer is able to initiate
        // a new transaction.
        producer.beginTransaction();
         
        // Process the input records and send them to the output topic(s).
        List<ProducerRecord<String, String>> outputRecords = processRecords(records);
        for (ProducerRecord<String, String> outputRecord : outputRecords) {
          producer.send(outputRecord);
        }
         
        // To ensure that the consumed and produced messages are batched, we need to commit
        // the offsets through
        // the producer and not the consumer.
        //
        // If this returns an error, we should abort the transaction.
         
        sendOffsetsResult = producer.sendOffsetsToTransaction(getUncommittedOffsets());
         
      
        // Now that we have consumed, processed, and produced a batch of messages, let's
        // commit the results.
        // If this does not report success, then the transaction will be rolled back.
        producer.endTransaction();
      }
    }
  }
}

New Configurations

Broker configs
  • transactional.id.timeout.ms
    transaction coordinator主动终止 producer transactionalId之前等待的ms中的最大时间量(该事务协调器没有收到来自它的任何事务状态更新)
    默认值为604800000(7天)。
  • max.transaction.timeout.ms

事务允许的最大超时。如果客户端请求的事务时间超过这个值,那么代理将在InitPidRequest中返回InvalidTransactionTimeout错误。这可以防止客户端超时过大,这会阻止用户从事务中包含的主题进行读取。
默认值是900000(15分钟)。这是需要发送消息事务的时间段的保守上限。

Default is 900000 (15 min). This is a conservative upper bound on the period of time a transaction of messages will need to be sent.

  • transaction.state.log.replication.factor
    事务状态主题的副本数。
    Default: 3

  • transaction.state.log.num.partitions
    事务状态主题的分区数。
    Default: 50

  • transaction.state.log.min.isr
    线事务状态主题的每个分区的insync副本的最小数量。
    Default: 2

  • transaction.state.log.segment.bytes
    事务状态主题的段大小
    Default: 104857600 bytes.

Producer configs
  • enable.idempotence
    是否启用等幂性(默认为false)。如果禁用,生产者将不会在produce请求中设置PID字段,并且当前生产者交付语义将生效。注意,必须启用幂等性才能使用事务。
    当启用idempotence时,我们强制acks=all,retries > 1,并且max.inflight. request.per.connection =1。如果这些配置没有这些值,我们就不能保证幂等性。如果应用程序没有显式地覆盖这些设置,当启用幂等性时,生产者将设置
    acks=all, retries=Integer.MAX_VALUE,max.inflight.requests.per.connection=1

transaction.timeout.ms
事务协调器在主动终止正在进行的事务之前等待来自生产者的事务状态更新的ms中的最大时间量。
这个配置值将与InitPidRequest一起发送到事务协调器。如果该值大于max.transaction.timeout.ms在BROKER设置的ms,请求将失败,并出现InvalidTransactionTimeout错误
默认是60000。这使得事务不会阻塞下游消费超过一分钟,这在实时应用程序中通常是允许的。

transactional.id
用于事务传递的TransactionalId。这支持跨多个生产者会话的可靠性语义,因为它允许客户端保证使用相同TransactionalId的事务在启动任何新事务之前已经完成。如果不提供TransactionalId,则生产者仅限于幂等交付
注意,启用。如果配置了TransactionalId,则必须启用幂等性。
默认值为空,这意味着不能使用事务。

Consumer configs

isolation.level
以下是可能的值(默认为read_uncommitted):

read_uncommitted:消费uncommittedcommitted的消息 in offset ordering.

read_committed:只消费non-transactional messages(非开启事务的消息和)和committed transactional messages in offset order.未提交的消息对consumer不可见,只有在事务结束后,消息才对consumer可见。为了维持in offset ordering,这个设置意味着我们必须缓冲消费者中的消息,直到我们看到给定事务中的所有消息

Proposed Changes

Summary of Guarantees
  • Idempotent Producer Guarantees
    为了实现幂等生产者语义,我们引入了生产者id(后面成为为PID)和Kafka消息的sequence numbers的概念。在initialization过程中,每个新生产者都会被分配一个惟一的PID。PID分配对用户是完全透明的

对于给定的PID,序列号将从零开始并单调递增,每个主题分区产生一个序列号。在发送给Broker的每个消息上,Produer将递增序列号。Broker在内存中维护从每个PID接收到的每个TopicPartition的sequence numbers(<PID,TopicPartition>)。

  1. 如果一个produce request的sequence numbers大于PID/TopicPartition上次提交 committed message,BROKER将拒绝该请求。
  2. 较低sequence numbers 消息会导致重复错误,生产者可以忽略该错误。
  3. 具有较高sequence numbers的消息会导致序列错误,这表示一些消息已经丢失,并且是致命的

这确保了,即使producer在失败时必须retry requests,每条消息都将被精确地保存在日志中一次。此外,由于为new instance of a producer分配惟一的PID,因此我们只能保证在单个producer 会话中实现幂等生产。
这些幂等生产者语义对于无状态应用程序(如度量跟踪和审计)可能非常有用。

  • Transactional Guarantees
    Transactional,可以自动对多个TopicPartitions的写入保证事务,例如。作为一个单元,对这些主题分区的所有写操作都将成功或失败
    因为offsets topic记录了consumer 消费的进度,因此利用了上述功能,使应用程序能够将consumed and produced 的消息批次绑定到到一个原子单元中
    只有当整个“consu -transform- production”被完整地执行时,才可以认为一次完整的事务完成。

另外,有状态的应用也可以保证重启后从断点处继续处理,也即事务恢复。

为了实现这种效果,应用程序必须提供一个稳定的(重启后不变)唯一的ID,也即Transaction ID。Transactin ID与PID可能一一对应。区别在于Transaction ID由用户提供,而PID是内部的实现对用户透明

当提供这样一个TransactionalId时,Kafka将保证:

1.跨Session的数据幂等发送。当具有相同Transaction ID的新的Producer实例被创建且工作时,旧的且拥有相同Transaction ID的Producer将不再工作。
2.跨Session的事务恢复。如果某个应用实例宕机,新的实例可以保证任何未完成的旧的事务要么Commit要么Abort,使得新实例从一个正常状态开始工作。

需要注意的是,上述的事务保证是从Producer的角度去考虑的。从Consumer的角度来看,该保证会相对弱一些。尤其是不能保证所有被某事务Commit过的所有消息都被一起消费,因为:

  1. 对于压缩的Topic而言,同一事务的某些消息可能被其它版本覆盖
  2. 事务包含的消息可能分布在多个Segment中(即使在同一个Partition内),当老的Segment被删除时,该事务的部分数据可能会丢失
  3. Consumer在一个事务内可能通过seek方法访问任意Offset的消息,从而可能丢失部分消息
  4. Consumer可能并不需要消费某一事务内的所有Partition,因此它将永远不会读取组成该事务的所有消息

关键概念

为了实现Transaction,确保一组消息是自动的produce和consumer,我们引入了几个新概念:
我们将引入一个称为事务协调器的实体对象。与group coordinator类似,每个生产者都被分配一个transaction coordinator,所有分配pid和管理事务的逻辑都由transaction coordinator完成。

我们引入控制消息的概念。这些是写进用户主题特殊消息,由client
处理,但对用户透明。例如,可以使用它们让broker向consumer表明以前获取的消息是否已经原子提交。在此之前已经提出了控制消息

我们引入了TransactionalId的概念,使用户能够以持久的方式标识producer。具有相同TransactionalId的生产者的不同实例将能够resume(或abort)前一个实例实例化的任何事务

我们引入了producer epoch的概念,它使我们能够确保具有给定TransactionalId的producer只有一个合法的active实例,从而使我们能够在发生故障时维护保证事务。

除了上面的新概念之外,我们还引入了新的请求类型、现有请求的新版本和核心消息格式的新版本,以支持事务。所有这些的细节将在其他文档介绍。

完整事务过程

image.png
image.png

1. 找到 transaction coordinator -- the FindCoordinatorRequest
由于Transaction Coordinator是分配PID和管理事务的核心,因此Producer要做的第一件事情就是通过向任意一个Broker发送FindCoordinator请求找到Transaction Coordinator的位置。

  1. 获取 producer Id -- the InitPidRequest
    找到Transaction Coordinator后,具有幂等特性的Producer必须发起InitPidRequest请求以获取PID。
  • 2.1 如果事务特性被开启,InitPidRequest会发送给Transaction Coordinator。如果Transaction Coordinator是第一次收到包含有该Transaction ID的InitPidRequest请求,它将会把该<TransactionID, PID>存入Transaction Log,如上图中步骤2.1所示。这样可保证该对应关系被持久化,从而保证即使Transaction Coordinator宕机该对应关系也不会丢失。这使我们能够将TransactionalId的相同PID返回去实例化 producer,从而能够恢复或中止以前不完整的事务

除了返回PID外,InitPidRequest还会执行如下任务:
增加该PID对应的epoch。具有相同PID但epoch小于该epoch的其它Producer(如果有)新开启的事务将被拒绝。
恢复(Commit或Abort)之前的Producer未完成的事务(如果有)。

注意:InitPidRequest的处理过程是同步阻塞的。一旦该调用正确返回,生产者就可以发送数据并启动新的事务。

  • 2.2 另外,如果事务特性未开启,InitPidRequest可发送至任意Broker,则分配一个新的PID,并且该生产者在单个会话中只使用幂等语义和事务语义。
  1. 启动Transaction – The beginTransaction() API
    Kafka从0.11.0.0版本开始,提供beginTransaction()方法用于开启一个事务。调用该方法后,Producer本地会记录已经开启了事务,但Transaction Coordinator只有在Producer发送第一条消息后才认为事务已经开启。

  2. The consume-transform-produce loop
    这一阶段,包含了整个事务的数据处理过程,并且包含了多种请求。

  • 4.1 AddPartitionsToTxnRequest 注册partition
    一个Producer可能会给多个<Topic, Partition>发送数据,给一个新的<Topic, Partition>发送数据前,它需要先向Transaction Coordinator发送AddPartitionsToTxnRequest。

Transaction Coordinator会将该<Transaction, TopicPartition>存于Transaction Log内,并将其状态置为BEGIN,如上图中步骤4.1所示。有了该信息后,我们才可以在后续步骤中为每个Topic, Partition>写入commit或abort标记(如上图中步骤5.2所示)。

另外,如果该<Topic, Partition>为该事务中第一个<Topic, Partition>,Transaction Coordinator还会启动对该事务的计时(每个事务都有自己的超时时间

  • 4.2 ProduceRequest
    Producer通过一个或多个ProduceRequest发送一系列消息。除了应用数据外,该请求还包含了PID,epoch,和Sequence Number
    4.2a
    4.3 AddOffsetCommitsToTxnRequest
    Producer有一个新的KafkaProducer.sendOffsetsToTransaction API方法,它支持批量处理consumed and produced 的消息。这个方法接受Map<TopicPartitions、OffsetAndMetadata>和一个groupId参数。

该方法先判断在当前事务中该方法是否已经被调用并传入了相同的Group ID。若是,直接跳到下一步;若不是,则向Transaction Coordinator发送AddOffsetsToTxnRequests请求,Transaction Coordinator将对应的所有<Topic, Partition>存于Transaction Log中,并将其状态记为BEGIN,该方法会阻塞直到收到响应.in step 4.2a

4.4 TxnOffsetCommitRequest 提交消费偏移
作为sendOffsetsToTransaction方法的一部分,在处理完AddOffsetsToTxnRequest后,Producer也会发送TxnOffsetCommit请求给Consumer Coordinator从而将本事务包含的与读操作相关的各<Topic, Partition>的Offset持久化到内部的__consumer_offsets主题中,step 4.3a

在此过程中,Consumer Coordinator会通过PID和对应的epoch来验证是否应该允许该Producer的该请求。

这里需要注意:
写入__consumer_offsets的Offset信息在当前事务Commit前对外是不可见的。也即在当前事务被Commit前,可认为该Offset尚未Commit,偏移量在外部是不可见的,也即对应的消息尚未被完成处理。在事务提交之前,
Consumer Coordinator并不会立即更新缓存中相应<Topic, Partition>的Offset,因为此时这些更新操作尚未被COMMIT或ABORT。

5. Committing or Aborting a Transaction 提交或终止事务

一旦数据被写入,用户必须调用KafkaProducer.endTransaction方法或者KafkaProducer.abortTransaction方法开始提交或中止事务

5.1 EndTxnRequest

commitTransaction方法使得Producer写入的数据对下游Consumer可见。abortTransaction方法通过Transaction Marker将Producer写入的数据标记为Aborted状态。下游的Consumer如果将isolation.level设置为READ_COMMITTED,则它读到被Abort的消息后直接将其丢弃而不会返回给客户程序,也即被Abort的消息对应用程序不可见。

无论是Commit还是Abort,Producer都会发送EndTxnRequest请求(附加指示事务是Commit还是Abort的数据)给Transaction Coordinator

收到该请求后,Transaction Coordinator会进行如下操作

  1. 将PREPARE_COMMIT或PREPARE_ABORT消息写入Transaction Log,step 5.1a所示
  2. 通过WriteTxnMarkerRequest以Transaction Marker的形式将COMMIT或ABORT command消息写入用户数据日志以及Offset Log中,如上图中步骤5.2所示
  3. 最后将 COMMITTED (or ABORTED) message写入Transaction Log中,如上图中步骤5.3所示
5.2 WriteTxnMarkerRequest

上面提到的WriteTxnMarkerRequest由Transaction Coordinator发送给当前事务涉及到的每个<Topic, Partition>的Leader。在接收到这个请求时,每个BROKER都将向LOG写入一个COMMIT(PID)或ABORT(PID)控制消息,step 5.2a

该控制消息向Broker以及Consumer表明对应PID的消息被Commit了还是被Abort了。
在此之前,consumer将缓冲具有PID的message,直到读取相应的COMMIT或ABORT消息,然后分别交付或drop消息。

这里要注意,如果事务也涉及到__consumer_offsets,即该事务中有消费数据的操作且将该消费的Offset存于__consumer_offsets中,Transaction Coordinator也需要向该内部Topic的各Partition的Leader发送WriteTxnMarkerRequest从而写入COMMIT(PID)或COMMIT(PID)控制信息step 5.2a on the left

5.3 Writing the final Commit or Abort Message 写最终的commit或abort消息

在所有commit或abort控制消息写入datalog之后,事务协调器最终将COMMITTED or ABORTED message 写入transaction log,表明事务已完成step in 5.3。此时,可以删除事务日志中与事务相关的大多数消息。

此时,Transaction Log中所有关于该事务的消息全部可以移除。当然,由于Kafka内数据是Append Only的,不可直接更新和删除,这里说的移除只是将其标记为null从而在Log Compact时不再保留。

另外,COMPLETE_COMMIT或COMPLETE_ABORT的写入并不需要得到所有RREPLICAS的ACK,因为如果该消息丢失,可以根据事务协议重发。

我们只需要保留已完成事务的PID和时间戳,这样我们就可以最终删除为producer 产生的TransactionalId->PID mapping。请参阅下面的过期pid一节。

补充说明,如果参与该事务的某些<Topic, Partition>在被写入Transaction Marker前不可用,它对READ_COMMITTED的Consumer不可见,但不影响其它可用<Topic, Partition>的COMMIT或ABORT。在该<Topic, Partition>恢复可用后,Transaction Coordinator会重新根据PREPARE_COMMIT或PREPARE_ABORT向该<Topic, Partition>发送Transaction Marker。

总结
PID与Sequence Number的引入实现了写操作的幂等性
写操作的幂等性结合At Least Once语义实现了单一Session内的Exactly Once语义
Transaction Marker与PID提供了识别消息是否应该被读取的能力,从而实现了事务的隔离性
Offset的更新标记了消息是否被读取,从而将对读操作的事务处理转换成了对写(Offset)操作的事务处理
Kafka事务的本质是,将一组写操作(如果有)对应的消息与一组读操作(如果有)对应的Offset的更新进行同样的标记(即Transaction Marker)来实现事务中涉及的所有读写操作同时对外可见或同时对外不可见
Kafka只提供对Kafka本身的读写操作的事务性,不提供包含外部系统的事务性

参考自
https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka#TransactionalMessaginginKafka-ProducerIDsandstategroups
https://www.confluent.io/blog/transactions-apache-kafka/
https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer

https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP-98-ExactlyOnceDeliveryandTransactionalMessaging-4.1AddPartitionsToTxnRequest

https://mp.weixin.qq.com/s/HBN-rSYRozNOsjMmk5cjLw
https://www.jianshu.com/p/f77ade3f41fd

https://www.infoq.cn/article/kafka-analysis-part-8

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,496评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,407评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,632评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,180评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,198评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,165评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,052评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,910评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,324评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,542评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,711评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,424评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,017评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,668评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,823评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,722评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,611评论 2 353