Status

...

Discussion thread

...

Vote thread: https://lists.apache.org/thread/rqxt3l5gpzx06n3xy91qclw09www083t

JIRA: FLINK-21648

Release: Preview https://flink-packages.org/packages/heap-backend-incremental-checkpoints

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

As an example, there was a question on the user ML from Pinterest about a deployment with 250G of state that fits into memory (Jan 30, 2020).

...

For (1), potentially multiple unconfirmed versions have to be maintained (corresponding to multiple checkpoints, to avoid a full snapshot on failure; see unconfirmed checkpoints).

Removed keys

Currently, entries are just “unlinked” on removal. Therefore, they would not be included in the snapshot, but would still be present after recovery.

...

  1. An entry can be removed many times - keep (and write) it only once
  2. An entry can be re-added (and then re-removed)
  3. A snapshot with a removal can still be unconfirmed by JM by the time of the next checkpoint (see unconfirmed checkpoints)
  4. An entry key can be mutable, such as BinaryRowData (this doesn’t cause issues with non-incremental snapshots because removal takes place immediately, without tracking)
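As an illustration of cases 1 to 3, a removal log could look roughly like the sketch below. The class name `RemovalLogSketch` and its methods are hypothetical and are not the RemovalLog API proposed by this FLIP. The sketch assumes that each increment is written relative to the latest confirmed checkpoint and that keys are immutable, so case 4 is not covered.

```java
import java.util.HashSet;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical sketch; not the RemovalLog class proposed in this FLIP. */
final class RemovalLogSketch<K> {
    // Keys removed since the last snapshot; a set keeps repeated removals once (case 1).
    private final Set<K> pending = new HashSet<>();
    // Removals written by snapshots that the JM has not confirmed yet (case 3).
    private final NavigableMap<Long, Set<K>> unconfirmed = new TreeMap<>();

    void onRemove(K key) {
        pending.add(key);
    }

    // A re-added key must no longer be reported as removed (case 2).
    void onPut(K key) {
        pending.remove(key);
        for (Set<K> removed : unconfirmed.values()) {
            removed.remove(key);
        }
    }

    // Returns every removal recorded since the latest confirmed checkpoint.
    Set<K> snapshot(long checkpointId) {
        unconfirmed.put(checkpointId, new HashSet<>(pending));
        pending.clear();
        Set<K> toWrite = new HashSet<>();
        for (Set<K> removed : unconfirmed.values()) {
            toWrite.addAll(removed);
        }
        return toWrite;
    }

    // Removals of the confirmed checkpoint and of older ones need not be written again.
    void confirm(long checkpointId) {
        unconfirmed.headMap(checkpointId, true).clear();
    }
}
```

With this sketch, removing a key twice yields a single record, re-adding a key drops it from all sets that are still tracked, and a removal keeps being written until its checkpoint is confirmed.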

...

In the first version, the simplest solution seems preferable.

Relationship to FLIP-158

FLIP-158 proposes a similar idea but in a more generic way.

However, this generality comes at a cost. Compared to FLIP-158, FLIP-151 has the following advantages:

1. It can "squash" the changes made to the same key. For example, if a counter was changed 10 times, FLIP-151 sends only the last value (so less data is both sent and stored than with FLIP-158).

2. It keeps only the changed keys in memory, not the values (this reduces both memory usage and latency, which in FLIP-158 is caused by serialization and copying on every update).

3. It avoids the synchronous serialization on each state access that changelogging involves.

4. Journalling allows for even smaller increments for collection-typed state (e.g. snapshotting only the appended ListState values rather than the whole list).
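The "squashing" in point 1 and the key-only tracking in point 2 can be illustrated with a minimal sketch. The class `SquashingCounterState` is hypothetical and is not part of the proposed backend; it only shows that tracking changed keys and reading their current values at snapshot time yields one entry per key, however many updates were made.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of change tracking by key only. */
final class SquashingCounterState {
    private final Map<String, Long> values = new HashMap<>();
    // Only the keys are remembered between snapshots, not the intermediate values.
    private final Set<String> changedKeys = new HashSet<>();

    void increment(String key) {
        values.merge(key, 1L, Long::sum);
        changedKeys.add(key);
    }

    // The increment holds the latest value of each changed key, read at snapshot time.
    Map<String, Long> snapshotIncrement() {
        Map<String, Long> increment = new HashMap<>();
        for (String key : changedKeys) {
            increment.put(key, values.get(key));
        }
        changedKeys.clear();
        return increment;
    }
}
```

After ten calls to `increment("clicks")`, the next increment contains the single entry `clicks=10`, whereas a changelog would have recorded ten separate updates.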


On the other hand, integration with FLIP-158 would allow "background" materialization, eliminating the need for periodic full checkpoints (or for a more complex way to avoid them).

For such an integration:

  1. on startup, IncHeapKeyedBackend notifies ChangelogStateBackend that no changes should be sent (to the changelog)
  2. on checkpoint, ChangelogStateBackend asks IncHeapKeyedBackend to create an incremental checkpoint
  3. on materialization, ChangelogStateBackend asks for a full snapshot
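The three steps above could be wired together roughly as in the following sketch. All interfaces, class names and method names here are hypothetical; neither FLIP defines them, and snapshots are represented by plain strings only to keep the example self-contained.

```java
/** Hypothetical callback through which the heap backend talks to the changelog wrapper. */
interface ChangeForwardingControl {
    void disableChangeForwarding();
}

/** Hypothetical view of IncHeapKeyedBackend as seen by the wrapper. */
interface IncrementalBackendSketch {
    void onStartup(ChangeForwardingControl control);

    String createIncrementalSnapshot(long checkpointId);

    String createFullSnapshot();
}

/** Hypothetical stand-in for ChangelogStateBackend. */
final class ChangelogWrapperSketch implements ChangeForwardingControl {
    private final IncrementalBackendSketch backend;
    private boolean forwardingEnabled = true;

    ChangelogWrapperSketch(IncrementalBackendSketch backend) {
        this.backend = backend;
    }

    // Step 1: on startup the backend gets the chance to switch change forwarding off.
    void start() {
        backend.onStartup(this);
    }

    @Override
    public void disableChangeForwarding() {
        forwardingEnabled = false;
    }

    boolean isForwardingEnabled() {
        return forwardingEnabled;
    }

    // Step 2: a checkpoint is delegated as an incremental snapshot.
    String onCheckpoint(long checkpointId) {
        return backend.createIncrementalSnapshot(checkpointId);
    }

    // Step 3: materialization asks for a full snapshot.
    String onMaterialization() {
        return backend.createFullSnapshot();
    }
}

/** Stub backend used only to exercise the sketch. */
final class StubIncrementalBackend implements IncrementalBackendSketch {
    @Override
    public void onStartup(ChangeForwardingControl control) {
        control.disableChangeForwarding();
    }

    @Override
    public String createIncrementalSnapshot(long checkpointId) {
        return "incremental-" + checkpointId;
    }

    @Override
    public String createFullSnapshot() {
        return "full";
    }
}
```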

This integration is out of scope of this FLIP.

Proposed Changes

  1. The following (~20) classes to be refactored to allow extension (extracting methods, adding hooks, etc.):
    1. (Heap-) KeyedStateBackend, SnapshotStrategy, RestoreOperation
    2. (CopyOnWrite-) Map, Entry, State, Snapshots
    3. RegisteredKeyValueStateBackendMetaInfo
  2. Incremental versions of the following (~10) classes to be added:
    1. HashMapStateBackend
    2. CoW StateMap, StateMapSnapshot, StateTable, StateTableSnapshot
    3. StateMapEntry
    4. KeyedStateBackend, RestoreOperation, SnapshotStrategy
    5. RegisteredKeyValueStateBackendMetaInfo
    6. StateSnapshotRestore
  3. The following new classes to be added (~10):
    1. RemovalLog
    2. For each state type: Diff, DiffSerializer, Journal, JournalFactory
    3. StateSnapshotKeyGroupReaderV7
  4. The following classes to be updated:
    1. StateTableByKeyGroupReaders: add new version
    2. Fs- and MemoryStateBackend will have additional settings to construct incremental backend versions

The backend/new classes will reside in a new module under flink/flink-state-backends.

The refactorings are mostly to allow extension and customization.

Public Interfaces

Code: see Proposed Changes

...

Limitations

In the first version:

  1. A single checkpoint failure (or a missing confirmation) leads to a non-incremental snapshot
  2. Only KV-states are snapshotted incrementally (sorted states, aka PQ, aka timers, are snapshotted non-incrementally)
  3. Only states with immutable keys are supported (e.g. not BinaryRowData)
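Limitation 1 can be read as the following decision rule, shown here as a minimal sketch. The class `SnapshotModeSketch` and its methods are hypothetical; the sketch assumes that a snapshot may be incremental only if the previously taken snapshot has been confirmed, and that the very first snapshot is always full.

```java
/** Hypothetical sketch of the fallback to a full snapshot. */
final class SnapshotModeSketch {
    enum Mode { FULL, INCREMENTAL }

    private long lastTaken = -1L;
    private long lastConfirmed = -1L;

    // Incremental only if the previous snapshot exists and has been confirmed.
    Mode nextSnapshot(long checkpointId) {
        Mode mode = (lastTaken >= 0 && lastTaken == lastConfirmed)
                ? Mode.INCREMENTAL
                : Mode.FULL;
        lastTaken = checkpointId;
        return mode;
    }

    // Called when the JM confirms a checkpoint; failed checkpoints never get here.
    void notifyCheckpointComplete(long checkpointId) {
        lastConfirmed = Math.max(lastConfirmed, checkpointId);
    }
}
```

The rule is deliberately conservative: a confirmation that arrives only after the next snapshot has already been taken still results in one full snapshot.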

...