HPC#

RAPIDS 在传统 HPC(高性能计算)环境中表现极佳,在这些环境中,GPU 通常与 InfiniBand 等加速网络硬件共存。在 HPC 上部署通常意味着使用 SLURM、LSF、PBS 等队列管理系统。

SLURM#

警告

这是一个旧页面,可能包含过时信息。我们正在努力更新文档,提供最新信息,感谢您的耐心等待。

如果您不熟悉 SLURM 或需要复习,我们推荐快速入门指南。根据您的节点配置方式,可能需要额外的设置,例如定义所需的 GPU 数量 ((--gpus)) 或每个节点的 GPU 数量 ((--gpus-per-node))。在以下示例中,我们假设每个分配都在一个 DGX1 上运行,可访问全部八个 GPU。

启动调度器#

首先,使用以下 SLURM 脚本启动调度器。此脚本及后续脚本可使用 salloc 进行交互式使用,或使用 sbatch 进行批处理运行。

#!/usr/bin/env bash

#SBATCH -J dask-scheduler
#SBATCH -n 1
#SBATCH -t 00:10:00

module load cuda/11.0.3
CONDA_ROOT=/nfs-mount/user/miniconda3
source $CONDA_ROOT/etc/profile.d/conda.sh
conda activate rapids

LOCAL_DIRECTORY=/nfs-mount/dask-local-directory
mkdir $LOCAL_DIRECTORY
CUDA_VISIBLE_DEVICES=0 dask-scheduler \
    --protocol tcp \
    --scheduler-file "$LOCAL_DIRECTORY/dask-scheduler.json" &

dask-cuda-worker \
    --rmm-pool-size 14GB \
    --scheduler-file "$LOCAL_DIRECTORY/dask-scheduler.json"

请注意,我们将调度器配置为将 scheduler-file 写入可由 NFS 访问的位置。此文件包含有关调度器的元数据,并将包括调度器的 IP 地址和端口。此文件将作为输入提供给工作节点,告知它们要连接的地址和端口。

调度器不需要独占整个节点,因此我们也可以在此节点上启动一个工作节点来填补未使用的资源。

启动 Dask CUDA 工作节点#

接下来启动其他 dask-cuda 工作节点。Dask-CUDA 扩展了传统的 Dask Worker 类,为 GPU 环境提供了特定的选项和增强功能。与调度器和客户端不同,工作节点脚本应该是可扩展的,并允许用户调整创建的工作节点数量。例如,我们可以将节点数量扩展到 3 个:sbatch/salloc -N3 dask-cuda-worker.script。在这种情况下,由于每个节点有 8 个 GPU,我们有 3 个节点,我们的作业将有 24 个工作节点。

#!/usr/bin/env bash

#SBATCH -J dask-cuda-workers
#SBATCH -t 00:10:00

module load cuda/11.0.3
CONDA_ROOT=/nfs-mount/miniconda3
source $CONDA_ROOT/etc/profile.d/conda.sh
conda activate rapids

LOCAL_DIRECTORY=/nfs-mount/dask-local-directory
mkdir $LOCAL_DIRECTORY
dask-cuda-worker \
    --rmm-pool-size 14GB \
    --scheduler-file "$LOCAL_DIRECTORY/dask-scheduler.json"

cuDF 示例工作流程#

最后,我们现在可以在建立的 Dask 集群上运行作业。

#!/usr/bin/env bash

#SBATCH -J dask-client
#SBATCH -n 1
#SBATCH -t 00:10:00

module load cuda/11.0.3
CONDA_ROOT=/nfs-mount/miniconda3
source $CONDA_ROOT/etc/profile.d/conda.sh
conda activate rapids

LOCAL_DIRECTORY=/nfs-mount/dask-local-directory

cat <<EOF >>/tmp/dask-cudf-example.py
import cudf
import dask.dataframe as dd
from dask.distributed import Client

client = Client(scheduler_file="$LOCAL_DIRECTORY/dask-scheduler.json")
cdf = cudf.datasets.timeseries()

ddf = dd.from_pandas(cdf, npartitions=10)
res = ddf.groupby(['id', 'name']).agg(['mean', 'sum', 'count']).compute()
print(res)
EOF

python /tmp/dask-cudf-example.py

确认输出#

将上述步骤组合起来,将得到以下输出

                      x                          y
                   mean        sum count      mean        sum count
id   name
1077 Laura     0.028305   1.868120    66 -0.098905  -6.527731    66
1026 Frank     0.001536   1.414839   921 -0.017223 -15.862306   921
1082 Patricia  0.072045   3.602228    50  0.081853   4.092667    50
1007 Wendy     0.009837  11.676199  1187  0.022978  27.275216  1187
976  Wendy    -0.003663  -3.267674   892  0.008262   7.369577   892
...                 ...        ...   ...       ...        ...   ...
912  Michael   0.012409   0.459119    37  0.002528   0.093520    37
1103 Ingrid   -0.132714  -1.327142    10  0.108364   1.083638    10
998  Tim       0.000587   0.747745  1273  0.001777   2.262094  1273
941  Yvonne    0.050258  11.358393   226  0.080584  18.212019   226
900  Michael  -0.134216  -1.073729     8  0.008701   0.069610     8

[6449 rows x 6 columns]