Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.


Status

Current state: Under Discussion

...

Page properties


...

JIRA

...

qp9y1k0gldxymzmrso0xgsrwh15n6clc
JIRA

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-27878

Release1.16


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
languagejava
titleAsyncRetryStrategy
public interface AsyncRetryStrategy<T>AsyncRetryStrategy<OUT> extends Serializable {

    /** whether the next attempt can happen. */
    boolean canRetry(int currentAttempts);

    /** the delay time of next attempt. */
    long getBackoffTimeMillis(int currentAttempts);

	/** the async retry condition defined by an {@link AsyncRetryPredicate}. */
    AsyncRetryPredicate<T>AsyncRetryPredicate<OUT> getRetryPredicate();
}


Code Block
/** The async retry condition include result {@link Predicate} and exception {@link Predicate}. */
public interface AsyncRetryPredicate<T>AsyncRetryPredicate<OUT> {

    /**
     * An Optional Java {@Predicate} that defines a condition on asyncFunction's future result which
     * will trigger a later reattempt operation, will be called before user's ResultFuture#complete.
     */
    Optional<Predicate<Collection<T>>>Optional<Predicate<Collection<OUT>>> resultPredicate();

    /**
     * An Optional Java {@Predicate} that defines a condition on asyncFunction's exception which
     * will trigger a later reattempt operation, will be called before user's
     * ResultFuture#completeExceptionally.
     */
    Optional<Predicate<Throwable>> exceptionPredicate();
}

...

New method in AsyncDataStream

Code Block
languagejavatitleAsyncRetryStrategy
@PublicEvolving
public interface AsyncRetryStrategy<T> extends Serializable {

/**
     /** whetherAdd thean nextAsyncWaitOperator attemptwith canan happen. */
    boolean canRetry(int currentAttempts);

    /** the delay time of next attempt. */AsyncRetryStrategy to support retry of AsyncFunction. The order to process input records is guaranteed to be the same as
    long getBackoffTimeMillis(int currentAttempts);

 * input ones.
     /**
     * An@param Optionalin JavaInput {@Predicate@link DataStream}
 that defines a condition on* asyncFunction's@param futurefunc result{@link whichAsyncFunction}
     * will trigger a later reattempt operation, @param timeout from first invoke to final completion of asynchronous operation, may include multiple retries, and will be reset calledin beforecase user's ResultFuture#complete.of failover
     */
 @param timeUnit of defaultthe Optional<Predicate<Collection<T>>> resultPredicate() {
        return Optional.empty();
    }

    /**given timeout
	 * @param asyncRetryStrategy {@link AsyncRetryStrategy} for {@link AsyncFunction}.
     * @param <IN> Type of input record
     * An Optional Java {@Predicate} that defines a condition on asyncFunction's exception which @param <OUT> Type of output record
     * @return A new {@link SingleOutputStreamOperator}.
     */
 will trigger a laterpublic reattemptstatic operation<IN, willOUT> be called before user'sSingleOutputStreamOperator<OUT> orderedWaitWithRetry(
     * ResultFuture#completeExceptionally.
     */
 DataStream<IN> in, AsyncFunction<IN, defaultOUT> Optional<Predicate<Throwable>> exceptionPredicate() {
    func, long timeout, TimeUnit timeUnit, AsyncRetryStrategy<OUT> asyncRetryStrategy)


    return Optional.empty();
    }
}

Proposed Changes

The current processing flow chart (processEelment & Watermark):

Image Removed

Tried two implementations:

  1. based on timer trigger (Poc1: https://github.com/lincoln-lil/flink/pull/new/async-retry-timer)
  2. based on DelayQueue with a pull fashion (Poc2: https://github.com/lincoln-lil/flink/pull/new/async-op-retry)

we prefer the timer-based solution, which obtains precise delay control through simple logic and only needs to pay (what we consider to be acceptable) timer instance cost for the retry element. 

Comparation of the two implementations:

...

Solution

...

Pros

...

Cons

...

based on timer

...

1. simple
2. precise delay time

...

1. rely on timer service and more timer instances

...

based on DelayQueue

...

1. no timer instance

...

1. rely on java DelayQueue and Slightly more complex
2. Loss of delay time accuracy
3. The cost of polling the DelayQueue

// The unorderedWaitWithRetry method is symmetrical.


Proposed Changes

The

1. based on timer trigger

The main processing flow after retry added:

...

  1. Add a Set<RetryableResultHandlerDelegator>(delayQueue)  for in-flight retry elements which stay uncompleted in worker queue  (delayQueue  is not a real queue, just for fast scan and finish retry when endInput) .
  2. Register a timer for delayed retry and add resultHandlerDelegator to delayQueue if matches retry predicate and satisfies the AsyncRetryStrategy
  3. When retry time is up,  will increment currentAttempts & fire a retry for asyncFunc; When async future complete, will cleanup last retry timer & queue entry if needed.

...

State snapshot and recovery: no changes

2.  based on DelayQueue with a pull fashion

The main difference between solution 1. is not rely on the timer service and use a java.concurrent.DelayQueue to save toRetry items (Make StreamRecordQueueEntry retryable by implements Delayed Interface).
And check if any delayed retry element expires and do retry (both in processElement and processWatermark, yes the retry is only triggered by new inputs or watermark, this sacrifice some trigger time precision and some queue check cost, but no new thread here and can reduce timer instances)

The operator open-close and state snapshot-recovery phase are almost the same.

How to deal with retry state and recovery?

Currently we choose not to store the retry state, which is the clearest for users and does not require any new changes to the current recovery behavior.

Option

RetryState

Recovery

Pros

Cons

1. w/o retry state

N/A

start from scratch

simple

may result more attempts and longer time for retry elements

2. w/ retry state

currentAttemps
firstExecTime

currentAttemps
new timeout = max{now() - firstExecTime, timeout/maxAttempts}

latency first

less attemps when restart took too long, maybe a regression when failure rate is non-strong time-dependent

3. w/ retry state

currentAttemps
costTime

currentAttemps
new timeout = timeout - costTime

a compromise, compare to 1 & 2

if costTime = lastExecTime - firstExecTime, tend to No.1
if costTime = now() - firstExecTime, tend to No.2

No.2 & No.3 are relatively more complex, and there's a special case (new timeout maybe negative when recovery and need a special process logic) not easy to explain to users clearly.

...

Add more test cases in  AsyncWaitOperatorTest

Rejected Alternatives

We tried two implementations:

  1. based on timer trigger (as described above)
  2. based on DelayQueue with a pull fashion 

and prefer the timer-based solution, which obtains precise delay control through simple logic and only needs to pay (what we consider to be acceptable) timer instance cost for the retry element. 

Comparation of the two implementations:

Solution

Pros

Cons

based on timer

1. simple
2. precise delay time

1. rely on timer service and more timer instances

based on DelayQueue

1. no timer instance

1. rely on java DelayQueue and Slightly more complex
2. Loss of delay time accuracy
3. The cost of polling the DelayQueue

Based on DelayQueue with a pull fashion

The main difference between solution 1. is not rely on the timer service and use a java.concurrent.DelayQueue to save toRetry items (Make StreamRecordQueueEntry retryable by implements Delayed Interface).
And check if any delayed retry element expires and do retry (both in processElement and processWatermark, yes the retry is only triggered by new inputs or watermark, this sacrifice some trigger time precision and some queue check cost, but no new thread here and can reduce timer instances)

The operator open-close and state snapshot-recovery phase are almost the same.

Introduce a memory safe external queue to replace the worker/delay queue

As shown in the processing flow chart above, a delay queue was added to support delayed retry operation. We considered several implementations of the delayed queue and make choice based on two questions:

...