Kubernetes 上 Dask 调度器的 GPU 优化#
用户在部署 Dask 集群时可以进行的一项优化是确保调度器放置在配备较低性能 GPU 的节点上,以降低总体成本。这篇之前的指南解释了为什么调度器需要访问与工作器相同的环境,因为在一些边缘情况下,调度器确实会序列化数据并反序列化高级图。
警告
本指南概述了我们目前关于调度器硬件要求的建议,但这可能会有所变动。
然而,在使用配备多个 GPU 的节点时,将调度器放置在其中一个节点上会浪费资源。本指南将逐步介绍如何在 GKE 上创建 Kubernetes 集群,并创建一个包含较低性能 Nvidia Tesla T4 GPU 的节点池,然后使用 Kubernetes 节点亲和性将调度器放置在该节点上。
前提条件#
首先,你需要安装 gcloud
CLI 工具,以及用于管理 Kubernetes 的 kubectl
、helm
等工具。
确保你已登录 gcloud
CLI。
$ gcloud init
创建 Kubernetes 集群#
现在我们可以启动一个启用 GPU 的 GKE 集群。
$ gcloud container clusters create rapids-gpu \
--accelerator type=nvidia-tesla-a100,count=2 --machine-type a2-highgpu-2g \
--zone us-central1-c --release-channel stable
使用此命令,你已启动一个名为 rapids-gpu
的 GKE 集群。你指定它应该使用 a2-highgpu-2g 类型的节点,每个节点配备两个 A100 GPU。
为调度器创建专用节点池#
现在在此 GPU 集群上创建一个新的节点池。
$ gcloud container node-pools create scheduler-pool --cluster rapids-gpu \
--accelerator type=nvidia-tesla-t4,count=1 --machine-type n1-standard-2 \
--num-nodes 1 --node-labels dedicated=scheduler --zone us-central1-c
使用此命令,你已创建了一个名为 scheduler-pool
的附加节点池,包含 1 个节点。你还指定它应该使用 n1-standard-2 类型的节点,配备一个 T4 GPU。我们还为此节点池中的节点添加了一个 Kubernetes 标签 dedicated=scheduled
,该标签将用于将调度器放置在此节点上。
安装驱动程序#
接下来,在每个节点上安装 NVIDIA 驱动程序。
$ kubectl apply -f https://raw.githubusercontent.com/GoogleCloudPlatform/container-engine-accelerators/master/nvidia-driver-installer/cos/daemonset-preloaded-latest.yaml
daemonset.apps/nvidia-driver-installer created
验证 NVIDIA 驱动程序已成功安装。
$ kubectl get po -A --watch | grep nvidia
kube-system nvidia-driver-installer-6zwcn 1/1 Running 0 8m47s
kube-system nvidia-driver-installer-8zmmn 1/1 Running 0 8m47s
kube-system nvidia-driver-installer-mjkb8 1/1 Running 0 8m47s
kube-system nvidia-gpu-device-plugin-5ffkm 1/1 Running 0 13m
kube-system nvidia-gpu-device-plugin-d599s 1/1 Running 0 13m
kube-system nvidia-gpu-device-plugin-jrgjh 1/1 Running 0 13m
驱动程序安装完成后,你就可以测试你的集群了。
让我们创建一个使用 GPU 计算的示例 Pod,以确保一切按预期工作。
cat << EOF | kubectl create -f -
apiVersion: v1
kind: Pod
metadata:
name: cuda-vectoradd
spec:
restartPolicy: OnFailure
containers:
- name: cuda-vectoradd
image: "nvidia/samples:vectoradd-cuda11.6.0-ubuntu18.04"
resources:
limits:
nvidia.com/gpu: 1
EOF
$ kubectl logs pod/cuda-vectoradd
[Vector addition of 50000 elements]
Copy input data from the host memory to the CUDA device
CUDA kernel launch with 196 blocks of 256 threads
Copy output data from the CUDA device to the host memory
Test PASSED
Done
如果在输出中看到 Test PASSED
,则可以确信你的 Kubernetes 集群已正确设置 GPU 计算。
接下来,清理该 Pod。
$ kubectl delete pod cuda-vectoradd
pod "cuda-vectoradd" deleted
使用 Helm 安装 Dask Operator#
该 Operator 拥有一个 Helm Chart,可用于管理 Operator 的安装。该 Chart 发布在 Dask Helm Repo 仓库中,可通过以下方式安装:
$ helm repo add dask https://helm.dask.org
"dask" has been added to your repositories
$ helm repo update
Hang tight while we grab the latest from your chart repositories...
...Successfully got an update from the "dask" chart repository
Update Complete. ⎈Happy Helming!⎈
$ helm install --create-namespace -n dask-operator --generate-name dask/dask-kubernetes-operator
NAME: dask-kubernetes-operator-1666875935
NAMESPACE: dask-operator
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
Operator has been installed successfully.
然后你应该能够通过 kubectl
列出你的 Dask 集群。
$ kubectl get daskclusters
No resources found in default namespace.
我们还可以检查 Operator Pod 是否正在运行
$ kubectl get pods -A -l app.kubernetes.io/name=dask-kubernetes-operator
NAMESPACE NAME READY STATUS RESTARTS AGE
dask-operator dask-kubernetes-operator-775b8bbbd5-zdrf7 1/1 Running 0 74s
配置 RAPIDS DaskCluster
#
要配置 DaskCluster
资源以运行 RAPIDS,你需要设置一些内容
容器镜像必须包含 RAPIDS,官方 RAPIDS 容器镜像是一个不错的选择。
Dask 工作器必须配置一个或多个 NVIDIA GPU 资源。
工作器命令必须设置为
dask-cuda-worker
。
使用 kubectl
创建 RAPIDS DaskCluster
#
这里有一个用于启动经过调度器优化的 RAPIDS Dask 集群的资源清单示例
# rapids-dask-cluster.yaml apiVersion: kubernetes.dask.org/v1 kind: DaskCluster metadata: name: rapids-dask-cluster labels: dask.org/cluster-name: rapids-dask-cluster spec: worker: replicas: 2 spec: containers: - name: worker image: "nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12" imagePullPolicy: "IfNotPresent" args: - dask-cuda-worker - --name - $(DASK_WORKER_NAME) resources: limits: nvidia.com/gpu: "1" scheduler: spec: containers: - name: scheduler image: "nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12" imagePullPolicy: "IfNotPresent" env: args: - dask-scheduler ports: - name: tcp-comm containerPort: 8786 protocol: TCP - name: http-dashboard containerPort: 8787 protocol: TCP readinessProbe: httpGet: port: http-dashboard path: /health initialDelaySeconds: 5 periodSeconds: 10 livenessProbe: httpGet: port: http-dashboard path: /health initialDelaySeconds: 15 periodSeconds: 20 resources: limits: nvidia.com/gpu: "1" affinity: nodeAffinity: preferredDuringSchedulingIgnoredDuringExecution: - weight: 100 preference: matchExpressions: - key: dedicated operator: In values: - scheduler service: type: ClusterIP selector: dask.org/cluster-name: rapids-dask-cluster dask.org/component: scheduler ports: - name: tcp-comm protocol: TCP port: 8786 targetPort: "tcp-comm" - name: http-dashboard protocol: TCP port: 8787 targetPort: "http-dashboard"
你可以使用 kubectl
创建此集群。
$ kubectl apply -f rapids-dask-cluster.yaml
清单分解#
此清单的大部分内容在 RAPIDS 文档的工具部分中的 Dask Operator 文档中有所解释。
在上述文档页面示例中唯一添加的内容是调度器配置中的以下部分
# ...
affinity:
nodeAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
preference:
matchExpressions:
- key: dedicated
operator: In
values:
- scheduler
# ...
对于 Dask 调度器 Pod,我们使用之前在专用节点上指定的标签设置了节点亲和性。Kubernetes 中的节点亲和性允许你根据节点标签约束 Pod 可以调度到哪些节点。由于我们使用的是 preferredDuringSchedulingIgnoredDuringExecution
类型的节点亲和性,这也旨在成为一项软性要求。Kubernetes 调度器会尝试找到满足规则的节点。如果匹配的节点不可用,Kubernetes 调度器仍会将 Pod 调度到任何可用节点上。这确保了即使 T4 节点不可用,你的 Dask 集群也不会出现任何问题。
访问你的 Dask 集群#
创建 DaskCluster
资源后,我们可以使用 kubectl
检查它为我们创建的所有其他资源的状态。
$ kubectl get all -l dask.org/cluster-name=rapids-dask-cluster
NAME READY STATUS RESTARTS AGE
pod/rapids-dask-cluster-default-worker-group-worker-0c202b85fd 1/1 Running 0 4m13s
pod/rapids-dask-cluster-default-worker-group-worker-ff5d376714 1/1 Running 0 4m13s
pod/rapids-dask-cluster-scheduler 1/1 Running 0 4m14s
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/rapids-dask-cluster-service ClusterIP 10.96.223.217 <none> 8786/TCP,8787/TCP 4m13s
在这里你可以看到我们的调度器 Pod 和两个工作器 Pod 以及调度器服务。
如果你在 Kubernetes 集群内运行 Python 会话(例如Kubernetes 页面上的示例),你应该能够直接连接 Dask 分布式客户端。
from dask.distributed import Client
client = Client("rapids-dask-cluster-scheduler:8786")
或者,如果你在 Kubernetes 集群外部,可以将 Service
更改为使用 LoadBalancer
或 NodePort
,或者使用 kubectl
在本地转发连接。
$ kubectl port-forward svc/rapids-dask-cluster-service 8786:8786
Forwarding from 127.0.0.1:8786 -> 8786
from dask.distributed import Client
client = Client("localhost:8786")
使用 KubeCluster
的示例#
除了通过 kubectl
创建集群外,你还可以通过 Python 使用 dask_kubernetes.operator.KubeCluster
来创建。此类实现了 Dask Cluster Manager 接口,并在底层为你创建和管理 DaskCluster
资源。你还可以使用 make_cluster_spec()
生成 KubeCluster 在内部使用的规范,然后使用你的自定义选项对其进行修改。我们将使用它来为调度器添加节点亲和性。
from dask_kubernetes.operator import KubeCluster, make_cluster_spec spec = make_cluster_spec( name="rapids-dask-cluster", image="nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12", n_workers=2, resources={"limits": {"nvidia.com/gpu": "1"}}, worker_command="dask-cuda-worker", )
要为调度器添加节点亲和性,你可以创建一个自定义字典,指定节点亲和性的类型和节点的标签。
affinity_config = {
"nodeAffinity": {
"preferredDuringSchedulingIgnoredDuringExecution": [
{
"weight": 100,
"preference": {
"matchExpressions": [
{"key": "dedicated", "operator": "In", "values": ["scheduler"]}
]
},
}
]
}
}
现在你可以将此配置添加到上一步创建的规范中,并使用此自定义规范创建 Dask 集群。
spec["spec"]["scheduler"]["spec"]["affinity"] = affinity_config
cluster = KubeCluster(custom_cluster_spec=spec)
如果我们使用 kubectl
检查,可以看到上面的 Python 代码生成了与上面的 kubectl
示例相同的 DaskCluster
资源。
$ kubectl get daskclusters
NAME AGE
rapids-dask-cluster 3m28s
$ kubectl get all -l dask.org/cluster-name=rapids-dask-cluster
NAME READY STATUS RESTARTS AGE
pod/rapids-dask-cluster-default-worker-group-worker-07d674589a 1/1 Running 0 3m30s
pod/rapids-dask-cluster-default-worker-group-worker-a55ed88265 1/1 Running 0 3m30s
pod/rapids-dask-cluster-scheduler 1/1 Running 0 3m30s
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/rapids-dask-cluster-service ClusterIP 10.96.200.202 <none> 8786/TCP,8787/TCP 3m30s
通过 Python 中的此集群对象,我们还可以直接连接客户端而无需知道地址,因为 Dask 会为我们发现地址。如果你在 Kubernetes 集群外部,它还会自动设置端口转发。
from dask.distributed import Client
client = Client(cluster)
此对象也可用于扩缩工作器数量。
cluster.scale(5)
并手动关闭集群。
cluster.close()
注意
默认情况下,KubeCluster
命令会注册一个退出钩子,以便当 Python 进程退出时,集群会自动删除。在启动集群时,你可以通过设置 KubeCluster(..., shutdown_on_close=False)
来禁用此功能。
如果你有一个由多个 Python 进程组成的多阶段管道,并且希望 Dask 集群在它们之间持久存在,这会很有用。
如果你希望使用现有集群,或者将来手动调用 cluster.close()
,你还可以使用 cluster = KubeCluster.from_name(name="rapids-dask")
将 KubeCluster
对象连接到你的现有集群。