...


Proposed Changes

Planner Changes

Split Rules

One source of inspiration for the planner changes is the Python calc rules. Most of the split rules (rules that split a complex calc node into multiple simpler calc nodes) will be generalized and shared between the two, since remote Python calls and async calls more generally share much of the same structure. If done correctly, this should simplify the async operator so that it handles only FlinkLogicalCalcs which contain async UDF calls in projections and no other calc logic (non-async calls, field accesses, conditions).

The high-level motivation is that anything that comes after an async call is easier to chain as a series of operators than to handle internally within a single operator.

...

Rule

Original RelNode
(IX are inputs from previous operator)

Becomes (Bottom ==> Top)
(IX are inputs from previous operator)

SPLIT_CONDITION

Splits FlinkLogicalCalcs which contain Remote functions in the condition into
multiple FlinkLogicalCalcs with the function call in a projection of one and the
condition checked in another using the result of the first.

FlinkLogicalCalc
Projections:
Condition: func(...) AND I0
FlinkLogicalCalc
Projections: I0 AS F0, func(...) AS F1
Condition:

==>

FlinkLogicalCalc
Projections:
Condition: F1 AND F0

SPLIT_PROJECT

Splits projections that mix async and non-async functions
into two FlinkLogicalCalcs

FlinkLogicalCalc
Projections: Concat(func(...), I1)
Condition: 


FlinkLogicalCalc
Projections: I1 AS F0, func(...) AS F1
Condition:

==>

FlinkLogicalCalc
Projections: Concat(F1, F0) 
Condition:

SPLIT_PROJECTION_REX_FIELD

Splits field accesses from the result of an async call in projections
into two FlinkLogicalCalcs

FlinkLogicalCalc
Projections: func(...).foobar
Condition: 
FlinkLogicalCalc
Projections: func(...) as F0
Condition:

==>

FlinkLogicalCalc
Projections: F0.foobar
Condition:

SPLIT_CONDITION_REX_FIELD

Splits field accesses from the result of an async call in condition
into two FlinkLogicalCalcs

FlinkLogicalCalc
Projections:
Condition: func(...).foobar
FlinkLogicalCalc
Projections: 
Condition: func(...)

==>

FlinkLogicalCalc
Projections: 
Condition: I0.foobar

EXPAND_PROJECT

Splits field accesses as inputs to async calls into two FlinkLogicalCalcs.

FlinkLogicalCalc
Projections: func(I5.foobar)
Condition: 
FlinkLogicalCalc
Projections: I5.foobar as F0
Condition:

==>

FlinkLogicalCalc
Projections: func(F0)
Condition: 

PUSH_CONDITION

Pushes conditions down to minimize rows requiring the async call,
creating two FlinkLogicalCalcs

FlinkLogicalCalc
Projections: func(...)
Condition: C1


FlinkLogicalCalc
Projections: 
Condition: C1

==>

FlinkLogicalCalc
Projections: func(...)
Condition:

Async Specific: NESTED_SPLIT

If a call has an async call as an argument, it needs to be split
into two FlinkLogicalCalcs, with one feeding into the next.

FlinkLogicalCalc
Projections: func(func(...))
Condition:


FlinkLogicalCalc
Projections: func(...) as F0
Condition:

==>

FlinkLogicalCalc
Projections: func(F0)
Condition:

Async Specific: ONE_ASYNC_PROJECTION_PER_CALC

If there are multiple projections containing async calls, it splits them into two
FlinkLogicalCalcs, with one feeding into the next.

FlinkLogicalCalc
Projections: func(...), func(...)
Condition:

FlinkLogicalCalc
Projections: func(...) as F0
Condition:

==>

FlinkLogicalCalc
Projections: F0, func(...)
Condition:
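
To make the shape of these rewrites concrete, here is a minimal, self-contained sketch of the SPLIT_PROJECT row, written against a toy expression model rather than the planner's real classes. Everything in it is an assumption made for illustration: `Expr`, `Split`, `split`, `collect`, and `rewrite` are hypothetical names, and the real rules operate on Calcite RexNodes inside FlinkLogicalCalc, not on strings. The sketch forwards input references first and async calls second, which reproduces the field numbering used in the table.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Toy model of SPLIT_PROJECT; every type here is hypothetical, not planner code. */
final class SplitProjectSketch {

    /** Stand-in for a RexNode: an input reference or a (possibly async) call. */
    static final class Expr {
        final String name;
        final boolean async;
        final List<Expr> operands;

        Expr(String name, boolean async, Expr... operands) {
            this.name = name;
            this.async = async;
            this.operands = Arrays.asList(operands);
        }

        /** In this toy model, a non-async node without operands is an input reference. */
        boolean isInputRef() {
            return !async && operands.isEmpty();
        }

        @Override
        public String toString() {
            if (isInputRef()) {
                return name;
            }
            return name + "("
                    + operands.stream().map(Expr::toString).collect(Collectors.joining(", "))
                    + ")";
        }
    }

    /** Result of the split: projections of the bottom calc and the rewritten top projection. */
    static final class Split {
        final List<String> bottomProjections = new ArrayList<>();
        String topProjection;
    }

    static Split split(Expr projection) {
        List<String> forwarded = new ArrayList<>();
        collect(projection, false, forwarded); // input references first
        collect(projection, true, forwarded);  // then async calls
        Split result = new Split();
        for (int i = 0; i < forwarded.size(); i++) {
            result.bottomProjections.add(forwarded.get(i) + " AS F" + i);
        }
        result.topProjection = rewrite(projection, forwarded);
        return result;
    }

    /** Collects the sub-expressions the bottom calc must produce, without duplicates. */
    private static void collect(Expr expr, boolean wantAsync, List<String> out) {
        if (expr.async || expr.isInputRef()) {
            if (expr.async == wantAsync && !out.contains(expr.toString())) {
                out.add(expr.toString());
            }
            return; // operands of an async call stay with it in the bottom calc
        }
        for (Expr operand : expr.operands) {
            collect(operand, wantAsync, out);
        }
    }

    /** Rewrites the original projection so that it only references bottom-calc fields. */
    private static String rewrite(Expr expr, List<String> forwarded) {
        if (expr.async || expr.isInputRef()) {
            return "F" + forwarded.indexOf(expr.toString());
        }
        return expr.name + "("
                + expr.operands.stream()
                        .map(o -> rewrite(o, forwarded))
                        .collect(Collectors.joining(", "))
                + ")";
    }

    public static void main(String[] args) {
        Expr func = new Expr("func", true, new Expr("I0", false));
        Split s = split(new Expr("Concat", false, func, new Expr("I1", false)));
        System.out.println(s.bottomProjections); // [I1 AS F0, func(I0) AS F1]
        System.out.println(s.topProjection);     // Concat(F1, F0)
    }
}
```

The bottom calc ends up containing only field forwards and the async call, which is the property the split rules are meant to establish for the async operator.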

Physical Rules

In addition to the split rules, there will also need to be a PhysicalAsyncCalcRule which converts FlinkLogicalCalcs to PhysicalAsyncCalcs. This will check for the existence of any async calls in the calc, using the same AsyncCalcCallFinder logic above.
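
As an illustration of the kind of check such a rule could perform, the following is a rough sketch of a recursive "contains an async call" test over a toy expression tree. It is not the AsyncCalcCallFinder itself: `Expr`, `containsAsyncCall`, and `calcContainsAsyncCall` are hypothetical names, and the real check would walk Calcite RexNodes and inspect the function definition behind each call.

```java
import java.util.Arrays;
import java.util.List;

/** Toy sketch of an async-call check; all names here are hypothetical. */
final class AsyncCallFinderSketch {

    /** Stand-in for a RexNode: an input reference or a (possibly async) call. */
    static final class Expr {
        final String name;
        final boolean async;
        final List<Expr> operands;

        private Expr(String name, boolean async, Expr... operands) {
            this.name = name;
            this.async = async;
            this.operands = Arrays.asList(operands);
        }

        static Expr input(String name) {
            return new Expr(name, false);
        }

        static Expr call(String name, Expr... operands) {
            return new Expr(name, false, operands);
        }

        static Expr asyncCall(String name, Expr... operands) {
            return new Expr(name, true, operands);
        }
    }

    /** Returns true if the expression itself, or anything nested in it, is an async call. */
    static boolean containsAsyncCall(Expr expr) {
        if (expr.async) {
            return true;
        }
        for (Expr operand : expr.operands) {
            if (containsAsyncCall(operand)) {
                return true;
            }
        }
        return false;
    }

    /** A calc qualifies if any projection or its (nullable) condition has an async call. */
    static boolean calcContainsAsyncCall(List<Expr> projections, Expr condition) {
        for (Expr projection : projections) {
            if (containsAsyncCall(projection)) {
                return true;
            }
        }
        return condition != null && containsAsyncCall(condition);
    }

    public static void main(String[] args) {
        Expr nested = Expr.call("Concat", Expr.asyncCall("func", Expr.input("I0")), Expr.input("I1"));
        System.out.println(calcContainsAsyncCall(Arrays.asList(nested), null));           // true
        System.out.println(calcContainsAsyncCall(Arrays.asList(Expr.input("I1")), null)); // false
    }
}
```

After the split rules have run, a calc that passes this check should contain async calls only as top-level projections, so a physical rule of this kind can match on the check alone.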

Disallowing Async functionality when not supported

...

Code Block
@Override
public void asyncInvoke(RowData input, ResultFuture<RowData> resultFuture) throws Exception {
    final AsyncDelegatingResultFuture delegatingFuture = new AsyncDelegatingResultFuture(resultFuture);
    try {
        java.util.function.Function<Object, GenericRowData> outputFactory = new java.util.function.Function<Object, GenericRowData>() {
            @Override
            public GenericRowData apply(Object udfResult) {
                // Gather the results and return the output object
                final GenericRowData out = new GenericRowData(2);
                out.setField(0, delegatingFuture.getSynchronousResult(0));
                out.setField(1, udfResult);
                return out;
            }
        };

        // Once it sees that the async future is done, the factory will be used to get the resulting output row
        delegatingFuture.setOutputFactory(outputFactory);

        // If an input is needed in the next operator, pass it along
        int passThroughField = input.getInt(0);
        delegatingFuture.addSynchronousResult(passThroughField);

        // Create a new future object and invoke the UDF.
        // The result will be converted to the internal type before calling the output factory.
        CompletableFuture<?> udfResultFuture = delegatingFuture.createAsyncFuture(typeConverter);
        asyncScalarFunctionUdf.eval(udfResultFuture);
    } catch (Throwable e) {
        // Any synchronous failure is reported through the operator's result future
        resultFuture.completeExceptionally(e);
    }
}
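
The delegating future used above is not defined on this page, so the following is only a rough, self-contained sketch of the idea under stated assumptions: a plain `CompletableFuture<Object[]>` stands in for `ResultFuture<RowData>`, rows are modelled as `Object[]`, and the type-conversion step is omitted. The class name `DelegatingFutureSketch` and its behaviour are hypothetical; only the method names are borrowed from the generated code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Toy sketch of a delegating result future; not the actual Flink class. */
final class DelegatingFutureSketch {

    // Stands in for the operator's ResultFuture<RowData> (assumption).
    private final CompletableFuture<Object[]> rowFuture;
    // Values computed synchronously that must be passed through to the output row.
    private final List<Object> synchronousResults = new ArrayList<>();
    // Builds the output row from the UDF result once the async future completes.
    private Function<Object, Object[]> outputFactory;

    DelegatingFutureSketch(CompletableFuture<Object[]> rowFuture) {
        this.rowFuture = rowFuture;
    }

    void addSynchronousResult(Object value) {
        synchronousResults.add(value);
    }

    Object getSynchronousResult(int index) {
        return synchronousResults.get(index);
    }

    void setOutputFactory(Function<Object, Object[]> outputFactory) {
        this.outputFactory = outputFactory;
    }

    /** Creates the future handed to the UDF; its completion completes the row future. */
    CompletableFuture<Object> createAsyncFuture() {
        CompletableFuture<Object> udfFuture = new CompletableFuture<>();
        udfFuture.whenComplete((value, error) -> {
            if (error != null) {
                rowFuture.completeExceptionally(error);
                return;
            }
            try {
                rowFuture.complete(outputFactory.apply(value));
            } catch (Throwable t) {
                // A failing output factory must not leave the row future hanging.
                rowFuture.completeExceptionally(t);
            }
        });
        return udfFuture;
    }

    public static void main(String[] args) {
        CompletableFuture<Object[]> row = new CompletableFuture<>();
        DelegatingFutureSketch delegating = new DelegatingFutureSketch(row);
        delegating.setOutputFactory(
                udfResult -> new Object[] {delegating.getSynchronousResult(0), udfResult});
        delegating.addSynchronousResult(7);

        // A UDF would complete this future from its own callback or thread.
        delegating.createAsyncFuture().complete("udf-result");

        Object[] out = row.join();
        System.out.println(out[0] + ", " + out[1]); // 7, udf-result
    }
}
```

Keeping the pass-through values inside the delegating object means the UDF only ever sees a future for its own result, while the operator still receives a complete output row.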

...

Since the call to the AsyncScalarFunction is wrapped in an AsyncFunction taking input rows, we have the benefit of using the existing class AsyncWaitOperator, which handles ordering, checkpointing, timeouts, and other implementation details.

The PhysicalAsyncCalcs mentioned in the planning phase will translate to an exec node, which creates the transformation containing this operator.


Compatibility, Deprecation, and Migration Plan

...