简析Flink状态生存时间(State TTL)机制的底层实现

前言

很久没写过源码走读类型的文章了。最近在做业务需求时用Flink的State TTL非常多,今天就来探索一下吧。

从Flink 1.6版本开始,社区为状态引入了TTL(time-to-live,生存时间)机制,支持Keyed State的自动过期,有效解决了状态数据在无干预情况下无限增长导致OOM的问题。State TTL的用法很简单,官方文档中给出的示例代码如下。

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
    
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

那么State TTL的背后又隐藏着什么样的思路呢?下面就从设置类StateTtlConfig入手开始研究(Flink代码版本为1.9.3)。

StateTtlConfig

该类中有5个成员属性,它们就是用户需要指定的全部参数了。

private final UpdateType updateType;
private final StateVisibility stateVisibility;
private final TtlTimeCharacteristic ttlTimeCharacteristic;
private final Time ttl;
private final CleanupStrategies cleanupStrategies;

其中,ttl参数表示用户设定的状态生存时间。而UpdateType、StateVisibility和TtlTimeCharacteristic都是枚举,分别代表状态时间戳的更新方式、过期状态数据的可见性,以及对应的时间特征。它们的含义在注释中已经解释得很清楚了。

/**
 * This option value configures when to update last access timestamp which prolongs state TTL.
 */
public enum UpdateType {
    /** TTL is disabled. State does not expire. */
    Disabled,
    /** Last access timestamp is initialised when state is created and updated on every write operation. */
    OnCreateAndWrite,
    /** The same as <code>OnCreateAndWrite</code> but also updated on read. */
    OnReadAndWrite
}
/**
 * This option configures whether expired user value can be returned or not.
 */
public enum StateVisibility {
    /** Return expired user value if it is not cleaned up yet. */
    ReturnExpiredIfNotCleanedUp,
    /** Never return expired user value. */
    NeverReturnExpired
}
/**
 * This option configures time scale to use for ttl.
 */
public enum TtlTimeCharacteristic {
    /** Processing time, see also <code>org.apache.flink.streaming.api.TimeCharacteristic.ProcessingTime</code>. */
    ProcessingTime
}

Flink目前仅支持基于处理时间的State TTL,事件时间会在不久的将来支持。

CleanupStrategies内部类则用来规定过期状态的特殊清理策略,用户在构造StateTtlConfig时,可以通过调用以下方法之一指定。

  • cleanupFullSnapshot()
    当对状态做全量快照时清理过期数据,对开启了增量检查点(incremental checkpoint)的RocksDB状态后端无效,对应源码中的EmptyCleanupStrategy。
    为什么叫做“空的”清理策略呢?因为该选项只能保证状态持久化时不包含过期数据,但TaskManager本地的过期状态则不作任何处理,所以无法从根本上解决OOM的问题,需要定期重启作业。
  • cleanupIncrementally(int cleanupSize, boolean runCleanupForEveryRecord)
    增量清理过期数据,默认在每次访问状态时进行清理,将runCleanupForEveryRecord设为true可以附加在每次写入/删除时清理。cleanupSize指定每次触发清理时检查的状态条数。
    仅对基于堆的状态后端有效,对应源码中的IncrementalCleanupStrategy。
  • cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries)
    当RocksDB做compaction操作时,通过Flink定制的过滤器(FlinkCompactionFilter)过滤掉过期状态数据。参数queryTimeAfterNumEntries用于指定在写入多少条状态数据后,通过状态时间戳来判断是否过期。
    该策略仅对RocksDB状态后端有效,对应源码中的RocksdbCompactFilterCleanupStrategy。CompactionFilter是RocksDB原生提供的机制,其说明可见这里

如果不调用上述方法,则采用默认的后台清理策略,下文有讲。

TtlStateFactory、TtlStateContext

在所有Keyed State状态后端的抽象基类AbstractKeyedStateBackend中,创建并记录一个状态实例的方法如下。

    @Override
    @SuppressWarnings("unchecked")
    public <N, S extends State, V> S getOrCreateKeyedState(
            final TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, V> stateDescriptor) throws Exception {
        checkNotNull(namespaceSerializer, "Namespace serializer");
        checkNotNull(keySerializer, "State key serializer has not been configured in the config. " +
                "This operation cannot use partitioned state.");

        InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());
        if (kvState == null) {
            if (!stateDescriptor.isSerializerInitialized()) {
                stateDescriptor.initializeSerializerUnlessSet(executionConfig);
            }
            kvState = TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
                namespaceSerializer, stateDescriptor, this, ttlTimeProvider);
            keyValueStatesByName.put(stateDescriptor.getName(), kvState);
            publishQueryableStateIfEnabled(stateDescriptor, kvState);
        }
        return (S) kvState;
    }

可见是调用了TtlStateFactory.createStateAndWrapWithTtlIfEnabled()方法来真正创建。顾名思义,TtlStateFactory是产生TTL状态的工厂类。

public static <K, N, SV, TTLSV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
    TypeSerializer<N> namespaceSerializer,
    StateDescriptor<S, SV> stateDesc,
    KeyedStateBackend<K> stateBackend,
    TtlTimeProvider timeProvider) throws Exception {
    Preconditions.checkNotNull(namespaceSerializer);
    Preconditions.checkNotNull(stateDesc);
    Preconditions.checkNotNull(stateBackend);
    Preconditions.checkNotNull(timeProvider);
    return  stateDesc.getTtlConfig().isEnabled() ?
        new TtlStateFactory<K, N, SV, TTLSV, S, IS>(
            namespaceSerializer, stateDesc, stateBackend, timeProvider)
            .createState() :
        stateBackend.createInternalState(namespaceSerializer, stateDesc);
}

由上可知,如果我们为状态描述符StateDescriptor加入了TTL,那么就会调用TtlStateFactory.createState()方法创建一个带有TTL的状态实例;否则,就调用StateBackend.createInternalState()创建一个普通的状态实例。TtlStateFactory.createState()的代码如下。

@SuppressWarnings("unchecked")
private IS createState() throws Exception {
    SupplierWithException<IS, Exception> stateFactory = stateFactories.get(stateDesc.getClass());
    if (stateFactory == null) {
        String message = String.format("State %s is not supported by %s",
            stateDesc.getClass(), TtlStateFactory.class);
        throw new FlinkRuntimeException(message);
    }
    IS state = stateFactory.get();
    if (incrementalCleanup != null) {
        incrementalCleanup.setTtlState((AbstractTtlState<K, N, ?, TTLSV, ?>) state);
    }
    return state;
}

其中,stateFactories是一个Map结构,维护了各种状态描述符与对应产生该种状态对象的工厂方法映射。所有的工厂方法都被包装成了Supplier(Java 8提供的函数式接口),所以在上述createState()方法中,可以通过Supplier.get()方法来实际执行createTtl*State()工厂方法,并获得新的状态实例。

this.stateFactories = createStateFactories();

@SuppressWarnings("deprecation")
private Map<Class<? extends StateDescriptor>, SupplierWithException<IS, Exception>> createStateFactories() {
    return Stream.of(
        Tuple2.of(ValueStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createValueState),
        Tuple2.of(ListStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createListState),
        Tuple2.of(MapStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createMapState),
        Tuple2.of(ReducingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createReducingState),
        Tuple2.of(AggregatingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createAggregatingState),
        Tuple2.of(FoldingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createFoldingState)
    ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
}

@SuppressWarnings("unchecked")
private IS createValueState() throws Exception {
    ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>(
        stateDesc.getName(), new TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer()));
    return (IS) new TtlValueState<>(createTtlStateContext(ttlDescriptor));
}

@SuppressWarnings("unchecked")
private <T> IS createListState() throws Exception {
    ListStateDescriptor<T> listStateDesc = (ListStateDescriptor<T>) stateDesc;
    ListStateDescriptor<TtlValue<T>> ttlDescriptor = new ListStateDescriptor<>(
        stateDesc.getName(), new TtlSerializer<>(LongSerializer.INSTANCE, listStateDesc.getElementSerializer()));
    return (IS) new TtlListState<>(createTtlStateContext(ttlDescriptor));
}

// 以下略去...

可见,带有TTL的状态类名其实就是普通状态类名加上Ttl前缀,只是没有公开给用户而已。并且在生成Ttl*State时,还会通过createTtlStateContext()方法生成TTL状态的上下文。

@SuppressWarnings("unchecked")
private <OIS extends State, TTLS extends State, V, TTLV> TtlStateContext<OIS, V>
    createTtlStateContext(StateDescriptor<TTLS, TTLV> ttlDescriptor) throws Exception {
    ttlDescriptor.enableTimeToLive(stateDesc.getTtlConfig()); // also used by RocksDB backend for TTL compaction filter config
    OIS originalState = (OIS) stateBackend.createInternalState(
        namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory());
    return new TtlStateContext<>(
        originalState, ttlConfig, timeProvider, (TypeSerializer<V>) stateDesc.getSerializer(),
        registerTtlIncrementalCleanupCallback((InternalKvState<?, ?, ?>) originalState));
}

TtlStateContext的本质是对以下几个实例做了封装。

  • 原始State(通过StateBackend.createInternalState()方法创建)及其序列化器(通过StateDescriptor.getSerializer()方法取得);
  • StateTtlConfig,前文已经讲过;
  • TtlTimeProvider,用来提供判断状态过期标准的时间戳。当前只是简单地代理了System.currentTimeMillis(),没有任何其他代码;
  • 一个Runnable类型的回调方法,通过registerTtlIncrementalCleanupCallback()方法产生,用于状态数据的增量清理,后面会看到它的用途。

接下来就具体看看TTL状态是如何实现的。

AbstractTtlState、AbstractTtlDecorator

在解说之前,先放一幅类图。

所有Ttl*State都是AbstractTtlState的子类,而AbstractTtlState又是装饰器AbstractTtlDecorator的子类。AbstractTtlDecorator提供了最基本的TTL逻辑,代码不长,全部抄录如下。

abstract class AbstractTtlDecorator<T> {
    /** Wrapped original state handler. */
    final T original;
    final StateTtlConfig config;
    final TtlTimeProvider timeProvider;
    /** Whether to renew expiration timestamp on state read access. */
    final boolean updateTsOnRead;
    /** Whether to renew expiration timestamp on state read access. */
    final boolean returnExpired;
    /** State value time to live in milliseconds. */
    final long ttl;

    AbstractTtlDecorator(
        T original,
        StateTtlConfig config,
        TtlTimeProvider timeProvider) {
        // ......
    }

    <V> V getUnexpired(TtlValue<V> ttlValue) {
        return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
    }

    <V> boolean expired(TtlValue<V> ttlValue) {
        return TtlUtils.expired(ttlValue, ttl, timeProvider);
    }

    <V> TtlValue<V> wrapWithTs(V value) {
        return TtlUtils.wrapWithTs(value, timeProvider.currentTimestamp());
    }

    <V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) {
        return wrapWithTs(ttlValue.getUserValue());
    }

    <SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> V getWithTtlCheckAndUpdate(
        SupplierWithException<TtlValue<V>, SE> getter,
        ThrowingConsumer<TtlValue<V>, CE> updater,
        ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
        TtlValue<V> ttlValue = getWrappedWithTtlCheckAndUpdate(getter, updater, stateClear);
        return ttlValue == null ? null : ttlValue.getUserValue();
    }

    <SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> TtlValue<V> getWrappedWithTtlCheckAndUpdate(
        SupplierWithException<TtlValue<V>, SE> getter,
        ThrowingConsumer<TtlValue<V>, CE> updater,
        ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
        TtlValue<V> ttlValue = getter.get();
        if (ttlValue == null) {
            return null;
        } else if (expired(ttlValue)) {
            stateClear.run();
            if (!returnExpired) {
                return null;
            }
        } else if (updateTsOnRead) {
            updater.accept(rewrapWithNewTs(ttlValue));
        }
        return ttlValue;
    }
}

它的成员属性比较容易理解,例如,updateTsOnRead表示在读取状态值时也更新时间戳(即UpdateType.OnReadAndWrite),returnExpired表示即使状态过期,在被真正删除之前也返回它的值(即StateVisibility.ReturnExpiredIfNotCleanedUp)。

状态值与TTL的包装(成为TtlValue)以及过期检测都由工具类TtlUtils来负责,思路很简单,代码如下。

public class TtlUtils {
    static <V> boolean expired(@Nullable TtlValue<V> ttlValue, long ttl, TtlTimeProvider timeProvider) {
        return expired(ttlValue, ttl, timeProvider.currentTimestamp());
    }

    static <V> boolean expired(@Nullable TtlValue<V> ttlValue, long ttl, long currentTimestamp) {
        return ttlValue != null && expired(ttlValue.getLastAccessTimestamp(), ttl, currentTimestamp);
    }

    static boolean expired(long ts, long ttl, TtlTimeProvider timeProvider) {
        return expired(ts, ttl, timeProvider.currentTimestamp());
    }

    public static boolean expired(long ts, long ttl, long currentTimestamp) {
        return getExpirationTimestamp(ts, ttl) <= currentTimestamp;
    }

    private static long getExpirationTimestamp(long ts, long ttl) {
        long ttlWithoutOverflow = ts > 0 ? Math.min(Long.MAX_VALUE - ts, ttl) : ttl;
        return ts + ttlWithoutOverflow;
    }

    static <V> TtlValue<V> wrapWithTs(V value, long ts) {
        return new TtlValue<>(value, ts);
    }
}

TtlValue的属性只有两个:状态值和时间戳,代码略去。

AbstractTtlDecorator核心方法是获取状态值的getWrappedWithTtlCheckAndUpdate(),它接受三个参数:

  • getter:一个可抛出异常的Supplier,用于获取状态值;
  • updater:一个可抛出异常的Consumer,用于更新状态的时间戳;
  • stateClear:一个可抛出异常的Runnable,用于异步删除过期状态。

可见,在默认情况下的后台清理策略是:只有状态值被读取时,才会做过期检测,并异步清除过期的状态。这种惰性清理的机制会导致那些实际已经过期但从未被再次访问过的状态无法被删除,需要特别注意。官方文档中也已有提示:

By default, expired values are explicitly removed on read, such as ValueState#value, and periodically garbage collected in the background if supported by the configured state backend.

当确认到状态过期时,会调用stateClear的逻辑进行删除;如果需要在读取时顺便更新状态的时间戳,会调用updater的逻辑重新包装一个TtlValue。

AbstractTtlState的代码更加简单,主要的方法列举如下。

final Runnable accessCallback;

<SE extends Throwable, CE extends Throwable, T> T getWithTtlCheckAndUpdate(
    SupplierWithException<TtlValue<T>, SE> getter,
    ThrowingConsumer<TtlValue<T>, CE> updater) throws SE, CE {
    return getWithTtlCheckAndUpdate(getter, updater, original::clear);
}

@Override
public void clear() {
    original.clear();
    accessCallback.run();
}

其中,accessCallback就是TtlStateContext中注册的增量清理回调。

下面以TtlMapState为例,看看具体的TTL状态如何利用上文所述的这些实现。

TtlMapState

以下是部分代码。

class TtlMapState<K, N, UK, UV>
    extends AbstractTtlState<K, N, Map<UK, UV>, Map<UK, TtlValue<UV>>, InternalMapState<K, N, UK, TtlValue<UV>>>
    implements InternalMapState<K, N, UK, UV> {
    TtlMapState(TtlStateContext<InternalMapState<K, N, UK, TtlValue<UV>>, Map<UK, UV>> ttlStateContext) {
        super(ttlStateContext);
    }

    @Override
    public UV get(UK key) throws Exception {
        TtlValue<UV> ttlValue = getWrapped(key);
        return ttlValue == null ? null : ttlValue.getUserValue();
    }

    private TtlValue<UV> getWrapped(UK key) throws Exception {
        accessCallback.run();
        return getWrappedWithTtlCheckAndUpdate(
            () -> original.get(key), v -> original.put(key, v), () -> original.remove(key));
    }

    @Override
    public void put(UK key, UV value) throws Exception {
        accessCallback.run();
        original.put(key, wrapWithTs(value));
    }

    @Override
    public void putAll(Map<UK, UV> map) throws Exception {
        accessCallback.run();
        if (map == null) {
            return;
        }
        Map<UK, TtlValue<UV>> ttlMap = new HashMap<>(map.size());
        long currentTimestamp = timeProvider.currentTimestamp();
        for (Map.Entry<UK, UV> entry : map.entrySet()) {
            UK key = entry.getKey();
            ttlMap.put(key, TtlUtils.wrapWithTs(entry.getValue(), currentTimestamp));
        }
        original.putAll(ttlMap);
    }

    @Override
    public void remove(UK key) throws Exception {
        accessCallback.run();
        original.remove(key);
    }

    @Override
    public boolean contains(UK key) throws Exception {
        TtlValue<UV> ttlValue = getWrapped(key);
        return ttlValue != null;
    }
    
    // ......
}

可见,TtlMapState的增删改查操作都是在原MapState上进行,只是加上了TTL相关的逻辑,这也是装饰器模式的特点。例如,TtlMapState.get()方法调用了上述AbstractTtlDecorator.getWrappedWithTtlCheckAndUpdate()方法,传入的获取(getter)、插入(updater)和删除(stateClear)的逻辑就是原MapState的get()、put()和remove()方法。而TtlMapState.put()只是在调用原MapState的put()方法之前,将状态包装为TtlValue而已。

增量清理策略

另外需要注意,所有增删改查操作之前都需要执行accessCallback.run()方法。如果启用了增量清理策略,该Runnable会通过在状态数据上维护一个全局迭代器向前清理过期数据。如果未启用增量清理策略,accessCallback为空。前文提到过的TtlStateFactory.registerTtlIncrementalCleanupCallback()方法如下。

private Runnable registerTtlIncrementalCleanupCallback(InternalKvState<?, ?, ?> originalState) {
    StateTtlConfig.IncrementalCleanupStrategy config =
        ttlConfig.getCleanupStrategies().getIncrementalCleanupStrategy();
    boolean cleanupConfigured = config != null && incrementalCleanup != null;
    boolean isCleanupActive = cleanupConfigured &&
        isStateIteratorSupported(originalState, incrementalCleanup.getCleanupSize());
    Runnable callback = isCleanupActive ? incrementalCleanup::stateAccessed : () -> { };
    if (isCleanupActive && config.runCleanupForEveryRecord()) {
        stateBackend.registerKeySelectionListener(stub -> callback.run());
    }
    return callback;
}

实际清理的代码则位于TtlIncrementalCleanup类中,stateIterator就是状态数据的迭代器。

void stateAccessed() {
    initIteratorIfNot();
    try {
        runCleanup();
    } catch (Throwable t) {
        throw new FlinkRuntimeException("Failed to incrementally clean up state with TTL", t
    }
}

private void initIteratorIfNot() {
    if (stateIterator == null || !stateIterator.hasNext()) {
        stateIterator = ttlState.original.getStateIncrementalVisitor(cleanupSize);
    }
}

private void runCleanup() {
    int entryNum = 0;
    Collection<StateEntry<K, N, S>> nextEntries;
    while (
        entryNum < cleanupSize &&
        stateIterator.hasNext() &&
        !(nextEntries = stateIterator.nextEntries()).isEmpty()) {
        for (StateEntry<K, N, S> state : nextEntries) {
            S cleanState = ttlState.getUnexpiredOrNull(state.getState());
            if (cleanState == null) {
                stateIterator.remove(state);
            } else if (cleanState != state.getState()) {
                stateIterator.update(state, cleanState);
            }
        }
        entryNum += nextEntries.size();
    }
}

RocksDB压缩过滤清理策略

如果启用了该策略,Flink会通过维护一个RocksDbTtlCompactFiltersManager实例来管理FlinkCompactionFilter过滤器。FlinkCompactionFilter并不是在Flink工程中维护的,而是位于Data Artisans为Flink专门维护的FRocksDB库内。FLINK-10471实现了FlinkCompactionFilter及其附属逻辑,主要为C++代码,通过JNI调用。对应的commit详见GitHub,这里就不班门弄斧了。关于RocksDB的compaction相关细节,笔者之前也写过一篇长文做了些分析。

The End

写的有些乱了,就酱吧。

民那晚安。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 225,226评论 6 524
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 96,509评论 3 405
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 172,523评论 0 370
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 61,181评论 1 302
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 70,189评论 6 401
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 53,642评论 1 316
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 41,993评论 3 431
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 40,977评论 0 280
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 47,527评论 1 326
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 39,547评论 3 347
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 41,661评论 1 355
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 37,250评论 5 351
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,991评论 3 340
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 33,422评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 34,571评论 1 277
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 50,241评论 3 382
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 46,737评论 2 366