Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Public Interfaces

Introduce the ExternalResourceDriver external resource framework for external resource allocation and management. User could implement their third party driver for other external resources they want to leverage. The  The pattern of configuration options is:

  • external-resource.{resourceName}.amount. Define the amount of external resources in a task executor.
  • external-resource.{resourceName}.driver.class. Define the class name of ExternalResourceDriver.
  • external-resource.{resourceName}.kubernetes.key. Optional config which defines the configuration key of that external resource in Kubernetes. If you want the Flink to request the external resource from Kubernetes, you need to explicitly set this key. Only valid for Kubernetes mode.
  • external-resource.{resourceName}.yarn.key. Optional config which defines the configuration key of that external resource in Yarn. If you want the Flink to request the external resource from Yarn, you need to explicitly set this key. Only valid for Yarn mode.
  • external-resource.{resourceName}.param.{params}. Each ExternalResourceDriver could define their specific configs following this pattern.

For GPU resource, we introduce the following configuration options:

...

On the TaskExecutor side, introduce the ExternalResourceDriver interface, take the responsibility to manage and provide information of the external resource. User could implement their third party ExternalResourceDriver for other external resources they want to leverage.

Code Block
languagejava
titleExternalResourceDriver
public interface ExternalResourceDriver {
	/**
	 * Configures this driver. This method should always be called first on a newly instantiated driver.
	 *
	 * @param config configuration contains all parameters set for this driver
	 */
	void open(Configuration config) throws Exception;

	/**
	 * Closes this driver. Should be used to clean state and release resources.
	 */
	void close() throws Exception;

    /**
    * Retrieve the information of the external resources according to the resourceProfile.
    */
    List<ExternalResourceInfo> retrieveResourceInfo(ResourceProfile resourceProfile);
}

...

Introduce the ExternalResourceInfo class, which contains the information of the external resources. Operators and

Code Block
languagejava
titleExternalResourceInfo
public interface ExternalResourceInfo {
	String getInformation();
}

Operators and functions could get that information from the RuntimeContext.

For GPU resource, we introduce the GPUInforamtion class, which contains the index of a GPU card.

RestAPI / WebUI (Need to get the information of GPU resource through the RestAPI and WebUI)

Proposed Changes

Code Block
languagejava
titleRuntimeContext
public interface RuntimeContext {
    /**
	 * Get the external resource information.
	 */
	Map<String, Set<ExternalResourceInfo>> getExternalResourceInfo(ResourceSpec resourceSpec);
}

For GPU resource, we introduce the following configuration options:

  • external-resource.gpu.amount”: Define how many GPUs in a task executor. The default value should be 0.
  • external
  • We introduce the ExternalResourceDriver framework for external resource allocation and management.
  • User sets the “taskmanager.resource.gpu.amount”  and specifies the “external-resource.gpu.param.discovery-script.[path|args]” if needed.
  • For Yarn/Kubernetes mode, Flink maps the “taskmanager.resource.gpu.amount” to the corresponding field of resource requests to the external resource manager.
  • Introduce a GPUManager, which will execute the discovery script and get the available GPU resources from the output.
  • Operators and functions get the GPU resource information from GPUManager

To provide extensibility and decouple the TaskExecutor/ResourceManager from the external resource management/allocation(following the separation of concern rule), we introduce the ExternalResourceDriver framework for external resource allocation and management. This class could be extended by third-party for other external resources they want to leverage.

The ExternalResourceDriver framework drives the end-to-end workflow of external resource allocation and management.

On the ResourceManager side, user defines the amount of the external resource. ExternalResourceDriver framework takes the responsibility to allocate resources from external resource managers(Yarn/Kubernetes). User needs to specify the configuration key of that external resource on Yarn/Kubernetes. Then, Yarn/KubernetesResourceManager forward this external resource request to the external resource managers.

  • For Yarn, the YarnResourceManager adds the external resource to the ContainerRequest.
  • For Kubernetes, the KubernetesResourceManager adds the external resource to the pod for TaskExecutor.

On the TaskExecutor side, ExternalResourceDriver takes the responsibility to detect and provide information of external resources. TaskExecutor does not need to manage a specific external resource by itself, Operators and functions would get the ExternalResourceInfo from RuntimeConext.

Regarding the configuration, the common config keys are the amount of the external resources and the class name of ExternalResourceDriver. Besides, each driver could define their own configs following the specific pattern. In summary:

  • external-resource.{resourceName}.amount. Define the amount of external resources in a task executor.
  • external-resource.{resourceName}.driver.class. Define the class name of ExternalResourceDriver.
  • external-resource.{resourceName}.kubernetes.key. Optional config which defines the configuration key of that external resource in Kubernetes. If you want the Flink to request the external resource from Kubernetes, you need to explicitly set this key. Only valid for Kubernetes mode.
  • external-resource.{resourceName}.yarn.key. Optional config which defines the configuration key of that external resource in Yarn. If you want the Flink to request the external resource from Yarn, you need to explicitly set this key. Only valid for Yarn mode.
  • external-resource.{resourceName}.param.{params}. Each ExternalResourceDriver could define their specific configs following this pattern.

The definition of ExternalResourceDriver and ExternalResourceInfo is:

  • path”: Define the path of the discovery script. See Discovery Script Section.
  • external-resource.gpu.param.discovery-script.args”: Define the arguments passed to the discovery script. See Discovery Script Section.
  • external-resource.{resourceName}.kubernetes.key. Define the configuration key of GPU in Kubernetes. The default value is “nvidia.com/gpu”. If using amd GPU, user could set it to "amd.com/gpu"
  • external-resource.{resourceName}.yarn.key. Define the configuration key of GPU in Yarn. The default value is "yarn.io/gpu".

We provide a GPUDriver which discovers the GPU resource through user-defined discovery script and provide the available GPU index.

For GPU resource, we introduce the GPUInforamtion class, which only contains the index of a GPU card.

Proposed Changes

  • We introduce the ExternalResourceDriver framework for external resource allocation and management.
  • User sets the “taskmanager.resource.gpu.amount”  and specifies the “external-resource.gpu.param.discovery-script.[path|args]” if needed.
  • For Yarn/Kubernetes mode, Flink maps the “taskmanager.resource.gpu.amount” to the corresponding field of resource requests to the external resource manager.
  • Introduce a GPUDriver, which will execute the discovery script and get the available GPU resources from the output.
  • Operators and functions get the GPU resource information from GPUDriver

To provide extensibility and decouple the TaskExecutor/ResourceManager from the external resource management/allocation(following the separation of concern rule), we introduce the external resource framework for external resource allocation and management. This framework could be extended by third-party for other external resources they want to leverage.

The external resource framework drives the end-to-end workflow of external resource allocation and management.

On the ResourceManager side, user defines the amount of the external resource. The framework takes the responsibility to allocate resources from external resource managers(Yarn/Kubernetes). User needs to specify the configuration key of that external resource on Yarn/Kubernetes. Then, Yarn/KubernetesResourceManager forward this external resource request to the external resource managers.

  • For Yarn, the YarnResourceManager adds the external resource to the ContainerRequest.
  • For Kubernetes, the KubernetesResourceManager adds the external resource to the pod for TaskExecutor.

On the TaskExecutor side, we introduce ExternalResourceDriver, which takes the responsibility to detect and provide information of external resources. TaskExecutor does not need to manage a specific external resource by itself, Operators and functions would get the ExternalResourceInfo from RuntimeConext.

Regarding the configuration, the common config keys are the amount of the external resources and the class name of ExternalResourceDriver. Besides, each driver could define their own configs following the specific pattern. In summary:

  • external-resource.{resourceName}.amount. Define the amount of external resources in a task executor.
  • external-resource.{resourceName}.driver.class. Define the class name of ExternalResourceDriver.
  • external-resource.{resourceName}.kubernetes.key. Optional config which defines the configuration key of that external resource in Kubernetes. If you want the Flink to request the external resource from Kubernetes, you need to explicitly set this key. Only valid for Kubernetes mode.
  • external-resource.{resourceName}.yarn.key. Optional config which defines the configuration key of that external resource in Yarn. If you want the Flink to request the external resource from Yarn, you need to explicitly set this key. Only valid for Yarn mode.
  • external-resource.{resourceName}.param.{params}. Each ExternalResourceDriver could define their specific configs following this pattern.

The definition of ExternalResourceDriver and ExternalResourceInfo is:


Code Block
languagejava
titleExternalResourceDriver
public interface ExternalResourceDriver {
	/**
	 * Configures this driver. This method should always be called first on a newly instantiated driver.
	 *
	 * @param config configuration contains all parameters set for this driver
	 */
	void open(Configuration config) throws Exception;

	/**
	 * Closes this driver. Should be used to clean state and release resources.
	 */
	void close() throws Exception;
Code Block
languagejava
titleExternalResourceDriver
public abstract class ExternalResourceDriver {
    /**
    * Retrieve the information of the external resources according to the resourceProfile.
    */
    List<ExternalResourceInfo> retrieveResourceInfo(ResourceProfile resourceProfile);
}

public interface ExternalResourceInfo {
	String getInformation();
}

...

We introduce the configuration option “external-resource.gpu.param.discovery-script” and “external-resource.gpu.param.discovery-script.args”, which define the path of discovery script and its arguments. The discovery script should have two functions: allocate and release. GPUManager will GPUDriver will execute the allocate function and get the available GPU resources from the output when it is opened and execute the release function when it is closed. 

...

  • Introduce the ExternalResourceDriver class and related config options.
  • Let ResourceManager allocate external resources in container/Pod of Yarn/Kubernetes accordingly.
  • Construct ExternalResourceDriver to the TaskExecutor and provide the external resource information through the RuntimeContext.
  • Introduce GPUDriver.
  • Add privilege option to the default discovery script.Update the RestAPI/WebUI to properly display information of GPU Resources.

Known Limitations

  • Currently, Yarn only supports the NVIDIA GPU.
  • There is no operator-level isolation for GPU resources at the moment. Meanwhile, there is no GPU cooperating mechanism among operators.
    • The GPU resource of a task executor will be shared by all operators scheduled on it. If multiple operators scheduled to the same GPU, the video memory limitation may be exceeded and the job will fail with OOM.
    • The usage distribution of GPU resources could be skew, e.g. all the operators may run their jobs in the same GPU while other GPUs are idle. The idle GPU resources are wasted in this scenario.
    • The operator-level isolation and fine-grained GPU resource scheduling might be achieved in the future, when FLIP-56[4] is ready. Currently, we could rely on operators cooperatively share all GPU resources by themselves.

...

  • This FLIP proposes can check by its test case.
  • The GPU discovery logic could only be tested manually on the device with NVIDIA GPU.

Future work

  • Add external resource information to RestAPI / WebAPI.

Reference

[1] Hadoop 3.1.0 – Using GPU On YARN

...