flink中ProcessFunction的注册定时器功能

在flink的ProcessFunction中,我们可以注册定时器设定延迟多长时间后执行某类操作,例如像这种:
context.timerService().registerEventTimeTimer(context.timestamp() + 10000);
很好奇这种定时器内部是如何进行工作的,带着这种疑问我们来看看源码。
首先,在源码中,所有的定时器管理都是通过InternalTimerServiceImpl.java这个类来实现的。

    public void registerProcessingTimeTimer(N namespace, long time) {
        InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
        if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
            long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
            // check if we need to re-schedule our timer to earlier
            if (time < nextTriggerTime) {
                if (nextTimer != null) {
                    nextTimer.cancel(false);
                }
                nextTimer = processingTimeService.registerTimer(time, this);
            }
        }
    }

    @Override
    public void registerEventTimeTimer(N namespace, long time) {
        eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
    }

可以看到根据ProcessingTime和EventTime的不同,分别加入不同的queue队列中,其中如果是ProcessingTime的话,他还会判断当前queue中第一个元素的触发时间是否比当前加入的注册时间晚,如果晚于当前新加入的时间,则把下次触发时间改成当前的新加入的注册时间。
我们接下来看queue队列是如何实现的,processingTimeTimersQueue和eventTimeTimersQueue实现原理都是一样的,它的实现类是HeapPriorityQueueSet.java:

     * Adds the element to the queue. In contrast to the superclass and to maintain set semantics, this happens only if
     * no such element is already contained (determined by {@link #equals(Object)}).
     *
     * @return <code>true</code> if the operation changed the head element or if is it unclear if the head element changed.
     * Only returns <code>false</code> iff the head element was not changed by this operation.
     */
    @Override
    public boolean add(@Nonnull T element) {
        return getDedupMapForElement(element).putIfAbsent(element, element) == null && super.add(element);
    }

    /**
     * In contrast to the superclass and to maintain set semantics, removal here is based on comparing the given element
     * via {@link #equals(Object)}.
     *
     * @return <code>true</code> if the operation changed the head element or if is it unclear if the head element changed.
     * Only returns <code>false</code> iff the head element was not changed by this operation.
     */
    @Override
    public boolean remove(@Nonnull T toRemove) {
        T storedElement = getDedupMapForElement(toRemove).remove(toRemove);
        return storedElement != null && super.remove(storedElement);
    }

这里的逻辑简单概况就是先简单判断该注册时间是否有重复,如果没有重复就继续往里添加,再来看super.add(element)这个方法的实现:

@Override
    public boolean add(@Nonnull T toAdd) {
        addInternal(toAdd);
        return toAdd.getInternalIndex() == getHeadElementIndex();
    }
@Override
    protected void addInternal(@Nonnull T element) {
        final int newSize = increaseSizeByOne();
        moveElementToIdx(element, newSize);
        siftUp(newSize);
    }
//小顶堆排序
private void siftUp(int idx) {
        final T[] heap = this.queue;
        final T currentElement = heap[idx];
        int parentIdx = idx >>> 1;

        while (parentIdx > 0 && isElementPriorityLessThen(currentElement, heap[parentIdx])) {
            moveElementToIdx(heap[parentIdx], idx);
            idx = parentIdx;
            parentIdx >>>= 1;
        }

        moveElementToIdx(currentElement, idx);
    }

重点看siftUp这个方法,这个方法实现的的就是堆排序并且还是小顶堆排序,先把新的定时器放到数组末尾,然后就进行小顶堆排序,永远把最小的元素(定时器)排到最前面,这样就最早触发。至此逻辑就很清楚了:添加定时器的时候,首先会判断是否有重复,然后进行小顶堆排序,把最小的定时器放到第一个。
接下来我们继续看定时器是如何触发的,先看InternalTimerServiceImpl.java:

public void advanceWatermark(long time) throws Exception {
        currentWatermark = time;

        InternalTimer<K, N> timer;

        while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            eventTimeTimersQueue.poll();
            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onEventTime(timer);
        }
    }

在收到watermark之后开始执行advanceWatermark方法,这时候从eventTimeTimersQueue中获得第一个定时器(之前加入的时候已经保证了弹出的第一个永远是时间最早的定时器)与当前watermark时间比较,如果小于watermark则取出该定时器执行onEventTime也就是ProcessFunction中的onTimer方法。
取出第一个定时器之后,会触发eventTimeTimersQueue中的小顶堆再次排序:

public T poll() {
        return size() > 0 ? removeInternal(getHeadElementIndex()) : null;
    }

    @Override
    protected T removeInternal(int removeIdx) {
        T[] heap = this.queue;
        T removedValue = heap[removeIdx];

        assert removedValue.getInternalIndex() == removeIdx;

        final int oldSize = size;

        if (removeIdx != oldSize) {
            T element = heap[oldSize];
            moveElementToIdx(element, removeIdx);
            adjustElementAtIndex(element, removeIdx);
        }

        heap[oldSize] = null;

        --size;
        return removedValue;
    }

    private void adjustElementAtIndex(T element, int index) {
        siftDown(index);
        if (queue[index] == element) {
            siftUp(index);
        }
    }

    private void siftUp(int idx) {
        final T[] heap = this.queue;
        final T currentElement = heap[idx];
        int parentIdx = idx >>> 1;

        while (parentIdx > 0 && isElementPriorityLessThen(currentElement, heap[parentIdx])) {
            moveElementToIdx(heap[parentIdx], idx);
            idx = parentIdx;
            parentIdx >>>= 1;
        }

        moveElementToIdx(currentElement, idx);
    }

    private void siftDown(int idx) {
        final T[] heap = this.queue;
        final int heapSize = this.size;

        final T currentElement = heap[idx];
        int firstChildIdx = idx << 1;
        int secondChildIdx = firstChildIdx + 1;

        if (isElementIndexValid(secondChildIdx, heapSize) &&
            isElementPriorityLessThen(heap[secondChildIdx], heap[firstChildIdx])) {
            firstChildIdx = secondChildIdx;
        }

        while (isElementIndexValid(firstChildIdx, heapSize) &&
            isElementPriorityLessThen(heap[firstChildIdx], currentElement)) {
            moveElementToIdx(heap[firstChildIdx], idx);
            idx = firstChildIdx;
            firstChildIdx = idx << 1;
            secondChildIdx = firstChildIdx + 1;

            if (isElementIndexValid(secondChildIdx, heapSize) &&
                isElementPriorityLessThen(heap[secondChildIdx], heap[firstChildIdx])) {
                firstChildIdx = secondChildIdx;
            }
        }

        moveElementToIdx(currentElement, idx);
    }

这里简单概况来说就是弹出第一个定时器,同时触发小顶堆再次排序,把数组中剩余的时间最小的定时器再次放到第一个位置.
eventTime触发的定时器逻辑这里就讲完了,我们再看看processTime的触发逻辑,在InternalTimerServiceImpl.java中:

@Override
    public void onProcessingTime(long time) throws Exception {
        // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
        // inside the callback.
        nextTimer = null;

        InternalTimer<K, N> timer;

        while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            processingTimeTimersQueue.poll();
            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onProcessingTime(timer);
        }

        if (timer != null && nextTimer == null) {
            nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
        }
    }

processTime是依靠自身线程注册的定时器来触发的,processingTimeTimersQueue的逻辑与eventTimeTimersQueue一样,这里就不多讲了,当弹出第一个定时器执行的时候,会立即注册下一个定时器,保证下一个定时器顺利按时执行

总计一下,在ProcessFunction中,eventTime依靠watermark来触发,processTime依靠自身线程注册的定时器触发,两者都是在添加定时器的时候,把定时器放入队列里面进行小顶堆排序,把时间最小的定时器放到第一个位置,最早触发。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容