...

I will introduce a blacklist module into Flink. It is used to filter out nodes when speculative executions request slots.

...

Basic plan

A blacklist is a kind of scheduling constraint. According to the description of FLINK-11000, a general blacklist is a bigger feature.

There are several levels of blacklist: (JobVertex, TaskManager), (JobVertex, Host), (Job, TaskManager), (Job, Host), (Session, TaskManager) and (Session, Host).

For the speculative execution feature, I only need to implement the (Job, Host) blacklist. To make FLINK-11000 easy to implement in the future, my interfaces are designed so that they also suit the other blacklist levels described above.

The blacklist module is a thread that maintains the blacklisted machines of this job and periodically removes expired elements. Each element in the blacklist contains an IP and a timestamp. The timestamp is used to decide whether the element has expired.

(TODO: add an overall logic diagram here.)


Resource-request logic exists on both the JobMaster side and the ResourceManager side, so both the JobMaster and the ResourceManager need the blacklist information to filter out blacklisted nodes. However, this feature only implements the (Job, Host) level blacklist, so I only maintain the blacklist on the JobMaster side.

On the JobMaster side, the blacklist is encapsulated as an independent system component. It updates the blacklist when it is notified that a speculative execution has happened. It also filters resources through the blacklist when requesting resources.

On the ResourceManager side, the blacklist is not maintained for now. When the JobMaster requests resources from the ResourceManager, it passes the blacklist information along. The ResourceManager takes the blacklist into account when requesting or filtering resources. When requesting new containers, it encapsulates the blacklist information as the resource filters required by the external resource management system (such as YARN).
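The JobMaster-to-ResourceManager flow described above can be sketched as follows. This is a minimal, self-contained illustration, not the proposal's implementation. The class names `JobMasterBlacklist` and `SlotRequest` and the methods `onSpeculativeExecution` and `filterTaskManagerHosts` are hypothetical. Hosts are plain IP strings. The sketch also assumes that the host blacklisted on a speculative execution is the host running the slow original execution.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical classes illustrating the JobMaster -> ResourceManager blacklist flow. */
public class BlacklistRequestFlowSketch {

    /** JobMaster side: the job's (Job, Host) blacklist, updated on speculative executions. */
    static final class JobMasterBlacklist {
        private final Set<String> blackHosts = new HashSet<>();

        /** Assumption: the host of the slow original execution is blacklisted. */
        void onSpeculativeExecution(String slowHost) {
            blackHosts.add(slowHost);
        }

        /** Builds a slot request carrying an immutable snapshot of the blacklist. */
        SlotRequest newSlotRequest(String jobId) {
            return new SlotRequest(jobId, Collections.unmodifiableSet(new HashSet<>(blackHosts)));
        }
    }

    /** Hypothetical slot request that passes the blacklisted hosts to the ResourceManager. */
    static final class SlotRequest {
        final String jobId;
        final Set<String> blacklistedHosts;

        SlotRequest(String jobId, Set<String> blacklistedHosts) {
            this.jobId = jobId;
            this.blacklistedHosts = blacklistedHosts;
        }
    }

    /** ResourceManager side: keeps only the TaskManager hosts that are not blacklisted. */
    static List<String> filterTaskManagerHosts(SlotRequest request, List<String> registeredHosts) {
        List<String> candidates = new ArrayList<>();
        for (String host : registeredHosts) {
            if (!request.blacklistedHosts.contains(host)) {
                candidates.add(host);
            }
        }
        return candidates;
    }

    public static void main(String[] args) {
        JobMasterBlacklist blacklist = new JobMasterBlacklist();
        blacklist.onSpeculativeExecution("10.0.0.2");
        SlotRequest request = blacklist.newSlotRequest("job-1");
        List<String> registered = Arrays.asList("10.0.0.1", "10.0.0.2", "10.0.0.3");
        System.out.println(filterTaskManagerHosts(request, registered)); // prints [10.0.0.1, 10.0.0.3]
    }
}
```

The request carries a copy of the set, so later blacklist updates on the JobMaster do not change a request that is already in flight.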

Classes and Interfaces of (Job, Host) blacklist

(1) Abstract class BlackList, which every type of blacklist extends.

Code Block
languagejava
titleAbstract Class BlackList
import java.io.Serializable;

public abstract class BlackList implements Serializable {
    /** Blacklist configuration. */
    protected final BlackListConfig blackListConfig;

    public BlackList(BlackListConfig blackListConfig) {
        this.blackListConfig = blackListConfig;
    }

    /**
     * Removes timed-out blacklist records.
     *
     * @param timeout time-out duration; records older than this are removed
     * @return minimum timestamp of the blacklist records
     */
    public abstract long removeTimeoutBlackList(long timeout);

    /** Clears the blacklist. */
    public abstract void clear();

    /** Returns whether the blacklist is empty. */
    public abstract boolean isEmpty();
}
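The following minimal sketch shows how one concrete blacklist level might extend the abstract class. It implements a (Job, Host) blacklist. Several parts are assumptions:

- `JobHostBlackList`, `addHost`, `contains` and `removeExpired` are hypothetical names.
- `BlackListConfig` is an empty placeholder, because its fields are not specified.
- Records map an IP to the epoch-millisecond time it was added.
- `removeTimeoutBlackList` returns `Long.MAX_VALUE` when no records remain. The abstract class does not specify this case.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class JobHostBlackListSketch {

    /** Placeholder: the proposal's BlackListConfig fields are not specified. */
    static class BlackListConfig implements Serializable {}

    /** Same shape as the abstract BlackList class above (Javadoc omitted). */
    abstract static class BlackList implements Serializable {
        protected final BlackListConfig blackListConfig;

        BlackList(BlackListConfig blackListConfig) {
            this.blackListConfig = blackListConfig;
        }

        public abstract long removeTimeoutBlackList(long timeout);

        public abstract void clear();

        public abstract boolean isEmpty();
    }

    /** Hypothetical (Job, Host) blacklist: host IP -> time (epoch millis) it was added. */
    static class JobHostBlackList extends BlackList {
        private final Map<String, Long> hostToTimestamp = new HashMap<>();

        JobHostBlackList(BlackListConfig config) {
            super(config);
        }

        void addHost(String ip, long addedAtMillis) {
            hostToTimestamp.put(ip, addedAtMillis);
        }

        boolean contains(String ip) {
            return hostToTimestamp.containsKey(ip);
        }

        @Override
        public long removeTimeoutBlackList(long timeout) {
            return removeExpired(timeout, System.currentTimeMillis());
        }

        /** Drops records at least {@code timeout} ms old; returns the oldest remaining timestamp. */
        long removeExpired(long timeout, long nowMillis) {
            long minTimestamp = Long.MAX_VALUE; // assumption: MAX_VALUE means "no records left"
            Iterator<Map.Entry<String, Long>> it = hostToTimestamp.entrySet().iterator();
            while (it.hasNext()) {
                long addedAt = it.next().getValue();
                if (nowMillis - addedAt >= timeout) {
                    it.remove();
                } else {
                    minTimestamp = Math.min(minTimestamp, addedAt);
                }
            }
            return minTimestamp;
        }

        @Override
        public void clear() {
            hostToTimestamp.clear();
        }

        @Override
        public boolean isEmpty() {
            return hostToTimestamp.isEmpty();
        }
    }

    public static void main(String[] args) {
        JobHostBlackList blackList = new JobHostBlackList(new BlackListConfig());
        blackList.addHost("10.0.0.1", 1_000L);
        blackList.addHost("10.0.0.2", 5_000L);
        System.out.println(blackList.removeExpired(3_000L, 6_000L)); // prints 5000
        System.out.println(blackList.contains("10.0.0.1"));          // prints false
    }
}
```

Returning the oldest remaining timestamp lets a caller compute when the next record will expire. That is one plausible reading of the `@return` contract above.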

...

BlackListTracker implements Runnable. Its responsibility is to maintain the blacklisted machines of this job and to periodically remove expired elements. Each element in the blacklist contains an IP and a timestamp. The timestamp is used to decide whether the element has expired.
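Here is a minimal sketch of such a tracker. A `ScheduledExecutorService` runs the eviction pass periodically. This is an illustration under assumptions, not the proposal's code:

- Records live in a `ConcurrentHashMap`, because the scheduler thread and the threads adding hosts touch the same data.
- `addHost` and `isBlack` are hypothetical methods.
- The clock is injected as a `LongSupplier`, so eviction can be tested deterministically.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Sketch of a tracker that periodically evicts expired (Job, Host) blacklist records. */
public class BlackListTracker implements Runnable {

    // Host IP -> time (millis from the injected clock) the host was blacklisted.
    private final Map<String, Long> hostToTimestamp = new ConcurrentHashMap<>();
    private final long timeoutMillis;
    private final LongSupplier clock;

    public BlackListTracker(long timeoutMillis, LongSupplier clock) {
        this.timeoutMillis = timeoutMillis;
        this.clock = clock;
    }

    /** Hypothetical entry point: record a host, e.g. when a speculative execution happens. */
    public void addHost(String ip) {
        hostToTimestamp.put(ip, clock.getAsLong());
    }

    public boolean isBlack(String ip) {
        return hostToTimestamp.containsKey(ip);
    }

    /** One maintenance pass: removes every record that is at least timeoutMillis old. */
    @Override
    public void run() {
        long now = clock.getAsLong();
        hostToTimestamp.entrySet().removeIf(e -> now - e.getValue() >= timeoutMillis);
    }

    public static void main(String[] args) throws InterruptedException {
        BlackListTracker tracker = new BlackListTracker(200L, System::currentTimeMillis);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Run the eviction pass every 50 ms on a background thread.
        scheduler.scheduleWithFixedDelay(tracker, 0L, 50L, TimeUnit.MILLISECONDS);
        tracker.addHost("10.0.0.2");
        System.out.println("blacklisted: " + tracker.isBlack("10.0.0.2"));
        Thread.sleep(500L);
        System.out.println("after timeout: " + tracker.isBlack("10.0.0.2"));
        scheduler.shutdownNow();
    }
}
```

`scheduleWithFixedDelay` is used instead of `scheduleAtFixedRate`, so a slow pass never causes passes to queue up back to back.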


...

As with YARN, when the executions are scheduled, I will add the blacklist information to the Kubernetes PodSpec.
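On Kubernetes, one way to express a host blacklist in a PodSpec is a required node affinity. It uses the `NotIn` operator on the `kubernetes.io/hostname` node label. The sketch below only renders that YAML fragment as a string. It assumes the blacklist entries are Kubernetes node names; an IP-based blacklist would first need an IP-to-node-name mapping. `blacklistNodeAffinityYaml` is a hypothetical helper. A real integration would set the same fields through the Kubernetes client model that Flink's Kubernetes module uses, not build YAML by hand.

```java
import java.util.Arrays;
import java.util.List;

public class PodBlacklistAffinitySketch {

    /**
     * Renders a PodSpec "affinity" fragment that forbids scheduling on the given nodes.
     * Returns an empty string for an empty blacklist, because NotIn requires non-empty values.
     */
    static String blacklistNodeAffinityYaml(List<String> blackNodeNames) {
        if (blackNodeNames.isEmpty()) {
            return "";
        }
        StringBuilder yaml = new StringBuilder()
                .append("affinity:\n")
                .append("  nodeAffinity:\n")
                .append("    requiredDuringSchedulingIgnoredDuringExecution:\n")
                .append("      nodeSelectorTerms:\n")
                .append("      - matchExpressions:\n")
                .append("        - key: kubernetes.io/hostname\n")
                .append("          operator: NotIn\n")
                .append("          values:\n");
        for (String node : blackNodeNames) {
            yaml.append("          - ").append(node).append('\n');
        }
        return yaml.toString();
    }

    public static void main(String[] args) {
        System.out.print(blacklistNodeAffinityYaml(Arrays.asList("node-a", "node-b")));
    }
}
```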

Mesos

According to FLINK-22352, the community decided to deprecate Mesos support in Apache Flink.

Therefore, we do not need to consider how to pass the blacklist information to Mesos.

Manage input and output of each ExecutionVertex

Manage InputSplit

Manage sink files

Manage intermediate ResultPartition

Metrics

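import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.util.concurrent.AtomicDouble; // Guava
import org.apache.flink.metrics.Counter;

// Metric: number of executions that must finish before speculative execution starts.
private AtomicInteger minFinishedForSpeculationThresholdMetrics;
// Metric: number of finished executions.
private AtomicInteger finishedExecutionCountMetrics;
// Metric: number of executions for which speculative execution was triggered.
private Counter speculativeExecutionCountMetrics;
// Running-time threshold above which an execution is speculated.
private AtomicDouble speculativeThresholdOfTime;
// Metric: running time of the original execution in each ExecutionVertex.
private Map<String, AtomicDouble> executionIndex2RunningTimespan;
// Metric reported when the first execution to finish is the speculative one.
private Counter speculativeExecutionFastestFinishedCountMetrics;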
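Here is a minimal sketch of how some of these metrics might be updated. It rests on three assumptions:

- `java.util.concurrent.atomic.LongAdder` stands in for Flink's `Counter`, so the example has no Flink dependency.
- `finishedExecutionCountMetrics` counts each ExecutionVertex once, when its first attempt finishes.
- The callback names `onSpeculativeExecutionLaunched` and `onExecutionFinished` are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical sketch of updating the speculation metrics declared above. */
public class SpeculationMetricsSketch {
    // LongAdder stands in for Flink's Counter to keep the sketch dependency-free.
    final AtomicInteger finishedExecutionCountMetrics = new AtomicInteger();
    final LongAdder speculativeExecutionCountMetrics = new LongAdder();
    final LongAdder speculativeExecutionFastestFinishedCountMetrics = new LongAdder();

    // Indexes of ExecutionVertices for which some attempt has already finished.
    private final Set<Integer> finishedVertices = new HashSet<>();

    /** Called whenever a speculative execution is launched. */
    void onSpeculativeExecutionLaunched() {
        speculativeExecutionCountMetrics.increment();
    }

    /** Called when an attempt of a vertex finishes; only the first finishing attempt counts. */
    synchronized void onExecutionFinished(int vertexIndex, boolean speculative) {
        if (!finishedVertices.add(vertexIndex)) {
            return; // another attempt of this vertex already finished first
        }
        finishedExecutionCountMetrics.incrementAndGet();
        if (speculative) {
            speculativeExecutionFastestFinishedCountMetrics.increment();
        }
    }

    public static void main(String[] args) {
        SpeculationMetricsSketch metrics = new SpeculationMetricsSketch();
        metrics.onSpeculativeExecutionLaunched();
        metrics.onExecutionFinished(0, true);  // speculative attempt of vertex 0 wins
        metrics.onExecutionFinished(0, false); // original attempt of vertex 0 is ignored
        metrics.onExecutionFinished(1, false);
        System.out.println(metrics.finishedExecutionCountMetrics.get());                   // prints 2
        System.out.println(metrics.speculativeExecutionFastestFinishedCountMetrics.sum()); // prints 1
    }
}
```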

Web UI

todo...

Limitations

(1) The job type is batch.

(2) The cluster resource manager is YARN or Kubernetes.

(3) Limitations regarding input splits? (TBD)

(4) YARN needs to label nodes by IP.

(5) Limitations of Kubernetes? (TBD)

(6) Only a JobVertex whose input edges and output edges are all blocking can enable the speculative execution feature.

Configuration

All configuration options related to this feature are added to JobManagerOptions.class.
Code Block
languagejava
titleJobManagerOptions
import static org.apache.flink.configuration.ConfigOptions.key;

import org.apache.flink.configuration.ConfigOption;

public static final ConfigOption<Boolean> SPECULATIVE_EXECUTION_ENABLED =
        key("flink.batch.speculative.enabled")
                .booleanType()
                .defaultValue(false)
                .withDescription("Whether to enable speculative execution for batch jobs.");

public static final ConfigOption<Long> SPECULATIVE_EXECUTION_INTERVAL_IN_MILLIS =
        key("flink.batch.speculative.interval.in.millis")
                .longType()
                .defaultValue(100L)
                .withDescription("How often (in milliseconds) to check for tasks to speculate.");

public static final ConfigOption<Double> SPECULATIVE_EXECUTION_MULTIPLIER =
        key("flink.batch.speculative.multiplier")
                .doubleType()
                .defaultValue(1.5)
                .withDescription("An unfinished ExecutionVertex is speculated when its running time exceeds"
                        + " this multiple of the median running time of all completed ExecutionVertices.");

public static final ConfigOption<Double> SPECULATIVE_EXECUTION_QUANTILE =
        key("flink.batch.speculative.quantile")
                .doubleType()
                .defaultValue(0.75)
                .withDescription("Fraction of the ExecutionVertices in an ExecutionJobVertex that must be"
                        + " completed before the speculative execution mechanism starts. For example, 0.9"
                        + " means that speculation starts once 90% of the ExecutionVertices have completed.");

public static final ConfigOption<Long> SPECULATIVE_EXECUTION_LOG_INTERVAL_IN_MILLIS =
        key("flink.batch.speculative.log.interval.in.millis")
                .longType()
                .defaultValue(1000 * 60 * 5L)
                .withDescription("Interval in milliseconds for speculation-related logging.");

public static final ConfigOption<Boolean> BLINK_BLACKLIST_ENABLE =
        key("flink.blacklist.enable")
                .booleanType()
                .defaultValue(false)
                .withDescription("Whether to enable the blacklist.");

public static final ConfigOption<Long> BLINK_BLACKLIST_TIMEOUT_IN_MILLIS =
        key("flink.blacklist.timeout.in.millis")
                .longType()
                .defaultValue(60 * 1000L)
                .withDescription("How long (in milliseconds) a record stays in the blacklist before it is removed.");
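The quantile and multiplier options above could be combined into a speculation check roughly like the following. This is a sketch under assumptions, not the proposal's algorithm:

- The minimum number of finished vertices is taken as `max(floor(quantile * parallelism), 1)`.
- The median is the upper median when the number of samples is even.
- `SpeculationCheckSketch` and its methods are hypothetical.

With the default values (quantile 0.75, multiplier 1.5) and parallelism 8, speculation starts after 6 vertices have finished. A running vertex is then speculated once it runs longer than 1.5 times the median finished time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper illustrating the quantile/multiplier check; not Flink API. */
public class SpeculationCheckSketch {

    /** Minimum number of finished vertices before speculation may start (at least 1). */
    static int minFinishedForSpeculation(double quantile, int parallelism) {
        return Math.max((int) Math.floor(quantile * parallelism), 1);
    }

    /** Median of the finished running times (upper median for an even count). */
    static double median(long[] finishedMillis) {
        long[] sorted = finishedMillis.clone();
        Arrays.sort(sorted);
        return sorted[sorted.length / 2];
    }

    /**
     * Returns the indexes of running vertices whose running time exceeds multiplier * median,
     * or an empty list if too few vertices have finished yet.
     */
    static List<Integer> verticesToSpeculate(
            long[] finishedMillis, Map<Integer, Long> runningMillis,
            int parallelism, double quantile, double multiplier) {
        List<Integer> result = new ArrayList<>();
        if (finishedMillis.length < minFinishedForSpeculation(quantile, parallelism)) {
            return result;
        }
        double threshold = multiplier * median(finishedMillis);
        for (Map.Entry<Integer, Long> e : runningMillis.entrySet()) {
            if (e.getValue() > threshold) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // 8 vertices, 6 finished; defaults quantile = 0.75 and multiplier = 1.5.
        long[] finished = {100, 110, 120, 130, 140, 150};
        Map<Integer, Long> running = new TreeMap<>();
        running.put(6, 150L);
        running.put(7, 400L);
        System.out.println(minFinishedForSpeculation(0.75, 8));                    // prints 6
        System.out.println(verticesToSpeculate(finished, running, 8, 0.75, 1.5)); // prints [7]
    }
}
```

In the usage example, the median finished time is 130 ms, so the threshold is 195 ms. Only vertex 7, running for 400 ms, is selected.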






Compatibility, Deprecation, and Migration Plan

...