Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
linenumberstrue
// classMethod: StateAssignmentOperationStateAssignmentOperation#assignStates

// find the states of all operators belonging to this task and compute additional
// information in first pass
for (ExecutionJobVertex executionJobVertex : tasks) {
    List<OperatorIDPair> operatorIDPairs = executionJobVertex.getOperatorIDs();
    Map<OperatorID, OperatorState> operatorStates = new HashMap<>(operatorIDPairs.size());
    for (OperatorIDPair operatorIDPair : operatorIDPairs) {
        OperatorID operatorID =
                operatorIDPair
                        .getUserDefinedOperatorID()
                        .filter(localOperators::containsKey)
                        .orElse(operatorIDPair.getGeneratedOperatorID());

        OperatorState operatorState = localOperators.remove(operatorID);
        if (operatorState == null) {
            operatorState =
                    new OperatorState(
                            operatorID,
                            executionJobVertex.getParallelism(),
                            executionJobVertex.getMaxParallelism());
        }
        operatorStates.put(operatorIDPair.getGeneratedOperatorID(), operatorState);
    }
    ...
}

...