...

There are a few scenarios in which operator state is used together with an additional implicit contract on data distribution. These implicit contracts might not hold if we restore state with partially finished operators. We would therefore like to discuss how operator state should be used in combination with finished tasks.

BroadcastState

Logically, broadcast state can work well even if some of the tasks of a vertex are finished. With broadcast state, every subtask snapshots its state, and the assumption is that all of these snapshots are equal, so any one of them can be used to restore any given subtask. We want to leverage that property: when restoring with some subtasks finished, and as long as the operator is not fully finished, we would use any of the non-empty states (taken from any subtask that was still running).
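The restore rule above can be sketched in plain Java. This is only an illustration under stated assumptions, not Flink code: the class name BroadcastRestoreSketch, the method pickBroadcastState, and the modelling of a finished subtask's snapshot as null or an empty map are all hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper, not part of Flink: chooses the state to restore from
// when some subtasks of a broadcast-state operator have already finished.
class BroadcastRestoreSketch {

    // Returns the first non-empty snapshot. Finished subtasks are modelled
    // (as an assumption) by a null or empty map. An empty Optional means
    // the operator is fully finished and there is nothing to restore.
    static Optional<Map<String, String>> pickBroadcastState(
            List<Map<String, String>> subtaskSnapshots) {
        for (Map<String, String> snapshot : subtaskSnapshots) {
            if (snapshot != null && !snapshot.isEmpty()) {
                // All running subtasks are assumed to hold equal state,
                // so any non-empty snapshot is as good as any other.
                return Optional.of(snapshot);
            }
        }
        return Optional.empty();
    }
}
```

Every restored subtask would then be initialized from the returned snapshot, regardless of which original subtask produced it.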

UnionListState

The UnionListState is more complex. A common usage pattern is to implement a "broadcast" state by storing the state on a single subtask; afterwards, when subtasks are initialized, that single copy of the state is distributed to all of them. Another common pattern is to use a UnionListState to create a global view. It is used, for example, to share information about the offsets of partitions that have been consumed so far. This lets us restart from the given offset if, after a restore, a partition is assigned to a different subtask than originally. Such logic can only be implemented with the merged state of all original subtasks. If some of the subtasks are finished and we keep only the remaining state in the checkpoint, we can lose important pieces of information, or even the entire state in the case of the "broadcast" implementation described above.

However, since the UnionListState is on the way to being deprecated and replaced by the OperatorCoordinator, we could start with a temporary fix: ask the tasks of operators using UnionListState to wait for each other, so that the snapshot of the operator is guaranteed to be complete. We would decline a checkpoint if some of the tasks have reported that they called the finish() method while others have not yet received notifyCheckpointComplete.
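The following plain-Java sketch illustrates why a union state with finished subtasks loses information, together with one possible reading of the decline rule. It does not use Flink APIs. The names UnionStateSketch, union and shouldDecline are hypothetical, and treating "partially finished" as "at least one subtask reported finish() but not all of them" is an assumption made for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model, not Flink code.
class UnionStateSketch {

    // Builds the global view by merging the partition -> offset entries
    // contributed by each subtask, as a union-style redistribution would.
    static Map<String, Long> union(List<Map<String, Long>> perSubtaskOffsets) {
        Map<String, Long> merged = new TreeMap<>();
        for (Map<String, Long> offsets : perSubtaskOffsets) {
            merged.putAll(offsets);
        }
        return merged;
    }

    // Assumed reading of the temporary fix: decline the checkpoint when
    // some, but not all, subtasks have reported that finish() was called.
    static boolean shouldDecline(boolean[] finishReported) {
        boolean any = false;
        boolean all = true;
        for (boolean reported : finishReported) {
            any |= reported;
            all &= reported;
        }
        return any && !all;
    }
}
```

If subtask 0 holds the offset of partition p0 and subtask 1 holds the offset of p1, merging only subtask 0's state (because subtask 1 had finished) yields a view without p1, so a restored job could not resume p1 from the right position. Declining checkpoints taken in such a mixed state avoids persisting the incomplete view.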

...