Status

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

`ConfigOption` and `Configuration` are crucial parts of the Flink project because every component in the stack needs possibilities of parameterization.

Ideally, every parameterization should also be persistable in a config file and changeable programmatically or in a CLI session via string properties. If one takes a look at the currently defined config options, there are many inconsistencies and shortcomings such as:

  • A user does not know the expected data type of an option or allowed values. If the description is not good enough, an option is difficult to configure.
  • Many components have implemented custom validation logic to perform common tasks such as checking for >= 0.
  • Many components have implemented custom parsing logic to perform common tasks such as list splitting or duration parsing.
  • List separators are not used consistently: sometimes comma sometimes semicolon.
  • Parsing of durations uses Scala classes.
  • List of tuples or list of objects cannot be stored easily.
  • Options that depend on each other need validation logic somewhere in the code.
  • There is no concept of optional properties which means implementers come up with "fallback" values such as "parallelism of -1" means fallback to parallelism defined in flink-conf.yaml.
  • Eager validation is not supported yet.

Currently, classes such as `o.a.f.table.descriptors.DescriptorProperties` are symptoms of the root cause of missing functionality in Flink's configuration capabilities.

We should evolve ConfigOption and Configuration to replace DescriptorProperties and have a unified configuration for Flink from higher levels (e.g. SQL Client CLI) over core classes (e.g. new Executor) down to connectors (e.g. Kafka with JSON format).

Public Interfaces

List of new interfaces:

  • OptionBuilder#intType(...)/stringType(...)/...
  • TypedConfigOptionBuilder, ListConfigOptionBuilder
  • ConfigurationReader/ConfigurationWriter, ConfigurableFactory
  • Configuration implements ConfigurationReader/ConfigurationWriter thus receives new get(...)/getOptional(...)/#set(...)
  • ConfigOption#withValidator(...)/validate(...)/withExtendedDescription(...)
  • OptionValidators, OptionValidator
  • ConfigOptionSet, OptionSetValidator, OptionSetValidators

Proposed Changes

Overview

Because config options are already used at a couple of places in the code base, we aimed to minimize the amount of changes necessary while enriching a config option with more declarative information.

Example:

ConfigOptions.key("key")
.intType()
.defaultValue(12);

Proposed changes to ConfigOption:

In order for ConfigOption to contain information about the class it describes, we should add two additional fields to ConfigOption:

    private final Class atomicClass;

    private final boolean isList;

The atomicClass field describes the atomic type that this ConfigOption describes. There are 5 cases:

  • atomicClass == e.g. Integer.class -> ConfigOption<Integer>
  • atomicClass == Map.class -> ConfigOption<Map<String, String>>
  • atomicClass == ? extends ConfigurableFactory<T> -> ConfigOption<T>
  • atomicClass == Class.class -> for ConfigOption<Class<?>>
  • atomicClass == e.g. Integer.class & isList = true for ConfigOption<List<Integer>>

This way we can describe all necessary types without backwards incompatible changes to the ConfigOption class

Proposed New Builder Pattern:

The current builder pattern in ConfigOptions is not expressive enough to define a type, a list of types, or a type describing a class or enum. We suggest to introduce a new builder that can be accessed via:


ConfigOptions.key("key")


The entire builder is defined as:

public static class OptionBuilder {
   	 private final String key;

   	 OptionBuilder(String key) {
   		 this.key = key;
   	 }

   	 TypedConfigOptionBuilder<Integer> intType() {
   		 return new TypedConfigOptionBuilder<>(key, Integer.class);
   	 }

   	 TypedConfigOptionBuilder<String> stringType() {
   		 return new TypedConfigOptionBuilder<>(key, String.class);
   	 }

   	 TypedConfigOptionBuilder<Duration> durationType() {
   		 return new TypedConfigOptionBuilder<>(key, Duration.class);
   	 }

   	 TypedConfigOptionBuilder<Map<String, String>> propertiesType() {
   		 return new TypedConfigOptionBuilder<>(key, Map.class);
   	 }

   	 <T> TypedConfigOptionBuilder<T> enumType(Class<T extends Enum<T>> clazz) {
   		 return new TypedConfigOptionBuilder<>(key, clazz);
   	 }
   	 
   	 // All supported atomic types: Boolean, Integer, Long, Double, Float, String, Duration, MemorySize, Enum, Properties(Map<String, String>)


   	 <T> TypedConfigOptionBuilder<T> configurableType(Class<? extends ConfigurableFactory<T>> clazz) {
   return new TypedConfigOptionBuilder<>(key, clazz);
}

   	 <T> TypedConfigOptionBuilder<Class<T>> classType(Class<T> clazz) {
   		 return new TypedConfigOptionBuilder<>(key, Class.class);
   	 }

    	/**
   	  * Creates a ConfigOption with the given default value.
   	  *
   	  * <p>This method does not accept "null". For options with no default value, choose
   	  * one of the {@code noDefaultValue} methods.
   	  *
   	  * @param value The default value for the config option
   	  * @param <T> The type of the default value.
   	  * @return The config option with the default value.
   	  */
   	 @Deprecated
   	 public <T> ConfigOption<T> defaultValue(T value) {
   		 checkNotNull(value);
   		 return new ConfigOption<>(key, value);
   	 }

   	 /**
   	  * Creates a string-valued option with no default value.
   	  * String-valued options are the only ones that can have no
   	  * default value.
   	  *
   	  * @return The created ConfigOption.
   	  */
   	 @Deprecated
   	 public ConfigOption<String> noDefaultValue() {
   		 return new ConfigOption<>(key, null);
   	 }

    }

    public static class TypedConfigOptionBuilder<T> {
   	 private final String key;
   	 private final Class clazz;

   	 TypedConfigOptionBuilder(String key, Class clazz) {
   		 this.key = key;
   		 this.clazz = clazz;
   	 }

   	 public ListConfigOptionBuilder<T> asList() {
   		 return new ListConfigOptionBuilder<>(key, clazz);
   	 }

   	 public ConfigOption<T> defaultValue(T value) {
   		 return new ConfigOption<>(
   			 key,
   			 clazz,
   			 false,
   			 Description.builder().text("").build(),
   			 value,
   			 EMPTY);
   	 }

   	 public ConfigOption<T> noDefaultValue() {
   		 return new ConfigOption<>(
   			 key,
   			 clazz,
   			 false,
   			 Description.builder().text("").build(),
   			 null,
   			 EMPTY);
   	 }
    }

    public static class ListConfigOptionBuilder<T> {
   	 private final String key;
   	 private final Class clazz;

   	 ListConfigOptionBuilder(String key, Class clazz) {
   		 this.key = key;
   		 this.clazz = clazz;
   	 }

   	 @SafeVarargs
   	 public final ConfigOption<List<T>> defaultValues(T... values) {
   		 return new ConfigOption<>(
   			 key,
   			 clazz,
   			 true,
   			 Description.builder().text("").build(),
   			 Arrays.asList(values),
   			 EMPTY);
   	 }

   	 public ConfigOption<List<T>> noDefaultValue() {
   		 return new ConfigOption<>(
   			 key,
   			 clazz,
   			 true,
   			 Description.builder().text("").build(),
   			 null,
   			 EMPTY);
   	 }
    }

We will deprecate two methods on OptionBuilder#noDefaultValue & OptionBuilder#defaultValue as they do not define the option type properly.

Proposed changes to Configuration:

We suggest to introduce new interfaces & make Configuration extend from it:

interface ConfigurationReader {

    T get(ConfigOption<T> configOption);

    Optional<T> getOptional(ConfigOption<T> configOption);

}

We will not support nullability but we distinguish between an option that is present or not. This is necessary e.g. for handling fallback options. Those would return Optional.empty() in case the backing map does not contain the given key, or the value of the given key is null.

interface ConfigurationWriter {

    ConfigurationWriter set(ConfigOption<T> configOption, T value)

}

class Configuration implements ConfigurationReader, ConfigurationWriter

Option Validation

A validation logic allows to verify ConfigOptions early (e.g. during start-up) and perform validation at a central location instead of each caller verifying the option again and again.

Proposed changes to Configuration:


A validation is performed when getting or setting a ConfigOption in ConfigurationReader's new get(ConfigOption) and set(ConfigOption) methods.

We will not perform validation in the old getters and setters of Configuration for backwards-compatibility. It could otherwise mess up exception assumptions at other code locations (e.g. during TaskManager start-up) if validation will be added to existing ConfigOptions in the future.

Proposed changes to ConfigOption:


/**

 * Creates a new config option, using this option's attributes, and adding the given validator.

 *

 * <p>The validator is used when accessing (reading and writing) the value from the configuration.

 *

 * @see OptionValidators

 *

 * @param validator The validator for this option.

 * @return A new config option with the given validator.

 */

public ConfigOption<T> withValidator(OptionValidator<T> validator)

/**

 * Validates a value for this option and always returns a valid value or throws an InvalidOptionException with explanation.

 */

public T validate(T value)


Proposed interface: OptionValidator


/**

 * Validator logic for a {@link ConfigOption}.

 *

 * <p>A validator is used when accessing (reading and writing) the value from the configuration.

 */

@PublicEvolving

public interface OptionValidator<T> {

/**

* Validates the value of a {@link ConfigOption}.

*

* @param configOption The config option that is currently being validated.

* @param value The config option's value to be validated.

* @return True if option is valid, false otherwise.

*/

boolean validate(ConfigOption<T> configOption, T value);

/**

* Returns an explanatory string. It is used for printing exception as well as generating

* documentation.

*

* <p>For consistency, use camel-case syntax and square brackets for nesting.

*

* <p>For example, {@code or[values[1, 2, 3], values[5]]}.

*/

String explain();

}

Proposed utilities: OptionValidators


/**

 * Returns a {@link OptionValidator} that always passes.

 */

public static <T> OptionValidator<T> alwaysPassing()

/**

 * Returns a {@link OptionValidator} for checking if the value is contained in the list of values.

 */

public static <T> OptionValidator<T> values(T... values)

/**

 * Returns a {@link OptionValidator} for checking if a {@link Comparable} is within its bounds (both

 * inclusive).

 */

public static <T extends Comparable<T>> OptionValidator<T> between(T min, T max)

/**

 * Returns a {@link OptionValidator} that checks that all given validators succeed.

 */

public static <T> OptionValidator<T> and(OptionValidator<T>... validators)

/**

 * Returns a {@link OptionValidator} that checks that at least one given validator succeeds.

 */

public static <T> OptionValidator<T> or(OptionValidator<T>... validators)

/**

 * Returns a {@link OptionValidator} that checks that the given validator does not succeed.

 */

public static <T> OptionValidator<T> not(OptionValidator<T> validators)

Option Description Extension

Some options are used across modules and are accessed by different layers. This results in the following issues:

  1. In different layers, an option might get additional or slightly different semantics. For example, a key `parallelism` defined in flink-core is used in DataStream API but also in Table API. The semantics are almost the same but maybe setting this property in Table API has an additional side-effect that should be documented. Instead of polluting the flink-core, we could extend the option.
  2. Some options might be used from other layers but it is difficult to tell users which option need special attention in the long list of configuration options. For example, it would be nice to link from Table API to some important core options.

Example:

public static final ConfigOption<Integer> MAX_PARALLELISM =
	CoreOptions.MAX_PARALLELISM.withExtendedDescription(
		"Note: That this property means that a table program has a side-effect XYZ.");

Proposed changes to ConfigOption:


/**
 * Creates a new config option, using this option's attributes, and extending its description by 
 * the given description.
 *
 * <p>The extended description is highlighted when generation the configuration documentation.
 *
 * @param description The description for this option.
 * @return A new config option, with given description.
 */
public ConfigOption<T> withExtendedDescription(final String description)

Object Options

Depending on the configuration, it might be necessary to introduce an additional level of nesting for a list of grouped options. For example, List<Tuple3<String, String, Boolean>> for cached files in ExecutionConfig or List<ElasticsearchUpsertTableSinkBase.Host> for the Elasticsearch connector or Map<Class, Class>.

We propose the following structure similar to Hadoop or Kafka for introducing configurable objects.

Note: We will not support another level of nesting. Having nested object enables basically bypassing the main configuration structure. However, one level of nesting might be necessary esp. for a list of objects.

Proposed interface: ConfigurableFactory

Instead of allowing Tuple3 or anonymous Maps, we propose a new interface that allows constructing a named POJO that can be immutable.

/**
 * Allows to read and write an instance from and to {@link Configuration}. A configurable instance
 * operates in its own key space in {@link Configuration} and will be (de)prefixed by the framework. It cannot access keys from other options. A factory must have a default constructor.
 *
 */
public interface ConfigurableFactory<T> {

	/**
	 * Creates an instance from the given configuration.
	 */
	T fromConfiguration(ConfigurationReader configuration);

	/**
	 * Writes this instance to the given configuration.
	 */
	void toConfiguration(T value, ConfigurationWriter configuration);
}

Proposed changes to documentation generation:

We need to know which keys an object owns, the default values, and description for documentation.

We suggest that the documentation generation would follow a ConfigurableFactory class of an option and perform the same reflection-based lookup after public static options within the configurable class.

Example:

public static final ConfigOption<CachedFile> CACHED_FILE =
		key("cached-file")
			.configurableType(CachedFileFactory.class)
.defaultValue(new CachedFile("empty", "empty", false));

class CachedFileFactory implements ConfigurableFactory {
public static final ConfigOption<String> PATH = key("path").defaultValue("path"); public static final ConfigOption<String> FILE = key("file").defaultValue("file"); public static final ConfigOption<Boolean> FLAG = key("flag").defaultValue(true); // call regular config options and assign fields: // fromConfiguration()/toConfiguration()
} class CachedFile implements Configurable { private String path; private String file; private Boolean flag; public CachedFile(String path, String file, String flag) { } }

This would result in the following key-value Configuration:

path: path

file: file

flag: true

The framework converts it to a flattened representation:

cached-file: path=path,file=file,flag=true

For escaping both key and value, they can be surrounded by single quotes or double quotes for escaping reserved characters (;,:=whitespace). Two following quotes escape the quote itself.

cached-file: path="Bob's path",file='Bob''s path',flag=true

For serializing and deserializing configurable objects, we would introduce a new serialization type in Configuration that is serialized in the following format:


TYPE_CONFIGURABLE | clazz.getName() | serialized configuration

List Options

The ObjectOption section mentioned the need for having a list of objects like List<ElasticsearchUpsertTableSinkBase.Host>. We suggest adding the possibility of lists that play nicely together with objects but can also be used on their own.

While objects use comma as a separator, lists use semicolon. The escaping logic is defined in the object section.

Example:

public static final ConfigOption<List<String>> PATHS =
	ConfigOption.key("paths")
		.stringType()
		.asList()
		.defaultValue(List.of("/usr/bin", "/tmp/bin"));

public static final ConfigOption<List<CachedFile>> CACHED_FILES =
	ConfigOption.key("cached-files")
		.configurableType(CashedFile.class)
		.asList()
                .defaultValues(new CashedFile(), new CashedFile());

The string representation of those examples would look like:

paths: /usr/bin;/tmp/bin

cached-files: path=path,file=file,flag=true;path=path,file=file,flag=true

For serializing and deserializing lists, we would introduce a new serialization type in Configuration that is serialized in the following format:


TYPE_LIST | length | like serialized configuration without key

Rejected Alternatives:

Alternative 1:

cached-files.0.file=a0

cached-files.0.path=b0

cached-files.0.other=c0

cached-files.1.file=a1

cached-files.1.path=b1

cached-files.1.other=c1

Pros:

- already used in DescriptorProperties

- easy to define manually

Cons:

- Users need to keep track of the indices

- The key space is not constant. Validation of keys would require prefix magic and wildcards. Like in TableFactories: `cached-files.#.file.*`

- An object spans multiple keys and cannot be set in one CLI `SET` command.

Alternative 2:

cached-files=[{file: a0, path: b0, other: c0}, {file: a1, path: b1, other: c1}]

Pros:

- Uses the JSON standard

- easy to define manually

- entire (nested) object under a common key

Cons:

- Avoids the actual property map and basically opens the gate for complex configuration that is hard to validate and to document.

- Problems with escaping but little because defined by the JSON standard.

Alternative 3:

cached-files.file=a0,a1

cached-files.path=b0,b1

cached-files.other=c0,c1

Pros:

- Fits nicely into a property map

- Is based on ConfigOptions for upper level key and sublevel keys and can be easily validated and documented.

Cons:

- Not easy to declare manually e.g. a key-value map would be split and a user would need to ensure that the length of both list is consistent

- Problems with escaping

Duration Options

We suggest to add native support for ConfigOption<java.time.Duration>:

ConfigOption<Duration> option = ...

Duration size = conf.get(option)


For serializing and deserializing memory size, we would introduce a new serialization type in Configuration that is serialized in the following format:

TYPE_DURATION | <long, number of seconds> | <int, number of nanos>

The built-in string format of java.time.Duration (e.g. PT0.020S for “20ms”) is not user friendly and really hard to specify manually. Therefore we suggest to introduce custom parsing logic for string representation. We suggest to add a logic that allows for writing the duration value and unit in which it is given, e.g. 20ms. But at the same time we should also support the original format.

The format should look like “d+w*[unit]”, where unit is one of [“ns”, “us”, “ms”, “s”, “m”, “min”, “h”, “d”]. Whitespaces are ignored. We will use and extend `org.apache.flink.util.TimeUtils` for this purposes.

Memory Size Options

We should add native support for ConfigOption<MemorySize>:

ConfigOption<MemorySize> option = ...

MemorySize size = conf.get(option)


For serializing and deserializing memory size, we would introduce a new serialization type in Configuration that is serialized in the following format:TYPE_MEMORY_SIZE | <long, number of bytes>

For the string representation we would reuse the parsing logic from new MemorySize().

The format is as follows “d+ [unit]”, where unit is one of [“b”, “bytes”, “k”, “kb”, "kibibytes", "m", "mb", "mebibytes", "g", "gb", "gibibytes", "t", "tb", "tebibytes"].

Option Sets and Set Validation

Many options depend on each other and have a correlation. For example:

  • two integer option keys `A` and `B` should have a minimum distance of `A - B = 5`
  • config key X > 0 is only valid if strategy enum Z is set to "NEVER", etc.


We suggest to perform the option set check at a central location and next to the options itself. Similar to validation, it also allows to print helpful exceptions and documentation.

Proposed class ConfigOptionSet:

`ConfigOptionSet` is the new top-level class for classes such as `BlobServerOptions` or `ExecutionConfigOptions`.

It enables accessing all defined options and cross-option validation. We can provide a static utility function that instantiates the set and calls `validate(conf)` for eager set validation.

/** Class for a set of {@link ConfigOption}s. */
class ConfigOptionSet {

	// make option set classes instantiable and assume
	// default constructor

	/**
	 * Returns all options of this set.
	 */
	public List<ConfigOption<?>> getOptions(){
		// uses reflection by default
	}

	/**
	 * Returns all cross-option validators defined for this set.
	 */
	public List<SetValidator> getSetValidators() {
		// empty by default
	}

	/**
	 * Validates this set of options in the given configuration.
	 */
	public void validate(ConfigurationReader reader) {
		// calls validate on all options
		// calls validate on all set validators
	}
}

Proposed interface SetValidator:


/**
 * Validates a set of {@link ConfigOption}s in {@link ConfigOptionSet}.
 */
public interface OptionSetValidator {

	/**
	 * Validates and potentially correlates options stored in a {@link Configuration}.
	 *
	 * @param configuration Gives read-only access to the configuration.
	 * @return True if option is valid; false otherwise.
	 */
	boolean validate(ConfigurationReader configuration);

	/**
	 * Returns an explanatory string. It is used for printing exception as well as generating
	 * documentation.
	 *
	 * <p>For consistency, use camel-case syntax and square brackets for nesting.
	 *
	 * <p>For example, {@code minLongDelta[“min-retention”, ”max-retention”, 5]}.
	 */
	String explain();
}

Proposed utilities: OptionSetValidators

We can offer a set of predefined validators such as `minLongDelta` that would verify the distance between two separate config options.

Rejected Alternative:

Example:


Defining a correlation is a chicken-or-egg problem because it requires the config options to be already defined while defining a correlation for those config options. A proxy allows to set the actual correlation once.

private static final ProxyCorrelation idleStateCorrelation = OptionCorrelation.proxy();

public static final ConfigOption<Long> IDLE_STATE_MIN_RETENTION =
	key("idle-state.min-retention")
		.defaultValue(0L)
		.withCorrelation(idleStateCorrelation);

public static final ConfigOption<Long> IDLE_STATE_MAX_RETENTION =
	key("idle-state.max-retention")
		.defaultValue(0L)
		.withCorrelation(idleStateCorrelation);

static {
	idleStateCorrelation.fill(
		PredefinedCorrelation.minLongDelta(
		IDLE_STATE_MIN_RETENTION,
		IDLE_STATE_MAX_RETENTION,
		5000));
}

Proposed changes to Configuration:

A correlation check is performed when getting a ConfigOption in Configuration. Setting is not checked because users might need to set a couple of options that are temporarily invalid before.

Proposed changes to ConfigOption:

/**
 * Creates a new config option, using this option's attributes, and adding the given correlation.
 *
 * <p>The correlation is checked when reading the value from the configuration.
 *
 * @see PredefinedCorrelations
 *
 * @param correlation The correlation for this option to other options.
 * @return A new config option with the given validator.
 */
public ConfigOption<T> withCorrelation(OptionCorrelation correlation)

/**
 * Validates a value for this option and always returns a valid value or throws an InvalidOptionException with explanation.
 *
 * <p>This method includes validating the correlation to other options in the given {@link Configuration}.
 */
public T validate(ConfigurationReader configuration, T value)

Proposed interface: OptionCorrelation

/**
 * Checks the correlation among {@link ConfigOption}.
 *
 */
@PublicEvolving
public interface OptionCorrelation {

	/**
	 * Correlates options stored in a {@link Configuration}.
	 *
	 * @param configuration Gives read-only access to the configuration.
	 * @return True if option is valid; false otherwise.
	 */
	boolean correlate(ConfigurationReader configuration);

	/**
	 * Returns an explanatory string. It is used for printing exception as well as generating
	 * documentation.
	 *
	 * <p>For consistency, use camel-case syntax and square brackets for nesting.
	 *
	 * <p>For example, {@code minLongDelta[“min-retention”, ”max-retention”, 5]}.
	 */
	String explain();

	/**
	 * Placeholder {@link OptionCorrelation} for defining an immutable {@link ConfigOption} and the actual
	 * correlation afterwards.
	 */
	static ProxyConfiguration proxy() {
		return new ProxyConfiguration();
	}
}

Documentation changes

We suggest to extend the documentation generator with Type column that will describe the expected type & optionally constraints of the values. We did not want to introduce yet another column as we are already short on table width. We thought we can reuse the "Type" column for type with its restrictions.

We add group validation constraints to the end of the table:

Key

Type

Default

Description

key1

MemorySize

greaterThan[1024m]

(none)

description of key1

key2

MemorySize

1024m

description of key2


Group constraints:
- minSizeDelta[key1, key2, “512m”]

Compatibility, Deprecation, and Migration Plan

  • All existing config options are still valid and have no changed behavior
  • Deprecated ConfigOption#defaultValue(...)/noDefaultValue

Implementation Plan

Each feature section can be a separate commit or issue. Such as:

  • New typed ConfigOption with builder pattern
  • Option validation
  • Option description extension
  • Object options
  • Lists
  • Duration
  • Memory Size
  • Option set validation

Test Plan

The implementation can be tested with unit tests for every new feature section listed in Proposed Changes.

Rejected Alternatives

See corresponding feature sections.