Dask cuDF 最佳实践#

本页面概述了有效使用 Dask cuDF 的几项重要指南。

注意

由于 Dask cuDF 是 Dask DataFrame 的后端扩展,Dask DataFrames 最佳实践 文档中讨论的指南也适用于 Dask cuDF(不包括任何特定于 pandas 的详细信息)。

部署和配置#

使用 Dask-CUDA#

要在多个 GPU 上执行 Dask 工作流,必须使用 Dask-CUDADask.distributed 部署 Dask 集群。

在单机上运行时,强烈建议使用 LocalCUDACluster 便利函数。无论机器上有多少个 GPU(即使只有一个!),使用 Dask-CUDA 都比默认(线程)执行有许多优势。列举几个优势:

  • Dask-CUDA 可以轻松地将 Worker 绑定到特定设备。

  • Dask-CUDA 可以轻松配置内存溢出选项。

  • 分布式调度程序收集有用的诊断信息,可以在仪表板上实时查看。

有关详细信息,请参阅 Dask-CUDA 的 API最佳实践 文档。Dask cuDF 的 文档的多 GPU 部分也演示了典型的 LocalCUDACluster 用法。

注意

在云基础设施或 HPC 系统上运行时,最好利用特定于系统的部署库,例如 Dask OperatorDask-Jobqueue

有关更多详细信息和示例,请参阅 RAPIDS 部署文档

使用诊断工具#

Dask 生态系统包含几个您绝对应该使用的诊断工具。这些工具包括直观的 浏览器仪表板 以及用于收集 性能配置文件 的专用 API。

无论工作流如何,强烈建议使用仪表板。它提供了 Worker 资源和计算进度的可视化表示。它还显示基本的 GPU 内存和利用率指标(在 GPU 标签下)。要在 JupyterLab 中可视化更详细的 GPU 指标,请使用 NVDashboard

启用 cuDF 溢出#

在使用 Dask cuDF 处理经典的 ETL 工作负载时,最好启用 cuDF 中的原生溢出支持。使用 dask_cuda.LocalCUDACluster 时,通过设置 enable_cudf_spill=True 可以轻松实现这一点。

当 Dask cuDF 工作流包含 DataFrame 和 Array 表示形式之间的转换时,原生 cuDF 溢出可能不足。对于这些情况,JIT-unspill 可能更能有效防止内存不足 (OOM) 错误。有关详细信息和指导,请参阅 Dask-CUDA 的溢出文档

使用 RMM#

当在 Worker 进程上正确配置了 RAPIDS 内存管理器 (RMM) 库时,cuDF 中的内存分配会更快、更高效。在大多数情况下,管理内存的最佳方法是在执行工作流之前在每个 Worker 上初始化一个 RMM 池。使用 dask_cuda.LocalCUDACluster 时,通过将 rmm_pool_size 设置为一个较大的比例(例如 0.9)可以轻松实现这一点。

有关更多详细信息,请参阅 Dask-CUDA 内存管理文档

使用 Dask DataFrame API#

尽管 Dask cuDF 提供了公共的 dask_cudf Python 模块,但我们强烈建议您使用 CPU/GPU 可移植的 dask.dataframe API。只需 使用 Dask 配置系统"dataframe.backend" 选项设置为 "cudf"dask_cudf 模块将被隐式导入和使用。

如果您需要在不同的 DataFrame 后端之间转换,请务必使用 dask.dataframe.DataFrame.to_backend() 方法。例如:

df = df.to_backend("pandas")  # This gives us a pandas-backed collection

注意

尽管 dask.dataframe.DataFrame.to_backend() 使得在 pandas 和 cuDF 之间移动数据变得容易,但重复的 CPU-GPU 数据移动会显著降低性能。为了获得最佳结果,请尽量将数据保留在 GPU 上。

避免急切执行#

虽然 Dask DataFrame 集合默认是惰性的,但有几个值得注意的方法会导致底层任务图立即执行:

dask.dataframe.DataFrame.compute(): 调用 ddf.compute() 将具体化 ddf 的结果并返回一个单一的 cuDF 对象。这是通过执行与 ddf 关联的整个任务图并将其分区连接到客户端进程的本地内存中来实现的。

注意

切勿对无法舒适地放入单个 GPU 内存中的大型集合调用 dask.dataframe.DataFrame.compute()

dask.dataframe.DataFrame.persist(): 与 dask.dataframe.DataFrame.compute() 一样,调用 ddf.persist() 将执行与 ddf 关联的整个任务图。最重要的区别是,计算出的分区将保留在分布式 Worker 内存中,而不是在客户端进程上连接在一起。另一个区别是,在分布式集群上执行时,dask.dataframe.DataFrame.persist() 将立即返回。如果您需要在工作流中设置一个阻塞同步点,只需使用 distributed.wait() 函数:

ddf = ddf.persist()
wait(ddf)

注意

避免对无法舒适地放入全局 Worker 内存中的大型集合调用 dask.dataframe.DataFrame.persist()。如果分区总和大小大于所有 GPU 内存总和,调用 persist 将导致从设备内存大量溢出。如果单个分区大小较大,这可能会导致 OOM 错误。

len() / dask.dataframe.DataFrame.head() / dask.dataframe.DataFrame.tail(): 尽管这些操作经常在 pandas/cuDF 代码中用于快速检查数据,但在 Dask DataFrame 中最好避免它们。在大多数情况下,这些操作将执行部分或全部底层任务图以具体化集合。

dask.dataframe.DataFrame.sort_values() / dask.dataframe.DataFrame.set_index() : 这些操作都要求 Dask 急切地收集关于全局排序操作所针对的列的分位数信息。有关排序注意事项,请参阅下一节。

注意

使用 dask.dataframe.DataFrame.set_index() 时,如果全局集合不 需要 按新索引排序,请务必传入 sort=False

避免排序#

Dask DataFrame 的设计 使其更有利于处理创建时已按其索引排序的数据。对于大多数其他情况,除非工作流的逻辑使得全局排序绝对必要,否则最好避免排序。

如果 dask.dataframe.DataFrame.sort_values() 操作的目的是确保 by 中的所有唯一值都被移动到同一个输出分区,那么 shuffle 通常是更好的选择。

读取数据#

调整分区大小#

理想的分区大小通常在单个 GPU 内存容量的 1/32 到 1/8 之间。增加分区大小通常会减少工作流中的任务数量并提高每个任务的 GPU 利用率。但是,如果分区太大,OOM 错误的风险会显著增加。

注意

作为一般经验法则,对于 shuffle 密集型工作流(例如大型排序和连接),从 1/32-1/16 开始,否则从 1/16-1/8 开始。对于病态倾斜的数据分布,可能需要目标瞄准 1/64 或更小。这个经验法则来自轶事优化和 OOM 调试经验。由于每个工作流都不同,选择最佳分区大小既是艺术也是科学。

调整分区大小的最简单方法是在第一次由 dask.dataframe.read_parquet()dask.dataframe.read_csv()dask.dataframe.from_map() 等函数创建 DataFrame 集合时。例如,dask.dataframe.read_parquet()dask.dataframe.read_csv() 都公开了 blocksize 参数来调整最大分区大小。

如果在创建时无法有效调整分区大小,可以使用 repartition 方法作为最后手段。

使用 Parquet#

Parquet 是 Dask cuDF 推荐的文件格式。它提供高效的列式存储,并使 Dask 能够执行有价值的查询优化,例如列投影和谓词下推。

dask.dataframe.read_parquet() 最重要的参数是 blocksizeaggregate_files

blocksize: 使用此参数指定最大分区大小。默认值为 “256 MiB”,但在内存大于 8 GiB 的 GPU 上,较大的值通常性能更好。Dask 将使用 blocksize 值将离散数量的 Parquet 行组(或文件)映射到每个输出分区。此映射仅考虑每个行组的未压缩存储大小,这通常小于相应的 cudf.DataFrame

aggregate_files: 使用此参数指定 Dask 是否应将多个文件映射到同一个 DataFrame 分区。默认值为 False,但当数据集包含许多小于 blocksize 一半的文件时,aggregate_files=True 通常性能更好。

如果您知道文件在拆分或聚合之前对应于合理的分区大小,请设置 blocksize=None 以禁止文件拆分。在没有列投影下推的情况下,这将导致文件和输出分区之间建立简单的 1 对 1 映射。

注意

如果您的工作流需要文件和分区之间严格的 1 对 1 映射,请使用 dask.dataframe.from_map() 手动构建您的分区,使用 cudf.read_parquet。使用 dask.dataframe.read_parquet() 时,查询规划优化可能会自动将不同文件聚合到同一个分区中(即使 aggregate_files=False)。

注意

从远程存储(例如 S3 和 GCS)读取时,元数据收集可能会非常慢。读取许多对应于合理分区大小的远程文件时,使用 blocksize=None 可以避免不必要的元数据收集。

注意

从远程存储(例如 S3 和 GCS)读取时,使用 filesystem="arrow" 可能会提高性能。设置此选项时,PyArrow 将用于在多个 CPU 线程上执行 IO。请注意,此功能是实验性的,未来的行为可能会发生变化(不会弃用)。使用此功能时,不要传入 blocksizeaggregate_files。相反,设置 "dataframe.parquet.minimum-partition-size" 配置来控制文件聚合。

使用 dask.dataframe.from_map()#

要实现现有 API(例如 dask.dataframe.read_parquet())未涵盖的自定义 DataFrame 创建逻辑,请尽可能使用 dask.dataframe.from_map()dask.dataframe.from_map() API 比 dask.dataframe.from_delayed() 有几个优点:

  • 它允许您的自定义逻辑进行适当的惰性执行。

  • 它支持列投影(只要映射函数支持 columns 关键字参数)。

有关更多详细信息,请参阅 from_map API 文档

注意

尽可能务必为 dask.dataframe.from_map() 指定 meta 参数。如果省略此参数,Dask 将需要急切地具体化第一个分区。如果在第一个可见设备上使用了较大的 RMM 池,客户端上的急切执行可能会导致 OOM 错误。

排序、连接和分组#

排序、连接和分组操作都有可能需要数据在不同分区之间进行全局 shuffle。当初始数据可以舒适地放入全局 GPU 内存时,这些“全对全”操作通常受限于 Worker 到 Worker 之间的通信。当数据大于全局 GPU 内存时,瓶颈通常是设备到主机内存的溢出。

尽管每个工作流都不同,但通常建议遵循以下指南:

  • 使用具有 Dask-CUDA Worker 的分布式集群。

  • 尽可能使用原生 cuDF 溢出(Dask-CUDA 溢出文档)。

  • 尽可能避免 shuffle。
    • 对于低基数 groupby 聚合,使用 split_out=1

    • 当至少有一个集合包含少量分区(例如 <=5)时,对于连接,使用 broadcast=True

  • 如果通信是瓶颈,请使用 UCX

注意

UCX 使 Dask-CUDA Worker 能够使用高性能传输技术进行通信,例如 NVLink 和 Infiniband。没有 UCX,进程间通信将依赖于 TCP 套接字。

用户自定义函数#

大多数实际的 Dask DataFrame 工作流使用 map_partitions 将用户自定义函数映射到基础数据的每个分区。此 API 是以直观且可扩展的方式应用自定义操作的绝佳方法。话虽如此,dask.dataframe.DataFrame.map_partitions() 方法将生成一个不透明的 DataFrame 表达式,阻止查询规划优化器执行有用的优化(例如投影和过滤器下推)。

由于列投影下推通常是最有效的优化,因此在调用 dask.dataframe.DataFrame.map_partitions() 之前和之后选择必要的列非常重要。您还可以添加显式过滤操作以进一步减轻过滤器下推的损失。