Current state: Under Discussion

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Proposal-for-Asynchronous-I-O-in-FLINK-tt13497.html

JIRA: FLINK-4391 (ASF JIRA)

Released: Release this feature as soon as possible

...

  • arriving at AsyncWaitOperator
  • recovering from task failover
  • snapshotting state
  • being emitted by Emitter Thread

Sequence Diagram

[Figure: sequence diagram]

AsyncFunction

AsyncFunction is the user function that runs inside AsyncWaitOperator. The operator resembles the StreamFlatMap operator and exposes open(), processElement(StreamRecord<IN> record), and processWatermark(Watermark mark).

A concrete AsyncFunction must override asyncInvoke(IN input, AsyncCollector<OUT> collector) and supply the code that starts the async operation.

AsyncFunction.java
public interface AsyncFunction<IN, OUT> extends Function, Serializable {
  /**
   * Triggers an async operation for each stream input.
   * The AsyncCollector should be registered with the async client's callback.
   *
   * @param input the stream input
   * @param collector the AsyncCollector that receives the result or error
   */
  void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception;
}

Each input stream record of AsyncWaitOperator is processed by AsyncFunction.asyncInvoke(IN input, AsyncCollector<OUT> collector). The AsyncCollector is then appended to AsyncCollectorBuffer. AsyncCollector and AsyncCollectorBuffer are covered later.
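To make the callback pattern concrete, the following is a minimal, self-contained sketch of a user function. It does not use the Flink classes: SketchCollector and SketchAsyncFunction are simplified stand-ins for the proposed AsyncCollector and AsyncFunction, and FakeAsyncClient, LookupFunction, and LookupDemo are hypothetical names introduced only for illustration. The latch in the collector exists solely so the demo can wait for the callback.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

// Stand-in for the proposed AsyncCollector: it only records what the callback reports.
class SketchCollector<OUT> {
    volatile List<OUT> result;
    volatile Throwable error;
    // Not part of the proposal; lets the demo wait until a callback has fired.
    final CountDownLatch done = new CountDownLatch(1);

    void collect(List<OUT> result) {
        this.result = result;
        done.countDown();
    }

    void collect(Throwable error) {
        this.error = error;
        done.countDown();
    }
}

// Stand-in for the proposed AsyncFunction interface (Flink's Function base type is omitted).
interface SketchAsyncFunction<IN, OUT> {
    void asyncInvoke(IN input, SketchCollector<OUT> collector) throws Exception;
}

// Hypothetical non-blocking client; a real one would wrap a database or HTTP driver.
class FakeAsyncClient {
    CompletableFuture<String> query(String key) {
        return CompletableFuture.supplyAsync(() -> {
            if (key.isEmpty()) {
                throw new IllegalArgumentException("empty key");
            }
            return "value-of-" + key;
        });
    }
}

// User function: starts the request and returns immediately; the callback feeds the collector.
class LookupFunction implements SketchAsyncFunction<String, String> {
    private final FakeAsyncClient client = new FakeAsyncClient();

    @Override
    public void asyncInvoke(String input, SketchCollector<String> collector) {
        client.query(input).whenComplete((value, failure) -> {
            if (failure != null) {
                collector.collect(failure);
            } else {
                collector.collect(Collections.singletonList(value));
            }
        });
    }
}

class LookupDemo {
    // Blocks for demonstration only; the sketch does not model how the operator waits.
    static SketchCollector<String> run(String key) {
        SketchCollector<String> collector = new SketchCollector<>();
        new LookupFunction().asyncInvoke(key, collector);
        try {
            collector.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return collector;
    }

    public static void main(String[] args) {
        System.out.println(run("user-1").result);
        System.out.println(run("").error);
    }
}
```

The point of the sketch is that asyncInvoke only registers the collector with the client's callback and never blocks on the response itself.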

 

AsyncCollector

 

AsyncCollector is created by AsyncWaitOperator and passed into AsyncFunction, where it should be added to the user's callback. Its role is to receive results or errors from user code and to notify AsyncCollectorBuffer so that the results can be emitted.

The user-facing methods are the two collect overloads, which should be called when the async operation completes or an error is thrown.

AsyncCollector.java
public class AsyncCollector<OUT> {
  private List<OUT> result;
  private Throwable error;
  private AsyncCollectorBuffer<OUT> buffer;

  /**
   * Set result
   * @param result A list of results.
   */
  public void collect(List<OUT> result) { 
    this.result = result;
    buffer.mark(this);
  }

  /**
   * Set error
   * @param error A Throwable object.
   */
  public void collect(Throwable error) {
    this.error = error;
    buffer.mark(this);
  }

  /**
   * Get the result, or throw a RuntimeException if an error was collected.
   * @return A list of results.
   * @throws RuntimeException wrapping the error from user code.
   */
  public List<OUT> getResult() throws RuntimeException { ... }
}
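The interaction between collect, mark, and getResult can be illustrated with a small self-contained sketch. It is not the proposed implementation: SketchBuffer is a heavily simplified, hypothetical stand-in for AsyncCollectorBuffer that only remembers finished collectors, drain() is an invented helper playing the role of the emitter, and the body of getResult is an assumption derived from its Javadoc (wrap a collected error in a RuntimeException).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Simplified stand-in for AsyncCollectorBuffer: it only remembers which collectors have finished.
class SketchBuffer<OUT> {
    private final Queue<SketchAsyncCollector<OUT>> finished = new ArrayDeque<>();

    synchronized void mark(SketchAsyncCollector<OUT> collector) {
        finished.add(collector);
    }

    // Hypothetical helper playing the emitter: gathers the outputs of all finished collectors.
    synchronized List<OUT> drain() {
        List<OUT> out = new ArrayList<>();
        SketchAsyncCollector<OUT> next;
        while ((next = finished.poll()) != null) {
            out.addAll(next.getResult()); // rethrows a collected error
        }
        return out;
    }
}

// Stand-in for the AsyncCollector shown above, with a constructor added to wire in the buffer.
class SketchAsyncCollector<OUT> {
    private List<OUT> result;
    private Throwable error;
    private final SketchBuffer<OUT> buffer;

    SketchAsyncCollector(SketchBuffer<OUT> buffer) {
        this.buffer = buffer;
    }

    void collect(List<OUT> result) {
        this.result = result;
        buffer.mark(this);
    }

    void collect(Throwable error) {
        this.error = error;
        buffer.mark(this);
    }

    // Assumed behaviour, following the Javadoc: wrap a collected error in a RuntimeException.
    List<OUT> getResult() {
        if (error != null) {
            throw new RuntimeException(error);
        }
        return result;
    }
}

class CollectorDemo {
    // Two collectors complete successfully; their results are drained in completion order.
    static List<String> successCase() {
        SketchBuffer<String> buffer = new SketchBuffer<>();
        SketchAsyncCollector<String> first = new SketchAsyncCollector<>(buffer);
        SketchAsyncCollector<String> second = new SketchAsyncCollector<>(buffer);
        first.collect(Arrays.asList("a", "b"));
        second.collect(Arrays.asList("c"));
        return buffer.drain();
    }

    // A collector reports an error; draining surfaces it as a RuntimeException.
    static String failureMessage() {
        SketchBuffer<String> buffer = new SketchBuffer<>();
        SketchAsyncCollector<String> failed = new SketchAsyncCollector<>(buffer);
        failed.collect(new IllegalStateException("lookup failed"));
        try {
            buffer.drain();
            return "no error";
        } catch (RuntimeException e) {
            return e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(successCase());
        System.out.println(failureMessage());
    }
}
```

In this sketch an error reported through collect(Throwable) stays silent until the result is read, which is one plausible reading of why getResult is declared to throw.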

 

 

Compatibility, Deprecation, and Migration Plan

...