Preface
This article takes a look at Flink's OperatorStateBackend.
OperatorStateBackend
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/OperatorStateBackend.java
/**
* Interface that combines both, the user facing {@link OperatorStateStore} interface and the system interface
* {@link Snapshotable}
*
*/
public interface OperatorStateBackend extends
OperatorStateStore,
Snapshotable<SnapshotResult<OperatorStateHandle>, Collection<OperatorStateHandle>>,
Closeable,
Disposable {
@Override
void dispose();
}
- The OperatorStateBackend interface extends the OperatorStateStore, Snapshotable, Closeable and Disposable interfaces
OperatorStateStore
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/OperatorStateStore.java
/**
* This interface contains methods for registering operator state with a managed store.
*/
@PublicEvolving
public interface OperatorStateStore {
<K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception;
<S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
<S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
Set<String> getRegisteredStateNames();
Set<String> getRegisteredBroadcastStateNames();
// -------------------------------------------------------------------------------------------
// Deprecated methods
// -------------------------------------------------------------------------------------------
@Deprecated
<S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception;
@Deprecated
<T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception;
}
- OperatorStateStore defines getBroadcastState, getListState and getUnionListState, which create (or restore) a BroadcastState or a ListState; it also defines getRegisteredStateNames and getRegisteredBroadcastStateNames, which return the names of the currently registered states
Snapshotable
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/Snapshotable.java
/**
* Interface for operators that can perform snapshots of their state.
*
* @param <S> Generic type of the state object that is created as handle to snapshots.
* @param <R> Generic type of the state object that used in restore.
*/
@Internal
public interface Snapshotable<S extends StateObject, R> extends SnapshotStrategy<S> {
/**
* Restores state that was previously snapshotted from the provided parameters. Typically the parameters are state
* handles from which the old state is read.
*
* @param state the old state to restore.
*/
void restore(@Nullable R state) throws Exception;
}
- The Snapshotable interface extends SnapshotStrategy and additionally defines restore, which restores previously snapshotted state
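To make the snapshot/restore split concrete, here is a minimal, self-contained sketch. `SimpleSnapshotable` and `InMemoryListBackend` are hypothetical names, not Flink classes, and plain `List`s stand in for state handles. The restore side takes a collection of snapshots, mirroring how OperatorStateBackend binds the restore type to `Collection<OperatorStateHandle>` (for example, after rescaling one instance may receive several handles).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical simplification of the snapshot/restore contract; NOT Flink's API.
// S = type produced by a snapshot, R = type consumed by restore (like Snapshotable<S, R>).
interface SimpleSnapshotable<S, R> {
    S snapshot();

    void restore(R state);
}

// Hypothetical backend that keeps a single list of strings in memory.
class InMemoryListBackend implements SimpleSnapshotable<List<String>, List<List<String>>> {
    private final List<String> elements = new ArrayList<>();

    void add(String value) {
        elements.add(value);
    }

    List<String> elements() {
        return Collections.unmodifiableList(elements);
    }

    @Override
    public List<String> snapshot() {
        // Copy, so that writes after the snapshot point do not leak into the snapshot.
        return Collections.unmodifiableList(new ArrayList<>(elements));
    }

    @Override
    public void restore(List<List<String>> handles) {
        // As with Snapshotable#restore, null means "nothing to restore".
        elements.clear();
        if (handles == null) {
            return;
        }
        // Several snapshots may be handed to one instance; merge them all.
        for (List<String> handle : handles) {
            elements.addAll(handle);
        }
    }
}
```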
SnapshotStrategy
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/SnapshotStrategy.java
/**
* Interface for different snapshot approaches in state backends. Implementing classes should ideally be stateless or at
* least threadsafe, i.e. this is a functional interface and is can be called in parallel by multiple checkpoints.
*
* @param <S> type of the returned state object that represents the result of the snapshot operation.
*/
@Internal
public interface SnapshotStrategy<S extends StateObject> {
/**
* Operation that writes a snapshot into a stream that is provided by the given {@link CheckpointStreamFactory} and
* returns a @{@link RunnableFuture} that gives a state handle to the snapshot. It is up to the implementation if
* the operation is performed synchronous or asynchronous. In the later case, the returned Runnable must be executed
* first before obtaining the handle.
*
* @param checkpointId The ID of the checkpoint.
* @param timestamp The timestamp of the checkpoint.
* @param streamFactory The factory that we can use for writing our state to streams.
* @param checkpointOptions Options for how to perform this checkpoint.
* @return A runnable future that will yield a {@link StateObject}.
*/
@Nonnull
RunnableFuture<S> snapshot(
long checkpointId,
long timestamp,
@Nonnull CheckpointStreamFactory streamFactory,
@Nonnull CheckpointOptions checkpointOptions) throws Exception;
}
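- SnapshotStrategy defines the snapshot method for different snapshot strategies to implement; its type parameter S is bounded by StateObject, so the result delivered through the returned RunnableFuture is always a StateObject. Whether the work happens synchronously or asynchronously is up to the implementation; in the asynchronous case the returned future must be run before the handle can be obtained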
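The sync/async contract of `snapshot` can be sketched without Flink: copy the live state synchronously, then wrap the (possibly slow) writing in a `FutureTask` and run it inline only when asynchronous snapshots are disabled, which is the same pattern DefaultOperatorStateBackend uses further below. `CopyThenWriteStrategy` is a hypothetical class, and "writing" is simulated by joining strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

// Hypothetical sketch of the SnapshotStrategy idea; NOT Flink code.
class CopyThenWriteStrategy {
    private final List<String> liveState = new ArrayList<>();
    private final boolean asynchronous;

    CopyThenWriteStrategy(boolean asynchronous) {
        this.asynchronous = asynchronous;
    }

    void add(String value) {
        liveState.add(value);
    }

    RunnableFuture<String> snapshot(long checkpointId) {
        // Synchronous part: take a copy of the live state right now.
        final List<String> copy = new ArrayList<>(liveState);
        // Deferred part: "write" the copy (simulated by building a string).
        FutureTask<String> task =
                new FutureTask<>(() -> checkpointId + ":" + String.join(",", copy));
        if (!asynchronous) {
            // Synchronous mode: run the task before handing it back.
            task.run();
        }
        return task;
    }
}
```

In asynchronous mode the caller (in Flink, the checkpointing machinery) is responsible for running the returned future on another thread.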
AbstractSnapshotStrategy
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/AbstractSnapshotStrategy.java
/**
* Abstract base class for implementing {@link SnapshotStrategy}, that gives a consistent logging across state backends.
*
* @param <T> type of the snapshot result.
*/
public abstract class AbstractSnapshotStrategy<T extends StateObject> implements SnapshotStrategy<SnapshotResult<T>> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractSnapshotStrategy.class);
private static final String LOG_SYNC_COMPLETED_TEMPLATE = "{} ({}, synchronous part) in thread {} took {} ms.";
private static final String LOG_ASYNC_COMPLETED_TEMPLATE = "{} ({}, asynchronous part) in thread {} took {} ms.";
/** Descriptive name of the snapshot strategy that will appear in the log outputs and {@link #toString()}. */
@Nonnull
protected final String description;
protected AbstractSnapshotStrategy(@Nonnull String description) {
this.description = description;
}
/**
* Logs the duration of the synchronous snapshot part from the given start time.
*/
public void logSyncCompleted(@Nonnull Object checkpointOutDescription, long startTime) {
logCompletedInternal(LOG_SYNC_COMPLETED_TEMPLATE, checkpointOutDescription, startTime);
}
/**
* Logs the duration of the asynchronous snapshot part from the given start time.
*/
public void logAsyncCompleted(@Nonnull Object checkpointOutDescription, long startTime) {
logCompletedInternal(LOG_ASYNC_COMPLETED_TEMPLATE, checkpointOutDescription, startTime);
}
private void logCompletedInternal(
@Nonnull String template,
@Nonnull Object checkpointOutDescription,
long startTime) {
long duration = (System.currentTimeMillis() - startTime);
LOG.debug(
template,
description,
checkpointOutDescription,
Thread.currentThread(),
duration);
}
@Override
public String toString() {
return "SnapshotStrategy {" + description + "}";
}
}
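- AbstractSnapshotStrategy is an abstract class that does not implement the snapshot method defined by SnapshotStrategy; it only provides the logSyncCompleted and logAsyncCompleted helpers, which log the duration of the synchronous and asynchronous parts of a snapshot at debug level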
StateObject
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/StateObject.java
/**
* Base of all handles that represent checkpointed state in some form. The object may hold
* the (small) state directly, or contain a file path (state is in the file), or contain the
* metadata to access the state stored in some external database.
*
* <p>State objects define how to {@link #discardState() discard state} and how to access the
* {@link #getStateSize() size of the state}.
*
* <p>State Objects are transported via RPC between <i>JobManager</i> and
* <i>TaskManager</i> and must be {@link java.io.Serializable serializable} to support that.
*
* <p>Some State Objects are stored in the checkpoint/savepoint metadata. For long-term
* compatibility, they are not stored via {@link java.io.Serializable Java Serialization},
* but through custom serializers.
*/
public interface StateObject extends Serializable {
void discardState() throws Exception;
long getStateSize();
}
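- StateObject extends Serializable because state objects are transported via RPC between the JobManager and the TaskManager; it defines discardState, which releases the resources behind the state, and getStateSize, which returns the size of the state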
StreamStateHandle
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/StreamStateHandle.java
/**
* A {@link StateObject} that represents state that was written to a stream. The data can be read
* back via {@link #openInputStream()}.
*/
public interface StreamStateHandle extends StateObject {
/**
* Returns an {@link FSDataInputStream} that can be used to read back the data that
* was previously written to the stream.
*/
FSDataInputStream openInputStream() throws IOException;
}
- StreamStateHandle extends StateObject and adds the openInputStream method for reading the written data back
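A minimal sketch of what a StateObject/StreamStateHandle pair looks like in practice. `SimpleStateObject`, `SimpleStreamStateHandle` and `BytesStateHandle` are hypothetical stand-ins, not the Flink types, and `java.io.InputStream` replaces Flink's `FSDataInputStream`. The handle keeps its small state directly inside the object, which is one of the options the StateObject javadoc mentions.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;

// Hypothetical mirrors of StateObject / StreamStateHandle; NOT Flink's interfaces.
interface SimpleStateObject extends Serializable {
    void discardState() throws Exception;

    long getStateSize();
}

interface SimpleStreamStateHandle extends SimpleStateObject {
    InputStream openInputStream() throws IOException;
}

// Hypothetical handle holding the state bytes directly (serializable, so it can travel via RPC).
class BytesStateHandle implements SimpleStreamStateHandle {
    private static final long serialVersionUID = 1L;
    private final byte[] data;

    BytesStateHandle(byte[] data) {
        this.data = data.clone();
    }

    @Override
    public void discardState() {
        // Nothing external to delete; a file-backed handle would delete its file here.
    }

    @Override
    public long getStateSize() {
        return data.length;
    }

    @Override
    public InputStream openInputStream() {
        return new ByteArrayInputStream(data);
    }
}
```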
OperatorStateHandle
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/OperatorStateHandle.java
/**
* Interface of a state handle for operator state.
*/
public interface OperatorStateHandle extends StreamStateHandle {
/**
* Returns a map of meta data for all contained states by their name.
*/
Map<String, StateMetaInfo> getStateNameToPartitionOffsets();
/**
* Returns an input stream to read the operator state information.
*/
@Override
FSDataInputStream openInputStream() throws IOException;
/**
* Returns the underlying stream state handle that points to the state data.
*/
StreamStateHandle getDelegateStateHandle();
//......
}
- OperatorStateHandle extends StreamStateHandle and adds getStateNameToPartitionOffsets and getDelegateStateHandle; getStateNameToPartitionOffsets maps each state name to the offsets of its available partitions in the stream
OperatorStreamStateHandle
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/OperatorStreamStateHandle.java
/**
* State handle for partitionable operator state. Besides being a {@link StreamStateHandle}, this also provides a
* map that contains the offsets to the partitions of named states in the stream.
*/
public class OperatorStreamStateHandle implements OperatorStateHandle {
private static final long serialVersionUID = 35876522969227335L;
/**
* unique state name -> offsets for available partitions in the handle stream
*/
private final Map<String, StateMetaInfo> stateNameToPartitionOffsets;
private final StreamStateHandle delegateStateHandle;
public OperatorStreamStateHandle(
Map<String, StateMetaInfo> stateNameToPartitionOffsets,
StreamStateHandle delegateStateHandle) {
this.delegateStateHandle = Preconditions.checkNotNull(delegateStateHandle);
this.stateNameToPartitionOffsets = Preconditions.checkNotNull(stateNameToPartitionOffsets);
}
@Override
public Map<String, StateMetaInfo> getStateNameToPartitionOffsets() {
return stateNameToPartitionOffsets;
}
@Override
public void discardState() throws Exception {
delegateStateHandle.discardState();
}
@Override
public long getStateSize() {
return delegateStateHandle.getStateSize();
}
@Override
public FSDataInputStream openInputStream() throws IOException {
return delegateStateHandle.openInputStream();
}
@Override
public StreamStateHandle getDelegateStateHandle() {
return delegateStateHandle;
}
//......
}
- OperatorStreamStateHandle implements OperatorStateHandle; it holds a stateNameToPartitionOffsets field (Map<String, StateMetaInfo>), which getStateNameToPartitionOffsets simply returns, and it delegates discardState, getStateSize and openInputStream to delegateStateHandle
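The point of the offsets map is that many named states share one underlying stream, and a reader can jump straight to a partition. The sketch below is a hypothetical simplification: `OffsetsHandle` keeps a plain `long[]` per name instead of Flink's `StateMetaInfo` (which also carries the assignment mode), `RawBytesHandle` plays the delegate, and `readIntAt` is an illustrative helper that assumes each partition is a 4-byte int.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

// Hypothetical delegate: owns the bytes of the whole checkpoint stream.
class RawBytesHandle {
    private final byte[] bytes;
    boolean discarded;

    RawBytesHandle(byte[] bytes) {
        this.bytes = bytes.clone();
    }

    void discardState() {
        discarded = true; // a file-backed handle would delete its file here
    }

    long getStateSize() {
        return bytes.length;
    }

    InputStream openInputStream() {
        return new ByteArrayInputStream(bytes);
    }
}

// Hypothetical mirror of OperatorStreamStateHandle's shape; NOT the Flink class.
class OffsetsHandle {
    private final Map<String, long[]> stateNameToPartitionOffsets;
    private final RawBytesHandle delegate;

    OffsetsHandle(Map<String, long[]> offsets, RawBytesHandle delegate) {
        this.stateNameToPartitionOffsets = Objects.requireNonNull(offsets);
        this.delegate = Objects.requireNonNull(delegate);
    }

    Map<String, long[]> getStateNameToPartitionOffsets() {
        return Collections.unmodifiableMap(stateNameToPartitionOffsets);
    }

    // StateObject-style operations are simply forwarded to the delegate.
    void discardState() {
        delegate.discardState();
    }

    long getStateSize() {
        return delegate.getStateSize();
    }

    // Illustrative helper: skip to the recorded offset and read one int partition.
    int readIntAt(String name, int partitionIndex) throws IOException {
        long[] offsets = stateNameToPartitionOffsets.get(name);
        if (offsets == null) {
            throw new IllegalArgumentException("Unknown state: " + name);
        }
        try (DataInputStream in = new DataInputStream(delegate.openInputStream())) {
            long remaining = offsets[partitionIndex];
            while (remaining > 0) {
                long skipped = in.skip(remaining);
                if (skipped <= 0) {
                    throw new EOFException("Offset beyond end of stream");
                }
                remaining -= skipped;
            }
            return in.readInt();
        }
    }
}
```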
SnapshotResult
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/SnapshotResult.java
/**
* This class contains the combined results from the snapshot of a state backend:
* <ul>
* <li>A state object representing the state that will be reported to the Job Manager to acknowledge the checkpoint.</li>
* <li>A state object that represents the state for the {@link TaskLocalStateStoreImpl}.</li>
* </ul>
*
* Both state objects are optional and can be null, e.g. if there was no state to snapshot in the backend. A local
* state object that is not null also requires a state to report to the job manager that is not null, because the
* Job Manager always owns the ground truth about the checkpointed state.
*/
public class SnapshotResult<T extends StateObject> implements StateObject {
private static final long serialVersionUID = 1L;
/** An singleton instance to represent an empty snapshot result. */
private static final SnapshotResult<?> EMPTY = new SnapshotResult<>(null, null);
/** This is the state snapshot that will be reported to the Job Manager to acknowledge a checkpoint. */
private final T jobManagerOwnedSnapshot;
/** This is the state snapshot that will be reported to the Job Manager to acknowledge a checkpoint. */
private final T taskLocalSnapshot;
/**
* Creates a {@link SnapshotResult} for the given jobManagerOwnedSnapshot and taskLocalSnapshot. If the
* jobManagerOwnedSnapshot is null, taskLocalSnapshot must also be null.
*
* @param jobManagerOwnedSnapshot Snapshot for report to job manager. Can be null.
* @param taskLocalSnapshot Snapshot for report to local state manager. This is optional and requires
* jobManagerOwnedSnapshot to be not null if this is not also null.
*/
private SnapshotResult(T jobManagerOwnedSnapshot, T taskLocalSnapshot) {
if (jobManagerOwnedSnapshot == null && taskLocalSnapshot != null) {
throw new IllegalStateException("Cannot report local state snapshot without corresponding remote state!");
}
this.jobManagerOwnedSnapshot = jobManagerOwnedSnapshot;
this.taskLocalSnapshot = taskLocalSnapshot;
}
public T getJobManagerOwnedSnapshot() {
return jobManagerOwnedSnapshot;
}
public T getTaskLocalSnapshot() {
return taskLocalSnapshot;
}
@Override
public void discardState() throws Exception {
Exception aggregatedExceptions = null;
if (jobManagerOwnedSnapshot != null) {
try {
jobManagerOwnedSnapshot.discardState();
} catch (Exception remoteDiscardEx) {
aggregatedExceptions = remoteDiscardEx;
}
}
if (taskLocalSnapshot != null) {
try {
taskLocalSnapshot.discardState();
} catch (Exception localDiscardEx) {
aggregatedExceptions = ExceptionUtils.firstOrSuppressed(localDiscardEx, aggregatedExceptions);
}
}
if (aggregatedExceptions != null) {
throw aggregatedExceptions;
}
}
@Override
public long getStateSize() {
return jobManagerOwnedSnapshot != null ? jobManagerOwnedSnapshot.getStateSize() : 0L;
}
@SuppressWarnings("unchecked")
public static <T extends StateObject> SnapshotResult<T> empty() {
return (SnapshotResult<T>) EMPTY;
}
public static <T extends StateObject> SnapshotResult<T> of(@Nullable T jobManagerState) {
return jobManagerState != null ? new SnapshotResult<>(jobManagerState, null) : empty();
}
public static <T extends StateObject> SnapshotResult<T> withLocalState(
@Nonnull T jobManagerState,
@Nonnull T localState) {
return new SnapshotResult<>(jobManagerState, localState);
}
}
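- SnapshotResult implements StateObject and wraps the result of a snapshot: jobManagerOwnedSnapshot and taskLocalSnapshot. Its constructor rejects a non-null taskLocalSnapshot without a corresponding jobManagerOwnedSnapshot. Its discardState calls discardState on both snapshots, collecting any exceptions and rethrowing them at the end, while getStateSize returns only the state size of jobManagerOwnedSnapshot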
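The two rules SnapshotResult enforces (local state requires remote state; discarding tries both parts and surfaces the first failure) can be reproduced in a few lines. `Discardable` and `MiniSnapshotResult` are hypothetical names; `addSuppressed` is used here as a plain-JDK substitute for Flink's `ExceptionUtils.firstOrSuppressed`.

```java
// Hypothetical minimal state object; NOT Flink's StateObject.
interface Discardable {
    void discardState() throws Exception;

    long getStateSize();
}

// Hypothetical mini version of the SnapshotResult contract; NOT the Flink class.
final class MiniSnapshotResult<T extends Discardable> {
    private final T jobManagerOwned;
    private final T taskLocal;

    private MiniSnapshotResult(T jobManagerOwned, T taskLocal) {
        if (jobManagerOwned == null && taskLocal != null) {
            throw new IllegalStateException(
                    "Cannot report local state snapshot without corresponding remote state!");
        }
        this.jobManagerOwned = jobManagerOwned;
        this.taskLocal = taskLocal;
    }

    static <T extends Discardable> MiniSnapshotResult<T> of(T jobManagerState) {
        return new MiniSnapshotResult<>(jobManagerState, null);
    }

    static <T extends Discardable> MiniSnapshotResult<T> withLocalState(T jobManagerState, T localState) {
        return new MiniSnapshotResult<>(jobManagerState, localState);
    }

    long getStateSize() {
        // Only the JobManager-owned part counts towards the reported size.
        return jobManagerOwned != null ? jobManagerOwned.getStateSize() : 0L;
    }

    void discardState() throws Exception {
        Exception aggregated = null;
        if (jobManagerOwned != null) {
            try {
                jobManagerOwned.discardState();
            } catch (Exception e) {
                aggregated = e;
            }
        }
        if (taskLocal != null) {
            try {
                taskLocal.discardState();
            } catch (Exception e) {
                // Keep the first exception, attach later ones as suppressed.
                if (aggregated == null) {
                    aggregated = e;
                } else {
                    aggregated.addSuppressed(e);
                }
            }
        }
        if (aggregated != null) {
            throw aggregated;
        }
    }
}
```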
DefaultOperatorStateBackend
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
/**
* Default implementation of OperatorStateStore that provides the ability to make snapshots.
*/
@Internal
public class DefaultOperatorStateBackend implements OperatorStateBackend {
private static final Logger LOG = LoggerFactory.getLogger(DefaultOperatorStateBackend.class);
/**
* The default namespace for state in cases where no state name is provided
*/
public static final String DEFAULT_OPERATOR_STATE_NAME = "_default_";
/**
* Map for all registered operator states. Maps state name -> state
*/
private final Map<String, PartitionableListState<?>> registeredOperatorStates;
/**
* Map for all registered operator broadcast states. Maps state name -> state
*/
private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;
/**
* CloseableRegistry to participate in the tasks lifecycle.
*/
private final CloseableRegistry closeStreamOnCancelRegistry;
/**
* Default serializer. Only used for the default operator state.
*/
private final JavaSerializer<Serializable> javaSerializer;
/**
* The user code classloader.
*/
private final ClassLoader userClassloader;
/**
* The execution configuration.
*/
private final ExecutionConfig executionConfig;
/**
* Flag to de/activate asynchronous snapshots.
*/
private final boolean asynchronousSnapshots;
/**
* Map of state names to their corresponding restored state meta info.
*
* <p>TODO this map can be removed when eager-state registration is in place.
* TODO we currently need this cached to check state migration strategies when new serializers are registered.
*/
private final Map<String, StateMetaInfoSnapshot> restoredOperatorStateMetaInfos;
/**
* Map of state names to their corresponding restored broadcast state meta info.
*/
private final Map<String, StateMetaInfoSnapshot> restoredBroadcastStateMetaInfos;
/**
* Cache of already accessed states.
*
* <p>In contrast to {@link #registeredOperatorStates} and {@link #restoredOperatorStateMetaInfos} which may be repopulated
* with restored state, this map is always empty at the beginning.
*
* <p>TODO this map should be moved to a base class once we have proper hierarchy for the operator state backends.
*
* @see <a href="https://issues.apache.org/jira/browse/FLINK-6849">FLINK-6849</a>
*/
private final HashMap<String, PartitionableListState<?>> accessedStatesByName;
private final Map<String, BackendWritableBroadcastState<?, ?>> accessedBroadcastStatesByName;
private final AbstractSnapshotStrategy<OperatorStateHandle> snapshotStrategy;
public DefaultOperatorStateBackend(
ClassLoader userClassLoader,
ExecutionConfig executionConfig,
boolean asynchronousSnapshots) {
this.closeStreamOnCancelRegistry = new CloseableRegistry();
this.userClassloader = Preconditions.checkNotNull(userClassLoader);
this.executionConfig = executionConfig;
this.javaSerializer = new JavaSerializer<>();
this.registeredOperatorStates = new HashMap<>();
this.registeredBroadcastStates = new HashMap<>();
this.asynchronousSnapshots = asynchronousSnapshots;
this.accessedStatesByName = new HashMap<>();
this.accessedBroadcastStatesByName = new HashMap<>();
this.restoredOperatorStateMetaInfos = new HashMap<>();
this.restoredBroadcastStateMetaInfos = new HashMap<>();
this.snapshotStrategy = new DefaultOperatorStateBackendSnapshotStrategy();
}
@Override
public Set<String> getRegisteredStateNames() {
return registeredOperatorStates.keySet();
}
@Override
public Set<String> getRegisteredBroadcastStateNames() {
return registeredBroadcastStates.keySet();
}
@Override
public void close() throws IOException {
closeStreamOnCancelRegistry.close();
}
@Override
public void dispose() {
IOUtils.closeQuietly(closeStreamOnCancelRegistry);
registeredOperatorStates.clear();
registeredBroadcastStates.clear();
}
// -------------------------------------------------------------------------------------------
// State access methods
// -------------------------------------------------------------------------------------------
@SuppressWarnings("unchecked")
@Override
public <K, V> BroadcastState<K, V> getBroadcastState(final MapStateDescriptor<K, V> stateDescriptor) throws StateMigrationException {
//......
}
@Override
public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
return getListState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
}
@Override
public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
return getListState(stateDescriptor, OperatorStateHandle.Mode.UNION);
}
@Nonnull
@Override
public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
long checkpointId,
long timestamp,
@Nonnull CheckpointStreamFactory streamFactory,
@Nonnull CheckpointOptions checkpointOptions) throws Exception {
long syncStartTime = System.currentTimeMillis();
RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshotRunner =
snapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
snapshotStrategy.logSyncCompleted(streamFactory, syncStartTime);
return snapshotRunner;
}
//......
}
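- DefaultOperatorStateBackend implements the OperatorStateBackend interface
- getRegisteredStateNames returns registeredOperatorStates.keySet() and getRegisteredBroadcastStateNames returns registeredBroadcastStates.keySet(); both are backed by in-memory HashMaps
- close calls closeStreamOnCancelRegistry.close(); dispose also closes closeStreamOnCancelRegistry (quietly) and additionally clears registeredOperatorStates and registeredBroadcastStates
- getListState and getUnionListState both delegate to getListState(ListStateDescriptor<S> stateDescriptor, OperatorStateHandle.Mode mode), passing OperatorStateHandle.Mode.SPLIT_DISTRIBUTE and OperatorStateHandle.Mode.UNION respectively
- snapshot delegates to snapshotStrategy, which is a DefaultOperatorStateBackendSnapshotStrategy, and logs the duration of the synchronous part via logSyncCompleted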
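The difference between the two modes shows up when a job is restored with a different parallelism: with SPLIT_DISTRIBUTE the elements of all old subtasks are split among the new subtasks, with UNION every new subtask receives the complete list. The sketch below only illustrates that idea; `AssignmentMode` and `Redistribution` are hypothetical names, and the contiguous-slice assignment is an assumption rather than the exact order Flink's repartitioner uses.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical enum mirroring the two OperatorStateHandle.Mode values used for list state.
enum AssignmentMode { SPLIT_DISTRIBUTE, UNION }

// Simplified illustration of redistribution on rescale; NOT Flink's repartitioner.
final class Redistribution {
    static <T> List<List<T>> redistribute(
            List<List<T>> oldSubtaskStates, int newParallelism, AssignmentMode mode) {
        // Gather every element written by every old subtask.
        List<T> all = new ArrayList<>();
        for (List<T> s : oldSubtaskStates) {
            all.addAll(s);
        }
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            if (mode == AssignmentMode.UNION) {
                // Every new subtask gets the full list.
                result.add(new ArrayList<>(all));
            } else {
                // Each new subtask gets a contiguous, roughly equal-sized slice.
                int from = (int) ((long) all.size() * i / newParallelism);
                int to = (int) ((long) all.size() * (i + 1) / newParallelism);
                result.add(new ArrayList<>(all.subList(from, to)));
            }
        }
        return result;
    }
}
```

Union list state is therefore convenient when every instance needs global information (for example, all source offsets) to decide what it owns, at the cost of shipping the whole list to each instance.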
DefaultOperatorStateBackend.getListState
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
private <S> ListState<S> getListState(
ListStateDescriptor<S> stateDescriptor,
OperatorStateHandle.Mode mode) throws StateMigrationException {
Preconditions.checkNotNull(stateDescriptor);
String name = Preconditions.checkNotNull(stateDescriptor.getName());
@SuppressWarnings("unchecked")
PartitionableListState<S> previous = (PartitionableListState<S>) accessedStatesByName.get(name);
if (previous != null) {
checkStateNameAndMode(
previous.getStateMetaInfo().getName(),
name,
previous.getStateMetaInfo().getAssignmentMode(),
mode);
return previous;
}
// end up here if its the first time access after execution for the
// provided state name; check compatibility of restored state, if any
// TODO with eager registration in place, these checks should be moved to restore()
stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
TypeSerializer<S> partitionStateSerializer = Preconditions.checkNotNull(stateDescriptor.getElementSerializer());
@SuppressWarnings("unchecked")
PartitionableListState<S> partitionableListState = (PartitionableListState<S>) registeredOperatorStates.get(name);
if (null == partitionableListState) {
// no restored state for the state name; simply create new state holder
partitionableListState = new PartitionableListState<>(
new RegisteredOperatorStateBackendMetaInfo<>(
name,
partitionStateSerializer,
mode));
registeredOperatorStates.put(name, partitionableListState);
} else {
// has restored state; check compatibility of new state access
checkStateNameAndMode(
partitionableListState.getStateMetaInfo().getName(),
name,
partitionableListState.getStateMetaInfo().getAssignmentMode(),
mode);
StateMetaInfoSnapshot restoredSnapshot = restoredOperatorStateMetaInfos.get(name);
RegisteredOperatorStateBackendMetaInfo<S> metaInfo =
new RegisteredOperatorStateBackendMetaInfo<>(restoredSnapshot);
// check compatibility to determine if state migration is required
TypeSerializer<S> newPartitionStateSerializer = partitionStateSerializer.duplicate();
@SuppressWarnings("unchecked")
TypeSerializerSnapshot<S> stateSerializerSnapshot = Preconditions.checkNotNull(
(TypeSerializerSnapshot<S>) restoredSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
TypeSerializerSchemaCompatibility<S> stateCompatibility =
stateSerializerSnapshot.resolveSchemaCompatibility(newPartitionStateSerializer);
if (stateCompatibility.isIncompatible()) {
throw new StateMigrationException("The new state serializer for operator state must not be incompatible.");
}
partitionableListState.setStateMetaInfo(
new RegisteredOperatorStateBackendMetaInfo<>(name, newPartitionStateSerializer, mode));
}
accessedStatesByName.put(name, partitionableListState);
return partitionableListState;
}
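- getListState first looks in accessedStatesByName; if the state was already accessed in this run, it checks name and mode and returns the cached instance. Otherwise it looks up the PartitionableListState in registeredOperatorStates: if there is none, it creates a new one and registers it; if there is one (i.e. restored state), it checks name and mode, checks serializer compatibility (throwing a StateMigrationException if the new serializer is incompatible), and then sets a new stateMetaInfo on the partitionableListState. Finally the state is cached in accessedStatesByName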
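The lookup order of getListState (access cache, then registered/restored states, then create) can be sketched as a small registry. `ListStateRegistry`, `Holder` and `addRestored` are hypothetical; the serializer-compatibility step is omitted, and the mode check throws `IllegalStateException` as a simplified stand-in for checkStateNameAndMode.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplification of DefaultOperatorStateBackend#getListState; NOT Flink code.
final class ListStateRegistry {
    enum Mode { SPLIT_DISTRIBUTE, UNION }

    static final class Holder {
        final String name;
        final Mode mode;

        Holder(String name, Mode mode) {
            this.name = name;
            this.mode = mode;
        }
    }

    // Plays the role of registeredOperatorStates (may be pre-filled by restore).
    private final Map<String, Holder> registered = new HashMap<>();
    // Plays the role of accessedStatesByName (always empty at start).
    private final Map<String, Holder> accessed = new HashMap<>();

    // Simulates a state that came back from restore() before any user access.
    void addRestored(String name, Mode mode) {
        registered.put(name, new Holder(name, mode));
    }

    Holder getListState(String name, Mode mode) {
        Holder previous = accessed.get(name);
        if (previous != null) {
            checkMode(previous, mode);
            return previous;
        }
        Holder holder = registered.get(name);
        if (holder == null) {
            // No restored state: create and register a fresh holder.
            holder = new Holder(name, mode);
            registered.put(name, holder);
        } else {
            // Restored state: requested mode must match the restored one.
            checkMode(holder, mode);
        }
        accessed.put(name, holder);
        return holder;
    }

    private static void checkMode(Holder h, Mode requested) {
        if (h.mode != requested) {
            throw new IllegalStateException(
                    "State " + h.name + " has mode " + h.mode + " but was requested with " + requested);
        }
    }
}
```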
DefaultOperatorStateBackendSnapshotStrategy
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
/**
* Snapshot strategy for this backend.
*/
private class DefaultOperatorStateBackendSnapshotStrategy extends AbstractSnapshotStrategy<OperatorStateHandle> {
protected DefaultOperatorStateBackendSnapshotStrategy() {
super("DefaultOperatorStateBackend snapshot");
}
@Nonnull
@Override
public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
final long checkpointId,
final long timestamp,
@Nonnull final CheckpointStreamFactory streamFactory,
@Nonnull final CheckpointOptions checkpointOptions) throws IOException {
if (registeredOperatorStates.isEmpty() && registeredBroadcastStates.isEmpty()) {
return DoneFuture.of(SnapshotResult.empty());
}
final Map<String, PartitionableListState<?>> registeredOperatorStatesDeepCopies =
new HashMap<>(registeredOperatorStates.size());
final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStatesDeepCopies =
new HashMap<>(registeredBroadcastStates.size());
ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(userClassloader);
try {
// eagerly create deep copies of the list and the broadcast states (if any)
// in the synchronous phase, so that we can use them in the async writing.
if (!registeredOperatorStates.isEmpty()) {
for (Map.Entry<String, PartitionableListState<?>> entry : registeredOperatorStates.entrySet()) {
PartitionableListState<?> listState = entry.getValue();
if (null != listState) {
listState = listState.deepCopy();
}
registeredOperatorStatesDeepCopies.put(entry.getKey(), listState);
}
}
if (!registeredBroadcastStates.isEmpty()) {
for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry : registeredBroadcastStates.entrySet()) {
BackendWritableBroadcastState<?, ?> broadcastState = entry.getValue();
if (null != broadcastState) {
broadcastState = broadcastState.deepCopy();
}
registeredBroadcastStatesDeepCopies.put(entry.getKey(), broadcastState);
}
}
} finally {
Thread.currentThread().setContextClassLoader(snapshotClassLoader);
}
AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>> snapshotCallable =
new AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>>() {
@Override
protected SnapshotResult<OperatorStateHandle> callInternal() throws Exception {
CheckpointStreamFactory.CheckpointStateOutputStream localOut =
streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
registerCloseableForCancellation(localOut);
// get the registered operator state infos ...
List<StateMetaInfoSnapshot> operatorMetaInfoSnapshots =
new ArrayList<>(registeredOperatorStatesDeepCopies.size());
for (Map.Entry<String, PartitionableListState<?>> entry :
registeredOperatorStatesDeepCopies.entrySet()) {
operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
}
// ... get the registered broadcast operator state infos ...
List<StateMetaInfoSnapshot> broadcastMetaInfoSnapshots =
new ArrayList<>(registeredBroadcastStatesDeepCopies.size());
for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
registeredBroadcastStatesDeepCopies.entrySet()) {
broadcastMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
}
// ... write them all in the checkpoint stream ...
DataOutputView dov = new DataOutputViewStreamWrapper(localOut);
OperatorBackendSerializationProxy backendSerializationProxy =
new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);
backendSerializationProxy.write(dov);
// ... and then go for the states ...
// we put BOTH normal and broadcast state metadata here
int initialMapCapacity =
registeredOperatorStatesDeepCopies.size() + registeredBroadcastStatesDeepCopies.size();
final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
new HashMap<>(initialMapCapacity);
for (Map.Entry<String, PartitionableListState<?>> entry :
registeredOperatorStatesDeepCopies.entrySet()) {
PartitionableListState<?> value = entry.getValue();
long[] partitionOffsets = value.write(localOut);
OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
writtenStatesMetaData.put(
entry.getKey(),
new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
}
// ... and the broadcast states themselves ...
for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
registeredBroadcastStatesDeepCopies.entrySet()) {
BackendWritableBroadcastState<?, ?> value = entry.getValue();
long[] partitionOffsets = {value.write(localOut)};
OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
writtenStatesMetaData.put(
entry.getKey(),
new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
}
// ... and, finally, create the state handle.
OperatorStateHandle retValue = null;
if (unregisterCloseableFromCancellation(localOut)) {
StreamStateHandle stateHandle = localOut.closeAndGetHandle();
if (stateHandle != null) {
retValue = new OperatorStreamStateHandle(writtenStatesMetaData, stateHandle);
}
return SnapshotResult.of(retValue);
} else {
throw new IOException("Stream was already unregistered.");
}
}
@Override
protected void cleanupProvidedResources() {
// nothing to do
}
@Override
protected void logAsyncSnapshotComplete(long startTime) {
if (asynchronousSnapshots) {
logAsyncCompleted(streamFactory, startTime);
}
}
};
final FutureTask<SnapshotResult<OperatorStateHandle>> task =
snapshotCallable.toAsyncSnapshotFutureTask(closeStreamOnCancelRegistry);
if (!asynchronousSnapshots) {
task.run();
}
return task;
}
}
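- DefaultOperatorStateBackendSnapshotStrategy extends AbstractSnapshotStrategy. Its snapshot method returns an empty SnapshotResult when no states are registered; otherwise, in the synchronous phase and with the user classloader set as the context classloader, it creates deep copies of the states (registeredOperatorStatesDeepCopies and registeredBroadcastStatesDeepCopies), and then hands the actual writing to an AsyncSnapshotCallable. If asynchronousSnapshots is false, the resulting FutureTask is run immediately before it is returned
- The abstract class AsyncSnapshotCallable implements the call method of the Callable interface; call invokes callInternal and then logAsyncSnapshotComplete
- callInternal returns a SnapshotResult<OperatorStateHandle>. It writes the meta info of all states (via OperatorBackendSerializationProxy) and then the data of registeredOperatorStatesDeepCopies and registeredBroadcastStatesDeepCopies into a CheckpointStateOutputStream created by the CheckpointStreamFactory (for example MemCheckpointStreamFactory), recording each state's partition offsets and assignment mode in writtenStatesMetaData. Finally it builds an OperatorStreamStateHandle from the stateHandle returned by closeAndGetHandle() and writtenStatesMetaData, and returns it wrapped with SnapshotResult.of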
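The core of callInternal is "write every state into one stream and remember where each piece starts". A hedged sketch of that idea follows: `OffsetWriter` is a hypothetical class, a `DataOutputStream` over a `ByteArrayOutputStream` stands in for the CheckpointStateOutputStream, `writeUTF` is an assumed serialization format, and the meta-info header Flink writes first is omitted (so offsets start at 0 here). The sketch records one offset per list element, matching the `long[]` that the list-state loop above collects.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simplification of the async write phase; NOT Flink code.
final class OffsetWriter {
    // Writes all named list states into one stream; returns name -> per-element offsets.
    static Map<String, long[]> writeAll(Map<String, List<String>> states, DataOutputStream out)
            throws IOException {
        Map<String, long[]> offsets = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : states.entrySet()) {
            List<String> elements = e.getValue();
            long[] partitionOffsets = new long[elements.size()];
            for (int i = 0; i < elements.size(); i++) {
                partitionOffsets[i] = out.size(); // stream position before element i
                out.writeUTF(elements.get(i));
            }
            offsets.put(e.getKey(), partitionOffsets);
        }
        out.flush();
        return offsets;
    }

    // Reads one named state back by jumping to each recorded offset.
    static List<String> read(byte[] bytes, long[] partitionOffsets) throws IOException {
        List<String> result = new ArrayList<>();
        for (long offset : partitionOffsets) {
            int start = (int) offset;
            try (DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(bytes, start, bytes.length - start))) {
                result.add(in.readUTF());
            }
        }
        return result;
    }
}
```

Because every element has its own offset, a restoring subtask can read just the elements assigned to it instead of the whole stream.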
Summary
- The OperatorStateBackend interface extends the OperatorStateStore, Snapshotable, Closeable and Disposable interfaces
- OperatorStateStore defines getBroadcastState, getListState and getUnionListState for creating or restoring a BroadcastState or a ListState, plus getRegisteredStateNames and getRegisteredBroadcastStateNames for returning the names of the currently registered states. DefaultOperatorStateBackend implements OperatorStateStore: getRegisteredStateNames returns registeredOperatorStates.keySet() and getRegisteredBroadcastStateNames returns registeredBroadcastStates.keySet() (both registeredOperatorStates and registeredBroadcastStates are in-memory Maps); getListState and getUnionListState both delegate to getListState(ListStateDescriptor<S> stateDescriptor, OperatorStateHandle.Mode mode)
- Snapshotable extends SnapshotStrategy and adds restore for restoring state; SnapshotStrategy defines the snapshot method that different snapshot strategies implement and requires the snapshot result type to be a StateObject; AbstractSnapshotStrategy is an abstract class that does not implement snapshot itself and only provides the logSyncCompleted/logAsyncCompleted helpers that log debug information
- DefaultOperatorStateBackend implements Snapshotable, and its snapshot method uses a DefaultOperatorStateBackendSnapshotStrategy. That class extends AbstractSnapshotStrategy; its snapshot method creates registeredOperatorStatesDeepCopies and registeredBroadcastStatesDeepCopies and then, through an AsyncSnapshotCallable, writes their data into a CheckpointStateOutputStream from the CheckpointStreamFactory (for example MemCheckpointStreamFactory), recording the offsets in writtenStatesMetaData
- Snapshotable requires its snapshot type parameter S to be a StateObject; StateObject extends Serializable because it is transported via RPC between the JobManager and the TaskManager. When OperatorStateBackend extends Snapshotable, it binds the snapshot type S to SnapshotResult<OperatorStateHandle> and the restore type R to Collection<OperatorStateHandle>
- StreamStateHandle extends StateObject and adds openInputStream; OperatorStateHandle extends StreamStateHandle and adds getStateNameToPartitionOffsets and getDelegateStateHandle, where getStateNameToPartitionOffsets maps each state name to the offsets of its available partitions; OperatorStreamStateHandle implements OperatorStateHandle and holds a stateNameToPartitionOffsets field (Map<String, StateMetaInfo>), which getStateNameToPartitionOffsets returns
- SnapshotResult implements StateObject and wraps the result of a snapshot, namely jobManagerOwnedSnapshot and taskLocalSnapshot; its discardState calls discardState on both, and getStateSize returns the state size of jobManagerOwnedSnapshot