...

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-108-Add-GPU-support-in-Flink-td38286.html

JIRA: FLINK-17044 (https://issues.apache.org/jira/browse/FLINK-17044)

Released: <Flink Version>

...

RuntimeContext:

```java
public interface RuntimeContext {
    /**
     * Get the external resource information.
     */
    Map<String, Set<ExternalResourceInfo>> getExternalResourceInfo(ResourceSpec resourceSpec);
}
```

For GPU resources, we introduce the following configuration options:

  • “external-resource.gpu.amount”: The number of GPUs in each task executor. The default value is 0.
  • “external-resource.gpu.param.discovery-script.path”: The path of the discovery script. See the Discovery Script section.
  • “external-resource.gpu.param.discovery-script.args”: The arguments passed to the discovery script. See the Discovery Script section.
  • “external-resource.{resourceName}.kubernetes.key”: The configuration key of the GPU resource in Kubernetes. The default value is “nvidia.com/gpu”; for AMD GPUs, users can set it to “amd.com/gpu”.
  • “external-resource.{resourceName}.yarn.key”: The configuration key of the GPU resource in Yarn. The default value is “yarn.io/gpu”.
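To make the defaults concrete, here is a minimal Java sketch that reads these options from a plain key/value map, with {resourceName} set to gpu. The GpuOptions class, its fields, and the non-negative check on the amount are hypothetical illustrations, not Flink's actual configuration classes. Only the option keys and default values come from the list above.

```java
import java.util.Map;

// Hypothetical holder for the GPU options above ({resourceName} = "gpu").
final class GpuOptions {
    static final String AMOUNT = "external-resource.gpu.amount";
    static final String SCRIPT_PATH = "external-resource.gpu.param.discovery-script.path";
    static final String SCRIPT_ARGS = "external-resource.gpu.param.discovery-script.args";
    static final String K8S_KEY = "external-resource.gpu.kubernetes.key";
    static final String YARN_KEY = "external-resource.gpu.yarn.key";

    final long amount;          // defaults to 0
    final String scriptPath;    // null when not configured
    final String scriptArgs;    // empty when not configured
    final String kubernetesKey; // defaults to "nvidia.com/gpu"
    final String yarnKey;       // defaults to "yarn.io/gpu"

    GpuOptions(Map<String, String> conf) {
        this.amount = Long.parseLong(conf.getOrDefault(AMOUNT, "0").trim());
        // Assumption: a negative amount is treated as a configuration error.
        if (amount < 0) {
            throw new IllegalArgumentException(AMOUNT + " must be non-negative, got " + amount);
        }
        this.scriptPath = conf.get(SCRIPT_PATH);
        this.scriptArgs = conf.getOrDefault(SCRIPT_ARGS, "");
        this.kubernetesKey = conf.getOrDefault(K8S_KEY, "nvidia.com/gpu");
        this.yarnKey = conf.getOrDefault(YARN_KEY, "yarn.io/gpu");
    }
}
```

With an empty map the sketch yields amount 0 and the NVIDIA and Yarn default keys. Setting “external-resource.gpu.kubernetes.key” to “amd.com/gpu” switches the Kubernetes request to AMD GPUs.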

...

  • We introduce the ExternalResourceDriver framework for external resource allocation and management.
  • The user sets “external-resource.gpu.amount” and, if needed, specifies “external-resource.gpu.param.discovery-script.[path|args]”.
  • In Yarn/Kubernetes mode, Flink maps “external-resource.gpu.amount” to the corresponding field of the resource requests sent to the external resource manager.
  • We introduce a GPUDriver, which executes the discovery script and gets the available GPU resources from its output.
  • Operators and functions get the GPU resource information from the GPUDriver.
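The flow above can be sketched as a small Java model. This is hypothetical and not Flink's API:

- GpuInfo stands in for ExternalResourceInfo.
- ExternalResourceDriver is reduced to a single retrieveResourceInfo(amount) method.
- ExternalResourceRegistry plays the role of the runtime component that collects driver results, so operators can look them up by resource name.

The registry mirrors the Map<String, Set<ExternalResourceInfo>> returned by RuntimeContext; the ResourceSpec parameter is left out.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical stand-in for ExternalResourceInfo: a GPU identified by its index.
final class GpuInfo {
    final int index;

    GpuInfo(int index) {
        this.index = index;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof GpuInfo && ((GpuInfo) o).index == index;
    }

    @Override
    public int hashCode() {
        return Integer.hashCode(index);
    }

    @Override
    public String toString() {
        return "GPU-" + index;
    }
}

// Hypothetical driver contract: return the resources owned by this task executor.
interface ExternalResourceDriver {
    Set<GpuInfo> retrieveResourceInfo(long amount);
}

// Hypothetical registry filled once at TaskExecutor start-up and read by operators.
final class ExternalResourceRegistry {
    private final Map<String, Set<GpuInfo>> resources = new TreeMap<>();

    // Any exception thrown by the driver propagates to the caller.
    void register(String resourceName, ExternalResourceDriver driver, long amount) {
        Set<GpuInfo> infos = Objects.requireNonNull(driver.retrieveResourceInfo(amount));
        // Copy defensively so operators cannot mutate what the driver returned.
        resources.put(resourceName, Collections.unmodifiableSet(new LinkedHashSet<>(infos)));
    }

    Map<String, Set<GpuInfo>> getExternalResourceInfo() {
        return Collections.unmodifiableMap(resources);
    }
}
```

In this model, a GPUDriver would be one ExternalResourceDriver implementation registered under the name gpu. A failure inside the driver surfaces from register, which fits the requirement that TaskExecutor initialization fails when discovery fails.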

...

  • Return a comma-separated list of the available GPU indexes.
  • Exit with a non-zero code if the output does not meet expectations. In that case, GPUDriver throws an exception, which causes TaskExecutor initialization to fail.
  • Flink passes the keyword “allocate” and the amount (“external-resource.gpu.amount”) as the first two arguments to the script. User-defined arguments are appended after them.
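Here is a minimal Java sketch of how a GPUDriver could invoke the allocate function and check its result, following the contract above. DiscoveryScriptRunner and its methods are hypothetical names. Two behaviors are our own assumptions rather than part of the design:

- User arguments are split on whitespace, with no shell-style quoting.
- Output whose number of distinct indexes differs from the requested amount is treated as a failure.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for the allocate half of the discovery script contract.
final class DiscoveryScriptRunner {

    // argv layout: <script> allocate <amount> <user-defined args...>
    static List<String> allocateCommand(String scriptPath, long amount, String userArgs) {
        List<String> cmd = new ArrayList<>(Arrays.asList(scriptPath, "allocate", Long.toString(amount)));
        String trimmed = userArgs.trim();
        if (!trimmed.isEmpty()) {
            // Assumption: arguments are whitespace-separated, with no quoting support.
            cmd.addAll(Arrays.asList(trimmed.split("\\s+")));
        }
        return cmd;
    }

    // Parses output such as "0,2,3" into GPU indexes.
    static Set<Integer> parseIndexes(String output, long amount) {
        Set<Integer> indexes = new LinkedHashSet<>();
        String trimmed = output.trim();
        if (!trimmed.isEmpty()) {
            for (String token : trimmed.split(",")) {
                try {
                    indexes.add(Integer.parseInt(token.trim()));
                } catch (NumberFormatException e) {
                    throw new IllegalStateException("Malformed discovery script output: " + trimmed, e);
                }
            }
        }
        // Assumption: the script must report exactly `amount` distinct GPUs.
        if (indexes.size() != amount) {
            throw new IllegalStateException(
                "Expected " + amount + " GPU indexes but got " + indexes.size() + ": " + trimmed);
        }
        return indexes;
    }

    // Throws if the script exits with a non-zero code or prints malformed output.
    static Set<Integer> allocate(String scriptPath, long amount, String userArgs)
            throws IOException, InterruptedException {
        Process process = new ProcessBuilder(allocateCommand(scriptPath, amount, userArgs))
                .redirectError(ProcessBuilder.Redirect.INHERIT) // forward stderr to this JVM's stderr
                .start();
        String output = new String(process.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
        int exitCode = process.waitFor();
        if (exitCode != 0) {
            throw new IllegalStateException("Discovery script exited with code " + exitCode);
        }
        return parseIndexes(output, amount);
    }
}
```

Redirecting stderr avoids blocking on an unread error stream. Standard output is read fully before waiting for the process to exit.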

...

  • Clean up all state and files produced by the allocate function.
  • Exit with a non-zero code on failure. GPUDriver then throws an exception and logs an error.
  • Flink passes the keyword “release” and the amount (“external-resource.gpu.amount”) as the first two arguments to the script. User-defined arguments are appended after them.
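The release call can be sketched the same way. ScriptRelease and its methods are hypothetical, and java.util.logging stands in for whatever logger Flink actually uses. The sketch shows the argument layout and the rule that a non-zero exit code is both logged and raised as an exception.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.logging.Logger;

// Hypothetical helper for the release half of the discovery script contract.
final class ScriptRelease {
    private static final Logger LOG = Logger.getLogger(ScriptRelease.class.getName());

    // argv layout: <script> release <amount> <user-defined args...>
    static List<String> releaseCommand(String scriptPath, long amount, String userArgs) {
        List<String> cmd = new ArrayList<>(Arrays.asList(scriptPath, "release", Long.toString(amount)));
        String trimmed = userArgs.trim();
        if (!trimmed.isEmpty()) {
            // Assumption: arguments are whitespace-separated, with no quoting support.
            cmd.addAll(Arrays.asList(trimmed.split("\\s+")));
        }
        return cmd;
    }

    // Runs the release function; on a non-zero exit code, logs an error and throws.
    static void release(String scriptPath, long amount, String userArgs)
            throws IOException, InterruptedException {
        Process process = new ProcessBuilder(releaseCommand(scriptPath, amount, userArgs))
                .inheritIO() // the script's stdout/stderr go to this JVM's stdout/stderr
                .start();
        int exitCode = process.waitFor();
        if (exitCode != 0) {
            String message = "GPU release script exited with code " + exitCode;
            LOG.severe(message);
            throw new IllegalStateException(message);
        }
    }
}
```

Passing the same user-defined arguments to both allocate and release lets the script locate the state (for example, a custom assignment file) that allocate created.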

...

To achieve worker-level isolation in standalone mode, we provide a privilege option for the default script. To turn it on, users need to add "--privilege" to “external-resource.gpu.param.discovery-script.args”. For more discussion of worker-level isolation, see the Worker-level Isolation section.

...

  • The assignment file is “/var/tmp/flink-gpu-assignment” by default; users can change it by adding "--assign-file filePath" to “external-resource.gpu.param.discovery-script.args”. Users need to ensure that all task executors in the same cluster use the same file and that it is not deleted before the cluster stops. If the cluster is not stopped properly, this file needs to be deleted manually.

...

  • If a task executor fails unexpectedly, the release function may not be triggered. In this scenario, task executors started afterwards will read stale records, which may cause them to wrongly report that there are not enough GPU resources. To address this issue, the script provides a “--check-dead” option. If it is added to “external-resource.gpu.param.discovery-script.args” and there are not enough unrecorded GPUs, the allocate function checks whether the processes associated with existing records are still alive. It then takes over the GPUs whose associated processes are already dead.
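The allocation rule, including the --check-dead takeover, can be modeled in memory as below. This is an illustrative Java model, not the default script itself:

- The assignment file is represented as a map from GPU index to owning process id.
- Process liveness is supplied as a predicate.
- File persistence and locking across task executors, which a real implementation needs, are left out.

The class name GpuAssignment and the order in which GPUs are picked are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.LongPredicate;

// Hypothetical in-memory model of the assignment file: GPU index -> owning process id.
final class GpuAssignment {

    // Picks `amount` GPUs out of indexes 0..totalGpus-1 for process `pid` and records them.
    static List<Integer> allocate(Map<Integer, Long> records, int totalGpus, int amount,
                                  long pid, boolean checkDead, LongPredicate isAlive) {
        List<Integer> chosen = new ArrayList<>();
        // First pass: GPUs that have no record at all.
        for (int gpu = 0; gpu < totalGpus && chosen.size() < amount; gpu++) {
            if (!records.containsKey(gpu)) {
                chosen.add(gpu);
            }
        }
        // Second pass, only with --check-dead: take over GPUs whose owner process is dead.
        if (checkDead) {
            for (int gpu = 0; gpu < totalGpus && chosen.size() < amount; gpu++) {
                Long owner = records.get(gpu);
                if (owner != null && !isAlive.test(owner)) {
                    chosen.add(gpu);
                }
            }
        }
        if (chosen.size() < amount) {
            // Corresponds to the script exiting with a non-zero code; records stay unchanged.
            throw new IllegalStateException(
                "Not enough GPUs: requested " + amount + ", available " + chosen.size());
        }
        for (int gpu : chosen) {
            records.put(gpu, pid);
        }
        return chosen;
    }

    // Release: drop every record owned by `pid`.
    static void release(Map<Integer, Long> records, long pid) {
        records.values().removeIf(owner -> owner == pid);
    }
}
```

Trying unrecorded GPUs first means records of live processes are never touched. Records of dead processes are reclaimed only when they are actually needed.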


For example, to enable privilege mode, users could set “external-resource.gpu.param.discovery-script.args” to "--privilege --check-dead --assign-file /tmp/flink-assign". This executes the default discovery script in privilege mode, checks whether dead processes still occupy GPU resources, and places the resource assignment file at "/tmp/flink-assign".
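The following Java sketch shows how the three options of the default script combine, including the default assignment file. DefaultScriptOptions is a hypothetical model of the script's argument handling, not the script itself, and rejecting unknown options is our assumption.

```java
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical model of how the default discovery script interprets its user-defined arguments.
final class DefaultScriptOptions {
    static final String DEFAULT_ASSIGN_FILE = "/var/tmp/flink-gpu-assignment";

    boolean privilege;                       // --privilege: worker-level isolation in standalone mode
    boolean checkDead;                       // --check-dead: reclaim GPUs recorded for dead processes
    String assignFile = DEFAULT_ASSIGN_FILE; // --assign-file <path>

    static DefaultScriptOptions parse(String args) {
        DefaultScriptOptions options = new DefaultScriptOptions();
        String trimmed = args.trim();
        if (trimmed.isEmpty()) {
            return options;
        }
        Iterator<String> it = Arrays.asList(trimmed.split("\\s+")).iterator();
        while (it.hasNext()) {
            String arg = it.next();
            switch (arg) {
                case "--privilege":
                    options.privilege = true;
                    break;
                case "--check-dead":
                    options.checkDead = true;
                    break;
                case "--assign-file":
                    if (!it.hasNext()) {
                        throw new IllegalArgumentException("--assign-file requires a path");
                    }
                    options.assignFile = it.next();
                    break;
                default:
                    // Assumption: unknown options are rejected rather than ignored.
                    throw new IllegalArgumentException("Unknown option: " + arg);
            }
        }
        return options;
    }
}
```

Parsing the example value above yields privilege = true, checkDead = true, and assignFile = /tmp/flink-assign.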

...