自动伸缩多租户 Kubernetes 深度解析#

在本示例中,我们将深入探讨如何在 Kubernetes 上启动自动伸缩的多租户 RAPIDS 环境。

能够扩展您的工作负载并仅为您使用的资源付费是使用 RAPIDS 时节省成本的绝佳方式。如果您组织中有许多人都希望能够这样做,您可以将资源汇集到一个自动伸缩的 Kubernetes 集群中,从而获得额外的好处。

让我们逐步完成在 Google Cloud 上启动 Kubernetes 集群所需的步骤,然后模拟许多用户共享集群的工作负载。然后我们可以从用户角度和成本角度探讨这种体验如何。

先决条件#

在我们开始之前,您需要确保安装了几个 CLI 工具。

获取 Kubernetes 集群#

在本示例中,我们将使用 Google Cloud 的 Google Kubernetes Engine (GKE) 启动集群。

参阅文档

我们将遵循 RAPIDS GKE 部署说明,但会修改集群创建命令以启用 Kubernetes 集群的开箱即用自动伸缩功能。

--enable-autoscaling --autoscaling-profile optimize-utilization \
--num-nodes 1 --min-nodes 1 --max-nodes 20

数据科学容器镜像通常很大,因此我们将启用镜像流式传输以加快容器创建速度。

--image-type="COS_CONTAINERD" --enable-image-streaming
! gcloud container clusters create multi-tenant-rapids \
    --accelerator type=nvidia-tesla-t4,count=2 --machine-type n1-standard-4 \
    --region us-central1 --node-locations us-central1-b,us-central1-c \
    --release-channel stable \
    --enable-autoscaling --autoscaling-profile optimize-utilization \
    --num-nodes 1 --min-nodes 1 --max-nodes 20 \
    --image-type="COS_CONTAINERD" --enable-image-streaming
Default change: VPC-native is the default mode during cluster creation for versions greater than 1.21.0-gke.1500. To create advanced routes based clusters, please pass the `--no-enable-ip-alias` flag
Default change: During creation of nodepools or autoscaling configuration changes for cluster versions greater than 1.24.1-gke.800 a default location policy is applied. For Spot and PVM it defaults to ANY, and for all other VM kinds a BALANCED policy is used. To change the default values use the `--location-policy` flag.
Note: Your Pod address range (`--cluster-ipv4-cidr`) can accommodate at most 1008 node(s).
Note: Machines with GPUs have certain limitations which may affect your workflow. Learn more at https://cloud.google.com/kubernetes-engine/docs/how-to/gpus
Creating cluster multi-tenant-rapids in us-central1... Cluster is being configu
red...⠼                                                                        
Creating cluster multi-tenant-rapids in us-central1... Cluster is being deploye
d...⠏                                                                          
Creating cluster multi-tenant-rapids in us-central1... Cluster is being health-
checked (master is healthy)...done.                                            
Created [https://container.googleapis.com/v1/projects/nv-ai-infra/zones/us-central1/clusters/multi-tenant-rapids].
To inspect the contents of your cluster, go to: https://console.cloud.google.com/kubernetes/workload_/gcloud/us-central1/multi-tenant-rapids?project=nv-ai-infra
kubeconfig entry generated for multi-tenant-rapids.
NAME                 LOCATION     MASTER_VERSION    MASTER_IP       MACHINE_TYPE   NODE_VERSION      NUM_NODES  STATUS
multi-tenant-rapids  us-central1  1.23.14-gke.1800  104.197.37.225  n1-standard-4  1.23.14-gke.1800  2          RUNNING

现在我们有了集群,让我们 安装 NVIDIA 驱动程序

! kubectl apply -f https://raw.githubusercontent.com/GoogleCloudPlatform/container-engine-accelerators/master/nvidia-driver-installer/cos/daemonset-preloaded-latest.yaml
daemonset.apps/nvidia-driver-installer created

可观测性#

在我们的 Kubernetes 集群上运行了一些工作负载后,我们会想要回顾集群的遥测数据,查看自动伸缩的表现。为此,让我们安装 Prometheus,以便记录集群指标,稍后进行探索。

Prometheus Stack#

首先,让我们安装 Kubernetes Prometheus Stack,其中包含了在我们的集群上运行 Prometheus 所需的一切。

我们需要添加一些额外的配置选项,以确保 Prometheus 频繁收集数据以便分析,这些选项可以在 prometheus-stack-values.yaml 中找到。

! cat prometheus-stack-values.yaml
# prometheus-stack-values.yaml
serviceMonitorSelectorNilUsesHelmValues: false

prometheus:
  prometheusSpec:
    # Setting this to a high frequency so that we have richer data for analysis later
    scrapeInterval: 1s
! helm install --repo https://prometheus-community.github.io/helm-charts kube-prometheus-stack kube-prometheus-stack \
   --create-namespace --namespace prometheus \
   --values prometheus-stack-values.yaml
NAME: kube-prometheus-stack
LAST DEPLOYED: Tue Feb 21 09:19:39 2023
NAMESPACE: prometheus
STATUS: deployed
REVISION: 1
NOTES:
kube-prometheus-stack has been installed. Check its status by running:
  kubectl --namespace prometheus get pods -l "release=kube-prometheus-stack"

Visit https://github.com/prometheus-operator/kube-prometheus for instructions on how to create & configure Alertmanager and Prometheus instances using the Operator.

现在 Prometheus 正在运行并收集数据,我们可以继续安装 RAPIDS 并运行一些工作负载。稍后当我们想探索收集到的数据时,会再回到这些工具。

安装 RAPIDS#

对于这次 RAPIDS 安装,我们将使用单个 Jupyter Notebook PodDask Operator。在实际部署中,您会使用类似 JupyterHubKubeflow Notebooks 的工具创建带用户认证的 Notebook 启动服务,但这超出了本示例的范围。

参阅文档

在 Kubernetes 上安装 RAPIDS 有许多方法。您可以在文档中找到所有各种方法的详细说明。

镜像流式传输 (可选)#

为了将容器镜像流式传输到 GKE 节点,我们的镜像需要存储在与集群同一区域的 Google Cloud Artifact Registry 中。

$ docker pull nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12

$ docker tag nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12 REGION-docker.pkg.dev/PROJECT/REPO/IMAGE:TAG

$ docker push REGION-docker.pkg.dev/PROJECT/REPO/IMAGE:TAG

请务必将 Notebook 中使用的镜像替换为您已推送到您自己的 Google Cloud 项目中的镜像。

镜像预拉取器 (可选)#

如果您知道许多用户会频繁拉取某个特定的容器镜像,我喜欢运行一个小的 DaemonSet,以确保镜像在新节点加入集群后立即开始流式传输到该节点。这是可选的,但可以减少用户的等待时间。

! cat ./image-prepuller.yaml
# image-prepuller.yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: prepull-rapids
spec:
  selector:
    matchLabels:
      name: prepull-rapids
  template:
    metadata:
      labels:
        name: prepull-rapids
    spec:
      initContainers:
        - name: prepull-rapids
          image: us-central1-docker.pkg.dev/nv-ai-infra/rapidsai/rapidsai/base:example
          command: ["sh", "-c", "'true'"]
      containers:
        - name: pause
          image: gcr.io/google_containers/pause
! kubectl apply -f image-prepuller.yaml
daemonset.apps/prepull-rapids created

RAPIDS Notebook Pod#

现在让我们启动一个 Notebook Pod。

注意

从这个 Pod 中,我们将需要能够在 Kubernetes 上启动 Dask 集群资源,因此我们需要确保该 Pod 具有与 Kubernetes API 交互的适当权限。

参阅文档

查看扩展的 Notebook 配置文档了解更多详细信息。

! kubectl apply -f rapids-notebook.yaml
serviceaccount/rapids-dask created
role.rbac.authorization.k8s.io/rapids-dask created
rolebinding.rbac.authorization.k8s.io/rapids-dask created
configmap/jupyter-server-proxy-config created
service/rapids-notebook created
pod/rapids-notebook created

安装 Dask Operator#

最后,我们需要安装 Dask Operator,以便我们可以从 Notebook 会话中启动 RAPIDS Dask 集群。

参阅文档

参阅 RAPIDS Dask Operator 文档了解更多信息。

! helm install --repo https://helm.dask.org dask-kubernetes-operator \
    --generate-name --create-namespace --namespace dask-operator 
NAME: dask-kubernetes-operator-1676971371
LAST DEPLOYED: Tue Feb 21 09:23:06 2023
NAMESPACE: dask-operator
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
Operator has been installed successfully.

运行一些工作#

接下来,让我们连接到 Jupyter 会话并在我们的集群上运行一些工作。您可以通过将 Jupyter 服务端口转发到本地机器来完成此操作。

$ kubectl port-forward svc/rapids-notebook 8888:8888                                                                                                                        
Forwarding from 127.0.0.1:8888 -> 8888
Forwarding from [::1]:8888 -> 8888

然后在浏览器中打开 http://localhost:8888。

注意

如果您正在本地跟着这个 Notebook 操作,您也会想要将其上传到 Jupyter 会话并从那里继续运行单元格。

检查能力#

让我们通过检查我们的能力来确保环境已正确设置。我们可以先运行 nvidia-smi 来检查我们的 Notebook GPU。

! nvidia-smi
Tue Feb 21 14:50:01 2023       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 510.47.03    Driver Version: 510.47.03    CUDA Version: 11.6     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|===============================+======================+======================|
|   0  Tesla T4            Off  | 00000000:00:04.0 Off |                    0 |
| N/A   41C    P8    14W /  70W |      0MiB / 15360MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Processes:                                                                  |
|  GPU   GI   CI        PID   Type   Process name                  GPU Memory |
|        ID   ID                                                   Usage      |
|=============================================================================|
|  No running processes found                                                 |
+-----------------------------------------------------------------------------+

很好,我们可以看到我们的 Notebook 有一个 NVIDIA T4。现在让我们使用 kubectl 来检查我们的集群。实际上,我们的远程 Jupyter 环境中没有安装 kubectl,所以先做这件事。

! mamba install --quiet -c conda-forge kubernetes-client -y
Preparing transaction: ...working... done
Verifying transaction: ...working... done
Executing transaction: ...working... done
! kubectl get pods
NAME                   READY   STATUS    RESTARTS   AGE
prepull-rapids-l5qgt   1/1     Running   0          3m24s
prepull-rapids-w8xcj   1/1     Running   0          3m24s
rapids-notebook        1/1     Running   0          2m54s

我们可以看到我们之前创建的预拉取 Pod 以及我们当前所在的 rapids-notebook Pod。由于我们通过 DaemonSet 创建了预拉取 Pod,我们也知道我们的 Kubernetes 集群中有两个节点,因为有两个预拉取 Pod。随着集群的扩展,我们会看到更多这样的 Pod 出现。

! kubectl get daskclusters
No resources found in default namespace.

我们还可以看到当前没有 DaskCluster 资源,但这 நல்லது,因为我们没有收到 server doesn't have a resource type "daskclusters" 错误,所以我们知道 Dask Operator 也成功安装了。

小型工作负载#

让我们运行一个小型的 RAPIDS 工作负载,稍微拉伸一下我们的 Kubernetes 集群,并使其进行伸缩。

我们知道我们的 Kubernetes 集群中有两个节点,并且我们在 GKE 上启动时选择了一个带有 2 个 GPU 的节点类型。我们的 Notebook Pod 占用了一个 GPU,所以还剩三个。如果我们启动一个 Dask 集群,Scheduler 需要一个 GPU,每个 Worker 需要一个 GPU。所以让我们创建一个带有四个 Worker 的 Dask 集群,这将导致我们的 Kubernetes 再添加一个节点。

首先让我们安装 dask-kubernetes,这样我们就可以从 Python 创建 DaskCluster 资源。我们还将安装 gcsfs,以便我们的工作负载可以从 Google Cloud Storage 读取数据。

! mamba install --quiet -c conda-forge dask-kubernetes gcsfs -y
Preparing transaction: ...working... done
Verifying transaction: ...working... done
Executing transaction: ...working... done
from dask_kubernetes.operator import KubeCluster

cluster = KubeCluster(
    name="rapids-dask-1",
    image="nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12",  # Replace me with your cached image
    n_workers=4,
    resources={"limits": {"nvidia.com/gpu": "1"}},
    env={"EXTRA_PIP_PACKAGES": "gcsfs"},
    worker_command="dask-cuda-worker",
)
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f81225ac820>
Unclosed connection
client_connection: Connection<ConnectionKey(host='10.32.0.1', port=443, is_ssl=True, ssl=None, proxy=None, proxy_auth=None, proxy_headers_hash=None)>

很好,我们的 Dask 集群已创建,但目前只有一个 Scheduler 和一半的 Worker。我们可以使用 kubectl 查看正在发生的事情。

! kubectl get pods
NAME                                      READY   STATUS    RESTARTS   AGE
prepull-rapids-l5qgt                      1/1     Running   0          6m18s
prepull-rapids-w8xcj                      1/1     Running   0          6m18s
rapids-dask-1-default-worker-5f59bc8e7a   0/1     Pending   0          68s
rapids-dask-1-default-worker-88ab088b7c   0/1     Pending   0          68s
rapids-dask-1-default-worker-b700343afe   1/1     Running   0          68s
rapids-dask-1-default-worker-e0bb7fff2d   1/1     Running   0          68s
rapids-dask-1-scheduler                   1/1     Running   0          69s
rapids-notebook                           1/1     Running   0          5m48s

我们在这里看到我们的大部分 Pod 都处于 Running 状态,但有两个 Worker 处于 Pending 状态。这是因为我们现在没有足够的 GPU 供它们使用。我们可以查看待定 Pod 的事件以获取更多信息。

! kubectl get event --field-selector involvedObject.name=rapids-dask-1-default-worker-5f59bc8e7a
LAST SEEN   TYPE      REASON             OBJECT                                        MESSAGE
50s         Warning   FailedScheduling   pod/rapids-dask-1-default-worker-5f59bc8e7a   0/2 nodes are available: 2 Insufficient nvidia.com/gpu.
12s         Normal    TriggeredScaleUp   pod/rapids-dask-1-default-worker-5f59bc8e7a   pod triggered scale-up: [{https://www.googleapis.com/compute/v1/projects/nv-ai-infra/zones/us-central1-b/instanceGroups/gke-multi-tenant-rapids-default-pool-3a6a793f-grp 1->2 (max: 20)}]

在这里,我们可以看到我们的 Pod 触发了集群从一个节点扩展到两个节点。如果我们等待新节点上线,应该会看到一些事情发生。

  • 首先,将在新节点上调度一个新的预拉取 Pod,它将开始流式传输 RAPIDS 容器镜像。

  • kube-system 命名空间中的其他 Pod 将被调度来安装 NVIDIA 驱动程序并更新 Kubernetes API。

  • 然后一旦 GPU 驱动程序安装完成,Worker Pod 将被调度到我们的新节点上。

  • 然后一旦镜像准备好,我们的 Pod 将进入 Running 阶段。

! kubectl get pods -w
NAME                                      READY   STATUS    RESTARTS   AGE
prepull-rapids-l5qgt                      1/1     Running   0          6m41s
prepull-rapids-w8xcj                      1/1     Running   0          6m41s
rapids-dask-1-default-worker-5f59bc8e7a   0/1     Pending   0          91s
rapids-dask-1-default-worker-88ab088b7c   0/1     Pending   0          91s
rapids-dask-1-default-worker-b700343afe   1/1     Running   0          91s
rapids-dask-1-default-worker-e0bb7fff2d   1/1     Running   0          91s
rapids-dask-1-scheduler                   1/1     Running   0          92s
rapids-notebook                           1/1     Running   0          6m11s
prepull-rapids-69pbq                      0/1     Pending   0          0s
prepull-rapids-69pbq                      0/1     Pending   0          0s
prepull-rapids-69pbq                      0/1     Init:0/1   0          4s
rapids-dask-1-default-worker-88ab088b7c   0/1     Pending    0          2m3s
prepull-rapids-69pbq                      0/1     Init:0/1   0          9s
prepull-rapids-69pbq                      0/1     PodInitializing   0          15s
rapids-dask-1-default-worker-5f59bc8e7a   0/1     Pending           0          2m33s
prepull-rapids-69pbq                      1/1     Running           0          3m7s
rapids-dask-1-default-worker-5f59bc8e7a   0/1     Pending           0          5m13s
rapids-dask-1-default-worker-88ab088b7c   0/1     Pending           0          5m13s
rapids-dask-1-default-worker-5f59bc8e7a   0/1     ContainerCreating   0          5m14s
rapids-dask-1-default-worker-88ab088b7c   0/1     ContainerCreating   0          5m14s
rapids-dask-1-default-worker-5f59bc8e7a   1/1     Running             0          5m26s
rapids-dask-1-default-worker-88ab088b7c   1/1     Running             0          5m26s
^C

太棒了,我们现在可以在 Dask 集群上运行一些工作了。

from dask.distributed import Client, wait

client = Client(cluster)
client

客户端

客户端-3722820c-b1f8-11ed-8042-fa6ca111b70e

连接方法: Cluster 对象 集群类型: dask_kubernetes.KubeCluster
仪表板: /proxy/rapids-dask-1-scheduler.default:8787/status

集群信息

让我们从 GCS 将一些数据加载到 GPU 内存中。

%%time
import dask.config
import dask.dataframe as dd

dask.config.set({"dataframe.backend": "cudf"})

df = dd.read_parquet(
    "gcs://anaconda-public-data/nyc-taxi/2015.parquet/part.1*",
    storage_options={"token": "anon"},
).persist()
wait(df)
df

现在我们可以进行一些计算。这可以是您想对数据做的任何事情,在本示例中,让我们做一些快速的事情,比如计算 pickup 和 dropoff 位置之间的 haversine 距离(是的,在 ~100M 行上计算这个对 RAPIDS 来说是一项快速任务 😁)。

import cuspatial


def map_haversine(part):
    pickup = cuspatial.GeoSeries.from_points_xy(
        part[["pickup_longitude", "pickup_latitude"]].interleave_columns()
    )
    dropoff = cuspatial.GeoSeries.from_points_xy(
        part[["dropoff_longitude", "dropoff_latitude"]].interleave_columns()
    )
    return cuspatial.haversine_distance(pickup, dropoff)


df["haversine_distance"] = df.map_partitions(map_haversine)
%%time
df["haversine_distance"].compute()
CPU times: user 1.44 s, sys: 853 ms, total: 2.29 s
Wall time: 4.66 s
tpep_pickup_datetime
2015-01-01 00:00:00       4.326464
2015-01-01 00:00:00    8666.633292
2015-01-01 00:00:00       1.285498
2015-01-01 00:00:01       0.827326
2015-01-01 00:00:03       2.267110
                          ...     
2015-12-31 23:59:56       1.570824
2015-12-31 23:59:58       2.340270
2015-12-31 23:59:59       2.801575
2015-12-31 23:59:59       5.091840
2015-12-31 23:59:59       0.927577
Name: haversine_distance, Length: 146112989, dtype: float64

很好,所以我们现在有一个小小的模拟工作负载,它打开一些数据,进行一些计算,并花费一些时间。

让我们移除单个 Dask 集群,转而模拟同时运行的许多工作负载。

client.close()
cluster.close()

模拟多个多租户工作负载#

现在我们有一个模拟工作负载,可以用来代表我们多租户集群上的一个用户。

现在让我们构建一个更大的图来模拟大量用户启动 Dask 集群并运行工作负载的情况。

首先,让我们创建一个包含我们整个工作负载(包括集群设置)的函数。

import dask.delayed


@dask.delayed
def run_haversine(*args):
    import uuid

    import dask.config
    import dask.dataframe as dd
    from dask.distributed import Client
    from dask_kubernetes.operator import KubeCluster

    dask.config.set({"dataframe.backend": "cudf"})

    def map_haversine(part):
        from cuspatial import haversine_distance

        return haversine_distance(
            part["pickup_longitude"],
            part["pickup_latitude"],
            part["dropoff_longitude"],
            part["dropoff_latitude"],
        )

    with KubeCluster(
        name="rapids-dask-" + uuid.uuid4().hex[:5],
        image="nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12",  # Replace me with your cached image
        n_workers=2,
        resources={"limits": {"nvidia.com/gpu": "1"}},
        env={"EXTRA_PIP_PACKAGES": "gcsfs"},
        worker_command="dask-cuda-worker",
        resource_timeout=600,
    ) as cluster:
        with Client(cluster) as client:
            client.wait_for_workers(2)
            df = dd.read_parquet(
                "gcs://anaconda-public-data/nyc-taxi/2015.parquet",
                storage_options={"token": "anon"},
            )
            client.compute(df.map_partitions(map_haversine))

现在,如果我们运行这个函数,我们将启动一个 Dask 集群并运行我们的工作负载。我们将使用上下文管理器来确保我们的 Dask 集群在工作完成后被清理。鉴于我们没有活动的 Dask 集群,这个函数将在 Notebook Pod 上执行。

%%time
run_haversine().compute()
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7fd0f4b8b340>
Unclosed connection
client_connection: Connection<ConnectionKey(host='10.32.0.1', port=443, is_ssl=True, ssl=None, proxy=None, proxy_auth=None, proxy_headers_hash=None)>
CPU times: user 194 ms, sys: 30 ms, total: 224 ms
Wall time: 23.6 s

太棒了,它工作了,所以我们有了一个自包含的 RAPIDS 工作负载,它可以启动自己的 Dask 集群并执行一些工作。

模拟我们的多租户工作负载#

为了查看当许多用户共享 Kubernetes 集群时它的行为如何,我们将多次运行我们的 haversine 工作负载。

注意

如果您对我们如何模拟此工作负载不感兴趣,请随时跳到分析部分。

为此,我们可以创建另一个 Dask 集群,用它来引导我们的工作负载。这个集群将充当我们用户将与之交互的 Jupyter 会话的代理。然后,我们将构建一个 Dask 图,该图将在各种配置下多次运行我们的 haversine 工作负载,以模拟不同用户按需提交不同工作负载的情况。

from dask_kubernetes.operator import KubeCluster, make_cluster_spec

cluster_spec = make_cluster_spec(
    name="mock-jupyter-cluster",
    image="nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12",  # Replace me with your cached image
    n_workers=1,
    resources={"limits": {"nvidia.com/gpu": "1"}, "requests": {"cpu": "50m"}},
    env={"EXTRA_PIP_PACKAGES": "gcsfs dask-kubernetes"},
)
cluster_spec["spec"]["worker"]["spec"]["serviceAccountName"] = "rapids-dask"

cluster = KubeCluster(custom_cluster_spec=cluster_spec)
cluster
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f80300cfd30>

我们需要确保我们的 Worker 具有与此处的 Notebook 会话相同的依赖项,以便它可以启动更多的 Dask 集群,因此我们安装了 gcsfsdask-kubernetes

client = Client(cluster)
client

客户端

客户端-85a16987-b1f9-11ed-8042-fa6ca111b70e

连接方法: Cluster 对象 集群类型: dask_kubernetes.KubeCluster
仪表板: /proxy/mock-jupyter-cluster-scheduler.default:8787/status

集群信息

现在让我们再次提交工作负载,但这次是提交到我们的集群。我们的函数将被发送到我们的“Jupyter”Worker,然后该 Worker 将启动另一个 Dask 集群来运行工作负载。我们的集群中没有足够的 GPU 来完成此操作,因此这将触发另一次伸缩操作。

%%time
run_haversine().compute()
CPU times: user 950 ms, sys: 9.1 ms, total: 959 ms
Wall time: 27.1 s

现在让我们编写一个小型函数,我们可以用它来构建任意复杂的工作负载。我们可以定义有多少阶段、应该有多少并发的 Dask 集群、随时间变化的宽度速度等。

from random import randrange


def generate_workload(
    stages=3, min_width=1, max_width=3, variation=1, input_workload=None
):
    graph = [input_workload] if input_workload is not None else [run_haversine()]
    last_width = min_width
    for _ in range(stages):
        width = randrange(
            max(min_width, last_width - variation),
            min(max_width, last_width + variation) + 1,
        )
        graph = [run_haversine(*graph) for _ in range(width)]
        last_width = width
    return run_haversine(*graph)
cluster.scale(3)  # Let's also bump up our user cluster to show more users logging in.

为了可视化我们的图,让我们检查是否安装了 graphviz

!mamba install -c conda-forge --quiet graphviz python-graphviz -y
Preparing transaction: ...working... done
Verifying transaction: ...working... done
Executing transaction: ...working... 

done

让我们从一个小型工作负载开始,它将运行几个阶段并触发一次伸缩。

workload = generate_workload(stages=2, max_width=2)
workload.visualize()
../../../_images/cccc976825e9ae4a19bbbfb8e30993d047df658a2730c0537310768bdb2d353c.png

这太好了,我们有多个阶段,其中一两个用户同时运行工作负载。现在让我们将这些工作负载串联起来,以模拟更长时间段内不同的需求。

我们还将跟踪运行的开始时间和结束时间,以便稍后可以从 Prometheus 获取正确的数据。

import datetime

警告

下一个单元格将运行大约 1 小时。

%%time
start_time = (datetime.datetime.now() - datetime.timedelta(minutes=15)).strftime(
    "%Y-%m-%dT%H:%M:%SZ"
)
try:
    # Start with a couple of concurrent workloads
    workload = generate_workload(stages=10, max_width=2)
    # Then increase demand as more users appear
    workload = generate_workload(
        stages=5, max_width=5, min_width=3, variation=5, input_workload=workload
    )
    # Now reduce the workload for a longer period of time, this could be over a lunchbreak or something
    workload = generate_workload(stages=30, max_width=2, input_workload=workload)
    # Everyone is back from lunch and it hitting the cluster hard
    workload = generate_workload(
        stages=10, max_width=10, min_width=3, variation=5, input_workload=workload
    )
    # The after lunch rush is easing
    workload = generate_workload(
        stages=5, max_width=5, min_width=3, variation=5, input_workload=workload
    )
    # As we get towards the end of the day demand slows off again
    workload = generate_workload(stages=10, max_width=2, input_workload=workload)
    workload.compute()
finally:
    client.close()
    cluster.close()
    end_time = (datetime.datetime.now() + datetime.timedelta(minutes=15)).strftime(
        "%Y-%m-%dT%H:%M:%SZ"
    )
Task exception was never retrieved
future: <Task finished name='Task-724' coro=<Client._gather.<locals>.wait() done, defined at /opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/client.py:2119> exception=AllExit()>
Traceback (most recent call last):
  File "/opt/conda/envs/rapids/lib/python3.9/site-packages/distributed/client.py", line 2128, in wait
    raise AllExit()
distributed.client.AllExit
CPU times: user 2min 43s, sys: 3.04 s, total: 2min 46s
Wall time: 1h 18min 18s

好的,太好了,我们的巨型工作负载图导致在整个运行期间启动了大约 200 个具有不同容量需求的集群,并且运行时间刚刚超过一小时。

分析#

让我们探索我们使用 Prometheus 收集的数据,看看我们的集群在模拟工作负载期间的表现。我们可以在 Grafana 中进行此操作,但我们选择留在 Notebook 中使用 prometheus-pandas

! pip install prometheus-pandas
Collecting prometheus-pandas
  Downloading prometheus_pandas-0.3.2-py3-none-any.whl (6.1 kB)
Requirement already satisfied: numpy in /opt/conda/envs/rapids/lib/python3.9/site-packages (from prometheus-pandas) (1.23.5)
Requirement already satisfied: pandas in /opt/conda/envs/rapids/lib/python3.9/site-packages (from prometheus-pandas) (1.5.2)
Requirement already satisfied: python-dateutil>=2.8.1 in /opt/conda/envs/rapids/lib/python3.9/site-packages (from pandas->prometheus-pandas) (2.8.2)
Requirement already satisfied: pytz>=2020.1 in /opt/conda/envs/rapids/lib/python3.9/site-packages (from pandas->prometheus-pandas) (2022.6)
Requirement already satisfied: six>=1.5 in /opt/conda/envs/rapids/lib/python3.9/site-packages (from python-dateutil>=2.8.1->pandas->prometheus-pandas) (1.16.0)
Installing collected packages: prometheus-pandas
Successfully installed prometheus-pandas-0.3.2
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pythonlang.cn/warnings/venv

连接到集群内的 Prometheus 端点。

from prometheus_pandas import query

p = query.Prometheus("http://kube-prometheus-stack-prometheus.prometheus:9090")

待定 Pod#

首先让我们看看每个 Pod 在 Pending 阶段花费了多长时间。这是用户创建 Dask 集群时必须等待其工作开始运行的时间。

pending_pods = p.query_range(
    'kube_pod_status_phase{phase="Pending",namespace="default"}',
    start_time,
    end_time,
    "1s",
).sum()
from dask.utils import format_time

Pod 创建的平均时间。

format_time(pending_pods.median())
'2.00 s'
format_time(pending_pods.mean())
'22.35 s'

Pod 创建时间的第 99 百分位。

format_time(pending_pods.quantile(0.99))
'326.00 s'

这些数字看起来很棒,集群最常见的启动时间是两秒钟!平均时间约为 20 秒。但是,如果您的集群触发 Kubernetes 进行扩容,您可能需要等待 5 分钟。让我们看看有多少用户会遇到这种情况。

有多少百分比的用户在不到 2 秒、5 秒、60 秒等时间内获得 Worker?

from scipy import stats

stats.percentileofscore(pending_pods, 2.01)
59.70873786407767
stats.percentileofscore(pending_pods, 5.01)
72.00647249190939
stats.percentileofscore(pending_pods, 60.01)
91.10032362459548

好的,这看起来很合理。将近 75% 的用户在不到 5 秒内获得集群,超过 90% 的用户在不到一分钟内获得集群。但是,如果您是另外 10% 的用户,可能需要等待 5 分钟。

让我们将这些数据分桶,以便直观地查看启动时间的分布。

ax = pending_pods.hist(bins=range(0, 600, 30))
ax.set_title("Dask Worker Pod wait times")
ax.set_xlabel("Seconds")
ax.set_ylabel("Pods")
Text(0, 0.5, 'Pods')
../../../_images/cb4518978697fbadfb789dd0bcd8fcc9545f47b04f716994df629617f10192c1.png
ax = pending_pods.hist(bins=range(0, 60, 2))
ax.set_title("Dask Worker Pod wait times (First minute)")
ax.set_xlabel("Seconds")
ax.set_ylabel("Pods")
Text(0, 0.5, 'Pods')
../../../_images/ef5611bf4440096e680ecdeab85d730669a282ef604914eaf2a9eb50998e2273.png

在这里我们可以清楚地看到,大多数用户在不到 5 秒内就成功调度了他们的 Worker Pod。

集群伸缩和效率#

好的,所以我们的用户能够快速获得集群,这是因为 Kubernetes 集群中存在一些暖容量,他们可以获取这些容量。当达到上限时,GKE 会自动伸缩以添加新节点。当需求暂时下降时,容量会再次释放以节省成本。

让我们查询以查看运行期间有多少节点,并将其与正在运行的 GPU Pod 数量结合起来,看看我们资源的利用效率如何。

running_pods = p.query_range(
    'kube_pod_status_phase{phase=~"Running|ContainerCreating",namespace="default"}',
    start_time,
    end_time,
    "1s",
)
running_pods = running_pods[
    running_pods.columns.drop(list(running_pods.filter(regex="prepull")))
]
nodes = p.query_range("count(kube_node_info)", start_time, end_time, "1s")
nodes.columns = ["Available GPUs"]
nodes["Available GPUs"] = (
    nodes["Available GPUs"] * 2
)  # We know our nodes each had 2 GPUs
nodes["Utilized GPUs"] = running_pods.sum(axis=1)
nodes.plot()
<AxesSubplot: >
../../../_images/76df8517c91f33fb0b85d111c482c0ec9fabbdd46af4a4d8659698b41c98186b.png

太棒了,所以我们可以看到随着我们的工作负载需求变化,集群在添加和移除节点。橙色线和蓝色线之间的空间是我们的暖容量。理想情况下,我们希望这个间隔尽可能小。让我们计算一下这个差距是多少。

我们的用户总共使用了多少 GPU 小时?

gpu_hours_utilized = nodes["Utilized GPUs"].sum() / 60 / 60
gpu_hours_utilized
15.208055555555555

我们总共支付了多少 GPU 小时的费用?

gpu_hours_cost = nodes["Available GPUs"].sum() / 60 / 60
gpu_hours_cost
23.938333333333333

开销是多少?

overhead = (1 - (gpu_hours_utilized / gpu_hours_cost)) * 100
str(int(overhead)) + "% overhead"
'36% overhead'

好的,还不错,在我们的交互式集群中,我们实现了 64% 的 GPU 资源利用率。与用户交互式使用长时间运行的工作站和集群的非自动伸缩工作负载相比,这非常棒。

如果我们衡量运行时间更长的批处理工作负载,我们会看到利用率攀升得更高。

总结思考#

通过在许多用户之间共享一个 Kubernetes 集群,这些用户都启动许多短暂的 Dask 集群来执行他们的工作,我们能够在成本与用户时间之间取得平衡。在多租户模型中,单个用户需求的峰值会随着时间被平滑,而一天中的总体峰值和谷值则由 Kubernetes 集群自动伸缩器来应对。

我们为用户创造了响应迅速的体验,他们通常在几秒钟内就能获得 Dask 集群。我们还成功地达到了集群中 GPU 64% 的利用率,这对于一个交互式集群来说是一个非常可观的数字。

还有更多我们可以调整来提高利用率的地方,但这其中也存在一些权衡。如果我们更积极地缩减规模,那么我们将不得不更频繁地进行扩容,导致更多用户等待集群的时间更长。

我们还可以看到,在节点启动和工作负载运行之间存在一些未使用的容量。这是进行镜像拉取、驱动程序安装等操作的时间。我们肯定可以做一些事情来改进这一点,以便节点在启动后立即准备就绪。

与每个用户为其单独的工作负载启动专用节点,并每次都支付驱动程序安装和环境拉取所需的等待时间及额外开销相比,我们正在有效汇集资源并重复利用我们的容量。

清理#

最后,要清理所有内容,我们可以在本地运行以下命令删除我们的 GKE 集群。

! gcloud container clusters delete multi-tenant-rapids --region us-central1 --quiet
Deleting cluster multi-tenant-rapids...done.                                   
Deleted [https://container.googleapis.com/v1/projects/nv-ai-infra/zones/us-central1/clusters/multi-tenant-rapids].