...
- Add a Set<RetryableResultHandlerDelegator> (delayQueue) for in-flight retry elements that remain uncompleted in the worker queue.
- Add 3 attributes (currentAttempts, startTime, nextBackOffTime) to StreamRecordQueueEntry to hold the retry state.
- Register a timer for the delayed retry, and add the resultHandlerDelegator to the delayQueue if the result matches the retry predicate and satisfies the AsyncRetryStrategy.
- When the retry time is up, increment currentAttempts and fire a retry of the asyncFunc; when the async future completes, clean up the previous retry timer and queue entry if needed.
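To make the timer-triggered flow above concrete, here is a minimal single-threaded sketch. It is not Flink code: `TimerRetrySketch`, `RetryEntry`, `Timer`, `process`, `advanceTo`, and the fixed-delay policy with a total-attempts limit are all hypothetical, the "async" call completes synchronously, and processing time is advanced by hand so the behavior is deterministic. It only shows how the three retry attributes, the delayQueue set, and the timer interact.

```java
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical single-threaded model of timer-triggered retries; not Flink's actual classes.
class TimerRetrySketch {

    /** Per-element retry state, mirroring the three attributes listed above. */
    static final class RetryEntry {
        final String input;
        final long startTime;
        int currentAttempts = 1;     // the initial call counts as attempt 1
        long nextBackOffTime = -1L;  // -1 means no retry is scheduled
        String result;               // null until the entry completes

        RetryEntry(String input, long startTime) {
            this.input = input;
            this.startTime = startTime;
        }
    }

    /** A processing-time timer that fires a retry for one entry. */
    static final class Timer {
        final long time;
        final RetryEntry entry;

        Timer(long time, RetryEntry entry) {
            this.time = time;
            this.entry = entry;
        }
    }

    private final int maxAttempts;                     // hypothetical: total calls, including the first
    private final long fixedDelayMs;                   // hypothetical fixed back-off
    private final Predicate<String> retryPredicate;    // e.g. "result is empty"
    private final Function<String, String> asyncFunc;  // stands in for the user's async call

    final Set<RetryEntry> delayQueue = new HashSet<>(); // in-flight retries, still uncompleted
    private final PriorityQueue<Timer> timers =
            new PriorityQueue<>((a, b) -> Long.compare(a.time, b.time));
    private long now = 0L;

    TimerRetrySketch(int maxAttempts, long fixedDelayMs,
                     Predicate<String> retryPredicate, Function<String, String> asyncFunc) {
        this.maxAttempts = maxAttempts;
        this.fixedDelayMs = fixedDelayMs;
        this.retryPredicate = retryPredicate;
        this.asyncFunc = asyncFunc;
    }

    /** First attempt for a new element. */
    RetryEntry process(String input) {
        RetryEntry entry = new RetryEntry(input, now);
        onComplete(entry, asyncFunc.apply(input));
        return entry;
    }

    /** Called when the async future completes (synchronously here, for simplicity). */
    private void onComplete(RetryEntry entry, String result) {
        delayQueue.remove(entry);  // clean up the previous retry registration, if any
        if (retryPredicate.test(result) && entry.currentAttempts < maxAttempts) {
            entry.nextBackOffTime = now + fixedDelayMs;
            delayQueue.add(entry);
            timers.add(new Timer(entry.nextBackOffTime, entry));
        } else {
            entry.nextBackOffTime = -1L;
            entry.result = result;  // final result, emitted as usual
        }
    }

    /** Advances processing time and fires every retry timer that is due. */
    void advanceTo(long time) {
        while (!timers.isEmpty() && timers.peek().time <= time) {
            Timer timer = timers.poll();
            now = timer.time;
            timer.entry.currentAttempts++;
            onComplete(timer.entry, asyncFunc.apply(timer.entry.input));
        }
        now = time;
    }
}
```

In this model an element stays in `delayQueue` only while it waits for its back-off to elapse, and once the attempt limit is reached the last result is emitted as-is.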
...
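State snapshot and recovery: no changes.
An earlier revision of this design, dropped in favor of not storing the retry state as discussed below, proposed:
- When snapshotting, write all queue entries with their current retry status (currentAttempts, startTime, nextBackOffTime) to the state.
- When recovering from state, restore the retry status and fire a new attempt if necessary.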
2. Based on a DelayQueue, in a pull fashion
...
The operator open/close and state snapshot/recovery phases are almost the same.
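A rough sketch of the pull-based alternative using the JDK's `java.util.concurrent.DelayQueue`, whose `poll()` returns only elements whose `getDelay` has reached zero or less. The class `DelayQueueRetrySketch`, its methods, and the injected manual clock are hypothetical and chosen so the example is deterministic; how and how often the operator would pull due retries is an assumption, not part of the design text.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of pull-based retries on top of java.util.concurrent.DelayQueue.
class DelayQueueRetrySketch {

    /** A pending retry that becomes available once its trigger time is reached. */
    static final class PendingRetry implements Delayed {
        final String input;
        final int attempt;
        final long triggerTimeMs;
        private final AtomicLong clockMs;  // injected clock, so the example is deterministic

        PendingRetry(String input, int attempt, long triggerTimeMs, AtomicLong clockMs) {
            this.input = input;
            this.attempt = attempt;
            this.triggerTimeMs = triggerTimeMs;
            this.clockMs = clockMs;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining delay; DelayQueue only hands out elements whose delay is <= 0.
            return unit.convert(triggerTimeMs - clockMs.get(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    final AtomicLong clockMs = new AtomicLong();
    private final DelayQueue<PendingRetry> delayQueue = new DelayQueue<>();

    /** Instead of registering a timer, park the element in the DelayQueue. */
    void scheduleRetry(String input, int attempt, long backOffMs) {
        delayQueue.add(new PendingRetry(input, attempt, clockMs.get() + backOffMs, clockMs));
    }

    /** Pulled periodically by the operator: drains every retry whose back-off has elapsed. */
    List<PendingRetry> pullDueRetries() {
        List<PendingRetry> due = new ArrayList<>();
        PendingRetry next;
        while ((next = delayQueue.poll()) != null) {  // non-blocking; null if nothing is due yet
            due.add(next);
        }
        return due;
    }

    int pendingCount() {
        return delayQueue.size();
    }
}
```

The trade-off versus the timer approach is that nothing fires on its own: retries are only picked up when something calls `pullDueRetries()`, so the pull frequency bounds how late a retry can start.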
How should retry state and recovery be handled?
Currently, we choose not to store the retry state (option 1 below), which is the clearest for users and requires no changes to the current recovery behavior.
| Option | RetryState | Recovery | Pros | Cons |
|---|---|---|---|---|
| 1. w/o retry state | N/A | start from scratch | simple | may result in more attempts and a longer retry time for retried elements |
| 2. w/ retry state | currentAttempts | currentAttempts | latency first | fewer attempts when the restart takes too long; may be a regression when the failure rate is not strongly time-dependent |
| 3. w/ retry state | currentAttempts | currentAttempts | a compromise between 1 and 2 | if costTime = lastExecTime - firstExecTime, it tends toward option 1 |
Options 2 and 3 are relatively more complex, and they have a special case (the new timeout may be negative on recovery and needs special handling logic) that is not easy to explain clearly to users.
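To illustrate the negative-timeout special case, the sketch below contrasts option 1 with a restore-with-state variant. The formula is an assumption, not taken from the design: it supposes the restored timeout is the configured timeout minus a stored `costTime`, so it goes negative whenever `costTime` exceeds the timeout. `RetryRecoverySketch`, `ResumePlan`, and both factory methods are hypothetical names.

```java
// Hypothetical comparison of what a restored in-flight element would resume with after a restart.
class RetryRecoverySketch {

    /** Attempt counter and timeout an element resumes with after recovery. */
    static final class ResumePlan {
        final int currentAttempts;
        final long timeoutMs;
        final boolean timeOutImmediately;  // the special case: no time budget left

        ResumePlan(int currentAttempts, long timeoutMs, boolean timeOutImmediately) {
            this.currentAttempts = currentAttempts;
            this.timeoutMs = timeoutMs;
            this.timeOutImmediately = timeOutImmediately;
        }
    }

    /** Option 1: no retry state is stored, so the element starts from scratch. */
    static ResumePlan withoutRetryState(long configuredTimeoutMs) {
        return new ResumePlan(1, configuredTimeoutMs, false);
    }

    /**
     * Restore-with-state variant (assumed formula): restore the attempt count and charge the
     * stored costTime against the timeout. The difference is negative when costTime exceeds it.
     */
    static ResumePlan withRetryState(long configuredTimeoutMs, int storedAttempts, long costTimeMs) {
        long newTimeoutMs = configuredTimeoutMs - costTimeMs;
        if (newTimeoutMs <= 0) {
            // Extra logic only needed when retry state is stored.
            return new ResumePlan(storedAttempts, 0L, true);
        }
        return new ResumePlan(storedAttempts, newTimeoutMs, false);
    }
}
```

Option 1 never reaches the `newTimeoutMs <= 0` branch, which is one way to see why it needs no extra recovery logic.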
Changes that users may need to be aware of
...
This change is backward compatible: jobs can recover from state generated by prior Flink versions, and if no retry strategy is enabled, the behavior is the same as before.
No state changes in AsyncWaitOperator. (An earlier revision proposed adding a new list state with the async attempt status to AsyncWaitOperator to restore retries after recovery; this was not adopted.)
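One way to see why a job without a retry strategy behaves as before: if the default strategy never allows a retry, every element completes after exactly one call. The interface below is a hypothetical, simplified stand-in for the AsyncRetryStrategy idea. The names `RetryStrategySketch`, `NO_RETRY`, `fixedDelay`, and `callsMade` are illustrative, and so is the convention that `maxAttempts` counts the first call.

```java
// Hypothetical strategy interface, loosely modeled on the AsyncRetryStrategy idea; not Flink's API.
interface RetryStrategySketch {

    /** Whether another attempt may be fired after `currentAttempts` calls. */
    boolean canRetry(int currentAttempts);

    /** Back-off before the next attempt, in milliseconds. */
    long backOffMs(int currentAttempts);

    /** Default: never retry, so each element completes after its first call, exactly as before. */
    RetryStrategySketch NO_RETRY = new RetryStrategySketch() {
        @Override public boolean canRetry(int currentAttempts) { return false; }
        @Override public long backOffMs(int currentAttempts) { return -1L; }
    };

    /** Fixed delay; maxAttempts counts the first call (an assumed convention). */
    static RetryStrategySketch fixedDelay(int maxAttempts, long delayMs) {
        return new RetryStrategySketch() {
            @Override public boolean canRetry(int currentAttempts) { return currentAttempts < maxAttempts; }
            @Override public long backOffMs(int currentAttempts) { return delayMs; }
        };
    }

    /** Number of calls made for an element whose first `failures` results match the retry predicate. */
    static int callsMade(RetryStrategySketch strategy, int failures) {
        int attempts = 1;
        while (attempts <= failures && strategy.canRetry(attempts)) {
            attempts++;
        }
        return attempts;
    }
}
```

With `NO_RETRY`, `callsMade` is always 1 regardless of how many results would have matched the retry predicate.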
Test Plan
Add more test cases in AsyncWaitOperatorTest
...