...
Discussion thread | https://lists.apache.org/thread/0oc10cr2q2ms855dbo29s7v08xs3bvqg |
Vote thread | https://lists.apache.org/thread/lrqjg44v0s82shbpvbqp6ojqv873q1wr |
JIRA | |
Release | 1.20, 2.0 |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
In addition, there are several individual options, such as state.checkpoint-storage and state.checkpoints.dir, that fall outside of these prefixes. The current arrangement of these options, which span multiple modules, is somewhat haphazard and lacks a systematic structure. For example, the options under CheckpointingOptions and ExecutionCheckpointingOptions are related and have no clear boundary from the user's perspective, yet they share no unified prefix. The upcoming release of Flink 2.0 is a good opportunity to overhaul and restructure the configuration related to checkpointing, recovery, and state management. This FLIP proposes to reorganize these settings so that they are grouped coherently by module, which lowers the barrier to understanding them and reduces future development cost.
Public Interfaces & Proposed Changes
This FLIP proposes a transition from existing configuration options to new ones, which are categorized by prefixes as listed below:
- execution.checkpointing: all configurations associated with checkpointing and savepoints.
- execution.state-recovery: all configurations pertinent to state recovery.
- state.*: all configurations related to state access.
- state.backend.*: specific options for individual state backends, such as RocksDB.
- state.changelog: configurations for the changelog, as outlined in FLIP-158, including the options for the "Durable Short-term Log" (DSTL).
- state.latency-track: configurations related to the latency tracking of state access.
Additional key naming rule for boolean options: ONLY add 'enabled' when there are more detailed configurations under the same prefix, so that one option name does not serve as a prefix of another. Otherwise, do not add 'enabled' as a suffix. The alternatives considered were:
- Option 1: Use enumeration options instead where possible. However, this may cause name collisions or confusion, as discussed, and we would need to unify the wording everywhere.
- Option 2: Use boolean options and always add 'enabled' as the suffix.
- Option 3 (currently preferred): Use boolean options and ONLY add 'enabled' when there are more detailed configurations under the same prefix, so that one option name does not serve as a prefix of another.
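The preferred naming rule can be sketched as a small check over the set of known keys. This is only an illustration of the rule as stated above: the helper name `boolean_option_key` is hypothetical and is not part of Flink.

```python
from typing import Iterable


def boolean_option_key(base_key: str, all_keys: Iterable[str]) -> str:
    """Return the key to use for a boolean option called `base_key`.

    '.enabled' is appended only when another option lives under
    `base_key` as a prefix (hypothetical helper, not a Flink API).
    """
    prefix = base_key + "."
    has_sub_options = any(key.startswith(prefix) for key in all_keys)
    return base_key + ".enabled" if has_sub_options else base_key


# Keys taken from the tables in this FLIP.
known_keys = [
    "execution.checkpointing.local-backup.dir",
    "execution.checkpointing.num-retained",
]

# 'local-backup' has a sibling '.dir' option, so the boolean gets '.enabled'.
print(boolean_option_key("execution.checkpointing.local-backup", known_keys))
# 'incremental' has no sub-options, so no suffix is added.
print(boolean_option_key("execution.checkpointing.incremental", known_keys))
```

With these inputs the first call yields a key ending in '.enabled' and the second returns the base key unchanged.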
Please note that this FLIP does not change any state access, checkpointing, or recovery logic. The detailed migration plan for the individual options is as follows:
Checkpointing
Original (Current) option | Proposed New option | Type | Note |
state.checkpoints.num-retained | execution.checkpointing.num-retained | Integer | |
state.checkpoint.cleaner.parallel-mode | execution.checkpointing.cleaner.parallel-mode | Boolean | |
state.backend.incremental | execution.checkpointing.incremental | Boolean | |
state.backend.local-recovery | execution.checkpointing.local-backup.enabled | Boolean | Whether to do checkpoint on local disk, another option |
taskmanager.state.local.root-dirs | execution.checkpointing.local-backup.dir | String | |
state.savepoints.dir | execution.checkpointing.savepoint-dir | String | |
state.checkpoints.dir, state.checkpoint-storage | execution.checkpointing.dir | String | One URI is enough to specify both the checkpoint directory and the storage. 'jobmanager' and 'jm://' are special URIs for job manager based storage. |
state.storage.fs.memory-threshold | execution.checkpointing.data-inline-threshold | MemorySize | |
state.storage.fs.write-buffer-size | execution.checkpointing.write-buffer-size | MemorySize | |
execution.checkpointing.checkpoints-after-tasks-finish.enabled | execution.checkpointing.checkpoints-after-tasks-finish | Boolean | Drop the 'enabled' suffix if Option 3 of the boolean naming rule is chosen |
All those options will be moved into `CheckpointingOptions` if they are not already there.
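As a rough illustration of how the one-to-one renames in the table above could be applied to a flat key/value configuration, consider the sketch below. The function `migrate_checkpointing_keys` is hypothetical. The sketch deliberately leaves out state.checkpoints.dir and state.checkpoint-storage (two options merging into one) and state.backend.local-recovery (one option splitting into two), since those are not plain renames.

```python
# One-to-one renames copied from the Checkpointing table.
CHECKPOINTING_RENAMES = {
    "state.checkpoints.num-retained": "execution.checkpointing.num-retained",
    "state.checkpoint.cleaner.parallel-mode": "execution.checkpointing.cleaner.parallel-mode",
    "state.backend.incremental": "execution.checkpointing.incremental",
    "taskmanager.state.local.root-dirs": "execution.checkpointing.local-backup.dir",
    "state.savepoints.dir": "execution.checkpointing.savepoint-dir",
    "state.storage.fs.memory-threshold": "execution.checkpointing.data-inline-threshold",
    "state.storage.fs.write-buffer-size": "execution.checkpointing.write-buffer-size",
    "execution.checkpointing.checkpoints-after-tasks-finish.enabled":
        "execution.checkpointing.checkpoints-after-tasks-finish",
}


def migrate_checkpointing_keys(conf):
    """Return a copy of `conf` with old checkpointing keys renamed.

    Hypothetical helper: unrelated keys pass through untouched, and a new
    key that is already set explicitly wins over its old counterpart.
    """
    migrated = {k: v for k, v in conf.items() if k not in CHECKPOINTING_RENAMES}
    for old_key, new_key in CHECKPOINTING_RENAMES.items():
        if old_key in conf and new_key not in migrated:
            migrated[new_key] = conf[old_key]
    return migrated


old_conf = {
    "state.checkpoints.num-retained": "3",
    "state.savepoints.dir": "file:///tmp/savepoints",  # illustrative value
    "parallelism.default": "4",
}
for key, value in sorted(migrate_checkpointing_keys(old_conf).items()):
    print(f"{key}: {value}")
```

Letting an explicitly configured new key take precedence is an assumption made here to stay consistent with the "read new options first" behavior described in the migration plan.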
Recovery
Original (Current) option | Proposed New option | Type | Note |
execution.savepoint.path | execution.state-recovery.path | String | |
execution.savepoint.ignore-unclaimed-state | execution.state-recovery.ignore-unclaimed-state | Boolean | |
execution.savepoint-restore-mode | execution.state-recovery.claim-mode | Enum | Enum of org.apache.flink.runtime.jobgraph.RestoreMode |
state.backend.local-recovery | execution.state-recovery.from-local | Boolean | Specifies whether to recover from local, the local files are created while the |
execution.checkpointing.approximate-local-recovery | execution.state-recovery.approximate-local-recovery | Boolean | |
execution.checkpointing.recover-without-channel-state.checkpoint-id | execution.state-recovery.without-channel-state.checkpoint-id | Long |
All those options will be moved into a newly introduced class `RecoveryOptions` in flink-core.
State
Original (Current) option | Proposed New option | Note |
state.backend.changelog.* | state.changelog.* | |
dstl.* | state.changelog.dstl.* | |
state.backend.latency-track.* | state.latency-track.* |
...
The most special case is state.backend.local-recovery, which controls both whether Flink writes checkpoints to local disk and whether Flink recovers from those local files. This FLIP proposes to split this option into two, execution.checkpointing.local-backup.enabled and execution.state-recovery.from-local, which govern checkpointing and recovery behavior respectively.
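The split of the single local-recovery flag can be sketched as follows. This is a minimal illustration, not Flink's implementation: the function name is hypothetical, and the checkpointing key is assumed to be execution.checkpointing.local-backup.enabled as listed in the Checkpointing table.

```python
OLD_KEY = "state.backend.local-recovery"
# Assumed new key names, taken from the Checkpointing and Recovery tables.
BACKUP_KEY = "execution.checkpointing.local-backup.enabled"
RECOVER_KEY = "execution.state-recovery.from-local"


def split_local_recovery(conf):
    """Copy the old combined flag into both new options (hypothetical helper).

    A new option that is already set explicitly keeps its own value.
    """
    migrated = {k: v for k, v in conf.items() if k != OLD_KEY}
    if OLD_KEY in conf:
        for new_key in (BACKUP_KEY, RECOVER_KEY):
            migrated.setdefault(new_key, conf[OLD_KEY])
    return migrated


# The old flag alone enables both behaviors.
print(split_local_recovery({OLD_KEY: "true"}))
# An explicit new setting is kept; only the missing one is derived.
print(split_local_recovery({OLD_KEY: "true", RECOVER_KEY: "false"}))
```

After the split, a user can keep local backups while disabling recovery from them, which the single old flag could not express.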
Additionally, some existing option classes lack annotations,
...
which are:
- org.apache.flink.configuration.CheckpointingOptions
- org.apache.flink.configuration.StateBackendOptions
- org.apache.flink.contrib.streaming.state.RocksDBOptions
This FLIP will annotate all such existing classes, along with any newly introduced classes, with @PublicEvolving in 2.0. Note that the migration period for deprecated options in those classes will meet the requirement for @PublicEvolving (it will last for at least one minor release). With the above changes, the options for state, checkpointing, and recovery are organized in a more straightforward structure, and new options are easy to categorize under the appropriate prefix.
Compatibility, Deprecation, and Migration Plan
In Flink 1.20, the new options will be introduced and the current ones will be annotated as deprecated. Flink will read the new options first and, if the user has not configured them, fall back to the old (current) options. The deprecation will last for one minor version (1.20), and in Flink 2.0 the old options will be removed entirely.
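The read order described above (new key first, then the deprecated key) can be sketched as a simple lookup. The helper `read_option` is hypothetical and only mirrors the described behavior. In Flink itself, this kind of fallback is normally declared on the option via `ConfigOption#withDeprecatedKeys` rather than coded by hand.

```python
def read_option(conf, new_key, deprecated_keys=(), default=None):
    """Look up `new_key`, falling back to deprecated keys in order.

    Hypothetical helper illustrating the migration-period read order.
    """
    if new_key in conf:
        return conf[new_key]
    for old_key in deprecated_keys:
        if old_key in conf:
            return conf[old_key]
    return default


NEW = "execution.checkpointing.num-retained"
OLD = "state.checkpoints.num-retained"

# Only the old key is set: its value is still honored.
print(read_option({OLD: "5"}, NEW, [OLD], default="1"))
# Both keys are set: the new key takes precedence.
print(read_option({OLD: "5", NEW: "7"}, NEW, [OLD], default="1"))
# Neither key is set: the (illustrative) default is returned.
print(read_option({}, NEW, [OLD], default="1"))
```

Once the old options are removed in Flink 2.0, the fallback list becomes empty and only the new key is consulted.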
If there is a migration tool for the YAML file transition (from the current flink-conf.yaml to the new config.yaml) introduced by FLIP-366, the migration of the above changes will also be integrated into that tool.
Test Plan
Existing unit and integration tests ensure compatibility for the old options. New tests will cover the new options.
Rejected Alternatives
...
- state.backend.incremental → execution.checkpoint.type (an enumeration option which can be 'full' (default) or 'incremental')
  - `Checkpoint Type` is already used in the documentation to describe aligned/unaligned checkpoints.
- state.savepoints.dir → execution.checkpointing.savepoint.dir
  - checkpointing.savepoint reads weirdly.
- state.backend.local-recovery → execution.checkpointing.local-copy
  - local-backup sounds better.
- execution.recovery as the configuration naming prefix for state recovery
  - `recovery` means too many things. Use `state-recovery` instead.