Discussion thread | https://lists.apache.org/thread/om4kgd6trx2lctwm6x92q2kdjngxtz9k |
---|---|
Vote thread | |
JIRA | |
Release | |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, there is no retry mechanism for downloading and uploading RocksDB state files, so any transient instability (jitter) of the remote filesystem can cause a checkpoint to fail. By adding a retry mechanism to RocksDBStateDataTransfer, we can significantly reduce the checkpoint failure rate during the asynchronous phase.
Public Interfaces
Note: To stay consistent with the existing behavior, the retry mechanism will be disabled by default in the upcoming release.
Option name | Description | Default Value |
---|---|---|
state.backend.rocksdb.checkpoint.transfer.retry.times | Maximum number of retries for each data transfer operation; 0 means no retry is performed. | 0 |
state.backend.rocksdb.checkpoint.transfer.retry.interval | Delay between two consecutive retry attempts. | 1 s |
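To make the two options concrete: with retry times set to N, a single transfer is attempted at most N + 1 times, and the total time spent waiting between attempts is bounded by N × interval (the duration of the attempts themselves is not included). The Java sketch below reads both options from a plain Map and applies the defaults from the table. The class TransferRetryOptions and its simplified duration parser (which only understands the units ms, s and min) are hypothetical and are not Flink's configuration API.

```java
import java.time.Duration;
import java.util.Map;

/** Hypothetical holder for the two proposed options; illustrative only, not part of Flink. */
public class TransferRetryOptions {
    static final String RETRY_TIMES = "state.backend.rocksdb.checkpoint.transfer.retry.times";
    static final String RETRY_INTERVAL = "state.backend.rocksdb.checkpoint.transfer.retry.interval";

    final int retryTimes;
    final Duration retryInterval;

    TransferRetryOptions(int retryTimes, Duration retryInterval) {
        if (retryTimes < 0) {
            throw new IllegalArgumentException("retry times must be >= 0");
        }
        this.retryTimes = retryTimes;
        this.retryInterval = retryInterval;
    }

    /** Reads both options, falling back to the proposed defaults (0 retries, 1 s). */
    static TransferRetryOptions fromMap(Map<String, String> conf) {
        int times = Integer.parseInt(conf.getOrDefault(RETRY_TIMES, "0").trim());
        Duration interval = parseDuration(conf.getOrDefault(RETRY_INTERVAL, "1 s"));
        return new TransferRetryOptions(times, interval);
    }

    /** Simplified parser for "<number> <unit>" with unit ms, s or min (an assumption). */
    static Duration parseDuration(String text) {
        String t = text.trim();
        int i = 0;
        while (i < t.length() && Character.isDigit(t.charAt(i))) {
            i++;
        }
        long value = Long.parseLong(t.substring(0, i));
        String unit = t.substring(i).trim();
        switch (unit) {
            case "ms":
                return Duration.ofMillis(value);
            case "s":
                return Duration.ofSeconds(value);
            case "min":
                return Duration.ofMinutes(value);
            default:
                throw new IllegalArgumentException("Unsupported unit: " + unit);
        }
    }

    /** The first attempt plus up to retryTimes retries. */
    int maxAttempts() {
        return retryTimes + 1;
    }

    /** Upper bound on time spent waiting between attempts (excludes the attempts themselves). */
    Duration maxAddedDelay() {
        return retryInterval.multipliedBy(retryTimes);
    }

    public static void main(String[] args) {
        TransferRetryOptions defaults = fromMap(Map.of());
        System.out.println(defaults.maxAttempts() + " " + defaults.maxAddedDelay()); // 1 PT0S
        TransferRetryOptions tuned = fromMap(Map.of(RETRY_TIMES, "3", RETRY_INTERVAL, "2 s"));
        System.out.println(tuned.maxAttempts() + " " + tuned.maxAddedDelay()); // 4 PT6S
    }
}
```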
Proposed Changes
RocksDBStateDataTransfer will introduce a FixedRetryStrategy and wrap data transfer operations with retry behavior. RocksDBStateUploader and RocksDBStateDownloader will reuse this retry wrapper from RocksDBStateDataTransfer.
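The fixed-delay retry behavior described above can be illustrated without Flink. The sketch below implements retry over a Supplier<CompletableFuture<T>> using only the JDK: every attempt calls the supplier again, and after a failure the next attempt is scheduled after a fixed delay until the retry budget is used up. It is meant to approximate what FutureUtils.retryWithDelay combined with a FixedRetryStrategy is expected to do; the names FixedDelayRetry and retryWithFixedDelay are hypothetical, and the sketch retries on every exception.

```java
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical plain-JDK fixed-delay retry helper (not Flink's FutureUtils). */
public final class FixedDelayRetry {
    private FixedDelayRetry() {}

    /** Runs the operation, retrying up to {@code retries} more times with a fixed delay. */
    static <T> CompletableFuture<T> retryWithFixedDelay(
            Supplier<CompletableFuture<T>> operation,
            int retries,
            Duration delay,
            ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(operation, retries, delay, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> operation,
            int retriesLeft,
            Duration delay,
            ScheduledExecutorService scheduler,
            CompletableFuture<T> result) {
        CompletableFuture<T> current;
        try {
            current = operation.get(); // start a fresh transfer attempt
        } catch (RuntimeException e) {
            current = CompletableFuture.failedFuture(e);
        }
        current.whenComplete(
                (value, error) -> {
                    if (error == null) {
                        result.complete(value);
                    } else if (retriesLeft > 0) {
                        try {
                            // Wait a fixed interval, then start a new attempt.
                            scheduler.schedule(
                                    () -> attempt(operation, retriesLeft - 1, delay, scheduler, result),
                                    delay.toMillis(),
                                    TimeUnit.MILLISECONDS);
                        } catch (RejectedExecutionException e) {
                            result.completeExceptionally(error); // scheduler already shut down
                        }
                    } else {
                        result.completeExceptionally(error); // retries exhausted
                    }
                });
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        // Simulated flaky upload: the first two attempts fail, the third succeeds.
        Supplier<CompletableFuture<String>> flakyUpload =
                () ->
                        calls.incrementAndGet() <= 2
                                ? CompletableFuture.<String>failedFuture(new IOException("transient failure"))
                                : CompletableFuture.completedFuture("uploaded");
        String out = retryWithFixedDelay(flakyUpload, 3, Duration.ofMillis(10), scheduler).join();
        System.out.println(out + " after " + calls.get() + " attempts"); // uploaded after 3 attempts
        scheduler.shutdownNow();
    }
}
```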
Code Example:
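```java
import java.io.Closeable;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;

import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FixedRetryStrategy;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;

// newDirectExecutorService() is statically imported, as in the existing RocksDBStateDataTransfer.

/** Data transfer base class for {@link RocksDBKeyedStateBackend}. */
class RocksDBStateDataTransfer implements Closeable {

    /** ExecutorService that runs the data transfer operations themselves. */
    protected final ExecutorService executorService;

    /** Scheduler used to delay retry attempts of failed data transfer operations. */
    protected final ScheduledExecutorService retryExecutorService;

    private final int retryTimes;
    private final Duration retryInterval;

    RocksDBStateDataTransfer(int threadNum, int retryTimes, Duration retryInterval) {
        if (threadNum > 1) {
            executorService =
                    Executors.newFixedThreadPool(
                            threadNum, new ExecutorThreadFactory("Flink-RocksDBStateDataTransfer"));
        } else {
            // Run transfers on the calling thread.
            executorService = newDirectExecutorService();
        }
        this.retryTimes = retryTimes;
        this.retryInterval = retryInterval;
        retryExecutorService =
                Executors.newSingleThreadScheduledExecutor(
                        new ExecutorThreadFactory("Flink-RocksDBStateDataTransfer-Retry"));
    }

    @Override
    public void close() {
        executorService.shutdownNow();
        retryExecutorService.shutdownNow();
    }

    /**
     * Runs the operation and, if retries are enabled, retries it with a fixed delay on any failure.
     *
     * <p>The operation is passed as a Supplier so that every retry starts a new transfer; returning
     * an already failed CompletableFuture again would never succeed.
     */
    protected <T> CompletableFuture<T> retry(
            Supplier<CompletableFuture<T>> dataTransferOperation) {
        if (retryTimes > 0) {
            return FutureUtils.retryWithDelay(
                    dataTransferOperation,
                    new FixedRetryStrategy(retryTimes, retryInterval),
                    throwable -> true, // retry on any exception
                    new ScheduledExecutorServiceAdapter(retryExecutorService));
        } else {
            // No retry will be performed.
            return dataTransferOperation.get();
        }
    }
}
```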
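Because each retry has to start a new transfer, the operation handed to a retry helper must be something that can be invoked again, such as a Supplier<CompletableFuture<T>>, rather than a CompletableFuture that has already been created: once a CompletableFuture has completed exceptionally, handing it out again does not re-run anything. The self-contained Java sketch below counts how many transfers are actually started in each case; startTransfer and countAttempts are hypothetical stand-ins for a RocksDB file transfer and a (synchronous) retry helper.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical demo: retrying a stale future versus retrying via a Supplier. */
public class StaleFutureDemo {
    static final AtomicInteger transfersStarted = new AtomicInteger();

    /** Stand-in for a file transfer that always fails; it only records that it started. */
    static CompletableFuture<Void> startTransfer() {
        transfersStarted.incrementAndGet();
        return CompletableFuture.failedFuture(new RuntimeException("remote filesystem hiccup"));
    }

    /** Minimal synchronous retry loop: one attempt plus up to {@code retries} retries. */
    static int countAttempts(Supplier<CompletableFuture<Void>> operation, int retries) {
        int before = transfersStarted.get();
        for (int i = 0; i <= retries; i++) {
            try {
                operation.get().join();
                break; // success, stop retrying
            } catch (CompletionException e) {
                // failed attempt, continue with the next retry
            }
        }
        return transfersStarted.get() - before;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> alreadyStarted = startTransfer();
        // Reusing one future: every "retry" observes the same failure, no new transfer starts.
        System.out.println(countAttempts(() -> alreadyStarted, 3)); // 0
        // Reusing the call: each retry starts a new transfer (1 attempt + 3 retries).
        System.out.println(countAttempts(StaleFutureDemo::startTransfer, 3)); // 4
    }
}
```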
Compatibility, Deprecation, and Migration Plan
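- What impact (if any) will there be on existing users? None. Retries are disabled by default (retry times is 0), so existing jobs keep their current behavior.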
Test Plan
Unit tests and integration tests will verify that the retry mechanism works as expected.