Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The primary public class is AsyncScalarFunction, for being the base class of all async scalar functions.  
The type is parameterized with a return type for the eval call.  This is similar to the definition of AsyncTableFunction.

Code Block
public class AsyncScalarFunction<T>AsyncScalarFunction extends UserDefinedFunction {
    @Override
    public final FunctionKind getKind() {
        return FunctionKind.ASYNC_SCALAR;
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        TypeInference val = TypeInferenceExtractor.forAsyncScalarFunction(typeFactory, getClass());
        return val;
    }
}

...

Code Block
public class RemoteCallFunction extends AsyncScalarFunction<String>AsyncScalarFunction {

    private ExternalClient client;
    private ExecutorService executor;

    public RemoteCallFunction() {
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        client = new Client();
        executor = Executors.newFixedThreadPool(
            context.getJobParameter("in-flight-requests", 10));
    }

    @Override
    public void close() throws Exception {
        client.close();
        executor.shutdownNow();
    }

    public final void eval(
            CompletableFuture<String> future,
            String param1,
            int param2) {
        executor.submit(() -> {
            try {
                String resp = client.rpc(param1, param2);
                future.complete(resp);
            } catch (Throwable t) {
                future.completeExceptionally(t);
            }
        });
    }
}

As with the standard ScalarFunction, there is an eval method, but with a 0th parameter of the type CompletableFuture<String> future. This is the primary method used to invoke the async functionality.  The generic parameter of the future is used to infer the return type for the type system.


New configurations will be introduced for the functionality, similar in nature to table.exec.async-lookup.* :

...