...

In each ExecutionJobVertex there is a SpeculativeScheduler thread that periodically detects long-tail executions in that ExecutionJobVertex, based on the three rules above.

With speculative execution introduced, an ExecutionVertex can no longer assume that only one execution is running at a time.

...

Code Block
language: java
title: ExecutionVertexIDAndExecutionIndex.java
public class ExecutionVertexIDAndExecutionIndex {
    private ExecutionVertexID executionVertexID;
    private Integer executionIndex;
}

In order to reuse the scheduler code, we need to extend the interface with an additional method, which SchedulerBase should then implement.

Code Block
language: java
title: SchedulerNG interface extension
public interface SchedulerNG extends AutoCloseableAsync {
    default void schedulingSpeculativeExecutions(List<ExecutionVertexIDAndExecutionIndex> verticesToSchedule) {
        throw new UnsupportedOperationException();
    }
}
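
The effect of adding the method as a default method can be illustrated with a small self-contained sketch. Everything in it is a stand-in introduced for illustration and is not Flink code: SketchScheduler plays the role of SchedulerNG, SpeculativeAwareScheduler plays the role of SchedulerBase, LegacyScheduler is a hypothetical implementation that predates the extension, and VertexAndIndex replaces ExecutionVertexIDAndExecutionIndex with a plain String ID.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in for the (vertex ID, execution index) pair; a String replaces ExecutionVertexID. */
final class VertexAndIndex {
    final String executionVertexId;
    final int executionIndex;

    VertexAndIndex(String executionVertexId, int executionIndex) {
        this.executionVertexId = executionVertexId;
        this.executionIndex = executionIndex;
    }
}

/** Stand-in for SchedulerNG: the new method has a default body, so older implementations still compile. */
interface SketchScheduler {
    default void schedulingSpeculativeExecutions(List<VertexAndIndex> verticesToSchedule) {
        throw new UnsupportedOperationException();
    }
}

/** Hypothetical scheduler that predates the extension; it inherits the throwing default. */
class LegacyScheduler implements SketchScheduler {}

/** Stand-in for SchedulerBase: overrides the method and records what it was asked to schedule. */
class SpeculativeAwareScheduler implements SketchScheduler {
    final List<VertexAndIndex> scheduled = new ArrayList<>();

    @Override
    public void schedulingSpeculativeExecutions(List<VertexAndIndex> verticesToSchedule) {
        // A real scheduler would allocate slots and deploy here; the sketch only records the request.
        scheduled.addAll(verticesToSchedule);
    }
}
```

With this shape, only schedulers that opt in need to change; calling the method on an implementation that has not overridden it fails fast with UnsupportedOperationException instead of silently doing nothing.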

...

I will introduce a blacklist module into Flink.


We have implemented a per-job blacklist at machine granularity. A machine's IP is added to the blacklist when an execution is recognized as a long-tail execution, and it is removed from the blacklist once the entry is out of date.
When executions are scheduled, we add the blacklist information to the YARN PlacementConstraint. This ensures that YARN containers are not allocated on machines in the blacklist.

The blacklist module is a thread that maintains the blacklisted machines of this job and periodically removes expired entries. Each entry in the blacklist contains an IP and a timestamp. The timestamp is used to decide whether the entry has expired.
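
A minimal sketch of such a module follows, assuming a fixed expiry timeout and a fixed cleanup period. The class name MachineBlacklist, its method names, and both parameters are hypothetical and not taken from the actual implementation. The current time is passed in explicitly so that expiry can be exercised without waiting.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical per-job blacklist: machine IP -> time (ms) at which the IP was added. */
class MachineBlacklist implements AutoCloseable {
    private final Map<String, Long> addedAtMillis = new ConcurrentHashMap<>();
    private final long timeoutMillis;
    // Single daemon thread that plays the role of the periodic cleanup thread.
    private final ScheduledExecutorService cleaner =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "blacklist-cleaner");
                t.setDaemon(true);
                return t;
            });

    MachineBlacklist(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Starts periodic cleanup; the period is an assumed tuning knob. */
    void start(long periodMillis) {
        cleaner.scheduleAtFixedRate(
                () -> removeExpired(System.currentTimeMillis()),
                periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Records the machine of a long-tail execution; re-adding an IP refreshes its timestamp. */
    void add(String ip, long nowMillis) {
        addedAtMillis.put(ip, nowMillis);
    }

    /** Drops every entry whose age has reached the timeout. */
    void removeExpired(long nowMillis) {
        addedAtMillis.entrySet().removeIf(e -> nowMillis - e.getValue() >= timeoutMillis);
    }

    /** Snapshot of the blacklisted IPs, e.g. as input for building a placement constraint. */
    Set<String> blacklistedIps() {
        return new HashSet<>(addedAtMillis.keySet());
    }

    @Override
    public void close() {
        cleaner.shutdownNow();
    }
}
```

In this sketch the scheduler would call blacklistedIps() at scheduling time and translate the result into the resource manager's constraint format; that translation is deliberately left out because it depends on the deployment target.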

My code only supports the YARN integration. As far as I know, however, the K8s integration could use node affinity or pod affinity to achieve the same goal as the YARN PlacementConstraint. As the Mesos integration will be deprecated in Flink 1.13, I did not consider it.


Yarn


k8s


Mesos


Manage input and output of each ExecutionVertex

...