Authors: Wei Zhong, Dian Fu
Status
Current state: "Under Discussion"
Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Python-UDF-Environment-and-Dependency-Management-td33514.html
JIRA: FLINK-14019
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
It is necessary to support managing dependencies and environments through the command line so that Python jobs with additional dependencies can be submitted via "flink run", the web UI, or other approaches in the future. The PythonDriver class will support several new options as follows:
Short Name | Full Name | Syntax | Description |
---|---|---|---|
-pyfs | --pyFiles | -pyfs <filePaths> | This option already exists, but currently it only appends the files to the client-side PYTHONPATH. Now it will also upload the files to the cluster and append them to the python worker's PYTHONPATH, which is equivalent to "add_python_file". |
-pyexec | --pyExecutable | -pyexec <pythonInterpreterPath> | This option is equivalent to `TableEnvironment#get_config().set_python_executable()`. |
-pyreq | --pyRequirements | -pyreq <requirementsFile>#<requirementsCachedDir> | This option is equivalent to "set_python_requirements". "#" is used as the separator if "requirementsCachedDir" is specified. |
-pyarch | --pyArchives | -pyarch <archiveFile1>#<extractName>,<archiveFile2>#<extractName> | This option is equivalent to "add_python_archive". "," is used as the separator between multiple archives and "#" is used as the separator if "extractName" is specified. |
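As a concrete illustration, a hypothetical "flink run" invocation combining these options might look as follows; all file names and paths are placeholders, not part of this proposal:

```shell
# Upload udfs.py to the cluster and add it to the python worker's PYTHONPATH,
# install packages from requirements.txt (with pre-downloaded packages in
# "cached_dir"), extract venv.zip under the name "venv" on each worker, and
# use the python interpreter inside the extracted archive.
flink run \
  -py word_count.py \
  -pyfs udfs.py \
  -pyreq requirements.txt#cached_dir \
  -pyarch venv.zip#venv \
  -pyexec venv/py37/bin/python
```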
Implementation
Implementation of SDK API
...
The structure of PythonDependencyManager is as follows:
    public class PythonDependencyManager {

        // Creates PythonDependencyManager from
        // ExecutionConfig.getGlobalJobParameters().toMap() and the distributed caches.
        public static PythonDependencyManager create(
            Map<String, String> dependencyMetaData,
            DistributedCache distributedCache) {...}

        // key is the absolute path of the file to append to PYTHONPATH,
        // value is the original file name
        public Map<String, String> getPythonFiles() {...}

        // absolute path of requirements.txt
        public String getRequirementsFilePath() {...}

        // absolute path of the cached directory which contains user provided python packages
        public String getRequirementsDirPath() {...}

        // path of the python executable file
        public String getPythonExec() {...}

        // key is the name of the environment variable, value is its value
        public Map<String, String> getEnvironmentVariable() {...}

        // key is the absolute path of the zip file,
        // value is the target directory name to be extracted to
        public Map<String, String> getArchives() {...}
    }
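To make the semantics of the archives map concrete, the following self-contained sketch shows how a "-pyarch" option value such as "venv.zip#venv,data.zip" could be parsed into the archive-path-to-target-directory mapping exposed by getArchives(). The helper class and the fallback to the archive's own file name when no "#<extractName>" is given are illustrative assumptions, not the actual implementation:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ArchiveOptionParser {

    // Hypothetical helper: maps "file#extractName" entries (comma-separated)
    // to a map of archive path -> target directory name. When no extractName
    // is given, the archive's own file name is used (an assumption here).
    static Map<String, String> parseArchives(String optionValue) {
        Map<String, String> archives = new LinkedHashMap<>();
        for (String entry : optionValue.split(",")) {
            String[] parts = entry.split("#", 2);
            String targetDir = parts.length == 2
                    ? parts[1]
                    : new java.io.File(parts[0]).getName();
            archives.put(parts[0], targetDir);
        }
        return archives;
    }

    public static void main(String[] args) {
        System.out.println(parseArchives("venv.zip#venv,data.zip"));
        // prints {venv.zip=venv, data.zip=data.zip}
    }
}
```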
PythonEnvironmentManager is used to manage the execution environment of python worker. The structure of PythonEnvironmentManager is as follows:
    public interface PythonEnvironmentManager {

        /**
         * Creates the Apache Beam Environment object of the python worker.
         */
        RunnerApi.Environment createEnvironment();

        /**
         * Creates the RetrievalToken file which records all the files that need to be
         * transferred via Apache Beam's ArtifactService.
         */
        String createRetrievalToken();

        /**
         * Deletes the files generated during the above actions.
         */
        void cleanup();
    }
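The intended lifecycle of this interface can be sketched as follows. To keep the example self-contained and runnable, the Beam RunnerApi.Environment return type is replaced by a plain String, and the implementation shown is a simplified process-mode-like stand-in, not the actual Flink code:

```java
import java.nio.file.Files;
import java.nio.file.Path;

public class EnvironmentManagerLifecycle {

    /** Simplified stand-in for PythonEnvironmentManager. */
    interface SimplePythonEnvironmentManager {
        String createEnvironment();      // would return RunnerApi.Environment
        String createRetrievalToken() throws Exception;
        void cleanup() throws Exception;
    }

    /** In process mode no file transfer is needed, so the RetrievalToken
     *  is just an empty temporary file that cleanup() removes again. */
    static class ProcessLike implements SimplePythonEnvironmentManager {
        private Path retrievalToken;

        @Override
        public String createEnvironment() {
            return "process-environment";
        }

        @Override
        public String createRetrievalToken() throws Exception {
            retrievalToken = Files.createTempFile("retrieval_token", ".json");
            return retrievalToken.toString();
        }

        @Override
        public void cleanup() throws Exception {
            if (retrievalToken != null) {
                Files.deleteIfExists(retrievalToken);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        SimplePythonEnvironmentManager manager = new ProcessLike();
        // Typical call order: create the environment, create the token,
        // run the python worker, then clean up.
        System.out.println(manager.createEnvironment());
        String token = manager.createRetrievalToken();
        System.out.println(Files.exists(Path.of(token)));
        manager.cleanup();
        System.out.println(Files.exists(Path.of(token)));
    }
}
```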
Flink Python UDF is implemented based on the Apache Beam portability framework, which uses a RetrievalToken file to record the information of users' files. We will leverage Apache Beam's artifact staging for dependency management in docker mode.
...
The structure of ProcessEnvironmentManager is as follows:
    public class ProcessEnvironmentManager implements PythonEnvironmentManager {

        public static ProcessEnvironmentManager create(
            PythonDependencyManager dependencyManager,
            String tmpDirectoryBase,
            Map<String, String> systemEnv) {...}

        public ProcessEnvironmentManager(...) {
            prepareEnvironment();
        }

        @Override
        public void cleanup() {
            // perform the clean up work
            removeShutdownHook();
        }

        @Override
        public RunnerApi.Environment createEnvironment() {
            // command = path of udf runner
            return Environments.createProcessEnvironment(
                "", "", command, generateEnvironmentVariable());
        }

        @Override
        public String createRetrievalToken() {
            // File transfer is unnecessary in process mode,
            // just create an empty RetrievalToken.
            return emptyRetrievalToken;
        }

        private Map<String, String> generateEnvironmentVariable() {
            // construct the environment variables such as PYTHONPATH, etc.
        }

        private void prepareEnvironment() {
            registerShutdownHook();
            prepareWorkingDir();
        }

        private void prepareWorkingDir() {...}

        private Thread registerShutdownHook() {
            Thread thread = new Thread(new DeleteTemporaryFilesHook(pythonTmpDirectory));
            Runtime.getRuntime().addShutdownHook(thread);
            return thread;
        }
    }
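How generateEnvironmentVariable() might assemble PYTHONPATH in process mode can be sketched as follows: the absolute paths of the uploaded python files are joined with the platform path separator and prepended to any PYTHONPATH inherited from the system environment. The class and method signature below are illustrative assumptions, not the actual implementation:

```java
import java.io.File;
import java.util.LinkedHashMap;
import java.util.Map;

public class PythonPathSketch {

    // Builds the environment variables for the python worker process.
    // pythonFilePaths would come from PythonDependencyManager#getPythonFiles,
    // systemEnv from the task manager's process environment (assumptions).
    static Map<String, String> generateEnvironmentVariable(
            Iterable<String> pythonFilePaths, Map<String, String> systemEnv) {
        StringBuilder pythonPath = new StringBuilder();
        for (String path : pythonFilePaths) {
            if (pythonPath.length() > 0) {
                pythonPath.append(File.pathSeparator);
            }
            pythonPath.append(path);
        }
        // Keep entries already present in the inherited PYTHONPATH, if any.
        String inherited = systemEnv.get("PYTHONPATH");
        if (inherited != null && !inherited.isEmpty()) {
            pythonPath.append(File.pathSeparator).append(inherited);
        }
        Map<String, String> env = new LinkedHashMap<>();
        env.put("PYTHONPATH", pythonPath.toString());
        return env;
    }

    public static void main(String[] args) {
        Map<String, String> env = generateEnvironmentVariable(
                java.util.List.of("/tmp/job/udfs.py", "/tmp/job/dep"),
                Map.of("PYTHONPATH", "/opt/site-packages"));
        System.out.println(env.get("PYTHONPATH"));
    }
}
```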
This class is used to prepare and clean up the working directory and other temporary directories of the python worker. It needs the information provided by PythonDependencyManager and a temporary directory as the root of the python working directory. The configured temporary directory of the current task manager can be obtained using "getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories()". In the current design, three kinds of directories need to be prepared:
...
The structure of DockerEnvironmentManager is as follows:
    public class DockerEnvironmentManager implements PythonEnvironmentManager {

        public static DockerEnvironmentManager create(
            PythonDependencyManager dependencyManager,
            String tmpDirectoryBase,
            String dockerImageUrl) {...}

        public DockerEnvironmentManager(...) {
            registerShutdownHook();
        }

        @Override
        public void cleanup() {
            // perform the clean up work
            removeShutdownHook();
        }

        @Override
        public RunnerApi.Environment createEnvironment() {
            return Environments.createDockerEnvironment(dockerImageUrl);
        }

        @Override
        public String createRetrievalToken() {
            // construct the RetrievalToken according to the user uploaded files
        }

        private Thread registerShutdownHook() {
            Thread thread = new Thread(new DeleteTemporaryFilesHook(pythonTmpDirectory));
            Runtime.getRuntime().addShutdownHook(thread);
            return thread;
        }
    }
Use Cases
- UDF relies on numpy:
...