Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
We want to introduce a built-in retry mechanism for the async operator that is as transparent as possible to users' existing code, so that it can flexibly satisfy users' retry and exception-handling needs.
...
Code Block | ||
---|---|---|
| ||
/**
 * Add an AsyncWaitOperator with an AsyncRetryStrategy to support retry of AsyncFunction. The order
 * to process input records is guaranteed to be the same as input ones.
 *
 * @param in Input {@link DataStream}
 * @param func {@link AsyncFunction}
 * @param timeout from first invoke to final completion of asynchronous operation, may include
 *     multiple retries, and will be reset in case of failover
 * @param timeUnit of the given timeout
 * @param asyncRetryStrategy {@link AsyncRetryStrategy} for {@link AsyncFunction}.
 * @param <IN> Type of input record
 * @param <OUT> Type of output record
 * @return A new {@link SingleOutputStreamOperator}.
 */
public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWaitWithRetry(
        DataStream<IN> in,
        AsyncFunction<IN, OUT> func,
        long timeout,
        TimeUnit timeUnit,
        AsyncRetryStrategy<OUT> asyncRetryStrategy)

// The unorderedWaitWithRetry method is symmetrical. |
Proposed Changes
The main processing flow chart (processElement & Watermark) after retry is added:
We tried two implementations:
- based on timer trigger
- based on DelayQueue with a pull fashion
and prefer the timer-based solution, which obtains precise delay control through simple logic and only needs to pay (what we consider to be acceptable) timer instance cost for the retry element.
Comparison of the two implementations:
Solution | Pros | Cons |
based on timer | 1. simple; 2. precise delay time | 1. relies on the timer service and creates more timer instances |
based on DelayQueue | 1. no timer instances | 1. relies on Java DelayQueue and is slightly more complex; 2. loss of delay time accuracy; 3. the cost of polling the DelayQueue |
Main changes for AsyncWaitOperator:
- Add a Set<RetryableResultHandlerDelegator> (delayQueue) for in-flight retry elements that remain uncompleted in the worker queue (delayQueue is not a real queue; it exists only so that pending retries can be scanned quickly and finished at endInput).
- Register a timer for the delayed retry and add the resultHandlerDelegator to delayQueue if the result matches the retry predicate and satisfies the AsyncRetryStrategy.
- When the retry time is up, increment currentAttempts and fire a retry for asyncFunc. When the async future completes, clean up the last retry timer and the queue entry if needed.
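The bookkeeping described in the list above can be sketched in plain Java. This is a minimal sketch under stated assumptions, not the AsyncWaitOperator implementation: a ScheduledExecutorService stands in for Flink's timer service, and the class TimerRetrySketch, its Attempt holder, and the maxAttempts, backoffMillis and resultPredicate parameters are hypothetical names introduced for illustration. It also assumes that every exception is retryable.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical stand-in for the retry bookkeeping; not Flink code. */
final class TimerRetrySketch<OUT> {

    /** One in-flight element together with its attempt counter. */
    private final class Attempt {
        final Supplier<CompletableFuture<OUT>> asyncFunc;
        final CompletableFuture<OUT> result = new CompletableFuture<>();
        final AtomicInteger currentAttempts = new AtomicInteger(1);

        Attempt(Supplier<CompletableFuture<OUT>> asyncFunc) {
            this.asyncFunc = asyncFunc;
        }
    }

    // Daemon timer thread (assumption: replaces the operator's timer service).
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "retry-timer");
                t.setDaemon(true);
                return t;
            });

    // Elements waiting for a delayed retry; a set, not a real queue.
    private final Set<Attempt> delayQueue = ConcurrentHashMap.newKeySet();

    private final int maxAttempts;
    private final long backoffMillis;
    private final Predicate<OUT> resultPredicate;

    TimerRetrySketch(int maxAttempts, long backoffMillis, Predicate<OUT> resultPredicate) {
        this.maxAttempts = maxAttempts;
        this.backoffMillis = backoffMillis;
        this.resultPredicate = resultPredicate;
    }

    /** First invocation of the async function for one input element. */
    CompletableFuture<OUT> submit(Supplier<CompletableFuture<OUT>> asyncFunc) {
        Attempt attempt = new Attempt(asyncFunc);
        invoke(attempt);
        return attempt.result;
    }

    /** Number of elements currently waiting for their retry timer. */
    int pendingRetries() {
        return delayQueue.size();
    }

    private void invoke(Attempt attempt) {
        attempt.asyncFunc.get().whenComplete((value, error) -> {
            boolean matchesPredicate = error != null || resultPredicate.test(value);
            boolean strategyAllows = attempt.currentAttempts.get() < maxAttempts;
            if (matchesPredicate && strategyAllows) {
                // Register a timer for the delayed retry and remember the element.
                delayQueue.add(attempt);
                timer.schedule(() -> {
                    // Retry time is up: drop the queue entry, count the attempt, retry.
                    delayQueue.remove(attempt);
                    attempt.currentAttempts.incrementAndGet();
                    invoke(attempt);
                }, backoffMillis, TimeUnit.MILLISECONDS);
            } else if (error != null) {
                attempt.result.completeExceptionally(error);
            } else {
                attempt.result.complete(value);
            }
        });
    }
}
```

In this sketch the timer is one-shot and has already fired by the time an attempt completes, so only the queue entry needs cleaning up; an implementation on a real timer service would also have to cancel a still-registered timer.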
The close phase:
- Finish all in-flight delayed retry elements before the operator finishes (in endInput), and reject all subsequent retries.
State snapshot and recovery: no changes
How to deal with retry state and recovery?
Currently we choose not to store the retry state, which is the clearest for users and does not require any new changes to the current recovery behavior.
Option | RetryState | Recovery | Pros | Cons |
1. w/o retry state | N/A | start from scratch | simple | may result in more attempts and a longer time for retry elements |
2. w/ retry state | currentAttempts, firstExecTime | currentAttempts, new timeout = max{now() - firstExecTime, timeout/maxAttempts} | latency first | fewer attempts when the restart took too long; may be a regression when the failure rate is not strongly time-dependent |
3. w/ retry state | currentAttempts, costTime | currentAttempts, new timeout = timeout - costTime | a compromise compared to 1 & 2 | if costTime = lastExecTime - firstExecTime, tends to No.1; if costTime = now() - firstExecTime, tends to No.2 |
No.2 & No.3 are relatively more complex, and they introduce a special case that is not easy to explain clearly to users: the new timeout may be negative after recovery and would need special processing logic.
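To make the special case concrete, here is a small sketch of how option 3 would recompute the timeout after recovery, using new timeout = timeout - costTime with costTime = now() - firstExecTime. The class and method names are hypothetical and nothing here is part of the proposal, which stores no retry state; it only illustrates that a long restart can push the result below zero.

```java
/** Hypothetical illustration of option 3; the proposal itself stores no retry state. */
final class RecoveredTimeoutSketch {

    /** new timeout = timeout - costTime, where costTime = now - firstExecTime. */
    static long newTimeoutMillis(long timeoutMillis, long firstExecTimeMillis, long nowMillis) {
        long costTime = nowMillis - firstExecTimeMillis;
        return timeoutMillis - costTime;
    }

    public static void main(String[] args) {
        // Recovery finished 20 s after the first execution: 40 s of a 60 s timeout remain.
        System.out.println(newTimeoutMillis(60_000L, 0L, 20_000L)); // prints 40000
        // Recovery took 90 s: the new timeout is negative and needs special handling.
        System.out.println(newTimeoutMillis(60_000L, 0L, 90_000L)); // prints -30000
    }
}
```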
Changes that users may need to perceive
Note that the new retry feature may require a larger queue capacity. The additional capacity needed can be estimated approximately as follows:
Code Block |
---|
inputRate * retryRate * avgRetryDuration |
For example, for a task with inputRate = 100 records/sec, where 1% of the elements will trigger 1 retry on average, and the average retry time is 60s, the additional queue capacity requirement will be
Code Block |
---|
100 records/sec * 1% * 60s = 60 |
That is, adding 60 more to the work queue capacity may not affect the throughput in unordered output mode. In ordered mode, the head element is the key point: the longer it stays uncompleted, the longer the processing delay of the operator. The retry feature may increase the time the head element stays incomplete if more retries actually occur within the same timeout constraint.
When the queue capacity grows (a common way to ease backpressure), the risk of OOM increases. In fact, for ListState storage the theoretical upper limit is Integer.MAX_VALUE, so the queue capacity has the same limit, but in production the queue capacity cannot be made too large. Increasing the task parallelism may be a more viable approach.
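The capacity estimate above can be written as a small helper. This is only a sketch of the arithmetic; the class RetryCapacityEstimate and its method are hypothetical and not part of Flink, and rounding up to a whole number of queue slots is an assumption.

```java
/** Hypothetical helper, not part of Flink: estimates the extra queue capacity retries may need. */
final class RetryCapacityEstimate {

    /**
     * @param inputRatePerSec records per second entering the operator
     * @param retryRate fraction of records that trigger a retry (0.01 means 1%)
     * @param avgRetryDurationSec average time a retried record stays in the queue, in seconds
     * @return estimated additional queue slots, rounded up (assumption)
     */
    static long additionalCapacity(
            double inputRatePerSec, double retryRate, double avgRetryDurationSec) {
        return (long) Math.ceil(inputRatePerSec * retryRate * avgRetryDurationSec);
    }

    public static void main(String[] args) {
        // The example from the text: 100 records/sec * 1% * 60s.
        System.out.println(additionalCapacity(100, 0.01, 60)); // prints 60
    }
}
```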
Compatibility
This change is backwards compatible: jobs can recover from state generated by prior Flink versions, and if no retry strategy is enabled the behavior is the same as before.
No state changes in AsyncWaitOperator.
Test Plan
Add more test cases in AsyncWaitOperatorTest
Rejected Alternatives
For The Interface
Another choice is combining the two interfaces into one:
Code Block | ||||
---|---|---|---|---|
| ||||
import java.io.Serializable;
import java.util.Collection;
import java.util.Optional;
import java.util.function.Predicate;

import org.apache.flink.annotation.PublicEvolving;

@PublicEvolving
public interface AsyncRetryStrategy<T> extends Serializable {

    /** Whether the next attempt can happen. */
    boolean canRetry(int currentAttempts);

    /** The delay time of the next attempt, in milliseconds. */
    long getBackoffTimeMillis(int currentAttempts);

    /**
     * An optional Java {@link Predicate} that defines a condition on asyncFunction's future result
     * which will trigger a later reattempt operation. It is called before the user's
     * ResultFuture#complete.
     */
    default Optional<Predicate<Collection<T>>> resultPredicate() {
        return Optional.empty();
    }

    /**
     * An optional Java {@link Predicate} that defines a condition on asyncFunction's exception
     * which will trigger a later reattempt operation. It is called before the user's
     * ResultFuture#completeExceptionally.
     */
    default Optional<Predicate<Throwable>> exceptionPredicate() {
        return Optional.empty();
    }
} |
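As an illustration of what implementing the combined interface might look like, here is a minimal sketch. The interface is copied locally (without the annotation) so the example compiles on its own, and FixedDelayRetryOnEmpty is a hypothetical class invented for this example. It assumes that currentAttempts counts the attempts already made, which the interface above does not spell out.

```java
import java.io.Serializable;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;

// Local copy of the combined interface (annotation omitted) so the sketch is self-contained.
interface AsyncRetryStrategy<T> extends Serializable {
    boolean canRetry(int currentAttempts);

    long getBackoffTimeMillis(int currentAttempts);

    default Optional<Predicate<Collection<T>>> resultPredicate() {
        return Optional.empty();
    }

    default Optional<Predicate<Throwable>> exceptionPredicate() {
        return Optional.empty();
    }
}

/** Hypothetical strategy: fixed delay, retry on empty results or timeouts. */
final class FixedDelayRetryOnEmpty<T> implements AsyncRetryStrategy<T> {
    private static final long serialVersionUID = 1L;

    private final int maxAttempts;
    private final long delayMillis;

    FixedDelayRetryOnEmpty(int maxAttempts, long delayMillis) {
        this.maxAttempts = maxAttempts;
        this.delayMillis = delayMillis;
    }

    @Override
    public boolean canRetry(int currentAttempts) {
        // Assumption: currentAttempts is the number of attempts already made.
        return currentAttempts < maxAttempts;
    }

    @Override
    public long getBackoffTimeMillis(int currentAttempts) {
        return delayMillis; // same delay before every attempt
    }

    @Override
    public Optional<Predicate<Collection<T>>> resultPredicate() {
        Predicate<Collection<T>> emptyResult = Collection::isEmpty;
        return Optional.of(emptyResult);
    }

    @Override
    public Optional<Predicate<Throwable>> exceptionPredicate() {
        Predicate<Throwable> isTimeout = t -> t instanceof TimeoutException;
        return Optional.of(isTimeout);
    }
}
```

With a single interface, a strategy like this carries both the retry policy (attempt limit, delay) and the trigger conditions (result and exception predicates) in one place.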
...
Based on DelayQueue with a pull fashion
...