...
Current state: Under Discussion
Discussion thread: here (<- link to http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Proposal-for-Asynchronous-I-O-in-FLINK-tt13497.html)
JIRA: here (<- link to
) Jira server ASF JIRA serverId 5aa69414-a9e9-3523-82ec-879b028fb15b key FLINK-4391
Released: Release this feature as soon as possible
...
- arriving at AsyncWaitOperator
- recovering from task failover
- snapshotting state
- being emitted by Emitter Thread
Sequence Diagram
AsyncFunction
AsyncFunction works as a user function in AsyncWaitOperator, which looks like StreamFlatMap operator, having open()/processElement(StreamRecord<IN> record)/processWatermark(Watermark mark).
For user’s concrete AsyncFunction, the asyncInvoke(IN input, AsyncCollector<OUT> collector) has to be overriden to supply codes to start an async operation.
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
public interface AsyncFunction<IN, OUT> extends Function, Serializable {
/**
* Trigger async operation for each stream input.
* The AsyncCollector should be registered into async client.
*
* @param input Stream Input
* @param collector AsyncCollector
*/
void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception;
} |
For each input stream record of AsyncWaitOperator, they will be processed by AsyncFunction.asyncInvoke(IN input, AsyncCollector<OUT> cb). Then AsyncCollector will be append into AsyncCollectorBuffer. We will cover AsyncCollector and AsyncCollectorBuffer later.
AsyncCollector
AsyncCollector is created by AsyncWaitOperator, and passed into AsyncFunction, where it should be added into user’s callback. It acts as a role to get results or errors from user codes and notify the AsyncCollectorBuffer to emit results.
The functions specific for the user is the collect, and they should be called when async operation is done or errors are thrown out.
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
public class AsyncCollector<OUT> {
private List<OUT> result;
private Throwable error;
private AsyncCollectorBuffer<OUT> buffer;
/**
* Set result
* @param result A list of results.
*/
public void collect(List<OUT> result) {
this.result = result;
buffer.mark(this);
}
/**
* Set error
* @param error A Throwable object.
*/
public void collect(Throwable error) {
this.error = error;
buffer.mark(this);
}
/**
* Get result. Throw RuntimeException while encountering an error.
* @return A List of result.
* @throws RuntimeException RuntimeException wrapping errors from user codes.
*/
public List<OUT> getResult() throws RuntimeException { ... }
} |
Compatibility, Deprecation, and Migration Plan
...