...

When a batch job writes records into files or key-value databases, this feature can be enabled.

  • Sink to key-value databases: nothing needs to be done for the speculative execution feature.
  • Sink to file: a globally unique ExecutionAttemptID suffix will be added after the normal task name. Finally, some files will be deleted or renamed when finalizeOnMaster() is called.
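
The suffix scheme for file sinks could look roughly like the sketch below. It is only an illustration: the class name, the ".attempt-" separator, and the use of a plain String in place of ExecutionAttemptID are assumptions made here, not part of the proposal.

```java
import java.util.Optional;

/** Hypothetical helper, not part of Flink: appends and parses an attempt-ID suffix. */
final class AttemptSuffixedFileNames {

    // Assumed separator between the normal task file name and the attempt ID.
    private static final String SEPARATOR = ".attempt-";

    /** Returns the task file name with the attempt ID appended as a suffix. */
    static String withAttemptSuffix(String taskFileName, String attemptId) {
        return taskFileName + SEPARATOR + attemptId;
    }

    /** Extracts the attempt ID from a suffixed file name, if a suffix is present. */
    static Optional<String> attemptIdOf(String fileName) {
        int idx = fileName.lastIndexOf(SEPARATOR);
        return idx < 0
                ? Optional.empty()
                : Optional.of(fileName.substring(idx + SEPARATOR.length()));
    }

    /** Returns the file name without the suffix, i.e. the name a rename would produce. */
    static String stripAttemptSuffix(String fileName) {
        int idx = fileName.lastIndexOf(SEPARATOR);
        return idx < 0 ? fileName : fileName.substring(0, idx);
    }
}
```

Because every attempt writes to its own suffixed file, the original and the speculative execution of the same task never overwrite each other before the job is finalized.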


As shown below, four HashSets will be created to hold globally unique ExecutionAttemptIDs. When all tasks have finished, each file will be handled differently according to which set its suffix belongs to.


Code Block (language: java, title: DefaultExecutionGraph)
public class JobVertex implements java.io.Serializable {

	// ExecutionAttemptIDs of the fastest finished execution of each speculated ExecutionVertex in this JobVertex.
	private Set<ExecutionAttemptID> fastAttemptIdsWithSpeculative = new HashSet<>();

	// ExecutionAttemptIDs of the finished executions that were not the fastest, for each speculated ExecutionVertex in this JobVertex.
	private Set<ExecutionAttemptID> slowAttemptIdsWithSpeculative = new HashSet<>();

	// ExecutionAttemptIDs of the successfully finished executions of all non-speculated ExecutionVertexes in this JobVertex.
	private Set<ExecutionAttemptID> finishedAttemptIdsWithoutSpeculative = new HashSet<>();

	// All ExecutionAttemptIDs in this JobVertex.
	private Set<ExecutionAttemptID> allAttemptIds = new HashSet<>();

}
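
One way the four sets might drive the clean-up at finalization is sketched below. The mapping from set to action is an assumption (the text above only says that files are handled differently depending on the set), and the class name, the Action enum, and the use of String instead of ExecutionAttemptID are hypothetical.

```java
import java.util.Set;

/** Hypothetical sketch, not Flink code: chooses what to do with a suffixed output file. */
final class FinalizeDecision {

    enum Action { RENAME, DELETE, IGNORE }

    /**
     * Assumed policy: output of the winning attempt (fastest speculated attempt, or the
     * only finished attempt of a non-speculated vertex) is renamed to its final name;
     * output of every other known attempt is deleted; unknown suffixes are left alone.
     */
    static Action decide(
            String attemptId,
            Set<String> fastAttemptIdsWithSpeculative,
            Set<String> slowAttemptIdsWithSpeculative,
            Set<String> finishedAttemptIdsWithoutSpeculative,
            Set<String> allAttemptIds) {
        if (fastAttemptIdsWithSpeculative.contains(attemptId)
                || finishedAttemptIdsWithoutSpeculative.contains(attemptId)) {
            return Action.RENAME;
        }
        if (slowAttemptIdsWithSpeculative.contains(attemptId)
                || allAttemptIds.contains(attemptId)) {
            return Action.DELETE;
        }
        return Action.IGNORE;
    }
}
```

Under this assumed policy, an attempt that appears only in allAttemptIds (for example a cancelled or failed one) has its file deleted, so exactly one file per task survives.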

...

  • speculativeExecutionFastestFinishedCountMetrics is defined as the number of ExecutionVertexes whose speculative execution reached the FINISHED state before the original execution.

Web UI

If we do not adapt the web UI, then when a speculative execution runs faster than the original execution, the Flink web UI will show that this task has been cancelled. But the result of the job is correct, which is in full compliance with our expectations.

(TODO: add another figure.)

Whether this work needs to be done should be discussed further.

Limitations

(1) JobType must be Batch.

...

Code Block (language: java, title: JobManagerOptions)
public static final ConfigOption<Boolean> SPECULATIVE_EXECUTION_ENABLED =
        key("flink.batch.speculative.enabled")
                .booleanType()
                .defaultValue(false)
                .withDescription("Whether to enable speculative execution for batch jobs.");

public static final ConfigOption<Long> SPECULATIVE_EXECUTION_INTERVAL_IN_MILLIS =
        key("flink.batch.speculative.interval.in.millis")
                .longType()
                .defaultValue(100L)
                .withDescription("How often to check for speculative executions.");

public static final ConfigOption<Double> SPECULATIVE_EXECUTION_MULTIPLIER =
        key("flink.batch.speculative.multiplier")
                .doubleType()
                .defaultValue(1.5)
                .withDescription("When the running time of an unfinished ExecutionVertex exceeds this multiple of" +
                        " the median running time of all finished ExecutionVertexes, it will be speculated.");

public static final ConfigOption<Double> SPECULATIVE_EXECUTION_QUANTILE =
        key("flink.batch.speculative.quantile")
                .doubleType()
                .defaultValue(0.75)
                .withDescription("When this fraction of the ExecutionVertexes in an ExecutionJobVertex are finished," +
                        " the speculative execution mechanism will be started. 0.9 means that once 90% of the" +
                        " ExecutionVertexes are finished, the speculative execution mechanism will be started.");

public static final ConfigOption<Long> SPECULATIVE_EXECUTION_LOG_INTERVAL_IN_MILLIS =
        key("flink.batch.speculative.log.interval.in.millis")
                .longType()
                .defaultValue(1000 * 60 * 5L)
                .withDescription("Interval in milliseconds between speculative-execution log messages.");

public static final ConfigOption<Boolean> FLINK_BLACKLIST_ENABLE =
        key("flink.blacklist.enable")
                .booleanType()
                .defaultValue(false)
                .withDescription("Whether to enable the blacklist for the job.");

public static final ConfigOption<Long> FLINK_BLACKLIST_TIMEOUT_IN_MILLIS =
        key("flink.blacklist.timeout.in.millis")
                .longType()
                .defaultValue(60 * 1000L)
                .withDescription("Indicates how long a blacklist record is kept after being added to the blacklist before it is removed.");
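
To illustrate how the quantile and multiplier options might interact, here is a minimal sketch of the check, assuming the semantics given in the option descriptions above. The class and method names are hypothetical, and the actual scheduler logic may differ.

```java
import java.util.Arrays;

/** Hypothetical sketch, not Flink code: decides whether a running vertex is a straggler. */
final class SpeculationCheck {

    /**
     * Returns true if enough vertices have finished (quantile) and the running vertex
     * has already run longer than multiplier * median of the finished durations.
     */
    static boolean shouldSpeculate(
            long[] finishedDurationsMillis,
            int totalVertices,
            long runningMillis,
            double quantile,
            double multiplier) {
        if (totalVertices <= 0 || finishedDurationsMillis.length == 0) {
            return false;
        }
        // Speculation only starts once the configured fraction of vertices has finished.
        if (finishedDurationsMillis.length < quantile * totalVertices) {
            return false;
        }
        long[] sorted = finishedDurationsMillis.clone();
        Arrays.sort(sorted);
        int n = sorted.length;
        double median = (n % 2 == 1)
                ? sorted[n / 2]
                : (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
        return runningMillis > multiplier * median;
    }
}
```

With the default values (quantile 0.75, multiplier 1.5), a job vertex with 4 subtasks whose 3 finished subtasks took 100, 200 and 300 ms would have its remaining subtask speculated once it has run for more than 300 ms.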


Compatibility, Deprecation, and Migration Plan

...