...

To limit the scope for consideration, the goal is to allow AsyncScalarFunctions to be used in:

  • Projections (e.g. SELECT func(f1) FROM ...)

  • Simple Conditions (e.g. SELECT ... FROM T1 WHERE func(f1) > 10)

Out of consideration:

  • Join Conditions (e.g. SELECT ... FROM T1 INNER JOIN T2 ON func(f1) = func(f2))

  • Subquery Conditions (e.g. SELECT ... WHERE func(f1) IN (SELECT id FROM Table))

  • Aggregate Projections (e.g. SELECT f1, func(count(*)) FROM t1 GROUP BY f1)

  • Other places where non-deterministic ordering or other considerations may complicate execution or break SQL semantics.

Regarding the changelog modes supported by the async operator, incoming row types to support are:

  • Insert (+I)

  • Update After (+U)

Both of these types can be passed on to the UDF and executed. Any other row types won’t be supported.

...

The primary public class is AsyncScalarFunction, the base class of all async scalar functions. The type is parameterized with the return type of the eval call. This is similar to the definition of AsyncTableFunction.

Code Block
public class AsyncScalarFunction<T> extends UserDefinedFunction {
    @Override
    public final FunctionKind getKind() {
        return FunctionKind.ASYNC_SCALAR;
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        // Derive argument and result types reflectively from this function class.
        return TypeInferenceExtractor.forAsyncScalarFunction(typeFactory, getClass());
    }
}
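
As a rough illustration of how a user function might build on this base class, here is a minimal sketch. It assumes the eval convention mirrors AsyncTableFunction, where the first parameter is a CompletableFuture that the function completes and the remaining parameters are the SQL arguments. AsyncScalarFunctionSketch and AsyncUpperCase are hypothetical names, and the stand-in base class exists only so the example compiles without Flink on the classpath.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the proposed base class; the real
// AsyncScalarFunction<T> extends UserDefinedFunction.
abstract class AsyncScalarFunctionSketch<T> {}

// Hypothetical user function that upper-cases its argument asynchronously.
class AsyncUpperCase extends AsyncScalarFunctionSketch<String> {

    // Assumed eval convention (mirroring AsyncTableFunction): the first
    // parameter is the future to complete, the rest are the SQL arguments.
    public void eval(CompletableFuture<String> result, String input) {
        CompletableFuture
                .supplyAsync(() -> input.toUpperCase())
                .whenComplete((value, error) -> {
                    if (error != null) {
                        // Propagate failures so the caller can react to them.
                        result.completeExceptionally(error);
                    } else {
                        result.complete(value);
                    }
                });
    }
}
```

The caller hands in the future and returns immediately; the result (or the failure) arrives later on whichever thread completes it.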

...


These options are scoped by the catalog, database, and function name as registered in the table environment, so that each function definition can be configured individually. Similarly, a system function can be configured with the special prefix system, as in the last example.
The options have the following meanings:

| Name (Prefix table.exec.async-scalar.catalog.db.func-name) | Meaning |
|---|---|
| buffer-capacity | The number of outstanding requests the operator allows at once. |
| timeout | The time which can pass before a restart strategy is triggered. |
| output-mode | Whether results are emitted in input order (ORDERED) or, where the planner deems it possible, in the more performant unordered mode. |
| retry-strategy | FIXED_DELAY is for a retry after a fixed amount of time. |
| fixed-delay | The time to wait between retries for the FIXED_DELAY strategy. |
| max-attempts | The maximum number of attempts while retrying. |
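
To make the scoping concrete, the following sketch assembles fully qualified option keys from the prefix described above. The class AsyncScalarOptions, the names mycat, mydb and myfunc, and every value shown are hypothetical and purely illustrative; in particular, none of the values are defaults taken from this proposal, and the value formats are assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for building function-scoped option keys.
class AsyncScalarOptions {
    static final String PREFIX = "table.exec.async-scalar";

    // Builds "<prefix>.<catalog>.<db>.<func-name>.<option>".
    static String key(String catalog, String db, String func, String option) {
        return String.join(".", PREFIX, catalog, db, func, option);
    }

    // Illustrative values only; not defaults from the proposal.
    static Map<String, String> example() {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put(key("mycat", "mydb", "myfunc", "buffer-capacity"), "10");
        conf.put(key("mycat", "mydb", "myfunc", "timeout"), "30 s");
        conf.put(key("mycat", "mydb", "myfunc", "output-mode"), "ORDERED");
        conf.put(key("mycat", "mydb", "myfunc", "retry-strategy"), "FIXED_DELAY");
        conf.put(key("mycat", "mydb", "myfunc", "fixed-delay"), "2 s");
        conf.put(key("mycat", "mydb", "myfunc", "max-attempts"), "5");
        return conf;
    }
}
```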


Proposed Changes

Planner Changes

The Python calc rules have served as inspiration for the planner changes. Most of the split rules (rules for splitting complex calc nodes into multiple simpler calc nodes) will be generalized and shared between the two, since remote Python calls and async calls more generally share much of the same structure. If done correctly, the intention is to simplify the async operator to handle only FlinkLogicalCalcs which contain async UDF calls in projections and no other calc logic (non-async calls, field accesses, conditions). The high-level motivation is that anything that comes after an async call is easier to chain as a series of operators than to handle internally within a single operator.

Specifically, PythonCalcSplitRuleBase will be generalized into RemoteCalcSplitRuleBase. It will be parameterized with a RemoteCalcCallFinder which can be used to analyze the RexNodes to look for Python or async calls.

Code Block
import org.apache.calcite.rex.RexNode;

public interface RemoteCalcCallFinder {
    /** Returns true if the node contains, directly or indirectly, a remote call of the specified type. */
    boolean containsRemoteCall(RexNode node);

    /** Returns true if the node contains, directly or indirectly, a call which is not of the specified remote type. */
    boolean containsNonRemoteCall(RexNode node);

    /** Returns true if the node itself is a remote call of the specified type. */
    boolean isRemoteCall(RexNode node);

    /** Returns true if the node itself is a call that is not of the specified type. */
    boolean isNonRemoteCall(RexNode node);
}

This will allow for PythonCalcCallFinder and AsyncCalcCallFinder implementations.
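
The four predicates above can be sketched over a toy expression tree. This is only an illustration of the intended semantics, not the planned implementation: Expr is a hypothetical stand-in for Calcite's RexNode, and AsyncCallFinderSketch is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;

// Toy expression node standing in for RexNode (assumption for illustration).
final class Expr {
    final String name;         // function or field name
    final boolean isCall;      // false for input references
    final boolean isAsync;     // true when the call targets an async UDF
    final List<Expr> operands; // empty for input references

    private Expr(String name, boolean isCall, boolean isAsync, List<Expr> operands) {
        this.name = name;
        this.isCall = isCall;
        this.isAsync = isAsync;
        this.operands = operands;
    }

    static Expr field(String name) {
        return new Expr(name, false, false, Arrays.asList());
    }

    static Expr call(String name, boolean async, Expr... operands) {
        return new Expr(name, true, async, Arrays.asList(operands));
    }
}

// Hypothetical finder that treats async UDF calls as the "remote" type.
final class AsyncCallFinderSketch {
    boolean isRemoteCall(Expr e) {
        return e.isCall && e.isAsync;
    }

    boolean isNonRemoteCall(Expr e) {
        return e.isCall && !e.isAsync;
    }

    // True if this node or any descendant is an async call.
    boolean containsRemoteCall(Expr e) {
        return isRemoteCall(e) || e.operands.stream().anyMatch(this::containsRemoteCall);
    }

    // True if this node or any descendant is a call that is not async.
    boolean containsNonRemoteCall(Expr e) {
        return isNonRemoteCall(e) || e.operands.stream().anyMatch(this::containsNonRemoteCall);
    }
}
```

For an expression such as Concat(func(f1), f2), the root is a non-remote call that contains a remote call, which is the shape a split rule would need to detect.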

The rules we intend to adopt split up a FlinkLogicalCalc into two (or ultimately more) FlinkLogicalCalcs which feed into one another. The async split rules shared with Python will be:

Each rule below is listed with its original RelNode and what it becomes (Bottom ==> Top). IX are inputs from the previous operator.

SPLIT_CONDITION

Splits FlinkLogicalCalcs which contain remote functions in the condition into
multiple FlinkLogicalCalcs, with the function call in a projection of one and the
condition checked in another using the result of the first.

Original RelNode:

FlinkLogicalCalc
Projections:
Condition: func(...) AND I0

Becomes (Bottom ==> Top):

FlinkLogicalCalc
Projections: I0 AS F0, func(...) AS F1
Condition:

==>

FlinkLogicalCalc
Projections:
Condition: F1 AND F0

SPLIT_PROJECT

Splits projections containing both async and non-async calls
into two FlinkLogicalCalcs.

Original RelNode:

FlinkLogicalCalc
Projections: Concat(func(...), I1)
Condition:

Becomes (Bottom ==> Top):

FlinkLogicalCalc
Projections: I1 AS F0, func(...) AS F1
Condition:

==>

FlinkLogicalCalc
Projections: Concat(F1, F0) 
Condition:

SPLIT_PROJECTION_REX_FIELD

Splits field accesses from the result of an async call in projections
into two FlinkLogicalCalcs.

Original RelNode:

FlinkLogicalCalc
Projections: func(...).foobar
Condition:

Becomes (Bottom ==> Top):

FlinkLogicalCalc
Projections: func(...) AS F0
Condition:

==>

FlinkLogicalCalc
Projections: F0.foobar
Condition:

SPLIT_CONDITION_REX_FIELD

Splits field accesses from the result of an async call in a condition
into two FlinkLogicalCalcs.

Original RelNode:

FlinkLogicalCalc
Projections:
Condition: func(...).foobar

Becomes (Bottom ==> Top):

FlinkLogicalCalc
Projections: func(...) AS F0
Condition:

==>

FlinkLogicalCalc
Projections:
Condition: F0.foobar

EXPAND_PROJECT

Splits field accesses as inputs to async calls into two FlinkLogicalCalcs.

Original RelNode:

FlinkLogicalCalc
Projections: func(I5.foobar)
Condition:

Becomes (Bottom ==> Top):

FlinkLogicalCalc
Projections: I5.foobar AS F0
Condition:

==>

FlinkLogicalCalc
Projections: func(F0)
Condition:

PUSH_CONDITION

Pushes conditions down to minimize rows requiring the async call,
creating two FlinkLogicalCalcs.

Original RelNode:

FlinkLogicalCalc
Projections: func(...)
Condition: C1

Becomes (Bottom ==> Top):

FlinkLogicalCalc
Projections:
Condition: C1

==>

FlinkLogicalCalc
Projections: func(...)
Condition:

Async Specific: NESTED_SPLIT

If there is a call with an async call as an argument, then it needs to be split
into two FlinkLogicalCalcs, with one feeding into the next.

Original RelNode:

FlinkLogicalCalc
Projections: func(func(...))
Condition:

Becomes (Bottom ==> Top):

FlinkLogicalCalc
Projections: func(...) AS F0
Condition:

==>

FlinkLogicalCalc
Projections: func(F0)
Condition:
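
The idea behind NESTED_SPLIT can be sketched on a toy model. This is a simplification under stated assumptions, not the planner rule itself: Node and NestedSplitSketch are hypothetical names, every call is assumed to be async and to take exactly one argument, and each stage is rendered as a projection string rather than a FlinkLogicalCalc.

```java
import java.util.ArrayList;
import java.util.List;

// Toy single-argument expression: either a field reference or an async call.
final class Node {
    final String name;
    final Node arg; // null for a field reference

    private Node(String name, Node arg) {
        this.name = name;
        this.arg = arg;
    }

    static Node field(String name) {
        return new Node(name, null);
    }

    static Node asyncCall(String name, Node arg) {
        return new Node(name, arg);
    }

    boolean isCall() {
        return arg != null;
    }

    String render() {
        return isCall() ? name + "(" + arg.render() + ")" : name;
    }
}

final class NestedSplitSketch {
    // Returns one projection per stage, ordered bottom to top.
    static List<String> split(Node root) {
        List<String> stages = new ArrayList<>();
        Node top = hoistNestedCalls(root, stages);
        stages.add(top.render());
        return stages;
    }

    // Moves a call used as an argument into its own lower stage and
    // replaces it with a reference to that stage's output field.
    private static Node hoistNestedCalls(Node call, List<String> stages) {
        if (!call.isCall() || !call.arg.isCall()) {
            return call; // nothing nested underneath
        }
        Node inner = hoistNestedCalls(call.arg, stages);
        String alias = "F" + stages.size();
        stages.add(inner.render() + " AS " + alias);
        return Node.asyncCall(call.name, Node.field(alias));
    }
}
```

For func(func(f1)) this yields the two stages func(f1) AS F0 and func(F0), matching the shape shown for NESTED_SPLIT. A real rule would also have to forward any other input fields the upper stage needs.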

Disallowing Async functionality when not supported

It is most prudent to only allow async behavior where it is known not to violate SQL semantics.
To do this, rules will be introduced which match query structures that we don’t want to allow; if one is found, all of the async calls will be executed in synchronous mode.

This can be done by introducing a new trait, AsyncOperatorModeTrait, which comes in sync mode and async mode (default), and which will be attached to a FlinkLogicalCalc if it contains async calls which we would prefer to execute in sync mode. Execution in synchronous mode utilizes the same stack as async, but waits on the result immediately after issuing the request.

An example of a query which could have unintended results without explicit handling:

  • Queries with ordering semantics:
    • e.g. SELECT func(f1) FROM Table ORDER BY f2;
      Here, we expect the results of the query to be ordered by f2. We wouldn't want to return results in the completion order of the async function func.
      We can solve this either by setting output-mode to ORDERED and ensuring that we return the results in input order, or by putting the operator into synchronous mode and ensuring ordering by processing one row at a time.
  • Others 
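
The two ways of preserving order described above can be sketched with plain CompletableFutures. This is an illustration only, assuming the async function is modeled as a Function returning a future; OrderingSketch and its method names are hypothetical and do not reflect how the operator is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class OrderingSketch {

    // ORDERED-style: issue every request up front, then collect the
    // results in input order regardless of completion order.
    static <I, O> List<O> orderedAsync(
            List<I> inputs, Function<I, CompletableFuture<O>> func) {
        List<CompletableFuture<O>> pending = new ArrayList<>();
        for (I input : inputs) {
            pending.add(func.apply(input));
        }
        List<O> output = new ArrayList<>();
        for (CompletableFuture<O> future : pending) {
            output.add(future.join());
        }
        return output;
    }

    // Synchronous mode: same async function, but wait on each result
    // immediately after issuing the request, one row at a time.
    static <I, O> List<O> synchronous(
            List<I> inputs, Function<I, CompletableFuture<O>> func) {
        List<O> output = new ArrayList<>();
        for (I input : inputs) {
            output.add(func.apply(input).join());
        }
        return output;
    }
}
```

Both variants return results in input order; the first keeps requests in flight concurrently, while the second trades throughput for simplicity.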

Runtime Changes

Code Generation

Current code generation in Flink for ScalarFunctions assumes that one call can be synchronously fed into another and that results can be set on the output record right after a call is issued.
Neither of these will hold once we have async support. There are two phases for generated async code:

  • Call issuing phase

    • Same as a normal sync invocation w.r.t. converting all of the parameters and calling the UDF

  • Result collection phase

    • Must wait for all async calls issued and also convert result types, if appropriate

    • Only once all results are ready can an output record be created and set
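
The two phases above can be sketched in hand-written form. This is not the generated code; it is a minimal model that assumes each async call in a projection is a Function from the input row to a CompletableFuture of String, and TwoPhaseSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class TwoPhaseSketch {

    // Produces one output row for one input row.
    static CompletableFuture<List<String>> process(
            String inputRow,
            List<Function<String, CompletableFuture<String>>> asyncCalls) {

        // Phase 1 (call issuing): invoke every async call without waiting.
        List<CompletableFuture<String>> issued = new ArrayList<>();
        for (Function<String, CompletableFuture<String>> call : asyncCalls) {
            issued.add(call.apply(inputRow));
        }

        // Phase 2 (result collection): only once all calls have completed
        // is the output row created and its fields set.
        return CompletableFuture
                .allOf(issued.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    List<String> outputRow = new ArrayList<>();
                    for (CompletableFuture<String> future : issued) {
                        outputRow.add(future.join()); // already complete here
                    }
                    return outputRow;
                });
    }
}
```

Because the output row is assembled inside the allOf continuation, no field is set until every issued call has finished, and a failure in any call fails the whole row.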


Note that how the operator invokes the generated code depends on which of the rules above are in effect.
If every async operator is guaranteed to only invoke parallel async calls and no other generated Java/SQL operations, then the generated code can be simplified, leaving support for everything else to existing Flink operators. This is the same approach taken by Python.

Since the code generator already supports generating AsyncFunctions (currently used by lookup joins), it is simple enough to wrap the above logic in that class type, with the fetching done in asyncInvoke(...).

Operator

Since the call to the AsyncScalarFunction is wrapped in an AsyncFunction taking input rows, we have the benefit of using the existing class AsyncWaitOperator,
which handles ordering, checkpointing, timeouts and other implementation details.


Compatibility, Deprecation, and Migration Plan

This is only introducing new code paths, namely the use of AsyncScalarFunction, so there should be no compatibility issues with existing jobs/SQL queries.

Test Plan

  • Unit tests on all of the components

  • ITCases that cover:

    • Each split rule

    • Various query structures which include fallback cases

    • The ability to verify which plan was utilized, async or sync

Rejected Alternatives

AsyncTableFunction using a lookup Join

This requires you to model the lookups as a join with a table. For example:

Code Block
CREATE TEMPORARY TABLE RemoteTable(table_lookup_key STRING, resp STRING,
    PRIMARY KEY (table_lookup_key) NOT ENFORCED) WITH ('connector' = 'remote_call');
SELECT i.table_lookup_key, r.resp FROM Inputs AS i JOIN RemoteTable FOR SYSTEM_TIME
    AS OF i.proc_time AS r ON i.table_lookup_key = r.table_lookup_key;
  • + Already implemented

  • + High performance

  • + Good for modeling external databases with a table interface

  • - Can’t easily invoke the lookup multiple times per joining row

  • - Requires a proc_time time attribute, which may be unnecessary or may not already exist on a predefined table

  • - Unintuitive compared to a scalar function

Polymorphic table function

This already exists in some capacity in Flink with window functions.
This would allow you to effectively specify a number of input keys for some remote call and issue calls at a high volume. For example:

Code Block
SELECT * FROM TABLE (REMOTE_CALL (Input => Table(TableToLookup) as d,
    Col => DESCRIPTOR("table_lookup_key")));
  • + More intuitive than a lookup join

  • - No support for user-defined functions with PTFs.