Authors:  Wei Zhong, Xingbo Huang

...

Discussion thread: ...

Vote thread: ...

JIRA: FLINK-16971

Release: 1.11


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Currently the format for defining a Java UDF in the SQL Client environment file is:

functions:
  - name: myUDF
    from: class
    class: foo.bar.AggregateUDF
    constructor:
      - 7.6
      - false

We expand the "from" property to support defining Python UDFs in the following format:

functions:
  - name: func1
    from: python-object
    fully-qualified-name: test.func1


The "name" property defines the function name used in SQL query. "from: python-object" means this UDF comes from a python object. The value of the property "fully-qualified-name" is the fully qualified name of the Python UDF object, which contains its module name and object name. The format is "module_name.object_name".

...

Users can add Java dependencies via the command line options "-j" and "-l" when launching SQL Client. We follow this design and reuse the Python dependency management command line options of "flink run". Users can add Python dependencies via the following command line options when launching SQL Client:

Short Name: -pyarch
Full Name: --pyArchives
Syntax: -pyarch <archiveFile1>#<extractName>,<archiveFile2>#<extractName>
Description: Add python archive files for the job. The archive files will be extracted to the working directory of the python UDF worker. Currently only zip format is supported. For each archive file, a target directory can be specified: if a target directory name is specified, the archive file will be extracted to a directory with that name; otherwise, the archive file will be extracted to a directory with the same name as the archive file. The files uploaded via this option are accessible via relative paths. '#' is used as the separator between the archive file path and the target directory name. Comma (',') is used as the separator to specify multiple archive files. This option can be used to upload the virtual environment and the data files used in Python UDFs. The data files can be accessed in Python UDFs, e.g.: f = open('data/data.txt', 'r').

Short Name: -pyexec
Full Name: --pyExecutable
Syntax: -pyexec <pythonInterpreterPath>
Description: Specify the path of the python interpreter used to execute the python UDF worker. The python UDF worker depends on Python 3.5+, Apache Beam (version == 2.19.0), Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). Please ensure that the specified environment meets the above requirements.

Short Name: -pyfs
Full Name: --pyFiles
Syntax: -pyfs <filePaths>
Description: Attach custom python files to the job. These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. The standard python resource file suffixes such as .py/.egg/.zip, as well as directories, are all supported. Comma (',') is used as the separator to specify multiple files.

Short Name: -pyreq
Full Name: --pyRequirements
Syntax: -pyreq <requirementsFile>#<requirementsCachedDir>
Description: Specify a requirements.txt file which defines the third-party dependencies. These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. A directory containing the installation packages of these dependencies can optionally be specified; use '#' as the separator if the optional parameter exists.
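To illustrate the data file access described for "-pyarch" above, the following sketch assumes a hypothetical archive data.zip containing data.txt, uploaded via "-pyarch data.zip#data":

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def append_suffix(s: str):
    # The archive is extracted into the working directory of the python
    # UDF worker, so the file is readable via the relative path
    # 'data/data.txt'. (File name and content are hypothetical.)
    with open('data/data.txt', 'r') as f:
        return s + f.read().strip()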


Examples

Python UDF in test.py:

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def func1(s: str):
    return s.replace('bar', 'foo')


Define Python UDF in env.yaml:

functions:
  - name: myUDF  # Java UDF
    from: class
    class: foo.bar.AggregateUDF
    constructor:
      - 7.6
      - false
  - name: func1  # Python UDF
    from: python-object
    fully-qualified-name: test.func1


Add python dependencies:

$ sql-client.sh embedded -e env.yaml -pyfs test.py

# full example of python dependency management
$ sql-client.sh embedded -e env.yaml -pyfs test.py,/home/my/func2.py -pyreq /home/my/requirements.txt#/home/my/packages_dir -pyarch py37.zip#venv -pyexec venv/py37/bin/python

Use Python UDF in SQL Client:

Flink SQL > SELECT func1(a) AS a FROM source_table;
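For comparison, the same UDF could also be registered programmatically via the Python Table API (a sketch assuming test.py is on the PYTHONPATH and a table named source_table has already been registered):

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
from test import func1

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
# Register the Python UDF under the name used in SQL queries.
t_env.register_function("func1", func1)
result = t_env.sql_query("SELECT func1(a) AS a FROM source_table")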

Compatibility, Deprecation, and Migration Plan

...