Authors: Wei Zhong, Xingbo Huang
Status
Current state: Under Discussion
Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-114-Support-Python-UDF-in-SQL-Client-td38655.html
JIRA:
Released:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Currently the format for defining a Java UDF in the SQL Client environment file is:
```yaml
functions:
  - name: myUDF
    from: class
    class: foo.bar.AggregateUDF
    constructor:
      - 7.6
      - false
```
We expand the "from" property to support defining Python UDFs in the following format:
```yaml
functions:
  - name: func1
    from: python-object
    fully-qualified-name: test.func1
```
The "name" property defines the function name used in SQL queries. "from: python-object" means the UDF comes from a Python object. The value of the "fully-qualified-name" property is the fully qualified name of the Python UDF object, i.e. its module name and object name, in the format "module_name.object_name".
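Conceptually, a fully qualified name in this format can be resolved by importing the module and looking up the attribute. The following is an illustrative sketch only, not the actual SQL Client implementation; the helper name `resolve_python_udf` is hypothetical:

```python
import importlib

def resolve_python_udf(fully_qualified_name: str):
    """Illustrative sketch: resolve "module_name.object_name" to the
    Python object it refers to (not the actual SQL Client code)."""
    module_name, object_name = fully_qualified_name.rsplit('.', 1)
    module = importlib.import_module(module_name)
    return getattr(module, object_name)

# Example with a standard-library object instead of a user module:
sqrt = resolve_python_udf('math.sqrt')
print(sqrt(9.0))  # 3.0
```

For "fully-qualified-name: test.func1", this corresponds to importing module `test` and fetching the object `func1` from it, which is why `test.py` must be on the PYTHONPATH of the client (see the dependency management options below).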
...
Users can add Java dependencies via the command line options "-j" and "-l" when launching the SQL Client. We follow this design and reuse the Python dependency management command line options of "flink run". Users can add Python dependencies via the following command line options when launching the SQL Client:
| Short Name | Full Name | Syntax | Description |
| --- | --- | --- | --- |
| -pyarch | --pyArchives | -pyarch <archiveFile1>#<extractName>,<archiveFile2>#<extractName> | Add python archive files for the job. The archive files will be extracted to the working directory of the python UDF worker. Currently only zip format is supported. For each archive file, a target directory can be specified: if a target directory name is specified, the archive file will be extracted to a directory with that name; otherwise, it will be extracted to a directory with the same name as the archive file. Files uploaded via this option are accessible via relative paths. '#' is used as the separator between the archive file path and the target directory name, and ',' as the separator between multiple archive files. This option can be used to upload a virtual environment or the data files used in Python UDFs. The data files can be accessed in a Python UDF, e.g. f = open('data/data.txt', 'r'). |
| -pyexec | --pyExecutable | -pyexec <pythonInterpreterPath> | Specify the path of the python interpreter used to execute the python UDF worker. The python UDF worker depends on Python 3.5+, Apache Beam (version == 2.19.0), Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). Please ensure that the specified environment meets these requirements. |
| -pyfs | --pyFiles | -pyfs <filePaths> | Attach custom python files for the job. These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. Standard python resource file suffixes such as .py/.egg/.zip as well as directories are all supported. ',' is used as the separator between multiple files. |
| -pyreq | --pyRequirements | -pyreq <requirementsFile>#<requirementsCachedDir> | Specify a requirements.txt file which defines the third-party dependencies. These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. A directory containing the installation packages of these dependencies can optionally be specified; use '#' as the separator if the optional parameter exists. |
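Putting these options together, a launch command could look like the following. This is an illustrative sketch only; all file names and the layout of the extracted virtual environment are assumptions:

```shell
# Hypothetical invocation: ship a zipped virtual environment, point the
# UDF worker at its interpreter, and attach UDF sources plus a
# requirements file (file names and venv layout are assumed).
sql-client.sh embedded -e env.yaml \
    -pyarch venv.zip#py_env \
    -pyexec py_env/bin/python \
    -pyfs udfs.py \
    -pyreq requirements.txt
```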
Examples
Python UDF in test.py:
```python
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def func1(s: str):
    return s.replace('bar', 'foo')
```
Define Python UDF in env.yaml:
```yaml
functions:
  - name: myUDF  # Java UDF
    from: class
    class: foo.bar.AggregateUDF
    constructor:
      - 7.6
      - false
  - name: func1  # Python UDF
    from: python-object
    fully-qualified-name: test.func1
```
Add python dependencies:
```shell
$ sql-client.sh embedded -e env.yaml -pyfs test.py
```
Compatibility, Deprecation, and Migration Plan
...