...

(4) SpeculativeExecution is effective only when all input and output edges of the JobVertex are blocking. In that case, each region contains only one ExecutionVertex.
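Condition (4) can be written as a small predicate. This is a sketch only: `EdgeType` and `SpeculationApplicability` are hypothetical names, and a real implementation would read the result partition types of the JobVertex's edges from the JobGraph.

```java
import java.util.List;

/** Hypothetical stand-in for the result partition type of a job edge. */
enum EdgeType { BLOCKING, PIPELINED }

/** Hypothetical applicability check for speculative execution on one JobVertex. */
final class SpeculationApplicability {
    private SpeculationApplicability() {}

    static boolean isSpeculationApplicable(List<EdgeType> inputEdges, List<EdgeType> outputEdges) {
        // A pipelined edge joins producer and consumer into one pipelined region, so the region
        // would no longer consist of a single ExecutionVertex that can be duplicated on its own.
        return inputEdges.stream().allMatch(e -> e == EdgeType.BLOCKING)
                && outputEdges.stream().allMatch(e -> e == EdgeType.BLOCKING);
    }
}
```

A source vertex has no input edges, so only its output edges decide the result.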

[Figure]

Proposed Changes

...

Each ExecutionJobVertex has a SpeculativeScheduler thread that periodically detects long-tail executions according to the criteria mentioned above. Its member variables include a SchedulerNG, an ExecutionVertex[], and so on. The SchedulerNG is used to schedule the speculative executions, and the ExecutionVertex[] is used to read the execution-state timestamps of this ExecutionJobVertex.
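The periodic detection loop could look roughly like the sketch below. The actual criteria are defined earlier in the proposal and are not reproduced here, so this sketch assumes one common rule: once at least `minFinishedRatio` of the vertices have finished, a still-running execution is a long tail if it has run longer than `multiplier` times the median finished duration. `VertexTiming`, `LongTailDetector`, and both thresholds are hypothetical, and the `onLongTails` callback stands in for asking the SchedulerNG to schedule speculative executions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical snapshot of the current execution of one ExecutionVertex. */
final class VertexTiming {
    final int subtaskIndex;
    final long startTimeMs;
    final long finishTimeMs; // -1 while the execution is still running

    VertexTiming(int subtaskIndex, long startTimeMs, long finishTimeMs) {
        this.subtaskIndex = subtaskIndex;
        this.startTimeMs = startTimeMs;
        this.finishTimeMs = finishTimeMs;
    }

    boolean isFinished() {
        return finishTimeMs >= 0;
    }
}

/** Hypothetical long-tail check run by a per-ExecutionJobVertex SpeculativeScheduler thread. */
final class LongTailDetector {
    private final double minFinishedRatio; // assumed: fraction of vertices that must finish first
    private final double multiplier;       // assumed: "slow" = running longer than multiplier x median

    LongTailDetector(double minFinishedRatio, double multiplier) {
        this.minFinishedRatio = minFinishedRatio;
        this.multiplier = multiplier;
    }

    /** Returns the subtask indices whose still-running execution is a long tail. */
    List<Integer> detect(List<VertexTiming> vertices, long nowMs) {
        long[] finishedDurations = vertices.stream()
                .filter(VertexTiming::isFinished)
                .mapToLong(v -> v.finishTimeMs - v.startTimeMs)
                .sorted()
                .toArray();
        List<Integer> longTails = new ArrayList<>();
        if (finishedDurations.length == 0
                || finishedDurations.length < minFinishedRatio * vertices.size()) {
            return longTails; // too few finished executions to judge what "slow" means
        }
        long median = finishedDurations[finishedDurations.length / 2]; // upper median
        for (VertexTiming v : vertices) {
            if (!v.isFinished() && nowMs - v.startTimeMs > multiplier * median) {
                longTails.add(v.subtaskIndex);
            }
        }
        return longTails;
    }

    /** Runs detect() periodically; onLongTails stands in for the SchedulerNG call. */
    ScheduledExecutorService start(Supplier<List<VertexTiming>> timings, LongSupplier clockMs,
                                   Consumer<List<Integer>> onLongTails, long periodMs) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(() -> {
            List<Integer> longTails = detect(timings.get(), clockMs.getAsLong());
            if (!longTails.isEmpty()) {
                onLongTails.accept(longTails);
            }
        }, periodMs, periodMs, TimeUnit.MILLISECONDS);
        return executor;
    }
}
```

As written, the sketch reports the same long tail on every tick; a real detector would also remember which vertices already have a speculative execution.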

[Figure]

Modification of scheduler logic

...

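ExecutionVertexIDWithExecutionIndex.java

```java
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;

/** Identifies one execution of an ExecutionVertex by its index in the vertex's executionList. */
public class ExecutionVertexIDWithExecutionIndex {
    private ExecutionVertexID executionVertexID;
    private Integer executionIndex;
}
```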

[Figure]

To reuse the existing scheduler code, we need to extend the interface with an additional method, which SchedulerBase then implements.
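The interface being extended is not named in this excerpt, so the sketch below uses hypothetical names throughout (`SchedulingOperations`, `SpeculativeSchedulingOperations`, `SketchSchedulerBase`) and simplifies the vertex ID to a `String`. It shows the idea of adding one method for deploying a specific execution of a vertex while both entry points share a single deployment path. It also assumes, for illustration, that the original execution sits at index 0 of the executionList.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simplification of ExecutionVertexIDWithExecutionIndex (vertex ID as a String). */
final class VertexWithExecutionIndex {
    final String executionVertexId;
    final int executionIndex;

    VertexWithExecutionIndex(String executionVertexId, int executionIndex) {
        this.executionVertexId = executionVertexId;
        this.executionIndex = executionIndex;
    }
}

/** Hypothetical existing scheduling interface. */
interface SchedulingOperations {
    void allocateSlotsAndDeploy(List<String> executionVertexIds);
}

/** Hypothetical extension: one additional method for deploying specific executions. */
interface SpeculativeSchedulingOperations extends SchedulingOperations {
    void allocateSlotsAndDeploySpeculative(List<VertexWithExecutionIndex> executions);
}

/** SchedulerBase-like sketch: both methods reuse the same deployment path. */
class SketchSchedulerBase implements SpeculativeSchedulingOperations {
    final List<String> deployed = new ArrayList<>(); // records "vertexId#executionIndex"

    @Override
    public void allocateSlotsAndDeploy(List<String> executionVertexIds) {
        for (String id : executionVertexIds) {
            deploy(id, 0); // assumption: the original execution has executionIndex 0
        }
    }

    @Override
    public void allocateSlotsAndDeploySpeculative(List<VertexWithExecutionIndex> executions) {
        for (VertexWithExecutionIndex e : executions) {
            deploy(e.executionVertexId, e.executionIndex);
        }
    }

    private void deploy(String executionVertexId, int executionIndex) {
        // A real scheduler would allocate a slot and submit the task here.
        deployed.add(executionVertexId + "#" + executionIndex);
    }
}
```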

...

Like a normal task, a speculative task runs in parallel with the original one and shares the same failover and restart strategy. The original long-tail tasks and the speculative tasks can still retry after failures on their own tracks. However, I think the job should not be restarted globally when the number of speculative-execution failovers reaches max-retry-counts. When a task fails, we can compute its index (executionIndex) in the executionList from its ExecutionAttemptID. The scheduler then processes the corresponding execution according to the executionIndex, as shown below.

[Figure]

A member variable named executionIndex will be added to some classes, for example FailureHandlingResult, ExecutionVertexDeploymentOption, and so on.
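The per-track failure handling described above can be sketched as follows. All names are hypothetical: `SketchFailureHandlingResult` only mimics the idea of FailureHandlingResult carrying an executionIndex, ExecutionAttemptIDs are simplified to `String`s, and treating index 0 as the original execution is an assumption of the sketch. When a speculative track runs out of retries it is simply dropped instead of triggering a global restart.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical actions for a failed execution. */
enum FailureAction { RESTART_OWN_TRACK, CANCEL_SPECULATIVE_TRACK, DEFAULT_FAILOVER }

/** Hypothetical stand-in for FailureHandlingResult with the added executionIndex field. */
final class SketchFailureHandlingResult {
    final int executionIndex;
    final FailureAction action;

    SketchFailureHandlingResult(int executionIndex, FailureAction action) {
        this.executionIndex = executionIndex;
        this.action = action;
    }
}

/** Hypothetical per-track failure handling for one ExecutionVertex. */
final class SpeculativeFailureHandler {
    private final List<String> executionList = new ArrayList<>(); // attempt IDs as Strings
    private final Map<Integer, Integer> failuresByIndex = new HashMap<>();
    private final int maxRetryCounts;

    SpeculativeFailureHandler(int maxRetryCounts) {
        this.maxRetryCounts = maxRetryCounts;
    }

    /** Adds an execution (index 0 = original, > 0 = speculative) and returns its executionIndex. */
    int addExecution(String attemptId) {
        executionList.add(attemptId);
        return executionList.size() - 1;
    }

    SketchFailureHandlingResult onTaskFailed(String failedAttemptId) {
        int executionIndex = executionList.indexOf(failedAttemptId); // attempt ID -> executionIndex
        if (executionIndex < 0) {
            throw new IllegalArgumentException("Unknown attempt: " + failedAttemptId);
        }
        int failures = failuresByIndex.merge(executionIndex, 1, Integer::sum);
        if (failures <= maxRetryCounts) {
            return new SketchFailureHandlingResult(executionIndex, FailureAction.RESTART_OWN_TRACK);
        }
        // Out of retries: a speculative track is dropped instead of restarting the job globally,
        // while the original track falls back to the normal failover strategy.
        FailureAction action = executionIndex == 0
                ? FailureAction.DEFAULT_FAILOVER
                : FailureAction.CANCEL_SPECULATIVE_TRACK;
        return new SketchFailureHandlingResult(executionIndex, action);
    }
}
```

For brevity the sketch keeps the same attempt ID across retries; in practice each restart of a track would create a new ExecutionAttemptID at the same executionIndex.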

...

In DefaultExecutionGraph, I will add a member variable jobBlackListSpeculativeListeners. After the ExecutionGraph is created, jobBlackListTracker will be added to jobBlackListSpeculativeListeners. JobBlackListSpeculativeListener.onSpeculativeExecutionHappened() will be called when the SpeculativeExecution detects a long-tail task and starts notifying the scheduler to schedule a speculative execution.

[Figure]

ExecutionGraph

```java
public interface ExecutionGraph extends AccessExecutionGraph {
    // Proposed: adds the tracker to jobBlackListSpeculativeListeners.
    void registerJobBlackListSpeculativeListener(JobBlackListTracker jobBlackListTracker);
    // ... existing methods omitted
}
```
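The registration and notification flow might look like the sketch below. `JobBlackListSpeculativeListener` and `onSpeculativeExecutionHappened` follow the proposal's names, but the method parameters, `SketchJobBlackListTracker`, `SketchExecutionGraph`, and `notifySpeculativeExecutionHappened` are hypothetical. Unlike the proposed signature, which takes a JobBlackListTracker, this sketch registers against the listener interface so that other listeners could be plugged in.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Listener named after the proposal; the method parameters are hypothetical. */
interface JobBlackListSpeculativeListener {
    void onSpeculativeExecutionHappened(String executionVertexId, String slowTaskManagerId);
}

/** Hypothetical tracker that records TaskManagers running long-tail tasks. */
final class SketchJobBlackListTracker implements JobBlackListSpeculativeListener {
    final List<String> suspectedTaskManagers = new ArrayList<>();

    @Override
    public void onSpeculativeExecutionHappened(String executionVertexId, String slowTaskManagerId) {
        suspectedTaskManagers.add(slowTaskManagerId);
    }
}

/** DefaultExecutionGraph-like sketch holding jobBlackListSpeculativeListeners. */
final class SketchExecutionGraph {
    // Copy-on-write: registration and notification may happen on different threads.
    private final List<JobBlackListSpeculativeListener> jobBlackListSpeculativeListeners =
            new CopyOnWriteArrayList<>();

    void registerJobBlackListSpeculativeListener(JobBlackListSpeculativeListener listener) {
        jobBlackListSpeculativeListeners.add(listener);
    }

    /** Called when a long tail is detected, before the scheduler launches the speculative execution. */
    void notifySpeculativeExecutionHappened(String executionVertexId, String slowTaskManagerId) {
        for (JobBlackListSpeculativeListener listener : jobBlackListSpeculativeListeners) {
            listener.onSpeculativeExecutionHappened(executionVertexId, slowTaskManagerId);
        }
    }
}
```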

...

For example, as shown below.

[Figure]

(a) In an ExecutionVertex, after execution_1 has consumed inputSplit_0, it goes on to consume inputSplit_1.
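One possible way to support case (a) is sketched below; it is an assumption, not necessarily the design used in this proposal. The ExecutionVertex caches every input split it has handed out, and each execution keeps its own cursor. A speculative execution therefore replays the same sequence of splits as the original one, and new splits are requested from the assigner only when an execution runs past the cached list. `SharedInputSplits` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical per-ExecutionVertex input split cache shared by all of its executions. */
final class SharedInputSplits<S> {
    private final Supplier<S> splitAssigner; // hands out new splits; returns null when exhausted
    private final List<S> assignedSplits = new ArrayList<>();
    private final Map<Integer, Integer> nextIndexByExecution = new HashMap<>();

    SharedInputSplits(Supplier<S> splitAssigner) {
        this.splitAssigner = splitAssigner;
    }

    /** Returns the next split for the given executionIndex, or null if none are left. */
    synchronized S nextSplit(int executionIndex) {
        int next = nextIndexByExecution.getOrDefault(executionIndex, 0);
        if (next == assignedSplits.size()) {
            // This execution is ahead of all others: fetch a fresh split and cache it.
            S fresh = splitAssigner.get();
            if (fresh == null) {
                return null;
            }
            assignedSplits.add(fresh);
        }
        nextIndexByExecution.put(executionIndex, next + 1);
        return assignedSplits.get(next);
    }
}
```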

...

As shown below, consider a batch job with blocking shuffle (similar to MapReduce). With speculative execution, all reduce executions in an ExecutionVertex consume the resultPartition of the fastest finished execution of the map ExecutionVertex.
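Choosing the producer for the reduce side amounts to picking the earliest-finished execution of each map ExecutionVertex. In the sketch below, `MapExecution` and `FastestProducerSelector` are hypothetical, and a finish time of -1 marks an execution that has not finished.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical view of one execution of a map ExecutionVertex. */
final class MapExecution {
    final int executionIndex;
    final long finishTimeMs; // -1 while not finished
    final String resultPartitionId;

    MapExecution(int executionIndex, long finishTimeMs, String resultPartitionId) {
        this.executionIndex = executionIndex;
        this.finishTimeMs = finishTimeMs;
        this.resultPartitionId = resultPartitionId;
    }
}

final class FastestProducerSelector {
    private FastestProducerSelector() {}

    /** Returns the earliest-finished execution, whose result partition all reduce executions read. */
    static Optional<MapExecution> fastestFinished(List<MapExecution> executions) {
        return executions.stream()
                .filter(e -> e.finishTimeMs >= 0)
                .min(Comparator.comparingLong((MapExecution e) -> e.finishTimeMs));
    }
}
```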


[Figure]

  • Once all map ExecutionVertices finish, all executions in a reduce ExecutionVertex should be notified to update their inputChannels from UNKNOWN to LOCAL/REMOTE. This requires some modifications to Execution.updatePartitionConsumers().
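The consumer update could be sketched as below. Nothing changes until every map ExecutionVertex has a finished producer; then every reduce execution, original or speculative, switches each channel from UNKNOWN to LOCAL if the producer ran on the same TaskManager, and to REMOTE otherwise. `ChannelState`, `ReduceExecution`, and `PartitionConsumerUpdater` are hypothetical simplifications, not the actual logic of Execution.updatePartitionConsumers().

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical input channel states, mirroring the UNKNOWN -> LOCAL/REMOTE transition. */
enum ChannelState { UNKNOWN, LOCAL, REMOTE }

/** Hypothetical reduce execution (original or speculative) and its input channels. */
final class ReduceExecution {
    final String taskManagerId;
    final Map<String, ChannelState> channels = new HashMap<>(); // keyed by map ExecutionVertex ID

    ReduceExecution(String taskManagerId, List<String> mapVertexIds) {
        this.taskManagerId = taskManagerId;
        for (String id : mapVertexIds) {
            channels.put(id, ChannelState.UNKNOWN);
        }
    }
}

final class PartitionConsumerUpdater {
    private PartitionConsumerUpdater() {}

    /**
     * producerTaskManagers maps each finished map ExecutionVertex to the TaskManager of its
     * fastest finished execution. Nothing changes until all numMapVertices have finished.
     */
    static void updatePartitionConsumers(Map<String, String> producerTaskManagers, int numMapVertices,
                                         List<ReduceExecution> reduceExecutions) {
        if (producerTaskManagers.size() < numMapVertices) {
            return;
        }
        for (ReduceExecution consumer : reduceExecutions) { // every execution, not only the original
            for (Map.Entry<String, String> producer : producerTaskManagers.entrySet()) {
                ChannelState state = producer.getValue().equals(consumer.taskManagerId)
                        ? ChannelState.LOCAL
                        : ChannelState.REMOTE;
                consumer.channels.put(producer.getKey(), state);
            }
        }
    }
}
```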

...