Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Operator states can be quite large. Some use cases with large broadcast states were pointed out on Stack Overflow (see https://lists.apache.org/thread/2kylgj0fdmn21jk7x63696mgdvd1csxo). After discussion, it was decided to add operator state compression.

Public Interfaces

Until now (snapshot format v5), operator state snapshots were not compressed. Regardless of whether the user enables state compression, Flink needs to be able to read both compressed and uncompressed previous snapshots. The snapshot format for operator states therefore needs to evolve: we need to add a boolean indicating whether the operator state is compressed.

Proposed Changes

  • Use the execution.checkpointing.snapshot-compression configuration parameter for operator state compression (this parameter is already used for keyed state compression)
  • Add a new snapshot format version 6 compatible with the previous versions that adds a boolean for snapshot compression
  • At snapshot time: depending on user configuration, write compressed regular and broadcast operator states (but not the snapshot meta info) and write the compression boolean accordingly
  • The compression unit is a single state (for keyed state, it is a key-group of a single state)
  • For now, the only compression algorithm is Snappy, which is already used to compress keyed states. We propose to use the same algorithm for operator states
  • At restoration time: depending on the read compression boolean, read the snapshots as compressed or not.
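The write and restore steps above can be sketched as follows. This is only an illustration of wrapping a per-state stream depending on a flag, not Flink's implementation: the class and method names are hypothetical, and the JDK's DEFLATE streams are used as a stand-in for Snappy so that the example has no external dependency.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

// Hypothetical sketch: one call handles one compression unit (a single state).
final class OperatorStateCompressionSketch {

    /** Writes the bytes of one state, compressed only if the flag is set. */
    static byte[] writeState(byte[] stateBytes, boolean compress) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // Assumption: DEFLATE stands in for Snappy here.
        try (OutputStream out = compress ? new DeflaterOutputStream(sink) : sink) {
            out.write(stateBytes);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toByteArray();
    }

    /** Reads the bytes of one state, decompressing only if the flag read from the snapshot is set. */
    static byte[] readState(byte[] stored, boolean compressed) {
        InputStream source = new ByteArrayInputStream(stored);
        try (InputStream in = compressed ? new InflaterInputStream(source) : source) {
            return in.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In this sketch the flag passed to readState comes from the snapshot itself, not from the current configuration, which is what lets a job restore a snapshot written with a different compression setting.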

Operator snapshot meta info format change (the proposed addition is the boolean compression field):

  • integer version
  • boolean compression
  • short meta info snapshot count
  • binary meta info snapshots

 

Compatibility, Deprecation, and Migration Plan

The new meta info snapshot format is backward compatible with the previous ones:


// read the version from the snapshot

if (readVersion >= 6) {
    usingOperatorStateCompression = in.readBoolean();
} else {
    usingOperatorStateCompression = false;
}
// read the rest of the snapshot meta info
// read the state data as compressed or not depending on usingOperatorStateCompression


This way, old snapshots written with format version < 6 can still be read regardless of the execution.checkpointing.snapshot-compression configuration.
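To make the version gating concrete, here is a minimal, self-contained sketch of the header layout listed earlier (integer version, boolean compression, short meta info snapshot count). The class, field, and method names are hypothetical and the real serialization code in Flink may be organized differently; the point is only that a reader which checks the version first can consume both the old and the new layout.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch of the operator snapshot meta info header.
final class MetaInfoHeaderSketch {

    // First format version that carries the compression boolean.
    static final int VERSION_WITH_COMPRESSION_FLAG = 6;

    static final class Header {
        final int version;
        final boolean compressed;
        final int metaInfoCount;

        Header(int version, boolean compressed, int metaInfoCount) {
            this.version = version;
            this.compressed = compressed;
            this.metaInfoCount = metaInfoCount;
        }
    }

    /** Writes the header in the layout of the given version. */
    static byte[] writeHeader(int version, boolean compressed, int metaInfoCount) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(version);
            if (version >= VERSION_WITH_COMPRESSION_FLAG) {
                out.writeBoolean(compressed); // only present from version 6 on
            }
            out.writeShort(metaInfoCount);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads a header written in any supported version. */
    static Header readHeader(byte[] header) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(header))) {
            int readVersion = in.readInt();
            // Older formats have no flag and are always uncompressed.
            boolean usingOperatorStateCompression =
                    readVersion >= VERSION_WITH_COMPRESSION_FLAG && in.readBoolean();
            int metaInfoCount = in.readShort();
            return new Header(readVersion, usingOperatorStateCompression, metaInfoCount);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A version 5 header is one byte shorter than a version 6 header in this sketch, and reading it always yields an uncompressed result, whatever the writer or the reader has configured.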

Support for previous snapshot formats will be similar to what already exists for keyed state (the system adapts itself to the version it reads from the snapshot).

Test Plan

The operator snapshot meta info format change is covered by the existing tests in org.apache.flink.runtime.state.SerializationProxiesTest.

State compression is already covered by existing tests that write and restore operator state snapshots in the flink-tests, flink-streaming-java, and flink-runtime modules.

Rejected Alternatives

None