...

There are two ways to refactor the code (I suggest the second option):

  1. Add a member-variable named speculativeExecution that is similar to currentExecution. However, in this way, many if judgments are needed in the scheduler and failover code. Moreover, it reduces the code flexibility: only one running speculative execution can exist, which cannot meet the demand if more than two executions need to run simultaneously.
  2. Change the currentExecution in ExecutionVertex to an ArrayList named executionList, which means that there can be multiple executions in an ExecutionVertex simultaneously. All executions in the executionList behave the same way (e.g., in failover, slot requests, etc.).
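To illustrate option 2, here is a minimal, self-contained sketch of a vertex that keeps a list of executions instead of a single currentExecution. SketchExecution and SketchExecutionVertex are simplified stand-ins written for illustration (they are not Flink's real Execution and ExecutionVertex), and the methods addSpeculativeExecution and getRunningExecutions are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical, simplified stand-in for an execution attempt. */
final class SketchExecution {
    enum State { RUNNING, FINISHED, CANCELED, FAILED }

    private final int attemptNumber;
    private State state = State.RUNNING;

    SketchExecution(int attemptNumber) {
        this.attemptNumber = attemptNumber;
    }

    int getAttemptNumber() { return attemptNumber; }

    State getState() { return state; }

    void setState(State state) { this.state = state; }
}

/** Hypothetical vertex that keeps every attempt in executionList (option 2). */
final class SketchExecutionVertex {
    // Replaces the single currentExecution field; index 0 is assumed to be the original attempt.
    private final List<SketchExecution> executionList = new ArrayList<>();

    SketchExecutionVertex() {
        executionList.add(new SketchExecution(0));
    }

    /** Adds one more attempt, which is handled exactly like the original one. */
    SketchExecution addSpeculativeExecution() {
        SketchExecution speculative = new SketchExecution(executionList.size());
        executionList.add(speculative);
        return speculative;
    }

    /** One code path serves every attempt, so no special-casing of speculative attempts is needed. */
    List<SketchExecution> getRunningExecutions() {
        List<SketchExecution> running = new ArrayList<>();
        for (SketchExecution execution : executionList) {
            if (execution.getState() == SketchExecution.State.RUNNING) {
                running.add(execution);
            }
        }
        return running;
    }

    List<SketchExecution> getExecutionList() {
        return Collections.unmodifiableList(executionList);
    }
}
```

Because every attempt lives in the same list, failover and slot-request logic can simply iterate over executionList, and supporting more than two simultaneous executions requires no extra fields.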

...

Code Block
languagejava
titleThe item of black list
import java.io.Serializable;

/**
 * Black list record which stores the black host ip and
 * the time stamp when this record is added.
 */
public class BlackListRecord implements Serializable {

    /** The black host ip. */
    private final String ip;

    /** The time stamp when this black list record is added. */
    private final long timeStamp;

    public BlackListRecord(String ip, long timeStamp) {
        this.ip = ip;
        this.timeStamp = timeStamp;
    }

    public String getIp() {
        return ip;
    }

    public long getTimeStamp() {
        return timeStamp;
    }
}


(4) An abstract class called BlackListTracker, a Runnable that maintains blacklist info.

Code Block
languagejava
titleA Runnable that maintains blacklist info
public abstract class BlackListTracker implements Runnable {

    /** The executor to run the timeout checking task. */
    private final ScheduledExecutor executor;

    /** The black list configuration. */
    protected final BlackListConfig blackListConfig;

    /** The black list timeout check future, which is canceled when the black list is destroyed. */
    private final AtomicReference<ScheduledFuture<?>> timeoutCheckFuture;

    public BlackListTracker(ScheduledExecutor executor, BlackListConfig blackListConfig) {
        this.executor = Preconditions.checkNotNull(executor);
        this.blackListConfig = Preconditions.checkNotNull(blackListConfig);
        this.timeoutCheckFuture = new AtomicReference<>();
    }

    /**
     * Schedules a task that removes black list records once they time out.
     *
     * @param minTimeStamp The minimum time stamp among the black list records.
     */
    public void scheduleTimeOutCheck(long minTimeStamp) {}

    public boolean isBlackListEnabled() {
        return blackListConfig.isBlackListEnabled();
    }

    /** Clear the black list. */
    public abstract void clearBlackList();

    /** Get all black list IPs. */
    public abstract Set<String> getAllBlackListIp();

    /** Clear the black list and cancel the timeout check task. */
    public void destroy() {}
}


(6) JobBlackListSpeculativeListener, an event listener.

Code Block
languagejava
titleListener to be notified when a speculative execution happens.
/** Listener to be notified when a speculative execution happens. */
public interface JobBlackListSpeculativeListener {
    /**
     * Notifies the listener that a speculative execution has been triggered.
     *
     * @param ip the IP address of the host to be added to the black list
     */
    void onSpeculativeExecutionHappened(String ip);
}


(7) JobBlackListTracker, a per-job black list tracker.

Code Block
languagejava
titleper-job blackList tracker
public class JobBlackListTracker extends BlackListTracker implements JobBlackListSpeculativeListener {

    /** The black list of this job. */
    private final JobBlackList jobBlackList;

    public JobBlackListTracker(ScheduledExecutor executor, BlackListConfig blackListConfig) {
        super(executor, blackListConfig);
        this.jobBlackList = new JobBlackList(blackListConfig);
        scheduleTimeOutCheck(blackListConfig.getBlackListTimeOutInMillis());
    }

    @Override
    public void clearBlackList() {
        jobBlackList.clear();
    }

    @Override
    public Set<String> getAllBlackListIp() {
        if (blackListConfig.isBlackListEnabled()) {
            return jobBlackList.getAllBlackListIpSet();
        }
        return Collections.emptySet();
    }

    /** The timeout checking task to be scheduled. */
    @Override
    public void run() {
        long minTimeStamp = jobBlackList.removeTimeoutBlackList(blackListConfig.getBlackListTimeOutInMillis());
        scheduleTimeOutCheck(minTimeStamp);
    }

    @Override
    public void onSpeculativeExecutionHappened(String ip) {
        if (blackListConfig.isBlackListEnabled()) {
            jobBlackList.addIpToBlackList(ip);
        }
    }
}
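JobBlackList itself is not defined in this section. The following is a minimal sketch, assuming it maps each black host IP to the time stamp when it was added, and that removeTimeoutBlackList returns the smallest remaining time stamp so the tracker knows when the next check is due. The method names follow the calls made by JobBlackListTracker; the class name JobBlackListSketch, the overloads with an explicit now parameter (added to make the logic testable), and the Long.MAX_VALUE sentinel for an empty list are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the per-job black list used by JobBlackListTracker. */
final class JobBlackListSketch {
    // Black host IP -> time stamp (millis) when it was added or last refreshed.
    private final Map<String, Long> ipToTimeStamp = new HashMap<>();

    synchronized void addIpToBlackList(String ip) {
        addIpToBlackList(ip, System.currentTimeMillis());
    }

    /** Adds the IP, or refreshes its time stamp if it is already black listed. */
    synchronized void addIpToBlackList(String ip, long now) {
        ipToTimeStamp.put(ip, now);
    }

    synchronized long removeTimeoutBlackList(long timeoutMillis) {
        return removeTimeoutBlackList(timeoutMillis, System.currentTimeMillis());
    }

    /**
     * Removes records that are at least timeoutMillis old and returns the minimum
     * time stamp of the remaining records, or Long.MAX_VALUE if none remain (assumed sentinel).
     */
    synchronized long removeTimeoutBlackList(long timeoutMillis, long now) {
        long minTimeStamp = Long.MAX_VALUE;
        Iterator<Map.Entry<String, Long>> it = ipToTimeStamp.entrySet().iterator();
        while (it.hasNext()) {
            long timeStamp = it.next().getValue();
            if (now - timeStamp >= timeoutMillis) {
                it.remove();
            } else {
                minTimeStamp = Math.min(minTimeStamp, timeStamp);
            }
        }
        return minTimeStamp;
    }

    /** Returns a snapshot copy so callers cannot modify the black list. */
    synchronized Set<String> getAllBlackListIpSet() {
        return new HashSet<>(ipToTimeStamp.keySet());
    }

    synchronized void clear() {
        ipToTimeStamp.clear();
    }
}
```

Under this assumption, the tracker could schedule the next check after roughly minTimeStamp + timeout - now milliseconds, which is when the oldest remaining record expires.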

Init black list 

In DefaultExecutionGraph, I will add a member-variable named jobBlackListSpeculativeListeners. After the ExecutionGraph is created, the jobBlackListTracker will be added to jobBlackListSpeculativeListeners. JobBlackListSpeculativeListener.onSpeculativeExecutionHappened() will be called when the SpeculativeExecution detects a long tail task and starts to notify the scheduler to schedule a speculative execution.

...

Code Block
languagejava
titleExecutionGraph
public interface ExecutionGraph extends AccessExecutionGraph {
    void registerJobBlackListSpeculativeListener(JobBlackListSpeculativeListener listener);

    List<JobBlackListSpeculativeListener> getJobBlackListSpeculativeListeners();
}


Code Block
languagejava
titleDefaultExecutionGraph
public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionGraphAccessor {

    private final List<JobBlackListSpeculativeListener> jobBlackListSpeculativeListeners;

    public DefaultExecutionGraph() {
        // Other constructor parameters and fields are omitted.
        this.jobBlackListSpeculativeListeners = new ArrayList<>();
    }

    @Override
    public void registerJobBlackListSpeculativeListener(JobBlackListSpeculativeListener listener) {
        if (listener != null) {
            jobBlackListSpeculativeListeners.add(listener);
        }
    }

    @Override
    public List<JobBlackListSpeculativeListener> getJobBlackListSpeculativeListeners() {
        return Collections.unmodifiableList(jobBlackListSpeculativeListeners);
    }
}

Add element to black list

First, JobBlackListSpeculativeListener.onSpeculativeExecutionHappened() will be called when the SpeculativeExecution detects a long tail task and starts to notify the scheduler to schedule a speculative execution.
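A minimal sketch of this notification step follows. It assumes the speculative scheduler knows the host IP of the long tail execution, passes it to every registered listener, and only then schedules the speculative execution so that the new slot request can already avoid the black host; that ordering is an assumption. SpeculativeListener mirrors JobBlackListSpeculativeListener, while SpeculationNotifier, onLongTailTaskDetected and the Runnable used to schedule the speculative execution are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Same shape as JobBlackListSpeculativeListener above. */
interface SpeculativeListener {
    void onSpeculativeExecutionHappened(String ip);
}

/** Hypothetical helper that the speculative scheduler thread could use. */
final class SpeculationNotifier {
    private final List<SpeculativeListener> listeners = new ArrayList<>();

    /** Mirrors registerJobBlackListSpeculativeListener: null listeners are ignored. */
    void register(SpeculativeListener listener) {
        if (listener != null) {
            listeners.add(listener);
        }
    }

    /**
     * Called when a long tail task is detected: first notify every listener
     * (e.g. the JobBlackListTracker adds the IP to the black list), then run
     * the callback that schedules the speculative execution.
     */
    void onLongTailTaskDetected(String slowHostIp, Runnable scheduleSpeculativeExecution) {
        for (SpeculativeListener listener : listeners) {
            listener.onSpeculativeExecutionHappened(slowHostIp);
        }
        scheduleSpeculativeExecution.run();
    }
}
```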

...

Node's attributes should include a machine IP attribute, which enables us to prevent containers from being allocated on certain machines through Yarn PlacementConstraints. Currently, Flink uses Hadoop-2.x and requests containers through the ResourceRequest API, which doesn't support PlacementConstraints. So, in order to use PlacementConstraints, I introduce the Hadoop-3.x SchedulingRequest API through the Java reflection mechanism.
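The reflection-based approach can start with a runtime capability check, sketched below under stated assumptions: if the Hadoop-3.x class org.apache.hadoop.yarn.api.records.SchedulingRequest can be loaded, the SchedulingRequest path (with PlacementConstraints) is chosen; otherwise the code falls back to the Hadoop-2.x ResourceRequest API. Only the class lookup is shown; building the SchedulingRequest and the placement constraint reflectively is left out, and YarnApiSelector, RequestApi and selectApi are hypothetical names.

```java
/** Hypothetical helper that decides which YARN request API to use at runtime. */
final class YarnApiSelector {
    // Fully qualified name of the Hadoop-3.x scheduling request record.
    static final String SCHEDULING_REQUEST_CLASS =
            "org.apache.hadoop.yarn.api.records.SchedulingRequest";

    enum RequestApi { SCHEDULING_REQUEST, RESOURCE_REQUEST }

    private YarnApiSelector() {}

    /** Returns true if the class can be loaded (without initializing it) from the given class loader. */
    static boolean isClassAvailable(String className, ClassLoader classLoader) {
        try {
            Class.forName(className, false, classLoader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Prefers SchedulingRequest (PlacementConstraints) when running against Hadoop-3.x. */
    static RequestApi selectApi(ClassLoader classLoader) {
        return isClassAvailable(SCHEDULING_REQUEST_CLASS, classLoader)
                ? RequestApi.SCHEDULING_REQUEST
                : RequestApi.RESOURCE_REQUEST;
    }
}
```

Checking once at startup keeps Flink compilable against Hadoop-2.x while still using the Hadoop-3.x API when it is present on the classpath.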

...

So I don’t need to think about how to pass the blacklist information to Mesos.

Manage input and output of ExecutionVertex

...

As shown below, consider a batch job with blocking shuffle (similar to MapReduce). Because of speculative execution, all reduce executions in an ExecutionVertex will consume the resultPartition of the fastest finished execution of the map ExecutionVertex.

  • Once all map ExecutionVertexs finish, all executions in the downstream reduce ExecutionVertex should be notified to update their inputChannels from UNKNOWN to LOCAL/REMOTE. So there are some modifications in Execution.updatePartitionConsumers().
  • If a reduce task can't read data from a blocking result partition (i.e., its input resultPartition is not available), the producer ExecutionVertex and all consumer ExecutionVertexs will restart. For the upstream map producer ExecutionVertex, I think the SpeculativeScheduler thread should still work. For all consumer ExecutionVertexs, I think we should kill all speculative executions and only restart the original execution in the region.
  • Executions of an upstream map ExecutionVertex will produce multiple resultPartitions. When all upstream map ExecutionVertexs finish, the inputChannels of the downstream reduce executions will be updated to consume the fastest finished execution of the map ExecutionVertex. To this end, I add a member-variable named fastestFinishedExecution in ExecutionVertex, which is used for creating the PartitionInfo that is used for updating the reduce executions' inputChannels from UNKNOWN to LOCAL/REMOTE.
  • In order to avoid problems when multiple executions in one ExecutionVertex finish at the same time, a member-variable named vertexFinished is added in ExecutionVertex, which indicates whether this ExecutionVertex has finished, and a double-checked locking pattern is used in ExecutionVertex.executionFinished(). After the double-checked locking in ExecutionVertex.executionFinished(), finishPartitionsAndUpdateConsumers() will be called there instead of in Execution.markFinished(). Then the non-fastest finished or still running executions in this ExecutionVertex will be canceled.
Code Block
languagejava
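titleExecutionVertex
public class ExecutionVertex
        implements AccessExecutionVertex, Archiveable<ArchivedExecutionVertex> {
    /** The fastest finished execution, used to create the PartitionInfo for the consumers. */
    private Execution fastestFinishedExecution = null;

    /** Whether this ExecutionVertex has finished; checked twice in executionFinished(). */
    private volatile boolean vertexFinished = false;
}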
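The double-checked locking in ExecutionVertex.executionFinished() can be sketched as follows. This is a simplified, self-contained model rather than Flink code: the Attempt class, the Runnable that stands in for finishPartitionsAndUpdateConsumers(), setting a canceled flag in place of actually canceling an execution, and the boolean return value are all assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the finish handling in an ExecutionVertex. */
final class FinishingVertexSketch {
    /** Minimal stand-in for an Execution. */
    static final class Attempt {
        final String id;
        volatile boolean canceled;

        Attempt(String id) { this.id = id; }
    }

    private final List<Attempt> executionList = new ArrayList<>();
    private final Object lock = new Object();
    private Attempt fastestFinishedExecution = null;   // guarded by lock
    private volatile boolean vertexFinished = false;

    Attempt addAttempt(String id) {
        Attempt attempt = new Attempt(id);
        synchronized (lock) {
            executionList.add(attempt);
        }
        return attempt;
    }

    /**
     * Called by each attempt when it reaches FINISHED. Only the first caller wins:
     * it becomes the fastest finished execution, updates the consumers once,
     * and cancels all other attempts.
     *
     * @return true if this attempt became the fastest finished execution
     */
    boolean executionFinished(Attempt finished, Runnable finishPartitionsAndUpdateConsumers) {
        if (vertexFinished) {          // first check, without the lock
            return false;
        }
        List<Attempt> toCancel = new ArrayList<>();
        synchronized (lock) {
            if (vertexFinished) {      // second check, under the lock
                return false;
            }
            fastestFinishedExecution = finished;
            vertexFinished = true;
            for (Attempt attempt : executionList) {
                if (attempt != finished) {
                    toCancel.add(attempt);
                }
            }
        }
        // Called exactly once per vertex, instead of in every Execution.markFinished().
        finishPartitionsAndUpdateConsumers.run();
        for (Attempt attempt : toCancel) {
            attempt.canceled = true;   // stands in for canceling the execution
        }
        return true;
    }

    Attempt getFastestFinishedExecution() {
        synchronized (lock) {
            return fastestFinishedExecution;
        }
    }
}
```

The unsynchronized first check keeps the common case (the vertex has already finished) cheap, while the second check under the lock guarantees that only one attempt updates the consumers.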

Manage sink files

When a batch job writes records into files or key-value databases, this feature can be enabled.

  • Sink to key-value databases: no additional steps are needed for the speculative execution feature, because writing the same record twice to a key-value database simply overwrites it.
  • Sink to files: a globally unique ExecutionAttemptID suffix will be added after the normal file name. Then, some files will be deleted or renamed when finalizeOnMaster() is called.


As shown below, four HashSets will be created, and globally unique ExecutionAttemptIDs will be added to them.

  • HashSet fastAttemptIdsWithSpeculative stores the ExecutionAttemptIDs of the fastest finished executions in all speculated ExecutionVertexs in this JobVertex.

  • HashSet slowAttemptIdsWithSpeculative stores the ExecutionAttemptIDs of the non-fastest finished executions in all speculated ExecutionVertexs in this JobVertex.

  • HashSet finishedAttemptIdsWithoutSpeculative stores the ExecutionAttemptIDs of the successfully finished executions in all non-speculated ExecutionVertexs in this JobVertex.

  • HashSet allAttemptIds stores all ExecutionAttemptIDs in this JobVertex, including those of failed executions.

Once all tasks finish, different processing methods will be applied to different files according to which HashSet each file's suffix is in.

[Image]

Code Block
languagejava
titleJobVertex
public class JobVertex implements java.io.Serializable {
    // Other fields and methods are omitted.

    // ExecutionAttemptIDs of the fastest finished executions in all speculated ExecutionVertexs in this JobVertex.
    private final Set<ExecutionAttemptID> fastAttemptIdsWithSpeculative = new HashSet<>();

    // ExecutionAttemptIDs of the non-fastest finished executions in all speculated ExecutionVertexs in this JobVertex.
    private final Set<ExecutionAttemptID> slowAttemptIdsWithSpeculative = new HashSet<>();

    // ExecutionAttemptIDs of the successfully finished executions in all non-speculated ExecutionVertexs in this JobVertex.
    private final Set<ExecutionAttemptID> finishedAttemptIdsWithoutSpeculative = new HashSet<>();

    // All ExecutionAttemptIDs in this JobVertex, including those of failed executions.
    private final Set<ExecutionAttemptID> allAttemptIds = new HashSet<>();
}
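The per-file decision in finalizeOnMaster() can be sketched as a function of a file's attempt-ID suffix and the four sets. The concrete actions below are an assumption about what the figure describes, not a transcription of it: output of the fastest speculated attempts and of normally finished attempts is kept (renamed to drop the suffix), output of slower speculated attempts and of failed attempts of this JobVertex is deleted, and files whose suffix belongs to no attempt of this JobVertex are left untouched. SinkFileResolver and SinkFileAction are hypothetical names, and plain Strings stand in for ExecutionAttemptID.

```java
import java.util.Set;

/** Hypothetical decision helper used when finalizeOnMaster() walks the sink files. */
final class SinkFileResolver {
    enum SinkFileAction { RENAME_TO_FINAL_NAME, DELETE, IGNORE }

    private final Set<String> fastAttemptIdsWithSpeculative;
    private final Set<String> slowAttemptIdsWithSpeculative;
    private final Set<String> finishedAttemptIdsWithoutSpeculative;
    private final Set<String> allAttemptIds;

    SinkFileResolver(
            Set<String> fastAttemptIdsWithSpeculative,
            Set<String> slowAttemptIdsWithSpeculative,
            Set<String> finishedAttemptIdsWithoutSpeculative,
            Set<String> allAttemptIds) {
        this.fastAttemptIdsWithSpeculative = fastAttemptIdsWithSpeculative;
        this.slowAttemptIdsWithSpeculative = slowAttemptIdsWithSpeculative;
        this.finishedAttemptIdsWithoutSpeculative = finishedAttemptIdsWithoutSpeculative;
        this.allAttemptIds = allAttemptIds;
    }

    /** Decides what to do with a file whose name ends with the given attempt-ID suffix. */
    SinkFileAction resolve(String attemptIdSuffix) {
        if (fastAttemptIdsWithSpeculative.contains(attemptIdSuffix)
                || finishedAttemptIdsWithoutSpeculative.contains(attemptIdSuffix)) {
            // Exactly one such attempt per ExecutionVertex: keep its output.
            return SinkFileAction.RENAME_TO_FINAL_NAME;
        }
        if (slowAttemptIdsWithSpeculative.contains(attemptIdSuffix)
                || allAttemptIds.contains(attemptIdSuffix)) {
            // Slower duplicates and failed attempts of this JobVertex.
            return SinkFileAction.DELETE;
        }
        // Not produced by this JobVertex: leave it alone.
        return SinkFileAction.IGNORE;
    }
}
```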

Metrics

For each ExecutionJobVertex, I use six metrics to measure and evaluate the efficiency of speculative execution. These metrics can be summarized in the job status and on the web page.

  • minFinishedForSpeculationThresholdMetrics is defined as the minimum number of finished ExecutionVertexs required before speculative executions are scheduled.

  • finishedExecutionCountMetrics is defined as the number of finished ExecutionVertexs.
  • speculativeExecutionCountMetrics is defined as the number of speculative executions that have been scheduled by the scheduler.
  • speculativeThresholdOfTime is defined as the running-time threshold used to decide whether an execution should be speculated.
  • executionIndex2RunningTimespan is defined as the running time of the original execution in each ExecutionVertex.
  • speculativeExecutionFastestFinishedCountMetrics is defined as the number of ExecutionVertexs whose speculative execution reaches the FINISHED state before the original execution.

Web UI

If we don't modify the web UI code, when a speculative execution finishes before the original execution, the web UI will show that the task has been cancelled, although the result of the batch job is correct.

More discussion is needed to decide whether the web UI needs to be modified.

Limitations

(1) The job type must be batch.

(2) The cluster ResourceManager must be Yarn or K8s.

          If the cluster ResourceManager is Yarn, NodeManagers' attributes should include a machine IP attribute.

          If the cluster ResourceManager is K8s, an IP label should be attached to Nodes.

(3) If users don't allow duplicate data to be written to non-key-value databases, SpeculativeExecution can't be used.

(4) SpeculativeExecution is effective only for a JobVertex whose input and output edges are all blocking.

[Image]
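Limitations (1) and (4) can be combined into one eligibility check. The sketch below uses hypothetical types: JobKind stands in for the job type and EdgeKind for the result partition type of each edge. It assumes that a vertex without inputs (a source) or without outputs (a sink) is not excluded by limitation (4), since it has no non-blocking edge.

```java
import java.util.List;

/** Hypothetical check for whether speculation may be enabled for a JobVertex. */
final class SpeculationEligibility {
    enum JobKind { BATCH, STREAMING }

    enum EdgeKind { BLOCKING, PIPELINED }

    private SpeculationEligibility() {}

    static boolean isEligible(JobKind jobKind, List<EdgeKind> inputEdges, List<EdgeKind> outputEdges) {
        if (jobKind != JobKind.BATCH) {
            return false;  // limitation (1): batch jobs only
        }
        // Limitation (4): every input and output edge must be blocking.
        return allBlocking(inputEdges) && allBlocking(outputEdges);
    }

    /** Returns true for an empty list, i.e. a vertex without edges on that side. */
    private static boolean allBlocking(List<EdgeKind> edges) {
        for (EdgeKind edge : edges) {
            if (edge != EdgeKind.BLOCKING) {
                return false;
            }
        }
        return true;
    }
}
```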

Configuration

All configuration options related to this feature are added to JobManagerOptions.
Code Block
languagejava
titleJobManagerOptions
// Uses the static import org.apache.flink.configuration.ConfigOptions.key.
public static final ConfigOption<Boolean> SPECULATIVE_EXECUTION_ENABLED =
        key("flink.batch.speculative.enabled")
                .booleanType()
                .defaultValue(false)
                .withDescription("Whether to enable speculative execution for batch jobs.");

public static final ConfigOption<Long> SPECULATIVE_EXECUTION_INTERVAL_IN_MILLIS =
        key("flink.batch.speculative.interval.in.millis")
                .longType()
                .defaultValue(100L)
                .withDescription("How often (in milliseconds) to check for speculative executions.");

public static final ConfigOption<Double> SPECULATIVE_EXECUTION_MULTIPLIER =
        key("flink.batch.speculative.multiplier")
                .doubleType()
                .defaultValue(1.5)
                .withDescription("When the running time of an unfinished ExecutionVertex exceeds this multiple of"
                        + " the median running time of all completed ExecutionVertexs, it will be speculated.");

public static final ConfigOption<Double> SPECULATIVE_EXECUTION_QUANTILE =
        key("flink.batch.speculative.quantile")
                .doubleType()
                .defaultValue(0.75)
                .withDescription("When this fraction of the ExecutionVertexs in an ExecutionJobVertex has finished,"
                        + " the speculative execution mechanism will be started. For example, 0.9 means that"
                        + " it is started once 90% of the ExecutionVertexs have finished.");

public static final ConfigOption<Long> SPECULATIVE_EXECUTION_COUNT =
        key("flink.batch.speculative.execution.count")
                .longType()
                .defaultValue(1L)
                .withDescription("The maximum number of speculative executions that can exist"
                        + " in an ExecutionVertex simultaneously.");

public static final ConfigOption<Long> SPECULATIVE_EXECUTION_LOG_INTERVAL_IN_MILLIS =
        key("flink.batch.speculative.log.interval.in.millis")
                .longType()
                .defaultValue(1000 * 60 * 5L)
                .withDescription("Interval in milliseconds for speculation-related logs.");

public static final ConfigOption<Boolean> FLINK_BLACKLIST_ENABLE =
        key("flink.blacklist.enable")
                .booleanType()
                .defaultValue(false)
                .withDescription("Whether to enable the blacklist.");

public static final ConfigOption<Long> FLINK_BLACKLIST_TIMEOUT_IN_MILLIS =
        key("flink.blacklist.timeout.in.millis")
                .longType()
                .defaultValue(60 * 1000L)
                .withDescription("How long (in milliseconds) a record stays in the blacklist before it is removed.");
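To make the interplay of the speculative options concrete, here is a minimal sketch of one check round, assuming the semantics given in the option descriptions: nothing is speculated until the finished fraction of ExecutionVertexs reaches quantile; then an unfinished vertex becomes a candidate when its running time exceeds multiplier times the median running time of the finished vertexs, as long as it has fewer than count speculative executions. SpeculationCheck, findCandidates, the array-based inputs and the use of the mean of the two middle values for an even-sized median are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of one round of the speculative execution check. */
final class SpeculationCheck {
    private final double quantile;      // flink.batch.speculative.quantile
    private final double multiplier;    // flink.batch.speculative.multiplier
    private final long maxSpeculative;  // flink.batch.speculative.execution.count

    SpeculationCheck(double quantile, double multiplier, long maxSpeculative) {
        this.quantile = quantile;
        this.multiplier = multiplier;
        this.maxSpeculative = maxSpeculative;
    }

    /**
     * @param finishedRunTimes running times (ms) of the finished ExecutionVertexs
     * @param runningTimes current running times (ms) of the unfinished ExecutionVertexs
     * @param speculativeCounts speculative executions already started, aligned with runningTimes
     * @param totalVertices number of ExecutionVertexs in the ExecutionJobVertex
     * @return indexes into runningTimes of the vertexs that should get a speculative execution
     */
    List<Integer> findCandidates(
            long[] finishedRunTimes, long[] runningTimes, long[] speculativeCounts, int totalVertices) {
        List<Integer> candidates = new ArrayList<>();
        if (finishedRunTimes.length == 0 || finishedRunTimes.length < quantile * totalVertices) {
            return candidates;  // not enough ExecutionVertexs have finished yet
        }
        double threshold = multiplier * median(finishedRunTimes);
        for (int i = 0; i < runningTimes.length; i++) {
            if (runningTimes[i] > threshold && speculativeCounts[i] < maxSpeculative) {
                candidates.add(i);
            }
        }
        return candidates;
    }

    /** Median of the values; the mean of the two middle values for an even count. */
    static double median(long[] values) {
        long[] sorted = values.clone();
        Arrays.sort(sorted);
        int mid = sorted.length / 2;
        return sorted.length % 2 == 1 ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2.0;
    }
}
```

For example, with quantile 0.5, multiplier 1.5 and finished running times of 100, 200, 300 and 400 ms, the median is 250 ms, so any unfinished vertex running longer than 375 ms that has not yet reached the speculative execution limit becomes a candidate.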

...

This FLIP is a new feature, so there is no compatibility issue with previous versions.

...