...
Proposed Changes
Planner Changes
Split Rules
The Python calc rules are one source of inspiration for the planner changes. Most of the split rules (rules that split a complex calc node into multiple simpler calc nodes) will be generalized and
shared between the two, since remote Python calls and async calls more generally share much of the same structure. If done correctly, this should simplify the async operator so that it handles only FlinkLogicalCalcs
that contain async UDF calls in projections and no other calc logic (non-async calls, field accesses, conditions).
The high-level motivation is that anything that comes after an async call is easier to chain as a series of operators than to handle internally within a single operator.
...
Rule | Original RelNode | Becomes (Bottom ==> Top) |
---|---|---|
SPLIT_CONDITION: Splits FlinkLogicalCalcs which contain Remote functions in the condition into | [diagram] | [diagram] |
SPLIT_PROJECT: Splits projections with async functions and non-async | [diagram] | [diagram] |
SPLIT_PROJECTION_REX_FIELD: Splits field accesses from the result of an async call in projections | [diagram] | [diagram] |
SPLIT_CONDITION_REX_FIELD: Splits field accesses from the result of an async call in condition | [diagram] | [diagram] |
EXPAND_PROJECT: Splits field accesses as inputs to async calls into two FlinkLogicalCalcs. | [diagram] | [diagram] |
PUSH_CONDITION: Pushes conditions down to minimize rows requiring the async call, | [diagram] | [diagram] |
Async Specific: NESTED_SPLIT: If there is a call with an async call as an argument, then it needs to be split | [diagram] | [diagram] |
Async Specific: ONE_ASYNC_PROJECTION_PER_CALC: If there are multiple projections containing async calls, it splits them into two | [diagram] | [diagram] |
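To make the NESTED_SPLIT idea concrete, here is a minimal sketch on a toy expression tree. It is not the planner's implementation: `Expr`, `NestedSplitSketch`, and the intermediate field names `f0`, `f1`, ... are hypothetical, the real rules operate on Calcite RexNodes inside FlinkLogicalCalcs, and the sketch ignores the pass-through of other input fields between stages.

```java
import java.util.ArrayList;
import java.util.List;

final class NestedSplitSketch {

    // Hypothetical toy expression node; the planner works on Calcite RexNodes instead.
    static final class Expr {
        final String name;      // function name, or field name for a leaf
        final boolean async;    // true if this node is an async UDF call
        final List<Expr> args;  // empty for field references

        Expr(String name, boolean async, List<Expr> args) {
            this.name = name;
            this.async = async;
            this.args = args;
        }

        static Expr field(String name) {
            return new Expr(name, false, new ArrayList<Expr>());
        }

        static Expr call(String name, boolean async, Expr... args) {
            List<Expr> list = new ArrayList<>();
            for (Expr a : args) {
                list.add(a);
            }
            return new Expr(name, async, list);
        }

        @Override
        public String toString() {
            if (args.isEmpty()) {
                return name;
            }
            StringBuilder sb = new StringBuilder(name).append('(');
            for (int i = 0; i < args.size(); i++) {
                if (i > 0) {
                    sb.append(", ");
                }
                sb.append(args.get(i));
            }
            return sb.append(')').toString();
        }
    }

    /** Returns the projection as a bottom-to-top list of stages, one per extracted async call. */
    static List<String> split(Expr projection) {
        List<String> stages = new ArrayList<>();
        Expr top = extract(projection, stages, true);
        stages.add(top.toString());
        return stages;
    }

    // Rewrites arguments first, so the innermost async call lands in the lowest stage.
    private static Expr extract(Expr e, List<String> stages, boolean isRoot) {
        List<Expr> newArgs = new ArrayList<>();
        for (Expr arg : e.args) {
            newArgs.add(extract(arg, stages, false));
        }
        Expr rebuilt = new Expr(e.name, e.async, newArgs);
        if (e.async && !isRoot) {
            // An async call used as an argument moves into its own lower stage.
            String ref = "f" + stages.size(); // hypothetical intermediate field name
            stages.add(rebuilt.toString());
            return Expr.field(ref);
        }
        return rebuilt;
    }

    public static void main(String[] args) {
        Expr nested = Expr.call("UPPER", false, Expr.call("lookup", true, Expr.field("id")));
        System.out.println(split(nested)); // prints [lookup(id), UPPER(f0)]
    }
}
```

Each entry of the returned list stands for one calc in the chain, bottom first, which matches the motivation above: work that follows an async call becomes a separate downstream operator.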
Physical Rules
In addition to the split rules, there will also need to be a PhysicalAsyncCalcRule, which converts FlinkLogicalCalcs to PhysicalAsyncCalcs.
This rule will check for the existence of any async calls in the calc, using the same AsyncCalcCallFinder logic as above.
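The match check can be pictured as a recursive search over the calc's expressions. The following is a sketch under assumptions, not the actual AsyncCalcCallFinder: `Node` and `ToyCalc` are hypothetical stand-ins for a RexNode and a FlinkLogicalCalc, and whether a call is async is modeled as a plain boolean flag.

```java
import java.util.Arrays;
import java.util.List;

final class AsyncCallFinderSketch {

    // Hypothetical stand-in for an expression node.
    static final class Node {
        final String name;
        final boolean asyncCall;
        final List<Node> operands;

        Node(String name, boolean asyncCall, Node... operands) {
            this.name = name;
            this.asyncCall = asyncCall;
            this.operands = Arrays.asList(operands);
        }
    }

    // Hypothetical stand-in for a calc: projections plus an optional condition.
    static final class ToyCalc {
        final List<Node> projections;
        final Node condition; // null when the calc has no filter

        ToyCalc(List<Node> projections, Node condition) {
            this.projections = projections;
            this.condition = condition;
        }
    }

    /** Depth-first search for an async call anywhere in the expression. */
    static boolean containsAsyncCall(Node node) {
        if (node.asyncCall) {
            return true;
        }
        for (Node operand : node.operands) {
            if (containsAsyncCall(operand)) {
                return true;
            }
        }
        return false;
    }

    /** True if any projection or the condition of the calc contains an async call. */
    static boolean matches(ToyCalc calc) {
        for (Node projection : calc.projections) {
            if (containsAsyncCall(projection)) {
                return true;
            }
        }
        return calc.condition != null && containsAsyncCall(calc.condition);
    }

    public static void main(String[] args) {
        Node a = new Node("a", false);
        ToyCalc withAsync = new ToyCalc(Arrays.asList(a, new Node("lookup", true, a)), null);
        ToyCalc withoutAsync = new ToyCalc(Arrays.asList(new Node("UPPER", false, a)), null);
        System.out.println(matches(withAsync));    // prints true
        System.out.println(matches(withoutAsync)); // prints false
    }
}
```

If the split rules have already run, the condition should no longer contain async calls; the sketch checks it anyway so that the predicate covers the whole calc.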
Disallowing Async functionality when not supported
...
```java
@Override
public void asyncInvoke(RowData input, ResultFuture<RowData> resultFuture) throws Exception {
    final AsyncDelegatingResultFuture delegatingFuture = new AsyncDelegatingResultFuture(resultFuture);
    try {
        java.util.function.Function<Object, GenericRowData> outputFactory =
                new java.util.function.Function<Object, GenericRowData>() {
                    @Override
                    public GenericRowData apply(Object udfResult) {
                        // Gather the results and return the output object
                        final GenericRowData out = new GenericRowData(2);
                        out.setField(0, delegatingFuture.getSynchronousResult(0));
                        out.setField(1, udfResult);
                        return out;
                    }
                };
        // Once it sees that the async future is done, the factory will be used to get the resulting output row
        delegatingFuture.setOutputFactory(outputFactory);

        // If an input is needed in the next operator, pass it along
        int passThroughField = input.getInt(0);
        delegatingFuture.addSynchronousResult(passThroughField);

        // Create a new future object and invoke the UDF.
        // The result will be converted to the internal type before calling the output factory.
        CompletableFuture<?> udfResultFuture = delegatingFuture.createAsyncFuture(typeConverter);
        asyncScalarFunctionUdf.eval(udfResultFuture);
    } catch (Throwable e) {
        resultFuture.completeExceptionally(e);
    }
}
```
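The delegating future in this snippet combines values that are already known (the pass-through fields) with the value the UDF produces later. A plain-JDK sketch of that pattern follows; it is an illustration only, not Flink's AsyncDelegatingResultFuture. The class `DelegatingFutureSketch` is hypothetical, a `CompletableFuture` stands in for Flink's `ResultFuture`, and a `Function` stands in for the type converter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch; CompletableFuture<OUT> stands in for Flink's ResultFuture.
final class DelegatingFutureSketch<OUT> {
    private final CompletableFuture<OUT> downstream;
    private final List<Object> synchronousResults = new ArrayList<>();
    private Function<Object, OUT> outputFactory;

    DelegatingFutureSketch(CompletableFuture<OUT> downstream) {
        this.downstream = downstream;
    }

    void addSynchronousResult(Object value) {
        synchronousResults.add(value);
    }

    Object getSynchronousResult(int index) {
        return synchronousResults.get(index);
    }

    void setOutputFactory(Function<Object, OUT> outputFactory) {
        this.outputFactory = outputFactory;
    }

    /** Returns the future handed to the UDF; completing it completes the downstream future. */
    CompletableFuture<Object> createAsyncFuture(Function<Object, Object> converter) {
        CompletableFuture<Object> udfFuture = new CompletableFuture<>();
        udfFuture.whenComplete((value, error) -> {
            if (error != null) {
                downstream.completeExceptionally(error);
                return;
            }
            try {
                // Convert the UDF result, then build the output from it and the stored fields.
                downstream.complete(outputFactory.apply(converter.apply(value)));
            } catch (Throwable t) {
                downstream.completeExceptionally(t);
            }
        });
        return udfFuture;
    }

    public static void main(String[] args) {
        CompletableFuture<List<Object>> out = new CompletableFuture<>();
        DelegatingFutureSketch<List<Object>> delegating = new DelegatingFutureSketch<>(out);
        delegating.setOutputFactory(r -> Arrays.asList(delegating.getSynchronousResult(0), r));
        delegating.addSynchronousResult(7);

        // The "UDF" completes the future it was given; here that happens inline.
        CompletableFuture<Object> udfFuture =
                delegating.createAsyncFuture(v -> v.toString().toUpperCase());
        udfFuture.complete("abc");

        System.out.println(out.join()); // prints [7, ABC]
    }
}
```

Keeping the output factory separate from the future means the row layout is decided once per operator, while each invocation only supplies its own pass-through values and UDF result.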
...
Since the call to the AsyncScalarFunction is wrapped in an AsyncFunction that takes input rows, we can reuse the existing AsyncWaitOperator class,
which handles ordering, checkpointing, timeouts, and other implementation details.
The PhysicalAsyncCalcs mentioned in the planning phase will translate to an exec node, which creates the transformation containing this operator.
Compatibility, Deprecation, and Migration Plan
...