Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In Flink batch job, the job is usually divided into multiple parallel tasks that executed cross many nodes in the cluster. It is common to encounter the performance degradation on some nodes due to hardware problems, or accident I/O busy, or high CPU load. This kind of degradation can probably cause slow the running tasks on the node to be quite slow, that is so called long tail tasks. Although the long tail tasks will not failfinished finally, they can severely affect significantly increase the total job running time. Flink task scheduling does not take Currently, this long tail problem into account currentlyhas not been well solved.

Here we I propose the a speculative execution strategy FLINK-10644 to handle the this problem. The basic idea is to run a copy of the task on another node when the original task is identified to be as a long tail task. The speculative task is executed in parallel with the original one and share shares the same failure retry mechanism. Once either task completecompletes, the scheduler admits its output as the final result and cancel cancels the other running one. I will introduce a A blacklist module is introduced to schedule the long tail task on different machine from the original task . And and modify FileOutputFormat.java to adapter speculative execution mechanism.

The preliminary experiments in Alibaba's product cluster have demonstrated the effectiveness in our product clustereffectiveness of this strategy.

Proposed Changes

General design

...

A task will be classified as a long tail task when it meets the following three rulescriteria.

Finished Tasks Percentage

In one ExecutionJobVertex, when a configurable When a configurable percentage(default 75%) of executions in an ExecutionJobVertex has been finished, the speculative execution thread begin begins to really work.

Long Running Time

In speculative execution thread, all executions' interval between the current time and it its first createcreating/deploying time before it its failover in one ExecutionJobVertex are calculated. when When the running time of a an execution is greater than a configurable multiple(default 1.5) of the median of the running time of the other finished executions, this execution is defined as a long tail execution.


Slow Processing Throughput

In our product cluster of Alibaba in China, the algorithm mentioned above algorithm could solve the long tail task problem sufficiently. So first version I will not take throughput into consider. After release this feature effectively. Currently, slow throughput is not included in this version. I will update a new version if Flink community users have need this requirement I will develop next version.

Scheduling of Speculative Executions

Refactoring of ExecutionVertex

Because of introduction speculative execution, the ExecutionVertex can't default to only one execution is running at the same timeBy default, the ExecutionVertex only has one running execution. However, as we introduce speculative execution, the ExecutionVertex could have more than one execution simultaneously. Some member-variables in ExecutionVertex need bigger refactorneed a bigger refactoring.

There are two ways of code refactoring:

  1. Add a member-variable named speculativeExecution that similar to the currentExecution. But However, by this way will lead to lots of , many if judgments will be appear in the scheduler and failover code. Also this way will Moreover, it will reduce the flexible of code flexibility if there are more than two executions existed at the same timesimultaneously
  2. Change the currentExecution in ExecutionVertex to an ArrayList named executionList, which means that there can be multiple executions in an ExecutionVertex at the same timesimultaneously. For each execution in the executionList, there is no difference of the behavior such as in their behavior(e.g., failover, request slot, etc between them.).
Code Block
languagejava
titleRefactor member field of ExecutionVertex
private List<Execution> executionList = new ArrayList<>();

Introduction of SpeculativeScheduler thread

In each ExecutionJobVertex there There is a SpeculativeScheduler thread used for detect the thread detecting long tail execution in this ExecutionJobVertex periodically based on above rulesexecutions periodically in each ExecutionJobVertex according to the criteria mentioned above. Its member-variables included include SchedulerNG and , ExecutionVertex[] etc. SchedulerNG , and so on. SchedulerNG is used for scheduling the speculative executions and the ExecutionVertex[] is used for get getting execution state timestamp in this ExecutionJobVertex.

...

Modification of scheduler

...

logicality

The scheduler should scheduling schedule an execution according to the index of this execution in executionList instead of default to scheduling the currentExecutionits index in the executionList instead of that in the currentExecution by default. So we need to change ExecutionVertexID to ExecutionVertexIDAndExecutionIndex which represent to ExecutionVertexIDWithExecutionIndex that represents which execution in ExecutionVertex should be scheduling scheduled in scheduler logicallogicality. Also Besides, when task failover, executionIndex should also should be calculated by fail task's ExecutionAttemptID, so that the scheduler knows which execution in executionList should be restartin the executionList should be restarted. Moreover, ExecutionVertexVersion and ExecutionVertexVersioner will be refactored to track all executions' version.

Code Block
languagejava
titleExecutionVertexIDWithExecutionIndex.java
public class ExecutionVertexIDWithExecutionIndex {
    private ExecutionVertexID executionVertexID;
    private Integer executionIndex;
}

...

In order to reuse the code of scheduler, we need to extend the interface with an additional method then . Then SchedulerBase should implements it.

Code Block
languagejava
titleSchedulerNG interface extension
public interface SchedulerNG extends AutoCloseableAsync {
	default void schedulingSpeculativeExecutions(List<ExecutionVertexIDWithExecutionIndex> verticesToSchedule) {
        throw new UnsupportedOperationException();
    }
}

...

Processing failover situation

Just like the normal tasks, the speculative task is executed in parallel with the original one and share shares the same failover and restart strategy.The original long tail task tasks and speculative tasks can still retry with failure on their own tracktracks.But I think it should not be restarted globally when the counts of speculation execution failover count reach the max-retry-counts. 

When a task failfails, we could calculate its index(executionIndex) in the executionList by executionAttemptID. Then the scheduler takes a series of processing for the corresponding execution according to the executionIndex as shown below.

In order to better failover logic, I will extend the calss FailureHandlingResult with an additional member-variable.

...

languagejava
titleFailureHandlingResult class extension

...

Some classes will add a member-variable named executionIndex, for example, FailureHandlingResult、ExecutionVertexDeploymentOption, and so on.

Black list

Most long tail tasks are caused by machine problems, so the speculative execution must runs on a different machine from origin execution.

...

This FLIP is a new feature and so there is no compatible issue with previous versions.

Test Plan

Covered by unit tests.

Rejected Alternatives

None so far.

...