...

For example, as shown below.

Image Modified

(a) In an ExecutionVertex, after execution_1 has consumed inputSplit_0, it goes on to consume inputSplit_1.

...

I think speculative execution means that multiple executions of an ExecutionVertex run at the same time, whereas failover means that two executions run at two different times. Downstream tasks will consume resultPartition



The first change: multiple executions of an upstream ExecutionVertex will produce multiple ResultPartitions. When all upstream ExecutionVertexes have finished, the inputChannels of the downstream executions are updated to consume the output of the fastest finished upstream execution. To support this, add a member variable named fastestFinishedExecution to ExecutionVertex; it is used to create the PartitionInfo that updates the downstream executions' inputChannels.
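
The bookkeeping described above can be sketched roughly as follows. This is only an illustration: AttemptSketch and VertexSketch are hypothetical stand-ins, not Flink's Execution or ExecutionVertex classes, and the partition IDs are made-up strings. The only idea taken from the text is that the vertex remembers the first execution to finish, so that downstream input channels can be pointed at its result partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for one execution (attempt) of a vertex; not Flink's Execution class. */
final class AttemptSketch {
    final int attemptNumber;
    final String resultPartitionId;

    AttemptSketch(int attemptNumber, String resultPartitionId) {
        this.attemptNumber = attemptNumber;
        this.resultPartitionId = resultPartitionId;
    }
}

/** Hypothetical stand-in for ExecutionVertex that remembers which execution finished first. */
final class VertexSketch {
    private final List<AttemptSketch> attempts = new ArrayList<>();
    private AttemptSketch fastestFinishedExecution; // null until some execution finishes

    /** Starts a new (original or speculative) execution with its own result partition. */
    synchronized AttemptSketch startAttempt() {
        int n = attempts.size();
        AttemptSketch attempt = new AttemptSketch(n, "partition-" + n);
        attempts.add(attempt);
        return attempt;
    }

    /** Records a finished execution; only the first one to finish is kept. */
    synchronized void markFinished(AttemptSketch attempt) {
        if (fastestFinishedExecution == null) {
            fastestFinishedExecution = attempt;
        }
    }

    /** Partition that downstream input channels should consume, if any execution has finished. */
    synchronized Optional<String> partitionForDownstream() {
        return Optional.ofNullable(fastestFinishedExecution).map(a -> a.resultPartitionId);
    }
}
```

In this sketch a later call to markFinished for a slower execution is simply ignored, so the partition handed to downstream tasks does not change once it has been chosen.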

...

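TODO: a diagram is needed here. It should include the failover case, especially the case where a restart of the downstream also causes the upstream to restart.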


Image Added


Manage sink files

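For the sink files of a batch job, a globally unique suffix is appended to each file name. When the whole job has finished, the master node renames or deletes the files, depending on the case.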
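
A minimal sketch of this suffix-then-finalize idea, assuming a local file system and a single output file per subtask, might look like the following. SinkFileSketch, its method names, and the "fileName.suffix" naming scheme are all hypothetical and only illustrate the rename-or-delete step; the actual proposal may organize this differently.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper; the naming scheme "fileName.suffix" is an assumption of this sketch. */
final class SinkFileSketch {

    /** A globally unique suffix for one execution's output file. */
    static String newSuffix() {
        return UUID.randomUUID().toString();
    }

    /** Path an execution writes to: the final file name plus its unique suffix. */
    static Path attemptPath(Path dir, String fileName, String suffix) {
        return dir.resolve(fileName + "." + suffix);
    }

    /** At job end: rename the winning execution's file to the final name, delete the others. */
    static void finalizeOutput(Path dir, String fileName, String winningSuffix) throws IOException {
        Path winner = attemptPath(dir, fileName, winningSuffix);
        if (!Files.exists(winner)) {
            throw new IOException("Missing output of winning execution: " + winner);
        }
        String prefix = fileName + ".";
        List<Path> attemptFiles;
        try (Stream<Path> files = Files.list(dir)) {
            attemptFiles = files
                    .filter(p -> p.getFileName().toString().startsWith(prefix))
                    .collect(Collectors.toList());
        }
        for (Path p : attemptFiles) {
            if (p.getFileName().equals(winner.getFileName())) {
                Files.move(p, dir.resolve(fileName), StandardCopyOption.REPLACE_EXISTING);
            } else {
                Files.delete(p); // output of a slower or failed execution
            }
        }
    }
}
```

Because every execution writes to its own suffixed path, concurrent executions of the same vertex never overwrite each other, and the final file name only appears once the job has finished.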

...

How does the speculative execution play together with other sinks? Does it only work for the file based sinks?

Speculative execution could also support sinks that write to key-value databases, such as Hologres and HBase.

...

Code Block (language: java, title: DefaultExecutionGraphJobManagerOptions)
public static final ConfigOption<Boolean> SPECULATIVE_EXECUTION_ENABLED =
        key("flink.batch.speculative.enabled")
                .booleanType()
                .defaultValue(false)
                .withDescription("Whether to enable speculative execution for batch jobs.");

public static final ConfigOption<Long> SPECULATIVE_EXECUTION_INTERVAL_IN_MILLIS =
        key("flink.batch.speculative.interval.in.millis")
                .longType()
                .defaultValue(100L)
                .withDescription("How often to check for speculative tasks.");

public static final ConfigOption<Double> SPECULATIVE_EXECUTION_MULTIPLIER =
        key("flink.batch.speculative.multiplier")
                .doubleType()
                .defaultValue(1.5)
                .withDescription("When the running time of an unfinished ExecutionVertex exceeds this multiple of" +
                        " the median running time of all completed ExecutionVertexes, it will be speculated.");

public static final ConfigOption<Double> SPECULATIVE_EXECUTION_QUANTILE =
        key("flink.batch.speculative.quantile")
                .doubleType()
                .defaultValue(0.75)
                .withDescription("The fraction of ExecutionVertexes in an ExecutionJobVertex that must be completed" +
                        " before the speculative execution mechanism is started. For example, 0.9 means that the" +
                        " mechanism is started once 90% of the ExecutionVertexes are completed.");

public static final ConfigOption<Long> SPECULATIVE_EXECUTION_LOG_INTERVAL_IN_MILLIS =
        key("flink.batch.speculative.log.interval.in.millis")
                .longType()
                .defaultValue(1000 * 60 * 5L)
                .withDescription("Interval in milliseconds between speculation-related log messages.");

public static final ConfigOption<Boolean> BLINK_BLACKLIST_ENABLE =
        key("flink.blacklist.enable")
                .booleanType()
                .defaultValue(false)
                .withDescription("Whether to enable the blacklist for the job.");

public static final ConfigOption<Long> BLINK_BLACKLIST_TIMEOUT_IN_MILLIS =
        key("flink.blacklist.timeout.in.millis")
                .longType()
                .defaultValue(60 * 1000L)
                .withDescription("How long, in milliseconds, a blacklist record is kept before it is removed from the blacklist.");
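
To make the interplay of the quantile and multiplier options concrete, here is a small sketch of how such a check could be written. SpeculationSketch and shouldSpeculate are hypothetical names, and the exact comparisons (at least the quantile finished, strictly more than multiplier times the median) are assumptions of this sketch rather than the scheduler's confirmed logic.

```java
import java.util.Arrays;

/** Hypothetical helper illustrating the quantile and multiplier options; not part of Flink. */
final class SpeculationSketch {

    /** Median of the given durations; the input array is left unmodified. */
    static double median(long[] durationsMillis) {
        long[] sorted = durationsMillis.clone();
        Arrays.sort(sorted);
        int mid = sorted.length / 2;
        return sorted.length % 2 == 1
                ? sorted[mid]
                : (sorted[mid - 1] + sorted[mid]) / 2.0;
    }

    /**
     * Returns true if a still-running vertex should get a speculative execution:
     * enough sibling vertices have finished (quantile), and this one has been running
     * longer than multiplier times the median duration of the finished ones.
     */
    static boolean shouldSpeculate(
            long[] finishedDurationsMillis,
            int totalVertices,
            long runningMillis,
            double quantile,
            double multiplier) {
        if (totalVertices <= 0 || finishedDurationsMillis.length == 0) {
            return false;
        }
        double finishedFraction = (double) finishedDurationsMillis.length / totalVertices;
        if (finishedFraction < quantile) {
            return false; // too few vertices finished to judge what "slow" means
        }
        return runningMillis > multiplier * median(finishedDurationsMillis);
    }
}
```

With the defaults listed above (quantile 0.75, multiplier 1.5) and three of four vertices finished in 100, 110, and 120 ms, the median is 110 ms, so under these assumptions the remaining vertex would be speculated once it has run for more than 165 ms.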

...