Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread: https://lists.apache.org/thread/5w0cwl1hk2lhqdnl7zn068fxzxknxojx
Vote thread
JIRA


Release: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Background

Flink restores operator state from snapshots by matching operator IDs. Since Flink 1.2, StreamGraphHasherV2 has been used to generate operator IDs when no user-set UID exists. The generated operator ID is deterministic with respect to:

  • node-local properties (the traversal order ID of the node in the breadth-first traversal of the stream graph)
  • chained output nodes
  • hashes of input nodes

It is currently unclear why chained output nodes are involved in the algorithm, but the following historical background may be related. Prior to Flink 1.3, the Flink runtime took snapshots keyed by the operator ID of the first vertex in a chain. Including chained output nodes in the algorithm therefore made some sense, because chain-breaking/building was expected to break state compatibility anyway.

Problem

Although Flink 1.3 introduced operator-level state recovery within a chain, chaining behavior still affects state compatibility, because the generated operator ID depends on the operator's chained output nodes. For example, take a simple source->sink DAG in which source and sink are chained together. It is state-incompatible with an otherwise identical DAG in which source and sink are not chained, whether because the parallelisms of the two operators have been changed to differ or because chaining is disabled. This greatly limits the flexibility to break or build chains for performance tuning, especially for SQL jobs, where user-set UIDs are currently not supported.

Proposal

Therefore, we propose introducing StreamGraphHasherV3, which is agnostic of operator chaining. Users would then be free to tune the parallelism of individual operators without worrying about state incompatibility. To avoid breaking backward state compatibility, we will extend the existing alternative operator ID mechanism for state assignment. This is effectively a revival of what was done when StreamGraphHasherV2 was introduced (see FLINK-5290).

Public Interfaces

  1. Introduce a new job-level option execution.stream-graph-hasher.version to control the version of the Stream Graph Hasher to use. It can take the value of either v2 or v3, with v2 being the default value.
  2. Introduce the corresponding getter/setter methods in ExecutionConfig to get/set the Stream Graph Hasher version.
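The proposed option and accessors could be wired up roughly as below. This is a minimal sketch, not Flink code, and several parts of it are assumptions:

  • The StreamGraphHasherVersion enum and the HasherSettings class are hypothetical names. HasherSettings stands in for the new ExecutionConfig getter/setter.
  • A plain Map stands in for Flink's Configuration.
  • Case-insensitive parsing is this sketch's own choice.

Only the key execution.stream-graph-hasher.version, its values v2/v3, and the v2 default come from this proposal.

```java
import java.util.Locale;
import java.util.Map;

public class HasherVersionOption {
    /** Option key proposed in this FLIP. */
    static final String KEY = "execution.stream-graph-hasher.version";

    /** Hypothetical enum mirroring the proposed values "v2" and "v3". */
    enum StreamGraphHasherVersion {
        V2,
        V3;

        /** Parses a config value; case-insensitive matching is an assumption of this sketch. */
        static StreamGraphHasherVersion fromConfigValue(String value) {
            switch (value.trim().toLowerCase(Locale.ROOT)) {
                case "v2":
                    return V2;
                case "v3":
                    return V3;
                default:
                    throw new IllegalArgumentException("Unsupported stream graph hasher version: " + value);
            }
        }
    }

    /** Hypothetical stand-in for the proposed ExecutionConfig getter/setter pair. */
    static class HasherSettings {
        private StreamGraphHasherVersion version = StreamGraphHasherVersion.V2; // v2 is the proposed default

        StreamGraphHasherVersion getStreamGraphHasherVersion() {
            return version;
        }

        void setStreamGraphHasherVersion(StreamGraphHasherVersion version) {
            this.version = version;
        }

        /** Applies the job-level option when present; otherwise the default is kept. */
        void configure(Map<String, String> jobConfig) {
            String raw = jobConfig.get(KEY);
            if (raw != null) {
                setStreamGraphHasherVersion(StreamGraphHasherVersion.fromConfigValue(raw));
            }
        }
    }

    public static void main(String[] args) {
        HasherSettings settings = new HasherSettings();
        settings.configure(Map.of(KEY, "v3"));
        System.out.println(settings.getStreamGraphHasherVersion()); // V3
    }
}
```

Rejecting unknown values eagerly means a typo in the job configuration fails at submission time. Otherwise, a typo could silently fall back to a hasher that produces different operator IDs.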

Proposed Changes

StreamGraphHasherV3

Review of the current algorithm

  1. Sort sources by node ID, so that the sources are traversed in a deterministic order across job submissions
  2. Traverse the stream graph in a breadth-first manner; for each visited node:
    1. If the hashes of all upstream nodes have been generated:
      1. Generate a node-local hash using the murmur3_128 hash function. The input is the traversal order ID of the node; for each of its chained output nodes, the same traversal order ID is put into the input again (see HashingExplained in the google/guava wiki on GitHub for details on the hashing API)
      2. For each of the upstream nodes, compute hash = hash * 37 XOR hash[upstream node], byte by byte, to incorporate upstream topology information into the node hash
    2. Otherwise, push the node back to the queue to be visited later
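The traversal above can be sketched in a simplified, self-contained form. This is not Flink's StreamGraphHasherV2, and it makes several assumptions:

  • The graph model (an Edge record with a chainable flag) is hypothetical.
  • User-set UIDs are ignored, and the graph is assumed to be acyclic.
  • MD5 from java.security.MessageDigest is swapped in for Guava's murmur3_128 so that only the JDK is needed. Both produce 16-byte digests.

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;

public class StreamGraphHashSketch {
    /** Hypothetical edge model: source/target node IDs and whether the edge is chainable. */
    record Edge(int source, int target, boolean chainable) {}

    static MessageDigest newDigest() {
        try {
            return MessageDigest.getInstance("MD5"); // stand-in for Guava's murmur3_128
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is required on every Java platform", e);
        }
    }

    /** Computes one hash per node; assumes an acyclic graph. */
    static Map<Integer, byte[]> generateHashes(List<Integer> nodeIds, List<Edge> edges) {
        Map<Integer, List<Edge>> inEdges = new HashMap<>();
        Map<Integer, List<Edge>> outEdges = new HashMap<>();
        for (int id : nodeIds) {
            inEdges.put(id, new ArrayList<>());
            outEdges.put(id, new ArrayList<>());
        }
        for (Edge e : edges) {
            outEdges.get(e.source()).add(e);
            inEdges.get(e.target()).add(e);
        }

        // Step 1: start from the sources, sorted by node ID.
        List<Integer> sources = new ArrayList<>();
        for (int id : nodeIds) {
            if (inEdges.get(id).isEmpty()) {
                sources.add(id);
            }
        }
        Collections.sort(sources);

        Map<Integer, byte[]> hashes = new HashMap<>();
        Deque<Integer> queue = new ArrayDeque<>(sources);
        Set<Integer> enqueued = new HashSet<>(sources);

        // Step 2: breadth-first traversal.
        while (!queue.isEmpty()) {
            int node = queue.poll();
            boolean upstreamReady = true;
            for (Edge e : inEdges.get(node)) {
                upstreamReady &= hashes.containsKey(e.source());
            }
            if (!upstreamReady) {
                queue.add(node); // step 2.2: revisit once all upstream hashes exist
                continue;
            }

            // Step 2.1.1: node-local hash over the traversal order ID,
            // fed again once per chainable output edge.
            byte[] idBytes = ByteBuffer.allocate(4).putInt(hashes.size()).array();
            MessageDigest digest = newDigest();
            digest.update(idBytes);
            for (Edge e : outEdges.get(node)) {
                if (e.chainable()) {
                    digest.update(idBytes);
                }
            }
            byte[] hash = digest.digest();

            // Step 2.1.2: mix in each upstream hash byte by byte: hash = hash * 37 XOR upstream.
            for (Edge e : inEdges.get(node)) {
                byte[] upstream = hashes.get(e.source());
                for (int j = 0; j < hash.length; j++) {
                    hash[j] = (byte) (hash[j] * 37 ^ upstream[j]);
                }
            }
            hashes.put(node, hash);

            for (Edge e : outEdges.get(node)) {
                if (enqueued.add(e.target())) {
                    queue.add(e.target());
                }
            }
        }
        return hashes;
    }

    public static void main(String[] args) {
        List<Integer> nodes = List.of(1, 2);
        Map<Integer, byte[]> chained = generateHashes(nodes, List.of(new Edge(1, 2, true)));
        Map<Integer, byte[]> unchained = generateHashes(nodes, List.of(new Edge(1, 2, false)));
        // Same topology, different chaining: the source's hash (its operator ID) changes.
        System.out.println(Arrays.equals(chained.get(1), unchained.get(1))); // false
    }
}
```

The demo reproduces the problem described in the Motivation section with a source->sink DAG. Toggling only the chainable flag changes the source's hash, and through the upstream mixing step it changes the sink's hash as well.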

Proposed changes

From the sketch of the current algorithm above, we can see that the dependency on chained output nodes resides only in the generation of the node-local hash (step 2.1.1). It therefore suffices to remove the relevant code (L227-235 of StreamGraphHasherV2) without changing anything else. The generated hash of each node remains deterministic, because the traversal order of the graph is unchanged. It also remains statistically unique, because different input values are still fed to the hash function.
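The effect of the proposed change on the node-local hash can be shown in isolation. The nodeLocalHash helper below is hypothetical, not Flink code, and again uses JDK MD5 as a stand-in for Guava's murmur3_128. When chainingAware is true, the input depends on the number of chainable outputs, as in the current algorithm. When it is false, only the traversal order ID is hashed, as proposed for StreamGraphHasherV3.

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

public class NodeLocalHash {
    /**
     * Hypothetical helper: hashes the traversal order ID and, if chainingAware is true,
     * feeds the same ID again once per chainable output edge.
     */
    static byte[] nodeLocalHash(int traverseId, int chainableOutputs, boolean chainingAware) {
        MessageDigest digest;
        try {
            digest = MessageDigest.getInstance("MD5"); // stand-in for murmur3_128
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
        byte[] id = ByteBuffer.allocate(4).putInt(traverseId).array();
        digest.update(id);
        if (chainingAware) {
            for (int i = 0; i < chainableOutputs; i++) {
                digest.update(id);
            }
        }
        return digest.digest();
    }

    public static void main(String[] args) {
        // Current behavior: chaining a node to one output changes its local hash.
        System.out.println(Arrays.equals(nodeLocalHash(0, 1, true), nodeLocalHash(0, 0, true)));   // false
        // Proposed behavior: chaining no longer matters.
        System.out.println(Arrays.equals(nodeLocalHash(0, 1, false), nodeLocalHash(0, 0, false))); // true
    }
}
```

Distinct traversal order IDs still produce distinct inputs, so different nodes continue to receive different local hashes.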

Alternative Operator IDs for State Assignment

A bit of historical background
Flink introduced StreamGraphHasherV2 in Flink 1.2 to replace the old hasher. To allow a smooth migration from Flink 1.1 to Flink 1.2, Flink supports setting a list of alternative hashes for each operator. Via FLINK-5290, Flink automatically added the hashes generated by the legacy hasher to the alternative hash list for backward state compatibility; this was later removed. Via FLINK-5480, Flink further allows users to provide alternative hashes for operators in a StreamGraph, which is still supported.
Review of the current implementation
In StreamingJobGraphGenerator, there is a list named legacyStreamGraphHashers. So far it contains a single hasher, StreamGraphUserHashHasher, which is used for user-provided hashes. For each operator, one OperatorIDPair is generated per legacy hasher. An OperatorIDPair consists of the generated operator ID (from the default hasher) and an optional alternative ID (from a legacy hasher). During state assignment, Flink tries to match the non-empty alternative ID first and falls back to the generated ID otherwise.
Problem: The mechanism works fine when there's a single legacy hasher, but breaks when there are multiple (which used to work back in Flink 1.2).
Consider the following example:

  • A savepoint (SP) taken with HasherV3

  • A new job with HasherV3 enabled, and HasherV2 and UserHashHasher in the legacy hasher list

Two OperatorIDPairs are generated for each operator. When assigning state for the first OperatorIDPair, the alternative ID cannot be matched, so we fall back to the generated ID. We then remove its state from the localOperators map (the localOperators.remove(operatorID) call below) and put it into the map used for later task state assignment (the operatorStates.put(...) call below). When assigning state for the second OperatorIDPair, the alternative ID again cannot be matched, and we fall back to the generated ID once more. This time we get a null operator state, because the actual value was already removed. We therefore create an empty operator state and overwrite the entry in the map used for later task state assignment. As a result, the state is lost.

// Method: StateAssignmentOperation#assignStates

// find the states of all operators belonging to this task and compute additional
// information in first pass
for (ExecutionJobVertex executionJobVertex : tasks) {
    List<OperatorIDPair> operatorIDPairs = executionJobVertex.getOperatorIDs();
    Map<OperatorID, OperatorState> operatorStates = new HashMap<>(operatorIDPairs.size());
    for (OperatorIDPair operatorIDPair : operatorIDPairs) {
        OperatorID operatorID =
                operatorIDPair
                        .getUserDefinedOperatorID()
                        .filter(localOperators::containsKey)
                        .orElse(operatorIDPair.getGeneratedOperatorID());

        OperatorState operatorState = localOperators.remove(operatorID);
        if (operatorState == null) {
            operatorState =
                    new OperatorState(
                            operatorID,
                            executionJobVertex.getParallelism(),
                            executionJobVertex.getMaxParallelism());
        }
        operatorStates.put(operatorIDPair.getGeneratedOperatorID(), operatorState);
    }
    ...
}
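The state loss can be reproduced with a simplified model of the loop above. The model is hypothetical in several respects:

  • Operator IDs are plain strings, and Pair stands in for OperatorIDPair.
  • An operator's state is a string, where "<empty>" models a freshly created OperatorState.

It follows the scenario in the example: the savepoint holds state under the HasherV3 ID, and the job produces one pair per legacy hasher (UserHashHasher without a user-provided hash, then HasherV2). Neither alternative ID matches.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class StateAssignmentBugSketch {
    /** Hypothetical stand-in for OperatorIDPair: generated ID plus one optional alternative ID. */
    record Pair(String generatedId, Optional<String> alternativeId) {}

    /** Mirrors the per-task loop in StateAssignmentOperation#assignStates. */
    static Map<String, String> assign(List<Pair> pairs, Map<String, String> localOperators) {
        Map<String, String> operatorStates = new HashMap<>();
        for (Pair pair : pairs) {
            String operatorId = pair.alternativeId()
                    .filter(localOperators::containsKey)
                    .orElse(pair.generatedId());
            String state = localOperators.remove(operatorId);
            if (state == null) {
                state = "<empty>"; // models new OperatorState(...)
            }
            // Keyed by the generated ID, so a later pair overwrites an earlier one.
            operatorStates.put(pair.generatedId(), state);
        }
        return operatorStates;
    }

    public static void main(String[] args) {
        Map<String, String> savepoint = new HashMap<>(Map.of("v3-id", "counter=42"));
        List<Pair> pairs = List.of(
                new Pair("v3-id", Optional.empty()),       // UserHashHasher: no user-provided hash
                new Pair("v3-id", Optional.of("v2-id")));  // HasherV2 legacy hasher
        System.out.println(assign(pairs, savepoint));       // {v3-id=<empty>}
    }
}
```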

Proposed changes

  1. Extend OperatorIDPair to consist of a generated operator ID and a list of alternative IDs. During state assignment, try matching the alternative IDs in order, and fall back to the generated ID otherwise.

  2. Add StreamGraphHasherV2 to the legacy hasher list when StreamGraphHasherV3 is enabled, ordered after StreamGraphUserHashHasher. We do not consider adding StreamGraphHasherV3 to the legacy hasher list when StreamGraphHasherV2 is enabled, as StreamGraphHasherV3 will be the only choice in future versions and users are not expected to go back to StreamGraphHasherV2.
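A minimal sketch of state assignment under the proposed changes follows. IdPair is a hypothetical stand-in for the extended OperatorIDPair. It carries an ordered list of alternative IDs: the user-provided hash, if any, followed by the HasherV2 ID. Operator IDs and states are plain strings for illustration. Each operator now has exactly one pair, so the restored state is looked up once and cannot be overwritten by a later pair.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AlternativeIdAssignmentSketch {
    /** Hypothetical extended OperatorIDPair: generated ID plus ordered alternative IDs. */
    record IdPair(String generatedId, List<String> alternativeIds) {}

    /** Returns the first alternative ID present in the snapshot, else the generated ID. */
    static String resolve(IdPair pair, Map<String, String> localOperators) {
        for (String alternative : pair.alternativeIds()) {
            if (localOperators.containsKey(alternative)) {
                return alternative;
            }
        }
        return pair.generatedId();
    }

    static Map<String, String> assign(List<IdPair> pairs, Map<String, String> localOperators) {
        Map<String, String> operatorStates = new HashMap<>();
        for (IdPair pair : pairs) {
            String state = localOperators.remove(resolve(pair, localOperators));
            operatorStates.put(pair.generatedId(), state == null ? "<empty>" : state);
        }
        return operatorStates;
    }

    public static void main(String[] args) {
        // Savepoint taken with HasherV2; the new job runs with HasherV3 (one pair per operator).
        Map<String, String> savepoint = new HashMap<>(Map.of("v2-id", "counter=42"));
        IdPair pair = new IdPair("v3-id", List.of("user-id", "v2-id"));
        System.out.println(assign(List.of(pair), savepoint)); // {v3-id=counter=42}
    }
}
```

The same pair also restores a savepoint taken with HasherV3: no alternative matches, and resolution falls back to the generated ID.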

Compatibility, Deprecation, and Migration Plan

The migration plan is as follows:

  1. (v1.20) Introduce HasherV3 as the default choice while keeping HasherV2 as an option.

  2. (v2.1) Remove HasherV2.

Test Plan

The changes will be covered by unit tests.

Rejected Alternatives

N/A