...

JIRA

Released: -


Motivation

Apache Flink's durability story is a mystery to many users. One of the most common recurring questions from users comes from not understanding the relationship between state, state backends, and snapshots. Some of this confusion can be abated with learning material. Still, the question is so pervasive that we believe Flink's user APIs should better communicate what different components are responsible for.

There are two core issues in the API that need to be addressed: inconsistent naming and intermixing of unrelated configurations/responsibilities.

Naming

Flink ships with three state backends out of the box: MemoryStateBackend, FsStateBackend, and RocksDBStateBackend. MemoryStateBackend and FsStateBackend are named based on where they write out their checkpoints. However, they both use the same in-memory data structure to store state locally. RocksDBStateBackend, on the other hand, is named based on where it stores data locally, while it also snapshots to a durable filesystem. There is an additional common question about how to manage RocksDB. Users often do not realize that RocksDB is embedded and does not need to be managed explicitly.

Configuration

The second issue is that the StateBackend interface is overloaded with too much functionality. It currently contains four methods.  

Code Block
languagejava
public interface StateBackend extends java.io.Serializable {

  // ------------------------------------------------------------------------
  //  Checkpoint storage - the durable persistence of checkpoint data
  // ------------------------------------------------------------------------

  CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer);

  CheckpointStorage createCheckpointStorage(JobID jobId);

  // ------------------------------------------------------------------------
  //  Structure Backends
  // ------------------------------------------------------------------------
  <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
     Environment env,
     JobID jobID,
     String operatorIdentifier,
     TypeSerializer<K> keySerializer,
     int numberOfKeyGroups,
     KeyGroupRange keyGroupRange,
     TaskKvStateRegistry kvStateRegistry,
     TtlTimeProvider ttlTimeProvider,
     MetricGroup metricGroup,
     @Nonnull Collection<KeyedStateHandle> stateHandles,
     CloseableRegistry cancelStreamRegistry) throws Exception;
 
  OperatorStateBackend createOperatorStateBackend(
     Environment env,
     String operatorIdentifier,
     @Nonnull Collection<OperatorStateHandle> stateHandles,
     CloseableRegistry cancelStreamRegistry) throws Exception;
}


As you can see from the interface's comments, implementations are responsible for two separate and unrelated tasks: checkpoint storage and local state backends. Consider some of the ways state backends can be set in code today:

...

Code Block
languagejava
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new MemoryStateBackend());
env.setStateBackend(new MemoryStateBackend(1048));
env.setStateBackend(new MemoryStateBackend("s3://checkpoints", 1048));
 
env.setStateBackend(new FsStateBackend("s3://checkpoints", 1048));
 
RocksDBStateBackend rocksDB = new RocksDBStateBackend("s3://checkpoints", 1028);
rocksDB.setOptionsFactory(/** blah **/);
 
rocksDB = new RocksDBStateBackend(new FsStateBackend("s3://checkpoints", 1048));
rocksDB.setOptionsFactory(/** blah **/);
 
rocksDB = new RocksDBStateBackend(new MemoryStateBackend());
rocksDB.setOptionsFactory(/** blah **/);
 
env.setStateBackend(rocksDB);


Constructors contain arguments for configuring both the local state backend and checkpoint storage. It is not apparent to a new user in the above example that RocksDB will store data on local disk and checkpoint to S3. Most believe RocksDB will be working with S3 directly.

Proposed Changes


Separate the concerns of local state storage from checkpoint storage in user-facing APIs and provide better names.

...

CheckpointStorage:


To better separate concerns, we will remove the checkpoint storage methods from the StateBackend interface and place them into a new interface, CheckpointStorage. Flink runtime currently contains an internal interface called CheckpointStorage; it will be renamed to CheckpointStorageAccess.


Code Block
languagejava
public interface CheckpointStorage extends java.io.Serializable {

  CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer);

  CheckpointStorageAccess createCheckpointStorage(JobID jobId);
}

...

Flink will provide two default implementations: JobManagerStorage and FileSystemStorage. JobManagerStorage and FileSystemStorage will maintain the same functionality as the implementations of these methods in MemoryStateBackend and FsStateBackend, respectively. We will also provide a CheckpointStorageFactory to retain feature parity with existing functionality.


Code Block
languagejava
@PublicEvolving
public interface CheckpointStorageFactory<T extends CheckpointStorage> {

	T createFromConfig(ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException, IOException;
}

New StateBackend

...

User API:

To improve naming and general understanding, we will deprecate the classes MemoryStateBackend, FsStateBackend, and RocksDBStateBackend. We will instead introduce HashMapStateBackend and EmbeddedRocksDBStateBackend. To be clear, we are not changing any of the runtime data structures or characteristics; these are simply new user-facing API classes.

The advantage is that the names now clearly define where data lives during runtime, and we can remove the APIs for configuring checkpoint storage to have a clean break.


Code Block
languagejava
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
 
EmbeddedRocksDBStateBackend rocksDB = new EmbeddedRocksDBStateBackend();
rocksDB.setOptionsFactory(/** blah **/);
 
env.setStateBackend(rocksDB);
 
env.getCheckpointConfig().setCheckpointLocation(new JobManagerStorage());
env.getCheckpointConfig().setCheckpointLocation(new FileSystemStorage("s3://checkpoints"));
env.getCheckpointConfig().setCheckpointLocation("s3://checkpoints");

...

The three existing state backends, MemoryStateBackend, FsStateBackend, and RocksDBStateBackend, will be deprecated in favor of the new classes. In their JavaDoc and the release notes, we will guide users on how to migrate to the new API in a compatible way. Because we are using the same internal data structures, users will be able to migrate to the new API trivially; e.g., MemoryStateBackend should be replaced with HashMapStateBackend and JobManagerStorage.
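The migration pairing described above could be summarized as a small lookup, sketched below. Only the MemoryStateBackend row is stated explicitly in this proposal; the FsStateBackend and RocksDBStateBackend rows are inferred from the descriptions of those backends earlier in this document and should be read as assumptions. The `MigrationGuide` and `Replacement` classes are hypothetical helpers for illustration and are not part of Flink.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical value type: the new state backend plus the checkpoint storage
// that together replace one deprecated state backend.
final class Replacement {
    final String stateBackend;
    final String checkpointStorage;

    Replacement(String stateBackend, String checkpointStorage) {
        this.stateBackend = stateBackend;
        this.checkpointStorage = checkpointStorage;
    }

    @Override
    public String toString() {
        return stateBackend + " + " + checkpointStorage;
    }
}

// Hypothetical lookup from a deprecated class name to its replacement pair.
final class MigrationGuide {
    private static final Map<String, Replacement> REPLACEMENTS = new LinkedHashMap<>();

    static {
        // Stated in the proposal text.
        REPLACEMENTS.put("MemoryStateBackend",
                new Replacement("HashMapStateBackend", "JobManagerStorage"));
        // Assumption: FsStateBackend keeps state in the same in-memory structure
        // and checkpoints to a filesystem.
        REPLACEMENTS.put("FsStateBackend",
                new Replacement("HashMapStateBackend", "FileSystemStorage"));
        // Assumption: RocksDB keeps state on local disk and snapshots to a
        // durable filesystem.
        REPLACEMENTS.put("RocksDBStateBackend",
                new Replacement("EmbeddedRocksDBStateBackend", "FileSystemStorage"));
    }

    static Replacement replacementFor(String deprecatedBackend) {
        Replacement replacement = REPLACEMENTS.get(deprecatedBackend);
        if (replacement == null) {
            throw new IllegalArgumentException("Not a deprecated state backend: " + deprecatedBackend);
        }
        return replacement;
    }

    public static void main(String[] args) {
        for (String name : REPLACEMENTS.keySet()) {
            System.out.println(name + " -> " + replacementFor(name));
        }
    }
}
```

A RocksDBStateBackend that was constructed around a MemoryStateBackend would presumably pair with JobManagerStorage instead; the table above only covers the common filesystem case.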

All flink-conf configurations will be duplicated for the new state backend or checkpoint storage instance, whichever is appropriate. Again, no functionality will be added or dropped in this change. Existing flink-conf keys will also be specified as deprecated keys on the new state backends to ease migration.

All deprecated StateBackends (Memory, Fs, RocksDB) will implement the CheckpointStorage interface to retain full backward compatibility. When a state backend implementing this interface is used, its checkpoint storage methods will take precedence over any other provided configuration. This way, all existing state backend implementations will continue to function as they do today, and users will not see any changes in semantics.

While two methods will be removed from StateBackend, externally defined state backends will be able to migrate by merely adding `implements CheckpointStorage` to their implementations. Again, this will be documented in the release notes.
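The precedence rule for backward compatibility can be illustrated with a minimal, self-contained sketch. The interfaces and classes below are simplified stand-ins that reuse the names from this proposal; they are not Flink's real types. `LegacyFsStateBackend`, `CheckpointStorageResolver`, and the `describe()` method are hypothetical, and falling back to `JobManagerStorage` when nothing is configured is an assumption made for the example.

```java
// Simplified stand-ins for the proposed interfaces (not Flink's real classes).
interface StateBackend {}

interface CheckpointStorage {
    String describe(); // hypothetical helper so the chosen storage is observable
}

// New-style backend: only concerned with local state.
final class HashMapStateBackend implements StateBackend {}

// Hypothetical legacy-style backend that still carries its own checkpoint storage.
final class LegacyFsStateBackend implements StateBackend, CheckpointStorage {
    private final String path;

    LegacyFsStateBackend(String path) {
        this.path = path;
    }

    @Override
    public String describe() {
        return "legacy:" + path;
    }
}

final class FileSystemStorage implements CheckpointStorage {
    private final String path;

    FileSystemStorage(String path) {
        this.path = path;
    }

    @Override
    public String describe() {
        return "filesystem:" + path;
    }
}

final class JobManagerStorage implements CheckpointStorage {
    @Override
    public String describe() {
        return "jobmanager";
    }
}

final class CheckpointStorageResolver {

    // A backend that also implements CheckpointStorage wins over any separately
    // configured storage; otherwise the configured storage is used.
    static CheckpointStorage resolve(StateBackend backend, CheckpointStorage configured) {
        if (backend instanceof CheckpointStorage) {
            return (CheckpointStorage) backend;
        }
        // Assumption: default to JobManagerStorage when nothing is configured.
        return configured != null ? configured : new JobManagerStorage();
    }

    public static void main(String[] args) {
        CheckpointStorage configured = new FileSystemStorage("s3://checkpoints");
        // prints "legacy:s3://old"
        System.out.println(resolve(new LegacyFsStateBackend("s3://old"), configured).describe());
        // prints "filesystem:s3://checkpoints"
        System.out.println(resolve(new HashMapStateBackend(), configured).describe());
        // prints "jobmanager"
        System.out.println(resolve(new HashMapStateBackend(), null).describe());
    }
}
```

Checking the backend first is what keeps existing jobs unchanged: a job that still passes a deprecated backend gets exactly the checkpoint storage it was constructed with, regardless of any new-style configuration.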