Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, the job configuration in FLINK is spread out across different components, including StreamExecutionEnvironment, CheckpointConfig, and ExecutionConfig. This leads to inconsistencies between configurations stored in these components. For example, the 'execution.checkpointing.interval' in StreamExecutionEnvironment configuration may be different from the checkpoint interval specified in CheckpointConfig. This can confuse developers and higher-level components like the Table layer has to retrieve configuration from multiple sources.

...

Please note that this FLIP does not include deprecating fields related to serialization. The deprecation work for the serialization part will be carried out in conjunction with the relevant work in the FLINK-2.0 serialization section.

Public Interfaces

Deprecate following classes, fields and methods

  • RestartStrategy:

Class

Fields or Methods

Annotation

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

getRestartStrategy()

@Public

setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration)

org.apache.flink.api.common.ExecutionConfig

getRestartStrategy()

@Public

setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration)

restartStrategyConfiguration

org.apache.flink.api.common.restartstrategy.RestartStrategies

Deprecate the entire class, includes its inner class

@PublicEvolving

Suggested alternative: Users can configure the RestartStrategyOptions related ConfigOptions, such as "restart-strategy.type", in the configuration, instead of passing a RestartStrategyConfiguration object.

  • CheckpointStorage

Class

Fields or Methods

Annotation

org.apache.flink.streaming.api.environment.CheckpointConfig

setCheckpointStorage(CheckpointStorage storage)

@Public

setCheckpointStorage(String checkpointDirectory)

setCheckpointStorage(URI checkpointDirectory)

setCheckpointStorage(Path checkpointDirectory)

getCheckpointStorage()

Suggested alternative: Users can configure "state.checkpoint-storage" in the configuration as the fully qualified name of the checkpoint storage or use some FLINK-provided checkpoint storage shortcut names such as "jobmanager" and "filesystem", and provide the necessary configuration options for building that storage, instead of passing a CheckpointStorage object.

  • StateBackend

Class

Fields or Methods

Annotation

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

setStateBackend(StateBackend backend)

@Public

getStateBackend()

defaultStateBackend

Suggested alternative: Users can configure "state.backend.type" in the configuration as the fully qualified name of the state backend or use some FLINK-provided state backend shortcut names such as "hashmap" and "rocksdb", and provide the necessary configuration options for building that StateBackend, instead of passing a StateBackend object.

Proposed Changes

We propose deprecating the classes/methods mentioned above and updating the documentation from the Flink website.

Compatibility, Deprecation, and Migration Plan

The mentioned method, fields and class are planned to be deprecated in Flink 1.19 and subsequently removed in Flink 2.0. For users who rely on these, it is recommended to use the ConfigOption stack through configuration. For example, users should configure the application checkpoint storage and state backend like the following code:

Code Block
Configuration config = new Configuration();
config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file://test");
config.set(StateBackendOptions.STATE_BACKEND, "org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendFactory");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);

Test Plan

N.A.

Rejected Alternatives

N.A.