
...

The first, state-based storage will not satisfy all requirements. Nevertheless, assuming that the in-flight data is rather small, we should still quickly come to the point where new output is produced.

Persistence

For persistence, existing mechanisms will be reused. The benefits are:

Including channel state in the snapshot:

  1. Don’t duplicate re-distribution code and avoid inconsistencies between channel and operator state distribution (especially keyed)
  2. Simplicity: having state not split into channel and operator parts
  3. Being able to use the existing snapshotting mechanism (below)

Reading/writing using the existing mechanisms (versus custom storage):

  1. Don’t duplicate provider-agnostic storage code
  2. Don’t duplicate configuration code
  3. Don’t duplicate local recovery code
  4. Ease deployment by having a single storage for operator and channel state
  5. Avoid inconsistencies between operator and channel state (e.g. different retention times)
  6. Possibility to reuse incremental checkpoints; alternatives could avoid storing buffers twice in other ways, but probably at the cost of more complex cleanup

Disadvantages

  1. Less flexibility
  2. Risk of breaking snapshotting
  3. Probably more difficult to implement some parts
  4. Increased checkpoint size

Components

  1. Checkpoint Metadata
    1. ChannelStateHandle extends StreamStateHandle; contains the subtask index and the information needed to place the state in the correct subpartition/input channel
    2. TaskStateSnapshot, PendingCheckpoint, and CompletedCheckpoint - each contains a collection of ChannelStateHandle
  2. Writer. Writes network buffers using a provided CheckpointStreamFactory and returns state handles
  3. Reader. Reads data from state handle and returns buffers (loads and deserializes)
  4. (de)Serializer
  5. StateAssignmentOperation (existing class) - reads snapshot metadata and distributes state handles across subtasks; need to add channel states distribution

  6. TaskStateManagerImpl (existing class) - holds state assigned to task
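
To make the roles of the handle, Writer, Reader, and (de)Serializer more concrete, here is a minimal, self-contained sketch in plain Java. It is not the proposed implementation: the class ChannelStateSketch, its nested ChannelStateHandle stand-in, the field names, and the length-prefixed wire format are all assumptions made for illustration, and an in-memory byte array stands in for the stream that a CheckpointStreamFactory would provide.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ChannelStateSketch {

    /** Hypothetical stand-in for ChannelStateHandle: the stored bytes plus placement info. */
    public static final class ChannelStateHandle {
        public final int subtaskIndex;
        public final int channelIndex;    // subpartition or input channel index (assumed field)
        public final byte[] storedBytes;  // stands in for data written via a checkpoint stream

        public ChannelStateHandle(int subtaskIndex, int channelIndex, byte[] storedBytes) {
            this.subtaskIndex = subtaskIndex;
            this.channelIndex = channelIndex;
            this.storedBytes = storedBytes;
        }
    }

    /** Writer: serializes each buffer as [length][payload] and returns a handle. */
    public static ChannelStateHandle write(int subtaskIndex, int channelIndex, List<byte[]> buffers)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (byte[] buffer : buffers) {
                out.writeInt(buffer.length);
                out.write(buffer);
            }
        }
        return new ChannelStateHandle(subtaskIndex, channelIndex, bytes.toByteArray());
    }

    /** Reader: loads the stored bytes and deserializes them back into buffers. */
    public static List<byte[]> read(ChannelStateHandle handle) throws IOException {
        List<byte[]> buffers = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(handle.storedBytes))) {
            while (in.available() > 0) {
                byte[] buffer = new byte[in.readInt()];
                in.readFully(buffer);
                buffers.add(buffer);
            }
        }
        return buffers;
    }

    public static void main(String[] args) throws IOException {
        List<byte[]> inFlight = Arrays.asList(new byte[] {1, 2, 3}, new byte[] {4});
        ChannelStateHandle handle = write(0, 2, inFlight);
        List<byte[]> restored = read(handle);
        System.out.println("restored " + restored.size() + " buffers for channel " + handle.channelIndex);
    }
}
```

In this sketch the handle carries the placement information itself, so a component such as StateAssignmentOperation could route it to a subtask without reading the payload.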

Efficiency

The following optimizations won't necessarily be implemented in the MVP:

  1. not writing the same buffer multiple times (when the job is backpressured); can be addressed by implementing incremental checkpointing for FS backend
  2. incremental loading and processing of state
  3. no additional memory: ideally, existing network buffers should be reused
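
The first optimization can be illustrated with a small sketch: a buffer that is still in flight at the next checkpoint is referenced rather than written again. This is only one possible shape of the idea, not the incremental checkpointing mechanism itself. The class DedupChannelWriterSketch, the notion of a stable per-buffer id, and the in-memory storage list are hypothetical, and cleanup of buffers that are no longer referenced is deliberately left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DedupChannelWriterSketch {
    // Assumption: every in-flight buffer has a stable id that survives across checkpoints.
    private final Map<Long, Integer> storedLocationByBufferId = new HashMap<>();
    private final List<byte[]> storage = new ArrayList<>(); // stands in for checkpoint storage
    private int physicalWrites = 0;

    /**
     * Snapshots the in-flight buffers of one checkpoint and returns their storage
     * locations. Only buffers that were not stored by an earlier checkpoint are written.
     */
    public List<Integer> snapshot(Map<Long, byte[]> inFlightBuffers) {
        List<Integer> locations = new ArrayList<>();
        for (Map.Entry<Long, byte[]> entry : inFlightBuffers.entrySet()) {
            Integer location = storedLocationByBufferId.get(entry.getKey());
            if (location == null) {
                storage.add(entry.getValue().clone()); // copy, since the buffer may be recycled
                location = storage.size() - 1;
                storedLocationByBufferId.put(entry.getKey(), location);
                physicalWrites++;
            }
            locations.add(location);
        }
        return locations;
    }

    /** Number of buffers actually written to storage so far. */
    public int physicalWrites() {
        return physicalWrites;
    }
}
```

Under backpressure, consecutive checkpoints tend to see largely the same buffers, so most calls to snapshot would only return existing locations.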

Open questions

Avoiding double processing downstream (with continuous spilling)

...