This document is unfinished and still being written. Once it is complete, I will send an e-mail to dev@flink.apache.org.
Status
Current state: Under Discussion
Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)
JIRA:
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
In a Flink batch job, the work is usually divided into multiple parallel tasks executed across many nodes in the cluster. It is common for some nodes to suffer performance degradation due to hardware problems, unexpected I/O contention, or high CPU load. Such degradation can make the tasks running on the affected node very slow; these are the so-called long tail tasks. Although long tail tasks do not fail, they can severely increase the total job running time. Flink task scheduling currently does not take this long tail problem into account.
Here we propose a speculative execution strategy [FLINK-10644] to handle the problem. The basic idea is to run a copy of a task on another node when the original task is identified as a long tail task. The speculative task is executed in parallel with the original one and shares the same failure retry mechanism. Once either task completes, the scheduler accepts its output as the final result and cancels the other one. I will introduce a blacklist module so that the speculative task is scheduled on a different machine from the original task, and modify FileOutputFormat.java to adapt it to the speculative execution mechanism.
Preliminary experiments have demonstrated the effectiveness of this approach in our production cluster.
Proposed Changes
General design
Detection of Long Tail Tasks
A task will be classified as a long tail task when it meets the following three rules.
Finished Tasks Percentage
In one ExecutionJobVertex, once a configurable percentage of executions has finished, the speculative execution thread starts detecting long tail executions.
Long Running Time
For every execution in one ExecutionJobVertex, the speculative execution thread calculates the interval between the current time and the time at which the execution was first created or deployed (before any failover). When the running time of an execution is greater than a configurable multiple of the median running time of the finished executions, the execution is defined as a long tail execution.
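The first two rules can be sketched as follows. This is only an illustration under stated assumptions, not the proposed implementation: the class `LongTailDetector`, its constructor parameters `finishedRatioThreshold` and `medianMultiplier`, and the representation of running times as plain millisecond values are all hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper, not part of Flink: applies the first two detection rules. */
final class LongTailDetector {
    private final double finishedRatioThreshold; // assumed config, e.g. 0.75
    private final double medianMultiplier;       // assumed config, e.g. 1.5

    LongTailDetector(double finishedRatioThreshold, double medianMultiplier) {
        this.finishedRatioThreshold = finishedRatioThreshold;
        this.medianMultiplier = medianMultiplier;
    }

    /** Median of the given values; the input list is not modified. */
    static double median(List<Long> values) {
        List<Long> sorted = new ArrayList<>(values);
        Collections.sort(sorted);
        int n = sorted.size();
        return n % 2 == 1
                ? sorted.get(n / 2)
                : (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2.0;
    }

    /**
     * Returns the indices (into runningMillis) of executions considered long tail.
     * finishedMillis: running times of finished executions of one job vertex.
     * runningMillis: elapsed times of executions that are still running.
     */
    List<Integer> detect(List<Long> finishedMillis, List<Long> runningMillis) {
        List<Integer> longTail = new ArrayList<>();
        int total = finishedMillis.size() + runningMillis.size();
        if (finishedMillis.isEmpty()) {
            return longTail; // no finished execution to compare against
        }
        // Rule 1: enough executions must have finished before detection starts.
        if ((double) finishedMillis.size() / total < finishedRatioThreshold) {
            return longTail;
        }
        // Rule 2: running time exceeds a multiple of the median finished time.
        double threshold = medianMultiplier * median(finishedMillis);
        for (int i = 0; i < runningMillis.size(); i++) {
            if (runningMillis.get(i) > threshold) {
                longTail.add(i);
            }
        }
        return longTail;
    }
}
```

With a threshold of 0.75 and a multiplier of 1.5, three finished executions of 100 ms, 110 ms and 120 ms give a median of 110 ms, so a fourth execution that has been running for 200 ms exceeds the 165 ms limit and is reported. The third rule (throughput) is left out because it is not yet specified.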
Slow Processing Throughput
The primary characteristic of long tail tasks is that their processing throughput is much lower than expected, or much lower than that of other, normal tasks.
todo..
Scheduling of Speculative Executions
Each ExecutionJobVertex has a SpeculativeScheduler thread that periodically detects long tail executions in that ExecutionJobVertex based on the three rules above.
With the introduction of speculative execution, an ExecutionVertex can no longer assume that only one execution is running at a time.
The member fields of ExecutionVertex therefore need a larger refactoring. The currentExecution field in ExecutionVertex is changed to an ArrayList named executionList, which means that an ExecutionVertex can hold multiple executions at the same time. A list is used for extensibility, in case multiple speculative executions run at the same time in the future. All executions in executionList behave the same with respect to failover, slot requests, and so on.
private List<Execution> executionList = new ArrayList<>();
The scheduler should schedule an execution according to its index in executionList instead of always scheduling currentExecution. So we need to change ExecutionVertexID to ExecutionVertexIDAndExecutionIndex, which identifies which execution in an ExecutionVertex the scheduler logic should schedule.
public class ExecutionVertexIDAndExecutionIndex {
    private ExecutionVertexID executionVertexID;
    private Integer executionIndex;
}
To reuse the scheduler code, we extend the interface with an additional method, which SchedulerBase should implement.
public interface SchedulerNG extends AutoCloseableAsync {
    default void schedulingSpeculativeExecutions(List<ExecutionVertexIDAndExecutionIndex> verticesToSchedule) {
        throw new UnsupportedOperationException();
    }
}
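If the combined identifier is used as a key in maps or sets inside the scheduler, it would need value semantics. The following is a minimal sketch of that idea, not the proposed class itself: `ExecutionKeySketch` is a hypothetical name, and a plain `String` stands in for Flink's `ExecutionVertexID` so that the example compiles without Flink on the classpath.

```java
import java.util.Objects;

/** Hypothetical, immutable stand-in for ExecutionVertexIDAndExecutionIndex. */
final class ExecutionKeySketch {
    private final String vertexId;    // stand-in for Flink's ExecutionVertexID
    private final int executionIndex; // position in the vertex's executionList

    ExecutionKeySketch(String vertexId, int executionIndex) {
        this.vertexId = Objects.requireNonNull(vertexId, "vertexId");
        if (executionIndex < 0) {
            throw new IllegalArgumentException("executionIndex must be >= 0");
        }
        this.executionIndex = executionIndex;
    }

    String getVertexId() {
        return vertexId;
    }

    int getExecutionIndex() {
        return executionIndex;
    }

    // equals and hashCode make two keys for the same execution interchangeable.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ExecutionKeySketch)) {
            return false;
        }
        ExecutionKeySketch other = (ExecutionKeySketch) o;
        return executionIndex == other.executionIndex && vertexId.equals(other.vertexId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(vertexId, executionIndex);
    }

    @Override
    public String toString() {
        return vertexId + "#" + executionIndex;
    }
}
```

Using a primitive `int` and final fields avoids null indices and accidental mutation after the key has been placed in a collection; whether the real class should follow this design is an open choice.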
Handling failover
Just like normal tasks, the speculative task is executed in parallel with the original one and shares the same failover and restart strategy.
The original long tail task and the speculative tasks can each still retry after a failure, on their own track.
However, I think the job should not be restarted globally when the failover count of a speculative execution reaches max-retry-counts.
Blacklist of nodes
Most long tail tasks are caused by cluster problems, so I must ensure that a speculative execution runs on a different node from the original execution.
I will introduce blacklist module into Flink which
We have implemented a per-job, machine-level blacklist. A machine's IP is added to the blacklist when an execution on it is recognized as a long tail execution. The IP is removed from the blacklist once its entry expires.
When executions are scheduled, we add the blacklist information to the Yarn PlacementConstraint. In this way, I can ensure that the Yarn container is not allocated on a machine in the blacklist.
The blacklist module is a thread that maintains the blacklisted machines of this job and periodically removes expired elements. Each element in the blacklist contains an IP and a timestamp. The timestamp is used to decide whether the element has expired.
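The expiry logic described above could look roughly like the sketch below. It is an illustration only: the class name `NodeBlacklist`, the `ttlMillis` parameter, and the choice to pass the current time explicitly (instead of reading a clock) are assumptions made for this example. In a real module, a periodic thread, for instance one driven by a `ScheduledExecutorService`, would call `removeExpired`.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical per-job blacklist: maps a machine IP to the time it was added. */
final class NodeBlacklist {
    private final Map<String, Long> addedAtMillis = new ConcurrentHashMap<>();
    private final long ttlMillis; // assumed config: how long an entry stays valid

    NodeBlacklist(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Adds the IP, or refreshes its timestamp if it is already blacklisted. */
    void add(String ip, long nowMillis) {
        addedAtMillis.put(ip, nowMillis);
    }

    /** Drops every entry whose age has reached the time-to-live. */
    void removeExpired(long nowMillis) {
        addedAtMillis.entrySet().removeIf(e -> nowMillis - e.getValue() >= ttlMillis);
    }

    /** Returns a copy of the currently blacklisted IPs after removing expired ones. */
    Set<String> snapshot(long nowMillis) {
        removeExpired(nowMillis);
        return new HashSet<>(addedAtMillis.keySet());
    }
}
```

The snapshot is what would be translated into a placement constraint when requesting containers; that translation is not shown here because it depends on the resource manager integration.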
My code only supports the Yarn integration. As far as I know, in the K8s integration we could use node affinity or pod affinity to achieve the same goal as the Yarn PlacementConstraint. As the Mesos integration will be deprecated in Flink 1.13, I did not consider it.
Yarn
k8s
Mesos
Manage input and output of each ExecutionVertex
Manage InputSplit
Manage sink files
Manage middle ResultPartition
Metrics
Web UI
Limitations
(1) The job type is Batch.
(2) The cluster ResourceManager is Yarn or K8s.
(3) Speculative execution can only be enabled for a JobVertex whose input edges and output edges are all blocking.
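Limitation (3) amounts to a simple predicate over the edges of a JobVertex. The sketch below shows one way to express it; the enum `EdgeType` and the class `SpeculationEligibility` are hypothetical stand-ins defined only for this example, not Flink types. It also assumes that a vertex with no input edges (a source) or no output edges (a sink) passes the check on that side, which the text above does not state explicitly.

```java
import java.util.List;

/** Hypothetical stand-in for the exchange type of an edge. */
enum EdgeType { BLOCKING, PIPELINED }

/** Hypothetical helper: decides whether a job vertex may use speculative execution. */
final class SpeculationEligibility {
    private SpeculationEligibility() {}

    /** True only if every input edge and every output edge is blocking. */
    static boolean isEligible(List<EdgeType> inputEdges, List<EdgeType> outputEdges) {
        return allBlocking(inputEdges) && allBlocking(outputEdges);
    }

    // allMatch returns true for an empty list, so sources and sinks pass on that side.
    private static boolean allBlocking(List<EdgeType> edges) {
        return edges.stream().allMatch(e -> e == EdgeType.BLOCKING);
    }
}
```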
Configuration
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
- If we are changing behavior how will we phase out the older behavior?
- If we need special migration tools, describe them here.
- When will we remove the existing behavior?
Test Plan
Describe in a few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.