...

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Proposal-for-Asynchronous-I-O-in-FLINK-tt13497.html

JIRA: FLINK-4391

Released: Release this feature as soon as possible

Google Doc: https://docs.google.com/document/d/1Lr9UYXEz6s6R_3PWg3bZQLF3upGaNEkc0rQCFSzaYDI/edit#edit

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Emitter Thread: A worker thread in AsyncCollectorBuffer. It is signalled when some AsyncCollectors have finished their async I/O, and it emits their results to the following operators.

Public Interfaces

A helper class named AsyncDataStream is added. It provides two methods for adding an AsyncFunction, which performs the async I/O operation, to a Flink streaming job.

AsyncDataStream.java
public class AsyncDataStream {
 /**
  * Add an AsyncWaitOperator. The output stream records may be emitted in a different order than the input records.
  *
  * @param in Input DataStream.
  * @param func AsyncFunction
  * @return A new DataStream.
  */
 public static <IN, OUT> DataStream<OUT> unorderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> func);

 /**
  * Add an AsyncWaitOperator. The order of output stream records is guaranteed to be the same as that of the input records.
  *
  * @param in Input DataStream.
  * @param func AsyncFunction
  * @return A new DataStream.
  */
 public static <IN, OUT> DataStream<OUT> orderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> func);
}

Proposed Changes

Overview

The following diagram illustrates how the streaming records are processed while

  • arriving at AsyncWaitOperator
  • recovering from task failover
  • snapshotting state
  • being emitted by Emitter Thread

Sequence Diagram

[Image: sequence diagram of record processing in AsyncWaitOperator]

AsyncFunction

AsyncFunction works as the user function in AsyncWaitOperator, which resembles the StreamFlatMap operator: it has open(), processElement(StreamRecord<IN> record), and processWatermark(Watermark mark).

...

Each input stream record of AsyncWaitOperator is processed by AsyncFunction.asyncInvoke(IN input, AsyncCollector<OUT> cb). The AsyncCollector is then appended to AsyncCollectorBuffer. AsyncCollector and AsyncCollectorBuffer are covered later.

...

AsyncCollector is created by AsyncWaitOperator and passed into AsyncFunction, where it should be added to the user’s callback. Its role is to receive results or errors from user code and to notify AsyncCollectorBuffer to emit the results.

The user-facing methods are the collect() methods, which should be called when the async operation completes or when an error is thrown.
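As an illustration of how a user callback might hand results or errors to the collector, here is a minimal sketch. The two interfaces are hypothetical stand-ins written only for this example: the asyncInvoke signature follows the text above, while the two collect overloads (one taking a list of results, one taking a Throwable) are an assumption about how "results or errors" could be passed. UpperCaseLookup and its use of CompletableFuture in place of a real async client are also illustrative.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for AsyncCollector; the real method signatures may differ.
interface AsyncCollector<OUT> {
    void collect(List<OUT> results); // assumed: called when the async operation succeeds
    void collect(Throwable error);   // assumed: called when the async operation fails
}

// Hypothetical stand-in for AsyncFunction, following asyncInvoke(IN input, AsyncCollector<OUT> cb).
interface AsyncFunction<IN, OUT> {
    void asyncInvoke(IN input, AsyncCollector<OUT> collector);
}

// Illustrative user function: the collector is captured by the callback
// and invoked exactly once, with either the result or the error.
class UpperCaseLookup implements AsyncFunction<String, String> {
    @Override
    public void asyncInvoke(String input, AsyncCollector<String> collector) {
        CompletableFuture
            .supplyAsync(() -> input.toUpperCase()) // stands in for a remote async call
            .whenComplete((value, error) -> {
                if (error != null) {
                    collector.collect(error);
                } else {
                    collector.collect(Collections.singletonList(value));
                }
            });
    }
}
```

Note that asyncInvoke returns immediately; the result reaches the collector later, from whichever thread completes the future.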

...

Before calling AsyncFunction.asyncInvoke(IN input, AsyncCollector<OUT> collector), AsyncWaitOperator tries to get an AsyncCollector instance from AsyncCollectorBuffer. This instance is then passed into the user’s callback function. If the buffer is full, the operator waits until some of the ongoing callbacks have finished.

Once the async operation is done, AsyncCollector.collect() takes the results or errors, and AsyncCollectorBuffer is notified.

AsyncCollector is implemented by Flink.

AsyncCollectorBuffer

AsyncCollectorBuffer keeps all AsyncCollectors and emits results to the next nodes.

When AsyncCollector.collect() is called, a mark is placed in AsyncCollectorBuffer to indicate that this AsyncCollector has finished. A worker thread, named Emitter, is also signalled once an AsyncCollector gets results, and it then tries to emit results according to the ordered or unordered setting.

...

If the first task in the buffer has finished, the Emitter collects its results and then proceeds to the second task. If the first task has not finished yet, the Emitter waits for it again.
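The head-of-buffer rule described above can be sketched as follows. This is a simplified, single-threaded illustration, not the proposed implementation: PendingTask, OrderedBuffer, and emitReady are hypothetical names, and locking and signalling are left out on purpose.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical buffer entry: one pending async task and, once finished, its result.
class PendingTask<OUT> {
    boolean done;
    OUT result;
}

// Hypothetical, simplified buffer that only models ordered emission.
class OrderedBuffer<OUT> {
    private final Deque<PendingTask<OUT>> tasks = new ArrayDeque<>();

    // Registers a new task at the tail, in input order.
    PendingTask<OUT> add() {
        PendingTask<OUT> task = new PendingTask<>();
        tasks.addLast(task);
        return task;
    }

    // Marks a task as finished; it stays in the buffer until it reaches the head.
    void complete(PendingTask<OUT> task, OUT result) {
        task.result = result;
        task.done = true;
    }

    // Emits results from the head while the head task is finished.
    // Stops at the first unfinished task, so output order matches input order.
    List<OUT> emitReady() {
        List<OUT> out = new ArrayList<>();
        while (!tasks.isEmpty() && tasks.peekFirst().done) {
            out.add(tasks.pollFirst().result);
        }
        return out;
    }
}
```

In this sketch a task that finishes early is held back until every task in front of it has been emitted, which is the cost of the ordering guarantee.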

  • Unordered Mode

...

The Emitter Thread and the Task Thread access the buffer exclusively by acquiring and releasing the lock.

Signal the Task Thread when all tasks have finished, to notify it that all data has been processed and that it is OK to close the operator.

Signal the Task Thread after removing some tasks from the buffer.

Propagate exceptions to the Task Thread.
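One way to picture the lock-based hand-off between the two threads is a bounded buffer guarded by a single lock with two conditions. The sketch below is an assumption-laden simplification: BoundedCollectorBuffer and its method names are hypothetical, every entry is treated as already finished when it is added, and exception propagation and the close signal are not modelled.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified buffer shared by the Task Thread and the Emitter Thread.
class BoundedCollectorBuffer<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // Task Thread waits here
    private final Condition notEmpty = lock.newCondition(); // Emitter Thread waits here
    private final Deque<T> tasks = new ArrayDeque<>();
    private final int capacity;

    BoundedCollectorBuffer(int capacity) {
        this.capacity = capacity;
    }

    // Called by the Task Thread: blocks while the buffer is full.
    void add(T task) {
        lock.lock();
        try {
            while (tasks.size() >= capacity) {
                notFull.awaitUninterruptibly();
            }
            tasks.addLast(task);
            notEmpty.signal(); // wake the Emitter Thread
        } finally {
            lock.unlock();
        }
    }

    // Called by the Emitter Thread: removes one task, then signals the Task Thread.
    T take() {
        lock.lock();
        try {
            while (tasks.isEmpty()) {
                notEmpty.awaitUninterruptibly();
            }
            T task = tasks.pollFirst();
            notFull.signalAll(); // space was freed in the buffer
            return task;
        } finally {
            lock.unlock();
        }
    }

    int size() {
        lock.lock();
        try {
            return tasks.size();
        } finally {
            lock.unlock();
        }
    }
}
```

Both waits sit inside while loops so that a woken thread rechecks its condition after reacquiring the lock.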

Task Thread

Access AsyncCollectorBuffer in mutual exclusion with the Emitter Thread.

...

All input StreamRecords will be kept in state. Instead of storing input stream records into state one by one during processing, AsyncWaitOperator puts all input stream records currently in AsyncCollectorBuffer into state when the operator state is snapshotted. Old data in the state is cleared before those records are persisted.
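The snapshot behaviour described above (clear the old state, then persist whatever is still in the buffer) can be sketched like this. SnapshotSketch and its methods are hypothetical, and plain lists stand in for both AsyncCollectorBuffer and the operator state; the real state API is not shown.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of snapshotting pending input records.
class SnapshotSketch<IN> {
    // Input records whose async operation has not been emitted yet (stand-in for the buffer).
    private final List<IN> pendingInputs = new ArrayList<>();
    // Stand-in for the operator state written at snapshot time.
    private final List<IN> state = new ArrayList<>();

    // A record arrives: it is only tracked in the buffer, not written to state yet.
    void onRecord(IN record) {
        pendingInputs.add(record);
    }

    // A record's results were emitted: it no longer needs to be recovered.
    void onEmitted(IN record) {
        pendingInputs.remove(record);
    }

    // Snapshot: drop the previous contents, then store all records still in the buffer.
    void snapshot() {
        state.clear();
        state.addAll(pendingInputs);
    }

    // After a failover, these are the records that would need to be processed again.
    List<IN> restore() {
        return new ArrayList<>(state);
    }
}
```

Writing the buffer contents once per snapshot avoids a state update for every incoming record, at the cost of re-running the async operation for every record that was in flight at the last snapshot.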

...