...

Although Flink 1.3 introduced operator-level state recovery within a chain, chaining behavior still affects state compatibility, because the generated Operator ID of a node depends on its chained output nodes. For example, a simple source->sink DAG in which the source and sink are chained together is state-incompatible with an otherwise identical DAG in which they are not chained (either because the two operators are given different parallelisms or because chaining is disabled). This greatly limits the flexibility to break or build chains for performance tuning, especially for SQL jobs, where user-set UIDs are currently not supported.

Proposal

Therefore, we propose introducing StreamGraphHasherV3, which is agnostic of the chaining behavior of operators, so that users are free to tune the parallelism of individual operators without worrying about state incompatibility. To avoid breaking backward state compatibility, we will extend the current alternative operator ID mechanism for state assignment (which is essentially a revival of what was done when StreamGraphHasherV2 was introduced; see FLINK-5290).

Public Interfaces

  1. Introduce a new job-level option execution.stream-graph-hasher.version to control the version of the Stream Graph Hasher to use. It can take the value of either v2 or v3, with v2 being the default value.
  2. Introduce the corresponding getter/setter methods in ExecutionConfig to get/set the Stream Graph Hasher version.
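To make the two interface additions concrete, here is a minimal sketch that assumes the option is read from a plain string map. Only the option key and its v2/v3 values come from this proposal; the enum StreamGraphHasherVersion, the class ExecutionConfigSketch, and the methods getStreamGraphHasherVersion/setStreamGraphHasherVersion are hypothetical names chosen for illustration, not existing Flink API.

```java
import java.util.Locale;
import java.util.Map;

public class HasherVersionOptionSketch {

    /** Hypothetical enum backing execution.stream-graph-hasher.version. */
    public enum StreamGraphHasherVersion { V2, V3 }

    public static final String KEY = "execution.stream-graph-hasher.version";

    /** Parses the option from a plain key/value map; v2 is the proposed default. */
    public static StreamGraphHasherVersion resolve(Map<String, String> conf) {
        String raw = conf.getOrDefault(KEY, "v2");
        switch (raw.trim().toLowerCase(Locale.ROOT)) {
            case "v2":
                return StreamGraphHasherVersion.V2;
            case "v3":
                return StreamGraphHasherVersion.V3;
            default:
                throw new IllegalArgumentException("Unsupported value for " + KEY + ": " + raw);
        }
    }

    /** Hypothetical stand-in for the getter/setter pair proposed for ExecutionConfig. */
    public static class ExecutionConfigSketch {
        private StreamGraphHasherVersion hasherVersion = StreamGraphHasherVersion.V2;

        public StreamGraphHasherVersion getStreamGraphHasherVersion() {
            return hasherVersion;
        }

        public ExecutionConfigSketch setStreamGraphHasherVersion(StreamGraphHasherVersion version) {
            this.hasherVersion = version;
            return this;
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of()));          // V2
        System.out.println(resolve(Map.of(KEY, "v3"))); // V3
        ExecutionConfigSketch config = new ExecutionConfigSketch()
                .setStreamGraphHasherVersion(StreamGraphHasherVersion.V3);
        System.out.println(config.getStreamGraphHasherVersion()); // V3
    }
}
```

Rejecting unknown values instead of silently falling back to v2 is a deliberate choice in this sketch: a typo in the option should not quietly select a hasher whose IDs do not match the savepoint.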

Proposed Changes

StreamGraphHasherV3

Review of the current algorithm

...

From the sketch of the current algorithm above, we can see that the dependency on chained output nodes resides only in the generation of the node-local hash values (the bolded part of the sketch), and it suffices to remove the relevant code (L227-235 of StreamGraphHasherV2) without changing anything else. The generated hash value for each node is still deterministic, as we do not change the traversal order of the graph, and it is still statistically unique, as we still provide different input values to the hash function.
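To illustrate why dropping the chained-output step is enough, the following simplified model computes a node hash in three steps: the node's traversal index, then (V2 only) one extra contribution per chainable output edge, then mixing in the hashes of the input nodes. It is a sketch under stated assumptions: MD5 from java.security stands in for the hash function Flink actually uses, the class and method names are hypothetical, and the graph is reduced to a traversal index, a count of chainable outputs, and a list of input hashes.

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;

public class NodeHashSketch {

    /** Simplified node hash; v3 == true skips the chained-output contribution. */
    public static byte[] nodeHash(int traversalIndex, int chainableOutputs,
                                  List<byte[]> inputHashes, boolean v3) {
        MessageDigest digest;
        try {
            // Assumption: MD5 replaces the real hash function; every Java platform provides it.
            digest = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is required on every Java platform", e);
        }
        // Node-local part: the node's position in the deterministic traversal.
        digest.update(ByteBuffer.allocate(4).putInt(traversalIndex).array());
        if (!v3) {
            // V2 only: feed the traversal index again once per chainable output edge,
            // which is what makes the hash depend on chaining.
            for (int i = 0; i < chainableOutputs; i++) {
                digest.update(ByteBuffer.allocate(4).putInt(traversalIndex).array());
            }
        }
        byte[] hash = digest.digest();
        // Mix in the hashes of the input nodes (identical for V2 and V3).
        for (byte[] other : inputHashes) {
            for (int j = 0; j < hash.length; j++) {
                hash[j] = (byte) (hash[j] * 37 ^ other[j]);
            }
        }
        return hash;
    }

    public static void main(String[] args) {
        for (boolean v3 : new boolean[] {false, true}) {
            // source -> sink: the source has one chainable output when chained, none otherwise.
            byte[] chainedSource = nodeHash(0, 1, List.of(), v3);
            byte[] unchainedSource = nodeHash(0, 0, List.of(), v3);
            byte[] chainedSink = nodeHash(1, 0, List.of(chainedSource), v3);
            byte[] unchainedSink = nodeHash(1, 0, List.of(unchainedSource), v3);
            System.out.printf("%s: source stable=%b, sink stable=%b%n",
                    v3 ? "V3" : "V2",
                    Arrays.equals(chainedSource, unchainedSource),
                    Arrays.equals(chainedSink, unchainedSink));
        }
    }
}
```

With V2, chaining changes the source hash and, through the input mixing, the sink hash as well; with V3 both stay the same, while nodes at different traversal positions still receive different hashes.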

Compatibility, Deprecation, and Migration Plan

Alternative OperatorIDs for State Assignment

A bit of historical background
Flink introduced StreamGraphHasherV2 in Flink 1.2 to replace the old hasher. To allow smooth migration from Flink 1.1 to Flink 1.2, Flink supports setting a list of alternative hashes for each operator. Flink automatically added the hashes generated by the legacy hasher to the alternative hash list via FLINK-5290 for backward state compatibility (later removed), and additionally allows users to provide alternative hashes for operators in a StreamGraph via FLINK-5480 (still supported).
Review of the current implementation
In StreamingJobGraphGenerator, there is a list named legacyStreamGraphHashers, which currently contains a single hasher, StreamGraphUserHashHasher (used for user-provided hashes). For each operator, one OperatorIDPair is generated per legacy hasher, where an OperatorIDPair consists of the generated Operator ID (from the default hasher) and an optional alternative ID (from the legacy hasher). During state assignment, Flink first tries to match the non-empty alternative ID and falls back to the generated ID otherwise.
Problem: The mechanism works fine when there's a single legacy hasher, but breaks when there are multiple (which used to work back in Flink 1.2).
Consider the following example:

  • Savepoint (SP) taken with HasherV3

  • New job with HasherV3 enabled and both HasherV2 and StreamGraphUserHashHasher in the legacy hasher list

Two OperatorIDPairs are generated for each operator. When assigning state to the first OperatorIDPair, the alternative ID cannot be matched, so we fall back to the generated ID, remove its state from the localOperators map (localOperators.remove(operatorID) in the code below), and put it into the map used for later task state assignment (operatorStates.put(...)). When assigning state to the second OperatorIDPair, the alternative ID still cannot be matched, and we fall back to the same generated ID again. This time, we get a null operator state (as it has already been removed), so an empty operator state is created and overwrites the entry in the map for later task state assignment. As a result, the state is lost.

```java
// class: StateAssignmentOperation

// find the states of all operators belonging to this task and compute additional
// information in first pass
for (ExecutionJobVertex executionJobVertex : tasks) {
    List<OperatorIDPair> operatorIDPairs = executionJobVertex.getOperatorIDs();
    Map<OperatorID, OperatorState> operatorStates = new HashMap<>(operatorIDPairs.size());
    for (OperatorIDPair operatorIDPair : operatorIDPairs) {
        // prefer the alternative (user-defined) ID if the snapshot contains it,
        // otherwise fall back to the generated ID
        OperatorID operatorID =
                operatorIDPair
                        .getUserDefinedOperatorID()
                        .filter(localOperators::containsKey)
                        .orElse(operatorIDPair.getGeneratedOperatorID());

        // the matched state is removed, so a second lookup of the same ID returns null
        OperatorState operatorState = localOperators.remove(operatorID);
        if (operatorState == null) {
            operatorState =
                    new OperatorState(
                            operatorID,
                            executionJobVertex.getParallelism(),
                            executionJobVertex.getMaxParallelism());
        }
        // keyed by the generated ID: a later pair with the same generated ID overwrites this entry
        operatorStates.put(operatorIDPair.getGeneratedOperatorID(), operatorState);
    }
    // ...
}
```
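The failure described above can be reproduced outside Flink with a small model of this loop. In the sketch below, operator IDs and states are plain strings, Pair is a hypothetical stand-in for OperatorIDPair, and the IDs "v3-id" and "v2-id" are made up. The scenario is the one from the example: a savepoint keyed by the HasherV3 ID, one pair for StreamGraphUserHashHasher without a user-set UID, and one pair for HasherV2.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class StateLossRepro {

    /** Hypothetical stand-in for OperatorIDPair: one generated ID, at most one alternative. */
    public record Pair(String generatedId, Optional<String> alternativeId) {}

    /** Mirrors the current matching loop; "EMPTY" plays the role of a fresh OperatorState. */
    public static Map<String, String> assign(List<Pair> pairs, Map<String, String> localOperators) {
        Map<String, String> operatorStates = new HashMap<>();
        for (Pair pair : pairs) {
            String id = pair.alternativeId()
                    .filter(localOperators::containsKey)
                    .orElse(pair.generatedId());
            String state = localOperators.remove(id);
            if (state == null) {
                state = "EMPTY";
            }
            operatorStates.put(pair.generatedId(), state);
        }
        return operatorStates;
    }

    public static void main(String[] args) {
        // Savepoint taken with HasherV3: the state is keyed by the V3 ID.
        Map<String, String> savepoint = new HashMap<>(Map.of("v3-id", "STATE"));
        // One pair per legacy hasher: StreamGraphUserHashHasher (no UID set), then HasherV2.
        List<Pair> pairs = List.of(
                new Pair("v3-id", Optional.empty()),
                new Pair("v3-id", Optional.of("v2-id")));
        System.out.println(assign(pairs, savepoint)); // {v3-id=EMPTY}: the state is lost
    }
}
```

The first pair restores "STATE" under "v3-id"; the second pair falls back to the same generated ID, finds nothing left in the savepoint map, and overwrites the entry with an empty state.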

Proposed changes

  1. Extend OperatorIDPair to consist of a generated Operator ID and a list of alternative IDs. During state assignment, try matching the alternative IDs in order, and fall back to the generated ID otherwise.

  2. When StreamGraphHasherV3 is enabled, add StreamGraphHasherV2 to the legacy hasher list, ordered after StreamGraphUserHashHasher. We do not consider adding StreamGraphHasherV3 to the legacy hasher list when StreamGraphHasherV2 is enabled, as StreamGraphHasherV3 will be the only choice in future versions and users should not go back to StreamGraphHasherV2.
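The two changes above can be sketched with the same string-based model used for the reproduction. ExtendedOperatorIDPair and the IDs "user-id", "v2-id", and "v3-id" are hypothetical; the alternatives are listed in the proposed priority order (user-provided hash first, only present if a UID hash was set, then HasherV2), and the loop keeps the existing fallback to the generated ID.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MultiAlternativeAssignmentSketch {

    /** Hypothetical extended pair: one generated ID plus alternative IDs in priority order. */
    public record ExtendedOperatorIDPair(String generatedId, List<String> alternativeIds) {}

    public static Map<String, String> assign(List<ExtendedOperatorIDPair> pairs,
                                             Map<String, String> localOperators) {
        Map<String, String> operatorStates = new HashMap<>();
        for (ExtendedOperatorIDPair pair : pairs) {
            // The first alternative present in the snapshot wins; otherwise use the generated ID.
            String id = pair.alternativeIds().stream()
                    .filter(localOperators::containsKey)
                    .findFirst()
                    .orElse(pair.generatedId());
            String state = localOperators.remove(id);
            // "EMPTY" plays the role of a freshly created OperatorState.
            operatorStates.put(pair.generatedId(), state != null ? state : "EMPTY");
        }
        return operatorStates;
    }

    public static void main(String[] args) {
        // One pair per operator, alternatives: user-provided hash, then HasherV2.
        List<ExtendedOperatorIDPair> pairs =
                List.of(new ExtendedOperatorIDPair("v3-id", List.of("user-id", "v2-id")));
        // Savepoint taken with HasherV3: matched through the generated ID.
        System.out.println(assign(pairs, new HashMap<>(Map.of("v3-id", "STATE")))); // {v3-id=STATE}
        // Savepoint taken with HasherV2: matched through the HasherV2 alternative.
        System.out.println(assign(pairs, new HashMap<>(Map.of("v2-id", "STATE")))); // {v3-id=STATE}
    }
}
```

Because each operator now yields exactly one pair, its state is looked up and written into the result map at most once, so the remove-then-overwrite sequence from the example cannot occur.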

Compatibility, Deprecation, and Migration Plan

The change will break cross-version state compatibility. The migration plan is as follows:

  1. (v1.20) Introduce HasherV3 as the default choice while keeping HasherV2 as an option.

  2. (v2.1) Remove HasherV2.

Test Plan

The changes will be covered by unit tests.

...