Status
Current state: Under Discussion
Discussion thread: https://mail-archives.apache.org/mod_mbox/flink-dev/
JIRA:
Released: –
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This FLIP proposes the addition of a non-partitioned state abstraction that can be rescaled.
Currently (as of cd94aad), stream task state consists of non-partitioned operator and function state as well as partitioned key-value state.
Dynamic scaling for partitioned state has been discussed in the Dynamic Scaling: Key Groups design document (no associated FLIP as the design doc predates the introduction of the FLIP process) and the initial implementation can be found in pull request #1988.
The main question for rescalable state is how to split or merge state when scaling up or down, respectively. For partitioned state, the main observation is that splitting and merging can be based on the state key. This key is exposed to the runtime (see KvState), so the runtime can redistribute state automatically based on the key. For non-partitioned state, on the other hand, this is not possible: the state abstraction is agnostic to the contained state, and the state is exposed as a single unit for each sub task.
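The key-based redistribution of partitioned state can be sketched as follows. This is a simplified illustration in the spirit of the key groups design, not Flink's KvState API: the class and method names, the hash-modulo assignment of keys to a fixed number of key groups (`maxParallelism`), and the range-based mapping of key groups to sub tasks are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: redistributing partitioned (keyed) state for a new parallelism. */
public class KeyedRedistribution {

    /** Assumed scheme: a key belongs to one of maxParallelism key groups, chosen by its hash. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Assumed scheme: contiguous ranges of key groups are assigned to the same sub task. */
    static int subtaskForKeyGroup(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Splits or merges keyed state: every entry is routed to a sub task purely from its key. */
    static <K, V> List<Map<K, V>> redistribute(List<Map<K, V>> oldState, int newParallelism, int maxParallelism) {
        List<Map<K, V>> newState = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            newState.add(new HashMap<>());
        }
        for (Map<K, V> subtaskState : oldState) {
            for (Map.Entry<K, V> entry : subtaskState.entrySet()) {
                int keyGroup = keyGroupFor(entry.getKey(), maxParallelism);
                int target = subtaskForKeyGroup(keyGroup, maxParallelism, newParallelism);
                newState.get(target).put(entry.getKey(), entry.getValue());
            }
        }
        return newState;
    }

    public static void main(String[] args) {
        // Old snapshot of two sub tasks: sub task 0 owns keys 0-3, sub task 1 owns keys 4-7.
        List<Map<Integer, String>> snapshot = List.of(
                Map.of(0, "v0", 1, "v1", 2, "v2", 3, "v3"),
                Map.of(4, "v4", 5, "v5", 6, "v6", 7, "v7"));
        // Scale up to three sub tasks, assuming 8 key groups in total.
        List<Map<Integer, String>> rescaled = redistribute(snapshot, 3, 8);
        List<Integer> sizes = new ArrayList<>();
        for (Map<Integer, String> state : rescaled) {
            sizes.add(state.size());
        }
        System.out.println(sizes); // [3, 3, 2]
    }
}
```

Because the target sub task is a pure function of the key and the new parallelism, no entry is lost or duplicated, whichever direction the job is rescaled.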
Example: Kafka Source
Let's look at Flink's Kafka source as an example of non-partitioned state usage. Consider the following initial setup with 4 Kafka partitions (KP1-4) and 2 Flink Kafka source tasks (S1-2). Each sub task holds state for two Kafka partitions. When snapshotting, each sub task stores the state of its two partitions (partition index, current offset, etc.) as one unit: (KP1, KP2) for S1 and (KP3, KP4) for S2.
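The following sketch models this snapshot from the runtime's point of view. It is hypothetical: the class name, the use of a map from partition name to offset as the contents of each unit, the example offsets, and the index-based restore rule are assumptions chosen to illustrate the behavior described in the next two subsections, not Flink's actual checkpointing code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical model: each source sub task snapshots its partition offsets as one opaque unit. */
public class OpaqueSnapshot {

    /** Naive restore: new sub task i receives old unit i if it exists, otherwise nothing (null). */
    static List<Object> restoreByIndex(List<?> snapshotUnits, int newParallelism) {
        List<Object> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            restored.add(i < snapshotUnits.size() ? snapshotUnits.get(i) : null);
        }
        return restored;
    }

    public static void main(String[] args) {
        // S1 holds (KP1, KP2), S2 holds (KP3, KP4); offsets are made-up example values.
        List<Object> snapshot = List.of(
                Map.of("KP1", 100L, "KP2", 200L),
                Map.of("KP3", 300L, "KP4", 400L));
        // Scaling down to one sub task: the unit of S2 is silently dropped.
        System.out.println(restoreByIndex(snapshot, 1));
        // Scaling up to four sub tasks: sub tasks 3 and 4 receive no state and stay idle.
        System.out.println(restoreByIndex(snapshot, 4));
    }
}
```

Since the runtime sees each unit only as an `Object`, it can neither split (KP1, KP2) nor hand (KP3, KP4) to a remaining sub task.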
Scaling Down
When we now resume from this state with a scaled-down job (i.e. a single sub task), only the state (KP1, KP2) of S1 will be restored. The remaining state (KP3, KP4) cannot be matched to any sub task. This means that Flink will consume only partitions 1 and 2 again, skipping partitions 3 and 4, which were previously mapped to sub task 2 (figure left). What we want instead is that after scaling down, all state is merged and restored by S1 (figure right):
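A minimal sketch of the desired merge follows. It assumes, hypothetically, that each sub task's state is exposed to the runtime as a map from Kafka partition to offset rather than as an opaque unit; the class name and offsets are made up for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: merging per-partition offsets of all old sub tasks on scale down. */
public class ScaleDownMerge {

    /** Unions the partition-to-offset maps of all old sub tasks into the state of one sub task. */
    static Map<String, Long> mergeAll(List<Map<String, Long>> oldSubtaskStates) {
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> state : oldSubtaskStates) {
            // Assumes each partition was owned by exactly one old sub task, so keys never collide.
            merged.putAll(state);
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Map<String, Long>> snapshot = List.of(
                Map.of("KP1", 100L, "KP2", 200L),
                Map.of("KP3", 300L, "KP4", 400L));
        Map<String, Long> s1 = mergeAll(snapshot);
        System.out.println(s1.size()); // 4: S1 resumes all partitions, none is skipped
    }
}
```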
Scaling Up
When we scale our job up (e.g. to four sub tasks), the Kafka partitions will continue to be mapped only to sub tasks 1 and 2, leaving sub tasks 3 and 4 idle (figure left). What we want instead is that after scaling up, all state is split and distributed over all four sub tasks:
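The desired split can be sketched in the same hypothetical setting, again assuming each sub task exposes its state as a map from partition to offset. The round-robin policy over partitions sorted by name is an assumption chosen for determinism, not a policy stated in this FLIP.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: splitting per-partition offsets over more sub tasks on scale up. */
public class ScaleUpSplit {

    /** Collects all partition offsets and deals them out round-robin to newParallelism sub tasks. */
    static List<Map<String, Long>> redistribute(List<Map<String, Long>> oldSubtaskStates, int newParallelism) {
        // A TreeMap sorts partitions by name, so the assignment below is deterministic.
        TreeMap<String, Long> all = new TreeMap<>();
        for (Map<String, Long> state : oldSubtaskStates) {
            all.putAll(state);
        }
        List<Map<String, Long>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new HashMap<>());
        }
        int next = 0;
        for (Map.Entry<String, Long> entry : all.entrySet()) {
            result.get(next % newParallelism).put(entry.getKey(), entry.getValue());
            next++;
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map<String, Long>> snapshot = List.of(
                Map.of("KP1", 100L, "KP2", 200L),
                Map.of("KP3", 300L, "KP4", 400L));
        List<Map<String, Long>> restored = redistribute(snapshot, 4);
        for (int i = 0; i < restored.size(); i++) {
            // prints S1 -> {KP1=100} through S4 -> {KP4=400}
            System.out.println("S" + (i + 1) + " -> " + restored.get(i));
        }
    }
}
```

With `newParallelism = 1` the same function assigns every partition to S1, so in this sketch a single redistribution rule covers both scaling directions.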
Public Interfaces
Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.
A public interface is any change to the following:
- DataStream and DataSet API, including classes related to that, such as StreamExecutionEnvironment
- Classes marked with the @Public annotation
- On-disk binary formats, such as checkpoints/savepoints
- User-facing scripts/command-line tools, e.g. bin/flink, Yarn scripts, Mesos scripts
- Configuration settings
- Exposed monitoring information
Proposed Changes
Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
- If we are changing behavior how will we phase out the older behavior?
- If we need special migration tools, describe them here.
- When will we remove the existing behavior?
Test Plan
Describe in a few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.