A Look at the PushConsumer in RocketMQ 5

This article walks through the consumption logic of the PushConsumer in the RocketMQ 5 Java client.

PushConsumerImpl

org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java

class PushConsumerImpl extends ConsumerImpl implements PushConsumer {

    //......

    protected void startUp() throws Exception {
        try {
            log.info("Begin to start the rocketmq push consumer, clientId={}", clientId);
            GaugeObserver gaugeObserver = new ProcessQueueGaugeObserver(processQueueTable, clientId, consumerGroup);
            this.clientMeterManager.setGaugeObserver(gaugeObserver);
            super.startUp();
            final ScheduledExecutorService scheduler = this.getClientManager().getScheduler();
            this.consumeService = createConsumeService();
            // Scan assignments periodically.
            scanAssignmentsFuture = scheduler.scheduleWithFixedDelay(() -> {
                try {
                    scanAssignments();
                } catch (Throwable t) {
                    log.error("Exception raised while scanning the load assignments, clientId={}", clientId, t);
                }
            }, 1, 5, TimeUnit.SECONDS);
            log.info("The rocketmq push consumer starts successfully, clientId={}", clientId);
        } catch (Throwable t) {
            log.error("Exception raised while starting the rocketmq push consumer, clientId={}", clientId, t);
            shutDown();
            throw t;
        }
    }

    //......

}

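PushConsumerImpl overrides the startUp method of AbstractIdleService. After calling super.startUp() and creating the ConsumeService, it registers scanAssignments with scheduleWithFixedDelay: the first scan runs after 1 second, and each later scan starts 5 seconds after the previous one finishes. The task catches Throwable so that a failed scan does not stop the schedule.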
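The try/catch inside the scheduled lambda matters because ScheduledExecutorService suppresses all later executions once a periodic task throws. The following is a minimal sketch of that behavior, not the client's code: the class and method names are made up for illustration, and the delays are in milliseconds rather than seconds so the example finishes quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PeriodicScanSketch {
    /**
     * Schedules a scan that fails on every run and returns how many runs
     * were observed once at least minRuns have happened (or 5 s pass).
     */
    static int countRunsOfFailingScan(int minRuns) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(minRuns);
        try {
            scheduler.scheduleWithFixedDelay(() -> {
                try {
                    runs.incrementAndGet();
                    throw new IllegalStateException("simulated scan failure");
                } catch (Throwable t) {
                    // Swallowed on purpose: an exception escaping the task
                    // would suppress all subsequent executions.
                } finally {
                    latch.countDown();
                }
            }, 1, 5, TimeUnit.MILLISECONDS); // milliseconds here, seconds in PushConsumerImpl
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }
}
```

Because the failure is caught inside the task, the scan keeps being rescheduled even though every run throws.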

scanAssignments

    @VisibleForTesting
    void scanAssignments() {
        try {
            log.debug("Start to scan assignments periodically, clientId={}", clientId);
            for (Map.Entry<String, FilterExpression> entry : subscriptionExpressions.entrySet()) {
                final String topic = entry.getKey();
                final FilterExpression filterExpression = entry.getValue();
                final Assignments existed = cacheAssignments.get(topic);
                final ListenableFuture<Assignments> future = queryAssignment(topic);
                Futures.addCallback(future, new FutureCallback<Assignments>() {
                    @Override
                    public void onSuccess(Assignments latest) {
                        if (latest.getAssignmentList().isEmpty()) {
                            if (null == existed || existed.getAssignmentList().isEmpty()) {
                                log.info("Acquired empty assignments from remote, would scan later, topic={}, "
                                    + "clientId={}", topic, clientId);
                                return;
                            }
                            log.info("Attention!!! acquired empty assignments from remote, but existed assignments"
                                + " is not empty, topic={}, clientId={}", topic, clientId);
                        }

                        if (!latest.equals(existed)) {
                            log.info("Assignments of topic={} has changed, {} => {}, clientId={}", topic, existed,
                                latest, clientId);
                            syncProcessQueue(topic, latest, filterExpression);
                            cacheAssignments.put(topic, latest);
                            return;
                        }
                        log.debug("Assignments of topic={} remains the same, assignments={}, clientId={}", topic,
                            existed, clientId);
                        // Process queue may be dropped, need to be synchronized anyway.
                        syncProcessQueue(topic, latest, filterExpression);
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        log.error("Exception raised while scanning the assignments, topic={}, clientId={}", topic,
                            clientId, t);
                    }
                }, MoreExecutors.directExecutor());
            }
        } catch (Throwable t) {
            log.error("Exception raised while scanning the assignments for all topics, clientId={}", clientId, t);
        }
    }

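scanAssignments iterates over subscriptionExpressions and calls queryAssignment(topic) for each topic. If both the latest and the cached assignments are empty, it returns and waits for the next scan. If the latest assignments differ from the cached ones, it calls syncProcessQueue and updates cacheAssignments. If nothing changed, it still calls syncProcessQueue, because a process queue may have been dropped in the meantime.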
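The three outcomes of the onSuccess callback can be reduced to a small decision function. This is only a sketch: assignments are modeled as plain lists of queue names, and the Action enum and decide method are hypothetical names that do not exist in the client.

```java
import java.util.List;

final class AssignmentScanSketch {
    enum Action { SKIP, SYNC_AND_CACHE, SYNC_ONLY }

    /** existed may be null when the topic has never been scanned successfully. */
    static Action decide(List<String> existed, List<String> latest) {
        boolean existedEmpty = existed == null || existed.isEmpty();
        if (latest.isEmpty() && existedEmpty) {
            // Nothing assigned before and nothing assigned now: wait for the next scan.
            return Action.SKIP;
        }
        if (!latest.equals(existed)) {
            // Assignments changed (including "became empty"): sync and refresh the cache.
            return Action.SYNC_AND_CACHE;
        }
        // Unchanged, but a process queue may have been dropped, so sync anyway.
        return Action.SYNC_ONLY;
    }
}
```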

syncProcessQueue

    void syncProcessQueue(String topic, Assignments assignments, FilterExpression filterExpression) {
        Set<MessageQueueImpl> latest = new HashSet<>();

        final List<Assignment> assignmentList = assignments.getAssignmentList();
        for (Assignment assignment : assignmentList) {
            latest.add(assignment.getMessageQueue());
        }

        Set<MessageQueueImpl> activeMqs = new HashSet<>();

        for (Map.Entry<MessageQueueImpl, ProcessQueue> entry : processQueueTable.entrySet()) {
            final MessageQueueImpl mq = entry.getKey();
            final ProcessQueue pq = entry.getValue();
            if (!topic.equals(mq.getTopic())) {
                continue;
            }

            if (!latest.contains(mq)) {
                log.info("Drop message queue according to the latest assignmentList, mq={}, clientId={}", mq,
                    clientId);
                dropProcessQueue(mq);
                continue;
            }

            if (pq.expired()) {
                log.warn("Drop message queue because it is expired, mq={}, clientId={}", mq, clientId);
                dropProcessQueue(mq);
                continue;
            }
            activeMqs.add(mq);
        }

        for (MessageQueueImpl mq : latest) {
            if (activeMqs.contains(mq)) {
                continue;
            }
            final Optional<ProcessQueue> optionalProcessQueue = createProcessQueue(mq, filterExpression);
            if (optionalProcessQueue.isPresent()) {
                log.info("Start to fetch message from remote, mq={}, clientId={}", mq, clientId);
                optionalProcessQueue.get().fetchMessageImmediately();
            }
        }
    }

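syncProcessQueue builds the set of latest MessageQueueImpl from the assignments, then walks the entries of processQueueTable that belong to the topic. A queue that is no longer assigned, or whose ProcessQueue has expired, is removed through dropProcessQueue; the remaining ones are recorded as active. Finally, for every assigned queue that is not active, it calls createProcessQueue and, if a new ProcessQueue was actually created, fetchMessageImmediately.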
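The reconciliation is essentially a set difference in both directions. Below is a simplified sketch under stated assumptions: queues are plain strings, the map value is a hypothetical "expired" flag standing in for ProcessQueue.expired(), and the per-topic filter is left out.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

final class SyncProcessQueueSketch {
    /** Reconciles table with latest and returns the queues that were newly created. */
    static Set<String> sync(ConcurrentMap<String, Boolean> table, Set<String> latest) {
        Set<String> active = new HashSet<>();
        // ConcurrentMap iterators tolerate removal during iteration.
        for (Map.Entry<String, Boolean> entry : table.entrySet()) {
            String mq = entry.getKey();
            boolean expired = entry.getValue();
            if (!latest.contains(mq) || expired) {
                table.remove(mq); // plays the role of dropProcessQueue(mq)
                continue;
            }
            active.add(mq);
        }
        Set<String> created = new HashSet<>();
        for (String mq : latest) {
            if (active.contains(mq)) {
                continue;
            }
            // Only the caller that wins putIfAbsent starts fetching.
            if (table.putIfAbsent(mq, Boolean.FALSE) == null) {
                created.add(mq);
            }
        }
        return created;
    }
}
```

Note that an expired queue that is still assigned is dropped and then recreated in the same pass, which restarts its fetch loop.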

createProcessQueue

    protected Optional<ProcessQueue> createProcessQueue(MessageQueueImpl mq, final FilterExpression filterExpression) {
        final ProcessQueueImpl processQueue = new ProcessQueueImpl(this, mq, filterExpression);
        final ProcessQueue previous = processQueueTable.putIfAbsent(mq, processQueue);
        if (null != previous) {
            return Optional.empty();
        }
        return Optional.of(processQueue);
    }

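createProcessQueue creates a ProcessQueueImpl and registers it in processQueueTable with putIfAbsent. If an entry already exists for the queue, it returns Optional.empty(), so syncProcessQueue does not call fetchMessageImmediately a second time for that queue.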
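The same create-once idiom can be written generically on top of ConcurrentMap.putIfAbsent. The helper below is a hypothetical illustration, not part of the client.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;

final class CreateOnceSketch {
    /** Returns the candidate only if it was the one actually stored in the table. */
    static <K, V> Optional<V> createIfAbsent(ConcurrentMap<K, V> table, K key, V candidate) {
        V previous = table.putIfAbsent(key, candidate);
        return previous == null ? Optional.of(candidate) : Optional.empty();
    }
}
```

When two threads race on the same key, exactly one of them receives a non-empty Optional, and the table keeps that thread's value.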

ProcessQueueImpl

org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java

    public void fetchMessageImmediately() {
        receiveMessageImmediately();
    }

    private void receiveMessageImmediately() {
        receiveMessageImmediately(this.generateAttemptId());
    }

    private void receiveMessageImmediately(String attemptId) {
        final ClientId clientId = consumer.getClientId();
        if (!consumer.isRunning()) {
            log.info("Stop to receive message because consumer is not running, mq={}, clientId={}", mq, clientId);
            return;
        }
        try {
            final Endpoints endpoints = mq.getBroker().getEndpoints();
            final int batchSize = this.getReceptionBatchSize();
            final Duration longPollingTimeout = consumer.getPushConsumerSettings().getLongPollingTimeout();
            final ReceiveMessageRequest request = consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression,
                longPollingTimeout, attemptId);
            activityNanoTime = System.nanoTime();

            // Intercept before message reception.
            final MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.RECEIVE);
            consumer.doBefore(context, Collections.emptyList());

            final ListenableFuture<ReceiveMessageResult> future = consumer.receiveMessage(request, mq,
                longPollingTimeout);
            Futures.addCallback(future, new FutureCallback<ReceiveMessageResult>() {
                @Override
                public void onSuccess(ReceiveMessageResult result) {
                    // Intercept after message reception.
                    final List<GeneralMessage> generalMessages = result.getMessageViewImpls().stream()
                        .map((Function<MessageView, GeneralMessage>) GeneralMessageImpl::new)
                        .collect(Collectors.toList());
                    final MessageInterceptorContextImpl context0 =
                        new MessageInterceptorContextImpl(context, MessageHookPointsStatus.OK);
                    consumer.doAfter(context0, generalMessages);

                    try {
                        onReceiveMessageResult(result);
                    } catch (Throwable t) {
                        // Should never reach here.
                        log.error("[Bug] Exception raised while handling receive result, mq={}, endpoints={}, "
                            + "clientId={}", mq, endpoints, clientId, t);
                        onReceiveMessageException(t, attemptId);
                    }
                }

                @Override
                public void onFailure(Throwable t) {
                    String nextAttemptId = null;
                    if (t instanceof StatusRuntimeException) {
                        StatusRuntimeException exception = (StatusRuntimeException) t;
                        if (org.apache.rocketmq.shaded.io.grpc.Status.DEADLINE_EXCEEDED.getCode() == exception.getStatus().getCode()) {
                            nextAttemptId = request.getAttemptId();
                        }
                    }
                    // Intercept after message reception.
                    final MessageInterceptorContextImpl context0 =
                        new MessageInterceptorContextImpl(context, MessageHookPointsStatus.ERROR);
                    consumer.doAfter(context0, Collections.emptyList());

                    log.error("Exception raised during message reception, mq={}, endpoints={}, attemptId={}, " +
                            "nextAttemptId={}, clientId={}", mq, endpoints, request.getAttemptId(), nextAttemptId,
                        clientId, t);
                    onReceiveMessageException(t, nextAttemptId);
                }
            }, MoreExecutors.directExecutor());
            receptionTimes.getAndIncrement();
            consumer.getReceptionTimes().getAndIncrement();
        } catch (Throwable t) {
            log.error("Exception raised during message reception, mq={}, clientId={}", mq, clientId, t);
            onReceiveMessageException(t, attemptId);
        }
    }        

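receiveMessageImmediately returns early if the consumer is not running. Otherwise it builds a ReceiveMessageRequest and calls consumer.receiveMessage, running the RECEIVE interceptors before and after the call. On success it invokes onReceiveMessageResult; on failure it invokes onReceiveMessageException, reusing the request's attemptId for the next attempt only when the gRPC status is DEADLINE_EXCEEDED.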

onReceiveMessageResult

    private void onReceiveMessageResult(ReceiveMessageResult result) {
        final List<MessageViewImpl> messages = result.getMessageViewImpls();
        if (!messages.isEmpty()) {
            cacheMessages(messages);
            receivedMessagesQuantity.getAndAdd(messages.size());
            consumer.getReceivedMessagesQuantity().getAndAdd(messages.size());
            consumer.getConsumeService().consume(this, messages);
        }
        receiveMessage();
    }

    public void receiveMessage() {
        receiveMessage(this.generateAttemptId());
    }

    public void receiveMessage(String attemptId) {
        final ClientId clientId = consumer.getClientId();
        if (dropped) {
            log.info("Process queue has been dropped, no longer receive message, mq={}, clientId={}", mq, clientId);
            return;
        }
        if (this.isCacheFull()) {
            log.warn("Process queue cache is full, would receive message later, mq={}, clientId={}", mq, clientId);
            receiveMessageLater(RECEIVING_BACKOFF_DELAY_WHEN_CACHE_IS_FULL, attemptId);
            return;
        }
        receiveMessageImmediately(attemptId);
    }    

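When the result contains messages, onReceiveMessageResult caches them, updates the counters, and hands them to consumer.getConsumeService().consume(this, messages). In every case it then calls receiveMessage, which stops if the process queue has been dropped, schedules a retry through receiveMessageLater(RECEIVING_BACKOFF_DELAY_WHEN_CACHE_IS_FULL, attemptId) if isCacheFull returns true, and otherwise triggers receiveMessageImmediately again.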
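The three branches of receiveMessage form a small state decision that keeps the receive loop going. The sketch below only models that decision; the Step enum and next method are hypothetical names.

```java
final class ReceiveLoopSketch {
    enum Step { STOP, RETRY_LATER, RECEIVE_NOW }

    /** Mirrors the order of checks in receiveMessage: dropped first, then cache pressure. */
    static Step next(boolean dropped, boolean cacheFull) {
        if (dropped) {
            return Step.STOP;        // the loop for this queue ends here
        }
        if (cacheFull) {
            return Step.RETRY_LATER; // back off until the listener drains the cache
        }
        return Step.RECEIVE_NOW;     // issue the next receive request right away
    }
}
```

The dropped check comes first, so a dropped queue never reschedules itself even when its cache is full.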

onReceiveMessageException

    public void onReceiveMessageException(Throwable t, String attemptId) {
        Duration delay = t instanceof TooManyRequestsException ? RECEIVING_FLOW_CONTROL_BACKOFF_DELAY :
            RECEIVING_FAILURE_BACKOFF_DELAY;
        receiveMessageLater(delay, attemptId);
    }

    private void receiveMessageLater(Duration delay, String attemptId) {
        final ClientId clientId = consumer.getClientId();
        final ScheduledExecutorService scheduler = consumer.getScheduler();
        try {
            log.info("Try to receive message later, mq={}, delay={}, clientId={}", mq, delay, clientId);
            scheduler.schedule(() -> receiveMessage(attemptId), delay.toNanos(), TimeUnit.NANOSECONDS);
        } catch (Throwable t) {
            if (scheduler.isShutdown()) {
                return;
            }
            // Should never reach here.
            log.error("[Bug] Failed to schedule message receiving request, mq={}, clientId={}", mq, clientId, t);
            onReceiveMessageException(t, attemptId);
        }
    }    

onReceiveMessageException checks whether the exception is a TooManyRequestsException. If so, the delay is RECEIVING_FLOW_CONTROL_BACKOFF_DELAY (Duration.ofMillis(20)); otherwise it is RECEIVING_FAILURE_BACKOFF_DELAY (Duration.ofSeconds(1)). It then calls receiveMessageLater, which schedules receiveMessage(attemptId) after that delay.
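The delay selection can be sketched as follows. The TooManyRequests class here is a hypothetical stand-in for the client's TooManyRequestsException, and the two durations are the values quoted above.

```java
import java.time.Duration;

final class ReceiveBackoffSketch {
    /** Hypothetical stand-in for the client's TooManyRequestsException. */
    static final class TooManyRequests extends RuntimeException { }

    static final Duration FLOW_CONTROL_DELAY = Duration.ofMillis(20);
    static final Duration FAILURE_DELAY = Duration.ofSeconds(1);

    /** Flow control gets a short retry delay; any other failure waits longer. */
    static Duration backoffFor(Throwable t) {
        return t instanceof TooManyRequests ? FLOW_CONTROL_DELAY : FAILURE_DELAY;
    }
}
```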

ConsumeService

org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java

public abstract class ConsumeService {
    private static final Logger log = LoggerFactory.getLogger(ConsumeService.class);

    protected final ClientId clientId;
    private final MessageListener messageListener;
    private final ThreadPoolExecutor consumptionExecutor;
    private final MessageInterceptor messageInterceptor;
    private final ScheduledExecutorService scheduler;

    public ConsumeService(ClientId clientId, MessageListener messageListener, ThreadPoolExecutor consumptionExecutor,
        MessageInterceptor messageInterceptor, ScheduledExecutorService scheduler) {
        this.clientId = clientId;
        this.messageListener = messageListener;
        this.consumptionExecutor = consumptionExecutor;
        this.messageInterceptor = messageInterceptor;
        this.scheduler = scheduler;
    }

    public abstract void consume(ProcessQueue pq, List<MessageViewImpl> messageViews);

    public ListenableFuture<ConsumeResult> consume(MessageViewImpl messageView) {
        return consume(messageView, Duration.ZERO);
    }

    public ListenableFuture<ConsumeResult> consume(MessageViewImpl messageView, Duration delay) {
        final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(consumptionExecutor);
        final ConsumeTask task = new ConsumeTask(clientId, messageListener, messageView, messageInterceptor);
        // Consume message with no delay.
        if (Duration.ZERO.compareTo(delay) >= 0) {
            return executorService.submit(task);
        }
        final SettableFuture<ConsumeResult> future0 = SettableFuture.create();
        scheduler.schedule(() -> {
            final ListenableFuture<ConsumeResult> future = executorService.submit(task);
            Futures.addCallback(future, new FutureCallback<ConsumeResult>() {
                @Override
                public void onSuccess(ConsumeResult consumeResult) {
                    future0.set(consumeResult);
                }

                @Override
                public void onFailure(Throwable t) {
                    // Should never reach here.
                    log.error("[Bug] Exception raised while submitting scheduled consumption task, clientId={}",
                        clientId, t);
                }
            }, MoreExecutors.directExecutor());
        }, delay.toNanos(), TimeUnit.NANOSECONDS);
        return future0;
    }
}

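ConsumeService is an abstract class. It declares an abstract consume method that takes a ProcessQueue and a batch of messageViews, and it provides a concrete consume method for a single messageView with an optional delay. That method creates a ConsumeTask and submits it to the executorService, either immediately or, when the delay is positive, through the scheduler once the delay has elapsed.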
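The immediate-or-delayed submission can be sketched with the JDK alone. This version swaps Guava's ListenableFuture and SettableFuture for CompletableFuture, and unlike the original it also propagates a task failure to the returned future; all names are illustrative.

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class DelayedConsumeSketch {
    static CompletableFuture<String> consume(ExecutorService worker, ScheduledExecutorService scheduler,
        Callable<String> task, Duration delay) {
        CompletableFuture<String> result = new CompletableFuture<>();
        Runnable submit = () -> worker.execute(() -> {
            try {
                result.complete(task.call());
            } catch (Throwable t) {
                result.completeExceptionally(t);
            }
        });
        if (Duration.ZERO.compareTo(delay) >= 0) {
            submit.run(); // zero or negative delay: hand over to the worker pool now
        } else {
            // The scheduler only triggers the hand-over; the task still runs on the worker pool.
            scheduler.schedule(submit, delay.toNanos(), TimeUnit.NANOSECONDS);
        }
        return result;
    }

    /** Small driver that owns the executors and waits for the result. */
    static String demo(Duration delay) {
        ExecutorService worker = Executors.newFixedThreadPool(2);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            return consume(worker, scheduler, () -> "SUCCESS", delay).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            return "ERROR";
        } finally {
            scheduler.shutdownNow();
            worker.shutdownNow();
        }
    }
}
```

Keeping the scheduler thread free of listener work means a slow listener cannot delay other timers that share the scheduler.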

ConsumeTask

org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java

public class ConsumeTask implements Callable<ConsumeResult> {
    private static final Logger log = LoggerFactory.getLogger(ConsumeTask.class);

    private final ClientId clientId;
    private final MessageListener messageListener;
    private final MessageViewImpl messageView;
    private final MessageInterceptor messageInterceptor;

    public ConsumeTask(ClientId clientId, MessageListener messageListener, MessageViewImpl messageView,
        MessageInterceptor messageInterceptor) {
        this.clientId = clientId;
        this.messageListener = messageListener;
        this.messageView = messageView;
        this.messageInterceptor = messageInterceptor;
    }

    /**
     * Invoke {@link MessageListener} to consumer message.
     *
     * @return message(s) which is consumed successfully.
     */
    @Override
    public ConsumeResult call() {
        ConsumeResult consumeResult;
        final List<GeneralMessage> generalMessages = Collections.singletonList(new GeneralMessageImpl(messageView));
        MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.CONSUME);
        messageInterceptor.doBefore(context, generalMessages);
        try {
            consumeResult = messageListener.consume(messageView);
        } catch (Throwable t) {
            log.error("Message listener raised an exception while consuming messages, clientId={}, mq={}, " +
                "messageId={}", clientId, messageView.getMessageQueue(), messageView.getMessageId(), t);
            // If exception was thrown during the period of message consumption, mark it as failure.
            consumeResult = ConsumeResult.FAILURE;
        }
        MessageHookPointsStatus status = ConsumeResult.SUCCESS.equals(consumeResult) ? MessageHookPointsStatus.OK :
            MessageHookPointsStatus.ERROR;
        context = new MessageInterceptorContextImpl(context, status);
        messageInterceptor.doAfter(context, generalMessages);
        // Make sure that the return value is the subset of messageViews.
        return consumeResult;
    }
}

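ConsumeTask implements Callable. Its call method invokes messageListener.consume(messageView) and wraps the call with the doBefore and doAfter hooks of messageInterceptor. If the listener throws, the exception is logged and the result is ConsumeResult.FAILURE.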
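A stripped-down version of the same pattern is shown below as a sketch. The listener is a plain Function, the Result enum mirrors ConsumeResult, and a list of strings stands in for the interceptor hooks; none of these names come from the client.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Function;

final class ConsumeTaskSketch implements Callable<ConsumeTaskSketch.Result> {
    enum Result { SUCCESS, FAILURE }

    private final Function<String, Result> listener;
    private final String message;
    private final List<String> hookLog; // stand-in for the interceptor hooks

    ConsumeTaskSketch(Function<String, Result> listener, String message, List<String> hookLog) {
        this.listener = listener;
        this.message = message;
        this.hookLog = hookLog;
    }

    @Override
    public Result call() {
        hookLog.add("before");
        Result result;
        try {
            result = listener.apply(message);
        } catch (Throwable t) {
            // A throwing listener is treated as a failed consumption, never rethrown.
            result = Result.FAILURE;
        }
        hookLog.add(Result.SUCCESS.equals(result) ? "after:OK" : "after:ERROR");
        return result;
    }
}
```

Because call never throws, the future returned by the executor always completes normally, which is why the callers label their onFailure branches as "should never reach here".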

StandardConsumeService

org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java

public class StandardConsumeService extends ConsumeService {
    private static final Logger log = LoggerFactory.getLogger(StandardConsumeService.class);

    public StandardConsumeService(ClientId clientId, MessageListener messageListener,
        ThreadPoolExecutor consumptionExecutor, MessageInterceptor messageInterceptor,
        ScheduledExecutorService scheduler) {
        super(clientId, messageListener, consumptionExecutor, messageInterceptor, scheduler);
    }

    @Override
    public void consume(ProcessQueue pq, List<MessageViewImpl> messageViews) {
        for (MessageViewImpl messageView : messageViews) {
            // Discard corrupted message.
            if (messageView.isCorrupted()) {
                log.error("Message is corrupted for standard consumption, prepare to discard it, mq={}, "
                    + "messageId={}, clientId={}", pq.getMessageQueue(), messageView.getMessageId(), clientId);
                pq.discardMessage(messageView);
                continue;
            }
            final ListenableFuture<ConsumeResult> future = consume(messageView);
            Futures.addCallback(future, new FutureCallback<ConsumeResult>() {
                @Override
                public void onSuccess(ConsumeResult consumeResult) {
                    pq.eraseMessage(messageView, consumeResult);
                }

                @Override
                public void onFailure(Throwable t) {
                    // Should never reach here.
                    log.error("[Bug] Exception raised in consumption callback, clientId={}", clientId, t);
                }
            }, MoreExecutors.directExecutor());
        }
    }
}

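StandardConsumeService extends ConsumeService. Its consume method iterates over messageViews, discards corrupted messages through pq.discardMessage, and calls the parent's consume method for each remaining message (which creates a ConsumeTask and submits it to the executorService). It then registers a FutureCallback on the returned future, and onSuccess calls pq.eraseMessage(messageView, consumeResult). Messages are submitted without waiting for one another, so they may be consumed concurrently.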
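The submit-everything-then-react pattern can be sketched with CompletableFuture in place of Guava futures. Messages are plain strings, the listener is a Predicate, and the erased map is a hypothetical stand-in for pq.eraseMessage.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Predicate;

final class StandardDispatchSketch {
    /** Returns, per message, whether the listener reported success. */
    static Map<String, Boolean> consumeAll(List<String> messages, Predicate<String> listener) {
        ExecutorService worker = Executors.newFixedThreadPool(4);
        Map<String, Boolean> erased = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        try {
            for (String message : messages) {
                // Submit every message right away; nothing waits for the previous one.
                CompletableFuture<Void> done = CompletableFuture
                    .supplyAsync(() -> listener.test(message), worker)
                    // Plays the role of pq.eraseMessage(messageView, consumeResult).
                    .thenAccept(ok -> erased.put(message, ok));
                pending.add(done);
            }
            // Waiting here is only for the demo; the client does not block.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        } finally {
            worker.shutdown();
        }
        return erased;
    }
}
```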

FifoConsumeService

org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java

class FifoConsumeService extends ConsumeService {
    private static final Logger log = LoggerFactory.getLogger(FifoConsumeService.class);

    public FifoConsumeService(ClientId clientId, MessageListener messageListener,
        ThreadPoolExecutor consumptionExecutor, MessageInterceptor messageInterceptor,
        ScheduledExecutorService scheduler) {
        super(clientId, messageListener, consumptionExecutor, messageInterceptor, scheduler);
    }

    @Override
    public void consume(ProcessQueue pq, List<MessageViewImpl> messageViews) {
        consumeIteratively(pq, messageViews.iterator());
    }

    public void consumeIteratively(ProcessQueue pq, Iterator<MessageViewImpl> iterator) {
        if (!iterator.hasNext()) {
            return;
        }
        final MessageViewImpl messageView = iterator.next();
        if (messageView.isCorrupted()) {
            // Discard corrupted message.
            log.error("Message is corrupted for FIFO consumption, prepare to discard it, mq={}, messageId={}, "
                + "clientId={}", pq.getMessageQueue(), messageView.getMessageId(), clientId);
            pq.discardFifoMessage(messageView);
            consumeIteratively(pq, iterator);
            return;
        }
        final ListenableFuture<ConsumeResult> future0 = consume(messageView);
        ListenableFuture<Void> future = Futures.transformAsync(future0, result -> pq.eraseFifoMessage(messageView,
            result), MoreExecutors.directExecutor());
        future.addListener(() -> consumeIteratively(pq, iterator), MoreExecutors.directExecutor());
    }
}

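FifoConsumeService extends ConsumeService. Its consume method takes the iterator of messageViews and calls consumeIteratively. That method fetches one message with iterator.next(); a corrupted message is discarded through pq.discardFifoMessage and the method moves on to the next one. Otherwise it calls the parent's consume method (which creates a ConsumeTask and submits it to the executorService), chains pq.eraseFifoMessage(messageView, result) onto future0 with Futures.transformAsync, and adds a listener to the resulting future that calls consumeIteratively again for the next message once the erase has completed.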
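The one-at-a-time chaining can be sketched with CompletableFuture instead of Guava's transformAsync and addListener. The erase step is omitted, messages are plain strings, and all names are illustrative.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class FifoChainSketch {
    /** Consumes the messages on a multi-threaded pool and returns the observed order. */
    static List<String> consumeInOrder(List<String> messages) {
        ExecutorService worker = Executors.newFixedThreadPool(4);
        List<String> order = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<Void> allDone = new CompletableFuture<>();
        try {
            consumeIteratively(messages.iterator(), worker, order, allDone);
            allDone.join(); // waiting is only for the demo
        } finally {
            worker.shutdown();
        }
        return new ArrayList<>(order);
    }

    private static void consumeIteratively(Iterator<String> iterator, ExecutorService worker,
        List<String> order, CompletableFuture<Void> allDone) {
        if (!iterator.hasNext()) {
            allDone.complete(null);
            return;
        }
        String message = iterator.next();
        CompletableFuture
            .runAsync(() -> order.add(message), worker)
            // The next message is submitted only after the current one has finished.
            .whenComplete((ignored, error) -> consumeIteratively(iterator, worker, order, allDone));
    }
}
```

Even though the pool has several threads, at most one message of the batch is in flight at any time, which is what preserves the order.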

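Summary

  • PushConsumerImpl in RocketMQ 5 overrides the startUp method of AbstractIdleService and schedules scanAssignments with an initial delay of 1 second and a fixed delay of 5 seconds. scanAssignments queries the Assignments of each topic through queryAssignment(topic) and then calls syncProcessQueue. syncProcessQueue drops process queues that are no longer assigned or have expired, and for every newly assigned queue calls createProcessQueue followed by fetchMessageImmediately.
  • receiveMessageImmediately calls consumer.receiveMessage, invoking onReceiveMessageResult on success and onReceiveMessageException on failure. onReceiveMessageResult calls consumer.getConsumeService().consume(this, messages) and then receiveMessage, which stops if the queue has been dropped, retries later through receiveMessageLater if isCacheFull is true, and otherwise triggers receiveMessageImmediately again. onReceiveMessageException uses RECEIVING_FLOW_CONTROL_BACKOFF_DELAY (Duration.ofMillis(20)) for a TooManyRequestsException and RECEIVING_FAILURE_BACKOFF_DELAY (Duration.ofSeconds(1)) otherwise, then calls receiveMessageLater, which schedules receiveMessage(attemptId) after the delay.
  • ConsumeService is an abstract class that declares consume for a ProcessQueue and a batch of messageViews and provides consume for a single messageView with an optional delay; the latter creates a ConsumeTask and submits it to the executorService. ConsumeTask implements Callable; its call method invokes messageListener.consume(messageView) between the doBefore and doAfter hooks of messageInterceptor. StandardConsumeService iterates over messageViews, calls the parent's consume for each one, and registers a FutureCallback whose onSuccess calls pq.eraseMessage(messageView, consumeResult). FifoConsumeService uses an iterator and a listener to call consumeIteratively recursively, so that each message is consumed and erased with pq.eraseFifoMessage before the next one is submitted, which yields ordered consumption.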