THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
Status
Current state: Under Discussion
Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Proposal-for-Asynchronous-I-O-in-FLINK-tt13497.html
...
Page properties | ||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
...
|
Google Doc: https://docs.google.com/document/d/1Lr9UYXEz6s6R_3PWg3bZQLF3upGaNEkc0rQCFSzaYDI/edit
...
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ListenableFuture; public class HBaseAsyncFunction implements AsyncFunction<String, String> { // initialize it while reading object transient Connection connection; @Override public void asyncInvoke(String val, AsyncCollector<String> c) { Get get = new Get(Bytes.toBytes(val)); Table ht = connection.getTable(TableName.valueOf(Bytes.toBytes("test"))); ListenableFuture<Result> future = ht.asyncGet(get); Futures.addCallback(future, new FutureCallback<Result>() { @Override public void onSuccess(Result result) { List ret = new ArrayList<String>(); ret.add(result.get(...)); c.collect(ret); } @Override public void onFailure(Throwable t) { c.collect(t); } }, MoreExecutors.newDirectExecutorService() ); } } // create data stream public void createHBaseAsyncTestStream(StreamExecutionEnvironment env) { DataStream<String> source = getDataStream(env); DataStream<String> stream = AsyncDataStream.unorderedWait(source, new HBaseAsyncFunction()); stream.print(); } |
...
Compatibility, Deprecation, and Migration Plan
...