...

Just like normal tasks, the speculative task is executed in parallel with the original one and shares the same failover and restart strategy. The original long tail task and the speculative task can still retry on failure on their own tracks, but I think the job should not be restarted globally when the failover count of a speculative execution reaches max-retry-counts. When a task fails, we can calculate its index (executionIndex) in the executionList by its executionAttemptID. Then the scheduler applies the corresponding processing to that execution according to the executionIndex, as shown below.
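
Independent of the flow diagram, here is a rough sketch of the executionIndex lookup itself (illustrative only; executionList and executionIndex follow the naming above, and the branch comments summarize the per-track retry behavior described in this section):

Code Block
languagejava
titleSketch: locate the failed execution by executionAttemptID
// Illustrative sketch: assumes the ExecutionVertex keeps its current attempts in executionList,
// with index 0 being the original execution and index >= 1 being speculative executions.
int executionIndex = -1;
for (int i = 0; i < executionList.size(); i++) {
    if (executionList.get(i).getAttemptId().equals(executionAttemptID)) {
        executionIndex = i;
        break;
    }
}

if (executionIndex == 0) {
    // The original execution failed: retry it on its own track.
} else if (executionIndex > 0) {
    // A speculative execution failed: retry it on its own track as well,
    // without triggering a global restart of the job.
}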

...

Most long tail tasks are caused by machine problems, so the speculative execution must run on a different machine from the original execution.

I will introduce a blacklist module into Flink, used for filtering nodes when the speculative executions request slots.

...

Blacklist is a kind of scheduling constraint. According to the description of FLINK-11000, this is a bigger feature. There are several levels of blacklist, including (JobVertex, TaskManager) blacklist, (JobVertex, Host) blacklist, (Job, TaskManager) blacklist, (Job, Host) blacklist, (Session, TaskManager) blacklist and (Session, Host) blacklist. I must implement the (Job, Host) blacklist for the speculative execution feature. In order to make implementing FLINK-11000 easier in the future, my interfaces also suit the other blacklist levels described above.

The blacklist module is a thread that maintains the blacklisted machines of this job and removes expired elements periodically. Each element in the blacklist contains an IP and a timestamp. The timestamp is used to decide whether an element of the blacklist has expired or not. Since the resource request logic exists on both the JobMaster and the ResourceManager side, both the JobMaster and the ResourceManager need the blacklist information for filtering blacklisted nodes. But because only the (Job, Host) level blacklist is implemented in this feature, I only consider maintaining the blacklist on the JobMaster side.

On the JobMaster side, the blacklist is encapsulated as an independent system component, which updates the blacklist by receiving information about speculative executions that happened, and filters resource nodes through the blacklist when requesting resources.

On the ResourceManager side, the blacklist is not maintained for now. When the JobMaster requests resources from the ResourceManager, it passes the blacklist information to the ResourceManager. The ResourceManager needs to consider the blacklist when requesting or filtering resources, and the blacklist information will be encapsulated in the form of resource filters required by the external resource management system (such as YARN) when requesting new containers.
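
As a rough illustration of the JobMaster-side filtering (the surrounding variables are hypothetical and do not refer to existing Flink APIs; only getAllBlackListIp() comes from the tracker interface described below):

Code Block
languagejava
titleSketch: filter blacklisted hosts before requesting resources
// Hypothetical filtering step before a resource request goes out from the JobMaster.
Set<String> blackListedIps = jobBlackListTracker.getAllBlackListIp();
List<TaskManagerLocation> usableTaskManagers = registeredTaskManagers.stream()
        .filter(location -> !blackListedIps.contains(location.address().getHostAddress()))
        .collect(Collectors.toList());
// The same set of blacklisted IPs is attached to the request sent to the ResourceManager,
// so that newly allocated containers can also avoid these hosts.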

...

Code Block
languagejava
titleThe item of black list
/**
 * Black list record which stores the black host ip and
 * the time stamp when this record is added.
 */
public class BlackListRecord implements Serializable {

    /** The black host ip. */
    private final String ip;

    /** The time stamp when this black list record is added. */
    private final long timeStamp;
}


(4)An abstract class called BlackListTracker, a thread that maintains blacklist info.

Code Block
languagejava
titlea thread that maintain blacklist info
public abstract class BlackListTracker implements Runnable {

	/** The executor to run the time out checking task. */
    private final ScheduledExecutor executor;
	
	/** The black list configuration. */
    protected final BlackListConfig blackListConfig;

	/** The black list timeout check future, will be canceled when the black list is destroyed. */
    private AtomicReference<ScheduledFuture> timeoutCheckFuture;

	public BlackListTracker(ScheduledExecutor executor, BlackListConfig blackListConfig) {
        Preconditions.checkNotNull(blackListConfig);
        this.executor = executor;
        this.blackListConfig = blackListConfig;
        this.timeoutCheckFuture = new AtomicReference<>();
    }

	/**
     * Given the minimum time stamp of the black list records, this function schedules a task to remove
     * black list records when they time out.
     * @param minTimeStamp The minimum time stamp of the black list records.
     */
    public void scheduleTimeOutCheck(long minTimeStamp) {}

	public boolean isBlackListEnabled() {
        return blackListConfig.isBlackListEnabled();
    }

	/** Clear the black list. */
    public abstract void clearBlackList();

    /** Get all black list ip. */
    public abstract Set<String> getAllBlackListIp();

	/** Clear the black list and cancel the timeout check task. */
    public void destroy() {}
}

...

(6)JobBlackListSpeculativeListener, event listener.

Code Block
languagejava
titleListener to be notified when speculative execution happened.
/** Listener to be notified when speculative execution happened. */
public interface JobBlackListSpeculativeListener {
    /**
     * When a speculative execution happened, the listener will be notified.
     * @param ip the ip
     */
    void onSpeculativeExecutionHappened(String ip);
}

...

(7)JobBlackListTracker, per-job blackList tracker.

Code Block
languagejava
titleper-job blackList tracker
public class JobBlackListTracker extends BlackListTracker implements JobBlackListSpeculativeListener {

	/** The black list of this job. */
    private final JobBlackList jobBlackList;

	public JobBlackListTracker(ScheduledExecutor executor, BlackListConfig blackListConfig) {
        super(executor, blackListConfig);
        this.jobBlackList = new JobBlackList(blackListConfig);
        scheduleTimeOutCheck(blackListConfig.getBlackListTimeOutInMillis());
    }

	@Override
    public void clearBlackList() {
        jobBlackList.clear();
    }

	@Override
    public Set<String> getAllBlackListIp() {
        if (blackListConfig.isBlackListEnabled()) {
            return jobBlackList.getAllBlackListIpSet();
        }
        return Collections.emptySet();
    }

	/** The time out checking task to be scheduled. */
    @Override
    public void run() {
        long minTimeStamp = jobBlackList.removeTimeoutBlackList(blackListConfig.getBlackListTimeOutInMillis());
        scheduleTimeOutCheck(minTimeStamp);
    }

    @Override
    public void onSpeculativeExecutionHappened(String ip) {
        if (blackListConfig.isBlackListEnabled()) {
            jobBlackList.addIpToBlackList(ip);
        }
    }
}

Init black list 

In DefaultExecutionGraph, I will add a member variable jobBlackListSpeculativeListeners, and after the ExecutionGraph is created, the jobBlackListTracker will be added to jobBlackListSpeculativeListeners. JobBlackListSpeculativeListener.onSpeculativeExecutionHappened() will be called when the SpeculativeExecution detects a long tail task and starts to notify the scheduler to schedule a speculative execution.
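
A minimal sketch of this wiring (the field and registration method are illustrative, not existing DefaultExecutionGraph members):

Code Block
languagejava
titleSketch: register the tracker as a speculative-execution listener
// Inside DefaultExecutionGraph (illustrative field and method names).
private final List<JobBlackListSpeculativeListener> jobBlackListSpeculativeListeners = new ArrayList<>();

public void registerJobBlackListSpeculativeListener(JobBlackListSpeculativeListener listener) {
    jobBlackListSpeculativeListeners.add(listener);
}

// After the ExecutionGraph has been created:
// executionGraph.registerJobBlackListSpeculativeListener(jobBlackListTracker);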

...

Add element to black list

First, JobBlackListSpeculativeListener.onSpeculativeExecutionHappened() will be called when the SpeculativeExecution detects a long tail task and starts to notify the scheduler to schedule a speculative execution.

Second, the ExecutionGraph will notify the listener (JobBlackListTracker) about it.

Third, the IP of the original execution's location will be added to the JobBlackList in the JobBlackListTracker (a condensed sketch of this path follows below).
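
A condensed sketch of this notification path (the fan-out method is illustrative; onSpeculativeExecutionHappened() and addIpToBlackList() follow the interfaces above):

Code Block
languagejava
titleSketch: notification path into the blacklist
// Illustrative: how the ExecutionGraph could fan out the event to its listeners.
private void notifySpeculativeExecutionHappened(String originalExecutionIp) {
    for (JobBlackListSpeculativeListener listener : jobBlackListSpeculativeListeners) {
        listener.onSpeculativeExecutionHappened(originalExecutionIp);
    }
}

// JobBlackListTracker (see above) then adds the IP of the original execution's location:
// jobBlackList.addIpToBlackList(originalExecutionIp);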

Remove element in black list

The BlackListTracker implements Runnable; it is responsible for maintaining the blacklisted machines of this job and removing expired elements periodically. Each element in the blacklist contains an IP and a timestamp. The timestamp is used to decide whether an element in the blacklist has expired or not.
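
A sketch of the periodic cleanup, assuming JobBlackList internally keeps an ip-to-timestamp map (the field name is illustrative; removeTimeoutBlackList() is the method called from JobBlackListTracker.run() above):

Code Block
languagejava
titleSketch: removing expired blacklist entries
// Illustrative implementation of JobBlackList#removeTimeoutBlackList.
public long removeTimeoutBlackList(long timeoutInMillis) {
    long now = System.currentTimeMillis();
    long minTimeStamp = now;
    Iterator<Map.Entry<String, Long>> iterator = ipToAddedTimeStamp.entrySet().iterator();
    while (iterator.hasNext()) {
        Map.Entry<String, Long> entry = iterator.next();
        if (now - entry.getValue() >= timeoutInMillis) {
            // Expired: remove the host from the blacklist.
            iterator.remove();
        } else {
            // Still valid: remember the oldest remaining timestamp for the next timeout check.
            minTimeStamp = Math.min(minTimeStamp, entry.getValue());
        }
    }
    return minTimeStamp;
}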


Pass the blacklist information to cluster ResourceManager

Yarn

First, the node attributes should include a machine IP attribute, which enables us to control that containers are not allocated on certain machines via YARN PlacementConstraints. Flink currently uses hadoop-2.x and requests containers through the ResourceRequest API, which does not support PlacementConstraints. So, in order to use PlacementConstraints, I will introduce the hadoop-3.x SchedulingRequest API via the Java reflection mechanism.

When the executions are scheduled, I will add the blacklist information to the YARN PlacementConstraint. In this way, I can ensure that the YARN container is not on a machine in the blacklist.
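
A rough sketch with the hadoop-3.x API (the "ip" node attribute name and the surrounding wiring are assumptions; PlacementConstraints and SchedulingRequest are the hadoop-3.x classes mentioned above, which would be invoked via reflection in the actual implementation):

Code Block
languagejava
titleSketch: anti-affinity constraint against blacklisted hosts
// Illustrative: build an anti-affinity constraint against blacklisted hosts,
// assuming the nodes carry an "ip" node attribute.
PlacementConstraint constraint = PlacementConstraints.build(
        PlacementConstraints.targetNotIn(
                PlacementConstraints.NODE,
                PlacementConstraints.PlacementTargets.nodeAttribute(
                        "ip", blackListedIps.toArray(new String[0]))));

SchedulingRequest schedulingRequest = SchedulingRequest.newBuilder()
        .placementConstraintExpression(constraint)
        .build();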

...

We could achieve the same goal as the YARN PlacementConstraint in the K8s integration by node-affinity, if the nodes have an attached IP label.

Like YARN, when the executions are scheduled, the blacklist information will be added to the K8s PodSpec.
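
A rough sketch using the fabric8 Kubernetes client builders (the "ip" label key is an assumption, and the exact builder chain may differ from what the final implementation uses):

Code Block
languagejava
titleSketch: node anti-affinity against blacklisted hosts
// Illustrative: node-affinity that keeps TaskManager pods off blacklisted hosts,
// assuming every node is labeled with its IP under the key "ip".
Affinity affinity = new AffinityBuilder()
        .withNewNodeAffinity()
            .withNewRequiredDuringSchedulingIgnoredDuringExecution()
                .addNewNodeSelectorTerm()
                    .addNewMatchExpression()
                        .withKey("ip")
                        .withOperator("NotIn")
                        .withValues(blackListedIps.toArray(new String[0]))
                    .endMatchExpression()
                .endNodeSelectorTerm()
            .endRequiredDuringSchedulingIgnoredDuringExecution()
        .endNodeAffinity()
        .build();
// The resulting affinity is then set on the TaskManager PodSpec when pods are requested.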

Mesos

According to FLINK-22352, the community has decided to deprecate Mesos support in Apache Flink.

So I don’t need to think about how to pass the blacklist information to Mesos.

...

After reading FLINK-10205 (pr-6684) and the code (master branch), I found that Flink currently can't ensure that different attempts of the same ExecutionVertex get the same InputSplits. When a task fails over, Flink simply returns its InputSplits to the assigner and lets the next idling task take them. This is no problem today, because it should not matter which task processes which InputSplit: if a failure occurs and some other task takes over the failed task's InputSplits, it is as if that task had processed these InputSplits from the very beginning.

But because of introducing speculative execution, we must ensure that the InputSplits processed by a speculative execution are the same as those of the original execution. So a Map will be added in ExecutionVertex whose key is the ExecutionAttemptID and whose value is the index of the InputSplit that this execution currently consumes.
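
A minimal sketch of this bookkeeping (inputSplitIndexMap is the name used in the example below; the comments describe the intended usage):

Code Block
languagejava
titleSketch: per-attempt InputSplit index bookkeeping in ExecutionVertex
// Inside ExecutionVertex (illustrative sketch).
/** Key: an attempt of this vertex; value: the index of the InputSplit it currently consumes. */
private final Map<ExecutionAttemptID, Integer> inputSplitIndexMap = new HashMap<>();

// When an attempt takes the next InputSplit, its index is recorded, e.g.:
// inputSplitIndexMap.put(execution.getAttemptId(), splitIndex);
// A speculative attempt looks up the recorded indices so that it consumes the same
// InputSplits as the original execution, instead of pulling new splits from the assigner.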

...

For example, as shown below.

(a) In an ExecutionVertex, after execution_1 has consumed inputSplit_0, it goes on to consume inputSplit_1.

...

      Now inputSplitIndexMap data is { (execution_1, 1), (execution_2, 0) }.

(c) A failure occurs with execution_1, then resetForNewExecutionInternal() in ExecutionVertex will be called.

...

(d) After execution_2 has finished consuming inputSplit_0, it goes on to consume inputSplit_1. And after execution_1's failover, execution_1_new occurs.

...

As shown below, for a batch job with blocking shuffle: because of introducing speculative execution, all reduce executions in an ExecutionVertex will consume the resultPartition of the map ExecutionVertex's fastest finished execution.

...