...
Manage input and output of ExecutionVertex
Manage InputSplit
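Premise: once the vast majority of the ExecutionVertex instances in an ExecutionJobVertex have finished, the inputSplits that each remaining ExecutionVertex must process are fixed.
We then need to make sure that a speculative execution processes exactly the same inputSplits as the original execution. Only then can a region that contains a source operator support speculative execution.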
(A figure is needed here.)
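One way to keep the two executions on the same inputSplits is to record, per ExecutionVertex, the splits that have already been handed out and replay them to any later attempt. The following is only a minimal sketch of that idea, not the Blink implementation: the class name `ReplayingSplitAssigner`, the method `nextSplit`, and the use of plain strings in place of real InputSplit objects are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;

/**
 * Hypothetical sketch: every attempt of the same vertex sees the same splits
 * in the same order. Splits are modelled as strings for brevity.
 */
public class ReplayingSplitAssigner {
    // Splits that no vertex has received yet.
    private final Queue<String> unassigned = new ArrayDeque<>();
    // Splits already handed to each vertex (by subtask index), in assignment order.
    private final Map<Integer, List<String>> assignedPerVertex = new HashMap<>();
    // Read position of each attempt inside its vertex's list; key is "vertex#attempt".
    private final Map<String, Integer> cursorPerAttempt = new HashMap<>();

    public ReplayingSplitAssigner(List<String> splits) {
        unassigned.addAll(splits);
    }

    /** Returns the next split for the given attempt, or empty if there is none left. */
    public synchronized Optional<String> nextSplit(int vertexIndex, int attemptNumber) {
        List<String> assigned =
                assignedPerVertex.computeIfAbsent(vertexIndex, k -> new ArrayList<>());
        String attemptKey = vertexIndex + "#" + attemptNumber;
        int cursor = cursorPerAttempt.getOrDefault(attemptKey, 0);

        if (cursor == assigned.size()) {
            // This attempt has consumed everything recorded for its vertex,
            // so it may take a fresh split from the shared pool.
            String fresh = unassigned.poll();
            if (fresh == null) {
                return Optional.empty();
            }
            assigned.add(fresh);
        }
        // Otherwise the attempt is behind and replays a recorded split.
        cursorPerAttempt.put(attemptKey, cursor + 1);
        return Optional.of(assigned.get(cursor));
    }
}
```

Under the premise above, the shared pool is already empty when speculation starts, so a speculative attempt only ever replays the list recorded for its vertex.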
Manage middle ResultPartition
Manage sink files
...
(A figure is needed here; it should also cover the failover case.)
I think that speculative execution means that two executions of an ExecutionVertex run at the same time, whereas failover means that two executions run at two different times. Based on this, I think this feature (speculative execution) is theoretically achievable. So I have implemented speculative execution for batch jobs based on Blink, and it has had a significant effect in our production cluster.
Manage sink files
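For batch sink files, a globally unique suffix is appended to the name of every file. When the whole job has finished, the master node renames or deletes these files, depending on the case.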
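The suffix-then-finalize scheme can be sketched as follows. This is an illustration under stated assumptions rather than the actual implementation: the class `SpeculativeSinkCommitter`, its methods, and the bookkeeping maps (which attempts wrote each file, and which attempt won) are hypothetical, and a local directory stands in for the real sink file system.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Hypothetical sketch of committing sink files written by competing executions. */
public class SpeculativeSinkCommitter {

    /** Generates a globally unique suffix for one execution attempt. */
    public static String newAttemptId() {
        return UUID.randomUUID().toString();
    }

    /** File an attempt actually writes to: the final name plus its unique suffix. */
    public static Path attemptFile(Path dir, String finalName, String attemptId) {
        return dir.resolve(finalName + "." + attemptId);
    }

    /**
     * Runs once on the master after the whole job has finished.
     *
     * @param attemptsPerFile final file name -> suffixes of all attempts that wrote it
     * @param winners         final file name -> suffix of the attempt whose output is kept
     */
    public static void finalizeJob(
            Path dir,
            Map<String, List<String>> attemptsPerFile,
            Map<String, String> winners) throws IOException {
        for (Map.Entry<String, List<String>> entry : attemptsPerFile.entrySet()) {
            String finalName = entry.getKey();
            String winner = winners.get(finalName);
            for (String attemptId : entry.getValue()) {
                Path written = attemptFile(dir, finalName, attemptId);
                if (attemptId.equals(winner)) {
                    // Keep the winning attempt's output under the final name.
                    Files.move(written, dir.resolve(finalName),
                            StandardCopyOption.REPLACE_EXISTING);
                } else {
                    // Drop output of losing or failed attempts, if any was written.
                    Files.deleteIfExists(written);
                }
            }
        }
    }
}
```

Because each attempt writes to its own uniquely named file, the original and speculative executions never overwrite each other, and the decision about which output survives is made in one place at the end of the job.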
Metrics
We can use six metrics to measure and evaluate the efficiency of speculative execution. They can be summarized in the job status and on the web page.
// SpeculationRatio is defined as the proportion of speculative tasks among all tasks of a job, and SuccessfulSpeculationRatio is the percentage of launched speculative tasks that finish before the original long-tail tasks.
(1)
// Metric: number of executions that must have finished before speculative execution can start
private AtomicInteger minFinishedForSpeculationThresholdMetrics;
(2)
// Metric: number of executions that have already finished
private AtomicInteger finishedExecutionCountMetrics;
(3)
// Metric: number of executions for which speculative execution was triggered
private Counter speculativeExecutionCountMetrics;
(4)
// Threshold on execution running time beyond which speculative execution is triggered
private AtomicDouble speculativeThresholdOfTime;
(5)
// Metric: running time of the original execution in each ExecutionVertex
private Map<String, AtomicDouble> executionIndex2RunningTimespan;
(6)
...