...
Since the code generator already supports generating AsyncFunctions (currently used by lookup joins), it will be used with the fetching logic in the method asyncInvoke(RowData input, ResultFuture<Collection<RowData> result). The body of that method will use existing code generation to call the UDF and do the appropriate casting for the various arguments. Additional logic will capture the UDF result Future, set a callback, convert results, and complete the AsyncFunction ResultFutureoutput row.
Utilizing a class AsyncDelegatingResultFuture similar to the existing DelegatingResultFuture (used for lookup joins), the generated method could look similar to the following:
Code Block |
---|
@Override public void asyncInvoke(RowData input, ResultFuture<RowData> resultFuture) throws Exception { final AsyncDelegatingResultFuture delegatingFuture = AsyncDelegatingResultFuture(resultFuture); try { java.util.function.Function<Object, GenericRowData> outputFactory = new java.util.function.Function<Object, GenericRowData>() { @Override public GenericRowData apply(Object udfResult) { // Gather the results and return the output object final GenericRowData out = new GenericRowData(2); out.setField(0, fdelegatingFuture.getSynchronousResult(0)); out.setField(1, udfResult); return out; } }; // Once it sees that the async future is done, the factory will be used to get the resulting output row delegatingFuture.setOutputFactory(outputFactory); // If an input is needed in the next operator, pass it along int passThroughField = input.getInt(0); delegatingFuture.addSynchronousResult(passThroughField); // Create a new future object and invoke the UDF. // The result will be converted to the internal type before calling the output factory. CompletableFuture<?> udfResultFuture = delegatingFuture.createAsyncFuture(typeConverter); udfInstance.eval(udfResultFuture); } catch (Throwable e) { resultFuture.completeExceptionally(e); } } |
...