Status

Discussion thread: thread/ot352tp8t7mclzx9zfv704gcm0fwrq58
Vote thread:
JIRA: FLINK-28131
Release: 1.16


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

To solve this kind of problem, we propose a speculative execution mechanism for Flink. The basic idea is to start mirror tasks on other nodes when a slow task is detected. A mirror task processes the same input data and produces the same output as the original task. The original task and the mirror tasks are treated equally. Once any of them finishes, its output will be admitted and the remaining homogeneous tasks will be canceled.

Public Interfaces

Configuration

Speculative execution will work along with adaptive batch scheduling (FLIP-187). New configuration options will be added for the scheduler:

...

  • jobmanager.adaptive-batch-scheduler.speculative.max-concurrent-executions, default to "2". It controls how many executions (including the original one and speculative ones) of an ExecutionVertex can execute at the same time.
  • jobmanager.adaptive-batch-scheduler.speculative.block-slow-node-duration, default to "1 min". It controls how long an identified slow node should be blocked for.
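
For illustration, here is a minimal sketch of setting these options programmatically through Flink's Configuration class (setting them in flink-conf.yaml works the same way; the keys and values are the ones listed above, nothing else is new API):

    import org.apache.flink.configuration.Configuration;

    public class SpeculativeSchedulerConfigExample {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Allow at most 2 concurrent executions (original + speculative) per ExecutionVertex.
            conf.setString("jobmanager.adaptive-batch-scheduler.speculative.max-concurrent-executions", "2");
            // Keep an identified slow node blocked for 1 minute.
            conf.setString("jobmanager.adaptive-batch-scheduler.speculative.block-slow-node-duration", "1 min");
            System.out.println(conf);
        }
    }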


New configuration options will be added in SlowTaskDetectorOptions for slow task detection:

  • slow-task-detector.check-interval, default to "1 s". It defines the interval to check slow tasks.
  • slow-task-detector.execution-time.baseline-lower-bound, default to "1 min". It defines the lower bound of the slow task detection baseline.
  • slow-task-detector.execution-time.baseline-ratio, default to "0.75". It defines the finished execution ratio threshold to compute the slow task detection baseline.
  • slow-task-detector.execution-time.baseline-multiplier, default to "1.5". It defines the multiplier to compute the slow task detection baseline.
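
To make the interplay of these options concrete, here is a hedged sketch of how a detection baseline could be derived from them. The use of the median finished execution time is an assumption made for illustration; the exact statistic is defined by the detector implementation, not by this sketch.

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class ExecutionTimeBaselineSketch {

        /**
         * Derives a slow-task baseline from the execution times of finished executions.
         * Assumption for illustration: baseline = max(lower bound, median finished time * multiplier),
         * computed only once the finished ratio reaches the configured threshold.
         */
        static Duration computeBaseline(
                List<Duration> finishedExecutionTimes,
                int totalExecutions,
                double baselineRatio,         // slow-task-detector.execution-time.baseline-ratio
                double baselineMultiplier,    // slow-task-detector.execution-time.baseline-multiplier
                Duration baselineLowerBound)  // slow-task-detector.execution-time.baseline-lower-bound
        {
            if (finishedExecutionTimes.size() < totalExecutions * baselineRatio) {
                return null; // not enough finished executions yet; no task is marked slow
            }
            List<Duration> sorted = new ArrayList<>(finishedExecutionTimes);
            Collections.sort(sorted);
            Duration median = sorted.get(sorted.size() / 2);
            Duration scaled = Duration.ofMillis((long) (median.toMillis() * baselineMultiplier));
            return scaled.compareTo(baselineLowerBound) < 0 ? baselineLowerBound : scaled;
        }
    }

A running execution whose execution time exceeds this baseline would then be flagged as slow; the check itself runs every slow-task-detector.check-interval.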

Metrics

We propose to add the following metrics to expose job problems and show the effectiveness of speculative execution:

  1. numSlowExecutionVertices: Number of slow execution vertices at the moment.
  2. numEffectiveSpeculativeExecutions: Number of speculative executions which finish before their corresponding original executions finish.
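
As a hedged sketch of how such metrics could be exposed as gauges (the class, field names, and registration point below are assumptions for illustration, not the actual scheduler code; only the metric names come from this FLIP):

    import org.apache.flink.metrics.Gauge;
    import org.apache.flink.metrics.MetricGroup;

    import java.util.concurrent.atomic.AtomicInteger;

    class SpeculativeExecutionMetricsSketch {

        // Hypothetical counters maintained by the scheduler.
        private final AtomicInteger slowExecutionVertices = new AtomicInteger();
        private final AtomicInteger effectiveSpeculativeExecutions = new AtomicInteger();

        void register(MetricGroup jobMetricGroup) {
            Gauge<Integer> slowGauge = slowExecutionVertices::get;
            Gauge<Integer> effectiveGauge = effectiveSpeculativeExecutions::get;
            // Gauge values are read lazily each time metrics are reported.
            jobMetricGroup.gauge("numSlowExecutionVertices", slowGauge);
            jobMetricGroup.gauge("numEffectiveSpeculativeExecutions", effectiveGauge);
        }
    }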

Proposed Changes

When speculative execution is enabled, a SpeculativeScheduler (which extends AdaptiveBatchScheduler) will be used for task scheduling. SpeculativeScheduler will listen on slow tasks detected by SlowTaskDetector. It will create and deploy speculative executions for the slow tasks. Nodes that slow tasks are located on will be treated as slow nodes and get blocked, so that speculative executions will not be deployed on them. Once any execution finishes, the remaining homogeneous tasks will be canceled, so that only one execution will be admitted as finished and only its output will be visible to downstream consumer tasks or in external sink services.


SlowTaskDetector

...

  • SpeculativeScheduler needs to be able to directly deploy an Execution, while AdaptiveBatchScheduler can only perform ExecutionVertex-level deployment.
  • SpeculativeScheduler does not restart the ExecutionVertex if an execution fails while any other current execution of it is still making progress.
  • SpeculativeScheduler listens on slow tasks. Once there are slow tasks, it will block the slow nodes and deploy speculative executions of the slow tasks on other nodes.
  • Once any execution finishes, SpeculativeScheduler will cancel all the remaining executions of the same execution vertex.
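
As a rough illustration of the last point, here is a sketch of the cancellation behavior; the interface and method names are hypothetical stand-ins, not the actual SpeculativeScheduler API:

    import java.util.List;

    class SpeculativeCancellationSketch {

        /** Hypothetical handle for one attempt (original or speculative) of an execution vertex. */
        interface ExecutionAttempt {
            boolean isFinished();
            void cancel();
        }

        /**
         * Once any attempt of an execution vertex finishes, cancel all of its remaining attempts,
         * so that exactly one result is admitted and becomes visible to downstream consumers.
         */
        static void onAttemptFinished(List<ExecutionAttempt> currentAttempts, ExecutionAttempt finished) {
            for (ExecutionAttempt attempt : currentAttempts) {
                if (attempt != finished && !attempt.isFinished()) {
                    attempt.cancel();
                }
            }
        }
    }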

Scheduler directly deploys executions

...

Once notified about slow tasks, the SpeculativeScheduler will handle them as below:

  1. Block the nodes that the slow tasks are located on. To achieve this, the scheduler will add the slow nodes to the blocklist. The block action will be MARK_BLOCKED so that future tasks will not be deployed to a slow node, while already deployed tasks can keep running. (See FLIP-224 Blocklist Mechanism for more details.)
  2. Create speculative executions for the slow tasks until the current executions of each execution vertex reach the concurrency limit (defined via the config option jobmanager.adaptive-batch-scheduler.speculative.max-concurrent-executions).
  3. Deploy the newly created speculative executions.
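
A hedged sketch of these steps follows; all interface and method names are hypothetical stand-ins, and only the concepts (the MARK_BLOCKED action from FLIP-224 and the max-concurrent-executions limit) come from this FLIP:

    import java.util.Collection;

    class SlowTaskHandlingSketch {

        /** Hypothetical view of a slow execution vertex and its currently running attempts. */
        interface SlowVertex {
            Collection<String> currentNodes();         // nodes its running attempts are located on
            int currentAttemptCount();                 // original + speculative attempts
            void createAndDeploySpeculativeAttempt();  // deploys one more attempt, avoiding blocked nodes
        }

        /** Hypothetical hook into the FLIP-224 blocklist. */
        interface Blocklist {
            void markBlocked(String nodeId); // MARK_BLOCKED: future tasks avoid the node, running tasks keep going
        }

        static void handleSlowTasks(
                Collection<SlowVertex> slowVertices, Blocklist blocklist, int maxConcurrentExecutions) {
            for (SlowVertex vertex : slowVertices) {
                // Step 1: block the nodes the slow attempts are located on.
                vertex.currentNodes().forEach(blocklist::markBlocked);
                // Steps 2 and 3: create and deploy speculative attempts up to the concurrency limit.
                while (vertex.currentAttemptCount() < maxConcurrentExecutions) {
                    vertex.createAndDeploySpeculativeAttempt();
                }
            }
        }
    }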

Limitations

Batch jobs only

...

Currently, AdaptiveBatchScheduler does not support jobs with PIPELINED data exchanges. As a result, speculative execution does not support PIPELINED data exchanges either. Requiring all data exchanges to be BLOCKING also simplifies things, because each ExecutionVertex is then an individual pipelined region and can be speculated individually. Otherwise, multiple ExecutionVertices from the same pipelined region would need to do speculative execution together.

This also means that 

Speculative execution of sources and sinks is disabled by default

Speculative execution of sources and sinks is disabled by default because not all sources and sinks can work with speculative execution.

There will be several follow up proposals to enable speculative execution of sources and sinks:

  • FLIP-XXX (coming soon) is a proposal to enable speculative execution to work with most sources, without changing the source connectors
  • FLIP-XXX (coming soon) is a proposal to improve Flink interfaces to support speculative execution of sinks. Sinks must implement these interfaces to work with speculative execution
  • FLIP-XXX (coming soon) is a proposal to improve FileSink to work with speculative execution
  • FLIP-XXX (coming soon) is a proposal to improve HiveTableSink to work with speculative execution

With these proposed improvements, the scheduler will be able to see whether a given source or sink supports speculative execution. It will enable speculative execution for a JobVertex only if all the contained sources and sinks support speculative execution.

No integration with Flink's web UI

The web UI does not show all the concurrent executions of each ExecutionVertex/subtask. It only shows the one with the fastest progress.

User-defined functions must not be affected by their speculative instances

When a user-defined function and its speculative instances run concurrently, they must not affect each other. Problematic interactions include:

  • access to the same exclusive resources
  • overwriting output in external services as a side effect, i.e. not via Flink sinks
  • competition for data ingestion, which includes cases where
    • user-defined source functions compete for the same input
    • data ingestion happens as a side effect, i.e. not via Flink sources
  • ...

If the concurrent instances can affect each other, it may result in task failures or, even worse, data inconsistency. Speculative execution should therefore not be enabled in such cases.
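
For example, a user-defined function like the following sketch would be unsafe under speculative execution: the original and speculative instances would both append to the same external file as a side effect (the function and the path are made up purely for illustration):

    import org.apache.flink.api.common.functions.RichMapFunction;

    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;

    /**
     * Unsafe example: writes to an external location as a side effect, bypassing Flink sinks.
     * If a speculative instance runs concurrently with the original one, both append to the same
     * file, which can fail the tasks or leave duplicated, inconsistent data behind.
     */
    public class SideEffectMapFunction extends RichMapFunction<String, String> {

        private static final Path AUDIT_LOG = Path.of("/tmp/audit.log"); // hypothetical external resource

        @Override
        public String map(String value) throws Exception {
            Files.writeString(AUDIT_LOG, value + System.lineSeparator(),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            return value.toUpperCase();
        }
    }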

Compatibility, Deprecation, and Migration Plan

...