Status
Current state: Under Discussion
Discussion thread: https://mail-archives.apache.org/mod_mbox/flink-dev/
JIRA:
Released: –
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This FLIP proposes the addition of a non-partitioned state abstraction that can be rescaled.
Currently (as of cd94aad), stream task state consists of non-partitioned operator and function state as well as partitioned key-value state.
Dynamic scaling for partitioned state has been discussed in the Dynamic Scaling: Key Groups design document (no associated FLIP as the design doc predates the introduction of the FLIP process) and the initial implementation can be found in pull request #1988.
The main question for rescalable state is how to split state when scaling up and how to merge it when scaling down. For partitioned state, the key observation is that splitting and merging can be based on the state key: the key is exposed to the runtime (see KvState), so state can be redistributed automatically based on it. For non-partitioned state, on the other hand, this is not possible because the state abstraction is agnostic to its contents and each sub task exposes its state as a single opaque unit.
Example: Kafka Source
Let's look at Flink's Kafka source as an example of non-partitioned state usage. Consider the following initial setup with 4 Kafka partitions (KP1-4) and 2 Flink Kafka source tasks (S1-2). Each sub task will have state for two Kafka partitions. When snapshotting, the state for these two partitions (partition index, current offset, etc.) will be stored as one unit by each sub task: (KP1, KP2) for S1 and (KP3, KP4) for S2.
Scaling Down
When we now resume from this state with a scaled down job, only state (KP1, KP2) of S1 will be restored. The remaining state (KP3, KP4) cannot be matched to any sub task. This means that only partitions 1 and 2 will be consumed again by Flink, skipping partitions 3 and 4, which were previously mapped to sub task 2 (figure left). What we want instead is that after scaling down all state is merged and restored by S1 (figure right):
Scaling Up
When we scale our job up, the Kafka partitions will continue to be only mapped to subtasks 1 and 2, leaving sub tasks 3 and 4 idling (figure left). What we want instead is that after scaling up all state is split and distributed over all four sub tasks:
Public Interfaces
On the API level, the main objective is to allow the user to expose multiple units of state that can be redistributed. With respect to the Kafka source example above, this would require the ability to express that KP1, KP2, KP3, and KP4 are separate units of state. The following two sections describe a non-exhaustive set of options to accomplish this.
Option 1: Add CheckpointedUnioned Interface
Similar to the Checkpointed interfaces, we add another variant:
public interface CheckpointedUnioned<T extends Serializable> {
    /** Returns the state of this sub task as a list of redistributable units. */
    List<T> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
    /** Called on recovery with the union of the lists snapshotted by all sub tasks. */
    void restoreState(List<T> unionedState) throws Exception;
}
We collect the state from each function as a list and on recovery we send each operator the union of all collected lists. In restoreState, the operator can then select the parts of the unioned list which are relevant for the current run.
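As a non-normative illustration, the following sketch shows how a Kafka-like source might implement the proposed interface. Only CheckpointedUnioned itself is part of this proposal; the class name, the use of Tuple2 as (partition index, offset) unit, and the modulo-based assignment in restoreState are purely illustrative.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class UnionedKafkaLikeSource extends RichParallelSourceFunction<String>
        implements CheckpointedUnioned<Tuple2<Integer, Long>> {

    // (Kafka partition index, current offset) pairs currently owned by this sub task.
    private List<Tuple2<Integer, Long>> localPartitions = new ArrayList<>();

    @Override
    public List<Tuple2<Integer, Long>> snapshotState(long checkpointId, long checkpointTimestamp) {
        // Each partition's offset is exposed as a separate redistributable unit.
        return new ArrayList<>(localPartitions);
    }

    @Override
    public void restoreState(List<Tuple2<Integer, Long>> unionedState) {
        // Every sub task receives the union of all snapshotted units and selects the
        // ones it should own, here via partition index modulo the new parallelism.
        int subtask = getRuntimeContext().getIndexOfThisSubtask();
        int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        localPartitions = new ArrayList<>();
        for (Tuple2<Integer, Long> partitionAndOffset : unionedState) {
            if (partitionAndOffset.f0 % parallelism == subtask) {
                localPartitions.add(partitionAndOffset);
            }
        }
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Consume the partitions in localPartitions, starting from the restored offsets.
    }

    @Override
    public void cancel() {
    }
}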
Option 2: Add Unkeyed UnionState
Similar to the keyed state classes like ValueState, we add unkeyed variants to RuntimeContext:
<T> UnionState<T> getUnionState(UnionStateDescriptor<T> stateProperties);
The UnionState instance is similar to the existing state instances for keyed state:
public interface UnionState<T> extends MergingState<T, List<T>> {
    /** Returns the local state unit with the given index. */
    T getLocal(int index);
    /** Sets the local state unit with the given index. */
    void setLocal(int index, T state);
    /** Returns all state units held locally by this sub task. */
    List<T> getAllLocal();
    /** Returns the union of the state units of all sub tasks. */
    List<T> getUnion();
}
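A hedged usage sketch of Option 2 follows, assuming the proposed getUnionState method on RuntimeContext. The UnionStateDescriptor constructor shown here (taking only a state name) is illustrative; a real descriptor would presumably also carry type information, as the existing keyed state descriptors do.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class UnionStateKafkaLikeSource extends RichParallelSourceFunction<String> {

    private UnionState<Tuple2<Integer, Long>> partitionOffsets;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Obtain the proposed unkeyed state from the runtime context
        // (illustrative descriptor construction).
        partitionOffsets = getRuntimeContext().getUnionState(
                new UnionStateDescriptor<Tuple2<Integer, Long>>("kafka-partition-offsets"));

        // After recovery, getUnion() exposes the units of all sub tasks; this sub task
        // selects its share and registers it as local state again.
        int subtask = getRuntimeContext().getIndexOfThisSubtask();
        int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        int localIndex = 0;
        for (Tuple2<Integer, Long> unit : partitionOffsets.getUnion()) {
            if (unit.f0 % parallelism == subtask) {
                partitionOffsets.setLocal(localIndex++, unit);
            }
        }
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Consume the assigned partitions, updating the local units via setLocal(...).
    }

    @Override
    public void cancel() {
    }
}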
Proposed Changes
We gather the state handles as we currently do for all sub tasks and mark them to be merged. On restore, the checkpoint coordinator merges the state handles and sends them to the tasks.
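The following sketch illustrates the intended redistribution with a generic placeholder type H instead of Flink's actual state handle classes: the coordinator concatenates the per-sub-task lists and assigns the full union to every sub task of the restored job, leaving the selection of relevant units to the user code (see the options above).

import java.util.ArrayList;
import java.util.List;

public final class UnionRedistribution {

    /** Assigns the union of all reported state units to every sub task of the restored job. */
    public static <H> List<List<H>> redistribute(List<List<H>> oldSubtaskStates, int newParallelism) {
        List<H> union = new ArrayList<>();
        for (List<H> subtaskState : oldSubtaskStates) {
            union.addAll(subtaskState);
        }
        List<List<H>> assignments = new ArrayList<>(newParallelism);
        for (int i = 0; i < newParallelism; i++) {
            assignments.add(new ArrayList<>(union));
        }
        return assignments;
    }
}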
Compatibility, Deprecation, and Migration Plan
Existing users should not be affected by the proposed changes. All existing non-partitioned state variants should be kept. If the rescalable APIs are accepted, it should (in theory) be possible for users to easily replace all existing non-partitioned state variants with the redistributable variants. Given that the Checkpointed interface is marked as @PublicEvolving, it should be possible to actually enforce such a change.
Test Plan
The implementation should be tested by running a job, triggering a savepoint and then resuming from this savepoint with both lower and higher parallelism. If support for rescaling of keyed state is already available, this should ideally be tested both separately and together with keyed state.
Rejected Alternatives
User-defined Split/Merge Methods
Instead of exposing the redistributable state units via a list and unioning them on restore, we could keep the current non-partitioned state interfaces and require user-defined split and merge methods.
This would have the advantage that users could keep on using the current APIs. On the other hand, state would likely need to be completely deserialized on the JobManager, split or merged via the user-specified function, serialized again, and sent to the tasks. This would mean that the JobManager would no longer be able to treat user state as a black box.
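For reference, such a rejected variant could have looked roughly like the following sketch (all names illustrative), which makes the need for JobManager-side deserialization apparent: the split and merge methods operate on fully materialized user state.

import java.io.Serializable;
import java.util.List;

public interface CheckpointedWithSplitMerge<T extends Serializable> {

    T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;

    void restoreState(T state) throws Exception;

    /** Splits one unit of state into {@code newParallelism} units when scaling up. */
    List<T> split(T state, int newParallelism) throws Exception;

    /** Merges the units of state of several sub tasks into one when scaling down. */
    T merge(List<T> states) throws Exception;
}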