...
The member fields of ExecutionVertex need a larger restructuring. Replace currentExecution in ExecutionVertex with an ArrayList named executionList. A list is used for extensibility, in case multiple speculative executions run at the same time in the future. An ExecutionVertex can therefore hold multiple executions at the same time, and all executions in executionList behave identically with respect to failover, slot requests, and so on.
```java
private List<Execution> executionList = new ArrayList<>();
```
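As a rough illustration of the list-based layout described above, the following is a minimal sketch, not the actual Flink classes. `ExecutionSketch`, `ExecutionVertexSketch`, `createSpeculativeExecution`, and the convention that index 0 plays the role of the former currentExecution are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for Flink's Execution; it only remembers its index.
class ExecutionSketch {
    private final int index;

    ExecutionSketch(int index) {
        this.index = index;
    }

    int getIndex() {
        return index;
    }
}

// Hypothetical stand-in for ExecutionVertex that holds several executions.
class ExecutionVertexSketch {
    private final List<ExecutionSketch> executionList = new ArrayList<>();

    ExecutionVertexSketch() {
        // Assumption: index 0 takes the place of the former currentExecution.
        executionList.add(new ExecutionSketch(0));
    }

    // Appends a speculative execution; its index is its position in the list.
    ExecutionSketch createSpeculativeExecution() {
        ExecutionSketch execution = new ExecutionSketch(executionList.size());
        executionList.add(execution);
        return execution;
    }

    // Looks up an execution by its index in executionList.
    ExecutionSketch getExecution(int index) {
        return executionList.get(index);
    }

    // Read-only view, so callers cannot modify the list directly.
    List<ExecutionSketch> getExecutions() {
        return Collections.unmodifiableList(executionList);
    }
}

class ExecutionListDemo {
    public static void main(String[] args) {
        ExecutionVertexSketch vertex = new ExecutionVertexSketch();
        ExecutionSketch speculative = vertex.createSpeculativeExecution();
        System.out.println("executions: " + vertex.getExecutions().size());
        System.out.println("speculative index: " + speculative.getIndex());
    }
}
```

Because every entry is addressed by its position, callers that previously used currentExecution could switch to an index-based lookup without treating the original and speculative executions differently.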
The scheduler should schedule an execution according to its index in executionList instead of always scheduling currentExecution. TODO..
To reuse the scheduler code, we need to extend the SchedulerNG interface with an additional method:
```java
public interface SchedulerNG extends AutoCloseableAsync {
    // Throws by default, so existing schedulers need not implement it.
    default void schedulingSpeculativeExecutions(List<ExecutionVertexIDAndExecutionIndex> verticesToSchedule) {
        throw new UnsupportedOperationException();
    }
}
```
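To make the effect of the default method concrete, here is a minimal, self-contained sketch under stated assumptions. `SchedulerNGSketch` is a simplified stand-in for SchedulerNG, and `VertexIdAndIndexSketch` is an assumed shape for ExecutionVertexIDAndExecutionIndex (a vertex ID plus an index into executionList), since the proposal does not spell out its fields. Both implementing classes are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Assumed shape of ExecutionVertexIDAndExecutionIndex: a vertex ID plus
// the index of the execution inside that vertex's executionList.
final class VertexIdAndIndexSketch {
    final String vertexId;
    final int executionIndex;

    VertexIdAndIndexSketch(String vertexId, int executionIndex) {
        this.vertexId = vertexId;
        this.executionIndex = executionIndex;
    }
}

// Simplified stand-in for SchedulerNG; the default throws, as in the proposal.
interface SchedulerNGSketch {
    default void schedulingSpeculativeExecutions(List<VertexIdAndIndexSketch> verticesToSchedule) {
        throw new UnsupportedOperationException();
    }
}

// A scheduler that does not opt in; it inherits the throwing default.
class LegacySchedulerSketch implements SchedulerNGSketch {}

// A scheduler that opts in and records which (vertex, index) pairs it handled.
class SpeculativeSchedulerSketch implements SchedulerNGSketch {
    final List<String> deployed = new ArrayList<>();

    @Override
    public void schedulingSpeculativeExecutions(List<VertexIdAndIndexSketch> verticesToSchedule) {
        for (VertexIdAndIndexSketch target : verticesToSchedule) {
            // A real scheduler would request a slot and deploy the execution here.
            deployed.add(target.vertexId + "#" + target.executionIndex);
        }
    }
}

class SpeculativeSchedulingDemo {
    public static void main(String[] args) {
        SpeculativeSchedulerSketch scheduler = new SpeculativeSchedulerSketch();
        scheduler.schedulingSpeculativeExecutions(
                Arrays.asList(new VertexIdAndIndexSketch("vertex-1", 1)));
        System.out.println(scheduler.deployed);
    }
}
```

The default implementation keeps existing scheduler implementations source-compatible: only schedulers that support speculative execution override the method, and calling it on any other scheduler fails fast.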
Handling failover
...
Limitations
(1) JobType is Batch.
(2) Cluster ResourceManager is Yarn or K8s.
...