...
The first, state-based storage will not satisfy all requirements. Nevertheless, assuming that the in-flight data is fairly small, recovery should still quickly reach the point where new output is produced.
Persistence
For persistence, existing mechanisms will be reused. The benefits are:
Including channel state in the snapshot:
- Don’t duplicate re-distribution code and avoid inconsistencies between channel and operator state distribution (especially keyed)
- Simplicity: having state not split into channel and operator parts
- Being able to use existing snapshotting mechanism (below)
Reading/writing using the existing mechanisms (versus custom storage):
- Don’t duplicate provider-agnostic storage code
- Don’t duplicate configuration code
- Don’t duplicate local recovery code
- Ease deployment by having single storage for operator and channel state
- Avoid inconsistencies between operator and channel state (e.g. different retention times)
- Possibility to reuse incremental checkpoints; alternatives could avoid storing buffers twice in other ways, but probably at the cost of more complex cleanup
Disadvantages
- Less flexibility
- Risk of breaking snapshotting
- Probably more difficult to implement some parts
- Increased checkpoint size
Components
- Checkpoint Metadata
- ChannelStateHandle extends StreamStateHandle; contains the subtask index and the information needed to place it into the correct subpartition/input channel
- TaskStateSnapshot, PendingCheckpoint, and CompletedCheckpoint - each contains a collection of ChannelStateHandle
- Writer. Writes network buffers using a provided CheckpointStreamFactory and returns state handles
- Reader. Reads data from state handle and returns buffers (loads and deserializes)
- (de)Serializer
- StateAssignmentOperation (existing class) - reads snapshot metadata and distributes state handles across subtasks; distribution of channel state needs to be added
- TaskStateManagerImpl (existing class) - holds state assigned to task
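To make the relationship between the handle, the Writer, and the Reader more concrete, here is a minimal, self-contained sketch. It is not the proposed implementation: the class ChannelStateSketch, its fields, and the length-prefixed layout are assumptions made for illustration. A plain byte array stands in for the StreamStateHandle and CheckpointStreamFactory mentioned above, so no real Flink API is used.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; none of these types are the real Flink classes. */
final class ChannelStateSketch {

    /** Stand-in for the proposed ChannelStateHandle: the data plus where it belongs. */
    static final class ChannelStateHandle {
        final int subtaskIndex;
        final int channelIndex; // subpartition or input channel index (assumed name)
        final byte[] data;      // stand-in for a handle into checkpoint storage

        ChannelStateHandle(int subtaskIndex, int channelIndex, byte[] data) {
            this.subtaskIndex = subtaskIndex;
            this.channelIndex = channelIndex;
            this.data = data;
        }
    }

    /** Writer: serializes buffers as length-prefixed records and returns a handle. */
    static ChannelStateHandle write(int subtaskIndex, int channelIndex, List<byte[]> buffers) {
        int size = 0;
        for (byte[] buffer : buffers) {
            size += Integer.BYTES + buffer.length;
        }
        ByteBuffer out = ByteBuffer.allocate(size);
        for (byte[] buffer : buffers) {
            out.putInt(buffer.length);
            out.put(buffer);
        }
        return new ChannelStateHandle(subtaskIndex, channelIndex, out.array());
    }

    /** Reader: loads the bytes behind a handle and deserializes them into buffers. */
    static List<byte[]> read(ChannelStateHandle handle) {
        ByteBuffer in = ByteBuffer.wrap(handle.data);
        List<byte[]> buffers = new ArrayList<>();
        while (in.hasRemaining()) {
            byte[] buffer = new byte[in.getInt()];
            in.get(buffer);
            buffers.add(buffer);
        }
        return buffers;
    }

    public static void main(String[] args) {
        List<byte[]> inFlight = new ArrayList<>();
        inFlight.add(new byte[] {1, 2, 3});
        inFlight.add(new byte[] {4});
        ChannelStateHandle handle = write(0, 2, inFlight);
        System.out.println("restored buffers: " + read(handle).size());
    }
}
```

In this sketch the subtask and channel indices travel with the handle, which is what would let a component such as StateAssignmentOperation route channel state without reading the payload.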
Efficiency
The following optimizations won't necessarily be implemented in the MVP:
- not writing the same buffer multiple times (when the job is backpressured); can be addressed by implementing incremental checkpointing for FS backend
- incremental loading and processing of state
- no additional memory: ideally, existing network buffers should be reused
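The first optimization above (not writing the same buffer multiple times) can be illustrated with a small sketch. It assumes each in-flight buffer carries a stable identifier across checkpoints, which the text does not specify; DedupChannelWriterSketch, the buffer ids, and the in-memory "storage" list are all hypothetical and only show the idea of referencing already persisted data instead of rewriting it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: skip buffers that an earlier checkpoint already persisted. */
final class DedupChannelWriterSketch {
    // Maps an assumed stable buffer id to its slot in the simulated storage.
    private final Map<Long, Integer> persisted = new HashMap<>();
    private final List<byte[]> storage = new ArrayList<>();

    /** Returns the storage slots for one checkpoint, writing only unseen buffers. */
    List<Integer> snapshot(Map<Long, byte[]> inFlight) {
        List<Integer> refs = new ArrayList<>();
        for (Map.Entry<Long, byte[]> entry : inFlight.entrySet()) {
            Integer slot = persisted.get(entry.getKey());
            if (slot == null) {
                // First time this buffer is seen: write it and remember the slot.
                storage.add(entry.getValue().clone());
                slot = storage.size() - 1;
                persisted.put(entry.getKey(), slot);
            }
            refs.add(slot);
        }
        return refs;
    }

    /** Number of buffers physically written so far. */
    int writtenBuffers() {
        return storage.size();
    }

    public static void main(String[] args) {
        DedupChannelWriterSketch writer = new DedupChannelWriterSketch();
        Map<Long, byte[]> first = new LinkedHashMap<>();
        first.put(1L, new byte[] {1});
        first.put(2L, new byte[] {2});
        writer.snapshot(first);

        // Under backpressure, buffer 2 is still in flight at the next checkpoint.
        Map<Long, byte[]> second = new LinkedHashMap<>();
        second.put(2L, new byte[] {2});
        second.put(3L, new byte[] {3});
        System.out.println("refs: " + writer.snapshot(second)
                + ", written: " + writer.writtenBuffers());
    }
}
```

The sketch never removes entries from its map or storage; deciding when a shared buffer can be discarded is the cleanup complexity that any such scheme would have to address.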
Open questions
Avoiding double processing downstream (in the case of continuous spilling)
...