使用十亿行挑战衡量性能#

十亿行挑战”是一项针对 Java 开发人员的编程竞赛,旨在编写最高效的代码来处理一个包含十亿行文本的文件并计算一些指标。这项挑战激发了 Java 之外许多语言的解决方案,包括 Python

在本 Notebook 中,我们将探讨如何使用 RAPIDS 构建高效的 Python 解决方案,以及如何使用仪表板来了解代码的性能表现。

问题#

挑战的输入数据是一个约 13GB 的文本文件,包含十亿行温度测量数据。文件结构为每行一个测量值,气象站名称和测量值用分号分隔。

Hamburg;12.0
Bulawayo;8.9
Palembang;38.8
St. John's;15.2
Cracow;12.6
...

我们的目标是计算每个气象站的最低、平均和最高温度,并按站点名称的字母顺序排序,尽可能快地完成。

参考实现#

使用流行的 PyData 工具编写的参考实现可能与以下 Pandas 代码类似(假设您有足够的 RAM 将数据加载到内存中)。

import pandas as pd

df = pd.read_csv(
    "measurements.txt",
    sep=";",
    header=None,
    names=["station", "measure"],
    engine='pyarrow'
)
df = df.groupby("station").agg(["min", "max", "mean"])
df.columns = df.columns.droplevel()
df = df.sort_values("station")

这里我们使用 pandas.read_csv() 打开文本文件,并指定 ; 分隔符,同时设置一些列名。我们还将引擎设置为 pyarrow,以获得一些开箱即用的额外性能。

然后,我们按气象站名称对测量值进行分组,并计算最小值、最大值和平均值。最后,我们按站点名称对分组后的 DataFrame 进行排序。

在配备 12 核 CPU 的工作站上运行此任务,大约需要 4 分钟 完成。

部署 RAPIDS#

要运行本 Notebook,我们需要一台或多台配备 GPU 的机器。您可以通过多种方式获得:

一旦拥有了 GPU 机器,您需要安装 RAPIDS。您可以使用 pipcondadocker 来完成。

我们还将使用带有 RAPIDS nvdashboard 扩展Dask Lab 扩展 的 Jupyter Lab,以便了解机器正在做什么。如果您使用 Docker 容器,这些工具已为您安装好,否则您需要自行安装。

仪表板#

一旦您安装了扩展并下载了本 Notebook 后,启动并运行 Jupyter,即可打开一些性能仪表板,以便在代码运行时监控硬件。

我们从 nvdashboard 开始,它在左侧工具栏中有 GPU 图标。

首先打开“Machine Resources”表格、“GPU Utilization”图表和“GPU Memory”图表,并将它们移动到右侧。

数据生成#

在开始解决问题之前,我们需要生成输入数据。1BRC 代码库中有一个 Java 实现,生成文件大约需要 15 分钟。

如果您运行 Java 实现,会看到 CPU 繁忙但磁盘带宽较低,这表明这是一个计算密集型问题。我们可以使用 cuDF 和 CuPy 在 GPU 上加速这一过程。

下载包含站点及其平均温度的 lookup.csv 表格,我们将用它来生成包含 n 行随机温度的数据文件。

为了生成每一行,我们从 lookup 表中随机选择一个站点,然后围绕平均温度生成一个符合正态分布的随机温度测量值。我们假设所有站点的标准差为 10.0

import time
from pathlib import Path

import cudf
import cupy as cp
def generate_chunk(filename, chunksize, std, lookup_df):
    """Generate some sample data based on the lookup table."""
    df = cudf.DataFrame(
        {
            # Choose a random station from the lookup table for each row in our output
            "station": cp.random.randint(0, len(lookup_df) - 1, int(chunksize)),
            # Generate a normal distribution around zero for each row in our output
            # Because the std is the same for every station we can adjust the mean for each row afterwards
            "measure": cp.random.normal(0, std, int(chunksize)),
        }
    )

    # Offset each measurement by the station's mean value
    df.measure += df.station.map(lookup_df.mean_temp)
    # Round the temperature to one decimal place
    df.measure = df.measure.round(decimals=1)
    # Convert the station index to the station name
    df.station = df.station.map(lookup_df.station)

    # Append this chunk to the output file
    with open(filename, "a") as fh:
        df.to_csv(fh, sep=";", chunksize=10_000_000, header=False, index=False)

配置#

n = 1_000_000_000  # Number of rows of data to generate

lookup_df = cudf.read_csv(
    "lookup.csv"
)  # Load our lookup table of stations and their mean temperatures
std = 10.0  # We assume temperatures are normally distributed with a standard deviation of 10
chunksize = 2e8  # Set the number of rows to generate in one go (reduce this if you run into GPU RAM limits)
filename = Path("measurements.txt")  # Choose where to write to
filename.unlink() if filename.exists() else None  # Delete the file if it exists already

运行数据生成#

%%time
# Loop over chunks and generate data
start = time.time()
for i in range(int(n / chunksize)):
    # Generate a chunk
    generate_chunk(filename, chunksize, std, lookup_df)

    # Update the progress bar
    percent_complete = int(((i + 1) * chunksize) / n * 100)
    time_taken = int(time.time() - start)
    time_remaining = int((time_taken / percent_complete) * 100) - time_taken
    print(
        (
            f"Writing {int(n / 1e9)} billion rows to {filename}: {percent_complete}% "
            f"in {time_taken}s ({time_remaining}s remaining)"
        ),
        end="\r",
    )
print()
Writing 1 billion rows to measurements.txt: 100% in 25s (0s remaining)
CPU times: user 10.1 s, sys: 18 s, total: 28.2 s
Wall time: 25.3 s

如果您在运行此单元时观察图表,应该会看到 GPU 生成随机数时出现 GPU 利用率的峰值,然后数据写入磁盘时出现磁盘 IO 的峰值。每个生成的块都会出现这种模式。

注意

我们可以在这里通过在当前块写入磁盘时生成下一个块来进一步提高性能,但目前看来 30 倍的加速已经足够优化了。

检查文件#

现在我们可以验证数据集的大小是否符合预期,并且包含符合挑战所需格式的行。

!ls -lh {filename}
-rw-r--r-- 1 rapids conda 13G Jan 22 16:54 measurements.txt
!head {filename}
Guatemala City;17.3
Launceston;24.3
Bulawayo;8.7
Tbilisi;9.5
Napoli;26.8
Sarajevo;27.5
Chihuahua;29.2
Ho Chi Minh City;8.4
Johannesburg;19.2
Cape Town;16.3

使用 RAPIDS 的 GPU 解决方案#

现在我们来看看如何使用 RAPIDS 加速我们的 Pandas 挑战实现。如果直接将参考实现从 Pandas 转换为 cuDF,您会遇到 cuDF 在字符串列方面的一些限制。此外,根据您的 GPU,您可能会遇到内存限制,因为 cuDF 会将整个数据集读入内存,而机器通常 GPU 内存少于 CPU 内存。

因此,要使用 RAPIDS 解决此问题,我们还需要使用 Dask 来对数据集进行分区并将其流式传输到 GPU 内存中,然后 cuDF 可以高效地处理每个分区。

部署 Dask#

我们将使用 dask-cuda 来启动 GPU Dask 集群。

from dask.distributed import Client
from dask_cuda import LocalCUDACluster

client = Client(LocalCUDACluster())

创建一个 LocalCUDACluster() 会检查机器并为每个检测到的 GPU 启动一个 Dask worker。然后我们将其传递给 Dask 客户端,这意味着 Notebook 中所有后续代码都将利用 GPU worker。

提示

Dask 有许多不同的工具来部署集群,并且它们都遵循实例化一个类的相同格式。因此,无论您是想利用单台机器中的所有资源(如本例),还是想利用整个多节点集群,Dask 都可以帮助您快速启动和运行。

Dask 仪表板#

我们还可以使用 Dask 仪表板 来查看正在发生的事情。

如果您从左侧工具栏中选择 Dask 图标,然后单击搜索图标,它应该会自动检测到我们的 LocalCUDACluster,并显示一个很长的图表列表供您选择。

在使用 GPU 时,“GPU Utilization”和“GPU Memory”图表将显示与 nvdashboard 图表相同的内容,但适用于 Dask 集群中的所有机器。这在处理多节点集群时非常有用,但在单节点配置中没有帮助。

要查看 Dask 在此挑战中正在做什么,您应该打开“Progress”和“Task Stream”图表,它们将显示正在执行的所有操作。但您也可以随意打开其他图表,探索 Dask 可以提供的所有不同指标。

Dask + cuDF 解决方案#

现在我们已经有了输入数据和 Dask 集群,我们可以编写一些 Dask 代码,利用底层的 cuDF 来执行计算操作。

首先我们需要导入 dask.dataframe 并告诉它使用 cudf 后端。

import dask
import dask.dataframe as dd

dask.config.set({"dataframe.backend": "cudf"})
<dask.config.set at 0x7fbd773ae590>

现在我们可以运行我们的 Dask 代码了,它几乎与我们之前使用的 Pandas 代码完全相同。

%%timeit -n 3 -r 4
df = dd.read_csv("measurements.txt", sep=";", header=None, names=["station", "measure"])
df = df.groupby("station").agg(["min", "max", "mean"])
df.columns = df.columns.droplevel()

# We need to switch back to Pandas for the final sort at the time of writing due to rapidsai/cudf#14794
df = df.compute().to_pandas()
df = df.sort_values("station")
4.59 s ± 124 ms per loop (mean ± std. dev. of 4 runs, 3 loops each)

在配备两块 NVIDIA RTX 8000 GPU 的台式工作站上运行此 Notebook,挑战大约在 4 秒内完成(比 Pandas 快 60 倍)。

观察进度条,您应该会看到它们总共填充和重置 12 次,因为我们的 %%timeit 操作正在多次解决挑战以获得平均速度。

在上面的截图中,您可以看到在双 GPU 系统上,Dask 利用了两块 GPU。但同样有趣的是,GPU 利用率从未达到 100%。这是因为机器中的 SSD 现在成了瓶颈。GPU 的计算效率非常高,我们从磁盘读取数据的速度不足以完全饱和它们。

结论#

RAPIDS 可以加速使用 Pandas 等库编写的现有工作流程,只需很少或无需更改代码。GPU 可以将计算加速几个数量级,这可以将性能瓶颈转移到系统的其他部分。

使用 nvdashboard 和 Dask 仪表板等仪表板工具可以帮助您查看和理解系统性能。也许在本例中,升级 SSD 是进一步提高性能的下一步。