...

In a few places, this FLIP builds on FLIP-54. In particular, it assumes richer type support for ConfigOptions and the availability of ReadableConfig#getOptional.

...


Proposed Changes

Methods to configure config objects

We suggest adding methods that can change the state of the three classes (ExecutionConfig, CheckpointConfig, and StreamExecutionEnvironment) from a ReadableConfig.

  • Add configure methods:
    • void ExecutionConfig.configure(ReadableConfig configuration, ClassLoader classLoader)
    • void CheckpointConfig.configure(ReadableConfig configuration)
    • void StreamExecutionEnvironment.configure(ReadableConfig configuration, ClassLoader classLoader)

    These methods would mutate the internal state only of those parameters for which a corresponding option is present in the ReadableConfig; all other parameters would be left intact. StreamExecutionEnvironment.configure would also call the other two methods.

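For example, in ExecutionConfig:

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;
    import org.apache.flink.configuration.ReadableConfig;

    public static final ConfigOption<ClosureCleanerLevel> CLOSURE_CLEANER_LEVEL =
        ConfigOptions.key("pipeline.closure-cleaner-level")
            .enumType(ClosureCleanerLevel.class)
            .defaultValue(ClosureCleanerLevel.RECURSIVE);

    public void configure(ReadableConfig configuration, ClassLoader classLoader) {
        ...
        // Overwrite the field only if the option is explicitly present in the configuration.
        configuration.getOptional(CLOSURE_CLEANER_LEVEL)
            .ifPresent(level -> this.closureCleanerLevel = level);
        ...
    }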
  • Add three ConfigOptionSets:
    • JobOptions in flink-core (this will include all options from ExecutionConfig, plus bufferTimeout, isChainingEnabled, and cachedFile from StreamExecutionEnvironment)
    • CheckpointJobOptions in flink-streaming-java (we do not want to reuse CheckpointingOptions, so that these options are clearly differentiated from cluster-side options; moreover, CheckpointingMode and ExternalizedCheckpointCleanup are not available in flink-core)
    • StreamJobOptions in flink-streaming-java (for TimeCharacteristic); at some point in the future we could merge it with JobOptions, once the dependency issue is resolved (TimeCharacteristic is not in flink-core)
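A minimal, self-contained sketch of the "overwrite only what is present" semantics described above. It deliberately avoids Flink types: ConfigSketch, ExecutionConfigSketch, CheckpointConfigSketch and StreamEnvSketch are hypothetical stand-ins for ReadableConfig, ExecutionConfig, CheckpointConfig and StreamExecutionEnvironment, and options are plain string keys parsed by hand instead of typed ConfigOptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for ReadableConfig: a string map with a getOptional-style lookup.
final class ConfigSketch {
    private final Map<String, String> values = new HashMap<>();

    ConfigSketch set(String key, String value) {
        values.put(key, value);
        return this;
    }

    Optional<String> getOptional(String key) {
        return Optional.ofNullable(values.get(key));
    }
}

// Hypothetical stand-in for ExecutionConfig.
final class ExecutionConfigSketch {
    boolean objectReuse = false;
    int maxParallelism = -1;

    void configure(ConfigSketch conf) {
        // A field changes only if its key is present; everything else is left intact.
        conf.getOptional("pipeline.object-reuse")
            .map(Boolean::parseBoolean)
            .ifPresent(v -> objectReuse = v);
        conf.getOptional("pipeline.max-parallelism")
            .map(Integer::parseInt)
            .ifPresent(v -> maxParallelism = v);
    }
}

// Hypothetical stand-in for CheckpointConfig.
final class CheckpointConfigSketch {
    int maxConcurrentCheckpoints = 1;

    void configure(ConfigSketch conf) {
        conf.getOptional("execution.checkpointing.max-concurrent-checkpoints")
            .map(Integer::parseInt)
            .ifPresent(v -> maxConcurrentCheckpoints = v);
    }
}

// Hypothetical stand-in for StreamExecutionEnvironment: applies its own options,
// then delegates to the two nested config objects.
final class StreamEnvSketch {
    final ExecutionConfigSketch executionConfig = new ExecutionConfigSketch();
    final CheckpointConfigSketch checkpointConfig = new CheckpointConfigSketch();
    boolean chainingEnabled = true;

    void configure(ConfigSketch conf) {
        conf.getOptional("pipeline.operator-chaining")
            .map(Boolean::parseBoolean)
            .ifPresent(v -> chainingEnabled = v);
        executionConfig.configure(conf);
        checkpointConfig.configure(conf);
    }
}
```

For example, if maxParallelism was set to 128 in code and the configuration contains only pipeline.object-reuse=true, configure() enables object reuse and keeps maxParallelism at 128.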


...



New ConfigOptions

  • In class org.apache.flink.configuration.PipelineOptions:

    key | setting | type | expected format
    --- | --- | --- | ---
    pipeline.auto-generate-uids | ExecutionConfig#enableAutoGeneratedUids | ConfigOption<Boolean> |
    pipeline.auto-type-registration | ExecutionConfig#autoTypeRegistrationEnabled | ConfigOption<Boolean> |
    pipeline.auto-watermark-interval | ExecutionConfig#autoWatermarkInterval | ConfigOption<Duration> |
    pipeline.closure-cleaner-level | ExecutionConfig#closureCleanerLevel | ConfigOption<ExecutionConfig.ClosureCleanerLevel> |
    pipeline.force-avro | ExecutionConfig#forceAvro | ConfigOption<Boolean> |
    pipeline.force-kryo | ExecutionConfig#forceKryo | ConfigOption<Boolean> |
    pipeline.generic-types | ExecutionConfig#disableGenericTypes (inverted: true means generic types are enabled) | ConfigOption<Boolean> |
    pipeline.global-job-parameters | ExecutionConfig#globalJobParameters | ConfigOption<Map<String, String>> |
    pipeline.max-parallelism | ExecutionConfig#maxParallelism | ConfigOption<Integer> |
    pipeline.object-reuse | ExecutionConfig#objectReuse | ConfigOption<Boolean> |
    pipeline.default-kryo-serializers | ExecutionConfig#defaultKryoSerializerClasses | ConfigOption<List<String>> | Semicolon-separated list of pairs of class name and serializer class name. Example: class:org.apache.flink.api.common.ExecutionConfigTest,serializer:org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1;class:org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1,serializer:org.apache.flink.api.common.ExecutionConfigTest$TestSerializer2
    pipeline.registered-kryo-types | ExecutionConfig#registeredKryoTypes | ConfigOption<List<String>> | Semicolon-separated list of class names. Example: org.apache.flink.api.common.ExecutionConfigTest;org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1
    pipeline.registered-pojo-types | ExecutionConfig#registeredPojoTypes | ConfigOption<List<String>> | Semicolon-separated list of class names. Example: org.apache.flink.api.common.ExecutionConfigTest;org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1
    pipeline.operator-chaining | StreamExecutionEnvironment#isChainingEnabled | ConfigOption<Boolean> |
    pipeline.cached-files | StreamExecutionEnvironment#cacheFile | ConfigOption<List<String>> | Semicolon-separated list of triples of file name, path, and executable flag (the executable flag is optional). Example: name:file1,path:/tmp1,executable:true;name:file2,path:/tmp2
  • In class org.apache.flink.streaming.api.environment.StreamPipelineOptions:

    key | setting | type
    --- | --- | ---
    pipeline.time-characteristic | StreamExecutionEnvironment#timeCharacteristic | ConfigOption<TimeCharacteristic>


  • In class org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions:

    key | setting | type
    --- | --- | ---
    execution.checkpointing.mode | CheckpointConfig#checkpointingMode | ConfigOption<CheckpointingMode>
    execution.checkpointing.interval | CheckpointConfig#checkpointInterval | ConfigOption<Duration>
    execution.checkpointing.timeout | CheckpointConfig#checkpointTimeout | ConfigOption<Duration>
    execution.checkpointing.max-concurrent-checkpoints | CheckpointConfig#maxConcurrentCheckpoints | ConfigOption<Integer>
    execution.checkpointing.min-pause | CheckpointConfig#minPauseBetweenCheckpoints | ConfigOption<Duration>
    execution.checkpointing.prefer-checkpoint-for-recovery | CheckpointConfig#preferCheckpointForRecovery | ConfigOption<Boolean>
    execution.checkpointing.tolerable-failed-checkpoints | CheckpointConfig#tolerableCheckpointFailureNumber | ConfigOption<Integer>
    execution.checkpointing.externalized-checkpoint | CheckpointConfig#externalizedCheckpointCleanup | ConfigOption<CheckpointConfig.ExternalizedCheckpointCleanup>


  • In class org.apache.flink.configuration.ExecutionOptions:

    key | setting | type
    --- | --- | ---
    execution.checkpointing.snapshot-compression | ExecutionConfig#useSnapshotCompression | ConfigOption<Boolean>
    execution.buffer-timeout | StreamExecutionEnvironment#bufferTimeout | ConfigOption<Duration>


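The list-valued options above are stored as strings. The following is a minimal sketch of one way to read the pipeline.default-kryo-serializers format back into class-name pairs. KryoSerializerFormat and its methods are hypothetical helpers, not Flink API, and the sketch assumes that class names never contain ',' or ';'.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for the "k1:v1,k2:v2;k1:v3,k2:v4" format used by list-valued options.
final class KryoSerializerFormat {
    private KryoSerializerFormat() {}

    // Splits entries on ';' and key/value pairs on ',', keeping the order of entries.
    static List<Map<String, String>> parseEntries(String value) {
        List<Map<String, String>> entries = new ArrayList<>();
        for (String entry : value.split(";")) {
            if (entry.isEmpty()) {
                continue; // tolerate an empty string or a doubled separator
            }
            Map<String, String> fields = new LinkedHashMap<>();
            for (String pair : entry.split(",")) {
                // Split on the first ':' only, so a value may itself contain ':'.
                int sep = pair.indexOf(':');
                if (sep <= 0) {
                    throw new IllegalArgumentException("Expected key:value but got '" + pair + "'");
                }
                fields.put(pair.substring(0, sep).trim(), pair.substring(sep + 1).trim());
            }
            entries.add(fields);
        }
        return entries;
    }

    // Interprets pipeline.default-kryo-serializers as class name -> serializer class name.
    static Map<String, String> parseDefaultKryoSerializers(String value) {
        Map<String, String> serializers = new LinkedHashMap<>();
        for (Map<String, String> entry : parseEntries(value)) {
            String clazz = entry.get("class");
            String serializer = entry.get("serializer");
            if (clazz == null || serializer == null) {
                throw new IllegalArgumentException("Each entry needs 'class' and 'serializer': " + entry);
            }
            serializers.put(clazz, serializer);
        }
        return serializers;
    }
}
```

The resulting class names would still have to be loaded with the user ClassLoader, which is presumably why the configure methods for ExecutionConfig and StreamExecutionEnvironment take one.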

Options not configurable from Configuration

  • ExecutionConfig
    • ExecutionConfig#executionMode
    • ExecutionConfig#numberOfExecutionRetries
    • ExecutionConfig#codeAnalysisMode
    • ExecutionConfig#executionRetryDelay
    • ExecutionConfig#failTaskOnCheckpointError
    • ExecutionConfig#defaultInputDependencyConstraint
    • ExecutionConfig#registeredTypesWithKryoSerializers
    • ExecutionConfig#registeredTypesWithKryoSerializerClasses
    • ExecutionConfig#defaultKryoSerializers
  • CheckpointConfig
    • CheckpointConfig#failOnCheckpointingErrors

Existing configuration options: 

There are at least three places where users can configure execution behavior on a per-job basis: ExecutionConfig, CheckpointConfig, and StreamExecutionEnvironment. The following tables list all configuration parameters present in those classes, together with the keys we suggest using for them.

...


Configuration | Comment | Suggested key | Type of the field | Suggested type of ConfigOption | Exists in 1.9
--- | --- | --- | --- | --- | ---
autoTypeRegistration | | pipeline.auto-type-registration | boolean | ConfigOption<Boolean> |
autoWatermarkInterval | We could use Duration for the ConfigOption. | pipeline.auto-watermark-interval | long | ConfigOption<Duration> |
closureCleaner | | pipeline.closure-cleaner-level | ClosureCleanerLevel | ConfigOption<ClosureCleanerLevel> |
defaultInputDependencyConstraint | | not supported | Enum<InputDependencyConstraint> | |
defaultKryoSerializerClasses | We should support only this variant, as it takes Class<? extends Serializer<?>>, which simplifies serialization. | pipeline.default-kryo-serializers | Map<Class, Class<Serializer<?>>> | ConfigOption<List<DefaultKryoSerializer>>, where DefaultKryoSerializer is a POJO |
defaultKryoSerializers | We do not want to support this option, as it stores SerializableSerializer instances. We see no point in configuring serializers from Java-serialized strings in Base64 encoding. | not supported | Map<Class, SerializableSerializer<?>> | |
disableGenericTypes | We should invert the logic in the ConfigOption: true means generic types are enabled, false means they are disabled. | pipeline.generic-types | boolean | ConfigOption<Boolean> |
enableAutoGeneratedUIDs | | pipeline.auto-generate-uids | boolean | ConfigOption<Boolean> |
executionMode | | not supported | Enum<ExecutionMode> | |
executionRetryDelay / numberOfExecutionRetries | Deprecated | not supported | long / int | |
forceAvro | | pipeline.force-avro | boolean | ConfigOption<Boolean> |
forceKryo | | pipeline.force-kryo | boolean | ConfigOption<Boolean> |
setGlobalJobParameters | | pipeline.global-job-parameters | Map<String, String> | ConfigOption<Map<String, String>> |
latencyTrackingInterval | | reuse the existing option | long | | metrics.latency.interval (MetricOptions)
setMaxParallelism | | pipeline.max-parallelism | int | ConfigOption<Integer> |
objectReuse | | pipeline.object-reuse | boolean | ConfigOption<Boolean> |
setParallelism | | reuse the existing option | int | | parallelism.default (CoreOptions); table.exec.resource.default-parallelism (ExecutionConfigOptions): drop this Table-specific option
registeredKryoTypes | | pipeline.registered-kryo-types | List<Class<?>> | ConfigOption<List<Class<?>>> |
registeredPojoTypes | | pipeline.registered-pojo-types | List<Class<?>> | ConfigOption<List<Class<?>>> |
registeredTypesWithKryoSerializers | We do not want to support this option, as it stores SerializableSerializer instances. We see no point in configuring serializers from Java-serialized strings in Base64 encoding. | not supported | Map<Class, Serializer<?>> | |
registeredTypesWithKryoSerializerClasses | This option uses serializer IDs during serialization. | not supported as part of this FLIP | Map<Class, Class<Serializer<?>>> | |
setRestartStrategy | On the client side, we need to convert the option to a RestartStrategyConfiguration. Custom restart strategies are not supported. We would also flatten the configuration structure. | reuse the existing option | RestartStrategy | | restart-strategy (ConfigConstants; not a proper ConfigOption yet)
taskCancellationIntervalMillis | | reuse the existing option | long | | task.cancellation.interval (TaskManagerOptions)
taskCancellationTimeoutMillis | | reuse the existing option | long | | task.cancellation.timeout (TaskManagerOptions)
useSnapshotCompression | | execution.checkpointing.snapshot-compression | boolean | ConfigOption<Boolean> |
printProgressDuringExecution | Deprecated, has no effect | not supported | boolean | |
codeAnalysisMode | Deprecated, has no effect | not supported | --- | |
failTaskOnCheckpointError | Deprecated | not supported | boolean | |
...
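Two rows of the ExecutionConfig table change the shape of a value rather than just its source: disableGenericTypes is exposed with inverted logic, and long millisecond fields such as autoWatermarkInterval are exposed as Duration options. A minimal sketch of that mapping, assuming the typed values have already been read (for example via getOptional); ExecutionFieldMapping is a hypothetical class, and taking Optional parameters is only a convenience of the sketch.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical holder for two ExecutionConfig fields whose option type differs from the field type.
final class ExecutionFieldMapping {
    boolean disableGenericTypes = false;  // the field keeps its original "disable" semantics
    long autoWatermarkIntervalMillis = 0; // the field keeps milliseconds as a long

    void apply(Optional<Boolean> genericTypesEnabled, Optional<Duration> autoWatermarkInterval) {
        // pipeline.generic-types: true means enabled, so the field stores the negation.
        genericTypesEnabled.ifPresent(enabled -> disableGenericTypes = !enabled);
        // pipeline.auto-watermark-interval: the Duration value is converted to the long field.
        autoWatermarkInterval.ifPresent(interval -> autoWatermarkIntervalMillis = interval.toMillis());
    }
}
```

Absent values leave both fields unchanged, in line with the configure semantics proposed earlier.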

Configuration | Comment | Suggested key | Type | Suggested type of ConfigOption | Exists in 1.9
--- | --- | --- | --- | --- | ---
checkpointingMode | | execution.checkpointing.mode | Enum<CheckpointingMode> | ConfigOption<CheckpointingMode> |
checkpointInterval | We could use Duration for the ConfigOption. | execution.checkpointing.interval | long | ConfigOption<Duration> |
checkpointTimeout | We could use Duration for the ConfigOption. | execution.checkpointing.timeout | long | ConfigOption<Duration> |
externalizedCheckpointCleanup | | execution.checkpointing.externalized-checkpoint | Enum<ExternalizedCheckpointCleanup> | ConfigOption<ExternalizedCheckpointCleanup> |
failOnCheckpointingErrors | Deprecated | not supported | boolean | |
forceCheckpointing | Deprecated | not supported | boolean | |
maxConcurrentCheckpoints | | execution.checkpointing.max-concurrent-checkpoints | int | ConfigOption<Integer> |
minPauseBetweenCheckpoints | We could use Duration for the ConfigOption. | execution.checkpointing.min-pause | long | ConfigOption<Duration> |
preferCheckpointForRecovery | | execution.checkpointing.prefer-checkpoint-for-recovery | boolean | ConfigOption<Boolean> |
tolerableCheckpointFailureNumber | | execution.checkpointing.tolerable-failed-checkpoints | int | ConfigOption<Integer> |
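Most rows in the CheckpointConfig table map either an enum option onto an enum field or a Duration option onto a long millisecond field. The sketch below illustrates both conversions with hypothetical names: CheckpointingModeSketch mirrors the two CheckpointingMode constants, EXACTLY_ONCE and AT_LEAST_ONCE. The case-insensitive enum lookup and the non-negative check are choices made for this sketch, not documented Flink behavior.

```java
import java.time.Duration;
import java.util.Locale;

// Hypothetical local copy of the CheckpointingMode constants.
enum CheckpointingModeSketch { EXACTLY_ONCE, AT_LEAST_ONCE }

final class CheckpointValueConversions {
    private CheckpointValueConversions() {}

    // Resolves an enum constant by name, ignoring case (a choice made for this sketch).
    static <E extends Enum<E>> E parseEnum(Class<E> type, String raw) {
        return Enum.valueOf(type, raw.trim().toUpperCase(Locale.ROOT));
    }

    // Converts a Duration option (interval, timeout, min-pause) to the long millisecond field.
    static long toMillis(Duration value) {
        if (value.isNegative()) {
            throw new IllegalArgumentException("Duration must not be negative: " + value);
        }
        return value.toMillis();
    }
}
```

Zero is accepted because a zero minimum pause between checkpoints is a meaningful setting. Stricter bounds, such as a minimum checkpoint interval, would be left to CheckpointConfig's own setters.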

...

Configuration | Comment | Suggested key | Type | Suggested type of ConfigOption | Exists in 1.9
--- | --- | --- | --- | --- | ---
timeCharacteristic | | pipeline.time-characteristic | Enum<TimeCharacteristic> | ConfigOption<TimeCharacteristic> |
defaultStateBackend | | reuse the existing option | StateBackend | | state.backend (CheckpointingOptions)
isChainingEnabled | | pipeline.operator-chaining | boolean | ConfigOption<Boolean> |
bufferTimeout | We could use Duration for the ConfigOption. We could move it to ExecutionConfig. | execution.buffer-timeout | long | ConfigOption<Duration> |
cachedFile | We | pipeline.cached-files | <String, String, Boolean> (filePath/fileName/executable) | ConfigOption<List<CachedFile>>, where CachedFile is a POJO |
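The cachedFile row suggests one POJO per cached-file entry. Below is a minimal sketch of such a POJO, together with parsing of the name:…,path:…,executable:… string format used for pipeline.cached-files. CachedFileSketch and parseAll are hypothetical names. The sketch assumes that names and paths contain no ',' or ';'; a path may contain ':', because only the first ':' in each pair separates key from value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical POJO for one cached-files entry: name, path and an optional executable flag.
final class CachedFileSketch {
    final String name;
    final String path;
    final boolean executable;

    CachedFileSketch(String name, String path, boolean executable) {
        this.name = name;
        this.path = path;
        this.executable = executable;
    }

    // Parses e.g. "name:file1,path:/tmp1,executable:true;name:file2,path:/tmp2".
    static List<CachedFileSketch> parseAll(String value) {
        List<CachedFileSketch> files = new ArrayList<>();
        for (String entry : value.split(";")) {
            if (entry.isEmpty()) {
                continue;
            }
            Map<String, String> fields = new HashMap<>();
            for (String pair : entry.split(",")) {
                int sep = pair.indexOf(':'); // first ':' only, so "hdfs://..." paths survive
                if (sep <= 0) {
                    throw new IllegalArgumentException("Expected key:value but got '" + pair + "'");
                }
                fields.put(pair.substring(0, sep), pair.substring(sep + 1));
            }
            String name = fields.get("name");
            String path = fields.get("path");
            if (name == null || path == null) {
                throw new IllegalArgumentException("'name' and 'path' are required: " + entry);
            }
            // The executable flag is optional and defaults to false.
            boolean executable = Boolean.parseBoolean(fields.getOrDefault("executable", "false"));
            files.add(new CachedFileSketch(name, path, executable));
        }
        return files;
    }
}
```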

...