Status
Current state: Under Discussion
Discussion thread: https://mail-archives.apache.org/mod_mbox/flink-dev/
JIRA:
Released: –
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This FLIP proposes the addition of a non-partitioned state abstraction that can be rescaled.
Currently (as of cd94aad), stream task state consists of non-partitioned operator and function state as well as partitioned key-value state.
Dynamic scaling for partitioned state has been discussed in the Dynamic Scaling: Key Groups design document (no associated FLIP as the design doc predates the introduction of the FLIP process) and the initial implementation can be found in pull request #1988.
The main question for rescalable state is how to split or merge state when scaling up or down, respectively. For partitioned state, the main observation is that splitting and merging can be based on the state key. This key is exposed to the runtime (see KvState), so the runtime can automatically redistribute state based on the key. For non-partitioned state, on the other hand, this is not possible: the state abstraction is agnostic to the contained state, and the state is exposed as a single unit for each sub task.
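To illustrate why an exposed key makes automatic redistribution possible, here is a minimal sketch. It is not Flink's actual assignment logic: the class KeyedRedistributionSketch and the hash-modulo rule are assumptions chosen only to show that a key alone is enough to decide which sub task owns a piece of state at any parallelism.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of key-based redistribution; not Flink's real implementation. */
public class KeyedRedistributionSketch {

    /** Maps a key to a sub task index in [0, parallelism). Illustrative rule only. */
    public static int subtaskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /** Groups the given keys by the sub task that would own them. */
    public static Map<Integer, List<String>> assign(List<String> keys, int parallelism) {
        Map<Integer, List<String>> assignment = new TreeMap<>();
        for (String key : keys) {
            assignment
                    .computeIfAbsent(subtaskFor(key, parallelism), i -> new ArrayList<>())
                    .add(key);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("user-1", "user-2", "user-3", "user-4");
        // The same keys can be reassigned for any parallelism without user involvement.
        System.out.println("parallelism 2: " + assign(keys, 2));
        System.out.println("parallelism 4: " + assign(keys, 4));
    }
}
```

Non-partitioned state has no such key, so the runtime has nothing comparable to base a split or merge on.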
Example: Kafka Source
Let's look at Flink's Kafka source as an example of non-partitioned state usage. Consider the following initial setup with 4 Kafka partitions (KP1-4) and 2 Flink Kafka source tasks (S1-2). Each sub task will have state for two Kafka partitions. When snapshotting, the state for two partitions (partition index, current offset, etc.) will be stored as one unit by each sub task: (KP1, KP2) by S1 and (KP3, KP4) by S2.
Scaling Down
When we now resume from this state with a scaled down job, only state (KP1, KP2) of S1 will be restored. The remaining state (KP3, KP4) cannot be matched to any sub task. This means that only partitions 1 and 2 will be consumed again by Flink, skipping partitions 3 and 4, which were previously mapped to sub task 2 (figure left). What we want instead is that after scaling down all state is merged and restored by S1 (figure right):
Scaling Up
When we scale our job up, the Kafka partitions will continue to be mapped only to sub tasks 1 and 2, leaving sub tasks 3 and 4 idling (figure left). What we want instead is that after scaling up all state is split and distributed over all four sub tasks:
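The desired behaviour in both scenarios can be sketched as follows, assuming each Kafka partition is snapshotted as its own unit. The class UnitRedistributionSketch and the round-robin policy are hypothetical and only illustrate the idea; the FLIP does not prescribe a particular distribution scheme.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: merge or split per-unit state when the parallelism changes. */
public class UnitRedistributionSketch {

    /** Unions all snapshotted units and deals them out round-robin to the new sub tasks. */
    public static <T> List<List<T>> redistribute(List<List<T>> perSubtaskState, int newParallelism) {
        // Union the units of all old sub tasks, preserving their order.
        List<T> union = new ArrayList<>();
        for (List<T> units : perSubtaskState) {
            union.addAll(units);
        }
        // One (initially empty) list of units per new sub task.
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        // Round-robin assignment (an assumption made for this sketch).
        for (int i = 0; i < union.size(); i++) {
            result.get(i % newParallelism).add(union.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        // Snapshot taken with parallelism 2: S1 holds (KP1, KP2), S2 holds (KP3, KP4).
        List<List<String>> snapshot = Arrays.asList(
                Arrays.asList("KP1", "KP2"),
                Arrays.asList("KP3", "KP4"));
        System.out.println(redistribute(snapshot, 1)); // [[KP1, KP2, KP3, KP4]]
        System.out.println(redistribute(snapshot, 4)); // [[KP1], [KP2], [KP3], [KP4]]
    }
}
```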
Public Interfaces
On the API level, the main objective is to allow the user to expose multiple units of state that can be redistributed. With respect to the Kafka source example above, this would require the ability to express that KP1, KP2, KP3, and KP4 are separate units of state. The following two sections describe a non-exhaustive set of options to accomplish this.
Option 1: Add CheckpointedUnioned Interface
Similar to the Checkpointed interfaces, we add another variant:
public interface CheckpointedUnioned<T extends Serializable> {
    List<T> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
    void restoreState(List<T> unionedState) throws Exception;
}
We collect the state from each function as a list, and on recovery we send each operator the union of all collected lists. In restoreState, the operator can then select the parts of the unioned list that are relevant for the current run.
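As a rough illustration of how a function might use this interface, the sketch below repeats the proposed interface and adds a hypothetical source. The names PartitionOffset and PartitionedSource, the constructor arguments, and the modulo-based selection rule are assumptions made for this example and are not part of the proposal.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class CheckpointedUnionedSketch {

    /** The interface as proposed in Option 1. */
    public interface CheckpointedUnioned<T extends Serializable> {
        List<T> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
        void restoreState(List<T> unionedState) throws Exception;
    }

    /** Hypothetical unit of state: one Kafka partition index and its current offset. */
    public static final class PartitionOffset implements Serializable {
        private static final long serialVersionUID = 1L;
        public final int partition;
        public final long offset;

        public PartitionOffset(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Hypothetical source that knows its sub task index and the current parallelism. */
    public static final class PartitionedSource implements CheckpointedUnioned<PartitionOffset> {
        private final int subtaskIndex;
        private final int parallelism;
        private final List<PartitionOffset> owned = new ArrayList<>();

        public PartitionedSource(int subtaskIndex, int parallelism) {
            this.subtaskIndex = subtaskIndex;
            this.parallelism = parallelism;
        }

        @Override
        public List<PartitionOffset> snapshotState(long checkpointId, long checkpointTimestamp) {
            // Each owned partition is returned as a separate, redistributable unit.
            return new ArrayList<>(owned);
        }

        @Override
        public void restoreState(List<PartitionOffset> unionedState) {
            // Every sub task receives the full union and keeps only its own share.
            owned.clear();
            for (PartitionOffset unit : unionedState) {
                if (unit.partition % parallelism == subtaskIndex) {
                    owned.add(unit);
                }
            }
        }
    }

    public static void main(String[] args) {
        List<PartitionOffset> union = new ArrayList<>();
        for (int p = 0; p < 4; p++) {
            union.add(new PartitionOffset(p, 0L));
        }
        // Scaled down to a single sub task: it picks up all four partitions.
        PartitionedSource only = new PartitionedSource(0, 1);
        only.restoreState(union);
        System.out.println(only.snapshotState(1L, 0L).size()); // 4
    }
}
```

Because every sub task sees the whole union, the selection rule must be deterministic and consistent across sub tasks; otherwise units could be dropped or consumed twice.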
Option 2: Add Unkeyed UnionState
Similar to the keyed state classes like ValueState, we add unkeyed variants to RuntimeContext:
<T> UnionState<T> getUnionState(UnionStateDescriptor<T> stateProperties);
The UnionState instance is similar to the existing state instances for keyed state:
public interface UnionState<T> extends State {
    void setState(List<T> state);
    List<T> getState();
}
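The sketch below shows one way a function might work with such a state handle. It assumes that, after a restore, getState returns the union of the lists set by all sub tasks; the proposal does not spell this out. The State stand-in, InMemoryUnionState, and claimPartitions are hypothetical names that keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UnionStateSketch {

    /** Stand-in for Flink's State marker interface, so the sketch compiles on its own. */
    public interface State {}

    /** The interface as proposed in Option 2. */
    public interface UnionState<T> extends State {
        void setState(List<T> state);
        List<T> getState();
    }

    /** Hypothetical in-memory implementation; a real one would be provided by the runtime. */
    public static final class InMemoryUnionState<T> implements UnionState<T> {
        private List<T> state = new ArrayList<>();

        @Override
        public void setState(List<T> state) {
            this.state = new ArrayList<>(state);
        }

        @Override
        public List<T> getState() {
            return new ArrayList<>(state);
        }
    }

    /** Keeps only the partition indices this sub task should own and writes them back. */
    public static List<Integer> claimPartitions(
            UnionState<Integer> state, int subtaskIndex, int parallelism) {
        List<Integer> mine = new ArrayList<>();
        for (Integer partition : state.getState()) {
            if (partition % parallelism == subtaskIndex) {
                mine.add(partition);
            }
        }
        state.setState(mine);
        return mine;
    }

    public static void main(String[] args) {
        UnionState<Integer> state = new InMemoryUnionState<>();
        // Simulates the union handed over on restore (an assumption of this sketch).
        state.setState(Arrays.asList(0, 1, 2, 3));
        System.out.println(claimPartitions(state, 1, 2)); // [1, 3]
    }
}
```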
Proposed Changes
TBD
Compatibility, Deprecation, and Migration Plan
Existing users should not be affected by the proposed changes. All existing non-partitioned state variants should be kept. If the rescalable APIs are accepted, it should (in theory) be possible for users to easily replace all existing non-partitioned state variants with the redistributable variants. Given that the Checkpointed interface is marked as @PublicEvolving, it should be possible to actually enforce such a change.
Test Plan
Describe in a few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.