Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
themeConfluence
titleExample.java
linenumberstrue
/*** For callback ***/
public class HBaseAsyncFunction implements AsyncFunction<String, String> {
  // initialize it while reading object
  transient Connection connection;

  @Override
  public void asyncInvoke(String val, AsyncCollector<String> c) {
    Get get = new Get(Bytes.toBytes(val));
    Table ht = connection.getTable(TableName.valueOf(Bytes.toBytes("test")));
    // UserCallback is from user’s async client.
    ((AsyncableHTableInterface) ht).asyncGet(get, new UserCallback(c));
  }
}

// create data stream
public void createHBaseAsyncTestStream(StreamExecutionEnvironment env) {
  DataStream<String> source = getDataStream(env);
  DataStream<String> stream = AsyncDataStream.unorderedWait(source, new HBaseAsyncFunction());
  stream.print();
}

/*** For ListenableFuture ***/
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ListenableFuture;

public class HBaseAsyncFunction implements AsyncFunction<String, String> {
  // initialize it while reading object
  transient Connection connection;

  @Override
  public void asyncInvoke(String val, AsyncCollector<String> c) {
    Get get = new Get(Bytes.toBytes(val));
    Table ht = connection.getTable(TableName.valueOf(Bytes.toBytes("test")));

    ListenableFuture<Result> future = ht.asyncGet(get);
    Futures.addCallback(future, 600
      new FutureCallback<Result>() {
        @Override public void onSuccess(Result result) {
          List ret = new ArrayList<String>();
          ret.add(result.get(...));
          c.collect(ret);
        }

        @Override public void onFailure(Throwable t) {
          c.collect(t);
        }
      },
      MoreExecutors.newDirectExecutorService()
    );
  }
}
 
// create data stream
public void createHBaseAsyncTestStream(StreamExecutionEnvironment env) {
  DataStream<String> source = getDataStream(env);
  DataStream<String> stream = AsyncDataStream.unorderedWait(source, new HBaseAsyncFunction());
  stream.print();
}

 

Compatibility, Deprecation, and Migration Plan

...