Preface
This article takes a look at Flink's AbstractTtlState.
InternalKvState
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/internal/InternalKvState.java
/**
* The {@code InternalKvState} is the root of the internal state type hierarchy, similar to the
* {@link State} being the root of the public API state hierarchy.
*
* <p>The internal state classes give access to the namespace getters and setters and access to
* additional functionality, like raw value access or state merging.
*
* <p>The public API state hierarchy is intended to be programmed against by Flink applications.
* The internal state hierarchy holds all the auxiliary methods that are used by the runtime and not
* intended to be used by user applications. These internal methods are considered of limited use to users and
* only confusing, and are usually not regarded as stable across releases.
*
* <p>Each specific type in the internal state hierarchy extends the type from the public
* state hierarchy:
*
* <pre>
*       State
*         |
*         +-------------------InternalKvState
*         |                         |
*    MergingState                   |
*         |                         |
*         +-----------------InternalMergingState
*         |                         |
*    +--------+------+              |
*    |               |              |
* ReducingState    ListState        +-----+-----------------+
*    |               |                    |                 |
*    +-----------+   +-----------   -----------------InternalListState
*                |                        |
*                +---------InternalReducingState
* </pre>
*
* @param <K> The type of key the state is associated to
* @param <N> The type of the namespace
* @param <V> The type of values kept internally in state
*/
public interface InternalKvState<K, N, V> extends State {
    TypeSerializer<K> getKeySerializer();

    TypeSerializer<N> getNamespaceSerializer();

    TypeSerializer<V> getValueSerializer();

    void setCurrentNamespace(N namespace);

    byte[] getSerializedValue(
            final byte[] serializedKeyAndNamespace,
            final TypeSerializer<K> safeKeySerializer,
            final TypeSerializer<N> safeNamespaceSerializer,
            final TypeSerializer<V> safeValueSerializer) throws Exception;
}
- The InternalKvState interface defines the methods that every internal keyed state must implement: getKeySerializer, getNamespaceSerializer, getValueSerializer, setCurrentNamespace, and getSerializedValue.
AbstractTtlState
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/AbstractTtlState.java
/**
* Base class for TTL logic wrappers of state objects.
*
* @param <K> The type of key the state is associated to
* @param <N> The type of the namespace
* @param <SV> The type of values kept internally in state without TTL
* @param <TTLSV> The type of values kept internally in state with TTL
* @param <S> Type of originally wrapped state object
*/
abstract class AbstractTtlState<K, N, SV, TTLSV, S extends InternalKvState<K, N, TTLSV>>
        extends AbstractTtlDecorator<S>
        implements InternalKvState<K, N, SV> {
    private final TypeSerializer<SV> valueSerializer;

    AbstractTtlState(S original, StateTtlConfig config, TtlTimeProvider timeProvider, TypeSerializer<SV> valueSerializer) {
        super(original, config, timeProvider);
        this.valueSerializer = valueSerializer;
    }

    <SE extends Throwable, CE extends Throwable, T> T getWithTtlCheckAndUpdate(
            SupplierWithException<TtlValue<T>, SE> getter,
            ThrowingConsumer<TtlValue<T>, CE> updater) throws SE, CE {
        return getWithTtlCheckAndUpdate(getter, updater, original::clear);
    }

    @Override
    public TypeSerializer<K> getKeySerializer() {
        return original.getKeySerializer();
    }

    @Override
    public TypeSerializer<N> getNamespaceSerializer() {
        return original.getNamespaceSerializer();
    }

    @Override
    public TypeSerializer<SV> getValueSerializer() {
        return valueSerializer;
    }

    @Override
    public void setCurrentNamespace(N namespace) {
        original.setCurrentNamespace(namespace);
    }

    @Override
    public byte[] getSerializedValue(
            byte[] serializedKeyAndNamespace,
            TypeSerializer<K> safeKeySerializer,
            TypeSerializer<N> safeNamespaceSerializer,
            TypeSerializer<SV> safeValueSerializer) {
        throw new FlinkRuntimeException("Queryable state is not currently supported with TTL.");
    }

    @Override
    public void clear() {
        original.clear();
    }
}
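- AbstractTtlState implements the InternalKvState interface and extends AbstractTtlDecorator. Most of its methods delegate to the wrapped original state; getSerializedValue instead throws a FlinkRuntimeException, because queryable state is not supported with TTL. It also provides a two-argument getWithTtlCheckAndUpdate method, which calls AbstractTtlDecorator's getWithTtlCheckAndUpdate with original::clear as the stateClear callback to implement the TTL logic.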
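To make the wrapping idea concrete, here is a minimal standalone sketch that does not depend on Flink. All class names (Stamped, InnerValueStore, TtlValueWrapperSketch) and the manually advanced `now` field are hypothetical stand-ins invented for illustration; they only mirror the idea that callers work with the plain value type (SV) while the wrapped state holds a timestamped value (TTLSV).

```java
/** Hypothetical stand-in for a timestamped value: user value plus last-access time. */
final class Stamped<T> {
    final T userValue;
    final long lastAccessTs;

    Stamped(T userValue, long lastAccessTs) {
        this.userValue = userValue;
        this.lastAccessTs = lastAccessTs;
    }
}

/** Hypothetical inner state; it plays the role of the wrapped `original`. */
final class InnerValueStore<T> {
    private Stamped<T> current;

    Stamped<T> value() { return current; }
    void update(Stamped<T> v) { current = v; }
    void clear() { current = null; }
}

/** Hypothetical wrapper: callers see T, the inner store holds Stamped<T>. */
final class TtlValueWrapperSketch<T> {
    private final InnerValueStore<T> original;
    private final long ttlMillis;
    long now; // manually advanced clock, for illustration only

    TtlValueWrapperSketch(InnerValueStore<T> original, long ttlMillis) {
        this.original = original;
        this.ttlMillis = ttlMillis;
    }

    /** Reads through the wrapper: expired entries are cleared and hidden. */
    T value() {
        Stamped<T> stamped = original.value();
        if (stamped == null) {
            return null;
        }
        if (stamped.lastAccessTs + ttlMillis <= now) {
            original.clear();
            return null;
        }
        return stamped.userValue;
    }

    /** Writes wrap the user value with the current timestamp. */
    void update(T value) {
        original.update(new Stamped<>(value, now));
    }

    void clear() {
        original.clear();
    }
}
```

The sketch hard-codes one policy (never return expired values, never refresh the timestamp on read); the configurable version of that decision is what the decorator described next adds.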
AbstractTtlDecorator
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
/**
* Base class for TTL logic wrappers.
*
* @param <T> Type of originally wrapped object
*/
abstract class AbstractTtlDecorator<T> {
    /** Wrapped original state handler. */
    final T original;

    final StateTtlConfig config;

    final TtlTimeProvider timeProvider;

    /** Whether to renew expiration timestamp on state read access. */
    final boolean updateTsOnRead;

    /** Whether to renew expiration timestamp on state read access. */
    final boolean returnExpired;

    /** State value time to live in milliseconds. */
    final long ttl;

    AbstractTtlDecorator(
            T original,
            StateTtlConfig config,
            TtlTimeProvider timeProvider) {
        Preconditions.checkNotNull(original);
        Preconditions.checkNotNull(config);
        Preconditions.checkNotNull(timeProvider);
        this.original = original;
        this.config = config;
        this.timeProvider = timeProvider;
        this.updateTsOnRead = config.getUpdateType() == StateTtlConfig.UpdateType.OnReadAndWrite;
        this.returnExpired = config.getStateVisibility() == StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp;
        this.ttl = config.getTtl().toMilliseconds();
    }

    <V> V getUnexpired(TtlValue<V> ttlValue) {
        return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
    }

    <V> boolean expired(TtlValue<V> ttlValue) {
        return TtlUtils.expired(ttlValue, ttl, timeProvider);
    }

    <V> TtlValue<V> wrapWithTs(V value) {
        return TtlUtils.wrapWithTs(value, timeProvider.currentTimestamp());
    }

    <V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) {
        return wrapWithTs(ttlValue.getUserValue());
    }

    <SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> V getWithTtlCheckAndUpdate(
            SupplierWithException<TtlValue<V>, SE> getter,
            ThrowingConsumer<TtlValue<V>, CE> updater,
            ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
        TtlValue<V> ttlValue = getWrappedWithTtlCheckAndUpdate(getter, updater, stateClear);
        return ttlValue == null ? null : ttlValue.getUserValue();
    }

    <SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> TtlValue<V> getWrappedWithTtlCheckAndUpdate(
            SupplierWithException<TtlValue<V>, SE> getter,
            ThrowingConsumer<TtlValue<V>, CE> updater,
            ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
        TtlValue<V> ttlValue = getter.get();
        if (ttlValue == null) {
            return null;
        } else if (expired(ttlValue)) {
            stateClear.run();
            if (!returnExpired) {
                return null;
            }
        } else if (updateTsOnRead) {
            updater.accept(rewrapWithNewTs(ttlValue));
        }
        return ttlValue;
    }
}
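- AbstractTtlDecorator encapsulates the TTL logic, most of which lives in getWrappedWithTtlCheckAndUpdate. On every access it returns null if the stored value is null; otherwise it checks whether the value has expired (TtlUtils.expired(ttlValue, ttl, timeProvider)). If the value has expired, it calls stateClear (a ThrowingRunnable, here original::clear) and returns null unless returnExpired is set, in which case the expired value is still returned. If the value has not expired and updateTsOnRead is set, it calls updater with the value rewrapped with the current timestamp. In every remaining case it returns the ttlValue that was read.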
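The branch structure described above can be exercised outside of Flink with a small sketch. Everything here is an assumption made for illustration: the class name TtlReadSketch, the nested Entry type, the manually advanced `now` field, and the use of plain java.util.function interfaces in place of Flink's exception-aware ones. It also uses a naive `ts + ttl` comparison, so it assumes small timestamps.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical, Flink-free model of the read path with TTL check and update. */
final class TtlReadSketch {

    /** Hypothetical stand-in for a TTL-wrapped value. */
    static final class Entry<V> {
        final V userValue;
        final long lastAccessTs;

        Entry(V userValue, long lastAccessTs) {
            this.userValue = userValue;
            this.lastAccessTs = lastAccessTs;
        }
    }

    private final long ttl;
    private final boolean updateTsOnRead;
    private final boolean returnExpired;
    long now; // manually advanced clock, for illustration only

    TtlReadSketch(long ttl, boolean updateTsOnRead, boolean returnExpired) {
        this.ttl = ttl;
        this.updateTsOnRead = updateTsOnRead;
        this.returnExpired = returnExpired;
    }

    <V> Entry<V> read(
            Supplier<Entry<V>> getter,
            Consumer<Entry<V>> updater,
            Runnable stateClear) {
        Entry<V> entry = getter.get();
        if (entry == null) {
            return null;                      // nothing stored
        } else if (entry.lastAccessTs + ttl <= now) {
            stateClear.run();                 // expired: always clear
            if (!returnExpired) {
                return null;                  // and hide it unless configured otherwise
            }
        } else if (updateTsOnRead) {
            // still alive: refresh the timestamp in the store
            updater.accept(new Entry<>(entry.userValue, now));
        }
        return entry;                         // the entry as it was read
    }
}
```

Note that in the refresh branch the caller still receives the entry that was read, not the rewrapped one; only the store sees the new timestamp.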
TtlUtils.expired
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/TtlUtils.java
/** Common functions related to State TTL. */
class TtlUtils {
    static <V> boolean expired(@Nullable TtlValue<V> ttlValue, long ttl, TtlTimeProvider timeProvider) {
        return expired(ttlValue, ttl, timeProvider.currentTimestamp());
    }

    static <V> boolean expired(@Nullable TtlValue<V> ttlValue, long ttl, long currentTimestamp) {
        return ttlValue != null && expired(ttlValue.getLastAccessTimestamp(), ttl, currentTimestamp);
    }

    private static boolean expired(long ts, long ttl, long currentTimestamp) {
        return getExpirationTimestamp(ts, ttl) <= currentTimestamp;
    }

    private static long getExpirationTimestamp(long ts, long ttl) {
        long ttlWithoutOverflow = ts > 0 ? Math.min(Long.MAX_VALUE - ts, ttl) : ttl;
        return ts + ttlWithoutOverflow;
    }

    //......
}
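- TtlUtils.expired computes the expiration timestamp via getExpirationTimestamp and compares it with currentTimestamp; a value counts as expired once its expiration timestamp is less than or equal to the current timestamp, and a null ttlValue is never considered expired. getExpirationTimestamp adds ttl to ttlValue.getLastAccessTimestamp(); when the timestamp is positive, ttl is capped at Long.MAX_VALUE - ts so that the sum cannot overflow the range of long.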
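The overflow guard is easiest to see with concrete numbers. The following standalone sketch copies the arithmetic shown above into a hypothetical class named ExpirationSketch (the class and method names are made up for this example) so that it can be run without Flink.

```java
/** Hypothetical, Flink-free copy of the expiration arithmetic. */
final class ExpirationSketch {

    /** Adds ttl to ts, capping ttl so that the sum never exceeds Long.MAX_VALUE. */
    static long expirationTimestamp(long ts, long ttl) {
        long ttlWithoutOverflow = ts > 0 ? Math.min(Long.MAX_VALUE - ts, ttl) : ttl;
        return ts + ttlWithoutOverflow;
    }

    /** A value is expired once its expiration timestamp is at or before `now`. */
    static boolean expired(long ts, long ttl, long now) {
        return expirationTimestamp(ts, ttl) <= now;
    }
}
```

With ts = Long.MAX_VALUE - 5 and ttl = 100, a naive ts + ttl wraps around to a negative number, which would make the value look expired immediately; the capped version returns Long.MAX_VALUE instead. For ordinary values the cap has no effect: a value written at 1000 with a ttl of 500 is still alive at 1499 and expired at 1500.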
ThrowingRunnable
flink-core-1.7.0-sources.jar!/org/apache/flink/util/function/ThrowingRunnable.java
/**
* Similar to a {@link Runnable}, this interface is used to capture a block of code
* to be executed. In contrast to {@code Runnable}, this interface allows throwing
* checked exceptions.
*/
@PublicEvolving
@FunctionalInterface
public interface ThrowingRunnable<E extends Throwable> {
    /**
     * The work method.
     *
     * @throws E Exceptions may be thrown.
     */
    void run() throws E;

    /**
     * Converts a {@link ThrowingRunnable} into a {@link Runnable} which throws all checked exceptions
     * as unchecked.
     *
     * @param throwingRunnable to convert into a {@link Runnable}
     * @return {@link Runnable} which throws all checked exceptions as unchecked.
     */
    static Runnable unchecked(ThrowingRunnable<?> throwingRunnable) {
        return () -> {
            try {
                throwingRunnable.run();
            } catch (Throwable t) {
                ExceptionUtils.rethrow(t);
            }
        };
    }
}
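- stateClear is a ThrowingRunnable. Unlike Runnable, ThrowingRunnable is allowed to throw checked exceptions. It also provides a static unchecked method that converts a ThrowingRunnable into a Runnable: any Throwable that is neither an Error nor a RuntimeException is wrapped in a RuntimeException and rethrown.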
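The conversion can be sketched without Flink as follows. The names UncheckedSketch and CheckedRunnable are hypothetical, and the catch blocks spell out the rethrow behaviour described above rather than calling Flink's ExceptionUtils, so treat this as an approximation of the idea, not the library code.

```java
/** Hypothetical, Flink-free sketch of turning a throwing runnable into a Runnable. */
final class UncheckedSketch {

    /** Hypothetical counterpart of a runnable that may throw checked exceptions. */
    @FunctionalInterface
    interface CheckedRunnable<E extends Throwable> {
        void run() throws E;
    }

    static Runnable unchecked(CheckedRunnable<?> runnable) {
        return () -> {
            try {
                runnable.run();
            } catch (Error | RuntimeException e) {
                throw e; // already unchecked: pass through unchanged
            } catch (Throwable t) {
                // checked exception: wrap it so that Runnable.run() can propagate it
                throw new RuntimeException(t.getMessage(), t);
            }
        };
    }
}
```

A caller can then hand code that throws, say, an IOException to any API that only accepts a Runnable, and recover the original exception from getCause() if it needs to.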
Summary
- The InternalKvState interface defines the methods that every internal keyed state must implement: getKeySerializer, getNamespaceSerializer, getValueSerializer, setCurrentNamespace, and getSerializedValue.
- AbstractTtlState implements the InternalKvState interface and extends AbstractTtlDecorator; its getWithTtlCheckAndUpdate method calls AbstractTtlDecorator's getWithTtlCheckAndUpdate to implement the TTL logic.
- On every access, AbstractTtlDecorator's getWrappedWithTtlCheckAndUpdate checks whether a non-null value has expired (TtlUtils.expired(ttlValue, ttl, timeProvider)). If it has, stateClear (a ThrowingRunnable, here original::clear) is called and null is returned unless returnExpired is set. If it has not expired and updateTsOnRead is set, updater is called with a refreshed timestamp. Otherwise the ttlValue that was read is returned.