Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread

Preview discussion: https://lists.apache.org/thread/zzsf7glfcdjcjm1hfo1xdwc6jp37nb3m

Official discussion: https://lists.apache.org/thread/zfw1b1g3679yn0ppjbsokfrsx9k7ybg0

Vote thread: https://lists.apache.org/thread/joyr7bxpo0lcj1zfzdj5nv0lrhb303rx
JIRA


Release: 1.19, 1.20, 2.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


[This FLIP proposal is joint work between Xuannan Su and Rui Fan]

1. Motivation

As Flink progresses to 2.0, we want to enhance the user experience with the existing configuration. In FLIP-77, we introduced ConfigOption with DataType and strongly encourage users to utilize ConfigOption instead of string keys for accessing and setting Flink configurations. Presently, many string configuration keys have been deprecated and replaced with ConfigOptions; however, some string configuration keys are still in use.

To ensure a better experience with the existing configuration in Flink 2.0, this FLIP will migrate all user-facing string configuration keys to ConfigOptions. Additionally, we want to modify the Configuration infrastructure to promote the use of ConfigOption over string configuration keys among developers and users.

2. Public Interfaces

2.1 Public interfaces part 1: Update all string keys to ConfigOption

2.1.1 Update ConfigConstants.java to deprecate and replace string configuration keys:


| Original | Class | Key | Deprecated Key | Default Value | Type | Description |
| --- | --- | --- | --- | --- | --- | --- |
| TASK_MANAGER_LOG_PATH_KEY | TaskManagerOptions | taskmanager.log.path | (none) | System.getProperty("log.file") | String | The path to the log file of the taskmanager. |
| FS_STREAM_OPENING_TIMEOUT_KEY | TaskManagerOptions | taskmanager.runtime.fs-timeout | taskmanager.runtime.fs_timeout | 0 ms | Duration | The timeout for filesystem stream opening. A value of 0 indicates infinite waiting. |
| YARN_CONTAINER_START_COMMAND_TEMPLATE | YarnConfigOptions | yarn.container-start-command-template | (none) | %java% %jvmmem% %jvmopts% %logging% %class% %args% %redirects% | String | Template for the YARN container start invocation. |
| LOCAL_NUMBER_TASK_MANAGER | TaskManagerOptions | minicluster.number-of-taskmanagers | local.number-taskmanager | 1 | Integer | The number of task managers of MiniCluster. |
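The fallback behaviour implied by the "Deprecated Key" column can be sketched without any Flink dependency. The snippet below is only an illustration: `Option` and `resolve` are hypothetical stand-ins, not Flink's ConfigOption or Configuration classes, and the lookup order (current key, then deprecated keys, then default) is an assumption about how such a migration is usually expected to behave. The key names and default are taken from section 2.1.1.

```java
import java.util.HashMap;
import java.util.Map;

public class DeprecatedKeySketch {

    /** Hypothetical, simplified stand-in for a string-valued option with fallback keys. */
    static final class Option {
        final String key;
        final String defaultValue;
        final String[] deprecatedKeys;

        Option(String key, String defaultValue, String... deprecatedKeys) {
            this.key = key;
            this.defaultValue = defaultValue;
            this.deprecatedKeys = deprecatedKeys;
        }
    }

    /** Looks up the current key first, then each deprecated key, then the default. */
    static String resolve(Map<String, String> conf, Option option) {
        if (conf.containsKey(option.key)) {
            return conf.get(option.key);
        }
        for (String deprecated : option.deprecatedKeys) {
            if (conf.containsKey(deprecated)) {
                return conf.get(deprecated);
            }
        }
        return option.defaultValue;
    }

    // Key, deprecated key, and default as listed for LOCAL_NUMBER_TASK_MANAGER.
    static final Option NUM_TASK_MANAGERS =
            new Option("minicluster.number-of-taskmanagers", "1", "local.number-taskmanager");

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(resolve(conf, NUM_TASK_MANAGERS)); // nothing set: default

        conf.put("local.number-taskmanager", "3");
        System.out.println(resolve(conf, NUM_TASK_MANAGERS)); // only the old key is set

        conf.put("minicluster.number-of-taskmanagers", "4");
        System.out.println(resolve(conf, NUM_TASK_MANAGERS)); // the new key takes precedence
    }
}
```

Under these assumptions, existing user configurations that still set the old key keep working while new code reads only the option.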


2.1.2 Mark unused constants in ConfigConstants.java as deprecated and remove them in 2.0:

  • LOCAL_NUMBER_JOB_MANAGER

  • DEFAULT_LOCAL_NUMBER_JOB_MANAGER

  • HA_JOB_MANAGER_PORT

  • DEFAULT_EXECUTION_RETRIES

  • DEFAULT_FILESYSTEM_SCHEME

  • DEFAULT_FILESYSTEM_OVERWRITE

  • DEFAULT_STATE_BACKEND

  • ENV_FLINK_BIN_DIR


2.1.3 Update FileInputFormat.java, FileOutputFormat.java, BinaryInputFormat.java, and BinaryOutputFormat.java to deprecate string configuration keys:

  • FileInputFormat.java
    • FILE_PARAMETER_KEY

    • ENUMERATE_NESTED_FILES_FLAG
  • FileOutputFormat.java
    • FILE_PARAMETER_KEY
  • BinaryInputFormat.java
    • BLOCK_SIZE_PARAMETER_KEY
  • BinaryOutputFormat.java
    • BLOCK_SIZE_PARAMETER_KEY

2.2 Public interfaces part 2: Simplify the Configuration

2.2.1 Update Configuration to encourage the usage of ConfigOption over string configuration key

  • Deprecate getXXX(String key, XXX defaultValue) and setXXX(String key, XXX value), as discussed in the preview thread, with the following exceptions:
    • getString(String key, String defaultValue)
    • setString(String key, String value)
    • getBytes(String key, byte[] defaultValue) will be marked as @Internal
    • setBytes(String key, byte[] bytes) will be marked as @Internal
  • Update the comment in getString(String key, String defaultValue) and setString(String key, String value) to encourage users to use ConfigOption.


Configuration.java
@Public
public class Configuration extends ExecutionConfig.GlobalJobParameters
        implements IOReadableWritable,
                java.io.Serializable,
                Cloneable,
                ReadableConfig,
                WritableConfig {
    ...

    /**
     * Returns the value associated with the given key as a string. We encourage users and
     * developers to always use ConfigOption for getting the configurations if possible, for its
     * rich description, type, default-value and other supports. The string-key-based getter should
     * only be used when ConfigOption is not applicable, e.g., the key is programmatically generated
     * in runtime.
     *
     * @param key the key pointing to the associated value
     * @param defaultValue the default value which is returned in case there is no value associated
     *     with the given key
     * @return the (default) value associated with the given key
     */
    public String getString(String key, String defaultValue) {
        ...
    }

    /**
     * Adds the given key/value pair to the configuration object. We encourage users and developers
     * to always use ConfigOption for setting the configurations if possible, for its rich
     * description, type, default-value and other supports. The string-key-based setter should only
     * be used when ConfigOption is not applicable, e.g., the key is programmatically generated in
     * runtime.
     *
     * @param key the key of the key/value pair to be added
     * @param value the value of the key/value pair to be added
     */
    public void setString(String key, String value) {
        ...
    }

    ...
}
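To make the "programmatically generated" case from the Javadoc concrete, here is a minimal sketch of a situation where a fixed option cannot be declared up front because part of the key is only known at runtime. Everything in it is hypothetical: the `example.reporter.` prefix, the reporter names, and the map-backed `getString` helper are illustrations only and do not come from Flink.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class GeneratedKeySketch {

    /** Builds a per-instance key at runtime; the "example.reporter." prefix is hypothetical. */
    static String reporterKey(String reporterName, String suffix) {
        return "example.reporter." + reporterName + "." + suffix;
    }

    /** Mirrors the shape of getString(String key, String defaultValue) on a plain map. */
    static String getString(Map<String, String> conf, String key, String defaultValue) {
        return conf.getOrDefault(key, defaultValue);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(reporterKey("jmx", "interval"), "10 s");

        // The set of reporter names is only known at runtime, so the keys are assembled
        // on the fly instead of being declared as constants.
        for (String name : Arrays.asList("jmx", "slf4j")) {
            String interval = getString(conf, reporterKey(name, "interval"), "60 s");
            System.out.println(name + " -> " + interval);
        }
    }
}
```

For every key that is known at compile time, a declared option remains the preferred route.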

2.2.2 Introduce public <T> T get(ConfigOption<T> configOption, T overrideDefault)


    /**
     * Returns the value associated with the given config option as a T. If no value is mapped
     * under any key of the option, it returns the specified default instead of the option's default
     * value.
     *
     * @param configOption The configuration option
     * @param overrideDefault The value to return if no value was mapped for any key of the option
     * @return the configured value associated with the given config option, or the overrideDefault
     */
    @PublicEvolving
    public <T> T get(ConfigOption<T> configOption, T overrideDefault) {
        return getOptional(configOption).orElse(overrideDefault);
    }


`public <T> T get(ConfigOption<T> option, T overrideDefault)` can replace all old `getXxx(ConfigOption<Xxx> configOption, Xxx overrideDefault)` methods.
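The difference between the option's own default and the caller-supplied overrideDefault can be illustrated with a small self-contained sketch. `Option` and `Conf` below are hypothetical stand-ins for ConfigOption and Configuration, and `example.port` is an invented key; only the body of `get(option, overrideDefault)` follows the method shown in this section.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

public class OverrideDefaultSketch {

    /** Hypothetical stand-in for ConfigOption<T>: a key, a parser, and the option's default. */
    static final class Option<T> {
        final String key;
        final Function<String, T> parser;
        final T defaultValue;

        Option(String key, Function<String, T> parser, T defaultValue) {
            this.key = key;
            this.parser = parser;
            this.defaultValue = defaultValue;
        }
    }

    /** Hypothetical stand-in for Configuration, backed by a plain string map. */
    static final class Conf {
        private final Map<String, String> data = new HashMap<>();

        Conf setString(String key, String value) {
            data.put(key, value);
            return this;
        }

        <T> Optional<T> getOptional(Option<T> option) {
            return Optional.ofNullable(data.get(option.key)).map(option.parser);
        }

        /** Falls back to the option's own default. */
        <T> T get(Option<T> option) {
            return getOptional(option).orElse(option.defaultValue);
        }

        /** Falls back to the caller's value instead of the option's default. */
        <T> T get(Option<T> option, T overrideDefault) {
            return getOptional(option).orElse(overrideDefault);
        }
    }

    // "example.port" is an invented key used only for this illustration.
    static final Option<Integer> PORT =
            new Option<Integer>("example.port", Integer::valueOf, 8081);

    public static void main(String[] args) {
        Conf conf = new Conf();
        System.out.println(conf.get(PORT));        // option default
        System.out.println(conf.get(PORT, 9000));  // caller-supplied default

        conf.setString("example.port", "7000");
        System.out.println(conf.get(PORT, 9000));  // configured value wins over both defaults
    }
}
```

Because the method is generic, one signature covers every value type that previously needed its own getXxx overload.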

2.2.3 Deprecate some unnecessary setXxx and getXxx methods in Configuration

Changes:

  • Mark all public Xxx getXxx(ConfigOption<Xxx> configOption) methods as @Deprecated and remove them in 2.0, for example:
    • public int getInteger(ConfigOption<Integer> configOption)
    • public String getString(ConfigOption<String> configOption)
    • public long getLong(ConfigOption<Long> configOption)
    • public float getFloat(ConfigOption<Float> configOption)
    • public double getDouble(ConfigOption<Double> configOption)
    • public boolean getBoolean(ConfigOption<Boolean> configOption)
  • Mark all public void setXxx(ConfigOption<Xxx> key, Xxx value) methods as @Deprecated and remove them in 2.0, for example:
    • public void setInteger(ConfigOption<Integer> key, int value)
    • public void setString(ConfigOption<String> key, String value)
    • public void setLong(ConfigOption<Long> key, long value)
    • public void setDouble(ConfigOption<Double> key, double value)
    • public void setFloat(ConfigOption<Float> key, float value)
    • public void setBoolean(ConfigOption<Boolean> key, boolean value)
  • Mark all public Xxx getXxx(ConfigOption<Xxx> configOption, Xxx overrideDefault) methods as @Deprecated and remove them in 2.0, for example:
    • public String getString(ConfigOption<String> configOption, String overrideDefault)
    • public long getLong(ConfigOption<Long> configOption, long overrideDefault)
    • public int getInteger(ConfigOption<Integer> configOption, int overrideDefault)
    • public double getDouble(ConfigOption<Double> configOption, double overrideDefault)
    • public float getFloat(ConfigOption<Float> configOption, float overrideDefault)
    • public boolean getBoolean(ConfigOption<Boolean> configOption, boolean overrideDefault)

Reason:

  • Configuration already provides the generic methods public <T> T get(ConfigOption<T> option) and public <T> Configuration set(ConfigOption<T> option, T value).
  • The getXxx and setXxx methods can be replaced with get and set directly, without any extra effort.
  • get and set are easier to use than getXxx and setXxx.
    • Callers can call get directly; users and Flink developers do not need to decide whether to call getInteger or getString.
  • Flink code becomes easier to maintain.
  • T get(ConfigOption<T> option) was introduced later than Xxx getXxx(ConfigOption<Xxx> configOption). Had the generic getter existed first, the typed getXxx(ConfigOption<Xxx> configOption) methods would likely not have been needed.
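The reasoning above can be illustrated with a short sketch in which a single generic get/set pair serves options of several types. `Option` and `Conf` are hypothetical stand-ins rather than Flink classes, and the `example.*` keys are invented; the point is only that the option's type parameter determines the value type, so no per-type accessor is required.

```java
import java.util.HashMap;
import java.util.Map;

public class GenericAccessorSketch {

    /** Hypothetical stand-in for ConfigOption<T>; the type parameter carries the value type. */
    static final class Option<T> {
        final String key;
        final Class<T> type;
        final T defaultValue;

        Option(String key, Class<T> type, T defaultValue) {
            this.key = key;
            this.type = type;
            this.defaultValue = defaultValue;
        }
    }

    /** Hypothetical stand-in for Configuration exposing only the generic accessors. */
    static final class Conf {
        private final Map<String, Object> data = new HashMap<>();

        /** Returns this so that calls can be chained, like the set method listed above. */
        <T> Conf set(Option<T> option, T value) {
            data.put(option.key, value);
            return this;
        }

        <T> T get(Option<T> option) {
            Object raw = data.get(option.key);
            return raw == null ? option.defaultValue : option.type.cast(raw);
        }
    }

    // Invented keys, used only for this illustration.
    static final Option<Integer> PARALLELISM =
            new Option<>("example.parallelism", Integer.class, 1);
    static final Option<String> NAME = new Option<>("example.name", String.class, "unnamed");
    static final Option<Boolean> VERBOSE = new Option<>("example.verbose", Boolean.class, false);

    public static void main(String[] args) {
        Conf conf = new Conf().set(PARALLELISM, 4).set(NAME, "job-a");

        // The same method name is used for every type; the compiler infers the result type.
        int parallelism = conf.get(PARALLELISM);
        String name = conf.get(NAME);
        boolean verbose = conf.get(VERBOSE);

        System.out.println(parallelism + " " + name + " " + verbose);
    }
}
```

A mismatched call such as set(PARALLELISM, "four") would be rejected by the compiler in this sketch, which is the same kind of safety the typed setXxx methods offered.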

3. Proposed Changes

  1. Migrate all usage of deprecated string key in ConfigConstants to ConfigOption.

  2. Remove all the internal usage of string configuration keys, e.g., DistributedCache, TaskConfig, StreamConfig, YarnClusterDescriptor, etc.

  3. Refactor all callers of public Xxx getXxx(ConfigOption<Xxx> configOption) and public void setXxx(ConfigOption<Xxx> key, Xxx value) to use get and set instead.

4. Compatibility, Deprecation, and Migration Plan

  • All string configuration keys in the following @Public classes must be marked as deprecated and replaced with ConfigOption in Flink 1.19:

    • ConfigConstants

    • FileInputFormat

    • FileOutputFormat

    • BinaryInputFormat

    • BinaryOutputFormat

  • Updates to the Configuration class must be made in Flink 1.19 since it is @Public

  • In Flink 2.0, we will remove all the deprecated string configuration keys completely

5. Test Plan

The existing unit tests (UT) and integration tests (IT) should already cover all newly introduced ConfigOptions.