


Vote thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-158-Generalized-incremental-checkpoints-td48485.html

JIRA: FLINK-21352, FLINK-25842

Release: 1.15


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Table of Contents

Motivation

Low end-to-end latency is a much-demanded property in many Flink setups. With exactly-once, this latency depends on the checkpoint interval and duration, which in turn is determined by the slowest node (usually the one doing a full, non-incremental snapshot). In large setups with many nodes, the probability of at least one node being slow is higher, making almost every checkpoint slow.

The goal of this FLIP is to establish a way to drastically reduce the checkpoint interval for streaming applications, across state backends, reliably for both small and large scales. We are aiming at intervals on the order of a few seconds even for larger scales (> 100 nodes, TBs of state).
Depending on user adoption of this feature and further requirements, the architecture described here can also serve as a foundation to further reduce the checkpoint interval in the future.

A faster checkpoint interval has a number of benefits for streaming applications:

  • Less work on recovery: the more frequent the checkpoints, the fewer events need to be reprocessed after recovery.
  • Lower latency for transactional sinks: Transactional sinks commit on checkpoint, so faster checkpoints mean more frequent commits.
  • More predictable checkpoint intervals: Currently, the duration of a checkpoint depends on the size of the artifacts that need to be persisted on the checkpoint storage.
    For example, if RocksDB created only a new Level-0 SST since the last checkpoint, the checkpoint will be fast.
    But if RocksDB finished a new compaction and created a large SST for Level-3/-4/-5, the checkpoint will take longer.
  • A frequent checkpoint interval allows Flink to persist sink data in a checkpoint before writing it to the external system (write-ahead-log style), without adding too much latency. This can simplify the design of sinks for systems that don't expose transactional APIs well. For example, the exactly-once Kafka sink is currently quite complex due to the way Kafka's transactions work, specifically the inability to recover transactions reliably (relying instead on transactions timing out).

In addition, the approach proposed here will also help to reduce the small-file fragmentation issue that can occur when using RocksDB with incremental checkpoints.

High-level Overview

The core idea of this proposal is to introduce a state changelog; this changelog allows operators to persist state changes in a very fine-grained manner, as described below (a rough interface sketch follows the list):

  • Stateful operators write the state changes to that log (logging the state), in addition to applying them to the state tables in RocksDB or the in-memory hash table.
  • An operator can acknowledge a checkpoint as soon as the changes in the log have reached the durable checkpoint storage.
  • The state tables are persisted periodically, independent of the checkpoints. We call this the materialization of the state on the checkpoint storage.
  • Once the state is materialized on checkpoint storage, the state changelog can be truncated to the corresponding point.
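To make the description above concrete, here is a minimal, illustrative sketch of the changelog component's writer side. The method names persistDurably, truncate, and lastAppendedSqn follow the usage shown in the example later in this document; all other names, types, and signatures are assumptions made for this sketch, not the API defined by this FLIP.

```java
// Illustrative sketch only: persistDurably/truncate/lastAppendedSqn follow the
// usage in the example below; everything else (types, signatures) is an
// assumption for illustration, not the API defined by this FLIP.
import java.util.concurrent.CompletableFuture;

final class SequenceNumber {
    // Logical time of the changelog ("sqn" in the example below).
    final long value;
    SequenceNumber(long value) { this.value = value; }
}

interface ChangelogStateHandle {
    // Points to the change sets persisted on checkpoint storage (e.g. files on S3/DFS).
}

interface StateChangelogWriter {

    // Append a serialized state change for a key group. The same change is also
    // applied to the regular state tables (RocksDB or the in-memory hash table).
    void append(int keyGroup, byte[] change);

    // Current sequence number; requesting it starts a new changeset.
    SequenceNumber lastAppendedSqn();

    // Persist every change appended after the given sequence number. The future
    // completes once the changes are durable on checkpoint storage, at which point
    // the operator can acknowledge the checkpoint.
    CompletableFuture<ChangelogStateHandle> persistDurably(SequenceNumber after);

    // Drop references to changes before the given sequence number, once the state
    // tables have been materialized up to that point (already persisted state is
    // not deleted here; files are removed once no checkpoint references them).
    void truncate(SequenceNumber before);
}
```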




This approach mirrors what database systems do, adjusted to distributed checkpoints:

  • Changes (inserts/updates/deletes) are written to the transaction log, and the transaction is considered durable once the log is synced to disk (or other durable storage).
  • The changes are also materialized in the tables (so the database system can efficiently query the table). The tables are usually persisted asynchronously (blocks are flushed to storage at a later point).
  • Once all relevant parts of the changed tables have been persisted, the transaction log can be truncated from the perspective of that particular transaction. That truncation procedure is commonly, and not coincidentally, called a "checkpoint"; the similarities here really go quite far.


We will henceforth call the component that manages the state changelog the "Durable Short-term Log" (DSTL). This name is chosen to clarify and emphasize the difference in requirements and usage compared to logs as implemented by systems like Apache Kafka, Apache Pulsar, or Pravega:

  • The DSTL is always written to, but rarely read. It holds data for seconds to minutes.
  • Logs like Kafka, Pulsar, or Pravega are often read more than they are written (they have multiple subscribers/consumers) and hold data for hours to months.

The section "StateChangelog implementations" has more details on the requirements and comparison of log implementationsThis FLIP proposes a mechanism to deal with this by materializing and uploading state continuously and only uploading the changed part during the checkpoint itself. It differs from other approaches in that 1) checkpoints are always incremental; 2) works for any state backend.


Public Interfaces

  • No changes to the public APIs (except for configuration)
  • Configuration (an illustrative sketch follows this list)
    • Toggle to enable
    • StateChangelog: persist delay, retry policy, file prefix
    • State backends: materialization interval, file prefix
  • Monitoring
    • StateChangelog: DFS request size, latency, number of “Logs” per request, errors
    • State backends: DFS request size, latency, errors
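As a rough illustration of the configuration surface listed above: the option keys and values below are placeholders invented for this sketch; the actual keys are decided during implementation, not by this FLIP.

```java
// Placeholder option keys, invented for this sketch; the real keys are defined
// during implementation of this FLIP.
import org.apache.flink.configuration.Configuration;

public class ChangelogConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Toggle to enable changelog-based (generalized incremental) checkpoints.
        conf.setBoolean("state.backend.changelog.enabled", true);
        // StateChangelog: persist delay, retry policy, file prefix.
        conf.setString("state.changelog.persist-delay", "1 s");
        conf.setString("state.changelog.retry-policy", "fixed-delay");
        conf.setString("state.changelog.file-prefix", "s3://bucket/changelog/");
        // State backend: materialization interval, file prefix.
        conf.setString("state.backend.materialization-interval", "10 min");
        conf.setString("state.backend.materialization.file-prefix", "s3://bucket/materialized/");
    }
}
```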

...

There will likely be a single changeset per key group, but there is no such guarantee (each time the StateBackend requests lastAppendedSqn, a new changeset is started).
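A tiny sketch of that behavior; the internal structure shown here is an assumption for illustration, only the lastAppendedSqn contract (requesting the sqn starts a new changeset and advances logical time) is taken from this document.

```java
// Assumed internal behavior for illustration: each call to lastAppendedSqn()
// seals the currently open changeset and starts a new one, which is why a key
// group may end up with more than one changeset between two checkpoints.
import java.util.ArrayList;
import java.util.List;

class PerKeyGroupChangesets {
    private long sqn;                                       // logical time
    private List<byte[]> open = new ArrayList<>();          // changeset being written
    private final List<List<byte[]>> sealed = new ArrayList<>();

    void append(byte[] change) {
        open.add(change);          // logical time is not advanced on append
    }

    long lastAppendedSqn() {
        sealed.add(open);          // seal the current changeset, start a new one
        open = new ArrayList<>();
        return ++sqn;              // requesting the sqn advances logical time
    }
}
```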

State change lifecycle


Example

Let's consider a job with one stateful operator having a list state in RocksDB. Assume max retained checkpoints: 1.

On startup, the state is empty

StateChangelog.sqn = T0

StateBackend.lastMaterializedSqn = T0

list.add(a)

During normal processing, operator updates its state and simultaneously adds state changes to the StateChangelog. Logical time is not updated.

CP1 triggered

StateBackend calls StateChangelog.persistDurably(after = lastMaterializedSqn = T0).

StateChangelog.sqn = T1 (as a result of the call above)

StateChangelog returns a Future representing the completion of the write of (T0:T1 = a) and remembers it internally.

list.add(b)

State materialization is triggered

StateBackend obtains the sqn from StateChangelog (T2)

StateChangelog.sqn = T2 (as a result of the call above)

StateBackend flushes RocksDB memtables to SST files and starts the async phase, passing it the obtained sqn=T2 and the snapshot (~a list of files).

Materialized state will be (T0:T2 = a,b)

list.add(c)

CP2 triggered

StateBackend calls StateChangelog.persistDurably(after = lastMaterializedSqn = T0).

StateChangelog.sqn = T3 (as a result of the call above)

StateChangelog finds the previous Future (T0:T1 = a) and combines it with a new one (T1:T3 = b,c).

CP1-Future completes

It contains a shared StateHandle pointing to a file on S3.

This handle is included in TaskSnapshot and sent to JM.

CP2-Future completes

It contains a combined StateHandle pointing to two files on S3. 

StateHandle is included in TaskSnapshot and is sent to JM.

JM finalizes CP1, and then CP2

CP1 is subsumed but nothing is removed yet.

State materialization completes

StateBackend stores the consolidated state handle (T0:T2 = a,b)

StateBackend.lastMaterializedSqn = T2

StateChangelog.truncate(before = T2) // remove (T0:T1 = a) object reference (not any state that is already saved or is being saved)

list.add(d)

CP3 is triggered

StateBackend calls StateChangelog.persistDurably(after = lastMaterializedSqn = T2).

StateChangelog.sqn = T4 (as a result of the call above)

StateChangelog searches for references matching the requested (T2:*) range. It finds the completed (T1:T3 = b,c). To be able to filter out (T1 = b) on recovery, the returned handle specifies the range (T2:T4). Files also specify the range of each changeset internally.

The found Future is combined with a new one for (T3:T4 = d).

CP3-Future completes

StateBackend combines its result with the consolidated handle and sends the following to the JM:

  • (T0:T2 = a,b) - the consolidated (materialized) changes
  • (T1:T3 = b,c) - previously saved changelog (“b” filtered on recovery)
  • (T3:T4 = d) - the newly saved changelog

JM finalizes CP3

CP2 is subsumed; the state handle pointing to the file with (T0:T1 = a) is no longer referenced and is therefore removed.
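For readers who prefer code, here is a compressed restatement of the walkthrough above in terms of the hypothetical StateChangelogWriter sketched in the overview; it is illustrative pseudo-usage only, not actual Flink code.

```java
// Compressed, code-shaped restatement of the example above, using the
// hypothetical StateChangelogWriter from the overview sketch. Illustrative only.
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

class ChangelogWalkthrough {

    void run(StateChangelogWriter changelog, SequenceNumber t0 /* lastMaterializedSqn */) {
        changelog.append(0, serialize("a"));                     // list.add(a)

        // CP1: persist everything after the last materialized sqn (T0) -> (T0:T1 = a).
        CompletableFuture<ChangelogStateHandle> cp1 = changelog.persistDurably(t0);

        changelog.append(0, serialize("b"));                     // list.add(b)

        // Materialization starts: obtain the current sqn (T2), flush memtables,
        // upload SST files asynchronously; the materialized state will be (T0:T2 = a,b).
        SequenceNumber t2 = changelog.lastAppendedSqn();

        changelog.append(0, serialize("c"));                     // list.add(c)

        // CP2: materialization has not completed yet, so we still persist after T0.
        // The previously written (T0:T1 = a) is reused and combined with (T1:T3 = b,c).
        CompletableFuture<ChangelogStateHandle> cp2 = changelog.persistDurably(t0);

        // Materialization completes: the changelog can be truncated up to T2.
        // Only the (T0:T1 = a) reference is dropped here; the file itself is removed
        // later by the JM, once no retained checkpoint references it.
        changelog.truncate(t2);

        changelog.append(0, serialize("d"));                     // list.add(d)

        // CP3: persist after T2 -> reuses (T1:T3 = b,c) with range (T2:T4), so "b" is
        // filtered out on recovery, plus the newly written (T3:T4 = d).
        CompletableFuture<ChangelogStateHandle> cp3 = changelog.persistDurably(t2);
    }

    private static byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }
}
```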

...

  • It should be possible to load the existing state as a consolidated state without any changes (but not vice-versa)
  • Only keyed state in the initial version
  • At most 1 concurrent checkpoint
  • Probably, only RocksDB in the initial version
  • Nothing is deprecated and no migration needed
  • Changeset of a checkpoint must fit in memory

...