Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The situation of the second part is equivalent to the source operators. However, the non-source operators rarely have the similar logic as legacy source. Instead, the result related to ΔF is usually sent before finished and does not need to resent after failover. For example, the operators doing aggregation and sending the final result on endOfInput only need sending the remaining aggregation value after failover. Even if there are operators that does not satisfy the condition, the operators could push the states could not be discarded to OperatorCoordinator instead, like the new Source API does. 

Special State Types

However, except for the normal operator list state and keyed state, Flink also supports other types of state, namely broadcast state and union list state. 

Operator state

There are a few scenarios in which operator states are used in combination with an additional implicit contract on data distribution. Those implicit contracts might not hold in case we restore state with partially finished operators. Therefore we would like to discuss how we want to use the operator state in combination with finished tasks.

BroadcastState

Logically the broadcast state Logically the broadcast state could work well even if part of the tasks are finished for one vertex since the state of each all the tasks are the sameequal, thus we could restore the operator subtasks' states from any previous subtask if the operator is not fully finished. 

UnionListState

The UnioinListState would be is more complex. Unlike the normal operator list state, user might have special usage with union list state. For example, users may only have one subtask to store the state, and initialize the subtask with the shared value after restart to implement a more flexible "broadcast" state. Besides, users may have complex initialization logic based on the whole state, like minus the state from a global set of values, which is impossible for normal operator list state. A common usage pattern is to implement a "broadcast" state by storing state on a single subtask.  Afterwards, when initializing subtasks the single copy of the state would be distributed to all subtasks. Another common pattern is to use a UnionListState to create a global view. It is used for example to share information about offsets of partitions which has been consumed so far. This lets us restart from the given offset if after restore a partition is assigned to a different subtask than originally. Such a logic can only be implemented with a merged state of all original subtasks If a part of subtasks are finished and we only keep the left remaining state in the checkpoint. This can obviously loose important bit of information, the subtask would have different initialized state after restartedor even the entire state in case of described implementation of broadcast state. However, since the UnionListState is on the way to be deprecated and replaced by the OperatorCoordinator, we could first have a temporary fix by asking the tasks of operators using UnionListState to wait for each other to make sure the snapshot of the operator must be complete.  TODO ListState (operator state)We would decline checkpoints if some of all the tasks reported they have called the finish()  method, but some have not received notifyCheckpointComplete yet.

Based on the above discussion discarding the final states of the finish tasks would only change behavior for a very little fraction of the possible existing jobs whose non-source operators have special logic on initialization or endOfInput, and these jobs could also be modified to keep the current behaviors.  Therefore, we lean towards option 3, which simplify the implementation and leave more room for optimization in the future. 

...