欢迎使用 Dask cuDF 文档!#

Dask cuDF (pronounced “DASK KOO-dee-eff”) 是 Dask 并行计算框架的扩展库。安装后,Dask cuDF 会自动注册为 Dask DataFrame"cudf" dataframe 后端。

注意

Dask cuDF 和 Dask DataFrame 本身都不支持多 GPU 或多节点执行。您还必须部署 dask.distributed 集群才能利用多个 GPU。我们强烈建议使用 Dask-CUDA 来简化集群设置,从而充分利用 GPU 和网络硬件的所有功能。

如果您熟悉 Dask 以及 pandascuDF,那么您应该会觉得 Dask cuDF 很熟悉。如果不是,我们建议先阅读 10 分钟了解 Dask,然后阅读 10 分钟了解 cuDF 和 Dask cuDF

阅读以下章节后,请参阅 最佳实践 页面以获取有关有效使用 Dask cuDF 的进一步指导。

使用 Dask cuDF#

显式的 Dask cuDF API#

除了为 Dask DataFrame 提供 "cudf" 后端外,Dask cuDF 还提供了一个显式的 dask_cudf API

import dask_cudf

# This always gives us a cuDF-backed dataframe
df = dask_cudf.read_parquet("data.parquet", ...)

当启用 "cudf" 后端时,此 API 会被 Dask DataFrame API 隐式使用。因此,直接使用它不会比 CPU/GPU 可移植的 dask.dataframe API 提供任何性能优势。此外,使用显式 API 的某些部分与自动查询规划不兼容(请参见下一节)。

查询规划#

Dask cuDF 现在默认提供自动查询规划(RAPIDS 24.06+)。只要在首次导入 dask.dataframe 时将 "dataframe.query-planning" 配置设置为 True(默认值),底层就会使用 Dask Expressions

例如,计算结果时,以下代码将自动受益于谓词下推

df = dd.read_parquet("/my/parquet/dataset/")
result = df.sort_values('B')['A']

未优化的表达式图 (df.pprint())

Projection: columns='A'
  SortValues: by=['B'] shuffle_method='tasks' options={}
    ReadParquetFSSpec: path='/my/parquet/dataset/' ...

简化的表达式图 (df.simplify().pprint())

Projection: columns='A'
  SortValues: by=['B'] shuffle_method='tasks' options={}
    ReadParquetFSSpec: path='/my/parquet/dataset/' columns=['A', 'B'] ...

注意

当结果转换为任务图时(通过 dask.compute()dask.persist()),Dask 将自动简化表达式图(在 dask.optimize() 内部)。您无需自己优化或简化图。

使用多个 GPU 和多个节点#

只要可能,Dask cuDF(即 Dask DataFrame)将自动尝试将您的数据分区成足够小的任务,以便轻松放入单个 GPU 的内存中。这意味着计算查询所需的计算任务通常可以流式传输到单个 GPU 进程进行核外计算。这也意味着计算任务可以在多 GPU 集群上并行执行。

为了在多个 GPU 上执行您的 Dask 工作流,您通常需要使用 Dask-CUDA 来部署分布式 Dask 集群,以及 Distributed 来定义客户端对象。例如

from dask_cuda import LocalCUDACluster
from distributed import Client

if __name__ == "__main__":

  client = Client(
    LocalCUDACluster(
      CUDA_VISIBLE_DEVICES="0,1",  # Use two workers (on devices 0 and 1)
      rmm_pool_size=0.9,  # Use 90% of GPU memory as a pool for faster allocations
      enable_cudf_spill=True,  # Improve device memory stability
      local_directory="/fast/scratch/",  # Use fast local storage for spilling
    )
  )

  df = dd.read_parquet("/my/parquet/dataset/")
  agg = df.groupby('B').sum()
  agg.compute()  # This will use the cluster defined above

注意

此示例使用 dask.compute() 将具体的 cudf.DataFrame 对象实例化到本地内存中。切勿在无法轻松放入单个 GPU 内存的大型集合上调用 dask.compute()!有关更多详细信息,请参阅 Dask 的 关于管理计算的文档

请参阅 Dask-CUDA 文档,了解有关部署 GPU 感知集群的更多信息(包括 最佳实践)。

API 参考#

一般来说,Dask cuDF 尝试提供与 Dask DataFrame 完全相同的 API。然而,仍有一些细微差别,主要是因为 cuDF 并未 完美镜像 pandas API,或者因为 cuDF 提供了额外的配置标志(这些差异主要出现在数据读写接口中)。

因此,简单的工作流可以轻松迁移,但利用更多功能的更复杂工作流可能需要进行一些调整。API 文档详细描述了这些差异以及 Dask cuDF 支持的所有功能。

索引和表格#