...

Page properties

...


Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-59-Enable-execution-configuration-from-Configuration-object-td32359.html

JIRA:

...


Vote thread
JIRA: FLINK-14785 (ASF JIRA)

Release: 1.10


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

This FLIP builds on FLIP-54 in a few places. In particular, it assumes richer type support for ConfigOptions and the availability of ConfigurationReader#getOptional.

...


Proposed Changes

Methods to configure config objects

We suggest adding methods that update the state of the three classes (ExecutionConfig, CheckpointConfig, and StreamExecutionEnvironment) from a ConfigurationReader.

  • Add configure methods:
    • void ExecutionConfig.configure(ReadableConfig configuration, ClassLoader classLoader)
    • void CheckpointConfig.configure(ReadableConfig)
    • void StreamExecutionEnvironment.configure(ReadableConfig configuration, ClassLoader classLoader)

    These methods would change only those parameters for which a corresponding option is present in the ConfigurationReader; all other parameters would keep their current values. StreamExecutionEnvironment.configure would also call the other two methods.

E.g.

    public static final ConfigOption<ClosureCleanerLevel> CLOSURE_CLEANER_LEVEL =
        ConfigOptions.key("pipeline.closure-cleaner-level")
            .enumType(ClosureCleanerLevel.class)
            .defaultValue(ClosureCleanerLevel.RECURSIVE);

    void configure(ConfigurationReader conf) {
        ...
        // Overwrite the field only when the option is explicitly set.
        conf.getOptional(CLOSURE_CLEANER_LEVEL).ifPresent(level -> {
            this.closureCleanerLevel = level;
        });
        ...
    }

  • Add three ConfigOptionSets:
    • JobOptions in flink-core (this will include all options from ExecutionConfig, plus bufferTimeout, isChainingEnabled, and CacheFile from StreamExecutionEnvironment)
    • CheckpointJobOptions in flink-streaming-java (we do not want to reuse CheckpointingOptions, so that these options stay clearly separated from cluster-side options; in addition, CheckpointingMode and ExternalizedCheckpointCleanup are not available in flink-core)
    • StreamJobOptions in flink-streaming-java (for TimeCharacteristic). At some point in the future we could merge it with JobOptions, once we resolve the dependencies (TimeCharacteristic is not in flink-core)
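
The configure semantics described above can be sketched in plain Java. This is only an illustration under stated assumptions: SketchConfig, SketchExecutionConfig, SketchCheckpointConfig, and SketchEnvironment are hypothetical stand-ins rather than Flink classes, the ClassLoader argument is left out, and the option keys are taken from the tables in this document.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a read-only configuration view; not Flink's real interface.
final class SketchConfig {
    private final Map<String, Object> values = new HashMap<>();

    SketchConfig set(String key, Object value) {
        values.put(key, value);
        return this;
    }

    // Returns a value only if the key was explicitly set (no fallback to a default).
    <T> Optional<T> getOptional(String key, Class<T> type) {
        return Optional.ofNullable(values.get(key)).map(type::cast);
    }
}

// Hypothetical stand-in for ExecutionConfig with two of its settings.
final class SketchExecutionConfig {
    boolean objectReuse = false;
    int maxParallelism = -1;

    void configure(SketchConfig conf) {
        conf.getOptional("pipeline.object-reuse", Boolean.class).ifPresent(v -> objectReuse = v);
        conf.getOptional("pipeline.max-parallelism", Integer.class).ifPresent(v -> maxParallelism = v);
    }
}

// Hypothetical stand-in for CheckpointConfig; the field keeps milliseconds.
final class SketchCheckpointConfig {
    long checkpointIntervalMillis = -1L;

    void configure(SketchConfig conf) {
        conf.getOptional("execution.checkpointing.interval", Duration.class)
            .ifPresent(d -> checkpointIntervalMillis = d.toMillis());
    }
}

// Hypothetical stand-in for StreamExecutionEnvironment.
final class SketchEnvironment {
    final SketchExecutionConfig executionConfig = new SketchExecutionConfig();
    final SketchCheckpointConfig checkpointConfig = new SketchCheckpointConfig();
    long bufferTimeoutMillis = 100L;

    void configure(SketchConfig conf) {
        conf.getOptional("execution.buffer-timeout", Duration.class)
            .ifPresent(d -> bufferTimeoutMillis = d.toMillis());
        // The environment-level method delegates to the two other configure methods.
        executionConfig.configure(conf);
        checkpointConfig.configure(conf);
    }
}
```

In this sketch, a value set programmatically before configure is called survives unless the configuration contains the matching key, which is the behaviour the proposal asks for.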

...


New ConfigOptions

  • In class org.apache.flink.configuration.PipelineOptions:

| key | setting | type | Expected format |
| --- | --- | --- | --- |
| pipeline.auto-generate-uids | ExecutionConfig#enableAutoGeneratedUids | ConfigOption<Boolean> | |
| pipeline.auto-type-registration | ExecutionConfig#autoTypeRegistrationEnabled | ConfigOption<Boolean> | |
| pipeline.auto-watermark-interval | ExecutionConfig#autoWatermarkInterval | ConfigOption<Duration> | |
| pipeline.closure-cleaner-level | ExecutionConfig#closureCleanerLevel | ConfigOption<ExecutionConfig.ClosureCleanerLevel> | |
| pipeline.force-avro | ExecutionConfig#forceAvro | ConfigOption<Boolean> | |
| pipeline.force-kryo | ExecutionConfig#forceKryo | ConfigOption<Boolean> | |
| pipeline.generic-types | ExecutionConfig#disableGenericTypes | ConfigOption<Boolean> | |
| pipeline.global-job-parameters | ExecutionConfig#globalJobParameters | ConfigOption<Map<String, String>> | |
| pipeline.max-parallelism | ExecutionConfig#maxParallelism | ConfigOption<Integer> | |
| pipeline.object-reuse | ExecutionConfig#objectReuse | ConfigOption<Boolean> | |
| pipeline.default-kryo-serializers | ExecutionConfig#defaultKryoSerializerClasses | ConfigOption<List<String>> | Semicolon-separated list of pairs of class names and serializer class names. Example: class:org.apache.flink.api.common.ExecutionConfigTest,serializer:org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1;class:org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1,serializer:org.apache.flink.api.common.ExecutionConfigTest$TestSerializer2 |
| pipeline.registered-kryo-types | ExecutionConfig#registeredKryoTypes | ConfigOption<List<String>> | Semicolon-separated list of class names. Example: org.apache.flink.api.common.ExecutionConfigTest;org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1 |
| pipeline.registered-pojo-types | ExecutionConfig#registeredPojoTypes | ConfigOption<List<String>> | Semicolon-separated list of class names. Example: org.apache.flink.api.common.ExecutionConfigTest;org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1 |
| pipeline.operator-chaining | StreamExecutionEnvironment#isChainingEnabled | ConfigOption<Boolean> | |
| pipeline.cached-files | StreamExecutionEnvironment#cacheFile | ConfigOption<List<String>> | Semicolon-separated list of triples of cached file path, name, and executable flag (the executable flag is optional). Example: name:file1,path:/tmp1,executable:true;name:file2,path:/tmp2 |

  • In class org.apache.flink.streaming.api.environment.StreamPipelineOptions:

    | key | setting | type |
    | --- | --- | --- |
    | pipeline.time-characteristic | StreamExecutionEnvironment#timeCharacteristic | ConfigOption<TimeCharacteristic> |


  • In class org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions

    | key | setting | type |
    | --- | --- | --- |
    | execution.checkpointing.mode | CheckpointConfig#checkpointingMode | ConfigOption<CheckpointingMode> |
    | execution.checkpointing.interval | CheckpointConfig#checkpointInterval | ConfigOption<Duration> |
    | execution.checkpointing.timeout | CheckpointConfig#checkpointTimeout | ConfigOption<Duration> |
    | execution.checkpointing.max-concurrent-checkpoints | CheckpointConfig#maxConcurrentCheckpoints | ConfigOption<Integer> |
    | execution.checkpointing.min-pause | CheckpointConfig#minPauseBetweenCheckpoints | ConfigOption<Duration> |
    | execution.checkpointing.prefer-checkpoint-for-recovery | CheckpointConfig#preferCheckpointForRecovery | ConfigOption<Boolean> |
    | execution.checkpointing.tolerable-failed-checkpoints | CheckpointConfig#tolerableCheckpointFailureNumber | ConfigOption<Integer> |
    | execution.checkpointing.externalized-checkpoint | CheckpointConfig#externalizedCheckpointCleanup | ConfigOption<CheckpointConfig.ExternalizedCheckpointCleanup> |


  • In class org.apache.flink.configuration.ExecutionOptions


    | key | setting | type |
    | --- | --- | --- |
    | execution.checkpointing.snapshot-compression | ExecutionConfig#useSnapshotCompression | ConfigOption<Boolean> |
    | execution.buffer-timeout | StreamExecutionEnvironment#bufferTimeout | ConfigOption<Duration> |
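
The structured list formats shown in the "Expected format" column (for pipeline.default-kryo-serializers and pipeline.cached-files) could be parsed roughly as follows. This is a minimal sketch, not the parser Flink uses: StructuredListParser is a hypothetical helper, and it assumes that values never contain ',' or ';'. It splits each field on the first ':' only, so values such as URIs that contain further colons stay intact.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper; assumes values contain neither ',' nor ';'.
final class StructuredListParser {

    // Parses "k1:v1,k2:v2;k1:v3" into one ordered map per ';'-separated entry.
    static List<Map<String, String>> parse(String raw) {
        List<Map<String, String>> entries = new ArrayList<>();
        for (String entry : raw.split(";")) {
            if (entry.trim().isEmpty()) {
                continue; // tolerate a trailing ';' or empty segments
            }
            Map<String, String> fields = new LinkedHashMap<>();
            for (String field : entry.split(",")) {
                // Split on the first ':' only, so "path:hdfs:///tmp/f" keeps its value whole.
                int sep = field.indexOf(':');
                if (sep < 0) {
                    throw new IllegalArgumentException("Expected key:value but got '" + field + "'");
                }
                fields.put(field.substring(0, sep).trim(), field.substring(sep + 1).trim());
            }
            entries.add(fields);
        }
        return entries;
    }
}
```

Applied to the cached-files example, the second entry simply has no "executable" key, which matches the statement that the flag is optional; how a missing flag is interpreted is left to the caller.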


Options not configurable from Configuration

  • ExecutionConfig
    • ExecutionConfig#executionMode
    • ExecutionConfig#numberOfExecutionRetries
    • ExecutionConfig#codeAnalysisMode
    • ExecutionConfig#executionRetryDelay
    • ExecutionConfig#failTaskOnCheckpointError
    • ExecutionConfig#defaultInputDependencyConstraint
    • ExecutionConfig#registeredTypesWithKryoSerializers
    • ExecutionConfig#registeredTypesWithKryoSerializerClasses
    • ExecutionConfig#defaultKryoSerializers
  • CheckpointConfig
    • CheckpointConfig#failOnCheckpointingErrors

Existing configuration options: 

There are at least three places where a user can configure execution behavior on a per-job basis: ExecutionConfig, CheckpointConfig, and StreamExecutionEnvironment. The following tables list all configuration parameters present in those classes, together with the keys we suggest for those options.

ExecutionConfig:


| Configuration | Comment | Suggested key | Type of the field | Suggested type of ConfigOption | Exists in 1.9 |
| --- | --- | --- | --- | --- | --- |
| autoTypeRegistration | | pipeline.auto-type-registration | boolean | ConfigOption<Boolean> | |
| autoWatermarkInterval | We could use Duration for ConfigOption | pipeline.auto-watermark-interval | long | ConfigOption<Duration> | |
| closureCleaner | | pipeline.closure-cleaner-level | ClosureCleanerLevel | ConfigOption<ClosureCleanerLevel> | |
| defaultInputDependencyConstraint | | not supported | Enum<InputDependencyConstraint> | | |
| defaultKryoSerializerClasses | We should support only this version as it takes Class<? extends Serializer<?>>. This simplifies the serialization. | pipeline.default-kryo-serializers | Map<Class, Class<Serializer<?>>> | ConfigOption<List<DefaultKryoSerializer>>, where DefaultKryoSerializer is a pojo | |
| defaultKryoSerializers | We do not want to support this option as it stores SerializableSerializer. We see no point in configuring the serializers from Java-serialized strings in Base64 encoding. | not supported | Map<Class, SerializableSerializer<?>> | | |
| disableGenericTypes | We should invert the logic in the ConfigOption: true means enabled, false means disabled. | pipeline.generic-types | boolean | ConfigOption<Boolean> | |
| enableAutoGeneratedUIDs | | pipeline.auto-generated-uids | boolean | ConfigOption<Boolean> | |
| executionMode | | not supported | Enum<ExecutionMode> | | |
| executionRetryDelay / numberOfExecutionRetries | Deprecated | not supported | long/int | | |
| forceAvro | | pipeline.force-avro | boolean | ConfigOption<Boolean> | |
| forceKryo | | pipeline.force-kryo | boolean | ConfigOption<Boolean> | |
| setGlobalJobParameters | | pipeline.global-job-parameters | Map<String, String> | ConfigOption<Map<String, String>> | |
| latencyTrackingInterval | | reuse the existing | long | | metrics.latency.interval (MetricOptions) |
| setMaxParallelism | | pipeline.max-parallelism | int | ConfigOption<Integer> | |
| objectReuse | | pipeline.object-reuse | boolean | ConfigOption<Boolean> | |
| setParallelism | | reuse the existing | int | | parallelism.default (CoreOptions); table.exec.resource.default-parallelism (ExecutionConfigOptions) - drop this Table specific option |
| registeredKryoTypes | | pipeline.registered-kryo-types | List<Class<?>> | ConfigOption<List<Class<?>>> | |
| registeredPojoTypes | | pipeline.registered-pojo-types | List<Class<?>> | ConfigOption<List<Class<?>>> | |
| registeredTypesWithKryoSerializers | We do not want to support this option as it stores SerializableSerializer. We see no point in configuring the serializers from Java-serialized strings in Base64 encoding. | not supported | Map<Class, Serializer<?>> | | |
| registeredTypesWithKryoSerializerClasses | We should support only this version as it takes Class<? extends Serializer<?>>. This simplifies the serialization. This option in the serialization uses ids of serializers. | not supported as part of this FLIP | Map<Class, Class<Serializer<?>>> | | |
| setRestartStrategy | On the client side we need to convert the option to RestartStrategyConfiguration. It does not support custom restart strategy. We would also flatten the configuration structure. | reuse the existing | RestartStrategy | | restart-strategy (ConfigConstants) (not a proper ConfigOption yet) |
| taskCancellationIntervalMillis | | reuse the existing | long | | task.cancellation.interval (TaskManagerOptions) |
| taskCancellationTimeoutMillis | | reuse the existing | long | | task.cancellation.timeout (TaskManagerOptions) |
| useSnapshotCompression | | execution.checkpointing.snapshot-compression | boolean | ConfigOption<Boolean> | |
| printProgressDuringExecution | Deprecated, has no effect | not supported | boolean | | |
| codeAnalysisMode | Deprecated, has no effect | not supported | --- | | |
| failTaskOnCheckpoint | Deprecated | not supported | boolean | | |
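
Two comments in the ExecutionConfig table imply a conversion between the option value and the stored field: long fields such as autoWatermarkInterval would be exposed as Duration options, and disableGenericTypes would be exposed with inverted logic as pipeline.generic-types. A minimal sketch of both mappings follows; FieldMappingSketch and its method names are hypothetical, and the assumption that the long fields hold milliseconds is mine rather than something stated in the table.

```java
import java.time.Duration;

// Hypothetical helper showing the two field/option conversions; not part of Flink.
final class FieldMappingSketch {

    // Option (Duration) -> field (long). Assumes the field stores milliseconds.
    static long toMillisField(Duration optionValue) {
        return optionValue.toMillis();
    }

    // Field (long milliseconds) -> option (Duration).
    static Duration toDurationOption(long fieldMillis) {
        return Duration.ofMillis(fieldMillis);
    }

    // Option "pipeline.generic-types" means "enabled"; the field means "disabled".
    static boolean toDisableGenericTypesField(boolean genericTypesOption) {
        return !genericTypesOption;
    }
}
```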



CheckpointConfig:


| Configuration | Comment | Suggested key | Type | Suggested type of ConfigOption | Exists in 1.9 |
| --- | --- | --- | --- | --- | --- |
| checkpointingMode | | execution.checkpointing.mode | Enum<CheckpointingMode> | ConfigOption<CheckpointingMode> | |
| checkpointInterval | We could use Duration for ConfigOption | execution.checkpointing.interval | long | ConfigOption<Duration> | |
| checkpointTimeout | We could use Duration for ConfigOption | execution.checkpointing.timeout | long | ConfigOption<Duration> | |
| externalizedCheckpointCleanup | | execution.checkpointing.externalized-checkpoint-mode | Enum<ExternalizedCheckpointCleanup> | ConfigOption<ExternalizedCheckpointCleanup> | |
| failOnCheckpointingErrors | this is deprecated | not supported | boolean | | |
| forceCheckpointing | this is deprecated | not supported | boolean | | |
| maxConcurrentCheckpoints | | execution.checkpointing.max-concurrent-checkpoints | int | ConfigOption<Integer> | |
| minPauseBetweenCheckpoints | We could use Duration for ConfigOption | execution.checkpointing.min-pause | long | ConfigOption<Duration> | |
| preferCheckpointForRecovery | | execution.checkpointing.prefer-checkpoint-over-savepoint | boolean | ConfigOption<Boolean> | |
| tolerableCheckpointFailureNumber | | execution.checkpointing.tolerable-checkpoint-failures | int | ConfigOption<Integer> | |
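
Enum-typed options such as execution.checkpointing.mode would have to be read from a string when they come from a configuration file. The sketch below shows one way to do that. SketchCheckpointingMode is a local stand-in for the real CheckpointingMode enum, EnumOptionSketch is a hypothetical helper, and the case-insensitive matching is an assumption of this example rather than behaviour specified by this FLIP.

```java
import java.util.Locale;

// Local stand-in mirroring the two checkpointing modes; not the Flink enum itself.
enum SketchCheckpointingMode { EXACTLY_ONCE, AT_LEAST_ONCE }

// Hypothetical helper for reading enum-typed option values from strings.
final class EnumOptionSketch {

    // Trims and upper-cases the raw value, then resolves it by constant name.
    // Enum.valueOf throws IllegalArgumentException for unknown names.
    static <E extends Enum<E>> E parseEnum(Class<E> type, String raw) {
        return Enum.valueOf(type, raw.trim().toUpperCase(Locale.ROOT));
    }
}
```

Failing fast on an unknown value, instead of silently falling back to a default, keeps typos in configuration files visible.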


StreamExecutionEnvironment:


| Configuration | Comment | Suggested key | Type | Suggested type of ConfigOption | Exists in 1.9 |
| --- | --- | --- | --- | --- | --- |
| timeCharacteristic | | pipeline.time-characteristic | Enum<StreamTimeCharacteristic> | ConfigOption<StreamTimeCharacteristic> | |
| defaultStateBackend | | reuse options | StateBackend | | state.backend (CheckpointingOptions) |
| isChainingEnabled | | pipeline.operator-chaining | boolean | ConfigOption<Boolean> | |
| bufferTimeout | We could use Duration for ConfigOption. We could move to ExecutionConfig. | execution.buffer-timeout | long | ConfigOption<Duration> | |
| cachedFile | We | pipeline.cached-files | <String, String, Boolean> (filePath/fileName/Executable) | ConfigOption<List<CachedFile>>, where CachedFile is a pojo | |


Compatibility, Deprecation, and Migration Plan

...