Status
This is an unfinished document. I am still writing it; once it is finished I will send it to dev@flink.apache.org.
Current state: Under Discussion
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
In Flink batch processing, a job is usually divided into multiple parallel tasks that execute across many nodes in the cluster. It is common to encounter performance degradation on some nodes due to hardware problems, unexpected I/O pressure, or high CPU load. This kind of degradation can make the tasks running on such a node quite slow; these are the so-called long tail tasks. Although long tail tasks do not fail, they can severely affect the total job running time. Flink's task scheduling currently does not take this long tail problem into account, so the slow tasks may become the bottleneck of the job.
To solve this kind of problem, we propose adding a speculative execution mechanism (FLINK-10644) to Flink. The basic idea is to run a copy of a task on another node when the original task is identified as long tail. The speculative task is executed in parallel with the original one and shares the same failure retry mechanism. Once either task completes, the scheduler admits its output as the final result and cancels the other running one. I will introduce a blacklist module to schedule the long tail task on a different machine from the original task, and modify FileOutputFormat.java to adapt it to the speculative execution mechanism.
Preliminary experiments have demonstrated the effectiveness of this mechanism in our production cluster.
Proposed Changes
General design
Detection of Long Tail Tasks
A task will be classified as a long tail task when it meets the following three rules.
Finished Tasks Percentage
In one ExecutionJobVertex, when a configurable percentage of executions has finished, the speculative execution thread begins its real work.
Long Running Time
In the speculative execution thread, for every execution in an ExecutionJobVertex we calculate the interval between the current time and the execution's first create/deploying time (before any failover). When the running time of an execution is greater than a configurable multiple of the median running time of the finished executions, that execution is classified as a long tail execution.
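The median rule above can be sketched as follows. This is an illustrative sketch, not actual Flink code; the class and method names are assumptions made for this example:

```java
import java.util.Arrays;

// Sketch of the median-based long-tail rule: an execution is "long tail"
// when its running time exceeds a configurable multiple of the median
// running time of the already finished executions.
public class LongTailDetection {

    public static boolean isLongTail(
            long[] finishedRunningTimes, // running times of finished executions (millis)
            long runningTime,            // running time of the execution under check
            double multiplier) {         // configurable multiple
        if (finishedRunningTimes.length == 0) {
            return false; // no baseline yet, nothing can be classified
        }
        long[] sorted = finishedRunningTimes.clone();
        Arrays.sort(sorted);
        int n = sorted.length;
        double median = (n % 2 == 1)
                ? sorted[n / 2]
                : (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
        return runningTime > median * multiplier;
    }
}
```

For example, with finished running times {10s, 12s, 14s} and multiplier 1.5, the threshold is 12s * 1.5 = 18s, so an execution running for 30s would be classified as long tail.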
Slow Processing Throughput
In our production cluster at Alibaba in China, considering running time alone was sufficient to solve the long tail task problem. So in the first version I will not take throughput into consideration. If users in the Flink community need it after this feature is released, I will develop it in a later version.
Scheduling of Speculative Executions
Refactoring of ExecutionVertex
With the introduction of speculative execution, an ExecutionVertex can no longer assume that only one execution is running at a time. Some member variables in ExecutionVertex need significant refactoring.
There are two ways of code refactoring:
- Add a member variable named speculativeExecution, similar to currentExecution. However, this approach leads to many if-branches in the scheduler and failover code, and it reduces the flexibility of the code if more than two executions exist at the same time.
- Change currentExecution in ExecutionVertex to an ArrayList named executionList, meaning there can be multiple executions in an ExecutionVertex at the same time. The executions in executionList do not differ in behavior, e.g. for failover, slot requests, etc.
```java
private List<Execution> executionList = new ArrayList<>();
```
Introduce a SpeculativeScheduler thread
In each ExecutionJobVertex there is a SpeculativeScheduler thread used to periodically detect long tail executions in this ExecutionJobVertex based on the above rules. Its member variables include a SchedulerNG and an ExecutionVertex[], among others. The SchedulerNG is used for scheduling the speculative executions and the ExecutionVertex[] is used to get the execution state timestamps in this ExecutionJobVertex.
Modify the scheduler logic
The scheduler should schedule an execution according to the index of that execution in executionList instead of defaulting to the currentExecution. So we need to change ExecutionVertexID to ExecutionVertexIDWithExecutionIndex, which represents which execution in the ExecutionVertex should be scheduled. Also, on task failover, the executionIndex should be derived from the failed task's ExecutionAttemptID so that the scheduler knows which execution in executionList should be restarted.
```java
public class ExecutionVertexIDWithExecutionIndex {

    private ExecutionVertexID executionVertexID;
    private Integer executionIndex;
}
```
In order to reuse the scheduler code, we need to extend the SchedulerNG interface with an additional method, which SchedulerBase should then implement.
```java
public interface SchedulerNG extends AutoCloseableAsync {

    default void schedulingSpeculativeExecutions(List<ExecutionVertexIDWithExecutionIndex> verticesToSchedule) {
        throw new UnsupportedOperationException();
    }
}
```
Process failover situation
Just like normal tasks, a speculative task is executed in parallel with the original one and shares the same failover and restart strategy.
The original long tail task and the speculative task can still retry on failure on their own tracks.
However, I think the job should not be restarted globally when a speculative execution's failover count reaches the max retry count.
When a task fails, we can calculate its index (executionIndex) in executionList from its ExecutionAttemptID. The scheduler then performs a series of processing steps for the corresponding execution according to the executionIndex, as shown below.
In order to improve the failover logic, I will extend the class FailureHandlingResult with an additional member variable.
```java
public class FailureHandlingResult {

    @Nullable private final Integer executionIndex;
}
```
Black list
Most long tail tasks are caused by machine problems, so a speculative execution must run on a different machine from the original execution.
I will introduce a blacklist module into Flink, used to filter out nodes when speculative executions request slots.
Concept of the blacklist
A blacklist is a kind of scheduling constraint. According to the FLINK-11000 description, this is a bigger feature.
There are several levels of blacklist, including the (JobVertex, TaskManager) blacklist, (JobVertex, Host) blacklist, (Job, TaskManager) blacklist, (Job, Host) blacklist, (Session, TaskManager) blacklist and (Session, Host) blacklist.
I will implement the (Job, Host) blacklist for the speculative execution feature. In order to stay friendly to a future FLINK-11000 implementation, my interfaces also suit the other blacklist types described above. The blacklist module is a thread that maintains the blacklisted machines of the job and removes expired elements periodically. Each element in the blacklist contains an IP and a timestamp. The timestamp is used to decide whether an element of the blacklist has expired.
Classes and interfaces of the (Job, Host) blacklist
(1) Abstract class BlackList, which each type of blacklist extends.
```java
public abstract class BlackList implements Serializable {

    /** Black list configuration. */
    protected final BlackListConfig blackListConfig;

    public BlackList(BlackListConfig blackListConfig) {
        this.blackListConfig = blackListConfig;
    }

    /**
     * Remove timed out black list records.
     * @param timeout Timeout in milliseconds.
     * @return Minimum timestamp of the remaining black list records.
     */
    public abstract long removeTimeoutBlackList(long timeout);

    /** Clear the black list. */
    public abstract void clear();

    /** Whether the black list is empty. */
    public abstract boolean isEmpty();
}
```
...
```java
public class JobBlackList extends BlackList {

    /** The list of black IPs. This list is mainly for timeout checking. */
    private final Queue<BlackListRecord> jobToIpBlackListRecords;

    /** The set of black IPs. This set is mainly for black host filtering. */
    private final Set<String> jobBlackIpSet;

    public JobBlackList(BlackListConfig blackListConfig) {
        super(blackListConfig);
        this.jobToIpBlackListRecords = new ConcurrentLinkedQueue<>();
        this.jobBlackIpSet = Collections.newSetFromMap(new ConcurrentHashMap<>());
    }

    /** Add an IP to this black list. */
    public boolean addIpToBlackList(String ip) {}

    public Set<String> getAllBlackListIpSet() {}

    /** Clear the (job, ip) black list. */
    @Override
    public void clear() {}

    @Override
    public long removeTimeoutBlackList(long timeout) {}

    @Override
    public boolean isEmpty() {}

    /** Whether the given host has been added to the black list. */
    public boolean containsIp(String ip) {}
}
```
(3) BlackListRecord, the element type of the blacklist.
```java
/**
 * Black list record which stores the black host IP and
 * the timestamp when this record was added.
 */
public class BlackListRecord implements Serializable {

    /** The black host IP. */
    private final String ip;

    /** The timestamp when this black list record was added. */
    private final long timeStamp;
}
```
(4) An abstract class called BlackListTracker, a thread that maintains blacklist info.
```java
public abstract class BlackListTracker implements Runnable {

    /** The executor to run the timeout checking task. */
    private final ScheduledExecutor executor;

    /** The black list configuration. */
    protected final BlackListConfig blackListConfig;

    /** The black list timeout check future, canceled when the black list is destroyed. */
    private final AtomicReference<ScheduledFuture<?>> timeoutCheckFuture;

    public BlackListTracker(ScheduledExecutor executor, BlackListConfig blackListConfig) {
        Preconditions.checkNotNull(blackListConfig);
        this.executor = executor;
        this.blackListConfig = blackListConfig;
        this.timeoutCheckFuture = new AtomicReference<>();
    }

    /**
     * Given the minimum timestamp of the black list records, schedules a task to remove
     * the records once they time out.
     * @param minTimeStamp The minimum timestamp of the black list records.
     */
    public void scheduleTimeOutCheck(long minTimeStamp) {}

    public boolean isBlackListEnabled() {
        return blackListConfig.isBlackListEnabled();
    }

    /** Clear the black list. */
    public abstract void clearBlackList();

    /** Get all black list IPs. */
    public abstract Set<String> getAllBlackListIp();

    /** Clear the black list and cancel the timeout check task. */
    public void destroy() {}
}
```
(5) JobBlackListSpeculativeListener, an event listener.
```java
/** Listener to be notified when a speculative execution happens. */
public interface JobBlackListSpeculativeListener {

    /**
     * Called when a speculative execution happens.
     * @param ip the IP of the node the original execution runs on
     */
    void onSpeculativeExecutionHappened(String ip);
}
```
(6) JobBlackListTracker, the per-job blacklist tracker.
```java
public class JobBlackListTracker extends BlackListTracker implements JobBlackListSpeculativeListener {

    /** The black list of this job. */
    private final JobBlackList jobBlackList;

    public JobBlackListTracker(ScheduledExecutor executor, BlackListConfig blackListConfig) {
        super(executor, blackListConfig);
        this.jobBlackList = new JobBlackList(blackListConfig);
        scheduleTimeOutCheck(blackListConfig.getBlackListTimeOutInMillis());
    }

    @Override
    public void clearBlackList() {
        jobBlackList.clear();
    }

    @Override
    public Set<String> getAllBlackListIp() {
        if (blackListConfig.isBlackListEnabled()) {
            return jobBlackList.getAllBlackListIpSet();
        }
        return Collections.emptySet();
    }

    /** The timeout checking task to be scheduled. */
    @Override
    public void run() {
        long minTimeStamp = jobBlackList.removeTimeoutBlackList(blackListConfig.getBlackListTimeOutInMillis());
        scheduleTimeOutCheck(minTimeStamp);
    }

    @Override
    public void onSpeculativeExecutionHappened(String ip) {
        if (blackListConfig.isBlackListEnabled()) {
            jobBlackList.addIpToBlackList(ip);
        }
    }
}
```
Initialize the black list
In DefaultExecutionGraph I will add a member variable jobBlackListSpeculativeListeners; after the ExecutionGraph is created, the JobBlackListTracker will be added to jobBlackListSpeculativeListeners. JobBlackListSpeculativeListener.onSpeculativeExecutionHappened() will be called when the SpeculativeScheduler detects a long tail task and notifies the scheduler to schedule a speculative execution.
```java
public interface ExecutionGraph extends AccessExecutionGraph {

    void registerJobBlackListSpeculativeListener(JobBlackListTracker jobBlackListTracker);
}
```
```java
public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionGraphAccessor {

    private final List<JobBlackListSpeculativeListener> jobBlackListSpeculativeListeners;

    public DefaultExecutionGraph() {
        this.jobBlackListSpeculativeListeners = new ArrayList<>();
    }

    @Override
    public void registerJobBlackListSpeculativeListener(JobBlackListTracker listener) {
        if (listener != null) {
            jobBlackListSpeculativeListeners.add(listener);
        }
    }

    @Override
    public List<JobBlackListSpeculativeListener> getJobBlackListSpeculativeListeners() {
        return jobBlackListSpeculativeListeners;
    }
}
```
Add elements to the black list
First, JobBlackListSpeculativeListener.onSpeculativeExecutionHappened() will be called when the SpeculativeScheduler detects a long tail task and notifies the scheduler to schedule a speculative execution.
Second, the ExecutionGraph will notify the listener (JobBlackListTracker).
Third, the IP of the original execution's location will be added to the JobBlackList inside the JobBlackListTracker.
Remove elements from the black list
The BlackListTracker implements Runnable; it maintains the blacklisted machines of the job and removes expired elements periodically. Each element in the blacklist contains an IP and a timestamp. The timestamp is used to decide whether an element of the blacklist has expired.
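The periodic expiry logic can be sketched as follows. This is an illustrative simplification (the real JobBlackList keeps a queue of BlackListRecord plus a set for fast lookup; the class here is a stand-in):

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified sketch of blacklist expiry: each entry is (IP, add-timestamp),
// and entries older than the configured timeout are removed on each check.
public class ExpiringBlackList {

    private final Map<String, Long> ipToTimestamp = new LinkedHashMap<>();

    public void add(String ip, long nowMillis) {
        ipToTimestamp.put(ip, nowMillis);
    }

    /** Removes entries older than timeoutMillis; returns the number of remaining entries. */
    public int removeExpired(long nowMillis, long timeoutMillis) {
        Iterator<Map.Entry<String, Long>> it = ipToTimestamp.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue() >= timeoutMillis) {
                it.remove();
            }
        }
        return ipToTimestamp.size();
    }

    public boolean contains(String ip) {
        return ipToTimestamp.containsKey(ip);
    }
}
```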
Pass the blacklist information to cluster ResourceManager
Yarn
Flink is currently built against Hadoop 2.4.1 (<hadoop.version>2.4.1</hadoop.version>), but the PlacementConstraints API I want to use is only available since YARN 3.0. So the YARN 3.0 API would have to be accessed via reflection.
When the executions are scheduled, we will add the blacklist information to the YARN PlacementConstraint. In this way, we can ensure that the YARN containers are not placed on the machines in the blacklist.
https://hadoop.apache.org/docs/r3.1.1/hadoop-yarn/hadoop-yarn-site/PlacementConstraints.html
k8s
My code only supports the YARN integration. But as far as I know, we could use node affinity or pod affinity in the K8s integration to achieve the same goal as the YARN PlacementConstraint.
Mesos
According to the referenced Jira issue, we don't need to think about how to pass the blacklist information to Mesos.
Manage input and output of each ExecutionVertex
Manage InputSplit
Manage sink files
Manage middle ResultPartition
Metrics
```java
// Metric: how many executions must have finished before speculative execution starts
private AtomicInteger minFinishedForSpeculationThresholdMetrics;
// Metric: the number of finished executions
private AtomicInteger finishedExecutionCountMetrics;
// Metric: how many executions have triggered speculative execution
private Counter speculativeExecutionCountMetrics;
// The running-time threshold beyond which speculative execution is triggered
private AtomicDouble speculativeThresholdOfTime;
// Metric: the running time of the original execution in each ExecutionVertex
private Map<String, AtomicDouble> executionIndex2RunningTimespan;
// Metric: reported when the fastest finished execution is the speculative one
private Counter speculativeExecutionFastestFinishedCountMetrics;
```
Web UI
todo...
Limitations
(1)JobType is Batch.
(2)Cluster ResourceManager is Yarn or K8s.
(3) Limitations on input splits?
(4) YARN must support tagging machines by IP.
(5) Limitations on K8s?
(6) Only a JobVertex whose input edges and output edges are all blocking can enable the speculative execution feature.
Configuration
...
start mirror tasks on other nodes when a slow task is detected. The mirror task processes the same input data and produces the same data as the original task. The original task and mirror tasks are treated equally. Once any of them finishes, its output will be admitted and the remaining homogeneous tasks will be canceled.
Public Interfaces
Configuration
Speculative executions will work along with adaptive batch scheduling (FLIP-187). New configuration options will be added for the scheduler:
- jobmanager.adaptive-batch-scheduler.speculative.enabled, default to "false". It controls whether to enable speculative execution. Note that speculative execution only works with AdaptiveBatchScheduler, which requires "jobmanager.scheduler" to be set to "AdaptiveBatch".
- jobmanager.adaptive-batch-scheduler.speculative.max-concurrent-executions, default to "2". It controls how many executions (including the original one and speculative ones) of an ExecutionVertex can execute at the same time.
- jobmanager.adaptive-batch-scheduler.speculative.block-slow-node-duration, default to "1 min". It controls how long an identified slow node should be blocked for.
New configuration options will be added in SlowTaskDetectorOptions for slow task detection:
- slow-task-detector.check-interval, default to "1 s". It defines the interval to check slow tasks.
- slow-task-detector.execution-time.baseline-lower-bound, default to "1 min". It defines the lower bound of the slow task detection baseline.
- slow-task-detector.execution-time.baseline-ratio, default to "0.75". It defines the finished execution ratio threshold to compute the slow task detection baseline.
- slow-task-detector.execution-time.baseline-multiplier, default to "1.5". It defines the multiplier to compute the slow tasks detection baseline.
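Put together as a flink-conf.yaml fragment, a setup that enables speculative execution might look like the following. The values are the defaults listed above (shown explicitly for illustration); they should be tuned per cluster:

```yaml
jobmanager.scheduler: AdaptiveBatch
jobmanager.adaptive-batch-scheduler.speculative.enabled: true
jobmanager.adaptive-batch-scheduler.speculative.max-concurrent-executions: 2
jobmanager.adaptive-batch-scheduler.speculative.block-slow-node-duration: 1 min

slow-task-detector.check-interval: 1 s
slow-task-detector.execution-time.baseline-lower-bound: 1 min
slow-task-detector.execution-time.baseline-ratio: 0.75
slow-task-detector.execution-time.baseline-multiplier: 1.5
```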
Metrics
We propose to add following metrics to expose job problems and show the effectiveness of speculative execution:
- numSlowExecutionVertices: Number of slow execution vertices at the moment.
- numEffectiveSpeculativeExecutions: Number of speculative executions which finish before their corresponding original executions finish.
Proposed Changes
When speculative execution is enabled, a SpeculativeScheduler(which extends AdaptiveBatchScheduler) will be used for task scheduling. SpeculativeScheduler will listen on slow tasks detected by SlowTaskDetector. It will create and deploy speculative executions for the slow tasks. Nodes that slow tasks located on will be treated as slow nodes and get blocked, so that speculative executions will not be deployed on them. Once any execution finishes, the remaining homogeneous tasks will be canceled, so that only one execution will be admitted as finished and only its output will be visible to downstream consumer tasks or in external sink services.
SlowTaskDetector
A SlowTaskDetector will periodically check all the current tasks/executions and notify the SlowTaskDetectorListener about the detected slow tasks. SpeculativeScheduler will register itself as the SlowTaskDetectorListener.
```java
public interface SlowTaskDetector {

    void start(
            ExecutionGraph executionGraph,
            SlowTaskDetectorListener listener,
            ComponentMainThreadExecutor mainThreadExecutor);

    void stop();
}

public interface SlowTaskDetectorListener {

    void notifySlowTasks(Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks);
}
```
In the first version, an ExecutionTimeBasedSlowTaskDetector will be used. In the future, we can make SlowTaskDetector customizable to support different slow task detecting strategies, e.g. based on task throughput.
ExecutionTimeBasedSlowTaskDetector
For ExecutionTimeBasedSlowTaskDetector, if a task's execution time is much longer than that of most tasks of the same JobVertex, the task will be identified as slow. More specifically, it will compute an execution time baseline for each JobVertex. Tasks which execute for a time longer than or equal to the baseline will be identified as slow tasks.
Here's the process to compute the baseline of a JobVertex:
- define N = parallelism of JobVertex
- define R = finished task ratio, set via config
- define M = baseline multiplier, set via config
- define B = lower-bound of baseline, set via config
- the baseline will be computed only if N*R ExecutionVertex of the JobVertex have finished
- T = median of the execution time of the earliest N*R finished tasks
- baseline = max(T * M, B)
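The steps above can be sketched in code. This is an illustrative sketch (class and parameter names are assumptions, not the actual ExecutionTimeBasedSlowTaskDetector implementation):

```java
import java.util.Arrays;

// Sketch of the baseline formula: baseline = max(T * M, B), where T is the
// median execution time of the earliest ceil(N*R) finished tasks.
public class BaselineCalculator {

    /**
     * Computes the slow-task baseline in millis, or -1 if fewer than
     * ceil(N*R) tasks have finished so far (no task is slow before that).
     */
    public static long computeBaseline(
            long[] finishedExecutionTimes, // execution times of finished tasks, earliest-finished first
            int parallelism,               // N
            double finishedRatio,          // R
            double multiplier,             // M
            long lowerBoundMillis) {       // B
        int required = (int) Math.ceil(parallelism * finishedRatio);
        if (finishedExecutionTimes.length < required) {
            return -1; // baseline not usable yet
        }
        long[] earliest = Arrays.copyOfRange(finishedExecutionTimes, 0, required);
        Arrays.sort(earliest);
        double median = (required % 2 == 1)
                ? earliest[required / 2]
                : (earliest[required / 2 - 1] + earliest[required / 2]) / 2.0;
        return Math.max((long) (median * multiplier), lowerBoundMillis);
    }
}
```

For example, with N=4 and R=0.75, the baseline becomes usable once 3 tasks have finished; if their median execution time is 20s and M=1.5, the baseline is max(30s, B).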
Related configurations are:
- finished task ratio: slow-task-detector.execution-time.baseline-ratio
- baseline multiplier: slow-task-detector.execution-time.baseline-multiplier
- lower-bound of baseline: slow-task-detector.execution-time.baseline-lower-bound
Note that the baseline will be used only after N*R ExecutionVertex of the JobVertex have finished. No task of the JobVertex will be identified as slow before that.
Definition of execution time
The execution time of a task/execution includes the deploying time, initializing time and running time, because task deploying, initializing and running are all affected by a slow node. This means:
- for a FINISHED task, execution time = timestamp[FINISHED] - timestamp[DEPLOYING]
- for a DEPLOYING/INITIALIZING/RUNNING task, execution time = current timestamp - timestamp[DEPLOYING]
- for other tasks, execution time = 0
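The definition above maps directly to a small function. The state names mirror Flink's ExecutionState, but the class itself is an illustrative stand-in:

```java
// Sketch of the execution-time definition: measured from the DEPLOYING
// timestamp, 0 for tasks that have not started deploying or are terminal
// without finishing.
public class ExecutionTime {

    public static long executionTime(
            String state, long deployingTimestamp, long finishedTimestamp, long now) {
        switch (state) {
            case "FINISHED":
                return finishedTimestamp - deployingTimestamp;
            case "DEPLOYING":
            case "INITIALIZING":
            case "RUNNING":
                return now - deployingTimestamp;
            default:
                return 0L; // e.g. CREATED, SCHEDULED, CANCELED, FAILED
        }
    }
}
```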
SpeculativeExecutionVertex
SpeculativeExecutionVertex will be used if speculative execution is enabled, as a replacement of ExecutionVertex to form an ExecutionGraph. The core difference is that a SpeculativeExecutionVertex can have multiple current executions running at the same time.
SpeculativeExecutionVertex extends ExecutionVertex. It needs to implement new methods as below:
```java
public class SpeculativeExecutionVertex extends ExecutionVertex {

    public Execution getCurrentExecution(ExecutionAttemptID id);

    public Collection<Execution> getCurrentExecutions();

    /**
     * Create a new speculative execution and add it to current executions.
     */
    public Execution createNewSpeculativeExecution(long timestamp);

    /**
     * Returns whether the vertex contains source operators.
     */
    public boolean containsSources();

    /**
     * Returns whether the vertex contains sink operators.
     */
    public boolean containsSinks();

    ...
}
```
Besides that, it also needs to override some methods of ExecutionVertex:
- Operations like cancel/fail/markFailed/suspend must be applied to all the current executions.
- The current execution (returned by getCurrentExecutionAttempt()) should point to the execution which best represents the status of the execution vertex. We defined a state priority to achieve this goal.
State Priority
Even though SpeculativeExecutionVertex may have multiple current executions at the same time, ExecutionVertex#getCurrentExecutionAttempt(), which returns only one current execution, is still needed to
- show task execution status on the web UI. This is needed for compatibility purpose before the web UI is fully reworked to support display multiple concurrent executions.
- help with the deployment of downstream tasks. In this case, the current execution must be FINISHED.
For the purposes above, the current execution should point to the FINISHED execution once any current execution has finished. If no execution has finished, the current execution should point to the execution with the fastest progress.
To achieve this goal, we define a state priority. A state has higher priority if it is more likely to transition to the FINISHED state. And the current execution should always point to the execution of the highest priority. If multiple current executions have the same highest priority, choose the earliest created one.
States from the highest priority to the lowest priority are:
- FINISHED
- RUNNING
- INITIALIZING
- DEPLOYING
- SCHEDULED
- CREATED
- CANCELING
- FAILED
- CANCELED
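The selection of the representative execution by state priority can be sketched as follows. The Execution class here is a minimal stand-in for Flink's Execution, assumed only for this example:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Sketch of picking the "current" execution: highest state priority first
// (most likely to reach FINISHED), ties broken by earliest creation time.
public class StatePriority {

    // From highest to lowest priority, as listed above.
    private static final List<String> PRIORITY = Arrays.asList(
            "FINISHED", "RUNNING", "INITIALIZING", "DEPLOYING",
            "SCHEDULED", "CREATED", "CANCELING", "FAILED", "CANCELED");

    static class Execution {
        final String state;
        final long createdTimestamp;

        Execution(String state, long createdTimestamp) {
            this.state = state;
            this.createdTimestamp = createdTimestamp;
        }
    }

    /** Returns the execution with the highest state priority; ties go to the earliest created. */
    static Execution representative(List<Execution> currentExecutions) {
        // States are assumed to be members of PRIORITY; a lower index means higher priority.
        return currentExecutions.stream()
                .min(Comparator
                        .comparingInt((Execution e) -> PRIORITY.indexOf(e.state))
                        .thenComparingLong(e -> e.createdTimestamp))
                .orElseThrow(IllegalStateException::new);
    }
}
```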
SpeculativeScheduler
A SpeculativeScheduler will be used if speculative execution is enabled. It extends AdaptiveBatchScheduler so that speculative execution can work along with the feature to adaptively tuning parallelisms for batch jobs.
The major differences of SpeculativeScheduler are:
- SpeculativeScheduler needs to be able to directly deploy an Execution, while AdaptiveBatchScheduler can only perform ExecutionVertex level deployment.
- SpeculativeScheduler does not restart the ExecutionVertex if an execution fails when any other current execution is still making progress
- SpeculativeScheduler listens on slow tasks. Once there are slow tasks, it will block the slow nodes and deploy speculative executions of the slow tasks on other nodes.
- Once any execution finishes, SpeculativeScheduler will cancel all the remaining executions of the same execution vertex.
Scheduler directly deploys executions
Currently, the DefaultScheduler(base of AdaptiveBatchScheduler) can only perform ExecutionVertex level deployment. However, in this case, the scheduler is actually deploying the current execution of the ExecutionVertex.
Therefore, conceptually it is natural to rework the DefaultScheduler to directly deploy an execution.
```java
public class DefaultScheduler {

    protected void allocateSlotsAndDeploy(List<ExecutionDeploymentOption> executionsToDeploy);

    ...
}

public class ExecutionDeploymentOption {
    ExecutionAttemptID executionId;
    DeploymentOption deploymentOption;
}
```
The ExecutionSlotAllocator also needs to be reworked to directly allocate slots for Executions.
```java
public interface ExecutionSlotAllocator {

    CompletableFuture<LogicalSlot> allocateSlotFor(ExecutionAttemptID executionAttemptId);

    /**
     * Cancel the ongoing slot request or release the allocated slot of an execution.
     */
    void cancel(ExecutionAttemptID executionAttemptId);

    ...
}
```
Handling execution failures
For SpeculativeScheduler, when a task fails,
- if all the current executions of the execution vertex are no longer possible to finish, restart the execution vertex. An execution is possible to finish if and only if it is in CREATED/SCHEDULED/DEPLOYING/INITIALIZING/RUNNING/FINISHED state.
- else if the failure is a PartitionException, it means that the consumed data is corrupted or lost. It will restart the producer execution vertex of the problematic partition.
- otherwise, the execution vertex is still possible to finish. Do not trigger a restart in this case.
The failed execution will be removed from current executions to make room for possible speculative executions in the future.
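The restart decision in the first bullet above can be sketched as a simple predicate (illustrative stand-in, not the SpeculativeScheduler's actual code):

```java
import java.util.Arrays;
import java.util.List;

// Sketch of the restart rule: an execution is still "possible to finish"
// iff it is in one of the non-terminal states listed in the text; the
// vertex is restarted only when no current execution can finish any more.
public class RestartDecision {

    private static final List<String> POSSIBLE_TO_FINISH = Arrays.asList(
            "CREATED", "SCHEDULED", "DEPLOYING", "INITIALIZING", "RUNNING", "FINISHED");

    public static boolean shouldRestartVertex(List<String> currentExecutionStates) {
        return currentExecutionStates.stream().noneMatch(POSSIBLE_TO_FINISH::contains);
    }
}
```

For example, if one execution has FAILED but a sibling speculative execution is still RUNNING, the vertex is not restarted; the failed execution is simply removed.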
Handling slow tasks
Once notified about slow tasks, the SpeculativeScheduler will handle them as below:
- Block nodes that the slow tasks locate on. To achieve this, the scheduler will add the slow nodes to the blocklist. The block action will be MARK_BLOCKED so that future tasks will not be deployed to the slow node, while deployed tasks can keep running. (See FLIP-224 Blocklist Mechanism for more details)
- Create speculative executions for slow tasks until the current executions of each execution vertex reach the concurrency limit (defined via config jobmanager.adaptive-batch-scheduler.speculative.max-concurrent-executions)
- Deploy the newly created speculative executions
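The three steps above can be sketched as a single pass over the slow execution vertices. All types here are simplified stand-ins for the real scheduler components (the real implementation goes through the BlocklistHandler and the deployment pipeline):

```java
import java.util.List;
import java.util.Set;

// Sketch of slow-task handling: block the slow node, then create speculative
// executions up to the configured concurrency limit, which are then deployed.
public class SlowTaskHandling {

    static class Vertex {
        final String node;     // node the slow task is running on
        int currentExecutions; // current executions, including the original one

        Vertex(String node, int currentExecutions) {
            this.node = node;
            this.currentExecutions = currentExecutions;
        }
    }

    /** Returns how many speculative executions were created; fills blockedNodes as a side effect. */
    static int handleSlowTasks(List<Vertex> slowVertices, int maxConcurrent, Set<String> blockedNodes) {
        int created = 0;
        for (Vertex v : slowVertices) {
            blockedNodes.add(v.node); // step 1: MARK_BLOCKED the slow node
            while (v.currentExecutions < maxConcurrent) { // step 2: respect the concurrency limit
                v.currentExecutions++; // step 3: the new execution would be deployed here
                created++;
            }
        }
        return created;
    }
}
```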
Limitations
Batch jobs only
Speculative execution only works with batch jobs. It is enabled if jobmanager.scheduler is AdaptiveBatch and jobmanager.adaptive-batch-scheduler.speculative.enabled is true in Flink configuration. Exceptions will be thrown if a streaming job is submitted in this case.
No support for DataSet jobs
DataSet API will be deprecated in the future and therefore is not supported. DataStream API is now the recommended low level API to develop Flink batch jobs.
No support for PIPELINED data exchanges
Currently, AdaptiveBatchScheduler does not support jobs with PIPELINED data exchanges. As a result, speculative execution does not support PIPELINED data exchanges either. Requiring all data exchanges to be BLOCKING also simplifies things, because each ExecutionVertex is an individual pipelined region in this case and can have individual speculations. Otherwise multiple ExecutionVertex from one pipelined region need to do speculative execution together.
This also means that
Speculative execution of sources and sinks are disabled by default
Speculative execution of sources and sinks is disabled by default, because not all sources and sinks can work with speculative execution.
There will be several follow up proposals to enable speculative execution of sources and sinks:
- FLIP-XXX(coming soon) is a proposal to enable speculative execution to work with most sources, without changing the source connectors
- FLIP-XXX(coming soon) is a proposal to improve Flink interfaces to support speculative execution of sinks. Sinks must implement these interfaces to work with speculative execution
- FLIP-XXX(coming soon) is a proposal to improve FileSink to work with speculative execution
- FLIP-XXX(coming soon) is a proposal to improve HiveTableSink to work with speculative execution
With these proposed improvements, the scheduler will be able to see whether the given source or sink support speculative execution. It will enable speculative execution for a JobVertex only if all the contained sources and sinks support speculative execution.
No integration with Flink's web UI
The web UI does not show all the concurrent executions of each ExecutionVertex/subtask. It only shows the one with the fastest progress.
User defined functions must not be affected by their speculative instances
When a user defined function and its speculative instances run concurrently, they must not affect each other. Problematic cases include:
- access to the same exclusive resources
- overriding the output to external services, which happens as a side effect, i.e. not via Flink sinks
- competition for data ingestion, which includes the cases where
  - a user defined source function competes with its speculative instances
  - data ingestion happens as a side effect, i.e. not via Flink sources
- ...
If the concurrent instances can affect each other, it may result in task failures, or even worse, data inconsistency. Speculative execution should not be enabled in such cases.
Compatibility, Deprecation, and Migration Plan
Speculative execution is enabled only if the newly introduced configuration option jobmanager.adaptive-batch-scheduler.speculative.enabled is true (false by default). This entails that Flink's default behavior won't change.
Test Plan
The proposed changes must be covered with UT, IT and E2E tests. It should also be tested via TPC-DS benchmarks in a real cluster.
Rejected Alternatives