Flink 源码之WindowOperator

Flink源码分析系列文档目录

请点击:Flink 源码分析系列文档目录

WindowOperator

WindowOperator负责window中元素存储和计算流程。

WindowOperator包含如下几个重要方法:

  • open:operator初始化的逻辑
  • processElement: 新元素进入window的时候调用
  • onEventTime: event time计算触发时候的逻辑
  • onProcessingTime:processing time计算触发时候的逻辑

open方法

open方法的调用栈如图所示:


open方法调用栈

由图可知,open方法在Task Manager中的Task -> StreamTask的openAllOperator得到调用。

在分析open方法之前,我们需要先分析下WindowOperator类的继承结构。


WindowOperator的继承结构

WindowOperator继承自AbstractUdfStreamOperator,AbstractStreamOperator继承自AbstractStreamOperator。

AbstractStreamOperator的open方法:

    /**
     * This method is called immediately before any elements are processed, it should contain the
     * operator's initialization logic, e.g. state initialization.
     *
     * <p>The default implementation does nothing.
     *
     * @throws Exception An exception in this method causes the operator to fail.
     */
    @Override
    public void open() throws Exception {}

根据英文注释,可以总结open方法的功能如下:

  • open方法在operator处理任何元素之前调用
  • open方法负责operator的初始化工作,例如状态初始化
  • 如果该方法抛出异常,operator会失败,无法进行后续处理数据的流程

AbstractUdfStreamOperator的open代码如下所示:

@Override
public void open() throws Exception {
    super.open();
    FunctionUtils.openFunction(userFunction, new Configuration());
}

FunctionUtils这一行对userFunction进行了处理,稍后分析。userFunction为用户自定义处理方法的封装。比如apply或者process函数中传入的自定义处理逻辑,会被封装为userFunction。

FunctionUtils.openFunction方法:

public static void openFunction(Function function, Configuration parameters) throws Exception{
    if (function instanceof RichFunction) {
        RichFunction richFunction = (RichFunction) function;
        richFunction.open(parameters);
    }
}

如果userFunction使用的是RichFunction类型的话,会调用richFunction的open方法。

接下来分析下WindowOperator的open方法。代码如下:

public void open() throws Exception {
    super.open();

    // 打开迟到被丢弃数据条数统计的监控
    this.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
    // 创建一个基于时间戳的数据收集器,用于输出窗口数据到下游
    timestampedCollector = new TimestampedCollector<>(output);

    // 获取时间服务,用于向windowAssignerContext传递当前的processing time
    internalTimerService =
            getInternalTimerService("window-timers", windowSerializer, this);

    // 创建triggerContext和processContext,后面用到时候分析
    triggerContext = new Context(null, null);
    processContext = new WindowContext(null);

    // 创建WindowAssignerContext,主要用于获取processing time
    windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
        @Override
        public long getCurrentProcessingTime() {
            return internalTimerService.currentProcessingTime();
        }
    };

    // create (or restore) the state that hold the actual window contents
    // NOTE - the state may be null in the case of the overriding evicting window operator
    // 创建windowState,用于储存窗口中的数据
    if (windowStateDescriptor != null) {
        windowState = (InternalAppendingState<K, W, IN, ACC, ACC>) getOrCreateKeyedState(windowSerializer, windowStateDescriptor);
    }

    // create the typed and helper states for merging windows
    // 如果windowAssigner是MergingWindowAssigner子类,即使用的是SessionWindow的话,执行if内的内容
    if (windowAssigner instanceof MergingWindowAssigner) {

        // store a typed reference for the state of merging windows - sanity check
        if (windowState instanceof InternalMergingState) {
            windowMergingState = (InternalMergingState<K, W, IN, ACC, ACC>) windowState;
        }
        // TODO this sanity check should be here, but is prevented by an incorrect test (pending validation)
        // TODO see WindowOperatorTest.testCleanupTimerWithEmptyFoldingStateForSessionWindows()
        // TODO activate the sanity check once resolved
//          else if (windowState != null) {
//              throw new IllegalStateException(
//                      "The window uses a merging assigner, but the window state is not mergeable.");
//          }

// 以下逻辑为创建储存window合并的状态变量mergingSetsState
        @SuppressWarnings("unchecked")
        final Class<Tuple2<W, W>> typedTuple = (Class<Tuple2<W, W>>) (Class<?>) Tuple2.class;

        final TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>(
                typedTuple,
                new TypeSerializer[] {windowSerializer, windowSerializer});

        final ListStateDescriptor<Tuple2<W, W>> mergingSetsStateDescriptor =
                new ListStateDescriptor<>("merging-window-set", tupleSerializer);

        // get the state that stores the merging sets
        mergingSetsState = (InternalListState<K, VoidNamespace, Tuple2<W, W>>)
                getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, mergingSetsStateDescriptor);
        mergingSetsState.setCurrentNamespace(VoidNamespace.INSTANCE);
    }
}

processElement方法

有数据到达window的时候,系统会调用processElement方法。代码如下所示:

public void processElement(StreamRecord<IN> element) throws Exception {
    // 获取需要分配给该元素的window。以eventtime时间窗口为例,element的eventtime位于window的起止时间之中,则该窗口需要指派给此元素
    final Collection<W> elementWindows = windowAssigner.assignWindows(
        element.getValue(), element.getTimestamp(), windowAssignerContext);

    //if element is handled by none of assigned elementWindows
    // 返回元素是否被处理,如果元素经过处理,返回false
    boolean isSkippedElement = true;

    // 此处获取到key的值,即keyBy方法字段的值
    // 该KeyedStateBackend在StreamTaskStateInitializerImpl中创建
    final K key = this.<K>getKeyedStateBackend().getCurrentKey();

    // 判断Window是否是MergingWindowAssigner(合并窗口)的子类。比如SessionWindow属于MergingWindowAssigner
    if (windowAssigner instanceof MergingWindowAssigner) {
         // MergingWindowAssigner部分的处理逻辑,代码在后面分析,此处省略
    } else {
        // 非MergingWindowAssigner部分的处理逻辑,代码在后面分析,此处省略
    }

    // side output input event if
    // element not handled by any window
    // late arriving tag has been set
    // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
    // 如果元素没有被window处理,并且元素来迟,会加入到旁路输出
        // 否则此数据被丢弃,迟到被丢弃数据条数监控会增加1
    if (isSkippedElement && isElementLate(element)) {
        if (lateDataOutputTag != null){
            sideOutput(element);
        } else {
            this.numLateRecordsDropped.inc();
        }
    }
}

MergingWindowAssigner部分的处理逻辑

在分析MergingWindowAssigner之前我们需要提前看下MergingWindowSet类。该类负责处理整个合并窗口的过程,以及存放合并窗口的结果。

MergingWindowSet中用一个重要的变量为mapping,定义为:

private final Map<W, W> mapping;

该类负责存放一个window到window的映射。具体这个映射有什么作用呢?
这里先提前介绍下windowState,windowState是一个HeapListState。它的内部具有三个值:key,namespace和value。对于给定的key和namespace,可以获取到属于他们的一组值。在windowOperator中,key为keyBy函数对应的元素字段的值,namspace为当前的window。
可以认为HeapListState是一个Map<K, Map<NS, V>>的数据结构,对于给定的key,每个window和它里面的数据以Map<NS, V>形式储存。Window对象本身作为Namespace,相当于Map<NS, V>这个map中的key,维护了window和数据之间的关系。
但是问题来了,对于MergingWindowAssigner(SessionWindow),窗口是一直被合并的,合并之后的窗口和原先作为key保存数据对应关系的窗口肯定是不同的。该窗口对应的数据无法被合并之后的窗口获取到。
为了解决这个问题,有如下两种方案:

  1. 每次window合并之后,将原先window下的数据转移到合并后window下。这样涉及到很频繁的HashMap操作,性能开销较大。
  2. 选择一个固定的window(这里叫做状态window),来代表以后每次合并后的window,作为key来维护合并后window内的数据。同时再建立一个映射,维护状态window和合并后窗口之间的关系。Flink采用了这种做法。上述的mapping变量正是用于维护这个对应关系的。mapping的key为合并后的窗口,value为状态窗口。
    Flink使用一组被合并窗口中的第一个窗口,作为他们合并之后窗口的状态窗口。每次合并窗口操作发生之后,将mapping中被合并的窗口的映射关系移除,同时将合并后窗口和状态窗口的对应关系写入mapping

还有一个变量是initialMapping,它和mapping相同,只不过是在创建MergingWindowSet的时候被赋值,和mapping的值对比可用于判断mapping是否发生了变更,是否需要持久化。

接下来分析下它的addWindow方法,如下所示:

public W addWindow(W newWindow, MergeFunction<W> mergeFunction) throws Exception {

    List<W> windows = new ArrayList<>();

    // mapping 在创建MergingWindowSet对象的时候会读取ListState。该State保存了已合并的window
    windows.addAll(this.mapping.keySet());
    // 将新window增加进来
    windows.add(newWindow);

    // 此变量保存窗口合并的结果
    final Map<W, Collection<W>> mergeResults = new HashMap<>();
    // 调用windowAssigner的mergeWindows方法,合并后的结果被放入了mergeResults中
    windowAssigner.mergeWindows(windows,
            new MergingWindowAssigner.MergeCallback<W>() {
                @Override
                public void merge(Collection<W> toBeMerged, W mergeResult) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Merging {} into {}", toBeMerged, mergeResult);
                    }
                    mergeResults.put(mergeResult, toBeMerged);
                }
            });

    W resultWindow = newWindow;
    boolean mergedNewWindow = false;

    // perform the merge
    for (Map.Entry<W, Collection<W>> c: mergeResults.entrySet()) {
        // 合并之后的窗口
        W mergeResult = c.getKey();
        // 被合并的窗口
        Collection<W> mergedWindows = c.getValue();

        // if our new window is in the merged windows make the merge result the
        // result window
        // 如果被合并的window中包含有当前需要加入set的window(newWindow),那么窗口合并的结果就是mergeResult,将它赋给resultWindow
        if (mergedWindows.remove(newWindow)) {
            mergedNewWindow = true;
            resultWindow = mergeResult;
        }

        // pick any of the merged windows and choose that window's state window
        // as the state window for the merge result
        // 获取第一个被合并的窗口作为合并后窗口的状态窗口
        W mergedStateWindow = this.mapping.get(mergedWindows.iterator().next());

        // figure out the state windows that we are merging
        // 逐个寻找mapping的key中是否有本次操作已合并的window。
        // 这里将他们转移到mergedStateWindows变量中
        List<W> mergedStateWindows = new ArrayList<>();
        for (W mergedWindow: mergedWindows) {
            W res = this.mapping.remove(mergedWindow);
            if (res != null) {
                mergedStateWindows.add(res);
            }
        }

        // 在mapping中设置合并后的窗口和它的状态窗口的对应关系
        this.mapping.put(mergeResult, mergedStateWindow);

        // don't put the target state window into the merged windows
        // mergedStateWindows去掉状态窗口
        mergedStateWindows.remove(mergedStateWindow);

        // don't merge the new window itself, it never had any state associated with it
        // i.e. if we are only merging one pre-existing window into itself
        // without extending the pre-existing window
        // 如果被合并的窗口不包含已合并窗口,或者被合并窗口列表大小不为1的时候
        // 此处条件一定会满足,因为TimeWindow的mergeWindows方法保证了mergedWindows的size大于1的时候才会调用MergeCallback回调函数
        if (!(mergedWindows.contains(mergeResult) && mergedWindows.size() == 1)) {
            mergeFunction.merge(mergeResult,
                    mergedWindows,
                    this.mapping.get(mergeResult),
                    mergedStateWindows);
        }
    }

    // the new window created a new, self-contained window without merging
    // 如果一个window合并过后还是他自己(第一次调用addWindow,或者是开始了一个新的session),会进入这个if分支
    if (mergeResults.isEmpty() || (resultWindow.equals(newWindow) && !mergedNewWindow)) {
        this.mapping.put(resultWindow, resultWindow);
    }

    // 返回合并后的window
    return resultWindow;
}

让我们回到processElement方法处理MergingWindowAssigner的分支部分。代码如下:

// 获取MergingWindowSet对象,负责存放合并的窗口
MergingWindowSet<W> mergingWindows = getMergingWindowSet();

for (W window: elementWindows) {

    // adding the new window might result in a merge, in that case the actualWindow
    // is the merged window and we work with that. If we don't merge then
    // actualWindow == window
    // 调用addWindow方法,返回被合并的window
    W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
        @Override
        public void merge(W mergeResult,
                Collection<W> mergedWindows, W stateWindowResult,
                Collection<W> mergedStateWindows) throws Exception {

            // 检测两种异常情况,在新元素加入window的时候,合并后窗口的最大时间戳不可能比当前waterwark或者处理时间还小的
            // 如果是event time类型,并且合并后的窗口的maxTimestamp+allowedLateness小于等于当前的watermark会报错
            if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
                throw new UnsupportedOperationException("The end timestamp of an " +
                        "event-time window cannot become earlier than the current watermark " +
                        "by merging. Current watermark: " + internalTimerService.currentWatermark() +
                        " window: " + mergeResult);
            } else if (!windowAssigner.isEventTime()) {
                long currentProcessingTime = internalTimerService.currentProcessingTime();
                // 如果是基于processing time的窗口,合并后窗口的maxTimestamp小于等于当前处理时间的话,程序会报错
                if (mergeResult.maxTimestamp() <= currentProcessingTime) {
                    throw new UnsupportedOperationException("The end timestamp of a " +
                        "processing-time window cannot become earlier than the current processing time " +
                        "by merging. Current processing time: " + currentProcessingTime +
                        " window: " + mergeResult);
                }
            }

            // 设置triggerContext的key和window
            triggerContext.key = key;
            triggerContext.window = mergeResult;

            // 调用具体trigger实现类的注册定时器方法
            // 注册的timer触发事件为mergeResult窗口的maxTimestamp
            triggerContext.onMerge(mergedWindows);

            // 移除所有触发时间为被合并window的maxTimestamp的定时器
            for (W m: mergedWindows) {
                triggerContext.window = m;
                triggerContext.clear();
                deleteCleanupTimer(m);
            }

            // merge the merged state windows into the newly resulting state window
            // 将所有被合并的状态window对应的数据写入到合并后的window对应的状态window中
            windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
        }
    });

    // drop if the window is already late
    // 如果window迟到,则清除迟到的window。isWindowLate方法后面会分析
    if (isWindowLate(actualWindow)) {
        mergingWindows.retireWindow(actualWindow);
        continue;
    }
    // 接下来将处理此元素,需要设置为false
    isSkippedElement = false;

    // 获取状态window,状态window作为key,标记存储合并窗口内的数据
    W stateWindow = mergingWindows.getStateWindow(actualWindow);
    if (stateWindow == null) {
        throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
    }

    // 储存元素的值
    windowState.setCurrentNamespace(stateWindow);
    windowState.add(element.getValue());

    triggerContext.key = key;
    triggerContext.window = actualWindow;

    // 调用trigger的onElement
    TriggerResult triggerResult = triggerContext.onElement(element);

    if (triggerResult.isFire()) {
        ACC contents = windowState.get();
        if (contents == null) {
            continue;
        }
        // 如果触发计算,则在此提取出合并后窗口的内容,传入userFunction进行计算
        emitWindowContents(actualWindow, contents);
    }
    // 如果触发了purge操作,则需要清空window状态
    if (triggerResult.isPurge()) {
        windowState.clear();
    }
    // 注册window的cleanup timer 稍后分析
    registerCleanupTimer(actualWindow);
}

// need to make sure to update the merging state in state
// 将已合并的window持久化到ListState中
mergingWindows.persist();

非MergingWindowAssigner部分的处理逻辑

for (W window: elementWindows) {

    // drop if the window is already late
    // 判断window是否迟到,后面分析isWindowLate
    if (isWindowLate(window)) {
        continue;
    }
    // 标记该元素得到了处理
    isSkippedElement = false;

    // windowState为HeapListState
    // HeapListState为内存中存储的分区化的链表状态(ListState),使用namespace区分不同窗口的数据。可以理解为一个Map,key为window对象,value为元素的值
    windowState.setCurrentNamespace(window);
    windowState.add(element.getValue());

    // 设置trigger上下文对象的key和window
    triggerContext.key = key;
    triggerContext.window = window;

    // 调用trigger的onElement方法,询问trigger新element到来的时候需要作出什么动作。
    TriggerResult triggerResult = triggerContext.onElement(element);

    // isFire表示需要触发计算
    if (triggerResult.isFire()) {
        // 取出windowState当前namespace下所有的元素。即当前window下所有的元素。
        ACC contents = windowState.get();
        if (contents == null) {
            continue;
        }
        // 使用用户传入的处理函数来计算window内数据,稍后分析
        emitWindowContents(window, contents);
    }

    // 如果触发器返回需要清空数据,则删除window中所有的数据
    if (triggerResult.isPurge()) {
        windowState.clear();
    }
    // 注册timer,当前时间已经过了window的cleanup时间(后面有cleanup time的含义),会根据窗口的类型调用对应的onProcessingTime方法或者是onEventTime方法
    registerCleanupTimer(window);
}

isWindowLate函数,负责判断window是否迟到。迟到的window会被忽略。

protected boolean isWindowLate(W window) {
    // 如果window类型为eventtime并且window的cleanup时间比当前watermark早,说明window已经迟到
    return (windowAssigner.isEventTime() && (cleanupTime(window) <= internalTimerService.currentWatermark()));
}

cleanupTime方法,用来获取window的cleanup time

private long cleanupTime(W window) {
    if (windowAssigner.isEventTime()) {
        // cleanup time为window的end时间 + 允许迟到的时间 - 1
        long cleanupTime = window.maxTimestamp() + allowedLateness;
        // 如果cleanupTime溢出(返回负值),则使用Long.MAX_VALUE
        return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
    } else {
        return window.maxTimestamp();
    }
}

registerCleanupTimer方法,清除window在cleanup time时刻注册的定时器

protected void registerCleanupTimer(W window) {
    long cleanupTime = cleanupTime(window);
    if (cleanupTime == Long.MAX_VALUE) {
        // don't set a GC timer for "end of time"
        return;
    }

    // 增加对应onProcessingTime或onEventTime定时器
    if (windowAssigner.isEventTime()) {
        // 当前的watermark如果大于cleanupTime之时,会调用trigger的onElementTime方法
        triggerContext.registerEventTimeTimer(cleanupTime);
    } else {
        // 当前的系统时间大于cleanupTime之时,会调用trigger的onProcessingTime方法
        triggerContext.registerProcessingTimeTimer(cleanupTime);
    }
}

emitWindowContents,收集window中的元素,调用用户编写的处理函数。代码如下:

private void emitWindowContents(W window, ACC contents) throws Exception {
    timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
    processContext.window = window;
    // 此处调用用户编写的处理函数
    // 此处用户的函数,比如ProcessWindowFunction被InternalIterableProcessWindowFunction包装了
    userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);
}

onEventTime

下面是onEventTime方法的代码。当event time触发计算的时候会调用该方法。

public void onEventTime(InternalTimer<K, W> timer) throws Exception {
    // 获取key和window
    triggerContext.key = timer.getKey();
    triggerContext.window = timer.getNamespace();

    MergingWindowSet<W> mergingWindows;

    // 如果是MergingWindowAssigner(Session Window)
    if (windowAssigner instanceof MergingWindowAssigner) {
        mergingWindows = getMergingWindowSet();
        // 获取状态window
        W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
        if (stateWindow == null) {
            // Timer firing for non-existent window, this can only happen if a
            // trigger did not clean up timers. We have already cleared the merging
            // window and therefore the Trigger state, however, so nothing to do.
            return;
        } else {
            // 设置当前window为triggerContext.window,方便以后调用windowState.get()方法获取window中的元素
            windowState.setCurrentNamespace(stateWindow);
        }
    } else {
        // 设置当前window为triggerContext.window,方便以后调用windowState.get()方法获取window中的元素
        windowState.setCurrentNamespace(triggerContext.window);
        mergingWindows = null;
    }
    // 调用trigger的onEventTime方法
    TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());

    // 此处为触发计算的逻辑
    if (triggerResult.isFire()) {
        ACC contents = windowState.get();
        if (contents != null) {
            emitWindowContents(triggerContext.window, contents);
        }
    }

    // 如果触发了purge操作,则清空window中的内容
    if (triggerResult.isPurge()) {
        windowState.clear();
    }

    // 如果是event time类型,并且定时器触发时间是window的cleanup时间的时候,意味着该窗口的数据已经处理完毕,需要清除该窗口的所有状态
    if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
        clearAllState(triggerContext.window, windowState, mergingWindows);
    }

    // 持久化window合并状态
    if (mergingWindows != null) {
        // need to make sure to update the merging state in state
        mergingWindows.persist();
    }
}

onProcessingTime

onProcessingTime方法和onEventTime方法相比,除了调用triggeronProcessingTime方法这一处不同外,其他的的逻辑基本类似,不再赘述。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,928评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,192评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,468评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,186评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,295评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,374评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,403评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,186评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,610评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,906评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,075评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,755评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,393评论 3 320
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,079评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,313评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,934评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,963评论 2 351

推荐阅读更多精彩内容

  • Flink源码分析系列文档目录 请点击:Flink 源码分析系列文档目录[https://www.jianshu....
    AlienPaul阅读 2,787评论 0 2
  • 原文连接 https://ci.apache.org/projects/flink/flink-docs-rele...
    Alex90阅读 3,456评论 0 5
  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,094评论 1 32
  • 了解Flink是什么,Flink应用程序运行的多样化,对比业界常用的流处理框架,Flink的发展趋势,Flink生...
    JavaEdge阅读 5,070评论 1 18
  • Basic API Concepts Flink程序是实现基于分布式采集的转换程序(如:过滤器,映射,更新状态,连...
    MiyoungCheng阅读 1,872评论 0 0