
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Background

Flink restores operator state from snapshots by matching operator IDs. Since Flink 1.2, StreamGraphHasherV2 has been used to generate operator IDs for operators that have no user-set uid. The generated operator ID is deterministic with respect to:

  • node-local properties (the node's position in the breadth-first traversal of the stream graph)
  • the node's chained output nodes
  • the hashes of the node's input nodes
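To make the dependency on chained outputs concrete, here is a deliberately simplified model of a V2-style hasher that combines the three inputs listed above. It is a sketch under stated assumptions, not Flink's StreamGraphHasherV2 code: the function name `v2_style_ids`, the dict-based graph representation, and the use of `hashlib.sha256` are all hypothetical choices made for illustration. It hashes nodes in breadth-first order, only once all of a node's inputs have been hashed, and mixes the traversal position into the hash once more for every chained output edge.

```python
import hashlib
from collections import deque


def v2_style_ids(edges, sources, chained):
    """Hypothetical, simplified model of a V2-style operator ID hasher.

    Not Flink's StreamGraphHasherV2 code; it only mirrors the three inputs
    described above (traversal order, chained outputs, input hashes).
    edges:   {node: [downstream nodes]}
    sources: source nodes, in the order the traversal starts from
    chained: set of (upstream, downstream) edges that are chained
    """
    inputs = {}
    for up, downs in edges.items():
        for down in downs:
            inputs.setdefault(down, []).append(up)

    ids = {}
    queue = deque(sources)
    while queue:
        node = queue.popleft()
        # Hash a node only once, and only after all of its inputs are hashed;
        # it is re-queued when its last input is processed.
        if node in ids or any(i not in ids for i in inputs.get(node, [])):
            continue
        order = len(ids)  # position in the breadth-first traversal
        h = hashlib.sha256(str(order).encode())
        # Each chained output edge feeds into the hash, so chaining changes the ID.
        for out in edges.get(node, []):
            if (node, out) in chained:
                h.update(str(order).encode())
        # Mix in input hashes in a deterministic, name-independent order.
        for input_id in sorted(ids[i] for i in inputs.get(node, [])):
            h.update(input_id.encode())
        ids[node] = h.hexdigest()[:32]  # truncated for readability
        queue.extend(edges.get(node, []))
    return ids


graph = {"source": ["sink"]}
chained_ids = v2_style_ids(graph, ["source"], chained={("source", "sink")})
unchained_ids = v2_style_ids(graph, ["source"], chained=set())
print(chained_ids["source"] == unchained_ids["source"])  # False
```

In this model the same source->sink graph yields different IDs for both operators depending only on whether the edge is chained, because the sink's ID also absorbs the (changed) source hash.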

It is currently unclear why chained output nodes are involved in the algorithm, but the following historical background may be related: prior to Flink 1.3, the Flink runtime stored snapshots under the operator ID of the first vertex in a chain, so including chained output nodes in the algorithm made some sense, since building or breaking a chain was expected to break state compatibility anyway.

Problem

Although Flink 1.3 introduced operator-level state recovery within a chain, chaining behavior still affects state compatibility, because an operator's ID depends on its chained output nodes. For example, a simple source->sink DAG in which the source and sink are chained together is state-incompatible with an otherwise identical DAG in which they are not chained (either because the parallelisms of the two operators were changed to differ or because chaining was disabled). This greatly limits the flexibility to build or break chains for performance tuning.

Therefore, we propose to introduce StreamGraphHasherV3, which is agnostic of operator chaining behavior, so that users are free to tune the parallelism of individual operators without worrying about state incompatibility.
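A chain-agnostic hasher can be sketched by keeping the traversal position and input hashes while dropping the chained-output term entirely. This assumes V3 differs from V2 only in that respect; this section does not spell out V3's exact hash inputs, and the function name `v3_style_ids`, the dict-based graph, and `hashlib.sha256` are hypothetical illustration choices rather than Flink code.

```python
import hashlib
from collections import deque


def v3_style_ids(edges, sources, chained):
    """Hypothetical sketch of a chain-agnostic (V3-style) operator ID hasher.

    Only traversal order and input hashes contribute; `chained` is accepted
    but deliberately ignored, so chaining decisions cannot change the IDs.
    edges:   {node: [downstream nodes]}
    sources: source nodes, in the order the traversal starts from
    """
    inputs = {}
    for up, downs in edges.items():
        for down in downs:
            inputs.setdefault(down, []).append(up)

    ids = {}
    queue = deque(sources)
    while queue:
        node = queue.popleft()
        # Hash a node only once, and only after all of its inputs are hashed.
        if node in ids or any(i not in ids for i in inputs.get(node, [])):
            continue
        h = hashlib.sha256(str(len(ids)).encode())  # breadth-first position
        for input_id in sorted(ids[i] for i in inputs.get(node, [])):
            h.update(input_id.encode())
        ids[node] = h.hexdigest()[:32]  # truncated for readability
        queue.extend(edges.get(node, []))
    return ids


graph = {"source": ["sink"]}
with_chain = v3_style_ids(graph, ["source"], chained={("source", "sink")})
without_chain = v3_style_ids(graph, ["source"], chained=set())
print(with_chain == without_chain)  # True
```

Under this assumption, chaining or unchaining source and sink (for example by changing one operator's parallelism) leaves every operator ID unchanged, which is the property the proposal is after.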

Public Interfaces

  1. Introduce a new job-level option, execution.stream-graph-hasher.version, to control which Stream Graph Hasher version is used. It accepts either v2 or v3, with v2 as the default.
  2. Introduce the corresponding getter/setter methods in ExecutionConfig to get/set the Stream Graph Hasher version.
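The intended semantics of the new option (two accepted values, v2 by default) can be sketched as a small resolution function. Only the option key and its values come from the proposal; the enum `StreamGraphHasherVersion`, the function `resolve_hasher_version`, and the plain-dict configuration are hypothetical, since this section does not name the actual ExecutionConfig getter/setter methods and the real implementation would live in Flink's Java configuration classes.

```python
from enum import Enum

# Option key as proposed; everything else below is a hypothetical illustration.
HASHER_VERSION_KEY = "execution.stream-graph-hasher.version"


class StreamGraphHasherVersion(Enum):
    """Hypothetical enum for the two proposed option values."""
    V2 = "v2"
    V3 = "v3"


def resolve_hasher_version(job_config: dict) -> StreamGraphHasherVersion:
    """Read the proposed option from a plain dict of job-level settings.

    Falls back to v2, the proposed default, when the key is absent, and
    rejects any value other than exactly "v2" or "v3".
    """
    raw = job_config.get(HASHER_VERSION_KEY, StreamGraphHasherVersion.V2.value)
    try:
        return StreamGraphHasherVersion(raw)
    except ValueError:
        raise ValueError(
            f"{HASHER_VERSION_KEY} must be 'v2' or 'v3', got {raw!r}"
        ) from None


print(resolve_hasher_version({}).value)  # v2
print(resolve_hasher_version({HASHER_VERSION_KEY: "v3"}).value)  # v3
```

Rejecting unknown values outright, rather than silently falling back to v2, avoids a job being started with a hasher other than the one the user intended, which would otherwise surface later as a state-restore failure.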

...

Compatibility, Deprecation, and Migration Plan

The change breaks cross-version state compatibility. The migration plan is as follows:

  1. (v1.19) Introduce HasherV3 as an optional choice while keeping HasherV2 as the default.
  2. (v2.0) Make HasherV3 the default hasher, mark HasherV2 deprecated.
  3. (v2.2) Remove HasherV2.

...