...

Public Interfaces

Introduce the ExternalResourceDriver for external resource allocation and management. Users can implement their own third-party drivers for other external resources they want to leverage. The pattern of configuration options is:

  • external-resource.{resourceName}.amount. Define the amount of external resources in a task executor.
  • external-resource.{resourceName}.driver.class. Define the class name of ExternalResourceDriver.
  • external-resource.{resourceName}.param.{params}. Each ExternalResourceDriver could define their specific configs following this pattern.

For GPU resource, we introduce the following configuration options:

  • “taskmanager.external-resource.gpu.amount”: Defines how many GPUs a task executor has. The default value should be 0.
  • “taskmanager.external-resource.gpu.param.discovery-script.path”: Defines the path of the discovery script. See Discovery Script Section.
  • “taskmanager.external-resource.gpu.param.discovery-script.args”: Defines the arguments passed to the discovery script. See Discovery Script Section.
  • “kubernetes.taskmanager.external-resource.gpu.param.vendor”: Defines the vendor of the GPU resource. In Kubernetes, the configuration key of GPU resource is “<vendor>.com/gpu”[3]. Only “nvidia” and “amd” are accepted at the moment. Only valid for Kubernetes mode.


RestAPI / WebUI (Need to get the information of GPU resource through the RestAPI and WebUI)

Introduce the ExternalResourceInfo class, which contains the information of the external resources. For GPUs, the GPUInformation contains the information of a GPU card. RichFunction/UserDefinedFunction could get this information from the RuntimeContext.

Proposed Changes

  • We introduce the ExternalResourceDriver for external resource allocation and management.
  • The user sets “taskmanager.external-resource.gpu.amount” and, if needed, specifies “taskmanager.external-resource.gpu.param.discovery-script.[path|args]”.
  • For Yarn/Kubernetes mode, Flink maps “taskmanager.external-resource.gpu.amount” to the corresponding field of the resource request sent to the external resource manager.
  • Introduce a GPUManager, which will execute the discovery script and get the available GPU resources from the output.
  • RichFunction/UserDefinedFunction gets the GPU resource information from the GPUManager.

To provide extensibility and decouple the TaskExecutor/ResourceManager from external resource management and allocation (following the separation of concerns principle), we introduce the ExternalResourceDriver for external resource allocation and management. Third parties could extend this class for other external resources they want to leverage.

The ExternalResourceDriver drives the end-to-end workflow of external resource allocation and management.

On the ResourceManager side, the user defines the amount of the external resource. The ExternalResourceDriver takes the responsibility of allocating resources from external resource managers (Yarn/Kubernetes). The ResourceManager does not need to understand how to allocate a specific external resource.

  • For Yarn, the ExternalResourceDriver needs to add the external resource to the ContainerRequest.
  • For Kubernetes, the pod for TaskExecutor is built by multiple decorators. ExternalResourceDriver needs to provide a specific decorator to forward the external resource request to the pod definition.

On the TaskExecutor side, the ExternalResourceDriver takes the responsibility of detecting and providing information about external resources. The TaskExecutor does not need to manage a specific external resource by itself. RichFunction/UserDefinedFunction would get the ExternalResourceInfo from the RuntimeContext.

Regarding the configuration, the common config keys are the amount of the external resources and the class name of ExternalResourceDriver. Besides, each driver could define their own configs following the specific pattern. In summary:

  • external-resource.{resourceName}.amount. Define the amount of external resources in a task executor.
  • external-resource.{resourceName}.driver.class. Define the class name of ExternalResourceDriver.
  • external-resource.{resourceName}.param.{params}. Each ExternalResourceDriver could define their specific configs following this pattern.
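As an illustration of the key pattern above, the following sketch collects the options of one resource from a flat key/value map. The class and method names are hypothetical, a plain Map stands in for Flink's configuration object, and treating a missing amount as 0 is an assumption; only the key layout comes from the list above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper; only the key pattern is taken from the proposal. */
final class ExternalResourceOptions {
    static final String PREFIX = "external-resource.";

    /** Builds "external-resource.{resourceName}.{suffix}". */
    static String key(String resourceName, String suffix) {
        return PREFIX + resourceName + "." + suffix;
    }

    /** Reads the amount of a resource; an absent key counts as 0 (assumption). */
    static long amount(Map<String, String> conf, String resourceName) {
        return Long.parseLong(conf.getOrDefault(key(resourceName, "amount"), "0"));
    }

    /** Collects all "param.*" entries of a resource, keyed by the part after "param.". */
    static Map<String, String> params(Map<String, String> conf, String resourceName) {
        String paramPrefix = key(resourceName, "param.");
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : conf.entrySet()) {
            if (entry.getKey().startsWith(paramPrefix)) {
                result.put(entry.getKey().substring(paramPrefix.length()), entry.getValue());
            }
        }
        return result;
    }
}
```

Keeping every driver-specific option under the "param." prefix lets a driver receive only its own settings without the framework knowing their names.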

The definition of ExternalResourceDriver and ExternalResourceInfo is:

public abstract class ExternalResourceDriver {

    /**
     * Retrieve the information of the external resources according to the resourceProfile.
     */
    public abstract List<ExternalResourceInfo> retrieveResourceInfo(ResourceProfile resourceProfile);

    /**
     * When running in Kubernetes, we need to decorate the TM pod to request the external resource.
     */
    public abstract AbstractKubernetesStepDecorator getExternalResourceDecorator();

    /**
     * When running in Yarn, we need to add the external resource request to the ContainerRequest.
     */
    public abstract void addExternalResourceToRequest(AMRMClient.ContainerRequest containerRequest);
}

public abstract class ExternalResourceInfo {

    // Return the name of that external resource.
    public abstract String getName();
}

We introduce the GPUDriver for the GPU resources.

Flink requires that the environment in which task executors run has the required GPU resources and that those GPU resources are accessible to task executors.

...

  • We introduce the configuration option “taskmanager.external-resource.gpu.amount”, which defines how many GPUs a task executor should have. Notice that this value will be passed to the discovery script as the first argument. See Discovery Script Section.
  • For standalone mode, the user must guarantee that the required GPU resources exist in the environment where task executors run.
  • For Yarn/Kubernetes mode, Flink will set the corresponding field of the container/pod request. Yarn/Kubernetes will then guarantee that the container has the required amount of GPU resources.
    • For Yarn, Flink will set the corresponding field (yarn.io/gpu) of container requests by GPUDriver#addExternalResourceToRequest.
    • For Kubernetes, Flink will set the corresponding field of pod requests by applying the decorator from GPUDriver#getExternalResourceDecorator. The corresponding field is “<vendor>.com/gpu”. The vendor is configured by “kubernetes.taskmanager.external-resource.gpu.param.vendor”.
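The resource names used in the two deployment modes can be sketched as below. The class and method names are hypothetical; the Yarn resource name, the “<vendor>.com/gpu” form, and the two accepted vendors are taken from the text, while rejecting other vendors with an exception is an assumption about how the restriction might be enforced.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that derives the GPU resource name per deployment mode. */
final class GpuResourceKeys {
    /** Resource name set on Yarn container requests. */
    static final String YARN_GPU_RESOURCE = "yarn.io/gpu";

    /** Vendors accepted at the moment according to the proposal. */
    static final List<String> SUPPORTED_VENDORS = Arrays.asList("nvidia", "amd");

    /** Returns "<vendor>.com/gpu" for Kubernetes, rejecting unsupported vendors. */
    static String kubernetesResourceKey(String vendor) {
        if (!SUPPORTED_VENDORS.contains(vendor)) {
            throw new IllegalArgumentException("Unsupported GPU vendor: " + vendor);
        }
        return vendor + ".com/gpu";
    }
}
```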


Regarding the accessibility of the GPU resources:

...

Note: To make GPU resources accessible, certain setup and preparation is needed depending on your environment. See External Requirements Section.

...

Once the required GPU resources are accessible to task executors, GPUDriver needs to discover GPU resources and provide the GPU resource information to operators.

We introduce GPUManager as one of the task manager services to fulfill the above two responsibilities:

...

GPUDriver will execute the discovery script and get the available GPU resources from the output.
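A minimal sketch of this step, assuming the script is started as an external process: the configured amount is passed as the first argument (as stated for “taskmanager.external-resource.gpu.amount”), followed by the configured args, and the output is read as a comma-separated list of GPU indexes. The class and method names are hypothetical, and splitting the args string on whitespace is an assumption.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical runner for a discovery script; not the actual GPUDriver code. */
final class DiscoveryScriptRunner {

    /** Command line: script path, amount as first argument, then the extra args. */
    static List<String> buildCommand(String scriptPath, long amount, String args) {
        List<String> command = new ArrayList<>();
        command.add(scriptPath);
        command.add(Long.toString(amount));
        if (args != null && !args.trim().isEmpty()) {
            for (String arg : args.trim().split("\\s+")) {
                command.add(arg);
            }
        }
        return command;
    }

    /** Parses output such as "0,1,3" into the list of GPU indexes. */
    static List<String> parseIndexes(String output) {
        List<String> indexes = new ArrayList<>();
        for (String token : output.trim().split(",")) {
            if (!token.trim().isEmpty()) {
                indexes.add(token.trim());
            }
        }
        return indexes;
    }

    /** Runs the script and fails if it exits with non-zero or prints nothing. */
    static List<String> run(String scriptPath, long amount, String args)
            throws IOException, InterruptedException {
        Process process = new ProcessBuilder(buildCommand(scriptPath, amount, args))
                .redirectError(ProcessBuilder.Redirect.INHERIT)
                .start();
        String firstLine = null;
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (firstLine == null) {
                    firstLine = line; // only the first output line is interpreted
                }
            }
        }
        int exitCode = process.waitFor();
        if (exitCode != 0 || firstLine == null) {
            throw new IOException("Discovery script failed with exit code " + exitCode);
        }
        return parseIndexes(firstLine);
    }
}
```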

...

Then, because GPUDriver is one of the ExternalResourceDrivers, operators and functions could get the GPU information from the RuntimeContext.

The GPUInformation contains the specific index of the GPU.

...
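A GPU-specific information class might look like the sketch below. The base class is repeated as a stand-in so that the example compiles on its own; the field type, the accessor name, and the returned resource name "gpu" are assumptions, since the text only states that the GPUInformation carries the index of the GPU.

```java
/** Stand-in for the abstract class defined in this proposal. */
abstract class ExternalResourceInfo {
    public abstract String getName();
}

/** Hypothetical GPU information; only "contains the GPU index" comes from the text. */
final class GPUInformation extends ExternalResourceInfo {
    private final int index;

    GPUInformation(int index) {
        this.index = index;
    }

    /** Name of the external resource this information belongs to (assumed value). */
    @Override
    public String getName() {
        return "gpu";
    }

    /** Index of the GPU as reported by the discovery script. */
    public int getIndex() {
        return index;
    }
}
```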

We introduce the configuration options “taskmanager.external-resource.gpu.param.discovery-script.path” and “taskmanager.external-resource.gpu.param.discovery-script.args”, which define the path of the discovery script and its arguments. The discovery script should have two functions: allocate and release. GPUManager will execute the allocate function and get the available GPU resources from the output when it is opened, and execute the release function when it is closed.

...

  • The script would first get all the indexes of visible GPU resources, by using the “nvidia-smi”/“rocm-smi” command.
  • It will return a list of indexes of discovered GPUs, split by a comma. 
  • The number of GPU indexes returned from the default script should always match the amount configured through “taskmanager.external-resource.gpu.amount”.
    • If there are more GPUs discovered than configured, the script returns only as many indexes as the configured amount.
    • If there are not enough GPUs discovered, the script will fail and exit with a non-zero code.

In its “release” function, the script will do nothing and exit with zero.
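The allocation rule of the default script can be modeled as follows. This is a Java illustration of the logic only, not the script itself. The class and method names are hypothetical, and taking the first indexes when more GPUs are visible than configured is an assumption, since the text only fixes how many indexes are returned.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the default script's "allocate" behavior. */
final class DefaultDiscoverySketch {

    /** Returns exactly `amount` indexes, or fails if fewer GPUs are visible. */
    static List<String> allocate(List<String> visibleIndexes, int amount) {
        if (visibleIndexes.size() < amount) {
            // Corresponds to the script exiting with a non-zero code.
            throw new IllegalStateException(
                    "Required " + amount + " GPUs but discovered only " + visibleIndexes.size());
        }
        // Assumption: keep the first `amount` discovered indexes.
        return new ArrayList<>(visibleIndexes.subList(0, amount));
    }

    /** Formats the result the way the script prints it: indexes split by a comma. */
    static String format(List<String> indexes) {
        return String.join(",", indexes);
    }
}
```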

To achieve worker-level isolation in standalone mode, we provide a privilege option for the default script. The user needs to configure “taskmanager.external-resource.gpu.param.discovery-script.args” to turn it on. For more discussion of worker-level isolation, see Worker-level Isolation Section.

Users can also provide their own discovery scripts to address custom requirements, e.g., dynamically deciding the amount of returned GPU indexes.

...

  • The assignment file is “/var/tmp/flink-gpu-assignment” by default; the user could set it through “taskmanager.external-resource.gpu.param.discovery-script.args”. The user needs to ensure that it is the same file for all the task executors in the same cluster and that it is not deleted before the cluster is stopped. If the cluster is not stopped properly, this file needs to be deleted manually.

...

  • If the task executor fails unexpectedly, the release function may not be triggered. In this scenario, task executors started afterwards will read the dirty data, which may cause a task executor to mistakenly report that there are not enough GPU resources. To address this issue, the script provides a “check-dead-process” option. If it is switched on in “taskmanager.external-resource.gpu.param.discovery-script.args” and there are not enough non-recorded GPUs, the allocate function will check whether the processes associated with existing records are still alive, and take over those GPUs whose associated processes are already dead.
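The takeover rule can be sketched as below. This is a Java model of the logic rather than the script: an in-memory map from GPU index to owning process id stands in for the assignment file, file locking is not modeled, and all names are hypothetical. The liveness check is passed in as a predicate; in a JVM it could be backed by ProcessHandle (Java 9 and later).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.LongPredicate;

/** Hypothetical model of "allocate" with the check-dead-process option. */
final class CoordinatedAllocationSketch {

    /**
     * Picks `amount` GPUs for the process `selfPid` and records them in `records`.
     * `records` maps a GPU index to the pid that currently holds it.
     */
    static List<String> allocate(List<String> visible, Map<String, Long> records, int amount,
                                 long selfPid, boolean checkDeadProcess, LongPredicate isAlive) {
        List<String> chosen = new ArrayList<>();
        // First pass: GPUs that no process has recorded.
        for (String gpu : visible) {
            if (chosen.size() == amount) {
                break;
            }
            if (!records.containsKey(gpu)) {
                chosen.add(gpu);
            }
        }
        // Second pass, only if needed and enabled: take over GPUs of dead processes.
        if (chosen.size() < amount && checkDeadProcess) {
            for (String gpu : visible) {
                if (chosen.size() == amount) {
                    break;
                }
                Long owner = records.get(gpu);
                if (owner != null && !isAlive.test(owner)) {
                    chosen.add(gpu);
                }
            }
        }
        if (chosen.size() < amount) {
            throw new IllegalStateException("Not enough GPU resources available");
        }
        for (String gpu : chosen) {
            records.put(gpu, selfPid); // overwrite stale records with the new owner
        }
        return chosen;
    }
}
```

Stale records are consulted only after the non-recorded GPUs are exhausted, so a healthy cluster never pays for the liveness checks.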

We do not guarantee operator-level isolation in the first step. All the operators on the same task executor can see all the visible GPU indexes.

...

Implementation Steps

  • Introduce the config option “taskmanager.external-resource.gpu.amount” and forward the request to the ExternalResourceDriver class.
  • Construct the ExternalResourceDriver in the ResourceManager and allocate external resources in the Yarn container/Kubernetes pod.
  • Introduce the “taskmanager.external-resource.gpu.param.discovery-script.[path|args]” config options and provide a default script.
  • Construct the ExternalResourceDriver in the TaskExecutor and provide the external resource information through the RuntimeContext.
  • Introduce GPUDriver. Introduce GPUManager and add it to TaskManagerServices. Make it accessible to the operator.
  • Add privilege option to the default discovery script.
  • Update the RestAPI/WebUI to properly display information of GPU Resources.

...