在使用多 GPU 节点时在 Kubernetes 上共同部署 Dask worker#
为了在使用多 GPU 节点时优化性能,最佳实践是以紧密分组的方式调度 Dask worker,从而最大程度地减少 worker pod 之间的通信开销。本指南提供了向 worker pod 添加 pod 亲和性的分步过程,确保它们尽可能在 Google Kubernetes Engine (GKE) 上共同调度,但这些原则也可适用于其他 Kubernetes 分发版本。
先决条件#
首先,您需要安装 gcloud
CLI 工具,以及 kubectl
、helm
等用于管理 Kubernetes 的工具。
确保您已登录 gcloud
CLI。
$ gcloud init
创建 Kubernetes 集群#
现在我们可以启动一个启用 GPU 的 GKE 集群。
$ gcloud container clusters create rapids-gpu \
--accelerator type=nvidia-tesla-a100,count=2 --machine-type a2-highgpu-2g \
--zone us-central1-c --release-channel stable
使用此命令,您已启动了一个名为 rapids-gpu
的 GKE 集群。您指定它应使用类型为 a2-highgpu-2g 的节点,每个节点具有两个 A100 GPU。
安装驱动程序#
接下来,在每个节点上安装 NVIDIA 驱动程序。
$ kubectl apply -f https://raw.githubusercontent.com/GoogleCloudPlatform/container-engine-accelerators/master/nvidia-driver-installer/cos/daemonset-preloaded-latest.yaml
daemonset.apps/nvidia-driver-installer created
验证 NVIDIA 驱动程序是否已成功安装。
$ kubectl get po -A --watch | grep nvidia
kube-system nvidia-driver-installer-6zwcn 1/1 Running 0 8m47s
kube-system nvidia-driver-installer-8zmmn 1/1 Running 0 8m47s
kube-system nvidia-driver-installer-mjkb8 1/1 Running 0 8m47s
kube-system nvidia-gpu-device-plugin-5ffkm 1/1 Running 0 13m
kube-system nvidia-gpu-device-plugin-d599s 1/1 Running 0 13m
kube-system nvidia-gpu-device-plugin-jrgjh 1/1 Running 0 13m
安装驱动程序后,您就可以测试您的集群了。
让我们创建一个使用 GPU 计算的示例 pod,以确保一切按预期工作。
cat << EOF | kubectl create -f -
apiVersion: v1
kind: Pod
metadata:
name: cuda-vectoradd
spec:
restartPolicy: OnFailure
containers:
- name: cuda-vectoradd
image: "nvidia/samples:vectoradd-cuda11.6.0-ubuntu18.04"
resources:
limits:
nvidia.com/gpu: 1
EOF
$ kubectl logs pod/cuda-vectoradd
[Vector addition of 50000 elements]
Copy input data from the host memory to the CUDA device
CUDA kernel launch with 196 blocks of 256 threads
Copy output data from the CUDA device to the host memory
Test PASSED
Done
如果您在输出中看到 Test PASSED
,您可以确信您的 Kubernetes 集群已正确设置了 GPU 计算。
接下来,清理该 pod。
$ kubectl delete pod cuda-vectoradd
pod "cuda-vectoradd" deleted
使用 Helm 安装 Dask operator#
该 operator 有一个 Helm chart,可用于管理 operator 的安装。按照 Dask 文档中提供的说明进行操作,或者也可以通过以下方式安装
$ helm install --create-namespace -n dask-operator --generate-name --repo https://helm.dask.org dask-kubernetes-operator
NAME: dask-kubernetes-operator-1666875935
NAMESPACE: dask-operator
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
Operator has been installed successfully.
配置 RAPIDS DaskCluster
#
要配置 DaskCluster
资源以运行 RAPIDS,您需要设置几项内容
容器镜像必须包含 RAPIDS,官方 RAPIDS 容器镜像是一个不错的选择。
必须为 Dask worker 配置一个或多个 NVIDIA GPU 资源。
worker 命令必须设置为
dask-cuda-worker
。
使用 kubectl
创建 RAPIDS DaskCluster
#
以下是用于启动具有 worker pod 亲和性的 RAPIDS Dask 集群的资源清单示例
# rapids-dask-cluster.yaml apiVersion: kubernetes.dask.org/v1 kind: DaskCluster metadata: name: rapids-dask-cluster labels: dask.org/cluster-name: rapids-dask-cluster spec: worker: replicas: 2 spec: containers: - name: worker image: "nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12" imagePullPolicy: "IfNotPresent" args: - dask-cuda-worker - --name - $(DASK_WORKER_NAME) resources: limits: nvidia.com/gpu: "1" affinity: podAffinity: preferredDuringSchedulingIgnoredDuringExecution: - weight: 100 podAffinityTerm: labelSelector: matchExpressions: - key: dask.org/component operator: In values: - worker topologyKey: kubernetes.io/hostname scheduler: spec: containers: - name: scheduler image: "nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12" imagePullPolicy: "IfNotPresent" env: args: - dask-scheduler ports: - name: tcp-comm containerPort: 8786 protocol: TCP - name: http-dashboard containerPort: 8787 protocol: TCP readinessProbe: httpGet: port: http-dashboard path: /health initialDelaySeconds: 5 periodSeconds: 10 livenessProbe: httpGet: port: http-dashboard path: /health initialDelaySeconds: 15 periodSeconds: 20 resources: limits: nvidia.com/gpu: "1" service: type: ClusterIP selector: dask.org/cluster-name: rapids-dask-cluster dask.org/component: scheduler ports: - name: tcp-comm protocol: TCP port: 8786 targetPort: "tcp-comm" - name: http-dashboard protocol: TCP port: 8787 targetPort: "http-dashboard"
您可以使用 kubectl
创建此集群。
$ kubectl apply -f rapids-dask-cluster.yaml
清单解析#
此清单的大部分内容已在 RAPIDS 文档的工具部分中的 Dask Operator 文档中进行了解释。
在上述文档页面的示例中唯一的额外添加是 worker 配置中的以下部分
# ...
affinity:
podAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchExpressions:
- key: dask.org/component
operator: In
values:
- worker
topologyKey: kubernetes.io/hostname
# ...
对于 Dask Worker pod 配置,我们使用节点名称作为拓扑键设置了 pod 亲和性。Kubernetes 中的 Pod 亲和性允许您限制 Pod 可以调度到哪些节点上,并允许您配置一组应共同定位在同一已定义拓扑中的工作负载,在这种情况下,倾向于将两个 worker pod 放置在同一节点上。这也被视为一个软要求,因为我们使用的是 preferredDuringSchedulingIgnoredDuringExecution
类型的 pod 亲和性。Kubernetes 调度器会尝试查找符合规则的节点。如果找不到匹配的节点,Kubernetes 调度器仍会将 pod 调度到任何可用节点上。即使无法将 worker pod 放置在已被使用的节点上,这也能确保您的 Dask 集群不会遇到任何问题。
访问您的 Dask 集群#
创建 DaskCluster
资源后,我们可以使用 kubectl
检查它为我们创建的所有其他资源的状态。
$ kubectl get all -l dask.org/cluster-name=rapids-dask-cluster -o wide
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES
pod/rapids-dask-cluster-default-worker-12a055b2db-7b5bf8f66c-9mb59 1/1 Running 0 2s 10.244.2.3 gke-rapids-gpu-1-default-pool-d85b49-2545 <none> <none>
pod/rapids-dask-cluster-default-worker-34437735ae-6fdd787f75-sdqzg 1/1 Running 0 2s 10.244.2.4 gke-rapids-gpu-1-default-pool-d85b49-2545 <none> <none>
pod/rapids-dask-cluster-scheduler-6656cb88f6-cgm4t 0/1 Running 0 3s 10.244.3.3 gke-rapids-gpu-1-default-pool-d85b49-2f31 <none> <none>
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE SELECTOR
service/rapids-dask-cluster-scheduler ClusterIP 10.96.231.110 <none> 8786/TCP,8787/TCP 3s dask.org/cluster-name=rapids-dask-cluster,dask.org/component=scheduler
在这里,您可以看到我们的调度器 pod 和两个 worker pod,以及调度器服务。如期望的那样,两个 worker pod 放置在同一节点上,而调度器 pod 放置在不同的节点上。
如果您在 Kubernetes 集群内部运行 Python 会话(例如Kubernetes 页面上的示例),您应该能够直接连接 Dask 分布式客户端。
from dask.distributed import Client
client = Client("rapids-dask-cluster-scheduler:8786")
此外,如果您在 Kubernetes 集群外部,可以将 Service
更改为使用 LoadBalancer
或 NodePort
,或者使用 kubectl
在本地转发连接端口。
$ kubectl port-forward svc/rapids-dask-cluster-service 8786:8786
Forwarding from 127.0.0.1:8786 -> 8786
from dask.distributed import Client
client = Client("localhost:8786")
使用 KubeCluster
的示例#
除了通过 kubectl
创建集群外,您还可以使用 Python 通过 dask_kubernetes.operator.KubeCluster
来创建。此类别实现了 Dask 集群管理器接口,并在底层为您创建和管理 DaskCluster
资源。您还可以使用 make_cluster_spec() 生成 KubeCluster 内部使用的 spec,然后用您的自定义选项对其进行修改。我们将使用它来为调度器添加节点亲和性。在以下示例中,使用了与 kubectl
示例相同的集群配置。
from dask_kubernetes.operator import KubeCluster, make_cluster_spec spec = make_cluster_spec( name="rapids-dask-cluster", image="nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12", n_workers=2, resources={"limits": {"nvidia.com/gpu": "1"}}, worker_command="dask-cuda-worker", )
要为 worker 添加节点亲和性,您可以创建一个自定义字典,指定 pod 亲和性的类型和拓扑键。
affinity_config = {
"podAffinity": {
"preferredDuringSchedulingIgnoredDuringExecution": [
{
"weight": 100,
"podAffinityTerm": {
"labelSelector": {
"matchExpressions": [
{
"key": "dask.org/component",
"operator": "In",
"values": ["worker"],
}
]
},
"topologyKey": "kubernetes.io/hostname",
},
}
]
}
}
现在您可以将此配置添加到上一步创建的 spec 中,并使用此自定义 spec 创建 Dask 集群。
spec["spec"]["worker"]["spec"]["affinity"] = affinity_config
cluster = KubeCluster(custom_cluster_spec=spec)
如果我们使用 kubectl
检查,我们可以看到上述 Python 生成了与上面的 kubectl
示例相同的 DaskCluster
资源。
$ kubectl get daskclusters
NAME AGE
rapids-dask-cluster 3m28s
$ kubectl get all -l dask.org/cluster-name=rapids-dask-cluster -o wide
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES
pod/rapids-dask-cluster-default-worker-12a055b2db-7b5bf8f66c-9mb59 1/1 Running 0 2s 10.244.2.3 gke-rapids-gpu-1-default-pool-d85b49-2545 <none> <none>
pod/rapids-dask-cluster-default-worker-34437735ae-6fdd787f75-sdqzg 1/1 Running 0 2s 10.244.2.4 gke-rapids-gpu-1-default-pool-d85b49-2545 <none> <none>
pod/rapids-dask-cluster-scheduler-6656cb88f6-cgm4t 0/1 Running 0 3s 10.244.3.3 gke-rapids-gpu-1-default-pool-d85b49-2f31 <none> <none>
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE SELECTOR
service/rapids-dask-cluster-scheduler ClusterIP 10.96.231.110 <none> 8786/TCP,8787/TCP 3s dask.org/cluster-name=rapids-dask-cluster,dask.org/component=scheduler
使用 Python 中的此集群对象,我们也可以直接连接客户端,无需知道地址,因为 Dask 会为我们发现。如果您在 Kubernetes 集群外部,它还会自动设置端口转发。
from dask.distributed import Client
client = Client(cluster)
此对象还可用于向上或向下伸缩 worker。
cluster.scale(5)
以及手动关闭集群。
cluster.close()
注意
默认情况下,KubeCluster
命令会注册一个退出钩子,以便当 Python 进程退出时,集群会自动删除。您可以在启动集群时通过设置 KubeCluster(..., shutdown_on_close=False)
来禁用此功能。
如果您有一个由多个 Python 进程组成的多阶段管道,并且希望您的 Dask 集群在它们之间持久存在,这将非常有用。
如果您希望使用现有集群或将来手动调用 cluster.close()
,您还可以使用 cluster = KubeCluster.from_name(name="rapids-dask")
将 KubeCluster
对象连接到现有集群。