Kubernetes 上 Dask 调度器的 GPU 优化#

用户在部署 Dask 集群时可以进行的一项优化是确保调度器放置在配备较低性能 GPU 的节点上,以降低总体成本。这篇之前的指南解释了为什么调度器需要访问与工作器相同的环境,因为在一些边缘情况下,调度器确实会序列化数据并反序列化高级图。

警告

本指南概述了我们目前关于调度器硬件要求的建议,但这可能会有所变动。

然而,在使用配备多个 GPU 的节点时,将调度器放置在其中一个节点上会浪费资源。本指南将逐步介绍如何在 GKE 上创建 Kubernetes 集群,并创建一个包含较低性能 Nvidia Tesla T4 GPU 的节点池,然后使用 Kubernetes 节点亲和性将调度器放置在该节点上。

前提条件#

首先,你需要安装 gcloud CLI 工具,以及用于管理 Kubernetes 的 kubectlhelm 等工具。

确保你已登录 gcloud CLI。

$ gcloud init

创建 Kubernetes 集群#

现在我们可以启动一个启用 GPU 的 GKE 集群。

$ gcloud container clusters create rapids-gpu \
  --accelerator type=nvidia-tesla-a100,count=2 --machine-type a2-highgpu-2g \
  --zone us-central1-c --release-channel stable

使用此命令,你已启动一个名为 rapids-gpu 的 GKE 集群。你指定它应该使用 a2-highgpu-2g 类型的节点,每个节点配备两个 A100 GPU。

为调度器创建专用节点池#

现在在此 GPU 集群上创建一个新的节点池。

$ gcloud container node-pools create scheduler-pool --cluster rapids-gpu \
  --accelerator type=nvidia-tesla-t4,count=1 --machine-type n1-standard-2 \
  --num-nodes 1 --node-labels dedicated=scheduler --zone us-central1-c

使用此命令,你已创建了一个名为 scheduler-pool 的附加节点池,包含 1 个节点。你还指定它应该使用 n1-standard-2 类型的节点,配备一个 T4 GPU。我们还为此节点池中的节点添加了一个 Kubernetes 标签 dedicated=scheduled,该标签将用于将调度器放置在此节点上。

安装驱动程序#

接下来,在每个节点上安装 NVIDIA 驱动程序

$ kubectl apply -f https://raw.githubusercontent.com/GoogleCloudPlatform/container-engine-accelerators/master/nvidia-driver-installer/cos/daemonset-preloaded-latest.yaml
daemonset.apps/nvidia-driver-installer created

验证 NVIDIA 驱动程序已成功安装。

$ kubectl get po -A --watch | grep nvidia
kube-system   nvidia-driver-installer-6zwcn                                 1/1     Running   0         8m47s
kube-system   nvidia-driver-installer-8zmmn                                 1/1     Running   0         8m47s
kube-system   nvidia-driver-installer-mjkb8                                 1/1     Running   0         8m47s
kube-system   nvidia-gpu-device-plugin-5ffkm                                1/1     Running   0         13m
kube-system   nvidia-gpu-device-plugin-d599s                                1/1     Running   0         13m
kube-system   nvidia-gpu-device-plugin-jrgjh                                1/1     Running   0         13m

驱动程序安装完成后,你就可以测试你的集群了。

让我们创建一个使用 GPU 计算的示例 Pod,以确保一切按预期工作。

cat << EOF | kubectl create -f -
apiVersion: v1
kind: Pod
metadata:
  name: cuda-vectoradd
spec:
  restartPolicy: OnFailure
  containers:
  - name: cuda-vectoradd
    image: "nvidia/samples:vectoradd-cuda11.6.0-ubuntu18.04"
    resources:
       limits:
         nvidia.com/gpu: 1
EOF
$ kubectl logs pod/cuda-vectoradd
[Vector addition of 50000 elements]
Copy input data from the host memory to the CUDA device
CUDA kernel launch with 196 blocks of 256 threads
Copy output data from the CUDA device to the host memory
Test PASSED
Done

如果在输出中看到 Test PASSED,则可以确信你的 Kubernetes 集群已正确设置 GPU 计算。

接下来,清理该 Pod。

$ kubectl delete pod cuda-vectoradd
pod "cuda-vectoradd" deleted

使用 Helm 安装 Dask Operator#

该 Operator 拥有一个 Helm Chart,可用于管理 Operator 的安装。该 Chart 发布在 Dask Helm Repo 仓库中,可通过以下方式安装:

$ helm repo add dask https://helm.dask.org
"dask" has been added to your repositories

$ helm repo update
Hang tight while we grab the latest from your chart repositories...
...Successfully got an update from the "dask" chart repository
Update Complete. ⎈Happy Helming!⎈

$ helm install --create-namespace -n dask-operator --generate-name dask/dask-kubernetes-operator
NAME: dask-kubernetes-operator-1666875935
NAMESPACE: dask-operator
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
Operator has been installed successfully.

然后你应该能够通过 kubectl 列出你的 Dask 集群。

$ kubectl get daskclusters
No resources found in default namespace.

我们还可以检查 Operator Pod 是否正在运行

$ kubectl get pods -A -l app.kubernetes.io/name=dask-kubernetes-operator
NAMESPACE       NAME                                        READY   STATUS    RESTARTS   AGE
dask-operator   dask-kubernetes-operator-775b8bbbd5-zdrf7   1/1     Running   0          74s

配置 RAPIDS DaskCluster#

要配置 DaskCluster 资源以运行 RAPIDS,你需要设置一些内容

  • 容器镜像必须包含 RAPIDS,官方 RAPIDS 容器镜像是一个不错的选择。

  • Dask 工作器必须配置一个或多个 NVIDIA GPU 资源。

  • 工作器命令必须设置为 dask-cuda-worker

使用 kubectl 创建 RAPIDS DaskCluster#

这里有一个用于启动经过调度器优化的 RAPIDS Dask 集群的资源清单示例

# rapids-dask-cluster.yaml
apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
  name: rapids-dask-cluster
  labels:
    dask.org/cluster-name: rapids-dask-cluster
spec:
  worker:
    replicas: 2
    spec:
      containers:
        - name: worker
          image: "nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12"
          imagePullPolicy: "IfNotPresent"
          args:
            - dask-cuda-worker
            - --name
            - $(DASK_WORKER_NAME)
          resources:
            limits:
              nvidia.com/gpu: "1"
  scheduler:
    spec:
      containers:
        - name: scheduler
          image: "nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12"
          imagePullPolicy: "IfNotPresent"
          env:
          args:
            - dask-scheduler
          ports:
            - name: tcp-comm
              containerPort: 8786
              protocol: TCP
            - name: http-dashboard
              containerPort: 8787
              protocol: TCP
          readinessProbe:
            httpGet:
              port: http-dashboard
              path: /health
            initialDelaySeconds: 5
            periodSeconds: 10
          livenessProbe:
            httpGet:
              port: http-dashboard
              path: /health
            initialDelaySeconds: 15
            periodSeconds: 20
          resources:
            limits:
              nvidia.com/gpu: "1"
      affinity:
        nodeAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 100
              preference:
                matchExpressions:
                  - key: dedicated
                    operator: In
                    values:
                      - scheduler
    service:
      type: ClusterIP
      selector:
        dask.org/cluster-name: rapids-dask-cluster
        dask.org/component: scheduler
      ports:
        - name: tcp-comm
          protocol: TCP
          port: 8786
          targetPort: "tcp-comm"
        - name: http-dashboard
          protocol: TCP
          port: 8787
          targetPort: "http-dashboard"

你可以使用 kubectl 创建此集群。

$ kubectl apply -f rapids-dask-cluster.yaml

清单分解#

此清单的大部分内容在 RAPIDS 文档的工具部分中的 Dask Operator 文档中有所解释。

在上述文档页面示例中唯一添加的内容是调度器配置中的以下部分

# ...
affinity:
  nodeAffinity:
    preferredDuringSchedulingIgnoredDuringExecution:
      - weight: 100
        preference:
          matchExpressions:
            - key: dedicated
              operator: In
              values:
                - scheduler
# ...

对于 Dask 调度器 Pod,我们使用之前在专用节点上指定的标签设置了节点亲和性。Kubernetes 中的节点亲和性允许你根据节点标签约束 Pod 可以调度到哪些节点。由于我们使用的是 preferredDuringSchedulingIgnoredDuringExecution 类型的节点亲和性,这也旨在成为一项软性要求。Kubernetes 调度器会尝试找到满足规则的节点。如果匹配的节点不可用,Kubernetes 调度器仍会将 Pod 调度到任何可用节点上。这确保了即使 T4 节点不可用,你的 Dask 集群也不会出现任何问题。

访问你的 Dask 集群#

创建 DaskCluster 资源后,我们可以使用 kubectl 检查它为我们创建的所有其他资源的状态。

$ kubectl get all -l dask.org/cluster-name=rapids-dask-cluster
NAME                                                             READY   STATUS    RESTARTS   AGE
pod/rapids-dask-cluster-default-worker-group-worker-0c202b85fd   1/1     Running   0          4m13s
pod/rapids-dask-cluster-default-worker-group-worker-ff5d376714   1/1     Running   0          4m13s
pod/rapids-dask-cluster-scheduler                                1/1     Running   0          4m14s

NAME                                  TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
service/rapids-dask-cluster-service   ClusterIP   10.96.223.217   <none>        8786/TCP,8787/TCP   4m13s

在这里你可以看到我们的调度器 Pod 和两个工作器 Pod 以及调度器服务。

如果你在 Kubernetes 集群内运行 Python 会话(例如Kubernetes 页面上的示例),你应该能够直接连接 Dask 分布式客户端。

from dask.distributed import Client

client = Client("rapids-dask-cluster-scheduler:8786")

或者,如果你在 Kubernetes 集群外部,可以将 Service 更改为使用 LoadBalancerNodePort,或者使用 kubectl 在本地转发连接。

$ kubectl port-forward svc/rapids-dask-cluster-service 8786:8786
Forwarding from 127.0.0.1:8786 -> 8786
from dask.distributed import Client

client = Client("localhost:8786")

使用 KubeCluster 的示例#

除了通过 kubectl 创建集群外,你还可以通过 Python 使用 dask_kubernetes.operator.KubeCluster 来创建。此类实现了 Dask Cluster Manager 接口,并在底层为你创建和管理 DaskCluster 资源。你还可以使用 make_cluster_spec() 生成 KubeCluster 在内部使用的规范,然后使用你的自定义选项对其进行修改。我们将使用它来为调度器添加节点亲和性。

from dask_kubernetes.operator import KubeCluster, make_cluster_spec

spec = make_cluster_spec(
    name="rapids-dask-cluster",
    image="nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12",
    n_workers=2,
    resources={"limits": {"nvidia.com/gpu": "1"}},
    worker_command="dask-cuda-worker",
)

要为调度器添加节点亲和性,你可以创建一个自定义字典,指定节点亲和性的类型和节点的标签。

affinity_config = {
    "nodeAffinity": {
        "preferredDuringSchedulingIgnoredDuringExecution": [
            {
                "weight": 100,
                "preference": {
                    "matchExpressions": [
                        {"key": "dedicated", "operator": "In", "values": ["scheduler"]}
                    ]
                },
            }
        ]
    }
}

现在你可以将此配置添加到上一步创建的规范中,并使用此自定义规范创建 Dask 集群。

spec["spec"]["scheduler"]["spec"]["affinity"] = affinity_config
cluster = KubeCluster(custom_cluster_spec=spec)

如果我们使用 kubectl 检查,可以看到上面的 Python 代码生成了与上面的 kubectl 示例相同的 DaskCluster 资源。

$ kubectl get daskclusters
NAME                  AGE
rapids-dask-cluster   3m28s

$ kubectl get all -l dask.org/cluster-name=rapids-dask-cluster
NAME                                                             READY   STATUS    RESTARTS   AGE
pod/rapids-dask-cluster-default-worker-group-worker-07d674589a   1/1     Running   0          3m30s
pod/rapids-dask-cluster-default-worker-group-worker-a55ed88265   1/1     Running   0          3m30s
pod/rapids-dask-cluster-scheduler                                1/1     Running   0          3m30s

NAME                                  TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
service/rapids-dask-cluster-service   ClusterIP   10.96.200.202   <none>        8786/TCP,8787/TCP   3m30s

通过 Python 中的此集群对象,我们还可以直接连接客户端而无需知道地址,因为 Dask 会为我们发现地址。如果你在 Kubernetes 集群外部,它还会自动设置端口转发。

from dask.distributed import Client

client = Client(cluster)

此对象也可用于扩缩工作器数量。

cluster.scale(5)

并手动关闭集群。

cluster.close()

注意

默认情况下,KubeCluster 命令会注册一个退出钩子,以便当 Python 进程退出时,集群会自动删除。在启动集群时,你可以通过设置 KubeCluster(..., shutdown_on_close=False) 来禁用此功能。

如果你有一个由多个 Python 进程组成的多阶段管道,并且希望 Dask 集群在它们之间持久存在,这会很有用。

如果你希望使用现有集群,或者将来手动调用 cluster.close(),你还可以使用 cluster = KubeCluster.from_name(name="rapids-dask")KubeCluster 对象连接到你的现有集群。