Dask Operator#
RAPIDS 中的许多库可以利用 Dask 将计算扩展到多个 GPU 和多个节点上。Dask 为 Kubernetes 提供了一个 Operator,它允许您将 Dask 集群作为原生的 Kubernetes 资源启动。
通过 Operator 和相关的 Custom Resource Definitions (CRD),您可以创建 DaskCluster
、DaskWorkerGroup
和 DaskJob
资源来描述您的 Dask 组件,Operator 将创建相应的 Kubernetes 资源,例如 Pods
和 Services
来启动集群。
graph TD DaskJob(DaskJob) DaskCluster(DaskCluster) SchedulerService(Scheduler Service) SchedulerPod(Scheduler Pod) DaskWorkerGroup(DaskWorkerGroup) WorkerPodA(Worker Pod A) WorkerPodB(Worker Pod B) WorkerPodC(Worker Pod C) JobPod(Job Runner Pod) DaskJob --> DaskCluster DaskJob --> JobPod DaskCluster --> SchedulerService SchedulerService --> SchedulerPod DaskCluster --> DaskWorkerGroup DaskWorkerGroup --> WorkerPodA DaskWorkerGroup --> WorkerPodB DaskWorkerGroup --> WorkerPodC classDef dask stroke:#FDA061,stroke-width:4px classDef dashed stroke-dasharray: 5 5 class DaskJob dask class DaskCluster dask class DaskWorkerGroup dask class SchedulerService dashed class SchedulerPod dashed class WorkerPodA dashed class WorkerPodB dashed class WorkerPodC dashed class JobPod dashed
安装#
您的 Kubernetes 集群必须拥有 GPU 节点并安装了最新的 NVIDIA 驱动程序。
要安装 Dask Operator,请遵循Dask 文档中的说明。
配置 RAPIDS DaskCluster
#
要配置 DaskCluster
资源以运行 RAPIDS,您需要设置几项内容
容器镜像必须包含 RAPIDS,官方 RAPIDS 容器镜像是一个不错的选择。
Dask Worker 必须配置一个或多个 NVIDIA GPU 资源。
Worker 命令必须设置为
dask-cuda-worker
。
使用 kubectl
的示例#
这是一个用于启动 RAPIDS Dask 集群的资源清单示例。
# rapids-dask-cluster.yaml apiVersion: kubernetes.dask.org/v1 kind: DaskCluster metadata: name: rapids-dask-cluster labels: dask.org/cluster-name: rapids-dask-cluster spec: worker: replicas: 2 spec: containers: - name: worker image: "nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12" imagePullPolicy: "IfNotPresent" args: - dask-cuda-worker - --name - $(DASK_WORKER_NAME) resources: limits: nvidia.com/gpu: "1" scheduler: spec: containers: - name: scheduler image: "nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12" imagePullPolicy: "IfNotPresent" env: args: - dask-scheduler ports: - name: tcp-comm containerPort: 8786 protocol: TCP - name: http-dashboard containerPort: 8787 protocol: TCP readinessProbe: httpGet: port: http-dashboard path: /health initialDelaySeconds: 5 periodSeconds: 10 livenessProbe: httpGet: port: http-dashboard path: /health initialDelaySeconds: 15 periodSeconds: 20 service: type: ClusterIP selector: dask.org/cluster-name: rapids-dask-cluster dask.org/component: scheduler ports: - name: tcp-comm protocol: TCP port: 8786 targetPort: "tcp-comm" - name: http-dashboard protocol: TCP port: 8787 targetPort: "http-dashboard"
您可以使用 kubectl
创建此集群。
$ kubectl apply -f rapids-dask-cluster.yaml
清单分解#
让我们逐节分解此清单。
元数据#
顶部我们看到 DaskCluster
资源类型和通用元数据。
apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
name: rapids-dask-cluster
labels:
dask.org/cluster-name: rapids-dask-cluster
spec:
worker:
# ...
scheduler:
# ...
然后在 spec
内部,我们有 worker
和 scheduler
部分。
Worker#
Worker 包含一个 replicas
选项来设置您需要的 Worker 数量,以及一个 spec
来描述每个 Worker Pod 应如何配置。该 spec
是一个嵌套的 Pod
spec,Operator 将在创建新的 Pod
资源时使用它。
# ... spec: worker: replicas: 2 spec: containers: - name: worker image: "nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12" imagePullPolicy: "IfNotPresent" args: - dask-cuda-worker - --name - $(DASK_WORKER_NAME) resources: limits: nvidia.com/gpu: "1" scheduler: # ...
在我们的 Pod spec 内部,我们配置了一个使用 rapidsai/base
容器镜像的容器。它还设置了 args
来启动 dask-cuda-worker
并配置一个 NVIDIA GPU。
Scheduler#
接下来我们有一个 scheduler
部分,其中也包含 scheduler Pod 的 spec
和一个 service
,Operator 将使用它来创建 Service
资源以暴露 scheduler。
# ... spec: worker: # ... scheduler: spec: containers: - name: scheduler image: "nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12" imagePullPolicy: "IfNotPresent" args: - dask-scheduler ports: - name: tcp-comm containerPort: 8786 protocol: TCP - name: http-dashboard containerPort: 8787 protocol: TCP readinessProbe: httpGet: port: http-dashboard path: /health initialDelaySeconds: 5 periodSeconds: 10 livenessProbe: httpGet: port: http-dashboard path: /health initialDelaySeconds: 15 periodSeconds: 20 service: # ...
对于 scheduler Pod,我们同样设置了 rapidsai/base
容器镜像,主要是为了确保 scheduler 和 worker 之间的 Dask 版本一致。我们确保配置了 dask-scheduler
命令。
然后我们配置 Dask 通信端口 8786
和 Dask dashboard 端口 8787
,并添加一些探针,以便 Kubernetes 可以监控 scheduler 的健康状况。
注意
如果您的 Kubernetes 集群使用 Istio,则端口必须带有 tcp-
和 http-
前缀,以确保 Envoy proxy 不会篡改流量。
然后我们配置 Service
。
# ...
spec:
worker:
# ...
scheduler:
spec:
# ...
service:
type: ClusterIP
selector:
dask.org/cluster-name: rapids-dask-cluster
dask.org/component: scheduler
ports:
- name: tcp-comm
protocol: TCP
port: 8786
targetPort: "tcp-comm"
- name: http-dashboard
protocol: TCP
port: 8787
targetPort: "http-dashboard"
本例展示了使用 ClusterIP
Service,它不会将 Dask 集群暴露在 Kubernetes 外部。如果您愿意,可以将其设置为 LoadBalancer
或 NodePort
以使其可从外部访问。
它有一个与 scheduler Pod 匹配的 selector
,并配置了相同的端口。
访问您的 Dask 集群#
创建 DaskCluster
资源后,我们可以使用 kubectl
检查它为我们创建的所有其他资源的状态。
$ kubectl get all -l dask.org/cluster-name=rapids-dask-cluster
NAME READY STATUS RESTARTS AGE
pod/rapids-dask-cluster-default-worker-group-worker-0c202b85fd 1/1 Running 0 4m13s
pod/rapids-dask-cluster-default-worker-group-worker-ff5d376714 1/1 Running 0 4m13s
pod/rapids-dask-cluster-scheduler 1/1 Running 0 4m14s
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/rapids-dask-cluster-service ClusterIP 10.96.223.217 <none> 8786/TCP,8787/TCP 4m13s
在这里您可以看到我们的 scheduler Pod 和两个 worker Pod 以及 scheduler Service。
如果您在 Kubernetes 集群内运行 Python 会话(例如Kubernetes 页面上的示例),您应该可以直接连接 Dask 分布式客户端。
from dask.distributed import Client
client = Client("rapids-dask-cluster-scheduler:8786")
或者,如果您在 Kubernetes 集群外部,可以更改 Service
使用 LoadBalancer
或 NodePort
,或者使用 kubectl
在本地进行端口转发连接。
$ kubectl port-forward svc/rapids-dask-cluster-service 8786:8786
Forwarding from 127.0.0.1:8786 -> 8786
from dask.distributed import Client
client = Client("localhost:8786")
使用 KubeCluster
的示例#
除了通过 kubectl
创建集群外,您还可以使用 Python 中的 dask_kubernetes.operator.KubeCluster
来完成。此类实现了 Dask 集群管理器接口,并在底层为您创建和管理 DaskCluster
资源。
from dask_kubernetes.operator import KubeCluster cluster = KubeCluster( name="rapids-dask", image="nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12", n_workers=3, resources={"limits": {"nvidia.com/gpu": "1"}}, worker_command="dask-cuda-worker", )
如果我们使用 kubectl
检查,可以看到上面的 Python 代码生成了与上面 kubectl
示例相同的 DaskCluster
资源。
$ kubectl get daskclusters
NAME AGE
rapids-dask-cluster 3m28s
$ kubectl get all -l dask.org/cluster-name=rapids-dask-cluster
NAME READY STATUS RESTARTS AGE
pod/rapids-dask-cluster-default-worker-group-worker-07d674589a 1/1 Running 0 3m30s
pod/rapids-dask-cluster-default-worker-group-worker-a55ed88265 1/1 Running 0 3m30s
pod/rapids-dask-cluster-default-worker-group-worker-df785ab050 1/1 Running 0 3m30s
pod/rapids-dask-cluster-scheduler 1/1 Running 0 3m30s
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/rapids-dask-cluster-service ClusterIP 10.96.200.202 <none> 8786/TCP,8787/TCP 3m30s
有了 Python 中的这个集群对象,我们也可以直接连接客户端,无需知道地址,因为 Dask 会为我们发现。如果您在 Kubernetes 集群外部,它还会自动设置端口转发。
from dask.distributed import Client
client = Client(cluster)
此对象也可用于扩缩 worker。
cluster.scale(5)
并手动关闭集群。
cluster.close()
注意
默认情况下,KubeCluster
命令会注册一个退出钩子,因此当 Python 进程退出时,集群会自动删除。您可以在启动集群时通过设置 KubeCluster(..., shutdown_on_close=False)
来禁用此功能。
如果您有一个由多个 Python 进程组成的多阶段管道,并且希望您的 Dask 集群在它们之间持久存在,这将非常有用。
如果您希望使用现有集群或将来手动调用 cluster.close()
,您也可以使用 cluster = KubeCluster.from_name(name="rapids-dask")
将 KubeCluster
对象连接到现有集群。