...

Manage input and output of ExecutionVertex

Manage InputSplit

Premise: once most of the ExecutionVertices in an ExecutionJobVertex have finished, the InputSplits that each remaining ExecutionVertex has to process are fixed.

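We then need a way to make the speculative execution process exactly the same InputSplits as the original execution. Only then can regions that contain a source operator support speculative execution.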

(TODO: add a diagram here.)
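One way to guarantee identical InputSplits is to record, per ExecutionVertex, the order in which splits were handed out and replay that sequence to every attempt of the vertex. The sketch below assumes tasks request splits one at a time; the class ReplayingSplitAssigner is hypothetical, and splits are plain Strings for illustration only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/**
 * Hypothetical sketch: every attempt of the same ExecutionVertex receives the
 * same sequence of input splits, in the same order.
 */
final class ReplayingSplitAssigner {
    // Splits not yet handed to any ExecutionVertex.
    private final Queue<String> unassigned;
    // Splits already handed out, in order, keyed by subtask (ExecutionVertex) index.
    private final Map<Integer, List<String>> history = new HashMap<>();
    // Number of splits each attempt has consumed, keyed by "subtask/attempt".
    private final Map<String, Integer> cursors = new HashMap<>();

    ReplayingSplitAssigner(List<String> splits) {
        this.unassigned = new ArrayDeque<>(splits);
    }

    /** Returns the next split for the given attempt, or null when its input is exhausted. */
    synchronized String nextSplit(int subtaskIndex, int attemptNumber) {
        List<String> assigned = history.computeIfAbsent(subtaskIndex, k -> new ArrayList<>());
        String key = subtaskIndex + "/" + attemptNumber;
        int cursor = cursors.getOrDefault(key, 0);
        if (cursor == assigned.size()) {
            // This attempt is ahead of its siblings: take a fresh split and record it,
            // so every other attempt of this vertex will later receive it too.
            String fresh = unassigned.poll();
            if (fresh == null) {
                return null;
            }
            assigned.add(fresh);
        }
        cursors.put(key, cursor + 1);
        return assigned.get(cursor);
    }
}
```

A speculative attempt started late simply replays the recorded history; if it overtakes the original, the split it pulls is recorded and the original later receives the same one, so both attempts always read identical input.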


Manage intermediate ResultPartition


...


(TODO: add a diagram here that also covers the failover case.)

I think of speculative execution as two executions of the same ExecutionVertex running at the same time, whereas failover means two executions of it running at different times. Based on this, I believe speculative execution is theoretically achievable. I have implemented speculative execution for batch jobs based on Blink, and it has had a significant effect in our production cluster.
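The distinction above reduces to whether the lifetimes of two attempts of the same ExecutionVertex overlap. The Attempt and AttemptRelation classes below are hypothetical and only model that check, with times in milliseconds and half-open intervals.

```java
/** Hypothetical lifetime of one execution attempt, in milliseconds; end is exclusive. */
final class Attempt {
    final long startMillis;
    final long endMillis; // use Long.MAX_VALUE for an attempt that is still running

    Attempt(long startMillis, long endMillis) {
        this.startMillis = startMillis;
        this.endMillis = endMillis;
    }
}

final class AttemptRelation {
    enum Kind { SPECULATIVE, FAILOVER }

    private AttemptRelation() {}

    /** Classifies two attempts of the same ExecutionVertex by whether their lifetimes overlap. */
    static Kind classify(Attempt a, Attempt b) {
        // Half-open intervals [start, end) overlap iff each one starts before the other ends.
        boolean overlap = a.startMillis < b.endMillis && b.startMillis < a.endMillis;
        return overlap ? Kind.SPECULATIVE : Kind.FAILOVER;
    }
}
```

Seen this way, whatever already makes failover safe for sequential attempts (same input, attempt-scoped output) must also hold when the attempts overlap, which is what the input and output management in this section addresses.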

Manage sink files

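For batch sink files, a globally unique suffix is appended to the name of every file. When the whole job has finished, the master node renames or deletes these files depending on the situation.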
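A minimal sketch of this commit step, assuming each attempt writes to a local file named after the final file plus its suffix, and that the master knows which attempt's output to keep. The class AttemptSinkFiles and its methods are hypothetical; only the java.nio.file calls are real APIs.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.UUID;

final class AttemptSinkFiles {
    private AttemptSinkFiles() {}

    /** Generates a globally unique suffix for one execution attempt. */
    static String newSuffix() {
        return UUID.randomUUID().toString();
    }

    /** The file an attempt actually writes: the final file name plus the attempt's suffix. */
    static Path attemptPath(Path finalPath, String suffix) {
        return finalPath.resolveSibling(finalPath.getFileName() + "." + suffix);
    }

    /**
     * Runs on the master once the job has finished. Renames the winning attempt's file
     * to the final name and deletes every other attempt's file. Passing a null winner
     * (for example, when the job failed) deletes all attempt files.
     */
    static void commit(Path finalPath, String winnerSuffix, List<String> attemptSuffixes) throws IOException {
        for (String suffix : attemptSuffixes) {
            Path attemptFile = attemptPath(finalPath, suffix);
            if (suffix.equals(winnerSuffix)) {
                Files.move(attemptFile, finalPath, StandardCopyOption.REPLACE_EXISTING);
            } else {
                Files.deleteIfExists(attemptFile);
            }
        }
    }
}
```

Because every attempt writes under its own suffix, concurrent attempts never overwrite each other, and the final file name only appears after the master's decision.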


Metrics

We use six metrics to measure and evaluate the effectiveness of speculative execution. They can be summarized in the job status and shown in the Web UI.


// SpeculationRatio is the proportion of speculative tasks among all tasks of a job. SuccessfulSpeculationRatio is the proportion of launched speculative tasks that finish before the original long-tail ones.
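The two ratios follow directly from these definitions. A minimal sketch; the class SpeculationStats is hypothetical, and returning 0 when the denominator is 0 is an assumption, not something the definitions specify.

```java
final class SpeculationStats {
    private SpeculationStats() {}

    /** Proportion of the job's tasks for which a speculative attempt was launched. */
    static double speculationRatio(int speculativeTasks, int totalTasks) {
        return totalTasks == 0 ? 0.0 : (double) speculativeTasks / totalTasks;
    }

    /** Proportion of launched speculative attempts that finished before the original long-tail attempt. */
    static double successfulSpeculationRatio(int speculativeWins, int speculativeLaunched) {
        return speculativeLaunched == 0 ? 0.0 : (double) speculativeWins / speculativeLaunched;
    }
}
```

For example, 5 speculative tasks in a 100-task job give a SpeculationRatio of 0.05, and if 3 of those 4 launched speculative attempts win, SuccessfulSpeculationRatio is 0.75.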


(1)
// Metric: the number of executions that must finish before speculative execution starts
private AtomicInteger minFinishedForSpeculationThresholdMetrics;

(2)
// Metric: the number of executions that have finished
private AtomicInteger finishedExecutionCountMetrics;

(3)
// Metric: the number of executions for which speculative execution was triggered
private Counter speculativeExecutionCountMetrics;

(4)
// Running-time threshold above which an execution triggers speculative execution
private AtomicDouble speculativeThresholdOfTime;

(5)
// Metric: the running time of the original execution in each ExecutionVertex
private Map<String, AtomicDouble> executionIndex2RunningTimespan;

(6)

...
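The page does not say how metrics (1) and (4) are computed. The sketch below shows one plausible decision rule under stated assumptions: (1) is a fixed fraction of the parallelism, and (4) is a multiplier times the median running time of finished executions. SpeculationPolicy, minFinishedRatio, and slowTaskMultiplier are hypothetical, and plain values stand in for the AtomicInteger, AtomicDouble, and Counter fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical policy; the fraction and the median-based threshold are assumptions. */
final class SpeculationPolicy {
    private final double minFinishedRatio;   // e.g. 0.75 of the parallelism
    private final double slowTaskMultiplier; // e.g. 1.5 times the median running time

    SpeculationPolicy(double minFinishedRatio, double slowTaskMultiplier) {
        this.minFinishedRatio = minFinishedRatio;
        this.slowTaskMultiplier = slowTaskMultiplier;
    }

    /** Counterpart of (1): executions that must finish before speculating. */
    int minFinishedThreshold(int parallelism) {
        return (int) Math.ceil(minFinishedRatio * parallelism);
    }

    /** Counterpart of (4): running-time threshold derived from finished executions. */
    double timeThreshold(double[] finishedRunningTimes) {
        double[] sorted = finishedRunningTimes.clone();
        Arrays.sort(sorted);
        int n = sorted.length;
        double median = n % 2 == 1 ? sorted[n / 2] : (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
        return slowTaskMultiplier * median;
    }

    /** Returns the execution indices whose original execution should get a speculative attempt. */
    List<String> candidates(int parallelism, double[] finishedRunningTimes, Map<String, Double> runningTimes) {
        // Counterpart of (2): do nothing until enough executions have finished.
        int finished = finishedRunningTimes.length;
        if (finished == 0 || finished < minFinishedThreshold(parallelism)) {
            return List.of();
        }
        double threshold = timeThreshold(finishedRunningTimes);
        List<String> result = new ArrayList<>();
        // Counterpart of (5): running time of each still-running original execution.
        for (Map.Entry<String, Double> entry : new TreeMap<>(runningTimes).entrySet()) {
            if (entry.getValue() > threshold) {
                result.add(entry.getKey());
            }
        }
        return result;
    }
}
```

With parallelism 4, a ratio of 0.75 requires 3 finished executions; if they took 10, 12, and 14 time units, the threshold is 1.5 × 12 = 18, so an original execution still running after 30 units becomes a candidate, and each launch would increment metric (3).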