cuDF 用户定义函数概览#
import cudf
import numpy as np
from cudf.datasets import randomdata
from numba import config
config.CUDA_LOW_OCCUPANCY_WARNINGS = 0
与许多表格数据处理 API 类似,cuDF 提供了一系列可组合的 DataFrame 风格操作符。虽然现有的函数灵活且有用,但有时需要编写自定义代码或用户定义函数 (UDF),以便应用于组成 DataFrame 的行、列和其他分组的单元格。
结合更广泛的 GPU PyData 生态系统,cuDF 提供了在各种数据结构上运行 UDF 的接口。目前,我们只能在数字、布尔型、日期时间和时间差类型的数据上执行 UDF,并在某些 API 中部分支持字符串。本指南涵盖了如何在以下数据结构上编写和执行 UDF
Series
DataFrame
滚动窗口序列 (Rolling Windows Series)
GroupBy DataFrames
CuPy NDArrays
Numba DeviceNDArrays
它还演示了 cuDF 的默认空值处理行为,以及如何编写可以与空值交互的 UDF。
Series UDF#
您可以通过两种方式在 Series 上执行 UDF
编写标准的 Python 函数并使用
cudf.Series.apply
编写 Numba 内核并使用 Numba 的
forall
语法
使用 apply
更简单,但编写 Numba 内核提供了构建更复杂函数的灵活性(本指南中我们只编写简单的内核)。
cudf.Series.apply
#
cuDF 提供了一个类似于 pandas.Series.apply
的 API,用于将标量 UDF 应用于 series 对象。这是一个非常基础的示例。
# Create a cuDF series
sr = cudf.Series([1, 2, 3])
用于 cudf.Series.apply
的 UDF 可能看起来像这样
# define a scalar function
def f(x):
return x + 1
cudf.Series.apply
的调用方式与 pd.Series.apply
类似,并返回一个新的 Series
对象
sr.apply(f)
0 2
1 3
2 4
dtype: int64
带额外标量参数的函数#
此外,cudf.Series.apply
支持像 pandas 一样的 args=
参数,允许您编写接受任意数量标量参数的 UDF。以下是此类函数及其在 pandas 和 cuDF 中的 API 调用示例
def g(x, const):
return x + const
# cuDF apply
sr.apply(g, args=(42,))
0 43
1 44
2 45
dtype: int64
最后请注意,尚不支持 **kwargs
。
可空数据#
空值 NA
会在一元和二元操作中传播。因此,NA + 1
, abs(NA)
, 和 NA == NA
都返回 NA
。为了具体说明,让我们看看上面的同一个示例,这次使用可空数据
# Create a cuDF series with nulls
sr = cudf.Series([1, cudf.NA, 3])
sr
0 1
1 <NA>
2 3
dtype: int64
# redefine the same function from above
def f(x):
return x + 1
# cuDF result
sr.apply(f)
0 2
1 <NA>
2 4
dtype: int64
然而,通常您希望在函数内部进行显式的空值处理。cuDF 以与 pandas 相同的方式公开此功能,即直接与 NA
单例对象交互。以下是一个具有显式空值处理功能的函数示例
def f_null_sensitive(x):
# do something if the input is null
if x is cudf.NA:
return 42
else:
return x + 1
# cuDF result
sr.apply(f_null_sensitive)
0 2
1 42
2 4
dtype: int64
此外,cudf.NA
可以直接或有条件地从函数返回。此功能应允许您在各种情况下实现自定义的空值处理。
字符串数据#
对 apply
的部分字符串功能的实验性支持已可用。目前支持以下字符串操作
str.count
str.startswith
str.endswith
str.find
str.rfind
str.isalnum
str.isdecimal
str.isdigit
str.islower
str.isupper
str.isalpha
str.istitle
str.isspace
==
,!=
,>=
,<=
,>
,<
(between two strings)len
(e.g.len(some_string))
in
(e.g,'abc' in some_string
)strip
lstrip
rstrip
upper
lower
+
(string concatenation)replace
sr = cudf.Series(["", "abc", "some_example"])
def f(st):
if len(st) > 0:
if st.startswith("a"):
return 1
elif "example" in st:
return 2
else:
return -1
else:
return 42
result = sr.apply(f)
print(result)
0 42
1 1
2 2
dtype: int64
字符串 UDF 内存注意事项#
在计算过程中创建中间字符串的 UDF 可能需要进行内存调优。为此提供了方便的 API
from cudf.core.udf.utils import set_malloc_heap_size
set_malloc_heap_size(int(2e9))
使用自定义 numba
内核进行低级控制#
除了 Series.apply() 方法用于执行自定义操作外,您还可以将 Series 对象直接传递到用 Numba 编写的 CUDA 内核中。请注意,本节需要基本的 CUDA 知识。有关详细信息,请参阅Numba 的 CUDA 文档。
编写 Numba 内核最简单的方法是使用 cuda.grid(1)
来管理线程索引,然后利用 Numba 的 forall
方法为我们配置内核。下面,我们将定义一个基本的乘法内核作为示例,并使用 @cuda.jit
进行编译。
df = randomdata(nrows=5, dtypes={"a": int, "b": int, "c": int}, seed=12)
from numba import cuda
@cuda.jit
def multiply(in_col, out_col, multiplier):
i = cuda.grid(1)
if i < in_col.size: # boundary guard
out_col[i] = in_col[i] * multiplier
此内核将接受一个输入数组,将其乘以一个可配置的值(在运行时提供),并将结果存储在输出数组中。请注意,我们将逻辑包装在 if
语句中。因为我们可以启动比数组大小更多的线程,所以我们需要确保不使用索引超出范围的线程。忽略此步骤可能导致未定义的行为。
为了执行我们的内核,必须预先分配一个输出数组并利用上面提到的 forall
方法。首先,我们在 DataFrame 中创建一个全是 0.0
的 Series,因为我们希望输出是 float64
类型。接下来,我们使用 forall
运行内核。forall
要求我们指定所需的任务数量,因此我们将提供 Series 的长度(我们将其存储在 size
中)。cuda_array_interface 是允许我们直接在 Series 上调用 Numba 内核的原因。
size = len(df["a"])
df["e"] = 0.0
multiply.forall(size)(df["a"], df["e"], 10.0)
调用内核后,我们的 DataFrame 现在填充了结果。
df.head()
a | b | c | e | |
---|---|---|---|---|
0 | 963 | 1005 | 997 | 9630.0 |
1 | 977 | 1026 | 980 | 9770.0 |
2 | 1048 | 1026 | 1019 | 10480.0 |
3 | 1078 | 960 | 985 | 10780.0 |
4 | 979 | 982 | 1011 | 9790.0 |
此 API 允许您理论上编写任意内核逻辑,可能会访问和使用 Series 中任意索引的元素,并在 cuDF 数据结构上使用它们。具有一定 CUDA 经验的高级开发者通常可以利用此功能实现迭代转换,或使用自定义内核更快地处理数据管道中的问题区域。
DataFrame UDF#
与 cudf.Series
一样,在 DataFrames 上使用 UDF 有多种方法,这些方法本质上是期望将多个列作为输入的 UDF
cudf.DataFrame.apply
,其功能类似于pd.DataFrame.apply
,并期望一个行 UDFcudf.DataFrame.apply_rows
,它是 numba 的一个轻量级封装,期望一个 numba 内核cudf.DataFrame.apply_chunks
,它类似于cudf.DataFrame.apply_rows
,但提供了更低级的控制。
cudf.DataFrame.apply
#
cudf.DataFrame.apply
是期望将多个列作为输入并产生单个输出列的 UDF 的主要入口点。旨在由此 API 消费的函数以“行”参数的形式编写。“行”被认为是类似于字典的对象,包含 DataFrame 中特定 iloc
处的所有列值。函数可以通过键访问这些值,键即对应于所需值的列名。下面是一个示例函数,用于在 UDF 中将列 A
和列 B
相加。
def f(row):
return row["A"] + row["B"]
让我们创建一些包含至少一个空值的非常基础的示例数据。
df = cudf.DataFrame({"A": [1, 2, 3], "B": [4, cudf.NA, 6]})
df
A | B | |
---|---|---|
0 | 1 | 4 |
1 | 2 | <NA> |
2 | 3 | 6 |
最后,像在 pandas 中一样调用该函数 - 使用 lambda 函数将 UDF 映射到 DataFrame 的“行”上
df.apply(f, axis=1)
0 5
1 <NA>
2 9
dtype: int64
同样的函数应该产生与 pandas 相同的结果
df.to_pandas(nullable=True).apply(f, axis=1)
0 5
1 <NA>
2 9
dtype: object
注意,Pandas 返回 object
数据类型 - 有关此内容的说明请参见注意事项部分。
与 cudf.Series.apply
类似,这些函数支持通用的空值处理。以下是一个函数示例,它在某个输入为空时有条件地返回不同的值
def f(row):
x = row["a"]
if x is cudf.NA:
return 0
else:
return x + 1
df = cudf.DataFrame({"a": [1, cudf.NA, 3]})
df
a | |
---|---|
0 | 1 |
1 | <NA> |
2 | 3 |
df.apply(f, axis=1)
0 2
1 0
2 4
dtype: int64
cudf.NA
也可以直接从函数返回,最终得到具有正确空值的数据,就像在 Pandas 中运行一样。对于以下数据,最后一行满足条件 1 + 3 > 3
,并为该行返回 NA
def f(row):
x = row["a"]
y = row["b"]
if x + y > 3:
return cudf.NA
else:
return x + y
df = cudf.DataFrame({"a": [1, 2, 3], "b": [2, 1, 1]})
df
a | b | |
---|---|---|
0 | 1 | 2 |
1 | 2 | 1 |
2 | 3 | 1 |
df.apply(f, axis=1)
0 3
1 3
2 <NA>
dtype: int64
允许混合类型,但将返回通用类型,而不是像 Pandas 那样的 object 类型。以下是一个在整型和浮点型列之间进行的空值感知操作示例
def f(row):
return row["a"] + row["b"]
df = cudf.DataFrame({"a": [1, 2, 3], "b": [0.5, cudf.NA, 3.14]})
df
a | b | |
---|---|---|
0 | 1 | 0.5 |
1 | 2 | <NA> |
2 | 3 | 3.14 |
df.apply(f, axis=1)
0 1.5
1 <NA>
2 6.14
dtype: float64
函数也可以返回标量值,但结果将被提升为安全类型,而无论数据本身如何。这意味着即使您有一个函数如下所示
def f(x):
if x > 1000:
return 1.5
else:
return 2
并且您的数据是
[1,2,3,4,5]
最终数据中会得到浮点数,即使函数从未返回过浮点数。这是因为 Numba 最终需要生成一个可以处理任何数据类型的函数,这意味着如果有可能产生浮点数,您必须始终假设它会发生。以下是一个在某些情况下返回标量的函数示例
def f(row):
x = row["a"]
if x > 3:
return x
else:
return 1.5
df = cudf.DataFrame({"a": [1, 3, 5]})
df
a | |
---|---|
0 | 1 |
1 | 3 |
2 | 5 |
df.apply(f, axis=1)
0 1.5
1 1.5
2 5.0
dtype: float64
支持任意数量的列和许多算术操作符,从而允许复杂的 UDF
def f(row):
return row["a"] + (row["b"] - (row["c"] / row["d"])) % row["e"]
df = cudf.DataFrame(
{
"a": [1, 2, 3],
"b": [4, 5, 6],
"c": [cudf.NA, 4, 4],
"d": [8, 7, 8],
"e": [7, 1, 6],
}
)
df
a | b | c | d | e | |
---|---|---|---|---|---|
0 | 1 | 4 | <NA> | 8 | 7 |
1 | 2 | 5 | 4 | 7 | 1 |
2 | 3 | 6 | 4 | 8 | 6 |
df.apply(f, axis=1)
0 <NA>
1 2.428571429
2 8.5
dtype: float64
字符串数据#
字符串数据可以在 DataFrame.apply
UDF 内部使用,但受限于与 Series.apply
相同的约束。有关详细信息,请参阅上面关于 Series
UDF 的字符串处理部分。下面是一个简单的示例,在存在字符串列的情况下扩展了上面的行 UDF 逻辑
str_df = cudf.DataFrame(
{"str_col": ["abc", "ABC", "Example"], "scale": [1, 2, 3]}
)
str_df
str_col | scale | |
---|---|---|
0 | abc | 1 |
1 | ABC | 2 |
2 | 示例 | 3 |
def f(row):
st = row["str_col"]
scale = row["scale"]
if len(st) > 5:
return len(st) + scale
else:
return len(st)
result = str_df.apply(f, axis=1)
print(result)
0 3
1 3
2 10
dtype: int64
用户定义聚合函数 (UDAF) 和 GroupBy.apply
#
cuDF 通过基于 numba 的 GroupBy apply jit
引擎支持加速部分用户定义聚合。满足通过 jit
引擎执行所需条件的聚合将自动以此方式运行。希望为 jit
引擎开发聚合函数的用户可以通过向 apply
传递 engine='jit'
来显式调用它
# Create a dataframe with two groups
df = cudf.DataFrame(
{
"a": [1, 1, 1, 2, 2, 2],
"b": [1, 2, 3, 4, 5, 6],
"c": [3, 4, 5, 6, 7, 8],
}
)
df
a | b | c | |
---|---|---|---|
0 | 1 | 1 | 3 |
1 | 1 | 2 | 4 |
2 | 1 | 3 | 5 |
3 | 2 | 4 | 6 |
4 | 2 | 5 | 7 |
5 | 2 | 6 | 8 |
# a user defined aggregation function.
def udaf(df):
return df["b"].max() - df["b"].min() / 2
result = df.groupby("a").apply(udaf, engine="jit")
result
a
1 2.5
2 4.0
dtype: float64
GroupBy JIT 引擎支持的特性#
cuDF 通过 JIT 引擎执行 UDAF 需要输入数据和 UDAF 本身满足几个条件。预计随着开发的进行,许多限制可能会被解除。
限制#
目前不允许包含空值的数据。尝试对包含空值的数据使用
engine='jit'
将会引发错误。广义上讲,只允许 4 字节或 8 字节的整型和浮点型数据类型:
["int32, "int64" ,"float32", "float64"]
。某些函数有额外的数据类型限制,例如
corr
,它尚不支持浮点数据类型。使用此类缺少重载的数据类型调用corr
将会引发错误。如果在 UDAF 内部访问并使用了不支持数据类型的列,cuDF 将会引发错误。
在 UDAF 内部不允许返回新列的操作,例如列之间的二元操作
df['a'] + df['b']
这样做将会引发错误。
目前尚不支持返回
Series
或DataFrame
对象的函数,只允许返回标量的函数。支持以下聚合操作
max()
min()
sum()
mean()
var()
std()
idxmax()
idxmin()
corr()
(仅限整型数据)
用于 DataFrame 的 Numba 内核#
我们可以像上面使用 forall
那样在 DataFrame 上应用 UDF。我们需要编写一个期望多个输入的内核,并在执行内核时传递多个 Series 作为参数。因为这相当常见且可能难以管理,cuDF 提供了两个 API 来简化此过程:apply_rows
和 apply_chunks
。下面,我们将逐步演示如何使用 apply_rows
。apply_chunks
的工作方式类似,但提供了对低级内核行为更多的控制。
df = randomdata(
nrows=10, dtypes={"a": float, "b": bool, "c": str, "e": float}, seed=12
)
df.head()
a | b | c | e | |
---|---|---|---|---|
0 | -0.691674 | True | Dan | -0.958380 |
1 | 0.480099 | False | Bob | -0.729580 |
2 | -0.473370 | True | Xavier | -0.767454 |
3 | 0.067479 | True | Alice | -0.380205 |
4 | -0.970850 | False | Sarah | 0.342905 |
现在我们的 DataFrame 中有两个数字列,让我们编写一个使用这两个列的内核。
def conditional_add(x, y, out):
for i, (a, e) in enumerate(zip(x, y)):
if a > 0:
out[i] = a + e
else:
out[i] = a
请注意,我们需要 enumerate
遍历我们的 zipped
函数参数(这些参数与我们的输入列名匹配或映射)。我们可以将此内核传递给 apply_rows
。我们需要指定几个参数
incols
与函数参数匹配的输入列名列表。或者,一个将输入列名映射到其对应函数参数的字典,例如
{'col1': 'arg1'}
。
outcols
定义输出列名及其数据类型的字典。这些名称必须与函数参数匹配。
kwargs(可选)
我们可以选择性地以字典形式传递关键字参数。由于我们不需要任何参数,因此传递一个空字典。
虽然看起来我们的函数正在按顺序遍历列,但它实际上在 GPU 上的多个线程中并行执行。这种并行性是 GPU 加速计算的核心。了解这些背景后,我们就可以使用我们的 UDF 了。
df = df.apply_rows(
conditional_add,
incols={"a": "x", "e": "y"},
outcols={"out": np.float64},
kwargs={},
)
df.head()
a | b | c | e | out | |
---|---|---|---|---|---|
0 | -0.691674 | True | Dan | -0.958380 | -0.691674 |
1 | 0.480099 | False | Bob | -0.729580 | -0.249480 |
2 | -0.473370 | True | Xavier | -0.767454 | -0.473370 |
3 | 0.067479 | True | Alice | -0.380205 | -0.312726 |
4 | -0.970850 | False | Sarah | 0.342905 | -0.970850 |
正如预期的那样,我们的条件加法起作用了。至此,我们已成功在 cuDF 的核心数据结构上执行了 UDF。
apply_rows
和 apply_chunks
中的空值处理#
默认情况下,DataFrame 应用 UDF 的方法(如 apply_rows
)会悲观地处理空值(如果内核中使用了带有空值的行,则这些行将从输出中移除)。探讨不悲观处理空值可能导致未定义行为超出了本指南的范围。只需说明,悲观的空值处理是安全且一致的方法。您可以在下方看到一个示例。
def gpu_add(a, b, out):
for i, (x, y) in enumerate(zip(a, b)):
out[i] = x + y
df = randomdata(nrows=5, dtypes={"a": int, "b": int, "c": int}, seed=12)
df.loc[2, "a"] = None
df.loc[3, "b"] = None
df.loc[1, "c"] = None
df.head()
a | b | c | |
---|---|---|---|
0 | 963 | 1005 | 997 |
1 | 977 | 1026 | <NA> |
2 | <NA> | 1026 | 1019 |
3 | 1078 | <NA> | 985 |
4 | 979 | 982 | 1011 |
在上面的 dataframe 中,有三个空值。每列在不同的行中有一个空值。当我们使用 apply_rows
和我们的 UDF 时,由于悲观的空值处理,我们的输出应该有两个空值(因为我们没有使用列 c
,那里的空值对我们无关紧要)。
df = df.apply_rows(
gpu_add, incols=["a", "b"], outcols={"out": np.float64}, kwargs={}
)
df.head()
a | b | c | out | |
---|---|---|---|---|
0 | 963 | 1005 | 997 | 1968.0 |
1 | 977 | 1026 | <NA> | 2003.0 |
2 | <NA> | 1026 | 1019 | <NA> |
3 | 1078 | <NA> | 985 | <NA> |
4 | 979 | 982 | 1011 | 1961.0 |
正如预期的那样,我们的输出中最终有两个空值。我们使用的列中的空值传播到了输出,但我们忽略的列中的空值没有传播。
滚动窗口 UDF#
对于时间序列数据,我们可能需要在每次操作一小部分列的“窗口”,独立处理每个部分。我们可以将这个窗口在整个列上滑动(“滚动”)以回答诸如“过去一年股票价格的 3 日移动平均是多少?”之类的问题。
我们可以使用 apply
将更复杂的函数应用于滚动窗口 rolling
Series 和 DataFrames。此示例改编自 cuDF 的API 文档。首先,我们将创建一个示例 Series,然后从该 Series 创建一个 rolling
对象。
ser = cudf.Series([16, 25, 36, 49, 64, 81], dtype="float64")
ser
0 16.0
1 25.0
2 36.0
3 49.0
4 64.0
5 81.0
dtype: float64
rolling = ser.rolling(window=3, min_periods=3, center=False)
rolling
Rolling [window=3,min_periods=3,center=False]
接下来,我们将定义一个要在滚动窗口上使用的函数。我们创建这个函数是为了强调如何包含循环、数学函数和条件语句等内容。滚动窗口 UDF 尚不支持空值。
import math
def example_func(window):
b = 0
for a in window:
b = max(b, math.sqrt(a))
if b == 8:
return 100
return b
我们可以通过将其传递给 apply
来执行函数。当 window=3
, min_periods=3
, 并且 center=False
时,我们的前两个值是 null
。
rolling.apply(example_func)
0 <NA>
1 <NA>
2 6.0
3 7.0
4 100.0
5 9.0
dtype: float64
我们也可以将此函数应用于 DataFrame 中的每一列。
df2 = cudf.DataFrame()
df2["a"] = np.arange(55, 65, dtype="float64")
df2["b"] = np.arange(55, 65, dtype="float64")
df2.head()
a | b | |
---|---|---|
0 | 55.0 | 55.0 |
1 | 56.0 | 56.0 |
2 | 57.0 | 57.0 |
3 | 58.0 | 58.0 |
4 | 59.0 | 59.0 |
rolling = df2.rolling(window=3, min_periods=3, center=False)
rolling.apply(example_func)
a | b | |
---|---|---|
0 | <NA> | <NA> |
1 | <NA> | <NA> |
2 | 7.549834435 | 7.549834435 |
3 | 7.615773106 | 7.615773106 |
4 | 7.681145748 | 7.681145748 |
5 | 7.745966692 | 7.745966692 |
6 | 7.810249676 | 7.810249676 |
7 | 7.874007874 | 7.874007874 |
8 | 7.937253933 | 7.937253933 |
9 | 100.0 | 100.0 |
GroupBy DataFrame UDF#
我们还可以使用 apply_grouped
将 UDF 应用于分组的 DataFrames。
首先,我们将根据列 b
对 DataFrame 进行分组,该列的值为 True 或 False。
df = randomdata(
nrows=10, dtypes={"a": float, "b": bool, "c": str, "e": float}, seed=12
)
df.head()
a | b | c | e | |
---|---|---|---|---|
0 | -0.691674 | True | Dan | -0.958380 |
1 | 0.480099 | False | Bob | -0.729580 |
2 | -0.473370 | True | Xavier | -0.767454 |
3 | 0.067479 | True | Alice | -0.380205 |
4 | -0.970850 | False | Sarah | 0.342905 |
grouped = df.groupby(["b"])
接下来,我们将定义一个函数,独立应用于每个组。在本例中,我们将计算列 e
的滚动平均值,并将新列命名为 rolling_avg_e
。
def rolling_avg(e, rolling_avg_e):
win_size = 3
for i in range(cuda.threadIdx.x, len(e), cuda.blockDim.x):
if i < win_size - 1:
# If there is not enough data to fill the window,
# take the average to be NaN
rolling_avg_e[i] = np.nan
else:
total = 0
for j in range(i - win_size + 1, i + 1):
total += e[j]
rolling_avg_e[i] = total / win_size
我们可以使用与 apply_rows
非常相似的 API 执行此操作。不过,这次它将独立地为每个组执行。
results = grouped.apply_grouped(
rolling_avg, incols=["e"], outcols=dict(rolling_avg_e=np.float64)
)
results
a | b | c | e | rolling_avg_e | |
---|---|---|---|---|---|
1 | 0.480099 | False | Bob | -0.729580 | NaN |
4 | -0.970850 | False | Sarah | 0.342905 | NaN |
6 | 0.801430 | False | Sarah | 0.632337 | 0.081887 |
7 | -0.933157 | False | Quinn | -0.420826 | 0.184805 |
0 | -0.691674 | True | Dan | -0.958380 | NaN |
2 | -0.473370 | True | Xavier | -0.767454 | NaN |
3 | 0.067479 | True | Alice | -0.380205 | -0.702013 |
5 | 0.837494 | True | Wendy | -0.057540 | -0.401733 |
8 | 0.913899 | True | Ursula | 0.466252 | 0.009502 |
9 | -0.725581 | True | George | 0.405245 | 0.271319 |
请注意,在内核中,当窗口大小为三时,输出列中每个组的前两个值都是空值。
CuPy 数组上的 Numba 内核#
我们也可以在 CuPy NDArrays 上执行 Numba 内核,同样得益于 __cuda_array_interface__
。我们甚至可以在 Series 和 CuPy 数组上运行相同的 UDF。首先,我们定义一个 Series,然后从该 Series 创建一个 CuPy 数组。
import cupy as cp
s = cudf.Series([1.0, 2, 3, 4, 10])
arr = cp.asarray(s)
arr
array([ 1., 2., 3., 4., 10.])
接下来,我们定义一个 UDF 并在我们的 Series 上执行它。我们需要为输出分配一个相同大小的 Series,我们将其命名为 out
。
@cuda.jit
def multiply_by_5(x, out):
i = cuda.grid(1)
if i < x.size:
out[i] = x[i] * 5
out = cudf.Series(cp.zeros(len(s), dtype="int32"))
multiply_by_5.forall(s.shape[0])(s, out)
out
0 5
1 10
2 15
3 20
4 50
dtype: int32
最后,我们在我们的数组上执行相同的函数。我们分配一个空数组 out
来存储结果。
out = cp.empty_like(arr)
multiply_by_5.forall(arr.size)(arr, out)
out
array([ 5., 10., 15., 20., 50.])
注意事项#
目前,UDF 仅支持数值非十进制标量类型(完全支持)以及
Series.apply
和DataFrame.apply
中的字符串(部分支持,受限于上述注意事项)。尝试使用不支持的类型调用此 API 将引发TypeError
。我们尚未完全支持所有算术操作符。某些操作(如位操作)目前尚未实现,但计划在未来版本中实现。如果需要某个操作符,应在 GitHub 上提出问题,以便对其进行适当的优先级排序和实现。
总结#
本指南涵盖了很多内容。至此,您应该能够自如地编写用于以下数据结构(带或不带空值)的 UDF:
Series
DataFrame
滚动窗口
GroupBy DataFrames
CuPy NDArrays
Numba DeviceNDArrays
通用 NA UDF
字符串 UDF
更多信息请参阅 cuDF、Numba.cuda 和 CuPy 文档。