THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
While restoring the operator’s state, the operator scans all elements in the state, gets their AsyncCollectors, calls AsyncFunction.asyncInvoke(), and inserts them back into the AsyncCollectorBuffer.
Example
...
For callback
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
/*** For callback ***/ public class HBaseAsyncFunction implements AsyncFunction<String, String> { // initialize it while reading object transient Connection connection; @Override public void asyncInvoke(String val, AsyncCollector<String> c) { Get get = new Get(Bytes.toBytes(val)); Table ht = connection.getTable(TableName.valueOf(Bytes.toBytes("test"))); // UserCallback is from user’s async client. ((AsyncableHTableInterface) ht).asyncGet(get, new UserCallback(c)); } } // create data stream public void createHBaseAsyncTestStream(StreamExecutionEnvironment env) { DataStream<String> source = getDataStream(env); DataStream<String> stream = AsyncDataStream.unorderedWait(source, new HBaseAsyncFunction()); stream.print(); } |
For ListenableFuture
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
/*** For ListenableFuture ***/ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ListenableFuture; public class HBaseAsyncFunction implements AsyncFunction<String, String> { // initialize it while reading object transient Connection connection; @Override public void asyncInvoke(String val, AsyncCollector<String> c) { Get get = new Get(Bytes.toBytes(val)); Table ht = connection.getTable(TableName.valueOf(Bytes.toBytes("test"))); ListenableFuture<Result> future = ht.asyncGet(get); Futures.addCallback(future, new FutureCallback<Result>() { @Override public void onSuccess(Result result) { List ret = new ArrayList<String>(); ret.add(result.get(...)); c.collect(ret); } @Override public void onFailure(Throwable t) { c.collect(t); } }, MoreExecutors.newDirectExecutorService() ); } } // create data stream public void createHBaseAsyncTestStream(StreamExecutionEnvironment env) { DataStream<String> source = getDataStream(env); DataStream<String> stream = AsyncDataStream.unorderedWait(source, new HBaseAsyncFunction()); stream.print(); } |
...