Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
[This FLIP proposal is joint work between Xuannan Su and Rui Fan]
Motivation
As Flink progresses to 2.0, we want to enhance the user experience with the existing configuration. In FLIP-77, we introduced ConfigOption with DataType and strongly encourage users to utilize ConfigOption instead of string keys for accessing and setting Flink configurations. Presently, many string configuration keys have been deprecated and replaced with ConfigOptions; however, some string configuration keys are still in use.
To ensure a better experience with the existing configuration in Flink 2.0, this FLIP will migrate all user-facing string configuration keys to ConfigOptions. Additionally, we want to modify the Configuration infrastructure to promote the use of ConfigOption over string configuration keys among developers and users.
Public Interfaces
1. Update ConfigConstants.java to deprecate and replace string configuration keys:
Original | Class | Key | Deprecated Key | Default Value | Type | Description |
TASK_MANAGER_LOG_PATH_KEY | TaskManagerOptions | taskmanager.log.path | none | System.getProperty("log.file") | String | The path to the log file of the taskmanager. |
FS_STREAM_OPENING_TIMEOUT_KEY | TaskManagerOptions | taskmanager.runtime.fs-timeout | taskmanager.runtime.fs_timeout | 0 ms | Duration | The timeout for filesystem stream opening. A value of 0 indicates infinite waiting. |
YARN_CONTAINER_START_COMMAND_TEMPLATE | YarnConfigOptions | yarn.container-start-command-template | none | %java% %jvmmem% %jvmopts% %logging% %class% %args% %redirects% | String | Template for the YARN container start invocation. |
LOCAL_NUMBER_TASK_MANAGER | TaskManagerOptions | minicluster.number-taskmanager | local.number-taskmanager | 1 | Integer | The number of task managers in the MiniCluster. |
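The "Deprecated Key" column means that a value set under the old key should still be honored when the new key is absent. The following is a minimal stand-alone sketch of that lookup order, not Flink's implementation: the `DeprecatedKeyLookup` class, its nested `Option` type, and the `lookup` method are hypothetical names used only for illustration. In Flink itself, deprecated keys are declared on a ConfigOption via `withDeprecatedKeys(...)`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DeprecatedKeyLookup {

    /** Hypothetical stand-in for an option: a current key, older keys, and a default. */
    static final class Option {
        final String key;
        final List<String> deprecatedKeys;
        final String defaultValue;

        Option(String key, List<String> deprecatedKeys, String defaultValue) {
            this.key = key;
            this.deprecatedKeys = deprecatedKeys;
            this.defaultValue = defaultValue;
        }
    }

    /** Resolves the current key first, then each deprecated key in order, then the default. */
    static String lookup(Map<String, String> conf, Option option) {
        if (conf.containsKey(option.key)) {
            return conf.get(option.key);
        }
        for (String oldKey : option.deprecatedKeys) {
            if (conf.containsKey(oldKey)) {
                return conf.get(oldKey);
            }
        }
        return option.defaultValue;
    }

    public static void main(String[] args) {
        // Keys and default taken from the FS_STREAM_OPENING_TIMEOUT_KEY row above.
        Option fsTimeout = new Option(
                "taskmanager.runtime.fs-timeout",
                Arrays.asList("taskmanager.runtime.fs_timeout"),
                "0 ms");

        // A configuration that still uses the deprecated key only.
        Map<String, String> conf = new HashMap<>();
        conf.put("taskmanager.runtime.fs_timeout", "5 s");

        System.out.println(lookup(conf, fsTimeout));
    }
}
```

Giving the new key precedence lets users migrate a configuration file one entry at a time without changing behavior for entries they have not touched yet.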
2. Mark unused constants in ConfigConstants.java as deprecated and remove them in 2.0:
LOCAL_NUMBER_JOB_MANAGER
DEFAULT_LOCAL_NUMBER_JOB_MANAGER
HA_JOB_MANAGER_PORT
DEFAULT_EXECUTION_RETRIES
DEFAULT_FILESYSTEM_SCHEME
DEFAULT_FILESYSTEM_OVERWRITE
DEFAULT_STATE_BACKEND
ENV_FLINK_BIN_DIR
3. Update FileInputFormat.java, FileOutputFormat.java, BinaryInputFormat.java, and BinaryOutputFormat.java to deprecate and replace string configuration keys:
- We will introduce an InputOutputFormatOptions class in the org.apache.flink.api.common.io package.
- FileInputFormat.java
Original | Class | Key | Deprecated Key | Default Value | Type | Description |
FILE_PARAMETER_KEY | InputOutputFormatOptions | input-format.file.path | input.file.path | none | String | The input file path. |
ENUMERATE_NESTED_FILES_FLAG | InputOutputFormatOptions | input-format.file.recursive.enabled | recursive.file.enumeration | false | Boolean | Whether input directories are recursively traversed. |
- FileOutputFormat.java
Original | Class | Key | Deprecated Key | Default Value | Type | Description |
FILE_PARAMETER_KEY | InputOutputFormatOptions | output-format.file.path | output.file.path | none | String | The output file path. |
- BinaryInputFormat.java
Original | Class | Key | Deprecated Key | Default Value | Type | Description |
BLOCK_SIZE_PARAMETER_KEY | InputOutputFormatOptions | input-format.binary.block_size | input.block_size | Long.MIN_VALUE | Long | The fixed length of a record. |
- BinaryOutputFormat.java
Original | Class | Key | Deprecated Key | Default Value | Type | Description |
BLOCK_SIZE_PARAMETER_KEY | InputOutputFormatOptions | output-format.binary.block_size | output.block_size | Long.MIN_VALUE | Long | The fixed length of a record. |
4. Update Configuration to encourage the usage of ConfigOption over string configuration keys:
- Deprecate getXXX(String key, XXX defaultValue) and setXXX(String key, XXX value), except getString(String key, String defaultValue), setString(String key, String value), getBytes(String key, byte[] defaultValue), and setBytes(String key, byte[] bytes).
- Discussed in this previous thread: https://lists.apache.org/thread/zzsf7glfcdjcjm1hfo1xdwc6jp37nb3m
- Update the Javadoc of both getString(String key, String defaultValue) and setString(String key, String value) to encourage users to use ConfigOption.
@Public
public class Configuration extends ExecutionConfig.GlobalJobParameters
        implements IOReadableWritable,
                java.io.Serializable,
                Cloneable,
                ReadableConfig,
                WritableConfig {

    ...

    /**
     * Returns the value associated with the given key as a string. Getting value with string key is
     * discouraged. Please use {@link #get(ConfigOption)} or {@link #getOptional(ConfigOption)}.
     *
     * @param key the key pointing to the associated value
     * @param defaultValue the default value which is returned in case there is no value associated
     *     with the given key
     * @return the (default) value associated with the given key
     */
    public String getString(String key, String defaultValue) {
        ...
    }

    /**
     * Adds the given key/value pair to the configuration object. Setting value with string key is
     * discouraged. Please use {@link #set(ConfigOption, Object)}.
     *
     * @param key the key of the key/value pair to be added
     * @param value the value of the key/value pair to be added
     */
    public void setString(String key, String value) {
        ...
    }
}
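To show why typed options are preferred over string keys with per-call defaults, here is a small stand-alone sketch. It is a simplified model rather than Flink code: `TypedOptionSketch`, `TypedOption`, and the static `get` helper are hypothetical names, and a plain `Map<String, String>` stands in for Configuration. The keys and defaults come from the tables above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class TypedOptionSketch {

    /** Hypothetical typed option: the key, its default, and how to parse the raw string. */
    static final class TypedOption<T> {
        final String key;
        final T defaultValue;
        final Function<String, T> parser;

        TypedOption(String key, T defaultValue, Function<String, T> parser) {
            this.key = key;
            this.defaultValue = defaultValue;
            this.parser = parser;
        }
    }

    // Declared once, so every call site shares the same key, type, and default.
    static final TypedOption<Integer> NUM_TASK_MANAGERS =
            new TypedOption<>("minicluster.number-taskmanager", 1, Integer::parseInt);

    static final TypedOption<Boolean> RECURSIVE_ENUMERATION =
            new TypedOption<>("input-format.file.recursive.enabled", false, Boolean::parseBoolean);

    /** Typed read: returns the parsed value, or the option's default when the key is unset. */
    static <T> T get(Map<String, String> conf, TypedOption<T> option) {
        String raw = conf.get(option.key);
        return raw == null ? option.defaultValue : option.parser.apply(raw);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("minicluster.number-taskmanager", "4");

        // The compiler checks the result type; no default is repeated at the call site.
        int taskManagers = get(conf, NUM_TASK_MANAGERS);
        boolean recursive = get(conf, RECURSIVE_ENUMERATION);

        System.out.println(taskManagers + " " + recursive);
    }
}
```

With string-keyed getters, each caller supplies its own key literal and default, so two call sites can silently disagree; binding both to a single option object removes that class of error.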
Proposed Changes
Migrate all usages of deprecated string keys in ConfigConstants to ConfigOption.
Remove all internal usages of string configuration keys, e.g., in DistributedCache, TaskConfig, StreamConfig, YarnClusterDescriptor, etc.
Compatibility, Deprecation, and Migration Plan
All the string configuration keys in the following @Public classes must be marked as deprecated and replaced with ConfigOption in Flink 1.19:
ConfigConstants
FileInputFormat
FileOutputFormat
BinaryInputFormat
BinaryOutputFormat
Updates to the Configuration class must be made in Flink 1.19 since it is @Public.
In Flink 2.0, we will remove all the deprecated string configuration keys completely.
Test Plan
The existing unit tests (UT) and integration tests (IT) should already cover all newly introduced ConfigOptions.