Although Flink 1.3 brings us operator-level state recovery within a chain, the chaining behavior will still affect state compatibility, as the generation of the Operator ID is dependent on its chained output nodes. For example, a simple source->sink DAG with source and sink chained together is state incompatible with an otherwise identical DAG with source and sink unchained (either because the parallelisms of the two ops are changed to be unequal or chaining is disabled). This greatly limits the flexibility to perform chain-breaking/building for performance tuning, especially for SQL jobs where user-set UIDs are currently not supported.

Proposal

Therefore, we propose introducing StreamGraphHasherV3, which is agnostic of the chaining behavior of operators, so that users are free to tune the parallelism of individual operators without worrying about state incompatibility. To avoid breaking backward state compatibility, we will extend the current alternative operator ID mechanism for state assignment (effectively a revival of what was done when StreamGraphHasherV2 was introduced, see FLINK-5290).

Public Interfaces

  1. Introduce a new job-level option execution.stream-graph-hasher.version to control the version of the Stream Graph Hasher to use. It can take the value of either v2 or v3, with v2 being the default value.
  2. Introduce the corresponding getter/setter methods in ExecutionConfig to get/set the Stream Graph Hasher version.
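
As a rough illustration of the option semantics described above, the following sketch resolves the hasher version from a plain key/value map. The class name, the enum, and the use of a `Map` in place of Flink's configuration classes are assumptions made for this example only; the proposal itself only fixes the option key, the two values, and the v2 default.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

final class HasherVersionOptionSketch {

    /** Hypothetical enum; the proposal only names the option values v2 and v3. */
    enum HasherVersion { V2, V3 }

    /** Option key as given in the proposal. */
    static final String KEY = "execution.stream-graph-hasher.version";

    /** Resolves the hasher version, falling back to v2 when the option is unset. */
    static HasherVersion resolve(Map<String, String> conf) {
        String raw = conf.getOrDefault(KEY, "v2").trim().toLowerCase(Locale.ROOT);
        switch (raw) {
            case "v2":
                return HasherVersion.V2;
            case "v3":
                return HasherVersion.V3;
            default:
                throw new IllegalArgumentException(
                        "Unsupported value for " + KEY + ": " + raw);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(resolve(conf)); // option unset, so the default applies
        conf.put(KEY, "v3");
        System.out.println(resolve(conf));
    }
}
```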

Proposed Changes

StreamGraphHasherV3

Review of the current algorithm

  1. Sort sources by node ID, which keeps deterministic traversal order of the sources across job submissions
  2. Traverse the stream graph in a breadth-first manner. For each visited node:
    1. If the hashes of all upstream nodes have been generated
      1. Generate a node local hash using the murmur3_128 hash function: input = traverse order ID of the node, for each of its chaining output nodes, put the same traverse order ID again into the input (see HashingExplained · google/guava Wiki (github.com) for details on the Hashing algorithm API)
      2. For each of the upstream nodes, compute hash = hash * 37 XOR hash[upstream node] to incorporate upstream topology info into the node hash
    2. Else push back to the queue for visiting at a later time

Proposed changes

From the sketch of the current algorithm above, we can see that the dependency on chaining output nodes resides only in the generation of the node-local hash value (the step that feeds the chaining output nodes into the hash input), and it suffices to remove the relevant code (L227-235 of StreamGraphHasherV2) without changing anything else. The generated hash value for each node is still deterministic, as we do not change the traversal order of the graph, and it is still statistically unique, as we still provide different input values to the hash function.
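
The algorithm and the proposed change can be sketched in a self-contained form as follows. This is not the StreamGraphHasherV2 code: the `Node` class and all method names are hypothetical, and MD5 stands in for murmur3_128 only because it also produces 16 bytes without needing Guava. The `includeChainedOutputs` flag switches between the current behavior and the chain-agnostic behavior proposed here. The sketch assumes a DAG in which every node is reachable from the given sources.

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ChainAgnosticHashSketch {

    /** Hypothetical, minimal stand-in for a stream graph node. */
    static final class Node {
        final int id;
        final List<Node> inputs = new ArrayList<>();
        final List<Node> outputs = new ArrayList<>();
        final Set<Integer> chainedOutputIds = new HashSet<>();

        Node(int id) { this.id = id; }
    }

    static void connect(Node from, Node to, boolean chained) {
        from.outputs.add(to);
        to.inputs.add(from);
        if (chained) {
            from.chainedOutputIds.add(to.id);
        }
    }

    /** Stand-in for murmur3_128 (assumption: any 128-bit hash suffices for the illustration). */
    static byte[] hash128(byte[] input) {
        try {
            return MessageDigest.getInstance("MD5").digest(input);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    static Map<Integer, byte[]> hashGraph(List<Node> sources, boolean includeChainedOutputs) {
        // Step 1: sort sources by node ID for a deterministic traversal order.
        List<Node> sorted = new ArrayList<>(sources);
        sorted.sort(Comparator.comparingInt(n -> n.id));
        Deque<Node> queue = new ArrayDeque<>(sorted);
        Set<Integer> enqueued = new HashSet<>();
        for (Node source : sorted) {
            enqueued.add(source.id);
        }
        Map<Integer, byte[]> hashes = new HashMap<>();

        // Step 2: breadth-first traversal.
        while (!queue.isEmpty()) {
            Node node = queue.poll();
            boolean upstreamReady = true;
            for (Node in : node.inputs) {
                upstreamReady &= hashes.containsKey(in.id);
            }
            if (!upstreamReady) {
                queue.add(node); // revisit once all upstream hashes exist
                continue;
            }
            // Node-local hash: the traverse order ID, plus (current behavior only)
            // the same ID once more per chained output node.
            int order = hashes.size();
            ByteBuffer input = ByteBuffer.allocate(4 * (1 + node.outputs.size()));
            input.putInt(order);
            if (includeChainedOutputs) {
                for (Node out : node.outputs) {
                    if (node.chainedOutputIds.contains(out.id)) {
                        input.putInt(order);
                    }
                }
            }
            byte[] hash = hash128(Arrays.copyOf(input.array(), input.position()));
            // Mix in upstream hashes: hash = hash * 37 XOR hash[upstream node].
            for (Node in : node.inputs) {
                byte[] upstream = hashes.get(in.id);
                for (int j = 0; j < hash.length; j++) {
                    hash[j] = (byte) (hash[j] * 37 ^ upstream[j]);
                }
            }
            hashes.put(node.id, hash);
            for (Node out : node.outputs) {
                if (enqueued.add(out.id)) {
                    queue.add(out);
                }
            }
        }
        return hashes;
    }

    /** Builds the source->sink example and hashes it. Node 1 is the source, node 2 the sink. */
    static Map<Integer, byte[]> sourceToSink(boolean chained, boolean includeChainedOutputs) {
        Node source = new Node(1);
        Node sink = new Node(2);
        connect(source, sink, chained);
        return hashGraph(Arrays.asList(source), includeChainedOutputs);
    }
}
```

Under these assumptions, hashing the source->sink DAG with `includeChainedOutputs = false` yields the same hashes whether or not the two operators are chained, while with `includeChainedOutputs = true` the source hash (and therefore the sink hash) changes when the chain is broken.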

Alternative OperatorIDs for State Assignment

A bit of historical background
Flink introduced StreamGraphHasherV2 in Flink 1.2 to replace the old hasher. To allow smooth migration between Flink 1.1 and Flink 1.2, Flink supported setting a list of alternative hashes for each operator. Flink automatically added the hashes generated by the legacy hasher to the alternative hash list via FLINK-5290 for backward state compatibility (removed later) and further allowed users to provide alternative hashes for operators in a StreamGraph via FLINK-5480 (still supported now).
Review of the current implementation
In StreamingJobGraphGenerator, there is a list named legacyStreamGraphHashers, which so far contains a single hasher, StreamGraphUserHashHasher (used for user-provided hashes). For each operator, one OperatorIDPair per legacy hasher is generated, where an OperatorIDPair consists of the generated Operator ID (by the default hasher) and the optional alternative ID (by a legacy hasher). During state assignment, Flink tries to match the non-empty alternative ID first and falls back to the generated ID otherwise.
Problem: The mechanism works fine when there is a single legacy hasher, but breaks when there are multiple (which used to work back in Flink 1.2).
Consider the following example:

  • Savepoint (SP) made with HasherV3

  • New job with HasherV3 enabled, and HasherV2 and UserHashHasher in the legacy hasher list

Two OperatorIDPairs will be generated for each operator. When attempting to assign state to the first OperatorIDPair, the alternative ID cannot be matched, so we fall back to the generated ID, remove its state from the localOperators map (L15 below), and put it in the map for later task state assignment (L23 below). When attempting to assign state to the second OperatorIDPair, the alternative ID still cannot be matched, and we fall back to the generated ID again. This time, we get a null-valued operator state (as the actual value was already removed), and thus create an empty operator state that overwrites the entry in the map for later task state assignment. As a result, the state is lost.

// Method: StateAssignmentOperation#assignStates

// find the states of all operators belonging to this task and compute additional
// information in first pass
for (ExecutionJobVertex executionJobVertex : tasks) {
    List<OperatorIDPair> operatorIDPairs = executionJobVertex.getOperatorIDs();
    Map<OperatorID, OperatorState> operatorStates = new HashMap<>(operatorIDPairs.size());
    for (OperatorIDPair operatorIDPair : operatorIDPairs) {
        OperatorID operatorID =
                operatorIDPair
                        .getUserDefinedOperatorID()
                        .filter(localOperators::containsKey)
                        .orElse(operatorIDPair.getGeneratedOperatorID());

        OperatorState operatorState = localOperators.remove(operatorID);
        if (operatorState == null) {
            operatorState =
                    new OperatorState(
                            operatorID,
                            executionJobVertex.getParallelism(),
                            executionJobVertex.getMaxParallelism());
        }
        operatorStates.put(operatorIDPair.getGeneratedOperatorID(), operatorState);
    }
    ...
}
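
The failure mode described above can be replayed with a stripped-down model of this loop. In the sketch below, string IDs and string states replace OperatorID and OperatorState, `IdPair` is a hypothetical stand-in for OperatorIDPair, and the literal "EMPTY" stands for a freshly created empty OperatorState. None of these names come from Flink.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class StateLossSketch {

    /** Hypothetical stand-in for OperatorIDPair: one generated ID, one optional alternative ID. */
    static final class IdPair {
        final String generatedId;
        final Optional<String> alternativeId;

        IdPair(String generatedId, Optional<String> alternativeId) {
            this.generatedId = generatedId;
            this.alternativeId = alternativeId;
        }
    }

    /** Mirrors the assignment loop above with simplified types. */
    static Map<String, String> assign(Map<String, String> localOperators, List<IdPair> pairs) {
        Map<String, String> operatorStates = new HashMap<>();
        for (IdPair pair : pairs) {
            String id = pair.alternativeId
                    .filter(localOperators::containsKey)
                    .orElse(pair.generatedId);
            String state = localOperators.remove(id);
            if (state == null) {
                state = "EMPTY"; // a new, empty operator state
            }
            operatorStates.put(pair.generatedId, state);
        }
        return operatorStates;
    }

    /** Savepoint written with HasherV3; the new job has two legacy hashers, so two pairs. */
    static Map<String, String> replayExample() {
        Map<String, String> savepoint = new HashMap<>();
        savepoint.put("v3-id", "restored-state");

        List<IdPair> pairs = new ArrayList<>();
        pairs.add(new IdPair("v3-id", Optional.empty()));      // no user-provided hash
        pairs.add(new IdPair("v3-id", Optional.of("v2-id")));  // HasherV2 as legacy hasher
        return assign(savepoint, pairs);
    }

    public static void main(String[] args) {
        // The second pair overwrites the restored state with an empty one.
        System.out.println(replayExample().get("v3-id"));
    }
}
```

In this model the first pair picks up "restored-state", and the second pair then replaces it with "EMPTY" under the same generated ID, which matches the state loss described above.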

Proposed changes

  1. Extend OperatorIDPair to consist of a generated Operator ID and a list of alternative IDs. During state assignment, try matching the alternative IDs in order, and fall back to the generated ID otherwise.

  2. Add StreamGraphHasherV2 to the legacy hasher list when StreamGraphHasherV3 is enabled, ordered after StreamGraphUserHashHasher. We do not consider adding StreamGraphHasherV3 to the legacy hasher list when StreamGraphHasherV2 is enabled, as StreamGraphHasherV3 will be the only choice in future versions and users should not go back to StreamGraphHasherV2.
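
A minimal sketch of the proposed lookup, under the same simplifications (string IDs and states, hypothetical class and method names), might look as follows. Each operator now has exactly one pair carrying an ordered list of alternative IDs, so the generated ID is consulted at most once per operator.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MultiAlternativeIdSketch {

    /** Hypothetical extended pair: one generated ID plus an ordered list of alternative IDs. */
    static final class IdPair {
        final String generatedId;
        final List<String> alternativeIds;

        IdPair(String generatedId, List<String> alternativeIds) {
            this.generatedId = generatedId;
            this.alternativeIds = alternativeIds;
        }
    }

    /** Returns the first alternative ID present in the savepoint, else the generated ID. */
    static String resolve(IdPair pair, Map<String, String> localOperators) {
        for (String alternativeId : pair.alternativeIds) {
            if (localOperators.containsKey(alternativeId)) {
                return alternativeId;
            }
        }
        return pair.generatedId;
    }

    /** Assigns state for one operator; "EMPTY" stands for a new, empty operator state. */
    static String assign(IdPair pair, Map<String, String> localOperators) {
        String state = localOperators.remove(resolve(pair, localOperators));
        return state != null ? state : "EMPTY";
    }

    public static void main(String[] args) {
        // Alternative IDs ordered as proposed: user-provided hash first, then HasherV2.
        IdPair pair = new IdPair("v3-id", Arrays.asList("user-id", "v2-id"));

        Map<String, String> v3Savepoint = new HashMap<>();
        v3Savepoint.put("v3-id", "state-from-v3");
        System.out.println(assign(pair, v3Savepoint));

        Map<String, String> v2Savepoint = new HashMap<>();
        v2Savepoint.put("v2-id", "state-from-v2");
        System.out.println(assign(pair, v2Savepoint));
    }
}
```

With this shape, a savepoint written with HasherV3 is matched through the generated ID, and a savepoint written with HasherV2 is matched through the alternative list, without the overwrite seen in the earlier example.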

Compatibility, Deprecation, and Migration Plan

The change will break cross-version state compatibility. The migration plan is as follows:

  1. (v1.20) Introduce HasherV3 as the default choice while keeping HasherV2 as an option.

  2. (v2.1) Remove HasherV2.

Test Plan

The changes will be covered by unit tests (UTs).