Kafka系列《八》-- 消费者Consumer中的消费过程解析

背景

系列文章

相信你已经对consumer的整理流程和group的形成已经有了基本的认识了,通过重平衡consumer拿到了自己应该消费的topic partition信息,并且缓存在自己的ownedPartitions中,那缓存的是什么内容呢,其实缓存的就是每个topic partition和其状态的映射关系

public synchronized void assignFromSubscribed(Collection<TopicPartition> assignments) {
      
        Map<TopicPartition, TopicPartitionState> assignedPartitionStates = new HashMap<>(assignments.size());
        for (TopicPartition tp : assignments) {
            TopicPartitionState state = this.assignment.stateValue(tp);
            if (state == null)
                state = new TopicPartitionState();
            assignedPartitionStates.put(tp, state);
        }

        assignmentId++;
        this.assignment.set(assignedPartitionStates);
    }

我们暂时先将TopicPartitionState称为订阅状态吧,它记录了offset相关的信息和状态,初始状态都是INITIALIZING

TopicPartitionState() {
            this.paused = false;
            this.pendingRevocation = false;
            this.pendingOnAssignedCallback = false;
            this.endOffsetRequested = false;
            this.fetchState = FetchStates.INITIALIZING;
            this.position = null;
            this.highWatermark = null;
            this.logStartOffset = null;
            this.lastStableOffset = null;
            this.resetStrategy = null;
            this.nextRetryTimeMs = null;
            this.preferredReadReplica = null;
        }

consumer想要开始消费,再明确了topic partition后,首先应该明确的就是从哪个offset开始消费了;

这篇文章我们就看看consumer怎么开始消费的

获取offset

consumer怎么明确从哪个offset开始消费呢?其实很简单,直接从这个topic partition上提交的最后offset开始消费就可以了;这也是在重平衡之前为什么需要等待自动提交offset完成后才开始的原因

获取topic partition上的提交offset只需要向COORDINATOR发送OFFSET_FETCH请求即可,在思考一个问题,是不是每次poll都需要重新获取offset呢?其实我们可以想一想,肯定是不需要的,只有首次consumer不知道从哪个offset开始消费的时候才需要获取,后面的poll就不需要重新获取了,因为每次poll消费了多少条数据我是直到的,直接进行累加就能得到下次poll的offset了;

基于这个背景,所以才有了上述缓存中订阅状态,我们只需要为状态是INITIALIZING的那些topic partition重新获取offset就可以了

private Set<TopicPartition> collectPartitions() {
        Set<TopicPartition> result = new HashSet<>();
        assignment.forEach((topicPartition, topicPartitionState) -> {
            if (fetchState.equals(FetchStates.INITIALIZING)) {
                result.add(topicPartition);
            }
        });
        return result;
    }

首先筛选出状态是INITIALIZING的这些topic partition,然后将这批topic partition通过OFFSET_FETCH请求发送到COORDINATOR;由于consumer开始消费必须先等待这个请求返回,而这个请求又是异步发送的,为了避免重复发送请求,这里也采用了和自动提交offset一样的代码流程

final RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future;
            if (pendingCommittedOffsetRequest != null) {
                future = pendingCommittedOffsetRequest.response;
            } else {
                future = sendOffsetFetchRequest(partitions);
                pendingCommittedOffsetRequest = new PendingCommittedOffsetRequest(partitions, generationForOffsetRequest, future);
            }

通过一个变量pendingCommittedOffsetRequest来记录已经发送出去的OFFSET_FETCH请求,下次poll的时候先检测变量是否已经设置;这一点和自动提交offset中的needsJoinPrepare变量有异曲同工之处

对于提交过offset的topic partition,OFFSET_FETCH请求会直接返回对应的offset;而对于没有提交过offset的topic partition,则返回的offset为-1;

重置offset

对于提交过offset的topic partition,OFFSET_FETCH请求会直接返回对应的offset;因此这部分的topic partition是可以直接开始消费了的

public static void refreshCommittedOffsets(final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata,
                                               final ConsumerMetadata metadata,
                                               final SubscriptionState subscriptions) {
        for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsetsAndMetadata.entrySet()) {
            final TopicPartition tp = entry.getKey();
            final OffsetAndMetadata offsetAndMetadata = entry.getValue();
            if (offsetAndMetadata != null) {
                // first update the epoch if necessary
                entry.getValue().leaderEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(entry.getKey(), epoch));

                // it's possible that the partition is no longer assigned when the response is received,
                // so we need to ignore seeking if that's the case
                if (subscriptions.isAssigned(tp)) {
                    final ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp);
                    final SubscriptionState.FetchPosition position = new SubscriptionState.FetchPosition(
                            offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch(),
                            leaderAndEpoch);

                    subscriptions.seekUnvalidated(FetchStates.FETCHING, position);

                    log.info("Setting offset for partition {} to the committed offset {}", tp, position);
                }
            }
        }
    }

因此直接将这部分topic partition的订阅状态更新为FETCHING,同时把OFFSET_FETCH请求返回的offset也设置到TopicPartitionStateposition字段中,表示这部分topic partition可以直接开始消费了

而对于没有提交过offset的topic partition,则返回的offset为-1;这部分topic partition是需要重置offset的

public synchronized void resetInitializingPositions() {
        final Set<TopicPartition> partitionsWithNoOffsets = new HashSet<>();
        assignment.forEach((tp, partitionState) -> {
            if (fetchState.equals(FetchStates.INITIALIZING)) {
                 requestOffsetReset(FetchStates.AWAIT_RESET);
        });

    }

对于存在offset的那些topic partition,其订阅状态已经更新为了FETCHING;仍然处于INITIALIZING状态的这批topic partition就是需要重置offset的了,因此先把他们的订阅状态改成AWAIT_RESET;避免下次poll时重复发送OFFSET_FETCH请求

对于需要重置offset的topic partition,有两种可选的重置策略:

  • 从最开始EARLIEST的offset重置,使用-2标识

  • 从最新LATEST的offset重置,使用-1标识

重置offset通过LIST_OFFSETS请求完成,这个请求需要直接发给各个topic partition的leader所在broken节点,而不是发送给COORDINATOR;而不同topic partition的leader所在broken可能是不同的,为了避免反复往一个broken发送请求,会先把这批topic partition按照leader所在broken节点分类,一次请求携带leader在这个broken节点的所有topic partition;这样就只需要往每个broken节点发送一次请求就可以了

private Map<Node, Map<TopicPartition, ListOffsetsPartition>> groupListOffsetRequests(
            Map<TopicPartition, Long> timestampsToSearch,
            Set<TopicPartition> partitionsToRetry) {
        final Map<TopicPartition, ListOffsetsPartition> partitionDataMap = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
            TopicPartition tp = entry.getKey();
            Long offset = entry.getValue();
            Metadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp);

                    int currentLeaderEpoch = leaderAndEpoch.epoch.orElse(ListOffsetsResponse.UNKNOWN_EPOCH);
                    partitionDataMap.put(tp, new ListOffsetsPartition()
                            .setPartitionIndex(tp.partition())
                            .setTimestamp(offset)
                            .setCurrentLeaderEpoch(currentLeaderEpoch));
                }
            }
        }
        return partitionMap.entrySet()
                .stream()
                .collect(Collectors.groupingBy(entry -> metadata.fetch().leaderFor(entry.getKey()),
                        Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
    }

然后再按照broken依次发送LIST_OFFSETS请求,发送前会先更新订阅状态的nextRetryTimeMs字段为30s(request timeout)后,这样就能避免下次poll的时候重复发送LIST_OFFSETS请求了,因为发送LIST_OFFSETS请求是会判断fetchState.equals(FetchStates.AWAIT_RESET) && nextRetryTimeMs != null && nowMs < nextRetryTimeMs,即重试时间达到前不会重复发送

private void resetPositionsAsync(Map<TopicPartition, Long> partitionResetTimestamps) {
        Map<Node, Map<TopicPartition, ListOffsetsPartition>> timestampsToSearchByNode =
                groupListOffsetRequests(partitionResetTimestamps, new HashSet<>());
        for (Map.Entry<Node, Map<TopicPartition, ListOffsetsPartition>> entry : timestampsToSearchByNode.entrySet()) {
            Node node = entry.getKey();
            final Map<TopicPartition, ListOffsetsPartition> resetTimestamps = entry.getValue();
            subscriptions.setNextAllowedRetry(resetTimestamps.keySet(), time.milliseconds() + requestTimeoutMs);

            RequestFuture<ListOffsetResult> future = sendListOffsetRequest(node, resetTimestamps, false);
        }
    }

LIST_OFFSETS请求成功返回后,再将这批topic partition的订阅状态更新为FETCHING,同时将LIST_OFFSETS请求返回的offset也设置到TopicPartitionState的position字段中,表示这部分topic partition可以也开始消费了

至此,不管有没有提交过offset的topic partition的订阅状态都更新为了FETCHING,并且都有了一个offset,都可以开始消费了;

拉取数据

前面的准备工作完成后,就可以开始拉取数据了,topic partition拉取数据时的订阅状态必须是FETCHING

public synchronized List<TopicPartition> fetchablePartitions(Predicate<TopicPartition> isAvailable) {
        // Since this is in the hot-path for fetching, we do this instead of using java.util.stream API
        List<TopicPartition> result = new ArrayList<>();
        assignment.forEach((topicPartition, topicPartitionState) -> {
            // Cheap check is first to avoid evaluating the predicate if possible
            if ((!paused && !pendingRevocation && !pendingOnAssignedCallback && fetchState.equals(FetchStates.FETCHING)) {
                result.add(topicPartition);
            }
        });
        return result;
    }

拉取数据是通过FETCH请求完成的,与LIST_OFFSETS请求一样,FETCH请求也是需要直接发给各个topic partition的leader所在broken节点,而不是发送给COORDINATOR;因此首先也是先按照broken对topic partition进行分类

protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {
        Map<Node, FetchSessionHandler.Builder> fetchable = new HashMap<>();

        for (TopicPartition partition : fetchablePartitions()) {
            SubscriptionState.FetchPosition position = subscriptions.position(partition);

            Optional<Node> leaderOpt = position.currentLeader.leader;
            
                // if there is a leader and no in-flight requests, issue a new fetch
                FetchSessionHandler.Builder builder = fetchable.computeIfAbsent(node, k -> {
                    FetchSessionHandler fetchSessionHandler = sessionHandlers.computeIfAbsent(node.id(), n -> new FetchSessionHandler(logContext, n));
                    return fetchSessionHandler.newBuilder();
                });
                Uuid topicId = topicIds.getOrDefault(partition.topic(), Uuid.ZERO_UUID);
                FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(topicId,
                        position.offset,
                        FetchRequest.INVALID_LOG_START_OFFSET,
                        fetchConfig.fetchSize,
                        position.currentLeader.epoch,
                        Optional.empty());
                builder.add(partition, partitionData);

            }
        }

        return fetchable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build()));
    }

然后按照broken依次发送FETCH请求;

private List<RequestFuture<ClientResponse>> sendFetchesInternal(Map<Node, FetchSessionHandler.FetchRequestData> fetchRequests,
                                                                    ResponseHandler<ClientResponse> successHandler,
                                                                    ResponseHandler<Throwable> errorHandler) {
        final List<RequestFuture<ClientResponse>> requestFutures = new ArrayList<>();

        for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequests.entrySet()) {
            final Node fetchTarget = entry.getKey();
            final FetchSessionHandler.FetchRequestData data = entry.getValue();
            final FetchRequest.Builder request = createFetchRequest(fetchTarget, data);
            final RequestFuture<ClientResponse> responseFuture = client.send(fetchTarget, request);

        return requestFutures;
    }

FETCH请求发送之前会先将发送的broken信息记录在集合里,在这个broken的FETCH请求完成之前,不会再往这个broken发送FETCH请求了,这就避免了下次poll的时候重复发送FETCH请求

Set<Integer> nodesWithPendingFetchRequests;

nodesWithPendingFetchRequests.add(fetchTarget.id());

等待FETCH请求响应后,会将响应结果缓存在fetchBuffer里,包装的对象CompletedFetch包含了拉取数据开始的offset以及拉取到的消息记录;
然后再将nodesWithPendingFetchRequests集合中的broken信息移除,以便下次能够再次拉取数据

private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;

CompletedFetch completedFetch = new CompletedFetch(
                        logContext,
                        subscriptions,
                        decompressionBufferSupplier,
                        partition,
                        partitionData,
                        metricAggregator,
                        fetchOffset,
                        requestVersion);
                completedFetches.add(completedFetch);

消费offset

消费数据就是从fetchBuffer中拉取数据进行消费,消费数据时每次最多返回500条消息,因此会循环从fetchBuffer中拉取数据,直到达到500条消息或者没有更多消息了;fetchBuffer中的数据是通过一个队列维护的,因此队首为空了就表示没有更多消息了;

每次从fetchBuffer中拉取数据时,都是处理队首元素,处理完成之后队首元素出队,然后继续处理队列的下一个元素;

队列中的每个元素都会经过几个处理步骤:

  • 初始化

  • 批次迭代

  • 批次处理

初始化

初始化其实就是处理订阅状态TopicPartitionState中的几种offset;FETCH请求的响应中会携带一些offset信息,比如highWatermark=53, lastStableOffset=53, logStartOffset=53,这里的初始化就是把这些offset信息设置到订阅状态TopicPartitionState中的相关字段中去;

我们先需要理解下这些offset的含义:

  • highWatermark:consumer能够消费到的最大offset,仅当producer写入的消息被ISR同步后才会更新highWatermark;

  • lastStableOffset:consumer事务读时能够消费到的最大offset,即consumer开始事务读时isolation.level=read_committed最大能够消费到这个offset,这个offset可能会小于highWatermark,lastStableOffset和highWatermark之间的消息是还没提交的事务消息

  • logStartOffset:起始offset,重置offset时如果是从最开始EARLIEST的offset重置,就是重置到这个offset

  • logEndOffset:消息下一次写入的offset,logEndOffset可能大于highWatermark,这之间数据就是没有被ISR同步的消息;重置offset时如果是从最新LATEST的offset重置,就是重置到这个offset

批次迭代

初始化完成后,就开始处理数据了

FETCH请求返回的数据records=MemoryRecords(size=0, buffer=java.nio.HeapByteBuffer[pos=0 lim=0 cap=0]))会包装到一个迭代器中

@Override
    public AbstractIterator<MutableRecordBatch> batchIterator() {
        return new RecordBatchIterator<>(new ByteBufferLogInputStream(buffer.duplicate(), Integer.MAX_VALUE));
    }

为什么需要一个迭代器呢?这就需要回过头来看看producer系列的文章Kafka系列《一》-- 生产者Producer流程及Partition详解

producer发送数据的时候就是通过批次的形式发送的,每个批次可能包含多条消息记录;broken在返回数据给consumer时会按照FETCH请求中指定的参数maxWaitMs=500, minBytes=1, maxBytes=52428800进行返回,即最多可以返回52428800字节或者是最大等待500ms;因此broken返回的数据可能是包括多个批次的;因此这里consumer通过一个迭代器来进行处理,迭代的单个元素就是每个批次

在producer发送每一批次时,还记得61字节的固定头部嘛,主要包括:baseOffset(8字节,默认0)、消息大小(4字节)、leader epoch (4字节)、Magic版本(1字节)、CRC容错(4字节)等等;因此这里直接从position=8开始往后读4个字节就是这个批次的大小了,然后在加上8字节的baseOffset和4字节的Size就是整个批次的完整大小了

Integer nextBatchSize() throws CorruptRecordException {
        int remaining = buffer.remaining();
        if (remaining < LOG_OVERHEAD)
            return null;
        int recordSize = buffer.getInt(8);
        
        return recordSize + 12;
    }

有了批次的大小后,就可以从响应中截取这部分大小的数据作为一个完整的批次了,也就是迭代器中next方法返回的批次了;如果迭代器中没有更多批次了,那么迭代器的hasNext方法就返回false了,那么就是处理完了这个请求的所有批次了


public MutableRecordBatch nextBatch() {
        int remaining = buffer.remaining();

        Integer batchSize = nextBatchSize();
        if (batchSize == null || remaining < batchSize)
            return null;

        ByteBuffer batchSlice = buffer.slice();
        batchSlice.limit(batchSize);
        buffer.position(buffer.position() + batchSize);
        return new DefaultRecordBatch(batchSlice);
        
    }

批次处理

通过迭代器得到一个批次的数据后,会首先进行CRC校验,因为producer在发送这个批次的时候在固定头部写入了一个CRC,这里consumer在用相同的方式计算一个CRC出来,对比两个CRC是否一致来确定数据是否被盗用更改

CRC校验通过后,再把这个批次的数据包装成一个新的迭代器,因为一个批次里可能包含多条消息,通过迭代器来迭代批次里的每条消息;这里需要和上面的迭代器区别开,这里迭代的是单个批次里的每条消息,上面迭代的是每个响应里的多个批次;

迭代每条消息的时候,直接从position=61开始,因为前面61个字节固定的批次头部

private CloseableIterator<Record> uncompressedIterator() {
        final ByteBuffer buffer = this.buffer.duplicate();
        buffer.position(61);
        return new RecordIterator() {
            @Override
            protected Record readNext(long baseOffset, long baseTimestamp, int baseSequence, Long logAppendTime) {
                try {
                    return DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, logAppendTime);
                }
        };
    }

从批次中读出每一条消息,依赖的就是从61开始,producer写入的是每条消息的大小,只需要先读出消息的大小,然后就能按照大小来读出每一条消息了;可以看到这里的流程就是首次循环先拿到一个批次保存在records里面,然后再次循环时从批次里获取一条消息后返回(这里的controlBatch是broken端的一种特殊批次,用来控制producer事务的,对于用户通过producer发送的消息都是非controlBatch的)

private Record nextFetchedRecord(FetchConfig fetchConfig) {
        while (true) {
            if (records == null || !records.hasNext()) {

                currentBatch = batches.next();
                maybeEnsureValid(fetchConfig, currentBatch);


                records = currentBatch.streamingIterator(decompressionBufferSupplier);
            } else {
                Record record = records.next();

                if (record.offset() >= nextFetchOffset) {
                    // control records are not returned to the user
                    if (!currentBatch.isControlBatch()) {
                        return record;
                    } else {
                        // Increment the next fetch offset when we skip a control batch.
                        nextFetchOffset = record.offset() + 1;
                    }
                }
            }
        }
    }

上面读出一条消息后就能够正常解析出消息里的key和value了,将其保存在一个数组里,同时记录下消息条数recordsRead、消息大小bytesRead等信息;直达读到默认的最多500条消息或者没有更多消息可读了才把消息返回给用户

List<ConsumerRecord<K, V>> records = new ArrayList<>();

for (int i = 0; i < 500; i++) {
                // Only move to next record if there was no exception in the last fetch. Otherwise, we should
                // use the last record to do deserialization again.
                if (cachedRecordException == null) {
                    corruptLastRecord = true;
                    lastRecord = nextFetchedRecord(fetchConfig);
                    corruptLastRecord = false;
                }

                if (lastRecord == null)
                    break;

                ConsumerRecord<K, V> record = parseRecord(deserializers, partition, leaderEpoch, timestampType, lastRecord);
                records.add(record);
                recordsRead++;
                bytesRead += lastRecord.sizeInBytes();
                nextFetchOffset = lastRecord.offset() + 1;
                // In some cases, the deserialization may have thrown an exception and the retry may succeed,
                // we allow user to move forward in this case.
                cachedRecordException = null;
            }

在返回消息给用户之前,还会更新一次订阅状态的position字段,即当前读到了哪个offset了;这样下次再读取这个topic partition时,信息才能够对的上,才能够正常开始消费数据

if (nextInLineFetch.nextFetchOffset() == position.offset) {
                List<ConsumerRecord<K, V>> partRecords = nextInLineFetch.fetchRecords(fetchConfig,
                        deserializers,
                        maxRecords);
}

总的来说,这里的的流程可以归结如下:

  • 先从FETCH响应中读取一个批次,如果这个批次没有500条消息,那么继续读取下一个批次

  • 如果响应中的所有批次都处理完了,依然没有达到500条消息,先把这个响应标识为已消费状态;然后从fetchBuffer队列中处理下一个FETCH响应数据,直达达到500条消息或者fetchBuffer队列中没有更多响应了

  • 如果处理响应中的某个批次达到了500条消息,那么直接返回这500条消息,同时更新订阅状态里的position

  • 下次poll的时候由于之前的响应还没标记为已消费状态,因此继续处理这个响应,因为之前已经更新过了订阅状态里的position,所以position和offset依然能够匹配,继续正常消费这个响应

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,192评论 6 511
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,858评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,517评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,148评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,162评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,905评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,537评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,439评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,956评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,083评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,218评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,899评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,565评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,093评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,201评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,539评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,215评论 2 358