

Current state: Under Discussion

Discussion thread:

JIRA

Released: -


Motivation

Apache Flink's durability story is a mystery to many users. One of the most common recurring questions comes from not understanding the relationship between state, state backends, and snapshots. Some of this confusion can be reduced with learning material, but the question is so pervasive that we believe Flink’s user APIs should better communicate what each component is responsible for.

There are two core issues in the API that need to be addressed: inconsistent naming, and the intermixing of unrelated configurations and responsibilities.

Naming

Flink ships with three state backends out of the box: MemoryStateBackend, FsStateBackend, and RocksDBStateBackend. MemoryStateBackend and FsStateBackend are named after where they write their checkpoints (JobManager memory and a file system, respectively), yet both use the same in-memory data structure to store state locally. RocksDBStateBackend, on the other hand, is named after where it stores data locally, even though it also snapshots to a durable file system. A related common question is how to manage RocksDB: it is not well understood that RocksDB is embedded in the TaskManager process and does not need to be explicitly managed.

Configuration

The second issue is that the StateBackend interface is overloaded with too much functionality. It currently contains four methods. 

public interface StateBackend extends java.io.Serializable {

  // ------------------------------------------------------------------------
  //  Checkpoint storage - the durable persistence of checkpoint data
  // ------------------------------------------------------------------------

  CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer);

  CheckpointStorage createCheckpointStorage(JobID jobId);

  // ------------------------------------------------------------------------
  //  Structure Backends
  // ------------------------------------------------------------------------

  <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
      Environment env,
      JobID jobID,
      String operatorIdentifier,
      TypeSerializer<K> keySerializer,
      int numberOfKeyGroups,
      KeyGroupRange keyGroupRange,
      TaskKvStateRegistry kvStateRegistry,
      TtlTimeProvider ttlTimeProvider,
      MetricGroup metricGroup,
      @Nonnull Collection<KeyedStateHandle> stateHandles,
      CloseableRegistry cancelStreamRegistry) throws Exception;

  OperatorStateBackend createOperatorStateBackend(
      Environment env,
      String operatorIdentifier,
      @Nonnull Collection<OperatorStateHandle> stateHandles,
      CloseableRegistry cancelStreamRegistry) throws Exception;
}


As the comments in the interface show, implementations are responsible for two separate and unrelated tasks: checkpoint storage and local state management. Consider some of the ways state backends can be set in code today:


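StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Heap state; checkpoint data is kept in JobManager memory
env.setStateBackend(new MemoryStateBackend());
env.setStateBackend(new MemoryStateBackend(1048));
env.setStateBackend(new MemoryStateBackend("s3://checkpoints", 1048));

// Heap state; checkpoint data is written to a file system
env.setStateBackend(new FsStateBackend("s3://checkpoints", 1048));

// RocksDB state on local disk; checkpoint data is written to S3
RocksDBStateBackend rocksDB = new RocksDBStateBackend("s3://checkpoints", 1028);
rocksDB.setOptionsFactory(/** blah **/);

// RocksDB state on local disk; checkpoint storage delegated to an FsStateBackend
RocksDBStateBackend rocksDBWithFs = new RocksDBStateBackend(new FsStateBackend("s3://checkpoints", 1048));
rocksDBWithFs.setOptionsFactory(/** blah **/);

// RocksDB state on local disk; checkpoint storage delegated to a MemoryStateBackend
RocksDBStateBackend rocksDBWithMemory = new RocksDBStateBackend(new MemoryStateBackend());
rocksDBWithMemory.setOptionsFactory(/** blah **/);

env.setStateBackend(rocksDB);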


The constructors mix arguments that configure the local state backend with arguments that configure checkpoint storage. In the example above, it is not obvious to a new user that RocksDB stores its working state on local disk and only checkpoints to S3; most assume that RocksDB works with S3 directly.

Proposed Changes


Separate the concerns of local state storage from checkpoint storage in user-facing APIs and provide better names.

SnapshotStorage:


To better separate concerns, we will deprecate the checkpoint storage methods on the StateBackend interface and move them to a new interface, SnapshotStorage, with two default implementations: JobManagerStorage and FileSystemStorage.


public interface SnapshotStorage extends java.io.Serializable {

  CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer);

  CheckpointStorage createCheckpointStorage(JobID jobId);
}

JobManagerStorage and FileSystemStorage will keep the same functionality as the implementations of these methods in MemoryStateBackend and FsStateBackend, respectively. We will also provide a SnapshotStorageFactory to retain feature parity with the existing StateBackendFactory, which creates state backends from configuration.


@PublicEvolving
public interface SnapshotStorageFactory<T extends SnapshotStorage> {

	T createFromConfig(ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException, IOException;
}
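To make the factory's role concrete, below is a minimal, self-contained sketch of how a configuration-driven factory could choose between the two proposed storages. It is a toy model, not Flink code: `Map<String, String>` stands in for `ReadableConfig`, the simplified `SnapshotStorage` interface only describes where checkpoints go, and the class name `SnapshotStorageFactorySketch` is hypothetical. The key `state.checkpoints.dir` is Flink's existing checkpoint directory option; the selection rule (file system when a directory is set, JobManager otherwise) is an assumption made for illustration.

```java
import java.util.Map;

public class SnapshotStorageFactorySketch {

    // Simplified stand-in for the proposed SnapshotStorage interface.
    interface SnapshotStorage {
        String describe();
    }

    // Hypothetical toy version: checkpoint data is kept in JobManager memory.
    static final class JobManagerStorage implements SnapshotStorage {
        @Override
        public String describe() {
            return "jobmanager";
        }
    }

    // Hypothetical toy version: checkpoint data is written under a durable directory.
    static final class FileSystemStorage implements SnapshotStorage {
        private final String checkpointDir;

        FileSystemStorage(String checkpointDir) {
            this.checkpointDir = checkpointDir;
        }

        @Override
        public String describe() {
            return "filesystem:" + checkpointDir;
        }
    }

    // Same shape as SnapshotStorageFactory#createFromConfig, with a Map in place of ReadableConfig.
    // Assumed rule: a configured checkpoint directory selects the file system storage.
    static SnapshotStorage createFromConfig(Map<String, String> config) {
        String dir = config.get("state.checkpoints.dir");
        if (dir == null || dir.isEmpty()) {
            return new JobManagerStorage();
        }
        return new FileSystemStorage(dir);
    }

    public static void main(String[] args) {
        System.out.println(createFromConfig(Map.of()).describe());
        System.out.println(createFromConfig(Map.of("state.checkpoints.dir", "s3://checkpoints")).describe());
    }
}
```

Running `main` prints `jobmanager` and then `filesystem:s3://checkpoints`. The point of the sketch is that the factory only decides where snapshots go; it never touches how working state is held locally.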

New StateBackend Implementations:

To improve naming and general understanding, we will deprecate the classes MemoryStateBackend, FsStateBackend, and RocksDBStateBackend. They will be replaced with OnHeapStateBackend and EmbeddedRocksDBStateBackend. To be clear, we are not changing any of the runtime data structures or characteristics; these are simply new user-facing API classes.

The advantage is that the names now clearly state where data lives at runtime, and because these are new classes, we can leave out the APIs for configuring checkpoint storage entirely and make a clean break.


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Where working state lives: on the JVM heap ...
env.setStateBackend(new OnHeapStateBackend());

// ... or in an embedded RocksDB instance on local disk
EmbeddedRocksDBStateBackend rocksDB = new EmbeddedRocksDBStateBackend();
rocksDB.setOptionsFactory(/** blah **/);

env.setStateBackend(rocksDB);

// Where checkpoints are written, configured independently (choose one)
env.getCheckpointConfig().setCheckpointLocation(new JobManagerStorage());
env.getCheckpointConfig().setCheckpointLocation(new FileSystemStorage("s3://checkpoints"));
env.getCheckpointConfig().setCheckpointLocation("s3://checkpoints");

Compatibility, Deprecation, and Migration Plan


To retain full backwards compatibility, a new method, `boolean isLegacyStateBackend()`, will be added to the StateBackend interface with a default implementation that returns true; the new state backends will return false. In legacy mode, all existing state backend implementations will continue to function as they do today, and any provided SnapshotStorage will be ignored. Because the new classes use the same internal data structures, users will be able to migrate to the new API trivially.
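The legacy-mode rule can be sketched as a small dispatch on the proposed flag. This is a self-contained toy model, not Flink code: `StateBackend` and `SnapshotStorage` are simplified stand-ins, `legacyCheckpointStorage()`, `resolve`, `LegacyFsBackend`, and `OnHeapBackend` are hypothetical names, and falling back to JobManager storage when nothing is configured is an assumption chosen to mirror today's default.

```java
import java.util.Optional;

public class LegacyModeSketch {

    // Simplified stand-in: only reports where checkpoint data ends up.
    interface SnapshotStorage {
        String location();
    }

    // Simplified stand-in for StateBackend carrying the proposed flag.
    interface StateBackend {
        // Proposed default: every existing implementation is treated as legacy.
        default boolean isLegacyStateBackend() {
            return true;
        }

        // Hypothetical hook: legacy backends still bring their own checkpoint storage.
        default SnapshotStorage legacyCheckpointStorage() {
            throw new UnsupportedOperationException("not a legacy state backend");
        }
    }

    // Hypothetical legacy backend, e.g. an FsStateBackend constructed with a path.
    static final class LegacyFsBackend implements StateBackend {
        @Override
        public SnapshotStorage legacyCheckpointStorage() {
            return () -> "legacy:s3://old-checkpoints";
        }
    }

    // Hypothetical new-style backend, e.g. OnHeapStateBackend, which opts out of legacy mode.
    static final class OnHeapBackend implements StateBackend {
        @Override
        public boolean isLegacyStateBackend() {
            return false;
        }
    }

    // Legacy backends keep today's behavior and ignore any configured SnapshotStorage.
    // New backends use the configured storage, assumed to default to the JobManager.
    static SnapshotStorage resolve(StateBackend backend, Optional<SnapshotStorage> configured) {
        if (backend.isLegacyStateBackend()) {
            return backend.legacyCheckpointStorage();
        }
        return configured.orElse(() -> "jobmanager");
    }

    public static void main(String[] args) {
        SnapshotStorage s3 = () -> "s3://checkpoints";
        System.out.println(resolve(new LegacyFsBackend(), Optional.of(s3)).location());
        System.out.println(resolve(new OnHeapBackend(), Optional.of(s3)).location());
        System.out.println(resolve(new OnHeapBackend(), Optional.empty()).location());
    }
}
```

`main` prints `legacy:s3://old-checkpoints`, `s3://checkpoints`, and `jobmanager`: the configured storage only takes effect once a backend opts out of legacy mode.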


All flink-conf configuration options will be duplicated for the new state backend or snapshot storage class, whichever is appropriate. Again, no functionality will be added or dropped in this change. Existing flink-conf keys will also be registered as deprecated keys on the new state backends to ease migration.
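In Flink itself, this kind of fallback is normally expressed with `ConfigOption#withDeprecatedKeys`. The self-contained sketch below only models the lookup order that the migration relies on: the new key wins, and an existing key is consulted when the new one is absent. `state.backend.fs.memory-threshold` is an existing Flink key; the new key name `state.snapshot-storage.filesystem.memory-threshold` and the `lookup` helper are hypothetical, chosen for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class DeprecatedKeySketch {

    // Hypothetical new key for the file system snapshot storage.
    static final String NEW_KEY = "state.snapshot-storage.filesystem.memory-threshold";

    // Existing key, kept as a deprecated fallback during migration.
    static final String OLD_KEY = "state.backend.fs.memory-threshold";

    // Looks up the new key first, then each deprecated key in order.
    static Optional<String> lookup(Map<String, String> conf, String key, List<String> deprecatedKeys) {
        String value = conf.get(key);
        if (value != null) {
            return Optional.of(value);
        }
        for (String deprecated : deprecatedKeys) {
            value = conf.get(deprecated);
            if (value != null) {
                return Optional.of(value);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, String> oldStyle = Map.of(OLD_KEY, "20kb");
        Map<String, String> both = Map.of(NEW_KEY, "1mb", OLD_KEY, "20kb");
        System.out.println(lookup(oldStyle, NEW_KEY, List.of(OLD_KEY)).orElse("unset"));
        System.out.println(lookup(both, NEW_KEY, List.of(OLD_KEY)).orElse("unset"));
    }
}
```

`main` prints `20kb` (an old-style configuration keeps working) and then `1mb` (the new key takes precedence when both are present).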
