使用 NVIDIA GPU 在 Google Kubernetes Engine 上执行时间序列预测#

在此示例中,我们将研究一个使用 M5 预测竞赛数据的真实世界时间序列预测示例。Walmart 提供了来自三个州多个商店的历史销售数据,我们的任务是预测未来 28 天的销售额。

先决条件#

准备 GKE 集群#

要运行此示例,您需要一个可用的 Google Kubernetes Engine (GKE) 集群,并且可以访问 NVIDIA GPU。

查看文档

设置可以访问 NVIDIA GPU 的 Google Kubernetes Engine (GKE) 集群。按照Google Kubernetes Engine 中的说明进行操作。

  1. 为确保示例顺利运行,请确保您的 GPU 具有充足的内存。此 notebook 已使用 NVIDIA A100 进行测试。

  2. 按照以下指南中的说明设置 Dask-Kubernetes 集成

Kubeflow 并非严格必需,但我们强烈推荐它,因为 Kubeflow 提供了一个良好的 notebook 环境,您可以在其中运行此 notebook,位于 k8s 集群内。(您可以选择任何方法;我们在从清单安装 Kubeflow 后测试了此示例。)创建 notebook 环境时,请使用以下配置

  • 2 个 CPU,16 GiB 内存

  • 1 个 NVIDIA GPU

  • 40 GiB 磁盘卷

上传示例中的所有 notebook 后,在 notebook 环境中运行此 notebook (notebook.ipynb)。

注意:我们将使用 worker pod 来加速训练阶段。预处理步骤将仅在调度器节点上运行。

在 Google Cloud Storage 中准备一个存储桶#

在 Google Cloud Storage 中创建一个新的存储桶。确保 k8s 集群中的 worker pod 对此存储桶具有读/写访问权限。可以通过以下任一方法完成

  1. 选项 1:预配 GKE 集群时指定附加范围。

    预配新的 GKE 集群时,添加 storage-rw 范围。此选项仅在您从头开始创建新集群时可用。如果您使用的是现有 GKE 集群,请参阅选项 2。

    示例

gcloud container clusters create my_new_cluster --accelerator type=nvidia-tesla-t4 \
   --machine-type n1-standard-32 --zone us-central1-c --release-channel stable \
   --num-nodes 5 --scopes=gke-default,storage-rw
  1. 选项 2:向关联的服务帐号授予存储桶访问权限。

    找出与您的 GKE 集群关联的服务帐号。您可以按如下方式向服务帐号授予存储桶访问权限:导航到 Cloud Storage 控制台,打开存储桶的“存储桶详细信息”页面,打开“权限”选项卡,然后点击“授予访问权限”。

输入您的集群具有读写访问权限的存储桶名称

bucket_name = "<Put the name of the bucket here>"

在 notebook 环境中安装 Python 包#

!pip install kaggle gcsfs dask-kubernetes optuna
# Test if the bucket is accessible
import gcsfs

fs = gcsfs.GCSFileSystem()
fs.ls(f"{bucket_name}/")
[]

从 Kaggle 获取时间序列数据集#

如果您还没有 Kaggle 帐号,请立即创建一个。然后按照Kaggle 公共 API 文档中的说明获取 API 密钥。此步骤是获取 M5 预测竞赛训练数据所必需的。获取 API 密钥后,填写以下内容

kaggle_username = "<Put your Kaggle username here>"
kaggle_api_key = "<Put your Kaggle API key here>"

现在我们可以下载数据集了

%env KAGGLE_USERNAME=$kaggle_username
%env KAGGLE_KEY=$kaggle_api_key

!kaggle competitions download -c m5-forecasting-accuracy

让我们解压 ZIP 存档文件,看看里面有什么。

import zipfile

with zipfile.ZipFile("m5-forecasting-accuracy.zip", "r") as zf:
    zf.extractall(path="./data")
!ls -lh data/*.csv
-rw-r--r-- 1 rapids conda 102K Sep 28 18:59 data/calendar.csv
-rw-r--r-- 1 rapids conda 117M Sep 28 18:59 data/sales_train_evaluation.csv
-rw-r--r-- 1 rapids conda 115M Sep 28 18:59 data/sales_train_validation.csv
-rw-r--r-- 1 rapids conda 5.0M Sep 28 18:59 data/sample_submission.csv
-rw-r--r-- 1 rapids conda 194M Sep 28 18:59 data/sell_prices.csv

数据预处理#

现在我们可以运行预处理步骤了。

导入模块并定义实用函数#

import gc
import pathlib

import cudf
import gcsfs
import numpy as np


def sizeof_fmt(num, suffix="B"):
    for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]:
        if abs(num) < 1024.0:
            return f"{num:3.1f}{unit}{suffix}"
        num /= 1024.0
    return f"{num:.1f}Yi{suffix}"


def report_dataframe_size(df, name):
    mem_usage = sizeof_fmt(grid_df.memory_usage(index=True).sum())
    print(f"{name} takes up {mem_usage} memory on GPU")

加载数据#

TARGET = "sales"  # Our main target
END_TRAIN = 1941  # Last day in train set
raw_data_dir = pathlib.Path("./data/")
train_df = cudf.read_csv(raw_data_dir / "sales_train_evaluation.csv")
prices_df = cudf.read_csv(raw_data_dir / "sell_prices.csv")
calendar_df = cudf.read_csv(raw_data_dir / "calendar.csv").rename(
    columns={"d": "day_id"}
)
train_df
id item_id dept_id cat_id store_id state_id d_1 d_2 d_3 d_4 ... d_1932 d_1933 d_1934 d_1935 d_1936 d_1937 d_1938 d_1939 d_1940 d_1941
0 HOBBIES_1_001_CA_1_evaluation HOBBIES_1_001 HOBBIES_1 HOBBIES CA_1 CA 0 0 0 0 ... 2 4 0 0 0 0 3 3 0 1
1 HOBBIES_1_002_CA_1_evaluation HOBBIES_1_002 HOBBIES_1 HOBBIES CA_1 CA 0 0 0 0 ... 0 1 2 1 1 0 0 0 0 0
2 HOBBIES_1_003_CA_1_evaluation HOBBIES_1_003 HOBBIES_1 HOBBIES CA_1 CA 0 0 0 0 ... 1 0 2 0 0 0 2 3 0 1
3 HOBBIES_1_004_CA_1_evaluation HOBBIES_1_004 HOBBIES_1 HOBBIES CA_1 CA 0 0 0 0 ... 1 1 0 4 0 1 3 0 2 6
4 HOBBIES_1_005_CA_1_evaluation HOBBIES_1_005 HOBBIES_1 HOBBIES CA_1 CA 0 0 0 0 ... 0 0 0 2 1 0 0 2 1 0
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
30485 FOODS_3_823_WI_3_evaluation FOODS_3_823 FOODS_3 FOODS WI_3 WI 0 0 2 2 ... 1 0 3 0 1 1 0 0 1 1
30486 FOODS_3_824_WI_3_evaluation FOODS_3_824 FOODS_3 FOODS WI_3 WI 0 0 0 0 ... 0 0 0 0 0 0 1 0 1 0
30487 FOODS_3_825_WI_3_evaluation FOODS_3_825 FOODS_3 FOODS WI_3 WI 0 6 0 2 ... 0 0 1 2 0 1 0 1 0 2
30488 FOODS_3_826_WI_3_evaluation FOODS_3_826 FOODS_3 FOODS WI_3 WI 0 0 0 0 ... 1 1 1 4 6 0 1 1 1 0
30489 FOODS_3_827_WI_3_evaluation FOODS_3_827 FOODS_3 FOODS WI_3 WI 0 0 0 0 ... 1 2 0 5 4 0 2 2 5 1

30490 行 × 1947 列

d_1, d_2, …, d_1941 表示从 2011-01-29 起第 1、2、…、1941 天的销售数据。

prices_df
store_id item_id wm_yr_wk sell_price
0 CA_1 HOBBIES_1_001 11325 9.58
1 CA_1 HOBBIES_1_001 11326 9.58
2 CA_1 HOBBIES_1_001 11327 8.26
3 CA_1 HOBBIES_1_001 11328 8.26
4 CA_1 HOBBIES_1_001 11329 8.26
... ... ... ... ...
6841116 WI_3 FOODS_3_827 11617 1.00
6841117 WI_3 FOODS_3_827 11618 1.00
6841118 WI_3 FOODS_3_827 11619 1.00
6841119 WI_3 FOODS_3_827 11620 1.00
6841120 WI_3 FOODS_3_827 11621 1.00

6841121 行 × 4 列

calendar_df
date wm_yr_wk weekday wday month year day_id event_name_1 event_type_1 event_name_2 event_type_2 snap_CA snap_TX snap_WI
0 2011-01-29 11101 Saturday 1 1 2011 d_1 <NA> <NA> <NA> <NA> 0 0 0
1 2011-01-30 11101 Sunday 2 1 2011 d_2 <NA> <NA> <NA> <NA> 0 0 0
2 2011-01-31 11101 Monday 3 1 2011 d_3 <NA> <NA> <NA> <NA> 0 0 0
3 2011-02-01 11101 Tuesday 4 2 2011 d_4 <NA> <NA> <NA> <NA> 1 1 0
4 2011-02-02 11101 Wednesday 5 2 2011 d_5 <NA> <NA> <NA> <NA> 1 0 1
... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
1964 2016-06-15 11620 Wednesday 5 6 2016 d_1965 <NA> <NA> <NA> <NA> 0 1 1
1965 2016-06-16 11620 Thursday 6 6 2016 d_1966 <NA> <NA> <NA> <NA> 0 0 0
1966 2016-06-17 11620 Friday 7 6 2016 d_1967 <NA> <NA> <NA> <NA> 0 0 0
1967 2016-06-18 11621 Saturday 1 6 2016 d_1968 <NA> <NA> <NA> <NA> 0 0 0
1968 2016-06-19 11621 Sunday 2 6 2016 d_1969 NBAFinalsEnd Sporting Father's day Cultural 0 0 0

1969 行 × 14 列

重新格式化销售时间序列数据#

使用 cudf.melt 将列 d_1, d_2, …, d_1941 转换为单独的行。

index_columns = ["id", "item_id", "dept_id", "cat_id", "store_id", "state_id"]
grid_df = cudf.melt(
    train_df, id_vars=index_columns, var_name="day_id", value_name=TARGET
)
grid_df
id item_id dept_id cat_id store_id state_id day_id sales
0 HOBBIES_1_001_CA_1_evaluation HOBBIES_1_001 HOBBIES_1 HOBBIES CA_1 CA d_1 0
1 HOBBIES_1_002_CA_1_evaluation HOBBIES_1_002 HOBBIES_1 HOBBIES CA_1 CA d_1 0
2 HOBBIES_1_003_CA_1_evaluation HOBBIES_1_003 HOBBIES_1 HOBBIES CA_1 CA d_1 0
3 HOBBIES_1_004_CA_1_evaluation HOBBIES_1_004 HOBBIES_1 HOBBIES CA_1 CA d_1 0
4 HOBBIES_1_005_CA_1_evaluation HOBBIES_1_005 HOBBIES_1 HOBBIES CA_1 CA d_1 0
... ... ... ... ... ... ... ... ...
59181085 FOODS_3_823_WI_3_evaluation FOODS_3_823 FOODS_3 FOODS WI_3 WI d_1941 1
59181086 FOODS_3_824_WI_3_evaluation FOODS_3_824 FOODS_3 FOODS WI_3 WI d_1941 0
59181087 FOODS_3_825_WI_3_evaluation FOODS_3_825 FOODS_3 FOODS WI_3 WI d_1941 2
59181088 FOODS_3_826_WI_3_evaluation FOODS_3_826 FOODS_3 FOODS WI_3 WI d_1941 0
59181089 FOODS_3_827_WI_3_evaluation FOODS_3_827 FOODS_3 FOODS WI_3 WI d_1941 1

59181090 行 × 8 列

对于每个时间序列,添加对应于未来预测范围的 28 行

add_grid = cudf.DataFrame()
for i in range(1, 29):
    temp_df = train_df[index_columns]
    temp_df = temp_df.drop_duplicates()
    temp_df["day_id"] = "d_" + str(END_TRAIN + i)
    temp_df[TARGET] = np.nan  # Sales amount at time (n + i) is unknown
    add_grid = cudf.concat([add_grid, temp_df])
add_grid["day_id"] = add_grid["day_id"].astype(
    "category"
)  # The day_id column is categorical, after cudf.melt

grid_df = cudf.concat([grid_df, add_grid])
grid_df = grid_df.reset_index(drop=True)
grid_df["sales"] = grid_df["sales"].astype(
    np.float32
)  # Use float32 type for sales column, to conserve memory
grid_df
id item_id dept_id cat_id store_id state_id day_id sales
0 HOBBIES_1_001_CA_1_evaluation HOBBIES_1_001 HOBBIES_1 HOBBIES CA_1 CA d_1 0.0
1 HOBBIES_1_002_CA_1_evaluation HOBBIES_1_002 HOBBIES_1 HOBBIES CA_1 CA d_1 0.0
2 HOBBIES_1_003_CA_1_evaluation HOBBIES_1_003 HOBBIES_1 HOBBIES CA_1 CA d_1 0.0
3 HOBBIES_1_004_CA_1_evaluation HOBBIES_1_004 HOBBIES_1 HOBBIES CA_1 CA d_1 0.0
4 HOBBIES_1_005_CA_1_evaluation HOBBIES_1_005 HOBBIES_1 HOBBIES CA_1 CA d_1 0.0
... ... ... ... ... ... ... ... ...
60034805 FOODS_3_823_WI_3_evaluation FOODS_3_823 FOODS_3 FOODS WI_3 WI d_1969 NaN
60034806 FOODS_3_824_WI_3_evaluation FOODS_3_824 FOODS_3 FOODS WI_3 WI d_1969 NaN
60034807 FOODS_3_825_WI_3_evaluation FOODS_3_825 FOODS_3 FOODS WI_3 WI d_1969 NaN
60034808 FOODS_3_826_WI_3_evaluation FOODS_3_826 FOODS_3 FOODS WI_3 WI d_1969 NaN
60034809 FOODS_3_827_WI_3_evaluation FOODS_3_827 FOODS_3 FOODS WI_3 WI d_1969 NaN

60034810 行 × 8 列

释放 GPU 内存#

GPU 内存是宝贵的资源,因此让我们尝试释放一些内存。首先,删除我们不再需要的临时变量

# Use xdel magic to scrub extra references from Jupyter notebook
%xdel temp_df
%xdel add_grid
%xdel train_df

# Invoke the garbage collector explicitly to free up memory
gc.collect()
8136

其次,通过将字符串转换为分类变量来减小 grid_df 的占用空间

report_dataframe_size(grid_df, "grid_df")
grid_df takes up 5.2GiB memory on GPU
grid_df.dtypes
id            object
item_id       object
dept_id       object
cat_id        object
store_id      object
state_id      object
day_id      category
sales        float32
dtype: object
for col in index_columns:
    grid_df[col] = grid_df[col].astype("category")
    gc.collect()
report_dataframe_size(grid_df, "grid_df")
grid_df takes up 802.6MiB memory on GPU
grid_df.dtypes
id          category
item_id     category
dept_id     category
cat_id      category
store_id    category
state_id    category
day_id      category
sales        float32
dtype: object

确定每个产品的发布周#

prices_df 表中的每一行包含某个产品在某商店在特定周的售价。

prices_df
store_id item_id wm_yr_wk sell_price
0 CA_1 HOBBIES_1_001 11325 9.58
1 CA_1 HOBBIES_1_001 11326 9.58
2 CA_1 HOBBIES_1_001 11327 8.26
3 CA_1 HOBBIES_1_001 11328 8.26
4 CA_1 HOBBIES_1_001 11329 8.26
... ... ... ... ...
6841116 WI_3 FOODS_3_827 11617 1.00
6841117 WI_3 FOODS_3_827 11618 1.00
6841118 WI_3 FOODS_3_827 11619 1.00
6841119 WI_3 FOODS_3_827 11620 1.00
6841120 WI_3 FOODS_3_827 11621 1.00

6841121 行 × 4 列

请注意,并非所有产品都在每周销售。有些产品仅在某些周销售。让我们使用 groupby 操作来确定每个产品上架的第一周。

release_df = (
    prices_df.groupby(["store_id", "item_id"])["wm_yr_wk"].agg("min").reset_index()
)
release_df.columns = ["store_id", "item_id", "release_week"]
release_df
store_id item_id release_week
0 CA_4 FOODS_3_529 11421
1 TX_1 HOUSEHOLD_1_409 11230
2 WI_2 FOODS_3_145 11214
3 CA_4 HOUSEHOLD_1_494 11106
4 WI_3 HOBBIES_1_093 11223
... ... ... ...
30485 CA_3 HOUSEHOLD_1_369 11205
30486 CA_2 FOODS_3_109 11101
30487 CA_4 FOODS_2_119 11101
30488 CA_4 HOUSEHOLD_2_384 11110
30489 WI_3 HOBBIES_1_135 11328

30490 行 × 3 列

现在我们已经计算出每个产品的发布周,让我们将其合并回 grid_df

grid_df = grid_df.merge(release_df, on=["store_id", "item_id"], how="left")
grid_df = grid_df.sort_values(index_columns + ["day_id"]).reset_index(drop=True)
grid_df
id item_id dept_id cat_id store_id state_id day_id sales release_week
0 FOODS_1_001_CA_1_evaluation FOODS_1_001 FOODS_1 FOODS CA_1 CA d_1 3.0 11101
1 FOODS_1_001_CA_1_evaluation FOODS_1_001 FOODS_1 FOODS CA_1 CA d_2 0.0 11101
2 FOODS_1_001_CA_1_evaluation FOODS_1_001 FOODS_1 FOODS CA_1 CA d_3 0.0 11101
3 FOODS_1_001_CA_1_evaluation FOODS_1_001 FOODS_1 FOODS CA_1 CA d_4 1.0 11101
4 FOODS_1_001_CA_1_evaluation FOODS_1_001 FOODS_1 FOODS CA_1 CA d_5 4.0 11101
... ... ... ... ... ... ... ... ... ...
60034805 HOUSEHOLD_2_516_WI_3_evaluation HOUSEHOLD_2_516 HOUSEHOLD_2 HOUSEHOLD WI_3 WI d_1965 NaN 11101
60034806 HOUSEHOLD_2_516_WI_3_evaluation HOUSEHOLD_2_516 HOUSEHOLD_2 HOUSEHOLD WI_3 WI d_1966 NaN 11101
60034807 HOUSEHOLD_2_516_WI_3_evaluation HOUSEHOLD_2_516 HOUSEHOLD_2 HOUSEHOLD WI_3 WI d_1967 NaN 11101
60034808 HOUSEHOLD_2_516_WI_3_evaluation HOUSEHOLD_2_516 HOUSEHOLD_2 HOUSEHOLD WI_3 WI d_1968 NaN 11101
60034809 HOUSEHOLD_2_516_WI_3_evaluation HOUSEHOLD_2_516 HOUSEHOLD_2 HOUSEHOLD WI_3 WI d_1969 NaN 11101

60034810 行 × 9 列

del release_df  # No longer needed
gc.collect()
139
report_dataframe_size(grid_df, "grid_df")
grid_df takes up 1.2GiB memory on GPU

过滤掉零销售额的条目#

我们可以通过删除 grid_df 中对应于零销售额的行来进一步节省空间。由于每个产品直到其发布周才上架,因此在发布周之前的任何一周其销售额必定为零。

为了利用这一见解,我们从 calendar_df 中引入 wm_yr_wk

grid_df = grid_df.merge(calendar_df[["wm_yr_wk", "day_id"]], on=["day_id"], how="left")
grid_df
id item_id dept_id cat_id store_id state_id day_id sales release_week wm_yr_wk
0 FOODS_1_001_WI_2_evaluation FOODS_1_001 FOODS_1 FOODS WI_2 WI d_809 0.0 11101 11312
1 FOODS_1_001_WI_2_evaluation FOODS_1_001 FOODS_1 FOODS WI_2 WI d_810 0.0 11101 11312
2 FOODS_1_001_WI_2_evaluation FOODS_1_001 FOODS_1 FOODS WI_2 WI d_811 2.0 11101 11312
3 FOODS_1_001_WI_2_evaluation FOODS_1_001 FOODS_1 FOODS WI_2 WI d_812 0.0 11101 11312
4 FOODS_1_001_WI_2_evaluation FOODS_1_001 FOODS_1 FOODS WI_2 WI d_813 1.0 11101 11313
... ... ... ... ... ... ... ... ... ... ...
60034805 HOUSEHOLD_2_516_WI_3_evaluation HOUSEHOLD_2_516 HOUSEHOLD_2 HOUSEHOLD WI_3 WI d_52 0.0 11101 11108
60034806 HOUSEHOLD_2_516_WI_3_evaluation HOUSEHOLD_2_516 HOUSEHOLD_2 HOUSEHOLD WI_3 WI d_53 0.0 11101 11108
60034807 HOUSEHOLD_2_516_WI_3_evaluation HOUSEHOLD_2_516 HOUSEHOLD_2 HOUSEHOLD WI_3 WI d_54 0.0 11101 11108
60034808 HOUSEHOLD_2_516_WI_3_evaluation HOUSEHOLD_2_516 HOUSEHOLD_2 HOUSEHOLD WI_3 WI d_55 0.0 11101 11108
60034809 HOUSEHOLD_2_516_WI_3_evaluation HOUSEHOLD_2_516 HOUSEHOLD_2 HOUSEHOLD WI_3 WI d_49 0.0 11101 11107

60034810 行 × 10 列

report_dataframe_size(grid_df, "grid_df")
grid_df takes up 1.7GiB memory on GPU

wm_yr_wk 标识了包含由 day_id 列给定的日期的周。现在,让我们过滤 grid_df 中所有 wm_yr_wk 小于 release_week 的行

df = grid_df[grid_df["wm_yr_wk"] < grid_df["release_week"]]
df
id item_id dept_id cat_id store_id state_id day_id sales release_week wm_yr_wk
6766 FOODS_1_002_TX_1_evaluation FOODS_1_002 FOODS_1 FOODS TX_1 TX d_1 0.0 11102 11101
6767 FOODS_1_002_TX_1_evaluation FOODS_1_002 FOODS_1 FOODS TX_1 TX d_2 0.0 11102 11101
19686 FOODS_1_001_TX_3_evaluation FOODS_1_001 FOODS_1 FOODS TX_3 TX d_1 0.0 11102 11101
19687 FOODS_1_001_TX_3_evaluation FOODS_1_001 FOODS_1 FOODS TX_3 TX d_2 0.0 11102 11101
19688 FOODS_1_001_TX_3_evaluation FOODS_1_001 FOODS_1 FOODS TX_3 TX d_3 0.0 11102 11101
... ... ... ... ... ... ... ... ... ... ...
60033493 HOUSEHOLD_2_516_WI_2_evaluation HOUSEHOLD_2_516 HOUSEHOLD_2 HOUSEHOLD WI_2 WI d_20 0.0 11106 11103
60033494 HOUSEHOLD_2_516_WI_2_evaluation HOUSEHOLD_2_516 HOUSEHOLD_2 HOUSEHOLD WI_2 WI d_21 0.0 11106 11103
60033495 HOUSEHOLD_2_516_WI_2_evaluation HOUSEHOLD_2_516 HOUSEHOLD_2 HOUSEHOLD WI_2 WI d_22 0.0 11106 11104
60033496 HOUSEHOLD_2_516_WI_2_evaluation HOUSEHOLD_2_516 HOUSEHOLD_2 HOUSEHOLD WI_2 WI d_23 0.0 11106 11104
60033497 HOUSEHOLD_2_516_WI_2_evaluation HOUSEHOLD_2_516 HOUSEHOLD_2 HOUSEHOLD WI_2 WI d_24 0.0 11106 11104

12299413 行 × 10 列

正如我们所怀疑的,在发布周之前的几周,销售额为零。

assert (df["sales"] == 0).all()

为了数据分析的目的,我们可以安全地删除销售额为零的行

grid_df = grid_df[grid_df["wm_yr_wk"] >= grid_df["release_week"]].reset_index(drop=True)
grid_df["wm_yr_wk"] = grid_df["wm_yr_wk"].astype(
    np.int32
)  # Convert wm_yr_wk column to int32, to conserve memory
grid_df
id item_id dept_id cat_id store_id state_id day_id sales release_week wm_yr_wk
0 FOODS_1_001_WI_2_evaluation FOODS_1_001 FOODS_1 FOODS WI_2 WI d_809 0.0 11101 11312
1 FOODS_1_001_WI_2_evaluation FOODS_1_001 FOODS_1 FOODS WI_2 WI d_810 0.0 11101 11312
2 FOODS_1_001_WI_2_evaluation FOODS_1_001 FOODS_1 FOODS WI_2 WI d_811 2.0 11101 11312
3 FOODS_1_001_WI_2_evaluation FOODS_1_001 FOODS_1 FOODS WI_2 WI d_812 0.0 11101 11312
4 FOODS_1_001_WI_2_evaluation FOODS_1_001 FOODS_1 FOODS WI_2 WI d_813 1.0 11101 11313
... ... ... ... ... ... ... ... ... ... ...
47735392 HOUSEHOLD_2_516_WI_3_evaluation HOUSEHOLD_2_516 HOUSEHOLD_2 HOUSEHOLD WI_3 WI d_52 0.0 11101 11108
47735393 HOUSEHOLD_2_516_WI_3_evaluation HOUSEHOLD_2_516 HOUSEHOLD_2 HOUSEHOLD WI_3 WI d_53 0.0 11101 11108
47735394 HOUSEHOLD_2_516_WI_3_evaluation HOUSEHOLD_2_516 HOUSEHOLD_2 HOUSEHOLD WI_3 WI d_54 0.0 11101 11108
47735395 HOUSEHOLD_2_516_WI_3_evaluation HOUSEHOLD_2_516 HOUSEHOLD_2 HOUSEHOLD WI_3 WI d_55 0.0 11101 11108
47735396 HOUSEHOLD_2_516_WI_3_evaluation HOUSEHOLD_2_516 HOUSEHOLD_2 HOUSEHOLD WI_3 WI d_49 0.0 11101 11107

47735397 行 × 10 列

report_dataframe_size(grid_df, "grid_df")
grid_df takes up 1.2GiB memory on GPU

为产品项分配权重#

当我们评估机器学习模型的准确性时,应为每个产品项分配一个权重,以指示该项的相对重要性。对于 M5 竞赛,权重是根据最后 28 天的总销售额(美元)计算得出的。

# Convert day_id to integers
grid_df["day_id_int"] = grid_df["day_id"].to_pandas().apply(lambda x: x[2:]).astype(int)

# Compute the total sales over the latest 28 days, per product item
last28 = grid_df[(grid_df["day_id_int"] >= 1914) & (grid_df["day_id_int"] < 1942)]
last28 = last28[["item_id", "wm_yr_wk", "sales"]].merge(
    prices_df[["item_id", "wm_yr_wk", "sell_price"]], on=["item_id", "wm_yr_wk"]
)
last28["sales_usd"] = last28["sales"] * last28["sell_price"]
total_sales_usd = last28.groupby("item_id")[["sales_usd"]].agg(["sum"]).sort_index()
total_sales_usd.columns = total_sales_usd.columns.map("_".join)
total_sales_usd
sales_usd_sum
item_id
FOODS_1_001 3516.80
FOODS_1_002 12418.80
FOODS_1_003 5943.20
FOODS_1_004 54184.82
FOODS_1_005 17877.00
... ...
HOUSEHOLD_2_512 6034.40
HOUSEHOLD_2_513 2668.80
HOUSEHOLD_2_514 9574.60
HOUSEHOLD_2_515 630.40
HOUSEHOLD_2_516 2574.00

3049 行 × 1 列

为了获得权重,我们将单个项的销售额除以所有项的总销售额进行归一化。

weights = total_sales_usd / total_sales_usd.sum()
weights = weights.rename(columns={"sales_usd_sum": "weights"})
weights
weights
item_id
FOODS_1_001 0.000090
FOODS_1_002 0.000318
FOODS_1_003 0.000152
FOODS_1_004 0.001389
FOODS_1_005 0.000458
... ...
HOUSEHOLD_2_512 0.000155
HOUSEHOLD_2_513 0.000068
HOUSEHOLD_2_514 0.000245
HOUSEHOLD_2_515 0.000016
HOUSEHOLD_2_516 0.000066

3049 行 × 1 列

# No longer needed
del grid_df["day_id_int"]

生成滞后特征#

滞后特征是目标变量在先前时间戳的值。滞后特征很有用,因为过去发生的事情通常会影响将来发生的事情。在我们的示例中,我们通过读取 X 天前的销售额来生成滞后特征,其中 X = 28, 29, …, 42。

SHIFT_DAY = 28
LAG_DAYS = [col for col in range(SHIFT_DAY, SHIFT_DAY + 15)]

# Need to first ensure that rows in each time series are sorted by day_id
grid_df_lags = grid_df[["id", "day_id", "sales"]].copy()
grid_df_lags = grid_df_lags.sort_values(["id", "day_id"])

grid_df_lags = grid_df_lags.assign(
    **{
        f"sales_lag_{ld}": grid_df_lags.groupby(["id"])["sales"].shift(ld)
        for ld in LAG_DAYS
    }
)
grid_df_lags
id day_id sales sales_lag_28 sales_lag_29 sales_lag_30 sales_lag_31 sales_lag_32 sales_lag_33 sales_lag_34 sales_lag_35 sales_lag_36 sales_lag_37 sales_lag_38 sales_lag_39 sales_lag_40 sales_lag_41 sales_lag_42
34023 FOODS_1_001_CA_1_evaluation d_1 3.0 <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA>
34024 FOODS_1_001_CA_1_evaluation d_2 0.0 <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA>
34025 FOODS_1_001_CA_1_evaluation d_3 0.0 <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA>
34026 FOODS_1_001_CA_1_evaluation d_4 1.0 <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA>
34027 FOODS_1_001_CA_1_evaluation d_5 4.0 <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA>
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
47733744 HOUSEHOLD_2_516_WI_3_evaluation d_1965 NaN 0.0 0.0 0.0 0.0 1.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0
47733745 HOUSEHOLD_2_516_WI_3_evaluation d_1966 NaN 0.0 0.0 0.0 0.0 0.0 1.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0
47733746 HOUSEHOLD_2_516_WI_3_evaluation d_1967 NaN 0.0 0.0 0.0 0.0 0.0 0.0 1.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0
47733747 HOUSEHOLD_2_516_WI_3_evaluation d_1968 NaN 0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0
47733748 HOUSEHOLD_2_516_WI_3_evaluation d_1969 NaN 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.0 0.0 0.0 0.0 0.0 0.0 0.0

47735397 行 × 18 列

计算滚动窗口统计量#

在前面的单元格中,我们使用单个时间戳的销售值来生成滞后特征。为了捕获有关过去的更丰富信息,我们还通过计算滚动窗口统计量来获取多个时间戳上的销售值分布。滚动窗口统计量补充了滞后特征,并提供了有关目标变量过去行为的更多信息。

时间序列预测的特征工程简介中阅读有关滞后特征和滚动窗口统计量的更多信息。

# Shift by 28 days and apply windows of various sizes
print(f"Shift size: {SHIFT_DAY}")
for i in [7, 14, 30, 60, 180]:
    print(f"    Window size: {i}")
    grid_df_lags[f"rolling_mean_{i}"] = (
        grid_df_lags.groupby(["id"])["sales"]
        .shift(SHIFT_DAY)
        .rolling(i)
        .mean()
        .astype(np.float32)
    )
    grid_df_lags[f"rolling_std_{i}"] = (
        grid_df_lags.groupby(["id"])["sales"]
        .shift(SHIFT_DAY)
        .rolling(i)
        .std()
        .astype(np.float32)
    )
Shift size: 28
    Window size: 7
    Window size: 14
    Window size: 30
    Window size: 60
    Window size: 180
grid_df_lags.columns
Index(['id', 'day_id', 'sales', 'sales_lag_28', 'sales_lag_29', 'sales_lag_30',
       'sales_lag_31', 'sales_lag_32', 'sales_lag_33', 'sales_lag_34',
       'sales_lag_35', 'sales_lag_36', 'sales_lag_37', 'sales_lag_38',
       'sales_lag_39', 'sales_lag_40', 'sales_lag_41', 'sales_lag_42',
       'rolling_mean_7', 'rolling_std_7', 'rolling_mean_14', 'rolling_std_14',
       'rolling_mean_30', 'rolling_std_30', 'rolling_mean_60',
       'rolling_std_60', 'rolling_mean_180', 'rolling_std_180'],
      dtype='object')
grid_df_lags.dtypes
id                  category
day_id              category
sales                float32
sales_lag_28         float32
sales_lag_29         float32
sales_lag_30         float32
sales_lag_31         float32
sales_lag_32         float32
sales_lag_33         float32
sales_lag_34         float32
sales_lag_35         float32
sales_lag_36         float32
sales_lag_37         float32
sales_lag_38         float32
sales_lag_39         float32
sales_lag_40         float32
sales_lag_41         float32
sales_lag_42         float32
rolling_mean_7       float32
rolling_std_7        float32
rolling_mean_14      float32
rolling_std_14       float32
rolling_mean_30      float32
rolling_std_30       float32
rolling_mean_60      float32
rolling_std_60       float32
rolling_mean_180     float32
rolling_std_180      float32
dtype: object
grid_df_lags
id day_id sales sales_lag_28 sales_lag_29 sales_lag_30 sales_lag_31 sales_lag_32 sales_lag_33 sales_lag_34 ... rolling_mean_7 rolling_std_7 rolling_mean_14 rolling_std_14 rolling_mean_30 rolling_std_30 rolling_mean_60 rolling_std_60 rolling_mean_180 rolling_std_180
34023 FOODS_1_001_CA_1_evaluation d_1 3.0 <NA> <NA> <NA> <NA> <NA> <NA> <NA> ... <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA>
34024 FOODS_1_001_CA_1_evaluation d_2 0.0 <NA> <NA> <NA> <NA> <NA> <NA> <NA> ... <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA>
34025 FOODS_1_001_CA_1_evaluation d_3 0.0 <NA> <NA> <NA> <NA> <NA> <NA> <NA> ... <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA>
34026 FOODS_1_001_CA_1_evaluation d_4 1.0 <NA> <NA> <NA> <NA> <NA> <NA> <NA> ... <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA>
34027 FOODS_1_001_CA_1_evaluation d_5 4.0 <NA> <NA> <NA> <NA> <NA> <NA> <NA> ... <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA> <NA>
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
47733744 HOUSEHOLD_2_516_WI_3_evaluation d_1965 NaN 0.0 0.0 0.0 0.0 1.0 0.0 0.0 ... 0.142857149 0.377964467 0.071428575 0.267261237 0.06666667 0.253708124 0.033333335 0.181020334 0.077777781 0.288621366
47733745 HOUSEHOLD_2_516_WI_3_evaluation d_1966 NaN 0.0 0.0 0.0 0.0 0.0 1.0 0.0 ... 0.142857149 0.377964467 0.071428575 0.267261237 0.06666667 0.253708124 0.033333335 0.181020334 0.077777781 0.288621366
47733746 HOUSEHOLD_2_516_WI_3_evaluation d_1967 NaN 0.0 0.0 0.0 0.0 0.0 0.0 1.0 ... 0.142857149 0.377964467 0.071428575 0.267261237 0.06666667 0.253708124 0.033333335 0.181020334 0.077777781 0.288621366
47733747 HOUSEHOLD_2_516_WI_3_evaluation d_1968 NaN 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.071428575 0.267261237 0.06666667 0.253708124 0.033333335 0.181020334 0.077777781 0.288621366
47733748 HOUSEHOLD_2_516_WI_3_evaluation d_1969 NaN 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.071428575 0.267261237 0.06666667 0.253708124 0.033333335 0.181020334 0.077777781 0.288621366

47735397 行 × 28 列

目标编码#

分类变量对许多机器学习算法(例如 XGBoost)带来了挑战。克服此挑战的一种方法是使用目标编码,我们将分类变量替换为目标变量的统计量来对其进行编码。在此示例中,我们将使用均值和标准差。

目标编码分类变量中阅读有关目标编码的更多信息。

icols = [["store_id", "dept_id"], ["item_id", "state_id"]]
new_columns = []

grid_df_target_enc = grid_df[
    ["id", "day_id", "item_id", "state_id", "store_id", "dept_id", "sales"]
].copy()
grid_df_target_enc["sales"].fillna(value=0, inplace=True)

for col in icols:
    print(f"Encoding columns {col}")
    col_name = "_" + "_".join(col) + "_"
    grid_df_target_enc["enc" + col_name + "mean"] = (
        grid_df_target_enc.groupby(col)["sales"].transform("mean").astype(np.float32)
    )
    grid_df_target_enc["enc" + col_name + "std"] = (
        grid_df_target_enc.groupby(col)["sales"].transform("std").astype(np.float32)
    )
    new_columns.extend(["enc" + col_name + "mean", "enc" + col_name + "std"])
Encoding columns ['store_id', 'dept_id']
Encoding columns ['item_id', 'state_id']
grid_df_target_enc = grid_df_target_enc[["id", "day_id"] + new_columns]
grid_df_target_enc
id day_id enc_store_id_dept_id_mean enc_store_id_dept_id_std enc_item_id_state_id_mean enc_item_id_state_id_std
0 FOODS_1_001_WI_2_evaluation d_809 1.492988 3.987657 0.433553 0.851153
1 FOODS_1_001_WI_2_evaluation d_810 1.492988 3.987657 0.433553 0.851153
2 FOODS_1_001_WI_2_evaluation d_811 1.492988 3.987657 0.433553 0.851153
3 FOODS_1_001_WI_2_evaluation d_812 1.492988 3.987657 0.433553 0.851153
4 FOODS_1_001_WI_2_evaluation d_813 1.492988 3.987657 0.433553 0.851153
... ... ... ... ... ... ...
47735392 HOUSEHOLD_2_516_WI_3_evaluation d_52 0.257027 0.661541 0.082084 0.299445
47735393 HOUSEHOLD_2_516_WI_3_evaluation d_53 0.257027 0.661541 0.082084 0.299445
47735394 HOUSEHOLD_2_516_WI_3_evaluation d_54 0.257027 0.661541 0.082084 0.299445
47735395 HOUSEHOLD_2_516_WI_3_evaluation d_55 0.257027 0.661541 0.082084 0.299445
47735396 HOUSEHOLD_2_516_WI_3_evaluation d_49 0.257027 0.661541 0.082084 0.299445

47735397 行 × 6 列

grid_df_target_enc.dtypes
id                           category
day_id                       category
enc_store_id_dept_id_mean     float32
enc_store_id_dept_id_std      float32
enc_item_id_state_id_mean     float32
enc_item_id_state_id_std      float32
dtype: object

按商店和产品部门过滤并创建数据段#

在合并了前面 notebook 中生成的所有列后,我们按 store_iddept_id 过滤数据集中的行并创建一个数据段。每个数据段都保存为 pickle 文件,然后上传到 Cloud Storage。

segmented_data_dir = pathlib.Path("./segmented_data/")
segmented_data_dir.mkdir(exist_ok=True)

STORES = [
    "CA_1",
    "CA_2",
    "CA_3",
    "CA_4",
    "TX_1",
    "TX_2",
    "TX_3",
    "WI_1",
    "WI_2",
    "WI_3",
]
DEPTS = [
    "HOBBIES_1",
    "HOBBIES_2",
    "HOUSEHOLD_1",
    "HOUSEHOLD_2",
    "FOODS_1",
    "FOODS_2",
    "FOODS_3",
]

grid2_colnm = [
    "sell_price",
    "price_max",
    "price_min",
    "price_std",
    "price_mean",
    "price_norm",
    "price_nunique",
    "item_nunique",
    "price_momentum",
    "price_momentum_m",
    "price_momentum_y",
]

grid3_colnm = [
    "event_name_1",
    "event_type_1",
    "event_name_2",
    "event_type_2",
    "snap_CA",
    "snap_TX",
    "snap_WI",
    "tm_d",
    "tm_w",
    "tm_m",
    "tm_y",
    "tm_wm",
    "tm_dw",
    "tm_w_end",
]

lag_colnm = [
    "sales_lag_28",
    "sales_lag_29",
    "sales_lag_30",
    "sales_lag_31",
    "sales_lag_32",
    "sales_lag_33",
    "sales_lag_34",
    "sales_lag_35",
    "sales_lag_36",
    "sales_lag_37",
    "sales_lag_38",
    "sales_lag_39",
    "sales_lag_40",
    "sales_lag_41",
    "sales_lag_42",
    "rolling_mean_7",
    "rolling_std_7",
    "rolling_mean_14",
    "rolling_std_14",
    "rolling_mean_30",
    "rolling_std_30",
    "rolling_mean_60",
    "rolling_std_60",
    "rolling_mean_180",
    "rolling_std_180",
]

target_enc_colnm = [
    "enc_store_id_dept_id_mean",
    "enc_store_id_dept_id_std",
    "enc_item_id_state_id_mean",
    "enc_item_id_state_id_std",
]
def prepare_data(store, dept=None):
    """
    Filter and clean data according to stores and product departments

    Parameters
    ----------
    store: Filter data by retaining rows whose store_id matches this parameter.
    dept: Filter data by retaining rows whose dept_id matches this parameter.
          This parameter can be set to None to indicate that we shouldn't filter by dept_id.
    """
    if store is None:
        raise ValueError("store parameter must not be None")

    if dept is None:
        grid1 = grid_df[grid_df["store_id"] == store]
    else:
        grid1 = grid_df[
            (grid_df["store_id"] == store) & (grid_df["dept_id"] == dept)
        ].drop(columns=["dept_id"])
    grid1 = grid1.drop(columns=["release_week", "wm_yr_wk", "store_id", "state_id"])

    grid2 = grid_df_with_price[["id", "day_id"] + grid2_colnm]
    grid_combined = grid1.merge(grid2, on=["id", "day_id"], how="left")
    del grid1, grid2

    grid3 = grid_df_with_calendar[["id", "day_id"] + grid3_colnm]
    grid_combined = grid_combined.merge(grid3, on=["id", "day_id"], how="left")
    del grid3

    lag_df = grid_df_lags[["id", "day_id"] + lag_colnm]
    grid_combined = grid_combined.merge(lag_df, on=["id", "day_id"], how="left")
    del lag_df

    target_enc_df = grid_df_target_enc[["id", "day_id"] + target_enc_colnm]
    grid_combined = grid_combined.merge(target_enc_df, on=["id", "day_id"], how="left")
    del target_enc_df
    gc.collect()

    grid_combined = grid_combined.drop(columns=["id"])
    grid_combined["day_id"] = (
        grid_combined["day_id"]
        .to_pandas()
        .astype("str")
        .apply(lambda x: x[2:])
        .astype(np.int16)
    )

    return grid_combined
# First save the segment to the disk
for store in STORES:
    print(f"Processing store {store}...")
    segment_df = prepare_data(store=store)
    segment_df.to_pandas().to_pickle(
        segmented_data_dir / f"combined_df_store_{store}.pkl"
    )
    del segment_df
    gc.collect()

for store in STORES:
    for dept in DEPTS:
        print(f"Processing (store {store}, department {dept})...")
        segment_df = prepare_data(store=store, dept=dept)
        segment_df.to_pandas().to_pickle(
            segmented_data_dir / f"combined_df_store_{store}_dept_{dept}.pkl"
        )
        del segment_df
        gc.collect()
Processing store CA_1...
Processing store CA_2...
Processing store CA_3...
Processing store CA_4...
Processing store TX_1...
Processing store TX_2...
Processing store TX_3...
Processing store WI_1...
Processing store WI_2...
Processing store WI_3...
Processing (store CA_1, department HOBBIES_1)...
Processing (store CA_1, department HOBBIES_2)...
Processing (store CA_1, department HOUSEHOLD_1)...
Processing (store CA_1, department HOUSEHOLD_2)...
Processing (store CA_1, department FOODS_1)...
Processing (store CA_1, department FOODS_2)...
Processing (store CA_1, department FOODS_3)...
Processing (store CA_2, department HOBBIES_1)...
Processing (store CA_2, department HOBBIES_2)...
Processing (store CA_2, department HOUSEHOLD_1)...
Processing (store CA_2, department HOUSEHOLD_2)...
Processing (store CA_2, department FOODS_1)...
Processing (store CA_2, department FOODS_2)...
Processing (store CA_2, department FOODS_3)...
Processing (store CA_3, department HOBBIES_1)...
Processing (store CA_3, department HOBBIES_2)...
Processing (store CA_3, department HOUSEHOLD_1)...
Processing (store CA_3, department HOUSEHOLD_2)...
Processing (store CA_3, department FOODS_1)...
Processing (store CA_3, department FOODS_2)...
Processing (store CA_3, department FOODS_3)...
Processing (store CA_4, department HOBBIES_1)...
Processing (store CA_4, department HOBBIES_2)...
Processing (store CA_4, department HOUSEHOLD_1)...
Processing (store CA_4, department HOUSEHOLD_2)...
Processing (store CA_4, department FOODS_1)...
Processing (store CA_4, department FOODS_2)...
Processing (store CA_4, department FOODS_3)...
Processing (store TX_1, department HOBBIES_1)...
Processing (store TX_1, department HOBBIES_2)...
Processing (store TX_1, department HOUSEHOLD_1)...
Processing (store TX_1, department HOUSEHOLD_2)...
Processing (store TX_1, department FOODS_1)...
Processing (store TX_1, department FOODS_2)...
Processing (store TX_1, department FOODS_3)...
Processing (store TX_2, department HOBBIES_1)...
Processing (store TX_2, department HOBBIES_2)...
Processing (store TX_2, department HOUSEHOLD_1)...
Processing (store TX_2, department HOUSEHOLD_2)...
Processing (store TX_2, department FOODS_1)...
Processing (store TX_2, department FOODS_2)...
Processing (store TX_2, department FOODS_3)...
Processing (store TX_3, department HOBBIES_1)...
Processing (store TX_3, department HOBBIES_2)...
Processing (store TX_3, department HOUSEHOLD_1)...
Processing (store TX_3, department HOUSEHOLD_2)...
Processing (store TX_3, department FOODS_1)...
Processing (store TX_3, department FOODS_2)...
Processing (store TX_3, department FOODS_3)...
Processing (store WI_1, department HOBBIES_1)...
Processing (store WI_1, department HOBBIES_2)...
Processing (store WI_1, department HOUSEHOLD_1)...
Processing (store WI_1, department HOUSEHOLD_2)...
Processing (store WI_1, department FOODS_1)...
Processing (store WI_1, department FOODS_2)...
Processing (store WI_1, department FOODS_3)...
Processing (store WI_2, department HOBBIES_1)...
Processing (store WI_2, department HOBBIES_2)...
Processing (store WI_2, department HOUSEHOLD_1)...
Processing (store WI_2, department HOUSEHOLD_2)...
Processing (store WI_2, department FOODS_1)...
Processing (store WI_2, department FOODS_2)...
Processing (store WI_2, department FOODS_3)...
Processing (store WI_3, department HOBBIES_1)...
Processing (store WI_3, department HOBBIES_2)...
Processing (store WI_3, department HOUSEHOLD_1)...
Processing (store WI_3, department HOUSEHOLD_2)...
Processing (store WI_3, department FOODS_1)...
Processing (store WI_3, department FOODS_2)...
Processing (store WI_3, department FOODS_3)...
# Then copy the segment to Cloud Storage
fs = gcsfs.GCSFileSystem()

for e in segmented_data_dir.glob("*.pkl"):
    print(f"Uploading {e}...")
    basename = e.name
    fs.put_file(e, f"{bucket_name}/{basename}")
Uploading segmented_data/combined_df_store_CA_3_dept_HOBBIES_2.pkl...
Uploading segmented_data/combined_df_store_TX_3_dept_FOODS_3.pkl...
Uploading segmented_data/combined_df_store_CA_1_dept_HOUSEHOLD_1.pkl...
Uploading segmented_data/combined_df_store_TX_3_dept_HOBBIES_1.pkl...
Uploading segmented_data/combined_df_store_WI_2_dept_HOUSEHOLD_2.pkl...
Uploading segmented_data/combined_df_store_TX_3_dept_HOUSEHOLD_1.pkl...
Uploading segmented_data/combined_df_store_WI_1_dept_HOUSEHOLD_2.pkl...
Uploading segmented_data/combined_df_store_CA_3_dept_HOBBIES_1.pkl...
Uploading segmented_data/combined_df_store_CA_1_dept_FOODS_3.pkl...
Uploading segmented_data/combined_df_store_TX_1_dept_HOBBIES_2.pkl...
Uploading segmented_data/combined_df_store_TX_2_dept_FOODS_3.pkl...
Uploading segmented_data/combined_df_store_CA_2_dept_FOODS_3.pkl...
Uploading segmented_data/combined_df_store_WI_3.pkl...
Uploading segmented_data/combined_df_store_TX_1_dept_HOUSEHOLD_2.pkl...
Uploading segmented_data/combined_df_store_WI_3_dept_FOODS_3.pkl...
Uploading segmented_data/combined_df_store_WI_2_dept_FOODS_1.pkl...
Uploading segmented_data/combined_df_store_TX_1_dept_FOODS_3.pkl...
Uploading segmented_data/combined_df_store_CA_3_dept_FOODS_3.pkl...
Uploading segmented_data/combined_df_store_WI_1_dept_HOBBIES_1.pkl...
Uploading segmented_data/combined_df_store_CA_1_dept_FOODS_2.pkl...
Uploading segmented_data/combined_df_store_TX_1_dept_HOBBIES_1.pkl...
Uploading segmented_data/combined_df_store_CA_3_dept_HOUSEHOLD_2.pkl...
Uploading segmented_data/combined_df_store_TX_1_dept_FOODS_2.pkl...
Uploading segmented_data/combined_df_store_CA_1.pkl...
Uploading segmented_data/combined_df_store_CA_2_dept_FOODS_2.pkl...
Uploading segmented_data/combined_df_store_TX_2_dept_HOUSEHOLD_2.pkl...
Uploading segmented_data/combined_df_store_WI_2.pkl...
Uploading segmented_data/combined_df_store_CA_4_dept_HOUSEHOLD_1.pkl...
Uploading segmented_data/combined_df_store_CA_3_dept_FOODS_1.pkl...
Uploading segmented_data/combined_df_store_WI_1.pkl...
Uploading segmented_data/combined_df_store_WI_1_dept_FOODS_1.pkl...
Uploading segmented_data/combined_df_store_CA_4_dept_FOODS_3.pkl...
Uploading segmented_data/combined_df_store_CA_2_dept_HOUSEHOLD_2.pkl...
Uploading segmented_data/combined_df_store_WI_2_dept_FOODS_2.pkl...
Uploading segmented_data/combined_df_store_CA_2_dept_FOODS_1.pkl...
Uploading segmented_data/combined_df_store_CA_1_dept_FOODS_1.pkl...
Uploading segmented_data/combined_df_store_TX_3.pkl...
Uploading segmented_data/combined_df_store_WI_1_dept_HOBBIES_2.pkl...
Uploading segmented_data/combined_df_store_CA_4.pkl...
Uploading segmented_data/combined_df_store_CA_1_dept_HOBBIES_1.pkl...
Uploading segmented_data/combined_df_store_WI_3_dept_HOUSEHOLD_1.pkl...
Uploading segmented_data/combined_df_store_CA_4_dept_HOUSEHOLD_2.pkl...
Uploading segmented_data/combined_df_store_CA_2_dept_HOBBIES_1.pkl...
Uploading segmented_data/combined_df_store_CA_2_dept_HOUSEHOLD_1.pkl...
Uploading segmented_data/combined_df_store_CA_4_dept_FOODS_1.pkl...
Uploading segmented_data/combined_df_store_WI_1_dept_HOUSEHOLD_1.pkl...
Uploading segmented_data/combined_df_store_CA_3_dept_FOODS_2.pkl...
Uploading segmented_data/combined_df_store_WI_1_dept_FOODS_2.pkl...
Uploading segmented_data/combined_df_store_WI_3_dept_HOBBIES_2.pkl...
Uploading segmented_data/combined_df_store_WI_3_dept_HOBBIES_1.pkl...
Uploading segmented_data/combined_df_store_WI_3_dept_HOUSEHOLD_2.pkl...
Uploading segmented_data/combined_df_store_TX_1_dept_FOODS_1.pkl...
Uploading segmented_data/combined_df_store_CA_3.pkl...
Uploading segmented_data/combined_df_store_TX_2.pkl...
Uploading segmented_data/combined_df_store_WI_2_dept_FOODS_3.pkl...
Uploading segmented_data/combined_df_store_CA_1_dept_HOUSEHOLD_2.pkl...
Uploading segmented_data/combined_df_store_WI_3_dept_FOODS_2.pkl...
Uploading segmented_data/combined_df_store_TX_2_dept_HOUSEHOLD_1.pkl...
Uploading segmented_data/combined_df_store_WI_2_dept_HOBBIES_1.pkl...
Uploading segmented_data/combined_df_store_CA_4_dept_HOBBIES_1.pkl...
Uploading segmented_data/combined_df_store_WI_2_dept_HOUSEHOLD_1.pkl...
Uploading segmented_data/combined_df_store_CA_4_dept_HOBBIES_2.pkl...
Uploading segmented_data/combined_df_store_TX_1_dept_HOUSEHOLD_1.pkl...
Uploading segmented_data/combined_df_store_WI_3_dept_FOODS_1.pkl...
Uploading segmented_data/combined_df_store_TX_3_dept_HOBBIES_2.pkl...
Uploading segmented_data/combined_df_store_CA_2_dept_HOBBIES_2.pkl...
Uploading segmented_data/combined_df_store_WI_1_dept_FOODS_3.pkl...
Uploading segmented_data/combined_df_store_TX_2_dept_FOODS_1.pkl...
Uploading segmented_data/combined_df_store_WI_2_dept_HOBBIES_2.pkl...
Uploading segmented_data/combined_df_store_CA_4_dept_FOODS_2.pkl...
Uploading segmented_data/combined_df_store_TX_3_dept_HOUSEHOLD_2.pkl...
Uploading segmented_data/combined_df_store_TX_2_dept_HOBBIES_2.pkl...
Uploading segmented_data/combined_df_store_TX_2_dept_HOBBIES_1.pkl...
Uploading segmented_data/combined_df_store_TX_2_dept_FOODS_2.pkl...
Uploading segmented_data/combined_df_store_TX_3_dept_FOODS_2.pkl...
Uploading segmented_data/combined_df_store_CA_3_dept_HOUSEHOLD_1.pkl...
Uploading segmented_data/combined_df_store_CA_1_dept_HOBBIES_2.pkl...
Uploading segmented_data/combined_df_store_TX_1.pkl...
Uploading segmented_data/combined_df_store_TX_3_dept_FOODS_1.pkl...
Uploading segmented_data/combined_df_store_CA_2.pkl...
# Also upload the product weights
fs = gcsfs.GCSFileSystem()

weights.to_pandas().to_pickle("product_weights.pkl")
fs.put_file("product_weights.pkl", f"{bucket_name}/product_weights.pkl")

使用超参数优化 (HPO) 进行训练和评估#

现在我们已经完成数据处理,可以训练模型来预测未来销售额了。我们将利用 worker pod 并行运行多个训练作业,以加快超参数搜索速度。

导入模块并定义常量#

import copy
import gc
import json
import pickle
import time

import cudf
import gcsfs
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import optuna
import pandas as pd
import xgboost as xgb
from dask.distributed import Client, wait
from dask_kubernetes.operator import KubeCluster
from matplotlib.patches import Patch
# Choose the same RAPIDS image you used for launching the notebook session
rapids_image = "nvcr.io/nvidia/rapidsai/notebooks:25.04-cuda12.8-py3.12"

# Use the number of worker nodes in your Kubernetes cluster.
n_workers = 2
# Bucket that contains the processed data pickles
bucket_name = "<Put the name of the bucket here>"
bucket_name = "phcho-m5-competition-hpo-example"

# List of stores and product departments
STORES = [
    "CA_1",
    "CA_2",
    "CA_3",
    "CA_4",
    "TX_1",
    "TX_2",
    "TX_3",
    "WI_1",
    "WI_2",
    "WI_3",
]
DEPTS = [
    "HOBBIES_1",
    "HOBBIES_2",
    "HOUSEHOLD_1",
    "HOUSEHOLD_2",
    "FOODS_1",
    "FOODS_2",
    "FOODS_3",
]

定义交叉验证折叠#

交叉验证是一种统计方法,用于估计机器学习模型对独立数据集的泛化能力。该方法也适用于评估给定模型超参数组合的选择。

为了估计泛化能力,我们定义了由多对 (训练集, 验证集) 组成的多个交叉验证折叠。对于每个折叠,我们使用训练集拟合模型,并在验证集上评估其准确性。给定超参数组合的“优劣”分数是模型在每个验证集上的准确性,对所有交叉验证折叠取平均值。

为时间序列数据定义交叉验证折叠时必须非常小心。不允许使用未来来预测过去,因此训练集在时间上必须先于验证集。因此,我们在时间维度上划分数据集,并使用时间范围分配训练集和验证集

# Cross-validation folds and held-out test set (in time dimension)
# The held-out test set is used for final evaluation
cv_folds = [  # (train_set, validation_set)
    ([0, 1114], [1114, 1314]),
    ([0, 1314], [1314, 1514]),
    ([0, 1514], [1514, 1714]),
    ([0, 1714], [1714, 1914]),
]
n_folds = len(cv_folds)
holdout = [1914, 1942]
time_horizon = 1942

使用 Matplotlib 可视化交叉验证折叠很有帮助。

cv_cmap = matplotlib.colormaps["cividis"]
plt.figure(figsize=(8, 3))

for i, (train_mask, valid_mask) in enumerate(cv_folds):
    idx = np.array([np.nan] * time_horizon)
    idx[np.arange(*train_mask)] = 1
    idx[np.arange(*valid_mask)] = 0
    plt.scatter(
        range(time_horizon),
        [i + 0.5] * time_horizon,
        c=idx,
        marker="_",
        capstyle="butt",
        s=1,
        lw=20,
        cmap=cv_cmap,
        vmin=-1.5,
        vmax=1.5,
    )

idx = np.array([np.nan] * time_horizon)
idx[np.arange(*holdout)] = -1
plt.scatter(
    range(time_horizon),
    [n_folds + 0.5] * time_horizon,
    c=idx,
    marker="_",
    capstyle="butt",
    s=1,
    lw=20,
    cmap=cv_cmap,
    vmin=-1.5,
    vmax=1.5,
)

plt.xlabel("Time")
plt.yticks(
    ticks=np.arange(n_folds + 1) + 0.5,
    labels=[f"Fold {i}" for i in range(n_folds)] + ["Holdout"],
)
plt.ylim([len(cv_folds) + 1.2, -0.2])

norm = matplotlib.colors.Normalize(vmin=-1.5, vmax=1.5)
plt.legend(
    [
        Patch(color=cv_cmap(norm(1))),
        Patch(color=cv_cmap(norm(0))),
        Patch(color=cv_cmap(norm(-1))),
    ],
    ["Training set", "Validation set", "Held-out test set"],
    ncol=3,
    loc="best",
)
plt.tight_layout()
../../../_images/42e17786dae8ef3a993ae2ebc92ed8e2b56c8492182da4bf7ace7042f7045e9c.png

在 Kubernetes 上启动 Dask 客户端#

让我们使用 KubeCluster 类来设置 Dask 集群。

cluster = KubeCluster(
    name="rapids-dask",
    image=rapids_image,
    worker_command="dask-cuda-worker",
    n_workers=n_workers,
    resources={"limits": {"nvidia.com/gpu": "1"}},
    env={"EXTRA_PIP_PACKAGES": "optuna gcsfs"},
)

cluster
client = Client(cluster)
client

Client

Client-dcc90a0f-5e32-11ee-8529-fa610e3dbb88

连接方法: Cluster object 集群类型: dask_kubernetes.KubeCluster
仪表盘: http://rapids-dask-scheduler.kubeflow-user-example-com:8787/status

集群信息

定义自定义评估指标#

M5 预测竞赛定义了一个称为 WRMSSE 的自定义指标,如下所示

加权均方根比例误差 (WRMSSE) \[ WRMSSE = \sum w_i \cdot RMSSE_i \]

即 WRMSEE 是所有产品项 \(i\) 的 RMSSE 的加权和。RMSSE 又定义为

其中,均方根比例误差 (RMSSE) \[ RMSSE = \sqrt{\frac{1/h \cdot \sum_t{\left(Y_t - \hat{Y}_t\right)}^2}{1/(n-1)\sum_t{(Y_t - Y_{t-1})}^2}} \]

其中预测(预报)的平方误差按训练数据中销售额单位变化的速率进行归一化。

这里是使用 cuDF 实现的 WRMSSE。我们使用第一个预处理 notebook 中计算的产品权重 \(w_i\)

def wrmsse(product_weights, df, pred_sales, train_mask, valid_mask):
    """Compute WRMSSE metric"""
    df_train = df[(df["day_id"] >= train_mask[0]) & (df["day_id"] < train_mask[1])]
    df_valid = df[(df["day_id"] >= valid_mask[0]) & (df["day_id"] < valid_mask[1])]

    # Compute denominator: 1/(n-1) * sum( (y(t) - y(t-1))**2 )
    diff = (
        df_train.sort_values(["item_id", "day_id"])
        .groupby(["item_id"])[["sales"]]
        .diff(1)
    )
    x = (
        df_train[["item_id", "day_id"]]
        .join(diff, how="left")
        .rename(columns={"sales": "diff"})
        .sort_values(["item_id", "day_id"])
    )
    x["diff"] = x["diff"] ** 2
    xx = x.groupby(["item_id"])[["diff"]].agg(["sum", "count"]).sort_index()
    xx.columns = xx.columns.map("_".join)
    xx["denominator"] = xx["diff_sum"] / xx["diff_count"]
    xx.reset_index()

    # Compute numerator: 1/h * sum( (y(t) - y_pred(t))**2 )
    X_valid = df_valid.drop(columns=["item_id", "cat_id", "day_id", "sales"])
    if "dept_id" in X_valid.columns:
        X_valid = X_valid.drop(columns=["dept_id"])
    df_pred = cudf.DataFrame(
        {
            "item_id": df_valid["item_id"].copy(),
            "pred_sales": pred_sales,
            "sales": df_valid["sales"].copy(),
        }
    )
    df_pred["diff"] = (df_pred["sales"] - df_pred["pred_sales"]) ** 2
    yy = df_pred.groupby(["item_id"])[["diff"]].agg(["sum", "count"]).sort_index()
    yy.columns = yy.columns.map("_".join)
    yy["numerator"] = yy["diff_sum"] / yy["diff_count"]

    zz = yy[["numerator"]].join(xx[["denominator"]], how="left")
    zz = zz.join(product_weights, how="left").sort_index()
    # Filter out zero denominator.
    # This can occur if the product was never on sale during the period in the training set
    zz = zz[zz["denominator"] != 0]
    zz["rmsse"] = np.sqrt(zz["numerator"] / zz["denominator"])
    return zz["rmsse"].multiply(zz["weights"]).sum()

使用 Optuna 定义训练和超参数搜索管道#

Optuna 允许我们迭代地定义训练过程,就好像我们要编写一个普通函数来训练单个模型一样。该函数不再使用固定的超参数组合,而是接受一个 trial 对象,该对象会产生不同的超参数组合。

在此示例中,我们根据商店划分训练数据,然后为每个数据段拟合一个单独的 XGBoost 模型。

def objective(trial):
    fs = gcsfs.GCSFileSystem()
    with fs.open(f"{bucket_name}/product_weights.pkl", "rb") as f:
        product_weights = cudf.DataFrame(pd.read_pickle(f))
    params = {
        "n_estimators": 100,
        "verbosity": 0,
        "learning_rate": 0.01,
        "objective": "reg:tweedie",
        "tree_method": "gpu_hist",
        "grow_policy": "depthwise",
        "predictor": "gpu_predictor",
        "enable_categorical": True,
        "lambda": trial.suggest_float("lambda", 1e-8, 100.0, log=True),
        "alpha": trial.suggest_float("alpha", 1e-8, 100.0, log=True),
        "colsample_bytree": trial.suggest_float("colsample_bytree", 0.2, 1.0),
        "max_depth": trial.suggest_int("max_depth", 2, 6, step=1),
        "min_child_weight": trial.suggest_float(
            "min_child_weight", 1e-8, 100, log=True
        ),
        "gamma": trial.suggest_float("gamma", 1e-8, 1.0, log=True),
        "tweedie_variance_power": trial.suggest_float("tweedie_variance_power", 1, 2),
    }
    scores = [[] for store in STORES]

    for store_id, store in enumerate(STORES):
        print(f"Processing store {store}...")
        with fs.open(f"{bucket_name}/combined_df_store_{store}.pkl", "rb") as f:
            df = cudf.DataFrame(pd.read_pickle(f))
        for train_mask, valid_mask in cv_folds:
            df_train = df[
                (df["day_id"] >= train_mask[0]) & (df["day_id"] < train_mask[1])
            ]
            df_valid = df[
                (df["day_id"] >= valid_mask[0]) & (df["day_id"] < valid_mask[1])
            ]

            X_train, y_train = (
                df_train.drop(
                    columns=["item_id", "dept_id", "cat_id", "day_id", "sales"]
                ),
                df_train["sales"],
            )
            X_valid = df_valid.drop(
                columns=["item_id", "dept_id", "cat_id", "day_id", "sales"]
            )

            clf = xgb.XGBRegressor(**params)
            clf.fit(X_train, y_train)
            pred_sales = clf.predict(X_valid)
            scores[store_id].append(
                wrmsse(product_weights, df, pred_sales, train_mask, valid_mask)
            )
            del df_train, df_valid, X_train, y_train, clf
            gc.collect()
        del df
        gc.collect()

    # We can sum WRMSSE scores over data segments because data segments contain disjoint sets of time series
    return np.array(scores).sum(axis=0).mean()

使用 Dask 集群客户端,我们可以并行执行多个训练作业。Optuna 使用内存中的 Dask 存储来跟踪超参数搜索的进度。

##### Number of hyperparameter combinations to try in parallel
n_trials = 9  # Using a small n_trials so that the demo can finish quickly
# n_trials = 100

# Optimize in parallel on your Dask cluster
backend_storage = optuna.storages.InMemoryStorage()
dask_storage = optuna.integration.DaskStorage(storage=backend_storage, client=client)
study = optuna.create_study(
    direction="minimize",
    sampler=optuna.samplers.RandomSampler(seed=0),
    storage=dask_storage,
)
futures = []
for i in range(0, n_trials, n_workers):
    iter_range = (i, min([i + n_workers, n_trials]))
    futures.append(
        {
            "range": iter_range,
            "futures": [
                client.submit(
                    # Work around bug https://github.com/optuna/optuna/issues/4859
                    lambda objective, n_trials: (
                        study.sampler.reseed_rng(),
                        study.optimize(objective, n_trials),
                    ),
                    objective,
                    n_trials=1,
                    pure=False,
                )
                for _ in range(*iter_range)
            ],
        }
    )

tstart = time.perf_counter()
for partition in futures:
    iter_range = partition["range"]
    print(f"Testing hyperparameter combinations {iter_range[0]}..{iter_range[1]}")
    _ = wait(partition["futures"])
    for fut in partition["futures"]:
        _ = fut.result()  # Ensure that the training job was successful
    tnow = time.perf_counter()
    print(
        f"Best cross-validation metric: {study.best_value}, Time elapsed = {tnow - tstart}"
    )
tend = time.perf_counter()
print(f"Total time elapsed = {tend - tstart}")
/tmp/ipykernel_1321/3389696366.py:7: ExperimentalWarning: DaskStorage is experimental (supported from v3.1.0). The interface can change in the future.
  dask_storage = optuna.integration.DaskStorage(storage=backend_storage, client=client)
Testing hyperparameter combinations 0..2
Best cross-validation metric: 10.027767173304472, Time elapsed = 331.6198390149948
Testing hyperparameter combinations 2..4
Best cross-validation metric: 9.426913749927916, Time elapsed = 640.7606940959959
Testing hyperparameter combinations 4..6
Best cross-validation metric: 9.426913749927916, Time elapsed = 958.0816706369951
Testing hyperparameter combinations 6..8
Best cross-validation metric: 9.426913749927916, Time elapsed = 1295.700604706988
Testing hyperparameter combinations 8..9
Best cross-validation metric: 8.915009508695244, Time elapsed = 1476.1182343699911
Total time elapsed = 1476.1219055669935

超参数搜索完成后,我们使用 study 对象的属性获取最佳超参数组合。

study.best_params
{'lambda': 2.6232990699579064e-06,
 'alpha': 0.004085800094564677,
 'colsample_bytree': 0.4064535567263888,
 'max_depth': 6,
 'min_child_weight': 9.652128310148716e-08,
 'gamma': 3.4446109254037165e-07,
 'tweedie_variance_power': 1.0914258082324833}
study.best_trial
FrozenTrial(number=8, state=TrialState.COMPLETE, values=[8.915009508695244], datetime_start=datetime.datetime(2023, 9, 28, 19, 35, 29, 888497), datetime_complete=datetime.datetime(2023, 9, 28, 19, 38, 30, 299541), params={'lambda': 2.6232990699579064e-06, 'alpha': 0.004085800094564677, 'colsample_bytree': 0.4064535567263888, 'max_depth': 6, 'min_child_weight': 9.652128310148716e-08, 'gamma': 3.4446109254037165e-07, 'tweedie_variance_power': 1.0914258082324833}, user_attrs={}, system_attrs={}, intermediate_values={}, distributions={'lambda': FloatDistribution(high=100.0, log=True, low=1e-08, step=None), 'alpha': FloatDistribution(high=100.0, log=True, low=1e-08, step=None), 'colsample_bytree': FloatDistribution(high=1.0, log=False, low=0.2, step=None), 'max_depth': IntDistribution(high=6, log=False, low=2, step=1), 'min_child_weight': FloatDistribution(high=100.0, log=True, low=1e-08, step=None), 'gamma': FloatDistribution(high=1.0, log=True, low=1e-08, step=None), 'tweedie_variance_power': FloatDistribution(high=2.0, log=False, low=1.0, step=None)}, trial_id=8, value=None)
# Make a deep copy to preserve the dictionary after deleting the Dask cluster
best_params = copy.deepcopy(study.best_params)
best_params
{'lambda': 2.6232990699579064e-06,
 'alpha': 0.004085800094564677,
 'colsample_bytree': 0.4064535567263888,
 'max_depth': 6,
 'min_child_weight': 9.652128310148716e-08,
 'gamma': 3.4446109254037165e-07,
 'tweedie_variance_power': 1.0914258082324833}
fs = gcsfs.GCSFileSystem()
with fs.open(f"{bucket_name}/params.json", "w") as f:
    json.dump(best_params, f)

训练最终的 XGBoost 模型并进行评估#

使用搜索中找到的最佳超参数,使用整个训练数据拟合新模型。如前一节所述,我们为每个数据段拟合一个单独的 XGBoost 模型。

fs = gcsfs.GCSFileSystem()
with fs.open(f"{bucket_name}/params.json", "r") as f:
    best_params = json.load(f)
with fs.open(f"{bucket_name}/product_weights.pkl", "rb") as f:
    product_weights = cudf.DataFrame(pd.read_pickle(f))
def final_train(best_params):
    fs = gcsfs.GCSFileSystem()
    params = {
        "n_estimators": 100,
        "verbosity": 0,
        "learning_rate": 0.01,
        "objective": "reg:tweedie",
        "tree_method": "gpu_hist",
        "grow_policy": "depthwise",
        "predictor": "gpu_predictor",
        "enable_categorical": True,
    }
    params.update(best_params)
    model = {}
    train_mask = [0, 1914]

    for store in STORES:
        print(f"Processing store {store}...")
        with fs.open(f"{bucket_name}/combined_df_store_{store}.pkl", "rb") as f:
            df = cudf.DataFrame(pd.read_pickle(f))

        df_train = df[(df["day_id"] >= train_mask[0]) & (df["day_id"] < train_mask[1])]
        X_train, y_train = (
            df_train.drop(columns=["item_id", "dept_id", "cat_id", "day_id", "sales"]),
            df_train["sales"],
        )

        clf = xgb.XGBRegressor(**params)
        clf.fit(X_train, y_train)
        model[store] = clf
    del df
    gc.collect()

    return model
model = final_train(best_params)
Processing store CA_1...
Processing store CA_2...
Processing store CA_3...
Processing store CA_4...
Processing store TX_1...
Processing store TX_2...
Processing store TX_3...
Processing store WI_1...
Processing store WI_2...
Processing store WI_3...

现在让我们使用保留的测试集来评估最终模型

test_wrmsse = 0
for store in STORES:
    with fs.open(f"{bucket_name}/combined_df_store_{store}.pkl", "rb") as f:
        df = cudf.DataFrame(pd.read_pickle(f))
    df_test = df[(df["day_id"] >= holdout[0]) & (df["day_id"] < holdout[1])]
    X_test = df_test.drop(columns=["item_id", "dept_id", "cat_id", "day_id", "sales"])
    pred_sales = model[store].predict(X_test)
    test_wrmsse += wrmsse(
        product_weights, df, pred_sales, train_mask=[0, 1914], valid_mask=holdout
    )
print(f"WRMSSE metric on the held-out test set: {test_wrmsse}")
WRMSSE metric on the held-out test set: 9.478942050051291
# Save the model to the Cloud Storage
with fs.open(f"{bucket_name}/final_model.pkl", "wb") as f:
    pickle.dump(model, f)

使用不同的销售数据分段策略创建集成模型#

创建集成模型以利用多种机器学习方法获得更好的预测性能是很常见的。集成模型通过对组成模型的预测输出取平均值来进行预测。

在此示例中,我们将通过以不同方式对销售数据进行分段来创建第二个模型。我们将不再按商店进行分割,而是同时按商店和产品类别分割数据。

def objective_alt(trial):
    fs = gcsfs.GCSFileSystem()
    with fs.open(f"{bucket_name}/product_weights.pkl", "rb") as f:
        product_weights = cudf.DataFrame(pd.read_pickle(f))
    params = {
        "n_estimators": 100,
        "verbosity": 0,
        "learning_rate": 0.01,
        "objective": "reg:tweedie",
        "tree_method": "gpu_hist",
        "grow_policy": "depthwise",
        "predictor": "gpu_predictor",
        "enable_categorical": True,
        "lambda": trial.suggest_float("lambda", 1e-8, 100.0, log=True),
        "alpha": trial.suggest_float("alpha", 1e-8, 100.0, log=True),
        "colsample_bytree": trial.suggest_float("colsample_bytree", 0.2, 1.0),
        "max_depth": trial.suggest_int("max_depth", 2, 6, step=1),
        "min_child_weight": trial.suggest_float(
            "min_child_weight", 1e-8, 100, log=True
        ),
        "gamma": trial.suggest_float("gamma", 1e-8, 1.0, log=True),
        "tweedie_variance_power": trial.suggest_float("tweedie_variance_power", 1, 2),
    }
    scores = [[] for i in range(len(STORES) * len(DEPTS))]

    for store_id, store in enumerate(STORES):
        for dept_id, dept in enumerate(DEPTS):
            print(f"Processing store {store}, department {dept}...")
            with fs.open(
                f"{bucket_name}/combined_df_store_{store}_dept_{dept}.pkl", "rb"
            ) as f:
                df = cudf.DataFrame(pd.read_pickle(f))
            for train_mask, valid_mask in cv_folds:
                df_train = df[
                    (df["day_id"] >= train_mask[0]) & (df["day_id"] < train_mask[1])
                ]
                df_valid = df[
                    (df["day_id"] >= valid_mask[0]) & (df["day_id"] < valid_mask[1])
                ]

                X_train, y_train = (
                    df_train.drop(columns=["item_id", "cat_id", "day_id", "sales"]),
                    df_train["sales"],
                )
                X_valid = df_valid.drop(
                    columns=["item_id", "cat_id", "day_id", "sales"]
                )

                clf = xgb.XGBRegressor(**params)
                clf.fit(X_train, y_train)
                sales_pred = clf.predict(X_valid)
                scores[store_id * len(DEPTS) + dept_id].append(
                    wrmsse(product_weights, df, sales_pred, train_mask, valid_mask)
                )
                del df_train, df_valid, X_train, y_train, clf
                gc.collect()
            del df
            gc.collect()

    # We can sum WRMSSE scores over data segments because data segments contain disjoint sets of time series
    return np.array(scores).sum(axis=0).mean()
##### Number of hyperparameter combinations to try in parallel
n_trials = 9  # Using a small n_trials so that the demo can finish quickly
# n_trials = 100

# Optimize in parallel on your Dask cluster
backend_storage = optuna.storages.InMemoryStorage()
dask_storage = optuna.integration.DaskStorage(storage=backend_storage, client=client)
study = optuna.create_study(
    direction="minimize",
    sampler=optuna.samplers.RandomSampler(seed=0),
    storage=dask_storage,
)
futures = []
for i in range(0, n_trials, n_workers):
    iter_range = (i, min([i + n_workers, n_trials]))
    futures.append(
        {
            "range": iter_range,
            "futures": [
                client.submit(
                    # Work around bug https://github.com/optuna/optuna/issues/4859
                    lambda objective, n_trials: (
                        study.sampler.reseed_rng(),
                        study.optimize(objective, n_trials),
                    ),
                    objective_alt,
                    n_trials=1,
                    pure=False,
                )
                for _ in range(*iter_range)
            ],
        }
    )

tstart = time.perf_counter()
for partition in futures:
    iter_range = partition["range"]
    print(f"Testing hyperparameter combinations {iter_range[0]}..{iter_range[1]}")
    _ = wait(partition["futures"])
    for fut in partition["futures"]:
        _ = fut.result()  # Ensure that the training job was successful
    tnow = time.perf_counter()
    print(
        f"Best cross-validation metric: {study.best_value}, Time elapsed = {tnow - tstart}"
    )
tend = time.perf_counter()
print(f"Total time elapsed = {tend - tstart}")
/tmp/ipykernel_1321/491731696.py:7: ExperimentalWarning: DaskStorage is experimental (supported from v3.1.0). The interface can change in the future.
  dask_storage = optuna.integration.DaskStorage(storage=backend_storage, client=client)
Testing hyperparameter combinations 0..2
Best cross-validation metric: 9.896445497438858, Time elapsed = 802.2191872399999
Testing hyperparameter combinations 2..4
Best cross-validation metric: 9.896445497438858, Time elapsed = 1494.0718872279976
Testing hyperparameter combinations 4..6
Best cross-validation metric: 9.835407407395302, Time elapsed = 2393.3159628150024
Testing hyperparameter combinations 6..8
Best cross-validation metric: 9.330048901795887, Time elapsed = 3092.471466117
Testing hyperparameter combinations 8..9
Best cross-validation metric: 9.330048901795887, Time elapsed = 3459.9082761530008
Total time elapsed = 3459.911843854992
# Make a deep copy to preserve the dictionary after deleting the Dask cluster
best_params_alt = copy.deepcopy(study.best_params)
best_params_alt
{'lambda': 0.028794929327421122,
 'alpha': 3.3150619134761685e-07,
 'colsample_bytree': 0.42330433646728755,
 'max_depth': 2,
 'min_child_weight': 0.09713314395591004,
 'gamma': 0.0016337599227941016,
 'tweedie_variance_power': 1.1915217521234043}
fs = gcsfs.GCSFileSystem()
with fs.open(f"{bucket_name}/params_alt.json", "w") as f:
    json.dump(best_params_alt, f)

使用搜索中找到的最佳超参数,使用整个训练数据拟合新模型。

def final_train_alt(best_params):
    fs = gcsfs.GCSFileSystem()
    params = {
        "n_estimators": 100,
        "verbosity": 0,
        "learning_rate": 0.01,
        "objective": "reg:tweedie",
        "tree_method": "gpu_hist",
        "grow_policy": "depthwise",
        "predictor": "gpu_predictor",
        "enable_categorical": True,
    }
    params.update(best_params)
    model = {}
    train_mask = [0, 1914]

    for _, store in enumerate(STORES):
        for _, dept in enumerate(DEPTS):
            print(f"Processing store {store}, department {dept}...")
            with fs.open(
                f"{bucket_name}/combined_df_store_{store}_dept_{dept}.pkl", "rb"
            ) as f:
                df = cudf.DataFrame(pd.read_pickle(f))
            for train_mask, _ in cv_folds:
                df_train = df[
                    (df["day_id"] >= train_mask[0]) & (df["day_id"] < train_mask[1])
                ]
                X_train, y_train = (
                    df_train.drop(columns=["item_id", "cat_id", "day_id", "sales"]),
                    df_train["sales"],
                )

                clf = xgb.XGBRegressor(**params)
                clf.fit(X_train, y_train)
                model[(store, dept)] = clf
            del df
            gc.collect()

    return model
fs = gcsfs.GCSFileSystem()
with fs.open(f"{bucket_name}/params_alt.json", "r") as f:
    best_params_alt = json.load(f)
with fs.open(f"{bucket_name}/product_weights.pkl", "rb") as f:
    product_weights = cudf.DataFrame(pd.read_pickle(f))
model_alt = final_train_alt(best_params_alt)
Processing store CA_1, department HOBBIES_1...
Processing store CA_1, department HOBBIES_2...
Processing store CA_1, department HOUSEHOLD_1...
Processing store CA_1, department HOUSEHOLD_2...
Processing store CA_1, department FOODS_1...
Processing store CA_1, department FOODS_2...
Processing store CA_1, department FOODS_3...
Processing store CA_2, department HOBBIES_1...
Processing store CA_2, department HOBBIES_2...
Processing store CA_2, department HOUSEHOLD_1...
Processing store CA_2, department HOUSEHOLD_2...
Processing store CA_2, department FOODS_1...
Processing store CA_2, department FOODS_2...
Processing store CA_2, department FOODS_3...
Processing store CA_3, department HOBBIES_1...
Processing store CA_3, department HOBBIES_2...
Processing store CA_3, department HOUSEHOLD_1...
Processing store CA_3, department HOUSEHOLD_2...
Processing store CA_3, department FOODS_1...
Processing store CA_3, department FOODS_2...
Processing store CA_3, department FOODS_3...
Processing store CA_4, department HOBBIES_1...
Processing store CA_4, department HOBBIES_2...
Processing store CA_4, department HOUSEHOLD_1...
Processing store CA_4, department HOUSEHOLD_2...
Processing store CA_4, department FOODS_1...
Processing store CA_4, department FOODS_2...
Processing store CA_4, department FOODS_3...
Processing store TX_1, department HOBBIES_1...
Processing store TX_1, department HOBBIES_2...
Processing store TX_1, department HOUSEHOLD_1...
Processing store TX_1, department HOUSEHOLD_2...
Processing store TX_1, department FOODS_1...
Processing store TX_1, department FOODS_2...
Processing store TX_1, department FOODS_3...
Processing store TX_2, department HOBBIES_1...
Processing store TX_2, department HOBBIES_2...
Processing store TX_2, department HOUSEHOLD_1...
Processing store TX_2, department HOUSEHOLD_2...
Processing store TX_2, department FOODS_1...
Processing store TX_2, department FOODS_2...
Processing store TX_2, department FOODS_3...
Processing store TX_3, department HOBBIES_1...
Processing store TX_3, department HOBBIES_2...
Processing store TX_3, department HOUSEHOLD_1...
Processing store TX_3, department HOUSEHOLD_2...
Processing store TX_3, department FOODS_1...
Processing store TX_3, department FOODS_2...
Processing store TX_3, department FOODS_3...
Processing store WI_1, department HOBBIES_1...
Processing store WI_1, department HOBBIES_2...
Processing store WI_1, department HOUSEHOLD_1...
Processing store WI_1, department HOUSEHOLD_2...
Processing store WI_1, department FOODS_1...
Processing store WI_1, department FOODS_2...
Processing store WI_1, department FOODS_3...
Processing store WI_2, department HOBBIES_1...
Processing store WI_2, department HOBBIES_2...
Processing store WI_2, department HOUSEHOLD_1...
Processing store WI_2, department HOUSEHOLD_2...
Processing store WI_2, department FOODS_1...
Processing store WI_2, department FOODS_2...
Processing store WI_2, department FOODS_3...
Processing store WI_3, department HOBBIES_1...
Processing store WI_3, department HOBBIES_2...
Processing store WI_3, department HOUSEHOLD_1...
Processing store WI_3, department HOUSEHOLD_2...
Processing store WI_3, department FOODS_1...
Processing store WI_3, department FOODS_2...
Processing store WI_3, department FOODS_3...
# Save the model to the Cloud Storage
with fs.open(f"{bucket_name}/final_model_alt.pkl", "wb") as f:
    pickle.dump(model_alt, f)

现在考虑由两个模型 modelmodel_alt 组成的集成模型。我们通过计算两个模型预测平均值的 WRMSSE 指标来评估集成模型。

test_wrmsse = 0
for store in STORES:
    print(f"Processing store {store}...")
    # Prediction from Model 1
    with fs.open(f"{bucket_name}/combined_df_store_{store}.pkl", "rb") as f:
        df = cudf.DataFrame(pd.read_pickle(f))
    df_test = df[(df["day_id"] >= holdout[0]) & (df["day_id"] < holdout[1])]
    X_test = df_test.drop(columns=["item_id", "dept_id", "cat_id", "day_id", "sales"])
    df_test["pred1"] = model[store].predict(X_test)

    # Prediction from Model 2
    df_test["pred2"] = [np.nan] * len(df_test)
    df_test["pred2"] = df_test["pred2"].astype("float32")
    for dept in DEPTS:
        with fs.open(
            f"{bucket_name}/combined_df_store_{store}_dept_{dept}.pkl", "rb"
        ) as f:
            df2 = cudf.DataFrame(pd.read_pickle(f))
        df2_test = df2[(df2["day_id"] >= holdout[0]) & (df2["day_id"] < holdout[1])]
        X_test = df2_test.drop(columns=["item_id", "cat_id", "day_id", "sales"])
        assert np.sum(df_test["dept_id"] == dept) == len(X_test)
        df_test["pred2"][df_test["dept_id"] == dept] = model_alt[(store, dept)].predict(
            X_test
        )

    # Average prediction
    df_test["avg_pred"] = (df_test["pred1"] + df_test["pred2"]) / 2.0

    test_wrmsse += wrmsse(
        product_weights,
        df,
        df_test["avg_pred"],
        train_mask=[0, 1914],
        valid_mask=holdout,
    )
print(f"WRMSSE metric on the held-out test set: {test_wrmsse}")
Processing store CA_1...
Processing store CA_2...
Processing store CA_3...
Processing store CA_4...
Processing store TX_1...
Processing store TX_2...
Processing store TX_3...
Processing store WI_1...
Processing store WI_2...
Processing store WI_3...
WRMSSE metric on the held-out test set: 10.69187847848366
# Close the Dask cluster to clean up
cluster.close()

结论#

我们演示了一个端到端的工作流,在该工作流中,我们获取真实世界的时间序列数据,并使用 Google Kubernetes Engine (GKE) 训练预测模型。通过将并行训练作业分派到 NVIDIA GPU,我们能够加快超参数优化 (HPO) 过程。