...
There are at least three places where a user can configure execution behavior on a per-job basis: ExecutionConfig, CheckpointConfig, and StreamExecutionEnvironment. The following tables list all configuration parameters that are present in those classes, along with suggested keys that we could use for those options.
ExecutionConfig:
Configuration | Comment | Suggested key | Type of the field | Suggested type of ConfigOption | Exists in 1.9 |
autoTypeRegistration | exec.auto-type-registration | boolean | ConfigOption<Boolean> | ||
autoWatermarkInterval | We could use Duration for ConfigOption | exec.auto-watermark-interval | long | ConfigOption<Duration> | |
closureCleaner | exec.closure-cleaner-level | ClosureCleanerLevel | ConfigOption<ClosureCleanerLevel> | ||
defaultInputDependencyConstraint | exec.input-dependency-constraint | Enum<InputDependencyConstraint> | ConfigOption<InputDependencyConstraint> | ||
defaultKryoSerializerClasses | We should support only this version as it takes Class<? extends Serializer<?>>. This simplifies the serialization. | exec.default-kryo-serializers | Map<Class, Class<Serializer<?>>> | ConfigOption<List<DefaultKryoSerializer>>, where DefaultKryoSerializer is a pojo | |
defaultKryoSerializers | We do not want to support this option as it stores SerializableSerializer. We see no point in configuring the serializers from Java-serialized strings in Base64 encoding. | not supported | Map<Class, SerializableSerializer<?>> | ||
disableGenericTypes | We should invert the logic in the ConfigOption: true means enabled, false means disabled. | exec.generic-types | boolean | ConfigOption<Boolean> | |
enableAutoGeneratedUIDs | exec.auto-generated-uids | boolean | ConfigOption<Boolean> | ||
executionMode | exec.exchange-mode | Enum<ExecutionMode> | ConfigOption<ExecutionMode> | ||
executionRetryDelay/ numberOfExecutionRetries | Deprecated | not supported | long/int | ||
forceAvro | exec.force-avro | boolean | ConfigOption<Boolean> | ||
forceKryo | exec.force-kryo | boolean | ConfigOption<Boolean> | ||
setGlobalJobParameters | exec.global-job-parameters | Map<String, String> | ConfigOption<Map<String, String>> | ||
latencyTrackingInterval | reuse the existing | long | metrics.latency.interval(MetricOptions) | ||
setMaxParallelism | exec.max-parallelism | int | ConfigOption<Integer> | ||
objectReuse | exec.object-reuse | boolean | ConfigOption<Boolean> | ||
setParallelism | reuse the existing | int | parallelism.default (CoreOptions); table.exec.resource.default-parallelism (ExecutionConfigOptions): drop this Table-specific option | ||
printProgressDuringExecution | exec.sysout-logging | boolean | ConfigOption<Boolean> | ||
registeredKryoTypes | exec.registered-kryo-types | List<Class<?>> | ConfigOption<List<Class<?>>> | ||
registeredPojoTypes | exec.registered-pojo-types | List<Class<?>> | ConfigOption<List<Class<?>>> | ||
registeredTypesWithKryoSerializers | We do not want to support this option as it stores SerializableSerializer. We see no point in configuring the serializers from Java-serialized strings in Base64 encoding. | not supported | Map<Class, Serializer<?>> | ||
registeredTypesWithKryoSerializerClasses | We should support only this version as it takes Class<? extends Serializer<?>>. This simplifies the serialization. | not supported | Map<Class, Class<Serializer<?>>> | ||
setRestartStrategy | On the client side we need to convert the option to RestartStrategyConfiguration. It does not support custom restart strategy. We would also flatten the configuration structure. | exec.restart-strategy | RestartStrategy | restart-strategy(ConfigConstants) (not a proper ConfigOption yet) | |
taskCancellationIntervalMillis | reuse the existing | long | task.cancellation.interval (TaskManagerOptions) | ||
taskCancellationTimeoutMillis | reuse the existing | long | task.cancellation.timeout (TaskManagerOptions) | ||
useSnapshotCompression | exec.snapshot-compression | boolean | ConfigOption<Boolean> | ||
codeAnalysisMode | Deprecated, has no effect | not supported | --- | ||
failTaskOnCheckpoint | Deprecated | not supported | boolean |
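The inverted logic proposed for disableGenericTypes can be sketched as follows. This is only an illustration under stated assumptions: the class GenericTypesOptionSketch and its method are hypothetical and not part of Flink, the configuration is modelled as a plain Map<String, String>, and an absent key is assumed to mean that generic types stay enabled.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for illustration only; not part of Flink. */
final class GenericTypesOptionSketch {

    // Suggested key from the ExecutionConfig table.
    static final String GENERIC_TYPES_KEY = "exec.generic-types";

    /**
     * Derives the value of the existing disableGenericTypes field from the
     * positively phrased option (true = generic types enabled).
     * Assumption: a missing key means generic types are enabled.
     */
    static boolean disableGenericTypes(Map<String, String> conf) {
        boolean genericTypesEnabled =
                Boolean.parseBoolean(conf.getOrDefault(GENERIC_TYPES_KEY, "true"));
        return !genericTypesEnabled;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(GENERIC_TYPES_KEY, "false");
        // The option says "not enabled", so the legacy field becomes true.
        System.out.println(disableGenericTypes(conf)); // prints "true"
    }
}
```

Keeping the key positively phrased means that users never have to reason about a double negative such as "disable = false".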
CheckpointConfig:
Configuration | Comment | Suggested key | Type | Suggested type of ConfigOption | Exists in 1.9 |
checkpointingMode | checkpointing.mode | Enum<CheckpointingMode> | ConfigOption<CheckpointingMode> | ||
checkpointInterval | We could use Duration for ConfigOption | checkpointing.interval | long | ConfigOption<Duration> | |
checkpointTimeout | We could use Duration for ConfigOption | checkpointing.timeout | long | ConfigOption<Duration> | |
externalizedCheckpointCleanup | checkpointing.externalized-checkpoint-mode | Enum<ExternalizedCheckpointCleanup> | ConfigOption<ExternalizedCheckpointCleanup> | ||
failOnCheckpointingErrors | Deprecated | not supported | boolean | ||
forceCheckpointing | Deprecated | not supported | boolean | ||
maxConcurrentCheckpoints | checkpointing.max-concurrent-checkpoints | int | ConfigOption<Integer> | ||
minPauseBetweenCheckpoints | We could use Duration for ConfigOption | checkpointing.min-pause | long | ConfigOption<Duration> | |
preferCheckpointForRecovery | checkpointing.prefer-checkpoint-over-savepoint | boolean | ConfigOption<Boolean> | ||
tolerableCheckpointFailureNumber | checkpointing.tolerable-checkpoint-failures | int | ConfigOption<Integer> |
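Several rows above suggest a ConfigOption<Duration> for fields that are currently stored as long. A minimal sketch of the conversion is shown below, assuming the long fields hold milliseconds; the class DurationOptionSketch and its method name are hypothetical and exist only for this example.

```java
import java.time.Duration;

/** Hypothetical helper for illustration only; not part of Flink. */
final class DurationOptionSketch {

    /**
     * Converts a Duration-typed option value into the long value stored in
     * the existing field. Assumption: the field is expressed in milliseconds.
     */
    static long toMillisField(Duration optionValue) {
        return optionValue.toMillis();
    }

    public static void main(String[] args) {
        // For example, a value configured for checkpointing.interval.
        Duration interval = Duration.ofSeconds(30);
        System.out.println(toMillisField(interval)); // prints "30000"
    }
}
```

With a Duration-typed option the unit is part of the value, so the key does not need to encode it.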
StreamExecutionEnvironment:
Configuration | Comment | Suggested key | Type | Suggested type of ConfigOption | Exists in 1.9 |
timeCharacteristic | exec.time-characteristic | Enum<StreamTimeCharacteristic> | ConfigOption<StreamTimeCharacteristic> | ||
defaultStateBackend | reuse options | StateBackend | state.backend (CheckpointingOptions) | ||
isChainingEnabled | exec.operator-chaining | boolean | ConfigOption<Boolean> | ||
bufferTimeout | We could use Duration for ConfigOption. We could move to ExecutionConfig. | exec.buffer-timeout | long | ConfigOption<Duration> | |
cachedFile | exec.cached-files | <String, String, Boolean> filePath/fileName/Executable | ConfigOption<List<CachedFile>>, where CachedFile is a pojo |
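The CachedFile pojo suggested for exec.cached-files could look roughly like the sketch below. The field names follow the filePath/fileName/Executable triple from the table; the class shape, the accessor names, and the example path are assumptions made for illustration, not a settled design.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical pojo for one exec.cached-files entry; illustration only. */
final class CachedFile {
    private final String filePath;    // location of the file to distribute
    private final String fileName;    // name under which the file is registered
    private final boolean executable; // whether the file is marked executable

    CachedFile(String filePath, String fileName, boolean executable) {
        this.filePath = filePath;
        this.fileName = fileName;
        this.executable = executable;
    }

    String getFilePath() { return filePath; }
    String getFileName() { return fileName; }
    boolean isExecutable() { return executable; }
}

final class CachedFilesSketch {
    public static void main(String[] args) {
        // The option value would be a list of such entries (made-up path).
        List<CachedFile> files =
                Collections.singletonList(new CachedFile("/tmp/dictionary.txt", "dictionary", false));
        for (CachedFile f : files) {
            System.out.println(f.getFileName() + " -> " + f.getFilePath());
        }
    }
}
```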
Compatibility, Deprecation, and Migration Plan
...