...

  1. Launch a separate language-specific execution environment to run the user-defined functions. For example, a Python VM is launched to execute Python user-defined functions.
  2. Send the input data to the remote execution environment for execution.
  3. Execute the user-defined functions. During execution they may need to access state, write logs, and report metrics.
  4. Fetch the execution results from the remote execution environment.
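
The four steps above can be illustrated with a minimal sketch. It is not the proposed implementation: it assumes a plain child process with stdin/stdout pipes and JSON lines in place of a real RPC service, and the names `WORKER_SOURCE`, `udf`, and `run_remote` are hypothetical.

```python
import json
import subprocess
import sys

# Program executed inside the separate Python VM.
# The UDF (adding two columns) is a hypothetical stand-in.
WORKER_SOURCE = """
import json, sys

def udf(x, y):
    return x + y

# Step 3: execute the user-defined function on every input row.
for line in sys.stdin:
    args = json.loads(line)
    print(json.dumps(udf(*args)))
"""


def run_remote(rows):
    """Run the UDF on `rows` in a separate Python process and return the results."""
    # Step 2: serialize the input data, one JSON-encoded row per line.
    payload = "".join(json.dumps(row) + "\n" for row in rows)
    # Step 1: launch the language-specific execution environment.
    proc = subprocess.run(
        [sys.executable, "-c", WORKER_SOURCE],
        input=payload,
        capture_output=True,
        text=True,
        check=True,
    )
    # Step 4: fetch the execution results from the remote environment.
    return [json.loads(line) for line in proc.stdout.splitlines()]
```

Calling `run_remote([[1, 2], [3, 4]])` sends two rows to the child process and collects one result per row. State access, logging, and metrics are left out of this sketch.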

[Diagram]

Moving the execution of user-defined functions to a language-specific execution environment, and using an RPC service between the operator and the remote execution environment, makes it possible to execute user-defined functions written in languages other than Java. The diagram is as follows:

...

The most straightforward way to execute a Python user-defined function is to wrap the execution logic inside the regular Java user-defined function that is generated during registration. Suppose there are two Python user-defined functions in a StreamOperator; the workflow is as follows:

[Diagram]

Pros:

  1. The changes to the existing Flink Table module are small. All the Python user-defined function execution logic is wrapped inside the generated Java user-defined function.
  2. Python user-defined functions can be used in a wide range of scenarios: anywhere a Java user-defined function can be used.

...

Suppose that there are two Python user-defined functions, UDF1 and UDF2. UDF1 takes the columns x and y as inputs, and UDF2 takes the columns y and z as inputs. The workflow of this solution is as follows:

[Diagram]

Note that the inputs are processed in a pipelined manner: processing does not wait for the execution results of one input before moving on to the next input.
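
The pipelined behaviour can be sketched as follows. This is only an illustration under stated assumptions: a background thread and two in-memory queues stand in for the remote Python worker and its channel, results are assumed to come back in input order, and the bodies of `udf1` and `udf2` as well as the names `worker` and `process` are hypothetical.

```python
import queue
import threading


def udf1(x, y):
    # Hypothetical UDF1 over columns x and y.
    return x + y


def udf2(y, z):
    # Hypothetical UDF2 over columns y and z.
    return y * z


def worker(inputs, outputs):
    """Stand-in for the remote worker: evaluates both UDFs per input row."""
    while True:
        item = inputs.get()
        if item is None:  # end-of-input marker
            outputs.put(None)
            return
        x, y, z = item
        outputs.put((udf1(x, y), udf2(y, z)))


def process(rows):
    """Send all rows without waiting for results, then collect the results."""
    inputs, outputs = queue.Queue(), queue.Queue()
    thread = threading.Thread(target=worker, args=(inputs, outputs))
    thread.start()
    for row in rows:
        # Enqueue and continue immediately; no wait for this row's result.
        inputs.put(row)
    inputs.put(None)
    results = []
    while True:
        result = outputs.get()
        if result is None:
            break
        results.append(result)
    thread.join()
    return results
```

Because sending and receiving are decoupled, the sender can keep submitting rows while earlier rows are still being evaluated.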

...

  1. The height of a UDF is 0 if all its inputs are from the original input row.
  2. For a Java UDF, if none of its inputs are Python UDFs, its height is the same as the maximum height of its inputs. Otherwise, its height is the maximum height of its inputs + 1.
  3. For a Python UDF, its height is the same as the maximum height of its inputs.
    [Diagram]

In the case shown in the diagram above, java_udf1 and py_udf1 have height 0, while java_udf2, py_udf2 and py_udf3 have height 1.
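
The three height rules can be written as a short recursive function. The sketch below is illustrative only: the `Column` and `Udf` classes are hypothetical, input columns are treated as having height 0, and the wiring between the example UDFs is an assumption chosen to reproduce the heights stated above, since the diagram itself is not reproduced here.

```python
from dataclasses import dataclass, field
from typing import List, Union


@dataclass
class Column:
    """A column of the original input row (hypothetical helper type)."""
    name: str


@dataclass
class Udf:
    """A UDF call; `language` is either "java" or "python"."""
    name: str
    language: str
    inputs: List[Union["Column", "Udf"]] = field(default_factory=list)


def height(node):
    """Compute the height of a node according to the three rules."""
    if isinstance(node, Column):
        return 0
    udf_inputs = [i for i in node.inputs if isinstance(i, Udf)]
    if not udf_inputs:
        # Rule 1: all inputs come from the original input row.
        return 0
    max_height = max(height(i) for i in udf_inputs)
    if node.language == "java" and any(i.language == "python" for i in udf_inputs):
        # Rule 2, second case: a Java UDF with at least one Python UDF input.
        return max_height + 1
    # Rule 2, first case, and rule 3: same as the maximum height of the inputs.
    return max_height


# Assumed wiring (not taken from the diagram).
java_udf1 = Udf("java_udf1", "java", [Column("x")])
py_udf1 = Udf("py_udf1", "python", [Column("y")])
java_udf2 = Udf("java_udf2", "java", [py_udf1])
py_udf2 = Udf("py_udf2", "python", [java_udf2])
py_udf3 = Udf("py_udf3", "python", [py_udf2, Column("z")])

heights = {u.name: height(u) for u in (java_udf1, py_udf1, java_udf2, py_udf2, py_udf3)}
```

With this assumed wiring, `heights` maps java_udf1 and py_udf1 to 0 and the other three UDFs to 1.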

...

The physical plan will be adjusted as follows:

[Diagram]

Correlate

Python UDFs contained in the join condition of a Correlate node should be pushed down. There is no need to push down Java UDFs contained in the join condition of a Correlate node.

...

The following diagram shows the architecture of the PythonUserDefinedFunctionRunner based on the Beam portability framework.

[Diagram]

The colored components are the components of the Beam portability framework that should be extended:

  1. OutputReceiverFactoryImpl is the callback that receives execution results.
  2. StateRequestHandler, MapStateImpl, ListStateImpl, etc. are the callbacks that receive state requests.
  3. MapState and ListState in the Python SDK Harness are the state stubs used to access state.
  4. UserDefinedFunctionOperation is responsible for user-defined function execution.

High Level Flow

[Diagram]

The high-level flow can be summarized in two parts:

...