Time series forecasting on Google Kubernetes Engine with NVIDIA GPUs#
In this example, we walk through a real-world time series forecasting task using data from the M5 Forecasting competition. Walmart provided historical sales data from multiple stores in three states, and our task is to forecast the sales for the next 28 days.
Prerequisites#
Prepare the GKE cluster#
To run this example, you need a working Google Kubernetes Engine (GKE) cluster with access to NVIDIA GPUs.
See the documentation
Set up a Google Kubernetes Engine (GKE) cluster with access to NVIDIA GPUs by following the instructions in Google Kubernetes Engine.
To ensure that the example runs smoothly, make sure your GPU has ample memory. This notebook has been tested with an NVIDIA A100.
Set up the Dask-Kubernetes integration by following the instructions in this guide.
Kubeflow is not strictly required, but we highly recommend it, as Kubeflow provides a nice notebook environment in which you can run this notebook inside the k8s cluster. (You may choose any installation method; we tested this example after installing Kubeflow from manifests.) When creating the notebook environment, use the following configuration:
2 CPUs, 16 GiB memory
1 NVIDIA GPU
40 GiB disk volume
After uploading all the notebooks from this example, run this notebook (notebook.ipynb) in the notebook environment.
Note: we will use the worker pods to speed up the training stage. The preprocessing steps will run solely on the scheduler node.
Prepare a bucket in Google Cloud Storage#
Create a new bucket in Google Cloud Storage. Make sure that the worker pods in the k8s cluster have read/write access to this bucket, which can be accomplished with either of the following methods:
Option 1: Specify an additional scope when provisioning the GKE cluster.
When provisioning a new GKE cluster, add the storage-rw scope. This option is available only when you are creating a new cluster from scratch; if you are using an existing GKE cluster, see Option 2. Example:
gcloud container clusters create my-new-cluster --accelerator type=nvidia-tesla-t4 \
--machine-type n1-standard-32 --zone us-central1-c --release-channel stable \
--num-nodes 5 --scopes=gke-default,storage-rw
Option 2: Grant the associated service account access to the bucket.
Find out which service account is associated with your GKE cluster. You can grant the service account access to the bucket as follows: navigate to the Cloud Storage console, open the Bucket Details page for the bucket, open the Permissions tab, and click Grant Access.
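If you prefer the command line, a grant along the following lines achieves the same effect. This is a sketch rather than part of the official setup steps: the service account e-mail, the bucket name, and the roles/storage.objectAdmin role are placeholders to adapt to your project.
# Hypothetical example -- substitute your own service account and bucket
gsutil iam ch serviceAccount:<service-account-email>:roles/storage.objectAdmin gs://<bucket-name>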
Enter the name of the bucket to which your cluster has read/write access:
bucket_name = "<Put the name of the bucket here>"
Install Python packages in the notebook environment#
!pip install kaggle gcsfs dask-kubernetes optuna
# Test if the bucket is accessible
import gcsfs
fs = gcsfs.GCSFileSystem()
fs.ls(f"{bucket_name}/")
[]
Obtain the time series dataset from Kaggle#
If you do not yet have a Kaggle account, create one now. Then follow the instructions in the Kaggle Public API documentation to obtain an API key; this step is needed to obtain the training data for the M5 Forecasting competition. Once you have the API key, fill in the following:
kaggle_username = "<Put your Kaggle username here>"
kaggle_api_key = "<Put your Kaggle API key here>"
Now we can download the dataset:
%env KAGGLE_USERNAME=$kaggle_username
%env KAGGLE_KEY=$kaggle_api_key
!kaggle competitions download -c m5-forecasting-accuracy
Let's unpack the ZIP archive and see what's inside.
import zipfile
with zipfile.ZipFile("m5-forecasting-accuracy.zip", "r") as zf:
zf.extractall(path="./data")
!ls -lh data/*.csv
-rw-r--r-- 1 rapids conda 102K Sep 28 18:59 data/calendar.csv
-rw-r--r-- 1 rapids conda 117M Sep 28 18:59 data/sales_train_evaluation.csv
-rw-r--r-- 1 rapids conda 115M Sep 28 18:59 data/sales_train_validation.csv
-rw-r--r-- 1 rapids conda 5.0M Sep 28 18:59 data/sample_submission.csv
-rw-r--r-- 1 rapids conda 194M Sep 28 18:59 data/sell_prices.csv
Data preprocessing#
Now we can run the preprocessing steps.
Import modules and define utility functions#
import gc
import pathlib
import cudf
import gcsfs
import numpy as np
def sizeof_fmt(num, suffix="B"):
for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]:
if abs(num) < 1024.0:
return f"{num:3.1f}{unit}{suffix}"
num /= 1024.0
return f"{num:.1f}Yi{suffix}"
def report_dataframe_size(df, name):
    # Report the GPU memory footprint of the given dataframe
    mem_usage = sizeof_fmt(df.memory_usage(index=True).sum())
    print(f"{name} takes up {mem_usage} memory on GPU")
Load the data#
TARGET = "sales" # Our main target
END_TRAIN = 1941 # Last day in train set
raw_data_dir = pathlib.Path("./data/")
train_df = cudf.read_csv(raw_data_dir / "sales_train_evaluation.csv")
prices_df = cudf.read_csv(raw_data_dir / "sell_prices.csv")
calendar_df = cudf.read_csv(raw_data_dir / "calendar.csv").rename(
columns={"d": "day_id"}
)
train_df
id | item_id | dept_id | cat_id | store_id | state_id | d_1 | d_2 | d_3 | d_4 | ... | d_1932 | d_1933 | d_1934 | d_1935 | d_1936 | d_1937 | d_1938 | d_1939 | d_1940 | d_1941 | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | HOBBIES_1_001_CA_1_evaluation | HOBBIES_1_001 | HOBBIES_1 | HOBBIES | CA_1 | CA | 0 | 0 | 0 | 0 | ... | 2 | 4 | 0 | 0 | 0 | 0 | 3 | 3 | 0 | 1 |
1 | HOBBIES_1_002_CA_1_evaluation | HOBBIES_1_002 | HOBBIES_1 | HOBBIES | CA_1 | CA | 0 | 0 | 0 | 0 | ... | 0 | 1 | 2 | 1 | 1 | 0 | 0 | 0 | 0 | 0 |
2 | HOBBIES_1_003_CA_1_evaluation | HOBBIES_1_003 | HOBBIES_1 | HOBBIES | CA_1 | CA | 0 | 0 | 0 | 0 | ... | 1 | 0 | 2 | 0 | 0 | 0 | 2 | 3 | 0 | 1 |
3 | HOBBIES_1_004_CA_1_evaluation | HOBBIES_1_004 | HOBBIES_1 | HOBBIES | CA_1 | CA | 0 | 0 | 0 | 0 | ... | 1 | 1 | 0 | 4 | 0 | 1 | 3 | 0 | 2 | 6 |
4 | HOBBIES_1_005_CA_1_evaluation | HOBBIES_1_005 | HOBBIES_1 | HOBBIES | CA_1 | CA | 0 | 0 | 0 | 0 | ... | 0 | 0 | 0 | 2 | 1 | 0 | 0 | 2 | 1 | 0 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
30485 | FOODS_3_823_WI_3_evaluation | FOODS_3_823 | FOODS_3 | FOODS | WI_3 | WI | 0 | 0 | 2 | 2 | ... | 1 | 0 | 3 | 0 | 1 | 1 | 0 | 0 | 1 | 1 |
30486 | FOODS_3_824_WI_3_evaluation | FOODS_3_824 | FOODS_3 | FOODS | WI_3 | WI | 0 | 0 | 0 | 0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 1 | 0 |
30487 | FOODS_3_825_WI_3_evaluation | FOODS_3_825 | FOODS_3 | FOODS | WI_3 | WI | 0 | 6 | 0 | 2 | ... | 0 | 0 | 1 | 2 | 0 | 1 | 0 | 1 | 0 | 2 |
30488 | FOODS_3_826_WI_3_evaluation | FOODS_3_826 | FOODS_3 | FOODS | WI_3 | WI | 0 | 0 | 0 | 0 | ... | 1 | 1 | 1 | 4 | 6 | 0 | 1 | 1 | 1 | 0 |
30489 | FOODS_3_827_WI_3_evaluation | FOODS_3_827 | FOODS_3 | FOODS | WI_3 | WI | 0 | 0 | 0 | 0 | ... | 1 | 2 | 0 | 5 | 4 | 0 | 2 | 2 | 5 | 1 |
30490 rows × 1947 columns
The columns d_1, d_2, …, d_1941 contain the sales data for days 1, 2, …, 1941, counting from 2011-01-29.
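As a quick sanity check (a minimal sketch, not part of the original preprocessing code), a day label such as d_42 can be mapped back to its calendar date by offsetting from 2011-01-29:
import datetime

def day_id_to_date(day_id):
    # "d_42" -> 42; day 1 corresponds to 2011-01-29
    day_number = int(day_id[2:])
    return datetime.date(2011, 1, 29) + datetime.timedelta(days=day_number - 1)

print(day_id_to_date("d_1"))  # 2011-01-29
print(day_id_to_date("d_1941"))  # 2016-05-22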
prices_df
store_id | item_id | wm_yr_wk | sell_price | |
---|---|---|---|---|
0 | CA_1 | HOBBIES_1_001 | 11325 | 9.58 |
1 | CA_1 | HOBBIES_1_001 | 11326 | 9.58 |
2 | CA_1 | HOBBIES_1_001 | 11327 | 8.26 |
3 | CA_1 | HOBBIES_1_001 | 11328 | 8.26 |
4 | CA_1 | HOBBIES_1_001 | 11329 | 8.26 |
... | ... | ... | ... | ... |
6841116 | WI_3 | FOODS_3_827 | 11617 | 1.00 |
6841117 | WI_3 | FOODS_3_827 | 11618 | 1.00 |
6841118 | WI_3 | FOODS_3_827 | 11619 | 1.00 |
6841119 | WI_3 | FOODS_3_827 | 11620 | 1.00 |
6841120 | WI_3 | FOODS_3_827 | 11621 | 1.00 |
6841121 rows × 4 columns
calendar_df
date | wm_yr_wk | weekday | wday | month | year | day_id | event_name_1 | event_type_1 | event_name_2 | event_type_2 | snap_CA | snap_TX | snap_WI | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 2011-01-29 | 11101 | Saturday | 1 | 1 | 2011 | d_1 | <NA> | <NA> | <NA> | <NA> | 0 | 0 | 0 |
1 | 2011-01-30 | 11101 | Sunday | 2 | 1 | 2011 | d_2 | <NA> | <NA> | <NA> | <NA> | 0 | 0 | 0 |
2 | 2011-01-31 | 11101 | Monday | 3 | 1 | 2011 | d_3 | <NA> | <NA> | <NA> | <NA> | 0 | 0 | 0 |
3 | 2011-02-01 | 11101 | Tuesday | 4 | 2 | 2011 | d_4 | <NA> | <NA> | <NA> | <NA> | 1 | 1 | 0 |
4 | 2011-02-02 | 11101 | Wednesday | 5 | 2 | 2011 | d_5 | <NA> | <NA> | <NA> | <NA> | 1 | 0 | 1 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
1964 | 2016-06-15 | 11620 | Wednesday | 5 | 6 | 2016 | d_1965 | <NA> | <NA> | <NA> | <NA> | 0 | 1 | 1 |
1965 | 2016-06-16 | 11620 | Thursday | 6 | 6 | 2016 | d_1966 | <NA> | <NA> | <NA> | <NA> | 0 | 0 | 0 |
1966 | 2016-06-17 | 11620 | Friday | 7 | 6 | 2016 | d_1967 | <NA> | <NA> | <NA> | <NA> | 0 | 0 | 0 |
1967 | 2016-06-18 | 11621 | Saturday | 1 | 6 | 2016 | d_1968 | <NA> | <NA> | <NA> | <NA> | 0 | 0 | 0 |
1968 | 2016-06-19 | 11621 | Sunday | 2 | 6 | 2016 | d_1969 | NBAFinalsEnd | Sporting | Father's day | Cultural | 0 | 0 | 0 |
1969 rows × 14 columns
Reformat the sales time series data#
Use cudf.melt to convert the columns d_1, d_2, …, d_1941 into separate rows.
index_columns = ["id", "item_id", "dept_id", "cat_id", "store_id", "state_id"]
grid_df = cudf.melt(
train_df, id_vars=index_columns, var_name="day_id", value_name=TARGET
)
grid_df
id | item_id | dept_id | cat_id | store_id | state_id | day_id | sales | |
---|---|---|---|---|---|---|---|---|
0 | HOBBIES_1_001_CA_1_evaluation | HOBBIES_1_001 | HOBBIES_1 | HOBBIES | CA_1 | CA | d_1 | 0 |
1 | HOBBIES_1_002_CA_1_evaluation | HOBBIES_1_002 | HOBBIES_1 | HOBBIES | CA_1 | CA | d_1 | 0 |
2 | HOBBIES_1_003_CA_1_evaluation | HOBBIES_1_003 | HOBBIES_1 | HOBBIES | CA_1 | CA | d_1 | 0 |
3 | HOBBIES_1_004_CA_1_evaluation | HOBBIES_1_004 | HOBBIES_1 | HOBBIES | CA_1 | CA | d_1 | 0 |
4 | HOBBIES_1_005_CA_1_evaluation | HOBBIES_1_005 | HOBBIES_1 | HOBBIES | CA_1 | CA | d_1 | 0 |
... | ... | ... | ... | ... | ... | ... | ... | ... |
59181085 | FOODS_3_823_WI_3_evaluation | FOODS_3_823 | FOODS_3 | FOODS | WI_3 | WI | d_1941 | 1 |
59181086 | FOODS_3_824_WI_3_evaluation | FOODS_3_824 | FOODS_3 | FOODS | WI_3 | WI | d_1941 | 0 |
59181087 | FOODS_3_825_WI_3_evaluation | FOODS_3_825 | FOODS_3 | FOODS | WI_3 | WI | d_1941 | 2 |
59181088 | FOODS_3_826_WI_3_evaluation | FOODS_3_826 | FOODS_3 | FOODS | WI_3 | WI | d_1941 | 0 |
59181089 | FOODS_3_827_WI_3_evaluation | FOODS_3_827 | FOODS_3 | FOODS | WI_3 | WI | d_1941 | 1 |
59181090 rows × 8 columns
For each time series, add 28 rows corresponding to the future forecast horizon:
add_grid = cudf.DataFrame()
for i in range(1, 29):
temp_df = train_df[index_columns]
temp_df = temp_df.drop_duplicates()
temp_df["day_id"] = "d_" + str(END_TRAIN + i)
temp_df[TARGET] = np.nan # Sales amount at time (n + i) is unknown
add_grid = cudf.concat([add_grid, temp_df])
add_grid["day_id"] = add_grid["day_id"].astype(
"category"
) # The day_id column is categorical, after cudf.melt
grid_df = cudf.concat([grid_df, add_grid])
grid_df = grid_df.reset_index(drop=True)
grid_df["sales"] = grid_df["sales"].astype(
np.float32
) # Use float32 type for sales column, to conserve memory
grid_df
id | item_id | dept_id | cat_id | store_id | state_id | day_id | sales | |
---|---|---|---|---|---|---|---|---|
0 | HOBBIES_1_001_CA_1_evaluation | HOBBIES_1_001 | HOBBIES_1 | HOBBIES | CA_1 | CA | d_1 | 0.0 |
1 | HOBBIES_1_002_CA_1_evaluation | HOBBIES_1_002 | HOBBIES_1 | HOBBIES | CA_1 | CA | d_1 | 0.0 |
2 | HOBBIES_1_003_CA_1_evaluation | HOBBIES_1_003 | HOBBIES_1 | HOBBIES | CA_1 | CA | d_1 | 0.0 |
3 | HOBBIES_1_004_CA_1_evaluation | HOBBIES_1_004 | HOBBIES_1 | HOBBIES | CA_1 | CA | d_1 | 0.0 |
4 | HOBBIES_1_005_CA_1_evaluation | HOBBIES_1_005 | HOBBIES_1 | HOBBIES | CA_1 | CA | d_1 | 0.0 |
... | ... | ... | ... | ... | ... | ... | ... | ... |
60034805 | FOODS_3_823_WI_3_evaluation | FOODS_3_823 | FOODS_3 | FOODS | WI_3 | WI | d_1969 | NaN |
60034806 | FOODS_3_824_WI_3_evaluation | FOODS_3_824 | FOODS_3 | FOODS | WI_3 | WI | d_1969 | NaN |
60034807 | FOODS_3_825_WI_3_evaluation | FOODS_3_825 | FOODS_3 | FOODS | WI_3 | WI | d_1969 | NaN |
60034808 | FOODS_3_826_WI_3_evaluation | FOODS_3_826 | FOODS_3 | FOODS | WI_3 | WI | d_1969 | NaN |
60034809 | FOODS_3_827_WI_3_evaluation | FOODS_3_827 | FOODS_3 | FOODS | WI_3 | WI | d_1969 | NaN |
60034810 rows × 8 columns
Free up GPU memory#
GPU memory is a precious resource, so let's try to free some of it up. First, delete the temporary variables we no longer need:
# Use xdel magic to scrub extra references from Jupyter notebook
%xdel temp_df
%xdel add_grid
%xdel train_df
# Invoke the garbage collector explicitly to free up memory
gc.collect()
8136
Second, reduce the footprint of grid_df by converting strings into categoricals:
report_dataframe_size(grid_df, "grid_df")
grid_df takes up 5.2GiB memory on GPU
grid_df.dtypes
id object
item_id object
dept_id object
cat_id object
store_id object
state_id object
day_id category
sales float32
dtype: object
for col in index_columns:
grid_df[col] = grid_df[col].astype("category")
gc.collect()
report_dataframe_size(grid_df, "grid_df")
grid_df takes up 802.6MiB memory on GPU
grid_df.dtypes
id category
item_id category
dept_id category
cat_id category
store_id category
state_id category
day_id category
sales float32
dtype: object
Determine the release week of each product#
Each row in the prices_df table contains the sale price of a product sold at a given store during a given week.
prices_df
store_id | item_id | wm_yr_wk | sell_price | |
---|---|---|---|---|
0 | CA_1 | HOBBIES_1_001 | 11325 | 9.58 |
1 | CA_1 | HOBBIES_1_001 | 11326 | 9.58 |
2 | CA_1 | HOBBIES_1_001 | 11327 | 8.26 |
3 | CA_1 | HOBBIES_1_001 | 11328 | 8.26 |
4 | CA_1 | HOBBIES_1_001 | 11329 | 8.26 |
... | ... | ... | ... | ... |
6841116 | WI_3 | FOODS_3_827 | 11617 | 1.00 |
6841117 | WI_3 | FOODS_3_827 | 11618 | 1.00 |
6841118 | WI_3 | FOODS_3_827 | 11619 | 1.00 |
6841119 | WI_3 | FOODS_3_827 | 11620 | 1.00 |
6841120 | WI_3 | FOODS_3_827 | 11621 | 1.00 |
6841121 rows × 4 columns
Note that not all products are sold every week; some products are sold only during certain weeks. Let's use a groupby operation to determine the first week in which each product went on the shelf.
release_df = (
prices_df.groupby(["store_id", "item_id"])["wm_yr_wk"].agg("min").reset_index()
)
release_df.columns = ["store_id", "item_id", "release_week"]
release_df
store_id | item_id | release_week | |
---|---|---|---|
0 | CA_4 | FOODS_3_529 | 11421 |
1 | TX_1 | HOUSEHOLD_1_409 | 11230 |
2 | WI_2 | FOODS_3_145 | 11214 |
3 | CA_4 | HOUSEHOLD_1_494 | 11106 |
4 | WI_3 | HOBBIES_1_093 | 11223 |
... | ... | ... | ... |
30485 | CA_3 | HOUSEHOLD_1_369 | 11205 |
30486 | CA_2 | FOODS_3_109 | 11101 |
30487 | CA_4 | FOODS_2_119 | 11101 |
30488 | CA_4 | HOUSEHOLD_2_384 | 11110 |
30489 | WI_3 | HOBBIES_1_135 | 11328 |
30490 rows × 3 columns
Now that we have computed the release week for each product, let's merge it back into grid_df:
grid_df = grid_df.merge(release_df, on=["store_id", "item_id"], how="left")
grid_df = grid_df.sort_values(index_columns + ["day_id"]).reset_index(drop=True)
grid_df
id | item_id | dept_id | cat_id | store_id | state_id | day_id | sales | release_week | |
---|---|---|---|---|---|---|---|---|---|
0 | FOODS_1_001_CA_1_evaluation | FOODS_1_001 | FOODS_1 | FOODS | CA_1 | CA | d_1 | 3.0 | 11101 |
1 | FOODS_1_001_CA_1_evaluation | FOODS_1_001 | FOODS_1 | FOODS | CA_1 | CA | d_2 | 0.0 | 11101 |
2 | FOODS_1_001_CA_1_evaluation | FOODS_1_001 | FOODS_1 | FOODS | CA_1 | CA | d_3 | 0.0 | 11101 |
3 | FOODS_1_001_CA_1_evaluation | FOODS_1_001 | FOODS_1 | FOODS | CA_1 | CA | d_4 | 1.0 | 11101 |
4 | FOODS_1_001_CA_1_evaluation | FOODS_1_001 | FOODS_1 | FOODS | CA_1 | CA | d_5 | 4.0 | 11101 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
60034805 | HOUSEHOLD_2_516_WI_3_evaluation | HOUSEHOLD_2_516 | HOUSEHOLD_2 | HOUSEHOLD | WI_3 | WI | d_1965 | NaN | 11101 |
60034806 | HOUSEHOLD_2_516_WI_3_evaluation | HOUSEHOLD_2_516 | HOUSEHOLD_2 | HOUSEHOLD | WI_3 | WI | d_1966 | NaN | 11101 |
60034807 | HOUSEHOLD_2_516_WI_3_evaluation | HOUSEHOLD_2_516 | HOUSEHOLD_2 | HOUSEHOLD | WI_3 | WI | d_1967 | NaN | 11101 |
60034808 | HOUSEHOLD_2_516_WI_3_evaluation | HOUSEHOLD_2_516 | HOUSEHOLD_2 | HOUSEHOLD | WI_3 | WI | d_1968 | NaN | 11101 |
60034809 | HOUSEHOLD_2_516_WI_3_evaluation | HOUSEHOLD_2_516 | HOUSEHOLD_2 | HOUSEHOLD | WI_3 | WI | d_1969 | NaN | 11101 |
60034810 rows × 9 columns
del release_df # No longer needed
gc.collect()
139
report_dataframe_size(grid_df, "grid_df")
grid_df takes up 1.2GiB memory on GPU
Filter out entries with zero sales#
We can save further space by dropping the rows of grid_df that correspond to zero sales. Since a product is not on the shelf until its release week, its sales must be zero during any week prior to the release week.
To take advantage of this insight, we bring in the wm_yr_wk column from calendar_df:
grid_df = grid_df.merge(calendar_df[["wm_yr_wk", "day_id"]], on=["day_id"], how="left")
grid_df
id | item_id | dept_id | cat_id | store_id | state_id | day_id | sales | release_week | wm_yr_wk | |
---|---|---|---|---|---|---|---|---|---|---|
0 | FOODS_1_001_WI_2_evaluation | FOODS_1_001 | FOODS_1 | FOODS | WI_2 | WI | d_809 | 0.0 | 11101 | 11312 |
1 | FOODS_1_001_WI_2_evaluation | FOODS_1_001 | FOODS_1 | FOODS | WI_2 | WI | d_810 | 0.0 | 11101 | 11312 |
2 | FOODS_1_001_WI_2_evaluation | FOODS_1_001 | FOODS_1 | FOODS | WI_2 | WI | d_811 | 2.0 | 11101 | 11312 |
3 | FOODS_1_001_WI_2_evaluation | FOODS_1_001 | FOODS_1 | FOODS | WI_2 | WI | d_812 | 0.0 | 11101 | 11312 |
4 | FOODS_1_001_WI_2_evaluation | FOODS_1_001 | FOODS_1 | FOODS | WI_2 | WI | d_813 | 1.0 | 11101 | 11313 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
60034805 | HOUSEHOLD_2_516_WI_3_evaluation | HOUSEHOLD_2_516 | HOUSEHOLD_2 | HOUSEHOLD | WI_3 | WI | d_52 | 0.0 | 11101 | 11108 |
60034806 | HOUSEHOLD_2_516_WI_3_evaluation | HOUSEHOLD_2_516 | HOUSEHOLD_2 | HOUSEHOLD | WI_3 | WI | d_53 | 0.0 | 11101 | 11108 |
60034807 | HOUSEHOLD_2_516_WI_3_evaluation | HOUSEHOLD_2_516 | HOUSEHOLD_2 | HOUSEHOLD | WI_3 | WI | d_54 | 0.0 | 11101 | 11108 |
60034808 | HOUSEHOLD_2_516_WI_3_evaluation | HOUSEHOLD_2_516 | HOUSEHOLD_2 | HOUSEHOLD | WI_3 | WI | d_55 | 0.0 | 11101 | 11108 |
60034809 | HOUSEHOLD_2_516_WI_3_evaluation | HOUSEHOLD_2_516 | HOUSEHOLD_2 | HOUSEHOLD | WI_3 | WI | d_49 | 0.0 | 11101 | 11107 |
60034810 rows × 10 columns
report_dataframe_size(grid_df, "grid_df")
grid_df takes up 1.7GiB memory on GPU
The wm_yr_wk column identifies the week that contains the day given by the day_id column. Now, let's filter all the rows in grid_df whose wm_yr_wk is less than release_week:
df = grid_df[grid_df["wm_yr_wk"] < grid_df["release_week"]]
df
id | item_id | dept_id | cat_id | store_id | state_id | day_id | sales | release_week | wm_yr_wk | |
---|---|---|---|---|---|---|---|---|---|---|
6766 | FOODS_1_002_TX_1_evaluation | FOODS_1_002 | FOODS_1 | FOODS | TX_1 | TX | d_1 | 0.0 | 11102 | 11101 |
6767 | FOODS_1_002_TX_1_evaluation | FOODS_1_002 | FOODS_1 | FOODS | TX_1 | TX | d_2 | 0.0 | 11102 | 11101 |
19686 | FOODS_1_001_TX_3_evaluation | FOODS_1_001 | FOODS_1 | FOODS | TX_3 | TX | d_1 | 0.0 | 11102 | 11101 |
19687 | FOODS_1_001_TX_3_evaluation | FOODS_1_001 | FOODS_1 | FOODS | TX_3 | TX | d_2 | 0.0 | 11102 | 11101 |
19688 | FOODS_1_001_TX_3_evaluation | FOODS_1_001 | FOODS_1 | FOODS | TX_3 | TX | d_3 | 0.0 | 11102 | 11101 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
60033493 | HOUSEHOLD_2_516_WI_2_evaluation | HOUSEHOLD_2_516 | HOUSEHOLD_2 | HOUSEHOLD | WI_2 | WI | d_20 | 0.0 | 11106 | 11103 |
60033494 | HOUSEHOLD_2_516_WI_2_evaluation | HOUSEHOLD_2_516 | HOUSEHOLD_2 | HOUSEHOLD | WI_2 | WI | d_21 | 0.0 | 11106 | 11103 |
60033495 | HOUSEHOLD_2_516_WI_2_evaluation | HOUSEHOLD_2_516 | HOUSEHOLD_2 | HOUSEHOLD | WI_2 | WI | d_22 | 0.0 | 11106 | 11104 |
60033496 | HOUSEHOLD_2_516_WI_2_evaluation | HOUSEHOLD_2_516 | HOUSEHOLD_2 | HOUSEHOLD | WI_2 | WI | d_23 | 0.0 | 11106 | 11104 |
60033497 | HOUSEHOLD_2_516_WI_2_evaluation | HOUSEHOLD_2_516 | HOUSEHOLD_2 | HOUSEHOLD | WI_2 | WI | d_24 | 0.0 | 11106 | 11104 |
12299413 rows × 10 columns
Just as we suspected, the sales are zero in the weeks prior to the release week.
assert (df["sales"] == 0).all()
For the purpose of our data analysis, we can safely drop the rows with zero sales:
grid_df = grid_df[grid_df["wm_yr_wk"] >= grid_df["release_week"]].reset_index(drop=True)
grid_df["wm_yr_wk"] = grid_df["wm_yr_wk"].astype(
np.int32
) # Convert wm_yr_wk column to int32, to conserve memory
grid_df
id | item_id | dept_id | cat_id | store_id | state_id | day_id | sales | release_week | wm_yr_wk | |
---|---|---|---|---|---|---|---|---|---|---|
0 | FOODS_1_001_WI_2_evaluation | FOODS_1_001 | FOODS_1 | FOODS | WI_2 | WI | d_809 | 0.0 | 11101 | 11312 |
1 | FOODS_1_001_WI_2_evaluation | FOODS_1_001 | FOODS_1 | FOODS | WI_2 | WI | d_810 | 0.0 | 11101 | 11312 |
2 | FOODS_1_001_WI_2_evaluation | FOODS_1_001 | FOODS_1 | FOODS | WI_2 | WI | d_811 | 2.0 | 11101 | 11312 |
3 | FOODS_1_001_WI_2_evaluation | FOODS_1_001 | FOODS_1 | FOODS | WI_2 | WI | d_812 | 0.0 | 11101 | 11312 |
4 | FOODS_1_001_WI_2_evaluation | FOODS_1_001 | FOODS_1 | FOODS | WI_2 | WI | d_813 | 1.0 | 11101 | 11313 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
47735392 | HOUSEHOLD_2_516_WI_3_evaluation | HOUSEHOLD_2_516 | HOUSEHOLD_2 | HOUSEHOLD | WI_3 | WI | d_52 | 0.0 | 11101 | 11108 |
47735393 | HOUSEHOLD_2_516_WI_3_evaluation | HOUSEHOLD_2_516 | HOUSEHOLD_2 | HOUSEHOLD | WI_3 | WI | d_53 | 0.0 | 11101 | 11108 |
47735394 | HOUSEHOLD_2_516_WI_3_evaluation | HOUSEHOLD_2_516 | HOUSEHOLD_2 | HOUSEHOLD | WI_3 | WI | d_54 | 0.0 | 11101 | 11108 |
47735395 | HOUSEHOLD_2_516_WI_3_evaluation | HOUSEHOLD_2_516 | HOUSEHOLD_2 | HOUSEHOLD | WI_3 | WI | d_55 | 0.0 | 11101 | 11108 |
47735396 | HOUSEHOLD_2_516_WI_3_evaluation | HOUSEHOLD_2_516 | HOUSEHOLD_2 | HOUSEHOLD | WI_3 | WI | d_49 | 0.0 | 11101 | 11107 |
47735397 rows × 10 columns
report_dataframe_size(grid_df, "grid_df")
grid_df takes up 1.2GiB memory on GPU
Assign weights to the product items#
When we evaluate the accuracy of our machine learning models, we should assign each product item a weight that indicates its relative importance. For the M5 competition, the weights are computed from the total sales (in US dollars) over the latest 28 days.
# Convert day_id to integers
grid_df["day_id_int"] = grid_df["day_id"].to_pandas().apply(lambda x: x[2:]).astype(int)
# Compute the total sales over the latest 28 days, per product item
last28 = grid_df[(grid_df["day_id_int"] >= 1914) & (grid_df["day_id_int"] < 1942)]
last28 = last28[["item_id", "wm_yr_wk", "sales"]].merge(
prices_df[["item_id", "wm_yr_wk", "sell_price"]], on=["item_id", "wm_yr_wk"]
)
last28["sales_usd"] = last28["sales"] * last28["sell_price"]
total_sales_usd = last28.groupby("item_id")[["sales_usd"]].agg(["sum"]).sort_index()
total_sales_usd.columns = total_sales_usd.columns.map("_".join)
total_sales_usd
sales_usd_sum | |
---|---|
item_id | |
FOODS_1_001 | 3516.80 |
FOODS_1_002 | 12418.80 |
FOODS_1_003 | 5943.20 |
FOODS_1_004 | 54184.82 |
FOODS_1_005 | 17877.00 |
... | ... |
HOUSEHOLD_2_512 | 6034.40 |
HOUSEHOLD_2_513 | 2668.80 |
HOUSEHOLD_2_514 | 9574.60 |
HOUSEHOLD_2_515 | 630.40 |
HOUSEHOLD_2_516 | 2574.00 |
3049 rows × 1 columns
To obtain the weights, we normalize: divide the sales of each individual item by the total sales of all items.
weights = total_sales_usd / total_sales_usd.sum()
weights = weights.rename(columns={"sales_usd_sum": "weights"})
weights
weights | |
---|---|
item_id | |
FOODS_1_001 | 0.000090 |
FOODS_1_002 | 0.000318 |
FOODS_1_003 | 0.000152 |
FOODS_1_004 | 0.001389 |
FOODS_1_005 | 0.000458 |
... | ... |
HOUSEHOLD_2_512 | 0.000155 |
HOUSEHOLD_2_513 | 0.000068 |
HOUSEHOLD_2_514 | 0.000245 |
HOUSEHOLD_2_515 | 0.000016 |
HOUSEHOLD_2_516 | 0.000066 |
3049 rows × 1 columns
# No longer needed
del grid_df["day_id_int"]
Generate lag features#
Lag features are the values of the target variable at earlier timestamps. Lag features are useful because what happened in the past often influences what will happen in the future. In our example, we generate lag features by reading the sales X days prior, where X = 28, 29, …, 42.
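To make the grouped shift concrete, here is a toy illustration (made-up data, a sketch rather than part of the pipeline) of how a 28-day lag is produced:
import cudf

# One short series for a single id; shifting by 28 exposes the value observed
# 28 days earlier as a feature for the current day. The first 28 rows have no
# lag value and become <NA>.
toy = cudf.DataFrame({"id": ["A"] * 30, "sales": list(range(30))})
toy["sales_lag_28"] = toy.groupby("id")["sales"].shift(28)
print(toy.tail(3))  # rows 28 and 29 carry the sales from rows 0 and 1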
SHIFT_DAY = 28
LAG_DAYS = [col for col in range(SHIFT_DAY, SHIFT_DAY + 15)]
# Need to first ensure that rows in each time series are sorted by day_id
grid_df_lags = grid_df[["id", "day_id", "sales"]].copy()
grid_df_lags = grid_df_lags.sort_values(["id", "day_id"])
grid_df_lags = grid_df_lags.assign(
**{
f"sales_lag_{ld}": grid_df_lags.groupby(["id"])["sales"].shift(ld)
for ld in LAG_DAYS
}
)
grid_df_lags
id | day_id | sales | sales_lag_28 | sales_lag_29 | sales_lag_30 | sales_lag_31 | sales_lag_32 | sales_lag_33 | sales_lag_34 | sales_lag_35 | sales_lag_36 | sales_lag_37 | sales_lag_38 | sales_lag_39 | sales_lag_40 | sales_lag_41 | sales_lag_42 | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
34023 | FOODS_1_001_CA_1_evaluation | d_1 | 3.0 | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> |
34024 | FOODS_1_001_CA_1_evaluation | d_2 | 0.0 | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> |
34025 | FOODS_1_001_CA_1_evaluation | d_3 | 0.0 | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> |
34026 | FOODS_1_001_CA_1_evaluation | d_4 | 1.0 | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> |
34027 | FOODS_1_001_CA_1_evaluation | d_5 | 4.0 | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
47733744 | HOUSEHOLD_2_516_WI_3_evaluation | d_1965 | NaN | 0.0 | 0.0 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
47733745 | HOUSEHOLD_2_516_WI_3_evaluation | d_1966 | NaN | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
47733746 | HOUSEHOLD_2_516_WI_3_evaluation | d_1967 | NaN | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
47733747 | HOUSEHOLD_2_516_WI_3_evaluation | d_1968 | NaN | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
47733748 | HOUSEHOLD_2_516_WI_3_evaluation | d_1969 | NaN | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
47735397 rows × 18 columns
Compute rolling window statistics#
In the previous cell, we used the sales value at a single timestamp to generate lag features. To capture richer information about the past, we also look at the distribution of sales values over multiple timestamps by computing rolling window statistics. Rolling window statistics complement lag features and provide more information about the past behavior of the target variable.
Read more about lag features and rolling window statistics in Introduction to feature engineering for time series forecasting.
# Shift by 28 days and apply windows of various sizes
print(f"Shift size: {SHIFT_DAY}")
for i in [7, 14, 30, 60, 180]:
print(f" Window size: {i}")
grid_df_lags[f"rolling_mean_{i}"] = (
grid_df_lags.groupby(["id"])["sales"]
.shift(SHIFT_DAY)
.rolling(i)
.mean()
.astype(np.float32)
)
grid_df_lags[f"rolling_std_{i}"] = (
grid_df_lags.groupby(["id"])["sales"]
.shift(SHIFT_DAY)
.rolling(i)
.std()
.astype(np.float32)
)
Shift size: 28
Window size: 7
Window size: 14
Window size: 30
Window size: 60
Window size: 180
grid_df_lags.columns
Index(['id', 'day_id', 'sales', 'sales_lag_28', 'sales_lag_29', 'sales_lag_30',
'sales_lag_31', 'sales_lag_32', 'sales_lag_33', 'sales_lag_34',
'sales_lag_35', 'sales_lag_36', 'sales_lag_37', 'sales_lag_38',
'sales_lag_39', 'sales_lag_40', 'sales_lag_41', 'sales_lag_42',
'rolling_mean_7', 'rolling_std_7', 'rolling_mean_14', 'rolling_std_14',
'rolling_mean_30', 'rolling_std_30', 'rolling_mean_60',
'rolling_std_60', 'rolling_mean_180', 'rolling_std_180'],
dtype='object')
grid_df_lags.dtypes
id category
day_id category
sales float32
sales_lag_28 float32
sales_lag_29 float32
sales_lag_30 float32
sales_lag_31 float32
sales_lag_32 float32
sales_lag_33 float32
sales_lag_34 float32
sales_lag_35 float32
sales_lag_36 float32
sales_lag_37 float32
sales_lag_38 float32
sales_lag_39 float32
sales_lag_40 float32
sales_lag_41 float32
sales_lag_42 float32
rolling_mean_7 float32
rolling_std_7 float32
rolling_mean_14 float32
rolling_std_14 float32
rolling_mean_30 float32
rolling_std_30 float32
rolling_mean_60 float32
rolling_std_60 float32
rolling_mean_180 float32
rolling_std_180 float32
dtype: object
grid_df_lags
id | day_id | sales | sales_lag_28 | sales_lag_29 | sales_lag_30 | sales_lag_31 | sales_lag_32 | sales_lag_33 | sales_lag_34 | ... | rolling_mean_7 | rolling_std_7 | rolling_mean_14 | rolling_std_14 | rolling_mean_30 | rolling_std_30 | rolling_mean_60 | rolling_std_60 | rolling_mean_180 | rolling_std_180 | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
34023 | FOODS_1_001_CA_1_evaluation | d_1 | 3.0 | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | ... | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> |
34024 | FOODS_1_001_CA_1_evaluation | d_2 | 0.0 | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | ... | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> |
34025 | FOODS_1_001_CA_1_evaluation | d_3 | 0.0 | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | ... | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> |
34026 | FOODS_1_001_CA_1_evaluation | d_4 | 1.0 | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | ... | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> |
34027 | FOODS_1_001_CA_1_evaluation | d_5 | 4.0 | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | ... | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> | <NA> |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
47733744 | HOUSEHOLD_2_516_WI_3_evaluation | d_1965 | NaN | 0.0 | 0.0 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 | ... | 0.142857149 | 0.377964467 | 0.071428575 | 0.267261237 | 0.06666667 | 0.253708124 | 0.033333335 | 0.181020334 | 0.077777781 | 0.288621366 |
47733745 | HOUSEHOLD_2_516_WI_3_evaluation | d_1966 | NaN | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 1.0 | 0.0 | ... | 0.142857149 | 0.377964467 | 0.071428575 | 0.267261237 | 0.06666667 | 0.253708124 | 0.033333335 | 0.181020334 | 0.077777781 | 0.288621366 |
47733746 | HOUSEHOLD_2_516_WI_3_evaluation | d_1967 | NaN | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 1.0 | ... | 0.142857149 | 0.377964467 | 0.071428575 | 0.267261237 | 0.06666667 | 0.253708124 | 0.033333335 | 0.181020334 | 0.077777781 | 0.288621366 |
47733747 | HOUSEHOLD_2_516_WI_3_evaluation | d_1968 | NaN | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | ... | 0.0 | 0.0 | 0.071428575 | 0.267261237 | 0.06666667 | 0.253708124 | 0.033333335 | 0.181020334 | 0.077777781 | 0.288621366 |
47733748 | HOUSEHOLD_2_516_WI_3_evaluation | d_1969 | NaN | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | ... | 0.0 | 0.0 | 0.071428575 | 0.267261237 | 0.06666667 | 0.253708124 | 0.033333335 | 0.181020334 | 0.077777781 | 0.288621366 |
47735397 rows × 28 columns
Target encoding#
Categorical variables pose a challenge for many machine learning algorithms such as XGBoost. One way to overcome this challenge is target encoding, where we encode categorical variables by replacing them with statistics of the target variable. In this example, we use the mean and the standard deviation.
Read more about target encoding in Target-encoding categorical variables.
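As a quick reminder of what a grouped transform does (toy data, a sketch rather than part of the pipeline), every row receives the statistic computed over its group:
import cudf

# Each row of a group is replaced by the group-level mean of the target
toy = cudf.DataFrame({"store_id": ["A", "A", "B", "B"], "sales": [1.0, 3.0, 10.0, 30.0]})
toy["enc_store_id_mean"] = toy.groupby("store_id")["sales"].transform("mean")
print(toy)  # store A rows get 2.0, store B rows get 20.0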
icols = [["store_id", "dept_id"], ["item_id", "state_id"]]
new_columns = []
grid_df_target_enc = grid_df[
["id", "day_id", "item_id", "state_id", "store_id", "dept_id", "sales"]
].copy()
grid_df_target_enc["sales"].fillna(value=0, inplace=True)
for col in icols:
print(f"Encoding columns {col}")
col_name = "_" + "_".join(col) + "_"
grid_df_target_enc["enc" + col_name + "mean"] = (
grid_df_target_enc.groupby(col)["sales"].transform("mean").astype(np.float32)
)
grid_df_target_enc["enc" + col_name + "std"] = (
grid_df_target_enc.groupby(col)["sales"].transform("std").astype(np.float32)
)
new_columns.extend(["enc" + col_name + "mean", "enc" + col_name + "std"])
Encoding columns ['store_id', 'dept_id']
Encoding columns ['item_id', 'state_id']
grid_df_target_enc = grid_df_target_enc[["id", "day_id"] + new_columns]
grid_df_target_enc
id | day_id | enc_store_id_dept_id_mean | enc_store_id_dept_id_std | enc_item_id_state_id_mean | enc_item_id_state_id_std | |
---|---|---|---|---|---|---|
0 | FOODS_1_001_WI_2_evaluation | d_809 | 1.492988 | 3.987657 | 0.433553 | 0.851153 |
1 | FOODS_1_001_WI_2_evaluation | d_810 | 1.492988 | 3.987657 | 0.433553 | 0.851153 |
2 | FOODS_1_001_WI_2_evaluation | d_811 | 1.492988 | 3.987657 | 0.433553 | 0.851153 |
3 | FOODS_1_001_WI_2_evaluation | d_812 | 1.492988 | 3.987657 | 0.433553 | 0.851153 |
4 | FOODS_1_001_WI_2_evaluation | d_813 | 1.492988 | 3.987657 | 0.433553 | 0.851153 |
... | ... | ... | ... | ... | ... | ... |
47735392 | HOUSEHOLD_2_516_WI_3_evaluation | d_52 | 0.257027 | 0.661541 | 0.082084 | 0.299445 |
47735393 | HOUSEHOLD_2_516_WI_3_evaluation | d_53 | 0.257027 | 0.661541 | 0.082084 | 0.299445 |
47735394 | HOUSEHOLD_2_516_WI_3_evaluation | d_54 | 0.257027 | 0.661541 | 0.082084 | 0.299445 |
47735395 | HOUSEHOLD_2_516_WI_3_evaluation | d_55 | 0.257027 | 0.661541 | 0.082084 | 0.299445 |
47735396 | HOUSEHOLD_2_516_WI_3_evaluation | d_49 | 0.257027 | 0.661541 | 0.082084 | 0.299445 |
47735397 rows × 6 columns
grid_df_target_enc.dtypes
id category
day_id category
enc_store_id_dept_id_mean float32
enc_store_id_dept_id_std float32
enc_item_id_state_id_mean float32
enc_item_id_state_id_std float32
dtype: object
Filter by store and product department and create data segments#
After merging all the columns generated in the preceding steps, we filter the rows of the dataset by store_id and dept_id to create a data segment. Each data segment is saved as a pickle file and then uploaded to Cloud Storage.
segmented_data_dir = pathlib.Path("./segmented_data/")
segmented_data_dir.mkdir(exist_ok=True)
STORES = [
"CA_1",
"CA_2",
"CA_3",
"CA_4",
"TX_1",
"TX_2",
"TX_3",
"WI_1",
"WI_2",
"WI_3",
]
DEPTS = [
"HOBBIES_1",
"HOBBIES_2",
"HOUSEHOLD_1",
"HOUSEHOLD_2",
"FOODS_1",
"FOODS_2",
"FOODS_3",
]
grid2_colnm = [
"sell_price",
"price_max",
"price_min",
"price_std",
"price_mean",
"price_norm",
"price_nunique",
"item_nunique",
"price_momentum",
"price_momentum_m",
"price_momentum_y",
]
grid3_colnm = [
"event_name_1",
"event_type_1",
"event_name_2",
"event_type_2",
"snap_CA",
"snap_TX",
"snap_WI",
"tm_d",
"tm_w",
"tm_m",
"tm_y",
"tm_wm",
"tm_dw",
"tm_w_end",
]
lag_colnm = [
"sales_lag_28",
"sales_lag_29",
"sales_lag_30",
"sales_lag_31",
"sales_lag_32",
"sales_lag_33",
"sales_lag_34",
"sales_lag_35",
"sales_lag_36",
"sales_lag_37",
"sales_lag_38",
"sales_lag_39",
"sales_lag_40",
"sales_lag_41",
"sales_lag_42",
"rolling_mean_7",
"rolling_std_7",
"rolling_mean_14",
"rolling_std_14",
"rolling_mean_30",
"rolling_std_30",
"rolling_mean_60",
"rolling_std_60",
"rolling_mean_180",
"rolling_std_180",
]
target_enc_colnm = [
"enc_store_id_dept_id_mean",
"enc_store_id_dept_id_std",
"enc_item_id_state_id_mean",
"enc_item_id_state_id_std",
]
def prepare_data(store, dept=None):
"""
Filter and clean data according to stores and product departments
Parameters
----------
store: Filter data by retaining rows whose store_id matches this parameter.
dept: Filter data by retaining rows whose dept_id matches this parameter.
This parameter can be set to None to indicate that we shouldn't filter by dept_id.
"""
if store is None:
raise ValueError("store parameter must not be None")
if dept is None:
grid1 = grid_df[grid_df["store_id"] == store]
else:
grid1 = grid_df[
(grid_df["store_id"] == store) & (grid_df["dept_id"] == dept)
].drop(columns=["dept_id"])
grid1 = grid1.drop(columns=["release_week", "wm_yr_wk", "store_id", "state_id"])
grid2 = grid_df_with_price[["id", "day_id"] + grid2_colnm]
grid_combined = grid1.merge(grid2, on=["id", "day_id"], how="left")
del grid1, grid2
grid3 = grid_df_with_calendar[["id", "day_id"] + grid3_colnm]
grid_combined = grid_combined.merge(grid3, on=["id", "day_id"], how="left")
del grid3
lag_df = grid_df_lags[["id", "day_id"] + lag_colnm]
grid_combined = grid_combined.merge(lag_df, on=["id", "day_id"], how="left")
del lag_df
target_enc_df = grid_df_target_enc[["id", "day_id"] + target_enc_colnm]
grid_combined = grid_combined.merge(target_enc_df, on=["id", "day_id"], how="left")
del target_enc_df
gc.collect()
grid_combined = grid_combined.drop(columns=["id"])
grid_combined["day_id"] = (
grid_combined["day_id"]
.to_pandas()
.astype("str")
.apply(lambda x: x[2:])
.astype(np.int16)
)
return grid_combined
# First save the segment to the disk
for store in STORES:
print(f"Processing store {store}...")
segment_df = prepare_data(store=store)
segment_df.to_pandas().to_pickle(
segmented_data_dir / f"combined_df_store_{store}.pkl"
)
del segment_df
gc.collect()
for store in STORES:
for dept in DEPTS:
print(f"Processing (store {store}, department {dept})...")
segment_df = prepare_data(store=store, dept=dept)
segment_df.to_pandas().to_pickle(
segmented_data_dir / f"combined_df_store_{store}_dept_{dept}.pkl"
)
del segment_df
gc.collect()
Processing store CA_1...
Processing store CA_2...
Processing store CA_3...
Processing store CA_4...
Processing store TX_1...
Processing store TX_2...
Processing store TX_3...
Processing store WI_1...
Processing store WI_2...
Processing store WI_3...
Processing (store CA_1, department HOBBIES_1)...
Processing (store CA_1, department HOBBIES_2)...
Processing (store CA_1, department HOUSEHOLD_1)...
Processing (store CA_1, department HOUSEHOLD_2)...
Processing (store CA_1, department FOODS_1)...
Processing (store CA_1, department FOODS_2)...
Processing (store CA_1, department FOODS_3)...
Processing (store CA_2, department HOBBIES_1)...
Processing (store CA_2, department HOBBIES_2)...
Processing (store CA_2, department HOUSEHOLD_1)...
Processing (store CA_2, department HOUSEHOLD_2)...
Processing (store CA_2, department FOODS_1)...
Processing (store CA_2, department FOODS_2)...
Processing (store CA_2, department FOODS_3)...
Processing (store CA_3, department HOBBIES_1)...
Processing (store CA_3, department HOBBIES_2)...
Processing (store CA_3, department HOUSEHOLD_1)...
Processing (store CA_3, department HOUSEHOLD_2)...
Processing (store CA_3, department FOODS_1)...
Processing (store CA_3, department FOODS_2)...
Processing (store CA_3, department FOODS_3)...
Processing (store CA_4, department HOBBIES_1)...
Processing (store CA_4, department HOBBIES_2)...
Processing (store CA_4, department HOUSEHOLD_1)...
Processing (store CA_4, department HOUSEHOLD_2)...
Processing (store CA_4, department FOODS_1)...
Processing (store CA_4, department FOODS_2)...
Processing (store CA_4, department FOODS_3)...
Processing (store TX_1, department HOBBIES_1)...
Processing (store TX_1, department HOBBIES_2)...
Processing (store TX_1, department HOUSEHOLD_1)...
Processing (store TX_1, department HOUSEHOLD_2)...
Processing (store TX_1, department FOODS_1)...
Processing (store TX_1, department FOODS_2)...
Processing (store TX_1, department FOODS_3)...
Processing (store TX_2, department HOBBIES_1)...
Processing (store TX_2, department HOBBIES_2)...
Processing (store TX_2, department HOUSEHOLD_1)...
Processing (store TX_2, department HOUSEHOLD_2)...
Processing (store TX_2, department FOODS_1)...
Processing (store TX_2, department FOODS_2)...
Processing (store TX_2, department FOODS_3)...
Processing (store TX_3, department HOBBIES_1)...
Processing (store TX_3, department HOBBIES_2)...
Processing (store TX_3, department HOUSEHOLD_1)...
Processing (store TX_3, department HOUSEHOLD_2)...
Processing (store TX_3, department FOODS_1)...
Processing (store TX_3, department FOODS_2)...
Processing (store TX_3, department FOODS_3)...
Processing (store WI_1, department HOBBIES_1)...
Processing (store WI_1, department HOBBIES_2)...
Processing (store WI_1, department HOUSEHOLD_1)...
Processing (store WI_1, department HOUSEHOLD_2)...
Processing (store WI_1, department FOODS_1)...
Processing (store WI_1, department FOODS_2)...
Processing (store WI_1, department FOODS_3)...
Processing (store WI_2, department HOBBIES_1)...
Processing (store WI_2, department HOBBIES_2)...
Processing (store WI_2, department HOUSEHOLD_1)...
Processing (store WI_2, department HOUSEHOLD_2)...
Processing (store WI_2, department FOODS_1)...
Processing (store WI_2, department FOODS_2)...
Processing (store WI_2, department FOODS_3)...
Processing (store WI_3, department HOBBIES_1)...
Processing (store WI_3, department HOBBIES_2)...
Processing (store WI_3, department HOUSEHOLD_1)...
Processing (store WI_3, department HOUSEHOLD_2)...
Processing (store WI_3, department FOODS_1)...
Processing (store WI_3, department FOODS_2)...
Processing (store WI_3, department FOODS_3)...
# Then copy the segment to Cloud Storage
fs = gcsfs.GCSFileSystem()
for e in segmented_data_dir.glob("*.pkl"):
print(f"Uploading {e}...")
basename = e.name
fs.put_file(e, f"{bucket_name}/{basename}")
Uploading segmented_data/combined_df_store_CA_3_dept_HOBBIES_2.pkl...
Uploading segmented_data/combined_df_store_TX_3_dept_FOODS_3.pkl...
Uploading segmented_data/combined_df_store_CA_1_dept_HOUSEHOLD_1.pkl...
Uploading segmented_data/combined_df_store_TX_3_dept_HOBBIES_1.pkl...
Uploading segmented_data/combined_df_store_WI_2_dept_HOUSEHOLD_2.pkl...
Uploading segmented_data/combined_df_store_TX_3_dept_HOUSEHOLD_1.pkl...
Uploading segmented_data/combined_df_store_WI_1_dept_HOUSEHOLD_2.pkl...
Uploading segmented_data/combined_df_store_CA_3_dept_HOBBIES_1.pkl...
Uploading segmented_data/combined_df_store_CA_1_dept_FOODS_3.pkl...
Uploading segmented_data/combined_df_store_TX_1_dept_HOBBIES_2.pkl...
Uploading segmented_data/combined_df_store_TX_2_dept_FOODS_3.pkl...
Uploading segmented_data/combined_df_store_CA_2_dept_FOODS_3.pkl...
Uploading segmented_data/combined_df_store_WI_3.pkl...
Uploading segmented_data/combined_df_store_TX_1_dept_HOUSEHOLD_2.pkl...
Uploading segmented_data/combined_df_store_WI_3_dept_FOODS_3.pkl...
Uploading segmented_data/combined_df_store_WI_2_dept_FOODS_1.pkl...
Uploading segmented_data/combined_df_store_TX_1_dept_FOODS_3.pkl...
Uploading segmented_data/combined_df_store_CA_3_dept_FOODS_3.pkl...
Uploading segmented_data/combined_df_store_WI_1_dept_HOBBIES_1.pkl...
Uploading segmented_data/combined_df_store_CA_1_dept_FOODS_2.pkl...
Uploading segmented_data/combined_df_store_TX_1_dept_HOBBIES_1.pkl...
Uploading segmented_data/combined_df_store_CA_3_dept_HOUSEHOLD_2.pkl...
Uploading segmented_data/combined_df_store_TX_1_dept_FOODS_2.pkl...
Uploading segmented_data/combined_df_store_CA_1.pkl...
Uploading segmented_data/combined_df_store_CA_2_dept_FOODS_2.pkl...
Uploading segmented_data/combined_df_store_TX_2_dept_HOUSEHOLD_2.pkl...
Uploading segmented_data/combined_df_store_WI_2.pkl...
Uploading segmented_data/combined_df_store_CA_4_dept_HOUSEHOLD_1.pkl...
Uploading segmented_data/combined_df_store_CA_3_dept_FOODS_1.pkl...
Uploading segmented_data/combined_df_store_WI_1.pkl...
Uploading segmented_data/combined_df_store_WI_1_dept_FOODS_1.pkl...
Uploading segmented_data/combined_df_store_CA_4_dept_FOODS_3.pkl...
Uploading segmented_data/combined_df_store_CA_2_dept_HOUSEHOLD_2.pkl...
Uploading segmented_data/combined_df_store_WI_2_dept_FOODS_2.pkl...
Uploading segmented_data/combined_df_store_CA_2_dept_FOODS_1.pkl...
Uploading segmented_data/combined_df_store_CA_1_dept_FOODS_1.pkl...
Uploading segmented_data/combined_df_store_TX_3.pkl...
Uploading segmented_data/combined_df_store_WI_1_dept_HOBBIES_2.pkl...
Uploading segmented_data/combined_df_store_CA_4.pkl...
Uploading segmented_data/combined_df_store_CA_1_dept_HOBBIES_1.pkl...
Uploading segmented_data/combined_df_store_WI_3_dept_HOUSEHOLD_1.pkl...
Uploading segmented_data/combined_df_store_CA_4_dept_HOUSEHOLD_2.pkl...
Uploading segmented_data/combined_df_store_CA_2_dept_HOBBIES_1.pkl...
Uploading segmented_data/combined_df_store_CA_2_dept_HOUSEHOLD_1.pkl...
Uploading segmented_data/combined_df_store_CA_4_dept_FOODS_1.pkl...
Uploading segmented_data/combined_df_store_WI_1_dept_HOUSEHOLD_1.pkl...
Uploading segmented_data/combined_df_store_CA_3_dept_FOODS_2.pkl...
Uploading segmented_data/combined_df_store_WI_1_dept_FOODS_2.pkl...
Uploading segmented_data/combined_df_store_WI_3_dept_HOBBIES_2.pkl...
Uploading segmented_data/combined_df_store_WI_3_dept_HOBBIES_1.pkl...
Uploading segmented_data/combined_df_store_WI_3_dept_HOUSEHOLD_2.pkl...
Uploading segmented_data/combined_df_store_TX_1_dept_FOODS_1.pkl...
Uploading segmented_data/combined_df_store_CA_3.pkl...
Uploading segmented_data/combined_df_store_TX_2.pkl...
Uploading segmented_data/combined_df_store_WI_2_dept_FOODS_3.pkl...
Uploading segmented_data/combined_df_store_CA_1_dept_HOUSEHOLD_2.pkl...
Uploading segmented_data/combined_df_store_WI_3_dept_FOODS_2.pkl...
Uploading segmented_data/combined_df_store_TX_2_dept_HOUSEHOLD_1.pkl...
Uploading segmented_data/combined_df_store_WI_2_dept_HOBBIES_1.pkl...
Uploading segmented_data/combined_df_store_CA_4_dept_HOBBIES_1.pkl...
Uploading segmented_data/combined_df_store_WI_2_dept_HOUSEHOLD_1.pkl...
Uploading segmented_data/combined_df_store_CA_4_dept_HOBBIES_2.pkl...
Uploading segmented_data/combined_df_store_TX_1_dept_HOUSEHOLD_1.pkl...
Uploading segmented_data/combined_df_store_WI_3_dept_FOODS_1.pkl...
Uploading segmented_data/combined_df_store_TX_3_dept_HOBBIES_2.pkl...
Uploading segmented_data/combined_df_store_CA_2_dept_HOBBIES_2.pkl...
Uploading segmented_data/combined_df_store_WI_1_dept_FOODS_3.pkl...
Uploading segmented_data/combined_df_store_TX_2_dept_FOODS_1.pkl...
Uploading segmented_data/combined_df_store_WI_2_dept_HOBBIES_2.pkl...
Uploading segmented_data/combined_df_store_CA_4_dept_FOODS_2.pkl...
Uploading segmented_data/combined_df_store_TX_3_dept_HOUSEHOLD_2.pkl...
Uploading segmented_data/combined_df_store_TX_2_dept_HOBBIES_2.pkl...
Uploading segmented_data/combined_df_store_TX_2_dept_HOBBIES_1.pkl...
Uploading segmented_data/combined_df_store_TX_2_dept_FOODS_2.pkl...
Uploading segmented_data/combined_df_store_TX_3_dept_FOODS_2.pkl...
Uploading segmented_data/combined_df_store_CA_3_dept_HOUSEHOLD_1.pkl...
Uploading segmented_data/combined_df_store_CA_1_dept_HOBBIES_2.pkl...
Uploading segmented_data/combined_df_store_TX_1.pkl...
Uploading segmented_data/combined_df_store_TX_3_dept_FOODS_1.pkl...
Uploading segmented_data/combined_df_store_CA_2.pkl...
# Also upload the product weights
fs = gcsfs.GCSFileSystem()
weights.to_pandas().to_pickle("product_weights.pkl")
fs.put_file("product_weights.pkl", f"{bucket_name}/product_weights.pkl")
Training and evaluation with hyperparameter optimization (HPO)#
Now that we are done with data processing, we can train models to predict future sales. We will leverage the worker pods to run multiple training jobs in parallel, speeding up the hyperparameter search.
Import modules and define constants#
import copy
import gc
import json
import pickle
import time
import cudf
import gcsfs
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import optuna
import pandas as pd
import xgboost as xgb
from dask.distributed import Client, wait
from dask_kubernetes.operator import KubeCluster
from matplotlib.patches import Patch
# Choose the same RAPIDS image you used for launching the notebook session
rapids_image = "nvcr.io/nvidia/rapidsai/notebooks:25.04-cuda12.8-py3.12"

# Use the number of worker nodes in your Kubernetes cluster.
n_workers = 2

# Bucket that contains the processed data pickles
bucket_name = "<Put the name of the bucket here>"

# List of stores and product departments
STORES = [
    "CA_1",
    "CA_2",
    "CA_3",
    "CA_4",
    "TX_1",
    "TX_2",
    "TX_3",
    "WI_1",
    "WI_2",
    "WI_3",
]
DEPTS = [
    "HOBBIES_1",
    "HOBBIES_2",
    "HOUSEHOLD_1",
    "HOUSEHOLD_2",
    "FOODS_1",
    "FOODS_2",
    "FOODS_3",
]
Define cross-validation folds#
Cross-validation is a statistical method for estimating how well a machine learning model generalizes to an independent dataset. The method is also suitable for evaluating a particular choice of model hyperparameter combination.
To estimate the ability to generalize, we define multiple cross-validation folds consisting of multiple (training set, validation set) pairs. For each fold, we fit the model with the training set and evaluate its accuracy on the validation set. The "goodness" score of a given hyperparameter combination is the accuracy of the model on each validation set, averaged over all the cross-validation folds.
We must take extra care when defining cross-validation folds for time series data. We are not allowed to use the future to predict the past, so the training set must temporally precede the validation set. Consequently, we partition the dataset along the time dimension and assign the training and validation sets using time ranges:
# Cross-validation folds and held-out test set (in time dimension)
# The held-out test set is used for final evaluation
cv_folds = [ # (train_set, validation_set)
([0, 1114], [1114, 1314]),
([0, 1314], [1314, 1514]),
([0, 1514], [1514, 1714]),
([0, 1714], [1714, 1914]),
]
n_folds = len(cv_folds)
holdout = [1914, 1942]
time_horizon = 1942
It is helpful to visualize the cross-validation folds using Matplotlib.
cv_cmap = matplotlib.colormaps["cividis"]
plt.figure(figsize=(8, 3))
for i, (train_mask, valid_mask) in enumerate(cv_folds):
idx = np.array([np.nan] * time_horizon)
idx[np.arange(*train_mask)] = 1
idx[np.arange(*valid_mask)] = 0
plt.scatter(
range(time_horizon),
[i + 0.5] * time_horizon,
c=idx,
marker="_",
capstyle="butt",
s=1,
lw=20,
cmap=cv_cmap,
vmin=-1.5,
vmax=1.5,
)
idx = np.array([np.nan] * time_horizon)
idx[np.arange(*holdout)] = -1
plt.scatter(
range(time_horizon),
[n_folds + 0.5] * time_horizon,
c=idx,
marker="_",
capstyle="butt",
s=1,
lw=20,
cmap=cv_cmap,
vmin=-1.5,
vmax=1.5,
)
plt.xlabel("Time")
plt.yticks(
ticks=np.arange(n_folds + 1) + 0.5,
labels=[f"Fold {i}" for i in range(n_folds)] + ["Holdout"],
)
plt.ylim([len(cv_folds) + 1.2, -0.2])
norm = matplotlib.colors.Normalize(vmin=-1.5, vmax=1.5)
plt.legend(
[
Patch(color=cv_cmap(norm(1))),
Patch(color=cv_cmap(norm(0))),
Patch(color=cv_cmap(norm(-1))),
],
["Training set", "Validation set", "Held-out test set"],
ncol=3,
loc="best",
)
plt.tight_layout()

Launch a Dask client on Kubernetes#
Let's set up a Dask cluster using the KubeCluster class.
cluster = KubeCluster(
name="rapids-dask",
image=rapids_image,
worker_command="dask-cuda-worker",
n_workers=n_workers,
resources={"limits": {"nvidia.com/gpu": "1"}},
env={"EXTRA_PIP_PACKAGES": "optuna gcsfs"},
)
cluster
client = Client(cluster)
client
Client
Client-dcc90a0f-5e32-11ee-8529-fa610e3dbb88
Connection method: Cluster object | Cluster type: dask_kubernetes.KubeCluster
Dashboard: http://rapids-dask-scheduler.kubeflow-user-example-com:8787/status
Cluster Info
KubeCluster
rapids-dask
Dashboard: http://rapids-dask-scheduler.kubeflow-user-example-com:8787/status | Workers 1
Total threads 1 | Total memory: 83.48 GiB
Scheduler Info
Scheduler
Scheduler-ca08b554-394b-419d-b380-7b3ccdfd882f
Comm: tcp://10.36.0.26:8786 | Workers 1
Dashboard: http://10.36.0.26:8787/status | Total threads 1
Started: Just now | Total memory: 83.48 GiB
Workers
Worker: rapids-dask-default-worker-4f5ea8bc10
Comm: tcp://10.36.2.23:34385 | Total threads 1
Dashboard: http://10.36.2.23:8788/status | Memory: 83.48 GiB
Nanny: tcp://10.36.2.23:44783
Local directory: /tmp/dask-scratch-space/worker-h2_cxa0d
GPU: NVIDIA A100-SXM4-40GB | GPU memory: 40.00 GiB
Define a custom evaluation metric#
The M5 Forecasting competition defines a custom metric called WRMSSE, as follows:

\[ \mathrm{WRMSSE} = \sum_{i} w_i \cdot \mathrm{RMSSE}_i \]

That is, the WRMSSE is a weighted sum of the RMSSE over all product items \(i\). The RMSSE is in turn defined as

\[ \mathrm{RMSSE}_i = \sqrt{\frac{\frac{1}{h} \sum_{t=n+1}^{n+h} \left(y_t - \hat{y}_t\right)^2}{\frac{1}{n-1} \sum_{t=2}^{n} \left(y_t - y_{t-1}\right)^2}} \]

where the squared error of the forecast (numerator) is normalized by the rate at which the sales amount changes per unit of time in the training data (denominator); \(n\) is the length of the training period and \(h\) is the length of the forecast horizon.

Here is an implementation of the WRMSSE using cuDF. We use the product weights \(w_i\) computed in the first preprocessing notebook.
def wrmsse(product_weights, df, pred_sales, train_mask, valid_mask):
"""Compute WRMSSE metric"""
df_train = df[(df["day_id"] >= train_mask[0]) & (df["day_id"] < train_mask[1])]
df_valid = df[(df["day_id"] >= valid_mask[0]) & (df["day_id"] < valid_mask[1])]
# Compute denominator: 1/(n-1) * sum( (y(t) - y(t-1))**2 )
diff = (
df_train.sort_values(["item_id", "day_id"])
.groupby(["item_id"])[["sales"]]
.diff(1)
)
x = (
df_train[["item_id", "day_id"]]
.join(diff, how="left")
.rename(columns={"sales": "diff"})
.sort_values(["item_id", "day_id"])
)
x["diff"] = x["diff"] ** 2
xx = x.groupby(["item_id"])[["diff"]].agg(["sum", "count"]).sort_index()
xx.columns = xx.columns.map("_".join)
xx["denominator"] = xx["diff_sum"] / xx["diff_count"]
xx.reset_index()
# Compute numerator: 1/h * sum( (y(t) - y_pred(t))**2 )
X_valid = df_valid.drop(columns=["item_id", "cat_id", "day_id", "sales"])
if "dept_id" in X_valid.columns:
X_valid = X_valid.drop(columns=["dept_id"])
df_pred = cudf.DataFrame(
{
"item_id": df_valid["item_id"].copy(),
"pred_sales": pred_sales,
"sales": df_valid["sales"].copy(),
}
)
df_pred["diff"] = (df_pred["sales"] - df_pred["pred_sales"]) ** 2
yy = df_pred.groupby(["item_id"])[["diff"]].agg(["sum", "count"]).sort_index()
yy.columns = yy.columns.map("_".join)
yy["numerator"] = yy["diff_sum"] / yy["diff_count"]
zz = yy[["numerator"]].join(xx[["denominator"]], how="left")
zz = zz.join(product_weights, how="left").sort_index()
# Filter out zero denominator.
# This can occur if the product was never on sale during the period in the training set
zz = zz[zz["denominator"] != 0]
zz["rmsse"] = np.sqrt(zz["numerator"] / zz["denominator"])
return zz["rmsse"].multiply(zz["weights"]).sum()
Define the training and hyperparameter search pipeline with Optuna#
Optuna lets us define the training procedure iteratively, as if we were writing an ordinary function to train a single model. Instead of a fixed combination of hyperparameters, the function now takes a trial object that yields different hyperparameter combinations.
In this example, we partition the training data by store and fit a separate XGBoost model for each data segment.
def objective(trial):
fs = gcsfs.GCSFileSystem()
with fs.open(f"{bucket_name}/product_weights.pkl", "rb") as f:
product_weights = cudf.DataFrame(pd.read_pickle(f))
params = {
"n_estimators": 100,
"verbosity": 0,
"learning_rate": 0.01,
"objective": "reg:tweedie",
"tree_method": "gpu_hist",
"grow_policy": "depthwise",
"predictor": "gpu_predictor",
"enable_categorical": True,
"lambda": trial.suggest_float("lambda", 1e-8, 100.0, log=True),
"alpha": trial.suggest_float("alpha", 1e-8, 100.0, log=True),
"colsample_bytree": trial.suggest_float("colsample_bytree", 0.2, 1.0),
"max_depth": trial.suggest_int("max_depth", 2, 6, step=1),
"min_child_weight": trial.suggest_float(
"min_child_weight", 1e-8, 100, log=True
),
"gamma": trial.suggest_float("gamma", 1e-8, 1.0, log=True),
"tweedie_variance_power": trial.suggest_float("tweedie_variance_power", 1, 2),
}
scores = [[] for store in STORES]
for store_id, store in enumerate(STORES):
print(f"Processing store {store}...")
with fs.open(f"{bucket_name}/combined_df_store_{store}.pkl", "rb") as f:
df = cudf.DataFrame(pd.read_pickle(f))
for train_mask, valid_mask in cv_folds:
df_train = df[
(df["day_id"] >= train_mask[0]) & (df["day_id"] < train_mask[1])
]
df_valid = df[
(df["day_id"] >= valid_mask[0]) & (df["day_id"] < valid_mask[1])
]
X_train, y_train = (
df_train.drop(
columns=["item_id", "dept_id", "cat_id", "day_id", "sales"]
),
df_train["sales"],
)
X_valid = df_valid.drop(
columns=["item_id", "dept_id", "cat_id", "day_id", "sales"]
)
clf = xgb.XGBRegressor(**params)
clf.fit(X_train, y_train)
pred_sales = clf.predict(X_valid)
scores[store_id].append(
wrmsse(product_weights, df, pred_sales, train_mask, valid_mask)
)
del df_train, df_valid, X_train, y_train, clf
gc.collect()
del df
gc.collect()
# We can sum WRMSSE scores over data segments because data segments contain disjoint sets of time series
return np.array(scores).sum(axis=0).mean()
With the Dask cluster client, we can execute multiple training jobs in parallel. Optuna uses the in-memory Dask storage to keep track of the progress of the hyperparameter search.
##### Number of hyperparameter combinations to try in parallel
n_trials = 9 # Using a small n_trials so that the demo can finish quickly
# n_trials = 100
# Optimize in parallel on your Dask cluster
backend_storage = optuna.storages.InMemoryStorage()
dask_storage = optuna.integration.DaskStorage(storage=backend_storage, client=client)
study = optuna.create_study(
direction="minimize",
sampler=optuna.samplers.RandomSampler(seed=0),
storage=dask_storage,
)
futures = []
for i in range(0, n_trials, n_workers):
iter_range = (i, min([i + n_workers, n_trials]))
futures.append(
{
"range": iter_range,
"futures": [
client.submit(
# Work around bug https://github.com/optuna/optuna/issues/4859
lambda objective, n_trials: (
study.sampler.reseed_rng(),
study.optimize(objective, n_trials),
),
objective,
n_trials=1,
pure=False,
)
for _ in range(*iter_range)
],
}
)
tstart = time.perf_counter()
for partition in futures:
iter_range = partition["range"]
print(f"Testing hyperparameter combinations {iter_range[0]}..{iter_range[1]}")
_ = wait(partition["futures"])
for fut in partition["futures"]:
_ = fut.result() # Ensure that the training job was successful
tnow = time.perf_counter()
print(
f"Best cross-validation metric: {study.best_value}, Time elapsed = {tnow - tstart}"
)
tend = time.perf_counter()
print(f"Total time elapsed = {tend - tstart}")
/tmp/ipykernel_1321/3389696366.py:7: ExperimentalWarning: DaskStorage is experimental (supported from v3.1.0). The interface can change in the future.
dask_storage = optuna.integration.DaskStorage(storage=backend_storage, client=client)
Testing hyperparameter combinations 0..2
Best cross-validation metric: 10.027767173304472, Time elapsed = 331.6198390149948
Testing hyperparameter combinations 2..4
Best cross-validation metric: 9.426913749927916, Time elapsed = 640.7606940959959
Testing hyperparameter combinations 4..6
Best cross-validation metric: 9.426913749927916, Time elapsed = 958.0816706369951
Testing hyperparameter combinations 6..8
Best cross-validation metric: 9.426913749927916, Time elapsed = 1295.700604706988
Testing hyperparameter combinations 8..9
Best cross-validation metric: 8.915009508695244, Time elapsed = 1476.1182343699911
Total time elapsed = 1476.1219055669935
Once the hyperparameter search is complete, we fetch the best hyperparameter combination from the attributes of the study object.
study.best_params
{'lambda': 2.6232990699579064e-06,
'alpha': 0.004085800094564677,
'colsample_bytree': 0.4064535567263888,
'max_depth': 6,
'min_child_weight': 9.652128310148716e-08,
'gamma': 3.4446109254037165e-07,
'tweedie_variance_power': 1.0914258082324833}
study.best_trial
FrozenTrial(number=8, state=TrialState.COMPLETE, values=[8.915009508695244], datetime_start=datetime.datetime(2023, 9, 28, 19, 35, 29, 888497), datetime_complete=datetime.datetime(2023, 9, 28, 19, 38, 30, 299541), params={'lambda': 2.6232990699579064e-06, 'alpha': 0.004085800094564677, 'colsample_bytree': 0.4064535567263888, 'max_depth': 6, 'min_child_weight': 9.652128310148716e-08, 'gamma': 3.4446109254037165e-07, 'tweedie_variance_power': 1.0914258082324833}, user_attrs={}, system_attrs={}, intermediate_values={}, distributions={'lambda': FloatDistribution(high=100.0, log=True, low=1e-08, step=None), 'alpha': FloatDistribution(high=100.0, log=True, low=1e-08, step=None), 'colsample_bytree': FloatDistribution(high=1.0, log=False, low=0.2, step=None), 'max_depth': IntDistribution(high=6, log=False, low=2, step=1), 'min_child_weight': FloatDistribution(high=100.0, log=True, low=1e-08, step=None), 'gamma': FloatDistribution(high=1.0, log=True, low=1e-08, step=None), 'tweedie_variance_power': FloatDistribution(high=2.0, log=False, low=1.0, step=None)}, trial_id=8, value=None)
# Make a deep copy to preserve the dictionary after deleting the Dask cluster
best_params = copy.deepcopy(study.best_params)
best_params
{'lambda': 2.6232990699579064e-06,
'alpha': 0.004085800094564677,
'colsample_bytree': 0.4064535567263888,
'max_depth': 6,
'min_child_weight': 9.652128310148716e-08,
'gamma': 3.4446109254037165e-07,
'tweedie_variance_power': 1.0914258082324833}
fs = gcsfs.GCSFileSystem()
with fs.open(f"{bucket_name}/params.json", "w") as f:
json.dump(best_params, f)
Train the final XGBoost model and evaluate it#
Using the best hyperparameters found in the search, fit a new model on the full training data. As in the previous section, we fit a separate XGBoost model for each data segment.
fs = gcsfs.GCSFileSystem()
with fs.open(f"{bucket_name}/params.json", "r") as f:
best_params = json.load(f)
with fs.open(f"{bucket_name}/product_weights.pkl", "rb") as f:
product_weights = cudf.DataFrame(pd.read_pickle(f))
def final_train(best_params):
fs = gcsfs.GCSFileSystem()
params = {
"n_estimators": 100,
"verbosity": 0,
"learning_rate": 0.01,
"objective": "reg:tweedie",
"tree_method": "gpu_hist",
"grow_policy": "depthwise",
"predictor": "gpu_predictor",
"enable_categorical": True,
}
params.update(best_params)
model = {}
train_mask = [0, 1914]
for store in STORES:
print(f"Processing store {store}...")
with fs.open(f"{bucket_name}/combined_df_store_{store}.pkl", "rb") as f:
df = cudf.DataFrame(pd.read_pickle(f))
df_train = df[(df["day_id"] >= train_mask[0]) & (df["day_id"] < train_mask[1])]
X_train, y_train = (
df_train.drop(columns=["item_id", "dept_id", "cat_id", "day_id", "sales"]),
df_train["sales"],
)
clf = xgb.XGBRegressor(**params)
clf.fit(X_train, y_train)
model[store] = clf
del df
gc.collect()
return model
model = final_train(best_params)
Processing store CA_1...
Processing store CA_2...
Processing store CA_3...
Processing store CA_4...
Processing store TX_1...
Processing store TX_2...
Processing store TX_3...
Processing store WI_1...
Processing store WI_2...
Processing store WI_3...
Now let's evaluate the final model using the held-out test set:
test_wrmsse = 0
for store in STORES:
with fs.open(f"{bucket_name}/combined_df_store_{store}.pkl", "rb") as f:
df = cudf.DataFrame(pd.read_pickle(f))
df_test = df[(df["day_id"] >= holdout[0]) & (df["day_id"] < holdout[1])]
X_test = df_test.drop(columns=["item_id", "dept_id", "cat_id", "day_id", "sales"])
pred_sales = model[store].predict(X_test)
test_wrmsse += wrmsse(
product_weights, df, pred_sales, train_mask=[0, 1914], valid_mask=holdout
)
print(f"WRMSSE metric on the held-out test set: {test_wrmsse}")
WRMSSE metric on the held-out test set: 9.478942050051291
# Save the model to the Cloud Storage
with fs.open(f"{bucket_name}/final_model.pkl", "wb") as f:
pickle.dump(model, f)
Create an ensemble model using a different strategy for segmenting the sales data#
创建集成模型以利用多种机器学习方法获得更好的预测性能是很常见的。集成模型通过对组成模型的预测输出取平均值来进行预测。
在此示例中,我们将通过以不同方式对销售数据进行分段来创建第二个模型。我们将不再按商店进行分割,而是同时按商店和产品类别分割数据。
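As a toy illustration of the averaging step (the prediction values below are made up, not taken from the dataset):
# Toy illustration of ensemble averaging; the prediction values are made up
import numpy as np

pred_model1 = np.array([3.0, 0.0, 1.5])  # e.g. predictions from the store-level model
pred_model2 = np.array([2.0, 1.0, 2.5])  # e.g. predictions from the store+department model
avg_pred = (pred_model1 + pred_model2) / 2.0
print(avg_pred)  # [2.5 0.5 2. ]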
def objective_alt(trial):
fs = gcsfs.GCSFileSystem()
with fs.open(f"{bucket_name}/product_weights.pkl", "rb") as f:
product_weights = cudf.DataFrame(pd.read_pickle(f))
params = {
"n_estimators": 100,
"verbosity": 0,
"learning_rate": 0.01,
"objective": "reg:tweedie",
"tree_method": "gpu_hist",
"grow_policy": "depthwise",
"predictor": "gpu_predictor",
"enable_categorical": True,
"lambda": trial.suggest_float("lambda", 1e-8, 100.0, log=True),
"alpha": trial.suggest_float("alpha", 1e-8, 100.0, log=True),
"colsample_bytree": trial.suggest_float("colsample_bytree", 0.2, 1.0),
"max_depth": trial.suggest_int("max_depth", 2, 6, step=1),
"min_child_weight": trial.suggest_float(
"min_child_weight", 1e-8, 100, log=True
),
"gamma": trial.suggest_float("gamma", 1e-8, 1.0, log=True),
"tweedie_variance_power": trial.suggest_float("tweedie_variance_power", 1, 2),
}
scores = [[] for i in range(len(STORES) * len(DEPTS))]
for store_id, store in enumerate(STORES):
for dept_id, dept in enumerate(DEPTS):
print(f"Processing store {store}, department {dept}...")
with fs.open(
f"{bucket_name}/combined_df_store_{store}_dept_{dept}.pkl", "rb"
) as f:
df = cudf.DataFrame(pd.read_pickle(f))
for train_mask, valid_mask in cv_folds:
df_train = df[
(df["day_id"] >= train_mask[0]) & (df["day_id"] < train_mask[1])
]
df_valid = df[
(df["day_id"] >= valid_mask[0]) & (df["day_id"] < valid_mask[1])
]
X_train, y_train = (
df_train.drop(columns=["item_id", "cat_id", "day_id", "sales"]),
df_train["sales"],
)
X_valid = df_valid.drop(
columns=["item_id", "cat_id", "day_id", "sales"]
)
clf = xgb.XGBRegressor(**params)
clf.fit(X_train, y_train)
sales_pred = clf.predict(X_valid)
scores[store_id * len(DEPTS) + dept_id].append(
wrmsse(product_weights, df, sales_pred, train_mask, valid_mask)
)
del df_train, df_valid, X_train, y_train, clf
gc.collect()
del df
gc.collect()
# We can sum WRMSSE scores over data segments because data segments contain disjoint sets of time series
return np.array(scores).sum(axis=0).mean()
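The aggregation at the end of objective_alt operates on a (number of segments) × (number of CV folds) array of WRMSSE scores: summing along axis 0 combines the disjoint segments into a single score per fold, and the mean then averages across folds. A small sketch with made-up numbers:
# Made-up WRMSSE scores for 2 segments x 3 CV folds, to illustrate the aggregation
import numpy as np

scores = np.array(
    [
        [0.8, 0.9, 1.1],  # segment 0: one score per fold
        [0.5, 0.6, 0.4],  # segment 1: one score per fold
    ]
)
per_fold_total = scores.sum(axis=0)  # array([1.3, 1.5, 1.5])
cv_metric = per_fold_total.mean()  # approximately 1.43
print(per_fold_total, cv_metric)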
##### Number of hyperparameter combinations to try in parallel
n_trials = 9 # Using a small n_trials so that the demo can finish quickly
# n_trials = 100
# Optimize in parallel on your Dask cluster
backend_storage = optuna.storages.InMemoryStorage()
dask_storage = optuna.integration.DaskStorage(storage=backend_storage, client=client)
study = optuna.create_study(
direction="minimize",
sampler=optuna.samplers.RandomSampler(seed=0),
storage=dask_storage,
)
futures = []
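# Group the trials into batches of n_workers; the loop further below waits for
# one batch at a time and reports the best metric found so far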
for i in range(0, n_trials, n_workers):
iter_range = (i, min([i + n_workers, n_trials]))
futures.append(
{
"range": iter_range,
"futures": [
client.submit(
# Work around bug https://github.com/optuna/optuna/issues/4859
lambda objective, n_trials: (
study.sampler.reseed_rng(),
study.optimize(objective, n_trials),
),
objective_alt,
n_trials=1,
pure=False,
)
for _ in range(*iter_range)
],
}
)
tstart = time.perf_counter()
for partition in futures:
iter_range = partition["range"]
print(f"Testing hyperparameter combinations {iter_range[0]}..{iter_range[1]}")
_ = wait(partition["futures"])
for fut in partition["futures"]:
_ = fut.result() # Ensure that the training job was successful
tnow = time.perf_counter()
print(
f"Best cross-validation metric: {study.best_value}, Time elapsed = {tnow - tstart}"
)
tend = time.perf_counter()
print(f"Total time elapsed = {tend - tstart}")
/tmp/ipykernel_1321/491731696.py:7: ExperimentalWarning: DaskStorage is experimental (supported from v3.1.0). The interface can change in the future.
dask_storage = optuna.integration.DaskStorage(storage=backend_storage, client=client)
Testing hyperparameter combinations 0..2
Best cross-validation metric: 9.896445497438858, Time elapsed = 802.2191872399999
Testing hyperparameter combinations 2..4
Best cross-validation metric: 9.896445497438858, Time elapsed = 1494.0718872279976
Testing hyperparameter combinations 4..6
Best cross-validation metric: 9.835407407395302, Time elapsed = 2393.3159628150024
Testing hyperparameter combinations 6..8
Best cross-validation metric: 9.330048901795887, Time elapsed = 3092.471466117
Testing hyperparameter combinations 8..9
Best cross-validation metric: 9.330048901795887, Time elapsed = 3459.9082761530008
Total time elapsed = 3459.911843854992
# Make a deep copy to preserve the dictionary after deleting the Dask cluster
best_params_alt = copy.deepcopy(study.best_params)
best_params_alt
{'lambda': 0.028794929327421122,
'alpha': 3.3150619134761685e-07,
'colsample_bytree': 0.42330433646728755,
'max_depth': 2,
'min_child_weight': 0.09713314395591004,
'gamma': 0.0016337599227941016,
'tweedie_variance_power': 1.1915217521234043}
fs = gcsfs.GCSFileSystem()
with fs.open(f"{bucket_name}/params_alt.json", "w") as f:
json.dump(best_params_alt, f)
Using the best hyperparameters found in the search, fit a new model with the entire training data.
def final_train_alt(best_params):
fs = gcsfs.GCSFileSystem()
params = {
"n_estimators": 100,
"verbosity": 0,
"learning_rate": 0.01,
"objective": "reg:tweedie",
"tree_method": "gpu_hist",
"grow_policy": "depthwise",
"predictor": "gpu_predictor",
"enable_categorical": True,
}
params.update(best_params)
model = {}
train_mask = [0, 1914]
    for store in STORES:
        for dept in DEPTS:
            print(f"Processing store {store}, department {dept}...")
            with fs.open(
                f"{bucket_name}/combined_df_store_{store}_dept_{dept}.pkl", "rb"
            ) as f:
                df = cudf.DataFrame(pd.read_pickle(f))
            # Train once on the full training range [0, 1914), mirroring final_train()
            df_train = df[
                (df["day_id"] >= train_mask[0]) & (df["day_id"] < train_mask[1])
            ]
            X_train, y_train = (
                df_train.drop(columns=["item_id", "cat_id", "day_id", "sales"]),
                df_train["sales"],
            )
            clf = xgb.XGBRegressor(**params)
            clf.fit(X_train, y_train)
            model[(store, dept)] = clf
del df
gc.collect()
return model
fs = gcsfs.GCSFileSystem()
with fs.open(f"{bucket_name}/params_alt.json", "r") as f:
best_params_alt = json.load(f)
with fs.open(f"{bucket_name}/product_weights.pkl", "rb") as f:
product_weights = cudf.DataFrame(pd.read_pickle(f))
model_alt = final_train_alt(best_params_alt)
Processing store CA_1, department HOBBIES_1...
Processing store CA_1, department HOBBIES_2...
Processing store CA_1, department HOUSEHOLD_1...
Processing store CA_1, department HOUSEHOLD_2...
Processing store CA_1, department FOODS_1...
Processing store CA_1, department FOODS_2...
Processing store CA_1, department FOODS_3...
Processing store CA_2, department HOBBIES_1...
Processing store CA_2, department HOBBIES_2...
Processing store CA_2, department HOUSEHOLD_1...
Processing store CA_2, department HOUSEHOLD_2...
Processing store CA_2, department FOODS_1...
Processing store CA_2, department FOODS_2...
Processing store CA_2, department FOODS_3...
Processing store CA_3, department HOBBIES_1...
Processing store CA_3, department HOBBIES_2...
Processing store CA_3, department HOUSEHOLD_1...
Processing store CA_3, department HOUSEHOLD_2...
Processing store CA_3, department FOODS_1...
Processing store CA_3, department FOODS_2...
Processing store CA_3, department FOODS_3...
Processing store CA_4, department HOBBIES_1...
Processing store CA_4, department HOBBIES_2...
Processing store CA_4, department HOUSEHOLD_1...
Processing store CA_4, department HOUSEHOLD_2...
Processing store CA_4, department FOODS_1...
Processing store CA_4, department FOODS_2...
Processing store CA_4, department FOODS_3...
Processing store TX_1, department HOBBIES_1...
Processing store TX_1, department HOBBIES_2...
Processing store TX_1, department HOUSEHOLD_1...
Processing store TX_1, department HOUSEHOLD_2...
Processing store TX_1, department FOODS_1...
Processing store TX_1, department FOODS_2...
Processing store TX_1, department FOODS_3...
Processing store TX_2, department HOBBIES_1...
Processing store TX_2, department HOBBIES_2...
Processing store TX_2, department HOUSEHOLD_1...
Processing store TX_2, department HOUSEHOLD_2...
Processing store TX_2, department FOODS_1...
Processing store TX_2, department FOODS_2...
Processing store TX_2, department FOODS_3...
Processing store TX_3, department HOBBIES_1...
Processing store TX_3, department HOBBIES_2...
Processing store TX_3, department HOUSEHOLD_1...
Processing store TX_3, department HOUSEHOLD_2...
Processing store TX_3, department FOODS_1...
Processing store TX_3, department FOODS_2...
Processing store TX_3, department FOODS_3...
Processing store WI_1, department HOBBIES_1...
Processing store WI_1, department HOBBIES_2...
Processing store WI_1, department HOUSEHOLD_1...
Processing store WI_1, department HOUSEHOLD_2...
Processing store WI_1, department FOODS_1...
Processing store WI_1, department FOODS_2...
Processing store WI_1, department FOODS_3...
Processing store WI_2, department HOBBIES_1...
Processing store WI_2, department HOBBIES_2...
Processing store WI_2, department HOUSEHOLD_1...
Processing store WI_2, department HOUSEHOLD_2...
Processing store WI_2, department FOODS_1...
Processing store WI_2, department FOODS_2...
Processing store WI_2, department FOODS_3...
Processing store WI_3, department HOBBIES_1...
Processing store WI_3, department HOBBIES_2...
Processing store WI_3, department HOUSEHOLD_1...
Processing store WI_3, department HOUSEHOLD_2...
Processing store WI_3, department FOODS_1...
Processing store WI_3, department FOODS_2...
Processing store WI_3, department FOODS_3...
# Save the model to the Cloud Storage
with fs.open(f"{bucket_name}/final_model_alt.pkl", "wb") as f:
pickle.dump(model_alt, f)
Now consider an ensemble consisting of the two models, model and model_alt. We evaluate the ensemble by computing the WRMSSE metric of the average of the two models' predictions.
test_wrmsse = 0
for store in STORES:
print(f"Processing store {store}...")
# Prediction from Model 1
with fs.open(f"{bucket_name}/combined_df_store_{store}.pkl", "rb") as f:
df = cudf.DataFrame(pd.read_pickle(f))
df_test = df[(df["day_id"] >= holdout[0]) & (df["day_id"] < holdout[1])]
X_test = df_test.drop(columns=["item_id", "dept_id", "cat_id", "day_id", "sales"])
df_test["pred1"] = model[store].predict(X_test)
# Prediction from Model 2
df_test["pred2"] = [np.nan] * len(df_test)
df_test["pred2"] = df_test["pred2"].astype("float32")
for dept in DEPTS:
with fs.open(
f"{bucket_name}/combined_df_store_{store}_dept_{dept}.pkl", "rb"
) as f:
df2 = cudf.DataFrame(pd.read_pickle(f))
df2_test = df2[(df2["day_id"] >= holdout[0]) & (df2["day_id"] < holdout[1])]
X_test = df2_test.drop(columns=["item_id", "cat_id", "day_id", "sales"])
assert np.sum(df_test["dept_id"] == dept) == len(X_test)
df_test["pred2"][df_test["dept_id"] == dept] = model_alt[(store, dept)].predict(
X_test
)
# Average prediction
df_test["avg_pred"] = (df_test["pred1"] + df_test["pred2"]) / 2.0
test_wrmsse += wrmsse(
product_weights,
df,
df_test["avg_pred"],
train_mask=[0, 1914],
valid_mask=holdout,
)
print(f"WRMSSE metric on the held-out test set: {test_wrmsse}")
Processing store CA_1...
Processing store CA_2...
Processing store CA_3...
Processing store CA_4...
Processing store TX_1...
Processing store TX_2...
Processing store TX_3...
Processing store WI_1...
Processing store WI_2...
Processing store WI_3...
WRMSSE metric on the held-out test set: 10.69187847848366
# Close the Dask cluster to clean up
cluster.close()
Conclusion#
We demonstrated an end-to-end workflow in which we took real-world time series data and trained forecasting models using Google Kubernetes Engine (GKE). By dispatching parallel training jobs to NVIDIA GPUs, we were able to speed up the hyperparameter optimization (HPO) process.