...

JobManagerOptions (Java)
public static final ConfigOption<Boolean> SPECULATIVE_EXECUTION_ENABLED =
        key("flink.batch.speculative.enabled")
                .booleanType()
                .defaultValue(false)
                .withDescription("Whether to enable speculative execution for batch jobs.");

public static final ConfigOption<Long> SPECULATIVE_EXECUTION_INTERVAL_IN_MILLIS =
        key("flink.batch.speculative.interval.in.millis")
                .longType()
                .defaultValue(100L)
                .withDescription("How often to check for speculative executions.");

public static final ConfigOption<Double> SPECULATIVE_EXECUTION_MULTIPLIER =
        key("flink.batch.speculative.multiplier")
                .doubleType()
                .defaultValue(1.5)
                .withDescription("When the running time of an unfinished ExecutionVertex exceeds this multiple of" +
                        " the median running time of all finished ExecutionVertices, it will be speculated.");

public static final ConfigOption<Double> SPECULATIVE_EXECUTION_QUANTILE =
        key("flink.batch.speculative.quantile")
                .doubleType()
                .defaultValue(0.75)
                .withDescription("When this fraction of the ExecutionVertices in an ExecutionJobVertex has finished," +
                        " the speculative execution mechanism is started. For example, 0.9 means that speculative" +
                        " execution starts once 90% of the ExecutionVertices have finished.");

// Declared as Long so that the option type matches longType() and the long default value.
public static final ConfigOption<Long> SPECULATIVE_EXECUTION_COUNT =
        key("flink.batch.speculative.execution.count")
                .longType()
                .defaultValue(1L)
                .withDescription("The number of speculative executions that may exist simultaneously for an ExecutionVertex.");

public static final ConfigOption<Long> SPECULATIVE_EXECUTION_LOG_INTERVAL_IN_MILLIS =
        key("flink.batch.speculative.log.interval.in.millis")
                .longType()
                .defaultValue(1000 * 60 * 5L)
                .withDescription("Interval in milliseconds between log messages related to speculative execution.");

public static final ConfigOption<Boolean> FLINK_BLACKLIST_ENABLE =
        key("flink.blacklist.enable")
                .booleanType()
                .defaultValue(false)
                .withDescription("Whether to enable the blacklist for the job.");

public static final ConfigOption<Long> FLINK_BLACKLIST_TIMEOUT_IN_MILLIS =
        key("flink.blacklist.timeout.in.millis")
                .longType()
                .defaultValue(60 * 1000L)
                .withDescription("How long, in milliseconds, a record stays in the blacklist before it is removed.");
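
To illustrate how the quantile and multiplier options above could interact, here is a minimal, self-contained sketch. It is not the proposed implementation: the class `SpeculationSketch`, the method `shouldSpeculate`, and its parameters are hypothetical names, and the exact comparisons (at least the quantile fraction finished, running time strictly greater than the threshold) are assumptions drawn only from the option descriptions.

```java
import java.util.Arrays;

/** Hypothetical helper for illustration only; not part of Flink. */
final class SpeculationSketch {

    private SpeculationSketch() {}

    /**
     * Decides whether an unfinished ExecutionVertex looks like a straggler.
     *
     * @param finishedMillis running times of the finished ExecutionVertices
     * @param totalVertices  number of ExecutionVertices in the ExecutionJobVertex
     * @param runningMillis  elapsed running time of the unfinished ExecutionVertex
     * @param quantile       value of flink.batch.speculative.quantile
     * @param multiplier     value of flink.batch.speculative.multiplier
     */
    static boolean shouldSpeculate(
            long[] finishedMillis,
            int totalVertices,
            long runningMillis,
            double quantile,
            double multiplier) {
        if (totalVertices <= 0 || finishedMillis.length == 0) {
            return false;
        }
        // Assumption: speculation starts once at least `quantile` of the vertices have finished.
        double finishedFraction = (double) finishedMillis.length / totalVertices;
        if (finishedFraction < quantile) {
            return false;
        }
        // Median running time of the finished vertices.
        long[] sorted = finishedMillis.clone();
        Arrays.sort(sorted);
        int n = sorted.length;
        double median = (n % 2 == 1)
                ? sorted[n / 2]
                : (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
        // Assumption: a vertex is speculated when it runs longer than multiplier * median.
        return runningMillis > multiplier * median;
    }

    public static void main(String[] args) {
        long[] finished = {1000L, 1100L, 1200L};
        // 3 of 4 vertices finished (0.75), median 1100 ms, threshold 1.5 * 1100 = 1650 ms.
        System.out.println(shouldSpeculate(finished, 4, 2000L, 0.75, 1.5));
        System.out.println(shouldSpeculate(finished, 4, 1500L, 0.75, 1.5));
    }
}
```

With the default values (quantile 0.75, multiplier 1.5), a check like this would run every `flink.batch.speculative.interval.in.millis` milliseconds for each ExecutionJobVertex of a batch job.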


Implementation Plan

Task breakdown

  1. Implement a speculative execution thread in the JM and use a thread pool to run it periodically.

  2. Adapt parts of SchedulerNG to support speculative execution.

  3. Adapt parts of the failover logic to support speculative execution.

  4. Adapt parts of executionFinished() in ExecutionVertex to support speculative execution.

  5. Implement the blacklist module.

  6. Adapt the JM and RM to support the blacklist.

  7. Adapt parts of flink-yarn to support the blacklist.

  8. Adapt parts of flink-kubernetes to support the blacklist.

  9. Refactor parts of InputSplitAssigner to support speculative execution.

  10. Adapt parts of finishPartitionsAndUpdateConsumers() to support speculative execution.

  11. Adapt the code that writes to files through FileOutputFormat and OutputFormatSinkFunction to support speculative execution.

  12. Adapt the code that writes to files through FileSink in flink-connector-files to support speculative execution.

  13. Add metrics for speculative execution.

  14. Extend the WebUI module to show speculative execution.
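
As a rough illustration of the blacklist module from the task list and of the `flink.blacklist.timeout.in.millis` option, the following sketch keeps blacklist records in a map and drops a record once the timeout has elapsed. The class `BlacklistSketch`, its methods, and the use of a host name as the key are all hypothetical; the proposal does not specify the data structure, and the sketch is single-threaded for brevity.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch for illustration only; names are not Flink API. */
final class BlacklistSketch {

    // Corresponds to flink.blacklist.timeout.in.millis.
    private final long timeoutMillis;

    // Maps a blacklisted host (assumed key) to the time at which it was added.
    private final Map<String, Long> addedAtMillis = new HashMap<>();

    BlacklistSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Adds or refreshes a blacklist record for the given host. */
    void add(String host, long nowMillis) {
        addedAtMillis.put(host, nowMillis);
    }

    /** Returns true while the record exists and has not yet timed out. */
    boolean isBlacklisted(String host, long nowMillis) {
        Long addedAt = addedAtMillis.get(host);
        if (addedAt == null) {
            return false;
        }
        if (nowMillis - addedAt >= timeoutMillis) {
            // The record has expired, so remove it lazily on lookup.
            addedAtMillis.remove(host);
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        BlacklistSketch blacklist = new BlacklistSketch(60 * 1000L);
        blacklist.add("host-1", 0L);
        System.out.println(blacklist.isBlacklisted("host-1", 30_000L));
        System.out.println(blacklist.isBlacklisted("host-1", 60_000L));
    }
}
```

The current time is passed in as a parameter so that the expiry behaviour can be checked deterministically; a real module would more likely read a clock and would need to be safe for concurrent access from the JM and RM.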

Compatibility, Deprecation, and Migration Plan

...