...

  • external-resource.list. Define the {resourceName} list of enabled external resources, split by delimiter ",". If configured, the ResourceManager and TaskExecutor will check whether the relevant configs exist for each resource in this list. The ResourceManager will forward the request to the underlying external resource manager, and the TaskExecutor will launch the corresponding ExternalResourceDriver.
  • external-resource.{resourceName}.amount. Define the amount of the external resource in a task executor.
  • external-resource.{resourceName}.driver-factory.class. Define the class name of the ExternalResourceDriverFactory.
  • external-resource.{resourceName}.kubernetes.key. Optional config which defines the configuration key of that external resource in Kubernetes. If you want Flink to request the external resource from Kubernetes, you need to explicitly set this key. Only valid for Kubernetes mode.
  • external-resource.{resourceName}.yarn.key. Optional config which defines the configuration key of that external resource in Yarn. If you want Flink to request the external resource from Yarn, you need to explicitly set this key. Only valid for Yarn mode.
  • external-resource.{resourceName}.param.{params}. Each ExternalResourceDriver could define its own specific configs following this pattern.
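
For example, enabling one GPU per task executor could look like the following snippet. This is a minimal sketch; the factory class name and the Kubernetes/Yarn resource keys are illustrative assumptions, not part of this proposal:

Code Block
languageyaml
titleflink-conf.yaml (example)
external-resource.list: gpu
external-resource.gpu.amount: 1
external-resource.gpu.driver-factory.class: org.apache.flink.externalresource.gpu.GPUDriverFactory
# Only relevant for Kubernetes / Yarn deployments, respectively:
external-resource.gpu.kubernetes.key: nvidia.com/gpu
external-resource.gpu.yarn.key: yarn.io/gpu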

On the TaskExecutor side, we introduce the ExternalResourceDriverFactory and ExternalResourceDriver interfaces, which take the responsibility to manage and provide information about the external resource. Users could implement their own third-party ExternalResourceDriver for other external resources they want to leverage.

Code Block
languagejava
titleExternalResourceDriver
public interface ExternalResourceDriverFactory {
    /**
     * Construct the ExternalResourceDriver from configuration.
     */
    ExternalResourceDriver createDriver(Configuration config) throws Exception;
}

public interface ExternalResourceDriver {
    /**
     * Retrieve the information of the external resources according to the amount.
     */
    Set<ExternalResourceInfo> retrieveResourceInfo(long amount);
}

Introduce the ExternalResourceInfo interface, whose implementations contain the information of the external resources.

Code Block
languagejava
titleExternalResourceInfo
public interface ExternalResourceInfo {
}
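
To illustrate how a third-party resource could plug into these interfaces, here is a minimal sketch for a hypothetical FPGA resource. The class names and the trivial discovery logic are assumptions made for the example, not part of this proposal:

Code Block
languagejava
titleThird-party driver (sketch)
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.configuration.Configuration;

/** Hypothetical factory, referenced by external-resource.fpga.driver-factory.class. */
public class FPGADriverFactory implements ExternalResourceDriverFactory {
    @Override
    public ExternalResourceDriver createDriver(Configuration config) {
        // Driver-specific options would follow the external-resource.fpga.param.{params} pattern.
        return new FPGADriver();
    }
}

/** Hypothetical driver that reports one info object per requested FPGA. */
class FPGADriver implements ExternalResourceDriver {
    @Override
    public Set<ExternalResourceInfo> retrieveResourceInfo(long amount) {
        Set<ExternalResourceInfo> infos = new HashSet<>();
        for (long i = 0; i < amount; i++) {
            infos.add(new FPGAInfo(i));
        }
        return Collections.unmodifiableSet(infos);
    }
}

/** Hypothetical info object carrying the index of an assigned FPGA card. */
class FPGAInfo implements ExternalResourceInfo {
    private final long index;

    FPGAInfo(long index) {
        this.index = index;
    }

    public long getIndex() {
        return index;
    }
}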

Operators and functions could get that information from the RuntimeContext.
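
For example, a rich function could retrieve the information as sketched below. The accessor name getExternalResourceInfos is an assumption for illustration; the exact RuntimeContext method is part of the proposed API surface, and ExternalResourceInfo refers to the interface defined above:

Code Block
languagejava
titleAccessing external resource info (sketch)
import java.util.Set;

import org.apache.flink.api.common.functions.RichMapFunction;

public class GPUAwareMapper extends RichMapFunction<Long, Long> {
    @Override
    public Long map(Long value) {
        // "gpu" is the {resourceName} configured in external-resource.list.
        Set<ExternalResourceInfo> gpuInfos =
                getRuntimeContext().getExternalResourceInfos("gpu");
        // The returned infos can be used to pin the computation to the assigned devices.
        return value;
    }
}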

...

  • For standalone mode, it should be guaranteed by the user that the GPU indexes returned from the discovery script (if any) exist and are accessible.
  • For Yarn/Kubernetes mode, Yarn/Kubernetes will guarantee that the requested GPU resources are accessible to the container.

Note: To make GPU resources accessible, certain setups/preparation are needed depending on your environment. See the External Requirements section.

Once the required GPU resources are accessible to task executors, GPUDriver needs to discover GPU resources and provide the GPU resource information to operators.

GPUDriver will execute the discovery script and get the available GPU resources from its output. Then, as with any other ExternalResourceDriver, operators and functions can get the GPU information from the RuntimeContext.

The GPUInformation contains the specific index of the GPU.

We introduce the configuration options "external-resource.gpu.param.discovery-script" and "external-resource.gpu.param.discovery-script.args", which define the path of the discovery script and its arguments. The discovery script should provide two functions: allocate and deallocateAll. GPUDriver will execute the allocate function and get the available GPU resources from the output when it is opened, and execute the deallocateAll function when it is closed.
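
For example (the script path is illustrative):

Code Block
languageyaml
titleDiscovery script configuration (example)
external-resource.gpu.param.discovery-script: /opt/flink/scripts/gpu-discovery.sh
external-resource.gpu.param.discovery-script.args: --privilege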

In the allocate function, the discovery script should:

  • Return a list of the available GPU indexes, split by a comma.
  • Exit with non-zero if the output does not meet the expectation. GPUDriver will throw an exception in that case, causing the TaskExecutor initialization to fail.

Flink passes the keyword "allocate" and the amount (external-resource.gpu.amount) as the first two arguments to the script. The user-defined arguments will be appended after them.
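
For instance, with external-resource.gpu.amount set to 2 and the user-defined argument "--privilege", Flink would invoke the script (name illustrative) as "gpu-discovery.sh allocate 2 --privilege", and a successful run would print something like "0,1".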

In the deallocateAll function, the discovery script should:

  • Clean up all the state and files it produced in the allocate function.
  • Exit with non-zero on failure, in which case GPUDriver will throw an exception and log the error.

Flink passes the keyword "deallocateAll" as the first argument to the script. The user-defined arguments will be appended after it.
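
Continuing the example above, on shutdown the script would be invoked as "gpu-discovery.sh deallocateAll --privilege".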

We provide a default script. In its allocate function:

  • The script would first get all the indexes of visible GPU resources by using the "nvidia-smi"/"rocm-smi" command.
  • It will return a list of indexes of discovered GPUs, split by a comma.
  • The number of GPU indexes returned from the default script should always match the amount configured through "external-resource.gpu.amount".
    • If there are more GPUs discovered than configured, the script returns only as many indexes as the configured amount.
    • If there are not enough GPUs discovered, the script will fail and exit with non-zero.
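
For example, if "nvidia-smi" reports four visible GPUs (indexes 0-3) and "external-resource.gpu.amount" is 2, the default script prints "0,1"; if the amount were 5, it would fail and exit with non-zero instead.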

In its “deallocateAll” function, the script will do nothing and exit with zero.

To achieve worker-level isolation in standalone mode, we provide a privilege option for the default script. Users need to add "--privilege" to "external-resource.gpu.param.discovery-script.args" to turn it on. For more discussion of worker-level isolation, see the Worker-level Isolation section.

Users can also provide their own discovery scripts to address custom requirements, e.g., dynamically deciding the amount of returned GPU indexes.

Unlike CPU, GPU has two resource dimensions: cores and video memory. While the cores could be shared by multiple jobs, video memory should be exclusive. Sharing the same GPU across multiple operators may result in OOMs and job failures if the total video memory limit is exceeded. Thus, we need isolation to make sure video memory usage does not exceed the physical limitation.

For Yarn/Kubernetes mode, the underlying system guarantees that the visible GPU resources on the container strictly match the request. Thus, worker-level isolation is achieved without extra work.

For standalone mode, multiple task executors may be co-located on the same machine, and each GPU is visible to all of them. To achieve worker-level isolation in such scenarios, we need to first decide which task executor uses which GPU in a cooperative way, and then make sure each task executor can only see the GPU resources allocated to it. We provide a privilege mode for this in the default script.

In the privilege mode, we use a common assignment file to synchronize the GPU assignment across different task executors. After retrieving the indexes of all visible GPUs, the script opens the assignment file, checks which GPUs are already in use, and writes down which GPUs it decides to use (choosing from the unused ones) together with the PID of the task executor. We leverage the "flock" mechanism to guarantee that accesses to the assignment file are exclusive. Since only one process can access the assignment file at a time, we ensure no GPU will be used by multiple workers.

  • The assignment file is "/var/tmp/flink-gpu-assignment" by default; users could change it by adding "--assign-file filePath" to "external-resource.gpu.param.discovery-script.args". Users need to ensure that it is the same file for all the task executors in the same cluster and that it is not deleted before the cluster is stopped. If the cluster is not stopped properly, this file needs to be deleted manually.

After that, the script leverages the "cgroups" mechanism[5], with which we can control the visibility of each GPU device for a certain control group (the task executor and its child processes). The script will create a specific cgroup for the task executor and add the GPU devices that the task executor may not use to the device blacklist of this cgroup. From then on, the task executor and its child processes will only see the GPU resources allocated to them.
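
As a concrete illustration (the device numbers are an assumption for the example): NVIDIA GPU device nodes conventionally use character-device major number 195, so denying GPU 1 to a task executor's cgroup amounts to adding an entry like "c 195:1 rwm" to that cgroup's devices.deny list, after which /dev/nvidia1 is no longer accessible from inside the group.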

The deallocateAll function will check which GPUs are recorded for the task executor's PID and remove those records from the assignment file.

If a task executor stops improperly, its records may not be cleaned up, and task executors started afterwards would read the stale records. That may cause a task executor to mistakenly report that there are not enough GPU resources. To address this issue, the script provides a "--check-dead" option. If it is added to "external-resource.gpu.param.discovery-script.args", then in case there are not enough unrecorded GPUs, the allocate function will check whether the processes associated with the existing records are still alive, and take over those GPUs whose associated processes are already dead.

For example, if users want to trigger the privilege mode, they could set "external-resource.gpu.param.discovery-script.args" to "--privilege --check-dead --assign-file /tmp/flink-assign". This will execute the default discovery script in privilege mode, check whether dead processes occupy GPU resources, and locate the resource assignment file at "/tmp/flink-assign".

...