Spring Cloud 集成阿里rocketMq 发送延时/定时/普通消息 解决消费轨迹未消费问题


spring cloud stream 介绍(照搬)

  • Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。它基于 SpringBoot 来创建具有生产级别的单机 Spring 应用,并且使用 Spring Integration 与 Broker 进行连接。
  • Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。
  • Spring Cloud Stream 内部有两个概念:Binder 和 Binding。
  • Binder: 跟外部消息中间件集成的组件,用来创建 Binding,各消息中间件都有自己的 Binder 实现。
    比如 Kafka 的实现 KafkaMessageChannelBinderRabbitMQ 的实现 RabbitMessageChannelBinder 以及 RocketMQ的实现 RocketMQMessageChannelBinder
  • Binding: 包括 Input Binding 和 Output Binding。
  • Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。

版本选择

因为不是用于开发,仅供学习用所以我参照了下阿里的版本,选用了最新的,具体依据自身项目做参考
阿里git版本说明传送门

阿里组件

毕业版本依赖


一. pom.xml 配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.2</version>
    </parent>
    <groupId>org.example</groupId>
    <artifactId>spring-alibaba-cloud-rocketmq-studytest</artifactId>
    <version>1.0-SNAPSHOT</version>
    <description>阿里巴巴cloud-rocketmq学习</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <!-- spring cloud 版本依赖-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>2020.0.0</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <!-- spring alibaba cloud 版本依赖-->
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>2021.1</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <!-- rocketmq 依赖-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        </dependency>
        <!-- spring boot web依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>
</project>

二. 自定义消息channel与rocketMq配置

上面我们引入了spring cloud alibab rocketmq相关依赖,下面我们开始消息通道与yml关于rocketmq的配置
由于阿里的spring-cloud-starter-stream-rocketmq 是依赖spring的stream binder实现的,所以rocketMq配置分为rocketMq的自定义配置与stream binder的公共配置,如下:

  • spring.cloud.stream.rocketmq 为rocketmq自定义配置
  • spring.cloud.stream.bindings 为srping cloud stream binder公共配置,以此来达到对Apache Kafka
    RabbitMQ等消息中间件的扩展

  • 1. 自定义普通消息

  • 普通消息YML配置
spring:
  cloud:
    stream:
      # 阿里rocketMq配置 topic 与 group 均以 实例id% 为前缀配置 如实例id为 MQ_INST_XXXX_XXX 则group或topic 配置 MQ_INST_XXXX_XXX%grouID
      rocketmq:
        binder:
          # 【若为阿里云购买服务,则为控制台的对外或对内实例地址】【若自己搭建的服务,为自定义rocketmq服务地址127.0.0.1:9876】
          name-server: http://MQ_INST_XXXX_XXX.DD.FFF.aliyuncs.com:80
          # 阿里access-key 【购买阿里服务 控制台获取填写 若为自搭服务可不填】
          access-key: AAAAAAAAAAA
          # 阿里secret-key 【购买阿里服务 控制台获取填写 若为自搭服务可不填】
          secret-key: BBBBBBBBBBBBBB
        # rocketMq 自定义消息通道配置
        bindings:
          # 阿里rocketMq binder 生产者配置
          ### 普通生产消息通道
          customized_output_channel: {producer.group: MQ_INST_XXXX_XXX%GID_QIGUANBANG_DEV}
          # 阿里rocketMq binder 消费者配置
          ### 普通消息订阅通道
          customized_input_channel: {consumer.tags: test_consumer_tag}

      # stream binder 公共配置
      bindings:
        # spring cloud stream binder 生产者配置
        ### 普通消息通道
        customized_output_channel: {destination: MQ_INST_XXXX_XXX%mg_common_topic, content-type: application/json}
        # spring cloud stream binder 消费者配置
        ### 普通消息订阅通道
        customized_input_channel: {destination: MQ_INST_XXXX_XXX%mg_common_topic, group: MQ_INST_XXXX_XXX%GID_QIGUANBANG_DEV, content-type: application/json}

关于rocketmq的group 与 topic在yml中的书写方式,官方文档是这么写的
topic 和 group 请以 实例id% 为前缀进行配置。比如 topic 为 "test",需要配置成 "实例id%test"
官方文档地址 滑到最后,但是我试过去掉后也能正常使用(可能出于兼容自搭RocketMq服务的目的),可能是购买阿里服务的需要这么填写,消息轨迹或者其他内容需要获取实例信息,这样书写方便快速获取?具体原因还需观察源码,暂时按照官方的来。

  • 自定义channel接口

spring cloud stream 提供了自定义的Mesage接口 SourceSink供开发者使用,通过在程序启动类或者服务类添加注解来启用, 如下:

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;

@Slf4j
@EnableBinding({Source.class, Sink.class})
@SpringBootApplication
public class RocketApplication {
    public static void main(String[] args) {
        SpringApplication.run(RocketApplication.class, args);
        log.debug("==========rocketMq服务启动成功!==========");
    }
}
@Component
@EnableBinding(Source.class)
public class RocketMqService {

Source 提供了生产者的接口,而Sink提供了消费者的接口,通过观察源码,我们可以发现,接口类的内容十分简单。

  • source
package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * Bindable interface with one output channel.
 *
 * @author Dave Syer
 * @author Marius Bogoevici
 * @see org.springframework.cloud.stream.annotation.EnableBinding
 */
public interface Source {

    /**
     * Name of the output channel.
     */
    String OUTPUT = "output";

    /**
     * @return output channel
     */
    @Output(Source.OUTPUT)
    MessageChannel output();

}
  • sink
package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * Bindable interface with one input channel.
 *
 * @author Dave Syer
 * @author Marius Bogoevici
 * @see org.springframework.cloud.stream.annotation.EnableBinding
 */
public interface Sink {

    /**
     * Input channel name.
     */
    String INPUT = "input";

    /**
     * @return input channel.
     */
    @Input(Sink.INPUT)
    SubscribableChannel input();

}

而且spring cloud stream 也支持我们自定义message通道,所以我们可以通过根据自己的业务来制定不同的消息通道,以此来满足我们的业务需求,示列如下:

  • 自定义消息生产通道接口
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface OutputChannel {
    // 普通消息生产通道 对应yml自定义节点名称
    String NORMAL_PRODUCER_CHANNEL = "customized_output_channel";

    @Output(OutputChannel.NORMAL_PRODUCER_CHANNEL)
    MessageChannel NormalOutput();
}
  • 自定义消息订阅通道接口
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface InputChannel {
    // 普通消息订阅通道 对应yml自定义节点名称
    String NORMAL_CONSUMER_CHANNEL = "customized_input_channel";

    @Input(InputChannel.NORMAL_CONSUMER_CHANNEL)
    SubscribableChannel normalConsumerChannel();
}
启动类启用
import com.study.rocketmq.channel.InputChannel;
import com.study.rocketmq.channel.OutputChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@Slf4j
@EnableBinding({InputChannel.class, OutputChannel.class})
@SpringBootApplication
public class RocketApplication {
    public static void main(String[] args) {
        SpringApplication.run(RocketApplication.class, args);
        log.debug("==========rocketMq服务启动成功!==========");
    }
}

环境配置和代码配置已经好了,下面后门开始写消息生产方法和消息订阅

  • 普通消息发送
// controller
@RestController
@RequestMapping("/msg")
public class TestMsgController {

    @Autowired
    ProducerService producerService;

    @GetMapping("/sendMsg/{msg}")
    public String sendMsg(@PathVariable("msg")String msg){
        producerService.sendNormalMsg(msg, "test_consumer_tag", "testKey");
        return "SUCCESS";
    }
}
import com.study.rocketmq.channel.OutputChannel;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class ProducerService {

    @Autowired
    private OutputChannel outputChannel;
    /**
     * 发送普通消息
     * @param message          消息内容
     * @param consumerTag   消费者group标识
     * @param msgKey            消息key
     * @return
     */
    public boolean sendNormalMsg(String message, String consumerTag, String msgKey){
        // 构建消息
        Message<String> messageBuild = MessageBuilder.withPayload(message)
                .setHeader(MessageConst.PROPERTY_TAGS, consumerTag)
                .setHeader(MessageConst.PROPERTY_KEYS, msgKey)
                .build();
        // 发送消息
        boolean sendResult = outputChannel.NormalOutput().send(messageBuild);
        if (sendResult){
            log.info("普通消息发送成功-consumerTag:{}-msgKey:{}", consumerTag, msgKey);
        }else {
            log.error("普通消息发送失败!:{}",  consumerTag, msgKey);
        }
        return sendResult;
    }
}
import com.study.rocketmq.channel.InputChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

/**
  * @ClassName MessageListener
  * @author lgq
  * @description //消息监听
  * @Date 2021/6/9
  * @Version V1.0
  */
@Slf4j
@Component
public class MessageListener {
    
    // 通过StreamListener监听消息 只允许rocketmq_KEYS = testKey接收
    @StreamListener(value = InputChannel.NORMAL_CONSUMER_CHANNEL, condition = "headers['rocketmq_KEYS'] == 'testKey'")
    public void receivePayMsg(@Payload String payResult) {
        log.debug("接收到普通消息:{}", payResult);
    }
}

通过ApiPost工具请求,默认打印SUCCESS字符,观察控制台发现没有发送成功。打开控制台也没看到我们本地的客户端注册上了。


ApiPost请求

错误日志

阿里云rocketMq控制台

后来通过查询资料得知,可能阿里的rocketMq服务版本比较高,ons客户端版本已经到了4.8而spring-cloud-starter-stream-rocketmq所使用的版本才4.4.0,所以我们排除掉它自带的依赖,引入最新的。


image.png
<!-- rocketmq 依赖-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
            <!-- 排除自带rocketMq-client依赖【低版本消息无法发送成功】-->
            <exclusions>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-acl</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- rocketMq -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-acl</artifactId>
            <version>4.8.0</version>
        </dependency>

然后重新启动,刷新阿里控制台,发现已经注册上了


阿里云rocketmq控制台

尝试重新发送消息


发送成功
  • 2. 自定义延时/定时消息

    • YML 添加如下配置
             bindings:
                 # 阿里rocketMq binder 生产者配置
                 ### 延时消息生产 producer.sync 属性需设置为true
                 delay_output_channel: {producer.group: MQ_INST_XXXX_XXX%GID_QIGUANBANG_DEV, producer.sync: true}
                 # 阿里rocketMq binder 消费者配置
                 ### 延时消息订阅
                 delay_input_channel: {consumer.tags: test_delay_tag}
             bindings:
               # spring cloud stream binder 生产者配置
               ### 延时消息
               delay_output_channel: {destination: MQ_INST_XXXX_XXX%common-delay-topic, content-type: application/json}
               # spring cloud stream binder 消费者配置
               ### 延时消息订阅
               delay_input_channel: {destination: MQ_INST_XXXX_XXX%common-delay-topic, group: MQ_INST_XXXX_XXX%GID_QIGUANBANG_DEV, content-type: application/json}
    
    • InputChannel 接口添加如下方法
        // 延时/定时消息订阅通道 对应yml自定义节点名称
       String DELAY_CONSUMER_CHANNEL = "delay_input_channel";
       
       // 延时/定时消息订阅
       @Input(DELAY_CONSUMER_CHANNEL)
       SubscribableChannel delayConsumerChannel();
    
    • OutputChannel 接口添加如下方法
     // 延时或定时消息生产通道
     String DELAY_PRODUCER_CHANNEL = "delay_output_channel";
    
     @Output(DELAY_PRODUCER_CHANNEL)
     MessageChannel delayOutput();
    
    • ProducerService 添加如下方法
         /**
      * 延时消息发送
      * @param message   延时消息体
      * @param delayLevel 延时级别 1~18 (1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 【 1=1s 2=5s 3=10s】)
      * @param ConsumerTag 消费者TAG标识 通过TAG区分消费对象
      * @param MsgKey  消息key 可以通过该字段再次区分
      * @return
      */
     public boolean sendDelayMsg(String message, int delayLevel, String ConsumerTag, String MsgKey){
         // 构建消息
         Message<String> messageBuild = MessageBuilder.withPayload(message)
                 .setHeader(MessageConst.PROPERTY_TAGS, ConsumerTag)
                 .setHeader(MessageConst.PROPERTY_KEYS, MsgKey)
                 .setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayLevel)
                 .build();
         // 发送消息
         boolean sendResult = outputChannel.delayOutput().send(messageBuild);
         if (sendResult){
             log.info("延时消息发送成功-ConsumerTag:{}-MsgKey:{}", ConsumerTag, MsgKey);
         }else {
             log.error("延时消息发送失败!:{}",  ConsumerTag, MsgKey);
         }
         return sendResult;
     }
    
    
     /**
      * https://help.aliyun.com/document_detail/43349.html
      * rocketMq 指定时间消息发送
      * @param message 消息内容
      * @param ConsumerTag 消费者group标识
      * @param MsgKey 消息key
      * @param fixedTime 指定时间戳 指定时间戳必须大于当前时间 否则立即消费 参数可设置40天内的任何时刻(单位毫秒),超过40天消息发送将失败
      * @return
      */
     public boolean sendFixedTimeMsg(String message, String ConsumerTag, String MsgKey, long fixedTime){
         // 构建消息 __STARTDELIVERTIME 为发送定时任务需要的请求头
         Message<String> messageBuild = MessageBuilder.withPayload(message)
                 .setHeader(MessageConst.PROPERTY_TAGS, ConsumerTag)
                 .setHeader(MessageConst.PROPERTY_KEYS, MsgKey)
                 .setHeader("__STARTDELIVERTIME", fixedTime)
                 .build();
         // 发送消息
         boolean sendResult = outputChannel.delayOutput().send(messageBuild);
         if (sendResult){
             log.info("定时消息发送成功-ConsumerTag:{}-MsgKey:{}", ConsumerTag, MsgKey);
         }else {
             log.error("定时消息发送失败!:{}",  ConsumerTag, MsgKey);
         }
         return sendResult;
     }
    
    • MessageListener 监听器添加如下监听
     // 监听定时/延时消息通道,只允许key = delayMsg 通过
     @StreamListener(value = InputChannel.DELAY_CONSUMER_CHANNEL, condition = "headers['rocketmq_KEYS'] == 'delayMsg'")
     public void receiveDelayMsg(@Payload String payResult) {
         log.debug("接收到延时消息:{}", payResult);
     }
    
     // 监听定时/延时消息通道,只允许key = fixTimeMsg通过
     @StreamListener(value = InputChannel.DELAY_CONSUMER_CHANNEL, condition = "headers['rocketmq_KEYS'] == 'fixTimeMsg'")
     public void receivefixTimeMsg(@Payload String payResult) {
         log.debug("接收到定时消息:{}", payResult);
     }
    

    发送延时任务 级别定义为2【对应5s】 消息tag:test_delay_tag; 消息key:delayMsg; 消息体:延时5s;

      @GetMapping("/sendMsg/{msg}")
      public String sendMsg(@PathVariable("msg")String msg){
          producerService.sendDelayMsg(msg, 2,"test_delay_tag", "delayMsg");
          return "SUCCESS";
      }
    

    发送结果:
    发送成功

    发送定时任务 消息tag:test_delay_tag; 消息key:fixTimeMsg; 消息体:延时5s;指定18:18:00消费消息

      @GetMapping("/sendMsg/{msg}")
      public String sendMsg(@PathVariable("msg")String msg) throws ParseException {
          SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
          Calendar calendar = Calendar.getInstance();
          calendar.setTime(simpleDateFormat.parse("2021-06-09 18:18:00"));
          long time = calendar.getTime().getTime();
    
          producerService.sendFixedTimeMsg(msg, "test_delay_tag", "delayMsg", time);
          return "SUCCESS";
      }
    

    我们看到消息 是在18:18:01的时候消费的,重复实验里几次,发现偶尔会有误差但是差距不大【1s以内】,这也是能接受的,需要注意的是,rocketMq定时参数可设置40天内的任何时刻(单位毫秒),超过40天消息发送将失败

    image.png

    image.png

三. application.yml 完整配置

spring:
  application:
    name: rocketmq-server
  cloud:
    stream:
      # 阿里rocketMq配置 topic 与 group 均以 实例id% 为前缀配置 如实例id为 MQ_INST_XXXX_XXX 则group或topic 配置 MQ_INST_XXXX_XXX%grouID
      rocketmq:
        binder:
          # 【若为阿里云购买服务,则为控制台的对外或对内实例地址】【若自己搭建的服务,为自定义rocketmq服务地址127.0.0.1:9876】
          name-server: http://MQ_INST_XXXX_XXX.mq-internet-access.mq-internet.aliyuncs.com:80
          # 阿里access-key 【购买阿里服务 控制台获取填写】
          access-key: LTAI4FwRvzLckUQ2xuFE4q6N
          # 阿里secret-key 【购买阿里服务 控制台获取填写】
          secret-key: 2RmSqPLLdE1lSOqBtjIrd21kGw0O12
          # 自定义轨迹信息存储TOPIC 默认为 RMQ_SYS_TRACE_TOPIC
          customized-trace-topic: rmq_sys_TRACE_DATA_cn-qingdao-publictest
        # rocketMq 自定义消息通道配置
        bindings:
          # 阿里rocketMq binder 生产者配置
          ### 延时消息生产 producer.sync 属性需设置为true
          delay_output_channel: {producer.group: MQ_INST_XXXX_XXX%GID_QIGUANBANG_DEV, producer.sync: true}
          ### 普通生产消息
          customized_output_channel: {producer.group: MQ_INST_XXXX_XXX%GID_QIGUANBANG_DEV}
          # 阿里rocketMq binder 消费者配置
          ### 延时消息订阅
          delay_input_channel: {consumer.tags: test_delay_tag}
          ### 普通消息订阅
          customized_input_channel: {consumer.tags: test_consumer_tag}
      bindings:
        # spring cloud stream binder 生产者配置
        ### 延时消息
        delay_output_channel: {destination: MQ_INST_XXXX_XXX%common-delay-topic, content-type: application/json}
        ### 普通消息
        customized_output_channel: {destination: MQ_INST_XXXX_XXX%mg_common_topic, content-type: application/json}

        # spring cloud stream binder 消费者配置
        ### 延时消息订阅
        delay_input_channel: {destination: MQ_INST_XXXX_XXX%common-delay-topic, group: MQ_INST_XXXX_XXX%GID_AQUARIUS_DELAY, content-type: application/json}
        ### 普通消息订阅
        customized_input_channel: {destination: MQ_INST_XXXX_XXX%mg_common_topic, group: MQ_INST_XXXX_XXX%GID_QIGUANBANG_DEV, content-type: application/json}
# 服务端口号
server:
  port: 8083

# slf4j日志配置
logging:
  level:
    root: info
    com.study: debug

四.spring-alibaba-cloud-rocketmq 详细配置选项

官方文档

五.MQ消费轨迹异常

关于阿里云控制台,消费消息轨迹显示未消费(或者其他),但确实已经消费了,可以升级rocketMq-client版本解决。之前我的版本是4.8.0,升级4.9.1后问题解决。


image.png
<!-- rocketMq -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-acl</artifactId>
            <version>4.9.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
            <!-- 排除自带rocketMq-client依赖【低版本消息无法发送成功】-->
            <exclusions>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-acl</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,384评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,845评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,148评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,640评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,731评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,712评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,703评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,473评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,915评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,227评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,384评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,063评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,706评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,302评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,531评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,321评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,248评论 2 352

推荐阅读更多精彩内容