
Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In a Flink batch job, the job is usually divided into multiple parallel tasks executed across many nodes in the cluster. It is common to encounter performance degradation on some nodes due to hardware problems, temporary I/O contention, or high CPU load. This kind of degradation can make the tasks running on such a node quite slow, the so-called long tail tasks. Although long tail tasks do not fail, they can severely increase the total job running time. Flink task scheduling currently does not take this long tail problem into account.

Here we propose a speculative execution strategy (FLINK-10644) to handle this problem. The basic idea is to run a copy of a task on another node when the original task is identified as a long tail task. The speculative task is executed in parallel with the original one and shares the same failure retry mechanism. Once either task completes, the scheduler admits its output as the final result and cancels the other running one. I will introduce a blacklist module to schedule the speculative task on a different machine from the original task, and modify FileOutputFormat.java to adapt to the speculative execution mechanism.

Preliminary experiments have demonstrated the effectiveness of this approach in our production cluster.

Proposed Changes

General design

Detection of Long Tail Tasks

A task will be classified as a long tail task when it meets the following three rules.

Finished Tasks Percentage

In one ExecutionJobVertex, once a configurable percentage (default 75%) of executions have finished, the speculative execution thread begins its real work.

Long Running Time

In the speculative execution thread, for every execution in an ExecutionJobVertex the interval between the current time and its first create/deploy time (before any failover) is calculated as its running time. When the running time of an execution is greater than a configurable multiple (default 1.5) of the median running time of the finished executions, this execution is defined as a long tail execution.
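
As a concrete illustration of this rule, the sketch below (a hypothetical helper, not the actual SpeculativeScheduler code) computes the running-time threshold from the finished executions and flags the running executions that exceed it.

Code Block
languagejava
titleSketch of the long tail detection rule (illustrative)
// Illustrative only: running time is measured from the CREATED timestamp of the current attempt.
List<Long> finishedRunningTimes = new ArrayList<>();
for (Execution finished : finishedExecutions) {
    finishedRunningTimes.add(
            finished.getStateTimestamp(ExecutionState.FINISHED)
                    - finished.getStateTimestamp(ExecutionState.CREATED));
}
Collections.sort(finishedRunningTimes);
long median = finishedRunningTimes.get(finishedRunningTimes.size() / 2);
long threshold = (long) (median * multiplier); // multiplier is the configurable value, default 1.5

long now = System.currentTimeMillis();
for (Execution running : runningExecutions) {
    long runningTime = now - running.getStateTimestamp(ExecutionState.CREATED);
    if (runningTime > threshold) {
        // "running" is classified as a long tail execution and a speculative copy may be scheduled.
    }
}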


Slow Processing Throughput

In our production cluster at Alibaba, the algorithm above is sufficient to solve the long tail task problem. So in the first version I will not take throughput into consideration. If Flink community users need throughput-based detection after this feature is released, I will develop it in a following version.

Scheduling of Speculative Executions

Refactoring of ExecutionVertex

With the introduction of speculative execution, the ExecutionVertex can no longer assume that only one execution is running at the same time. Some member variables in ExecutionVertex need a bigger refactor.

There are two ways of code refactoring:

  1. Add a member variable named speculativeExecution, similar to currentExecution. But this way leads to lots of if judgments in the scheduler and failover code, and it reduces the flexibility of the code if more than two executions exist at the same time. 
  2. Change the currentExecution in ExecutionVertex to an ArrayList named executionList, which means there can be multiple executions in an ExecutionVertex at the same time. The executions in executionList behave identically with respect to failover, slot requests, etc.
Code Block
languagejava
titleRefactor member field of ExecutionVertex
private List<Execution> executionList = new ArrayList<>();

Introduction of the SpeculativeScheduler thread

In each ExecutionJobVertex there is a SpeculativeScheduler thread used to periodically detect long tail executions in this ExecutionJobVertex based on the rules above. Its member variables include SchedulerNG, ExecutionVertex[], etc. The SchedulerNG is used to schedule the speculative executions and the ExecutionVertex[] is used to get the execution state timestamps in this ExecutionJobVertex.
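
A minimal skeleton of this thread might look as follows; the field and method layout is illustrative, not final.

Code Block
languagejava
titleSketch of the SpeculativeScheduler thread (illustrative)
// Illustrative skeleton only.
public class SpeculativeScheduler implements Runnable {

    private final SchedulerNG schedulerNG;        // schedules the speculative executions
    private final ExecutionVertex[] taskVertices; // provides the execution state timestamps
    private final long checkIntervalMillis;       // how often the detection runs

    public SpeculativeScheduler(SchedulerNG schedulerNG, ExecutionVertex[] taskVertices, long checkIntervalMillis) {
        this.schedulerNG = schedulerNG;
        this.taskVertices = taskVertices;
        this.checkIntervalMillis = checkIntervalMillis;
    }

    @Override
    public void run() {
        // 1. Check whether the configured percentage of executions in this ExecutionJobVertex has finished.
        // 2. Apply the running-time rule above to find long tail executions.
        // 3. Ask schedulerNG to schedule speculative executions for them on other machines.
    }
}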

Modification of the scheduler logic

The scheduler should schedule an execution according to the index of that execution in executionList instead of defaulting to scheduling the currentExecution. So we need to change ExecutionVertexID to ExecutionVertexIDWithExecutionIndex, which represents which execution in the ExecutionVertex should be scheduled. Also, on task failover, the executionIndex should be calculated from the failed task's ExecutionAttemptID so that the scheduler knows which execution in executionList should be restarted.


Code Block
languagejava
titleExecutionVertexIDWithExecutionIndex.java
public class ExecutionVertexIDWithExecutionIndex {
    private ExecutionVertexID executionVertexID;
    private Integer executionIndex;
}

In order to reuse the scheduler code, we need to extend the SchedulerNG interface with an additional method, which SchedulerBase should then implement.

Code Block
languagejava
titleSchedulerNG interface extension
public interface SchedulerNG extends AutoCloseableAsync {
	default void schedulingSpeculativeExecutions(List<ExecutionVertexIDWithExecutionIndex> verticesToSchedule) {
        throw new UnsupportedOperationException();
    }
}

Process failover situation

Just like normal tasks, the speculative task is executed in parallel with the original one and shares the same failover and restart strategy.

The original long tail task and the speculative task can still retry after failure on their own tracks.

But I think the job should not be restarted globally when a speculative execution's failover count reaches the max retry count.

When a task fails, we can calculate its index (executionIndex) in executionList from its ExecutionAttemptID. Then the scheduler takes a series of processing steps for the corresponding execution according to the executionIndex, as shown below.
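
For illustration, the index lookup could be a simple scan of executionList; this is a sketch, not the final implementation.

Code Block
languagejava
titleSketch of resolving executionIndex from an ExecutionAttemptID (illustrative)
// Illustrative: find which execution in executionList the failed attempt belongs to.
private int calculateExecutionIndex(ExecutionAttemptID failedAttemptId) {
    for (int i = 0; i < executionList.size(); i++) {
        if (executionList.get(i).getAttemptId().equals(failedAttemptId)) {
            return i;
        }
    }
    return -1; // unknown attempt, e.g. an execution that has already been reset
}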

In order to improve the failover logic, I will extend the class FailureHandlingResult with an additional member variable.

Code Block
languagejava
titleFailureHandlingResult class extension
public class FailureHandlingResult {
	@Nullable private final Integer executionIndex;
}

Black list

Most long tail tasks are caused by machine problems, so the speculative execution must run on a different machine from the original execution.

I will introduce a blacklist module into Flink, used to filter out nodes when the speculative executions request slots.

Basic plan

The blacklist is a kind of scheduling constraint. According to the FLINK-11000 description, this is a bigger feature.

There are several levels of blacklist, including the (JobVertex, TaskManager) blacklist, (JobVertex, Host) blacklist, (Job, TaskManager) blacklist, (Job, Host) blacklist, (Session, TaskManager) blacklist and (Session, Host) blacklist.

I must implement the (Job, Host) blacklist for the speculative execution feature. In order to implement FLINK-11000 smoothly in the future, my interfaces also suit the other blacklist levels described above.

The blacklist module is a thread that maintains the blacklisted machines of this job and removes expired elements periodically. Each element in the blacklist contains an IP and a timestamp. The timestamp is used to decide whether an element of the blacklist has expired. Since resource request logic exists on both the JobMaster and the ResourceManager side, both the JobMaster and the ResourceManager need the blacklist information to filter out blacklisted nodes. But because I only implement the (Job, Host) level blacklist in this feature, I only consider maintaining the blacklist on the JobMaster side.

On the JobMaster side, the blacklist is encapsulated as an independent system component, which updates the blacklist when it is notified that a speculative execution has happened, and filters resources through the blacklist when requesting resources.

For now, the ResourceManager side does not maintain the blacklist. When the JobMaster requests resources from the ResourceManager, it passes the blacklist information along. The ResourceManager needs to consider the blacklist when requesting or filtering resources, and the blacklist information will be encapsulated in the resource-filtering form required by the external resource management system (such as YARN) when requesting new containers.

Classes and Interfaces of (Job, Host) blacklist

(1) Abstract class BlackList, which each type of blacklist will extend.

Code Block
languagejava
titleAbstract Class BlackList
public abstract class BlackList implements Serializable {
	/** Black list configuration. */
	protected final BlackListConfig blackListConfig;

	public BlackList(BlackListConfig blackListConfig) {
        this.blackListConfig = blackListConfig;
    }

    /**
     * Remove timed-out black list records.
     * @param timeout The timeout in milliseconds.
     * @return The minimum timestamp of the black list records.
     */
    public abstract long removeTimeoutBlackList(long timeout);

    /** Clear black list. */
    public abstract void clear();

	/** Is black list empty. */
    public abstract boolean isEmpty();
}


(2) JobBlackList, the job-level blacklist.

Code Block
languagejava
titleThe job-level black list
public class JobBlackList extends BlackList {

	/** The list of the black ip. This list is mainly for time out checking. */
    private final Queue<BlackListRecord> jobToIpBlackListRecords;

	/** The set of the black ip. This set is mainly for black host filter. */
    private final Set<String> jobBlackIpSet;

	public JobBlackList(BlackListConfig blackListConfig) {
        super(blackListConfig);
        this.jobToIpBlackListRecords = new ConcurrentLinkedQueue<>();
        this.jobBlackIpSet = Collections.newSetFromMap(new ConcurrentHashMap<>());
    }
	
	/** Add a ip to this black list. */
	public boolean addIpToBlackList(String ip) {}

	public Set<String> getAllBlackListIpSet() {}

	/** clear (job, ip) blacklist. */
	@Override
    public void clear() {}

	@Override
    public long removeTimeoutBlackList(long timeout) {}

	@Override
    public boolean isEmpty() {}

	/** Whether the given host has been added to black list. */
	public boolean containsIp(String ip) {}
}
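
For illustration, removeTimeoutBlackList() in JobBlackList could look roughly like the sketch below; the getters on BlackListRecord are assumed.

Code Block
languagejava
titleSketch of JobBlackList.removeTimeoutBlackList() (illustrative)
@Override
public long removeTimeoutBlackList(long timeout) {
    long now = System.currentTimeMillis();
    long minTimeStamp = now;
    BlackListRecord record;
    // Records are queued in insertion order, so expired ones sit at the head of the queue.
    while ((record = jobToIpBlackListRecords.peek()) != null) {
        if (now - record.getTimeStamp() >= timeout) {   // getTimeStamp(): assumed getter
            jobToIpBlackListRecords.poll();
            jobBlackIpSet.remove(record.getIp());       // getIp(): assumed getter
        } else {
            minTimeStamp = record.getTimeStamp();
            break;
        }
    }
    return minTimeStamp;
}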


(3) BlackListRecord, the item of the blacklist.

Code Block
languagejava
titleThe item of black list
/**
 * Black list record which stores the black host ip and
 * the time stamp when this record is added.
 */
public class BlackListRecord implements Serializable {

	/** The black host ip. */
    private final String ip;

    /** The time stamp when this black list record is added. */
    private final long timeStamp;
}


(4) An abstract class called BlackListTracker, a thread that maintains blacklist info.

Code Block
languagejava
titlea thread that maintain blacklist info
public abstract class BlackListTracker implements Runnable {

	/** The executor to run the time out checking task. */
    private final ScheduledExecutor executor;
	
	/** The black list configuration. */
    protected final BlackListConfig blackListConfig;

	/** The black list timeout check future, will be canceled when the black list is destroyed. */
    private AtomicReference<ScheduledFuture> timeoutCheckFuture;

	public BlackListTracker(ScheduledExecutor executor, BlackListConfig blackListConfig) {
        Preconditions.checkNotNull(blackListConfig);
        this.executor = executor;
        this.blackListConfig = blackListConfig;
        this.timeoutCheckFuture = new AtomicReference<>();
    }

	/**
     * Given the minimum time stamp of black list record. The function schedules a task to remove the black list
     * record when it got timeout.
     * @param minTimeStamp The minimum time stamp of black list record.
     */
    public void scheduleTimeOutCheck(long minTimeStamp) {}

	public boolean isBlackListEnabled() {
        return blackListConfig.isBlackListEnabled();
    }

	/** Clear the black list. */
    public abstract void clearBlackList();

    /** Get all black list ip. */
    public abstract Set<String> getAllBlackListIp();

	/** Clear the black list and cancel the timeout check task. */
    public void destroy() {}
}


(6) JobBlackListSpeculativeListener, the event listener.

Code Block
languagejava
titleListener to be notified when speculative execution happened.
/** Listener to be notified when speculative execution happened. */
public interface JobBlackListSpeculativeListener {
    /**
     * When a speculative execution happened, the listener will be notified.
     * @param ip the ip
     */
    void onSpeculativeExecutionHappened(String ip);
}


(7) JobBlackListTracker, the per-job blacklist tracker.

Code Block
languagejava
titleper-job blackList tracker
public class JobBlackListTracker extends BlackListTracker implements JobBlackListSpeculativeListener {

	/** The black list of this job. */
    private final JobBlackList jobBlackList;

	public JobBlackListTracker(ScheduledExecutor executor, BlackListConfig blackListConfig) {
        super(executor, blackListConfig);
        this.jobBlackList = new JobBlackList(blackListConfig);
        scheduleTimeOutCheck(blackListConfig.getBlackListTimeOutInMillis());
    }

	@Override
    public void clearBlackList() {
        jobBlackList.clear();
    }

	@Override
    public Set<String> getAllBlackListIp() {
        if (blackListConfig.isBlackListEnabled()) {
            return jobBlackList.getAllBlackListIpSet();
        }
        return Collections.emptySet();
    }

	/** The time out checking task to be scheduled. */
    @Override
    public void run() {
        long minTimeStamp = jobBlackList.removeTimeoutBlackList(blackListConfig.getBlackListTimeOutInMillis());
        scheduleTimeOutCheck(minTimeStamp);
    }

    @Override
    public void onSpeculativeExecutionHappened(String ip) {
        if (blackListConfig.isBlackListEnabled()) {
            jobBlackList.addIpToBlackList(ip);
        }
    }
}

Init black list 

In DefaultExecutionGraph I will add a member variable jobBlackListSpeculativeListeners, and after the ExecutionGraph is created the JobBlackListTracker will be added to jobBlackListSpeculativeListeners. JobBlackListSpeculativeListener.onSpeculativeExecutionHappened() will be called when the SpeculativeScheduler detects a long tail task and starts notifying the scheduler to schedule a speculative execution.

Code Block
languagejava
titleExecutionGraph
public interface ExecutionGraph extends AccessExecutionGraph {
	void registerJobBlackListSpeculativeListener(JobBlackListTracker jobBlackListTracker);
}

...

Code Block
languagejava
titleDefaultExecutionGraph
public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionGraphAccessor {

	private final List<JobBlackListSpeculativeListener> jobBlackListSpeculativeListeners;
	
	public DefaultExecutionGraph() {
		this.jobBlackListSpeculativeListeners = new ArrayList<>();
	}
	
	@Override
    public void registerJobBlackListSpeculativeListener(JobBlackListTracker listener) {
        if (listener != null) {
            jobBlackListSpeculativeListeners.add(listener);
        }
    }

	@Override
    public List<JobBlackListSpeculativeListener> getJobBlackListSpeculativeListeners() {
        return jobBlackListSpeculativeListeners;
    }
}

Add element to black list

First, JobBlackListSpeculativeListener.onSpeculativeExecutionHappened() will be called when the SpeculativeScheduler detects a long tail task and starts notifying the scheduler to schedule a speculative execution.

Second, the ExecutionGraph will notify its listeners (the JobBlackListTracker).

Third, the IP of the original execution's location will be added to the JobBlackList in the JobBlackListTracker.
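
Putting the three steps together, the notification path could look like the sketch below; the exact call site inside the SpeculativeScheduler is an assumption.

Code Block
languagejava
titleSketch of the blacklist notification path (illustrative)
// Illustrative: executed when a speculative execution is about to be scheduled.
String originalIp = originalExecution
        .getAssignedResourceLocation()  // TaskManagerLocation of the original (slow) execution
        .address()
        .getHostAddress();

for (JobBlackListSpeculativeListener listener : executionGraph.getJobBlackListSpeculativeListeners()) {
    listener.onSpeculativeExecutionHappened(originalIp);
}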

Remove element in black list

The BlackListTracker implements Runnable; its responsibility is to maintain the blacklisted machines of this job and remove expired elements periodically. Each element in the blacklist contains an IP and a timestamp. The timestamp is used to decide whether an element in the blacklist has expired.


Pass the blacklist information to cluster ResourceManager

Yarn

First, the nodes' attributes should include a machine IP attribute; then we can keep containers off certain machines via YARN PlacementConstraints. Flink currently uses hadoop-2.x and requests containers via the ResourceRequest API, which does not support PlacementConstraints. So in order to use PlacementConstraints, I will introduce the hadoop-3.x SchedulingRequest API via the Java reflection mechanism.

When the executions are scheduled, I will add the blacklist information to the YARN PlacementConstraint. In this way, I can ensure that the YARN containers are not placed on the machines in the blacklist.

Kubernetes

We can achieve the same goal as the YARN PlacementConstraint in the K8s integration by using node affinity, if the nodes have an attached IP label.

The same as with YARN: when the executions are scheduled, I will add the blacklist information to the K8s PodSpec.

Mesos

According to FLINK-22352, the community has decided to deprecate Mesos support in Apache Flink.

So we do not need to think about how to pass the blacklist information to Mesos.

Manage input and output of ExecutionVertex

Manage InputSplit

After reading FLINK-10205 (pr-6684) and the code on the master branch, I found that Flink currently does not ensure that different attempts of the same ExecutionVertex get the same inputs. When a task fails over, the input splits are simply returned to the assigner and the next idling task takes them. This is fine today because it does not matter which task processes which input split: if a failure occurs and some other task takes over the failed input splits, it is as if that task had processed these input splits from the very beginning.

But with the introduction of speculative execution, we must ensure that the InputSplits processed by the speculative execution are the same as those of the original execution. So I will add a Map to ExecutionVertex whose key is the ExecutionAttemptID and whose value is the index of the InputSplit that this execution currently consumes.

Code Block
languagejava
titleExecutionVertex
public class ExecutionVertex
        implements AccessExecutionVertex, Archiveable<ArchivedExecutionVertex> {
	private final ArrayList<InputSplit> inputSplits;
    private final Map<ExecutionAttemptID, Integer> inputSplitIndexMap;
}

When an execution fails over, in resetForNewExecutionInternal() we clear the information related to the failed execution from inputSplitIndexMap instead of calling returnInputSplit() as today. A sketch of the per-attempt split lookup follows the example below.

For example, as shown below.

(a) In an ExecutionVertex, after execution_1 has consumed inputSplit_0, it goes on to consume inputSplit_1.

      Now inputSplitIndexMap data is { (execution_1, 1) }.

(b) Speculative execution_2 has been initialized; it will consume inputSplit_0 from inputSplits first.

      Now inputSplitIndexMap data is { (execution_1, 1), (execution_2, 0) }.

(c) A failure of execution_1 occurs, and resetForNewExecutionInternal() in ExecutionVertex is called.

      Now inputSplitIndexMap data is { (execution_2, 0) }.

(d) execution_2 finishes consuming inputSplit_0 and goes on to consume inputSplit_1. After the failover of execution_1, execution_1_new starts.

      Now inputSplitIndexMap data is { (execution_1_new, 0), (execution_2, 1) }.
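
The split lookup behind this example could roughly work as in the sketch below; it is a hypothetical helper on ExecutionVertex, not the final code.

Code Block
languagejava
titleSketch of per-attempt input split lookup (illustrative)
// Illustrative: every attempt of this ExecutionVertex sees the same sequence of splits.
InputSplit getNextInputSplitFor(ExecutionAttemptID attemptId, InputSplitAssigner assigner, String host) {
    int nextIndex = inputSplitIndexMap.getOrDefault(attemptId, -1) + 1;
    if (nextIndex >= inputSplits.size()) {
        // No attempt has reached this index yet: pull a new split from the assigner and cache it.
        InputSplit next = assigner.getNextInputSplit(host, getParallelSubtaskIndex());
        if (next == null) {
            return null; // no more splits for this vertex
        }
        inputSplits.add(next);
    }
    inputSplitIndexMap.put(attemptId, nextIndex);
    return inputSplits.get(nextIndex);
}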

Manage intermediate ResultPartition

As shown below, consider a batch job with blocking shuffle. Because of the introduction of speculative execution, all reduce executions in an ExecutionVertex will consume the ResultPartition of the map ExecutionVertex's fastest finished execution.

  • When all upstream ExecutionVertexs have finished, all executions in the downstream ExecutionVertex should be notified to update their inputChannels from unknown to local/remote, so there are some modifications in Execution.updatePartitionConsumers().
  • When a task reads from a blocking result partition and its input is not available, the producing ExecutionVertex and all consumer ExecutionVertexs will be revoked. For the upstream producing ExecutionVertex, I think the SpeculativeScheduler thread should still work. For all consumer ExecutionVertexs, I think we should kill all speculative executions and only restart the original executions.
  • Multiple executions of an upstream ExecutionVertex will produce multiple ResultPartitions. When all upstream ExecutionVertexs have finished, the inputChannels of the downstream executions will be updated to consume the fastest finished upstream execution. So a member variable named fastestFinishedExecution is added in ExecutionVertex, used to create the PartitionInfo that updates the downstream executions' inputChannels from unknown to local/remote. 
  • In order to avoid problems when multiple executions in one ExecutionVertex finish at the same time, a member variable named vertexFinished is added in ExecutionVertex to indicate whether this ExecutionVertex has finished, together with a double-checked locking pattern in ExecutionVertex.executionFinished(). After the double-checked locking, finishPartitionsAndUpdateConsumers() should be called instead of Execution.markFinished(), and then the non-fastest executions in this ExecutionVertex should be canceled (a sketch follows the code block below).
Code Block
languagejava
titleExecutionVertex
public class ExecutionVertex
        implements AccessExecutionVertex, Archiveable<ArchivedExecutionVertex> {
	private Execution fastestFinishedExecution = null;
	private volatile boolean vertexFinished = false;
}
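
A sketch of the double-checked locking mentioned in the list above; the method shape and helper names are illustrative.

Code Block
languagejava
titleSketch of ExecutionVertex.executionFinished() (illustrative)
// Illustrative only: ensures exactly one (the fastest) execution finishes this vertex.
void executionFinished(Execution execution) {
    if (!vertexFinished) {                      // first check without the lock
        synchronized (this) {
            if (!vertexFinished) {              // second check inside the lock
                vertexFinished = true;
                fastestFinishedExecution = execution;
                // Publish the fastest execution's partitions instead of calling Execution.markFinished().
                finishPartitionsAndUpdateConsumers(execution); // name from the text above; signature assumed
                // Cancel the non-fastest executions of this ExecutionVertex.
                cancelNonFastestExecutions(execution);         // hypothetical helper
            }
        }
    }
}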


Manage sink files

When a batch job writes records into files or key-value databases, this feature can be enabled.

  • When sinking to key-value databases, nothing needs to be done for the speculative execution feature.
  • When sinking to files, I will add a globally unique ExecutionAttemptID suffix to the normal task file names. Finally, some files will be deleted or renamed when finalizeOnMaster() is called.


As shown below, four hash sets will be created and filled with globally unique ExecutionAttemptIDs. When all tasks have finished, each file will be processed differently according to which set its suffix belongs to; a sketch of this handling follows the code block.

Code Block
languagejava
titleJobVertex
public class JobVertex implements java.io.Serializable {

	// ExecutionAttemptIDs of the fastest finished executions of all speculative ExecutionVertexs in this JobVertex.
	private Set<ExecutionAttemptID> fastAttemptIdsWithSpeculative = new HashSet<>();

	// ExecutionAttemptIDs of the non-fastest finished executions of all speculative ExecutionVertexs in this JobVertex.
	private Set<ExecutionAttemptID> slowAttemptIdsWithSpeculative = new HashSet<>();

	// ExecutionAttemptIDs of the successfully finished executions of all non-speculative ExecutionVertexs in this JobVertex.
	private Set<ExecutionAttemptID> finishedAttemptIdsWithoutSpeculative = new HashSet<>();

	// All ExecutionAttemptIDs in this JobVertex.
	private Set<ExecutionAttemptID> allAttemptIds = new HashSet<>();

}
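
For illustration, finalizeOnMaster() could then handle the produced files roughly as follows; the file naming scheme and helper method are assumptions.

Code Block
languagejava
titleSketch of file handling in finalizeOnMaster() (illustrative)
// Illustrative only: assumes each task writes its output as "<name>_<executionAttemptId>".
void finalizeSpeculativeOutput(FileSystem fs, Path outputDir,
                               Set<ExecutionAttemptID> fastAttemptIdsWithSpeculative,
                               Set<ExecutionAttemptID> finishedAttemptIdsWithoutSpeculative) throws IOException {
    Set<String> keep = new HashSet<>();
    for (ExecutionAttemptID id : fastAttemptIdsWithSpeculative) {
        keep.add(id.toString());
    }
    for (ExecutionAttemptID id : finishedAttemptIdsWithoutSpeculative) {
        keep.add(id.toString());
    }

    for (FileStatus file : fs.listStatus(outputDir)) {
        String name = file.getPath().getName();
        int sep = name.lastIndexOf('_');
        String suffix = name.substring(sep + 1);
        if (keep.contains(suffix)) {
            // Output of the winning attempt: strip the suffix to restore the expected file name.
            fs.rename(file.getPath(), new Path(outputDir, name.substring(0, sep)));
        } else {
            // Output of a slower or duplicated attempt: remove it.
            fs.delete(file.getPath(), false);
        }
    }
}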

Metrics

For each ExecutionJobVertex, we can use six metrics to measure and evaluate the efficiency of speculative execution; they can be summarized in the job status and on the web page.

  • minFinishedForSpeculationThresholdMetrics is defined as how many ExecutionVertexs must be finished before scheduling speculative executions.
  • finishedExecutionCountMetrics is defined as the number of finished ExecutionVertexs.
  • speculativeExecutionCountMetrics is defined as how many speculative executions have been scheduled by scheduler.
  • speculativeThresholdOfTime is defined as the threshold of speculative executions in time dimension.
  • executionIndex2RunningTimespan is defined as the running time of the original execution in each ExecutionVertex.
  • speculativeExecutionFastestFinishedCountMetrics is defined as how many ExecutionVertexs' speculative executions reach the FINISHED state faster than their original executions (see the registration sketch below).
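
As an example of how one of these metrics might be registered, assuming access to the ExecutionJobVertex's MetricGroup; the counter field is illustrative.

Code Block
languagejava
titleSketch of registering a speculative execution metric (illustrative)
// Illustrative: expose the number of scheduled speculative executions as a gauge.
private final AtomicLong speculativeExecutionCount = new AtomicLong();

void registerSpeculativeMetrics(MetricGroup jobVertexMetricGroup) {
    jobVertexMetricGroup.gauge(
            "speculativeExecutionCountMetrics",
            (Gauge<Long>) speculativeExecutionCount::get);
}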

Web UI

If we do not adapt the web UI, then when a speculative execution runs faster than the original execution, the web UI will show that this task has been cancelled even though the result of the job is correct.

Whether this work needs to be done should be discussed further.

Limitations

(1) The JobType must be Batch.

(2) The cluster ResourceManager must be YARN or K8s.

          If the cluster ResourceManager is YARN, the NodeManagers' attributes should include a machine IP attribute.

          If the cluster ResourceManager is Kubernetes, the nodes should have an attached IP label.

(3) If the user does not allow sinking duplicate data to non key-value databases, speculative execution must not be enabled.

(4) Enabling speculative execution is only effective for a JobVertex whose input edges and output edges are all blocking.

Configuration

All configurations related to this feature are added in JobManagerOptions.class.
Code Block
languagejava
titleJobManagerOptions
public static final ConfigOption<Boolean> SPECULATIVE_EXECUTION_ENABLED =
        key("flink.batch.speculative.enabled")
                .booleanType()
                .defaultValue(false)
                .withDescription("Whether to enable speculation of batch job.");

public static final ConfigOption<Long> SPECULATIVE_EXECUTION_INTERVAL_IN_MILLIS =
        key("flink.batch.speculative.interval.in.millis")
                .longType()
                .defaultValue(100L)
                .withDescription("How often to check for speculative executions.");

public static final ConfigOption<Double> SPECULATIVE_EXECUTION_MULTIPLIER =
        key("flink.batch.speculative.multiplier")
                .doubleType()
                .defaultValue(1.5)
                .withDescription("When the running time of a unfinished executionVertex is several times of" +
                        " the median of all completed executionVertexs, it will be speculated.");

public static final ConfigOption<Double> SPECULATIVE_EXECUTION_QUANTILE =
        key("flink.batch.speculative.quantile")
                .doubleType()
                .defaultValue(0.75)
                .withDescription("When the percentage of ExecutionVertex in an executionJobVertex are finished," +
                        " the speculative execution mechanism will be started. 0.9 means that 90% of the" +
                        " executionVertex are finished, then the speculative execution mechanism will be started.");

public static final ConfigOption<Long> SPECULATIVE_EXECUTION_LOG_INTERVAL_IN_MILLIS =
        key("flink.batch.speculative.log.interval.in.millis")
                .longType()
                .defaultValue(1000 * 60 * 5L)
                .withDescription("Interval in millis for speculative related log.");

public static final ConfigOption<Boolean> FLINK_BLACKLIST_ENABLE =
        key("flink.blacklist.enable")
                .booleanType()
                .defaultValue(false)
                .withDescription("Whether to enable blacklist job.");

public static final ConfigOption<Long> FLINK_BLACKLIST_TIMEOUT_IN_MILLIS =
        key("flink.blacklist.timeout.in.millis")
                .longType()
                .defaultValue(60 * 1000L)
                .withDescription("Indicates how long a black list record will be removed after added to the blacklist.");


Compatibility, Deprecation, and Migration Plan

This FLIP is a new feature, so there is no compatibility issue with previous versions.

Test Plan

Covered by unit tests.

Rejected Alternatives

None so far.