Motivation
A common UDF type is the ScalarFunction. It works well for CPU-intensive operations, but less well for I/O-bound or otherwise long-running computations. One example is a remote call to an external system, where networking, serialization, and database lookups might dominate the execution time. A StreamTask has a single thread that serially executes operators and the calls they contain, so in the remote call case each call happens synchronously. Since each call can take, say, 1 second or more, this limits throughput and overall performance, and can build up backpressure on the upstream operator. There are two solutions: increase the parallelism of the query (at a higher resource cost, more overhead, etc.), or fire off many requests concurrently and receive the results asynchronously as they complete. This FLIP addresses the latter by introducing AsyncScalarFunction, a new UDF type which allows for issuing concurrent function calls.
Scope
Flink has many combinations of modes and job types, such as changelog modes and streaming vs. batch. To make the scope of this FLIP clear, the functionality will be limited to the following:
- Ordered async operators: Much discussion has centered on which changelog modes and SQL queries could be compatible with an operator that allows unordered results, since unordered output has a performance benefit. For now we'll only consider an operator that retains the input ordering.
- Streaming mode: Some of the design considerations are focused on streaming. To get good performance in batch, we might want to allow batching of async calls, but we're not addressing this at the moment.
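To illustrate what "retains the input ordering" means when calls complete out of order, here is a minimal sketch in plain Java. `OrderedResultBuffer` is a hypothetical helper written for this illustration only, not the operator's actual implementation, and it leaves out failure handling.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper: buffers in-flight results and emits them in input order. */
final class OrderedResultBuffer<T> {
    // Futures are queued in the order in which their inputs arrived.
    private final Queue<CompletableFuture<T>> inFlight = new ArrayDeque<>();
    private final List<T> emitted = new ArrayList<>();

    /** Registers a new in-flight call and returns the future the call should complete. */
    synchronized CompletableFuture<T> add() {
        CompletableFuture<T> future = new CompletableFuture<>();
        inFlight.add(future);
        // Whenever any call finishes, try to emit from the head of the queue.
        future.whenComplete((value, error) -> drain());
        return future;
    }

    /** Emits results only while the oldest pending call is complete (failures are not handled here). */
    private synchronized void drain() {
        while (!inFlight.isEmpty() && inFlight.peek().isDone()) {
            emitted.add(inFlight.poll().join());
        }
    }

    synchronized List<T> emitted() {
        return new ArrayList<>(emitted);
    }

    public static void main(String[] args) {
        OrderedResultBuffer<String> buffer = new OrderedResultBuffer<>();
        CompletableFuture<String> first = buffer.add();
        CompletableFuture<String> second = buffer.add();
        second.complete("b");                 // completes first, but has to wait for "a"
        System.out.println(buffer.emitted()); // prints []
        first.complete("a");
        System.out.println(buffer.emitted()); // prints [a, b]
    }
}
```

The later call finishes first but is held back until the earlier one completes, which is the cost of ordered output compared to an unordered operator.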
Public Interfaces
The primary public class is AsyncScalarFunction, the base class of all async scalar functions.
The type is parameterized with a return type for the eval call. This is similar to the definition of AsyncTableFunction.
```java
public class AsyncScalarFunction<T> extends UserDefinedFunction {

    @Override
    public final FunctionKind getKind() {
        return FunctionKind.ASYNC_SCALAR;
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        // Derive argument and result types from the eval method of the concrete subclass.
        return TypeInferenceExtractor.forAsyncScalarFunction(typeFactory, getClass());
    }
}
```
...
```java
public class RemoteCallFunction extends AsyncScalarFunction<String> {

    private ExternalClient client;
    private ExecutorService executor;

    public RemoteCallFunction() {
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        client = new ExternalClient();
        // getJobParameter returns a String, so the pool size has to be parsed.
        executor = Executors.newFixedThreadPool(
            Integer.parseInt(context.getJobParameter("in-flight-requests", "10")));
    }

    @Override
    public void close() throws Exception {
        client.close();
        executor.shutdownNow();
    }

    public final void eval(
            CompletableFuture<String> future,
            String param1,
            int param2) {
        // Run the blocking RPC on the pool and complete the future with its outcome.
        executor.submit(() -> {
            try {
                String resp = client.rpc(param1, param2);
                future.complete(resp);
            } catch (Throwable t) {
                future.completeExceptionally(t);
            }
        });
    }
}
```
As with the standard ScalarFunction, there is an eval method, but its first parameter (index 0) is of type CompletableFuture<String>.
This is the primary method used to invoke the async functionality. The generic parameter of the future is used to infer the return type for the type system.
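As a rough illustration of how a return type can be read off the future's generic parameter, the sketch below uses plain Java reflection. `FutureTypeInference` and `ExampleFunction` are hypothetical names for this example. The FLIP performs this step through Flink's type inference, which this sketch does not reproduce.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.concurrent.CompletableFuture;

/** Hypothetical illustration: reads the result type from eval's leading future parameter. */
final class FutureTypeInference {

    /** Stand-in for a user function; only the eval signature matters here. */
    static final class ExampleFunction {
        public void eval(CompletableFuture<String> future, String param1, int param2) {
            future.complete(param1 + param2);
        }
    }

    /** Returns the type argument T of the first eval(CompletableFuture<T>, ...) method found. */
    static Type resultTypeOf(Class<?> functionClass) {
        for (Method method : functionClass.getMethods()) {
            if (!method.getName().equals("eval")) {
                continue;
            }
            Type[] params = method.getGenericParameterTypes();
            if (params.length > 0
                    && params[0] instanceof ParameterizedType
                    && ((ParameterizedType) params[0]).getRawType() == CompletableFuture.class) {
                return ((ParameterizedType) params[0]).getActualTypeArguments()[0];
            }
        }
        throw new IllegalArgumentException("No eval(CompletableFuture<T>, ...) method found");
    }

    public static void main(String[] args) {
        System.out.println(resultTypeOf(ExampleFunction.class)); // prints class java.lang.String
    }
}
```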
New configurations will be introduced for the functionality, similar in nature to table.exec.async-lookup.*:
```
table.exec.async-scalar.catalog.db.func-name.buffer-capacity: 10
table.exec.async-scalar.catalog.db.func-name.timeout: 30s
table.exec.async-scalar.catalog.db.func-name.output-mode: ORDERED
table.exec.async-scalar.catalog.db.func-name.retry-strategy: FIXED_DELAY
table.exec.async-scalar.catalog.db.func-name.fixed-delay: 10s
table.exec.async-scalar.catalog.db.func-name.max-attempts: 3
table.exec.async-scalar.system.func-name.buffer-capacity: 10
```
These options are scoped with the catalog, db, and function name as registered in the table environment, so that any given definition can be configured. Similarly, a system function can be configured with the special prefix system, as in the last example.
The options have the following meanings:
Name (Prefix table.exec.async-scalar.catalog.db.func-name.) | Meaning |
---|---|
buffer-capacity | The number of outstanding requests the operator allows at once. |
timeout | The total time which can pass before the invocation (including retries) is considered timed out and task execution is failed. |
output-mode | Whether results are emitted in ORDERED mode or in the more performant unordered mode, depending on whether the planner deems the latter possible. |
retry-strategy | FIXED_DELAY retries after a fixed amount of time. |
fixed-delay | The time to wait between retries for the FIXED_DELAY strategy. Could be the base delay time for a (not yet proposed) exponential backoff. |
max-attempts | The maximum number of attempts while retrying. |
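To make the interplay of the timeout, the FIXED_DELAY retry delay, and max-attempts more concrete, here is a minimal sketch in plain Java. `FixedDelayRetry` is a hypothetical helper for illustration and is not how the operator implements retries. It assumes that the timeout covers all attempts together, as described in the table.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical helper, not Flink code: fixed-delay retries under one overall timeout. */
final class FixedDelayRetry {

    static <T> CompletableFuture<T> call(
            Supplier<CompletableFuture<T>> attempt,
            int maxAttempts,
            long delayMillis,
            long timeoutMillis,
            ScheduledExecutorService timer) {
        CompletableFuture<T> result = new CompletableFuture<>();
        // The timeout spans all attempts, including the waits between them.
        ScheduledFuture<?> timeout = timer.schedule(
                () -> { result.completeExceptionally(new TimeoutException("timed out")); },
                timeoutMillis, TimeUnit.MILLISECONDS);
        result.whenComplete((value, error) -> timeout.cancel(false));
        attemptOnce(attempt, 1, maxAttempts, delayMillis, timer, result);
        return result;
    }

    private static <T> void attemptOnce(
            Supplier<CompletableFuture<T>> attempt,
            int attemptNo,
            int maxAttempts,
            long delayMillis,
            ScheduledExecutorService timer,
            CompletableFuture<T> result) {
        if (result.isDone()) {
            return; // already timed out, so stop retrying
        }
        attempt.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (attemptNo >= maxAttempts) {
                result.completeExceptionally(error); // attempts exhausted
            } else {
                timer.schedule(
                        () -> attemptOnce(attempt, attemptNo + 1, maxAttempts, delayMillis, timer, result),
                        delayMillis, TimeUnit.MILLISECONDS);
            }
        });
    }

    public static void main(String[] args) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        // Fails twice, then succeeds on the third attempt.
        Supplier<CompletableFuture<String>> flaky = () -> calls.incrementAndGet() < 3
                ? CompletableFuture.<String>failedFuture(new RuntimeException("unavailable"))
                : CompletableFuture.completedFuture("ok");
        String value = call(flaky, 3, 10, 5_000, timer).join();
        System.out.println(value + " after " + calls.get() + " attempts"); // prints ok after 3 attempts
        timer.shutdownNow();
    }
}
```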
...
- Queries with ordering semantics:
- e.g. SELECT func(f1) FROM Table ORDER BY f2;
Here, we expect the results of the query to be ordered by f2. We wouldn't want to return results in the completion order of the async function func.
We can solve this either by setting output-mode to ORDERED, which ensures that results are returned in the input order, or by running the function in synchronous mode, which ensures ordering by processing one call at a time.
- Others? Would be great to get feedback on other cases that should be considered.
For the first version of this functionality where the operator outputs only in ordered mode, synchronous mode may not need to be enabled.
Runtime Changes
Code Generation
...
Since the call to the AsyncScalarFunction is wrapped in an AsyncFunction taking input rows, we can reuse the existing class AsyncWaitOperator, which handles ordering, checkpointing, timeouts, and other implementation details. Since only ordered results are handled in this scope, ORDERED will be the default behavior.
The PhysicalAsyncCalcs mentioned in the planning phase will translate to an exec node, which creates the transformation containing this operator.
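The wrapping described above can be pictured with a small sketch in plain Java. `TwoArgAsyncEval` and `RowAsyncAdapter` are hypothetical stand-ins invented for this illustration. The actual design uses generated code and Flink's AsyncFunction, neither of which is reproduced here.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for a user function exposing eval(future, args...). */
interface TwoArgAsyncEval<A, B, R> {
    void eval(CompletableFuture<R> future, A first, B second);
}

/** Hypothetical row-level wrapper: unpacks a row, calls eval, and hands back the future. */
final class RowAsyncAdapter<A, B, R> {
    private final TwoArgAsyncEval<A, B, R> function;

    RowAsyncAdapter(TwoArgAsyncEval<A, B, R> function) {
        this.function = function;
    }

    @SuppressWarnings("unchecked")
    CompletableFuture<R> asyncInvoke(Object[] row) {
        CompletableFuture<R> future = new CompletableFuture<>();
        try {
            // The future is passed as the leading argument, followed by the row's fields.
            function.eval(future, (A) row[0], (B) row[1]);
        } catch (Throwable t) {
            // A synchronous failure in eval is surfaced through the same future.
            future.completeExceptionally(t);
        }
        return future;
    }

    public static void main(String[] args) {
        RowAsyncAdapter<String, Integer, String> adapter =
                new RowAsyncAdapter<>((future, s, n) -> future.complete(s + n));
        System.out.println(adapter.asyncInvoke(new Object[] {"a", 1}).join()); // prints a1
    }
}
```

An operator receiving such futures can then decide when to emit each result, which is where ordering and timeouts come into play.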
...