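In a Flink ProcessFunction applied to a keyed stream (timers are only supported on keyed streams), we can register a timer so that some action runs after a given delay, for example: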
context.timerService().registerEventTimeTimer(context.timestamp() + 10000);
I was curious how these timers work internally, so let's look at the source code.
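First of all, in the source, timer management for keyed operators is handled by the class InternalTimerServiceImpl (InternalTimerServiceImpl.java). Its two registration methods look like this: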
public void registerProcessingTimeTimer(N namespace, long time) {
    InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
    if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
        long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
        // check if we need to re-schedule our timer to earlier
        if (time < nextTriggerTime) {
            if (nextTimer != null) {
                nextTimer.cancel(false);
            }
            nextTimer = processingTimeService.registerTimer(time, this);
        }
    }
}

@Override
public void registerEventTimeTimer(N namespace, long time) {
    eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}
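As you can see, processing-time and event-time timers are added to two separate queues. A processing-time timer gets one extra step: if the add succeeded, the method compares the new timestamp with the trigger time of the previous head of the queue. If the new timer fires earlier, the currently scheduled physical timer (nextTimer) is cancelled and a new one is registered with the ProcessingTimeService for the new timestamp. Event-time timers need no such step, because they are fired by watermarks rather than by a scheduler.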
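The rescheduling rule can be illustrated with a small model. This is a sketch under stated assumptions, not Flink code: timers are plain `long` timestamps, the hypothetical class `ProcTimerRegistry` replaces the real queue and service, "scheduling" a physical timer is modelled by recording its trigger time in `scheduledAt`, and deduplication is left out.

```java
import java.util.PriorityQueue;

// Hypothetical model of registerProcessingTimeTimer (not Flink API).
// Timers are plain long timestamps; scheduledAt stands in for nextTimer.
class ProcTimerRegistry {
    private final PriorityQueue<Long> queue = new PriorityQueue<>();
    Long scheduledAt = null;  // trigger time of the single armed physical timer
    int cancellations = 0;    // how often an armed physical timer was replaced

    void register(long time) {
        Long oldHead = queue.peek();
        queue.add(time);
        long nextTriggerTime = oldHead != null ? oldHead : Long.MAX_VALUE;
        // Re-schedule only if the new timer fires earlier than the previous head.
        if (time < nextTriggerTime) {
            if (scheduledAt != null) {
                cancellations++;  // corresponds to nextTimer.cancel(false)
            }
            scheduledAt = time;   // corresponds to processingTimeService.registerTimer(time, this)
        }
    }
}
```

Registering 100, then 300, then 50 arms a physical timer at 100, leaves it untouched for 300, and replaces it with one at 50. Later timers never cause rescheduling, so only one physical timer has to be pending at a time.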
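Next, let's look at how the queues are implemented. processingTimeTimersQueue and eventTimeTimersQueue work the same way. When timers are kept on the heap, their implementation class is HeapPriorityQueueSet.java (the RocksDB state backend can store timers in RocksDB instead):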
/**
 * Adds the element to the queue. In contrast to the superclass and to maintain set semantics, this happens only if
 * no such element is already contained (determined by {@link #equals(Object)}).
 *
 * @return <code>true</code> if the operation changed the head element or if is it unclear if the head element changed.
 * Only returns <code>false</code> iff the head element was not changed by this operation.
 */
@Override
public boolean add(@Nonnull T element) {
    return getDedupMapForElement(element).putIfAbsent(element, element) == null && super.add(element);
}

/**
 * In contrast to the superclass and to maintain set semantics, removal here is based on comparing the given element
 * via {@link #equals(Object)}.
 *
 * @return <code>true</code> if the operation changed the head element or if is it unclear if the head element changed.
 * Only returns <code>false</code> iff the head element was not changed by this operation.
 */
@Override
public boolean remove(@Nonnull T toRemove) {
    T storedElement = getDedupMapForElement(toRemove).remove(toRemove);
    return storedElement != null && super.remove(storedElement);
}
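The pairing of a deduplication map with a heap gives a priority queue set semantics. Below is a minimal sketch of that idea using `java.util` collections. The classes `SimpleTimer` and `DedupTimerQueue` are hypothetical. Flink keeps one dedup map per key group, while this sketch assumes a single `HashSet`.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

// Hypothetical timer: like TimerHeapInternalTimer, identity is (timestamp, key, namespace).
final class SimpleTimer implements Comparable<SimpleTimer> {
    final long timestamp;
    final String key;
    final String namespace;

    SimpleTimer(long timestamp, String key, String namespace) {
        this.timestamp = timestamp;
        this.key = key;
        this.namespace = namespace;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SimpleTimer)) return false;
        SimpleTimer t = (SimpleTimer) o;
        return timestamp == t.timestamp && key.equals(t.key) && namespace.equals(t.namespace);
    }

    @Override
    public int hashCode() {
        return Objects.hash(timestamp, key, namespace);
    }

    // Heap order is by timestamp only, so equal timestamps with different keys coexist.
    @Override
    public int compareTo(SimpleTimer other) {
        return Long.compare(timestamp, other.timestamp);
    }
}

// Set semantics on top of a priority queue: the dedup set guards every insert and removal.
class DedupTimerQueue {
    private final Set<SimpleTimer> dedup = new HashSet<>();
    private final PriorityQueue<SimpleTimer> heap = new PriorityQueue<>();

    // Returns true only if the timer was newly inserted (Flink's add instead
    // reports whether the head element may have changed).
    boolean add(SimpleTimer t) {
        return dedup.add(t) && heap.add(t);
    }

    boolean remove(SimpleTimer t) {
        return dedup.remove(t) && heap.remove(t);
    }

    int size() {
        return heap.size();
    }
}
```

Registering the same (timestamp, key, namespace) twice leaves a single entry in the heap. The same timestamp under a different key counts as a separate timer.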
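The logic of HeapPriorityQueueSet.add is straightforward. It first looks the timer up in a deduplication map (one map per key group, obtained via getDedupMapForElement). The timer is passed on to super.add(element) and inserted into the heap only when no equal timer is present, where "equal" means the same timestamp, key and namespace. remove likewise touches the heap only if the map actually contained the element. Next, here is the implementation of super.add(element):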
@Override
public boolean add(@Nonnull T toAdd) {
    addInternal(toAdd);
    return toAdd.getInternalIndex() == getHeadElementIndex();
}

@Override
protected void addInternal(@Nonnull T element) {
    final int newSize = increaseSizeByOne();
    moveElementToIdx(element, newSize);
    siftUp(newSize);
}

// sift-up: restore the min-heap property after an insert
private void siftUp(int idx) {
    final T[] heap = this.queue;
    final T currentElement = heap[idx];
    int parentIdx = idx >>> 1;
    while (parentIdx > 0 && isElementPriorityLessThen(currentElement, heap[parentIdx])) {
        moveElementToIdx(heap[parentIdx], idx);
        idx = parentIdx;
        parentIdx >>>= 1;
    }
    moveElementToIdx(currentElement, idx);
}
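The key method is siftUp. It is not a full heap sort. Instead, it restores the min-heap property after an insertion. The new timer is first placed at the end of the array and is then moved up, swapping with its parent as long as it is earlier than the parent. As a result, the earliest timer always sits at the head of the queue and fires first. Note that the heap is 1-indexed: the head is at index 1 and the parent of index i is i >>> 1, which is why the loop stops when parentIdx reaches 0. Because sift-up only walks a single path from a leaf towards the root, an insert costs O(log n). The add path is now clear: check for a duplicate, append the timer, then sift it up so that the earliest timer ends up at the head.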
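To make the 1-indexed sift-up concrete, here is a minimal min-heap of `long` timestamps modelled on the code above. The class `TimestampMinHeap` is hypothetical. It compares raw timestamps directly, whereas Flink goes through a comparator on the timer's timestamp. Slot 0 of the array is left unused, just as in Flink's layout.

```java
import java.util.Arrays;

// Hypothetical 1-indexed min-heap of timestamps, modelled on addInternal/siftUp.
// Slot 0 is unused so that the parent of idx is idx >>> 1.
class TimestampMinHeap {
    private long[] heap = new long[8];
    private int size = 0;

    // Returns true when the new element ended up at the head (index 1),
    // analogous to toAdd.getInternalIndex() == getHeadElementIndex().
    boolean add(long ts) {
        if (size + 1 >= heap.length) {
            heap = Arrays.copyOf(heap, heap.length * 2);
        }
        heap[++size] = ts;        // place the new element at the end of the array
        return siftUp(size) == 1;
    }

    // Moves the element at idx up while it is smaller than its parent; returns its final index.
    private int siftUp(int idx) {
        long current = heap[idx];
        int parentIdx = idx >>> 1;
        while (parentIdx > 0 && current < heap[parentIdx]) {
            heap[idx] = heap[parentIdx]; // shift the parent down one level
            idx = parentIdx;
            parentIdx >>>= 1;
        }
        heap[idx] = current;
        return idx;
    }

    long peek() {
        if (size == 0) throw new IllegalStateException("empty heap");
        return heap[1];
    }

    int size() {
        return size;
    }
}
```

Adding 50, 80, 20 and 30 returns true only for 50 and 20, the two inserts that became the new earliest timer. This is the same "did the head change" signal that registerProcessingTimeTimer relies on.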
Next, let's look at how timers are fired, starting with event time in InternalTimerServiceImpl.java:
public void advanceWatermark(long time) throws Exception {
    currentWatermark = time;
    InternalTimer<K, N> timer;
    while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
        eventTimeTimersQueue.poll();
        keyContext.setCurrentKey(timer.getKey());
        triggerTarget.onEventTime(timer);
    }
}
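When a watermark arrives, advanceWatermark is called. It peeks at the first timer in eventTimeTimersQueue (the heap guarantees that this is always the earliest timer) and compares its timestamp with the watermark. If the timestamp is less than or equal to the watermark, the timer is removed from the queue, the current key is set to the timer's key, and triggerTarget.onEventTime(timer) is called. For a keyed process operator this ends up invoking the user's onTimer method in the ProcessFunction. The loop repeats until the head timer is later than the watermark or the queue is empty.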
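The firing loop can be reproduced in a few lines. This is a simplified sketch: the hypothetical class `EventTimeTimers` uses `java.util.PriorityQueue` in place of Flink's timer heap, and "firing" only records the timestamp instead of calling onTimer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model of advanceWatermark: timers are plain timestamps and
// firing appends to a list instead of calling triggerTarget.onEventTime(timer).
class EventTimeTimers {
    private final PriorityQueue<Long> queue = new PriorityQueue<>();
    final List<Long> fired = new ArrayList<>();
    long currentWatermark = Long.MIN_VALUE;

    void register(long time) {
        queue.add(time);
    }

    void advanceWatermark(long time) {
        currentWatermark = time;
        Long head;
        // Fire every timer whose timestamp is <= the new watermark, earliest first.
        while ((head = queue.peek()) != null && head <= time) {
            queue.poll();
            fired.add(head);
        }
    }

    int pending() {
        return queue.size();
    }
}
```

With timers at 30, 10, 20 and 40, a watermark of 20 fires 10 and 20 in that order. A later watermark of 25 fires nothing, and a watermark of 100 fires the remaining 30 and 40.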
Removing the head timer with poll() causes the min-heap in eventTimeTimersQueue to be re-ordered:
public T poll() {
    return size() > 0 ? removeInternal(getHeadElementIndex()) : null;
}

@Override
protected T removeInternal(int removeIdx) {
    T[] heap = this.queue;
    T removedValue = heap[removeIdx];
    assert removedValue.getInternalIndex() == removeIdx;
    final int oldSize = size;
    if (removeIdx != oldSize) {
        T element = heap[oldSize];
        moveElementToIdx(element, removeIdx);
        adjustElementAtIndex(element, removeIdx);
    }
    heap[oldSize] = null;
    --size;
    return removedValue;
}

private void adjustElementAtIndex(T element, int index) {
    siftDown(index);
    if (queue[index] == element) {
        siftUp(index);
    }
}

private void siftUp(int idx) {
    final T[] heap = this.queue;
    final T currentElement = heap[idx];
    int parentIdx = idx >>> 1;
    while (parentIdx > 0 && isElementPriorityLessThen(currentElement, heap[parentIdx])) {
        moveElementToIdx(heap[parentIdx], idx);
        idx = parentIdx;
        parentIdx >>>= 1;
    }
    moveElementToIdx(currentElement, idx);
}

private void siftDown(int idx) {
    final T[] heap = this.queue;
    final int heapSize = this.size;
    final T currentElement = heap[idx];
    int firstChildIdx = idx << 1;
    int secondChildIdx = firstChildIdx + 1;
    if (isElementIndexValid(secondChildIdx, heapSize) &&
            isElementPriorityLessThen(heap[secondChildIdx], heap[firstChildIdx])) {
        firstChildIdx = secondChildIdx;
    }
    while (isElementIndexValid(firstChildIdx, heapSize) &&
            isElementPriorityLessThen(heap[firstChildIdx], currentElement)) {
        moveElementToIdx(heap[firstChildIdx], idx);
        idx = firstChildIdx;
        firstChildIdx = idx << 1;
        secondChildIdx = firstChildIdx + 1;
        if (isElementIndexValid(secondChildIdx, heapSize) &&
                isElementPriorityLessThen(heap[secondChildIdx], heap[firstChildIdx])) {
            firstChildIdx = secondChildIdx;
        }
    }
    moveElementToIdx(currentElement, idx);
}
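In short, poll removes the head timer and moves the last element of the array into the head slot. adjustElementAtIndex then sifts that element down, swapping it with its smaller child as long as the child is earlier, until the heap property holds again. The earliest remaining timer thus ends up at the head, again at a cost of O(log n). The extra siftUp in adjustElementAtIndex only matters when an element is removed from the middle of the heap, for example when a timer is deleted; at the head index it changes nothing.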
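Here is a self-contained sketch of the removal path: a hypothetical 1-indexed `PollableMinHeap` of timestamps with `add` (sift-up) and `poll` (move last to root, then sift-down). One small difference from Flink is deliberate. This version decrements `size` before sifting down, so the vacated last slot is never compared, whereas Flink sifts first and nulls the slot afterwards.

```java
import java.util.Arrays;

// Hypothetical 1-indexed min-heap of timestamps with poll(), modelled on
// removeInternal + siftDown above. Slot 0 is unused.
class PollableMinHeap {
    private long[] heap = new long[8];
    private int size = 0;

    void add(long ts) {
        if (size + 1 >= heap.length) {
            heap = Arrays.copyOf(heap, heap.length * 2);
        }
        heap[++size] = ts;
        siftUp(size);
    }

    // Removes and returns the earliest timestamp.
    long poll() {
        if (size == 0) throw new IllegalStateException("empty heap");
        long removed = heap[1];
        heap[1] = heap[size]; // move the last element to the root...
        size--;
        if (size > 0) {
            siftDown(1);      // ...and push it down until both children are not earlier
        }
        return removed;
    }

    int size() {
        return size;
    }

    private void siftUp(int idx) {
        long current = heap[idx];
        int parentIdx = idx >>> 1;
        while (parentIdx > 0 && current < heap[parentIdx]) {
            heap[idx] = heap[parentIdx];
            idx = parentIdx;
            parentIdx >>>= 1;
        }
        heap[idx] = current;
    }

    private void siftDown(int idx) {
        long current = heap[idx];
        int child = idx << 1;
        while (child <= size) {
            // Pick the earlier of the two children.
            if (child + 1 <= size && heap[child + 1] < heap[child]) {
                child++;
            }
            if (heap[child] >= current) {
                break;
            }
            heap[idx] = heap[child]; // pull the earlier child up one level
            idx = child;
            child = idx << 1;
        }
        heap[idx] = current;
    }
}
```

Polling repeatedly drains the timers in timestamp order. That is exactly what the while loop in advanceWatermark depends on.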
That covers how event-time timers fire. Now let's look at processing time, again in InternalTimerServiceImpl.java:
@Override
public void onProcessingTime(long time) throws Exception {
    // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
    // inside the callback.
    nextTimer = null;
    InternalTimer<K, N> timer;
    while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
        processingTimeTimersQueue.poll();
        keyContext.setCurrentKey(timer.getKey());
        triggerTarget.onProcessingTime(timer);
    }
    if (timer != null && nextTimer == null) {
        nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
    }
}
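Processing-time timers are driven by physical, wall-clock timers scheduled through the ProcessingTimeService rather than by watermarks. processingTimeTimersQueue itself works exactly like eventTimeTimersQueue, so we won't repeat that part. When the physical timer fires, onProcessingTime clears nextTimer and fires every queued timer whose timestamp is at or before the current time. After that loop, if timers remain, it registers one new physical timer for the earliest remaining timer (the head the loop stopped at), unless a callback has already registered one. Only one physical timer is therefore kept pending, always for the head of the queue, so the next timer fires on time.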
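The interplay between registration and firing can be simulated without a real clock. This sketch is based on stated assumptions. The hypothetical class `ProcessingTimeTimers` keeps timers as plain timestamps, and `scheduledAt` stands in for nextTimer, the one physical timer that the ProcessingTimeService would fire. The test calls `onProcessingTime` by hand with a chosen wall-clock time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model of registerProcessingTimeTimer + onProcessingTime (not Flink API).
class ProcessingTimeTimers {
    private final PriorityQueue<Long> queue = new PriorityQueue<>();
    final List<Long> fired = new ArrayList<>();
    Long scheduledAt = null; // stands in for nextTimer

    void register(long time) {
        Long oldHead = queue.peek();
        queue.add(time);
        if (time < (oldHead != null ? oldHead : Long.MAX_VALUE)) {
            scheduledAt = time; // cancel the old physical timer and arm an earlier one
        }
    }

    // Called when the physical timer fires at wall-clock time 'time'.
    void onProcessingTime(long time) {
        scheduledAt = null;
        Long head;
        while ((head = queue.peek()) != null && head <= time) {
            queue.poll();
            fired.add(head); // stands in for triggerTarget.onProcessingTime(timer)
        }
        // Arm a single physical timer for the earliest timer that is not yet due.
        if (head != null && scheduledAt == null) {
            scheduledAt = head;
        }
    }
}
```

After registering 100, 300 and 200, only 100 is armed. Firing at 150 runs the timer at 100 and re-arms at 200, and firing at 400 drains the rest and leaves nothing armed.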
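To summarize: in a ProcessFunction, event-time timers are fired by watermarks, while processing-time timers are fired by physical timers scheduled through the ProcessingTimeService. In both cases, a new timer is deduplicated and then inserted into a min-heap, which keeps the earliest timer at the head of the queue so that it fires first.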