...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Background
Flink restores operator state from snapshots by matching operator IDs. Since Flink 1.2, when an operator has no user-set uid, its operator ID is generated by StreamGraphHasherV2. The generated operator ID is deterministic with respect to:
- node-local properties (the node's traversal order index in the BFS over the stream graph)
- chained output nodes
- the hashes of input nodes
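For illustration, the following Java sketch shows how these three inputs can combine into an ID. It is loosely modeled on StreamGraphHasherV2 but is not Flink's code: Node, V2StyleHasher and HasherDemo are hypothetical, MD5 from java.security stands in for the 128-bit Murmur3 hash Flink uses, and chainability is passed in as a count rather than derived from parallelism and chaining settings. Each chainable output edge mixes the node's traversal index into the hash once more, and each input's hash is folded in byte by byte.

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stream node: only what the hash needs.
final class Node {
    final String name;
    final List<Node> inputs = new ArrayList<>();
    int chainableOutputs; // number of outgoing edges that would be chained

    Node(String name) { this.name = name; }
}

// Hypothetical V2-style hasher; nodes must be hashed in BFS (traversal) order.
final class V2StyleHasher {
    private final Map<Node, byte[]> hashes = new HashMap<>();

    byte[] hash(Node node) {
        MessageDigest md;
        try {
            md = MessageDigest.getInstance("MD5"); // stand-in for a 128-bit hash function
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
        // Node-local property: the traversal index (number of nodes hashed so far).
        int traversalIndex = hashes.size();
        md.update(ByteBuffer.allocate(4).putInt(traversalIndex).array());
        // Chained outputs: mix the same index in again once per chainable output edge.
        for (int i = 0; i < node.chainableOutputs; i++) {
            md.update(ByteBuffer.allocate(4).putInt(traversalIndex).array());
        }
        byte[] hash = md.digest();
        // Input nodes: fold each input's already computed hash into this one.
        for (Node input : node.inputs) {
            byte[] other = hashes.get(input);
            if (other == null) {
                throw new IllegalStateException("Input " + input.name + " not hashed yet");
            }
            for (int j = 0; j < hash.length; j++) {
                hash[j] = (byte) (hash[j] * 37 ^ other[j]);
            }
        }
        hashes.put(node, hash);
        return hash;
    }
}

final class HasherDemo {
    // Hashes a two-node source -> sink graph; "chained" toggles whether the edge is chainable.
    static byte[][] sourceAndSink(boolean chained) {
        Node source = new Node("source");
        Node sink = new Node("sink");
        sink.inputs.add(source);
        source.chainableOutputs = chained ? 1 : 0;
        V2StyleHasher hasher = new V2StyleHasher();
        // BFS order: source first, then sink.
        return new byte[][] {hasher.hash(source), hasher.hash(sink)};
    }
}
```

Calling HasherDemo.sourceAndSink(true) and sourceAndSink(false) yields different IDs for both source and sink, even though the topology is identical; the sink differs only because its input's hash does.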
It is currently unclear why chained output nodes are included in the algorithm, but the following history may be related: prior to Flink 1.3, the Flink runtime took snapshots keyed by the operator ID of the first vertex in a chain. Since chain breaking/building was expected to break state compatibility anyway, it somewhat made sense to include chained output nodes in the hash.
Problem
Although Flink 1.3 introduced operator-level state recovery within a chain, chaining behavior still affects state compatibility, because the generated operator ID depends on the operator's chained output nodes. For example, a simple source->sink DAG with source and sink chained together is state-incompatible with an otherwise identical DAG in which source and sink are not chained (either because their parallelisms are changed to differ or because chaining is disabled). This greatly limits the flexibility to break or build chains for performance tuning.
Therefore, we propose to introduce StreamGraphHasherV3, which is agnostic to the chaining behavior of operators, so that users are free to tune the parallelism of individual operators without worrying about state
...
incompatibility.
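This section does not spell out V3's algorithm, so the Java sketch below assumes one straightforward reading: keep the node-local traversal index and the input hashes used by V2, and drop the chained-output term entirely. GraphNode, ChainAgnosticHasher and ChainAgnosticDemo are hypothetical names, and MD5 stands in for whatever hash function Flink would actually use.

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stream node.
final class GraphNode {
    final String name;
    final List<GraphNode> inputs = new ArrayList<>();
    boolean chainedToDownstream; // recorded for illustration; the hasher never reads it

    GraphNode(String name) { this.name = name; }
}

// Hypothetical chaining-agnostic hasher: the ID depends only on the node's
// BFS traversal index and its inputs' IDs, never on how the node is chained.
final class ChainAgnosticHasher {
    private final Map<GraphNode, byte[]> hashes = new HashMap<>();

    byte[] hash(GraphNode node) {
        MessageDigest md;
        try {
            md = MessageDigest.getInstance("MD5"); // stand-in for a 128-bit hash function
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
        // Node-local property only: the traversal index.
        md.update(ByteBuffer.allocate(4).putInt(hashes.size()).array());
        byte[] hash = md.digest();
        // Fold in the IDs of all inputs, which must already be hashed.
        for (GraphNode input : node.inputs) {
            byte[] other = hashes.get(input);
            if (other == null) {
                throw new IllegalStateException("Input " + input.name + " not hashed yet");
            }
            for (int j = 0; j < hash.length; j++) {
                hash[j] = (byte) (hash[j] * 37 ^ other[j]);
            }
        }
        hashes.put(node, hash);
        return hash;
    }
}

final class ChainAgnosticDemo {
    // Hashes a source -> sink graph in BFS order; "chained" only sets the ignored flag.
    static byte[][] sourceAndSink(boolean chained) {
        GraphNode source = new GraphNode("source");
        GraphNode sink = new GraphNode("sink");
        sink.inputs.add(source);
        source.chainedToDownstream = chained;
        ChainAgnosticHasher hasher = new ChainAgnosticHasher();
        return new byte[][] {hasher.hash(source), hasher.hash(sink)};
    }
}
```

Under this reading, the chained and unchained source->sink DAGs from the example above receive identical IDs, while topology changes (a new input, a different traversal position) still change them.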
Public Interfaces
- Introduce a new job-level option, execution.stream-graph-hasher.version, to control which version of the Stream Graph Hasher is used. It takes either v2 or v3, with v2 being the default value.
- Introduce the corresponding getter/setter methods in ExecutionConfig to get/set the Stream Graph Hasher version.
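The proposal does not name the ExecutionConfig getter/setter methods, so the Java sketch below only models the option's described semantics: a hypothetical StreamGraphHasherVersion enum (not an existing Flink class) resolves the execution.stream-graph-hasher.version key from a flat key/value configuration, accepts exactly v2 or v3, and falls back to v2 when the key is absent. Rejecting unknown values with an IllegalArgumentException is an assumption of the sketch.

```java
import java.util.Map;

// Hypothetical enum for the proposed option; not part of Flink's API.
enum StreamGraphHasherVersion {
    V2("v2"),
    V3("v3");

    // Option key as named in the proposal.
    static final String KEY = "execution.stream-graph-hasher.version";
    // The proposal makes v2 the default.
    static final StreamGraphHasherVersion DEFAULT = V2;

    private final String value;

    StreamGraphHasherVersion(String value) { this.value = value; }

    // Resolves the hasher version from a flat key/value job configuration.
    static StreamGraphHasherVersion fromConfig(Map<String, String> conf) {
        String raw = conf.get(KEY);
        if (raw == null) {
            return DEFAULT; // key unset: keep the current (v2) behavior
        }
        for (StreamGraphHasherVersion v : values()) {
            if (v.value.equals(raw)) {
                return v;
            }
        }
        throw new IllegalArgumentException(
                "Unsupported value '" + raw + "' for " + KEY + "; expected v2 or v3");
    }
}
```

With this sketch, a job that leaves the key unset keeps V2-generated IDs, while setting the key to v3 opts it into the chaining-agnostic hasher.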
...
Compatibility, Deprecation, and Migration Plan
Switching to HasherV3 will break cross-version state compatibility for operators whose IDs are generated (i.e., operators without a user-set uid). The migration plan is as follows:
- (v1.19) Introduce HasherV3 as an optional choice while keeping HasherV2 as the default.
- (v2.0) Make HasherV3 the default hasher and deprecate HasherV2.
- (v2.2) Remove HasherV2.
...