Status

Current state: Under Discussion

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Proposal-for-Asynchronous-I-O-in-FLINK-tt13497.html

...

Discussion thread
Vote thread
JIRA

...

JIRA: FLINK-4391 (ASF JIRA)

...

Release: 1.2


Google Doc: https://docs.google.com/document/d/1Lr9UYXEz6s6R_3PWg3bZQLF3upGaNEkc0rQCFSzaYDI/edit

...

AsyncDataStream.java
public class AsyncDataStream {
 /**
  * Add an AsyncWaitOperator. Output records may be emitted in a different order than the input records.
  *
  * @param in Input data stream
  * @param func AsyncFunction
  * @param bufSize The maximum number of async I/O operations that can be pending at once
  * @return A new DataStream.
  */
 public static <IN, OUT> DataStream<OUT> unorderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> func, int bufSize);
 public static <IN, OUT> DataStream<OUT> unorderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> func);
 
 /**
  * Add an AsyncWaitOperator with RichAsyncFunction.
  */
 public static <IN, OUT> DataStream<OUT> unorderedWait(DataStream<IN> in, RichAsyncFunction<IN, OUT> func, int bufSize);
 public static <IN, OUT> DataStream<OUT> unorderedWait(DataStream<IN> in, RichAsyncFunction<IN, OUT> func);
 
 /**
  * Add an AsyncWaitOperator. Output records are guaranteed to be emitted in the same order as the input records.
  *
  * @param in Input data stream
  * @param func AsyncFunction
  * @param bufSize The maximum number of async I/O operations that can be pending at once
  * @return A new DataStream.
  */
 public static <IN, OUT> DataStream<OUT> orderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> func, int bufSize);
 public static <IN, OUT> DataStream<OUT> orderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> func);
 
 /**
  * Add an AsyncWaitOperator with RichAsyncFunction.
  */
 public static <IN, OUT> DataStream<OUT> orderedWait(DataStream<IN> in, RichAsyncFunction<IN, OUT> func, int bufSize);
 public static <IN, OUT> DataStream<OUT> orderedWait(DataStream<IN> in, RichAsyncFunction<IN, OUT> func);
}
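
The difference between the two wait modes documented above can be illustrated with a toy simulation. This is only a sketch of the emission semantics described in the Javadoc, not the operator's implementation; the class WaitModeSketch and its methods are hypothetical, and records are assumed to be unique.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy simulation of the two emission modes; not Flink code. */
public class WaitModeSketch {

  /** Unordered: each record is emitted as soon as its async call completes. */
  static List<String> unordered(List<String> completions) {
    return new ArrayList<>(completions);
  }

  /** Ordered: a completed record is held back until all earlier arrivals were emitted. */
  static List<String> ordered(List<String> arrivals, List<String> completions) {
    Deque<String> pending = new ArrayDeque<>(arrivals); // kept in arrival order
    Set<String> done = new HashSet<>();
    List<String> out = new ArrayList<>();
    for (String finished : completions) {
      done.add(finished);
      // Emit from the head of the queue for as long as the head has completed.
      while (!pending.isEmpty() && done.contains(pending.peekFirst())) {
        out.add(pending.pollFirst());
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> arrivals = Arrays.asList("a", "b", "c");
    List<String> completions = Arrays.asList("c", "a", "b");
    System.out.println(unordered(completions));         // [c, a, b]
    System.out.println(ordered(arrivals, completions)); // [a, b, c]
  }
}
```

In this sketch the ordered mode trades latency for ordering: record "c" finishes first but is only emitted after "a" and "b".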

Proposed Changes

...

AsyncFunction.java
public interface AsyncFunction<IN, OUT> extends Function, Serializable {
  /**
   * Trigger an async operation for each stream input.
   * The AsyncCollector should be registered with the async client.
   *
   * @param input Stream Input
   * @param collector AsyncCollector
   */
  void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception;
}
 
public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction
    implements AsyncFunction<IN, OUT> {
  @Override
  public abstract void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception;
}

...

While restoring the operator’s state, the operator scans all elements in the state, obtains an AsyncCollector for each one, calls AsyncFunction.asyncInvoke() for it, and inserts the collector back into the AsyncCollectorBuffer.
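
The restore step can be sketched roughly as follows. This is a minimal illustration under assumptions, not the proposed operator code: RestoreSketch, its nested Collector class (a stand-in for AsyncCollector), and the plain list used in place of AsyncCollectorBuffer are all hypothetical, and the async call is simulated synchronously.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;

/** Toy model of re-triggering in-flight async calls on restore; not Flink code. */
public class RestoreSketch {

  /** Hypothetical stand-in for AsyncCollector: holds the input and, later, its result. */
  static final class Collector<IN, OUT> {
    final IN input;
    OUT result; // null until the async call completes

    Collector(IN input) { this.input = input; }

    void collect(OUT value) { this.result = value; }
  }

  /** Re-invokes the async function for every checkpointed element and rebuilds the buffer. */
  static <IN, OUT> List<Collector<IN, OUT>> restore(
      List<IN> checkpointedInputs, BiConsumer<IN, Collector<IN, OUT>> asyncInvoke) {
    List<Collector<IN, OUT>> buffer = new ArrayList<>(); // stand-in for AsyncCollectorBuffer
    for (IN input : checkpointedInputs) {
      Collector<IN, OUT> collector = new Collector<>(input);
      asyncInvoke.accept(input, collector); // trigger the async call again
      buffer.add(collector);                // keep the original element order
    }
    return buffer;
  }

  public static void main(String[] args) {
    List<Collector<String, Integer>> buffer =
        restore(Arrays.asList("ab", "cde"), (in, c) -> c.collect(in.length()));
    for (Collector<String, Integer> c : buffer) {
      System.out.println(c.input + " -> " + c.result);
    }
  }
}
```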

 

Notes

Example

...

Async Resource Sharing

To share async resources (such as HBase or Netty connections) among different slots (task workers) in the same TaskManager (that is, the same JVM), the connection can be made static so that all threads in the process share one instance.

Access to such shared resources must be thread-safe.
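
One way to create such a static instance safely is the initialization-on-demand holder idiom, sketched below. SharedResourceSketch and SharedClient are hypothetical names; SharedClient stands in for a real connection object, which is assumed to be thread-safe itself.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of one lazily created, JVM-wide client shared by all threads; not Flink code. */
public class SharedResourceSketch {

  /** Hypothetical stand-in for an expensive, thread-safe async client. */
  static final class SharedClient {
    static final AtomicInteger CREATED = new AtomicInteger();

    SharedClient() { CREATED.incrementAndGet(); }
  }

  /** Holder idiom: class initialization guarantees exactly one safely published instance. */
  private static final class Holder {
    static final SharedClient INSTANCE = new SharedClient();
  }

  static SharedClient client() {
    return Holder.INSTANCE;
  }

  public static void main(String[] args) throws InterruptedException {
    // Several threads (standing in for task slots) ask for the client concurrently.
    Thread[] workers = new Thread[4];
    for (int i = 0; i < workers.length; i++) {
      workers[i] = new Thread(SharedResourceSketch::client);
      workers[i].start();
    }
    for (Thread worker : workers) {
      worker.join();
    }
    System.out.println(SharedClient.CREATED.get()); // 1
  }
}
```

The idiom only covers creation; calls made on the shared client from multiple threads still need to be safe on their own.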

Example

For callback

Example.java
public class HBaseAsyncFunction implements AsyncFunction<String, String> {
  // Not serialized with the function; initialize it when the object is read (deserialized).
  transient Connection connection;

  @Override
  public void asyncInvoke(String val, AsyncCollector<String> c) throws Exception {
    Get get = new Get(Bytes.toBytes(val));
    Table ht = connection.getTable(TableName.valueOf(Bytes.toBytes("test")));
    // UserCallback is from user’s async client; it passes the result (or error) to the collector.
    ((AsyncableHTableInterface) ht).asyncGet(get, new UserCallback(c));
  }
}

// create data stream
public void createHBaseAsyncTestStream(StreamExecutionEnvironment env) {
  DataStream<String> source = getDataStream(env);
  DataStream<String> stream = AsyncDataStream.unorderedWait(source, new HBaseAsyncFunction());
  stream.print();
}

...

For ListenableFuture

Example2.java
import java.util.ArrayList;
import java.util.List;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;

public class HBaseAsyncFunction implements AsyncFunction<String, String> {
  // Not serialized with the function; initialize it when the object is read (deserialized).
  transient Connection connection;

  @Override
  public void asyncInvoke(String val, final AsyncCollector<String> c) throws Exception {
    Get get = new Get(Bytes.toBytes(val));
    Table ht = connection.getTable(TableName.valueOf(Bytes.toBytes("test")));

    // asyncGet() returning a ListenableFuture is assumed to come from the user's async client.
    ListenableFuture<Result> future = ht.asyncGet(get);
    Futures.addCallback(future,
      new FutureCallback<Result>() {
        @Override public void onSuccess(Result result) {
          // Hand the results for this input to the collector.
          List<String> ret = new ArrayList<>();
          ret.add(result.get(...));
          c.collect(ret);
        }

        @Override public void onFailure(Throwable t) {
          // Report the failure through the collector.
          c.collect(t);
        }
      },
      // Run the callback directly on the thread that completes the future.
      MoreExecutors.directExecutor()
    );
  }
}
 
// create data stream
public void createHBaseAsyncTestStream(StreamExecutionEnvironment env) {
  DataStream<String> source = getDataStream(env);
  DataStream<String> stream = AsyncDataStream.unorderedWait(source, new HBaseAsyncFunction());
  stream.print();
}


Compatibility, Deprecation, and Migration Plan

...