...
(4) SpeculativeExecution is effective only when all input and output edges of the JobVertex are blocking. As a result, each region contains exactly one ExecutionVertex.
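Why all-blocking edges imply single-vertex regions can be seen with a simplified model of pipelined regions: vertices joined by a pipelined edge must share a region, while blocking edges never merge regions. The sketch below is a hypothetical union-find model under that assumption, not Flink's actual region-building code; `RegionSketch`, `Edge`, and `computeRegions` are illustrative names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of pipelined regions (not Flink's region-building code):
// vertices joined by a PIPELINED edge share a region; BLOCKING edges never merge regions.
class RegionSketch {
    enum EdgeType { PIPELINED, BLOCKING }

    static final class Edge {
        final int source;
        final int target;
        final EdgeType type;

        Edge(int source, int target, EdgeType type) {
            this.source = source;
            this.target = target;
            this.type = type;
        }
    }

    // Union-find lookup with path halving.
    private static int find(int[] parent, int v) {
        while (parent[v] != v) {
            parent[v] = parent[parent[v]];
            v = parent[v];
        }
        return v;
    }

    /** Groups vertices 0..numVertices-1 into regions, ordered by their lowest vertex index. */
    static List<List<Integer>> computeRegions(int numVertices, List<Edge> edges) {
        int[] parent = new int[numVertices];
        for (int i = 0; i < numVertices; i++) {
            parent[i] = i;
        }
        for (Edge e : edges) {
            if (e.type == EdgeType.PIPELINED) {
                parent[find(parent, e.source)] = find(parent, e.target);
            }
        }
        Map<Integer, List<Integer>> byRoot = new LinkedHashMap<>();
        for (int v = 0; v < numVertices; v++) {
            byRoot.computeIfAbsent(find(parent, v), k -> new ArrayList<>()).add(v);
        }
        return new ArrayList<>(byRoot.values());
    }
}
```

With only blocking edges, no union ever happens, so every vertex ends up alone in its region.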
Proposed Changes
...
Each ExecutionJobVertex has a SpeculativeScheduler thread that periodically detects long-tail executions according to the criteria mentioned above. Its member variables include a SchedulerNG, an ExecutionVertex[] array, and so on. The SchedulerNG is used to schedule speculative executions, and the ExecutionVertex[] is used to read the execution state timestamps within this ExecutionJobVertex.
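A periodic detector of this kind might be structured as follows. This is a minimal sketch under stated assumptions: `VertexTimes` stands in for the timestamps read from ExecutionVertex[], and the rule used here (at least `minFinishedRatio` of vertices finished, and a running vertex exceeding `multiplier` times the median finished duration) is a hypothetical placeholder for the criteria defined earlier in the proposal. All class and parameter names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical sketch of the per-ExecutionJobVertex detector. VertexTimes stands in for the
// state timestamps read from ExecutionVertex[]; the detection rule is an assumed placeholder.
class LongTailDetectorSketch {
    static final class VertexTimes {
        final long startMillis;
        final long finishMillis; // -1 while the vertex is still running

        VertexTimes(long startMillis, long finishMillis) {
            this.startMillis = startMillis;
            this.finishMillis = finishMillis;
        }

        boolean finished() {
            return finishMillis >= 0;
        }
    }

    private final double minFinishedRatio; // assumed: fraction of vertices that must be finished
    private final double multiplier;       // assumed: how far above the median counts as long tail

    LongTailDetectorSketch(double minFinishedRatio, double multiplier) {
        this.minFinishedRatio = minFinishedRatio;
        this.multiplier = multiplier;
    }

    /** Returns the indices of running vertices that look like long-tail executions at nowMillis. */
    List<Integer> detect(VertexTimes[] vertices, long nowMillis) {
        List<Long> durations = new ArrayList<>();
        for (VertexTimes v : vertices) {
            if (v.finished()) {
                durations.add(v.finishMillis - v.startMillis);
            }
        }
        List<Integer> longTails = new ArrayList<>();
        if (durations.isEmpty() || durations.size() < minFinishedRatio * vertices.length) {
            return longTails; // not enough finished vertices to judge yet
        }
        long[] sorted = durations.stream().mapToLong(Long::longValue).sorted().toArray();
        long median = sorted[sorted.length / 2]; // upper median for even counts
        for (int i = 0; i < vertices.length; i++) {
            VertexTimes v = vertices[i];
            if (!v.finished() && nowMillis - v.startMillis > multiplier * median) {
                longTails.add(i);
            }
        }
        return longTails;
    }

    /** Runs detect() on a single background thread every periodMillis. */
    ScheduledExecutorService start(
            Supplier<VertexTimes[]> snapshot,
            LongSupplier clock,
            Consumer<List<Integer>> scheduleSpeculation,
            long periodMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(() -> {
            List<Integer> longTails = detect(snapshot.get(), clock.getAsLong());
            if (!longTails.isEmpty()) {
                scheduleSpeculation.accept(longTails); // e.g. hand off to SchedulerNG
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return executor;
    }
}
```

Keeping `detect()` a pure function of a snapshot and a clock value makes the rule testable independently of the background thread.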
Modification of scheduler logic
...
```java
// Identifies one execution of an ExecutionVertex by its position in the vertex's execution list.
public class ExecutionVertexIDWithExecutionIndex {
    private ExecutionVertexID executionVertexID;
    private Integer executionIndex;
}
```
To reuse the existing scheduler code, we need to extend the interface with an additional method, which SchedulerBase then implements.
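The shape of such an extension can be sketched with stand-in types. Everything here is hypothetical: `VertexAttempt` mirrors ExecutionVertexIDWithExecutionIndex with a String vertex ID, `SpeculativeSchedulingSketch.scheduleSpeculativeExecutions` is an illustrative method name (the actual interface and method are not specified here), and `SchedulerBaseSketch` plays the role of SchedulerBase.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Stand-in for ExecutionVertexIDWithExecutionIndex; the vertex ID is a String here.
final class VertexAttempt {
    final String executionVertexId;
    final int executionIndex;

    VertexAttempt(String executionVertexId, int executionIndex) {
        this.executionVertexId = executionVertexId;
        this.executionIndex = executionIndex;
    }

    // Value semantics so the pair can serve as a map or set key.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof VertexAttempt)) return false;
        VertexAttempt other = (VertexAttempt) o;
        return executionIndex == other.executionIndex
                && executionVertexId.equals(other.executionVertexId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(executionVertexId, executionIndex);
    }
}

// Hypothetical extension of the scheduler interface; the method name is illustrative.
interface SpeculativeSchedulingSketch {
    void scheduleSpeculativeExecutions(List<VertexAttempt> attempts);
}

// Plays the role of SchedulerBase: implements the new method by reusing its normal deploy path.
class SchedulerBaseSketch implements SpeculativeSchedulingSketch {
    final List<VertexAttempt> deployed = new ArrayList<>();

    @Override
    public void scheduleSpeculativeExecutions(List<VertexAttempt> attempts) {
        for (VertexAttempt attempt : attempts) {
            deploy(attempt);
        }
    }

    // Placeholder for the existing slot allocation and deployment code.
    protected void deploy(VertexAttempt attempt) {
        deployed.add(attempt);
    }
}
```

Implementing the method once in the base class lets every scheduler variant inherit speculative scheduling without duplicating deployment logic.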
...
Just like normal tasks, a speculative task runs in parallel with the original one and shares the same failover and restart strategy. The original long-tail task and its speculative task can each still retry after failure on their own track. However, I think the job should not be restarted globally when the number of speculative execution failovers reaches max-retry-counts. When a task fails, we can compute its index (executionIndex) in the executionList from its ExecutionAttemptID. The scheduler then processes the corresponding execution according to this executionIndex, as shown below.
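The per-track retry idea can be sketched as follows, assuming index 0 in the executionList is the original execution and higher indices are speculative copies. Attempt IDs are modeled as Strings standing in for ExecutionAttemptID, and a retried execution keeps its ID for simplicity. What happens when a track exhausts its retries is an assumption: a speculative execution is simply dropped, while the original falls back to the normal failover strategy. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch. executionList models an ExecutionVertex's executions
// (index 0 = original, higher indices = speculative copies); Strings stand in for ExecutionAttemptID.
class SpeculativeFailoverSketch {
    enum Action { RESTART_THIS_EXECUTION, GIVE_UP_THIS_EXECUTION, FALL_BACK_TO_FAILOVER_STRATEGY }

    private final List<String> executionList = new ArrayList<>();
    private final List<Integer> failureCounts = new ArrayList<>();
    private final int maxRetryCounts;

    SpeculativeFailoverSketch(int maxRetryCounts) {
        this.maxRetryCounts = maxRetryCounts;
    }

    /** Adds an execution and returns its executionIndex. */
    int addExecution(String attemptId) {
        executionList.add(attemptId);
        failureCounts.add(0);
        return executionList.size() - 1;
    }

    /** Derives executionIndex from the failed attempt's ID. */
    int executionIndexOf(String attemptId) {
        int index = executionList.indexOf(attemptId);
        if (index < 0) {
            throw new IllegalArgumentException("unknown attempt: " + attemptId);
        }
        return index;
    }

    /** Each execution retries on its own track; an exhausted speculative one does not force a global restart. */
    Action onFailure(String attemptId) {
        int index = executionIndexOf(attemptId);
        int failures = failureCounts.get(index) + 1;
        failureCounts.set(index, failures);
        if (failures <= maxRetryCounts) {
            return Action.RESTART_THIS_EXECUTION;
        }
        return index == 0 ? Action.FALL_BACK_TO_FAILOVER_STRATEGY : Action.GIVE_UP_THIS_EXECUTION;
    }
}
```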
Some classes, for example FailureHandlingResult and ExecutionVertexDeploymentOption, will gain a member variable named executionIndex.
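Carrying executionIndex lets a result address one execution instead of the whole vertex. The sketch below uses a hypothetical `FailureHandlingResultSketch` (not Flink's FailureHandlingResult) and a list of states standing in for a vertex's executions; only the execution at `executionIndex` is reset, so its siblings keep running.

```java
import java.util.List;

// Hypothetical sketch: a failure-handling result that, like the proposed FailureHandlingResult
// change, carries executionIndex so only the failed execution of a vertex is reset.
class IndexedFailureHandlingSketch {
    enum State { CREATED, RUNNING, FAILED, FINISHED }

    static final class FailureHandlingResultSketch {
        final String executionVertexId;
        final int executionIndex;
        final boolean canRestart;

        FailureHandlingResultSketch(String executionVertexId, int executionIndex, boolean canRestart) {
            this.executionVertexId = executionVertexId;
            this.executionIndex = executionIndex;
            this.canRestart = canRestart;
        }
    }

    /** Applies a result to one vertex's executions; sibling executions keep their state. */
    static void apply(List<State> executionsOfVertex, FailureHandlingResultSketch result) {
        if (result.canRestart) {
            // Only the execution at executionIndex goes back to CREATED for redeployment.
            executionsOfVertex.set(result.executionIndex, State.CREATED);
        }
    }
}
```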
...
In DefaultExecutionGraph, I will add a member variable jobBlackListSpeculativeListeners. After the ExecutionGraph is created, jobBlackListTracker will be added to jobBlackListSpeculativeListeners. JobBlackListSpeculativeListener.onSpeculativeExecutionHappened() will be called when the SpeculativeExecution detects a long-tail task and begins notifying the scheduler to schedule a speculative execution.
```java
public interface ExecutionGraph extends AccessExecutionGraph {
    // ... existing methods ...

    // Registers the tracker so it is notified via onSpeculativeExecutionHappened().
    void registerJobBlackListSpeculativeListener(JobBlackListTracker jobBlackListTracker);
}
```
...
For example, as shown below.
(a) In an ExecutionVertex, after execution_1 has consumed inputSplit_0, it goes on to consume inputSplit_1.
...
As shown below, consider a batch job with blocking shuffle (similar to MapReduce). With speculative execution, all reduce executions in an ExecutionVertex consume the resultPartition produced by the fastest-finishing execution of the map ExecutionVertex.
- Once all map ExecutionVertices finish, all executions in the reduce ExecutionVertex should be notified to update their inputChannels from UNKNOWN to LOCAL/REMOTE. This requires some modifications to Execution.updatePartitionConsumers().
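The first-finisher-wins rule and the channel update can be modeled in a few lines. This is a hypothetical sketch, not the logic of Execution.updatePartitionConsumers(): each map vertex keeps the TaskManager ID of its first finished execution, and a reduce execution resolves each channel to LOCAL when it runs on that TaskManager and to REMOTE otherwise, leaving it UNKNOWN until the producer finishes. Names such as `SpeculativeShuffleSketch` and `taskManagerId` are illustrative.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: the first finished execution of each map vertex becomes the producer;
// reduce executions resolve UNKNOWN channels to LOCAL or REMOTE based on TaskManager location.
class SpeculativeShuffleSketch {
    enum ChannelType { UNKNOWN, LOCAL, REMOTE }

    private final int numMapVertices;
    private final Map<Integer, String> producerTaskManagerByMapVertex = new HashMap<>();

    SpeculativeShuffleSketch(int numMapVertices) {
        this.numMapVertices = numMapVertices;
    }

    /** Records a finished map execution; returns true only for the fastest one per vertex. */
    boolean onMapExecutionFinished(int mapVertex, String taskManagerId) {
        return producerTaskManagerByMapVertex.putIfAbsent(mapVertex, taskManagerId) == null;
    }

    boolean allMapVerticesFinished() {
        return producerTaskManagerByMapVertex.size() == numMapVertices;
    }

    /** Channel types, one per map vertex, for a reduce execution running on reduceTaskManagerId. */
    List<ChannelType> resolveChannels(String reduceTaskManagerId) {
        List<ChannelType> channels = new ArrayList<>();
        for (int m = 0; m < numMapVertices; m++) {
            String producer = producerTaskManagerByMapVertex.get(m);
            if (producer == null) {
                channels.add(ChannelType.UNKNOWN);
            } else {
                channels.add(producer.equals(reduceTaskManagerId) ? ChannelType.LOCAL : ChannelType.REMOTE);
            }
        }
        return channels;
    }
}
```

Each execution of a reduce vertex would call `resolveChannels` with its own TaskManager ID, so an original and its speculative copy may end up with different LOCAL/REMOTE choices for the same producer.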
...