Status
Motivation
With the current state registration interface, managed state (both operator and keyed state) can be lazily declared any time during processing. This poses problems in several dimensions for Flink’s state management capabilities:
- State migration / schema evolution: Flink can only know whether state migration or schema evolution is required for a state at the time of the state’s declaration (i.e., when the state descriptor is provided). Consequently, state migration / schema evolution would not work with lazy state declaration, because we would then need a global stop during processing to perform full migrations.
- Queryable state availability: Currently, a state is queryable only when it is registered at the key-value state registry, which happens at the time of the state’s declaration. This is also the case when restoring from a checkpoint; before the state is declared again, the state is not queryable.
- JobManager lacks information about state: none of the information provided through state declaration (e.g. serialization, operator state handle redistribution scheme) is visible to the JobManager. For example, for operator state redistribution, it should be possible for the JobManager to redistribute state partition handles with whatever mode is currently specified (SPLIT_REDISTRIBUTE or BROADCAST). However, with lazy state declaration, the only way to carry this information over to the JobManager on restore is to encode it within the written state, so it cannot be changed at restore time.
Proposed Changes & Public Interfaces
The problems mentioned above stand in the way of more powerful state management capabilities. To address them, we propose to limit state declaration to a more restricted form.
(1) Pre-flight, eager state declaration
All operators, as well as rich UDFs, can be stateful. Previously, state could be lazily declared with a state descriptor at any time during processing by calling getRuntimeContext().getState(descriptor). This single call addresses both state access and state registration.
We propose to decompose this into 2 steps: (a) pre-flight (i.e. before the job graph is submitted) state descriptor registration, and (b) runtime access to registered state using the state ID only.
(1.a) New state descriptor registration methods
For (a), new state descriptor registration methods will be added to the StreamOperator and RichFunction interfaces. These methods will only be functional when invoked pre-flight. Invocation after that would fail with an exception.
StreamOperator interface new eager state declaration methods:
public interface StreamOperator<OUT> {
...
void registerKeyedState(StateDescriptor<?,?> descriptor);
void registerOperatorState(OperatorStateDescriptor<?,?> descriptor);
Collection<StateDescriptor<?,?>> getRegisteredKeyedStateDescriptors();
Collection<OperatorStateDescriptor<?,?>> getRegisteredOperatorStateDescriptors();
...
}
RichFunction interface new eager state declaration methods:
public interface RichFunction {
...
void registerKeyedState(StateDescriptor<?,?> descriptor);
void registerOperatorState(OperatorStateDescriptor<?,?> descriptor);
Collection<StateDescriptor<?,?>> getRegisteredKeyedStateDescriptors();
Collection<OperatorStateDescriptor<?,?>> getRegisteredOperatorStateDescriptors();
...
}
Operators carry the registered state descriptors as they are shipped to JM / TMs. This essentially also exposes information about registered states of each JobVertex in the JobGraph to the JobManager, opening up opportunities for future state management capabilities.
When creating keyed / operator state backends for operators (i.e. StreamTask#createKeyedStateBackend() and StreamTask#createOperatorStateBackend()), the registered descriptors must be provided; this will be the only opportunity to do so. All other registration methods (e.g. KeyedStateBackend#getOrCreateKeyedState) will be deprecated and eventually removed.
Note that OperatorStateDescriptor would be a new class extending StateDescriptor, which should wrap operator-state specific information such as the redistribution scheme. Previously, this information was not carried by operator state descriptors.
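The pre-flight-only contract of the registration methods can be illustrated with a minimal sketch. It does not use Flink classes: Descriptor, StateRegistry, and markSubmitted are hypothetical stand-ins invented for this example, and the choice of IllegalStateException is an assumption, since the proposal only says that late invocation fails with an exception.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class EagerRegistrationSketch {

    /** Hypothetical stand-in for a state descriptor; only the state ID matters here. */
    static final class Descriptor {
        final String stateId;

        Descriptor(String stateId) {
            this.stateId = stateId;
        }
    }

    /** Hypothetical holder that accepts registrations only until the job is submitted. */
    static final class StateRegistry {
        private final Map<String, Descriptor> keyed = new LinkedHashMap<>();
        private boolean submitted = false;

        void registerKeyedState(Descriptor descriptor) {
            if (submitted) {
                // Assumption: the exception type is not specified by the proposal.
                throw new IllegalStateException(
                        "State '" + descriptor.stateId
                                + "' must be registered before job submission.");
            }
            keyed.put(descriptor.stateId, descriptor);
        }

        Collection<Descriptor> getRegisteredKeyedStateDescriptors() {
            // Return a read-only snapshot so callers cannot register through the back door.
            return Collections.unmodifiableCollection(new ArrayList<>(keyed.values()));
        }

        /** Marks the end of the pre-flight phase (hypothetical hook). */
        void markSubmitted() {
            submitted = true;
        }
    }

    public static void main(String[] args) {
        StateRegistry registry = new StateRegistry();
        registry.registerKeyedState(new Descriptor("counts")); // pre-flight: accepted
        registry.markSubmitted();
        try {
            registry.registerKeyedState(new Descriptor("late")); // after submission: rejected
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
        System.out.println(
                "Registered descriptors: " + registry.getRegisteredKeyedStateDescriptors().size());
    }
}
```

In this sketch the registered descriptors remain readable after submission, which mirrors the idea that they travel with the operator and can be inspected later.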
(1.b) Runtime access to registered states via state ID only
State access for both operator and keyed state will be provided through the RuntimeContext by exposing the KeyedStateStore and OperatorStateStore:
public interface RuntimeContext {
...
KeyedStateStore getKeyedStateStore();
OperatorStateStore getOperatorStateStore();
...
}
The state access methods on the state stores would be:
State access methods for KeyedStateStore:
public interface KeyedStateStore {
<T> ValueState<T> getValueState(String stateId);
<T> ListState<T> getListState(String stateId);
<T> ReducingState<T> getReducingState(String stateId);
<K,V> MapState<K,V> getMapState(String stateId);
}
State access methods for OperatorStateStore:
public interface OperatorStateStore {
<T> ListState<T> getPartitionableListState(String stateId);
}
The new methods take only the target state's ID as argument, to make it explicit that a state is only accessible if it has been registered with the API described in (1.a).
All other state access / registration methods in RuntimeContext will be deprecated and eventually removed.
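The ID-only lookup can be sketched as follows. This is not Flink code: SimpleValueState and SimpleKeyedStateStore are hypothetical classes invented for this example, per-key scoping is omitted for brevity, and the exception type for unknown IDs is an assumption.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class StateIdAccessSketch {

    /** Hypothetical minimal value state; per-key scoping is left out of this sketch. */
    static final class SimpleValueState<T> {
        private T value;

        T value() {
            return value;
        }

        void update(T newValue) {
            value = newValue;
        }
    }

    /** Hypothetical store that only hands out states whose IDs were registered up front. */
    static final class SimpleKeyedStateStore {
        private final Map<String, SimpleValueState<?>> registered = new HashMap<>();

        SimpleKeyedStateStore(Iterable<String> registeredStateIds) {
            for (String id : registeredStateIds) {
                registered.put(id, new SimpleValueState<Object>());
            }
        }

        @SuppressWarnings("unchecked")
        <T> SimpleValueState<T> getValueState(String stateId) {
            SimpleValueState<?> state = registered.get(stateId);
            if (state == null) {
                // Assumption: the proposal only asks for a meaningful failure message.
                throw new IllegalArgumentException(
                        "No state registered under ID '" + stateId + "'.");
            }
            // Unchecked cast: the caller is trusted to request the registered type.
            return (SimpleValueState<T>) state;
        }
    }

    public static void main(String[] args) {
        SimpleKeyedStateStore store = new SimpleKeyedStateStore(Arrays.asList("count"));

        SimpleValueState<Long> count = store.getValueState("count");
        count.update(1L);

        // A second lookup with the same ID returns the same underlying state.
        SimpleValueState<Long> again = store.getValueState("count");
        System.out.println("count = " + again.value());

        try {
            store.getValueState("neverRegistered");
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Because no descriptor is passed at access time, the type parameter cannot be checked against the registration in this sketch; a real implementation would presumably validate it against the registered descriptor.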
(2) Syntactic sugar via annotations
Besides the programmatic state declaration API described in (1), we also propose to provide a higher level, easy-to-use syntactic sugar for eager state declaration via Java annotations.
While the programmatic API is required for layered frameworks on top of Flink (such as Apache Beam) to register state, this annotation-based API is the targeted interface that Flink users should program against, and is intended to keep boilerplate state descriptor instantiation / state registration / state access code to the minimum for cleaner and more intuitive user application code.
Annotation for keyed state:
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface KeyedState {
String stateId();
String queryableStateName() default "";
Class<? extends TypeInfoFactory> typeInfoFactory() default NullTypeInfoFactory.class;
Class<? extends TypeSerializerFactory> typeSerializerFactory() default NullTypeSerializerFactory.class;
Class<? extends Function> function() default NullFunction.class;
}
Internally, a StateDescriptor will be built using the fields. Apart from the stateId, all other fields are optional:
- queryableStateName: queryable state handle. If provided, the state would be queryable.
- typeInfoFactory: corresponds to providing a custom TypeInformation in StateDescriptors.
- typeSerializerFactory: corresponds to providing a custom TypeSerializer in StateDescriptors.
- If neither typeInfoFactory nor typeSerializerFactory is provided, the state type will be extracted using Java reflection and analyzed using Flink's type stack.
- function: any function that needs to be associated with the state, e.g. a ReduceFunction for ReducingState.
Annotation for operator state:
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface OperatorState {
String stateId();
OperatorStateHandle.Mode redistributeMode() default OperatorStateHandle.Mode.SPLIT_REDISTRIBUTE;
Class<? extends TypeInfoFactory> typeInfoFactory() default NullTypeInfoFactory.class;
Class<? extends TypeSerializerFactory> typeSerializerFactory() default NullTypeSerializerFactory.class;
}
Internally, an OperatorStateDescriptor will be built using the fields. Apart from the stateId, all other fields are optional:
- redistributeMode: the redistribution scheme of this operator list state. For the initial version, we would not allow changing the mode used for previously written state. Ideally, this can be changed freely depending on what redistribution scheme the user wishes to use for the current restore.
These annotations can be used at the operator-level or in rich UDFs. Briefly speaking, users' custom operators / UDFs should declare fields typed with the user-facing State types, and annotate them as either KeyedState or OperatorState.
Information provided through annotation fields subsumes the need to provide a state descriptor, making the use of StateDescriptors specific to the programmatic API only.
Example usage:
Declaring managed keyed state:
@KeyedState(
stateId = "myPojoListStateId",
queryableStateName = "myPojoListStateQueryHandle", // optional
typeSerializerFactory = MyPojoSerializerFactory.class // optional
)
private ListState<MyPojo> pojoListState;
Declaring managed operator state:
@OperatorState(
stateId = "myPojoListStateId",
redistributeMode = OperatorStateHandle.Mode.BROADCAST // optional
)
private ListState<MyPojo> pojoOperatorState;
When a transformation is added to the stream graph, a StateDeclarationExtractor is used to extract the declared keyed / operator states using state annotations, returning a Tuple2<Collection<StateDescriptor<?,?>>, Collection<OperatorStateDescriptor<?,?>>> (tuple of all keyed state descriptors and all operator state descriptors). The extracted state descriptors would then be registered at the operator / rich UDF using the programmatic APIs, and from there follow the same code paths as described in (1).
Annotated state fields will also be automatically initialized during the StreamOperator#initializeState phase using Java reflection. The user is not required to access state via the RuntimeContext if the state was declared using state annotations. This should also improve the user experience, since programming against Flink managed state becomes similar to manipulating a local variable.
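The two reflection steps described above (extracting declarations from annotated fields, then initializing those fields) can be sketched in plain Java. Everything here is a simplified assumption: the KeyedState annotation is reduced to stateId only, a java.util.List stands in for ListState, and extractStateIds, initializeStateFields, and CountingFunction are hypothetical names, not the proposed StateDeclarationExtractor.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

public class AnnotationExtractionSketch {

    /** Simplified version of the proposed annotation; only stateId is modeled. */
    @Target(ElementType.FIELD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface KeyedState {
        String stateId();
    }

    /** Hypothetical user function with one annotated state field. */
    static class CountingFunction {
        @KeyedState(stateId = "counts")
        private List<Long> counts; // stand-in for ListState<Long>

        private int notState; // unannotated fields are ignored

        List<Long> counts() {
            return counts;
        }
    }

    /** Pre-flight step: collect the state IDs declared via annotations on the class. */
    static List<String> extractStateIds(Class<?> clazz) {
        List<String> ids = new ArrayList<>();
        for (Field field : clazz.getDeclaredFields()) {
            KeyedState annotation = field.getAnnotation(KeyedState.class);
            if (annotation != null) {
                ids.add(annotation.stateId());
            }
        }
        return ids;
    }

    /** Runtime step: inject a state object into every annotated field. */
    static void initializeStateFields(Object function) {
        for (Field field : function.getClass().getDeclaredFields()) {
            if (field.isAnnotationPresent(KeyedState.class)) {
                try {
                    field.setAccessible(true); // the field is private in user code
                    // A real implementation would obtain the state from the state backend.
                    field.set(function, new ArrayList<Long>());
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException("Cannot initialize " + field.getName(), e);
                }
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("Declared state IDs: " + extractStateIds(CountingFunction.class));

        CountingFunction function = new CountingFunction();
        initializeStateFields(function);
        function.counts().add(1L); // used like a local variable, no RuntimeContext call
        System.out.println("Entries after update: " + function.counts().size());
    }
}
```

The RUNTIME retention policy on the annotation is what makes both steps possible, since the annotation must still be visible to reflection when the transformation is added and when the operator initializes its state.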
(3) Deprecate existing lazy state declaration interfaces
All existing lazy state declaration interfaces will be deprecated, and eventually removed (Flink version for removal TBD). This includes:
- get*State methods on RuntimeContext.
- All existing get*State methods on KeyedStateStore and OperatorStateStore.
- getKeyedStateStore and getOperatorStateStore on FunctionInitializationContext, as this will be subsumed by exposing them through the RuntimeContext.
New state management functionalities (e.g. state migration / schema evolution) will only be available for eagerly registered state.
Compatibility, Deprecation, and Migration Plan
- Existing API is not broken or removed, only deprecated and discouraged.
- Users are strongly encouraged to migrate to using eager state declaration. State will not be lost as long as the same state IDs are used.
- New state management functionalities (e.g. state migration / schema evolution) will only be available for eagerly registered state using the new API.
- Legacy dynamic state declaration APIs are planned to be removed in future Flink versions (TBD).
Test Plan
All code in Flink that uses managed state should be switched to the new API. This should ensure that using the new API does not result in different state behaviours.
Apart from this, at a minimum new integration tests should include:
- registering states after job submission should fail, with meaningful messages
- accessing non-registered state IDs should fail, with meaningful messages
- state backends are instantiated with eagerly declared states' descriptors
- state declarations are visible and correct at the JobManager
Rejected Alternatives
None so far.