EC2 集群 (通过 Dask)#

要在 AWS EC2 上启动多节点集群,我们建议您使用 Dask Cloud Provider,它是 Dask 的原生云集成。它可以帮助管理不同云平台上的 Dask 集群。

本地环境设置#

在运行这些说明之前,请确保您已安装 RAPIDS。

注意

这种部署 RAPIDS 的方法使您能够有效地从当前节点扩展到 EC2 虚拟机集群。但请注意,这要求您所处的环境支持 RAPIDS 并配备 GPU。

如果您使用的是配备 NVIDIA GPU 的机器,请遵循本地安装说明。如果您本地没有 GPU,可以考虑使用远程环境,例如SageMaker Notebook 实例

安装 AWS CLI#

遵循官方说明安装 AWS CLI 工具。

安装 Dask Cloud Provider#

同时安装 dask-cloudprovider 并确保选择了 aws 可选扩展项。

$ pip install "dask-cloudprovider[aws]"

集群设置#

我们现在将从 Dask Cloud Provider 设置 EC2Cluster

为此,您首先需要运行 aws configure 并确保凭据已更新。了解有关设置的更多信息。API 还需要一个安全组,该安全组允许访问端口 8786-8787 以及安全组中实例之间的所有流量。如果您不在此处指定安全组,dask-cloudprovider 将为您创建一个。

from dask_cloudprovider.aws import EC2Cluster

cluster = EC2Cluster(
    instance_type="g4dn.12xlarge",  # 4 T4 GPUs
    docker_image="nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12",
    worker_class="dask_cuda.CUDAWorker",
    worker_options={"rmm-managed-memory": True},
    security_groups=["<SECURITY GROUP ID>"],
    docker_args="--shm-size=256m",
    n_workers=3,
    security=False,
    availability_zone="us-east-1a",
    region="us-east-1",
)

警告

实例化此类可能需要 30 分钟以上。请参阅有关预构建 AMI 的Dask 文档以加快速度。

如果您使用非默认凭据,可能需要手动传递凭据。

这里提供了一个用于解析凭据配置文件的小工具。

import os
import configparser
import contextlib


def get_aws_credentials(*, aws_profile="default"):
    parser = configparser.RawConfigParser()
    parser.read(os.path.expanduser("~/.aws/config"))
    config = parser.items(
        f"profile {aws_profile}" if aws_profile != "default" else "default"
    )
    parser.read(os.path.expanduser("~/.aws/credentials"))
    credentials = parser.items(aws_profile)
    all_credentials = {key.upper(): value for key, value in [*config, *credentials]}
    with contextlib.suppress(KeyError):
        all_credentials["AWS_REGION"] = all_credentials.pop("REGION")
    return all_credentials
cluster = EC2Cluster(..., env_vars=get_aws_credentials(aws_profile="foo"))

连接客户端#

集群启动后,您可以连接 Dask 客户端来提交工作。

from dask.distributed import Client

client = Client(cluster)
import cudf
import dask_cudf

df = dask_cudf.from_cudf(cudf.datasets.timeseries(), npartitions=2)
df.x.mean().compute()

清理#

当您创建集群时,Dask Cloud Provider 会注册一个终结器来关闭集群。因此,当您的 Python 进程退出时,集群将被清理。

您也可以使用以下命令显式关闭集群

client.close()
cluster.close()

相关示例#

在 AWS 上使用 dask-cloudprovider 的多节点多 GPU 示例

cloud/aws/ec2-multi library/cuml library/dask library/numpy library/dask-ml library/cudf workflow/randomforest tools/dask-cloudprovider data-format/csv data-storage/gcs

在 AWS 上使用 dask-cloudprovider 的多节点多 GPU 示例