cuDF 和 Dask cuDF 十分钟入门#
本文以《Pandas 十分钟入门》为蓝本,旨在为新用户提供一份 cuDF 和 Dask cuDF 的简短介绍。
这些库是什么?#
cuDF 是一个 Python GPU DataFrame 库(基于 Apache Arrow 列式内存格式),用于使用 DataFrame 风格的 API(类似于 pandas)加载、连接、聚合、过滤和操作表格数据。
Dask 是一个灵活的 Python 并行计算库,可使您的工作流程平滑简单地进行扩展。在 CPU 上,Dask 使用 Pandas 在 DataFrame 分区上并行执行操作。
Dask cuDF 在必要时扩展了 Dask,以允许使用 cuDF GPU DataFrame 而非 Pandas DataFrame 处理其 DataFrame 分区。例如,当您调用 dask_cudf.read_csv(...)
时,您的集群的 GPU 会通过调用 cudf.read_csv()
来完成解析 CSV 文件的工作。
何时使用 cuDF 和 Dask cuDF#
如果您的工作流程在单个 GPU 上足够快,或者您的数据能轻松放入单个 GPU 的内存中,您会希望使用 cuDF。如果您想将工作流程分布到多个 GPU 上,数据量超出单个 GPU 内存容量,或者想同时分析分散在许多文件中的数据,您会希望使用 Dask cuDF。
import os
import cudf
import cupy as cp
import dask_cudf
import pandas as pd
cp.random.seed(12)
#### Portions of this were borrowed and adapted from the
#### cuDF cheatsheet, existing cuDF documentation,
#### and 10 Minutes to Pandas.
对象创建#
创建 cudf.Series
和 dask_cudf.Series
。
s = cudf.Series([1, 2, 3, None, 4])
s
0 1
1 2
2 3
3 <NA>
4 4
dtype: int64
ds = dask_cudf.from_cudf(s, npartitions=2)
# Note the call to head here to show the first few entries, unlike
# cuDF objects, Dask-cuDF objects do not have a printing
# representation that shows values since they may not be in local
# memory.
ds.head(n=3)
0 1
1 2
2 3
dtype: int64
通过为每列指定值来创建 cudf.DataFrame
和 dask_cudf.DataFrame
。
df = cudf.DataFrame(
{
"a": list(range(20)),
"b": list(reversed(range(20))),
"c": list(range(20)),
}
)
df
a | b | c | |
---|---|---|---|
0 | 0 | 19 | 0 |
1 | 1 | 18 | 1 |
2 | 2 | 17 | 2 |
3 | 3 | 16 | 3 |
4 | 4 | 15 | 4 |
5 | 5 | 14 | 5 |
6 | 6 | 13 | 6 |
7 | 7 | 12 | 7 |
8 | 8 | 11 | 8 |
9 | 9 | 10 | 9 |
10 | 10 | 9 | 10 |
11 | 11 | 8 | 11 |
12 | 12 | 7 | 12 |
13 | 13 | 6 | 13 |
14 | 14 | 5 | 14 |
15 | 15 | 4 | 15 |
16 | 16 | 3 | 16 |
17 | 17 | 2 | 17 |
18 | 18 | 1 | 18 |
19 | 19 | 0 | 19 |
现在我们将 cuDF dataframe 转换为等效的 Dask-cuDF。这里我们指出一个关键区别:要查看数据,我们必须调用一个方法(这里是 .head()
来查看前几个值)。在一般情况下(参见本 notebook 结尾),ddf
中的数据将分布在多个 GPU 上。
在这种小案例中,我们可以调用 ddf.compute()
从 Dask-cuDF 对象获取一个 cuDF 对象。通常,我们应避免在大 dataframe 上调用 .compute()
,仅在我们有相对较小的后处理结果需要查看时使用它。因此,在本 notebook 中,我们通常会调用 .head()
来查看 Dask-cuDF dataframe 的前几个值,偶尔会指出我们使用 .compute()
的地方及其原因。
要了解 cuDF 和 Dask cuDF 在这里的行为差异,请在本教程之后访问《Dask 十分钟入门》教程。
ddf = dask_cudf.from_cudf(df, npartitions=2)
ddf.head()
a | b | c | |
---|---|---|---|
0 | 0 | 19 | 0 |
1 | 1 | 18 | 1 |
2 | 2 | 17 | 2 |
3 | 3 | 16 | 3 |
4 | 4 | 15 | 4 |
从 pandas Dataframe
创建 cudf.DataFrame
,以及从 cudf.Dataframe
创建 dask_cudf.Dataframe
。
请注意,使用 dask-cuDF 的最佳实践是使用 read_csv
或其他内置 I/O 例程(下文讨论)将数据直接读取到 dask_cudf.DataFrame
中。
pdf = pd.DataFrame({"a": [0, 1, 2, 3], "b": [0.1, 0.2, None, 0.3]})
gdf = cudf.DataFrame.from_pandas(pdf)
gdf
a | b | |
---|---|---|
0 | 0 | 0.1 |
1 | 1 | 0.2 |
2 | 2 | <NA> |
3 | 3 | 0.3 |
dask_gdf = dask_cudf.from_cudf(gdf, npartitions=2)
dask_gdf.head(n=2)
a | b | |
---|---|---|
0 | 0 | 0.1 |
1 | 1 | 0.2 |
查看数据#
查看 GPU dataframe 的顶部行。
df.head(2)
a | b | c | |
---|---|---|---|
0 | 0 | 19 | 0 |
1 | 1 | 18 | 1 |
ddf.head(2)
a | b | c | |
---|---|---|---|
0 | 0 | 19 | 0 |
1 | 1 | 18 | 1 |
按值排序。
df.sort_values(by="b")
a | b | c | |
---|---|---|---|
19 | 19 | 0 | 19 |
18 | 18 | 1 | 18 |
17 | 17 | 2 | 17 |
16 | 16 | 3 | 16 |
15 | 15 | 4 | 15 |
14 | 14 | 5 | 14 |
13 | 13 | 6 | 13 |
12 | 12 | 7 | 12 |
11 | 11 | 8 | 11 |
10 | 10 | 9 | 10 |
9 | 9 | 10 | 9 |
8 | 8 | 11 | 8 |
7 | 7 | 12 | 7 |
6 | 6 | 13 | 6 |
5 | 5 | 14 | 5 |
4 | 4 | 15 | 4 |
3 | 3 | 16 | 3 |
2 | 2 | 17 | 2 |
1 | 1 | 18 | 1 |
0 | 0 | 19 | 0 |
ddf.sort_values(by="b").head()
a | b | c | |
---|---|---|---|
19 | 19 | 0 | 19 |
18 | 18 | 1 | 18 |
17 | 17 | 2 | 17 |
16 | 16 | 3 | 16 |
15 | 15 | 4 | 15 |
选择列#
选择单列,这将最初产生一个 cudf.Series
或 dask_cudf.Series
。调用 compute
会得到一个 cudf.Series
(相当于 df.a
)。
df["a"]
0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
10 10
11 11
12 12
13 13
14 14
15 15
16 16
17 17
18 18
19 19
Name: a, dtype: int64
ddf["a"].head()
0 0
1 1
2 2
3 3
4 4
Name: a, dtype: int64
按标签选择行#
选择索引 2 到索引 5 的行,从列 'a' 和 'b' 中。
df.loc[2:5, ["a", "b"]]
a | b | |
---|---|---|
2 | 2 | 17 |
3 | 3 | 16 |
4 | 4 | 15 |
5 | 5 | 14 |
ddf.loc[2:5, ["a", "b"]].head()
a | b | |
---|---|---|
2 | 2 | 17 |
3 | 3 | 16 |
4 | 4 | 15 |
5 | 5 | 14 |
按位置选择行#
通过整数和整数切片进行选择,类似于 numpy/pandas。请注意,此功能不适用于 Dask-cuDF DataFrame。
df.iloc[0]
a 0
b 19
c 0
Name: 0, dtype: int64
df.iloc[0:3, 0:2]
a | b | |
---|---|---|
0 | 0 | 19 |
1 | 1 | 18 |
2 | 2 | 17 |
您也可以通过直接索引访问来选择 DataFrame
或 Series
的元素。
df[3:5]
a | b | c | |
---|---|---|---|
3 | 3 | 16 | 3 |
4 | 4 | 15 | 4 |
s[3:5]
3 <NA>
4 4
dtype: int64
布尔索引#
通过直接布尔索引来选择 DataFrame
或 Series
中的行。
df[df.b > 15]
a | b | c | |
---|---|---|---|
0 | 0 | 19 | 0 |
1 | 1 | 18 | 1 |
2 | 2 | 17 | 2 |
3 | 3 | 16 | 3 |
ddf[ddf.b > 15].head(n=3)
a | b | c | |
---|---|---|---|
0 | 0 | 19 | 0 |
1 | 1 | 18 | 1 |
2 | 2 | 17 | 2 |
通过 query
API 选择满足布尔条件的 DataFrame
中的值。
df.query("b == 3")
a | b | c | |
---|---|---|---|
16 | 16 | 3 | 16 |
注意,这里我们在 Dask-cuDF dataframe 上调用 compute()
而不是 head()
,因为我们确信匹配的行数会很少(因此将整个结果取回是完全合理的)。
ddf.query("b == 3").compute()
a | b | c | |
---|---|---|---|
16 | 16 | 3 | 16 |
您还可以通过 local_dict
关键字将局部变量传递给 Dask-cuDF 查询。对于标准的 cuDF,您可以使用 local_dict
关键字或通过 @
关键字直接传递变量。支持的逻辑运算符包括 >
, <
, >=
, <=
, ==
和 !=
。
cudf_comparator = 3
df.query("b == @cudf_comparator")
a | b | c | |
---|---|---|---|
16 | 16 | 3 | 16 |
dask_cudf_comparator = 3
ddf.query("b == @val", local_dict={"val": dask_cudf_comparator}).compute()
a | b | c | |
---|---|---|---|
16 | 16 | 3 | 16 |
使用 isin
方法进行过滤。
df[df.a.isin([0, 5])]
a | b | c | |
---|---|---|---|
0 | 0 | 19 | 0 |
5 | 5 | 14 | 5 |
MultiIndex#
cuDF 支持使用 MultiIndex 对 DataFrame 进行分层索引。按层级分组(参见下面的 Grouping
)会自动生成带有 MultiIndex 的 DataFrame。
arrays = [["a", "a", "b", "b"], [1, 2, 3, 4]]
tuples = list(zip(*arrays))
idx = cudf.MultiIndex.from_tuples(tuples)
idx
MultiIndex([('a', 1),
('a', 2),
('b', 3),
('b', 4)],
)
此索引可支持 DataFrame 的任一轴。
gdf1 = cudf.DataFrame(
{"first": cp.random.rand(4), "second": cp.random.rand(4)}
)
gdf1.index = idx
gdf1
first | second | ||
---|---|---|---|
a | 1 | 0.082654 | 0.967955 |
2 | 0.399417 | 0.441425 | |
b | 3 | 0.784297 | 0.793582 |
4 | 0.070303 | 0.271711 |
gdf2 = cudf.DataFrame(
{"first": cp.random.rand(4), "second": cp.random.rand(4)}
).T
gdf2.columns = idx
gdf2
a | b | |||
---|---|---|---|---|
1 | 2 | 3 | 4 | |
first | 0.343382 | 0.003700 | 0.20043 | 0.581614 |
second | 0.907812 | 0.101512 | 0.24179 | 0.224180 |
使用 MultiIndex 访问 DataFrame 的值,使用 .loc
gdf1.loc[("b", 3)]
first 0.784297
second 0.793582
Name: ('b', 3), dtype: float64
以及 .iloc
gdf1.iloc[0:2]
first | second | ||
---|---|---|---|
a | 1 | 0.082654 | 0.967955 |
2 | 0.399417 | 0.441425 |
缺失数据#
可以使用 fillna
方法替换缺失数据。
s.fillna(999)
0 1
1 2
2 3
3 999
4 4
dtype: int64
ds.fillna(999).head(n=3)
0 1
1 2
2 3
dtype: int64
统计#
计算 Series
的描述性统计信息。
s.mean(), s.var()
(np.float64(2.5), np.float64(1.6666666666666667))
这是一个典型示例,说明何时我们可能想调用 .compute()
。计算均值和方差的结果在每种情况下都是一个单独的数字,因此查看整个结果是完全合理的!
ds.mean().compute(), ds.var().compute()
(np.float64(2.5), np.float64(1.6666666666666667))
Applymap#
将函数应用于 Series
。请注意,使用 Dask cuDF 直接应用用户定义函数的功能尚未实现。目前,您可以使用 map_partitions 将函数应用于分布式 dataframe 的每个分区。
def add_ten(num):
return num + 10
df["a"].apply(add_ten)
0 10
1 11
2 12
3 13
4 14
5 15
6 16
7 17
8 18
9 19
10 20
11 21
12 22
13 23
14 24
15 25
16 26
17 27
18 28
19 29
Name: a, dtype: int64
ddf["a"].map_partitions(add_ten).head()
0 10
1 11
2 12
3 13
4 14
Name: a, dtype: int64
直方图绘制#
计算变量每个唯一值的出现次数。
df.a.value_counts()
a
15 1
10 1
18 1
2 1
11 1
5 1
3 1
16 1
9 1
12 1
19 1
6 1
7 1
8 1
13 1
4 1
17 1
14 1
1 1
0 1
Name: count, dtype: int64
ddf.a.value_counts().head()
a
6 1
17 1
12 1
16 1
7 1
Name: count, dtype: int64
字符串方法#
与 pandas 类似,cuDF 在 Series
的 str
属性中提供了字符串处理方法。字符串方法的完整文档正在进行中。请参阅 cuDF API 文档了解更多信息。
s = cudf.Series(["A", "B", "C", "Aaba", "Baca", None, "CABA", "dog", "cat"])
s.str.lower()
0 a
1 b
2 c
3 aaba
4 baca
5 <NA>
6 caba
7 dog
8 cat
dtype: object
ds = dask_cudf.from_cudf(s, npartitions=2)
ds.str.lower().head(n=4)
0 a
1 b
2 c
3 aaba
dtype: object
除了简单的操作,我们还可以使用正则表达式匹配字符串。
s.str.match("^[aAc].+")
0 False
1 False
2 False
3 True
4 False
5 <NA>
6 False
7 False
8 True
dtype: bool
ds.str.match("^[aAc].+").head()
0 False
1 False
2 False
3 True
4 False
dtype: bool
Concat#
按行连接 Series
和 DataFrames
。
s = cudf.Series([1, 2, 3, None, 5])
cudf.concat([s, s])
0 1
1 2
2 3
3 <NA>
4 5
0 1
1 2
2 3
3 <NA>
4 5
dtype: int64
ds2 = dask_cudf.from_cudf(s, npartitions=2)
dask_cudf.concat([ds2, ds2]).head(n=3)
0 1
1 2
2 3
dtype: int64
Join#
执行 SQL 风格的合并。请注意,dataframe 的顺序不会保留,但可以在合并后通过索引排序来恢复。
df_a = cudf.DataFrame()
df_a["key"] = ["a", "b", "c", "d", "e"]
df_a["vals_a"] = [float(i + 10) for i in range(5)]
df_b = cudf.DataFrame()
df_b["key"] = ["a", "c", "e"]
df_b["vals_b"] = [float(i + 100) for i in range(3)]
merged = df_a.merge(df_b, on=["key"], how="left")
merged
key | vals_a | vals_b | |
---|---|---|---|
0 | a | 10.0 | 100.0 |
1 | c | 12.0 | 101.0 |
2 | e | 14.0 | 102.0 |
3 | b | 11.0 | <NA> |
4 | d | 13.0 | <NA> |
ddf_a = dask_cudf.from_cudf(df_a, npartitions=2)
ddf_b = dask_cudf.from_cudf(df_b, npartitions=2)
merged = ddf_a.merge(ddf_b, on=["key"], how="left").head(n=4)
merged
key | vals_a | vals_b | |
---|---|---|---|
0 | e | 14.0 | 102.0 |
1 | c | 12.0 | 101.0 |
2 | d | 13.0 | <NA> |
3 | b | 11.0 | <NA> |
分组#
与 pandas 类似,cuDF 和 Dask-cuDF 支持 Split-Apply-Combine 分组范式。
df["agg_col1"] = [1 if x % 2 == 0 else 0 for x in range(len(df))]
df["agg_col2"] = [1 if x % 3 == 0 else 0 for x in range(len(df))]
ddf = dask_cudf.from_cudf(df, npartitions=2)
分组然后将 sum
函数应用于分组数据。
df.groupby("agg_col1").sum()
a | b | c | agg_col2 | |
---|---|---|---|---|
agg_col1 | ||||
1 | 90 | 100 | 90 | 4 |
0 | 100 | 90 | 100 | 3 |
ddf.groupby("agg_col1").sum().compute()
a | b | c | agg_col2 | |
---|---|---|---|---|
agg_col1 | ||||
1 | 90 | 100 | 90 | 4 |
0 | 100 | 90 | 100 | 3 |
分层分组然后将 sum
函数应用于分组数据。
df.groupby(["agg_col1", "agg_col2"]).sum()
a | b | c | ||
---|---|---|---|---|
agg_col1 | agg_col2 | |||
1 | 1 | 36 | 40 | 36 |
0 | 54 | 60 | 54 | |
0 | 1 | 27 | 30 | 27 |
0 | 73 | 60 | 73 |
ddf.groupby(["agg_col1", "agg_col2"]).sum().compute()
a | b | c | ||
---|---|---|---|---|
agg_col1 | agg_col2 | |||
1 | 1 | 36 | 40 | 36 |
0 | 0 | 73 | 60 | 73 |
1 | 27 | 30 | 27 | |
1 | 0 | 54 | 60 | 54 |
使用 agg
对特定列进行分组并应用统计函数。
df.groupby("agg_col1").agg({"a": "max", "b": "mean", "c": "sum"})
a | b | c | |
---|---|---|---|
agg_col1 | |||
1 | 18 | 10.0 | 90 |
0 | 19 | 9.0 | 100 |
ddf.groupby("agg_col1").agg({"a": "max", "b": "mean", "c": "sum"}).compute()
a | b | c | |
---|---|---|---|
agg_col1 | |||
1 | 18 | 10.0 | 90 |
0 | 19 | 9.0 | 100 |
转置#
使用 transpose
方法或 T
属性转置 dataframe。目前,所有列必须具有相同的类型。转置功能目前尚未在 Dask cuDF 中实现。
sample = cudf.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
sample
a | b | |
---|---|---|
0 | 1 | 4 |
1 | 2 | 5 |
2 | 3 | 6 |
sample.transpose()
0 | 1 | 2 | |
---|---|---|---|
a | 1 | 2 | 3 |
b | 4 | 5 | 6 |
时间序列#
DataFrames
支持 datetime
类型的列,允许用户基于特定时间戳与数据交互和过滤数据。
import datetime as dt
date_df = cudf.DataFrame()
date_df["date"] = pd.date_range("11/20/2018", periods=72, freq="D")
date_df["value"] = cp.random.sample(len(date_df))
search_date = dt.datetime.strptime("2018-11-23", "%Y-%m-%d")
date_df.query("date <= @search_date")
date | value | |
---|---|---|
0 | 2018-11-20 | 0.986051 |
1 | 2018-11-21 | 0.232034 |
2 | 2018-11-22 | 0.397617 |
3 | 2018-11-23 | 0.103839 |
date_ddf = dask_cudf.from_cudf(date_df, npartitions=2)
date_ddf.query(
"date <= @search_date", local_dict={"search_date": search_date}
).compute()
date | value | |
---|---|---|
0 | 2018-11-20 | 0.986051 |
1 | 2018-11-21 | 0.232034 |
2 | 2018-11-22 | 0.397617 |
3 | 2018-11-23 | 0.103839 |
分类数据#
DataFrames
支持分类列。
gdf = cudf.DataFrame(
{"id": [1, 2, 3, 4, 5, 6], "grade": ["a", "b", "b", "a", "a", "e"]}
)
gdf["grade"] = gdf["grade"].astype("category")
gdf
id | grade | |
---|---|---|
0 | 1 | a |
1 | 2 | b |
2 | 3 | b |
3 | 4 | a |
4 | 5 | a |
5 | 6 | e |
dgdf = dask_cudf.from_cudf(gdf, npartitions=2)
dgdf.head(n=3)
id | grade | |
---|---|---|
0 | 1 | a |
1 | 2 | b |
2 | 3 | b |
访问列的类别。请注意,这目前在 Dask-cuDF 中不受支持。
gdf.grade.cat.categories
Index(['a', 'b', 'e'], dtype='object')
访问每个分类观测值的底层代码值。
gdf.grade.cat.codes
0 0
1 1
2 1
3 0
4 0
5 2
dtype: uint8
dgdf.grade.cat.codes.compute()
0 0
1 1
2 1
3 0
4 0
5 2
dtype: uint8
转换为 Pandas#
将 cuDF 和 Dask-cuDF DataFrame
转换为 pandas DataFrame
。
df.head().to_pandas()
a | b | c | agg_col1 | agg_col2 | |
---|---|---|---|---|---|
0 | 0 | 19 | 0 | 1 | 1 |
1 | 1 | 18 | 1 | 0 | 0 |
2 | 2 | 17 | 2 | 1 | 0 |
3 | 3 | 16 | 3 | 0 | 1 |
4 | 4 | 15 | 4 | 1 | 0 |
要将前几个条目转换为 pandas,我们类似地在 Dask-cuDF dataframe 上调用 .head()
以获得一个本地 cuDF dataframe,然后我们可以进行转换。
ddf.head().to_pandas()
a | b | c | agg_col1 | agg_col2 | |
---|---|---|---|---|---|
0 | 0 | 19 | 0 | 1 | 1 |
1 | 1 | 18 | 1 | 0 | 0 |
2 | 2 | 17 | 2 | 1 | 0 |
3 | 3 | 16 | 3 | 0 | 1 |
4 | 4 | 15 | 4 | 1 | 0 |
相比之下,如果我们要转换整个 frame,需要在 ddf
上调用 .compute()
以获取本地 cuDF dataframe,然后调用 to_pandas()
,之后再进行后续处理。不建议采用此工作流程,因为它既会给单个 GPU 带来较高的内存压力(.compute()
调用),又没有利用 GPU 加速进行处理(计算发生在 pandas 中)。
ddf.compute().to_pandas().head()
a | b | c | agg_col1 | agg_col2 | |
---|---|---|---|---|---|
0 | 0 | 19 | 0 | 1 | 1 |
1 | 1 | 18 | 1 | 0 | 0 |
2 | 2 | 17 | 2 | 1 | 0 |
3 | 3 | 16 | 3 | 0 | 1 |
4 | 4 | 15 | 4 | 1 | 0 |
转换为 Numpy#
将 cuDF 或 Dask-cuDF DataFrame
转换为 numpy ndarray
。
df.to_numpy()
array([[ 0, 19, 0, 1, 1],
[ 1, 18, 1, 0, 0],
[ 2, 17, 2, 1, 0],
[ 3, 16, 3, 0, 1],
[ 4, 15, 4, 1, 0],
[ 5, 14, 5, 0, 0],
[ 6, 13, 6, 1, 1],
[ 7, 12, 7, 0, 0],
[ 8, 11, 8, 1, 0],
[ 9, 10, 9, 0, 1],
[10, 9, 10, 1, 0],
[11, 8, 11, 0, 0],
[12, 7, 12, 1, 1],
[13, 6, 13, 0, 0],
[14, 5, 14, 1, 0],
[15, 4, 15, 0, 1],
[16, 3, 16, 1, 0],
[17, 2, 17, 0, 0],
[18, 1, 18, 1, 1],
[19, 0, 19, 0, 0]])
ddf.compute().to_numpy()
array([[ 0, 19, 0, 1, 1],
[ 1, 18, 1, 0, 0],
[ 2, 17, 2, 1, 0],
[ 3, 16, 3, 0, 1],
[ 4, 15, 4, 1, 0],
[ 5, 14, 5, 0, 0],
[ 6, 13, 6, 1, 1],
[ 7, 12, 7, 0, 0],
[ 8, 11, 8, 1, 0],
[ 9, 10, 9, 0, 1],
[10, 9, 10, 1, 0],
[11, 8, 11, 0, 0],
[12, 7, 12, 1, 1],
[13, 6, 13, 0, 0],
[14, 5, 14, 1, 0],
[15, 4, 15, 0, 1],
[16, 3, 16, 1, 0],
[17, 2, 17, 0, 0],
[18, 1, 18, 1, 1],
[19, 0, 19, 0, 0]])
将 cuDF 或 Dask-cuDF Series
转换为 numpy ndarray
。
df["a"].to_numpy()
array([ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
17, 18, 19])
ddf["a"].compute().to_numpy()
array([ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
17, 18, 19])
转换为 Arrow#
将 cuDF 或 Dask-cuDF DataFrame
转换为 PyArrow Table
。
df.to_arrow()
pyarrow.Table
a: int64
b: int64
c: int64
agg_col1: int64
agg_col2: int64
----
a: [[0,1,2,3,4,...,15,16,17,18,19]]
b: [[19,18,17,16,15,...,4,3,2,1,0]]
c: [[0,1,2,3,4,...,15,16,17,18,19]]
agg_col1: [[1,0,1,0,1,...,0,1,0,1,0]]
agg_col2: [[1,0,0,1,0,...,1,0,0,1,0]]
ddf.head().to_arrow()
pyarrow.Table
a: int64
b: int64
c: int64
agg_col1: int64
agg_col2: int64
----
a: [[0,1,2,3,4]]
b: [[19,18,17,16,15]]
c: [[0,1,2,3,4]]
agg_col1: [[1,0,1,0,1]]
agg_col2: [[1,0,0,1,0]]
读/写 CSV 文件#
写入 CSV 文件。
if not os.path.exists("example_output"):
os.mkdir("example_output")
df.to_csv("example_output/foo.csv", index=False)
ddf.compute().to_csv("example_output/foo_dask.csv", index=False)
读取 CSV 文件。
df = cudf.read_csv("example_output/foo.csv")
df
a | b | c | agg_col1 | agg_col2 | |
---|---|---|---|---|---|
0 | 0 | 19 | 0 | 1 | 1 |
1 | 1 | 18 | 1 | 0 | 0 |
2 | 2 | 17 | 2 | 1 | 0 |
3 | 3 | 16 | 3 | 0 | 1 |
4 | 4 | 15 | 4 | 1 | 0 |
5 | 5 | 14 | 5 | 0 | 0 |
6 | 6 | 13 | 6 | 1 | 1 |
7 | 7 | 12 | 7 | 0 | 0 |
8 | 8 | 11 | 8 | 1 | 0 |
9 | 9 | 10 | 9 | 0 | 1 |
10 | 10 | 9 | 10 | 1 | 0 |
11 | 11 | 8 | 11 | 0 | 0 |
12 | 12 | 7 | 12 | 1 | 1 |
13 | 13 | 6 | 13 | 0 | 0 |
14 | 14 | 5 | 14 | 1 | 0 |
15 | 15 | 4 | 15 | 0 | 1 |
16 | 16 | 3 | 16 | 1 | 0 |
17 | 17 | 2 | 17 | 0 | 0 |
18 | 18 | 1 | 18 | 1 | 1 |
19 | 19 | 0 | 19 | 0 | 0 |
请注意,对于 Dask-cuDF 的情况,我们优先使用 dask_cudf.read_csv
而不是 dask_cudf.from_cudf(cudf.read_csv)
,因为前者可以在多个 GPU 上并行化并处理单个 GPU 内存无法容纳的较大 CSV 文件。
ddf = dask_cudf.read_csv("example_output/foo_dask.csv")
ddf.head()
a | b | c | agg_col1 | agg_col2 | |
---|---|---|---|---|---|
0 | 0 | 19 | 0 | 1 | 1 |
1 | 1 | 18 | 1 | 0 | 0 |
2 | 2 | 17 | 2 | 1 | 0 |
3 | 3 | 16 | 3 | 0 | 1 |
4 | 4 | 15 | 4 | 1 | 0 |
使用星号通配符将目录中的所有 CSV 文件读取到一个 dask_cudf.DataFrame
中。
ddf = dask_cudf.read_csv("example_output/*.csv")
ddf.head()
a | b | c | agg_col1 | agg_col2 | |
---|---|---|---|---|---|
0 | 0 | 19 | 0 | 1 | 1 |
1 | 1 | 18 | 1 | 0 | 0 |
2 | 2 | 17 | 2 | 1 | 0 |
3 | 3 | 16 | 3 | 0 | 1 |
4 | 4 | 15 | 4 | 1 | 0 |
读/写 Parquet 文件#
使用 cuDF 的 GPU 加速 Parquet 写入器写入 Parquet 文件
df.to_parquet("example_output/temp_parquet")
使用 cuDF 的 GPU 加速 Parquet 读取器读取 Parquet 文件。
df = cudf.read_parquet("example_output/temp_parquet")
df
a | b | c | agg_col1 | agg_col2 | |
---|---|---|---|---|---|
0 | 0 | 19 | 0 | 1 | 1 |
1 | 1 | 18 | 1 | 0 | 0 |
2 | 2 | 17 | 2 | 1 | 0 |
3 | 3 | 16 | 3 | 0 | 1 |
4 | 4 | 15 | 4 | 1 | 0 |
5 | 5 | 14 | 5 | 0 | 0 |
6 | 6 | 13 | 6 | 1 | 1 |
7 | 7 | 12 | 7 | 0 | 0 |
8 | 8 | 11 | 8 | 1 | 0 |
9 | 9 | 10 | 9 | 0 | 1 |
10 | 10 | 9 | 10 | 1 | 0 |
11 | 11 | 8 | 11 | 0 | 0 |
12 | 12 | 7 | 12 | 1 | 1 |
13 | 13 | 6 | 13 | 0 | 0 |
14 | 14 | 5 | 14 | 1 | 0 |
15 | 15 | 4 | 15 | 0 | 1 |
16 | 16 | 3 | 16 | 1 | 0 |
17 | 17 | 2 | 17 | 0 | 0 |
18 | 18 | 1 | 18 | 1 | 1 |
19 | 19 | 0 | 19 | 0 | 0 |
从 dask_cudf.DataFrame
写入 Parquet 文件,底层使用 cuDF 的 Parquet 写入器。
ddf.to_parquet("example_output/ddf_parquet_files")
读/写 ORC 文件#
写入 ORC 文件。
df.to_orc("example_output/temp_orc")
以及读取
df2 = cudf.read_orc("example_output/temp_orc")
df2
a | b | c | agg_col1 | agg_col2 | |
---|---|---|---|---|---|
0 | 0 | 19 | 0 | 1 | 1 |
1 | 1 | 18 | 1 | 0 | 0 |
2 | 2 | 17 | 2 | 1 | 0 |
3 | 3 | 16 | 3 | 0 | 1 |
4 | 4 | 15 | 4 | 1 | 0 |
5 | 5 | 14 | 5 | 0 | 0 |
6 | 6 | 13 | 6 | 1 | 1 |
7 | 7 | 12 | 7 | 0 | 0 |
8 | 8 | 11 | 8 | 1 | 0 |
9 | 9 | 10 | 9 | 0 | 1 |
10 | 10 | 9 | 10 | 1 | 0 |
11 | 11 | 8 | 11 | 0 | 0 |
12 | 12 | 7 | 12 | 1 | 1 |
13 | 13 | 6 | 13 | 0 | 0 |
14 | 14 | 5 | 14 | 1 | 0 |
15 | 15 | 4 | 15 | 0 | 1 |
16 | 16 | 3 | 16 | 1 | 0 |
17 | 17 | 2 | 17 | 0 | 0 |
18 | 18 | 1 | 18 | 1 | 1 |
19 | 19 | 0 | 19 | 0 | 0 |
Dask 性能技巧#
与 Apache Spark 类似,Dask 操作是惰性的。大多数操作不会立即执行,而是被添加到任务图中,实际评估会延迟到需要结果时才进行。
然而,有时我们希望强制执行某些操作。在 Dask 集合上调用 persist
会完全计算它(或在后台主动计算),将结果持久化到内存中。在使用分布式系统时,我们可能希望等到 persist
完成后再开始任何下游操作。我们可以使用 wait
来强制执行此契约。使用 wait
包装操作将确保它不会开始执行,直到所有必需的上游操作都已完成。
以下代码片段提供了一些基本示例,使用 LocalCUDACluster
在本地机器上为每个 GPU 创建一个 dask-worker。有关 persist
和 wait
的更多详细信息,请参阅 Dask 关于 persist 和 wait 的文档。wait 依赖于 Futures 的概念,这超出了本教程的范围。有关 Futures 的更多信息,请参阅 Dask 的 Futures 文档。有关多 GPU 集群的更多信息,请参阅 dask-cuda 库(文档正在编写中)。
首先,我们设置一个 GPU 集群。设置好我们的 client
后,Dask-cuDF 计算将分布在集群中的 GPU 上。
import time
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
cluster = LocalCUDACluster()
client = Client(cluster)
数据持久化#
接下来,我们创建 Dask-cuDF DataFrame 并应用转换,将结果存储为新列。
nrows = 10000000
df2 = cudf.DataFrame({"a": cp.arange(nrows), "b": cp.arange(nrows)})
ddf2 = dask_cudf.from_cudf(df2, npartitions=16)
ddf2["c"] = ddf2["a"] + 5
ddf2
a | b | c | |
---|---|---|---|
npartitions=16 | |||
0 | int64 | int64 | int64 |
625000 | ... | ... | ... |
... | ... | ... | ... |
9375000 | ... | ... | ... |
9999999 | ... | ... | ... |
!nvidia-smi
Mon Apr 14 06:00:29 2025
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 565.57.01 Driver Version: 565.57.01 CUDA Version: 12.8 |
|-----------------------------------------+------------------------+----------------------+
| GPU Name Persistence-M | Bus-Id Disp.A | Volatile Uncorr. ECC |
| Fan Temp Perf Pwr:Usage/Cap | Memory-Usage | GPU-Util Compute M. |
| | | MIG M. |
|=========================================+========================+======================|
| 0 NVIDIA L4 Off | 00000000:0A:00.0 Off | 0 |
| N/A 40C P0 28W / 72W | 919MiB / 23034MiB | 3% Default |
| | | N/A |
+-----------------------------------------+------------------------+----------------------+
+-----------------------------------------------------------------------------------------+
| Processes: |
| GPU GI CI PID Type Process name GPU Memory |
| ID ID Usage |
|=========================================================================================|
| 0 N/A N/A 1620 C /opt/conda/envs/docs/bin/python3.12 188MiB |
| 0 N/A N/A 1667 C /opt/conda/envs/docs/bin/python 524MiB |
| 0 N/A N/A 1721 C /opt/conda/envs/docs/bin/python 188MiB |
+-----------------------------------------------------------------------------------------+
由于 Dask 是惰性的,计算尚未发生。我们可以看到任务图中有六十四个任务,每个 GPU 使用约 330 MB 的设备内存。我们可以使用 persist
强制进行计算。通过强制执行,结果现在明确地存储在内存中,并且我们的任务图每个分区只包含一个任务(基线)。
ddf2 = ddf2.persist()
ddf2
/opt/conda/envs/docs/lib/python3.12/site-packages/distributed/client.py:3370: UserWarning: Sending large graph of size 152.60 MiB.
This may cause some slowdown.
Consider loading the data with Dask directly
or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org.cn/en/stable/best-practices.html#load-data-with-dask for more information.
warnings.warn(
a | b | c | |
---|---|---|---|
npartitions=16 | |||
0 | int64 | int64 | int64 |
625000 | ... | ... | ... |
... | ... | ... | ... |
9375000 | ... | ... | ... |
9999999 | ... | ... | ... |
# Sleep to ensure the persist finishes and shows in the memory usage
!sleep 5; nvidia-smi
Mon Apr 14 06:00:35 2025
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 565.57.01 Driver Version: 565.57.01 CUDA Version: 12.8 |
|-----------------------------------------+------------------------+----------------------+
| GPU Name Persistence-M | Bus-Id Disp.A | Volatile Uncorr. ECC |
| Fan Temp Perf Pwr:Usage/Cap | Memory-Usage | GPU-Util Compute M. |
| | | MIG M. |
|=========================================+========================+======================|
| 0 NVIDIA L4 Off | 00000000:0A:00.0 Off | 0 |
| N/A 41C P0 28W / 72W | 1592MiB / 23034MiB | 3% Default |
| | | N/A |
+-----------------------------------------+------------------------+----------------------+
+-----------------------------------------------------------------------------------------+
| Processes: |
| GPU GI CI PID Type Process name GPU Memory |
| ID ID Usage |
|=========================================================================================|
| 0 N/A N/A 1620 C /opt/conda/envs/docs/bin/python3.12 188MiB |
| 0 N/A N/A 1667 C /opt/conda/envs/docs/bin/python 716MiB |
| 0 N/A N/A 1721 C /opt/conda/envs/docs/bin/python 188MiB |
| 0 N/A N/A 1798 C /opt/conda/envs/docs/bin/python 476MiB |
+-----------------------------------------------------------------------------------------+
因为我们强制进行了计算,所以现在在分布式 GPU 内存中有一个更大的对象。请注意,实际数字因系统而异(例如,取决于有多少设备可用)。
Wait#
根据我们的工作流程或分布式计算设置,我们可能希望在执行特定函数之前 wait
直到所有上游任务完成。本节展示了这种行为的一个示例,改编自 Dask 文档。
首先,我们创建一个新的 Dask DataFrame 并定义一个函数,我们将把这个函数映射到 dataframe 的每个分区。
import random
nrows = 10000000
df1 = cudf.DataFrame({"a": cp.arange(nrows), "b": cp.arange(nrows)})
ddf1 = dask_cudf.from_cudf(df1, npartitions=100)
def func(df):
time.sleep(random.randint(1, 10))
return (df + 5) * 3 - 11
此函数将对 dataframe 中的每一列进行基本转换,但由于 time.sleep
语句随机增加 1-10 秒的时间,函数执行时间会有所不同。我们将使用 map_partitions
在 dataframe 的每个分区上运行此函数,这将任务添加到我们的任务图中并存储结果。然后我们可以调用 persist
来强制执行。
results_ddf = ddf2.map_partitions(func)
results_ddf = results_ddf.persist()
但是,有些分区会比其他分区完成得快得多。如果我们的下游进程需要等待所有分区完成,我们可以使用 wait
来强制执行此行为。
wait(results_ddf)
DoneAndNotDoneFutures(done={<Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 11)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 15)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 10)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 4)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 5)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 9)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 0)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 1)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 8)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 13)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 14)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 12)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 3)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 7)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 2)>, <Future: finished, type: cudf.core.dataframe.DataFrame, key: ('func-f080b503a89d015dcfbb07c1762d343a', 6)>}, not_done=set())
随着 wait
完成,我们可以安全地继续我们的工作流程。