在使用多 GPU 节点时在 Kubernetes 上共同部署 Dask worker#

为了在使用多 GPU 节点时优化性能,最佳实践是以紧密分组的方式调度 Dask worker,从而最大程度地减少 worker pod 之间的通信开销。本指南提供了向 worker pod 添加 pod 亲和性的分步过程,确保它们尽可能在 Google Kubernetes Engine (GKE) 上共同调度,但这些原则也可适用于其他 Kubernetes 分发版本。

先决条件#

首先,您需要安装 gcloud CLI 工具,以及 kubectlhelm 等用于管理 Kubernetes 的工具。

确保您已登录 gcloud CLI。

$ gcloud init

创建 Kubernetes 集群#

现在我们可以启动一个启用 GPU 的 GKE 集群。

$ gcloud container clusters create rapids-gpu \
  --accelerator type=nvidia-tesla-a100,count=2 --machine-type a2-highgpu-2g \
  --zone us-central1-c --release-channel stable

使用此命令,您已启动了一个名为 rapids-gpu 的 GKE 集群。您指定它应使用类型为 a2-highgpu-2g 的节点,每个节点具有两个 A100 GPU。

安装驱动程序#

接下来,在每个节点上安装 NVIDIA 驱动程序

$ kubectl apply -f https://raw.githubusercontent.com/GoogleCloudPlatform/container-engine-accelerators/master/nvidia-driver-installer/cos/daemonset-preloaded-latest.yaml
daemonset.apps/nvidia-driver-installer created

验证 NVIDIA 驱动程序是否已成功安装。

$ kubectl get po -A --watch | grep nvidia
kube-system   nvidia-driver-installer-6zwcn                                 1/1     Running   0         8m47s
kube-system   nvidia-driver-installer-8zmmn                                 1/1     Running   0         8m47s
kube-system   nvidia-driver-installer-mjkb8                                 1/1     Running   0         8m47s
kube-system   nvidia-gpu-device-plugin-5ffkm                                1/1     Running   0         13m
kube-system   nvidia-gpu-device-plugin-d599s                                1/1     Running   0         13m
kube-system   nvidia-gpu-device-plugin-jrgjh                                1/1     Running   0         13m

安装驱动程序后,您就可以测试您的集群了。

让我们创建一个使用 GPU 计算的示例 pod,以确保一切按预期工作。

cat << EOF | kubectl create -f -
apiVersion: v1
kind: Pod
metadata:
  name: cuda-vectoradd
spec:
  restartPolicy: OnFailure
  containers:
  - name: cuda-vectoradd
    image: "nvidia/samples:vectoradd-cuda11.6.0-ubuntu18.04"
    resources:
       limits:
         nvidia.com/gpu: 1
EOF
$ kubectl logs pod/cuda-vectoradd
[Vector addition of 50000 elements]
Copy input data from the host memory to the CUDA device
CUDA kernel launch with 196 blocks of 256 threads
Copy output data from the CUDA device to the host memory
Test PASSED
Done

如果您在输出中看到 Test PASSED,您可以确信您的 Kubernetes 集群已正确设置了 GPU 计算。

接下来,清理该 pod。

$ kubectl delete pod cuda-vectoradd
pod "cuda-vectoradd" deleted

使用 Helm 安装 Dask operator#

该 operator 有一个 Helm chart,可用于管理 operator 的安装。按照 Dask 文档中提供的说明进行操作,或者也可以通过以下方式安装

$ helm install --create-namespace -n dask-operator --generate-name --repo https://helm.dask.org dask-kubernetes-operator
NAME: dask-kubernetes-operator-1666875935
NAMESPACE: dask-operator
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
Operator has been installed successfully.

配置 RAPIDS DaskCluster#

要配置 DaskCluster 资源以运行 RAPIDS,您需要设置几项内容

  • 容器镜像必须包含 RAPIDS,官方 RAPIDS 容器镜像是一个不错的选择。

  • 必须为 Dask worker 配置一个或多个 NVIDIA GPU 资源。

  • worker 命令必须设置为 dask-cuda-worker

使用 kubectl 创建 RAPIDS DaskCluster#

以下是用于启动具有 worker pod 亲和性的 RAPIDS Dask 集群的资源清单示例

# rapids-dask-cluster.yaml
apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
  name: rapids-dask-cluster
  labels:
    dask.org/cluster-name: rapids-dask-cluster
spec:
  worker:
    replicas: 2
    spec:
      containers:
        - name: worker
          image: "nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12"
          imagePullPolicy: "IfNotPresent"
          args:
            - dask-cuda-worker
            - --name
            - $(DASK_WORKER_NAME)
          resources:
            limits:
              nvidia.com/gpu: "1"
      affinity:
        podAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 100
              podAffinityTerm:
                labelSelector:
                  matchExpressions:
                    - key: dask.org/component
                      operator: In
                      values:
                        - worker
                topologyKey: kubernetes.io/hostname
  scheduler:
    spec:
      containers:
        - name: scheduler
          image: "nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12"
          imagePullPolicy: "IfNotPresent"
          env:
          args:
            - dask-scheduler
          ports:
            - name: tcp-comm
              containerPort: 8786
              protocol: TCP
            - name: http-dashboard
              containerPort: 8787
              protocol: TCP
          readinessProbe:
            httpGet:
              port: http-dashboard
              path: /health
            initialDelaySeconds: 5
            periodSeconds: 10
          livenessProbe:
            httpGet:
              port: http-dashboard
              path: /health
            initialDelaySeconds: 15
            periodSeconds: 20
          resources:
            limits:
              nvidia.com/gpu: "1"
    service:
      type: ClusterIP
      selector:
        dask.org/cluster-name: rapids-dask-cluster
        dask.org/component: scheduler
      ports:
        - name: tcp-comm
          protocol: TCP
          port: 8786
          targetPort: "tcp-comm"
        - name: http-dashboard
          protocol: TCP
          port: 8787
          targetPort: "http-dashboard"

您可以使用 kubectl 创建此集群。

$ kubectl apply -f rapids-dask-cluster.yaml

清单解析#

此清单的大部分内容已在 RAPIDS 文档的工具部分中的 Dask Operator 文档中进行了解释。

在上述文档页面的示例中唯一的额外添加是 worker 配置中的以下部分

# ...
affinity:
podAffinity:
    preferredDuringSchedulingIgnoredDuringExecution:
    - weight: 100
      podAffinityTerm:
        labelSelector:
            matchExpressions:
            - key: dask.org/component
                operator: In
                values:
                - worker
        topologyKey: kubernetes.io/hostname
# ...

对于 Dask Worker pod 配置,我们使用节点名称作为拓扑键设置了 pod 亲和性。Kubernetes 中的 Pod 亲和性允许您限制 Pod 可以调度到哪些节点上,并允许您配置一组应共同定位在同一已定义拓扑中的工作负载,在这种情况下,倾向于将两个 worker pod 放置在同一节点上。这也被视为一个软要求,因为我们使用的是 preferredDuringSchedulingIgnoredDuringExecution 类型的 pod 亲和性。Kubernetes 调度器会尝试查找符合规则的节点。如果找不到匹配的节点,Kubernetes 调度器仍会将 pod 调度到任何可用节点上。即使无法将 worker pod 放置在已被使用的节点上,这也能确保您的 Dask 集群不会遇到任何问题。

访问您的 Dask 集群#

创建 DaskCluster 资源后,我们可以使用 kubectl 检查它为我们创建的所有其他资源的状态。

$ kubectl get all -l dask.org/cluster-name=rapids-dask-cluster -o wide
NAME                                                                 READY   STATUS    RESTARTS   AGE   IP           NODE                                         NOMINATED NODE   READINESS GATES
pod/rapids-dask-cluster-default-worker-12a055b2db-7b5bf8f66c-9mb59   1/1     Running   0          2s    10.244.2.3   gke-rapids-gpu-1-default-pool-d85b49-2545    <none>           <none>
pod/rapids-dask-cluster-default-worker-34437735ae-6fdd787f75-sdqzg   1/1     Running   0          2s    10.244.2.4   gke-rapids-gpu-1-default-pool-d85b49-2545    <none>           <none>
pod/rapids-dask-cluster-scheduler-6656cb88f6-cgm4t                   0/1     Running   0          3s    10.244.3.3   gke-rapids-gpu-1-default-pool-d85b49-2f31    <none>           <none>

NAME                                    TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE   SELECTOR
service/rapids-dask-cluster-scheduler   ClusterIP   10.96.231.110   <none>        8786/TCP,8787/TCP   3s    dask.org/cluster-name=rapids-dask-cluster,dask.org/component=scheduler

在这里,您可以看到我们的调度器 pod 和两个 worker pod,以及调度器服务。如期望的那样,两个 worker pod 放置在同一节点上,而调度器 pod 放置在不同的节点上。

如果您在 Kubernetes 集群内部运行 Python 会话(例如Kubernetes 页面上的示例),您应该能够直接连接 Dask 分布式客户端。

from dask.distributed import Client

client = Client("rapids-dask-cluster-scheduler:8786")

此外,如果您在 Kubernetes 集群外部,可以将 Service 更改为使用 LoadBalancerNodePort,或者使用 kubectl 在本地转发连接端口。

$ kubectl port-forward svc/rapids-dask-cluster-service 8786:8786
Forwarding from 127.0.0.1:8786 -> 8786
from dask.distributed import Client

client = Client("localhost:8786")

使用 KubeCluster 的示例#

除了通过 kubectl 创建集群外,您还可以使用 Python 通过 dask_kubernetes.operator.KubeCluster 来创建。此类别实现了 Dask 集群管理器接口,并在底层为您创建和管理 DaskCluster 资源。您还可以使用 make_cluster_spec() 生成 KubeCluster 内部使用的 spec,然后用您的自定义选项对其进行修改。我们将使用它来为调度器添加节点亲和性。在以下示例中,使用了与 kubectl 示例相同的集群配置。

from dask_kubernetes.operator import KubeCluster, make_cluster_spec

spec = make_cluster_spec(
    name="rapids-dask-cluster",
    image="nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12",
    n_workers=2,
    resources={"limits": {"nvidia.com/gpu": "1"}},
    worker_command="dask-cuda-worker",
)

要为 worker 添加节点亲和性,您可以创建一个自定义字典,指定 pod 亲和性的类型和拓扑键。

affinity_config = {
    "podAffinity": {
        "preferredDuringSchedulingIgnoredDuringExecution": [
            {
                "weight": 100,
                "podAffinityTerm": {
                    "labelSelector": {
                        "matchExpressions": [
                            {
                                "key": "dask.org/component",
                                "operator": "In",
                                "values": ["worker"],
                            }
                        ]
                    },
                    "topologyKey": "kubernetes.io/hostname",
                },
            }
        ]
    }
}

现在您可以将此配置添加到上一步创建的 spec 中,并使用此自定义 spec 创建 Dask 集群。

spec["spec"]["worker"]["spec"]["affinity"] = affinity_config
cluster = KubeCluster(custom_cluster_spec=spec)

如果我们使用 kubectl 检查,我们可以看到上述 Python 生成了与上面的 kubectl 示例相同的 DaskCluster 资源。

$ kubectl get daskclusters
NAME                  AGE
rapids-dask-cluster   3m28s

$ kubectl get all -l dask.org/cluster-name=rapids-dask-cluster -o wide
NAME                                                                 READY   STATUS    RESTARTS   AGE   IP           NODE                                         NOMINATED NODE   READINESS GATES
pod/rapids-dask-cluster-default-worker-12a055b2db-7b5bf8f66c-9mb59   1/1     Running   0          2s    10.244.2.3   gke-rapids-gpu-1-default-pool-d85b49-2545    <none>           <none>
pod/rapids-dask-cluster-default-worker-34437735ae-6fdd787f75-sdqzg   1/1     Running   0          2s    10.244.2.4   gke-rapids-gpu-1-default-pool-d85b49-2545    <none>           <none>
pod/rapids-dask-cluster-scheduler-6656cb88f6-cgm4t                   0/1     Running   0          3s    10.244.3.3   gke-rapids-gpu-1-default-pool-d85b49-2f31    <none>           <none>

NAME                                    TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE   SELECTOR
service/rapids-dask-cluster-scheduler   ClusterIP   10.96.231.110   <none>        8786/TCP,8787/TCP   3s    dask.org/cluster-name=rapids-dask-cluster,dask.org/component=scheduler

使用 Python 中的此集群对象,我们也可以直接连接客户端,无需知道地址,因为 Dask 会为我们发现。如果您在 Kubernetes 集群外部,它还会自动设置端口转发。

from dask.distributed import Client

client = Client(cluster)

此对象还可用于向上或向下伸缩 worker。

cluster.scale(5)

以及手动关闭集群。

cluster.close()

注意

默认情况下,KubeCluster 命令会注册一个退出钩子,以便当 Python 进程退出时,集群会自动删除。您可以在启动集群时通过设置 KubeCluster(..., shutdown_on_close=False) 来禁用此功能。

如果您有一个由多个 Python 进程组成的多阶段管道,并且希望您的 Dask 集群在它们之间持久存在,这将非常有用。

如果您希望使用现有集群或将来手动调用 cluster.close(),您还可以使用 cluster = KubeCluster.from_name(name="rapids-dask")KubeCluster 对象连接到现有集群。