THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
A helper class, named AsyncDataStream, is added to provide two methods to add AsyncFunction, which will do async i/o operation, into FLINK streaming job.
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
public class AsyncDataStream { /** * Add an AsyncWaitOperator. The order of output stream records may be reordered. * * @param in Input data stream * @param func AsyncWaitFunctionAsyncFunction * @bufSize The max number of async i/o operation that can be triggered * @return A new DataStream. */ public static DataStream<OUT> unorderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> func, int bufSize); public static DataStream<OUT> unorderedWait(DataStream<IN>, AsyncWaitFunction<IN in, AsyncFunction<IN, OUT> func); /** * Add an AsyncWaitOperator with RichAsyncFunction. */ public static DataStream<OUT> unorderedWait(DataStream<IN> in, RichAsyncFunction<IN, OUT> func, int bufSize); public static DataStream<OUT> unorderedWait(DataStream<IN> in, RichAsyncFunction<IN, OUT> func); /** * Add an AsyncWaitOperator. The order of output stream records is guaranteed to be the same as input ones. * * @param func AsyncWaitFunction * @param func AsyncFunction * @bufSize The max number of async i/o operation that can be triggered * @return A new DataStream. */ public static DataStream<OUT> orderedWait(DataStream<IN> in, AsyncFunction<IN, OUT> func, int bufSize); public static DataStream<OUT> orderedWait(DataStream<IN>, AsyncWaitFunction<IN in, AsyncFunction<IN, OUT> func); /** * Add an AsyncWaitOperator with RichAsyncFunction. */ public static DataStream<OUT> orderedWait(DataStream<IN> in, RichAsyncFunction<IN, OUT> func, int bufSize); public static DataStream<OUT> orderedWait(DataStream<IN> in, RichAsyncFunction<IN, OUT> func); } |
Proposed Changes
...