Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Since the code generator already supports generating AsyncFunctions (currently used by lookup joins),  it will be used with the fetching logic in the method asyncInvoke(RowData input, ResultFuture<Collection<RowData> result).  The body of that method will use existing code generation to call the UDF and do the appropriate casting for the various arguments.  Additional logic will capture the UDF result Future, set a callback, convert results, and complete the AsyncFunction ResultFutureoutput row.

Utilizing a class AsyncDelegatingResultFuture similar to the existing DelegatingResultFuture (used for lookup joins), the generated method could look similar to the following:

Code Block
@Override
public void asyncInvoke(RowData input, ResultFuture<RowData> resultFuture) throws Exception {
   final AsyncDelegatingResultFuture delegatingFuture = AsyncDelegatingResultFuture(resultFuture);
   try {
       java.util.function.Function<Object, GenericRowData> outputFactory = new java.util.function.Function<Object, GenericRowData>() {
           @Override
           public GenericRowData apply(Object udfResult) {
			   // Gather the results and return the output object
               final GenericRowData out = new GenericRowData(2);
               out.setField(0, fdelegatingFuture.getSynchronousResult(0));
               out.setField(1, udfResult);
               return out;
           }
       };

	   // Once it sees that the async future is done, the factory will be used to get the resulting output row
       delegatingFuture.setOutputFactory(outputFactory);

       // If an input is needed in the next operator, pass it along
       int passThroughField = input.getInt(0);
       delegatingFuture.addSynchronousResult(passThroughField);

	   // Create a new future object and invoke the UDF.
       // The result will be converted to the internal type before calling the output factory.
       CompletableFuture<?> udfResultFuture = delegatingFuture.createAsyncFuture(typeConverter);
       udfInstance.eval(udfResultFuture);
    } catch (Throwable e) {
      resultFuture.completeExceptionally(e);
    }
}

...