cudf.pandas 和 Snowflake 入门#

Snowflake 中的 RAPIDS#

RAPIDS 是一套库,用于在 GPU 上执行端到端数据科学流水线。如果您有 Snowflake 表中的数据,并且希望使用 RAPIDS 进行探索,则可以使用 Snowpark Container Services 在 Snowflake 中部署 RAPIDS。

查看文档

为本示例的目的,请在开始之前遵循在 Snowflake 上运行 RAPIDS 的指南。

纽约市停车罚单 cudf.pandas 示例#

如果您有 Snowflake 表中的数据,您可以使用 cuDF.pandas 加速您的 ETL 工作流程。使用 cudf.pandas,您可以在零代码更改的情况下加速 pandas 生态系统。只需加载 cudf.pandas,您即可获得 GPU 加速的优势,并在需要时自动回退到 CPU。

在此示例中,我们有一个 Snowflake 表,其中包含来自 NYC Open Data 的 2022 财年停车违规罚单数据集。

将数据导入 Snowflake 表#

要跟着操作,您需要将纽约市停车违规数据导入到您的 Snowflake 账户中,并确保您可以从按照 在 Snowflake 上运行 RAPIDS 指南部署的 RAPIDS notebook Snowpark Service Container 访问此数据。

在 Snowflake SQL 工作表中使用 ACCOUNTADMIN 角色

-- Create a database where the table would live --

CREATE DATABASE CUDF_SNOWFLAKE_EXAMPLE;

USE DATABASE DATABASE CUDF_SNOWFLAKE_EXAMPLE; 

CREATE OR REPLACE FILE FORMAT my_parquet_format
TYPE = 'PARQUET';

CREATE OR REPLACE STAGE my_s3_stage
URL = 's3://rapidsai-data/datasets/nyc_parking/'
FILE_FORMAT = my_parquet_format;

-- Infer schema from parquet file to use when creating table later --
SELECT COLUMN_NAME, TYPE
FROM TABLE(
    INFER_SCHEMA(
        LOCATION => '@my_s3_stage',
        FILE_FORMAT => 'my_parquet_format',
        FILES => ('nyc_parking_violations_2022.parquet')
    )
);

-- Create table using the inferred schema in the previous step --
CREATE OR REPLACE TABLE NYC_PARKING_VIOLATIONS
  USING TEMPLATE (
    SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
      FROM TABLE(
        INFER_SCHEMA(
        LOCATION => '@my_s3_stage',
        FILE_FORMAT => 'my_parquet_format',
        FILES => ('nyc_parking_violations_2022.parquet')
        )
      ));

-- Get data from the stage into the table --      
COPY INTO NYC_PARKING_VIOLATIONS
FROM @my_s3_stage
FILES = ('nyc_parking_violations_2022.parquet')
FILE_FORMAT = (TYPE = 'PARQUET')
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;

确保从容器访问#

在 Snowflake 中部署 RAPIDS 的过程中,您创建了一个 CONTAINER_USER_ROLE,我们需要确保此角色对数据所在的数据库、模式和表具有访问权限,以便能够从中查询数据。

-- Ensure the role has USAGE permissions on the database and schema
GRANT USAGE ON DATABASE CUDF_SNOWFLAKE_EXAMPLE TO ROLE CONTAINER_USER_ROLE;
GRANT USAGE ON SCHEMA CUDF_SNOWFLAKE_EXAMPLE.PUBLIC TO ROLE CONTAINER_USER_ROLE; 

-- Ensure the role has SELECT permission on the table
GRANT SELECT ON TABLE CUDF_SNOWFLAKE_EXAMPLE.PUBLIC.NYC_PARKING_VIOLATIONS TO ROLE CONTAINER_USER_ROLE;

读取数据并试用。#

现在您已经将数据导入到 Snowflake 表中,并且 RAPIDS Snowpark 容器已启动并运行,请在 workspace 目录(添加到此目录的任何内容都将持久化)中创建一个新的 notebook,并按照以下说明操作。

加载 cudf.pandas#

在您的 notebook 的第一个单元格中,加载 cudf.pandas 扩展

%load_ext cudf.pandas

连接到 Snowflake 并创建一个 Snowpark session#

import os
from pathlib import Path

from snowflake.snowpark import Session

connection_parameters = {
    "account": os.getenv("SNOWFLAKE_ACCOUNT"),
    "host": os.getenv("SNOWFLAKE_HOST"),
    "token": Path("/snowflake/session/token").read_text(),
    "authenticator": "oauth",
    "database": "CUDF_SNOWFLAKE_EXAMPLE",  # the created database
    "schema": "PUBLIC",
    "warehouse": "CONTAINER_HOL_WH",
}

session = Session.builder.configs(connection_parameters).create()

# Check the session
print(
    f"Current session info: Warehouse: {session.get_current_warehouse()}  "
    f"Database: {session.get_current_database()}    "
    f"Schema: {session.get_current_schema()}  "
    f"Role: {session.get_current_role()}"
)
# Get some interesting columns from the table
table = session.table("NYC_PARKING_VIOLATIONS").select(
    "Registration State",
    "Violation Description",
    "Vehicle Body Type",
    "Issue Date",
    "Summons Number",
)
table

请注意,截至目前,我们有一个 snowpark dataframe。要获得 pandas dataframe,我们使用 .to_pandas()

警告

目前,存在一个已知问题,它阻止我们使用 cudf 加速后续步骤,我们希望尽快解决此问题。在此期间,我们需要一个变通方法将数据导入到 cudf.pandas 可以理解的 pandas dataframe 中。

from cudf.pandas.module_accelerator import disable_module_accelerator

with disable_module_accelerator():
    df = table.to_pandas()

import pandas as pd

df = pd.DataFrame(df)  # this will take a few seconds

将来,上面的单元格将简化为只需执行 df = table.to_pandas()

但现在我们已经准备好看看 cudf.pandas 的实际表现了。需要说明的是,此数据集有 len(df) = 15435607 条记录,您应该会看到以下操作在毫秒级别完成。

美国各州车辆最常犯的停车违规行为是什么?

我们数据集中的每条记录都包含违规车辆的注册州和停车违规类型。假设我们想知道不同州注册的车辆最常犯的违规类型。我们可以这样做:

%%time
(
    df[["Registration State", "Violation Description"]]  # get only these two columns
    .value_counts()  # get the count of offences per state and per type of offence
    .groupby("Registration State")  # group by state
    .head(
        1
    )  # get the first row in each group (the type of offence with the largest count)
    .sort_index()  # sort by state name
    .reset_index()
)

哪些车辆类型最常涉及停车违规? 我们还可以调查哪些车辆类型最常出现在停车违规中

%%time

(
    df.groupby(["Vehicle Body Type"])
    .agg({"Summons Number": "count"})
    .rename(columns={"Summons Number": "Count"})
    .sort_values(["Count"], ascending=False)
)

停车违规行为在一周中的不同天如何变化?

%%time

weekday_names = {
    0: "Monday",
    1: "Tuesday",
    2: "Wednesday",
    3: "Thursday",
    4: "Friday",
    5: "Saturday",
    6: "Sunday",
}

df["Issue Date"] = df["Issue Date"].astype("datetime64[ms]")
df["issue_weekday"] = df["Issue Date"].dt.weekday.map(weekday_names)

df.groupby(["issue_weekday"])["Summons Number"].count().sort_values()

结论#

使用 cudf.pandas,您可以通过将其读取到 pandas dataframe 中,加速涉及 Snowflake 表中数据的 GPU 工作流程

当速度变慢时,只需加载 cudf.pandas 并在 GPU 上运行您现有的代码即可!

要了解更多信息,我们鼓励您访问 rapids.ai/cudf-pandas