欢迎使用 Dask cuDF 文档!#
Dask cuDF (pronounced “DASK KOO-dee-eff”) 是 Dask 并行计算框架的扩展库。安装后,Dask cuDF 会自动注册为 Dask DataFrame 的 "cudf"
dataframe 后端。
注意
Dask cuDF 和 Dask DataFrame 本身都不支持多 GPU 或多节点执行。您还必须部署 dask.distributed 集群才能利用多个 GPU。我们强烈建议使用 Dask-CUDA 来简化集群设置,从而充分利用 GPU 和网络硬件的所有功能。
如果您熟悉 Dask 以及 pandas 或 cuDF,那么您应该会觉得 Dask cuDF 很熟悉。如果不是,我们建议先阅读 10 分钟了解 Dask,然后阅读 10 分钟了解 cuDF 和 Dask cuDF。
阅读以下章节后,请参阅 最佳实践 页面以获取有关有效使用 Dask cuDF 的进一步指导。
使用 Dask cuDF#
Dask DataFrame API(推荐)#
只需使用 Dask 配置 系统将 "dataframe.backend"
选项设置为 "cudf"
。在 Python 中,可以通过以下方式实现
import dask
dask.config.set({"dataframe.backend": "cudf"})
或者,您可以在运行您的代码之前在环境变量中设置 DASK_DATAFRAME__BACKEND=cudf
。
完成此操作后,当使用以下任何 dask.dataframe
函数从磁盘格式创建新的 DataFrame 集合时,公共的 Dask DataFrame API 将自动利用 cudf
。
例如
import dask.dataframe as dd
# By default, we obtain a pandas-backed dataframe
df = dd.read_parquet("data.parquet", ...)
import dask
dask.config.set({"dataframe.backend": "cudf"})
# This now gives us a cuDF-backed dataframe
df = dd.read_parquet("data.parquet", ...)
当使用其他函数创建新的集合时(例如 dask.dataframe.from_map()
, dask.dataframe.from_pandas()
, dask.dataframe.from_delayed()
, 以及 dask.dataframe.from_array()
),新集合的后端将取决于这些函数的输入。例如
import pandas as pd
import cudf
# This gives us a pandas-backed dataframe
dd.from_pandas(pd.DataFrame({"a": range(10)}))
# This gives us a cuDF-backed dataframe
dd.from_pandas(cudf.DataFrame({"a": range(10)}))
现有集合总是可以通过 dask.dataframe.DataFrame.to_backend()
API 移动到特定的后端
# This ensures that we have a cuDF-backed dataframe
df = df.to_backend("cudf")
# This ensures that we have a pandas-backed dataframe
df = df.to_backend("pandas")
显式的 Dask cuDF API#
除了为 Dask DataFrame 提供 "cudf"
后端外,Dask cuDF 还提供了一个显式的 dask_cudf
API
import dask_cudf
# This always gives us a cuDF-backed dataframe
df = dask_cudf.read_parquet("data.parquet", ...)
当启用 "cudf"
后端时,此 API 会被 Dask DataFrame API 隐式使用。因此,直接使用它不会比 CPU/GPU 可移植的 dask.dataframe
API 提供任何性能优势。此外,使用显式 API 的某些部分与自动查询规划不兼容(请参见下一节)。
查询规划#
Dask cuDF 现在默认提供自动查询规划(RAPIDS 24.06+)。只要在首次导入 dask.dataframe
时将 "dataframe.query-planning"
配置设置为 True
(默认值),底层就会使用 Dask Expressions。
例如,计算结果时,以下代码将自动受益于谓词下推
df = dd.read_parquet("/my/parquet/dataset/")
result = df.sort_values('B')['A']
未优化的表达式图 (df.pprint()
)
Projection: columns='A'
SortValues: by=['B'] shuffle_method='tasks' options={}
ReadParquetFSSpec: path='/my/parquet/dataset/' ...
简化的表达式图 (df.simplify().pprint()
)
Projection: columns='A'
SortValues: by=['B'] shuffle_method='tasks' options={}
ReadParquetFSSpec: path='/my/parquet/dataset/' columns=['A', 'B'] ...
注意
当结果转换为任务图时(通过 dask.compute()
或 dask.persist()
),Dask 将自动简化表达式图(在 dask.optimize()
内部)。您无需自己优化或简化图。
使用多个 GPU 和多个节点#
只要可能,Dask cuDF(即 Dask DataFrame)将自动尝试将您的数据分区成足够小的任务,以便轻松放入单个 GPU 的内存中。这意味着计算查询所需的计算任务通常可以流式传输到单个 GPU 进程进行核外计算。这也意味着计算任务可以在多 GPU 集群上并行执行。
为了在多个 GPU 上执行您的 Dask 工作流,您通常需要使用 Dask-CUDA 来部署分布式 Dask 集群,以及 Distributed 来定义客户端对象。例如
from dask_cuda import LocalCUDACluster
from distributed import Client
if __name__ == "__main__":
client = Client(
LocalCUDACluster(
CUDA_VISIBLE_DEVICES="0,1", # Use two workers (on devices 0 and 1)
rmm_pool_size=0.9, # Use 90% of GPU memory as a pool for faster allocations
enable_cudf_spill=True, # Improve device memory stability
local_directory="/fast/scratch/", # Use fast local storage for spilling
)
)
df = dd.read_parquet("/my/parquet/dataset/")
agg = df.groupby('B').sum()
agg.compute() # This will use the cluster defined above
注意
此示例使用 dask.compute()
将具体的 cudf.DataFrame
对象实例化到本地内存中。切勿在无法轻松放入单个 GPU 内存的大型集合上调用 dask.compute()
!有关更多详细信息,请参阅 Dask 的 关于管理计算的文档。
API 参考#
一般来说,Dask cuDF 尝试提供与 Dask DataFrame 完全相同的 API。然而,仍有一些细微差别,主要是因为 cuDF 并未 完美镜像 pandas API,或者因为 cuDF 提供了额外的配置标志(这些差异主要出现在数据读写接口中)。
因此,简单的工作流可以轻松迁移,但利用更多功能的更复杂工作流可能需要进行一些调整。API 文档详细描述了这些差异以及 Dask cuDF 支持的所有功能。