cuDF 和 Dask cuDF 十分钟入门#

本文以《Pandas 十分钟入门》为蓝本,旨在为新用户提供一份 cuDF 和 Dask cuDF 的简短介绍。

这些库是什么?#

cuDF 是一个 Python GPU DataFrame 库(基于 Apache Arrow 列式内存格式),用于使用 DataFrame 风格的 API(类似于 pandas)加载、连接、聚合、过滤和操作表格数据。

Dask 是一个灵活的 Python 并行计算库,可使您的工作流程平滑简单地进行扩展。在 CPU 上,Dask 使用 Pandas 在 DataFrame 分区上并行执行操作。

Dask cuDF 在必要时扩展了 Dask,以允许使用 cuDF GPU DataFrame 而非 Pandas DataFrame 处理其 DataFrame 分区。例如,当您调用 dask_cudf.read_csv(...) 时,您的集群的 GPU 会通过调用 cudf.read_csv() 来完成解析 CSV 文件的工作。

注意:本 notebook 为了清晰起见使用了显式的 Dask cuDF API (dask_cudf)。但是,我们强烈建议您使用 Dask 的配置基础结构将 “dataframe.backend” 选项设置为 “cudf”,并直接使用 Dask DataFrame API。请参阅 Dask cuDF 文档了解更多信息。

何时使用 cuDF 和 Dask cuDF#

如果您的工作流程在单个 GPU 上足够快,或者您的数据能轻松放入单个 GPU 的内存中,您会希望使用 cuDF。如果您想将工作流程分布到多个 GPU 上,数据量超出单个 GPU 内存容量,或者想同时分析分散在许多文件中的数据,您会希望使用 Dask cuDF。

import os

import cudf
import cupy as cp
import dask_cudf
import pandas as pd

cp.random.seed(12)

#### Portions of this were borrowed and adapted from the
#### cuDF cheatsheet, existing cuDF documentation,
#### and 10 Minutes to Pandas.

对象创建#

创建 cudf.Seriesdask_cudf.Series

s = cudf.Series([1, 2, 3, None, 4])
s
0       1
1       2
2       3
3    <NA>
4       4
dtype: int64
ds = dask_cudf.from_cudf(s, npartitions=2)
# Note the call to head here to show the first few entries, unlike
# cuDF objects, Dask-cuDF objects do not have a printing
# representation that shows values since they may not be in local
# memory.
ds.head(n=3)
0    1
1    2
2    3
dtype: int64

通过为每列指定值来创建 cudf.DataFramedask_cudf.DataFrame

df = cudf.DataFrame(
    {
        "a": list(range(20)),
        "b": list(reversed(range(20))),
        "c": list(range(20)),
    }
)
df
a b c
0 0 19 0
1 1 18 1
2 2 17 2
3 3 16 3
4 4 15 4
5 5 14 5
6 6 13 6
7 7 12 7
8 8 11 8
9 9 10 9
10 10 9 10
11 11 8 11
12 12 7 12
13 13 6 13
14 14 5 14
15 15 4 15
16 16 3 16
17 17 2 17
18 18 1 18
19 19 0 19

现在我们将 cuDF dataframe 转换为等效的 Dask-cuDF。这里我们指出一个关键区别:要查看数据,我们必须调用一个方法(这里是 .head() 来查看前几个值)。在一般情况下(参见本 notebook 结尾),ddf 中的数据将分布在多个 GPU 上。

在这种小案例中,我们可以调用 ddf.compute() 从 Dask-cuDF 对象获取一个 cuDF 对象。通常,我们应避免在大 dataframe 上调用 .compute(),仅在我们有相对较小的后处理结果需要查看时使用它。因此,在本 notebook 中,我们通常会调用 .head() 来查看 Dask-cuDF dataframe 的前几个值,偶尔会指出我们使用 .compute() 的地方及其原因。

要了解 cuDF 和 Dask cuDF 在这里的行为差异,请在本教程之后访问《Dask 十分钟入门》教程。

ddf = dask_cudf.from_cudf(df, npartitions=2)
ddf.head()
a b c
0 0 19 0
1 1 18 1
2 2 17 2
3 3 16 3
4 4 15 4

从 pandas Dataframe 创建 cudf.DataFrame,以及从 cudf.Dataframe 创建 dask_cudf.Dataframe

请注意,使用 dask-cuDF 的最佳实践是使用 read_csv 或其他内置 I/O 例程(下文讨论)将数据直接读取到 dask_cudf.DataFrame 中。

pdf = pd.DataFrame({"a": [0, 1, 2, 3], "b": [0.1, 0.2, None, 0.3]})
gdf = cudf.DataFrame.from_pandas(pdf)
gdf
a b
0 0 0.1
1 1 0.2
2 2 <NA>
3 3 0.3
dask_gdf = dask_cudf.from_cudf(gdf, npartitions=2)
dask_gdf.head(n=2)
a b
0 0 0.1
1 1 0.2

查看数据#

查看 GPU dataframe 的顶部行。

df.head(2)
a b c
0 0 19 0
1 1 18 1
ddf.head(2)
a b c
0 0 19 0
1 1 18 1

按值排序。

df.sort_values(by="b")
a b c
19 19 0 19
18 18 1 18
17 17 2 17
16 16 3 16
15 15 4 15
14 14 5 14
13 13 6 13
12 12 7 12
11 11 8 11
10 10 9 10
9 9 10 9
8 8 11 8
7 7 12 7
6 6 13 6
5 5 14 5
4 4 15 4
3 3 16 3
2 2 17 2
1 1 18 1
0 0 19 0
ddf.sort_values(by="b").head()
a b c
19 19 0 19
18 18 1 18
17 17 2 17
16 16 3 16
15 15 4 15

选择列#

选择单列,这将最初产生一个 cudf.Seriesdask_cudf.Series。调用 compute 会得到一个 cudf.Series(相当于 df.a)。

df["a"]
0      0
1      1
2      2
3      3
4      4
5      5
6      6
7      7
8      8
9      9
10    10
11    11
12    12
13    13
14    14
15    15
16    16
17    17
18    18
19    19
Name: a, dtype: int64
ddf["a"].head()
0    0
1    1
2    2
3    3
4    4
Name: a, dtype: int64

按标签选择行#

选择索引 2 到索引 5 的行,从列 'a' 和 'b' 中。

df.loc[2:5, ["a", "b"]]
a b
2 2 17
3 3 16
4 4 15
5 5 14
ddf.loc[2:5, ["a", "b"]].head()
a b
2 2 17
3 3 16
4 4 15
5 5 14

按位置选择行#

通过整数和整数切片进行选择,类似于 numpy/pandas。请注意,此功能不适用于 Dask-cuDF DataFrame。

df.iloc[0]
a     0
b    19
c     0
Name: 0, dtype: int64
df.iloc[0:3, 0:2]
a b
0 0 19
1 1 18
2 2 17

您也可以通过直接索引访问来选择 DataFrameSeries 的元素。

df[3:5]
a b c
3 3 16 3
4 4 15 4
s[3:5]
3    <NA>
4       4
dtype: int64

布尔索引#

通过直接布尔索引来选择 DataFrameSeries 中的行。

df[df.b > 15]
a b c
0 0 19 0
1 1 18 1
2 2 17 2
3 3 16 3
ddf[ddf.b > 15].head(n=3)
a b c
0 0 19 0
1 1 18 1
2 2 17 2

通过 query API 选择满足布尔条件的 DataFrame 中的值。

df.query("b == 3")
a b c
16 16 3 16

注意,这里我们在 Dask-cuDF dataframe 上调用 compute() 而不是 head(),因为我们确信匹配的行数会很少(因此将整个结果取回是完全合理的)。

ddf.query("b == 3").compute()
a b c
16 16 3 16

您还可以通过 local_dict 关键字将局部变量传递给 Dask-cuDF 查询。对于标准的 cuDF,您可以使用 local_dict 关键字或通过 @ 关键字直接传递变量。支持的逻辑运算符包括 >, <, >=, <=, ==!=

cudf_comparator = 3
df.query("b == @cudf_comparator")
a b c
16 16 3 16
dask_cudf_comparator = 3
ddf.query("b == @val", local_dict={"val": dask_cudf_comparator}).compute()
a b c
16 16 3 16

使用 isin 方法进行过滤。

df[df.a.isin([0, 5])]
a b c
0 0 19 0
5 5 14 5

MultiIndex#

cuDF 支持使用 MultiIndex 对 DataFrame 进行分层索引。按层级分组(参见下面的 Grouping)会自动生成带有 MultiIndex 的 DataFrame。

arrays = [["a", "a", "b", "b"], [1, 2, 3, 4]]
tuples = list(zip(*arrays))
idx = cudf.MultiIndex.from_tuples(tuples)
idx
MultiIndex([('a', 1),
            ('a', 2),
            ('b', 3),
            ('b', 4)],
           )

此索引可支持 DataFrame 的任一轴。

gdf1 = cudf.DataFrame(
    {"first": cp.random.rand(4), "second": cp.random.rand(4)}
)
gdf1.index = idx
gdf1
first second
a 1 0.082654 0.967955
2 0.399417 0.441425
b 3 0.784297 0.793582
4 0.070303 0.271711
gdf2 = cudf.DataFrame(
    {"first": cp.random.rand(4), "second": cp.random.rand(4)}
).T
gdf2.columns = idx
gdf2
a b
1 2 3 4
first 0.343382 0.003700 0.20043 0.581614
second 0.907812 0.101512 0.24179 0.224180

使用 MultiIndex 访问 DataFrame 的值,使用 .loc

gdf1.loc[("b", 3)]
first     0.784297
second    0.793582
Name: ('b', 3), dtype: float64

以及 .iloc

gdf1.iloc[0:2]
first second
a 1 0.082654 0.967955
2 0.399417 0.441425

缺失数据#

可以使用 fillna 方法替换缺失数据。

s.fillna(999)
0      1
1      2
2      3
3    999
4      4
dtype: int64
ds.fillna(999).head(n=3)
0    1
1    2
2    3
dtype: int64

统计#

计算 Series 的描述性统计信息。

s.mean(), s.var()
(np.float64(2.5), np.float64(1.6666666666666667))

这是一个典型示例,说明何时我们可能想调用 .compute()。计算均值和方差的结果在每种情况下都是一个单独的数字,因此查看整个结果是完全合理的!

ds.mean().compute(), ds.var().compute()
(np.float64(2.5), np.float64(1.6666666666666667))

Applymap#

将函数应用于 Series。请注意,使用 Dask cuDF 直接应用用户定义函数的功能尚未实现。目前,您可以使用 map_partitions 将函数应用于分布式 dataframe 的每个分区。

def add_ten(num):
    return num + 10


df["a"].apply(add_ten)
0     10
1     11
2     12
3     13
4     14
5     15
6     16
7     17
8     18
9     19
10    20
11    21
12    22
13    23
14    24
15    25
16    26
17    27
18    28
19    29
Name: a, dtype: int64
ddf["a"].map_partitions(add_ten).head()
0    10
1    11
2    12
3    13
4    14
Name: a, dtype: int64

直方图绘制#

计算变量每个唯一值的出现次数。

df.a.value_counts()
a
15    1
10    1
18    1
2     1
11    1
5     1
3     1
16    1
9     1
12    1
19    1
6     1
7     1
8     1
13    1
4     1
17    1
14    1
1     1
0     1
Name: count, dtype: int64
ddf.a.value_counts().head()
a
6     1
17    1
12    1
16    1
7     1
Name: count, dtype: int64

字符串方法#

与 pandas 类似,cuDF 在 Seriesstr 属性中提供了字符串处理方法。字符串方法的完整文档正在进行中。请参阅 cuDF API 文档了解更多信息。

s = cudf.Series(["A", "B", "C", "Aaba", "Baca", None, "CABA", "dog", "cat"])
s.str.lower()
0       a
1       b
2       c
3    aaba
4    baca
5    <NA>
6    caba
7     dog
8     cat
dtype: object
ds = dask_cudf.from_cudf(s, npartitions=2)
ds.str.lower().head(n=4)
0       a
1       b
2       c
3    aaba
dtype: object

除了简单的操作,我们还可以使用正则表达式匹配字符串。

s.str.match("^[aAc].+")
0    False
1    False
2    False
3     True
4    False
5     <NA>
6    False
7    False
8     True
dtype: bool
ds.str.match("^[aAc].+").head()
0    False
1    False
2    False
3     True
4    False
dtype: bool

Concat#

按行连接 SeriesDataFrames

s = cudf.Series([1, 2, 3, None, 5])
cudf.concat([s, s])
0       1
1       2
2       3
3    <NA>
4       5
0       1
1       2
2       3
3    <NA>
4       5
dtype: int64
ds2 = dask_cudf.from_cudf(s, npartitions=2)
dask_cudf.concat([ds2, ds2]).head(n=3)
0    1
1    2
2    3
dtype: int64

Join#

执行 SQL 风格的合并。请注意,dataframe 的顺序不会保留,但可以在合并后通过索引排序来恢复。

df_a = cudf.DataFrame()
df_a["key"] = ["a", "b", "c", "d", "e"]
df_a["vals_a"] = [float(i + 10) for i in range(5)]

df_b = cudf.DataFrame()
df_b["key"] = ["a", "c", "e"]
df_b["vals_b"] = [float(i + 100) for i in range(3)]

merged = df_a.merge(df_b, on=["key"], how="left")
merged
key vals_a vals_b
0 a 10.0 100.0
1 c 12.0 101.0
2 e 14.0 102.0
3 b 11.0 <NA>
4 d 13.0 <NA>
ddf_a = dask_cudf.from_cudf(df_a, npartitions=2)
ddf_b = dask_cudf.from_cudf(df_b, npartitions=2)

merged = ddf_a.merge(ddf_b, on=["key"], how="left").head(n=4)
merged
key vals_a vals_b
0 e 14.0 102.0
1 c 12.0 101.0
2 d 13.0 <NA>
3 b 11.0 <NA>

分组#

pandas 类似,cuDF 和 Dask-cuDF 支持 Split-Apply-Combine 分组范式

df["agg_col1"] = [1 if x % 2 == 0 else 0 for x in range(len(df))]
df["agg_col2"] = [1 if x % 3 == 0 else 0 for x in range(len(df))]

ddf = dask_cudf.from_cudf(df, npartitions=2)

分组然后将 sum 函数应用于分组数据。

df.groupby("agg_col1").sum()
a b c agg_col2
agg_col1
1 90 100 90 4
0 100 90 100 3
ddf.groupby("agg_col1").sum().compute()
a b c agg_col2
agg_col1
1 90 100 90 4
0 100 90 100 3

分层分组然后将 sum 函数应用于分组数据。

df.groupby(["agg_col1", "agg_col2"]).sum()
a b c
agg_col1 agg_col2
1 1 36 40 36
0 54 60 54
0 1 27 30 27
0 73 60 73
ddf.groupby(["agg_col1", "agg_col2"]).sum().compute()
a b c
agg_col1 agg_col2
1 1 36 40 36
0 0 73 60 73
1 27 30 27
1 0 54 60 54

使用 agg 对特定列进行分组并应用统计函数。

df.groupby("agg_col1").agg({"a": "max", "b": "mean", "c": "sum"})
a b c
agg_col1
1 18 10.0 90
0 19 9.0 100
ddf.groupby("agg_col1").agg({"a": "max", "b": "mean", "c": "sum"}).compute()
a b c
agg_col1
1 18 10.0 90
0 19 9.0 100

转置#

使用 transpose 方法或 T 属性转置 dataframe。目前,所有列必须具有相同的类型。转置功能目前尚未在 Dask cuDF 中实现。

sample = cudf.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
sample
a b
0 1 4
1 2 5
2 3 6
sample.transpose()
0 1 2
a 1 2 3
b 4 5 6

时间序列#

DataFrames 支持 datetime 类型的列,允许用户基于特定时间戳与数据交互和过滤数据。

import datetime as dt

date_df = cudf.DataFrame()
date_df["date"] = pd.date_range("11/20/2018", periods=72, freq="D")
date_df["value"] = cp.random.sample(len(date_df))

search_date = dt.datetime.strptime("2018-11-23", "%Y-%m-%d")
date_df.query("date <= @search_date")
date value
0 2018-11-20 0.986051
1 2018-11-21 0.232034
2 2018-11-22 0.397617
3 2018-11-23 0.103839
date_ddf = dask_cudf.from_cudf(date_df, npartitions=2)
date_ddf.query(
    "date <= @search_date", local_dict={"search_date": search_date}
).compute()
date value
0 2018-11-20 0.986051
1 2018-11-21 0.232034
2 2018-11-22 0.397617
3 2018-11-23 0.103839

分类数据#

DataFrames 支持分类列。

gdf = cudf.DataFrame(
    {"id": [1, 2, 3, 4, 5, 6], "grade": ["a", "b", "b", "a", "a", "e"]}
)
gdf["grade"] = gdf["grade"].astype("category")
gdf
id grade
0 1 a
1 2 b
2 3 b
3 4 a
4 5 a
5 6 e
dgdf = dask_cudf.from_cudf(gdf, npartitions=2)
dgdf.head(n=3)
id grade
0 1 a
1 2 b
2 3 b

访问列的类别。请注意,这目前在 Dask-cuDF 中不受支持。

gdf.grade.cat.categories
Index(['a', 'b', 'e'], dtype='object')

访问每个分类观测值的底层代码值。

gdf.grade.cat.codes
0    0
1    1
2    1
3    0
4    0
5    2
dtype: uint8
dgdf.grade.cat.codes.compute()
0    0
1    1
2    1
3    0
4    0
5    2
dtype: uint8

转换为 Pandas#

将 cuDF 和 Dask-cuDF DataFrame 转换为 pandas DataFrame

df.head().to_pandas()
a b c agg_col1 agg_col2
0 0 19 0 1 1
1 1 18 1 0 0
2 2 17 2 1 0
3 3 16 3 0 1
4 4 15 4 1 0

要将前几个条目转换为 pandas,我们类似地在 Dask-cuDF dataframe 上调用 .head() 以获得一个本地 cuDF dataframe,然后我们可以进行转换。

ddf.head().to_pandas()
a b c agg_col1 agg_col2
0 0 19 0 1 1
1 1 18 1 0 0
2 2 17 2 1 0
3 3 16 3 0 1
4 4 15 4 1 0

相比之下,如果我们要转换整个 frame,需要在 ddf 上调用 .compute() 以获取本地 cuDF dataframe,然后调用 to_pandas(),之后再进行后续处理。不建议采用此工作流程,因为它既会给单个 GPU 带来较高的内存压力(.compute() 调用),又没有利用 GPU 加速进行处理(计算发生在 pandas 中)。

ddf.compute().to_pandas().head()
a b c agg_col1 agg_col2
0 0 19 0 1 1
1 1 18 1 0 0
2 2 17 2 1 0
3 3 16 3 0 1
4 4 15 4 1 0

转换为 Numpy#

将 cuDF 或 Dask-cuDF DataFrame 转换为 numpy ndarray

df.to_numpy()
array([[ 0, 19,  0,  1,  1],
       [ 1, 18,  1,  0,  0],
       [ 2, 17,  2,  1,  0],
       [ 3, 16,  3,  0,  1],
       [ 4, 15,  4,  1,  0],
       [ 5, 14,  5,  0,  0],
       [ 6, 13,  6,  1,  1],
       [ 7, 12,  7,  0,  0],
       [ 8, 11,  8,  1,  0],
       [ 9, 10,  9,  0,  1],
       [10,  9, 10,  1,  0],
       [11,  8, 11,  0,  0],
       [12,  7, 12,  1,  1],
       [13,  6, 13,  0,  0],
       [14,  5, 14,  1,  0],
       [15,  4, 15,  0,  1],
       [16,  3, 16,  1,  0],
       [17,  2, 17,  0,  0],
       [18,  1, 18,  1,  1],
       [19,  0, 19,  0,  0]])
ddf.compute().to_numpy()
array([[ 0, 19,  0,  1,  1],
       [ 1, 18,  1,  0,  0],
       [ 2, 17,  2,  1,  0],
       [ 3, 16,  3,  0,  1],
       [ 4, 15,  4,  1,  0],
       [ 5, 14,  5,  0,  0],
       [ 6, 13,  6,  1,  1],
       [ 7, 12,  7,  0,  0],
       [ 8, 11,  8,  1,  0],
       [ 9, 10,  9,  0,  1],
       [10,  9, 10,  1,  0],
       [11,  8, 11,  0,  0],
       [12,  7, 12,  1,  1],
       [13,  6, 13,  0,  0],
       [14,  5, 14,  1,  0],
       [15,  4, 15,  0,  1],
       [16,  3, 16,  1,  0],
       [17,  2, 17,  0,  0],
       [18,  1, 18,  1,  1],
       [19,  0, 19,  0,  0]])

将 cuDF 或 Dask-cuDF Series 转换为 numpy ndarray

df["a"].to_numpy()
array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16,
       17, 18, 19])
ddf["a"].compute().to_numpy()
array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16,
       17, 18, 19])

转换为 Arrow#

将 cuDF 或 Dask-cuDF DataFrame 转换为 PyArrow Table

df.to_arrow()
pyarrow.Table
a: int64
b: int64
c: int64
agg_col1: int64
agg_col2: int64
----
a: [[0,1,2,3,4,...,15,16,17,18,19]]
b: [[19,18,17,16,15,...,4,3,2,1,0]]
c: [[0,1,2,3,4,...,15,16,17,18,19]]
agg_col1: [[1,0,1,0,1,...,0,1,0,1,0]]
agg_col2: [[1,0,0,1,0,...,1,0,0,1,0]]
ddf.head().to_arrow()
pyarrow.Table
a: int64
b: int64
c: int64
agg_col1: int64
agg_col2: int64
----
a: [[0,1,2,3,4]]
b: [[19,18,17,16,15]]
c: [[0,1,2,3,4]]
agg_col1: [[1,0,1,0,1]]
agg_col2: [[1,0,0,1,0]]

读/写 CSV 文件#

写入 CSV 文件。

if not os.path.exists("example_output"):
    os.mkdir("example_output")

df.to_csv("example_output/foo.csv", index=False)
ddf.compute().to_csv("example_output/foo_dask.csv", index=False)

读取 CSV 文件。

df = cudf.read_csv("example_output/foo.csv")
df
a b c agg_col1 agg_col2
0 0 19 0 1 1
1 1 18 1 0 0
2 2 17 2 1 0
3 3 16 3 0 1
4 4 15 4 1 0
5 5 14 5 0 0
6 6 13 6 1 1
7 7 12 7 0 0
8 8 11 8 1 0
9 9 10 9 0 1
10 10 9 10 1 0
11 11 8 11 0 0
12 12 7 12 1 1
13 13 6 13 0 0
14 14 5 14 1 0
15 15 4 15 0 1
16 16 3 16 1 0
17 17 2 17 0 0
18 18 1 18 1 1
19 19 0 19 0 0

请注意,对于 Dask-cuDF 的情况,我们优先使用 dask_cudf.read_csv 而不是 dask_cudf.from_cudf(cudf.read_csv),因为前者可以在多个 GPU 上并行化并处理单个 GPU 内存无法容纳的较大 CSV 文件。

ddf = dask_cudf.read_csv("example_output/foo_dask.csv")
ddf.head()
a b c agg_col1 agg_col2
0 0 19 0 1 1
1 1 18 1 0 0
2 2 17 2 1 0
3 3 16 3 0 1
4 4 15 4 1 0

使用星号通配符将目录中的所有 CSV 文件读取到一个 dask_cudf.DataFrame 中。

ddf = dask_cudf.read_csv("example_output/*.csv")
ddf.head()
a b c agg_col1 agg_col2
0 0 19 0 1 1
1 1 18 1 0 0
2 2 17 2 1 0
3 3 16 3 0 1
4 4 15 4 1 0

读/写 Parquet 文件#

使用 cuDF 的 GPU 加速 Parquet 写入器写入 Parquet 文件

df.to_parquet("example_output/temp_parquet")

使用 cuDF 的 GPU 加速 Parquet 读取器读取 Parquet 文件。

df = cudf.read_parquet("example_output/temp_parquet")
df
a b c agg_col1 agg_col2
0 0 19 0 1 1
1 1 18 1 0 0
2 2 17 2 1 0
3 3 16 3 0 1
4 4 15 4 1 0
5 5 14 5 0 0
6 6 13 6 1 1
7 7 12 7 0 0
8 8 11 8 1 0
9 9 10 9 0 1
10 10 9 10 1 0
11 11 8 11 0 0
12 12 7 12 1 1
13 13 6 13 0 0
14 14 5 14 1 0
15 15 4 15 0 1
16 16 3 16 1 0
17 17 2 17 0 0
18 18 1 18 1 1
19 19 0 19 0 0

dask_cudf.DataFrame 写入 Parquet 文件,底层使用 cuDF 的 Parquet 写入器。

ddf.to_parquet("example_output/ddf_parquet_files")

读/写 ORC 文件#

写入 ORC 文件。

df.to_orc("example_output/temp_orc")

以及读取

df2 = cudf.read_orc("example_output/temp_orc")
df2
a b c agg_col1 agg_col2
0 0 19 0 1 1
1 1 18 1 0 0
2 2 17 2 1 0
3 3 16 3 0 1
4 4 15 4 1 0
5 5 14 5 0 0
6 6 13 6 1 1
7 7 12 7 0 0
8 8 11 8 1 0
9 9 10 9 0 1
10 10 9 10 1 0
11 11 8 11 0 0
12 12 7 12 1 1
13 13 6 13 0 0
14 14 5 14 1 0
15 15 4 15 0 1
16 16 3 16 1 0
17 17 2 17 0 0
18 18 1 18 1 1
19 19 0 19 0 0

Dask 性能技巧#

与 Apache Spark 类似,Dask 操作是惰性的。大多数操作不会立即执行,而是被添加到任务图中,实际评估会延迟到需要结果时才进行。

然而,有时我们希望强制执行某些操作。在 Dask 集合上调用 persist 会完全计算它(或在后台主动计算),将结果持久化到内存中。在使用分布式系统时,我们可能希望等到 persist 完成后再开始任何下游操作。我们可以使用 wait 来强制执行此契约。使用 wait 包装操作将确保它不会开始执行,直到所有必需的上游操作都已完成。

以下代码片段提供了一些基本示例,使用 LocalCUDACluster 在本地机器上为每个 GPU 创建一个 dask-worker。有关 persistwait 的更多详细信息,请参阅 Dask 关于 persistwait 的文档。wait 依赖于 Futures 的概念,这超出了本教程的范围。有关 Futures 的更多信息,请参阅 Dask 的 Futures 文档。有关多 GPU 集群的更多信息,请参阅 dask-cuda 库(文档正在编写中)。

首先,我们设置一个 GPU 集群。设置好我们的 client 后,Dask-cuDF 计算将分布在集群中的 GPU 上。

import time

from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster

cluster = LocalCUDACluster()
client = Client(cluster)

数据持久化#

接下来,我们创建 Dask-cuDF DataFrame 并应用转换,将结果存储为新列。

nrows = 10000000

df2 = cudf.DataFrame({"a": cp.arange(nrows), "b": cp.arange(nrows)})
ddf2 = dask_cudf.from_cudf(df2, npartitions=16)
ddf2["c"] = ddf2["a"] + 5
ddf2
Dask DataFrame 结构
a b c
npartitions=16
0 int64 int64 int64
625000 ... ... ...
... ... ... ...
9375000 ... ... ...
9999999 ... ... ...
Dask 名称: assign, 4 表达式
!nvidia-smi
Mon Apr 14 06:00:29 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 565.57.01              Driver Version: 565.57.01      CUDA Version: 12.8     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|=========================================+========================+======================|
|   0  NVIDIA L4                      Off |   00000000:0A:00.0 Off |                    0 |
| N/A   40C    P0             28W /   72W |     919MiB /  23034MiB |      3%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                                                         
+-----------------------------------------------------------------------------------------+
| Processes:                                                                              |
|  GPU   GI   CI        PID   Type   Process name                              GPU Memory |
|        ID   ID                                                               Usage      |
|=========================================================================================|
|    0   N/A  N/A      1620      C   /opt/conda/envs/docs/bin/python3.12           188MiB |
|    0   N/A  N/A      1667      C   /opt/conda/envs/docs/bin/python               524MiB |
|    0   N/A  N/A      1721      C   /opt/conda/envs/docs/bin/python               188MiB |
+-----------------------------------------------------------------------------------------+

由于 Dask 是惰性的,计算尚未发生。我们可以看到任务图中有六十四个任务,每个 GPU 使用约 330 MB 的设备内存。我们可以使用 persist 强制进行计算。通过强制执行,结果现在明确地存储在内存中,并且我们的任务图每个分区只包含一个任务(基线)。

ddf2 = ddf2.persist()
ddf2
/opt/conda/envs/docs/lib/python3.12/site-packages/distributed/client.py:3370: UserWarning: Sending large graph of size 152.60 MiB.
This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org.cn/en/stable/best-practices.html#load-data-with-dask for more information.
  warnings.warn(
Dask DataFrame 结构
a b c
npartitions=16
0 int64 int64 int64
625000 ... ... ...
... ... ... ...
9375000 ... ... ...
9999999 ... ... ...
Dask 名称: getitem-add-assign, 1 表达式
# Sleep to ensure the persist finishes and shows in the memory usage
!sleep 5; nvidia-smi
Mon Apr 14 06:00:35 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 565.57.01              Driver Version: 565.57.01      CUDA Version: 12.8     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|=========================================+========================+======================|
|   0  NVIDIA L4                      Off |   00000000:0A:00.0 Off |                    0 |
| N/A   41C    P0             28W /   72W |    1592MiB /  23034MiB |      3%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                                                         
+-----------------------------------------------------------------------------------------+
| Processes:                                                                              |
|  GPU   GI   CI        PID   Type   Process name                              GPU Memory |
|        ID   ID                                                               Usage      |
|=========================================================================================|
|    0   N/A  N/A      1620      C   /opt/conda/envs/docs/bin/python3.12           188MiB |
|    0   N/A  N/A      1667      C   /opt/conda/envs/docs/bin/python               716MiB |
|    0   N/A  N/A      1721      C   /opt/conda/envs/docs/bin/python               188MiB |
|    0   N/A  N/A      1798      C   /opt/conda/envs/docs/bin/python               476MiB |
+-----------------------------------------------------------------------------------------+

因为我们强制进行了计算,所以现在在分布式 GPU 内存中有一个更大的对象。请注意,实际数字因系统而异(例如,取决于有多少设备可用)。

Wait#

根据我们的工作流程或分布式计算设置,我们可能希望在执行特定函数之前 wait 直到所有上游任务完成。本节展示了这种行为的一个示例,改编自 Dask 文档。

首先,我们创建一个新的 Dask DataFrame 并定义一个函数,我们将把这个函数映射到 dataframe 的每个分区。

import random

nrows = 10000000

df1 = cudf.DataFrame({"a": cp.arange(nrows), "b": cp.arange(nrows)})
ddf1 = dask_cudf.from_cudf(df1, npartitions=100)


def func(df):
    time.sleep(random.randint(1, 10))
    return (df + 5) * 3 - 11

此函数将对 dataframe 中的每一列进行基本转换,但由于 time.sleep 语句随机增加 1-10 秒的时间,函数执行时间会有所不同。我们将使用 map_partitions 在 dataframe 的每个分区上运行此函数,这将任务添加到我们的任务图中并存储结果。然后我们可以调用 persist 来强制执行。

results_ddf = ddf2.map_partitions(func)
results_ddf = results_ddf.persist()

但是,有些分区会比其他分区完成得快得多。如果我们的下游进程需要等待所有分区完成,我们可以使用 wait 来强制执行此行为。

wait(results_ddf)
DoneAndNotDoneFutures(done={<Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 11)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 15)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 10)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 4)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 5)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 9)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 0)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 1)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 8)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 13)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 14)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 12)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 3)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 7)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 2)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 6)>}, not_done=set())

随着 wait 完成,我们可以安全地继续我们的工作流程。