KafkaProducer Sender 线程详解(含详细的执行流程图)

温馨提示:本文基于 Kafka 2.2.1 版本。

上文 《源码分析 Kafka 消息发送流程》 已经详细介绍了 KafkaProducer send 方法的流程,该方法只是将消息追加到 KafKaProducer 的缓存中,并未真正的向 broker 发送消息,本文将来探讨 Kafka 的 Sender 线程。

@TOC
在 KafkaProducer 中会启动一个单独的线程,其名称为 “kafka-producer-network-thread | clientID”,其中 clientID 为生产者的 id 。

1、Sender 线程详解

1.1 类图

在这里插入图片描述

我们先来看一下其各个属性的含义:

  • KafkaClient client
    kafka 网络通信客户端,主要封装与 broker 的网络通信。
  • RecordAccumulator accumulator
    消息记录累积器,消息追加的入口(RecordAccumulator 的 append 方法)。
  • Metadata metadata
    元数据管理器,即 topic 的路由分区信息。
  • boolean guaranteeMessageOrder
    是否需要保证消息的顺序性。
  • int maxRequestSize
    调用 send 方法发送的最大请求大小,包括 key、消息体序列化后的消息总大小不能超过该值。通过参数 max.request.size 来设置。
  • short acks
    用来定义消息“已提交”的条件(标准),就是 Broker 端向客户端承偌已提交的条件,可选值如下0、-1、1.
  • int retries
    重试次数。
  • Time time
    时间工具类。
  • boolean running
    该线程状态,为 true 表示运行中。
  • boolean forceClose
    是否强制关闭,此时会忽略正在发送中的消息。
  • SenderMetrics sensors
    消息发送相关的统计指标收集器。
  • int requestTimeoutMs
    请求的超时时间。
  • long retryBackoffMs
    请求失败之在重试之前等待的时间。
  • ApiVersions apiVersions
    API版本信息。
  • TransactionManager transactionManager
    事务处理器。
  • Map< TopicPartition, List< ProducerBatch>> inFlightBatches
    正在执行发送相关的消息批次。

1.2 run 方法详解

Sender#run

public void run() {
    log.debug("Starting Kafka producer I/O thread.");
    while (running) {   
        try {
            runOnce();    // @1
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
    while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {    // @2
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    if (forceClose) {                                                                                                                                     // @3
        log.debug("Aborting incomplete batches due to forced shutdown");
        this.accumulator.abortIncompleteBatches();
    }
    try {
        this.client.close();                                                                                                                               // @4
    } catch (Exception e) {
        log.error("Failed to close network client", e);
    }
    log.debug("Shutdown of Kafka producer I/O thread has completed.");
}

代码@1:Sender 线程在运行状态下主要的业务处理方法,将消息缓存区中的消息向 broker 发送。
代码@2:如果主动关闭 Sender 线程,如果不是强制关闭,则如果缓存区还有消息待发送,再次调用 runOnce 方法将剩余的消息发送完毕后再退出。
代码@3:如果强制关闭 Sender 线程,则拒绝未完成提交的消息。
代码@4:关闭 Kafka Client 即网络通信对象。

接下来将分别探讨其上述方法的实现细节。

1.2.1 runOnce 详解

Sender#runOnce

void runOnce() {
    // 此处省略与事务消息相关的逻辑
    long currentTimeMs = time.milliseconds();
    long pollTimeout = sendProducerData(currentTimeMs);   // @1
    client.poll(pollTimeout, currentTimeMs);                            // @2
}

本文不关注事务消息的实现原理,故省略了该部分的代码。
代码@1:调用 sendProducerData 方法发送消息。
代码@2:调用这个方法的作用?

接下来分别对上述两个方法进行深入探究。

1.1.2.1 sendProducerData

接下来将详细分析其实现步骤。
Sender#sendProducerData

Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

Step1:首先根据当前时间,根据缓存队列中的数据判断哪些 topic 的 哪些分区已经达到发送条件。达到可发送的条件将在 2.1.1.1 节详细分析。

Sender#sendProducerData

if (!result.unknownLeaderTopics.isEmpty()) {
    for (String topic : result.unknownLeaderTopics)
        this.metadata.add(topic);
    
    log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
                result.unknownLeaderTopics);
    this.metadata.requestUpdate();
}

Step2:如果在待发送的消息未找到其路由信息,则需要首先去 broker 服务器拉取对应的路由信息(分区的 leader 节点信息)。

Sender#sendProducerData

long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
    Node node = iter.next();
    if (!this.client.ready(node, now)) {
        iter.remove();
        notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
    }
}

Step3:移除在网络层面没有准备好的分区,并且计算在接下来多久的时间间隔内,该分区都将处于未准备状态。
1、在网络环节没有准备好的标准如下:

  • 分区没有未完成的更新元素数据请求(metadata)。
  • 当前生产者与对端 broker 已建立连接并完成了 TCP 的三次握手。
  • 如果启用 SSL、ACL 等机制,相关状态都已就绪。
  • 该分区对应的连接正在处理中的请求数时是否超过设定值,默认为 5,可通过属性 max.in.flight.requests.per.connection 来设置。

2、client pollDelayMs 预估分区在接下来多久的时间间隔内都将处于未转变好状态(not ready),其标准如下:

  • 如果已与对端的 TCP 连接已创建好,并处于已连接状态,此时如果没有触发限流,则返回0,如果有触发限流,则返回限流等待时间。
  • 如果还位于对端建立 TCP 连接,则返回 Long.MAX_VALUE,因为连接建立好后,会唤醒发送线程的。

Sender#sendProducerData

// create produce requests
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);

Step4:根据已准备的分区,从缓存区中抽取待发送的消息批次(ProducerBatch),并且按照 nodeId:List<ProducerBatch> 组织,注意,抽取后的 ProducerBatch 将不能再追加消息了,就算还有剩余空间可用,具体抽取将在下文在详细介绍。

Sender#sendProducerData

addToInflightBatches(batches);
public void addToInflightBatches(Map<Integer, List<ProducerBatch>> batches) {
    for (List<ProducerBatch> batchList : batches.values()) {
        addToInflightBatches(batchList);
    }
}
private void addToInflightBatches(List<ProducerBatch> batches) {
    for (ProducerBatch batch : batches) {
        List<ProducerBatch> inflightBatchList = inFlightBatches.get(batch.topicPartition);
        if (inflightBatchList == null) {
            inflightBatchList = new ArrayList<>();
            inFlightBatches.put(batch.topicPartition, inflightBatchList);
        }
        inflightBatchList.add(batch);
    }
}

Step5:将抽取的 ProducerBatch 加入到 inFlightBatches 数据结构,该属性的声明如下:Map<TopicPartition, List< ProducerBatch >> inFlightBatches,即按照 topic-分区 为键,存放已抽取的 ProducerBatch,这个属性的含义就是存储待发送的消息批次。可以根据该数据结构得知在消息发送时以分区为维度反馈 Sender 线程的“积压情况”,max.in.flight.requests.per.connection 就是来控制积压的最大数量,如果积压达到这个数值,针对该队列的消息发送会限流。

Sender#sendProducerData

accumulator.resetNextBatchExpiryTime();
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
expiredBatches.addAll(expiredInflightBatches);

Step6:从 inflightBatches 与 batches 中查找已过期的消息批次(ProducerBatch),判断是否过期的标准是系统当前时间与 ProducerBatch 创建时间之差是否超过120s,过期时间可以通过参数 delivery.timeout.ms 设置。

Sender#sendProducerData

if (!expiredBatches.isEmpty())
    log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
    String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
                + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
    failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
    if (transactionManager != null && expiredBatch.inRetry()) {
        // This ensures that no new batches are drained until the current in flight batches are fully resolved.
        transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
    }
}

Step7:处理已超时的消息批次,通知该批消息发送失败,即通过设置 KafkaProducer#send 方法返回的凭证中的 FutureRecordMetadata 中的 ProduceRequestResult result,使之调用其 get 方法不会阻塞。

Sender#sendProducerData

sensors.updateProduceRequestMetrics(batches);

Step8:收集统计指标,本文不打算详细分析,但后续会专门对 Kafka 的 Metrics 设计进行一个深入的探讨与学习。

Sender#sendProducerData

long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
pollTimeout = Math.max(pollTimeout, 0);
if (!result.readyNodes.isEmpty()) {
    log.trace("Nodes with data ready to send: {}", result.readyNodes);
    pollTimeout = 0;
}

Step9:设置下一次的发送延时,待补充详细分析。

Sender#sendProducerData

sendProduceRequests(batches, now);
private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
    for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
        sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
}

Step10:该步骤按照 brokerId 分别构建发送请求,即每一个 broker 会将多个 ProducerBatch 一起封装成一个请求进行发送,同一时间,每一个 与 broker 连接只会只能发送一个请求,注意,这里只是构建请求,并最终会通过 NetworkClient#send 方法,将该批数据设置到 NetworkClient 的待发送数据中,此时并没有触发真正的网络调用。

sendProducerData 方法就介绍到这里了,既然这里还没有进行真正的网络请求,那在什么时候触发呢?

我们继续回到 runOnce 方法。

1.2.1.2 NetworkClient 的 poll 方法
 public List<ClientResponse> poll(long timeout, long now) {
    ensureActive();

    if (!abortedSends.isEmpty()) {
        // If there are aborted sends because of unsupported version exceptions or disconnects,
        // handle them immediately without waiting for Selector#poll.
        List<ClientResponse> responses = new ArrayList<>();
        handleAbortedSends(responses);
        completeResponses(responses);
        return responses;
    }

    long metadataTimeout = metadataUpdater.maybeUpdate(now);   // @1
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));    // @2
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // process completed actions
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();            // @3
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleInitiateApiVersionRequests(updatedNow);
    handleTimedOutRequests(responses, updatedNow);
    completeResponses(responses);                                               // @4
    return responses;
}

本文并不会详细深入探讨其网络实现部分,Kafka 的 网络通讯后续我会专门详细的介绍,在这里先点出其关键点。
代码@1:尝试更新云数据。
代码@2:触发真正的网络通讯,该方法中会通过收到调用 NIO 中的 Selector#select() 方法,对通道的读写就绪事件进行处理,当写事件就绪后,就会将通道中的消息发送到远端的 broker。
代码@3:然后会消息发送,消息接收、断开连接、API版本,超时等结果进行收集。
代码@4:并依次对结果进行唤醒,此时会将响应结果设置到 KafkaProducer#send 方法返回的凭证中,从而唤醒发送客户端,完成一次完整的消息发送流程。

Sender 发送线程的流程就介绍到这里了,接下来首先给出一张流程图,然后对上述流程中一些关键的方法再补充深入探讨一下。

1.2.2 run 方法流程图
在这里插入图片描述

根据上面的源码分析得出上述流程图,图中对重点步骤也详细标注了其关键点。下面我们对上述流程图中 Sender 线程依赖的相关类的核心方法进行解读,以便加深 Sender 线程的理解。

由于在讲解 Sender 发送流程中,大部分都是调用 RecordAccumulator 方法来实现其特定逻辑,故接下来重点对上述涉及到RecordAccumulator 的方法进行一个详细剖析,加强对 Sender 流程的理解。

2、RecordAccumulator 核心方法详解

2.1 RecordAccumulator 的 ready 方法详解

该方法主要就是根据缓存区中的消息,判断哪些分区已经达到发送条件。

RecordAccumulator#ready

public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    Set<Node> readyNodes = new HashSet<>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    Set<String> unknownLeaderTopics = new HashSet<>();

    boolean exhausted = this.free.queued() > 0;
    for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {   // @1
        TopicPartition part = entry.getKey();
        Deque<ProducerBatch> deque = entry.getValue();

        Node leader = cluster.leaderFor(part);   // @2
        synchronized (deque) {
            if (leader == null && !deque.isEmpty()) {   // @3
                // This is a partition for which leader is not known, but messages are available to send.
                // Note that entries are currently not removed from batches when deque is empty.
                unknownLeaderTopics.add(part.topic());
            } else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) {    // @4
                ProducerBatch batch = deque.peekFirst();
                if (batch != null) {
                    long waitedTimeMs = batch.waitedTimeMs(nowMs);
                    boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    boolean full = deque.size() > 1 || batch.isFull();
                    boolean expired = waitedTimeMs >= timeToWaitMs;
                    boolean sendable = full || expired || exhausted || closed || flushInProgress();
                    if (sendable && !backingOff) {   // @5
                        readyNodes.add(leader);
                    } else {
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        // Note that this results in a conservative estimate since an un-sendable partition may have
                        // a leader that will later be found to have sendable data. However, this is good enough
                        // since we'll just wake up and then sleep again for the remaining time.
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);   
                    }
                }
            }
        }
    }
    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}

代码@1:对生产者缓存区 ConcurrentHashMap<TopicPartition, Deque< ProducerBatch>> batches 遍历,从中挑选已准备好的消息批次。
代码@2:从生产者元数据缓存中尝试查找分区(TopicPartition) 的 leader 信息,如果不存在,当将该 topic 添加到 unknownLeaderTopics (代码@3),稍后会发送元数据更新请求去 broker 端查找分区的路由信息。
代码@4:如果不在 readyNodes 中就需要判断是否满足条件,isMuted 与顺序消息有关,本文暂时不关注,在后面的顺序消息部分会重点探讨。
代码@5:这里就是判断是否准备好的条件,先一个一个来解读局部变量的含义。

  • long waitedTimeMs
    该 ProducerBatch 已等待的时长,等于当前时间戳 与 ProducerBatch 的 lastAttemptMs 之差,在 ProducerBatch 创建时或需要重试时会将当前的时间赋值给lastAttemptMs。
  • retryBackoffMs
    当发生异常时发起重试之前的等待时间,默认为 100ms,可通过属性 retry.backoff.ms 配置。
  • batch.attempts()
    该批次当前已重试的次数。
  • backingOff
    后台发送是否关闭,即如果需要重试并且等待时间小于 retryBackoffMs ,则 backingOff = true,也意味着该批次未准备好。
  • timeToWaitMs
    send 线程发送消息需要的等待时间,如果 backingOff 为 true,表示该批次是在重试,并且等待时间小于系统设置的需要等待时间,这种情况下 timeToWaitMs = retryBackoffMs 。否则需要等待的时间为 lingerMs。
  • boolean full
    该批次是否已满,如果两个条件中的任意一个满足即为 true。
    • Deque< ProducerBatch> 该队列的个数大于1,表示肯定有一个 ProducerBatch 已写满。
    • ProducerBatch 已写满。
  • boolean expired
    是否过期,等于已经等待的时间是否大于需要等待的时间,如果把发送看成定时发送的话,expired 为 true 表示定时器已到达触发点,即需要执行。
  • boolean exhausted
    当前生产者缓存已不够,创建新的 ProducerBatch 时阻塞在申请缓存空间的线程大于0,此时应立即将缓存区中的消息立即发送到服务器。
  • boolean sendable
    是否可发送。其满足下面的任意一个条件即可:
    • 该批次已写满。(full = true)。
    • 已等待系统规定的时长。(expired = true)
    • 发送者内部缓存区已耗尽并且有新的线程需要申请(exhausted = true)。
    • 该发送者的 close 方法被调用(close = true)。
    • 该发送者的 flush 方法被调用。

2.2 RecordAccumulator 的 drain方法详解

RecordAccumulator#drain

public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) { // @1
    if (nodes.isEmpty())
        return Collections.emptyMap();

    Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
    for (Node node : nodes) {                                                                                                                              
        List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);                      // @2
        batches.put(node.id(), ready);
    }
    return batches;
}

代码@1:我们首先来介绍该方法的参数:

  • Cluster cluster
    集群信息。
  • Set< Node> nodes
    已准备好的节点集合。
  • int maxSize
    一次请求最大的字节数。
  • long now
    当前时间。

代码@2:遍历所有节点,调用 drainBatchesForOneNode 方法抽取数据,组装成 Map<Integer /** brokerId */, List< ProducerBatch>> batches。

接下来重点来看一下 drainBatchesForOneNode。
RecordAccumulator#drainBatchesForOneNode

private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
    int size = 0;
    List<PartitionInfo> parts = cluster.partitionsForNode(node.id());   // @1
    List<ProducerBatch> ready = new ArrayList<>();
    int start = drainIndex = drainIndex % parts.size();                        // @2
    do {                                                                                                // @3 
        PartitionInfo part = parts.get(drainIndex);
        TopicPartition tp = new TopicPartition(part.topic(), part.partition()); 
        this.drainIndex = (this.drainIndex + 1) % parts.size();                     
            
        if (isMuted(tp, now))
            continue;

        Deque<ProducerBatch> deque = getDeque(tp);                              // @4
        if (deque == null)
            continue;

        synchronized (deque) {
            // invariant: !isMuted(tp,now) && deque != null
            ProducerBatch first = deque.peekFirst();                                         // @5
            if (first == null)
                continue;

            // first != null
            boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;   // @6
            // Only drain the batch if it is not during backoff period.
            if (backoff)                                                                                     
                continue;

            if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {     // @7
                break;
            } else {
                if (shouldStopDrainBatchesForPartition(first, tp))                                  
                    break;

                // 这里省略与事务消息相关的代码,后续会重点学习。
                batch.close();                                                                                            // @8
                size += batch.records().sizeInBytes();
                ready.add(batch);                                                                            

                batch.drained(now);                                                                             
            }
        }
    } while (start != drainIndex);
    return ready;
}

代码@1:根据 brokerId 获取该 broker 上的所有主分区。
代码@2:初始化 start。这里首先来阐述一下 start 与 drainIndex 。

  • start 当前开始遍历的分区序号。
  • drainIndex 上次抽取的队列索引后,这里主要是为了每个队列都是从零号分区开始抽取。

代码@3:循环从缓存区抽取对应分区中累积的数据。
代码@4:根据 topic + 分区号从生产者发送缓存区中获取已累积的双端Queue。
代码@5:从双端队列的头部获取一个元素。(消息追加时是追加到队列尾部)。
代码@6:如果当前批次是重试,并且还未到阻塞时间,则跳过该分区。
代码@7:如果当前已抽取的消息总大小 加上新的消息已超过 maxRequestSize,则结束抽取。
代码@8:将当前批次加入到已准备集合中,并关闭该批次,即不在允许向该批次中追加消息。

关于消息发送就介绍到这里,NetworkClient 的 poll 方法内部会调用 Selector 执行就绪事件的选择,并将抽取的消息通过网络发送到 Broker 服务器,关于网络后面的具体实现,将在后续文章中单独介绍。

作者信息:丁威,《RocketMQ技术内幕》作者,维护 中间件兴趣圈公众号,目前主要发表了源码阅读java集合、JUC(java并发包)、Netty、ElasticJob、Mycat、Dubbo、RocketMQ、mybaits等系列源码。


作者介绍:丁威,《RocketMQ技术内幕》作者,RocketMQ 社区布道师,公众号:中间件兴趣圈 维护者,目前已陆续发表源码分析Java集合、Java 并发包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源码专栏。可以点击链接加入中间件知识星球 ,一起探讨高并发、分布式服务架构,交流源码。

中间件兴趣圈

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 221,820评论 6 515
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,648评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 168,324评论 0 360
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,714评论 1 297
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,724评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,328评论 1 310
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,897评论 3 421
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,804评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,345评论 1 318
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,431评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,561评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,238评论 5 350
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,928评论 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,417评论 0 24
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,528评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,983评论 3 376
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,573评论 2 359

推荐阅读更多精彩内容

  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,111评论 1 32
  • ORA-00001: 违反唯一约束条件 (.) 错误说明:当在唯一索引所对应的列上键入重复值时,会触发此异常。 O...
    我想起个好名字阅读 5,335评论 0 9
  • talk is easy,show me the code,先来看一段创建producer的代码 生产者的API使...
    zy_think123阅读 1,117评论 0 3
  • MQ(消息队列)是跨进程通信的方式之一,可理解为异步rpc,上游系统对调用结果的态度往往是重要不紧急。使用消息队列...
    allin8116阅读 521评论 0 0
  • 【日精进打卡第273.天】 【知-学习】 1、背诵大纲1遍,大学1遍 【经典名句分享】 【行-实践】 修身: 齐家...
    吴佳妮_ab17阅读 79评论 0 0