Discussion thread: https://lists.apache.org/thread/om4kgd6trx2lctwm6x92q2kdjngxtz9k
Vote thread
JIRA

Release

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, there is no retry mechanism for downloading and uploading RocksDB state files. Any transient instability (jitter) in the remote filesystem can therefore cause a checkpoint to fail. Supporting a retry mechanism in RocksDBStateDataTransfer should significantly reduce the checkpoint failure rate during the asynchronous phase.

Public Interfaces

Note: To stay consistent with the existing behavior, the retry mechanism will be disabled by default (retry times = 0) in the release that introduces it.

Option name | Description | Default value
state.backend.rocksdb.checkpoint.transfer.retry.times | Retry times for a data transfer operation; 0 means no retry will be performed. | 0
state.backend.rocksdb.checkpoint.transfer.retry.interval | Interval between each retry operation. | 1 s
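To make the defaults concrete, the following JDK-only sketch resolves the two options from a plain key-value map. It is an illustration only: the class TransferRetryOptionsSketch, its helper methods, and the simplified interval parser (which accepts only "<n> s" and "<n> ms") are hypothetical stand-ins, not Flink's option or duration-parsing API.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for reading the two proposed options; not Flink code. */
public class TransferRetryOptionsSketch {

    static final String RETRY_TIMES_KEY =
            "state.backend.rocksdb.checkpoint.transfer.retry.times";
    static final String RETRY_INTERVAL_KEY =
            "state.backend.rocksdb.checkpoint.transfer.retry.interval";

    /** Returns the configured retry count, falling back to the proposed default of 0. */
    static int retryTimes(Map<String, String> conf) {
        return Integer.parseInt(conf.getOrDefault(RETRY_TIMES_KEY, "0").trim());
    }

    /** Returns the configured interval, falling back to the proposed default of 1 s. */
    static Duration retryInterval(Map<String, String> conf) {
        String raw = conf.getOrDefault(RETRY_INTERVAL_KEY, "1 s").trim();
        // Assumption: only "<n> ms" and "<n> s" are handled in this sketch.
        if (raw.endsWith("ms")) {
            return Duration.ofMillis(Long.parseLong(raw.substring(0, raw.length() - 2).trim()));
        }
        if (raw.endsWith("s")) {
            return Duration.ofSeconds(Long.parseLong(raw.substring(0, raw.length() - 1).trim()));
        }
        throw new IllegalArgumentException("Unsupported interval: " + raw);
    }

    public static void main(String[] args) {
        // With nothing configured, the defaults apply and retries stay disabled.
        Map<String, String> conf = new HashMap<>();
        System.out.println(
                "retries=" + retryTimes(conf)
                        + ", intervalMs=" + retryInterval(conf).toMillis());

        // Opting in: three retries, 500 ms apart.
        conf.put(RETRY_TIMES_KEY, "3");
        conf.put(RETRY_INTERVAL_KEY, "500 ms");
        System.out.println(
                "retries=" + retryTimes(conf)
                        + ", intervalMs=" + retryInterval(conf).toMillis());
    }
}
```

With an empty map the sketch reports zero retries and a 1000 ms interval, which matches the proposal that existing jobs see no behavior change unless they opt in.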


Proposed Changes

  • RocksDBStateDataTransfer will introduce a FixedRetryStrategy and a wrapped retry behavior.
  • RocksDBStateUploader and RocksDBStateDownloader will reuse the wrapped retry behavior in RocksDBStateDataTransfer.


Code Example:

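import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FixedRetryStrategy;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;

import java.io.Closeable;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;

import static org.apache.flink.util.concurrent.Executors.newDirectExecutorService;

/** Data transfer base class for {@link RocksDBKeyedStateBackend}. */
class RocksDBStateDataTransfer implements Closeable {

    protected final ExecutorService executorService;

    /** ExecutorService to run data transfer operations that can be retried on exceptions. */
    protected final ScheduledExecutorService retryExecutorService;

    /** Maximum number of retries; 0 disables retrying. */
    private final int retryTimes;

    /** Fixed delay between two consecutive attempts. */
    private final Duration retryInterval;

    RocksDBStateDataTransfer(int threadNum, int retryTimes, Duration retryInterval) {
        if (threadNum > 1) {
            executorService =
                    Executors.newFixedThreadPool(
                            threadNum, new ExecutorThreadFactory("Flink-RocksDBStateDataTransfer"));
        } else {
            executorService = newDirectExecutorService();
        }

        this.retryTimes = retryTimes;
        this.retryInterval = retryInterval;

        retryExecutorService =
                Executors.newSingleThreadScheduledExecutor(
                        new ExecutorThreadFactory("Flink-RocksDBStateDataTransfer-Retry"));
    }

    @Override
    public void close() {
        executorService.shutdownNow();
        retryExecutorService.shutdownNow();
    }

    /**
     * Wraps a data transfer operation with the configured retry behavior.
     *
     * <p>The operation is passed as a supplier so that every attempt starts a fresh transfer.
     * Passing an already created future would hand the same (possibly failed) future back on
     * every attempt, and nothing would actually be re-executed.
     */
    protected <T> CompletableFuture<T> retry(
            Supplier<CompletableFuture<T>> dataTransferOperation) {
        if (this.retryTimes > 0) {
            return FutureUtils.retryWithDelay(
                    dataTransferOperation,
                    new FixedRetryStrategy(retryTimes, retryInterval),
                    // Retry on any exception.
                    throwable -> true,
                    new ScheduledExecutorServiceAdapter(retryExecutorService));
        } else {
            // No retry will be performed.
            return dataTransferOperation.get();
        }
    }
}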
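The retry behavior described above can be illustrated without any Flink dependency. The sketch below is a simplified, JDK-only approximation of a fixed-delay retry loop; it is not the implementation of FutureUtils.retryWithDelay, and the class FixedDelayRetrySketch and its methods are hypothetical names chosen for this example. It mainly shows why the operation should be supplied as a factory: each attempt has to create a new future, otherwise the failed one is simply observed again.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical JDK-only illustration of retrying an async operation with a fixed delay. */
public class FixedDelayRetrySketch {

    /** Runs the operation and re-invokes the supplier after {@code delay} on each failure. */
    static <T> CompletableFuture<T> retryWithFixedDelay(
            Supplier<CompletableFuture<T>> operation,
            int retries,
            Duration delay,
            ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(operation, retries, delay, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> operation,
            int retriesLeft,
            Duration delay,
            ScheduledExecutorService scheduler,
            CompletableFuture<T> result) {
        CompletableFuture<T> current;
        try {
            // A fresh future per attempt: this is what actually re-runs the transfer.
            current = operation.get();
        } catch (Throwable t) {
            current = new CompletableFuture<>();
            current.completeExceptionally(t);
        }
        current.whenComplete(
                (value, error) -> {
                    if (error == null) {
                        result.complete(value);
                    } else if (retriesLeft > 0) {
                        // Schedule the next attempt after the fixed interval.
                        scheduler.schedule(
                                () -> attempt(operation, retriesLeft - 1, delay, scheduler, result),
                                delay.toMillis(),
                                TimeUnit.MILLISECONDS);
                    } else {
                        // Retries exhausted: surface the last failure.
                        result.completeExceptionally(error);
                    }
                });
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger attempts = new AtomicInteger();

        // Simulated upload that fails twice before succeeding.
        Supplier<CompletableFuture<String>> flakyUpload =
                () ->
                        CompletableFuture.supplyAsync(
                                () -> {
                                    if (attempts.incrementAndGet() < 3) {
                                        throw new IllegalStateException("simulated transient failure");
                                    }
                                    return "uploaded";
                                });

        String outcome =
                retryWithFixedDelay(flakyUpload, 3, Duration.ofMillis(10), scheduler).get();
        System.out.println(outcome + " after " + attempts.get() + " attempts");
        scheduler.shutdownNow();
    }
}
```

In this run the simulated upload succeeds on the third attempt, so two of the three permitted retries are used. With zero retries the first failure would be propagated immediately, which corresponds to the proposed default.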


Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users? None. The retry mechanism is disabled by default, so the existing behavior is unchanged.

Test Plan

Unit tests and integration tests will verify that the retry mechanism works as expected.

Rejected Alternatives

