...
In our production clusters at Alibaba in China, we found that considering only running time was sufficient to solve the long-tail task problem. Therefore, the first version will not take throughput into consideration. After this feature is released, if users in the Flink community need throughput-based detection, I will develop it in the next version.
Scheduling of Speculative Executions
Introduction of the speculative scheduler thread
Each ExecutionJobVertex has a SpeculativeScheduler thread that periodically detects long-tail executions in that ExecutionJobVertex, based on the three rules above.
...
Refactoring of ExecutionVertex
With speculative execution, an ExecutionVertex can no longer assume that only one execution is running at a time. Some member variables of ExecutionVertex therefore need a larger refactoring.
...
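```java
// All executions (the original and any speculative attempts) of this ExecutionVertex
// that may run concurrently, addressed by their executionIndex.
private List<Execution> executionList = new ArrayList<>();
```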
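To make the change of ExecutionVertex more concrete, here is a minimal, self-contained sketch of a vertex that keeps several attempts in a list. SketchExecution, SketchExecutionVertex and SketchState are hypothetical stand-ins, not Flink's real classes, and the convention that index 0 is the original execution is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified states; Flink's real ExecutionState has more values.
enum SketchState { CREATED, RUNNING, FINISHED, FAILED, CANCELED }

// Hypothetical stand-in for Execution: one attempt of a subtask.
final class SketchExecution {
    final String attemptId; // plays the role of ExecutionAttemptID
    SketchState state = SketchState.CREATED;

    SketchExecution(String attemptId) {
        this.attemptId = attemptId;
    }
}

// Hypothetical stand-in for ExecutionVertex holding several concurrent attempts.
final class SketchExecutionVertex {
    // Assumption: index 0 is the original execution, later entries are speculative.
    private final List<SketchExecution> executionList = new ArrayList<>();

    SketchExecutionVertex(String originalAttemptId) {
        executionList.add(new SketchExecution(originalAttemptId));
    }

    // Appends a speculative attempt and returns its executionIndex.
    int addSpeculativeExecution(String attemptId) {
        executionList.add(new SketchExecution(attemptId));
        return executionList.size() - 1;
    }

    SketchExecution getExecution(int executionIndex) {
        return executionList.get(executionIndex);
    }

    // With speculation, more than one attempt can be RUNNING at the same time.
    long numRunningExecutions() {
        return executionList.stream().filter(e -> e.state == SketchState.RUNNING).count();
    }
}
```

The key design point is that callers address an attempt by its position in the list instead of relying on a single "current" execution.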
Introduction of the SpeculativeScheduler thread
Each ExecutionJobVertex has a SpeculativeScheduler thread that periodically detects long-tail executions in that ExecutionJobVertex, based on the rules above. Its member variables include a SchedulerNG and an ExecutionVertex[], among others. The SchedulerNG is used to schedule the speculative executions, and the ExecutionVertex[] is used to read the execution state timestamps of the executions in this ExecutionJobVertex.
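A minimal sketch of such a periodic detector, built on the JDK's ScheduledExecutorService. The detection rule below (a running subtask is a long tail if it has run longer than a multiplier times the upper median duration of already finished subtasks) is a hypothetical placeholder for the rules defined earlier in this document. SubtaskTimes, SpeculativeSchedulerSketch and the `multiplier` parameter are illustrative names; a Supplier stands in for reading timestamps from the ExecutionVertex[], and a Consumer stands in for calling the SchedulerNG.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical snapshot of one subtask's state timestamps, in milliseconds.
final class SubtaskTimes {
    final int subtaskIndex;
    final long deployTimestamp;
    final long finishTimestamp; // negative while the subtask is still running

    SubtaskTimes(int subtaskIndex, long deployTimestamp, long finishTimestamp) {
        this.subtaskIndex = subtaskIndex;
        this.deployTimestamp = deployTimestamp;
        this.finishTimestamp = finishTimestamp;
    }
}

final class SpeculativeSchedulerSketch {
    private final double multiplier; // hypothetical tuning knob
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    SpeculativeSchedulerSketch(double multiplier) {
        this.multiplier = multiplier;
    }

    // Hypothetical rule: running time > multiplier * (upper) median finished duration.
    static List<Integer> detectLongTail(List<SubtaskTimes> times, long now, double multiplier) {
        long[] finished = times.stream()
                .filter(t -> t.finishTimestamp >= 0)
                .mapToLong(t -> t.finishTimestamp - t.deployTimestamp)
                .sorted()
                .toArray();
        List<Integer> longTail = new ArrayList<>();
        if (finished.length == 0) {
            return longTail; // no baseline to compare against yet
        }
        long median = finished[finished.length / 2];
        for (SubtaskTimes t : times) {
            if (t.finishTimestamp < 0 && now - t.deployTimestamp > multiplier * median) {
                longTail.add(t.subtaskIndex);
            }
        }
        return longTail;
    }

    // Re-evaluates the rule every intervalMs and hands long-tail subtasks to the scheduler.
    void start(Supplier<List<SubtaskTimes>> source, long intervalMs,
               Consumer<List<Integer>> scheduleSpeculative) {
        timer.scheduleAtFixedRate(() -> {
            List<Integer> slow = detectLongTail(source.get(), System.currentTimeMillis(), multiplier);
            if (!slow.isEmpty()) {
                scheduleSpeculative.accept(slow);
            }
        }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    void stop() {
        timer.shutdownNow();
    }
}
```

Keeping the rule in a pure static method makes it testable without a running thread. A real detector would presumably also skip subtasks that already have a speculative attempt, so that the same subtask is not speculated on every period.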
Modification of the scheduler logic
The scheduler should schedule an execution according to its index in executionList, instead of always scheduling the currentExecution. So we need to change ExecutionVertexID to ExecutionVertexIDWithExecutionIndex, which tells the scheduler which execution of an ExecutionVertex should be scheduled. Also, when a task fails over, the executionIndex should be calculated from the failed task's ExecutionAttemptID, so that the scheduler knows which execution in executionList should be restarted.
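Deriving the executionIndex from a failed task's attempt ID can be sketched as a simple scan of executionList. AttemptSketch and ExecutionIndexLookup are hypothetical names, and a java.util.UUID stands in for Flink's ExecutionAttemptID.

```java
import java.util.List;
import java.util.UUID;

// Hypothetical stand-in for an Execution; only its attempt ID matters here.
final class AttemptSketch {
    final UUID attemptId; // plays the role of ExecutionAttemptID

    AttemptSketch(UUID attemptId) {
        this.attemptId = attemptId;
    }
}

final class ExecutionIndexLookup {
    // Returns the executionIndex of the failed attempt, or -1 if it does not belong to this vertex.
    static int executionIndexOf(List<AttemptSketch> executionList, UUID failedAttemptId) {
        for (int i = 0; i < executionList.size(); i++) {
            if (executionList.get(i).attemptId.equals(failedAttemptId)) {
                return i;
            }
        }
        return -1;
    }
}
```

A linear scan is a reasonable choice here because a vertex only holds a handful of attempts; a map from attempt ID to index would be an alternative if that assumption did not hold.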
...
```java
public class ExecutionVertexIDWithExecutionIndex {
    // The ExecutionVertex that owns the execution to schedule.
    private ExecutionVertexID executionVertexID;
    // Position of that execution in the ExecutionVertex's executionList.
    private Integer executionIndex;
}
```
To reuse the existing scheduler code, we extend the SchedulerNG interface with an additional method, which SchedulerBase then implements.
```java
public interface SchedulerNG extends AutoCloseableAsync {
    // ... existing SchedulerNG methods ...

    // Schedules the given executions, each identified by its ExecutionVertexID and its index
    // in executionList. Schedulers that do not support speculation keep this default.
    default void schedulingSpeculativeExecutions(
            List<ExecutionVertexIDWithExecutionIndex> verticesToSchedule) {
        throw new UnsupportedOperationException();
    }
}
```
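The pattern of a throwing default method that only the base scheduler overrides can be shown with simplified, hypothetical types. VertexIdWithIndex, SchedulerSketch and SchedulerBaseSketch mirror ExecutionVertexIDWithExecutionIndex, SchedulerNG and SchedulerBase, and "deploying" is reduced to recording which execution was requested.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mirror of ExecutionVertexIDWithExecutionIndex.
final class VertexIdWithIndex {
    final String executionVertexId;
    final int executionIndex;

    VertexIdWithIndex(String executionVertexId, int executionIndex) {
        this.executionVertexId = executionVertexId;
        this.executionIndex = executionIndex;
    }
}

// Hypothetical mirror of SchedulerNG: existing implementations need no change.
interface SchedulerSketch {
    default void schedulingSpeculativeExecutions(List<VertexIdWithIndex> verticesToSchedule) {
        throw new UnsupportedOperationException();
    }
}

// Hypothetical mirror of SchedulerBase, the only implementation that supports speculation.
class SchedulerBaseSketch implements SchedulerSketch {
    final List<String> deployed = new ArrayList<>();

    @Override
    public void schedulingSpeculativeExecutions(List<VertexIdWithIndex> verticesToSchedule) {
        for (VertexIdWithIndex v : verticesToSchedule) {
            // A real scheduler would allocate a slot and deploy
            // executionList.get(v.executionIndex) of that vertex; here we only record it.
            deployed.add(v.executionVertexId + "#" + v.executionIndex);
        }
    }
}
```

The default implementation keeps the interface change backward compatible: other SchedulerNG implementations still compile and fail fast if speculation is requested from them.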
...
Handling of failover situations
A speculative task is executed in parallel with the original one and, just like a normal task, shares the same failover and restart strategy.
...
However, I think the job should not be restarted globally when the failover count of a speculative execution reaches max-retry-counts.
When a task fails, we can calculate its index in executionList (executionIndex) from its ExecutionAttemptID. The scheduler then processes the corresponding execution according to the executionIndex, as shown below.
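As a rough illustration of a failover decision driven by the executionIndex, the hypothetical sketch below encodes one possible policy consistent with the text: a speculative attempt that exhausts its retries is abandoned instead of triggering a global restart. FailoverAction, SpeculativeFailoverSketch and the rule that index 0 is the original execution are assumptions; the exact processing steps are defined by this design, not by this code.

```java
// Hypothetical outcomes of handling a failed execution.
enum FailoverAction { IGNORE, RESTART_THIS_EXECUTION, ABANDON_SPECULATIVE_EXECUTION, DEFER_TO_RESTART_STRATEGY }

final class SpeculativeFailoverSketch {
    // executionIndex: position of the failed attempt in executionList (assumed: 0 = original).
    static FailoverAction onExecutionFailed(
            int executionIndex, boolean anotherAttemptFinished, int failureCount, int maxRetryCounts) {
        if (anotherAttemptFinished) {
            // The vertex already produced a result through another attempt.
            return FailoverAction.IGNORE;
        }
        if (failureCount < maxRetryCounts) {
            // Retry only the failed attempt, following the shared restart strategy.
            return FailoverAction.RESTART_THIS_EXECUTION;
        }
        // Retries exhausted: drop a speculative attempt rather than restarting the job globally;
        // for the original attempt, fall back to the normal restart strategy.
        return executionIndex > 0
                ? FailoverAction.ABANDON_SPECULATIVE_EXECUTION
                : FailoverAction.DEFER_TO_RESTART_STRATEGY;
    }
}
```

This policy assumes the original attempt is still alive when a speculative one is abandoned; a complete implementation would also have to cover the case where every attempt of the vertex has failed.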
Node blacklist
...