Status
...
...
...
| Jira |
---|
server | ASF JIRA |
---|
serverId | 5aa69414-a9e9-3523-82ec-879b028fb15b |
---|
key | FLINK-4391 |
---|
|
|
---|
|
...
Google Doc: https://docs.google.com/document/d/1Lr9UYXEz6s6R_3PWg3bZQLF3upGaNEkc0rQCFSzaYDI/edit
...
Code Block |
---|
language | java |
---|
theme | Confluence |
---|
title | Example2.java |
---|
linenumbers | true |
---|
|
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ListenableFuture;
public class HBaseAsyncFunction implements AsyncFunction<String, String> {
// initialize it while reading object
transient Connection connection;
@Override
public void asyncInvoke(String val, AsyncCollector<String> c) {
Get get = new Get(Bytes.toBytes(val));
Table ht = connection.getTable(TableName.valueOf(Bytes.toBytes("test")));
ListenableFuture<Result> future = ht.asyncGet(get);
Futures.addCallback(future,
new FutureCallback<Result>() {
@Override public void onSuccess(Result result) {
List ret = new ArrayList<String>();
ret.add(result.get(...));
c.collect(ret);
}
@Override public void onFailure(Throwable t) {
c.collect(t);
}
},
MoreExecutors.newDirectExecutorService()
);
}
}
// create data stream
public void createHBaseAsyncTestStream(StreamExecutionEnvironment env) {
DataStream<String> source = getDataStream(env);
DataStream<String> stream = AsyncDataStream.unorderedWait(source, new HBaseAsyncFunction());
stream.print();
} |
...
Compatibility, Deprecation, and Migration Plan
...