Status

Current state: Under Discussion

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-59-Enable-execution-configuration-from-Configuration-object-td32359.html

JIRA:

Released: 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, runtime parameters can only be configured through setters on ExecutionConfig, CheckpointConfig and StreamExecutionEnvironment in Java or Scala code. This has several drawbacks:

  • Users must have access to the StreamExecutionEnvironment. We face this problem in the unified TableEnvironment mentioned in FLIP-32, where users do not see the underlying StreamExecutionEnvironment.
  • It is impossible to set the parameters from the command line. We face this issue, for example, in the SQL Client.
  • It is impossible to provide job-specific default values for execution parameters from a file.
  • Moreover, some of the config options do not have a string-serializable representation, which makes the two previous items even harder to implement.
  • These options are not described in the documentation.

We suggest improving the runtime configuration so that it can be set from a Configuration object and integrates nicely into a possible future hierarchy of configuration layers.

Assumption: 

This FLIP is based on FLIP-54 in a few places. In particular, it assumes richer type support for ConfigOptions and the availability of ConfigurationReader#getOptional.


Public Interfaces

We suggest adding methods that allow the state of the three classes to be changed from a ConfigurationReader.

  • Add configure methods:
    • void ExecutionConfig.configure(ConfigurationReader)
    • void CheckpointConfig.configure(ConfigurationReader)
    • void StreamExecutionEnvironment.configure(ConfigurationReader)

    Those methods would mutate the internal state of only those parameters for which a corresponding option is present in the ConfigurationReader. All other parameters would be left intact.

E.g.

    public static final ConfigOption<ClosureCleanerLevel> CLOSURE_CLEANER_LEVEL =
        ConfigOptions.key("exec.closure-cleaner-level")
            .enumType(ClosureCleanerLevel.class)
            .defaultValue(ClosureCleanerLevel.RECURSIVE);

    void configure(ConfigurationReader conf) {
        ...
        // Overwrite the field only if the option is present in the reader.
        conf.getOptional(CLOSURE_CLEANER_LEVEL).ifPresent(level -> {
            this.closureCleanerLevel = level;
        });
        ...
    }

  • Add three ConfigOptionSets:
    • JobOptions in flink-core (this will include all options from ExecutionConfig + bufferTimeout, isChainingEnabled, CacheFile from StreamExecutionEnvironment) 
    • CheckpointJobOptions in flink-streaming-java (we do not want to use the CheckpointingOptions to better differentiate from cluster-side options + CheckpointingMode & ExternalizedCheckpointCleanup are not available in flink-core)
    • StreamJobOptions in flink-streaming-java (for TimeCharacteristic) - at some point in the future we could merge with JobOptions, once we resolve dependencies (TimeCharacteristic is not in flink-core)
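
The "only present options are applied" behavior of the configure methods can be illustrated with a small, self-contained sketch. It does not use Flink classes: MapConfigurationReader and SketchExecutionConfig are hypothetical stand-ins for the ConfigurationReader assumed from FLIP-54 and for ExecutionConfig, and the string-keyed lookup is a simplification of typed ConfigOptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class ConfigureSketch {
    public static void main(String[] args) {
        SketchExecutionConfig config = new SketchExecutionConfig();
        config.maxParallelism = 128; // value set earlier through the setter-style API

        // Only exec.object-reuse is present, so only that field changes.
        config.configure(new MapConfigurationReader().set("exec.object-reuse", "true"));

        System.out.println(config.objectReuse);    // true
        System.out.println(config.maxParallelism); // 128
    }
}

// Hypothetical stand-in for the ConfigurationReader assumed from FLIP-54.
final class MapConfigurationReader {
    private final Map<String, String> values = new HashMap<>();

    MapConfigurationReader set(String key, String value) {
        values.put(key, value);
        return this;
    }

    // Returns an empty Optional when the key was never set.
    Optional<String> getOptional(String key) {
        return Optional.ofNullable(values.get(key));
    }
}

// Hypothetical, heavily reduced model of ExecutionConfig.
final class SketchExecutionConfig {
    boolean objectReuse = false;
    int maxParallelism = -1;

    void configure(MapConfigurationReader conf) {
        // Each field is touched only if its key is present in the reader.
        conf.getOptional("exec.object-reuse")
            .map(Boolean::parseBoolean)
            .ifPresent(v -> this.objectReuse = v);
        conf.getOptional("exec.max-parallelism")
            .map(Integer::parseInt)
            .ifPresent(v -> this.maxParallelism = v);
    }
}
```

Because absent keys are skipped rather than reset to defaults, values set earlier through setters survive a later configure call, which is what keeps the two configuration paths compatible.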


Proposed Changes

Configuration options: 

There are at least three places where a user can configure execution behavior on a per-job basis: ExecutionConfig, CheckpointConfig and StreamExecutionEnvironment. The following tables list all configuration parameters that are present in those classes, together with suggested keys that we could use for those options.

ExecutionConfig:


| Configuration | Comment | Suggested key | Type | Exists in 1.9 |
|---|---|---|---|---|
| autoTypeRegistration | | exec.auto-type-registration | boolean | |
| autoWatermarkInterval | We could use Duration for ConfigOption | exec.auto-watermark-interval | long | |
| closureCleaner | | exec.closure-cleaner-level | ClosureCleanerLevel | |
| defaultInputDependencyConstraint | | exec.input-dependency-constraint | Enum<ShuffleMode> | table.exec.shuffle-mode (ExecutionConfigOptions) - drop this option |
| defaultKryoSerializerClasses | We should support only this version as it takes Class<? extends Serializer<?>>. This simplifies the serialization. | exec.default-kryo-serializers | Map<Class, Class<Serializer<?>>> | |
| defaultKryoSerializers | We do not want to support this option as it stores SerializableSerializer. We see no point in configuring the serializers from Java-serialized strings in Base64 encoding. | not supported | Map<Class, SerializableSerializer<?>> | |
| disableGenericTypes | We should invert the logic in the ConfigOption: true means enabled, false means disabled. | exec.generic-types | boolean | |
| enableAutoGeneratedUIDs | | exec.auto-generated-uids | boolean | |
| executionMode | | exec.mode | Enum<ExecutionMode> | |
| executionRetryDelay / numberOfExecutionRetries | Deprecated | not supported | long / int | |
| forceAvro | | exec.force-avro | boolean | |
| forceKryo | | exec.force-kryo | boolean | |
| setGlobalJobParameters | | exec.global-job-parameters | Map<String, String> | |
| latencyTrackingInterval | | reuse the existing | long | metrics.latency.interval (MetricOptions) |
| setMaxParallelism | | exec.max-parallelism | int | |
| objectReuse | | exec.object-reuse | boolean | |
| setParallelism | | reuse the existing | int | parallelism.default (CoreOptions); table.exec.resource.default-parallelism (ExecutionConfigOptions) - drop this Table-specific option |
| printProgressDuringExecution | | exec.sysout-logging | boolean | |
| registeredKryoTypes | | exec.registered-kryo-types | List<Class<?>> | |
| registeredPojoTypes | | exec.registered-pojo-types | List<Class<?>> | |
| registeredTypesWithKryoSerializers | We do not want to support this option as it stores SerializableSerializer. We see no point in configuring the serializers from Java-serialized strings in Base64 encoding. | not supported | Map<Class, Serializer<?>> | |
| registeredTypesWithKryoSerializerClasses | We should support only this version as it takes Class<? extends Serializer<?>>. This simplifies the serialization. | not supported | Map<Class, Class<Serializer<?>>> | |
| setRestartStrategy | On the client side we need to convert the option to RestartStrategyConfiguration. It does not support custom restart strategies. We would also flatten the configuration structure. | exec.restart-strategy | RestartStrategy | restart-strategy (ConfigConstants) (not a proper ConfigOption yet) |
| taskCancellationIntervalMillis | | reuse the existing | long | task.cancellation.interval (TaskManagerOptions) |
| taskCancellationTimeoutMillis | | reuse the existing | long | task.cancellation.timeout (TaskManagerOptions) |
| useSnapshotCompression | | exec.snapshot-compression | boolean | |
| codeAnalysisMode | Deprecated, has no effect | not supported | --- | |
| failTaskOnCheckpoint | Deprecated | not supported | boolean | |
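
The inverted logic suggested for disableGenericTypes can be sketched as follows. This is an illustration only: GenericTypesSketch and applyGenericTypes are hypothetical names, and a plain Map<String, String> stands in for the ConfigurationReader. The key exec.generic-types is the one suggested in the table above.

```java
import java.util.Map;
import java.util.Optional;

final class GenericTypesSketch {
    // The existing field is "disabled"-oriented, while the suggested option
    // exec.generic-types is "enabled"-oriented, so the value is negated.
    // When the key is absent, the current field value is returned unchanged.
    static boolean applyGenericTypes(Map<String, String> conf, boolean currentlyDisabled) {
        return Optional.ofNullable(conf.get("exec.generic-types"))
            .map(Boolean::parseBoolean)
            .map(enabled -> !enabled)
            .orElse(currentlyDisabled);
    }

    public static void main(String[] args) {
        // exec.generic-types=false means generic types are disabled.
        System.out.println(applyGenericTypes(Map.of("exec.generic-types", "false"), false)); // true
        // Key absent: the previous state is kept.
        System.out.println(applyGenericTypes(Map.of(), false)); // false
    }
}
```

Keeping the negation inside the configure step means the user-facing key reads positively while the existing field and its setter stay as they are.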


CheckpointConfig:


| Configuration | Comment | Suggested key | Type | Exists in 1.9 |
|---|---|---|---|---|
| checkpointingMode | | checkpointing.mode | Enum<CheckpointingMode> | |
| checkpointInterval | We could use Duration for ConfigOption | checkpointing.interval | long | |
| checkpointTimeout | We could use Duration for ConfigOption | checkpointing.timeout | long | |
| externalizedCheckpointCleanup | | checkpointing.externalized-checkpoint-mode | Enum<ExternalizedCheckpointCleanup> | |
| failOnCheckpointingErrors | this is deprecated | not supported | boolean | |
| forceCheckpointing | this is deprecated | not supported | boolean | |
| maxConcurrentCheckpoints | | checkpointing.max-concurrent-checkpoints | int | |
| minPauseBetweenCheckpoints | We could use Duration for ConfigOption | checkpointing.min-pause | long | |
| preferCheckpointForRecovery | | checkpointing.prefer-checkpoint-over-savepoint | boolean | |
| tolerableCheckpointFailureNumber | | checkpointing.tolerable-checkpoint-failures | int | |
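
Several comments above suggest using Duration for options that are currently stored as long milliseconds. A minimal sketch of that conversion is shown below. The ISO-8601 string format accepted by java.time.Duration.parse is an assumption made for the sake of a runnable example; this page does not specify the string format for Duration options. DurationOptionSketch and checkpointIntervalMillis are hypothetical names.

```java
import java.time.Duration;
import java.util.Map;
import java.util.Optional;

final class DurationOptionSketch {
    // Reads checkpointing.interval as a Duration and converts it to the
    // long milliseconds that the existing field stores. When the key is
    // absent, the current value is kept.
    static long checkpointIntervalMillis(Map<String, String> conf, long currentMillis) {
        return Optional.ofNullable(conf.get("checkpointing.interval"))
            .map(Duration::parse)      // assumption: ISO-8601, e.g. "PT30S"
            .map(Duration::toMillis)
            .orElse(currentMillis);
    }

    public static void main(String[] args) {
        System.out.println(checkpointIntervalMillis(Map.of("checkpointing.interval", "PT30S"), 500L)); // 30000
        System.out.println(checkpointIntervalMillis(Map.of(), 500L)); // 500
    }
}
```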



StreamExecutionEnvironment:


| Configuration | Comment | Suggested key | Type | Exists in 1.9 |
|---|---|---|---|---|
| timeCharacteristic | | exec.time-characteristic | Enum<StreamTimeCharacteristic> | |
| defaultStateBackend | | reuse options | StateBackend | state.backend (CheckpointingOptions) |
| isChainingEnabled | | exec.operator-chaining | boolean | |
| bufferTimeout | We could use Duration for ConfigOption. We could move to ExecutionConfig. | exec.buffer-timeout | long | |
| cachedFile | We | exec.cached-files | <String, String, Boolean> (filePath/fileName/Executable) | |



Compatibility, Deprecation, and Migration Plan

    The changes are backward compatible. They introduce an additional way of configuring ExecutionConfig, CheckpointConfig and StreamExecutionEnvironment, and reuse existing options wherever possible.

Test Plan

Covered by unit tests

Rejected Alternatives

---
