...

Slow Processing Throughput

The primary characteristic of long tail tasks is that their processing throughput is much lower than expected, or than that of other, normal tasks.

TODO: In Alibaba's production clusters in China, considering running time alone has been sufficient to solve the long tail task problem, so the first version will not take throughput into consideration. If Flink community users need throughput-based detection after this feature is released, it will be developed in a later version.

Scheduling of Speculative Executions

Introducing the speculative scheduler thread

Each ExecutionJobVertex has a SpeculativeScheduler thread that periodically detects long tail executions within that ExecutionJobVertex, based on the three rules above.
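The periodic detection loop can be sketched as follows. This is a hypothetical illustration, not the proposal's implementation: the three detection rules defined earlier are not reproduced here, and a single placeholder rule stands in for them (once at least a given fraction of tasks has finished, a running task whose elapsed time exceeds a multiplier of the median finished duration is reported). The names SpeculativeDetectorSketch, TaskTiming, minFinishedFraction and multiplier are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
 * Hypothetical sketch of the per-ExecutionJobVertex detection loop.
 * The single rule below is a placeholder, NOT the proposal's three rules.
 */
public class SpeculativeDetectorSketch {

    /** Stand-in for one execution's timing; finishMillis is -1 while still running. */
    public static final class TaskTiming {
        final int subtaskIndex;
        final long startMillis;
        final long finishMillis;

        public TaskTiming(int subtaskIndex, long startMillis, long finishMillis) {
            this.subtaskIndex = subtaskIndex;
            this.startMillis = startMillis;
            this.finishMillis = finishMillis;
        }

        boolean isFinished() {
            return finishMillis >= 0;
        }
    }

    private final double minFinishedFraction; // fraction of tasks that must finish before detecting
    private final double multiplier;          // how much slower than the median counts as a long tail

    public SpeculativeDetectorSketch(double minFinishedFraction, double multiplier) {
        this.minFinishedFraction = minFinishedFraction;
        this.multiplier = multiplier;
    }

    /** One detection pass: returns subtask indices of running tasks that look like long tails. */
    public List<Integer> detectLongTails(List<TaskTiming> tasks, long nowMillis) {
        List<Long> finishedDurations = new ArrayList<>();
        for (TaskTiming t : tasks) {
            if (t.isFinished()) {
                finishedDurations.add(t.finishMillis - t.startMillis);
            }
        }
        List<Integer> longTails = new ArrayList<>();
        // Without enough finished tasks there is no baseline to compare against.
        if (finishedDurations.isEmpty()
                || finishedDurations.size() < minFinishedFraction * tasks.size()) {
            return longTails;
        }
        Collections.sort(finishedDurations);
        long median = finishedDurations.get(finishedDurations.size() / 2); // upper median
        for (TaskTiming t : tasks) {
            if (!t.isFinished() && nowMillis - t.startMillis > multiplier * median) {
                longTails.add(t.subtaskIndex);
            }
        }
        return longTails;
    }

    /** Runs detectLongTails periodically on one dedicated thread, like the per-vertex thread. */
    public ScheduledExecutorService start(
            Supplier<List<TaskTiming>> snapshot,
            LongSupplier clock,
            Consumer<List<Integer>> onLongTails,
            long periodMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(() -> {
            List<Integer> longTails = detectLongTails(snapshot.get(), clock.getAsLong());
            if (!longTails.isEmpty()) {
                onLongTails.accept(longTails); // e.g. launch speculative executions
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return executor;
    }
}
```

Keeping the detection pass (detectLongTails) separate from the scheduling (start) makes the rule testable without timing. The caller owns the returned executor and would need to shut it down once the ExecutionJobVertex finishes.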


Refactoring of ExecutionVertex


With the introduction of speculative execution, an ExecutionVertex can no longer assume that only one execution is running at a time, so some member fields of ExecutionVertex need a larger refactoring. Specifically, currentExecution in ExecutionVertex is changed to an ArrayList named executionList. A list is used to keep the code extensible, in case multiple speculative executions run at the same time in the future.

There are two ways to refactor the code:

  1. Add a member variable named speculativeExecution, similar to currentExecution. However, this approach would lead to many if checks in the scheduler and failover code, and it would make the code less flexible if more than two executions need to exist at the same time.
  2. Change currentExecution in ExecutionVertex to an ArrayList named executionList, which allows multiple executions to exist in an ExecutionVertex at the same time. All executions in executionList behave identically with respect to failover, slot requests, etc.
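Why option 2 avoids per-field if checks can be sketched with a heavily simplified, hypothetical stand-in for ExecutionVertex. The Execution and State types below are illustrative only, not Flink's classes, and the "first execution to finish wins, the others are canceled" behavior in markFinished is an assumption about speculative execution semantics rather than something stated in this section.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical, heavily simplified stand-in for ExecutionVertex using an executionList.
 * Execution and State here are illustrative types, not Flink's classes.
 */
public class ExecutionListSketch {

    public enum State { RUNNING, FINISHED, CANCELED }

    public static final class Execution {
        private final int attemptIndex;
        private State state = State.RUNNING;

        Execution(int attemptIndex) {
            this.attemptIndex = attemptIndex;
        }

        public int getAttemptIndex() { return attemptIndex; }

        public State getState() { return state; }
    }

    // Option 2: one list instead of separate currentExecution and speculativeExecution fields.
    private final List<Execution> executionList = new ArrayList<>();

    /** Adds an execution; the original and speculative executions are created the same way. */
    public Execution addExecution() {
        Execution execution = new Execution(executionList.size());
        executionList.add(execution);
        return execution;
    }

    /** Assumption: the first execution to finish wins; others still running are canceled. */
    public void markFinished(Execution winner) {
        winner.state = State.FINISHED;
        for (Execution e : executionList) {
            if (e != winner && e.state == State.RUNNING) {
                e.state = State.CANCELED;
            }
        }
    }

    /** Failover-style cancellation: one loop, no branch per kind of execution. */
    public void cancelAll() {
        for (Execution e : executionList) {
            if (e.state == State.RUNNING) {
                e.state = State.CANCELED;
            }
        }
    }

    public List<Execution> getExecutionList() {
        return Collections.unmodifiableList(executionList);
    }
}
```

With option 1, markFinished and cancelAll would each need explicit branches for currentExecution and speculativeExecution, and a third concurrent execution would require yet another field.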

...

Refactor member field of ExecutionVertex:

```java
// Replaces the single currentExecution field; holds the original and any speculative executions.
private List<Execution> executionList = new ArrayList<>();
```

...

ExecutionVertexIDWithExecutionIndex.java:

```java
// Pairs an ExecutionVertexID with the index of one of that vertex's executions.
public class ExecutionVertexIDWithExecutionIndex {
    private ExecutionVertexID executionVertexID;
    private Integer executionIndex;
}
```
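The class above only declares its two fields. If it is used to identify a specific execution, for example as a key in a map from executions to pending slot requests (an assumption; this section does not say how it is used), it needs value semantics. A minimal sketch, under that assumption, with the hypothetical name ExecutionAttemptKey, a primitive int for the index, and a stand-in VertexId in place of Flink's real ExecutionVertexID:

```java
import java.util.Objects;

/**
 * Hypothetical value-type sketch mirroring the two fields of the class above.
 * VertexId is a stand-in for Flink's ExecutionVertexID, not the real class.
 */
public final class ExecutionAttemptKey {

    /** Stand-in for ExecutionVertexID: a job vertex plus a subtask index. */
    public static final class VertexId {
        private final String jobVertexId;
        private final int subtaskIndex;

        public VertexId(String jobVertexId, int subtaskIndex) {
            this.jobVertexId = Objects.requireNonNull(jobVertexId);
            this.subtaskIndex = subtaskIndex;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof VertexId)) return false;
            VertexId other = (VertexId) o;
            return subtaskIndex == other.subtaskIndex && jobVertexId.equals(other.jobVertexId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(jobVertexId, subtaskIndex);
        }
    }

    private final VertexId executionVertexID;
    private final int executionIndex; // assumed: position in the vertex's executionList

    public ExecutionAttemptKey(VertexId executionVertexID, int executionIndex) {
        this.executionVertexID = Objects.requireNonNull(executionVertexID);
        this.executionIndex = executionIndex;
    }

    public VertexId getExecutionVertexID() { return executionVertexID; }

    public int getExecutionIndex() { return executionIndex; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ExecutionAttemptKey)) return false;
        ExecutionAttemptKey other = (ExecutionAttemptKey) o;
        return executionIndex == other.executionIndex
                && executionVertexID.equals(other.executionVertexID);
    }

    @Override
    public int hashCode() {
        return Objects.hash(executionVertexID, executionIndex);
    }
}
```

Two keys built from equal vertex IDs and the same index then look up the same map entry, while the original execution (index 0) and a speculative one (index 1) of the same vertex stay distinct.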




To reuse the existing scheduler code, the interface needs to be extended with an additional method, which SchedulerBase then implements.

...