cudf.pandas 和 Snowflake 入门#
Snowflake 中的 RAPIDS#
RAPIDS 是一套库,用于在 GPU 上执行端到端数据科学流水线。如果您有 Snowflake 表中的数据,并且希望使用 RAPIDS 进行探索,则可以使用 Snowpark Container Services 在 Snowflake 中部署 RAPIDS。
纽约市停车罚单 cudf.pandas
示例#
如果您有 Snowflake 表中的数据,您可以使用 cuDF.pandas
加速您的 ETL 工作流程。使用 cudf.pandas
,您可以在零代码更改的情况下加速 pandas
生态系统。只需加载 cudf.pandas
,您即可获得 GPU 加速的优势,并在需要时自动回退到 CPU。
在此示例中,我们有一个 Snowflake 表,其中包含来自 NYC Open Data 的 2022 财年停车违规罚单数据集。
将数据导入 Snowflake 表#
要跟着操作,您需要将纽约市停车违规数据导入到您的 Snowflake 账户中,并确保您可以从按照 在 Snowflake 上运行 RAPIDS 指南部署的 RAPIDS notebook Snowpark Service Container 访问此数据。
在 Snowflake SQL 工作表中使用 ACCOUNTADMIN
角色
-- Create a database where the table would live --
CREATE DATABASE CUDF_SNOWFLAKE_EXAMPLE;
USE DATABASE DATABASE CUDF_SNOWFLAKE_EXAMPLE;
CREATE OR REPLACE FILE FORMAT my_parquet_format
TYPE = 'PARQUET';
CREATE OR REPLACE STAGE my_s3_stage
URL = 's3://rapidsai-data/datasets/nyc_parking/'
FILE_FORMAT = my_parquet_format;
-- Infer schema from parquet file to use when creating table later --
SELECT COLUMN_NAME, TYPE
FROM TABLE(
INFER_SCHEMA(
LOCATION => '@my_s3_stage',
FILE_FORMAT => 'my_parquet_format',
FILES => ('nyc_parking_violations_2022.parquet')
)
);
-- Create table using the inferred schema in the previous step --
CREATE OR REPLACE TABLE NYC_PARKING_VIOLATIONS
USING TEMPLATE (
SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
FROM TABLE(
INFER_SCHEMA(
LOCATION => '@my_s3_stage',
FILE_FORMAT => 'my_parquet_format',
FILES => ('nyc_parking_violations_2022.parquet')
)
));
-- Get data from the stage into the table --
COPY INTO NYC_PARKING_VIOLATIONS
FROM @my_s3_stage
FILES = ('nyc_parking_violations_2022.parquet')
FILE_FORMAT = (TYPE = 'PARQUET')
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
确保从容器访问#
在 Snowflake 中部署 RAPIDS 的过程中,您创建了一个 CONTAINER_USER_ROLE
,我们需要确保此角色对数据所在的数据库、模式和表具有访问权限,以便能够从中查询数据。
-- Ensure the role has USAGE permissions on the database and schema
GRANT USAGE ON DATABASE CUDF_SNOWFLAKE_EXAMPLE TO ROLE CONTAINER_USER_ROLE;
GRANT USAGE ON SCHEMA CUDF_SNOWFLAKE_EXAMPLE.PUBLIC TO ROLE CONTAINER_USER_ROLE;
-- Ensure the role has SELECT permission on the table
GRANT SELECT ON TABLE CUDF_SNOWFLAKE_EXAMPLE.PUBLIC.NYC_PARKING_VIOLATIONS TO ROLE CONTAINER_USER_ROLE;
读取数据并试用。#
现在您已经将数据导入到 Snowflake 表中,并且 RAPIDS Snowpark 容器已启动并运行,请在 workspace
目录(添加到此目录的任何内容都将持久化)中创建一个新的 notebook,并按照以下说明操作。
加载 cudf.pandas#
在您的 notebook 的第一个单元格中,加载 cudf.pandas
扩展
%load_ext cudf.pandas
连接到 Snowflake 并创建一个 Snowpark session#
import os
from pathlib import Path
from snowflake.snowpark import Session
connection_parameters = {
"account": os.getenv("SNOWFLAKE_ACCOUNT"),
"host": os.getenv("SNOWFLAKE_HOST"),
"token": Path("/snowflake/session/token").read_text(),
"authenticator": "oauth",
"database": "CUDF_SNOWFLAKE_EXAMPLE", # the created database
"schema": "PUBLIC",
"warehouse": "CONTAINER_HOL_WH",
}
session = Session.builder.configs(connection_parameters).create()
# Check the session
print(
f"Current session info: Warehouse: {session.get_current_warehouse()} "
f"Database: {session.get_current_database()} "
f"Schema: {session.get_current_schema()} "
f"Role: {session.get_current_role()}"
)
# Get some interesting columns from the table
table = session.table("NYC_PARKING_VIOLATIONS").select(
"Registration State",
"Violation Description",
"Vehicle Body Type",
"Issue Date",
"Summons Number",
)
table
请注意,截至目前,我们有一个 snowpark dataframe。要获得 pandas dataframe,我们使用 .to_pandas()
警告
目前,存在一个已知问题,它阻止我们使用 cudf 加速后续步骤,我们希望尽快解决此问题。在此期间,我们需要一个变通方法将数据导入到 cudf.pandas
可以理解的 pandas dataframe 中。
from cudf.pandas.module_accelerator import disable_module_accelerator
with disable_module_accelerator():
df = table.to_pandas()
import pandas as pd
df = pd.DataFrame(df) # this will take a few seconds
将来,上面的单元格将简化为只需执行 df = table.to_pandas()
。
但现在我们已经准备好看看 cudf.pandas
的实际表现了。需要说明的是,此数据集有 len(df) = 15435607
条记录,您应该会看到以下操作在毫秒级别完成。
美国各州车辆最常犯的停车违规行为是什么?
我们数据集中的每条记录都包含违规车辆的注册州和停车违规类型。假设我们想知道不同州注册的车辆最常犯的违规类型。我们可以这样做:
%%time
(
df[["Registration State", "Violation Description"]] # get only these two columns
.value_counts() # get the count of offences per state and per type of offence
.groupby("Registration State") # group by state
.head(
1
) # get the first row in each group (the type of offence with the largest count)
.sort_index() # sort by state name
.reset_index()
)
哪些车辆类型最常涉及停车违规? 我们还可以调查哪些车辆类型最常出现在停车违规中
%%time
(
df.groupby(["Vehicle Body Type"])
.agg({"Summons Number": "count"})
.rename(columns={"Summons Number": "Count"})
.sort_values(["Count"], ascending=False)
)
停车违规行为在一周中的不同天如何变化?
%%time
weekday_names = {
0: "Monday",
1: "Tuesday",
2: "Wednesday",
3: "Thursday",
4: "Friday",
5: "Saturday",
6: "Sunday",
}
df["Issue Date"] = df["Issue Date"].astype("datetime64[ms]")
df["issue_weekday"] = df["Issue Date"].dt.weekday.map(weekday_names)
df.groupby(["issue_weekday"])["Summons Number"].count().sort_values()
结论#
使用 cudf.pandas
,您可以通过将其读取到 pandas dataframe 中,加速涉及 Snowflake 表中数据的 GPU 工作流程
当速度变慢时,只需加载 cudf.pandas 并在 GPU 上运行您现有的代码即可!
要了解更多信息,我们鼓励您访问 rapids.ai/cudf-pandas。