You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 7 Next »

Status

Current state: Under Discussion

Discussion thread: https://lists.apache.org/thread/pgm3bf8vd5vqchlm29n6cro0gz4pbd3g

JIRA

Released: 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

We want to introduce a built-in retry mechanism for async operator which being as transparently as possible to the user's existing code, so as to flexibly satisfy the retry and exception handling needs for users.

Currently, Flink provides the Async I/O API to facilitate users to handle time-consuming I/O operations at lower cost and asynchronously to improve the throughput and response time of the program, but limited on retry support: the user interface AsyncFunction only provides an one-time callback handle, there is no good way to fire a retry based on asynchronous results(e.g. empty results or errors that asyncClient cannot handle, etc.).

Consider such an use case: user program does access external data via Async I/O, some data may not be updated in external systems in a timely manner(offen happens due to the lack of a global coordinator),  and the user want do delayed retry when encounter such unexpected 'missed' data, but can't elegantly implement through the current AsyncFunction (asyncInvoke and callback functions are executed synchronously by the main thread, which is not suitable adding long time blocking operations, and introducing additional thread will bring extra complexity for users).

Another common user feedback we've observed is about the async TimeoutException("Async function call has timed out.") when encountering burst high workload or node failure in the external system, users can only turn up the async client's internal retry times for higher fault tolerance, and make sure the total timeout parameter is lower than async function's timeout param so as to avoid job failover whenever possible. When the user wants to do retry based on specific exception, it is not easy to do so.

Public Interfaces

Use Case Via Current Async I/O API Without Retry

An example HBase AsyncFunction:

HBaseAsyncFunc
public class HBaseAsyncFunc implements AsyncFunction<String, String> {

  public void asyncInvoke(String row, ResultFuture<String> result) throws Exception {
    HBaseCallback cb = new HBaseCallback(result);
    Get get = new Get(Bytes.toBytes(row));
    hbase.asyncGet(get, cb);
  }
}

Create an async operation:

 DataStream<String> input = ...
 AsyncFunction<String, Tuple<String, String>> asyncFunc = new HBaseAsyncFunc...
 AsyncDataStream.orderedWait(input, asyncFunc, timeout, TimeUnit.MILLISECONDS, 100);

New API With Retry Support

The Same Use Case With Retry

Reuse the same user implemented HBaseAsyncFunc

DataStream<String> input = ...
// User's  HBaseAsyncFunc doesn't change.
AsyncFunction<String, Tuple<String, String>> asyncFunc = new HBaseAsyncFunc...
 
// Create an async retry strategy via utility class or a user defined strategy.
AsyncRetryStrategy asyncRetryStrategy =
                new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3, fixedDelay=100ms
                        .retryIfResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
  						.retryIfException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
                        .build();

// ordered output mode
AsyncDataStream.orderedWaitWithRetry(input, asyncFunc, timeout, TimeUnit.MILLISECONDS, 100, asyncRetryStrategy);
// or use unordered output mode


Note: timeout here is the total timeout, including all retries. Why not introduce another totalTimeout?  The answer is that we don't want to confuse the user with two timeouts.
While two timeouts seem to be able to control the timeout for each retry so as to prevent one execution from getting stuck and causing the entire execution to wait until it eventually times out, in fact in most cases when a  stuck occurs, the retry result is still the same(we admit this is not always true), and we think this choice is no worse than it is now while keeping the api simple for the users.

New Interface

The AsyncRetryStrategy and related interface AsyncRetryPredicate to define the trigger condition for retry and when should retry:

AsyncRetryStrategy
public interface AsyncRetryStrategy<T> extends Serializable {

    /** whether the next attempt can happen. */
    boolean canRetry(int currentAttempts);

    /** the delay time of next attempt. */
    long getBackoffTimeMillis();

	/** the async retry condition defined by an {@link AsyncRetryPredicate}. */
    AsyncRetryPredicate<T> getRetryPredicate();
}


/** The async retry condition include result {@link Predicate} and exception {@link Predicate}. */
public interface AsyncRetryPredicate<T> {

    /**
     * An Optional Java {@Predicate} that defines a condition on asyncFunction's future result which
     * will trigger a later reattempt operation, will be called before user's ResultFuture#complete.
     */
    Optional<Predicate<Collection<T>>> resultPredicate();

    /**
     * An Optional Java {@Predicate} that defines a condition on asyncFunction's exception which
     * will trigger a later reattempt operation, will be called before user's
     * ResultFuture#completeExceptionally.
     */
    Optional<Predicate<Throwable>> exceptionPredicate();
}


Another choice is combining the two into one:

AsyncRetryStrategy
@PublicEvolving
public interface AsyncRetryStrategy<T> extends Serializable {

    /** whether the next attempt can happen. */
    boolean canRetry(int currentAttempts);

    /** the delay time of next attempt. */
    long getBackoffTimeMillis();

    /**
     * An Optional Java {@Predicate} that defines a condition on asyncFunction's future result which
     * will trigger a later reattempt operation, will be called before user's ResultFuture#complete.
     */
    default Optional<Predicate<Collection<T>>> resultPredicate() {
        return Optional.empty();
    }

    /**
     * An Optional Java {@Predicate} that defines a condition on asyncFunction's exception which
     * will trigger a later reattempt operation, will be called before user's
     * ResultFuture#completeExceptionally.
     */
    default Optional<Predicate<Throwable>> exceptionPredicate() {
        return Optional.empty();
    }
}


Proposed Changes

The current processing flow chart (processEelment & Watermark):

Tried two implementations:

  1. based on timer trigger (Poc1: https://github.com/lincoln-lil/flink/pull/new/async-retry-timer)
  2. based on DelayQueue with a pull fashion (Poc2: https://github.com/lincoln-lil/flink/pull/new/async-op-retry)


we prefer the timer-based solution, which obtains precise delay control through simple logic and only needs to pay (what we consider to be acceptable) timer instance cost for the retry element. 

Comparation of the two implementations:

Solution

Pros

Cons

based on timer

1. simple
2. precise delay time

1. rely on timer service and more timer instances

based on DelayQueue

1. no timer instance

1. rely on java DelayQueue and Slightly more complex
2. Loss of delay time accuracy
3. The cost of polling the DelayQueue

1. based on timer trigger

The main processing flow after retry added:


Main changes for AsyncWaitOperator:

  1. Add a Set<RetryableResultHandlerDelegator>(delayQueue)  for in-flight retry elements which stay uncompleted in worker queue.
  2. Add 3 attributes (currentAttempts, startTime, nextBackOffTime) to StreamRecordQueueEntry for retry state.
  3. Register a timer for delayed retry and add resultHandlerDelegator to delayQueue if matches retry predicate and satisfies the AsyncRetryStrategy
  4. When retry time is up,  will fire a retry for asyncFunc; When async future complete, will cleanup last retry timer & queue entry if needed.


The close phase:

  • Finish all in-flight delayed retry elements before operator finish (in endInput), and deny all following retries.


State snapshot and recovery:

  1. When snapshot, write all queue entry with current retry status (currentAttempts, startTime, nextBackOffTime) to the state
  2. When recover from state, restore retry status and fire a new process if necessary

2.  based on DelayQueue with a pull fashion

The main difference between solution 1. is not rely on the timer service and use a java.concurrent.DelayQueue to save toRetry items (Make StreamRecordQueueEntry retryable by implements Delayed Interface).
And check if any delayed retry element expires and do retry (both in processElement and processWatermark, yes the retry is only triggered by new inputs or watermark, this sacrifice some trigger time precision and some queue check cost, but no new thread here and can reduce timer instances)

The operator open-close and state snapshot-recovery phase are almost the same.

Changes that users may need to perceive

Note, the new retry feature may result in larger queue capacity requirements, the maximum number can be approximately evaluated as below:

inputRate * retryRate * avgRetryDuration

For example, for a task with inputRate = 100 records/sec, where 1% of the elements will trigger 1 retry on average, and the average retry time is 60s, the additional queue capacity requirement will be

100 records/sec * 1% * 60s = 60

That is, adding more 60 capacity to the work queue may not affect the throughput in unordered output mode , in case of ordered mode, the head element is the key point, and the longer it stays uncompleted, the longer the processing delay provided by the operator, the retry feature may increase the incomplete time of the head element, if in fact more retries are obtained with the same timeout constraint.

When the queue capacity grows(common way to ease the backpressure), the risk of OOM increases. Though in fact, for ListState storage, the theoretical upper limit is Integer.MAX_VALUE, so the queue capacity's limit is the same, but we can't increase the queue capacity too big in production, increase the task parallelism maybe a more viable way.

Compatibility

This change is backwards compatible that jobs can recover from state which was generated by prior flink versions, and if no retry strategy enabled the behavior is as before.
A new list state with async attempt status was added in AsyncWaitOperator to restore the retry after recovery.

Test Plan

Add more test cases in  AsyncWaitOperatorTest

Rejected Alternatives

Introduce a memory safe external queue to replace the worker/delay queue

As shown in the processing flow chart above, a delay queue was added to support delayed retry operation. We considered several implementations of the delayed queue and make choice based on two questions:

1. should the delayed queue store separate elements, as opposed to the worker queue?

2. should the delayed queue support a huge capacity (e.g., a memory safe external persistent queue, no oom-risk)?

For question 1:

If the element to be retried is removed from the worker queue and put into a separate delay queue, this becomes complicated in ordered output mode, requiring additional maintenance of the original order of the inputs, and in the case of multiple retries, the same element is removed from the queue and put in multiple times. When do snapshot state, the worker queue and delay queue should be sync to the state together (and with the original input order in ordered mode). All the complexity seems to be of no further benefit than the logically separated queues themselves.

For question 2:

As illustrate above, the additional queue capacity requirement may grows large

inputRate * retryRate * avgRetryDuration

for high inputRate and/or high retryRate and/or average retry duration, so it may exist oom-risk when the delayed retry queue must be set to very large number, however, this can be mitigated by increasing task parallelism.
In contrast, implementing a detached delay queue via an external persistence system (e.g., Redission's RDelayedQueue, Rabbitmq and Kafka ...) does not have such oom-risk, but will bring extra dependency and more cost.





  • No labels