Dask Operator#
RAPIDS 中的许多库可以利用 Dask 将计算扩展到多个 GPU 和多个节点上。Dask 为 Kubernetes 提供了一个 Operator,它允许您将 Dask 集群作为原生的 Kubernetes 资源启动。
通过 Operator 和相关的 Custom Resource Definitions (CRD),您可以创建 DaskCluster、DaskWorkerGroup 和 DaskJob 资源来描述您的 Dask 组件,Operator 将创建相应的 Kubernetes 资源,例如 Pods 和 Services 来启动集群。
graph TD
DaskJob(DaskJob)
DaskCluster(DaskCluster)
SchedulerService(Scheduler Service)
SchedulerPod(Scheduler Pod)
DaskWorkerGroup(DaskWorkerGroup)
WorkerPodA(Worker Pod A)
WorkerPodB(Worker Pod B)
WorkerPodC(Worker Pod C)
JobPod(Job Runner Pod)
DaskJob --> DaskCluster
DaskJob --> JobPod
DaskCluster --> SchedulerService
SchedulerService --> SchedulerPod
DaskCluster --> DaskWorkerGroup
DaskWorkerGroup --> WorkerPodA
DaskWorkerGroup --> WorkerPodB
DaskWorkerGroup --> WorkerPodC
classDef dask stroke:#FDA061,stroke-width:4px
classDef dashed stroke-dasharray: 5 5
class DaskJob dask
class DaskCluster dask
class DaskWorkerGroup dask
class SchedulerService dashed
class SchedulerPod dashed
class WorkerPodA dashed
class WorkerPodB dashed
class WorkerPodC dashed
class JobPod dashed
安装#
您的 Kubernetes 集群必须拥有 GPU 节点并安装了最新的 NVIDIA 驱动程序。
要安装 Dask Operator,请遵循Dask 文档中的说明。
配置 RAPIDS DaskCluster#
要配置 DaskCluster 资源以运行 RAPIDS,您需要设置几项内容
容器镜像必须包含 RAPIDS,官方 RAPIDS 容器镜像是一个不错的选择。
Dask Worker 必须配置一个或多个 NVIDIA GPU 资源。
Worker 命令必须设置为
dask-cuda-worker。
使用 kubectl 的示例#
这是一个用于启动 RAPIDS Dask 集群的资源清单示例。
# rapids-dask-cluster.yaml
apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
name: rapids-dask-cluster
labels:
dask.org/cluster-name: rapids-dask-cluster
spec:
worker:
replicas: 2
spec:
containers:
- name: worker
image: "nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12"
imagePullPolicy: "IfNotPresent"
args:
- dask-cuda-worker
- --name
- $(DASK_WORKER_NAME)
resources:
limits:
nvidia.com/gpu: "1"
scheduler:
spec:
containers:
- name: scheduler
image: "nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12"
imagePullPolicy: "IfNotPresent"
env:
args:
- dask-scheduler
ports:
- name: tcp-comm
containerPort: 8786
protocol: TCP
- name: http-dashboard
containerPort: 8787
protocol: TCP
readinessProbe:
httpGet:
port: http-dashboard
path: /health
initialDelaySeconds: 5
periodSeconds: 10
livenessProbe:
httpGet:
port: http-dashboard
path: /health
initialDelaySeconds: 15
periodSeconds: 20
service:
type: ClusterIP
selector:
dask.org/cluster-name: rapids-dask-cluster
dask.org/component: scheduler
ports:
- name: tcp-comm
protocol: TCP
port: 8786
targetPort: "tcp-comm"
- name: http-dashboard
protocol: TCP
port: 8787
targetPort: "http-dashboard"
您可以使用 kubectl 创建此集群。
$ kubectl apply -f rapids-dask-cluster.yaml
清单分解#
让我们逐节分解此清单。
元数据#
顶部我们看到 DaskCluster 资源类型和通用元数据。
apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
name: rapids-dask-cluster
labels:
dask.org/cluster-name: rapids-dask-cluster
spec:
worker:
# ...
scheduler:
# ...
然后在 spec 内部,我们有 worker 和 scheduler 部分。
Worker#
Worker 包含一个 replicas 选项来设置您需要的 Worker 数量,以及一个 spec 来描述每个 Worker Pod 应如何配置。该 spec 是一个嵌套的 Pod spec,Operator 将在创建新的 Pod 资源时使用它。
# ...
spec:
worker:
replicas: 2
spec:
containers:
- name: worker
image: "nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12"
imagePullPolicy: "IfNotPresent"
args:
- dask-cuda-worker
- --name
- $(DASK_WORKER_NAME)
resources:
limits:
nvidia.com/gpu: "1"
scheduler:
# ...
在我们的 Pod spec 内部,我们配置了一个使用 rapidsai/base 容器镜像的容器。它还设置了 args 来启动 dask-cuda-worker 并配置一个 NVIDIA GPU。
Scheduler#
接下来我们有一个 scheduler 部分,其中也包含 scheduler Pod 的 spec 和一个 service,Operator 将使用它来创建 Service 资源以暴露 scheduler。
# ...
spec:
worker:
# ...
scheduler:
spec:
containers:
- name: scheduler
image: "nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12"
imagePullPolicy: "IfNotPresent"
args:
- dask-scheduler
ports:
- name: tcp-comm
containerPort: 8786
protocol: TCP
- name: http-dashboard
containerPort: 8787
protocol: TCP
readinessProbe:
httpGet:
port: http-dashboard
path: /health
initialDelaySeconds: 5
periodSeconds: 10
livenessProbe:
httpGet:
port: http-dashboard
path: /health
initialDelaySeconds: 15
periodSeconds: 20
service:
# ...
对于 scheduler Pod,我们同样设置了 rapidsai/base 容器镜像,主要是为了确保 scheduler 和 worker 之间的 Dask 版本一致。我们确保配置了 dask-scheduler 命令。
然后我们配置 Dask 通信端口 8786 和 Dask dashboard 端口 8787,并添加一些探针,以便 Kubernetes 可以监控 scheduler 的健康状况。
注意
如果您的 Kubernetes 集群使用 Istio,则端口必须带有 tcp- 和 http- 前缀,以确保 Envoy proxy 不会篡改流量。
然后我们配置 Service。
# ...
spec:
worker:
# ...
scheduler:
spec:
# ...
service:
type: ClusterIP
selector:
dask.org/cluster-name: rapids-dask-cluster
dask.org/component: scheduler
ports:
- name: tcp-comm
protocol: TCP
port: 8786
targetPort: "tcp-comm"
- name: http-dashboard
protocol: TCP
port: 8787
targetPort: "http-dashboard"
本例展示了使用 ClusterIP Service,它不会将 Dask 集群暴露在 Kubernetes 外部。如果您愿意,可以将其设置为 LoadBalancer 或 NodePort 以使其可从外部访问。
它有一个与 scheduler Pod 匹配的 selector,并配置了相同的端口。
访问您的 Dask 集群#
创建 DaskCluster 资源后,我们可以使用 kubectl 检查它为我们创建的所有其他资源的状态。
$ kubectl get all -l dask.org/cluster-name=rapids-dask-cluster
NAME READY STATUS RESTARTS AGE
pod/rapids-dask-cluster-default-worker-group-worker-0c202b85fd 1/1 Running 0 4m13s
pod/rapids-dask-cluster-default-worker-group-worker-ff5d376714 1/1 Running 0 4m13s
pod/rapids-dask-cluster-scheduler 1/1 Running 0 4m14s
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/rapids-dask-cluster-service ClusterIP 10.96.223.217 <none> 8786/TCP,8787/TCP 4m13s
在这里您可以看到我们的 scheduler Pod 和两个 worker Pod 以及 scheduler Service。
如果您在 Kubernetes 集群内运行 Python 会话(例如Kubernetes 页面上的示例),您应该可以直接连接 Dask 分布式客户端。
from dask.distributed import Client
client = Client("rapids-dask-cluster-scheduler:8786")
或者,如果您在 Kubernetes 集群外部,可以更改 Service 使用 LoadBalancer 或 NodePort,或者使用 kubectl 在本地进行端口转发连接。
$ kubectl port-forward svc/rapids-dask-cluster-service 8786:8786
Forwarding from 127.0.0.1:8786 -> 8786
from dask.distributed import Client
client = Client("localhost:8786")
使用 KubeCluster 的示例#
除了通过 kubectl 创建集群外,您还可以使用 Python 中的 dask_kubernetes.operator.KubeCluster 来完成。此类实现了 Dask 集群管理器接口,并在底层为您创建和管理 DaskCluster 资源。
from dask_kubernetes.operator import KubeCluster
cluster = KubeCluster(
name="rapids-dask",
image="nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12",
n_workers=3,
resources={"limits": {"nvidia.com/gpu": "1"}},
worker_command="dask-cuda-worker",
)
如果我们使用 kubectl 检查,可以看到上面的 Python 代码生成了与上面 kubectl 示例相同的 DaskCluster 资源。
$ kubectl get daskclusters
NAME AGE
rapids-dask-cluster 3m28s
$ kubectl get all -l dask.org/cluster-name=rapids-dask-cluster
NAME READY STATUS RESTARTS AGE
pod/rapids-dask-cluster-default-worker-group-worker-07d674589a 1/1 Running 0 3m30s
pod/rapids-dask-cluster-default-worker-group-worker-a55ed88265 1/1 Running 0 3m30s
pod/rapids-dask-cluster-default-worker-group-worker-df785ab050 1/1 Running 0 3m30s
pod/rapids-dask-cluster-scheduler 1/1 Running 0 3m30s
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/rapids-dask-cluster-service ClusterIP 10.96.200.202 <none> 8786/TCP,8787/TCP 3m30s
有了 Python 中的这个集群对象,我们也可以直接连接客户端,无需知道地址,因为 Dask 会为我们发现。如果您在 Kubernetes 集群外部,它还会自动设置端口转发。
from dask.distributed import Client
client = Client(cluster)
此对象也可用于扩缩 worker。
cluster.scale(5)
并手动关闭集群。
cluster.close()
注意
默认情况下,KubeCluster 命令会注册一个退出钩子,因此当 Python 进程退出时,集群会自动删除。您可以在启动集群时通过设置 KubeCluster(..., shutdown_on_close=False) 来禁用此功能。
如果您有一个由多个 Python 进程组成的多阶段管道,并且希望您的 Dask 集群在它们之间持久存在,这将非常有用。
如果您希望使用现有集群或将来手动调用 cluster.close(),您也可以使用 cluster = KubeCluster.from_name(name="rapids-dask") 将 KubeCluster 对象连接到现有集群。