...

In one ExecutionJobVertex, when a configurable percentage (default 75%) of executions have finished, the speculative execution thread begins its real work.

...

In the speculative execution thread, for each ExecutionJobVertex, each execution's running time is calculated as the interval between the current time and its first create/deploying time (before any failover). When the running time of an execution is greater than a configurable multiple (default 1.5) of the median running time of the finished executions, this execution is defined as a long-tail execution.
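The two thresholds described above (the minimum finished percentage and the running-time multiple) can be sketched as follows. This is an illustrative sketch, not the actual implementation: the class and method names are hypothetical, and the constants mirror the defaults mentioned above (75% and 1.5).

```java
import java.util.Arrays;

/** Illustrative sketch of the long-tail detection described above; names are hypothetical. */
public class LongTailDetector {

    /** Speculation starts only after this fraction of executions has finished (default 75%). */
    static final double MIN_FINISHED_RATIO = 0.75;

    /** An execution is long-tail when its running time exceeds this multiple of the median (default 1.5). */
    static final double THRESHOLD_MULTIPLE = 1.5;

    /** Whether the speculative execution thread should begin working for an ExecutionJobVertex. */
    public static boolean shouldStartSpeculation(int finishedExecutions, int totalExecutions) {
        return totalExecutions > 0
                && (double) finishedExecutions / totalExecutions >= MIN_FINISHED_RATIO;
    }

    /** Whether a running execution counts as long-tail, given the running times of finished ones. */
    public static boolean isLongTail(long runningMillis, long[] finishedRunningMillis) {
        if (finishedRunningMillis.length == 0) {
            return false;
        }
        long[] sorted = finishedRunningMillis.clone();
        Arrays.sort(sorted);
        int n = sorted.length;
        double median = (n % 2 == 1)
                ? sorted[n / 2]
                : (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
        return runningMillis > THRESHOLD_MULTIPLE * median;
    }
}
```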



Slow Processing Throughput

In our production clusters at Alibaba in China, the running-time-based algorithm above solves the long-tail task problem sufficiently, so the first version will not take throughput into consideration. If users in the Flink community need throughput-based detection after this feature is released, I will develop it in a next version.

Scheduling of Speculative Executions

...

I will introduce a blacklist module into Flink, used to filter out nodes when the speculative executions request slots.

...

Basic plan

Blacklist is a kind of scheduling constraint. According to the FLINK-11000 description, this is a bigger feature.

...

Code Block
languagejava
titleA thread that maintains blacklist info
public abstract class BlackListTracker implements Runnable {

    /** The executor to run the timeout checking task. */
    private final ScheduledExecutor executor;

    /** The black list configuration. */
    protected final BlackListConfig blackListConfig;

    /** The black list timeout check future, will be canceled when the black list is destroyed. */
    private final AtomicReference<ScheduledFuture<?>> timeoutCheckFuture;

    public BlackListTracker(ScheduledExecutor executor, BlackListConfig blackListConfig) {
        Preconditions.checkNotNull(blackListConfig);
        this.executor = executor;
        this.blackListConfig = blackListConfig;
        this.timeoutCheckFuture = new AtomicReference<>();
    }

    /**
     * Given the minimum timestamp of a black list record, schedules a task to remove the
     * record when it times out.
     * @param minTimeStamp The minimum timestamp of a black list record.
     */
    public void scheduleTimeOutCheck(long minTimeStamp) {}

    public boolean isBlackListEnabled() {
        return blackListConfig.isBlackListEnabled();
    }

    /** Clear the black list. */
    public abstract void clearBlackList();

    /** Get all black list IPs. */
    public abstract Set<String> getAllBlackListIp();

    /** Clear the black list and cancel the timeout check task. */
    public void destroy() {}
}
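To make the timeout semantics of scheduleTimeOutCheck concrete, here is a simplified, standalone sketch of blacklist records expiring after a timeout. The class and method names are illustrative only, and the synchronous removal stands in for the real tracker, which would schedule the removal task on the ScheduledExecutor instead.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Simplified, standalone sketch of timeout-based blacklist expiry; names are illustrative. */
public class SimpleBlackList {

    /** Blacklisted IP -> timestamp (millis) at which the record was added. */
    private final Map<String, Long> ipToAddedTime = new HashMap<>();

    /** How long a record stays in the blacklist before it times out. */
    private final long timeoutMillis;

    public SimpleBlackList(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    public void add(String ip, long nowMillis) {
        ipToAddedTime.put(ip, nowMillis);
    }

    /** Remove every record whose age exceeds the timeout; returns the surviving IPs. */
    public Set<String> removeTimedOut(long nowMillis) {
        ipToAddedTime.values().removeIf(addedTime -> nowMillis - addedTime > timeoutMillis);
        return new HashSet<>(ipToAddedTime.keySet());
    }
}
```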


(6)JobBlackListSpeculativeListener, event listener

Code Block
languagejava
titleListener to be notified when speculative execution happened.
/** Listener to be notified when speculative execution happened. */
public interface JobBlackListSpeculativeListener {
    /**
     * Called when a speculative execution happens.
     * @param ip the IP of the node on which the speculative execution happened
     */
    void onSpeculativeExecutionHappened(String ip);
}


(7)JobBlackListTracker, per-job blacklist tracker

...

First, nodes' attributes should include a machine IP attribute; then we can keep containers off certain machines via YARN PlacementConstraints. Currently Flink uses hadoop-2.x and requests containers through the ResourceRequest API, which does not support PlacementConstraints. So, in order to use PlacementConstraints, I will introduce the hadoop-3.x SchedulingRequest API via the Java reflection mechanism.
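The reflection approach can be sketched as follows. The fully qualified class name is the real hadoop-3.x one, but the probe class and method names are illustrative: the idea is to look up SchedulingRequest at runtime and fall back to the ResourceRequest path when it is absent, so the same Flink binary still runs against hadoop-2.x.

```java
/** Illustrative sketch: detect the hadoop-3.x SchedulingRequest API via reflection. */
public class SchedulingRequestProbe {

    /** Fully qualified name of the hadoop-3.x class that supports PlacementConstraints. */
    static final String SCHEDULING_REQUEST_CLASS =
            "org.apache.hadoop.yarn.api.records.SchedulingRequest";

    /** Returns true when the class is on the classpath, i.e. hadoop-3.x is available. */
    public static boolean isSchedulingRequestAvailable() {
        try {
            Class.forName(SCHEDULING_REQUEST_CLASS);
            return true;
        } catch (ClassNotFoundException e) {
            // hadoop-2.x (or no hadoop at all): fall back to the ResourceRequest path.
            return false;
        }
    }
}
```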

...

Code Block
languagejava
titleJobVertex
public class JobVertex implements java.io.Serializable {

    // ExecutionAttemptIDs of the fastest finished executions in all ExecutionVertices with speculation in this JobVertex.
    private Set<ExecutionAttemptID> fastAttemptIdsWithSpeculative = new HashSet<>();

    // ExecutionAttemptIDs of the non-fastest finished executions in all ExecutionVertices with speculation in this JobVertex.
    private Set<ExecutionAttemptID> slowAttemptIdsWithSpeculative = new HashSet<>();

    // ExecutionAttemptIDs of the successfully finished executions in all ExecutionVertices without speculation in this JobVertex.
    private Set<ExecutionAttemptID> finishedAttemptIdsWithoutSpeculative = new HashSet<>();

    // All ExecutionAttemptIDs in this JobVertex.
    private Set<ExecutionAttemptID> allAttemptIds = new HashSet<>();
}

Metrics

For each ExecutionJobVertex, we can use six metrics to measure and evaluate the efficiency of speculative execution; they can be summarized in the job status and web page.

SpeculationRatio is defined as the proportion of speculative tasks among the total tasks of a job, and SuccessfulSpeculationRatio is the percentage of launched speculative tasks that finish before the original long-tail ones.
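The two ratios can be computed as in the following sketch, which mirrors the definitions above; the class and method names are hypothetical, not part of the proposed API.

```java
/** Illustrative sketch of the two ratios defined above; names are hypothetical. */
public class SpeculationMetrics {

    /** Proportion of speculative tasks among all tasks of the job. */
    public static double speculationRatio(int speculativeTasks, int totalTasks) {
        return totalTasks == 0 ? 0.0 : (double) speculativeTasks / totalTasks;
    }

    /** Among launched speculative tasks, the fraction that finished before the original long-tail ones. */
    public static double successfulSpeculationRatio(int fastestSpeculative, int launchedSpeculative) {
        return launchedSpeculative == 0 ? 0.0 : (double) fastestSpeculative / launchedSpeculative;
    }
}
```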

...

  • minFinishedForSpeculationThresholdMetrics is defined as how many ExecutionVertices must be finished before speculative executions are scheduled.

  • finishedExecutionCountMetrics is defined as the number of finished ExecutionVertices.
  • speculativeExecutionCountMetrics is defined as how many speculative executions have been scheduled by the scheduler.
  • speculativeThresholdOfTime is defined as the threshold of speculative executions in the time dimension.
  • executionIndex2RunningTimespan is defined as the running time of the original execution in each ExecutionVertex.
  • speculativeExecutionFastestFinishedCountMetrics is defined as how many ExecutionVertices' speculative executions reach the FINISHED state faster than the original execution.

...

If the fastest-finishing execution is the speculative one, the corresponding metrics are reported.

...




Web UI

After this feature is implemented, when a speculative execution runs faster than the original execution, the Flink UI will show that the original task has been cancelled. But the result of the job is correct, which is in full compliance with our expectations.

...

todo...

Limitations

(1)The JobType must be Batch.

(2)The cluster ResourceManager must be YARN or Kubernetes.

(3)If the cluster ResourceManager is YARN, the NodeManagers' attributes should include a machine IP attribute. If the cluster ResourceManager is Kubernetes, the nodes should have an IP label attached.

(4)If users do not allow duplicate data to be sunk to non key-value databases, speculative execution must not be enabled.

(5)Only a JobVertex whose input edges and output edges are all blocking can enable the speculative execution feature.

Configuration

All configurations related to this feature are added in JobManagerOptions.class.

...