Status

Current state: Under Discussion

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

With widespread advances in machine learning (and deep learning in particular), more and more enterprises are incorporating ML models into their products. Supporting ML scenarios is one of Flink’s roadmap targets. GPUs are widely used as accelerators in the ML community, so Flink needs to support them.

Currently, Flink only supports requesting GPU resources in its Mesos integration, while most users and enterprises deploy Flink on Yarn/Kubernetes or in standalone mode. Thus, we propose to add GPU support to Flink. As a first step, we propose to:

  • Enable users to configure the number of GPUs per task executor and forward such requirements to the external resource managers (for Kubernetes/Yarn/Mesos setups).
  • Provide information about the available GPU resources to operators.

Public Interfaces

Introduce task executor GPU resource configuration options:

  • “taskmanager.resource.gpu.amount”: Defines how many GPUs a task executor has. The default value should be 0.
  • “taskmanager.resource.gpu.discovery-script.path”: Defines the path of the discovery script. See Discovery Script Section.
  • “taskmanager.resource.gpu.discovery-script.args”: Defines the arguments passed to the discovery script. See Discovery Script Section.
  • “kubernetes.taskmanager.resource.gpu.vendor”: Defines the vendor of the GPU resource. In Kubernetes, the configuration key of a GPU resource is “<vendor>.com/gpu”[3]. Only “nvidia” and “amd” are accepted at the moment. This option is only valid in Kubernetes mode.

Proposed Changes

  • The user sets “taskmanager.resource.gpu.amount” and, if needed, specifies “taskmanager.resource.gpu.discovery-script.[path|args]”.
  • For Yarn/Kubernetes mode, Flink maps “taskmanager.resource.gpu.amount” to the corresponding field of the resource request sent to the external resource manager.
  • Introduce a GPUManager, which executes the discovery script and gets the available GPU resources from its output.
  • Operators get the GPU resource information from GPUManager.

Flink requires that the environment in which task executors run has the required GPU resources and that these resources are accessible to the task executors.

Regarding the amount of GPU resources:

  • We introduce the configuration option “taskmanager.resource.gpu.amount”, which defines how many GPUs a task executor should have. Note that this value is also passed to the discovery script as an argument, right after the “allocate”/“release” keyword. See Discovery Script Section.
  • For standalone mode, the user must guarantee that the required GPU resources exist in the environment where task executors run.
  • For Yarn/Kubernetes mode, Flink sets the corresponding field of the container/pod request. For Yarn, it is "yarn.io/gpu". For Kubernetes, it is “<vendor>.com/gpu”, where the vendor is configured by “kubernetes.taskmanager.resource.gpu.vendor”. Yarn/Kubernetes then guarantees that the required amount of GPU resources is available in the container.
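
The mapping from the configured amount to the resource key of each deployment mode can be sketched as follows. This is only an illustration in Python, not Flink code: the function name `gpu_resource_request` and the mode labels ("yarn", "kubernetes", "standalone") are assumptions made for this example; only the resource keys and the accepted vendors come from the proposal.

```python
from typing import Dict

# Vendors the proposal accepts for Kubernetes at the moment.
SUPPORTED_K8S_VENDORS = {"nvidia", "amd"}


def gpu_resource_request(mode: str, amount: int, vendor: str = "nvidia") -> Dict[str, int]:
    """Return the extra resource entry to attach to a container/pod request.

    `mode` uses hypothetical labels: "yarn", "kubernetes" or "standalone".
    """
    if amount < 0:
        raise ValueError("taskmanager.resource.gpu.amount must be >= 0")
    if amount == 0 or mode == "standalone":
        # Nothing to forward: no GPU requested, or no external resource manager.
        return {}
    if mode == "yarn":
        return {"yarn.io/gpu": amount}
    if mode == "kubernetes":
        if vendor not in SUPPORTED_K8S_VENDORS:
            raise ValueError(f"unsupported GPU vendor: {vendor!r}")
        return {f"{vendor}.com/gpu": amount}
    raise ValueError(f"unknown deployment mode: {mode!r}")
```

For example, `gpu_resource_request("kubernetes", 2, "amd")` yields an entry for "amd.com/gpu", while standalone mode yields no entry because the user is responsible for the resources there.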


Regarding the accessibility of the GPU resources:

  • For standalone mode, the user must guarantee that the GPU indexes returned by the discovery script (if any) exist and are accessible.
  • For Yarn/Kubernetes mode, Yarn/Kubernetes guarantees that the requested GPU resources are accessible to the container.


Note: To make GPU resources accessible, certain setup and preparation steps are needed depending on your environment. See External Requirements Section.

Once the required GPU resources are accessible to task executors, task executors need to discover GPU resources and provide the GPU resource information to operators.

We introduce GPUManager as one of the task manager services to fulfill the above two responsibilities:

  • GPUManager will execute the discovery script and get the available GPU resources from the output.
  • The GPU resource information in GPUManager should be accessible to operators. We propose to add GPUManager to the Environment and expose GPU information to RichFunction and UserDefinedFunction through RuntimeContext and FunctionContext.

We introduce the configuration options “taskmanager.resource.gpu.discovery-script.path” and “taskmanager.resource.gpu.discovery-script.args”, which define the path of the discovery script and its arguments. The discovery script should have two functions: allocate and release. GPUManager executes the allocate function when it is opened, obtaining the available GPU resources from its output, and executes the release function when it is closed.

In the allocate function, the discovery script should:

  • Return a list of the available GPU indexes, separated by commas.
  • Exit with a non-zero code if the output does not meet the expectation.
  • Flink passes the keyword “allocate” and the amount (taskmanager.resource.gpu.amount) as the first two arguments to the script. The user-defined arguments are appended after them.

In the release function, the discovery script should:

  • Clean up all the state and files it produced in the allocate function.
  • Exit with a non-zero code on failure.
  • Flink passes the keyword “release” and the amount (taskmanager.resource.gpu.amount) as the first two arguments to the script. The user-defined arguments are appended after them.
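
The calling convention described above (keyword, amount, then user-defined arguments, with comma-separated indexes on standard output) can be sketched from the caller's side. This is a hedged Python illustration rather than the GPUManager implementation; the helper names `build_command`, `parse_gpu_indexes` and `run_allocate` are hypothetical.

```python
import subprocess
from typing import List, Sequence


def build_command(script_path: str, action: str, amount: int,
                  user_args: Sequence[str] = ()) -> List[str]:
    """Assemble the argument vector: <script> <allocate|release> <amount> <user args...>."""
    if action not in ("allocate", "release"):
        raise ValueError(f"unknown action: {action!r}")
    return [script_path, action, str(amount), *user_args]


def parse_gpu_indexes(output: str) -> List[str]:
    """Split the script output on commas, ignoring whitespace and empty tokens."""
    return [token.strip() for token in output.strip().split(",") if token.strip()]


def run_allocate(script_path: str, amount: int,
                 user_args: Sequence[str] = ()) -> List[str]:
    """Run the allocate function and return the GPU indexes it printed."""
    result = subprocess.run(build_command(script_path, "allocate", amount, user_args),
                            capture_output=True, text=True)
    if result.returncode != 0:
        # A non-zero exit code signals that the script could not meet the request.
        raise RuntimeError(f"discovery script failed: {result.stderr.strip()}")
    return parse_gpu_indexes(result.stdout)
```

Keeping the argument assembly and the output parsing in separate functions makes both easy to check without a GPU or a real script.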


We provide a default script. In its allocate function:

  • The script first gets the indexes of all visible GPU resources by using the “nvidia-smi”/“rocm-smi” command.
  • It returns a list of the indexes of the discovered GPUs, separated by commas.
  • The number of GPU indexes returned by the default script always matches the amount configured through “taskmanager.resource.gpu.amount”:
    • If more GPUs are discovered than configured, the script returns only as many indexes as the configured amount.
    • If not enough GPUs are discovered, the script fails and exits with a non-zero code.

In its “release” function, the script will do nothing and exit with zero.
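
The selection rule of the default allocate function (truncate when there are too many GPUs, fail when there are too few) is small enough to sketch directly. The Python below is an illustration only; `select_gpus` is a hypothetical name, and raising an exception stands in for the script exiting with a non-zero code.

```python
from typing import List


def select_gpus(discovered: List[str], amount: int) -> List[str]:
    """Return exactly `amount` GPU indexes from the discovered ones."""
    if len(discovered) < amount:
        # The real script would exit with a non-zero code here.
        raise RuntimeError(
            f"requested {amount} GPUs but only {len(discovered)} were discovered")
    # More GPUs than requested: return only as many as configured.
    return discovered[:amount]


def format_output(indexes: List[str]) -> str:
    """Render the indexes in the comma-separated form the caller expects."""
    return ",".join(indexes)
```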

To achieve worker-level isolation in standalone mode, we provide a privilege option for the default script. Users need to configure “taskmanager.resource.gpu.discovery-script.args” to turn it on. For more discussion of worker-level isolation, see Worker-level Isolation Section.

Users can also provide their own discovery scripts to address custom requirements, e.g., dynamically deciding the number of returned GPU indexes.

Unlike a CPU, a GPU has two resource dimensions: cores and video memory. While the cores can be shared by multiple jobs, video memory should be exclusive. Sharing the same GPU across multiple operators may result in OOMs and job failures if the total video memory limit is exceeded. Thus, we need isolation to make sure video memory usage does not exceed the physical limit.

For Yarn/Kubernetes mode, the underlying system guarantees that the GPU resources visible in the container strictly match the request. Thus, worker-level isolation is achieved without extra work.

For standalone mode, multiple task executors may be co-located on the same device, and each GPU is visible to all the task executors. To achieve worker-level isolation in such scenarios, we first need to decide cooperatively which task executor uses which GPU, and then make sure each task executor can only see the GPU resources allocated to it. We provide a privilege mode for this in the default script.

In the privilege mode, we use a common assignment file to synchronize the GPU assignment across different task executors. After retrieving the indexes of all visible GPUs, the script opens the assignment file, checks which GPUs are already in use, and records which GPUs it decides to use (chosen from the unused ones) together with the PID of the task executor. We leverage the “flock” mechanism to guarantee that accesses to the assignment file are exclusive. Since only one process can access the assignment file at a time, no GPU will be used by multiple workers.

  • The assignment file is “/var/tmp/flink-gpu-assignment” by default; users can change it through “taskmanager.resource.gpu.discovery-script.args”. Users need to ensure that all task executors in the same cluster use the same file and that it is not deleted before the cluster is stopped. If the cluster is not stopped properly, this file needs to be deleted manually.
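
A minimal sketch of the flock-protected assignment step, written in Python for illustration. The record format (one "<gpu index> <pid>" pair per line) and the function name are assumptions of this example; the proposal does not specify the file layout. The sketch relies on `fcntl.flock`, so it only applies to Unix-like systems.

```python
import fcntl
import os
from typing import List


def allocate_from_assignment_file(path: str, visible: List[str],
                                  amount: int, pid: int) -> List[str]:
    """Pick `amount` unused GPUs and record them for `pid` under an exclusive lock.

    Assumed record format: one "<gpu index> <pid>" pair per line.
    """
    # Open for reading and writing, creating the file if it does not exist yet.
    fd = os.open(path, os.O_RDWR | os.O_CREAT, 0o644)
    with os.fdopen(fd, "r+") as f:
        fcntl.flock(f, fcntl.LOCK_EX)  # blocks until no other process holds the lock
        try:
            used = {line.split()[0] for line in f if line.strip()}
            free = [gpu for gpu in visible if gpu not in used]
            if len(free) < amount:
                raise RuntimeError("not enough unassigned GPUs")
            chosen = free[:amount]
            f.seek(0, os.SEEK_END)  # append the new records after the existing ones
            for gpu in chosen:
                f.write(f"{gpu} {pid}\n")
            f.flush()
            return chosen
        finally:
            fcntl.flock(f, fcntl.LOCK_UN)
```

Because the read of existing records and the write of new ones happen while the same lock is held, two task executors starting at the same time cannot both pick the same GPU.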


After that, the script leverages the “cgroups” mechanism[5], with which we can set the visibility of each GPU resource for a certain control group (the task executor and its child processes). The script creates a specific cgroup for the task executor and then adds the indexes of the GPU resources that the task executor must not use to the device blacklist of this cgroup. From then on, the task executor and its child processes only see the GPU resources allocated to it.
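
To make the blacklist step more concrete, the sketch below computes the entries that would be denied for one task executor. It rests on several assumptions that are not stated in the proposal: the cgroup v1 devices controller is used (entries of the form "c <major>:<minor> rwm" written to the cgroup's `devices.deny` file), the GPUs are NVIDIA character devices with major number 195, and the GPU index equals the device minor number, which is common but not guaranteed. The function name is hypothetical, and the code only builds the entries without touching any cgroup.

```python
from typing import List

# Assumption: NVIDIA GPUs appear as character devices /dev/nvidia<N> with major number 195.
NVIDIA_MAJOR = 195


def device_deny_entries(visible: List[str], allocated: List[str]) -> List[str]:
    """Build cgroup v1 `devices.deny` entries for every visible GPU not allocated.

    Assumption: the GPU index is also the minor number of its device node.
    """
    allocated_set = set(allocated)
    return [f"c {NVIDIA_MAJOR}:{index} rwm"
            for index in visible if index not in allocated_set]
```

For a task executor that was assigned GPU 1 out of GPUs 0, 1 and 2, the entries cover devices 0 and 2, so only GPU 1 stays accessible to the processes in the cgroup.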

The release function looks up the GPUs used by the task executor by its PID and removes those records from the assignment file.

  • If the task executor fails unexpectedly, the release function may not be triggered. In this scenario, task executors started afterwards will read the stale records, which may cause a task executor to mistakenly report that there are not enough GPU resources. To address this issue, the script provides a “check-dead-process” option. If it is switched on in “taskmanager.resource.gpu.discovery-script.args” and there are not enough unrecorded GPUs, the allocate function checks whether the processes associated with the existing records are still alive, and takes over the GPUs whose associated processes are already dead.
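
The release step and the “check-dead-process” clean-up both amount to dropping records from the assignment file under the lock. The Python sketch below illustrates this under the same assumed record format as before (one "<gpu index> <pid>" pair per line); the function names are hypothetical, and liveness is checked with signal 0, which tests for the existence of a process without affecting it.

```python
import fcntl
import os
from typing import Callable, List


def pid_alive(pid: int) -> bool:
    """Return True if a process with this PID currently exists."""
    if pid <= 0:
        return False
    try:
        os.kill(pid, 0)  # signal 0 performs the existence check without sending a signal
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # the process exists but belongs to another user
    return True


def drop_records(path: str, should_drop: Callable[[int], bool]) -> List[str]:
    """Rewrite the file without the records selected by `should_drop`; return freed GPUs."""
    with open(path, "r+") as f:
        fcntl.flock(f, fcntl.LOCK_EX)
        try:
            kept, freed = [], []
            for line in f:
                if not line.strip():
                    continue
                gpu, pid = line.split()
                if should_drop(int(pid)):
                    freed.append(gpu)
                else:
                    kept.append(f"{gpu} {pid}\n")
            f.seek(0)
            f.truncate()  # clear the file, then write back the surviving records
            f.writelines(kept)
            f.flush()
            return freed
        finally:
            fcntl.flock(f, fcntl.LOCK_UN)


def release(path: str, pid: int) -> List[str]:
    """Remove the records owned by `pid` (the release function)."""
    return drop_records(path, lambda owner: owner == pid)


def reclaim_dead(path: str) -> List[str]:
    """Remove records whose owning process no longer exists (check-dead-process)."""
    return drop_records(path, lambda owner: not pid_alive(owner))
```

Note that a PID can be reused by an unrelated process after the original task executor dies, so a liveness check of this kind can occasionally keep a stale record; it never frees a GPU whose recorded PID is still in use.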

Note: This approach cannot ensure that the GPUs are isolated from other applications (another Flink cluster or non-Flink applications).

We do not guarantee operator-level isolation in the first step. All the operators on the same task executor can see all the visible GPU indexes.

The approach for operator-level isolation depends on fine-grained resource management, which is not ready at the moment. We may revisit this once the fine-grained resource management approach is completed. An alternative solution to this problem is to have operators work cooperatively to share all GPU resources, but this is out of the scope of this proposal.

We list the external requirements for using GPU resources:

  • For standalone mode, admins should install NVIDIA drivers on all the nodes in the cluster.
  • For Yarn mode, admins should configure the cluster to enable GPU scheduling[1]. The required Hadoop version is 2.10+ or 3.1+. Note that Yarn only supports NVIDIA GPUs at the moment.
  • For Kubernetes mode, admins should install the NVIDIA GPU device plugin[2][3]. The required Kubernetes version is 1.10+. At the moment, Kubernetes only supports NVIDIA and AMD GPUs.

Implementation Steps

  • Introduce the config option “taskmanager.resource.gpu.amount” and forward the request to Yarn/Kubernetes.
  • Introduce the “taskmanager.resource.gpu.discovery-script.[path|args]” config options and provide a default script.
  • Introduce GPUManager and add it to TaskManagerServices. Make it accessible to operators.
  • Add the privilege option to the default script.
  • Update the REST API/WebUI to properly display information about GPU resources.

Known Limitations

  • Currently, Yarn only supports NVIDIA GPUs.
  • There is no operator-level isolation for GPU resources at the moment, and there is no GPU cooperation mechanism among operators.
    • The GPU resources of a task executor are shared by all operators scheduled on it. If multiple operators are scheduled onto the same GPU, the video memory limit may be exceeded and the job will fail with an OOM.
    • The usage of GPU resources could be skewed, e.g., all operators may run their work on the same GPU while other GPUs are idle. The idle GPU resources are wasted in this scenario.
    • Operator-level isolation and fine-grained GPU resource scheduling might be achieved in the future, when FLIP-56[4] is ready. For now, we rely on operators cooperatively sharing all GPU resources by themselves.

Test Plan

  • The changes proposed in this FLIP can be verified by their test cases.
  • The GPU discovery logic can only be tested manually on a device with an NVIDIA GPU.

Reference

[1] Hadoop 3.1.0 – Using GPU On YARN

[2] NVIDIA/k8s-device-plugin: NVIDIA device plugin for Kubernetes

[3] Device Plugins

[4] FLIP-56: Dynamic Slot Allocation - Apache Flink

[5] cgroups(7) - Linux manual page
