Dask Operator#

RAPIDS 中的许多库可以利用 Dask 将计算扩展到多个 GPU 和多个节点上。Dask 为 Kubernetes 提供了一个 Operator,它允许您将 Dask 集群作为原生的 Kubernetes 资源启动。

通过 Operator 和相关的 Custom Resource Definitions (CRD),您可以创建 DaskClusterDaskWorkerGroupDaskJob 资源来描述您的 Dask 组件,Operator 将创建相应的 Kubernetes 资源,例如 PodsServices 来启动集群。

        graph TD
    DaskJob(DaskJob)
    DaskCluster(DaskCluster)
    SchedulerService(Scheduler Service)
    SchedulerPod(Scheduler Pod)
    DaskWorkerGroup(DaskWorkerGroup)
    WorkerPodA(Worker Pod A)
    WorkerPodB(Worker Pod B)
    WorkerPodC(Worker Pod C)
    JobPod(Job Runner Pod)

    DaskJob --> DaskCluster
    DaskJob --> JobPod
    DaskCluster --> SchedulerService
    SchedulerService --> SchedulerPod
    DaskCluster --> DaskWorkerGroup
    DaskWorkerGroup --> WorkerPodA
    DaskWorkerGroup --> WorkerPodB
    DaskWorkerGroup --> WorkerPodC

    classDef dask stroke:#FDA061,stroke-width:4px
    classDef dashed stroke-dasharray: 5 5
    class DaskJob dask
    class DaskCluster dask
    class DaskWorkerGroup dask
    class SchedulerService dashed
    class SchedulerPod dashed
    class WorkerPodA dashed
    class WorkerPodB dashed
    class WorkerPodC dashed
    class JobPod dashed
    

安装#

您的 Kubernetes 集群必须拥有 GPU 节点并安装了最新的 NVIDIA 驱动程序

要安装 Dask Operator,请遵循Dask 文档中的说明

配置 RAPIDS DaskCluster#

要配置 DaskCluster 资源以运行 RAPIDS,您需要设置几项内容

  • 容器镜像必须包含 RAPIDS,官方 RAPIDS 容器镜像是一个不错的选择。

  • Dask Worker 必须配置一个或多个 NVIDIA GPU 资源。

  • Worker 命令必须设置为 dask-cuda-worker

使用 kubectl 的示例#

这是一个用于启动 RAPIDS Dask 集群的资源清单示例。

# rapids-dask-cluster.yaml
apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
  name: rapids-dask-cluster
  labels:
    dask.org/cluster-name: rapids-dask-cluster
spec:
  worker:
    replicas: 2
    spec:
      containers:
        - name: worker
          image: "nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12"
          imagePullPolicy: "IfNotPresent"
          args:
            - dask-cuda-worker
            - --name
            - $(DASK_WORKER_NAME)
          resources:
            limits:
              nvidia.com/gpu: "1"
  scheduler:
    spec:
      containers:
        - name: scheduler
          image: "nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12"
          imagePullPolicy: "IfNotPresent"
          env:
          args:
            - dask-scheduler
          ports:
            - name: tcp-comm
              containerPort: 8786
              protocol: TCP
            - name: http-dashboard
              containerPort: 8787
              protocol: TCP
          readinessProbe:
            httpGet:
              port: http-dashboard
              path: /health
            initialDelaySeconds: 5
            periodSeconds: 10
          livenessProbe:
            httpGet:
              port: http-dashboard
              path: /health
            initialDelaySeconds: 15
            periodSeconds: 20
    service:
      type: ClusterIP
      selector:
        dask.org/cluster-name: rapids-dask-cluster
        dask.org/component: scheduler
      ports:
        - name: tcp-comm
          protocol: TCP
          port: 8786
          targetPort: "tcp-comm"
        - name: http-dashboard
          protocol: TCP
          port: 8787
          targetPort: "http-dashboard"

您可以使用 kubectl 创建此集群。

$ kubectl apply -f rapids-dask-cluster.yaml

清单分解#

让我们逐节分解此清单。

元数据#

顶部我们看到 DaskCluster 资源类型和通用元数据。

apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
  name: rapids-dask-cluster
  labels:
    dask.org/cluster-name: rapids-dask-cluster
spec:
  worker:
    # ...
  scheduler:
    # ...

然后在 spec 内部,我们有 workerscheduler 部分。

Worker#

Worker 包含一个 replicas 选项来设置您需要的 Worker 数量,以及一个 spec 来描述每个 Worker Pod 应如何配置。该 spec 是一个嵌套的 Pod spec,Operator 将在创建新的 Pod 资源时使用它。

# ...
spec:
  worker:
    replicas: 2
    spec:
      containers:
        - name: worker
          image: "nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12"
          imagePullPolicy: "IfNotPresent"
          args:
            - dask-cuda-worker
            - --name
            - $(DASK_WORKER_NAME)
          resources:
            limits:
              nvidia.com/gpu: "1"
  scheduler:
    # ...

在我们的 Pod spec 内部,我们配置了一个使用 rapidsai/base 容器镜像的容器。它还设置了 args 来启动 dask-cuda-worker 并配置一个 NVIDIA GPU。

Scheduler#

接下来我们有一个 scheduler 部分,其中也包含 scheduler Pod 的 spec 和一个 service,Operator 将使用它来创建 Service 资源以暴露 scheduler。

# ...
spec:
  worker:
    # ...
  scheduler:
    spec:
      containers:
        - name: scheduler
          image: "nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12"
          imagePullPolicy: "IfNotPresent"
          args:
            - dask-scheduler
          ports:
            - name: tcp-comm
              containerPort: 8786
              protocol: TCP
            - name: http-dashboard
              containerPort: 8787
              protocol: TCP
          readinessProbe:
            httpGet:
              port: http-dashboard
              path: /health
            initialDelaySeconds: 5
            periodSeconds: 10
          livenessProbe:
            httpGet:
              port: http-dashboard
              path: /health
            initialDelaySeconds: 15
            periodSeconds: 20
    service:
      # ...

对于 scheduler Pod,我们同样设置了 rapidsai/base 容器镜像,主要是为了确保 scheduler 和 worker 之间的 Dask 版本一致。我们确保配置了 dask-scheduler 命令。

然后我们配置 Dask 通信端口 8786 和 Dask dashboard 端口 8787,并添加一些探针,以便 Kubernetes 可以监控 scheduler 的健康状况。

注意

如果您的 Kubernetes 集群使用 Istio,则端口必须带有 tcp-http- 前缀,以确保 Envoy proxy 不会篡改流量。

然后我们配置 Service

# ...
spec:
  worker:
    # ...
  scheduler:
    spec:
      # ...
    service:
      type: ClusterIP
      selector:
        dask.org/cluster-name: rapids-dask-cluster
        dask.org/component: scheduler
      ports:
        - name: tcp-comm
          protocol: TCP
          port: 8786
          targetPort: "tcp-comm"
        - name: http-dashboard
          protocol: TCP
          port: 8787
          targetPort: "http-dashboard"

本例展示了使用 ClusterIP Service,它不会将 Dask 集群暴露在 Kubernetes 外部。如果您愿意,可以将其设置为 LoadBalancerNodePort 以使其可从外部访问。

它有一个与 scheduler Pod 匹配的 selector,并配置了相同的端口。

访问您的 Dask 集群#

创建 DaskCluster 资源后,我们可以使用 kubectl 检查它为我们创建的所有其他资源的状态。

$ kubectl get all -l dask.org/cluster-name=rapids-dask-cluster
NAME                                                             READY   STATUS    RESTARTS   AGE
pod/rapids-dask-cluster-default-worker-group-worker-0c202b85fd   1/1     Running   0          4m13s
pod/rapids-dask-cluster-default-worker-group-worker-ff5d376714   1/1     Running   0          4m13s
pod/rapids-dask-cluster-scheduler                                1/1     Running   0          4m14s

NAME                                  TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
service/rapids-dask-cluster-service   ClusterIP   10.96.223.217   <none>        8786/TCP,8787/TCP   4m13s

在这里您可以看到我们的 scheduler Pod 和两个 worker Pod 以及 scheduler Service。

如果您在 Kubernetes 集群内运行 Python 会话(例如Kubernetes 页面上的示例),您应该可以直接连接 Dask 分布式客户端。

from dask.distributed import Client

client = Client("rapids-dask-cluster-scheduler:8786")

或者,如果您在 Kubernetes 集群外部,可以更改 Service 使用 LoadBalancerNodePort,或者使用 kubectl 在本地进行端口转发连接。

$ kubectl port-forward svc/rapids-dask-cluster-service 8786:8786
Forwarding from 127.0.0.1:8786 -> 8786
from dask.distributed import Client

client = Client("localhost:8786")

使用 KubeCluster 的示例#

除了通过 kubectl 创建集群外,您还可以使用 Python 中的 dask_kubernetes.operator.KubeCluster 来完成。此类实现了 Dask 集群管理器接口,并在底层为您创建和管理 DaskCluster 资源。

from dask_kubernetes.operator import KubeCluster

cluster = KubeCluster(
    name="rapids-dask",
    image="nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12",
    n_workers=3,
    resources={"limits": {"nvidia.com/gpu": "1"}},
    worker_command="dask-cuda-worker",
)

如果我们使用 kubectl 检查,可以看到上面的 Python 代码生成了与上面 kubectl 示例相同的 DaskCluster 资源。

$ kubectl get daskclusters
NAME                  AGE
rapids-dask-cluster   3m28s

$ kubectl get all -l dask.org/cluster-name=rapids-dask-cluster
NAME                                                             READY   STATUS    RESTARTS   AGE
pod/rapids-dask-cluster-default-worker-group-worker-07d674589a   1/1     Running   0          3m30s
pod/rapids-dask-cluster-default-worker-group-worker-a55ed88265   1/1     Running   0          3m30s
pod/rapids-dask-cluster-default-worker-group-worker-df785ab050   1/1     Running   0          3m30s
pod/rapids-dask-cluster-scheduler                                1/1     Running   0          3m30s

NAME                                  TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
service/rapids-dask-cluster-service   ClusterIP   10.96.200.202   <none>        8786/TCP,8787/TCP   3m30s

有了 Python 中的这个集群对象,我们也可以直接连接客户端,无需知道地址,因为 Dask 会为我们发现。如果您在 Kubernetes 集群外部,它还会自动设置端口转发。

from dask.distributed import Client

client = Client(cluster)

此对象也可用于扩缩 worker。

cluster.scale(5)

并手动关闭集群。

cluster.close()

注意

默认情况下,KubeCluster 命令会注册一个退出钩子,因此当 Python 进程退出时,集群会自动删除。您可以在启动集群时通过设置 KubeCluster(..., shutdown_on_close=False) 来禁用此功能。

如果您有一个由多个 Python 进程组成的多阶段管道,并且希望您的 Dask 集群在它们之间持久存在,这将非常有用。

如果您希望使用现有集群或将来手动调用 cluster.close(),您也可以使用 cluster = KubeCluster.from_name(name="rapids-dask")KubeCluster 对象连接到现有集群。