Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

In our product cluster of Alibaba in China, we only considered running time could solve the long tail task problem sufficiently. So first version I will not take throughput into consider. After release this feature if Flink community if users have this requirement I will delevop next version. 

Scheduling of Speculative Executions

Introduction speculative scheduler thread

In each ExecutionJobVertex there is a SpeculativeScheduler thread used for detect the long tail execution in this ExecutionJobVertex periodically based on above three rules.

Image Removed

...

Refactoring of ExecutionVertex

Because of introduction speculative execution, the ExecutionVertex can't default to only one execution is running at the same time. Some member-variables in ExecutionVertex need bigger refactor.

...

Code Block
languagejava
titleRefactor member field of ExecutionVertex
private List<Execution> executionList = new ArrayList<>();

Introduction SpeculativeScheduler thread

In each ExecutionJobVertex there is a SpeculativeScheduler thread used for detect the long tail execution in this ExecutionJobVertex periodically based on above rules. Its member-variables included SchedulerNG and ExecutionVertex[] etc. SchedulerNG used for scheduling the speculative executions and the ExecutionVertex[] used for get execution state timestamp in this ExecutionJobVertex.

Image Added

Modify of scheduler logical

The scheduler should scheduling an execution according to the index of this execution in executionList instead of default to scheduling the currentExecution. So we need change ExecutionVertexID to ExecutionVertexIDAndExecutionIndex which represent which execution in ExecutionVertex should be scheduling in scheduler logical. Also when task failover, executionIndex also should be calculated by fail task's ExecutionAttemptID so that scheduler knows which execution in executionList should be restart.

...

Code Block
languagejava
titleExecutionVertexIDWithExecutionIndex.java
public class ExecutionVertexIDWithExecutionIndex {
    private ExecutionVertexID executionVertexID;
    private Integer executionIndex;
}

In order to reuse the code of scheduler, we need to extend the interface with an additional method then SchedulerBase should implements it.

Code Block
languagejava
titleSchedulerNG interface extension
public interface SchedulerNG extends AutoCloseableAsync {
	default void schedulingSpeculativeExecutions(List<ExecutionVertexIDAndExecutionIndex>List<ExecutionVertexIDWithExecutionIndex> verticesToSchedule) {
        throw new UnsupportedOperationException();
    }
}

...

Process failover situation

Just like the normal tasks, the speculative task is executed in parallel with the original one and share the same failover and restart strategy.

...

But I think it should not be restarted globally when the speculation execution failover count reach the max-retry-counts. 

When a task fail, we could calculate its index(executionIndex) in executionList by executionAttemptID. Then the scheduler takes a series of processing for the corresponding execution according to the executionIndex as shown below.

Image Added

Black list of node

...