Status
...
...
...
...
...
Preview: https://flink-packages.org/packages/heap-backend-incremental-checkpoints
...
JIRA:
Released:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
As an example, on Jan 30, 2020, Pinterest asked on the user mailing list (ML) about a deployment with 250 GB of state that fits into memory.
...
In the first version, the simplest solution seems preferable.
Relationship to FLIP-158
FLIP-158 proposes a similar idea but in a more generic way.
However, this generality comes at a cost. Compared to FLIP-158, FLIP-151 allows Flink to:
1. "Squash" the changes made to the same key. For example, if a counter was changed 10 times, FLIP-151 sends only the last value (so less data is both sent AND stored than with FLIP-158).
2. Keep in memory only the changed keys, not the values (this reduces memory usage AND latency compared to FLIP-158, where every update incurs serialization and copying).
3. Avoid the synchronous serialization on each state access that changelogging requires.
4. Use journalling to produce even smaller increments for collection-typed state (e.g. snapshotting only the appended ListState values rather than the whole list).
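Points 1, 2 and 4 can be illustrated with a minimal sketch. The classes below (ChangeTrackingMap, ListIncrement, JournaledList) are hypothetical and are not the FLIP-151 classes; they assume a plain HashMap-backed state and that every checkpoint succeeds. The map remembers only which keys changed, so ten updates to one counter yield a single entry carrying the latest value. The list journals appended elements and falls back to the whole list only when it is replaced.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: heap state that tracks changed keys (not values) between checkpoints.
class ChangeTrackingMap<K, V> {
    private final Map<K, V> state = new HashMap<>();
    // Only keys are kept; values are read from `state` when the increment is built.
    private final Set<K> changedKeys = new HashSet<>();

    void put(K key, V value) {
        state.put(key, value);
        changedKeys.add(key); // repeated updates to one key collapse into a single entry
    }

    /** Builds the increment: each changed key appears once, with its latest value. */
    Map<K, V> snapshotIncrement() {
        Map<K, V> increment = new HashMap<>();
        for (K key : changedKeys) {
            increment.put(key, state.get(key));
        }
        changedKeys.clear(); // simplification: assumes the checkpoint will succeed
        return increment;
    }
}

// Hypothetical increment for a list value: either appended elements or a full replacement.
class ListIncrement<E> {
    final boolean replaceAll;
    final List<E> elements;

    ListIncrement(boolean replaceAll, List<E> elements) {
        this.replaceAll = replaceAll;
        this.elements = elements;
    }
}

// Hypothetical journalled list: remembers what was appended since the last checkpoint.
class JournaledList<E> {
    private final List<E> values = new ArrayList<>();
    private final List<E> appended = new ArrayList<>();
    private boolean replaced = false; // true once the whole list was overwritten

    void add(E value) {
        values.add(value);
        if (!replaced) {
            appended.add(value); // no need to journal if the full list is written anyway
        }
    }

    void update(List<E> newValues) {
        values.clear();
        values.addAll(newValues);
        replaced = true;
        appended.clear();
    }

    ListIncrement<E> snapshotIncrement() {
        ListIncrement<E> increment = replaced
                ? new ListIncrement<E>(true, new ArrayList<>(values))
                : new ListIncrement<E>(false, new ArrayList<>(appended));
        replaced = false;
        appended.clear();
        return increment;
    }
}
```

Note that the increment is built from current values at snapshot time, which is what makes squashing free: intermediate values are never stored.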
On the other hand, integration with FLIP-158 would allow "background" materialization, eliminating periodic full checkpoints (or the need for a more complex way to achieve this).
For such an integration:
- on startup, IncHeapKeyedBackend notifies ChangelogStateBackend that no changes should be sent (to the changelog)
- on checkpoint, ChangelogStateBackend asks IncHeapKeyedBackend to create an incremental checkpoint
- on materialization, ChangelogStateBackend asks for a full snapshot
This integration is out of scope for this FLIP.
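Although the integration is out of scope, the three steps above can be sketched as a small contract. The names IncrementalSnapshotSource and ChangelogDriverSketch, and the String snapshot handles, are hypothetical placeholders rather than Flink APIs; the sketch only shows which side triggers which kind of snapshot.

```java
// Hypothetical contract; none of these names are Flink APIs.
interface IncrementalSnapshotSource {
    /** Startup: the heap backend tracks its own changes, so nothing is sent to the changelog. */
    void disableChangelogForwarding();

    /** Checkpoint: returns a handle to the increment since the last checkpoint. */
    String snapshotIncrement(long checkpointId);

    /** Materialization: returns a handle to a full snapshot. */
    String snapshotFull(long checkpointId);
}

// Hypothetical stand-in for ChangelogStateBackend driving an IncHeapKeyedBackend.
class ChangelogDriverSketch {
    private final IncrementalSnapshotSource backend;

    ChangelogDriverSketch(IncrementalSnapshotSource backend) {
        this.backend = backend;
        backend.disableChangelogForwarding(); // step 1: on startup
    }

    String onCheckpoint(long checkpointId) {
        return backend.snapshotIncrement(checkpointId); // step 2: on checkpoint
    }

    String onMaterialization(long checkpointId) {
        // step 3: would be triggered by background materialization
        return backend.snapshotFull(checkpointId);
    }
}
```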
Proposed Changes
- The following (~20) classes will be refactored to allow extension (extracting methods, adding hooks, etc.):
  - (Heap-) KeyedStateBackend, SnapshotStrategy, RestoreOperation
  - (CopyOnWrite-) Map, Entry, State, Snapshots
  - RegisteredKeyValueStateBackendMetaInfo
- Incremental versions of the following (~10) classes will be added:
  - HashMapStateBackend
  - CoW StateMap, StateMapSnapshot, StateTable, StateTableSnapshot
  - StateMapEntry
  - KeyedStateBackend, RestoreOperation, SnapshotStrategy
  - RegisteredKeyValueStateBackendMetaInfo
  - StateSnapshotRestore
- The following new classes (~10) will be added:
  - RemovalLog
  - For each state type: Diff, DiffSerializer, Journal, JournalFactory
  - StateSnapshotKeyGroupReaderV7
- The following classes will be updated:
  - StateTableByKeyGroupReaders: add the new version
  - Fs- and MemoryStateBackend will have additional settings to construct the incremental backend versions
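To make the roles of RemovalLog and Diff more concrete, here is a minimal sketch of how an increment could carry both updated entries and removed keys, and how restore could replay a full snapshot followed by increments. All class names are hypothetical (suffixed Sketch), and the sketch ignores key groups, serialization (DiffSerializer) and copy-on-write, all of which the real classes would have to handle.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical increment: entries to upsert plus keys to delete (the two sets are disjoint).
final class DiffSketch<K, V> {
    final Map<K, V> upserts = new HashMap<>();
    final Set<K> removals = new HashSet<>();
}

// Hypothetical removal log: remembers keys deleted since the last checkpoint.
final class RemovalLogSketch<K> {
    private final Set<K> removed = new HashSet<>();

    void onRemove(K key) { removed.add(key); }

    void onPut(K key) { removed.remove(key); } // a re-inserted key is an upsert, not a removal

    Set<K> drain() {
        Set<K> out = new HashSet<>(removed);
        removed.clear();
        return out;
    }
}

final class IncrementalHeapStateSketch<K, V> {
    private final Map<K, V> state = new HashMap<>();
    private final Set<K> changed = new HashSet<>();
    private final RemovalLogSketch<K> removalLog = new RemovalLogSketch<>();

    void put(K key, V value) {
        state.put(key, value);
        changed.add(key);
        removalLog.onPut(key);
    }

    void remove(K key) {
        state.remove(key);
        changed.remove(key); // do not emit an upsert for a key that no longer exists
        removalLog.onRemove(key);
    }

    DiffSketch<K, V> snapshotIncrement() {
        DiffSketch<K, V> diff = new DiffSketch<>();
        for (K key : changed) {
            diff.upserts.put(key, state.get(key));
        }
        diff.removals.addAll(removalLog.drain());
        changed.clear();
        return diff;
    }

    /** Restores by replaying a full snapshot followed by increments, oldest first. */
    static <K, V> Map<K, V> restore(Map<K, V> full, List<DiffSketch<K, V>> increments) {
        Map<K, V> result = new HashMap<>(full);
        for (DiffSketch<K, V> diff : increments) {
            for (K key : diff.removals) {
                result.remove(key);
            }
            result.putAll(diff.upserts);
        }
        return result;
    }
}
```

Without the removal log, a key deleted after the base snapshot would silently reappear on restore, which is why removals need their own record.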
The new backend classes will reside in a new module under flink/flink-state-backends.
The refactorings are mostly to allow extension and customization.
Public Interfaces
Code: see Proposed Changes.
...
Nothing is deprecated (at least for now). The feature is disabled by default.
TTL support: wrapping the existing TTL transformers inside the new incremental StateMap iterator is sufficient.
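A minimal sketch of that wrapping, assuming a transformer that returns the (possibly transformed) value, or null for an expired entry. TtlTransformerSketch is a hypothetical stand-in for Flink's existing TTL snapshot transformers, not their actual interface. The wrapper skips expired entries while the increment is being written; whether dropped entries must also be recorded as removals is not covered here.

```java
import java.util.AbstractMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical stand-in for a TTL transformer: returns null if the entry has expired.
interface TtlTransformerSketch<V> {
    V filterOrTransform(V value);
}

// Wraps the iterator over changed entries so that expired entries never reach the snapshot.
final class TtlFilteringIterator<K, V> implements Iterator<Map.Entry<K, V>> {
    private final Iterator<Map.Entry<K, V>> delegate;
    private final TtlTransformerSketch<V> transformer;
    private Map.Entry<K, V> next; // next surviving entry, or null when exhausted

    TtlFilteringIterator(Iterator<Map.Entry<K, V>> delegate, TtlTransformerSketch<V> transformer) {
        this.delegate = delegate;
        this.transformer = transformer;
        advance();
    }

    private void advance() {
        next = null;
        while (delegate.hasNext()) {
            Map.Entry<K, V> entry = delegate.next();
            V value = transformer.filterOrTransform(entry.getValue());
            if (value != null) {
                next = new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), value);
                return;
            }
        }
    }

    @Override
    public boolean hasNext() {
        return next != null;
    }

    @Override
    public Map.Entry<K, V> next() {
        if (next == null) {
            throw new NoSuchElementException();
        }
        Map.Entry<K, V> current = next;
        advance();
        return current;
    }
}
```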
Limitations
In the first version:
- A single checkpoint failure (or a missing confirmation) leads to a non-incremental snapshot
- Only KV states are snapshotted incrementally (sorted states, a.k.a. PQ, a.k.a. timers, are snapshotted non-incrementally)
- Only states with immutable keys are supported (e.g. not BinaryRowData)
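The first limitation can be read as a simple rule: a snapshot may be incremental only if the previous snapshot was confirmed. A minimal sketch of that rule, assuming at most one checkpoint in flight; SnapshotModeSelector and its callback names are hypothetical (in Flink, confirmations arrive via checkpoint-complete notifications).

```java
// Hypothetical sketch: decides whether the next snapshot may be incremental.
final class SnapshotModeSelector {
    enum Mode { FULL, INCREMENTAL }

    private long lastCheckpointId = -1; // -1 means no snapshot has been taken yet
    private boolean lastConfirmed = false;

    /** Called when a snapshot starts; assumes at most one checkpoint in flight. */
    Mode nextMode(long checkpointId) {
        Mode mode = (lastCheckpointId >= 0 && lastConfirmed) ? Mode.INCREMENTAL : Mode.FULL;
        lastCheckpointId = checkpointId;
        lastConfirmed = false;
        return mode;
    }

    /** Confirmation makes the last snapshot a valid base for the next increment. */
    void onCheckpointConfirmed(long checkpointId) {
        if (checkpointId == lastCheckpointId) {
            lastConfirmed = true;
        }
    }

    /** A failed checkpoint forces the next snapshot to be full. */
    void onCheckpointFailed(long checkpointId) {
        if (checkpointId == lastCheckpointId) {
            lastConfirmed = false;
        }
    }
}
```

A missing confirmation has the same effect as a failure, because lastConfirmed stays false until a confirmation arrives.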
...
- Unit tests for the new code
- Add an incremental mode to: FileStateBackendTest, EventTimeWindowCheckpointingITCase, KeyedStateCheckpointingITCase
- Add increment-specific integration tests to FileStateBackendTest
- Parameterize the whole CI build (either randomly per build or as a separate periodic build) using the existing state.backend.incremental=true/false option. This affects the following tests:
  - About 10 IT cases in stream.sql (e.g. RankITCase, JoinITCase)
  - Planner IT cases (e.g. GroupWindowITCase)
  - E2E: test_streaming_sql (planner, especially blink), test_resume_savepoint, test_ha_datastream (HA*)
  - Others: CEPOperatorTest, WindowOperatorMigrationTest, StreamOperatorSnapshotRestoreTest, FileStateBackendTest, MemoryStateBackendTest, HeapAsyncSnapshotTtlStateTest, LocalRecoveryITCase, RegionFailoverITCase, UnalignedCheckpointITCase
  - Important for checking removal correctness: WindowAggregateITCase, WindowCheckpointingITCase
- Load testing, similar to the evaluation below but more detailed:
  - job runtime
  - recovery time
  - checkpointing times
  - memory overhead
  (In the future, side-by-side comparisons with permuted configurations can be added.)
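For the randomized CI parameterization, the main requirement is that a failing build can be reproduced. A minimal sketch, assuming the choice reaches the tests as a JVM system property named after the existing state.backend.incremental option and that the seed can be passed back in as a program argument; that convention and the IncrementalModeChooser class are hypothetical.

```java
import java.util.Random;

// Hypothetical CI helper: decides whether a build runs with incremental heap snapshots.
final class IncrementalModeChooser {
    static final String KEY = "state.backend.incremental";

    /** An explicit value (e.g. from a periodic build) wins; otherwise the choice is random but seeded. */
    static boolean useIncremental(String explicitValue, long seed) {
        if (explicitValue != null) {
            return Boolean.parseBoolean(explicitValue);
        }
        return new Random(seed).nextBoolean();
    }

    public static void main(String[] args) {
        // Reuse a seed from a previous (failed) build if given, otherwise pick a new one.
        long seed = args.length > 0 ? Long.parseLong(args[0]) : System.nanoTime();
        boolean incremental = useIncremental(System.getProperty(KEY), seed);
        // Printing the seed lets a failing randomized build be reproduced.
        System.out.println(KEY + "=" + incremental + " (seed " + seed + ")");
    }
}
```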
...