EC2 集群 (通过 Dask)#
要在 AWS EC2 上启动多节点集群,我们建议您使用 Dask Cloud Provider,它是 Dask 的原生云集成。它可以帮助管理不同云平台上的 Dask 集群。
本地环境设置#
在运行这些说明之前,请确保您已安装 RAPIDS。
注意
这种部署 RAPIDS 的方法使您能够有效地从当前节点扩展到 EC2 虚拟机集群。但请注意,这要求您所处的环境支持 RAPIDS 并配备 GPU。
如果您使用的是配备 NVIDIA GPU 的机器,请遵循本地安装说明。如果您本地没有 GPU,可以考虑使用远程环境,例如SageMaker Notebook 实例。
安装 AWS CLI#
遵循官方说明安装 AWS CLI 工具。
安装 Dask Cloud Provider#
同时安装 dask-cloudprovider
并确保选择了 aws
可选扩展项。
$ pip install "dask-cloudprovider[aws]"
集群设置#
我们现在将从 Dask Cloud Provider 设置 EC2Cluster。
为此,您首先需要运行 aws configure
并确保凭据已更新。了解有关设置的更多信息。API 还需要一个安全组,该安全组允许访问端口 8786-8787 以及安全组中实例之间的所有流量。如果您不在此处指定安全组,dask-cloudprovider
将为您创建一个。
from dask_cloudprovider.aws import EC2Cluster cluster = EC2Cluster( instance_type="g4dn.12xlarge", # 4 T4 GPUs docker_image="nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12", worker_class="dask_cuda.CUDAWorker", worker_options={"rmm-managed-memory": True}, security_groups=["<SECURITY GROUP ID>"], docker_args="--shm-size=256m", n_workers=3, security=False, availability_zone="us-east-1a", region="us-east-1", )
警告
实例化此类可能需要 30 分钟以上。请参阅有关预构建 AMI 的Dask 文档以加快速度。
如果您使用非默认凭据,可能需要手动传递凭据。
这里提供了一个用于解析凭据配置文件的小工具。
import os
import configparser
import contextlib
def get_aws_credentials(*, aws_profile="default"):
parser = configparser.RawConfigParser()
parser.read(os.path.expanduser("~/.aws/config"))
config = parser.items(
f"profile {aws_profile}" if aws_profile != "default" else "default"
)
parser.read(os.path.expanduser("~/.aws/credentials"))
credentials = parser.items(aws_profile)
all_credentials = {key.upper(): value for key, value in [*config, *credentials]}
with contextlib.suppress(KeyError):
all_credentials["AWS_REGION"] = all_credentials.pop("REGION")
return all_credentials
cluster = EC2Cluster(..., env_vars=get_aws_credentials(aws_profile="foo"))
连接客户端#
集群启动后,您可以连接 Dask 客户端来提交工作。
from dask.distributed import Client
client = Client(cluster)
import cudf
import dask_cudf
df = dask_cudf.from_cudf(cudf.datasets.timeseries(), npartitions=2)
df.x.mean().compute()
清理#
当您创建集群时,Dask Cloud Provider 会注册一个终结器来关闭集群。因此,当您的 Python 进程退出时,集群将被清理。
您也可以使用以下命令显式关闭集群
client.close()
cluster.close()