Page properties

Discussion thread: https://lists.apache.org/thread/dt4tnwk8hcfj0sp3l3qwloqlljog8xm7
Vote thread: here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
JIRA: FLINK-33978
Release: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).



Motivation

A common UDF type is the ScalarFunction. This works well for CPU-intensive operations, but less well for IO-bound or otherwise long-running computations.
One example of this is remote calls to external systems, where networking, serialization, and database lookups might dominate the execution time.
StreamTask has a single thread serially executing operators and their contained calls, which happen synchronously in the remote call case.
Since each call can take, say, 1 second or more, this limits throughput and overall performance, and can build up backpressure on the upstream operator.
There are two solutions: either increase the parallelism of the query (at a higher resource cost, overhead, etc.) or asynchronously fire off many requests concurrently and receive results as they complete.
This FLIP pursues the latter by introducing AsyncScalarFunction, a new UDF type which allows for issuing concurrent function calls.

Scope

To limit the scope for consideration, the goal is to allow AsyncScalarFunctions to be used in:

  • Projections e.g. SELECT func(f1) FROM ...

  • Simple Conditions e.g. SELECT ... FROM T1 WHERE func(f1) > 10

Out of consideration:

  • Join Conditions e.g. SELECT ... FROM T1 INNER JOIN T2 ON func(f1) = func(f2)

  • Subquery Conditions e.g. SELECT ... WHERE func(f1) IN (SELECT id FROM Table)

  • Aggregate Projections e.g. SELECT f1, func(count(*)) from t1 group by f1

  • Other places where non deterministic ordering or other considerations may complicate execution or break SQL semantics.

Regarding the changelog modes supported by the async operator, incoming row types to support are:

  • Insert (+I)

  • Update After (+U)

Both of these types can be passed on to the UDF and executed. Any other row types won’t be supported.

...

There are lots of combinations of modes and Job types in Flink such as the changelog mode and streaming vs batch.  To make clear the scope this FLIP intends to cover, the functionality will be limited to the following:

  • Ordered Async operators: Much discussion has centered on which changelog modes and SQL queries could be compatible with an operator that allows unordered results, since there is a performance benefit. For now we'll only consider an operator that retains the input ordering.
  • Streaming mode: Some of the design considerations are focused on streaming.  To get good performance on batch, it's possible we might want to allow batching of async calls, but we're not addressing this at the moment.


Public Interfaces

The primary public class is AsyncScalarFunction, the base class of all async scalar functions.
The return type of the eval call is conveyed through the type parameter of the CompletableFuture passed to eval.  This is similar to the definition of AsyncTableFunction.

Code Block
public class AsyncScalarFunction extends UserDefinedFunction {
    @Override
    public final FunctionKind getKind() {
        return FunctionKind.ASYNC_SCALAR;
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        TypeInference val = TypeInferenceExtractor.forAsyncScalarFunction(typeFactory, getClass());
        return val;
    }
}


An example implementing class could be the following:

Code Block
public class RemoteCallFunction extends AsyncScalarFunction {

    // ExternalClient stands in for the client of any remote system.
    private ExternalClient client;
    private ExecutorService executor;

    public RemoteCallFunction() {
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        client = new ExternalClient();
        // Job parameters are strings, so the pool size is parsed explicitly.
        executor = Executors.newFixedThreadPool(
            Integer.parseInt(context.getJobParameter("in-flight-requests", "10")));
    }

    @Override
    public void close() throws Exception {
        client.close();
        executor.shutdownNow();
    }

    public final void eval(
            CompletableFuture<String> future,
            String param1,
            int param2) {
        // Run the blocking remote call off the task thread and report the outcome on the future.
        executor.submit(() -> {
            try {
                String resp = client.rpc(param1, param2);
                future.complete(resp);
            } catch (Throwable t) {
                future.completeExceptionally(t);
            }
        });
    }
}

As with the standard ScalarFunction, there is an eval method, but with a 0th parameter of type CompletableFuture<String> (named future in the example).
This is the primary method used to invoke the async functionality.  The generic parameter of the future is used to infer the return type for the type system.
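
The calling convention described above can be tried out without any Flink dependency. The following is a minimal sketch under stated assumptions: EvalContractSketch, its callAndWait helper and the string concatenation standing in for a remote call are hypothetical and not part of the proposed API. It only shows a caller creating the CompletableFuture, passing it as the 0th argument, and receiving the value once the executor thread completes it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a user function; it does not extend any Flink class.
final class EvalContractSketch {
    private final ExecutorService executor = Executors.newFixedThreadPool(2);

    // Mirrors the eval shape: the future comes first, followed by the regular arguments.
    public void eval(CompletableFuture<String> future, String param1, int param2) {
        executor.submit(() -> {
            try {
                // Placeholder for a remote call such as client.rpc(param1, param2).
                future.complete(param1 + "#" + param2);
            } catch (Throwable t) {
                future.completeExceptionally(t);
            }
        });
    }

    public void close() {
        executor.shutdownNow();
    }

    // Plays the role of the framework: creates the future, invokes eval, waits for the result.
    static String callAndWait(String param1, int param2) {
        EvalContractSketch fn = new EvalContractSketch();
        try {
            CompletableFuture<String> future = new CompletableFuture<>();
            fn.eval(future, param1, param2);
            return future.join();
        } finally {
            fn.close();
        }
    }
}
```

Note that eval itself returns immediately; only the helper blocks, which a real operator would avoid by registering a callback on the future instead.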


New configurations will be introduced for the functionality, similar in nature to table.exec.async-lookup.* :

Code Block
table.exec.async-scalar.buffer-capacity: 10
table.exec.async-scalar.timeout: 30s
table.exec.async-scalar.retry-strategy: FIXED_DELAY
table.exec.async-scalar.retry-delay: 10s
table.exec.async-scalar.max-attempts: 3


These options ideally would be function scoped, but since `ConfigOption` doesn't make it easy to have a per-function config, they are global.  Future work could allow these to be overridden on a per definition basis.


The following configurations will be available:

Name (Prefix table.exec.async-scalar) | Meaning
buffer-capacity | The number of outstanding requests the operator allows at once.
timeout | The total time which can pass before the invocation (including retries) is considered timed out and task execution is failed.
retry-strategy | FIXED_DELAY is for a retry after a fixed amount of time.
retry-delay | The time to wait between retries for the FIXED_DELAY strategy.  Could be the base delay time for a (not yet proposed) exponential backoff.
max-attempts | The maximum number of attempts while retrying.
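
To make the interaction of the retry options concrete, here is a minimal sketch of a fixed-delay retry loop around an async call, written in plain Java. It is an illustration only, not the operator's implementation: FixedDelayRetrySketch, callWithRetry and attemptsUntilSuccess are hypothetical names, and the overall timeout is deliberately left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class FixedDelayRetrySketch {

    // Retries the async call up to maxAttempts times, waiting retryDelayMs between attempts.
    static <T> CompletableFuture<T> callWithRetry(
            Supplier<CompletableFuture<T>> call,
            int maxAttempts,
            long retryDelayMs,
            ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(call, 1, maxAttempts, retryDelayMs, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> call, int attemptNo, int maxAttempts,
            long retryDelayMs, ScheduledExecutorService scheduler, CompletableFuture<T> result) {
        call.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (attemptNo >= maxAttempts) {
                // Attempts exhausted: surface the last failure.
                result.completeExceptionally(error);
            } else {
                // Schedule the next attempt after the fixed delay.
                scheduler.schedule(
                        () -> attempt(call, attemptNo + 1, maxAttempts, retryDelayMs, scheduler, result),
                        retryDelayMs, TimeUnit.MILLISECONDS);
            }
        });
    }

    // Demo: a call that fails twice and then succeeds; returns the number of attempts made.
    static int attemptsUntilSuccess(int maxAttempts) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger attempts = new AtomicInteger();
        try {
            callWithRetry(() -> {
                CompletableFuture<String> f = new CompletableFuture<>();
                if (attempts.incrementAndGet() < 3) {
                    f.completeExceptionally(new RuntimeException("transient failure"));
                } else {
                    f.complete("ok");
                }
                return f;
            }, maxAttempts, 10, scheduler).join();
        } catch (CompletionException e) {
            // All attempts were used up; fall through and report the count.
        } finally {
            scheduler.shutdownNow();
        }
        return attempts.get();
    }
}
```

With max-attempts of 5 the demo call succeeds on its third attempt; with max-attempts of 2 it gives up after the second.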


Proposed Changes

Planner Changes

Split Rules

One source of inspiration for the planner changes is the Python calc rules.  Most of the split rules (rules for splitting complex calc nodes into multiple simpler calc nodes) will be generalized and shared between the two, since remote Python calls and async calls more generally share much of the same structure.  If done correctly, the intention is to simplify the async operator to handle only FlinkLogicalCalcs which contain async UDF calls in projections and no other calc logic (non-async calls, field accesses, conditions).

The high level motivation is that anything that comes after an async call is easier to chain as a series of operators rather than internally within a single operator.

Specifically, PythonCalcSplitRuleBase will be generalized into RemoteCalcSplitRuleBase. It will be parameterized with a RemoteCalcCallFinder which can be used to analyze the RexNodes to look for Python or async calls.

...

This will allow for PythonCalcCallFinder and AsyncCalcCallFinder implementations.
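
The idea of a pluggable call finder can be sketched on a toy expression tree. Everything below is an assumption made for illustration: Node is a minimal stand-in for a RexNode, and RemoteCallFinder only loosely mirrors what RemoteCalcCallFinder might look like; neither reflects the actual planner classes or signatures.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class CallFinderSketch {

    // Minimal stand-in for a RexNode: either an input reference or a function call.
    static final class Node {
        final String name;         // function name, or input name such as "I0"
        final boolean isCall;
        final List<Node> operands; // empty for input references

        private Node(String name, boolean isCall, Node... operands) {
            this.name = name;
            this.isCall = isCall;
            this.operands = Arrays.asList(operands);
        }
    }

    static Node input(String name) {
        return new Node(name, false);
    }

    static Node call(String function, Node... operands) {
        return new Node(function, true, operands);
    }

    // Hypothetical analogue of a call finder: one implementation per kind of remote call.
    interface RemoteCallFinder {
        boolean isRemoteCall(Node node);

        // Walks the tree and reports whether any node is a remote call.
        default boolean containsRemoteCall(Node node) {
            if (isRemoteCall(node)) {
                return true;
            }
            for (Node operand : node.operands) {
                if (containsRemoteCall(operand)) {
                    return true;
                }
            }
            return false;
        }
    }

    // A finder that treats the given function names as async calls.
    static RemoteCallFinder asyncFinder(String... asyncFunctionNames) {
        Set<String> names = new HashSet<>(Arrays.asList(asyncFunctionNames));
        return node -> node.isCall && names.contains(node.name);
    }
}
```

A Python-specific finder would differ only in its isRemoteCall predicate, which is what lets the split rules be shared.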

The rules we intend to adopt split up a FlinkLogicalCalc into two (or ultimately more) FlinkLogicalCalcs which feed into one another. The async split rules shared with Python will be:

Each rule below lists the original RelNode and what it becomes (Bottom ==> Top). IX are inputs from the previous operator.

Rule: SPLIT_CONDITION

Splits FlinkLogicalCalcs which contain remote functions in the condition into multiple FlinkLogicalCalcs, with the function call in a projection of one and the condition checked in another using the result of the first.

Original RelNode:

FlinkLogicalCalc
Projections:
Condition: func(...) AND I0

Becomes (Bottom ==> Top):

FlinkLogicalCalc
Projections: I0 AS F0, func(...) AS F1
Condition:

==>

FlinkLogicalCalc
Projections:
Condition: F1 AND F0

Rule: SPLIT_PROJECT

Splits projections with async functions and non-async functions into two FlinkLogicalCalcs.

Original RelNode:

FlinkLogicalCalc
Projections: Concat(func(...), I1)
Condition:

Becomes (Bottom ==> Top):

FlinkLogicalCalc
Projections: I1 AS F0, func(...) AS F1
Condition:

==>

FlinkLogicalCalc
Projections: Concat(F1, F0)
Condition:

Rule: SPLIT_PROJECTION_REX_FIELD

Splits field accesses from the result of an async call in projections into two FlinkLogicalCalcs.

Original RelNode:

FlinkLogicalCalc
Projections: func(...).foobar
Condition:

Becomes (Bottom ==> Top):

FlinkLogicalCalc
Projections: func(...) AS F0
Condition:

==>

FlinkLogicalCalc
Projections: F0.foobar
Condition:

Rule: SPLIT_CONDITION_REX_FIELD

Splits field accesses from the result of an async call in the condition into two FlinkLogicalCalcs.

Original RelNode:

FlinkLogicalCalc
Projections:
Condition: func(...).foobar

Becomes (Bottom ==> Top):

FlinkLogicalCalc
Projections:
Condition: func(...)

==>

FlinkLogicalCalc
Projections:
Condition: I0.foobar

Rule: EXPAND_PROJECT

Splits field accesses as inputs to async calls into two FlinkLogicalCalcs.

Original RelNode:

FlinkLogicalCalc
Projections: func(I5.foobar)
Condition:

Becomes (Bottom ==> Top):

FlinkLogicalCalc
Projections: I5.foobar AS F0
Condition:

==>

FlinkLogicalCalc
Projections: func(F0)
Condition:

Rule: PUSH_CONDITION

Pushes conditions down to minimize rows requiring the async call, creating two FlinkLogicalCalcs.

Original RelNode:

FlinkLogicalCalc
Projections: func(...)
Condition: C1

Becomes (Bottom ==> Top):

FlinkLogicalCalc
Projections:
Condition: C1

==>

FlinkLogicalCalc
Projections: func(...)
Condition:

Rule: Async Specific: NESTED_SPLIT

If there is a call with an async call as an argument, then it needs to be split into two FlinkLogicalCalcs with one feeding into the next.

Original RelNode:

FlinkLogicalCalc
Projections: func(func(...))
Condition:

Becomes (Bottom ==> Top):

FlinkLogicalCalc
Projections: func(...) AS F0
Condition:

==>

FlinkLogicalCalc
Projections: func(F0)
Condition:

Rule: Async Specific: ONE_ASYNC_PROJECTION_PER_CALC

If there are multiple projections containing async calls, it splits them into two FlinkLogicalCalcs with one feeding into the next.

Original RelNode:

FlinkLogicalCalc
Projections: func(...), func(...)
Condition:

Becomes (Bottom ==> Top):

FlinkLogicalCalc
Projections: func(...) AS F0
Condition:

==>

FlinkLogicalCalc
Projections: F0, func(...)
Condition:

Physical Rules

In addition to the split rules, there will also need to be a PhysicalAsyncCalcRule which converts FlinkLogicalCalcs to PhysicalAsyncCalcs. This will check for the existence of any async calls in the calc, using the same AsyncCalcCallFinder logic above.

Disallowing Async functionality when not supported

It is most prudent to only allow async behavior where it is known to not violate SQL semantics.  To do this, rules will be introduced which contain query structures which we don’t want to allow and if found, all of the async calls will be executed in synchronous mode.

This can be done by introducing a new trait AsyncOperatorModeTrait, which comes in sync mode and async mode (default), and which will be attached to a FlinkLogicalCalc if it contains async calls which we would prefer to execute in sync mode. Execution in synchronous mode uses the same stack as async mode, but waits on the result immediately after issuing the request.

An example of a query which could have unintended results without explicit handling:

  • Queries with ordering semantics:
    • e.g. SELECT func(f1) FROM Table ORDER BY f2;
      Here, we expect that the results of the query will be ordered by f2.  We wouldn't want to return results in the completion order from async function func.  
      We can solve it either by outputting in ORDERED mode, ensuring that we return the results in the input order, or by putting the operator into synchronous mode and ensuring ordering by processing one call at a time.
  • Others?  Would be great to get feedback on other cases that should be considered.

For the first version of this functionality where the operator outputs only in ordered mode, synchronous mode may not need to be enabled.
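
Synchronous mode as described above, where the same async eval is used but the caller waits on each result before issuing the next request, can be sketched as follows. This is an assumption-laden illustration: SyncModeSketch, AsyncEval and runSynchronously are hypothetical names and do not correspond to planner or runtime classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class SyncModeSketch {

    // Hypothetical shape of an async function: completes the future at some later point.
    interface AsyncEval<T> {
        void eval(CompletableFuture<T> future, String input);
    }

    // Sync mode: issue one request, block on its result, then move to the next input.
    static <T> List<T> runSynchronously(AsyncEval<T> function, List<String> inputs) {
        List<T> results = new ArrayList<>();
        for (String input : inputs) {
            CompletableFuture<T> future = new CompletableFuture<>();
            function.eval(future, input);
            results.add(future.join()); // wait immediately after issuing the request
        }
        return results;
    }

    // Demo: the function completes on another thread, yet output order matches input order.
    static List<String> demo() {
        AsyncEval<String> upper = (future, input) ->
                CompletableFuture.runAsync(() -> future.complete(input.toUpperCase()));
        return runSynchronously(upper, Arrays.asList("a", "b", "c"));
    }
}
```

The trade-off is the one the motivation describes: ordering is trivially preserved, but at most one request is in flight at a time.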

Runtime Changes

Code Generation

Current code generation in Flink for ScalarFunctions assumes that one call can be synchronously fed into another and results can be set on the output record right after being issued.
Neither of these will hold once we have async support. There are two phases for generated async code:

  • Call issuing phase

    • Same as a normal sync invocation with respect to converting all of the parameters and calling the UDF, but with the result being returned with a Future.

  • Result collection phase

    • Must wait for all async calls issued and also convert result types, if appropriate.

    • Only once all results are ready can an output record be created and set.

    • Must happen with a callback on the Futures rather than synchronously.
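
The two phases listed above can be sketched with plain CompletableFutures. This is not generated Flink code: TwoPhaseSketch, issueCall and collect are hypothetical names, and the doubling function stands in for an arbitrary UDF.

```java
import java.util.concurrent.CompletableFuture;

final class TwoPhaseSketch {

    // Phase 1 (call issuing): convert the argument, invoke the UDF, and return its Future.
    static CompletableFuture<Integer> issueCall(String rawArgument) {
        int converted = Integer.parseInt(rawArgument.trim()); // parameter conversion
        return CompletableFuture.supplyAsync(() -> converted * 2); // stand-in for the UDF
    }

    // Phase 2 (result collection): a callback converts the result and builds the output row.
    static CompletableFuture<Object[]> collect(
            Object passThroughField, CompletableFuture<Integer> udfResult) {
        return udfResult.thenApply(value -> new Object[] {passThroughField, value.longValue()});
    }

    // Demo: the output row is only available once the callback has run.
    static Object[] demo() {
        return collect("key-1", issueCall(" 21 ")).join();
    }
}
```

Only the demo blocks with join; the collection step itself is a callback, so the thread that issued the call is free to issue further calls in the meantime.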


Note that how the operator invokes the generated code depends on which planner rules are in effect above. If every Async Operator is guaranteed to have only parallel async calls and no other generated Java/SQL operations, then the generated code can be simplified, leaving support for everything else to existing Flink operators. This is a similar approach to that taken by Python.  With the last split rule above, the code can be simplified further by requiring only one async request per operator.

Since the code generator already supports generating AsyncFunctions (currently used by lookup joins), this will be used with the main logic in asyncInvoke.  The body of that method will use existing code generation to call the UDF and do the appropriate casting for the various arguments.  Additional logic will capture the UDF result Future, set a callback, convert results, and complete the output row.

Utilizing a class AsyncDelegatingResultFuture similar to the existing DelegatingResultFuture (used for lookup joins), the generated method could look similar to the following:

Code Block
@Override
public void asyncInvoke(RowData input, ResultFuture<RowData> resultFuture) throws Exception {
    // Invokes callbacks on resultFuture once the async call is complete.
    final AsyncDelegatingResultFuture delegatingFuture = new AsyncDelegatingResultFuture(resultFuture);

    try {
        Function<Object, GenericRowData> outputFactory = new Function<Object, GenericRowData>() {
            @Override
            public GenericRowData apply(Object udfResult) {
                // Gather the results and return the output object
                final GenericRowData out = new GenericRowData(2);
                out.setField(0, delegatingFuture.getSynchronousResult(0));
                out.setField(1, udfResult);
                return out;
            }
        };

        // Once it sees that the async future is done, the factory will be used to get the resulting output row
        delegatingFuture.setOutputFactory(outputFactory);

        // If an input is needed in the next operator, pass it along
        int passThroughField = input.getInt(0);
        delegatingFuture.addSynchronousResult(passThroughField);

        // Create a new future object and invoke the UDF.
        // The result will be converted to the internal type before calling the output factory.
        CompletableFuture<?> udfResultFuture = delegatingFuture.createAsyncFuture(typeConverter);
        asyncScalarFunctionUdf.eval(udfResultFuture);
    } catch (Throwable e) {
        resultFuture.completeExceptionally(e);
    }
}

Operator

Since the call to the AsyncScalarFunction is wrapped in an AsyncFunction taking input rows, we have the benefit of using the existing class AsyncWaitOperator, which handles ordering, checkpointing, timeouts and other implementation details.  Since only ordered results are handled in this scope, ORDERED will be the default behavior.
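
To illustrate what ORDERED output means for in-flight requests, here is a minimal sketch of a queue that emits results in input order even when later requests finish first. It is a simplified illustration, not how AsyncWaitOperator is implemented: OrderedQueueSketch and its methods are hypothetical, and failures, timeouts and checkpointing are ignored.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class OrderedQueueSketch<T> {
    private final Deque<CompletableFuture<T>> inFlight = new ArrayDeque<>();
    private final List<T> emitted = new ArrayList<>();

    // Registers a new request in arrival order and returns the future the UDF will complete.
    synchronized CompletableFuture<T> add() {
        CompletableFuture<T> future = new CompletableFuture<>();
        inFlight.addLast(future);
        future.whenComplete((value, error) -> drain());
        return future;
    }

    // Emits from the head of the queue only; a finished entry behind an unfinished one waits.
    private synchronized void drain() {
        while (!inFlight.isEmpty() && inFlight.peekFirst().isDone()) {
            emitted.add(inFlight.pollFirst().join());
        }
    }

    // Snapshot of everything emitted so far, in emission order.
    synchronized List<T> emitted() {
        return new ArrayList<>(emitted);
    }
}
```

For example, if two requests are added and the second completes first, nothing is emitted until the first completes, after which both are emitted in input order.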

The PhysicalAsyncCalcs mentioned in the planning phase will translate to an exec node, which creates the transformation containing this operator.


Compatibility, Deprecation, and Migration Plan

...