Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

...

Page properties


Discussion thread

...

...

...

thread/ot352tp8t7mclzx9zfv704gcm0fwrq58
Vote thread
JIRA

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-

...

28131

Release1.16


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Table of Contents

Motivation

In Flink batch jobprocessing, the a job is usually divided into multiple parallel tasks that executed cross execute across many nodes in the cluster. It is common to encounter the performance degradation on some nodes due to hardware problems, accident I/O busy, or high CPU load. This kind of degradation can probably slow the tasks running tasks on the node, that is so called long tail tasks. Although long tail tasks will finished finally, they can significantly increase the total job running time. Currently, this long tail problem has not been well solved.it. And the slow tasks may become the bottleneck of the job.

To solve this kind of problem, we propose a speculative execution mechanism to FlinkHere I propose a speculative execution strategy FLINK-10644 to handle this problem. The basic idea is to run a copy of the task on another node when the original task is identified as a long tail task. The speculative task is executed in parallel with the original one and shares the same failure retry mechanism. Once either task completes, the scheduler admits its output as the final result and cancels the other running one. A blacklist module is introduced to schedule the long tail task on different machine from the original task and modify FileOutputFormat.java to adapter speculative execution mechanism.

The preliminary experiments in Alibaba's product cluster have demonstrated the effectiveness of this strategy.

Limitations

(1)JobType must be batch job.

(2)Cluster ResourceManager must be Yarn or K8s.

          If cluster ResourceManager is Yarn, NodeManagers' attributes should include machine IP attribute.

          If cluster ResourceManager is K8s, IP label should be attached to Nodes.

(3)If users don't allow to sink duplicate data to non-key-value databases. SpeculativeExecution can't be used.

(4)SpeculativeExecution is effective only when the JobVertex with all input and output edges are blocking. So, there are only one ExecutionVertex in a region.

Image Removed

Proposed Changes

General design

Detection of Long Tail Tasks

A task will be classified as a long tail task when it meets the following three criteria.

Finished Tasks Percentage

When a configurable percentage(default 75%) of executions in an ExecutionJobVertex has been finished, the speculative execution thread begins to really work.

Long Running Time

In speculative execution thread, all executions' interval between the current time and its first creating/deploying time before its failover in one ExecutionJobVertex are calculated. When the running time of an execution is greater than a configurable multiple(default 1.5) of the median of the running time of the other finished executions, this execution is defined as a long tail execution.

Image Removed

Slow Processing Throughput

In our product cluster of Alibaba in China, the algorithm mentioned above could solve the long tail problem effectively. Currently, slow throughput is not included in this version. I will update a new version if Flink community users need this requirement.

Scheduling of Speculative Executions

Refactoring of ExecutionVertex

By default, the ExecutionVertex only has one running execution. However, as we introduce speculative execution, the ExecutionVertex could have more than one execution simultaneously. Some member-variables in ExecutionVertex need a bigger refactoring.

There are two ways of code refactoring(I suggest the second option.):

...

start mirror tasks on other nodes when a slow task is detected. The mirror task processes the same input data and produces the same data as the original task. The original task and mirror tasks are treated equally. Once any of them finishes, its output will be admitted and the remaining homogeneous tasks will be canceled.

Public Interfaces

Configuration

Speculative executions will work along with adaptive batch scheduling (FLIP-187). New configuration options will be added for the scheduler:

  • jobmanager.adaptive-batch-scheduler.speculative.enabled, default to "false".  It controls whether to enable speculative execution. Note that speculative execution only works with AdaptiveBatchScheduler, which requires "jobmanager.scheduler" to be set to "AdaptiveBatch".
  • jobmanager.adaptive-batch-scheduler.speculative.max-concurrent-executions, default to "2". It controls how many executions (including the original one and speculative ones) of an ExecutionVertex can execute at the same time.
  • jobmanager.adaptive-batch-scheduler.speculative.block-slow-node-duration, default to "1 min". It controls how long an identified slow node should be blocked for.


New configuration options will be added in SlowTaskDetectorOptions for slow task detection:

  • slow-task-detector.check-interval, default to "1 s". It defines the interval to check slow tasks.
  • slow-task-detector.execution-time.baseline-lower-bound, default to "1 min". It defines the lower bound of the slow task detection baseline.
  • slow-task-detector.execution-time.baseline-ratio, default to "0.75". It defines the finished execution ratio threshold to compute the slow task detection baseline.
  • slow-task-detector.execution-time.baseline-multiplier, default to "1.5". It defines the multiplier to compute the slow tasks detection baseline.

Metrics

We propose to add following metrics to expose job problems and show the effectiveness of speculative execution:

  1. numSlowExecutionVertices: Number of slow execution vertices at the moment.
  2. numEffectiveSpeculativeExecutions: Number of speculative executions which finish before their corresponding original executions finish.

Proposed Changes

When speculative execution is enabled, a SpeculativeScheduler(which extends AdaptiveBatchScheduler) will be used for task scheduling. SpeculativeScheduler will listen on slow tasks detected by SlowTaskDetector. It will create and deploy speculative executions for the slow tasks. Nodes that slow tasks located on will be treated as slow nodes and get blocked, so that speculative executions will not be deployed on them. Once any execution finishes, the remaining homogeneous tasks will be canceled, so that only one execution will be admitted as finished and only its output will be visible to downstream consumer tasks or in external sink services.

Image Added

SlowTaskDetector

A SlowTaskDetector will periodically check all the current tasks/executions and notify the SlowTaskDetectorListener about the detected slow tasks. SpeculativeScheduler will register itself as the SlowTaskDetectorListener.

...

Code Block
languagejava
titleRefactor member field of ExecutionVertex
private List<Execution> executionList = new ArrayList<>();

Introduction of SpeculativeScheduler thread

There is a SpeculativeScheduler thread detecting long tail executions periodically in each ExecutionJobVertex according to the criteria mentioned above. Its member-variables include SchedulerNG, ExecutionVertex[], and so on. SchedulerNG is used for scheduling the speculative executions and the ExecutionVertex[] is used for getting execution state timestamp in this ExecutionJobVertex.

Image Removed

Modification of scheduler logicality

The scheduler should schedule an execution according to its index in the executionList instead of that in the currentExecution by default. So we need to change ExecutionVertexID to ExecutionVertexIDWithExecutionIndex that represents which execution in ExecutionVertex should be scheduled in scheduler logicality. Besides, when task failover, executionIndex should also be calculated by fail task's ExecutionAttemptID, so that the scheduler knows which execution in the executionList should be restarted. Moreover, ExecutionVertexVersion and ExecutionVertexVersioner will be refactored to track all executions' version.

Code Block
languagejava
titleExecutionVertexIDWithExecutionIndex.java
public class ExecutionVertexIDWithExecutionIndex {
    private ExecutionVertexID executionVertexID;
    private Integer executionIndex;
}

Image Removed

In order to reuse the code of scheduler, we need to extend the interface with an additional method. Then SchedulerBase should implements it.

Code Block
languagejava
titleSchedulerNG interface extension
public interface SchedulerNG extends AutoCloseableAsync {
	default void schedulingSpeculativeExecutions(List<ExecutionVertexIDWithExecutionIndex> verticesToSchedule) {
        throw new UnsupportedOperationException();
    }
}

Processing failover situation

Just like normal tasks, the speculative task is executed in parallel with the original one and shares the same failover and restart strategy. The original long tail tasks and speculative tasks can still retry with failure on their own tracks. But I think it should not be restarted globally when the counts of speculation execution failover reach the max-retry-counts. When a task fails, we could calculate its index(executionIndex) in the executionList by ExecutionAttemptID. Then the scheduler takes a series of processing for the corresponding execution according to the executionIndex as shown below.

Image Removed

Some classes will add a member-variable named executionIndex, for example, FailureHandlingResult、ExecutionVertexDeploymentOption, and so on.

Black list

Most long tail tasks are caused by machine problems, so the speculative execution must run on a different machine from original execution.

I will introduce blacklist module into Flink used for filtering nodes when the speculative executions are request slots.

Basic plan

Blacklist is a kind of scheduling constraint. According to the description of FLINK-11000, this is a bigger feature.There are several levels of blacklist, including (JobVertex, TaskManager) blacklist, (JobVertex Host) blacklist, (Job, TaskManager) blacklist, (Job, Host) blacklist, (Session, TaskManager) blacklist and (Session Host) blacklist. I must implement (Job, Host) blacklist for speculative execution feature. In order to implement FLINK-11000 friendly in the future, my interface also suit other blacklist described above.

The blacklist module is a thread that maintains the black machines of this job and removes expired elements periodically. Each element in the blacklist contains IP and timestamp. The timestamp is used to decide whether the elements of the blacklist is expired or not. Since request resource logical exist in JobMaster and ResourceManager side, both JobMaster and ResourceManager need to get the blacklist information for filtering black node. Only the (Job, Host) level blacklist is implemented in this feature, so I only consider maintaining the blacklist on the JobMaster side.

On the JobMaster side, the blacklist is encapsulated as an independent system component, which updates the blacklist by receiving the information of speculative execution happened, and filters nodes through the blacklist when requests resource.

On the ResourceManager side, it does not maintain the blacklist temporarily. When JobMaster requests resources from ResourceManager, it passes the blacklist information to ResourceManager. ResourceManager needs to consider blacklist when requests or filters resources and blacklist information will be encapsulated in the form of filtering resources required by external resource management system(such as yarn) when request new container.

Classes and Interfaces of (Job, Host) blacklist

(1)Abstract Class BlackList, each type of blacklist will extends it.

Code Block
languagejava
titleAbstract Class BlackList
public abstract class BlackList implements Serializable {
	/** Black list configuration. */
	protected final BlackListConfig blackListConfig;

	public BlackList(BlackListConfig blackListConfig) {
        this.blackListConfig = blackListConfig;
    }

    /**
     * Remove time out black list records.
     * @param timeout Time out time.
     * @return Minimum timestamp of black list records.
     */
    public abstract long removeTimeoutBlackList(long timeout);

    /** Clear black list. */
    public abstract void clear();

	/** Is black list empty. */
    public abstract boolean isEmpty();
}

...

SlowTaskDetector
public interface SlowTaskDetector {

    void start(
               ExecutionGraph executionGraph,
               SlowTaskDetectorListener listener, 
               ComponentMainThreadExecutor mainThreadExecutor);

    void stop();
}

public interface SlowTaskDetectorListener {
    void notifySlowTasks(Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks);
}

In the first version, an ExecutionTimeBasedSlowTaskDetector will be used. In the future, we can make SlowTaskDetector customizable to support different slow task detecting strategies, e.g. based on task throughput.

ExecutionTimeBasedSlowTaskDetector

For ExecutionTimeBasedSlowTaskDetector, if a task's execution time is much longer than that of most tasks of the same JobVertex, the task will be identified as slow. More specifically, it will compute an execution time baseline for each JobVertex. Tasks which execute longer than or equals to the baseline will be identified as slow tasks.

Here's the process to compute the baseline of a JobVertex:

  • define N = parallelism of JobVertex
  • define R = finished task ratio, set via config
  • define M = baseline multiplier, set via config
  • define B = lower-bound of baseline, set via config
  • the baseline will be computed only if N*R ExecutionVertex of the JobVertex have finished
    • T = median of the execution time of the earliest N*R finished tasks
    • baseline = max(T * M, B)

Related configurations are:

  • finished task ratio: slow-task-detector.execution-time.baseline-ratio
  • baseline multiplier: slow-task-detector.execution-time.baseline-multiplier
  • lower-bound of baseline: slow-task-detector.execution-time.baseline-lower-bound

Note that the baseline will be used only after N*R ExecutionVertex of the JobVertex have finished. No task of the JobVertex will be identified as slow before that.

Definition of execution time

The execution time of a task/execution includes the deploying time, initializing time and running time. Because processes of task deploying, initializing and running all will be affected by a slow node. This means:

  • for a FINISHED task, execution time = timestamp[FINISHED] - timestamp[DEPLOYING]
  • for a DEPLOYING/INITIALIZING/RUNNING task, execution time = current timestamp - timestamp[DEPLOYING]
  • for other tasks, execution time = 0

SpeculativeExecutionVertex

SpeculativeExecutionVertex will be used if speculative execution is enabled, as a replacement of ExecutionVertex to form an ExecutionGraph. The core difference is that a SpeculativeExecutionVertex can have multiple current executions running at the same time.

SpeculativeExecutionVertex extends ExecutionVertex. It needs to implement new methods as below:

Code Block
languagejava
titleSpeculativeExecutionVertex
public class SpeculativeExecutionVertex extends ExecutionVertex {

    public Execution getCurrentExecution(ExecutionAttemptID id);

    public Collection<Execution> getCurrentExecutions();

    /**
    /* Create a new speculative execution and add it to current executions.
     */
    public Execution createNewSpeculativeExecution(long timestamp);

    /**
    /* Returns whether the vertex contains source operators.
     */
    public boolean containsSources();

    /**
    /* Returns whether the vertex contains sink operators.
     */
    public boolean containsSinks();

    ...
}

Besides that, it also needs to override some methods of ExecutionVertex:

  • Operations like cancel/fail/markFailed/suspend must be applied to all the current executions. 
  • The current execution (returned by getCurrentExecutionAttempt()) should point to the execution which best represents the status of the execution vertex. We defined a state priority to achieve this goal. 

State Priority

Even though SpeculativeExecutionVertex may have multiple current executions at the same time, ExecutionVertex#getCurrentExecutionAttempt() which returns one only current execution is still needed to

  • show task execution status on the web UI. This is needed for compatibility purpose before the web UI is fully reworked to support display multiple concurrent executions.
  • help with the deployment of downstream tasks. In this case, the current execution must be FINISHED.

For the purposes above, the current execution should point to the FINISHED execution once any current execution has finished. If no execution has finished, the current execution should point to the execution with the fastest progress. 

Image Added

To achieve this goal, we define a state priority. A state has higher priority if it is more likely to transition to the FINISHED state. And the current execution should always point to the execution of the highest priority. If multiple current executions have the same highest priority, choose the earliest created one.

States from the highest priority to the lowest priority are:

  • FINISHED
  • RUNNING
  • INITIALIZING
  • DEPLOYING
  • SCHEDULED
  • CREATED
  • CANCELING
  • FAILED
  • CANCELED

SpeculativeScheduler

A SpeculativeScheduler will be used if speculative execution is enabled. It extends AdaptiveBatchScheduler so that speculative execution can work along with the feature to adaptively tuning parallelisms for batch jobs.

The major differences of SpeculativeScheduler are:

  • SpeculativeScheduler needs to be able to directly deploy an Execution, while AdaptiveBatchScheduler can only perform ExecutionVertex level deployment.
  • SpeculativeScheduler does not restart the ExecutionVertex if an execution fails when any other current execution is still making progress
  • SpeculativeScheduler listens on slow tasks. Once there are slow tasks, it will block the slow nodes and deploy speculative executions of the slow tasks on other nodes.
  • Once any execution finishes, SpeculativeScheduler will cancel all the remaining executions of the same execution vertex.

Scheduler directly deploys executions

Currently, the DefaultScheduler(base of AdaptiveBatchScheduler) can only perform ExecutionVertex level deployment. However, in this case, the scheduler is actually deploying the current execution of the ExecutionVertex.

Therefore, conceptually it is naturally to rework the DefaultScheduler to directly deploy an execution.

Code Block
languagejava
titleDefaultScheduler
public class DefaultScheduler {

    protected void allocateSlotsAndDeploy(List<ExecutionDeploymentOption

> executionsToDeploy);

    ...
}

public class ExecutionDeploymentOption {
   ExecutionAttemptID executionId;
   DeploymentOption deploymentOption;
}

The ExecutionSlotAllocator also needs to be reworked to directly allocate slots for Executions.

Code Block
languagejava
titleExecutionSlotAllocator
public interface ExecutionSlotAllocator {

    CompletableFuture<LogicalSlot> allocateSlotFor(ExecutionAttemptID executionAttemptId);

    /**
     * Cancel the ongoing slot request or release the allocated slot of an execution.
     */
    void cancel(ExecutionAttemptID executionAttemptId);

    ...
}

Handling execution failures

For SpeculativeScheduler, when a task fails,

  1. if all the current executions of the execution vertex are no longer possible to finish, restart the execution vertex. An execution is possible to finish if and only if it is in CREATED/SCHEDULED/DEPLOYING/INITIALIZING/RUNNING/FINISHED state.
  2. else if the failure is a PartitionException, it means that the consumed data is corrupted or lost. It will restart the producer execution vertex of the problematic partition.
  3. otherwise, the execution vertex is still possible to finish. Do not trigger a restart in this case.

The failed execution will be removed from current executions to make room for possible speculative executions in the future.

Handling slow tasks

Once notified about slow tasks, the SpeculativeScheduler will handle them as below:

  1. Block nodes that the slow tasks locate on. To achieve this, the scheduler will add the slow nodes to the blocklist. The block action will be MARK_BLOCKED so that future tasks will not be deployed to the slow node, while deployed tasks can keep running. (See FLIP-224 Blocklist Mechanism for more details)
  2. Create speculative executions for slow tasks until the current executions of each execution vertex reach the concurrency limit (defined via config jobmanager.adaptive-batch-scheduler.speculative.max-concurrent-executions)
  3. Deploy the newly created speculative executions

Limitations

Batch jobs only

Speculative execution only works with batch jobs. It is enabled if jobmanager.scheduler is AdaptiveBatch and jobmanager.adaptive-batch-scheduler.speculative.enabled is true in Flink configuration. Exceptions will be thrown if a streaming job is submitted in this case.

No support for DataSet jobs

DataSet API will be deprecated in the future and therefore is not supported. DataStream API is now the recommended low level API to develop Flink batch jobs.

No support for PIPELINED data exchanges

Currently, AdaptiveBatchScheduler does not support jobs with PIPELINED data exchanges. As a result, speculative execution does not support PIPELINED data exchanges either. Requiring all data exchanges to be BLOCKING also simplifies things, because each ExecutionVertex is an individual pipelined region in this case and can have individual speculations. Otherwise multiple ExecutionVertex from one pipelined region need to do speculative execution together.

This also means that 

Speculative execution of sources and sinks are disabled by default

Speculative execution of sources and sinks are disabled by default, this is because not all sources and sinks can work with speculative execution.

There will be several follow up proposals to enable speculative execution of sources and sinks:

  • FLIP-XXX(coming soon) is a proposal to enable speculative execution to work with most sources, without changing the source connectors
  • FLIP-XXX(coming soon) is a proposal to improve Flink interfaces to support speculative execution of sinks. Sinks must implement these interfaces to work with speculative execution
  • FLIP-XXX(coming soon) is a proposal to improve FileSink to work with speculative execution
  • FLIP-XXX(coming soon) is a proposal to improve HiveTableSink to work with speculative execution

With these proposed improvements, the scheduler will be able to see whether the given source or sink support speculative execution. It will enable speculative execution for a JobVertex only if all the contained sources and sinks support speculative execution.

No integration with Flink's web UI

The web UI does not show all the concurrent executions of each ExecutionVertex/subtask. It only shows the one with the fastest progress.

User defined functions must not be affected by its speculative instances

When a user defined function and its speculative instances run concurrently, they must not affect each other. For example,

  • access to the same exclusive resources
  • overriding the output to external services which happens as a side effect, i.e. not via Flink sinks
  • competition for data ingestion. Note that it includes cases that
    • user defined source function competition
    • data ingestion happens as a side effect, i.e. not via Flink sources.
  • ...

Once the concurrent instances can affect each other, it may result in task failures, or even worse, data inconsistency. So that speculative executions should not be enabled in this case.

Compatibility, Deprecation, and Migration Plan

Speculative execution is enabled only if the newly introduced configuration option jobmanager.adaptive-batch-scheduler.speculative.enabled is true (false by default). This entails that Flink's default behavior won't change.

Test Plan

The proposed changes must be covered with UT, IT and E2E tests. It should also be tested via TPC-DS benchmarks in a real cluster

Code Block
languagejava
titleThe job-level black list
public class JobBlackList extends BlackList {

	/** The list of the black ip. This list is mainly for time out checking. */
    private final Queue<BlackListRecord> jobToIpBlackListRecords;

	/** The set of the black ip. This set is mainly for black host filter. */
    private final Set<String> jobBlackIpSet;

	public JobBlackList(BlackListConfig blackListConfig) {
        super(blackListConfig);
        this.jobToIpBlackListRecords = new ConcurrentLinkedQueue<>();
        this.jobBlackIpSet = Collections.newSetFromMap(new ConcurrentHashMap<>());
    }
	
	/** Add a ip to this black list. */
	public boolean addIpToBlackList(String ip) {}

	public Set<String> getAllBlackListIpSet() {}

	/** clear (job, ip) blacklist. */
	@Override
    public void clear() {}

	@Override
    public long removeTimeoutBlackList(long timeout) {}

	@Override
    public boolean isEmpty() {}

	/** Whether the given host has been added to black list. */
	public boolean containsIp(String ip) {}
}

(3)BlackListRecord, the item of blackList

Code Block
languagejava
titleThe item of black list
public class BlackListRecord implements Serializable {
	/**
 	 * Black list record which stores the black host ip and
 	 * the time stamp when this record is added.
 	 */
	public class BlackListRecord implements Serializable {}

	/** The black host ip. */
    private final String ip;

    /** The time stamp when this black list record is added. */
    private final long timeStamp;
}

(4)An abstract Class called BlackListTracker, a thread that maintain blacklist info.

Code Block
languagejava
titlea thread that maintain blacklist info
public abstract class BlackListTracker implements Runnable {

	/** The executor to run the time out checking task. */
    private final ScheduledExecutor executor;
	
	/** The black list configuration. */
    protected final BlackListConfig blackListConfig;

	/** The black list timeout check future, will be canceled when black black list destroyed. */
    private AtomicReference<ScheduledFuture> timeoutCheckFuture;

	public BlackListTracker(ScheduledExecutor executor, BlackListConfig blackListConfig) {
        Preconditions.checkNotNull(blackListConfig);
        this.executor = executor;
        this.blackListConfig = blackListConfig;
        this.timeoutCheckFuture = new AtomicReference<>();
    }

	/**
     * Given the minimum time stamp of black list record. The function schedules a task to remove the black list
     * record when it got timeout.
     * @param minTimeStamp The minimum time stamp of black list record.
     */
    public void scheduleTimeOutCheck(long minTimeStamp) {}

	public boolean isBlackListEnabled() {
        return blackListConfig.isBlackListEnabled();
    }

	/** Clear the black list. */
    public abstract void clearBlackList();

    /** Get all black list ip. */
    public abstract Set<String> getAllBlackListIp();

	/** Clear the black list and cancel the timeout check task. */
    public void destroy() {}
}

(6)JobBlackListSpeculativeListener, event listener.

Code Block
languagejava
titleListener to be notified when speculative execution happened.
/** Listener to be notified when speculative execution happened. */
public interface JobBlackListSpeculativeListener {
    /**
     * When a speculative execution happened, the listener will be notified.
     * @param ip the ip
     */
    void onSpeculativeExecutionHappened(String ip);
}

(7)JobBlackListTracker, per-job blackList tracker.

Code Block
languagejava
titleper-job blackList tracker
public class JobBlackListTracker extends BlackListTracker implements JobBlackListSpeculativeListener {

	/** The black list of this job. */
    private final JobBlackList jobBlackList;

	public JobBlackListTracker(ScheduledExecutor executor, BlackListConfig blackListConfig) {
        super(executor, blackListConfig);
        this.jobBlackList = new JobBlackList(blackListConfig);
        scheduleTimeOutCheck(blackListConfig.getBlackListTimeOutInMillis());
    }

	@Override
    public void clearBlackList() {
        jobBlackList.clear();
    }

	@Override
    public Set<String> getAllBlackListIp() {
        if (blackListConfig.isBlackListEnabled()) {
            return jobBlackList.getAllBlackListIpSet();
        }
        return Collections.emptySet();
    }

	/** The time out checking task to be scheduled. /
    @Override
    public void run() {
        long minTimeStamp = jobBlackList.removeTimeoutBlackList(blackListConfig.getBlackListTimeOutInMillis());
        scheduleTimeOutCheck(minTimeStamp);
    }

    @Override
    public void onSpeculativeExecutionHappened(String ip) {
        if (blackListConfig.isBlackListEnabled()) {
            jobBlackList.addIpToBlackList(ip);
        }
    }
}

Init black list 

In DefaultExecutionGraph, I will add a member-variable jobBlackListSpeculativeListeners. After creating ExecutionGraph, jobBlackListTracker will be add in jobBlackListSpeculativeListeners. JobBlackListSpeculativeListener.onSpeculativeExecutionHappened() will be called when the SpeculativeExecution detectes a long tail task and starts to notify scheduler to scheduling a speculative execution.

Image Removed

Code Block
languagejava
titleExecutionGraph
public interface ExecutionGraph extends AccessExecutionGraph {
	void registerJobBlackListSpeculativeListener(JobBlackListTracker jobBlackListTracker);
}
Code Block
languagejava
titleDefaultExecutionGraph
public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionGraphAccessor {

	private final List<JobBlackListSpeculativeListener> jobBlackListSpeculativeListeners;
	
	public DefaultExecutionGraph() {
		this.jobBlackListSpeculativeListeners = new ArrayList<>();
	}
	
	@Override
    public void registerJobBlackListSpeculativeListener(JobBlackListTracker listener) {
        if (listener != null) {
            jobBlackListSpeculativeListeners.add(listener);
        }
    }

	@Override
    public List<JobBlackListSpeculativeListener> getJobBlackListSpeculativeListeners() {
        return jobBlackListSpeculativeListeners;
    }
}

Add element to black list

First, JobBlackListSpeculativeListener.onSpeculativeExecutionHappened() will be called when the SpeculativeExecution detects a long tail task and start to notify scheduler to schedule a speculative execution.

Second, ExecutionGraph will notify listener(JobBlackListTracker) it. 

Third, IP of original execution location will be added to JobBlackList in JobBlackListTracker. 

Image Removed

Remove element in black list

The BlackListTracker has implemented Runnable which maintains the black machines of this job and removes expired elements periodically. Each element in the blacklist contains IP and timestamp. The timestamp is used to decide whether this element in the blacklist is expired or not. 


Pass the blacklist information to cluster ResourceManager

Yarn

Node's attributes should include machine IP attribute, which enables us to control containers not to allocate on some machines by Yarn PlacementConstraintsNow Flink uses Hadoop-2.x and requests container by ResourceRequest api. It don't support PlacementConstraintsSo, in order to use PlacementConstraints, I introduce Hadoop-3.x SchedulingRequest api by java reflect mechanism.

When the executions are scheduled, I add blacklist information to Yarn PlacementConstraint. In this way, I ensure that the Yarn container is not on the machines in the blacklist.

Kubernetes

We could achieve the same goal with Yarn PlacementConstraint in K8s integration by node-affinity if nodes have attached IP label.

Like Yarn, when the executions are scheduled, blacklist information will be add to k8s PodSpec.

Mesos

According to FLINK-22352the community has decided to deprecate Mesos support in Apache Flink.

So I don’t need to think about how to pass the blacklist information to Mesos.

Manage input and output of ExecutionVertex

Manage InputSplit

After I read FLINK-10205pr-6684 and code(master branch), I found that Flink now can't ensure the different attempt of a same ExecutionVertex to have the same InputSplits. Because when a task failover, now simply returning the InputSplits to the assigner and letting the next idling task take it should work. This is no problem because it should not matter which tasks processes which InputSplits. If a failure occurs and some other task takes over the failed InputSplits, it would as if this task had processed these InputSplits from the very beginning.

But because of introducing speculative execution, we must ensure that the InputSplits processed by speculative execution is the same as the original execution. So a Map will be added in ExecutionVertex that key indicates ExecutionAttemptId and value indicates the index of InputSplit that this execution currently consumes.

Code Block
languagejava
titleDefaultExecutionGraph
public class ExecutionVertex
        implements AccessExecutionVertex, Archiveable<ArchivedExecutionVertex> {
	private final ArrayList<InputSplit> inputSplits;
    private final Map<ExecutionAttemptID, Integer> inputSplitIndexMap;
}

When execution failover, in resetForNewExecutionInternal(), we clear up the information related to failover execution in inputSplitIndexMap instead of call returnInputSplit() now.

For example, as shown below.

Image Removed

(a) In an ExecutionVertex, after execution_1 has consumed inputSplit_0, it goes on to consume inputSplit_1.

      Now inputSplitIndexMap data is { (execution_1, 1) }.

(b) Specutative execution_2 has inited, it will consume inputSplit_0 in inputSplits first.

      Now inputSplitIndexMap data is { (execution_1, 1), (execution_2, 0) }.

(c) A failure occurs with execution_1, then resetForNewExecutionInternal() in ExecutionVertex will be called.

      Now inputSplitIndexMap data is { (execution_2, 0) }.

(d) execution_2 consumed inputSplit_0 finished, it goes on to consume inputSplit_1. And after execution_1 failover, execution_1_new occurs.

      Now inputSplitIndexMap data is { (execution_1_new, 0), (execution_2, 1) }.

Manage middle ResultPartition 

As shown below, for batch job with blocking shuffle(similar to MapReduce). Because of introducing speculative execution, all reduce executions in an ExecutionVertex will consume the resultPartition of map ExecutionVertex's fastest finished execution.

Image Removed

  • Once all map ExecutionVertexs finish, all executions in reduce ExecutionVertex should be notified to update its inputChannels from UNKNOW to LOCAL/REMOTE. So there are some modifications in Execution.updatePartitionConsumers().
  • If a reduce task can't read data from a blocking resultPartition(is not available), the producer ExecutionVertex and all consumer ExecutionVertexs will be restart. For map producer ExecutionVertex, I think that SpeculativeScheduler thread should still works. For all consumer ExecutionVertexs I think we should kill all executions and only restart the original execution in the region.
  • Executions of a map ExecutionVertex will produce multiple resultPartitions. When all map ExecutionVertexs finish, the inputChannel of reduce executions will be updated to consume the fastest finished execution of the map ExecutionVertex. To this end, I add a member-variable named fastestFinishedExecution in ExecutionVertex, which is used for creating PartitionInfo that is used for updating reduce executions' inputChannels from UNKNOW to LOCAL/REMOTE.
  • In order to avoid causing problems when multiple executions in one ExecutionVertex finish at the same time, a member-variable named vertexFinished is added in ExecutionVertex, which indicates whether this ExecutionVertex has finished and double check locking pattern in ExecutionVertex.executionFinished(). After double checking locking pattern in ExecutionVertex.executionFinished(), finishPartitionsAndUpdateConsumers() will be called instead of being called in Execution.markFinished(). Then non-fastest finish or running executions in this ExecutionVertex will be canceled.
Code Block
languagejava
titleDefaultExecutionGraph
public class ExecutionVertex
        implements AccessExecutionVertex, Archiveable<ArchivedExecutionVertex> {
	private Execution fastestFinishedExecution = null;
	private volatile boolean vertexFinished = false;
}

Manage sink files

When batch job writes record into file or Key-value databases, this feature could be enabled.

  • Sink to key-value databases, no more steps are needed to do with speculative execution feature.
  • Sink to file, a global unique ExecutionAttemptID suffix will be added after the file name. Then, some files will be deleted or renamed when finalizeOnMaster() is called.


As shown below, four hashSets will be created and some global unique ExecutionAttemptIDs will be added to them.

  • HashSet fastAttemptIdsWithSpeculative is responsible for storing all ExecutionAttemptIDs of the fastest finished executions in all speculated ExecutionVertex in this jobVertex.

  • HashSet slowAttemptIdsWithSpeculative is responsible for storing all ExecutionAttemptIDs of the non-fastest finished executions in all speculated ExecutionVertex in this jobVertex.

  • HashSet finishedAttemptIdsWithoutSpeculative is responsible for storing all ExecutionAttemptIDs of the success finished executions in all non-speculated ExecutionVertex in this jobVertex.

  • HashSet allAttemptIds is responsible for storing all ExecutionAttemptIDs in this jobVertex include failed execution.

Once all tasks finish, different processing methods will be applied to different files according to which HashSet its suffix is in.

Image Removed

Code Block
languagejava
titleDefaultExecutionGraph
public class JobVertex implements java.io.Serializable {
	// ExecutionAttemptIDs of the fastest finished executions in all speculated ExecutionVertex in this jobVertex.
	private Set<ExecutionAttemptID> fastAttemptIdsWithSpeculative = new HashSet<>();

	// ExecutionAttemptIDs of the non-fastest finished executions in all speculated ExecutionVertex in this jobVertex.
	private Set<ExecutionAttemptID> slowAttemptIdsWithSpeculative = new HashSet<>();

	// ExecutionAttemptIDs of the success finished executions in all non-speculated ExecutionVertex in this jobVertex.
	private Set<ExecutionAttemptID> finishedAttemptIdsWithoutSpeculative = new HashSet<>();

	// All ExecutionAttemptIDs in this jobVertex.
	private Set<ExecutionAttemptID> allAttemptIds = new HashSet<>();
}

Metrics

For each ExecutionJobVertex, I use six metrics to measure and evaluate the efficiency of speculative execution that can be summarized in the job status and web page.

(1)minFinishedForSpeculationThresholdMetrics is defined as the minimal number of the finished ExecutionVertexs before scheduling speculative executions.

(2)finishedExecutionCountMetrics is defined as the number of finished ExecutionVertexs.

(3)speculativeExecutionCountMetrics is defined as the number of speculative executions that are scheduled by scheduler.

(4)speculativeExecutionFastestFinishedCountMetrics is defined as the number of ExecutionVertex's speculative execution that reach FINISHED state faster than the original execution.

(5)speculativeThresholdOfTime is defined as the threshold time of speculative execution.

(6)executionIndex2RunningTimespan is defined as the running time of the original execution in each ExecutionVertex.

Image Removed

Web UI

If we don't modify the code of web UI, when the speculative execution runs faster than the original execution, the web UI will show that this task has been cancelled. But the result of the batch job is correct.

More discussion is needed to decide whether the web UI needs to be modified.

Image Removed

Configuration

All configurations related to this feature are added in JobManagerOptions.class.
Code Block
languagejava
titleJobManagerOptions
public static final ConfigOption<Boolean> SPECULATIVE_EXECUTION_ENABLED =
        key("flink.batch.speculative.enabled")
                .booleanType()
                .defaultValue(false)
                .withDescription("Whether to enable speculation of batch job.");

public static final ConfigOption<Long> SPECULATIVE_EXECUTION_INTERVAL_IN_MILLIS =
        key("flink.batch.speculative.interval.in.millis")
                .longType()
                .defaultValue(100L)
                .withDescription("How often to check for speculative executions.");

public static final ConfigOption<Double> SPECULATIVE_EXECUTION_MULTIPLIER =
        key("flink.batch.speculative.multiplier")
                .doubleType()
                .defaultValue(1.5)
                .withDescription("When the running time of a unfinished executionVertex is several times of" +
                        " the median of all completed ExecutionVertexs, it will be speculated.");

public static final ConfigOption<Double> SPECULATIVE_EXECUTION_QUANTILE =
        key("flink.batch.speculative.quantile")
                .doubleType()
                .defaultValue(0.75)
                .withDescription("When the percentage of ExecutionVertex in an ExecutionJobVertex are finished," +
                        " the speculative execution mechanism will be started. 0.9 means that 90% of the" +
                        " ExecutionVertex are finished, then the speculative execution mechanism will be started.");

public static final ConfigOption<Double> SPECULATIVE_EXECUTION_COUNT =
        key("flink.batch.speculative.execution.count")
                .longType()
                .defaultValue(1)
                .withDescription("The number of speculative executions that existed in an ExecutionVertex simultaneously.");

public static final ConfigOption<Long> SPECULATIVE_EXECUTION_LOG_INTERVAL_IN_MILLIS =
        key("flink.batch.speculative.log.interval.in.millis")
                .longType()
                .defaultValue(1000 * 60 * 5L)
                .withDescription("Interval in millis for speculative related log.");

public static final ConfigOption<Boolean> FLINK_BLACKLIST_ENABLE =
        key("flink.blacklist.enable")
                .booleanType()
                .defaultValue(false)
                .withDescription("Whether to enable blacklist job.");

public static final ConfigOption<Long> FLINK_BLACKLIST_TIMEOUT_IN_MILLIS =
        key("flink.blacklist.timeout.in.millis")
                .longType()
                .defaultValue(60 * 1000L)
                .withDescription("Indicates how long a black list record will be removed after added to the blacklist.");

Compatibility, Deprecation, and Migration Plan

This FLIP is a new feature. So there is no compatible issue with previous versions.

Test Plan

Covered by unit tests.

Rejected Alternatives

None so far.