
Page properties

Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

...

Discussion thread

...

JIRA

...

FLINK-19463

Release 1.13


Motivation

Apache Flink's durability story is a mystery to many users. One of the most common recurring questions from users comes from not understanding the relationship between state, state backends, and snapshots. Learning material can abate some of this confusion. Still, the question is so pervasive that we believe Flink's user APIs should better communicate which component is responsible for what.

...

1) "We use RocksDB because we don't need fault tolerance."
2) "We don't use RocksDB because we don't want to manage an external database."
3) Believing RocksDB reads and writes directly to S3 or HDFS (vs. local disk)
4) Believing FsStateBackend spills to disk or has anything to do with the local filesystem
5) Pointing RocksDB at network-attached storage, believing that the state backend needs to be fault-tolerant

This question from the user mailing list is very representative of where users are struggling [1]. Many of these questions came not from new users but from organizations already running Flink in production! The current state backend abstraction is too complex for many of our users. What all these questions have in common is a misunderstanding of the relationship between how state is stored locally on TaskManagers (TMs) and how checkpoints make that state durable.

...

To better separate concerns, we will remove the checkpoint storage methods from the StateBackend interface and place them into a new interface, CheckpointStorage. The Flink runtime currently contains an internal interface called CheckpointStorage; it will be renamed to CheckpointStorageAccess.


Code Block
languagejava
import java.io.IOException;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;

/**
 * CheckpointStorage defines how checkpoint snapshots are persisted for fault tolerance.
 * Various implementations store their checkpoints in different ways and have different
 * requirements and availability guarantees.
 *
 * <p>For example, JobManagerCheckpointStorage stores checkpoints in the memory of the JobManager.
 * It is lightweight and has no additional dependencies, but it is not highly available
 * and only supports small state sizes. This checkpoint storage policy is convenient for
 * local testing and development.
 *
 * <p>FileSystemCheckpointStorage stores checkpoints in a filesystem. For systems like
 * HDFS, NFS drives, S3, and GCS, this storage policy supports large state sizes,
 * on the order of many terabytes, while providing a highly available foundation
 * for stateful applications. This checkpoint storage policy is recommended for most
 * production deployments.
 */
public interface CheckpointStorage extends java.io.Serializable {

  /**
   * Resolves an external pointer (for example, a savepoint path) to the storage
   * location of a completed checkpoint.
   */
  CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException;

  /** Creates the job-scoped access object used to write checkpoint data and metadata. */
  CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException;
}
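To make the split concrete, here is a deliberately simplified, self-contained model. All type names in it (`WorkingStateBackend`, `SnapshotStorage`, `HeapBackend`, `InMemorySnapshotStorage`) are hypothetical stand-ins, not Flink classes. The sketch only illustrates the division of responsibility: one component holds working state while the job runs, another persists snapshots of it, and the two are chosen independently.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily simplified model of the proposed split.
// None of these types are Flink classes.
class SeparationSketch {

    /** Holds working state while the job runs (analogous to a state backend). */
    interface WorkingStateBackend {
        void put(String key, String value);
        Map<String, String> snapshot();
    }

    /** Persists snapshots so a job can recover (analogous to checkpoint storage). */
    interface SnapshotStorage {
        void persist(long checkpointId, Map<String, String> snapshot);
        Map<String, String> restore(long checkpointId);
    }

    /** Keeps state as objects on the heap, loosely like a heap-based backend. */
    static class HeapBackend implements WorkingStateBackend {
        private final Map<String, String> state = new HashMap<>();
        public void put(String key, String value) { state.put(key, value); }
        public Map<String, String> snapshot() { return new HashMap<>(state); }
    }

    /** Stand-in storage; a durable one would write to S3, HDFS, etc. instead. */
    static class InMemorySnapshotStorage implements SnapshotStorage {
        private final Map<Long, Map<String, String>> checkpoints = new HashMap<>();
        public void persist(long id, Map<String, String> snap) {
            checkpoints.put(id, new HashMap<>(snap)); // copy so later updates don't leak in
        }
        public Map<String, String> restore(long id) { return checkpoints.get(id); }
    }

    /** A checkpoint copies working state from the backend into the storage. */
    static void checkpoint(long id, WorkingStateBackend backend, SnapshotStorage storage) {
        storage.persist(id, backend.snapshot());
    }

    public static void main(String[] args) {
        WorkingStateBackend backend = new HeapBackend();
        SnapshotStorage storage = new InMemorySnapshotStorage();
        backend.put("count", "1");
        checkpoint(1L, backend, storage);
        backend.put("count", "2"); // does not affect checkpoint 1
        System.out.println(storage.restore(1L)); // prints {count=1}
    }
}
```

Swapping `HeapBackend` for a disk-based backend would not require touching the storage, and vice versa, which is the property the new interface is meant to make visible in the API.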

...

Code Block
languagejava
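import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Keep working state as objects on the JVM heap...
env.setStateBackend(new HashMapStateBackend());

// ...or in an embedded RocksDB instance on local disk (a later call replaces the earlier one).
EmbeddedRocksDBStateBackend rocksDB = new EmbeddedRocksDBStateBackend();
rocksDB.setOptionsFactory(/** blah **/); // placeholder: pass an options factory here

env.setStateBackend(rocksDB);
env.setDefaultSavepointDirectory("s3://savepoint");

// Checkpoint storage is configured independently of the state backend.
// Store checkpoints in JobManager memory (local testing only)...
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
// ...or in a durable filesystem such as S3.
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("s3://checkpoints"));

// shortcut for env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("s3://checkpoints"));
env.getCheckpointConfig().setCheckpointStorage("s3://checkpoints");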

...

Old | New
MemoryStateBackend() | HashMapStateBackend() + JobManagerCheckpointStorage()
FsStateBackend() | HashMapStateBackend() + FileSystemCheckpointStorage()
RocksDBStateBackend(new MemoryStateBackend()) | EmbeddedRocksDBStateBackend() + JobManagerCheckpointStorage()
RocksDBStateBackend(new FsStateBackend()) | EmbeddedRocksDBStateBackend() + FileSystemCheckpointStorage()
MemoryStateBackend("file://path") | HashMapStateBackend() + JobManagerCheckpointStorage("file://path")
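The migration rule behind the table can be stated mechanically: the legacy class determines where working state lives (heap or RocksDB), the legacy nesting determines where checkpoints go (JobManager or filesystem), and any checkpoint path moves to the storage. The following sketch encodes that rule as data; the `MigrationTable` class and `Legacy` enum are hypothetical illustrations that only produce descriptive strings, not Flink objects.

```java
// Hypothetical sketch: encodes the Old -> New table above as a function.
// It returns descriptive strings only; it does not construct Flink classes.
class MigrationTable {

    /** The legacy configurations listed in the table. */
    enum Legacy { MEMORY, FS, ROCKSDB_WITH_MEMORY, ROCKSDB_WITH_FS }

    /**
     * Returns the equivalent new-style setup. A legacy checkpoint path, if given,
     * becomes an argument of the checkpoint storage, never of the state backend.
     */
    static String migrate(Legacy legacy, String path) {
        String args = (path == null) ? "()" : "(\"" + path + "\")";
        switch (legacy) {
            case MEMORY:
                return "HashMapStateBackend() + JobManagerCheckpointStorage" + args;
            case FS:
                return "HashMapStateBackend() + FileSystemCheckpointStorage" + args;
            case ROCKSDB_WITH_MEMORY:
                return "EmbeddedRocksDBStateBackend() + JobManagerCheckpointStorage" + args;
            case ROCKSDB_WITH_FS:
                return "EmbeddedRocksDBStateBackend() + FileSystemCheckpointStorage" + args;
            default:
                throw new IllegalArgumentException("unknown legacy backend: " + legacy);
        }
    }

    public static void main(String[] args) {
        System.out.println(migrate(Legacy.MEMORY, "file://path"));
        // prints HashMapStateBackend() + JobManagerCheckpointStorage("file://path")
    }
}
```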


All flink-conf configuration options will be duplicated onto the new state backend or checkpoint storage class, whichever is appropriate. Again, no functionality will be added or dropped in this change. Existing flink-conf keys will also be registered as deprecated keys on the new classes to ease migration.
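Registering old keys as deprecated fallbacks means an existing flink-conf keeps working unchanged: the new key is consulted first, and only if it is absent are the deprecated keys tried in order. Flink's `ConfigOption` offers this through `withDeprecatedKeys`; the sketch below re-implements the lookup order over a plain `Map` so it runs without Flink. The key names in it are hypothetical placeholders, not real flink-conf keys.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of deprecated-key fallback. Key names are placeholders.
class DeprecatedKeyLookup {

    /** Returns the value of the new key if set, else of the first deprecated key that is set. */
    static Optional<String> lookup(Map<String, String> conf, String key, List<String> deprecatedKeys) {
        String value = conf.get(key);
        if (value != null) {
            return Optional.of(value);
        }
        for (String old : deprecatedKeys) {
            value = conf.get(old);
            if (value != null) {
                return Optional.of(value);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, String> conf = Map.of("legacy.backend.option", "42");
        // The new key is absent, so the deprecated key supplies the value.
        System.out.println(lookup(conf, "new.storage.option", List.of("legacy.backend.option")));
        // prints Optional[42]
    }
}
```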

...

While two methods (`resolveCheckpoint` and `createCheckpointStorage`) will be removed from StateBackend, externally defined state backends will be able to migrate by merely adding `implements CheckpointStorage` to their implementations. Again, this will be documented in the release notes.
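A simplified sketch of why adding `implements CheckpointStorage` is enough: a class implementing both interfaces can be handed to code that needs either role. The interfaces here are hypothetical, reduced stand-ins for Flink's, and the resolution order in `resolve` (a backend that also implements the storage interface is used as the storage; otherwise the separately configured storage is used) is an assumption of this sketch, not a description of Flink's loader.

```java
// Hypothetical, simplified interfaces; not Flink's real StateBackend/CheckpointStorage.
class LegacyBackendMigration {

    interface StateBackend {
        String name();
    }

    interface CheckpointStorage {
        String checkpointLocation();
    }

    /** An externally defined backend that used to carry the checkpoint storage methods. */
    static class MyLegacyBackend implements StateBackend, CheckpointStorage {
        public String name() { return "my-legacy-backend"; }
        public String checkpointLocation() { return "hdfs:///checkpoints"; }
    }

    /** A new-style backend that only manages working state. */
    static class HeapOnlyBackend implements StateBackend {
        public String name() { return "heap"; }
    }

    /** Assumed precedence: a dual-role backend wins; otherwise use the configured storage. */
    static CheckpointStorage resolve(StateBackend backend, CheckpointStorage configured) {
        if (backend instanceof CheckpointStorage) {
            return (CheckpointStorage) backend;
        }
        if (configured == null) {
            throw new IllegalStateException("no checkpoint storage configured");
        }
        return configured;
    }

    public static void main(String[] args) {
        System.out.println(resolve(new MyLegacyBackend(), null).checkpointLocation());
        // prints hdfs:///checkpoints
        CheckpointStorage fs = () -> "s3://checkpoints";
        System.out.println(resolve(new HeapOnlyBackend(), fs).checkpointLocation());
        // prints s3://checkpoints
    }
}
```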


[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/State-Storage-Questions-td37919.html

...