...

Three new public methods will be added to the Python TableEnvironment and one new public method will be added to the Python TableConfig:

class TableEnvironment(object):

    ...

    

    def add_python_file(self, file_path):

        pass


    def set_python_requirements(self, requirements_list_file, requirements_cached_dir=None):

        pass


    def add_python_archive(self, archive_path, extract_name):

        pass


class TableConfig(object):

    

    …

    

    def set_python_executable(self, exec_path):

        pass

add_python_file(self, file_path)

...

Then you can prepare a directory and download all the packages listed in requirements.txt into it. This step is optional. The recommended approach is to run the following command:

pip download -d {cached_dir} -r {requirements_txt_path} --no-binary :all:

After that, call this interface with the path of requirements.txt and, if it exists, the cached dir. Before the python worker starts, the packages listed in requirements.txt will be installed into the python worker's environment one by one, in the order in which they appear in requirements.txt. If users specify the cached dir, it will be uploaded to the cluster and the installation program will search for packages in this directory instead of the default repository (usually pypi.org), so installation also works in environments without Internet access.

Here is a more complete example of this interface. Suppose we need to install numpy on the cluster:

# command executed in shell:

echo numpy==1.16.5 > requirements.txt

pip download -d cached_dir -r requirements.txt --no-binary :all:

# python code:

t_env.set_python_requirements("requirements.txt", "cached_dir")

add_python_archive(self, archive_path, extract_name)

...

If the uploaded zip file is a python environment, please make sure that the python executable file can run on the platform the cluster is running on. The path of the python executable file must be specified via TableEnvironment#get_config().set_python_executable(), e.g.:

# Suppose the python executable file is py37/bin/python

# command executed in shell:

zip -r venv.zip py37

# python code:

t_env.add_python_archive("venv.zip", "my_venv")

t_env.get_config().set_python_executable("my_venv/py37/bin/python")
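To see why the executable path in the example above starts with my_venv, the following minimal sketch extracts a stand-in archive into a directory named after extract_name. It assumes the worker unpacks the archive relative to its working directory. The helpers extract_archive and build_demo_archive are hypothetical names used only for illustration, not part of the proposed API, and the sketch ignores file permissions.

```python
import os
import tempfile
import zipfile


def extract_archive(archive_path, working_dir, extract_name):
    """Extract a zip archive into working_dir/extract_name and return that directory."""
    target_dir = os.path.join(working_dir, extract_name)
    with zipfile.ZipFile(archive_path) as archive:
        archive.extractall(target_dir)
    return target_dir


def build_demo_archive(base_dir):
    """Create a stand-in for venv.zip that contains a dummy py37/bin/python entry."""
    archive_path = os.path.join(base_dir, "venv.zip")
    with zipfile.ZipFile(archive_path, "w") as archive:
        archive.writestr("py37/bin/python", "#!/bin/sh\n")
    return archive_path


with tempfile.TemporaryDirectory() as base_dir:
    archive_path = build_demo_archive(base_dir)
    working_dir = os.path.join(base_dir, "worker")
    os.makedirs(working_dir)
    extract_archive(archive_path, working_dir, "my_venv")
    # Relative to the working directory, the interpreter sits under the extract name.
    exec_path = os.path.join("my_venv", "py37", "bin", "python")
    executable_found = os.path.isfile(os.path.join(working_dir, exec_path))
```

The path passed to set_python_executable is therefore the extract name followed by the path of the interpreter inside the archive.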

set_python_executable(self, exec_path)

...

It is necessary to support managing dependencies and the environment through the command line so that python jobs with additional dependencies can be submitted via "flink run", the web UI, or other approaches in the future. The PythonDriver class will support several new options as follows:

| Short Name | Full Name | Syntax | Description |
|---|---|---|---|
| -pyfs | --pyFiles | -pyfs <file-path> | This option already exists but it only appends the file to client side PYTHONPATH currently. Now it will upload the file to cluster and append it to python worker’s PYTHONPATH, which is equivalent to "add_python_file". |
| -pyexec | --python-executable-path | -pyexec <python-executable-path> | This option is equivalent to `TableEnvironment#get_config().set_python_executable()`. |
| -pyreq | --python-requirements | -pyreq <requirements-file-path> <cache-dir-path> | This option is equivalent to "set_python_requirements". |
| -pyarch | --python-archive | -pyarch <archive-file-path> <extract-name> | This option is equivalent to "add_python_archive". |
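The equivalence between the options and the API calls can be illustrated with a small sketch. This is not the PythonDriver implementation (which lives on the Java side); it is a hypothetical Python illustration that uses argparse and a recording stand-in, RecordingTableEnvironment, whose name and behavior are assumptions. For brevity the stand-in exposes set_python_executable directly instead of going through get_config(), and each option is accepted once.

```python
import argparse


class RecordingTableEnvironment(object):
    """Hypothetical stand-in that records calls instead of talking to Flink."""

    def __init__(self):
        self.calls = []

    def add_python_file(self, file_path):
        self.calls.append(("add_python_file", file_path))

    def set_python_requirements(self, requirements_list_file, requirements_cached_dir=None):
        self.calls.append(
            ("set_python_requirements", requirements_list_file, requirements_cached_dir))

    def add_python_archive(self, archive_path, extract_name):
        self.calls.append(("add_python_archive", archive_path, extract_name))

    def set_python_executable(self, exec_path):
        # In the proposed API this method lives on TableConfig.
        self.calls.append(("set_python_executable", exec_path))


def build_parser():
    parser = argparse.ArgumentParser(prog="python-driver-sketch")
    parser.add_argument("-pyfs", "--pyFiles", dest="py_files")
    parser.add_argument("-pyexec", "--python-executable-path", dest="python_executable")
    # One or two values: the requirements file and an optional cache directory.
    parser.add_argument("-pyreq", "--python-requirements", dest="requirements", nargs="+")
    parser.add_argument("-pyarch", "--python-archive", dest="archive", nargs=2)
    return parser


def apply_options(args, t_env):
    """Translate parsed options into the equivalent API calls."""
    if args.py_files:
        t_env.add_python_file(args.py_files)
    if args.python_executable:
        t_env.set_python_executable(args.python_executable)
    if args.requirements:
        requirements_file = args.requirements[0]
        cached_dir = args.requirements[1] if len(args.requirements) > 1 else None
        t_env.set_python_requirements(requirements_file, cached_dir)
    if args.archive:
        t_env.add_python_archive(args.archive[0], args.archive[1])


t_env = RecordingTableEnvironment()
args = build_parser().parse_args(
    ["-pyfs", "/user/lib1",
     "-pyreq", "requirements.txt", "cached_dir",
     "-pyarch", "venv.zip", "venv",
     "-pyexec", "venv/py37/bin/python"])
apply_options(args, t_env)
```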

Implementation

Implementation of SDK API

...

All logic of the environment and dependency management interfaces can be put in an independent class so that it can be easy to test. A simple architecture is as follows:

class DependencyManager(object):

    PYTHON_FILE_MAP = "PYTHON_FILE_MAP"

    PYTHON_REQUIREMENTS = "PYTHON_REQUIREMENTS"

    PYTHON_REQUIREMENTS_FILE = "PYTHON_REQUIREMENTS_FILE"

    PYTHON_ARCHIVES_MAP = "PYTHON_ARCHIVES_MAP"

    def __init__(self):

        ...

    def add_python_file(self, file_path):

        ...

    def set_python_requirements(self, requirements_list_file, requirements_cached_dir):

        ...

    def add_python_archive(self, archive_path, extract_name):

        ...

    def transmit_to_jvm(self, j_env, conf):

        conf.set_string(self.PYTHON_FILE_MAP, ….)

        conf.set_string(self.PYTHON_REQUIREMENTS, ….)

        …

        j_env.registerCachedFile(xxx, xxx)

        ...


class TableEnvironment(object):

    def __init__(self, ...):

        ….

        self._dependency_manager = DependencyManager()

    

    def add_python_file(self, file_path):

        self._dependency_manager.add_python_file(file_path)


    def set_python_requirements(self, requirements_list_file, requirements_cached_dir=None):

        self._dependency_manager.set_python_requirements(requirements_list_file, requirements_cached_dir)


    def add_python_archive(self, archive_path, extract_name):

        self._dependency_manager.add_python_archive(archive_path, extract_name)

    ...

    def execute(self, job_name):

        self._dependency_manager.transmit_to_jvm(self._j_tenv.execEnv(), self.get_config().get_configuration())

        self._j_tenv.execute(job_name)


class TableConfig(object):      

    ...    

    def set_python_executable(self, exec_path):

        ...
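The architecture above can be made concrete with a runnable sketch. It is only one possible shape, not the actual implementation. It assumes the maps are serialized as JSON strings and that every file is registered under a generated cache key. FakeConfiguration and FakeExecutionEnvironment are hypothetical stand-ins for the real configuration and JVM environment objects, and the requirements cached directory is left out for brevity.

```python
import json
import os


class FakeConfiguration(object):
    """Hypothetical stand-in for the configuration; stores strings in a dict."""

    def __init__(self):
        self.values = {}

    def set_string(self, key, value):
        self.values[key] = value


class FakeExecutionEnvironment(object):
    """Hypothetical stand-in for the JVM environment; records registered files."""

    def __init__(self):
        self.cached_files = {}

    def registerCachedFile(self, file_path, name):
        self.cached_files[name] = file_path


class DependencyManager(object):
    PYTHON_FILE_MAP = "PYTHON_FILE_MAP"
    PYTHON_REQUIREMENTS_FILE = "PYTHON_REQUIREMENTS_FILE"
    PYTHON_ARCHIVES_MAP = "PYTHON_ARCHIVES_MAP"

    def __init__(self):
        self._python_files = {}       # cache key -> original file name
        self._archives = {}           # cache key -> extract name
        self._requirements_key = None
        self._files_to_register = []  # (local path, cache key) pairs

    def _register(self, prefix, path):
        # Generate a unique cache key for each file (naming scheme is an assumption).
        key = "%s_%d" % (prefix, len(self._files_to_register))
        self._files_to_register.append((path, key))
        return key

    def add_python_file(self, file_path):
        key = self._register("python_file", file_path)
        self._python_files[key] = os.path.basename(file_path)

    def set_python_requirements(self, requirements_list_file, requirements_cached_dir=None):
        # The cached directory is ignored in this sketch.
        self._requirements_key = self._register("python_requirements", requirements_list_file)

    def add_python_archive(self, archive_path, extract_name):
        key = self._register("python_archive", archive_path)
        self._archives[key] = extract_name

    def transmit_to_jvm(self, j_env, conf):
        # Metadata travels through the configuration, files through the cache.
        conf.set_string(self.PYTHON_FILE_MAP, json.dumps(self._python_files))
        conf.set_string(self.PYTHON_ARCHIVES_MAP, json.dumps(self._archives))
        if self._requirements_key is not None:
            conf.set_string(self.PYTHON_REQUIREMENTS_FILE, self._requirements_key)
        for path, key in self._files_to_register:
            j_env.registerCachedFile(path, key)


manager = DependencyManager()
manager.add_python_file("/user/pyfile/1.py")
manager.add_python_archive("venv.zip", "venv")
conf = FakeConfiguration()
j_env = FakeExecutionEnvironment()
manager.transmit_to_jvm(j_env, conf)
```

Keeping the bookkeeping in one class means it can be tested with stand-ins like these, without starting a JVM.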

Implementation of New PythonDriver Options

...

The structure of PythonDependencyManager is as follows:

public class PythonDependencyManager {

  // create PythonDependencyManager from ExecutionConfig.getGlobalJobParameters().toMap() and

  // distributedCaches.

  public static PythonDependencyManager create(

     Map<String, String> dependencyMetaData,

     DistributedCache distributedCache) {...}


  // key is the absolute path of the files to append to PYTHONPATH, value is the origin file name

  public Map<String, String> getPythonFiles() {...}


  // absolute path of requirements.txt

  public String getRequirementsFilePath() {...}


  // absolute path of the cached directory which contains user provided python packages

  public String getRequirementsDirPath() {...}


  // path of python executable file

  public String getPythonExec() {...}


  // key is the absolute path of the zip file, value is the target directory name to be extracted to

  public Map<String, String> getArchives() {...}

}

This class is used to parse the information uploaded from the client and to provide that information to ProcessEnvironmentManager.

The structure of ProcessEnvironmentManager is as follows:

public class ProcessEnvironmentManager {

  

   public static ProcessEnvironmentManager create(

     PythonDependencyManager dependencyManager,

     String tmpDirectoryBase) {...}


  public void prepareEnvironment(Map<String, String> systemEnv) {

     registerShutdownHook(...);

     prepareWorkingDir(...);

     updateEnvironmentVariable(systemEnv);

  }


  public void cleanup() {removeShutdownHook(); ...}


  public void updateEnvironmentVariable(Map<String, String> systemEnv) {...}


  public void prepareWorkingDir(...) {...}


  public Thread registerShutdownHook(String pythonTmpDirectory) {

     Thread thread = new Thread(new DeleteTemporaryFilesHook(pythonTmpDirectory));

     Runtime.getRuntime().addShutdownHook(thread);

     return thread;

  }

}

This class is used to prepare and clean up the working directory and other temporary directories of the python worker. It needs the information provided by PythonDependencyManager and a temporary directory to serve as the root of the python working directory. The configured temporary directories of the current task manager can be obtained using "getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories()". In the current design, three kinds of directories need to be prepared:

...

After the above directories are all ready, the shell script to launch python workers will be executed. The installation of required packages and the changing of working directory will be completed in this script. For each line of the requirements.txt file, the following command will be executed:

# this only indicates the intention of appending the site-packages directory to PYTHONPATH

# the actual code is more complicated

PYTHONPATH=${install_directory}/lib/pythonXY/site-packages:${PYTHONPATH}

export PYTHONPATH

PATH=${install_directory}/bin:${PATH}

${python} -m pip install ${every_line_content} --prefix ${install_directory} --ignore-installed --no-index --find-links ${cached_dir}

If users do not specify the cached dir, the parameters "--no-index --find-links ${cached_dir}" will not be added.
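The per-line installation and the optional cached-dir parameters can be sketched as follows. This is an illustration, not the actual launch script. The function names build_pip_install_command and build_install_commands are hypothetical. Skipping blank lines and splitting each line on whitespace (to mimic the unquoted shell variable) are assumptions.

```python
def build_pip_install_command(python, requirement_words, install_directory, cached_dir=None):
    """Build the argument list for installing one requirements.txt line."""
    command = [python, "-m", "pip", "install"]
    command += requirement_words
    command += ["--prefix", install_directory, "--ignore-installed"]
    if cached_dir is not None:
        # Only search the uploaded directory instead of the default repository.
        command += ["--no-index", "--find-links", cached_dir]
    return command


def build_install_commands(requirements_text, python, install_directory, cached_dir=None):
    """Return one pip command per non-empty line, preserving the file order."""
    commands = []
    for line in requirements_text.splitlines():
        words = line.split()
        if not words:
            # Assumption: blank lines are skipped.
            continue
        commands.append(
            build_pip_install_command(python, words, install_directory, cached_dir))
    return commands


commands = build_install_commands(
    "numpy==1.16.5\n\nsix\n", "python", "/tmp/install", "cached_dir")
online_commands = build_install_commands("numpy==1.16.5\n", "python", "/tmp/install")
```

Each resulting list could be passed to subprocess.check_call. Building the commands separately from running them keeps the logic easy to test.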

...

Use Cases

  1. UDF relies on numpy:

# command executed in shell:

echo numpy==1.16.5 > requirements.txt

pip download -d cached_dir -r requirements.txt --no-binary :all:

# python code:

t_env.set_python_requirements("requirements.txt", "cached_dir")

  2. UDF relies on users' other libraries:

t_env.add_python_file("/user/pyfile/1.py")

t_env.add_python_file("/user/pyfile/2.py")

# directory is also supported

t_env.add_python_file("/user/lib1")

t_env.add_python_file("/user/lib2.zip")

...

  3. UDF relies on Python 3.7 but the Python on the Flink cluster is 2.7:

# command executed in shell:

virtualenv py37 --python=python3 --always-copy

zip -r venv.zip py37

# python code:

t_env.add_python_archive("venv.zip", "venv")

t_env.get_config().set_python_executable("venv/py37/bin/python")

  4. UDF relies on a specific file, data.txt:

# command executed in shell:

zip data.zip data.txt

# python code:

t_env.add_python_archive("data.zip", "data")

# in UDF:

with open("data/data.txt", "r") as f:

     ….

Compatibility, Deprecation, and Migration Plan

  • This FLIP is a new feature, so there are no compatibility issues with previous versions.

Document

https://docs.google.com/document/d/1vq5J3TSyhscQXbpRhz-Yd3KCX62PBJeC_a_h3amUvJ4/edit?usp=sharing

...