...

Flink will provide two default implementations: JobManagerCheckpointStorage and FileSystemCheckpointStorage. These will maintain the same functionality as the corresponding methods in MemoryStateBackend and FsStateBackend. We will also provide a CheckpointStorageFactory to retain feature parity with existing functionality.

...

```java
@PublicEvolving
public interface CheckpointStorageFactory<T extends CheckpointStorage> {

    // Creates a checkpoint storage instance from the given configuration.
    T createFromConfig(ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException, IOException;
}
```
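To make the factory contract concrete, here is a minimal sketch of what an implementation could look like. It is not Flink code: a plain `Map` stands in for `ReadableConfig`, an unchecked `IllegalArgumentException` stands in for the declared checked exceptions, and `DirectoryStorage`, `DirectoryStorageFactory`, and the key `example.checkpoint.dir` are hypothetical names invented for illustration.

```java
import java.util.Map;

/** Illustrative stand-ins only; the real Flink types differ. */
final class FactorySketch {

    /** Stand-in for the proposed CheckpointStorage interface. */
    interface CheckpointStorage {}

    /** Mirrors the shape of the proposed factory, with a Map in place of ReadableConfig. */
    interface CheckpointStorageFactory<T extends CheckpointStorage> {
        T createFromConfig(Map<String, String> config, ClassLoader classLoader);
    }

    /** Hypothetical storage that only remembers its base directory. */
    static final class DirectoryStorage implements CheckpointStorage {
        final String baseDir;

        DirectoryStorage(String baseDir) {
            this.baseDir = baseDir;
        }
    }

    /** Hypothetical factory that reads a made-up key, "example.checkpoint.dir". */
    static final class DirectoryStorageFactory
            implements CheckpointStorageFactory<DirectoryStorage> {

        @Override
        public DirectoryStorage createFromConfig(
                Map<String, String> config, ClassLoader classLoader) {
            String dir = config.get("example.checkpoint.dir");
            if (dir == null) {
                // Fail fast on a missing setting instead of guessing a default.
                throw new IllegalArgumentException("example.checkpoint.dir is not set");
            }
            return new DirectoryStorage(dir);
        }
    }
}
```

The type parameter lets each factory return its concrete storage type, so callers that know which factory they hold do not need a cast.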

...

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Either use the heap-based state backend...
env.setStateBackend(new HashMapStateBackend());

// ...or configure and use the embedded RocksDB state backend.
EmbeddedRocksDBStateBackend rocksDB = new EmbeddedRocksDBStateBackend();
rocksDB.setOptionsFactory(/** blah **/);
env.setStateBackend(rocksDB);

env.setDefaultSavepointDirectory("s3://savepoint");

// Checkpoint storage is set separately from the state backend.
// The three calls below are alternatives; the last takes a checkpoint directory directly.
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("s3://checkpoints"));
env.getCheckpointConfig().setCheckpointStorage("s3://checkpoints");
```

...

The three existing state backends - MemoryStateBackend, FsStateBackend, and RocksDBStateBackend - will be deprecated in favor of the new classes. In their JavaDoc and the release notes, we will guide users on migrating to the new API in a compatible way. Because we are using the same internal data structures, users will be able to migrate to the new API trivially; for example, MemoryStateBackend should be replaced with HashMapStateBackend and JobManagerCheckpointStorage.

All flink-conf configurations will be duplicated for the new state backend or snapshot storage instance, whichever is appropriate. Again, no functionality will be added or dropped in this change. Existing flink-conf keys will also be specified as deprecated keys on the new state backends to ease migration.


In particular, the following configuration keys will be added:


FileSystemCheckpointStorage:

  • state.snapshot.fs.memory-threshold
    • Deprecated key: state.backend.fs.memory-threshold
  • state.snapshot.fs.write-buffer-size
    • Deprecated key: state.backend.fs.write-buffer-size
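The intended effect of a deprecated key can be sketched as a two-step lookup: read the new key first and fall back to the old one only if the new key is absent. The helper below is a hypothetical illustration over a plain `Map`, not Flink's configuration API, and the rule that the new key wins when both are set is an assumption of this sketch.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper (not a Flink class): resolves a key with a deprecated fallback. */
final class FallbackOption {
    private final String key;
    private final String deprecatedKey;

    FallbackOption(String key, String deprecatedKey) {
        this.key = key;
        this.deprecatedKey = deprecatedKey;
    }

    /** Returns the value under the new key if present, else the value under the deprecated key. */
    Optional<String> resolve(Map<String, String> conf) {
        String value = conf.get(key);
        if (value == null) {
            // Assumption of this sketch: the deprecated key is consulted only as a fallback.
            value = conf.get(deprecatedKey);
        }
        return Optional.ofNullable(value);
    }
}
```

With `new FallbackOption("state.snapshot.fs.memory-threshold", "state.backend.fs.memory-threshold")`, a configuration that still sets only the old key resolves to the old value, so existing flink-conf files keep working unchanged.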


All deprecated StateBackends (Memory, Fs, RocksDB) will implement the CheckpointStorage interface to retain full backward compatibility. When a state backend implementing this interface is used, its checkpoint storage methods will take precedence over any other provided configuration. This way, all existing state backend implementations will continue to function as they do today, and users will not see any semantic changes.
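The precedence rule above can be sketched as a single type check. The interfaces and classes below are simplified stand-ins with hypothetical names (`LegacyBackend`, `NewBackend`, `ConfiguredStorage`, `resolve`), not the Flink types or the actual loading logic.

```java
/** Illustrative stand-ins only; not the Flink interfaces. */
final class PrecedenceSketch {

    interface StateBackend {}

    interface CheckpointStorage {}

    /** Models a deprecated backend that still carries its own checkpoint storage. */
    static final class LegacyBackend implements StateBackend, CheckpointStorage {}

    /** Models a new-style backend with no storage responsibilities. */
    static final class NewBackend implements StateBackend {}

    /** Models storage supplied separately, for example through the checkpoint config. */
    static final class ConfiguredStorage implements CheckpointStorage {}

    /** A backend that is itself a CheckpointStorage wins over the configured storage. */
    static CheckpointStorage resolve(StateBackend backend, CheckpointStorage configured) {
        if (backend instanceof CheckpointStorage) {
            return (CheckpointStorage) backend;
        }
        return configured;
    }
}
```

Under this sketch, a job that keeps a deprecated backend gets exactly the storage that backend always provided, while a job on a new backend uses whatever storage was configured separately.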

...