48. 从零开始学springboot: 接入阿里RocketMQ

前言

微服务的架构越来越流行, 很多老旧项目面临着解耦重构, 复杂项目的解耦通常会引入一些中间件来帮助我们更好的完成工作, 本章, 我们就来通过实例了解下消息中间件的用法.

市面上比较流行的消息中间件如下


image.png

因为鱼哥的项目上了阿里的云, 所以选择很简单, 就用RocketMQ即可, 看官们根据实际情况择优选择.

阿里RocketMQ

注意, 本文使用了4.0sdk,截止到文章发表, ali已推出5.0SDK
关于阿里的RocketMQ的介绍官方有详细的文档, 这里就不啰嗦了

https://help.aliyun.com/document_detail/29533.html

官方已SDK提供了简单的接入方式, 需要注意的是, 官方提供了两种协议的SDK

image.png
  • HTTP协议
    采用RESTful风格,方便易用,快速接入,跨网络能力强. 支持Java、C++、.NET、Go、Python、Node.js和PHP七种语言客户端

  • TCP协议
    区别于HTTP简单的接入方式,提供更为专业、可靠、稳定的TCP协议的SDK接入服务. 支持的语言包括Java、C/C++ 以及.NET

注意, 使用的话需要自费开通对应的服务, 通过后才能使用, 大部分管理功能都能在阿里云的控制台中找到.

实例

下面我们就springboot来实际使用下阿里RocketMQ.
为了较好的使用, 我们同时演示tcp何http两种方式

pom引入依赖

<!-- tcp sdk -->
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>${ali-mq-tcp.version}</version>
</dependency>
<!-- http sdk -->
<dependency>
    <groupId>com.aliyun.mq</groupId>
    <artifactId>mq-http-sdk</artifactId>
    <version>${ali-mq-http.version}</version>
    <classifier>jar-with-dependencies</classifier>
</dependency>

接入tcp方式

定义mq配置类

@Getter
@Setter
@Component
@ConfigurationProperties(prefix = "aliyun.rocketmq.tcp")
public class MqTcpProperties {

    //AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建
    private String accessKeyId;

    //AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建
    private String accessKeySecret;

    //实例TCP协议接入地址(内网)
    private String nameSrvAddr;

    //发送超时时间: 3s
    private String sendMsgTimeoutMillis = "3000";

    //线程数目:默认20
    private String consumeThreadNums = "20";

    //订阅方式: 集群
    private String messageModel = "CLUSTERING";


    public Properties getMqProperties() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.AccessKey, this.accessKeyId);
        properties.setProperty(PropertyKeyConst.SecretKey, this.accessKeySecret);
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
        // 设置发送超时时间,单位毫秒
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.sendMsgTimeoutMillis);
        // 设置消费者线程数为20个(默认20)
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, this.consumeThreadNums);
        // 订阅方式设置
        properties.put(PropertyKeyConst.MessageModel, this.messageModel);
        return properties;
    }
}

定义Topic配置类

@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "aliyun.rocketmq.tcp")
public class MqTcpTopicProperties {
    private String topic;
    private String groupId;
    private String tag;
}

封装一个工具类

@Slf4j
public class MqTcpUtil {

    @Autowired
    private ProducerBean producer;

    @Autowired
    private MqTcpProperties mqTcpProperties;

    @Autowired
    private MqTcpTopicProperties mqTcpTopicProperties;

    private ConsumerBean consumerBean;

    /**
     * 同步发送消息 - 配置默认topic
     *
     * @param msgTag      标签,可用于消息小分类标注
     * @param messageBody 消息body内容,生产者自定义内容
     * @param msgKey      消息key值,建议设置全局唯一,可不传,不影响消息投递
     * @return success:SendResult or error:null
     */
    public SendResult sendMsg(String msgTag, String messageBody, String msgKey) {
        Message msg = new Message(mqTcpTopicProperties.getTopic(), msgTag, msgKey, messageBody.getBytes());
        return this.send(msg, Boolean.FALSE);
    }

    /**
     * 同步发送消息 - 配置默认topic - 重试次数
     *
     * @param msgTag      标签,可用于消息小分类标注
     * @param messageBody 消息body内容,生产者自定义内容
     * @param msgKey      消息key值,建议设置全局唯一,可不传,不影响消息投递
     * @param retryTimes  重试次数,注意实际请求次数为 retryTimes + 1
     * @return success:SendResult or error:null
     */
    public SendResult sendMsg(String msgTag, String messageBody, String msgKey, Integer retryTimes) {
        Message msg = new Message(mqTcpTopicProperties.getTopic(), msgTag, msgKey, messageBody.getBytes());
        SendResult result = this.send(msg, Boolean.FALSE);
        if (ObjectUtil.isNotEmpty(result) || retryTimes == 0) {
            return result;
        }
        return this.sendMsg(msgTag, messageBody, msgKey, --retryTimes);
    }

    /**
     * 同步发送消息
     *
     * @param topic       topic名
     * @param msgTag      标签,可用于消息小分类标注
     * @param messageBody 消息body内容,生产者自定义内容
     * @param msgKey      消息key值,建议设置全局唯一,可不传,不影响消息投递
     * @return success:SendResult or error:null
     */
    public SendResult sendMsg(String topic, String msgTag, String messageBody, String msgKey) {
        Message msg = new Message(topic, msgTag, msgKey, messageBody.getBytes());
        return this.send(msg, Boolean.FALSE);
    }

    /**
     * 同步发送单向消息
     *
     * @param topic       topic名
     * @param msgTag      标签,可用于消息小分类标注
     * @param messageBody 消息body内容,生产者自定义内容
     * @param msgKey      消息key值,建议设置全局唯一,可不传,不影响消息投递
     */
    public void sendOneWayMsg(String topic, String msgTag, String messageBody, String msgKey) {
        Message msg = new Message(topic, msgTag, msgKey, messageBody.getBytes());
        this.send(msg, Boolean.TRUE);
    }


    /**
     * 发送普通消息
     *
     * @param msg      消息
     * @param isOneWay 是否单向发送
     */
    private SendResult send(Message msg, Boolean isOneWay) {
        try {
            if (isOneWay) {
                //由于在 oneway 方式发送消息时没有请求应答处理,一旦出现消息发送失败,则会因为没有重试而导致数据丢失。
                //若数据不可丢,建议选用同步或异步发送方式。
                producer.sendOneway(msg);
                success(msg, "单向消息MsgId不返回");
                return null;
            } else {
                //可靠同步发送
                SendResult sendResult = producer.send(msg);
                //获取发送结果,不抛异常即发送成功
                assert sendResult != null;
                success(msg, sendResult.getMessageId());
                return sendResult;
            }
        } catch (Exception e) {
            error(msg, e);
            return null;
        }
    }

    /**
     * 成功日志打印
     *
     * @param msg
     * @param messageId
     */
    private void success(Message msg, String messageId) {
        log.info("发送MQ消息成功 -- Topic:{} ,msgId:{} , Key:{}, tag:{}, body:{}"
                , msg.getTopic(), messageId, msg.getKey(), msg.getTag(), new String(msg.getBody()));
    }

    /**
     * 异常日志打印
     *
     * @param msg
     * @param e
     */
    private void error(Message msg, Exception e) {
        log.error("发送MQ消息失败-- Topic:{}, Key:{}, tag:{}, body:{}"
                , msg.getTopic(), msg.getKey(), msg.getTag(), new String(msg.getBody()));
        log.error("errorMsg", e);
    }


    @PostConstruct
    public void init() {
        log.info("[Init] tcp consumerBean init");
        consumerBean = new ConsumerBean();
    }

    public ConsumerBean getConsumer() {
        return consumerBean;
    }

    public ConsumerBean getDefaultConsumer(MessageListener messageListener) {
        //配置文件
        Properties properties = mqTcpProperties.getMqProperties();
        //消费者
        properties.setProperty(PropertyKeyConst.GROUP_ID, mqTcpTopicProperties.getGroupId());
        //设置消费者线程数为20个(默认20)
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, mqTcpProperties.getConsumeThreadNums());
        // 广播订阅方式设置
        properties.put(PropertyKeyConst.MessageModel, mqTcpProperties.getMessageModel());
        consumerBean.setProperties(properties);

        //订阅消息
        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
        //订阅消息
        Subscription smsSubscription = new Subscription();
        smsSubscription.setTopic(mqTcpTopicProperties.getTopic());
        smsSubscription.setExpression(mqTcpTopicProperties.getTag());
        subscriptionTable.put(smsSubscription, messageListener);
        consumerBean.setSubscriptionTable(subscriptionTable);
        return consumerBean;
    }
}

自动注入类

/**
 * 阿里云rocketMq服务自动配置类
 * 业务系统要使用阿里云rocketMq服务服务,需要在配置文件中增加如下配置
 * 【
 * aliyun.rocketmq.tcp.enable=true
 * aliyun.rocketmq.tcp.accessKeyId=
 * aliyun.rocketmq.tcp.accessKeySecret=
 * aliyun.rocketmq.tcp.nameSrvAddr=
 * aliyun.rocketmq.tcp.default.topic=
 * aliyun.rocketmq.tcp.default.groupId=
 * aliyun.rocketmq.tcp.default.tag=
 */
@Slf4j
@Configuration
@EnableConfigurationProperties({MqTcpProperties.class, MqTcpTopicProperties.class})
@ConditionalOnClass({ProducerBean.class, ConsumerBean.class})
@ConditionalOnProperty(prefix = "aliyun.rocketmq.tcp", value = "enable", havingValue = "true")
public class MqTcpDefaultAutoConfiguration {

    @Autowired
    private MqTcpProperties mqTcpProperties;

    @PostConstruct
    public void init() {
        log.info("[Auto Config] MqTcpDefaultAutoConfiguration loading......");
    }

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    @ConditionalOnMissingBean
    public ProducerBean buildProducer() {
        ProducerBean producer = new ProducerBean();
        producer.setProperties(mqTcpProperties.getMqProperties());
        return producer;
    }

    @Bean
    public MqTcpUtil mqTcpUtil() {
        return new MqTcpUtil();
    }

}

核心功能基本完成, 我们可以通过MqTcpUtil执行消息的发送了
新增个测试类

@Test
public void mqTcpTest() {
    // 自定义一条body内容
    JSONObject body = new JSONObject();
    UUID uuid = UUID.randomUUID();
    body.put("notice", "这是一条tcp通知类信息");
    //同步发送消息-不带返回值的(一般使用该方法)
    log.info(String.valueOf(mqTcpUtil.sendMsg("topic", "tag_dev", body.toJSONString(), String.valueOf(uuid))));
}

最后我们加上Ali RocketMq的配置即可

## AK
aliyun.rocketmq.tcp.enable=true
aliyun.rocketmq.tcp.accessKeyId=
aliyun.rocketmq.tcp.accessKeySecret=

## TCP
aliyun.rocketmq.tcp.nameSrvAddr=
aliyun.rocketmq.tcp.sendMsgTimeoutMillis=3000
aliyun.rocketmq.tcp.consumeThreadNums=20
aliyun.rocketmq.tcp.messageModel=CLUSTERING

## topic
aliyun.rocketmq.tcp.topic=
aliyun.rocketmq.tcp.groupId=
aliyun.rocketmq.tcp.tag=

消费者

以上完成了核心功能的接入与开发, 我们已经能正常发送MQ消息了, 通俗点讲就是生产者就绪了, 我们还需要消费者去消费这些消息.

合理的拆分是生产者和消费者分服务部署, 这里为了演示, 鱼哥就直接自产自销了.

定义消费者监听器MqMessageListener

@Slf4j
@Component
public class MqMessageListener implements MessageListener {

    @Override
    public Action consume(Message message, ConsumeContext context) {
        log.info("接收到MQ详细信息:{}", message);
        log.info("解析MQ-Body自定义内容:{}", new String(message.getBody()));
        try {
            //do something..
            return Action.CommitMessage;
        } catch (Exception e) {
            log.error("消费MQ消息失败,msgId:" + message.getMsgID() + ",ExceptionMsg:" + e.getMessage());
            return Action.ReconsumeLater;
        }
    }
}

申明消费者类, 这里我们定义两种消费者, 一种是默认配置的, 一种是主定义配置的,两者选一即可

@Component
public class MqConsumerClient {

//    @Autowired
//    private MqTcpProperties mqTcpProperties;

    @Autowired
    private MqMessageListener mqMessageListener;

//    @Autowired
//    private MqTcpTopicProperties mqTcpTopicProperties;

    @Autowired
    private MqTcpUtil mqTcpUtil;

//    //自定义消费者
//    @Bean(initMethod = "start", destroyMethod = "shutdown")
//    public ConsumerBean messageBuildConsumer() {
//        ConsumerBean consumerBean = mqTcpUtil.getConsumer();
//        //配置文件
//        Properties properties = mqTcpProperties.getMqProperties();
//        //消费者
//        properties.setProperty(PropertyKeyConst.GROUP_ID, mqTopicProperties.getGroupId());
//        //设置消费者线程数为20个(默认20)
//        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
//        consumerBean.setProperties(properties);
//        //订阅消息
//        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
//        // 广播订阅方式设置
//        properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
//        //订阅消息
//        Subscription smsSubscription = new Subscription();
//        smsSubscription.setTopic(mqTopicProperties.getTopic());
//        smsSubscription.setExpression(mqTopicProperties.getTag());
//        subscriptionTable.put(smsSubscription, mqMessageListener);
//        consumerBean.setSubscriptionTable(subscriptionTable);
//        return consumerBean;
//    }

    // 默认消费者
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean defaultConsumer() {
        ConsumerBean consumerBean = mqTcpUtil.getDefaultConsumer(mqMessageListener);
        return consumerBean;
    }
}

最后启动服务, 即可看到消息消费的输出信息

image.png

image.png

接入http方式

定义mq配置类

@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "aliyun.rocketmq.http")
public class MqHttpProperties {

    //AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建
    private String accessKeyId;

    //AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建
    private String accessKeySecret;

    //实例TCP协议接入地址(内网)
    private String nameSrvAddr;

    //实例TCP协议接入地址(内网)
    private String sendMsgTimeoutMillis = "3000";

    //线程数目:默认20
    private String consumeThreadNums = "20";

    //线程数目:默认20
    private String messageModel = "CLUSTERING";

    public Properties getMqProperties() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.AccessKey, this.accessKeyId);
        properties.setProperty(PropertyKeyConst.SecretKey, this.accessKeySecret);
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
        // 设置发送超时时间,单位毫秒
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, this.sendMsgTimeoutMillis);
        // 设置消费者线程数为20个(默认20)
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, this.consumeThreadNums);
        // 广播订阅方式设置
        properties.put(PropertyKeyConst.MessageModel, this.messageModel);
        return properties;
    }

定义Topic配置类

@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "aliyun.rocketmq.http")
public class MqHttpTopicProperties {
    private String topic;
    private String groupId;
    private String tag;
}

定义操作工具类

@Slf4j
public class MqHttpUtil {

    @Autowired
    private MqHttpProperties mqHttpProperties;

    @Autowired
    private MqHttpTopicProperties mqHttpTopicProperties;

    private MQProducer producer;

    private MQConsumer consumer;

    private MQClient mqClient;

    @PostConstruct
    public void init() {
        log.info("[Init] http producer init");
        mqClient = new MQClient(
                // 设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
                mqHttpProperties.getNameSrvAddr(),
                // AccessKey ID,阿里云身份验证,在阿里云RAM控制台创建。
                mqHttpProperties.getAccessKeyId(),
                // AccessKey Secret,阿里云身份验证,在阿里云RAM控制台创建。
                mqHttpProperties.getAccessKeySecret()
        );
    }

    /**
     * 同步发送消息
     *
     * @param topic       topic名
     * @param msgTag      标签,可用于消息小分类标注
     * @param messageBody 消息body内容,生产者自定义内容
     * @param msgKey      消息key值,建议设置全局唯一,可不传,不影响消息投递
     * @return success:SendResult or error:null
     */
    public TopicMessage sendMsg(String topic, String msgTag, String messageBody, String msgKey) {
        producer = mqClient.getProducer(topic);
        TopicMessage msg = new TopicMessage(
                messageBody.getBytes(),
                msgTag);
        msg.setMessageId(msgKey);

        try {
            //可靠同步发送
            TopicMessage sendResult = producer.publishMessage(msg);
            //获取发送结果,不抛异常即发送成功
            assert sendResult != null;
            success(topic, msg);
            return sendResult;

        } catch (Exception e) {
            error(topic, msg, e);
            return null;
        }
    }

    public MQClient getMqClient() {
        return mqClient;
    }

    public MQConsumer getDefaultConsumer() {
        consumer = mqClient.getConsumer(mqHttpTopicProperties.getTopic(), mqHttpTopicProperties.getGroupId(), mqHttpTopicProperties.getTag());
        return consumer;
    }


    /**
     * 成功日志打印
     *
     * @param msg
     */
    private void success(String topic, TopicMessage msg) {
        log.info("发送MQ消息成功 -- Topic:{} ,msgId:{} , tag:{}, body:{}"
                , topic, msg.getMessageId(), msg.getMessageTag(), msg.getMessageBodyString());
    }

    /**
     * 异常日志打印
     *
     * @param msg
     * @param e
     */
    private void error(String topic, TopicMessage msg, Exception e) {
        log.error("发送MQ消息失败-- Topic:{}, msgId:{}, tag:{}, body:{}"
                , topic, msg.getMessageId(), msg.getMessageTag(), msg.getMessageBodyString());
        log.error("errorMsg", e);
    }

自动注入

/**
 * 阿里云rocketMq服务自动配置类
 * 业务系统要使用阿里云rocketMq服务服务,需要在配置文件中增加如下配置
 * 【
 * aliyun.rocketmq.http.enable=true
 * aliyun.rocketmq.http.accessKeyId=
 * aliyun.rocketmq.http.accessKeySecret=
 * aliyun.rocketmq.http.nameSrvAddr=
 * aliyun.rocketmq.http.default.topic=
 * aliyun.rocketmq.http.default.groupId=
 * aliyun.rocketmq.http.default.tag=
 */
@Slf4j
@Configuration
@ConditionalOnProperty(prefix = "aliyun.rocketmq.http", value = "enable", havingValue = "true")
public class MqHttpDefaultAutoConfiguration {

    @PostConstruct
    public void init() {
        log.info("[Auto Config] MqHttpDefaultAutoConfiguration loading......");
    }

    @Bean
    public MqHttpUtil mqHttpUtil() {
        return new MqHttpUtil();
    }

}

最后测试下

    @Test
    public void mqHttpTest() {
        // 自定义一条body内容
        JSONObject body = new JSONObject();
        UUID uuid = UUID.randomUUID();
        body.put("notice", "这是一条http通知类信息");
        //同步发送消息-不带返回值的(一般使用该方法)
        log.info(String.valueOf(mqHttpUtil.sendMsg("topic", "tag_dev", body.toJSONString(), String.valueOf(uuid))));
    }

最后增加配置

#***********************MQ-http*********************************
## AK
aliyun.rocketmq.http.enable=true
aliyun.rocketmq.http.accessKeyId=
aliyun.rocketmq.http.accessKeySecret=

## HTTP
aliyun.rocketmq.http.nameSrvAddr=
aliyun.rocketmq.http.sendMsgTimeoutMillis=3000
aliyun.rocketmq.http.consumeThreadNums=20
aliyun.rocketmq.http.messageModel=CLUSTERING

## topic
aliyun.rocketmq.http.topic=topic
aliyun.rocketmq.http.groupId=group_dev
aliyun.rocketmq.http.tag=tag_dev

运行测试


image.png

image.png

项目地址

注意: 案例需要配置你自己申请的阿里RocketMQ配置,否则无法启动.

https://github.com/MrCoderStack/SpringBootDemo/tree/master/sb-alibaba-rocketmq

请关注我的订阅号

订阅号.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,658评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,482评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,213评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,395评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,487评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,523评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,525评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,300评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,753评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,048评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,223评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,905评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,541评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,168评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,417评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,094评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,088评论 2 352

推荐阅读更多精彩内容