Status
Current state: Under Discussion
Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Proposal-for-Asynchronous-I-O-in-FLINK-tt13497.html
...
Google Doc: https://docs.google.com/document/d/1Lr9UYXEz6s6R_3PWg3bZQLF3upGaNEkc0rQCFSzaYDI/edit
...
While restoring the operator's state, the operator scans all elements in the state, obtains an AsyncCollector for each one, calls AsyncFunction.asyncInvoke() again, and inserts the collectors back into the AsyncCollectorBuffer.
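The restore steps above can be sketched roughly as follows. This is only an illustration under stated assumptions: `AsyncCollector` and `AsyncFunction` are reduced stand-ins for the interfaces named in this proposal, and `SimpleCollectorBuffer` and `RestoreSketch` are hypothetical names used here in place of the real AsyncCollectorBuffer and operator code.

```java
import java.util.ArrayList;
import java.util.List;

// Reduced stand-ins for the interfaces named in this proposal; real signatures may differ.
interface AsyncCollector<OUT> {
    void collect(List<OUT> results);
    void collect(Throwable error);
}

interface AsyncFunction<IN, OUT> {
    void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception;
}

// Hypothetical, much simplified buffer: one slot per in-flight input element.
class SimpleCollectorBuffer<IN, OUT> {
    final List<IN> pendingInputs = new ArrayList<>();
    final List<List<OUT>> results = new ArrayList<>();
    final List<Throwable> errors = new ArrayList<>();

    // Registers an element and returns the collector that will fill its slot.
    AsyncCollector<OUT> add(IN input) {
        final int slot = pendingInputs.size();
        pendingInputs.add(input);
        results.add(null);
        errors.add(null);
        return new AsyncCollector<OUT>() {
            @Override public void collect(List<OUT> r) { results.set(slot, r); }
            @Override public void collect(Throwable t) { errors.set(slot, t); }
        };
    }
}

class RestoreSketch {
    // Re-issues the async call for every element found in the restored state.
    static <IN, OUT> SimpleCollectorBuffer<IN, OUT> restore(
            List<IN> restoredElements, AsyncFunction<IN, OUT> fn) throws Exception {
        SimpleCollectorBuffer<IN, OUT> buffer = new SimpleCollectorBuffer<>();
        for (IN element : restoredElements) {
            AsyncCollector<OUT> collector = buffer.add(element); // fresh collector, back in the buffer
            fn.asyncInvoke(element, collector);                  // trigger the request again
        }
        return buffer;
    }
}
```

Because every restored element is sent through asyncInvoke() again, a request that was already in flight before the failure may reach the external system a second time.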
Example
Here is sample code that uses Async I/O in Flink streaming.
Notes
Async Resource Sharing
To share async resources (such as HBase connections or Netty connections) among different slots (task workers) in the same TaskManager (i.e. the same JVM), we can make the connection static so that all threads in the same process share the same instance.
Shared resources are then accessed concurrently from several threads, so pay attention to thread safety when using them.
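A minimal sketch of the static-sharing idea, assuming a hypothetical `SharedClient` class that stands in for a real HBase or Netty connection. It uses the initialization-on-demand holder idiom so that the instance is created lazily and exactly once per JVM without explicit locking. `SharingFunction` is likewise a hypothetical placeholder for a user function.

```java
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an expensive async client (e.g. an HBase or Netty connection).
final class SharedClient {
    // Counts how many instances were created in this JVM; for illustration only.
    static final AtomicInteger CREATED = new AtomicInteger();

    private SharedClient() {
        CREATED.incrementAndGet();
    }

    // The JVM initializes Holder once, on first access, in a thread-safe way.
    private static final class Holder {
        static final SharedClient INSTANCE = new SharedClient();
    }

    static SharedClient get() {
        return Holder.INSTANCE;
    }
}

// Hypothetical user function: each slot gets its own deserialized copy,
// but all copies in the same JVM resolve to the same static client.
class SharingFunction implements Serializable {
    SharedClient client() {
        return SharedClient.get();
    }
}
```

The holder idiom only makes creation thread-safe. Calls made on the shared instance still need a client that is itself safe for concurrent use.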
Example
For callback
    /** Async function that looks up each input key in HBase through a callback-based client. */
    public class HBaseAsyncFunction implements AsyncFunction<String, String> {
        // transient, so it is not serialized with the function; initialize it while reading the object
        transient Connection connection;

        @Override
        public void asyncInvoke(String val, AsyncCollector<String> c) throws Exception {
            Get get = new Get(Bytes.toBytes(val));
            Table ht = connection.getTable(TableName.valueOf("test"));
            // UserCallback is from the user's async client.
            ((AsyncableHTableInterface) ht).asyncGet(get, new UserCallback(c));
        }
    }

    // create data stream
    public void createHBaseAsyncTestStream(StreamExecutionEnvironment env) {
        DataStream<String> source = getDataStream(env);
        DataStream<String> stream = AsyncDataStream.unorderedWait(source, new HBaseAsyncFunction());
        stream.print();
    }
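The example leaves UserCallback undefined, since it belongs to the user's async client. One possible shape is sketched below, purely as an assumption: `ClientCallback` is a hypothetical callback contract invented for illustration, and `AsyncCollector` is a reduced stand-in for the collector described in this proposal.

```java
import java.util.Collections;
import java.util.List;

// Reduced stand-in for the collector described in this proposal; the real signature may differ.
interface AsyncCollector<OUT> {
    void collect(List<OUT> results);
    void collect(Throwable error);
}

// Hypothetical callback contract of the user's async client.
interface ClientCallback<T> {
    void onComplete(T value);
    void onError(Throwable error);
}

// Bridges the client's callback to the collector handed to asyncInvoke().
class UserCallback implements ClientCallback<String> {
    private final AsyncCollector<String> collector;

    UserCallback(AsyncCollector<String> collector) {
        this.collector = collector;
    }

    @Override
    public void onComplete(String value) {
        // A single lookup result is wrapped into a one-element list.
        collector.collect(Collections.singletonList(value));
    }

    @Override
    public void onError(Throwable error) {
        // Failures are reported through the same collector.
        collector.collect(error);
    }
}
```

The callback runs on whichever thread the client uses to complete the request, so it should only hand the result to the collector and return quickly.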
...
For ListenableFuture
    import java.util.ArrayList;
    import java.util.List;

    import com.google.common.util.concurrent.FutureCallback;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.MoreExecutors;

    public class HBaseAsyncFunction implements AsyncFunction<String, String> {
        // transient, so it is not serialized with the function; initialize it while reading the object
        transient Connection connection;

        @Override
        public void asyncInvoke(String val, final AsyncCollector<String> c) throws Exception {
            Get get = new Get(Bytes.toBytes(val));
            Table ht = connection.getTable(TableName.valueOf("test"));
            // asyncGet() is assumed to be provided by the user's async client and to return a ListenableFuture
            ListenableFuture<Result> future = ht.asyncGet(get);
            Futures.addCallback(future,
                new FutureCallback<Result>() {
                    @Override
                    public void onSuccess(Result result) {
                        List<String> ret = new ArrayList<>();
                        ret.add(result.get(...));
                        c.collect(ret);
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        // report the failure through the collector
                        c.collect(t);
                    }
                },
                // run the callback directly on the thread that completes the future
                MoreExecutors.directExecutor()
            );
        }
    }

    // create data stream
    public void createHBaseAsyncTestStream(StreamExecutionEnvironment env) {
        DataStream<String> source = getDataStream(env);
        DataStream<String> stream = AsyncDataStream.unorderedWait(source, new HBaseAsyncFunction());
        stream.print();
    }
Compatibility, Deprecation, and Migration Plan
...