...
Page properties |
---|
...
|
Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-59-Enable-execution-configuration-from-Configuration-object-td32359.html
JIRA:
...
|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
This FLIP builds on FLIP-54 in a few places. In particular, it assumes richer type support for ConfigOptions and the availability of ConfigurationReader#getOptional.
...
Proposed Changes
Methods to configure config objects
We suggest adding methods that make it possible to change the state of the three classes (ExecutionConfig, CheckpointConfig and StreamExecutionEnvironment) from a ConfigurationReader.
- Add configure methods:
- void ExecutionConfig.configure(ReadableConfig configuration, ClassLoader classLoader)
- void CheckpointConfig.configure(ReadableConfig configuration)
- void StreamExecutionEnvironment.configure(ReadableConfig configuration, ClassLoader classLoader)
These methods would change only those settings for which a corresponding option is present in the ConfigurationReader. All other settings would be left intact. StreamExecutionEnvironment.configure would also call the other two methods.
For example:
public static final ConfigOption<ClosureCleanerLevel> CLOSURE_CLEANER_LEVEL =
    ConfigOptions.key("pipeline.closure-cleaner-level")
        .enumType(ClosureCleanerLevel.class)
        .defaultValue(ClosureCleanerLevel.RECURSIVE);

void configure(ConfigurationReader conf) {
    ...
    // Overwrite the field only if the option is explicitly present.
    conf.getOptional(CLOSURE_CLEANER_LEVEL).ifPresent(level -> {
        this.closureCleanerLevel = level;
    });
    ...
}
- Add three ConfigOptionSets:
- JobOptions in flink-core (this will include all options from ExecutionConfig + bufferTimeout, isChainingEnabled, CacheFile from StreamExecutionEnvironment)
- CheckpointJobOptions in flink-streaming-java (we do not want to use the CheckpointingOptions to better differentiate from cluster-side options + CheckpointingMode & ExternalizedCheckpointCleanup are not available in flink-core)
- StreamJobOptions in flink-streaming-java (for TimeCharacteristic) - at some point in the future we could merge with JobOptions, once we resolve dependencies (TimeCharacteristic is not in flink-core)
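The "only touch what is present" behavior of the configure methods can be illustrated without any Flink dependency. The following is a minimal sketch, not the proposed implementation: SketchReader and SketchExecutionConfig are hypothetical stand-ins, the reader is backed by a plain string map, and the default values are made up for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class ConfigureSketch {

    // Hypothetical stand-in for the reader: returns a value only if the key was set.
    static final class SketchReader {
        private final Map<String, String> values = new HashMap<>();

        SketchReader set(String key, String value) {
            values.put(key, value);
            return this;
        }

        Optional<String> getOptional(String key) {
            return Optional.ofNullable(values.get(key));
        }
    }

    // Hypothetical config object with two settings; the defaults are illustrative only.
    static final class SketchExecutionConfig {
        boolean objectReuse = false;
        int maxParallelism = 128;

        // Overwrites only the fields whose keys are present; everything else is left intact.
        void configure(SketchReader conf) {
            conf.getOptional("pipeline.object-reuse")
                .map(Boolean::parseBoolean)
                .ifPresent(v -> this.objectReuse = v);
            conf.getOptional("pipeline.max-parallelism")
                .map(Integer::parseInt)
                .ifPresent(v -> this.maxParallelism = v);
        }
    }

    public static void main(String[] args) {
        SketchExecutionConfig config = new SketchExecutionConfig();
        config.maxParallelism = 64; // set programmatically before configure()

        // Only pipeline.object-reuse is present in the reader.
        config.configure(new SketchReader().set("pipeline.object-reuse", "true"));

        System.out.println(config.objectReuse);    // true
        System.out.println(config.maxParallelism); // 64
    }
}
```

Because the reader does not contain pipeline.max-parallelism, the value set programmatically survives the configure call. This is the property that lets configure be layered on top of existing setter-based configuration.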
Proposed Changes
...
New ConfigOptions
- In class org.apache.flink.configuration.PipelineOptions:
key | setting | type | Expected format |
---|---|---|---|
pipeline.auto-generate-uids | ExecutionConfig#enableAutoGeneratedUids | ConfigOption<Boolean> | |
pipeline.auto-type-registration | ExecutionConfig#autoTypeRegistrationEnabled | ConfigOption<Boolean> | |
pipeline.auto-watermark-interval | ExecutionConfig#autoWatermarkInterval | ConfigOption<Duration> | |
pipeline.closure-cleaner-level | ExecutionConfig#closureCleanerLevel | ConfigOption<ExecutionConfig.ClosureCleanerLevel> | |
pipeline.force-avro | ExecutionConfig#forceAvro | ConfigOption<Boolean> | |
pipeline.force-kryo | ExecutionConfig#forceKryo | ConfigOption<Boolean> | |
pipeline.generic-types | ExecutionConfig#disableGenericTypes | ConfigOption<Boolean> | |
pipeline.global-job-parameters | ExecutionConfig#globalJobParameters | ConfigOption<Map<String, String>> | |
pipeline.max-parallelism | ExecutionConfig#maxParallelism | ConfigOption<Integer> | |
pipeline.object-reuse | ExecutionConfig#objectReuse | ConfigOption<Boolean> | |
pipeline.default-kryo-serializers | ExecutionConfig#defaultKryoSerializerClasses | ConfigOption<List<String>> | Semicolon-separated list of pairs of a class name and a serializer class name. Example: class:org.apache.flink.api.common.ExecutionConfigTest,serializer:org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1;class:org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1,serializer:org.apache.flink.api.common.ExecutionConfigTest$TestSerializer2 |
pipeline.registered-kryo-types | ExecutionConfig#registeredKryoTypes | ConfigOption<List<String>> | Semicolon-separated list of class names. Example: org.apache.flink.api.common.ExecutionConfigTest;org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1 |
pipeline.registered-pojo-types | ExecutionConfig#registeredPojoTypes | ConfigOption<List<String>> | Semicolon-separated list of class names. Example: org.apache.flink.api.common.ExecutionConfigTest;org.apache.flink.api.common.ExecutionConfigTest$TestSerializer1 |
pipeline.operator-chaining | StreamExecutionEnvironment#isChainingEnabled | ConfigOption<Boolean> | |
pipeline.cached-files | StreamExecutionEnvironment#cacheFile | ConfigOption<List<String>> | Semicolon-separated list of triples of cached file name, path, and executable flag (the executable flag is optional). Example: name:file1,path:/tmp1,executable:true;name:file2,path:/tmp2 |
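To make the expected format of pipeline.cached-files concrete, here is a rough sketch of how such a value could be split into entries. It is an illustration under assumptions, not the parser proposed by this FLIP: the class names are hypothetical, a missing executable flag is assumed to mean false, and paths containing ',' or ';' are not handled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CachedFilesFormatSketch {

    // Hypothetical value holder for one parsed entry.
    static final class CachedFile {
        final String name;
        final String path;
        final boolean executable;

        CachedFile(String name, String path, boolean executable) {
            this.name = name;
            this.path = path;
            this.executable = executable;
        }
    }

    // Splits the value on ';' into entries and each entry on ',' into key:value fields.
    static List<CachedFile> parse(String value) {
        List<CachedFile> result = new ArrayList<>();
        for (String entry : value.split(";")) {
            if (entry.trim().isEmpty()) {
                continue;
            }
            Map<String, String> fields = new HashMap<>();
            for (String field : entry.split(",")) {
                // Split on the first ':' only, so values such as URIs may contain ':'.
                int colon = field.indexOf(':');
                if (colon < 0) {
                    throw new IllegalArgumentException("Expected key:value but got: " + field);
                }
                fields.put(field.substring(0, colon).trim(), field.substring(colon + 1).trim());
            }
            String name = fields.get("name");
            String path = fields.get("path");
            if (name == null || path == null) {
                throw new IllegalArgumentException("Entry needs name and path: " + entry);
            }
            // Assumption: a missing executable flag means "not executable".
            boolean executable = Boolean.parseBoolean(fields.getOrDefault("executable", "false"));
            result.add(new CachedFile(name, path, executable));
        }
        return result;
    }

    public static void main(String[] args) {
        String value = "name:file1,path:/tmp1,executable:true;name:file2,path:/tmp2";
        for (CachedFile f : parse(value)) {
            System.out.println(f.name + " -> " + f.path + " (executable=" + f.executable + ")");
        }
    }
}
```

The same two-level split (entries on ';', fields on ',') would also cover the class/serializer pairs of pipeline.default-kryo-serializers, with different field names.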
- In class org.apache.flink.streaming.api.environment.StreamPipelineOptions:
key | setting | type |
---|---|---|
pipeline.time-characteristic | StreamExecutionEnvironment#timeCharacteristic | ConfigOption<TimeCharacteristic> |
- In class org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions:
key | setting | type |
---|---|---|
execution.checkpointing.mode | CheckpointConfig#checkpointingMode | ConfigOption<CheckpointingMode> |
execution.checkpointing.interval | CheckpointConfig#checkpointInterval | ConfigOption<Duration> |
execution.checkpointing.timeout | CheckpointConfig#checkpointTimeout | ConfigOption<Duration> |
execution.checkpointing.max-concurrent-checkpoints | CheckpointConfig#maxConcurrentCheckpoints | ConfigOption<Integer> |
execution.checkpointing.min-pause | CheckpointConfig#minPauseBetweenCheckpoints | ConfigOption<Duration> |
execution.checkpointing.prefer-checkpoint-for-recovery | CheckpointConfig#preferCheckpointForRecovery | ConfigOption<Boolean> |
execution.checkpointing.tolerable-failed-checkpoints | CheckpointConfig#tolerableCheckpointFailureNumber | ConfigOption<Integer> |
execution.checkpointing.externalized-checkpoint | CheckpointConfig#externalizedCheckpointCleanup | ConfigOption<CheckpointConfig.ExternalizedCheckpointCleanup> |
- In class org.apache.flink.configuration.ExecutionOptions:
key | setting | type |
---|---|---|
execution.checkpointing.snapshot-compression | ExecutionConfig#useSnapshotCompression | ConfigOption<Boolean> |
execution.buffer-timeout | StreamExecutionEnvironment#bufferTimeout | ConfigOption<Duration> |
Options not configurable from Configuration
- ExecutionConfig
- ExecutionConfig#executionMode
- ExecutionConfig#numberOfExecutionRetries
- ExecutionConfig#codeAnalysisMode
- ExecutionConfig#executionRetryDelay
- ExecutionConfig#failTaskOnCheckpointError
- ExecutionConfig#defaultInputDependencyConstraint
- ExecutionConfig#registeredTypesWithKryoSerializers
- ExecutionConfig#registeredTypesWithKryoSerializerClasses
- ExecutionConfig#defaultKryoSerializers
- CheckpointConfig
- CheckpointConfig#failOnCheckpointingErrors
Existing configuration options:
There are at least three places where a user can configure execution behavior on a per-job basis: ExecutionConfig, CheckpointConfig and StreamExecutionEnvironment. The following tables list all configuration parameters that are present in those classes, together with suggested keys that we could use for those options.
ExecutionConfig:
Configuration | Comment | Suggested key | Type of the field | Suggested type of ConfigOption | Exists in 1.9 |
---|---|---|---|---|---|
autoTypeRegistration | | pipeline.auto-type-registration | boolean | ConfigOption<Boolean> | |
autoWatermarkInterval | We could use Duration for ConfigOption | pipeline.auto-watermark-interval | long | ConfigOption<Duration> | |
closureCleaner | | pipeline.closure-cleaner-level | ClosureCleanerLevel | ConfigOption<ClosureCleanerLevel> | |
defaultInputDependencyConstraint | | exec.input-dependency-constraint (not supported) | | | |
defaultKryoSerializerClasses | We should support only this version as it takes Class<? extends Serializer<?>>. This simplifies the serialization. | pipeline.default-kryo-serializers | Map<Class, Class<Serializer<?>>> | ConfigOption<List<DefaultKryoSerializer>>, where DefaultKryoSerializer is a pojo | |
defaultKryoSerializers | We do not want to support this option as it stores SerializableSerializer. We see no point in configuring the serializers from Java-serialized strings in Base64 encoding. | not supported | Map<Class, SerializableSerializer<?>> | | |
disableGenericTypes | We should invert the logic in the ConfigOption: true means enabled, false means disabled. | pipeline.generic-types | boolean | ConfigOption<Boolean> | |
enableAutoGeneratedUIDs | | pipeline.auto-generated-uids | boolean | ConfigOption<Boolean> | |
executionMode | | exec.mode (not supported) | | | |
executionRetryDelay / numberOfExecutionRetries | Deprecated | not supported | long/int | | |
forceAvro | | pipeline.force-avro | boolean | ConfigOption<Boolean> | |
forceKryo | | pipeline.force-kryo | boolean | ConfigOption<Boolean> | |
setGlobalJobParameters | | pipeline.global-job-parameters | Map<String, String> | ConfigOption<Map<String, String>> | |
latencyTrackingInterval | | reuse the existing | long | metrics.latency.interval (MetricOptions) | |
setMaxParallelism | | pipeline.max-parallelism | int | ConfigOption<Integer> | |
objectReuse | | pipeline.object-reuse | boolean | ConfigOption<Boolean> | |
setParallelism | | reuse the existing | int | parallelism.default (CoreOptions); table.exec.resource.default-parallelism (ExecutionConfigOptions) - drop this Table specific option | |
printProgressDuringExecution | | exec.sysout-logging | | | |
registeredKryoTypes | | pipeline.registered-kryo-types | List<Class<?>> | ConfigOption<List<Class<?>>> | |
registeredPojoTypes | | pipeline.registered-pojo-types | List<Class<?>> | ConfigOption<List<Class<?>>> | |
registeredTypesWithKryoSerializers | We do not want to support this option as it stores SerializableSerializer. We see no point in configuring the serializers from Java-serialized strings in Base64 encoding. | not supported | Map<Class, Serializer<?>> | | |
registeredTypesWithKryoSerializerClasses | We should support only this version as it takes Class<? extends Serializer<?>>. This simplifies the serialization. This option in the serialization uses ids of serializers. | not supported as part of this FLIP | Map<Class, Class<Serializer<?>>> | | |
setRestartStrategy | On the client side we need to convert the option to RestartStrategyConfiguration. It does not support custom restart strategy. We would also flatten the configuration structure. | reuse the existing | RestartStrategy | restart-strategy (ConfigConstants) (not a proper ConfigOption yet) | |
taskCancellationIntervalMillis | | reuse the existing | long | task.cancellation.interval (TaskManagerOptions) | |
taskCancellationTimeoutMillis | | reuse the existing | long | task.cancellation.timeout (TaskManagerOptions) | |
useSnapshotCompression | | execution.checkpointing.snapshot-compression | boolean | ConfigOption<Boolean> | |
printProgressDuringExecution | Deprecated, has no effect | not supported | | | |
codeAnalysisMode | Deprecated, has no effect | not supported | --- | | |
failTaskOnCheckpoint | Deprecated | not supported | boolean | | |
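Two rows of the table above change the representation of a value rather than just its name: autoWatermarkInterval is a long field but would become a Duration option, and disableGenericTypes would be exposed with inverted logic as pipeline.generic-types. The sketch below shows how such option values could be mapped back onto the existing fields. The Fields class is hypothetical, and the long field is assumed to hold milliseconds.

```java
import java.time.Duration;

final class OptionMappingSketch {

    // Hypothetical slice of ExecutionConfig holding both fields in their existing representation.
    static final class Fields {
        long autoWatermarkIntervalMillis = 0L; // assumption: the long field is in milliseconds
        boolean disableGenericTypes = false;
    }

    // Maps option values (Duration, "generic types enabled") onto the existing fields.
    static void apply(Fields fields, Duration autoWatermarkInterval, boolean genericTypes) {
        fields.autoWatermarkIntervalMillis = autoWatermarkInterval.toMillis();
        // The option is inverted: generic-types = false means disableGenericTypes = true.
        fields.disableGenericTypes = !genericTypes;
    }

    public static void main(String[] args) {
        Fields fields = new Fields();
        apply(fields, Duration.ofMillis(200), false);
        System.out.println(fields.autoWatermarkIntervalMillis); // 200
        System.out.println(fields.disableGenericTypes);         // true
    }
}
```

Keeping the conversion at the boundary means the existing getters and setters on the config classes would not need to change their types.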
CheckpointConfig:
Configuration | Comment | Suggested key | Type | Suggested type of ConfigOption | Exists in 1.9 |
---|---|---|---|---|---|
checkpointingMode | | execution.checkpointing.mode | Enum<CheckpointingMode> | ConfigOption<CheckpointingMode> | |
checkpointInterval | We could use Duration for ConfigOption | execution.checkpointing.interval | long | ConfigOption<Duration> | |
checkpointTimeout | We could use Duration for ConfigOption | execution.checkpointing.timeout | long | ConfigOption<Duration> | |
externalizedCheckpointCleanup | | execution.checkpointing.externalized-checkpoint-mode | Enum<ExternalizedCheckpointCleanup> | ConfigOption<ExternalizedCheckpointCleanup> | |
failOnCheckpointingErrors | this is deprecated | not supported | boolean | | |
forceCheckpointing | this is deprecated | not supported | boolean | | |
maxConcurrentCheckpoints | | execution.checkpointing.max-concurrent-checkpoints | int | ConfigOption<Integer> | |
minPauseBetweenCheckpoints | We could use Duration for ConfigOption | execution.checkpointing.min-pause | long | ConfigOption<Duration> | |
preferCheckpointForRecovery | | execution.checkpointing.prefer-checkpoint-over-savepoint | boolean | ConfigOption<Boolean> | |
tolerableCheckpointFailureNumber | | execution.checkpointing.tolerable-checkpoint-failures | int | ConfigOption<Integer> | |
StreamExecutionEnvironment:
Configuration | Comment | Suggested key | Type | Suggested type of ConfigOption | Exists in 1.9 |
---|---|---|---|---|---|
timeCharacteristic | | pipeline.time-characteristic | Enum<StreamTimeCharacteristic> | ConfigOption<StreamTimeCharacteristic> | |
defaultStateBackend | | reuse options | StateBackend | state.backend (CheckpointingOptions) | |
isChainingEnabled | | pipeline.operator-chaining | boolean | ConfigOption<Boolean> | |
bufferTimeout | We could use Duration for ConfigOption. We could move to ExecutionConfig. | execution.buffer-timeout | long | ConfigOption<Duration> | |
cachedFile | | pipeline.cached-files | <String, String, Boolean> filePath/fileName/Executable | ConfigOption<List<CachedFile>>, where CachedFile is a pojo | |
Compatibility, Deprecation, and Migration Plan
...