...
I will introduce a blacklist module into Flink, used to filter out nodes when speculative executions request slots.
...
basic plan
A blacklist is a kind of scheduling constraint. According to the description of FLINK-11000, a general blacklist is a larger feature in its own right.
There are several levels of blacklist, including the (JobVertex, TaskManager) blacklist, (JobVertex, Host) blacklist, (Job, TaskManager) blacklist, (Job, Host) blacklist, (Session, TaskManager) blacklist and (Session, Host) blacklist.
For the speculative execution feature, I must implement the (Job, Host) blacklist. To make FLINK-11000 easy to implement in the future, my interface is also suitable for the other blacklists described above.
The blacklist module is a thread that maintains the blacklisted machines of this job and periodically removes expired elements. Each element in the blacklist contains an IP and a timestamp. The timestamp is used to decide whether the element has expired.
Add an overall logic diagram here.
Since the resource-request logic exists on both the JobMaster and the ResourceManager side, both need the blacklist information to filter out blacklisted nodes. But because I only implement the (Job, Host) level blacklist in this feature, I only consider maintaining the blacklist on the JobMaster side.
On the JobMaster side, the blacklist is encapsulated as an independent system component. It updates the blacklist when it is notified that a speculative execution has happened, and it filters resources through the blacklist when resources are requested.
The ResourceManager does not maintain the blacklist for now. When the JobMaster requests resources from the ResourceManager, it passes the blacklist information along. The ResourceManager needs to take the blacklist into account when requesting or filtering resources. When it requests a new container, it encapsulates the blacklist information in the resource-filtering form required by the external resource management system (such as YARN).
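The JobMaster-side filtering described above can be pictured with a minimal sketch. The names `SlotFilterSketch` and `SlotOffer` are hypothetical and are not part of Flink or of this proposal; the sketch only assumes that each offered slot carries the IP of its host.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: drops slot offers that come from blacklisted hosts. */
class SlotFilterSketch {

    /** Hypothetical slot offer; only the host IP matters for (Job, Host) filtering. */
    static final class SlotOffer {
        final String slotId;
        final String hostIp;

        SlotOffer(String slotId, String hostIp) {
            this.slotId = slotId;
            this.hostIp = hostIp;
        }
    }

    /** Returns the offers whose host is not in the blacklist, preserving order. */
    static List<SlotOffer> filter(List<SlotOffer> offers, Set<String> blacklistedHosts) {
        List<SlotOffer> accepted = new ArrayList<>();
        for (SlotOffer offer : offers) {
            if (!blacklistedHosts.contains(offer.hostIp)) {
                accepted.add(offer);
            }
        }
        return accepted;
    }
}
```

The same set of blacklisted hosts could be attached to the resource request sent to the ResourceManager; how that request is encoded is left open here.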
Classes and Interfaces of (Job, Host) blacklist
(1)Abstract class BlackList; each type of blacklist extends it.
    import java.io.Serializable;

    public abstract class BlackList implements Serializable {

        /** Black list configuration. */
        protected final BlackListConfig blackListConfig;

        public BlackList(BlackListConfig blackListConfig) {
            this.blackListConfig = blackListConfig;
        }

        /**
         * Remove timed-out black list records.
         * @param timeout Timeout.
         * @return Minimum timestamp of black list records.
         */
        public abstract long removeTimeoutBlackList(long timeout);

        /** Clear black list. */
        public abstract void clear();

        /** Is black list empty. */
        public abstract boolean isEmpty();
    }
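To make the contract of the abstract methods more concrete, here is a minimal, self-contained sketch of what a (Job, Host) implementation might look like. `HostBlackListSketch` is a hypothetical name and does not extend the real `BlackList` class so that it compiles on its own. It assumes that `timeout` is a duration in milliseconds, that the current time is passed in explicitly, and that `Long.MAX_VALUE` is returned when no record remains; none of these details are fixed by the proposal.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical (Job, Host) blacklist: maps a host IP to the time it was blacklisted. */
class HostBlackListSketch {

    private final Map<String, Long> ipToTimestamp = new HashMap<>();

    /** Adds a record for the given host, or refreshes its timestamp. */
    void add(String ip, long timestampMillis) {
        ipToTimestamp.put(ip, timestampMillis);
    }

    boolean contains(String ip) {
        return ipToTimestamp.containsKey(ip);
    }

    /**
     * Removes records that are at least timeoutMillis old relative to nowMillis.
     * Returns the minimum timestamp of the remaining records,
     * or Long.MAX_VALUE if none remain (an assumption of this sketch).
     */
    long removeTimeoutBlackList(long timeoutMillis, long nowMillis) {
        long minTimestamp = Long.MAX_VALUE;
        Iterator<Map.Entry<String, Long>> it = ipToTimestamp.entrySet().iterator();
        while (it.hasNext()) {
            long timestamp = it.next().getValue();
            if (nowMillis - timestamp >= timeoutMillis) {
                it.remove();
            } else {
                minTimestamp = Math.min(minTimestamp, timestamp);
            }
        }
        return minTimestamp;
    }

    void clear() {
        ipToTimestamp.clear();
    }

    boolean isEmpty() {
        return ipToTimestamp.isEmpty();
    }
}
```

Returning the minimum remaining timestamp lets a caller compute when the next record will expire, which may be why the abstract method returns it.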
...
The BlackListTracker implements Runnable. Its responsibility is to maintain the blacklisted machines of this job and to remove expired elements periodically. Each element in the blacklist contains an IP and a timestamp; the timestamp is used to decide whether the element has expired.
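The periodic clean-up could be sketched as follows. `BlackListTrackerSketch` and its methods are hypothetical names; the injected clock is an assumption made only so that expiry can be exercised deterministically.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

/** Hypothetical tracker: each run() call drops the hosts whose record has expired. */
class BlackListTrackerSketch implements Runnable {

    // Concurrent map because records are added by the scheduler while run() executes on another thread.
    private final Map<String, Long> ipToTimestamp = new ConcurrentHashMap<>();
    private final long timeoutMillis;
    private final LongSupplier clock;

    BlackListTrackerSketch(long timeoutMillis, LongSupplier clock) {
        this.timeoutMillis = timeoutMillis;
        this.clock = clock;
    }

    /** Records the host with the current time as its timestamp. */
    void addBlackHost(String ip) {
        ipToTimestamp.put(ip, clock.getAsLong());
    }

    boolean isBlack(String ip) {
        return ipToTimestamp.containsKey(ip);
    }

    /** Removes every record that is at least timeoutMillis old. */
    @Override
    public void run() {
        long now = clock.getAsLong();
        ipToTimestamp.values().removeIf(timestamp -> now - timestamp >= timeoutMillis);
    }
}
```

In production the tracker would typically be created with `System::currentTimeMillis` as the clock and scheduled with `Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(tracker, period, period, TimeUnit.MILLISECONDS)`; the period itself is not specified in this proposal.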
...
As with YARN, when executions are scheduled, I will add the blacklist information to the K8s PodSpec.
Mesos
According to FLINK-22352, the community has decided to deprecate Mesos support in Apache Flink, so we do not need to consider how to pass the blacklist information to Mesos.
Manage input and output of ExecutionVertex
Manage InputSplit
Manage middle ResultPartition
Manage sink files
Metrics
(1)
// Metric for the minimum number of executions that must finish before speculative execution starts.
private AtomicInteger minFinishedForSpeculationThresholdMetrics;
(2)
// Metric for the number of finished executions.
private AtomicInteger finishedExecutionCountMetrics;
(3)
// Metric for the number of executions for which speculative execution was triggered.
private Counter speculativeExecutionCountMetrics;
(4)
// Running-time threshold above which speculative execution is triggered.
private AtomicDouble speculativeThresholdOfTime;
(5)
// Metric for the running time of the original execution in each ExecutionVertex.
private Map<String, AtomicDouble> executionIndex2RunningTimespan;
(6)
// Reported when the execution that finishes first is the speculative execution.
private Counter speculativeExecutionFastestFinishedCountMetrics;
Web UI
todo...
Limitations
(1)JobType is Batch.
(2)Cluster ResourceManager is Yarn or K8s.
(3)Limitations on input splits?
(4)YARN nodes must be labeled by IP.
(5)Limitations on K8s?
(6)Speculative execution can be enabled only for a JobVertex whose input edges and output edges are all blocking.
Configuration
All configuration options related to this feature are added to JobManagerOptions.class.
    public static final ConfigOption<Boolean> SPECULATIVE_EXECUTION_ENABLED =
        key("flink.batch.speculative.enabled")
            .booleanType()
            .defaultValue(false)
            .withDescription("Whether to enable speculation of batch job.");

    public static final ConfigOption<Long> SPECULATIVE_EXECUTION_INTERVAL_IN_MILLIS =
        key("flink.batch.speculative.interval.in.millis")
            .longType()
            .defaultValue(100L)
            .withDescription("How often to check for speculative tasks.");

    public static final ConfigOption<Double> SPECULATIVE_EXECUTION_MULTIPLIER =
        key("flink.batch.speculative.multiplier")
            .doubleType()
            .defaultValue(1.5)
            .withDescription("When the running time of a unfinished executionVertex is several times of" +
                " the median of all completed executionVertexs, it will be speculated.");

    public static final ConfigOption<Double> SPECULATIVE_EXECUTION_QUANTILE =
        key("flink.batch.speculative.quantile")
            .doubleType()
            .defaultValue(0.75)
            .withDescription("When the percentage of ExecutionVertex in an executionJobVertex are completed," +
                " the speculative execution mechanism will be started. 0.9 means that 90% of the" +
                " executionVertex are completed, and the speculative execution mechanism will be started.");

    public static final ConfigOption<Long> SPECULATIVE_EXECUTION_LOG_INTERVAL_IN_MILLIS =
        key("flink.batch.speculative.log.interval.in.millis")
            .longType()
            .defaultValue(1000 * 60 * 5L)
            .withDescription("Interval in millis for speculative related log.");

    public static final ConfigOption<Boolean> BLINK_BLACKLIST_ENABLE =
        key("flink.blacklist.enable")
            .booleanType()
            .defaultValue(false)
            .withDescription("Whether to enable blacklist job.");

    public static final ConfigOption<Long> BLINK_BLACKLIST_TIMEOUT_IN_MILLIS =
        key("flink.blacklist.timeout.in.millis")
            .longType()
            .defaultValue(60 * 1000L)
            .withDescription("indicates how long a black list record will be removed after added to black list.");
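The interaction between the quantile and the multiplier options can be illustrated with a small sketch. `SpeculationCheckSketch` and `shouldSpeculate` are hypothetical names. Rounding the required number of finished vertices up, using the plain median of the finished running times, and using a strict comparison are all assumptions of this sketch, not behavior confirmed by the proposal.

```java
import java.util.Arrays;

/** Hypothetical check: should a still-running ExecutionVertex be speculated? */
class SpeculationCheckSketch {

    static boolean shouldSpeculate(
            long[] finishedRunTimesMillis,
            int totalVertices,
            long runningTimeMillis,
            double quantile,
            double multiplier) {
        int finished = finishedRunTimesMillis.length;
        // Speculation starts only after the configured fraction of vertices has finished.
        if (finished == 0 || finished < Math.ceil(quantile * totalVertices)) {
            return false;
        }
        long[] sorted = finishedRunTimesMillis.clone();
        Arrays.sort(sorted);
        // Median of the finished running times (mean of the two middle values for an even count).
        double median = (finished % 2 == 1)
                ? sorted[finished / 2]
                : (sorted[finished / 2 - 1] + sorted[finished / 2]) / 2.0;
        // Speculate when the vertex has run longer than multiplier times the median.
        return runningTimeMillis > multiplier * median;
    }
}
```

With the default values (quantile 0.75, multiplier 1.5), a JobVertex with 4 parallel vertices, of which 3 finished in 100 ms, 200 ms and 300 ms, has a median of 200 ms, so under these assumptions the remaining vertex would be speculated once it has been running for more than 300 ms.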
Compatibility, Deprecation, and Migration Plan
...