
...

Discussion thread: ...
Vote thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-158-Generalized-incremental-checkpoints-td48485.html
JIRA: FLINK-21352, FLINK-25842
Release: 1.15


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Table of Contents

Motivation

Low end-to-end latency is a much-demanded property in many Flink setups. With exactly-once, this latency depends on the checkpoint interval/duration, which in turn is determined by the slowest node (usually the one doing a full, non-incremental snapshot). In large setups with many nodes, the probability of at least one node being slow gets higher, making almost every checkpoint slow. This FLIP proposes a mechanism to deal with this by materializing and uploading state continuously and only uploading the changed part during the checkpoint itself. It differs from other approaches in that (1) checkpoints are always incremental and (2) it works for any state backend.

The goal of this FLIP is to establish a way to drastically reduce the checkpoint interval for streaming applications, across state backends, reliably for both small and large scales. We are aiming at intervals in the order of a few seconds even for larger scales (> 100 nodes, TBs of state).
Depending on user adoption of this feature and further requirements, the architecture described here can also serve as a foundation to further reduce the checkpoint interval in the future.

A faster checkpoint interval has a number of benefits for streaming applications:

  • Less work on recovery. The more frequent the checkpoints, the fewer events need to be re-processed after recovery.
  • Lower latency for transactional sinks: Transactional sinks commit on checkpoint, so faster checkpoints mean more frequent commits.
  • More predictable checkpoint intervals: Currently the length of the checkpoint depends on the size of the artifacts that need to be persisted on the checkpoint storage.
    For example, if RocksDB created only a new Level-0 SST since the last checkpoint, the checkpoint will be fast.
    But if RocksDB finished a new compaction and created a large SST for Level-3/-4/-5, the checkpoint will take longer.
  • A frequent checkpoint interval allows Flink to persist sink data in a checkpoint before writing it to the external system (write-ahead-log style), without adding too much latency. This can simplify the design of sinks for systems that don't expose transactional APIs well. For example, the exactly-once Kafka sink is currently quite complex due to the way Kafka's transactions work, specifically the lack of a way to recover transactions reliably (relying instead on transactions timing out).

In addition, the approach proposed here will also help to reduce the small-file fragmentation issue that can occur when using RocksDB with incremental checkpoints.

High-level Overview

The core idea of this proposal is to introduce a state changelog; this changelog allows operators to persist state changes in a very fine-grained manner, as described below (a code sketch follows the list):

  • Stateful operators write the state changes to that log (logging the state), in addition to applying them to the state tables in RocksDB or the in-memory hash table.
  • An operator can acknowledge a checkpoint as soon as the changes in the log have reached the durable checkpoint storage.
  • The state tables are persisted periodically, independent of the checkpoints. We call this the materialization of the state on the checkpoint storage.
  • Once the state is materialized on checkpoint storage, the state changelog can be truncated to the corresponding point.
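
As an illustration of this write path, here is a minimal sketch in Java. The interfaces and names (StateChangelogWriter, ChangelogValueState) are hypothetical simplifications invented for this example, not the actual Flink state backend or DSTL APIs; a plain map stands in for the RocksDB / in-memory state table.

import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for the DSTL writer (not the actual Flink API). */
interface StateChangelogWriter {
    /** Appends a serialized state change; it is durable once persisted to checkpoint storage. */
    void append(byte[] serializedKey, byte[] serializedValue);
}

/** Illustrative wrapper state: every write is logged before it is applied. */
final class ChangelogValueState {

    // Stands in for the state table in RocksDB or the in-memory hash table.
    private final Map<String, String> stateTable = new HashMap<>();
    private final StateChangelogWriter changelog;

    ChangelogValueState(StateChangelogWriter changelog) {
        this.changelog = changelog;
    }

    void update(String key, String value) {
        // 1. Log the change ("logging the state"); acknowledging a checkpoint
        //    only requires this record to be durable on checkpoint storage.
        changelog.append(key.getBytes(), value.getBytes());
        // 2. Apply the change to the state table, which is materialized
        //    periodically and independently of checkpoints.
        stateTable.put(key, value);
    }

    String get(String key) {
        return stateTable.get(key);
    }
}

On recovery, the state would be rebuilt from the last materialized tables plus the logged changes recorded after them.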


[Three figures omitted]


This approach mirrors what database systems do, adjusted to distributed checkpoints (see the truncation sketch after this list):

  • Changes (inserts/updates/deletes) are written to the transaction log, and the transaction is considered durable once the log is synced to disk (or other durable storage).
  • The changes are also materialized in the tables (so the database system can efficiently query the table). The tables are usually persisted asynchronously (blocks are flushed to storage at a later point).
  • Once all relevant parts of the changed tables have been persisted, the transaction log can be truncated from the perspective of that particular transaction. That truncation procedure is commonly, and not coincidentally, called a "checkpoint"; the similarities here really go quite far.
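
To make the truncation point concrete, here is a hedged sketch (hypothetical names and in-memory storage, invented for this example rather than taken from the actual DSTL implementation) of a log that can be truncated once a materialization of the state tables covers a given sequence number:

import java.util.ArrayList;
import java.util.List;

/** Illustrative only: a state change tagged with a monotonically increasing sequence number. */
final class LogEntry {
    final long sequenceNumber;
    final byte[] change;

    LogEntry(long sequenceNumber, byte[] change) {
        this.sequenceNumber = sequenceNumber;
        this.change = change;
    }
}

/** Hypothetical sketch of the truncation rule: drop everything a materialization already covers. */
final class TruncatableChangelog {

    private final List<LogEntry> entries = new ArrayList<>();
    private long nextSequenceNumber;

    /** Appends a change and returns its sequence number. */
    long append(byte[] change) {
        long sqn = nextSequenceNumber++;
        entries.add(new LogEntry(sqn, change));
        return sqn;
    }

    /**
     * Called once the state tables have been materialized up to (and including)
     * the given sequence number; older log entries are no longer needed for recovery.
     */
    void truncate(long materializedUpTo) {
        entries.removeIf(e -> e.sequenceNumber <= materializedUpTo);
    }
}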


We will henceforth call the component that manages the state changelog the "Durable Short-term Log" (DSTL). This name is chosen to clarify and emphasize the difference in requirements and usage compared to logs as implemented by systems like Apache Kafka, Apache Pulsar, or Pravega:

  • The DSTL is always written to, but rarely read. It holds data for seconds to minutes.
  • Logs like Kafka, Pulsar, or Pravega are often read more than they are written to (they have multiple subscribers/consumers) and hold data for hours to months.

The section "StateChangelog implementations" has more details on the requirements and comparison of log implementationsThis FLIP proposes a mechanism to deal with this by materializing and uploading state continuously and only uploading the changed part during the checkpoint itself. It differs from other approaches in that 1) checkpoints are always incremental; 2) works for any state backend.


Public Interfaces

  • No changes to the public APIs (except for configuration)
    • Configuration (see the sketch after this list)
    • Toggle to enable
    • StateChangelog: persist delay, retry policy, file prefix
    • State backends: materialization interval, file prefix
  • Monitoring
    • StateChangelog: DFS request size, latency, number of “Logs” per request, errors
    • State backends: DFS request size, latency, errors
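
For illustration only, a sketch of how such options might be set programmatically. The option keys below are placeholders invented for this example; this FLIP does not fix the final option names, only their scope (enable toggle, persist delay, retry policy, file prefixes, materialization interval).

import org.apache.flink.configuration.Configuration;

public class ChangelogConfigSketch {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // Toggle to enable the state changelog (placeholder key, not a final option name).
        config.setString("state.changelog.enabled", "true");
        // StateChangelog: persist delay, retry policy, file prefix (placeholder keys).
        config.setString("state.changelog.persist-delay", "100ms");
        config.setString("state.changelog.retry-policy", "fixed-delay");
        config.setString("state.changelog.file-prefix", "changelog-");
        // State backends: materialization interval, file prefix (placeholder keys).
        config.setString("state.backend.materialization-interval", "10min");
        config.setString("state.backend.materialization-file-prefix", "materialized-");
    }
}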

...

  • It should be possible to load the existing state as a consolidated state without any changes (but not vice-versa)
  • Only keyed state in the initial version
  • At most 1 concurrent checkpoint
  • Probably, only RocksDB in the initial version
  • Nothing is deprecated and no migration needed
  • Changeset of a checkpoint must fit in memory

...