...

The preliminary experiments in Alibaba's product cluster have demonstrated the effectiveness of this strategy.

Limitations

(1) The job type must be batch.

(2) The cluster's ResourceManager must be YARN or K8s.

          If the cluster's ResourceManager is YARN, the NodeManagers' attributes should include the machine IP attribute.

          If the cluster's ResourceManager is K8s, an IP label should be attached to the Nodes.

(3) If users cannot tolerate duplicate data being written to non-key-value databases, SpeculativeExecution can't be used.

(4) SpeculativeExecution is effective only when all input and output edges of the JobVertex are blocking. As a result, each region contains only one ExecutionVertex.
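Limitation (4) can be expressed as a simple predicate. The following is a minimal sketch, not the proposed implementation: the class SpeculationEligibility and the enum EdgeType are hypothetical names introduced for illustration, and the edge types are modeled as plain lists rather than real JobVertex edges.

```java
import java.util.List;

final class SpeculationEligibility {
    // Hypothetical stand-in for the exchange type of a JobVertex edge.
    enum EdgeType { BLOCKING, PIPELINED }

    // True only if every input edge and every output edge is blocking.
    // A vertex with no inputs (a source) or no outputs (a sink) passes trivially.
    static boolean allEdgesBlocking(List<EdgeType> inputs, List<EdgeType> outputs) {
        return inputs.stream().allMatch(t -> t == EdgeType.BLOCKING)
                && outputs.stream().allMatch(t -> t == EdgeType.BLOCKING);
    }
}
```

A vertex with a single pipelined edge on either side would be rejected by this check, which matches the requirement that the vertex forms a region on its own.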


Proposed Changes

General design

...

Just like normal tasks, the speculative task is executed in parallel with the original one and shares the same failover and restart strategy. The original long-tail tasks and the speculative tasks can still retry after a failure, each on its own track. However, I think the job should not be restarted globally when the number of speculative execution failovers reaches max-retry-counts. When a task fails, we can calculate its index (executionIndex) in the executionList from its ExecutionAttemptID. The scheduler then processes the corresponding execution according to the executionIndex, as shown below.
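To illustrate only the index lookup (not the scheduler's follow-up processing), here is a minimal Java sketch. It assumes attempt IDs can be compared for equality and models them as plain strings; the class ExecutionIndexLookup and the method executionIndexOf are hypothetical names, not part of the proposal.

```java
import java.util.List;

final class ExecutionIndexLookup {
    // Attempt IDs are modeled as strings here; the real type would be ExecutionAttemptID.
    // Returns the position of the failed attempt in executionList, or -1 if it is not present.
    static int executionIndexOf(List<String> executionList, String failedAttemptId) {
        return executionList.indexOf(failedAttemptId);
    }
}
```

Returning -1 for an unknown attempt lets the caller ignore failure reports from attempts that are no longer tracked.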

...

(2) finishedExecutionCountMetrics is defined as the number of finished ExecutionVertex instances.

(3) speculativeExecutionCountMetrics is defined as the number of speculative executions scheduled by the scheduler.

(4) speculativeExecutionFastestFinishedCountMetrics is defined as the number of ExecutionVertex instances whose speculative execution reaches the FINISHED state before the original execution.

(5) speculativeThresholdOfTime is defined as the time threshold for speculative execution.

(6) executionIndex2RunningTimespan is defined as the running time of the original execution in each ExecutionVertex.
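How the three counters in (2) to (4) relate to each other can be sketched as follows. This is an illustrative sketch under assumptions: the class SpeculativeMetricsSketch and its callback methods are hypothetical, and plain fields stand in for whatever metric types the scheduler would actually register.

```java
final class SpeculativeMetricsSketch {
    long finishedExecutionCount;                    // (2) finished ExecutionVertex instances
    long speculativeExecutionCount;                 // (3) speculative executions scheduled
    long speculativeExecutionFastestFinishedCount;  // (4) vertices where speculation finished first

    // Hypothetical hook: called each time the scheduler schedules a speculative execution.
    void onSpeculativeExecutionScheduled() {
        speculativeExecutionCount++;
    }

    // Hypothetical hook: called once per ExecutionVertex when it reaches FINISHED.
    void onVertexFinished(boolean speculativeFinishedFirst) {
        finishedExecutionCount++;
        if (speculativeFinishedFirst) {
            speculativeExecutionFastestFinishedCount++;
        }
    }
}
```

Under these assumptions, the ratio of counter (4) to counter (3) gives a rough indication of how often speculation actually beat the original execution.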


Web UI

If we don't modify the web UI code, then whenever the speculative execution finishes before the original execution, the web UI will show the task as cancelled, even though the result of the batch job is correct.

More discussion is needed to decide whether the web UI needs to be modified.

      





Configuration

...