cuDF 用户定义函数概览#

import cudf
import numpy as np
from cudf.datasets import randomdata
from numba import config

config.CUDA_LOW_OCCUPANCY_WARNINGS = 0

与许多表格数据处理 API 类似,cuDF 提供了一系列可组合的 DataFrame 风格操作符。虽然现有的函数灵活且有用,但有时需要编写自定义代码或用户定义函数 (UDF),以便应用于组成 DataFrame 的行、列和其他分组的单元格。

结合更广泛的 GPU PyData 生态系统,cuDF 提供了在各种数据结构上运行 UDF 的接口。目前,我们只能在数字、布尔型、日期时间和时间差类型的数据上执行 UDF,并在某些 API 中部分支持字符串。本指南涵盖了如何在以下数据结构上编写和执行 UDF

  • Series

  • DataFrame

  • 滚动窗口序列 (Rolling Windows Series)

  • GroupBy DataFrames

  • CuPy NDArrays

  • Numba DeviceNDArrays

它还演示了 cuDF 的默认空值处理行为,以及如何编写可以与空值交互的 UDF。

Series UDF#

您可以通过两种方式在 Series 上执行 UDF

  • 编写标准的 Python 函数并使用 cudf.Series.apply

  • 编写 Numba 内核并使用 Numba 的 forall 语法

使用 apply 更简单,但编写 Numba 内核提供了构建更复杂函数的灵活性(本指南中我们只编写简单的内核)。

cudf.Series.apply#

cuDF 提供了一个类似于 pandas.Series.apply 的 API,用于将标量 UDF 应用于 series 对象。这是一个非常基础的示例。

# Create a cuDF series
sr = cudf.Series([1, 2, 3])

用于 cudf.Series.apply 的 UDF 可能看起来像这样

# define a scalar function


def f(x):
    return x + 1

cudf.Series.apply 的调用方式与 pd.Series.apply 类似,并返回一个新的 Series 对象

sr.apply(f)
0    2
1    3
2    4
dtype: int64

带额外标量参数的函数#

此外,cudf.Series.apply 支持像 pandas 一样的 args= 参数,允许您编写接受任意数量标量参数的 UDF。以下是此类函数及其在 pandas 和 cuDF 中的 API 调用示例

def g(x, const):
    return x + const
# cuDF apply
sr.apply(g, args=(42,))
0    43
1    44
2    45
dtype: int64

最后请注意,尚不支持 **kwargs

可空数据#

空值 NA 会在一元和二元操作中传播。因此,NA + 1, abs(NA), 和 NA == NA 都返回 NA。为了具体说明,让我们看看上面的同一个示例,这次使用可空数据

# Create a cuDF series with nulls
sr = cudf.Series([1, cudf.NA, 3])
sr
0       1
1    <NA>
2       3
dtype: int64
# redefine the same function from above


def f(x):
    return x + 1
# cuDF result
sr.apply(f)
0       2
1    <NA>
2       4
dtype: int64

然而,通常您希望在函数内部进行显式的空值处理。cuDF 以与 pandas 相同的方式公开此功能,即直接与 NA 单例对象交互。以下是一个具有显式空值处理功能的函数示例

def f_null_sensitive(x):
    # do something if the input is null
    if x is cudf.NA:
        return 42
    else:
        return x + 1
# cuDF result
sr.apply(f_null_sensitive)
0     2
1    42
2     4
dtype: int64

此外,cudf.NA 可以直接或有条件地从函数返回。此功能应允许您在各种情况下实现自定义的空值处理。

字符串数据#

apply 的部分字符串功能的实验性支持已可用。目前支持以下字符串操作

  • str.count

  • str.startswith

  • str.endswith

  • str.find

  • str.rfind

  • str.isalnum

  • str.isdecimal

  • str.isdigit

  • str.islower

  • str.isupper

  • str.isalpha

  • str.istitle

  • str.isspace

  • ==, !=, >=, <=, >, < (between two strings)

  • len (e.g. len(some_string))

  • in (e.g, 'abc' in some_string)

  • strip

  • lstrip

  • rstrip

  • upper

  • lower

  • + (string concatenation)

  • replace

sr = cudf.Series(["", "abc", "some_example"])
def f(st):
    if len(st) > 0:
        if st.startswith("a"):
            return 1
        elif "example" in st:
            return 2
        else:
            return -1
    else:
        return 42
result = sr.apply(f)
print(result)
0    42
1     1
2     2
dtype: int64

字符串 UDF 内存注意事项#

在计算过程中创建中间字符串的 UDF 可能需要进行内存调优。为此提供了方便的 API

from cudf.core.udf.utils import set_malloc_heap_size

set_malloc_heap_size(int(2e9))

使用自定义 numba 内核进行低级控制#

除了 Series.apply() 方法用于执行自定义操作外,您还可以将 Series 对象直接传递到用 Numba 编写的 CUDA 内核中。请注意,本节需要基本的 CUDA 知识。有关详细信息,请参阅Numba 的 CUDA 文档

编写 Numba 内核最简单的方法是使用 cuda.grid(1) 来管理线程索引,然后利用 Numba 的 forall 方法为我们配置内核。下面,我们将定义一个基本的乘法内核作为示例,并使用 @cuda.jit 进行编译。

df = randomdata(nrows=5, dtypes={"a": int, "b": int, "c": int}, seed=12)
from numba import cuda


@cuda.jit
def multiply(in_col, out_col, multiplier):
    i = cuda.grid(1)
    if i < in_col.size:  # boundary guard
        out_col[i] = in_col[i] * multiplier

此内核将接受一个输入数组,将其乘以一个可配置的值(在运行时提供),并将结果存储在输出数组中。请注意,我们将逻辑包装在 if 语句中。因为我们可以启动比数组大小更多的线程,所以我们需要确保不使用索引超出范围的线程。忽略此步骤可能导致未定义的行为。

为了执行我们的内核,必须预先分配一个输出数组并利用上面提到的 forall 方法。首先,我们在 DataFrame 中创建一个全是 0.0 的 Series,因为我们希望输出是 float64 类型。接下来,我们使用 forall 运行内核。forall 要求我们指定所需的任务数量,因此我们将提供 Series 的长度(我们将其存储在 size 中)。cuda_array_interface 是允许我们直接在 Series 上调用 Numba 内核的原因。

size = len(df["a"])
df["e"] = 0.0
multiply.forall(size)(df["a"], df["e"], 10.0)

调用内核后,我们的 DataFrame 现在填充了结果。

df.head()
a b c e
0 963 1005 997 9630.0
1 977 1026 980 9770.0
2 1048 1026 1019 10480.0
3 1078 960 985 10780.0
4 979 982 1011 9790.0

此 API 允许您理论上编写任意内核逻辑,可能会访问和使用 Series 中任意索引的元素,并在 cuDF 数据结构上使用它们。具有一定 CUDA 经验的高级开发者通常可以利用此功能实现迭代转换,或使用自定义内核更快地处理数据管道中的问题区域。

DataFrame UDF#

cudf.Series 一样,在 DataFrames 上使用 UDF 有多种方法,这些方法本质上是期望将多个列作为输入的 UDF

  • cudf.DataFrame.apply,其功能类似于 pd.DataFrame.apply,并期望一个行 UDF

  • cudf.DataFrame.apply_rows,它是 numba 的一个轻量级封装,期望一个 numba 内核

  • cudf.DataFrame.apply_chunks,它类似于 cudf.DataFrame.apply_rows,但提供了更低级的控制。

cudf.DataFrame.apply#

cudf.DataFrame.apply 是期望将多个列作为输入并产生单个输出列的 UDF 的主要入口点。旨在由此 API 消费的函数以“行”参数的形式编写。“行”被认为是类似于字典的对象,包含 DataFrame 中特定 iloc 处的所有列值。函数可以通过键访问这些值,键即对应于所需值的列名。下面是一个示例函数,用于在 UDF 中将列 A 和列 B 相加。

def f(row):
    return row["A"] + row["B"]

让我们创建一些包含至少一个空值的非常基础的示例数据。

df = cudf.DataFrame({"A": [1, 2, 3], "B": [4, cudf.NA, 6]})
df
A B
0 1 4
1 2 <NA>
2 3 6

最后,像在 pandas 中一样调用该函数 - 使用 lambda 函数将 UDF 映射到 DataFrame 的“行”上

df.apply(f, axis=1)
0       5
1    <NA>
2       9
dtype: int64

同样的函数应该产生与 pandas 相同的结果

df.to_pandas(nullable=True).apply(f, axis=1)
0       5
1    <NA>
2       9
dtype: object

注意,Pandas 返回 object 数据类型 - 有关此内容的说明请参见注意事项部分。

cudf.Series.apply 类似,这些函数支持通用的空值处理。以下是一个函数示例,它在某个输入为空时有条件地返回不同的值

def f(row):
    x = row["a"]
    if x is cudf.NA:
        return 0
    else:
        return x + 1


df = cudf.DataFrame({"a": [1, cudf.NA, 3]})
df
a
0 1
1 <NA>
2 3
df.apply(f, axis=1)
0    2
1    0
2    4
dtype: int64

cudf.NA 也可以直接从函数返回,最终得到具有正确空值的数据,就像在 Pandas 中运行一样。对于以下数据,最后一行满足条件 1 + 3 > 3,并为该行返回 NA

def f(row):
    x = row["a"]
    y = row["b"]
    if x + y > 3:
        return cudf.NA
    else:
        return x + y


df = cudf.DataFrame({"a": [1, 2, 3], "b": [2, 1, 1]})
df
a b
0 1 2
1 2 1
2 3 1
df.apply(f, axis=1)
0       3
1       3
2    <NA>
dtype: int64

允许混合类型,但将返回通用类型,而不是像 Pandas 那样的 object 类型。以下是一个在整型和浮点型列之间进行的空值感知操作示例

def f(row):
    return row["a"] + row["b"]


df = cudf.DataFrame({"a": [1, 2, 3], "b": [0.5, cudf.NA, 3.14]})
df
a b
0 1 0.5
1 2 <NA>
2 3 3.14
df.apply(f, axis=1)
0     1.5
1    <NA>
2    6.14
dtype: float64

函数也可以返回标量值,但结果将被提升为安全类型,而无论数据本身如何。这意味着即使您有一个函数如下所示

def f(x):
    if x > 1000:
        return 1.5
    else:
        return 2

并且您的数据是

[1,2,3,4,5]

最终数据中会得到浮点数,即使函数从未返回过浮点数。这是因为 Numba 最终需要生成一个可以处理任何数据类型的函数,这意味着如果有可能产生浮点数,您必须始终假设它会发生。以下是一个在某些情况下返回标量的函数示例

def f(row):
    x = row["a"]
    if x > 3:
        return x
    else:
        return 1.5


df = cudf.DataFrame({"a": [1, 3, 5]})
df
a
0 1
1 3
2 5
df.apply(f, axis=1)
0    1.5
1    1.5
2    5.0
dtype: float64

支持任意数量的列和许多算术操作符,从而允许复杂的 UDF

def f(row):
    return row["a"] + (row["b"] - (row["c"] / row["d"])) % row["e"]


df = cudf.DataFrame(
    {
        "a": [1, 2, 3],
        "b": [4, 5, 6],
        "c": [cudf.NA, 4, 4],
        "d": [8, 7, 8],
        "e": [7, 1, 6],
    }
)
df
a b c d e
0 1 4 <NA> 8 7
1 2 5 4 7 1
2 3 6 4 8 6
df.apply(f, axis=1)
0           <NA>
1    2.428571429
2            8.5
dtype: float64

字符串数据#

字符串数据可以在 DataFrame.apply UDF 内部使用,但受限于与 Series.apply 相同的约束。有关详细信息,请参阅上面关于 Series UDF 的字符串处理部分。下面是一个简单的示例,在存在字符串列的情况下扩展了上面的行 UDF 逻辑

str_df = cudf.DataFrame(
    {"str_col": ["abc", "ABC", "Example"], "scale": [1, 2, 3]}
)
str_df
str_col scale
0 abc 1
1 ABC 2
2 示例 3
def f(row):
    st = row["str_col"]
    scale = row["scale"]

    if len(st) > 5:
        return len(st) + scale
    else:
        return len(st)
result = str_df.apply(f, axis=1)
print(result)
0     3
1     3
2    10
dtype: int64

用户定义聚合函数 (UDAF) 和 GroupBy.apply#

cuDF 通过基于 numba 的 GroupBy apply jit 引擎支持加速部分用户定义聚合。满足通过 jit 引擎执行所需条件的聚合将自动以此方式运行。希望为 jit 引擎开发聚合函数的用户可以通过向 apply 传递 engine='jit' 来显式调用它

# Create a dataframe with two groups
df = cudf.DataFrame(
    {
        "a": [1, 1, 1, 2, 2, 2],
        "b": [1, 2, 3, 4, 5, 6],
        "c": [3, 4, 5, 6, 7, 8],
    }
)
df
a b c
0 1 1 3
1 1 2 4
2 1 3 5
3 2 4 6
4 2 5 7
5 2 6 8
# a user defined aggregation function.


def udaf(df):
    return df["b"].max() - df["b"].min() / 2
result = df.groupby("a").apply(udaf, engine="jit")
result
a
1    2.5
2    4.0
dtype: float64

GroupBy JIT 引擎支持的特性#

cuDF 通过 JIT 引擎执行 UDAF 需要输入数据和 UDAF 本身满足几个条件。预计随着开发的进行,许多限制可能会被解除。

限制#

  • 目前不允许包含空值的数据。尝试对包含空值的数据使用 engine='jit' 将会引发错误。

  • 广义上讲,只允许 4 字节或 8 字节的整型和浮点型数据类型:["int32, "int64" ,"float32", "float64"]

  • 某些函数有额外的数据类型限制,例如 corr,它尚不支持浮点数据类型。使用此类缺少重载的数据类型调用 corr 将会引发错误。

  • 如果在 UDAF 内部访问并使用了不支持数据类型的列,cuDF 将会引发错误。

  • 在 UDAF 内部不允许返回新列的操作,例如列之间的二元操作

    df['a'] + df['b']
    

    这样做将会引发错误。

  • 目前尚不支持返回 SeriesDataFrame 对象的函数,只允许返回标量的函数。

  • 支持以下聚合操作

    • max()

    • min()

    • sum()

    • mean()

    • var()

    • std()

    • idxmax()

    • idxmin()

    • corr() (仅限整型数据)

用于 DataFrame 的 Numba 内核#

我们可以像上面使用 forall 那样在 DataFrame 上应用 UDF。我们需要编写一个期望多个输入的内核,并在执行内核时传递多个 Series 作为参数。因为这相当常见且可能难以管理,cuDF 提供了两个 API 来简化此过程:apply_rowsapply_chunks。下面,我们将逐步演示如何使用 apply_rowsapply_chunks 的工作方式类似,但提供了对低级内核行为更多的控制。

df = randomdata(
    nrows=10, dtypes={"a": float, "b": bool, "c": str, "e": float}, seed=12
)
df.head()
a b c e
0 -0.691674 True Dan -0.958380
1 0.480099 False Bob -0.729580
2 -0.473370 True Xavier -0.767454
3 0.067479 True Alice -0.380205
4 -0.970850 False Sarah 0.342905

现在我们的 DataFrame 中有两个数字列,让我们编写一个使用这两个列的内核。

def conditional_add(x, y, out):
    for i, (a, e) in enumerate(zip(x, y)):
        if a > 0:
            out[i] = a + e
        else:
            out[i] = a

请注意,我们需要 enumerate 遍历我们的 zipped 函数参数(这些参数与我们的输入列名匹配或映射)。我们可以将此内核传递给 apply_rows。我们需要指定几个参数

  • incols

    • 与函数参数匹配的输入列名列表。或者,一个将输入列名映射到其对应函数参数的字典,例如 {'col1': 'arg1'}

  • outcols

    • 定义输出列名及其数据类型的字典。这些名称必须与函数参数匹配。

  • kwargs(可选)

    • 我们可以选择性地以字典形式传递关键字参数。由于我们不需要任何参数,因此传递一个空字典。

虽然看起来我们的函数正在按顺序遍历列,但它实际上在 GPU 上的多个线程中并行执行。这种并行性是 GPU 加速计算的核心。了解这些背景后,我们就可以使用我们的 UDF 了。

df = df.apply_rows(
    conditional_add,
    incols={"a": "x", "e": "y"},
    outcols={"out": np.float64},
    kwargs={},
)
df.head()
a b c e out
0 -0.691674 True Dan -0.958380 -0.691674
1 0.480099 False Bob -0.729580 -0.249480
2 -0.473370 True Xavier -0.767454 -0.473370
3 0.067479 True Alice -0.380205 -0.312726
4 -0.970850 False Sarah 0.342905 -0.970850

正如预期的那样,我们的条件加法起作用了。至此,我们已成功在 cuDF 的核心数据结构上执行了 UDF。

apply_rowsapply_chunks 中的空值处理#

默认情况下,DataFrame 应用 UDF 的方法(如 apply_rows)会悲观地处理空值(如果内核中使用了带有空值的行,则这些行将从输出中移除)。探讨不悲观处理空值可能导致未定义行为超出了本指南的范围。只需说明,悲观的空值处理是安全且一致的方法。您可以在下方看到一个示例。

def gpu_add(a, b, out):
    for i, (x, y) in enumerate(zip(a, b)):
        out[i] = x + y


df = randomdata(nrows=5, dtypes={"a": int, "b": int, "c": int}, seed=12)
df.loc[2, "a"] = None
df.loc[3, "b"] = None
df.loc[1, "c"] = None
df.head()
a b c
0 963 1005 997
1 977 1026 <NA>
2 <NA> 1026 1019
3 1078 <NA> 985
4 979 982 1011

在上面的 dataframe 中,有三个空值。每列在不同的行中有一个空值。当我们使用 apply_rows 和我们的 UDF 时,由于悲观的空值处理,我们的输出应该有两个空值(因为我们没有使用列 c,那里的空值对我们无关紧要)。

df = df.apply_rows(
    gpu_add, incols=["a", "b"], outcols={"out": np.float64}, kwargs={}
)
df.head()
a b c out
0 963 1005 997 1968.0
1 977 1026 <NA> 2003.0
2 <NA> 1026 1019 <NA>
3 1078 <NA> 985 <NA>
4 979 982 1011 1961.0

正如预期的那样,我们的输出中最终有两个空值。我们使用的列中的空值传播到了输出,但我们忽略的列中的空值没有传播。

滚动窗口 UDF#

对于时间序列数据,我们可能需要在每次操作一小部分列的“窗口”,独立处理每个部分。我们可以将这个窗口在整个列上滑动(“滚动”)以回答诸如“过去一年股票价格的 3 日移动平均是多少?”之类的问题。

我们可以使用 apply 将更复杂的函数应用于滚动窗口 rolling Series 和 DataFrames。此示例改编自 cuDF 的API 文档。首先,我们将创建一个示例 Series,然后从该 Series 创建一个 rolling 对象。

ser = cudf.Series([16, 25, 36, 49, 64, 81], dtype="float64")
ser
0    16.0
1    25.0
2    36.0
3    49.0
4    64.0
5    81.0
dtype: float64
rolling = ser.rolling(window=3, min_periods=3, center=False)
rolling
Rolling [window=3,min_periods=3,center=False]

接下来,我们将定义一个要在滚动窗口上使用的函数。我们创建这个函数是为了强调如何包含循环、数学函数和条件语句等内容。滚动窗口 UDF 尚不支持空值。

import math


def example_func(window):
    b = 0
    for a in window:
        b = max(b, math.sqrt(a))
    if b == 8:
        return 100
    return b

我们可以通过将其传递给 apply 来执行函数。当 window=3, min_periods=3, 并且 center=False 时,我们的前两个值是 null

rolling.apply(example_func)
0     <NA>
1     <NA>
2      6.0
3      7.0
4    100.0
5      9.0
dtype: float64

我们也可以将此函数应用于 DataFrame 中的每一列。

df2 = cudf.DataFrame()
df2["a"] = np.arange(55, 65, dtype="float64")
df2["b"] = np.arange(55, 65, dtype="float64")
df2.head()
a b
0 55.0 55.0
1 56.0 56.0
2 57.0 57.0
3 58.0 58.0
4 59.0 59.0
rolling = df2.rolling(window=3, min_periods=3, center=False)
rolling.apply(example_func)
a b
0 <NA> <NA>
1 <NA> <NA>
2 7.549834435 7.549834435
3 7.615773106 7.615773106
4 7.681145748 7.681145748
5 7.745966692 7.745966692
6 7.810249676 7.810249676
7 7.874007874 7.874007874
8 7.937253933 7.937253933
9 100.0 100.0

GroupBy DataFrame UDF#

我们还可以使用 apply_grouped 将 UDF 应用于分组的 DataFrames。

首先,我们将根据列 b 对 DataFrame 进行分组,该列的值为 True 或 False。

df = randomdata(
    nrows=10, dtypes={"a": float, "b": bool, "c": str, "e": float}, seed=12
)
df.head()
a b c e
0 -0.691674 True Dan -0.958380
1 0.480099 False Bob -0.729580
2 -0.473370 True Xavier -0.767454
3 0.067479 True Alice -0.380205
4 -0.970850 False Sarah 0.342905
grouped = df.groupby(["b"])

接下来,我们将定义一个函数,独立应用于每个组。在本例中,我们将计算列 e 的滚动平均值,并将新列命名为 rolling_avg_e

def rolling_avg(e, rolling_avg_e):
    win_size = 3
    for i in range(cuda.threadIdx.x, len(e), cuda.blockDim.x):
        if i < win_size - 1:
            # If there is not enough data to fill the window,
            # take the average to be NaN
            rolling_avg_e[i] = np.nan
        else:
            total = 0
            for j in range(i - win_size + 1, i + 1):
                total += e[j]
            rolling_avg_e[i] = total / win_size

我们可以使用与 apply_rows 非常相似的 API 执行此操作。不过,这次它将独立地为每个组执行。

results = grouped.apply_grouped(
    rolling_avg, incols=["e"], outcols=dict(rolling_avg_e=np.float64)
)
results
a b c e rolling_avg_e
1 0.480099 False Bob -0.729580 NaN
4 -0.970850 False Sarah 0.342905 NaN
6 0.801430 False Sarah 0.632337 0.081887
7 -0.933157 False Quinn -0.420826 0.184805
0 -0.691674 True Dan -0.958380 NaN
2 -0.473370 True Xavier -0.767454 NaN
3 0.067479 True Alice -0.380205 -0.702013
5 0.837494 True Wendy -0.057540 -0.401733
8 0.913899 True Ursula 0.466252 0.009502
9 -0.725581 True George 0.405245 0.271319

请注意,在内核中,当窗口大小为三时,输出列中每个组的前两个值都是空值。

CuPy 数组上的 Numba 内核#

我们也可以在 CuPy NDArrays 上执行 Numba 内核,同样得益于 __cuda_array_interface__。我们甚至可以在 Series 和 CuPy 数组上运行相同的 UDF。首先,我们定义一个 Series,然后从该 Series 创建一个 CuPy 数组。

import cupy as cp

s = cudf.Series([1.0, 2, 3, 4, 10])
arr = cp.asarray(s)
arr
array([ 1.,  2.,  3.,  4., 10.])

接下来,我们定义一个 UDF 并在我们的 Series 上执行它。我们需要为输出分配一个相同大小的 Series,我们将其命名为 out

@cuda.jit
def multiply_by_5(x, out):
    i = cuda.grid(1)
    if i < x.size:
        out[i] = x[i] * 5


out = cudf.Series(cp.zeros(len(s), dtype="int32"))
multiply_by_5.forall(s.shape[0])(s, out)
out
0     5
1    10
2    15
3    20
4    50
dtype: int32

最后,我们在我们的数组上执行相同的函数。我们分配一个空数组 out 来存储结果。

out = cp.empty_like(arr)
multiply_by_5.forall(arr.size)(arr, out)
out
array([ 5., 10., 15., 20., 50.])

注意事项#

  • 目前,UDF 仅支持数值非十进制标量类型(完全支持)以及 Series.applyDataFrame.apply 中的字符串(部分支持,受限于上述注意事项)。尝试使用不支持的类型调用此 API 将引发 TypeError

  • 我们尚未完全支持所有算术操作符。某些操作(如位操作)目前尚未实现,但计划在未来版本中实现。如果需要某个操作符,应在 GitHub 上提出问题,以便对其进行适当的优先级排序和实现。

总结#

本指南涵盖了很多内容。至此,您应该能够自如地编写用于以下数据结构(带或不带空值)的 UDF:

  • Series

  • DataFrame

  • 滚动窗口

  • GroupBy DataFrames

  • CuPy NDArrays

  • Numba DeviceNDArrays

  • 通用 NA UDF

  • 字符串 UDF

更多信息请参阅 cuDFNumba.cudaCuPy 文档。