

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-108-Add-GPU-support-in-Flink-td38286.html

JIRA: FLINK-17044 (https://issues.apache.org/jira/browse/FLINK-17044)

Released: 1.11


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Public Interfaces

Introduce the external resource framework for external resource allocation and management. Users could implement their own third-party drivers for other external resources they want to leverage. The pattern of the configuration options is:

  • external-resources. Define the list of enabled external resources ({resourceName}), split by the delimiter ",".
  • external-resource.{resourceName}.amount. Define the amount of external resources in a task executor.
  • external-resource.{resourceName}.driver-factory.class. Define the class name of the ExternalResourceDriverFactory.
  • external-resource.{resourceName}.kubernetes.key. Optional config which defines the configuration key of that external resource in Kubernetes. If you want Flink to request the external resource from Kubernetes (through its Device Plugin mechanism[3]), you need to explicitly set this key. Only valid for Kubernetes mode.
  • external-resource.{resourceName}.yarn.key. Optional config which defines the configuration key of that external resource in Yarn. If you want Flink to request the external resource from Yarn, you need to explicitly set this key. Only valid for Yarn mode.
  • external-resource.{resourceName}.param.{params}. Each ExternalResourceDriver could define its own specific configs following this pattern.

On the TaskExecutor side, introduce the ExternalResourceDriverFactory and ExternalResourceDriver interfaces, which take the responsibility of managing and providing information about the external resources. Users could implement their own third-party ExternalResourceDriver for other external resources they want to leverage.

Code Block
languagejava
titleExternalResourceDriver
public interface ExternalResourceDriverFactory {
    /**
    * Construct the ExternalResourceDriver from configuration.
    */
    ExternalResourceDriver createExternalResourceDriver(Configuration config);
}

public interface ExternalResourceDriver {
    /**
    * Retrieve the information of the external resources according to the amount.
    */
    Set<? extends ExternalResourceInfo> retrieveResourceInfo(long amount);
}
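To illustrate how the factory and driver interfaces compose, here is a minimal sketch of a hypothetical third-party driver. The `Configuration` stand-in and all `Stub*` names are illustrative only (the real types would come from Flink), and the "FPGA" resource is just an example of an external resource other than GPU:

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Minimal stand-ins for the proposed Flink interfaces (assumptions for this sketch).
class Configuration {
    // In Flink this would be org.apache.flink.configuration.Configuration.
}

interface ExternalResourceInfo {
}

interface ExternalResourceDriver {
    Set<? extends ExternalResourceInfo> retrieveResourceInfo(long amount);
}

interface ExternalResourceDriverFactory {
    ExternalResourceDriver createExternalResourceDriver(Configuration config);
}

// A hypothetical driver that hands out FPGA "slots" 0..amount-1.
class StubFpgaInfo implements ExternalResourceInfo {
    final int slot;

    StubFpgaInfo(int slot) {
        this.slot = slot;
    }
}

class StubFpgaDriver implements ExternalResourceDriver {
    @Override
    public Set<StubFpgaInfo> retrieveResourceInfo(long amount) {
        // Report one info object per requested resource unit.
        Set<StubFpgaInfo> infos = new LinkedHashSet<>();
        for (int i = 0; i < amount; i++) {
            infos.add(new StubFpgaInfo(i));
        }
        return infos;
    }
}

class StubFpgaDriverFactory implements ExternalResourceDriverFactory {
    @Override
    public ExternalResourceDriver createExternalResourceDriver(Configuration config) {
        // A real driver would read its "external-resource.{resourceName}.param.*" options here.
        return new StubFpgaDriver();
    }
}
```

The TaskExecutor would instantiate the factory named by external-resource.{resourceName}.driver-factory.class and call the factory once at startup.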

Introduce the ExternalResourceInfo class, which contains the information of the external resources.

Code Block
languagejava
titleExternalResourceInfo
public interface ExternalResourceInfo {
    String getProperty(String key);
    Collection<String> getKeys();
}

Operators and functions could get that information from the RuntimeContext.

Code Block
languagejava
titleRuntimeContext
public interface RuntimeContext {
    /**
    * Get the specific external resource information by the resourceName.
    */
    Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName);
}
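A sketch of how a user function could consume this at runtime. The interfaces are redefined here as minimal stand-ins (assumptions, not Flink's actual classes), and `GpuInfoStub`/`GpuAwareFunction` are hypothetical names:

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Stand-ins (assumptions) for the proposed interfaces.
interface ExternalResourceInfo {
}

interface RuntimeContext {
    Set<ExternalResourceInfo> getExternalResourceInfos(String resourceName);
}

// Mirrors the proposed GPUInformation class: just the index of a GPU card.
class GpuInfoStub implements ExternalResourceInfo {
    final int index;

    GpuInfoStub(int index) {
        this.index = index;
    }
}

// A user function that binds itself to the first GPU assigned to its task executor.
class GpuAwareFunction {
    private int boundGpuIndex = -1;

    void open(RuntimeContext ctx) {
        for (ExternalResourceInfo info : ctx.getExternalResourceInfos("gpu")) {
            if (info instanceof GpuInfoStub) {
                boundGpuIndex = ((GpuInfoStub) info).index;
                break;
            }
        }
        if (boundGpuIndex < 0) {
            throw new IllegalStateException("no GPU assigned to this task executor");
        }
    }

    int boundGpu() {
        return boundGpuIndex;
    }
}
```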

For GPU resource, we introduce the following configuration options:

  • external-resource.gpu.amount. Define how many GPUs in a task executor. The default value should be 0.
  • external-resource.gpu.param.discovery-script.path. Define the path of the discovery script. See Discovery Script Section.
  • external-resource.gpu.param.discovery-script.args. Define the arguments passed to the discovery script. If using the default GPU discovery script, the allowed params are [--privilege], [--check-dead] and [--assign-file filePath]. See Discovery Script Section.
  • external-resource.gpu.kubernetes.key. Define the configuration key of GPU in Kubernetes. The default value is “nvidia.com/gpu”. If using AMD GPUs, user could set it to “amd.com/gpu”.
  • external-resource.gpu.yarn.key. Define the configuration key of GPU in Yarn. The default value is “yarn.io/gpu”.
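Putting the options above together, a flink-conf.yaml fragment enabling GPU as an external resource could look like this (a sketch; the discovery script path is illustrative, not a path shipped with Flink):

```yaml
external-resources: gpu
external-resource.gpu.amount: 2
# Discovery script location and arguments (illustrative path).
external-resource.gpu.param.discovery-script.path: /opt/flink/scripts/gpu-discovery.sh
external-resource.gpu.param.discovery-script.args: --check-dead
# Resource keys forwarded to the external resource managers.
external-resource.gpu.kubernetes.key: nvidia.com/gpu
external-resource.gpu.yarn.key: yarn.io/gpu
```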

We provide a GPUDriver, which discovers the GPU resources through a user-defined discovery script and provides the indexes of the available GPUs.

For GPU resources, we introduce the GPUInformation class, which only contains the index of a GPU card.

Proposed Changes

  • We introduce the external resource framework for external resource allocation and management.
  • User sets the “external-resource.gpu.amount”, “external-resource.gpu.driver-factory.class” and specifies the “external-resource.gpu.param.discovery-script.[path|args]” if needed.
  • For Yarn/Kubernetes mode, Flink maps the “external-resource.gpu.amount” to the corresponding field of resource requests to the external resource manager.
  • Introduce a GPUDriver, which will execute the discovery script and get the available GPU resources from the output.
  • Operators and functions get the GPU resource information from GPUDriver through the RuntimeContext.

To provide extensibility and decouple the TaskExecutor/ResourceManager from external resource management/allocation (following the separation of concerns principle), we introduce the external resource framework for external resource allocation and management. This framework could be extended by third parties for other external resources they want to leverage.

The external resource framework drives the end-to-end workflow of external resource allocation and management. All enabled external resources should be added to the "external-resources" list.

On the ResourceManager side, user defines the amount of the external resource. The framework takes the responsibility to allocate resources from the external resource managers (Yarn/Kubernetes). User needs to specify the configuration key of that external resource on Yarn/Kubernetes. Then, the Yarn/KubernetesResourceManager forwards this external resource request to the external resource manager.

  • For Yarn, the YarnResourceManager adds the external resource to the ContainerRequest.
  • For Kubernetes, the KubernetesResourceManager adds the external resource to the pod for TaskExecutor(leverage the Device Plugin mechanism[3]).

On the TaskExecutor side, we introduce the ExternalResourceDriver, which takes the responsibility to detect and provide information of external resources. TaskExecutor does not need to manage a specific external resource by itself. Operators and functions get the ExternalResourceInfo from the RuntimeContext.

Regarding the configuration, the common config keys are the amount of the external resources and the class name of ExternalResourceDriver. Besides, each driver could define their own configs following the specific pattern. In summary:

  • external-resources. Define the {resourceName} list of enabled external resources with delimiter ",". If configured, ResourceManager and TaskExecutor would check if the relevant configs exist for resources in this list. ResourceManager will forward the request to the underlying external resource manager. TaskExecutor will launch the corresponding ExternalResourceDriver.
  • external-resource.{resourceName}.amount. Define the amount of external resources in a task executor.
  • external-resource.{resourceName}.driver-factory.class. Define the class name of ExternalResourceDriverFactory.
  • external-resource.{resourceName}.kubernetes.key. Optional config which defines the configuration key of that external resource in Kubernetes. If you want Flink to request the external resource from Kubernetes, you need to explicitly set this key. Only valid for Kubernetes mode.
  • external-resource.{resourceName}.yarn.key. Optional config which defines the configuration key of that external resource in Yarn. If you want Flink to request the external resource from Yarn, you need to explicitly set this key. Only valid for Yarn mode.

...


RestAPI / WebUI (Need to get the information of GPU resource through the RestAPI and WebUI)


The definition of ExternalResourceDriver and ExternalResourceInfo is:


Code Block
languagejava
titleExternalResourceDriver
public interface ExternalResourceDriverFactory {
    /**
    * Construct the ExternalResourceDriver from configuration.
    */
    ExternalResourceDriver createExternalResourceDriver(Configuration config);
}

public interface ExternalResourceDriver {
    /**
    * Retrieve the information of the external resources according to the amount.
    */
    Set<? extends ExternalResourceInfo> retrieveResourceInfo(long amount);
}

public interface ExternalResourceInfo {
    String getProperty(String key);
    Collection<String> getKeys();
}

...

On the ResourceManager side, Flink requires that the environment in which task executors run has the required GPU resources and that the GPU resources are accessible to task executors.

...

  • We introduce the configuration option “external-resource.gpu.amount”, which defines how many GPUs a task executor should have. Notice that this value will be passed to the discovery script as the first argument. See Discovery Script Section.
  • For standalone mode, it should be guaranteed by user that there are required GPU resources in the environment where task executors run.
  • For Yarn/Kubernetes mode, they will guarantee the required amount of GPU resources in the container if we set the corresponding field in the request.
    • For Yarn, Flink will set the corresponding field (external-resource.gpu.yarn.key) of container requests.
    • For Kubernetes, Flink will set the corresponding field (external-resource.gpu.kubernetes.key) of pod requests.

Regarding the accessibility of the GPU resources:

...

Note: To make GPU resources accessible, certain setups/preparation are needed depending on your environment. See External Requirements Section.

...

Once the required GPU resources are accessible to task executors, GPUDriver needs to discover GPU resources and provide the GPU resource information to operators.

...

We introduce the configuration options “external-resource.gpu.param.discovery-script.path” and “external-resource.gpu.param.discovery-script.args”, which define the path of the discovery script and its arguments. The discovery script should have two functions: allocate and release. GPUDriver will execute the allocate function and get the available GPU resources from its output when it is opened, and execute the release function when it is closed.

In the allocate function, the discovery script should:

  • Return a list of the available GPU indexes, split by a comma.
  • Exit with non-zero if the output does not meet the expectation. GPUDriver will throw an exception in that case and fail the TaskExecutor initialization.
  • Flink passes the keyword “allocate” and the amount (“external-resource.gpu.amount”) as the first two arguments into the script. The user-defined arguments would be appended after them.
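The invocation protocol above can be sketched as follows. This is not Flink's actual GPUDriver code; the class and method names are hypothetical, and only the "allocate &lt;amount&gt;" argument convention and the comma-separated output format come from the contract described here:

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch of a driver invoking a discovery script and parsing its output.
class ScriptBasedDiscovery {

    static Set<Integer> allocate(String scriptCommand, long amount) {
        try {
            // Pass the keyword "allocate" and the amount as the first two arguments.
            Process process = new ProcessBuilder("sh", "-c", scriptCommand + " allocate " + amount)
                    .redirectErrorStream(true)
                    .start();
            String output;
            try (InputStream in = process.getInputStream()) {
                output = new String(in.readAllBytes()).trim();
            }
            if (process.waitFor() != 0) {
                // A non-zero exit code fails the allocation, per the contract above.
                throw new IllegalStateException("discovery script failed: " + output);
            }
            // The script prints the available GPU indexes as a comma-separated list.
            Set<Integer> indexes = new LinkedHashSet<>();
            for (String token : output.split(",")) {
                indexes.add(Integer.parseInt(token.trim()));
            }
            return indexes;
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```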

In the release function, the discovery script should:

  • Clean up all the state and files it produced in the allocate function.
  • Exit with non-zero on failure.
  • Flink passes the keyword “release” and the amount (“external-resource.gpu.amount”) as the first two arguments into the script. The user-defined arguments would be appended after them.

We provide a default script. In its allocate function:

  • The script would first get all the indexes of visible GPU resources, by using the “nvidia-smi”/“rocm-smi” command.
  • It will return a list of indexes of discovered GPUs, split by a comma.
  • The number of GPU indexes returned from the default script should always match the amount configured through “external-resource.gpu.amount”:
    • If more GPUs are discovered than configured, the script returns only as many indexes as the configured amount.
    • If fewer GPUs are discovered than configured, the script will fail and exit with non-zero.
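The selection rule above can be expressed compactly. This is a sketch of the logic the default shell script implements, not the script itself; the class/method names and the idea of pre-discovered indexes (normally parsed from nvidia-smi/rocm-smi output) are assumptions for illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Sketch of the default discovery script's allocate rule.
class DefaultAllocateRule {

    static String pickIndexes(List<Integer> visibleGpuIndexes, int configuredAmount) {
        if (visibleGpuIndexes.size() < configuredAmount) {
            // The real script exits with a non-zero code here, failing the allocation.
            throw new IllegalStateException(
                    "requested " + configuredAmount + " GPUs, discovered only " + visibleGpuIndexes.size());
        }
        // Return exactly `configuredAmount` indexes, comma-separated.
        return visibleGpuIndexes.stream()
                .limit(configuredAmount)
                .map(String::valueOf)
                .collect(Collectors.joining(","));
    }
}
```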

...


For achieving worker-level isolation in standalone mode, we provide a privilege option for the default script. User needs to add "--privilege" to the “external-resource.gpu.param.discovery-script.args” to turn it on. For more discussion of worker-level isolation, see the Worker-level Isolation section.

...

For standalone mode, multiple task executors may be co-located on the same device, and each GPU is visible to all the task executors. To achieve worker-level isolation in such scenarios, we need to first decide which task executor uses which GPU in a cooperative way, and then make sure each task executor can only see the GPU resources allocated to it. We provide a privilege mode for this in the default script.

...

  • The assignment file is “/var/tmp/flink-gpu-assignment” by default; user could change it by adding "--assign-file filePath" to the “external-resource.gpu.param.discovery-script.args”. User needs to ensure that it is the same file for all the task executors in the same cluster, and that it is not deleted before the cluster is stopped. If the cluster is not stopped properly, this file needs to be deleted manually.

After that, the script leverages the “cgroups” mechanism[5], with which we could set the visibility of each GPU resource for a certain control group (a task executor and its child processes). The script will create a specific cgroup for the task executor, then add the indexes of the GPU resources which the task executor could not use to the device blacklist of this cgroup. From then on, the task executor and its child processes will only see the GPU resources allocated to it.

The release function will check the GPUs used by the task executor through its PID and remove those records from the assignment file.

...

If a task executor is not stopped properly, its records may not be cleaned up. In that scenario, new task executors started afterwards will read the dirty data, which may cause them to mistakenly report that there are not enough GPU resources. To address this issue, the script provides a “--check-dead” option. If it is added to “external-resource.gpu.param.discovery-script.args”, then when there are not enough non-recorded GPUs, the allocate function will check whether the processes associated with the existing records are still alive, and take over those GPUs whose associated processes are already dead.


For example, if users want to trigger the privilege mode, they could set “external-resource.gpu.param.discovery-script.args” to "--privilege --check-dead --assign-file /tmp/flink-assign". This will execute the default discovery script in privilege mode, check whether dead processes occupy GPU resources, and locate the resource assignment file at "/tmp/flink-assign".

...

  • Introduce the ExternalResourceDriver class and related config options.
  • Let the ResourceManager allocate external resources in the container/Pod of Yarn/Kubernetes accordingly.
  • Construct the ExternalResourceDriver in the TaskExecutor and provide the external resource information through the RuntimeContext.
  • Introduce GPUDriver.
  • Add privilege option to the default discovery script.
  • Update the RestAPI/WebUI to properly display information of GPU Resources.

Known Limitations

  • Currently, Yarn only supports the NVIDIA GPU.
  • There is no operator-level isolation for GPU resources at the moment. Meanwhile, there is no GPU cooperating mechanism among operators.
    • The GPU resources of a task executor will be shared by all operators scheduled on it. If multiple operators are scheduled to the same GPU, the video memory limitation may be exceeded and the job will fail with OOM.
    • The usage distribution of GPU resources could be skewed, e.g. all the operators may run their work on the same GPU while other GPUs are idle. The idle GPU resources are wasted in this scenario.
    • Operator-level isolation and fine-grained GPU resource scheduling might be achieved in the future, when FLIP-56[4] is ready. Currently, we rely on operators to cooperatively share all GPU resources by themselves.

...

  • The changes proposed in this FLIP can be checked by its test cases.
  • The GPU discovery logic could only be tested manually on the device with NVIDIA GPU.

Future work

  • Add external resource information to the RestAPI / WebUI.

Reference

[1] Hadoop 3.1.0 – Using GPU On YARN

...