...
In each ExecutionJobVertex there is a SpeculativeScheduler thread that periodically detects long-tail executions in that ExecutionJobVertex based on the three rules above.
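A minimal, self-contained sketch of such periodic detection is shown below. The class and method names are illustrative assumptions, not Flink's actual implementation, and a simple median-based runtime threshold stands in for the three detection rules:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical long-tail detector: an execution is flagged when its running
// time exceeds ratio * median running time of its sibling executions.
public class LongTailDetector {
    private final double ratio; // e.g. 1.5 means 1.5x the median running time
    private final Map<String, Long> startTimes = new ConcurrentHashMap<>();

    public LongTailDetector(double ratio) {
        this.ratio = ratio;
    }

    public void executionStarted(String executionId, long startMillis) {
        startTimes.put(executionId, startMillis);
    }

    /** Returns ids of executions whose running time exceeds ratio * median. */
    public List<String> detect(long nowMillis) {
        long[] runtimes = startTimes.values().stream()
                .mapToLong(s -> nowMillis - s).sorted().toArray();
        if (runtimes.length == 0) {
            return Collections.emptyList();
        }
        long median = runtimes[runtimes.length / 2];
        long threshold = (long) (median * ratio);
        List<String> longTails = new ArrayList<>();
        for (Map.Entry<String, Long> e : startTimes.entrySet()) {
            if (nowMillis - e.getValue() > threshold) {
                longTails.add(e.getKey());
            }
        }
        return longTails;
    }
}
```

In the real SpeculativeScheduler this check would run on a scheduled executor at a configurable interval; the sketch only shows the detection step itself.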
With the introduction of speculative execution, an ExecutionVertex can no longer assume that only one execution is running at a time.
...
```java
public class ExecutionVertexIDAndExecutionIndex {
    private ExecutionVertexID executionVertexID;
    private Integer executionIndex;
}
```
In order to reuse the scheduler code, we need to extend the interface with an additional method, which SchedulerBase should then implement.
```java
public interface SchedulerNG extends AutoCloseableAsync {
    default void schedulingSpeculativeExecutions(
            List<ExecutionVertexIDAndExecutionIndex> verticesToSchedule) {
        throw new UnsupportedOperationException();
    }
}
```
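The following self-contained sketch shows the override pattern: the interface ships a default method that throws, and an implementing scheduler overrides it. The simplified `String` vertex ids and the recording logic are illustrative stand-ins for Flink's real types and slot-allocation logic:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for Flink's SchedulerNG interface.
interface SchedulerNG {
    // Default implementation throws, so schedulers that do not support
    // speculative execution need no changes.
    default void schedulingSpeculativeExecutions(List<String> verticesToSchedule) {
        throw new UnsupportedOperationException();
    }
}

// Simplified stand-in for SchedulerBase providing the actual behavior.
class SchedulerBase implements SchedulerNG {
    final List<String> scheduled = new ArrayList<>();

    @Override
    public void schedulingSpeculativeExecutions(List<String> verticesToSchedule) {
        // The real scheduler would allocate slots and deploy the speculative
        // executions; here we only record what was requested.
        scheduled.addAll(verticesToSchedule);
    }
}
```

The default-throwing method keeps the interface change backward compatible for scheduler implementations that never trigger speculation.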
...
We will introduce a blacklist module into Flink. We have implemented a machine-dimensional blacklist per job: a machine's IP is added to the blacklist when an execution on it is recognized as a long-tail execution, and the blacklist removes the IP when the entry expires.
When executions are scheduled, we add the blacklist information to the YARN PlacementConstraint. In this way, we ensure that YARN containers are not allocated on blacklisted machines.
The blacklist module is a thread that maintains the blacklisted machines of this job and periodically removes expired entries. Each entry in the blacklist contains an IP and a timestamp; the timestamp is used to decide whether the entry has expired.
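A hedged sketch of such a per-job blacklist is shown below. The class name, the TTL parameter, and the map-based storage are illustrative assumptions rather than the actual Flink implementation:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical per-job machine blacklist: maps a machine IP to the timestamp
// at which it was blacklisted; a periodic task drops entries older than the
// configured TTL.
public class JobBlacklist {
    private final long ttlMillis;
    private final Map<String, Long> blacklistedIps = new ConcurrentHashMap<>();

    public JobBlacklist(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    public void add(String ip, long nowMillis) {
        blacklistedIps.put(ip, nowMillis);
    }

    public boolean contains(String ip) {
        return blacklistedIps.containsKey(ip);
    }

    /** Called periodically by the blacklist thread to drop expired entries. */
    public void removeExpired(long nowMillis) {
        blacklistedIps.entrySet()
                .removeIf(e -> nowMillis - e.getValue() > ttlMillis);
    }
}
```

The blacklist thread would call `removeExpired` on a fixed interval; the scheduler queries `contains` (or a snapshot of all entries) when building placement constraints.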
Our code currently supports only the YARN integration. As far as we know, node affinity or pod anti-affinity could achieve the same goal in the Kubernetes integration as YARN's PlacementConstraint. As the Mesos integration will be deprecated in Flink 1.13, we did not consider it.
Yarn
k8s
Mesos
Manage input and output of each ExecutionVertex
...