Versions Compared

...

  1. Add a Set<RetryableResultHandlerDelegator> (delayQueue) for in-flight retry elements that remain uncompleted in the worker queue.
  2. Add 3 attributes (currentAttempts, startTime, nextBackOffTime) to StreamRecordQueueEntry to hold the retry state.
  3. Register a timer for the delayed retry, and add the resultHandlerDelegator to the delayQueue, if the result matches the retry predicate and satisfies the AsyncRetryStrategy.
  4. When the retry time is up, increment currentAttempts and fire a retry of asyncFunc; when the async future completes, clean up the last retry timer and the queue entry if needed.
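The four timer-based steps above can be sketched in plain Java. This is a minimal, self-contained sketch under stated assumptions, not the Flink implementation: a `ScheduledExecutorService` stands in for the operator's timer service, `RetryEntry` and `TimerBasedRetry` are hypothetical names (RetryEntry only mimics a StreamRecordQueueEntry carrying the three retry attributes), and a fixed delay with a `maxAttempts` cap is assumed as the AsyncRetryStrategy.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical queue entry carrying the retry state (not Flink's StreamRecordQueueEntry). */
final class RetryEntry<IN, OUT> {
    final IN input;
    final long startTime = System.currentTimeMillis();
    volatile int currentAttempts = 1;        // the initial call counts as attempt 1
    volatile long nextBackOffTime;           // absolute time of the pending retry
    volatile ScheduledFuture<?> retryTimer;  // last registered retry timer, if any
    final CompletableFuture<OUT> result = new CompletableFuture<>();

    RetryEntry(IN input) {
        this.input = input;
    }
}

/** Timer-based retry: a failed attempt is re-fired by a delayed timer (a sketch). */
final class TimerBasedRetry<IN, OUT> {
    private final Function<IN, CompletableFuture<OUT>> asyncFunc;
    private final Predicate<Throwable> retryPredicate; // which failures may be retried
    private final int maxAttempts;                     // assumed fixed-delay strategy
    private final long fixedDelayMs;
    private final ScheduledExecutorService timers;     // stand-in for the timer service
    /** In-flight entries waiting for their retry timer (the "delayQueue" set). */
    final Set<RetryEntry<IN, OUT>> delayQueue = ConcurrentHashMap.newKeySet();

    TimerBasedRetry(Function<IN, CompletableFuture<OUT>> asyncFunc,
                    Predicate<Throwable> retryPredicate, int maxAttempts,
                    long fixedDelayMs, ScheduledExecutorService timers) {
        this.asyncFunc = asyncFunc;
        this.retryPredicate = retryPredicate;
        this.maxAttempts = maxAttempts;
        this.fixedDelayMs = fixedDelayMs;
        this.timers = timers;
    }

    CompletableFuture<OUT> process(IN input) {
        RetryEntry<IN, OUT> entry = new RetryEntry<>(input);
        // Step 4 cleanup: however the entry completes, cancel its timer and drop it from the set.
        entry.result.whenComplete((v, e) -> {
            ScheduledFuture<?> timer = entry.retryTimer;
            if (timer != null) {
                timer.cancel(false);
            }
            delayQueue.remove(entry);
        });
        invoke(entry);
        return entry.result;
    }

    private void invoke(RetryEntry<IN, OUT> entry) {
        asyncFunc.apply(entry.input).whenComplete((value, error) -> {
            if (error == null) {
                entry.result.complete(value);
                return;
            }
            Throwable cause = error instanceof CompletionException && error.getCause() != null
                    ? error.getCause() : error;
            if (retryPredicate.test(cause) && entry.currentAttempts < maxAttempts) {
                // Step 3: park the entry first, then register a timer for the delayed retry.
                entry.nextBackOffTime = System.currentTimeMillis() + fixedDelayMs;
                delayQueue.add(entry);
                entry.retryTimer = timers.schedule(() -> {
                    // Step 4: the retry time is up, so count the attempt and fire again.
                    delayQueue.remove(entry);
                    entry.currentAttempts++;
                    invoke(entry);
                }, fixedDelayMs, TimeUnit.MILLISECONDS);
            } else {
                entry.result.completeExceptionally(cause);
            }
        });
    }
}
```

Inside a real operator these callbacks would be serialized on the task thread; the sketch uses `volatile` fields and a concurrent set only because its timer runs on a separate executor thread.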

...

State snapshot and recovery:

  1. On snapshot, write every queue entry together with its current retry status (currentAttempts, startTime, nextBackOffTime) to the state.
  2. On recovery from state, restore the retry status and fire a new attempt if necessary.
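The recovery step above can be sketched as follows, assuming the snapshot side writes one status record per queue entry. `RetryStatus`, `RestoredAttempt` and `RetryStateRecovery` are hypothetical names, and the rule that an already-elapsed back-off (or an entry with no pending retry, marked here by `nextBackOffTime = 0`) fires immediately is an assumption about what "if necessary" means.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical per-entry retry status written to state on snapshot. */
final class RetryStatus<IN> {
    final IN input;
    final int currentAttempts;
    final long startTime;
    final long nextBackOffTime; // absolute time of the pending retry, or 0 if none

    RetryStatus(IN input, int currentAttempts, long startTime, long nextBackOffTime) {
        this.input = input;
        this.currentAttempts = currentAttempts;
        this.startTime = startTime;
        this.nextBackOffTime = nextBackOffTime;
    }
}

/** Hypothetical description of the attempt to fire for one restored entry. */
final class RestoredAttempt<IN> {
    final IN input;
    final int currentAttempts; // carried over, so the attempt budget is not reset
    final long delayMs;        // wait this long before firing the next attempt

    RestoredAttempt(IN input, int currentAttempts, long delayMs) {
        this.input = input;
        this.currentAttempts = currentAttempts;
        this.delayMs = delayMs;
    }
}

final class RetryStateRecovery {
    /** Rebuilds one pending attempt per snapshotted entry, relative to the restore time. */
    static <IN> List<RestoredAttempt<IN>> restore(List<RetryStatus<IN>> state, long now) {
        List<RestoredAttempt<IN>> attempts = new ArrayList<>();
        for (RetryStatus<IN> s : state) {
            // A back-off that already elapsed during the restart (or none at all) fires at once.
            long delayMs = Math.max(0L, s.nextBackOffTime - now);
            attempts.add(new RestoredAttempt<>(s.input, s.currentAttempts, delayMs));
        }
        return attempts;
    }
}
```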

no changes

2. Based on a DelayQueue, in a pull fashion
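A pull-based variant can be sketched with `java.util.concurrent.DelayQueue`: failed attempts are parked as `Delayed` elements, and the operator thread periodically pulls only those whose back-off has elapsed. `DelayedRetry` and `PullBasedRetry` are hypothetical names, and who drives `pollDue()` (for example a periodic check on the task thread) is left open as an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** Hypothetical retry element that becomes available once its back-off has elapsed. */
final class DelayedRetry<IN> implements Delayed {
    final IN input;
    final int currentAttempts;
    private final long deadlineNanos; // System.nanoTime()-based next back-off time

    DelayedRetry(IN input, int currentAttempts, long backOffMs) {
        this.input = input;
        this.currentAttempts = currentAttempts;
        this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(backOffMs);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

/** Pull-style retry: no timer per element; the caller drains due elements (a sketch). */
final class PullBasedRetry<IN> {
    final DelayQueue<DelayedRetry<IN>> delayQueue = new DelayQueue<>();

    /** Called when an attempt fails but the retry strategy allows another one. */
    void scheduleRetry(IN input, int currentAttempts, long backOffMs) {
        delayQueue.put(new DelayedRetry<>(input, currentAttempts, backOffMs));
    }

    /** Non-blocking pull: returns every element whose back-off has elapsed. */
    List<DelayedRetry<IN>> pollDue() {
        List<DelayedRetry<IN>> due = new ArrayList<>();
        DelayedRetry<IN> next;
        while ((next = delayQueue.poll()) != null) { // poll() returns only an expired head
            due.add(next);
        }
        return due;
    }
}
```

The caller would then increment `currentAttempts` and re-invoke asyncFunc for each returned element. Compared with the timer-based approach, this trades one registered timer per element for a polling step that someone has to schedule.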

...

The operator open/close and state snapshot/recovery phases are almost the same.

How to deal with retry state and recovery?

Currently we choose not to store the retry state: this is the clearest option for users and requires no changes to the current recovery behavior.


| Option | Retry state | Recovery | Pros | Cons |
| --- | --- | --- | --- | --- |
| 1. w/o retry state | N/A | Start from scratch | Simple | May result in more attempts and a longer total retry time for elements that were being retried |
| 2. w/ retry state | currentAttempts, firstExecTime | Restore currentAttempts; new timeout = max{now() - firstExecTime, timeout/maxAttempts} | Latency first | Fewer attempts when the restart took too long; may be a regression when the failure rate is only weakly time-dependent |
| 3. w/ retry state | currentAttempts, costTime | Restore currentAttempts; new timeout = timeout - costTime | A compromise between No.1 and No.2 | If costTime = lastExecTime - firstExecTime, it tends toward No.1; if costTime = now() - firstExecTime, it tends toward No.2 |

No.2 and No.3 are relatively more complex, and they have a special case (the new timeout may be negative on recovery and needs special handling logic) that is not easy to explain clearly to users.
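To make the trade-off in the table concrete, the sketch below evaluates option 1 and both costTime variants of option 3 for one hypothetical element: timeout 10 s, first attempt at t = 0, last attempt at t = 4 s, and recovery finishing at t = 15 s. Excluding the restart downtime gives 10 - 4 = 6 s (close to No.1); charging it gives 10 - 15 = -5 s, which is exactly the negative special case. `RecoveryTimeouts` is a hypothetical helper, and treating a non-positive timeout as "expire on recovery" is an assumption, since the text only says this case needs special logic.

```java
/** Hypothetical helpers contrasting the recovery options in the table (times in ms). */
final class RecoveryTimeouts {
    /** Option 1: no retry state, so the element restarts with the full timeout. */
    static long option1(long timeoutMs) {
        return timeoutMs;
    }

    /** Option 3 with costTime = lastExecTime - firstExecTime (restart downtime not charged). */
    static long option3ExcludingDowntime(long timeoutMs, long firstExecTime, long lastExecTime) {
        return timeoutMs - (lastExecTime - firstExecTime);
    }

    /** Option 3 with costTime = now() - firstExecTime (restart downtime charged). */
    static long option3IncludingDowntime(long timeoutMs, long firstExecTime, long now) {
        return timeoutMs - (now - firstExecTime);
    }

    /** Assumed special handling: a non-positive new timeout expires the element on recovery. */
    static boolean expiresOnRecovery(long newTimeoutMs) {
        return newTimeoutMs <= 0;
    }
}
```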

Changes that users may need to be aware of

...

This change is backward compatible: jobs can recover from state generated by prior Flink versions, and if no retry strategy is enabled, the behavior is the same as before.
A new list state with async attempt status was added in AsyncWaitOperator to restore the retry after recovery.
No state changes in AsyncWaitOperator.

Test Plan

Add more test cases in AsyncWaitOperatorTest.

...