Status

Current state: Under Discussion

Discussion thread: https://s.apache.org/flip-151-discussion

Vote thread: https://lists.apache.org/thread/rqxt3l5gpzx06n3xy91qclw09www083t

JIRA: FLINK-21648

Release: Preview https://flink-packages.org/packages/heap-backend-incremental-checkpoints

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

Currently, the most widely used Flink state backends are RocksDB- and Heap-based.

...

As an example, there was a question on the user ML from Pinterest about a deployment with 250G of state that fits into memory (Jan 30, 2020).

...

For (1), potentially multiple unconfirmed versions have to be maintained (corresponding to multiple checkpoints, to avoid a full snapshot on failure, see unconfirmed checkpoints).

Removed keys

Currently, entries are just “unlinked” on removal. With incremental snapshots, they therefore wouldn’t be included in the snapshot, but would still be present after recovery (restored from an earlier snapshot).

...

  1. An entry can be removed many times - keep (and write) it only once
  2. An entry can be re-added (and then re-removed)
  3. A snapshot with a removal can still be unconfirmed by JM by the time of the next checkpoint (see unconfirmed checkpoints)
  4. An entry key can be mutable, such as BinaryRowData (this doesn’t cause issues with non-incremental snapshots because removal takes place straight ahead, without tracking)
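The four points above can be illustrated with a minimal sketch of a removal log. This is not the actual RemovalLog class from the proposal: the class name, the method names and the keyCopier parameter are hypothetical, and checkpoint IDs are assumed to be consecutive and to start at 1.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.UnaryOperator;

/** Hypothetical sketch of a removal log; names and structure are assumptions. */
final class RemovalLogSketch<K> {
    // removed key (defensive copy) -> ID of the first checkpoint whose snapshot includes the removal
    private final Map<K, Long> removed = new HashMap<>();
    private final UnaryOperator<K> keyCopier; // assumption: keys can be copied
    private long nextCheckpointId = 1;        // assumption: consecutive IDs starting at 1

    RemovalLogSketch(UnaryOperator<K> keyCopier) {
        this.keyCopier = keyCopier;
    }

    /** (1) Repeated removals are stored once; (4) the key is copied in case it is mutable. */
    void onRemove(K key) {
        if (!removed.containsKey(key)) {
            removed.put(keyCopier.apply(key), nextCheckpointId);
        }
    }

    /** (2) A re-added key is live again, so its pending removal is dropped. */
    void onPut(K key) {
        removed.remove(key);
    }

    /** (3) Returns all removals that no confirmed checkpoint covers yet. */
    Set<K> snapshot() {
        nextCheckpointId++;
        return new HashSet<>(removed.keySet());
    }

    /** Drops removals that were already written by the confirmed checkpoint. */
    void confirm(long checkpointId) {
        removed.values().removeIf(id -> id <= checkpointId);
    }
}
```

In this sketch a removal stays in the log, and is re-sent with every snapshot, until a checkpoint containing it is confirmed.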

...

The former has the advantage of having more knowledge about the operations performed, so fewer full replacements have to be made.

On the other hand, placing journals in entries allows us to:

  1. Bind journal lifecycle to entry/version
  2. Avoid extra hash lookup
  3. Easily access state and track changes

These properties allow accessing the state without adding significant complexity or storing updates which would increase memory overhead. Therefore, placing journals in entries was chosen.

List state

For a list, if only appends were made, it’s enough to store its length right after the previous snapshot to get the added elements. Otherwise, if it was truncated for example, the full list is snapshotted.
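A minimal sketch of such a list journal is shown below. The class, its methods and the Diff type are hypothetical names used for illustration only. For simplicity, the sketch resets the journal on every snapshot, i.e. it assumes the previous snapshot is always confirmed.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a journal for list state; not the proposed implementation. */
final class ListJournalSketch<T> {
    /** Either the appended elements only (full == false) or the whole list (full == true). */
    static final class Diff<T> {
        final boolean full;
        final List<T> elements;

        Diff(boolean full, List<T> elements) {
            this.full = full;
            this.elements = elements;
        }
    }

    private final List<T> list = new ArrayList<>();
    private int sizeAtLastSnapshot = 0; // list length right after the previous snapshot
    private boolean appendOnly = true;  // false once any non-append operation was made

    void add(T value) {
        list.add(value);
    }

    void truncate(int newSize) {
        if (newSize < list.size()) {
            list.subList(newSize, list.size()).clear();
            appendOnly = false;
        }
    }

    /** Returns the increment since the previous snapshot and resets the journal. */
    Diff<T> snapshot() {
        List<T> elements = appendOnly
                ? new ArrayList<>(list.subList(sizeAtLastSnapshot, list.size()))
                : new ArrayList<>(list);
        Diff<T> diff = new Diff<>(!appendOnly, elements);
        sizeAtLastSnapshot = list.size();
        appendOnly = true;
        return diff;
    }
}
```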

...

For a map, sets of removed and updated keys need to be maintained. Again, if there were any clear() or “black-box” operations (such as entrySet() with potential replace/remove), then the full map is snapshotted.

As an optimization, a map.clear() call can remove the entry completely.
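The map case can be sketched in the same way. The class below is a hypothetical illustration (all names are assumptions, not the proposed classes): it tracks updated and removed keys, and falls back to a full snapshot after clear() or after handing out entrySet(). As with any such journal, repeated updates to one key result in a single entry in the increment. The sketch also assumes the previous snapshot is always confirmed.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of a journal for map state; not the proposed implementation. */
final class MapJournalSketch<K, V> {
    static final class Diff<K, V> {
        final boolean full;       // true: 'updates' holds the whole map
        final Map<K, V> updates;  // changed keys with their latest values
        final Set<K> removed;     // keys removed since the previous snapshot

        Diff(boolean full, Map<K, V> updates, Set<K> removed) {
            this.full = full;
            this.updates = updates;
            this.removed = removed;
        }
    }

    private final Map<K, V> map = new HashMap<>();
    private final Set<K> updated = new HashSet<>();
    private final Set<K> removed = new HashSet<>();
    private boolean fullSnapshotNeeded = false;

    void put(K key, V value) {
        map.put(key, value);
        removed.remove(key);
        updated.add(key);
    }

    void remove(K key) {
        if (map.containsKey(key)) {
            map.remove(key);
            updated.remove(key);
            removed.add(key);
        }
    }

    void clear() {
        map.clear();
        fullSnapshotNeeded = true;
    }

    /** "Black-box" access: callers may replace or remove entries without being tracked. */
    Set<Map.Entry<K, V>> entrySet() {
        fullSnapshotNeeded = true;
        return map.entrySet();
    }

    /** Returns the increment since the previous snapshot and resets the journal. */
    Diff<K, V> snapshot() {
        Map<K, V> updates = new HashMap<>();
        Set<K> removedKeys = new HashSet<>();
        if (fullSnapshotNeeded) {
            updates.putAll(map);
        } else {
            for (K key : updated) {
                updates.put(key, map.get(key));
            }
            removedKeys.addAll(removed);
        }
        Diff<K, V> diff = new Diff<>(fullSnapshotNeeded, updates, removedKeys);
        updated.clear();
        removed.clear();
        fullSnapshotNeeded = false;
        return diff;
    }
}
```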

Value, Aggregating and Reducing states

...

Once a snapshot is sent to the JM, its corresponding checkpoint ID is stored. Upon receiving a confirmation, it becomes a base for the next one.
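The bookkeeping described here can be sketched as follows. The class and method names are hypothetical. The fallback to a full snapshot when the previous checkpoint is unconfirmed is an assumption taken from the first-version behavior listed under Limitations, not a description of the final design.

```java
/** Hypothetical sketch of tracking the base for the next incremental snapshot. */
final class SnapshotBaseTrackerSketch {
    static final long NO_BASE = -1L; // no usable base: a full snapshot is required

    private long lastSentId = NO_BASE;  // checkpoint ID of the last snapshot sent to the JM
    private long confirmedId = NO_BASE; // checkpoint ID of the last confirmed snapshot

    /**
     * Called when a snapshot for the given checkpoint is sent to the JM.
     * Returns the ID of the checkpoint to build on, or NO_BASE for a full snapshot.
     */
    long onSnapshotSent(long checkpointId) {
        // Assumption: if the previous snapshot is unconfirmed, fall back to a full snapshot.
        long base = (lastSentId == confirmedId) ? confirmedId : NO_BASE;
        lastSentId = checkpointId;
        return base;
    }

    /** Called on confirmation; the confirmed snapshot becomes the base for the next one. */
    void onConfirmed(long checkpointId) {
        if (checkpointId == lastSentId) {
            confirmedId = checkpointId;
        }
    }
}
```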

Unconfirmed checkpoints

If, at the time of a checkpoint N, the previous one N-1 is still unconfirmed:

...

Older checkpoints are removed by JM once they aren’t referenced anymore (existing functionality).

Alternatively, compaction can be done incrementally. For that, each snapshot should have (for each map):

- vStart: minimum version of the map needed to restore using this snapshot
- vTaken: version of the map at which it was taken

Upon finalization, all snapshots that are not needed for restore (i.e. oldSnapshot.vTaken < newSnapshot.vStart for each map) can be dropped. This tracking can be done in SharedStateRegistry by adding some (ordered) mapping between StateMaps and their versions, per key group.

vStart is calculated as the minimum version across all map entries (each entry maintains the version at which it was last fully snapshotted, to support journaling). Each time, some configurable fraction of entries is fully snapshotted (chosen by the lowest lastFullSnapshot version).
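The incremental compaction rules can be sketched as below. All names are hypothetical: maps are identified by a String name, versions are plain longs, and a snapshot is represented only by its per-map (vStart, vTaken) pairs. Keeping an old snapshot when the new one has no record of a map is a conservative assumption of this sketch.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical sketch of the vStart/vTaken bookkeeping for incremental compaction. */
final class CompactionSketch {
    static final class MapVersions {
        final long vStart; // minimum map version needed to restore using this snapshot
        final long vTaken; // map version at which the snapshot was taken

        MapVersions(long vStart, long vTaken) {
            this.vStart = vStart;
            this.vTaken = vTaken;
        }
    }

    /** vStart: minimum lastFullSnapshot version across all entries of a map. */
    static long vStart(Collection<Long> lastFullSnapshotVersions, long currentVersion) {
        return lastFullSnapshotVersions.stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(currentVersion); // empty map: nothing older is needed
    }

    /** An old snapshot can be dropped if old.vTaken < new.vStart holds for each map. */
    static boolean canDrop(Map<String, MapVersions> oldSnapshot,
                           Map<String, MapVersions> newSnapshot) {
        for (Map.Entry<String, MapVersions> e : oldSnapshot.entrySet()) {
            MapVersions newer = newSnapshot.get(e.getKey());
            // Assumption: keep the old snapshot if the new one does not know the map.
            if (newer == null || e.getValue().vTaken >= newer.vStart) {
                return false;
            }
        }
        return true;
    }

    /** Picks the given fraction of entries with the lowest lastFullSnapshot version. */
    static <K> List<K> chooseForFullSnapshot(Map<K, Long> lastFullSnapshot, double fraction) {
        int count = (int) Math.ceil(lastFullSnapshot.size() * fraction);
        return lastFullSnapshot.entrySet().stream()
                .sorted(Map.Entry.comparingByValue())
                .limit(count)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }
}
```

Fully snapshotting the entries with the lowest lastFullSnapshot version raises the map's vStart over time, which is what eventually makes older snapshots droppable.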

In the first version, the simplest solution seems preferable. 

Relationship to FLIP-158

FLIP-158 proposes a similar idea but in a more generic way.

However, this generality comes at a cost. Compared to FLIP-158, FLIP-151:

1. "Squashes" the changes made to the same key. For example, if a counter was changed 10 times, FLIP-151 sends only the last value (so less data is sent AND stored).

2. Keeps in memory only the changed keys and not the values (this reduces memory usage AND the latency caused by serialization and copying on every update).

3. Avoids the synchronous serialization on each state access that changelogging involves.

4. Allows for even smaller increments for collection-typed state through journalling (e.g. snapshotting only the appended ListState values rather than the whole list).


On the other hand, integration with FLIP-158 would allow "background" materialization, eliminating the need for periodic full checkpoints (or for a more complex way to achieve the same).

For such an integration:

  1. on startup, IncHeapKeyedBackend notifies ChangelogStateBackend that no changes should be sent (to the changelog)
  2. on checkpoint, ChangelogStateBackend asks IncHeapKeyedBackend to create an incremental checkpoint
  3. on materialization, ChangelogStateBackend asks for a full snapshot

This integration is out of scope of this FLIP.

Proposed Changes

  1. The following (~20) classes to be refactored to allow extension (extracting methods, adding hooks, etc.):
    1. (Heap-) KeyedStateBackend, SnapshotStrategy, RestoreOperation
    2. (CopyOnWrite-) Map, Entry, State, Snapshots
    3. RegisteredKeyValueStateBackendMetaInfo
  2. Incremental versions of the following (~10) classes to be added:
    1. HashMapStateBackend
    2. CoW StateMap, StateMapSnapshot, StateTable, StateTableSnapshot
    3. StateMapEntry
    4. KeyedStateBackend, RestoreOperation, SnapshotStrategy
    5. RegisteredKeyValueStateBackendMetaInfo
    6. StateSnapshotRestore
  3. The following new classes to be added (~10):
    1. RemovalLog
    2. For each state type: Diff, DiffSerializer, Journal, JournalFactory
    3. StateSnapshotKeyGroupReaderV7
  4. The following classes to be updated:
    1. StateTableByKeyGroupReaders: add new version
    2. Fs- and MemoryStateBackend will have additional settings to construct incremental backend versions

The backend/new classes will reside in a new module under flink/flink-state-backends.

The refactorings are mostly to allow extension and customization.

Public Interfaces

Code: see Proposed changes

...

Nothing is deprecated (at least for now). The feature is disabled by default.

TTL support: wrapping existing TTL transformers inside the new incremental StateMap iterator is enough to support TTL.

Limitations

In the first version:

  1. A single checkpoint failure (or a missing confirmation) leads to a non-incremental snapshot
  2. Only KV-states (sorted states, aka PQ, aka timers, are snapshotted non-incrementally)
  3. Only states with immutable keys (e.g. not BinaryRowData)

...

  1. Unit tests for the new code
  2. Add incremental mode to: FileStateBackendTest, EventTimeWindowCheckpointingITCase, KeyedStateCheckpointingITCase
  3. Add increment-specific integration tests to FileStateBackendTest
  4. Parameterize the whole CI build (each build randomly or as a separate periodic build) using existing state.backend.incremental=true/false. This affects the following tests:
    1. About 10 IT cases in stream.sql (e.g. RankITCase, JoinITCase)
    2. Planner IT cases (e.g. GroupWindowITCase)
    3. E2E: test_streaming_sql (planner) (especially blink), test_resume_savepoint, test_ha_datastream (HA*)
    4. Others: CEPOperatorTest, WindowOperatorMigrationTest, StreamOperatorSnapshotRestoreTest, FileStateBackendTest, MemoryStateBackendTest, HeapAsyncSnapshotTtlStateTest, LocalRecoveryITCase, RegionFailoverITCase, UnalignedCheckpointITCase
  5. Important for checking removal correctness: WindowAggregateITCase, WindowCheckpointingITCase
  6. Load testing - similar to the evaluation below but more detailed:
    1. job runtime
    2. recovery time
    3. checkpointing times
    4. memory overhead

(in the future, side-by-side comparisons with permuted configurations can be added)

...