...

A common UDF type is the ScalarFunction. It works well for CPU-intensive operations, but less well for I/O-bound or otherwise long-running computations. One example is a remote call to an external system, where networking, serialization, and database lookups can dominate the execution time. A StreamTask has a single thread that executes its operators, and the calls they contain, serially; in the remote call case, each call runs synchronously and holds that thread until the response arrives. Since each call can take, say, 1 second or more, this limits throughput and overall performance, and can cause backpressure to build up at the upstream operator. There are two ways to address this: increase the parallelism of the query (at a higher cost in resources and overhead), or fire off many requests concurrently and receive the results asynchronously as they complete. This FLIP addresses the latter by introducing AsyncScalarFunction, a new UDF type that allows issuing concurrent function calls.
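A rough upper bound makes the throughput argument concrete. In the notation below, which is introduced only for this illustration, X is the throughput of one parallel subtask, L is the latency of a single call (1 second, as in the example above), and C is the number of requests allowed in flight at once; all other processing cost is ignored, so these are ceilings rather than predictions.

```latex
\begin{aligned}
X_{\text{sync}}  &\le \frac{1}{L} = \frac{1}{1\,\mathrm{s}} = 1\ \text{record/s} \\
X_{\text{async}} &\le \frac{C}{L} = \frac{100}{1\,\mathrm{s}} = 100\ \text{records/s} \quad (C = 100)
\end{aligned}
```

Raising parallelism scales X_sync by the number of subtasks, each with its own resource cost, whereas raising C keeps the same single thread busy issuing requests instead of waiting on them.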

Scope

Flink supports many combinations of modes and job types, such as the various changelog modes and streaming vs. batch execution. To clarify the scope of this FLIP, the functionality will be limited to the following:

  • Ordered async operators: Much of the discussion has centered on which changelog modes and SQL queries could be compatible with an operator that allows unordered results, since unordered output offers a performance benefit. For now, we only consider an operator that retains the input ordering.
  • Streaming mode: Some of the design decisions here are focused on streaming. Good performance in batch mode might require batching async calls together, but we are not addressing this at the moment.
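The ordering constraint can be illustrated with a small, Flink-independent sketch. It is not the operator this FLIP proposes, and the class names OrderedAsyncBuffer and OrderedAsyncDemo are hypothetical. In-flight futures are kept in input order, a result is emitted only after every earlier result has been emitted, and the caller blocks once the number of outstanding requests reaches a capacity bound, which is how backpressure would reach the upstream operator.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch of ordered async emission; not Flink's actual operator code.
final class OrderedAsyncBuffer<I, O> {
    private final int capacity; // analogous to buffer-capacity: max requests in flight
    private final Function<I, CompletableFuture<O>> asyncCall;
    private final Consumer<O> downstream;
    // Futures kept in input order; only touched by the single calling thread.
    private final ArrayDeque<CompletableFuture<O>> inFlight = new ArrayDeque<>();

    OrderedAsyncBuffer(int capacity, Function<I, CompletableFuture<O>> asyncCall, Consumer<O> downstream) {
        this.capacity = capacity;
        this.asyncCall = asyncCall;
        this.downstream = downstream;
    }

    // Called once per input record, in input order.
    void process(I input) {
        // Buffer full: block on the oldest request. This is where backpressure appears.
        while (inFlight.size() >= capacity) {
            downstream.accept(inFlight.poll().join());
        }
        inFlight.add(asyncCall.apply(input));
        // Emit finished results at the head, but never skip an unfinished head.
        while (!inFlight.isEmpty() && inFlight.peek().isDone()) {
            downstream.accept(inFlight.poll().join());
        }
    }

    // End of input: drain the remaining requests in order.
    void finish() {
        while (!inFlight.isEmpty()) {
            downstream.accept(inFlight.poll().join());
        }
    }
}

final class OrderedAsyncDemo {
    // Later inputs get shorter delays and finish first, yet output stays in input order.
    static List<Integer> run() {
        List<Integer> out = new ArrayList<>();
        OrderedAsyncBuffer<Integer, Integer> buffer = new OrderedAsyncBuffer<>(3,
                x -> CompletableFuture.supplyAsync(() -> x * 10,
                        CompletableFuture.delayedExecutor((6 - x) * 20L, TimeUnit.MILLISECONDS)),
                out::add);
        for (int i = 1; i <= 5; i++) {
            buffer.process(i);
        }
        buffer.finish();
        return out;
    }
}
```

An unordered variant would emit whichever future finishes first instead of waiting on the head of the queue; that is the performance benefit discussed above, paid for by changing the order of records.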


Public Interfaces

The primary public class is AsyncScalarFunction, which serves as the base class of all async scalar functions.
The type is parameterized with the return type of the eval call, similar to the definition of AsyncTableFunction.
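The shape of a user-defined function can be sketched without Flink on the classpath. AsyncScalarFunctionSketch is a hypothetical stand-in for the proposed base class, and RemoteNameLookup, its thread pool, and the in-memory map standing in for a remote service are illustrative only. The assumed convention, borrowed from AsyncTableFunction, is that eval receives a CompletableFuture as its first argument and completes it later instead of returning a value; the final Flink API may differ.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for the proposed base class; T is the result type of eval.
abstract class AsyncScalarFunctionSketch<T> {
    void open() {}  // loosely mirrors a UDF's open lifecycle hook
    void close() {} // loosely mirrors a UDF's close lifecycle hook
}

// Hypothetical UDF: resolves an id to a name via a slow "remote" service.
final class RemoteNameLookup extends AsyncScalarFunctionSketch<String> {
    // Stands in for a network call to an external system.
    private final Map<Integer, String> remoteService = Map.of(1, "alice", 2, "bob");
    private ExecutorService pool;

    @Override
    void open() {
        pool = Executors.newFixedThreadPool(4);
    }

    @Override
    void close() {
        pool.shutdown();
    }

    // The result goes into the future rather than being returned, so eval
    // returns immediately and the caller can issue further calls.
    public void eval(CompletableFuture<String> result, Integer id) {
        pool.execute(() -> {
            String name = remoteService.get(id);
            if (name == null) {
                result.completeExceptionally(new IllegalArgumentException("unknown id: " + id));
            } else {
                result.complete(name);
            }
        });
    }
}
```

Because eval returns as soon as the work is handed to the pool, the calling thread can issue the next call right away; collecting the completed futures in order would be the framework's job, not the function's.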

...

Name (prefix table.exec.async-scalar.catalog.db.func-name) | Meaning
buffer-capacity | The maximum number of outstanding requests the operator allows at once.
timeout | The total time that can pass before the request is failed.
output-mode | Whether results must be emitted in input order or may use the more performant unordered mode; the unordered mode applies only if the planner deems it possible.
retry-strategy | The retry strategy to use. FIXED_DELAY retries after a fixed amount of time.
retry-delay | The time to wait between retries for the FIXED_DELAY strategy. Could also serve as the base delay for a (not yet proposed) exponential backoff.
max-attempts | The maximum number of attempts while retrying.
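The interaction between the retry options can be sketched with plain CompletableFuture code. FixedDelayRetry and its parameters are hypothetical names, not part of the proposal. The sketch assumes that the timeout bounds the whole invocation including all retries, that FIXED_DELAY waits the same delay before every retry, and that a failure on the last allowed attempt fails the request.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper: FIXED_DELAY retries, a max-attempts cap, and an overall timeout.
final class FixedDelayRetry {
    static <T> CompletableFuture<T> call(Supplier<CompletableFuture<T>> attempt,
                                         int maxAttempts, Duration retryDelay, Duration timeout) {
        CompletableFuture<T> result = new CompletableFuture<>();
        // Assumption: the timeout covers the whole invocation, including all retries.
        result.orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS);
        tryOnce(attempt, 1, maxAttempts, retryDelay, result);
        return result;
    }

    private static <T> void tryOnce(Supplier<CompletableFuture<T>> attempt, int attemptNo,
                                    int maxAttempts, Duration retryDelay, CompletableFuture<T> result) {
        if (result.isDone()) {
            return; // already timed out; stop retrying
        }
        CompletableFuture<T> current;
        try {
            current = attempt.get();
        } catch (RuntimeException e) {
            current = CompletableFuture.failedFuture(e); // a synchronous throw counts as a failed attempt
        }
        current.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (attemptNo >= maxAttempts) {
                result.completeExceptionally(error); // out of attempts: fail the request
            } else {
                // FIXED_DELAY: the same delay before every retry.
                CompletableFuture.delayedExecutor(retryDelay.toMillis(), TimeUnit.MILLISECONDS)
                        .execute(() -> tryOnce(attempt, attemptNo + 1, maxAttempts, retryDelay, result));
            }
        });
    }
}
```

A (not yet proposed) exponential backoff would differ only in the delay passed to delayedExecutor, for example doubling the base delay on each attempt.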

...