Kafka系列《五》-- 消费者Consumer流程概览

背景

消费者consumer的一般使用场景示例:

properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
       
        // 创建kafka生产者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        ArrayList<String> topics = new ArrayList<>();
        topics.add("kafka-k8s-test");
        consumer.subscribe(topics);

        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            System.out.println("***********************************************");
            for (ConsumerRecord<String, String> record : consumerRecords) {
                System.out.println("***********************************************");
                System.out.println(record);
                System.out.println("***********************************************");
            }
        }

通过示例代码我们应该明确consumer的几点基础:

  • consumer都是通过组的形式来消费数据的,因此必须指定配置group.id

  • consumer必须得再循环中调用poll方法进行消费;因此必须以循环的思维来理解consumer的源码

整体流程

consumer的整理流程比producer的要复杂很多,与producer不同的是,它没有启动守护NIO线程一直中后台运行,而是单线程的;具体指的是它的subscribe方法、poll方法等都是要求单线程操作的;

但是它与broken之间的请求仍然是通过NIO实现的,因此理解了producer中的NIO处理流程,对理解consumer源码有很大的帮助

还是先从源码出发,看看consumer的整理流程

  • 首先与producer一样,初始化Metadata,并表明metadata需要更新,设置needFullUpdate = true

  • consumer中与broken交互的主要逻辑也是通过NIO实现的,虽然没有启动新的线程,但是NIO的处理逻辑都是一致的,再回顾下:

    • 检查metadata是否需要更新
    • 运行NIO的select方法,处理监听的事件
    • selector方法返回后(超时返回或者有事件返回),处理事件,可能是连接事件、可能是写数据事件、可能是读数据事件
  • 然后通过subscribe方法订阅感兴趣的topic,其实就是将topic名称缓存到集合中,只要是出现了新的topic,就会再次触发metadata的更新

  • 然后通过poll方法拉取topic中的数据,这个过程相对复杂,但是大致可以分为几个步骤,我们依次看下

  • 正如前面所说,consumer都是通过group的形式消费数据的,而一个group中可能包含多个consumer(只需要这些consumer启动时设置的group id一致,那么这些consumer就属于同一个group);因此就形成了多个consumer消费同一个topic的场景,随之而来的问题就是哪个consumer消费哪个partition呢?多个consumer之间可能是互相不感知的,因此只能由broken来决定了

  • 与producer中的事务性一样,consumer消费数据时,也是通过COORDINATOR来分配consumer和partition之间的对应关系的;而COORDINATOR也是通过FIND_COORDINATOR请求来确定的

Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, clientId=consumer-group1-1, correlationId=0, headerVersion=1) and timeout 30000 to node -1: FindCoordinatorRequestData(key='group1', keyType=0, coordinatorKeys=[])

  • 正常收到FIND_COORDINATOR响应后,会将响应的broken作为consumer的COORDINATOR

Received FindCoordinator response ClientResponse(receivedTimeMs=1734430992805, latencyMs=371, disconnected=false, timedOut=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, clientId=consumer-group1-1, correlationId=0, headerVersion=1), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=1, host='1.1.1.1', port=9092, coordinators=[]))

  • consumer确定了COORDINATOR后,那COORDINATOR怎么把消费方案同步给consumer呢?其实这时候COORDINATOR也是才初始化完成,还没有group相关的信息,也不知道group下有哪些成员,跟不知道要怎么分配消费方案;因此consumer首先需要申请加入到group,然后才能接收COORDINATOR的消费方案

  • 需要注意的是,如果consumer是自动提交offset,那么consumer在接收新的消费方案之前,会先提交自己的offset;因此下次分配到的partition可能就变了,必须先将自己已经消费的offset先提交;后续的流程会先等待offset提交完成,即等待offset提交成功或者遇到可重试失败时都会等待它先完成;遇到不可重试异常时,则直接跳过这次提交,等待下次poll数据时再次重新开始提交offset

  • consumer会维护自己在group中的状态,初始时的group状态是UNJOINED;即consumer还不是group的成员

  • consumer自动提交offset是异步触发的,然后就会开始申请加入group;开始申请加入group时,consumer会先将自己的group状态更新为PREPARING_REBALANCE,然后通过JOIN_GROUP请求申请加入group;

Sending JOIN_GROUP request with header RequestHeader(apiKey=JOIN_GROUP, apiVersion=5, clientId=consumer-group1-1, correlationId=5, headerVersion=1) and timeout 305000 to node 2147483646: JoinGroupRequestData(groupId='group1', sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, memberId='', groupInstanceId=null, protocolType='consumer', protocols=[JoinGroupRequestProtocol(name='range', metadata=[0, 3, 0, 0, 0, 1, 0, 14, 107, 97, 102, 107, 97, 45, 107, 56, 115, 45, 116, 101, 115, 116, -1, -1, -1, -1, 0, 0, 0, 0, -1, -1, -1, -1, -1, -1]), JoinGroupRequestProtocol(name='cooperative-sticky', metadata=[0, 3, 0, 0, 0, 1, 0, 14, 107, 97, 102, 107, 97, 45, 107, 56, 115, 45, 116, 101, 115, 116, 0, 0, 0, 4, -1, -1, -1, -1, 0, 0, 0, 0, -1, -1, -1, -1, -1, -1])], reason='')

  • 上面可以看到JOIN_GROUP请求中有一段metadata,其实这段metadata就是编码后的topic信息;consumer中partition的默认分配方案是rangecooperative-sticky两种,所以这里对应有两条metadata,其中metadata部分的内容包括:版本号(2字节即0, 3)、topic数量(4字节即0,0,0,1)、topic长度(2字节即0,14)、topic内容(14字节即107, 97, 102, 107, 97, 45, 107, 56, 115, 45, 116, 101, 115, 116)、自定义数据长度(4字节,不存在时为-1,即-1,-1,-1,-1)、已有的topic partition数量(4字节,初始化还没有任何topic partition,即0,0,0,0)、generation信息(4字节,初始为-1,即-1,-1,-1,-1)、rackId长度(2字节,默认为-1,即-1,-1);可以看到rangecooperative-sticky这两种分配方案中的metadata除了自定义数据不同之外,其它的都是完全一样的;不同的就是cooperative-sticky这种分配方案会把generation信息写入到自定义数据中,它是通过INT32写入的,因此长度是4字节(即0,0,0,4),写入的内容就是generation信息,由于初始为-1,所以写入的内容就是(-1,-1,-1,-1)

  • 正常收到JOIN_GROUP响应后,首次会遇到MEMBER_ID_REQUIRED异常,即每个consumer都需要分配一个member id;同时响应中会携带COORDINATOR分配给这个consumer的member id;consumer需要携带这个member id重新JOIN_GROUP

Received JOIN_GROUP response from node 2147483646 for request with header RequestHeader(apiKey=JOIN_GROUP, apiVersion=5, clientId=consumer-group1-1, correlationId=5, headerVersion=1): JoinGroupResponseData(throttleTimeMs=0, errorCode=79, generationId=-1, protocolType=null, protocolName='', leader='', skipAssignment=false, memberId='consumer-group1-1-6e0925f5-436f-4b2f-a269-89bf13213840', members=[])

  • 然后重新发送JOIN_GROUP请求,可以看到和第一个的JOIN_GROUP请求相比,就是多了member id

Sending JOIN_GROUP request with header RequestHeader(apiKey=JOIN_GROUP, apiVersion=5, clientId=consumer-group1-1, correlationId=8, headerVersion=1) and timeout 305000 to node 2147483646: JoinGroupRequestData(groupId='group1', sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, memberId='consumer-group1-1-6e0925f5-436f-4b2f-a269-89bf13213840', groupInstanceId=null, protocolType='consumer', protocols=[JoinGroupRequestProtocol(name='range', metadata=[0, 3, 0, 0, 0, 1, 0, 14, 107, 97, 102, 107, 97, 45, 107, 56, 115, 45, 116, 101, 115, 116, -1, -1, -1, -1, 0, 0, 0, 0, -1, -1, -1, -1, -1, -1]), JoinGroupRequestProtocol(name='cooperative-sticky', metadata=[0, 3, 0, 0, 0, 1, 0, 14, 107, 97, 102, 107, 97, 45, 107, 56, 115, 45, 116, 101, 115, 116, 0, 0, 0, 4, -1, -1, -1, -1, 0, 0, 0, 0, -1, -1, -1, -1, -1, -1])], reason='need to re-join with the given member-id: consumer-group1-1-6e0925f5-436f-4b2f-a269-89bf13213840')

  • 然后重新收到JOIN_GROUP响应,此时consumer会将自己的group状态更新为COMPLETING_REBALANCECOORDINATOR会在响应中指定group中的某个consumer作为这个group的leader,leader一般为COORDINATOR收到的第一个consumer的JOIN_GROUP,并由leader完成partition的实际分配;同时在响应中会给出使用哪种partition分配方案(range)、哪一个consumer作为这个group的leader、以及这个group中的所有member信息、以及generation信息(从1开始递增)

Received JOIN_GROUP response from node 2147483646 for request with header RequestHeader(apiKey=JOIN_GROUP, apiVersion=5, clientId=consumer-group1-1, correlationId=8, headerVersion=1): JoinGroupResponseData(throttleTimeMs=0, errorCode=0, generationId=1, protocolType=null, protocolName='range', leader='consumer-group1-1-6e0925f5-436f-4b2f-a269-89bf13213840', skipAssignment=false, memberId='consumer-group1-1-6e0925f5-436f-4b2f-a269-89bf13213840', members=[JoinGroupResponseMember(memberId='consumer-group1-1-6e0925f5-436f-4b2f-a269-89bf13213840', groupInstanceId=null, metadata=[0, 3, 0, 0, 0, 1, 0, 14, 107, 97, 102, 107, 97, 45, 107, 56, 115, 45, 116, 101, 115, 116, -1, -1, -1, -1, 0, 0, 0, 0, -1, -1, -1, -1, -1, -1])])

  • 作为leader的consumer,首先需要制定partition分配方案,制定partition分配方案需要考虑整个group中所有consumer订阅的所有topic,这也就意味着leader需要有所有这些topic的metadata信息,比如group中有member订阅了其它的topic,leader分配partition之前也需要把所有的topic信息都刷新到本地;这里是通过range方法来分配partition,具体分配方式参考系类文章:Kafka系列《六》-- 消费者Consumer中的消费方案分配算法解析
  • leader完成分配方案后,就明确了每个member的消费方案了,即每个member消费哪些partition明确了;需要注意的是,一个partition是不能被多个member同时消费的;分配完成后,得到一个map信息Map<String, List<TopicPartition>>,即每个member对应的topic partition信息;比如group下只有一个member时,它的消费方案如下;因为只有一个member,那么这个topic的所有partition都由它消费

{consumer-group1-1-6e0925f5-436f-4b2f-a269-89bf13213840=Assignment(partitions=[kafka-k8s-test-0, kafka-k8s-test-1, kafka-k8s-test-2])}

  • leader制定完成partition消费方案后,再按照上面相同的编码方式,将消费方案通过SYNC_GROUP请求同步到COORDINATOR

Sending SYNC_GROUP request with header RequestHeader(apiKey=SYNC_GROUP, apiVersion=3, clientId=consumer-group1-1, correlationId=9, headerVersion=1) and timeout 30000 to node 2147483646: SyncGroupRequestData(groupId='group1', generationId=1, memberId='consumer-group1-1-6e0925f5-436f-4b2f-a269-89bf13213840', groupInstanceId=null, protocolType='consumer', protocolName='range', assignments=[SyncGroupRequestAssignment(memberId='consumer-group1-1-6e0925f5-436f-4b2f-a269-89bf13213840', assignment=[0, 3, 0, 0, 0, 1, 0, 14, 107, 97, 102, 107, 97, 45, 107, 56, 115, 45, 116, 101, 115, 116, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2, -1, -1, -1, -1])])

  • 正常收到SYNC_GROUP响应中会包含刚刚leader发送过去的partition分配方案,因为新的member加入group的时候也会发送一个空的SYNC_GROUP请求,普通member也就能通过SYNC_GROUP请求获取到leader制定的分配方案了;正常收到响应后consumer再将自己的group状态更新为STABLE,意味着member已经正式成为group的一员了

Received SYNC_GROUP response from node 2147483646 for request with header RequestHeader(apiKey=SYNC_GROUP, apiVersion=3, clientId=consumer-group1-1, correlationId=9, headerVersion=1): SyncGroupResponseData(throttleTimeMs=0, errorCode=0, protocolType=null, protocolName=null, assignment=[0, 3, 0, 0, 0, 1, 0, 14, 107, 97, 102, 107, 97, 45, 107, 56, 115, 45, 116, 101, 115, 116, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2, -1, -1, -1, -1])

  • consumer正式加入到group后,首先会将generation缓存到本地,generation id是通过JOIN_GROUP请求由COORDINATOR分配的,在SYNC_GROUP成功加入到group后才算正式确定了generation id;然后最重要的是将分配给自己的topic partition缓存在本地,并通过一个map记录每个topic partition的订阅状态Map<TopicPartition, TopicPartitionState> assignedPartitionStates,这个状态数据对于后面的消费过程至关重要;partition的初始订阅状态是INITIALIZING

  • consume成功加入到group后,成功拿到了消费方案,就可以开始消费数据了;但在此之前,还有一个问题没有明确,应该从哪个offset开始消费呢?因此需要先确定开始消费的offset;你可能会想不都是从offset=0开始消费吗?其实是不行的,回想一个consumer的示例代码中,一般都是在一个循环里不断的poll数据,不能每次poll数据都从offset=0开始吧!而是应该从上一次poll之后的offset开始;所以每次poll的时候都需要明确应该从哪个offset开始

  • consumer通过向COORDINATOR发送OFFSET_FETCH请求来明确每次消费开始的offset,可以看到这个请求中是没有member id的,也就是COORDINATOR是不关心谁来消费这个topic partition,只是告诉你之前的consumer在这个topic partition上已经消费到了哪个offset

Sending OFFSET_FETCH request with header RequestHeader(apiKey=OFFSET_FETCH, apiVersion=5, clientId=consumer-group1-1, correlationId=10, headerVersion=1) and timeout 30000 to node 2147483646: OffsetFetchRequestData(groupId='group1', topics=[OffsetFetchRequestTopic(name='kafka-k8s-test', partitionIndexes=[2, 0, 1])], groups=[], requireStable=false)

  • 正常情况下收到OFFSET_FETCH响应中会包含每个topic partition之前已经提交的offset,如果没有提交过offset,那么offset就是-1

Received OFFSET_FETCH response from node 2147483646 for request with header RequestHeader(apiKey=OFFSET_FETCH, apiVersion=5, clientId=consumer-group1-1, correlationId=10, headerVersion=1): OffsetFetchResponseData(throttleTimeMs=0, topics=[OffsetFetchResponseTopic(name='kafka-k8s-test', partitions=[OffsetFetchResponsePartition(partitionIndex=0, committedOffset=-1, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=1, committedOffset=-1, committedLeaderEpoch=-1, metadata='', errorCode=0), OffsetFetchResponsePartition(partitionIndex=2, committedOffset=-1, committedLeaderEpoch=-1, metadata='', errorCode=0)])], errorCode=0, groups=[])

  • 对于提交过offset的topic partition,则会直接将这个topic partition的订阅状态更新为FETCHING,并将最后提交的offset也更新到状态数据里

  • 而对于没有提交过offset的topic partition,则需要重置offset;会先将topic partition的订阅状态更新为AWAIT_RESET,然后再向broken获取一个offset;因为是重置offset,因此只需要向各个topic partition的leader partition所在broken发送请求就可以了,而不需要向COORDINATOR发送请求;由于consumer分配得到的多个topic partition的leader partition可能处于不同的broken,因此consumer会先将分配给自己的topic partition按照broken分组,这样每个broken就只需要请求一次就可以了;

  • 同时重置offset时,有两种可选策略,一是从最开始的offset重置、二是从最新的offset重置,默认是从最新的offset开始重置;比如kafka-k8s-test-1这个topic partition的leader partition所在broken id 是2,就会向这个broken发送LIST_OFFSETS请求来重置offset,其中timestamp=-1表示从最新的offset重置;如果timestamp=-2表示从最开始的offset重置

Sending LIST_OFFSETS request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=5, clientId=consumer-group1-1, correlationId=11, headerVersion=1) and timeout 30000 to node 2: ListOffsetsRequestData(replicaId=-1, isolationLevel=0, topics=[ListOffsetsTopic(name='kafka-k8s-test', partitions=[ListOffsetsPartition(partitionIndex=1, currentLeaderEpoch=-1, timestamp=-2, maxNumOffsets=1)])])

  • 正常情况下收到LIST_OFFSETS中会包含重置后的offset;然后再将这个topic partition的状态订阅更新为FETCHING,并将获取到的offset也更新到状态数据里

Received LIST_OFFSETS response from node 2 for request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=5, clientId=consumer-group1-1, correlationId=11, headerVersion=1): ListOffsetsResponseData(throttleTimeMs=0, topics=[ListOffsetsTopicResponse(name='kafka-k8s-test', partitions=[ListOffsetsPartitionResponse(partitionIndex=1, errorCode=0, oldStyleOffsets=[], timestamp=-1, offset=53, leaderEpoch=3)])])

  • consumer明确了offset之后,订阅状态也都更新为了FETCHING;然后才能开始拉取数据,携带刚刚得到的offset发送FETCH请求到partition的leader节点

Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-group1-1, correlationId=20, headerVersion=1) and timeout 30000 to node 2: FetchRequestData(clusterId=null, replicaId=-1, replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=0, sessionEpoch=0, topics=[FetchTopic(topic='kafka-k8s-test', topicId=AAAAAAAAAAAAAAAAAAAAAA, partitions=[FetchPartition(partition=1, currentLeaderEpoch=-1, fetchOffset=53, lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576, replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], forgottenTopicsData=[], rackId='')

正常收到FETCH响应后,拉取到的数据记录在records
Received FETCH response from node 2 for request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-group1-1, correlationId=20, headerVersion=1): FetchResponseData(throttleTimeMs=0, errorCode=0, sessionId=1874419746, responses=[FetchableTopicResponse(topic='kafka-k8s-test', topicId=AAAAAAAAAAAAAAAAAAAAAA, partitions=[PartitionData(partitionIndex=1, errorCode=0, highWatermark=53, lastStableOffset=53, logStartOffset=53, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, preferredReadReplica=-1, records=MemoryRecords(size=0, buffer=java.nio.HeapByteBuffer[pos=0 lim=0 cap=0]))])], nodeEndpoints=[])

  • 拉取到数据后,consumer再通过一个队列来接收数据ConcurrentLinkedQueue<CompletedFetch> completedFetches

  • 最后consumer就可以从队列中正常读取到数据了

整个consumer的流程虽然上面已经做了很多精简,但是流程仍然比较复杂;

理清楚整个流程对于理解consumer的源码有很大帮助,最重要的一点是带着循环的思维来理解,可以看到我们poll的时间间隔是很短的,在这个时间间隔里consumer能干多少事就干多少事,干不完的就等待下一次循环继续干

你可能会想,直接设置一个很大的poll时间间隔呢?其实是没必要的,因为从consumer的代码来看,它的流程虽然很长,但是它的代码基本上都是先判断然后再确定是否需要处理这一步,设置一个很大的poll时间间隔,它会在时间间隔超时前会尽可能多的帮你拉取更多数据。

最后在总结一下整个流程:

  • 确定COORDINATOR,group的信息由它管理

  • 如果是自动提交offset,则在加入到group之前需要先提交offset

  • 申请加入group

  • 如果是作为leader加入到了group,则还需要制定partition分配方案,然后将分配方案同步到COORDINATOR

  • 对于普通member来说,直接从group同步partition分配方案

  • 明确partition分配方案后,还需要明确从哪个offset开始消费

  • 对于提交过offset的topic partition来说,直接从提交后面的offset开始消费就可以了

  • 对于没有提交过offset的topic partition的,则需要请求开始消费的offset,默认是从最新的offset开始

  • 最后才能开始正常消费

Consumer的更多内容:

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容