Status

...


Discussion thread

...

...

...

flip-151-discussion
Vote thread: https://lists.apache.org/thread/rqxt3l5gpzx06n3xy91qclw09www083t
JIRA: FLINK-21648

Release


Preview https://flink-packages.org/packages/heap-backend-incremental-checkpoints

...

JIRA:

Released:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

As an example, there was a question on the user ML from Pinterest about a 250G state deployment that fits into memory (Jan 30, 2020).

...

In the first version, the simplest solution seems preferable. 

Relationship to FLIP-158

FLIP-158 proposes a similar idea but in a more generic way.

However, this generality comes at a cost. Compared to FLIP-158, FLIP-151 allows Flink to:

1. "Squash" the changes made to the same key. For example, if a counter was changed 10 times, FLIP-151 sends only the last value (so less data is both sent and stored).

2. Keep in memory only the changed keys and not the values (this reduces both memory usage and the latency caused by serialization and copying on every update).

3. Avoid synchronous serialization on each state access, which changelogging involves.

4. Produce even smaller increments for collection-typed state by journalling (e.g. snapshotting only the appended ListState values rather than the whole list).
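
To make points 1 and 2 concrete, here is a minimal sketch of squashing with changed-key tracking. The class `SquashingCounterState` and its methods are hypothetical names chosen for illustration; they are not part of Flink or of this proposal, and a real backend would work with key groups, namespaces and serializers.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical illustration only; not a class from Flink or from this FLIP. */
class SquashingCounterState {
    private final Map<String, Long> values = new HashMap<>();
    // Only the keys changed since the last snapshot are remembered, not their values.
    private final Set<String> changedKeys = new HashSet<>();

    void increment(String key) {
        values.merge(key, 1L, Long::sum);
        changedKeys.add(key);
    }

    /** Returns the latest value of every key changed since the previous call. */
    Map<String, Long> snapshotIncrement() {
        Map<String, Long> diff = new HashMap<>();
        for (String key : changedKeys) {
            diff.put(key, values.get(key));
        }
        changedKeys.clear();
        return diff;
    }
}
```

Ten calls to `increment("a")` between two snapshots yield a single entry for `a` in the increment, whereas a changelog would carry ten records.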


On the other hand, integration with FLIP-158 would allow "background" materialization, eliminating the need for periodic full checkpoints (or for a more complex way to achieve the same effect).

For such an integration:

  1. on startup, IncHeapKeyedBackend notifies ChangelogStateBackend that no changes should be sent (to the changelog)
  2. on checkpoint, ChangelogStateBackend asks IncHeapKeyedBackend to create an incremental checkpoint
  3. on materialization, ChangelogStateBackend asks for a full snapshot
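
The three steps above could be sketched roughly as follows. Both classes are hypothetical stand-ins that only borrow the names IncHeapKeyedBackend and ChangelogStateBackend; the method names and the string-based state are assumptions made for illustration, not the interfaces of either FLIP.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for IncHeapKeyedBackend; not Flink code. */
class IncHeapBackendSketch {
    private final Map<String, String> values = new HashMap<>();
    private final Set<String> changedKeys = new HashSet<>();

    // Step 1: on startup, tell the wrapping backend not to log changes.
    void start(ChangelogBackendSketch wrapper) {
        wrapper.disableChangelog();
    }

    void put(String key, String value) {
        values.put(key, value);
        changedKeys.add(key);
    }

    // Step 2: only the entries changed since the previous incremental snapshot.
    Map<String, String> incrementalSnapshot() {
        Map<String, String> diff = new HashMap<>();
        for (String key : changedKeys) {
            diff.put(key, values.get(key));
        }
        changedKeys.clear();
        return diff;
    }

    // Step 3: a copy of the complete state.
    Map<String, String> fullSnapshot() {
        return new HashMap<>(values);
    }
}

/** Hypothetical stand-in for ChangelogStateBackend; not Flink code. */
class ChangelogBackendSketch {
    private final IncHeapBackendSketch delegate;
    private final List<String> changelog = new ArrayList<>();
    private boolean changelogEnabled = true;

    ChangelogBackendSketch(IncHeapBackendSketch delegate) {
        this.delegate = delegate;
        delegate.start(this);
    }

    void disableChangelog() {
        changelogEnabled = false;
    }

    void put(String key, String value) {
        if (changelogEnabled) {
            changelog.add(key + "=" + value);
        }
        delegate.put(key, value);
    }

    Map<String, String> checkpoint() {
        return delegate.incrementalSnapshot();
    }

    Map<String, String> materialize() {
        return delegate.fullSnapshot();
    }

    List<String> changelog() {
        return changelog;
    }
}
```

In this sketch the changelog stays empty because the wrapped backend opted out at startup, while checkpoints carry only the changed entries and materialization returns the full state.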

This integration is out of scope of this FLIP.

Proposed Changes

  1. The following (~20) classes to be refactored to allow extension (extracting methods, adding hooks, etc.):
    1. (Heap-) KeyedStateBackend, SnapshotStrategy, RestoreOperation
    2. (CopyOnWrite-) Map, Entry, State, Snapshots
    3. RegisteredKeyValueStateBackendMetaInfo
  2. Incremental versions of the following (~10) classes to be added:
    1. HashMapStateBackend
    2. CoW StateMap, StateMapSnapshot, StateTable, StateTableSnapshot
    3. StateMapEntry
    4. KeyedStateBackend, RestoreOperation, SnapshotStrategy
    5. RegisteredKeyValueStateBackendMetaInfo
    6. StateSnapshotRestore
  3. The following new classes to be added (~10):
    1. RemovalLog
    2. For each state type: Diff, DiffSerializer, Journal, JournalFactory
    3. StateSnapshotKeyGroupReaderV7
  4. The following classes to be updated:
    1. StateTableByKeyGroupReaders: add new version
    2. Fs- and MemoryStateBackend will have additional settings to construct incremental backend versions

The backend/new classes will reside in a new module under flink/flink-state-backends.
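
The list above names RemovalLog and a per-state-type Diff without describing them. One plausible reading, shown here purely as an assumption, is that an increment carries the updated entries together with the keys removed since the previous snapshot, so that restore can replay increments in order. `IncrementSketch` and `RemovalTrackingStateSketch` are hypothetical names, not the classes proposed by this FLIP.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical increment: updated entries plus removed keys. */
class IncrementSketch {
    final Map<String, String> updated = new HashMap<>();
    final Set<String> removed = new HashSet<>();
}

/** Hypothetical state that tracks updates and removals between snapshots. */
class RemovalTrackingStateSketch {
    private final Map<String, String> values = new HashMap<>();
    private final Set<String> changedKeys = new HashSet<>();
    private final Set<String> removedKeys = new HashSet<>();

    void put(String key, String value) {
        values.put(key, value);
        changedKeys.add(key);
        removedKeys.remove(key); // a re-added key is no longer removed
    }

    void remove(String key) {
        if (values.remove(key) != null) {
            changedKeys.remove(key);
            removedKeys.add(key);
        }
    }

    /** Collects updates and removals since the previous call, then resets tracking. */
    IncrementSketch snapshotIncrement() {
        IncrementSketch increment = new IncrementSketch();
        for (String key : changedKeys) {
            increment.updated.put(key, values.get(key));
        }
        increment.removed.addAll(removedKeys);
        changedKeys.clear();
        removedKeys.clear();
        return increment;
    }

    /** Rebuilds the state by replaying increments from oldest to newest. */
    static Map<String, String> restore(List<IncrementSketch> increments) {
        Map<String, String> state = new HashMap<>();
        for (IncrementSketch increment : increments) {
            state.keySet().removeAll(increment.removed);
            state.putAll(increment.updated);
        }
        return state;
    }
}
```

Without the removed-key set, a key deleted after the first snapshot would reappear on restore, which is the kind of error the removal-focused tests are meant to catch.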

The refactorings are mostly to allow extension and customization.

Public Interfaces

Code: see Proposed changes

...

Limitations

In the first version:

  1. A single checkpoint failure (or a missing confirmation) leads to a non-incremental snapshot
  2. Only KV-states are snapshotted incrementally (sorted states, aka PQ, aka timers, are snapshotted non-incrementally)
  3. Only states with immutable keys (e.g. not BinaryRowData)
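
The first limitation can be pictured with a small decision rule: the next snapshot is incremental only if the previous one was confirmed. The class `SnapshotPlannerSketch` and its method names are hypothetical, and the rule is an assumption about how such a fallback could be decided, not a description of the actual implementation.

```java
/** Hypothetical illustration of the fallback rule; not Flink code. */
class SnapshotPlannerSketch {
    private long lastTakenId = -1;
    private long lastConfirmedId = -1;

    /** Returns true if the checkpoint with the given id may be incremental. */
    boolean startCheckpoint(long checkpointId) {
        // Incremental only when a previous snapshot exists and it was confirmed.
        boolean incremental = lastTakenId >= 0 && lastConfirmedId == lastTakenId;
        lastTakenId = checkpointId;
        return incremental;
    }

    /** Records that the checkpoint with the given id was confirmed. */
    void confirm(long checkpointId) {
        lastConfirmedId = Math.max(lastConfirmedId, checkpointId);
    }
}
```

In this sketch the very first checkpoint is full, and so is any checkpoint that follows a failed or unconfirmed one.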

...

  1. Unit tests for the new code
  2. Add incremental mode: FileStateBackendTest, EventTimeWindowCheckpointingITCase, KeyedStateCheckpointingITCase
  3. Add increment-specific integration tests to FileStateBackendTest
  4. Parameterize the whole CI build (either each build randomly or a separate periodic build) using the existing state.backend.incremental=true/false. This affects the following tests:
    1. About 10 IT cases in stream.sql (e.g. RankITCase, JoinITCase)
    2. Planner IT cases (e.g. GroupWindowITCase)
    3. E2E: test_streaming_sql (planner) (especially blink), test_resume_savepoint, test_ha_datastream (HA*)
    4. Others: CEPOperatorTest, WindowOperatorMigrationTest, StreamOperatorSnapshotRestoreTest, FileStateBackendTest, MemoryStateBackendTest, HeapAsyncSnapshotTtlStateTest, LocalRecoveryITCase, RegionFailoverITCase, UnalignedCheckpointITCase
  5. Important for checking removal correctness: WindowAggregateITCase, WindowCheckpointingITCase
  6. Load testing - similar to the evaluation below but more detailed:
    1. job runtime
    2. recovery time
    3. checkpointing times
    4. memory overhead

(in the future, side-by-side comparisons with permuted configurations can be added)

...