This page is meant as a template for writing a FLIP. To create a FLIP choose Tools->Copy on this page and modify with your content and replace the heading with the next FLIP number and a description of your issue. Replace anything in italics with your own description.

Status

Current state: Under Discussion

Discussion thread: https://mail-archives.apache.org/mod_mbox/flink-dev/

JIRA

Released: 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

We want to introduce a built-in retry mechanism for the async operator that is as transparent as possible to users' existing code, so that it can flexibly meet users' retry and exception-handling needs.

Currently, Flink provides the Async I/O API so that users can handle time-consuming I/O operations asynchronously and at lower cost, improving the throughput and response time of the program. Its retry support is limited, however: the user interface AsyncFunction provides only a one-time callback handle, and there is no good way to trigger a retry based on the asynchronous result (e.g. empty results, or errors that the async client cannot handle).

Consider the following use case: a user program accesses external data via Async I/O, and some data may not yet have been updated in the external system (this often happens because there is no global coordinator). The user wants to retry after a delay when such unexpectedly 'missing' data is encountered, but this cannot be implemented elegantly with the current AsyncFunction: asyncInvoke and the callback functions are executed synchronously by the main thread, which makes them unsuitable for long blocking operations, and introducing additional threads adds complexity for users.
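To make the extra complexity concrete, the following is a minimal sketch of a hand-rolled delayed retry, written against plain `java.util.concurrent` rather than Flink so that it stays self-contained. The names `DelayedRetry`, `lookupWithRetry`, and the `lookup` function are hypothetical and only stand in for a user's async client call; the sketch retries on empty results only and does not retry on errors.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical helper: re-issues an async lookup after a delay while the result is empty. */
final class DelayedRetry {

    private DelayedRetry() {}

    /** Returns a future that completes with the first non-empty result, or with the last result once attempts run out. */
    static <K, V> CompletableFuture<Optional<V>> lookupWithRetry(
            Function<K, CompletableFuture<Optional<V>>> lookup,
            K key,
            int maxAttempts,
            long delayMillis,
            ScheduledExecutorService timer) {
        CompletableFuture<Optional<V>> out = new CompletableFuture<>();
        attempt(lookup, key, 1, maxAttempts, delayMillis, timer, out);
        return out;
    }

    private static <K, V> void attempt(
            Function<K, CompletableFuture<Optional<V>>> lookup,
            K key,
            int attemptNo,
            int maxAttempts,
            long delayMillis,
            ScheduledExecutorService timer,
            CompletableFuture<Optional<V>> out) {
        lookup.apply(key).whenComplete((value, error) -> {
            if (error != null) {
                // Errors are passed through unchanged in this sketch.
                out.completeExceptionally(error);
            } else if (value.isPresent() || attemptNo >= maxAttempts) {
                out.complete(value);
            } else {
                // Schedule the next attempt on the timer thread instead of blocking the caller.
                timer.schedule(
                        () -> attempt(lookup, key, attemptNo + 1, maxAttempts, delayMillis, timer, out),
                        delayMillis,
                        TimeUnit.MILLISECONDS);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        // Stand-in for an external system in which the row only becomes visible on the third read.
        Function<String, CompletableFuture<Optional<String>>> lookup = key ->
                CompletableFuture.completedFuture(
                        calls.incrementAndGet() < 3
                                ? Optional.<String>empty()
                                : Optional.of("value-of-" + key));
        Optional<String> result =
                lookupWithRetry(lookup, "row1", 5, 50L, timer).get(5, TimeUnit.SECONDS);
        System.out.println(result + " after " + calls.get() + " attempts");
        timer.shutdownNow();
    }
}
```

Even this small version requires the user to own a timer thread, track attempt counts, and shut the executor down, which is the kind of burden a built-in mechanism would remove.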

Another common piece of user feedback concerns the async TimeoutException("Async function call has timed out.") raised under bursts of high workload or node failures in the external system. Today, users can only increase the async client's internal retry count for higher fault tolerance, and must make sure that the client's total timeout is lower than the async function's timeout so as to avoid job failover whenever possible. When the user wants to retry on a specific exception, this is not easy to do.
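The timeout constraint above can be checked with simple arithmetic. The sketch below assumes a simplified client model that is not taken from any particular client library: a fixed per-attempt timeout and a fixed backoff between attempts. `TimeoutBudget` and its methods are hypothetical names used only for illustration.

```java
import java.time.Duration;

/** Hypothetical helper: checks a client's worst-case retry time against the async function's timeout. */
final class TimeoutBudget {

    private TimeoutBudget() {}

    /** Worst case under the assumed model: every attempt times out, with a fixed backoff between attempts. */
    static Duration worstCaseClientTime(int maxAttempts, Duration perAttemptTimeout, Duration backoff) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        return perAttemptTimeout.multipliedBy(maxAttempts)
                .plus(backoff.multipliedBy(maxAttempts - 1));
    }

    /** True if the client gives up strictly before the async function's timeout would fire. */
    static boolean fitsWithin(
            Duration asyncTimeout, int maxAttempts, Duration perAttemptTimeout, Duration backoff) {
        return worstCaseClientTime(maxAttempts, perAttemptTimeout, backoff).compareTo(asyncTimeout) < 0;
    }

    public static void main(String[] args) {
        Duration perAttempt = Duration.ofSeconds(2);
        Duration backoff = Duration.ofMillis(500);
        // 3 attempts of 2 s each plus 2 backoffs of 0.5 s each: 7 s in the worst case.
        System.out.println(worstCaseClientTime(3, perAttempt, backoff));
        System.out.println(fitsWithin(Duration.ofSeconds(10), 3, perAttempt, backoff));
    }
}
```

Under this model, raising the client's retry count also raises the async timeout that the job needs, so the two parameters have to be tuned together.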

Public Interfaces

Use Case via the Current Async I/O API Without Retry

An example HBase AsyncFunction:

{code}

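import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseAsyncFunc implements AsyncFunction<String, String> {

  // `hbase` (an asynchronous HBase client) and HBaseCallback are assumed to be defined elsewhere.

  @Override
  public void asyncInvoke(String row, ResultFuture<String> result) throws Exception {
    // The callback is expected to complete `result` once the asynchronous get returns.
    HBaseCallback cb = new HBaseCallback(result);
    Get get = new Get(Bytes.toBytes(row));
    hbase.asyncGet(get, cb);
  }
}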

{code}

Create an async operation:

{code}

DataStream<String> input = ...
AsyncFunction<String, String> asyncFunc = new HBaseAsyncFunc...
// `timeout` is the async timeout in milliseconds; 100 is the operator's capacity (maximum in-flight requests).
AsyncDataStream.orderedWait(input, asyncFunc, timeout, TimeUnit.MILLISECONDS, 100);

{code}


Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users? 
  • If we are changing behavior how will we phase out the older behavior? 
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in a few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
