
Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread
Vote thread
JIRA


Release<Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Background

Flink restores operator state from snapshots by matching operator IDs. Since Flink 1.2, StreamGraphHasherV2 has been used to generate operator IDs when no user-set uid exists. The generated operator ID is deterministic with respect to:

  • node-local properties (the traverse order ID in the BFS for the stream graph)
  • chained output nodes
  • hashes of the input nodes

It is unclear at this point why chained output nodes are involved in the algorithm, but the following historical background might be related: prior to Flink 1.3, the Flink runtime took snapshots keyed by the operator ID of the first vertex in a chain, so it made some sense to include chained output nodes in the algorithm, as chain-breaking/building was expected to break state compatibility anyway.

Problem

Although Flink 1.3 brought operator-level state recovery within a chain, the chaining behavior still affects state compatibility, because the generation of an operator ID depends on the operator's chained output nodes. For example, a simple source->sink DAG with source and sink chained together is state-incompatible with an otherwise identical DAG in which source and sink are unchained (either because the two operators are given different parallelisms or because chaining is disabled). This greatly limits the flexibility to perform chain-breaking/building for performance tuning.

Therefore, we propose to introduce StreamGraphHasherV3, which is agnostic to the chaining behavior of operators, so that users are free to tune the parallelism of individual operators without worrying about state incompatibility.

Public Interfaces

  1. Introduce a new job-level option execution.stream-graph-hasher.version to control which version of the Stream Graph Hasher is used. It accepts either v2 or v3, with v2 as the default value.
  2. Introduce the corresponding getter/setter methods in ExecutionConfig to get/set the Stream Graph Hasher version.
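
As a rough illustration of the option semantics described above (either v2 or v3, defaulting to v2), the following sketch resolves the value from a plain key/value map. The class, enum, and method names are hypothetical and are not part of Flink's ExecutionConfig or Configuration API; only the option key and its two values come from this proposal.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

class HasherVersionOptionSketch {

    // Option key as proposed in this FLIP.
    static final String KEY = "execution.stream-graph-hasher.version";

    // Hypothetical enum; the proposal does not name the value type.
    enum HasherVersion { V2, V3 }

    /** Resolves the hasher version, falling back to V2 when the option is unset. */
    static HasherVersion resolve(Map<String, String> conf) {
        String raw = conf.get(KEY);
        if (raw == null) {
            return HasherVersion.V2; // v2 is the proposed default
        }
        switch (raw.trim().toLowerCase(Locale.ROOT)) {
            case "v2":
                return HasherVersion.V2;
            case "v3":
                return HasherVersion.V3;
            default:
                throw new IllegalArgumentException(
                        "Unsupported value for " + KEY + ": " + raw);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println("unset: " + resolve(conf));
        conf.put(KEY, "v3");
        System.out.println("set:   " + resolve(conf));
    }
}
```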

Proposed Changes

Review of the current algorithm

  1. Sort the sources by node ID, which keeps the traversal order of the sources deterministic across job submissions
  2. Traverse the stream graph breadth-first. For each visited node:
    1. If the hashes of all upstream nodes have been generated:
      1. Generate a node-local hash using the murmur3_128 hash function: the input is the traverse order ID of the node, and for each of its chained output nodes, the same traverse order ID is put into the input again (see HashingExplained · google/guava Wiki (github.com) for details on the Hashing algorithm API)
      2. For each of the upstream nodes, compute hash = hash * 37 XOR hash[upstream node] to incorporate upstream topology info into the node hash
    2. Else, push the node back to the queue to be visited later
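
The steps above can be sketched in a few dozen lines. This is a simplified illustration, not the StreamGraphHasherV2 source: the Node class and all method names are hypothetical, MD5 from the JDK stands in for Guava's murmur3_128 so that the example has no dependencies, the traverse order ID is assumed to be the number of nodes hashed so far, and the hash * 37 XOR step is assumed to be applied byte by byte.

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class HasherV2Sketch {

    /** Hypothetical minimal node; not Flink's StreamNode. */
    static final class Node {
        final int id;
        final List<Node> inputs = new ArrayList<>();
        final List<Node> outputs = new ArrayList<>();
        int chainedOutputs = 0;

        Node(int id) { this.id = id; }
    }

    static void connect(Node from, Node to, boolean chained) {
        from.outputs.add(to);
        to.inputs.add(from);
        if (chained) {
            from.chainedOutputs++;
        }
    }

    /** Stand-in 128-bit hash (MD5); the real hasher uses murmur3_128. */
    static byte[] digest(byte[] input) {
        try {
            return MessageDigest.getInstance("MD5").digest(input);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    static Map<Integer, byte[]> hashAll(List<Node> sources) {
        List<Node> sorted = new ArrayList<>(sources);
        sorted.sort(Comparator.comparingInt(n -> n.id)); // step 1
        Deque<Node> queue = new ArrayDeque<>(sorted);
        Set<Integer> visited = new HashSet<>();
        for (Node s : sorted) {
            visited.add(s.id);
        }
        Map<Integer, byte[]> hashes = new HashMap<>();
        while (!queue.isEmpty()) { // step 2: breadth-first traversal
            Node node = queue.poll();
            if (!tryHash(node, hashes)) {
                queue.add(node); // step 2.2: revisit later
                continue;
            }
            for (Node out : node.outputs) {
                if (visited.add(out.id)) {
                    queue.add(out);
                }
            }
        }
        return hashes;
    }

    static boolean tryHash(Node node, Map<Integer, byte[]> hashes) {
        for (Node in : node.inputs) {
            if (!hashes.containsKey(in.id)) {
                return false; // some upstream hash is still missing
            }
        }
        int traverseOrderId = hashes.size(); // assumption: count of nodes hashed so far
        int repeats = 1 + node.chainedOutputs; // once, plus once per chained output
        ByteBuffer input = ByteBuffer.allocate(Integer.BYTES * repeats);
        for (int i = 0; i < repeats; i++) {
            input.putInt(traverseOrderId);
        }
        byte[] hash = digest(input.array()); // step 2.1.1
        for (Node in : node.inputs) { // step 2.1.2
            byte[] upstream = hashes.get(in.id);
            for (int j = 0; j < hash.length; j++) {
                hash[j] = (byte) (hash[j] * 37 ^ upstream[j]);
            }
        }
        hashes.put(node.id, hash);
        return true;
    }

    /** Builds source(1) -> sink(2) and returns the hash of every node. */
    static Map<Integer, byte[]> sourceToSink(boolean chained) {
        Node source = new Node(1);
        Node sink = new Node(2);
        connect(source, sink, chained);
        return hashAll(Arrays.asList(source));
    }

    public static void main(String[] args) {
        byte[] chained = sourceToSink(true).get(1);
        byte[] unchained = sourceToSink(false).get(1);
        System.out.println("source hash unchanged by chaining: "
                + Arrays.equals(chained, unchained));
    }
}
```

The sketch re-queues a node whose inputs are not yet hashed, which terminates only when every input is reachable from the given sources; that holds for the small DAG used here.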

Proposed changes

From the sketch of the current algorithm above, we can see that the dependency on chained output nodes resides only in the generation of the node-local hash values (the step that puts the traverse order ID into the input again for each chained output node), so it suffices to remove the relevant code (L227-235 of StreamGraphHasherV2) without changing anything else. The generated hash value for each node is still deterministic, as we do not change the traversal order of the graph, and it is still statistically unique, as we still provide different input values to the hash function.
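
To make the effect of the change concrete, the sketch below computes node hashes for a source->sink job with and without the chaining-dependent input. It is an illustration under stated assumptions rather than the proposed StreamGraphHasherV3 code: all names are hypothetical, MD5 stands in for murmur3_128, and the hash * 37 XOR step is assumed to be applied byte by byte.

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

class ChainingAgnosticHashSketch {

    /** Stand-in 128-bit hash (MD5); the real hasher uses murmur3_128. */
    static byte[] digest(byte[] input) {
        try {
            return MessageDigest.getInstance("MD5").digest(input);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    /**
     * chainingAware = true mimics the current behavior (the traverse order ID
     * is repeated once per chained output); false mimics the proposed one.
     */
    static byte[] nodeHash(int traverseOrderId, int chainedOutputs,
                           boolean chainingAware, byte[]... upstreamHashes) {
        int repeats = chainingAware ? 1 + chainedOutputs : 1;
        ByteBuffer input = ByteBuffer.allocate(Integer.BYTES * repeats);
        for (int i = 0; i < repeats; i++) {
            input.putInt(traverseOrderId);
        }
        byte[] hash = digest(input.array());
        for (byte[] upstream : upstreamHashes) {
            for (int j = 0; j < hash.length; j++) {
                hash[j] = (byte) (hash[j] * 37 ^ upstream[j]);
            }
        }
        return hash;
    }

    /** Returns the hashes of [source, sink] for a source->sink job. */
    static byte[][] sourceToSink(boolean chained, boolean chainingAware) {
        byte[] source = nodeHash(0, chained ? 1 : 0, chainingAware);
        byte[] sink = nodeHash(1, 0, chainingAware, source);
        return new byte[][] {source, sink};
    }

    public static void main(String[] args) {
        System.out.println("chaining-aware hashes equal across chaining:    "
                + Arrays.deepEquals(sourceToSink(true, true), sourceToSink(false, true)));
        System.out.println("chaining-agnostic hashes equal across chaining: "
                + Arrays.deepEquals(sourceToSink(true, false), sourceToSink(false, false)));
    }
}
```

In the chaining-agnostic mode the source and sink still receive different hashes, because their traverse order IDs differ and the sink additionally mixes in the source hash.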

Compatibility, Deprecation, and Migration Plan

The change will break cross-version state compatibility. The migration plan is as follows:

  1. (v1.19) Introduce HasherV3 as an opt-in choice while keeping HasherV2 as the default.
  2. (v2.0) Make HasherV3 the default hasher and mark HasherV2 as deprecated.
  3. (v2.2) Remove HasherV2.

Test Plan

The changes will be covered by unit tests.

Rejected Alternatives

N/A
