RocketMQ之producer发送消息源码分析

RocketMQ主要有NameServer,producer的发送,borker端的消息存储,consumer端的消费。首先我们先来看一下producer的发送
以下是producer的发送逻辑:
1.DefaultMQProducerImpl#sendDefaultImpl(入口)
2.判断一下topic是否可用,为空或者不可用则不处理DefaultMQProducerImpl#tryToFindTopicPublishInfo
3.如果topick可用,则判断一下他的发送模式来计算他的发送总次数
4.for循环发送次数,选择发送队列,如果队列为空,则直接跳出循环DefaultMQProducerImpl#selectOneMessageQueue
5.如果选择队列的耗时太长,那也不再进行处理。
6.调用发送主逻辑DefaultMQProducerImpl#sendKernelImpl
7.更新一下broker的可用时间,对broker的可用性进行更新DefaultMQProducerImpl#updateFaultItem
8.根据发送结果,来判断是否继续循环,如果是同步模式的话,就继续for循环。

private SendResult sendDefaultImpl(
       Message msg,
       final CommunicationMode communicationMode,
       final SendCallback sendCallback,
       final long timeout
   ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
       this.makeSureStateOK();
       Validators.checkMessage(msg, this.defaultMQProducer);

       final long invokeID = random.nextLong();
       long beginTimestampFirst = System.currentTimeMillis();
       long beginTimestampPrev = beginTimestampFirst;
       long endTimestamp = beginTimestampFirst;
       //获取一下topic信息
       TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
       if (topicPublishInfo != null && topicPublishInfo.ok()) {
           boolean callTimeout = false;
           MessageQueue mq = null;
           Exception exception = null;
           SendResult sendResult = null;
           //判断一下发送的模式,如果是同步的,就为重试次数+1
           int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
           int times = 0;
           String[] brokersSent = new String[timesTotal];
           for (; times < timesTotal; times++) {
               String lastBrokerName = null == mq ? null : mq.getBrokerName();
               //选择可用的队列
               MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
               if (mqSelected != null) {
                   mq = mqSelected;
                   brokersSent[times] = mq.getBrokerName();
                   try {
                       //判断一下获取队列的耗时,如果超时了就认为不可用,直接跳出循环
                       beginTimestampPrev = System.currentTimeMillis();
                       long costTime = beginTimestampPrev - beginTimestampFirst;
                       if (timeout < costTime) {
                           callTimeout = true;
                           break;
                       }
                       //发送的主逻辑
                       sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                       endTimestamp = System.currentTimeMillis();
                       //更新一下broker的可用时间,对broker的可用性进行判断
                       this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                       switch (communicationMode) {
                           case ASYNC:
                               return null;
                           case ONEWAY:
                               return null;
                           case SYNC:
                               //如果发送状态不为ok的话,判断是否需要重试,判断落盘失败,是否需要重试,需要的话就继续循环。默认是false。
                               if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                   if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                       continue;
                                   }
                               }

                               return sendResult;
                           default:
                               break;
                       }
                   } catch (RemotingException e) {
                       endTimestamp = System.currentTimeMillis();
                       this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                       log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                       log.warn(msg.toString());
                       exception = e;
                       continue;
                   } catch (MQClientException e) {
                       endTimestamp = System.currentTimeMillis();
                       this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                       log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                       log.warn(msg.toString());
                       exception = e;
                       continue;
                   } catch (MQBrokerException e) {
                       endTimestamp = System.currentTimeMillis();
                       this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                       log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                       log.warn(msg.toString());
                       exception = e;
                       switch (e.getResponseCode()) {
                           case ResponseCode.TOPIC_NOT_EXIST:
                           case ResponseCode.SERVICE_NOT_AVAILABLE:
                           case ResponseCode.SYSTEM_ERROR:
                           case ResponseCode.NO_PERMISSION:
                           case ResponseCode.NO_BUYER_ID:
                           case ResponseCode.NOT_IN_CURRENT_UNIT:
                               continue;
                           default:
                               if (sendResult != null) {
                                   return sendResult;
                               }

                               throw e;
                       }
                   } catch (InterruptedException e) {
                       endTimestamp = System.currentTimeMillis();
                       this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                       log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                       log.warn(msg.toString());

                       log.warn("sendKernelImpl exception", e);
                       log.warn(msg.toString());
                       throw e;
                   }
               } else {
                   break;
               }
           }

           if (sendResult != null) {
               return sendResult;
           }

           String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
               times,
               System.currentTimeMillis() - beginTimestampFirst,
               msg.getTopic(),
               Arrays.toString(brokersSent));

           info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

           MQClientException mqClientException = new MQClientException(info, exception);
           if (callTimeout) {
               throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
           }

           if (exception instanceof MQBrokerException) {
               mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
           } else if (exception instanceof RemotingConnectException) {
               mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
           } else if (exception instanceof RemotingTimeoutException) {
               mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
           } else if (exception instanceof MQClientException) {
               mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
           }

           throw mqClientException;
       }

       List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
       if (null == nsList || nsList.isEmpty()) {
           throw new MQClientException(
               "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
       }

       throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
           null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
   }

那么我们来看一下tryToFindTopicPublishInfo这个方法

    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        //从本地缓存中读取是否有该topic信息
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        //如果topic信息为空或者不可用则再从nameServer获取topic信息
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }

        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            //再从nameServer获取一下topic信息,此时获取的topic为默认的topic
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

updateTopicRouteInfoFromNameServer方法

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
        try {
            //从nameServer获取数据的时候锁一下
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        //获取topic为AUTO_CREATE_TOPIC_KEY的topic
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            1000 * 3);
                        if (topicRouteData != null) {
                            //赋值队列的读写队列数量
                            for (QueueData data : topicRouteData.getQueueDatas()) {
                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                data.setReadQueueNums(queueNums);
                                data.setWriteQueueNums(queueNums);
                            }
                        }
                    } else {
                        //获取对应的topic
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
                    if (topicRouteData != null) {
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        //判断一下topic的信息,如读写队列数量,broker信息有没有变
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        if (!changed) {
                            //判断一下是否需要更新topic信息
                            changed = this.isNeedUpdateTopicRouteInfo(topic);
                        } else {
                            log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                        }

                        if (changed) {
                            //赋值一份topic信息
                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                            //重新往brokerAddrTable缓存里面塞入当前topic已经注册过的broker以及对应的地址
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }

                            // Update Pub info
                            {
                                //更新一下写的topic信息
                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                publishInfo.setHaveTopicRouterInfo(true);
                                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQProducerInner> entry = it.next();
                                    MQProducerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicPublishInfo(topic, publishInfo);
                                    }
                                }
                            }

                            // Update sub info
                            {
                                //更新一下读的topic信息
                                Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                                Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQConsumerInner> entry = it.next();
                                    MQConsumerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                    }
                                }
                            }
                            log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true;
                        }
                    } else {
                        log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
                    }
                } catch (Exception e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                    }
                } finally {
                    this.lockNamesrv.unlock();
                }
            } else {
                log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
            }
        } catch (InterruptedException e) {
            log.warn("updateTopicRouteInfoFromNameServer Exception", e);
        }

        return false;
    }

选择队列selectOneMessageQueue

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
       //是否支持容错,默认是false
       if (this.sendLatencyFaultEnable) {
           try {
               //获取一下ThreadLocal里定义的index
               int index = tpInfo.getSendWhichQueue().getAndIncrement();
               //循环队列长度
               for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                   int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                   if (pos < 0)
                       pos = 0;
                   //获取相对应的队列
                   MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                   //判断当前的borker是否可用,如果可用的话,并且lastBrokerName为null(当for循环第一次发送时候lastBrokerName为null)
                   if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                       if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                           return mq;
                   }
               }
               //走到这一步,说明没有符合条件的broker,那么再取一个broker
               final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
               //获取当前broker的写队列数量
               int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
               if (writeQueueNums > 0) {
                   //获取下一个mq队列,然后将对应的brokername还有队列id赋值
                   final MessageQueue mq = tpInfo.selectOneMessageQueue();
                   if (notBestBroker != null) {
                       mq.setBrokerName(notBestBroker);
                       mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                   }
                   return mq;
               } else {
                   latencyFaultTolerance.remove(notBestBroker);
               }
           } catch (Exception e) {
               log.error("Error occurred when selecting message queue", e);
           }
           //重新获取队列
           return tpInfo.selectOneMessageQueue();
       }

       return tpInfo.selectOneMessageQueue(lastBrokerName);
   }

发送主逻辑

 private SendResult sendKernelImpl(final Message msg,
                                      final MessageQueue mq,
                                      final CommunicationMode communicationMode,
                                      final SendCallback sendCallback,
                                      final TopicPublishInfo topicPublishInfo,
                                      final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        //获取broker为master角色的broker地址
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            //r如果地址为空,重新获取一下topic信息
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

        SendMessageContext context = null;
        if (brokerAddr != null) {
            //是否开启vip通道,如果是的话,端口号减2为vip通道
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

            byte[] prevBody = msg.getBody();
            try {
                //for MessageBatch,ID has been set in the generating process
                if (!(msg instanceof MessageBatch)) {
                    MessageClientIDSetter.setUniqID(msg);
                }

                int sysFlag = 0;
                boolean msgBodyCompressed = false;
                //压缩一下msg信息
                if (this.tryToCompressMessage(msg)) {
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    msgBodyCompressed = true;
                }
                //看一下是不是需要支持事务
                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }
                //发送信息校验一下
                if (hasCheckForbiddenHook()) {
                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                    checkForbiddenContext.setCommunicationMode(communicationMode);
                    checkForbiddenContext.setBrokerAddr(brokerAddr);
                    checkForbiddenContext.setMessage(msg);
                    checkForbiddenContext.setMq(mq);
                    checkForbiddenContext.setUnitMode(this.isUnitMode());
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                }
                //发送消息的前置逻辑
                if (this.hasSendMessageHook()) {
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (isTrans != null && isTrans.equals("true")) {
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    }

                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                        context.setMsgType(MessageType.Delay_Msg);
                    }
                    this.executeSendMessageHookBefore(context);
                }
                //构建发送消息体
                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }

                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }

                SendResult sendResult = null;
                //根据发送模式,进行发送message
                switch (communicationMode) {
                    case ASYNC:
                        Message tmpMessage = msg;
                        if (msgBodyCompressed) {
                            //If msg body was compressed, msgbody should be reset using prevBody.
                            //Clone new message using commpressed message body and recover origin massage.
                            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            msg.setBody(prevBody);
                        }
                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeAsync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                        break;
                    case ONEWAY:
                    case SYNC:
                        long costTimeSync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeSync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }

                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }

                return sendResult;
            } catch (RemotingException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (MQBrokerException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (InterruptedException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } finally {
                msg.setBody(prevBody);
            }
        }

        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

以上就是RocketMQ的producer发送消息的源码分析

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,137评论 6 511
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,824评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,465评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,131评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,140评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,895评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,535评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,435评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,952评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,081评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,210评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,896评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,552评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,089评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,198评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,531评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,209评论 2 357

推荐阅读更多精彩内容