...


(2) JobBlackList: the job-level blacklist.

The job-level blacklist (Java):
import java.util.Collections;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class JobBlackList extends BlackList {

    /** The queue of blacklisted IP records. It is mainly used for timeout checking. */
    private final Queue<BlackListRecord> jobToIpBlackListRecords;

    /** The set of blacklisted IPs. It is mainly used for filtering blacklisted hosts. */
    private final Set<String> jobBlackIpSet;

    public JobBlackList(BlackListConfig blackListConfig) {
        super(blackListConfig);
        this.jobToIpBlackListRecords = new ConcurrentLinkedQueue<>();
        this.jobBlackIpSet = Collections.newSetFromMap(new ConcurrentHashMap<>());
    }

    // Method bodies are omitted in this design excerpt.

    /** Adds an IP to this blacklist. */
    public boolean addIpToBlackList(String ip) {}

    /** Returns all blacklisted IPs. */
    public Set<String> getAllBlackListIpSet() {}

    /** Clears the (job, IP) blacklist. */
    @Override
    public void clear() {}

    @Override
    public long removeTimeoutBlackList(long timeout) {}

    @Override
    public boolean isEmpty() {}

    /** Whether the given host has been added to the blacklist. */
    public boolean containsIp(String ip) {}
}
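The skeleton above leaves the method bodies open. The following is a minimal, self-contained sketch of how they might behave. It assumes that removeTimeoutBlackList(timeout) drops records that are at least timeout milliseconds old and returns how many were removed. It also assumes that records are appended in time order and that removal runs on a single thread. JobBlackListSketch, the nested IpRecord (a stand-in for BlackListRecord) and the injected clock are hypothetical.

```java
import java.util.Collections;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.LongSupplier;

/** Hypothetical sketch of a job-level blacklist; not the proposal's actual implementation. */
public class JobBlackListSketch {

    /** Stand-in for BlackListRecord: an IP plus the time it was blacklisted. */
    private static final class IpRecord {
        final String ip;
        final long timestamp;

        IpRecord(String ip, long timestamp) {
            this.ip = ip;
            this.timestamp = timestamp;
        }
    }

    // Time-ordered records for expiry, plus a set for fast host filtering.
    private final Queue<IpRecord> records = new ConcurrentLinkedQueue<>();
    private final Set<String> ips = Collections.newSetFromMap(new ConcurrentHashMap<>());
    private final LongSupplier clock;

    public JobBlackListSketch(LongSupplier clock) {
        this.clock = clock;
    }

    /** Returns true if the IP was not blacklisted before; keeps the first timestamp otherwise. */
    public boolean addIpToBlackList(String ip) {
        if (!ips.add(ip)) {
            return false;
        }
        records.add(new IpRecord(ip, clock.getAsLong()));
        return true;
    }

    public boolean containsIp(String ip) {
        return ips.contains(ip);
    }

    public Set<String> getAllBlackListIpSet() {
        return Collections.unmodifiableSet(ips);
    }

    /** Removes records at least {@code timeout} ms old; expired records sit at the queue head. */
    public long removeTimeoutBlackList(long timeout) {
        long now = clock.getAsLong();
        long removed = 0;
        IpRecord head;
        while ((head = records.peek()) != null && now - head.timestamp >= timeout) {
            records.poll();
            ips.remove(head.ip);
            removed++;
        }
        return removed;
    }

    public boolean isEmpty() {
        return ips.isEmpty();
    }

    public void clear() {
        records.clear();
        ips.clear();
    }

    public static void main(String[] args) {
        long[] now = {0L};
        JobBlackListSketch blackList = new JobBlackListSketch(() -> now[0]);
        blackList.addIpToBlackList("10.0.0.1");
        now[0] = 500L;
        blackList.addIpToBlackList("10.0.0.2");
        now[0] = 1_000L;
        System.out.println(blackList.removeTimeoutBlackList(1_000L)); // 1
        System.out.println(blackList.containsIp("10.0.0.2"));         // true
    }
}
```

Keeping both a queue and a set mirrors the two fields of JobBlackList. The time-ordered queue turns expiry into a scan from the head, and the set gives constant-time containsIp lookups for host filtering.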

...

In DefaultExecutionGraph, I will add a member variable, jobBlackListSpeculativeListeners. After the ExecutionGraph is created, the jobBlackListTracker will be added to jobBlackListSpeculativeListeners. JobBlackListSpeculativeListener.onSpeculativeExecutionHappened() will be called when the SpeculativeExecution component detects a long-tail task and starts notifying the scheduler to schedule a speculative execution.


ExecutionGraph (Java):
public interface ExecutionGraph extends AccessExecutionGraph {
	void registerJobBlackListSpeculativeListener(JobBlackListTracker jobBlackListTracker);
}
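The following is a minimal sketch of the listener flow described above. It assumes the listener receives the IP of the original execution's location; the proposal does not specify the parameters of onSpeculativeExecutionHappened(). ExecutionGraphSketch, JobBlackListTrackerSketch and onLongTailDetected are hypothetical stand-ins for DefaultExecutionGraph, JobBlackListTracker and the SpeculativeExecution callback. Unlike registerJobBlackListSpeculativeListener(JobBlackListTracker) in the interface, the sketch registers against the listener interface.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

public class SpeculativeListenerSketch {

    /** Hypothetical listener; the String parameter (original execution's IP) is an assumption. */
    interface JobBlackListSpeculativeListener {
        void onSpeculativeExecutionHappened(String originalExecutionIp);
    }

    /** Stands in for DefaultExecutionGraph: it only keeps and notifies listeners. */
    static class ExecutionGraphSketch {
        private final List<JobBlackListSpeculativeListener> jobBlackListSpeculativeListeners =
                new CopyOnWriteArrayList<>();

        void registerJobBlackListSpeculativeListener(JobBlackListSpeculativeListener listener) {
            jobBlackListSpeculativeListeners.add(listener);
        }

        /** Called when a long-tail execution is detected and a speculative attempt is scheduled. */
        void onLongTailDetected(String originalExecutionIp) {
            for (JobBlackListSpeculativeListener listener : jobBlackListSpeculativeListeners) {
                listener.onSpeculativeExecutionHappened(originalExecutionIp);
            }
        }
    }

    /** Stands in for JobBlackListTracker: it blacklists the original execution's IP. */
    static class JobBlackListTrackerSketch implements JobBlackListSpeculativeListener {
        final Set<String> blackIps = ConcurrentHashMap.newKeySet();

        @Override
        public void onSpeculativeExecutionHappened(String originalExecutionIp) {
            blackIps.add(originalExecutionIp);
        }
    }

    public static void main(String[] args) {
        ExecutionGraphSketch graph = new ExecutionGraphSketch();
        JobBlackListTrackerSketch tracker = new JobBlackListTrackerSketch();
        // Registration happens right after the graph is created.
        graph.registerJobBlackListSpeculativeListener(tracker);
        graph.onLongTailDetected("192.168.1.7");
        System.out.println(tracker.blackIps); // [192.168.1.7]
    }
}
```

A CopyOnWriteArrayList suits this case because listeners are registered once and iterated on every notification.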

...

Add elements to the blacklist

We have implemented a machine-level blacklist per job. A machine's IP is added to the blacklist when an execution running on it is recognized as a long-tail execution.


First, JobBlackListSpeculativeListener.onSpeculativeExecutionHappened() is called when the SpeculativeExecution component detects a long-tail task and starts notifying the scheduler to schedule a speculative execution.

Second, the ExecutionGraph notifies its listener (the JobBlackListTracker) of this event.

Third, the JobBlackListTracker adds the IP of the original execution's location to its JobBlackList.


Remove elements from the blacklist

The BlackListTracker implements Runnable. It maintains the blacklisted machines of this job and periodically removes expired elements. Each element in the blacklist contains an IP and a timestamp, and the timestamp is used to decide whether the element has expired. A machine IP is removed from the blacklist once it is out of date.
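The following is a minimal sketch of this periodic expiry. It assumes a ScheduledExecutorService drives the Runnable and that timeoutMillis plays the role of blink.blacklist.timeout.in.millis. BlackListTrackerSketch, its IP-to-timestamp map and the 50 ms check interval are hypothetical; the tracker in the proposal holds a JobBlackList instead. The main method depends on wall-clock timing, so it prints no fixed expected values.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical tracker: each run() pass removes IPs blacklisted for at least timeoutMillis. */
public class BlackListTrackerSketch implements Runnable {
    private final Map<String, Long> ipToAddedTime = new ConcurrentHashMap<>();
    private final long timeoutMillis;
    private final LongSupplier clock;

    public BlackListTrackerSketch(long timeoutMillis, LongSupplier clock) {
        this.timeoutMillis = timeoutMillis;
        this.clock = clock;
    }

    public void addIp(String ip) {
        // putIfAbsent keeps the first timestamp, so re-adding an IP does not extend its expiry.
        ipToAddedTime.putIfAbsent(ip, clock.getAsLong());
    }

    public boolean containsIp(String ip) {
        return ipToAddedTime.containsKey(ip);
    }

    /** One expiry pass over all elements, using each element's timestamp. */
    @Override
    public void run() {
        long now = clock.getAsLong();
        ipToAddedTime.values().removeIf(addedAt -> now - addedAt >= timeoutMillis);
    }

    public static void main(String[] args) throws InterruptedException {
        BlackListTrackerSketch tracker = new BlackListTrackerSketch(200L, System::currentTimeMillis);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(tracker, 50L, 50L, TimeUnit.MILLISECONDS);
        tracker.addIp("10.0.0.9");
        System.out.println("blacklisted now: " + tracker.containsIp("10.0.0.9"));
        Thread.sleep(400L);
        System.out.println("blacklisted after 400 ms: " + tracker.containsIp("10.0.0.9"));
        scheduler.shutdownNow();
    }
}
```

Injecting the clock keeps run() deterministic under test, while production code can pass System::currentTimeMillis.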

Pass the blacklist information to the cluster ResourceManager

Yarn

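Flink currently builds against Hadoop 2.4.1, which does not provide Yarn PlacementConstraints. To use PlacementConstraints, I must work around this setting:

<hadoop.version>2.4.1</hadoop.version>

Options noted: reflection (calling the Yarn API via reflection), Yarn 3.0.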

When executions are scheduled, we add the blacklist information to a Yarn PlacementConstraint. This ensures that Yarn containers are not allocated on blacklisted machines.

Reference: https://hadoop.apache.org/docs/r3.1.1/hadoop-yarn/hadoop-yarn-site/PlacementConstraints.html

K8s

My code only supports the Yarn integration. As far as I know, the same goal could be achieved in the K8s integration by using node affinity or pod affinity instead of Yarn PlacementConstraints.

...

Manage intermediate ResultPartitions


Metrics

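import com.google.common.util.concurrent.AtomicDouble;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.metrics.Counter;

// Metric: how many executions must finish before speculative execution starts.
private AtomicInteger minFinishedForSpeculationThresholdMetrics;
// Metric: number of executions that have already finished.
private AtomicInteger finishedExecutionCountMetrics;
// Metric: number of executions for which speculative execution has been triggered.
private Counter speculativeExecutionCountMetrics;
// Running-time threshold above which an execution is speculated.
private AtomicDouble speculativeThresholdOfTime;
// Metric: running time of the original execution in each ExecutionVertex.
private Map<String, AtomicDouble> executionIndex2RunningTimespan;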

...

private Counter speculativeExecutionFastestFinishedCountMetrics;


Web UI

todo...

Limitations

(1) The job type is batch.

(2) The cluster ResourceManager is Yarn or K8s.

(3) Restrictions on input splits?

(4) Yarn must label nodes with their IPs.

(5) Restrictions of K8s?

(6) Speculative execution can only be enabled for a JobVertex whose input edges and output edges are all blocking.


Configuration



import static org.apache.flink.configuration.ConfigOptions.key;

import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.ConfigOption;

@Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER)
public static final ConfigOption<Boolean> SPECULATIVE_EXECUTION_ENABLED =
        key("blink.batch.speculative.enabled")
                .booleanType()
                .defaultValue(false)
                .withDescription("Whether to enable speculative execution for batch jobs.");

@Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER)
public static final ConfigOption<Long> SPECULATIVE_EXECUTION_INTERVAL_IN_MILLIS =
        key("blink.batch.speculative.interval.in.millis")
                .longType()
                .defaultValue(100L)
                .withDescription("How often, in milliseconds, to check for tasks to speculate.");

@Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER)
public static final ConfigOption<Double> SPECULATIVE_EXECUTION_MULTIPLIER =
        key("blink.batch.speculative.multiplier")
                .doubleType()
                .defaultValue(1.5)
                .withDescription("An unfinished ExecutionVertex is speculated when its running time exceeds"
                        + " this multiple of the median running time of all completed ExecutionVertexes.");

@Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER)
public static final ConfigOption<Double> SPECULATIVE_EXECUTION_QUANTILE =
        key("blink.batch.speculative.quantile")
                .doubleType()
                .defaultValue(0.75)
                .withDescription("The fraction of ExecutionVertexes in an ExecutionJobVertex that must be"
                        + " completed before the speculative execution mechanism starts. For example, 0.9"
                        + " means that it starts once 90% of the ExecutionVertexes are completed.");

@Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER)
public static final ConfigOption<Long> SPECULATIVE_EXECUTION_LOG_INTERVAL_IN_MILLIS =
        key("blink.batch.speculative.log.interval.in.millis")
                .longType()
                .defaultValue(1000 * 60 * 5L)
                .withDescription("Interval in milliseconds for speculation-related logging.");

/**
 * Key and default value for 'blink.blacklist.enable', which indicates whether the
 * blacklist service is enabled.
 */
@Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER)
public static final ConfigOption<Boolean> BLINK_BLACKLIST_ENABLE =
        key("blink.blacklist.enable")
                .booleanType()
                .defaultValue(false)
                .withDescription("Whether to enable the blacklist.");

/**
 * Key and default value (one minute) for 'blink.blacklist.timeout.in.millis', which indicates
 * how long a record stays in the blacklist before it is removed.
 */
@Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER)
public static final ConfigOption<Long> BLINK_BLACKLIST_TIMEOUT_IN_MILLIS =
        key("blink.blacklist.timeout.in.millis")
                .longType()
                .defaultValue(60 * 1000L)
                .withDescription("How long, in milliseconds, a blacklist record is kept before it is removed"
                        + " from the blacklist.");
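The multiplier and quantile options combine into a simple check. The following sketch is one reading of the two option descriptions, not the proposal's implementation. Once at least ceil(quantile * parallelism) vertices have finished, every running vertex whose elapsed time exceeds multiplier * (median finished time) becomes a candidate. The rounding rule, SpeculationCheckSketch and findSpeculationCandidates are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SpeculationCheckSketch {

    /**
     * Returns the indices of running vertices that should get a speculative attempt.
     *
     * @param finishedTimes running times (ms) of the completed vertices
     * @param runningTimes  elapsed times (ms) of the still-running vertices, by index
     * @param parallelism   total number of vertices in the job vertex
     * @param quantile      e.g. 0.75, as in blink.batch.speculative.quantile
     * @param multiplier    e.g. 1.5, as in blink.batch.speculative.multiplier
     */
    static List<Integer> findSpeculationCandidates(
            long[] finishedTimes, long[] runningTimes, int parallelism, double quantile, double multiplier) {
        List<Integer> candidates = new ArrayList<>();
        // Assumption: round up, and require at least one finished vertex.
        int minFinished = Math.max(1, (int) Math.ceil(quantile * parallelism));
        if (finishedTimes.length < minFinished) {
            return candidates; // not enough vertices have finished yet
        }
        double threshold = multiplier * median(finishedTimes);
        for (int i = 0; i < runningTimes.length; i++) {
            if (runningTimes[i] > threshold) {
                candidates.add(i);
            }
        }
        return candidates;
    }

    /** Median of a non-empty array; averages the two middle values for even lengths. */
    static double median(long[] values) {
        long[] sorted = values.clone();
        Arrays.sort(sorted);
        int mid = sorted.length / 2;
        return sorted.length % 2 == 1 ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2.0;
    }

    public static void main(String[] args) {
        long[] finished = {100, 120, 110}; // 3 of 4 vertices done, so 75% is reached
        long[] running = {200};            // 200 > 1.5 * 110 = 165
        System.out.println(findSpeculationCandidates(finished, running, 4, 0.75, 1.5)); // [0]
    }
}
```

As an example, take the defaults (quantile 0.75, multiplier 1.5) with parallelism 4. Speculation starts after three vertices finish. If those three took 100, 110 and 120 ms, the threshold is 1.5 * 110 = 165 ms, so a vertex that has been running for 200 ms is speculated.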


Compatibility, Deprecation, and Migration Plan

...