Multi-node Multi-GPU Example on AWS Using dask-cloudprovider#
Dask Cloud Provider is a native cloud integration for Dask. It helps manage Dask clusters on different cloud platforms. In this notebook, we will look at how we can use the package to set up an AWS cluster and run a multi-node multi-GPU (MNMG) example with RAPIDS. RAPIDS provides a suite of libraries to accelerate data science pipelines entirely on the GPU. As we will see in this notebook, this can be scaled to multiple nodes with Dask.
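If the package is not already available in your environment, it can be installed from PyPI; a minimal sketch, assuming the aws extra (which pulls in the AWS client dependencies) and that RAPIDS itself is installed separately on the cluster image:
# install Dask Cloud Provider together with its AWS dependencies
pip install dask-cloudprovider[aws]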
Create Your Cluster#
Note
First follow the full instructions on launching a multi-node GPU cluster with Dask Cloud Provider.
Once your cluster object is up and running, return here to continue.
from dask_cloudprovider.aws import EC2Cluster
cluster = EC2Cluster(...)
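For orientation, a cluster definition typically looks something like the sketch below. This is illustrative only: the instance type, image tag, and worker count are assumptions, not recommendations, and the linked instructions remain authoritative for the exact arguments.
from dask_cloudprovider.aws import EC2Cluster

cluster = EC2Cluster(
    instance_type="g4dn.12xlarge",  # assumption: an instance type with multiple GPUs
    docker_image="rapidsai/base:24.12-cuda12.5-py3.12",  # assumption: substitute a current RAPIDS image
    worker_class="dask_cuda.CUDAWorker",  # launch one Dask worker per GPU
    n_workers=2,  # number of instances to launch
)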
Client Setup#
Now we can create a Dask client with the cluster we just defined.
from dask.distributed import Client
client = Client(cluster)
client
Optionally, we can wait for all workers to be up and running by adding the following.
# n_workers is the number of GPUs your cluster will have
client.wait_for_workers(n_workers)
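For example, with a two-GPU cluster we would wait for two workers, and can then inspect the scheduler's view of them:
# with a 2-GPU cluster, block until both workers have connected
client.wait_for_workers(2)
# inspect the workers the scheduler knows about
client.scheduler_info()["workers"]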
Machine Learning Workflow#
Once the workers are available, we can run the rest of our workflow:
Read and clean the data
Add features
Split into training and validation sets
Fit a Random Forest model
Predict on the validation set
Compute RMSE
Let's import the rest of our dependencies.
import dask_cudf
import numpy as np
from cuml.dask.common import utils as dask_utils
from cuml.dask.ensemble import RandomForestRegressor
from cuml.metrics import mean_squared_error
from dask_ml.model_selection import train_test_split
1. Read and Clean Data#
The data needs to be cleaned before it can be used in a meaningful way. We verify that the columns have appropriate datatypes so the data is ready for computation with cuML.
# create a list of all columns & dtypes the df must have for reading
col_dtype = {
"VendorID": "int32",
"tpep_pickup_datetime": "datetime64[ms]",
"tpep_dropoff_datetime": "datetime64[ms]",
"passenger_count": "int32",
"trip_distance": "float32",
"pickup_longitude": "float32",
"pickup_latitude": "float32",
"RatecodeID": "int32",
"store_and_fwd_flag": "int32",
"dropoff_longitude": "float32",
"dropoff_latitude": "float32",
"payment_type": "int32",
"fare_amount": "float32",
"extra": "float32",
"mta_tax": "float32",
"tip_amount": "float32",
"total_amount": "float32",
"tolls_amount": "float32",
"improvement_surcharge": "float32",
}
taxi_df = dask_cudf.read_csv(
"https://storage.googleapis.com/anaconda-public-data/nyc-taxi/csv/2016/yellow_tripdata_2016-02.csv",
dtype=col_dtype,
)
# Dictionary of required columns and their datatypes
must_haves = {
"pickup_datetime": "datetime64[ms]",
"dropoff_datetime": "datetime64[ms]",
"passenger_count": "int32",
"trip_distance": "float32",
"pickup_longitude": "float32",
"pickup_latitude": "float32",
"rate_code": "int32",
"dropoff_longitude": "float32",
"dropoff_latitude": "float32",
"fare_amount": "float32",
}
def clean(ddf, must_haves):
    # strip extraneous spaces from column names and lowercase them
tmp = {col: col.strip().lower() for col in list(ddf.columns)}
ddf = ddf.rename(columns=tmp)
ddf = ddf.rename(
columns={
"tpep_pickup_datetime": "pickup_datetime",
"tpep_dropoff_datetime": "dropoff_datetime",
"ratecodeid": "rate_code",
}
)
ddf["pickup_datetime"] = ddf["pickup_datetime"].astype("datetime64[ms]")
ddf["dropoff_datetime"] = ddf["dropoff_datetime"].astype("datetime64[ms]")
for col in ddf.columns:
if col not in must_haves:
ddf = ddf.drop(columns=col)
continue
if ddf[col].dtype == "object":
            # drop object (string) columns (fixes error: could not convert arg to str)
ddf = ddf.drop(columns=col)
else:
            # downcast from 64-bit to 32-bit types
            # (Tesla T4s are faster on 32-bit ops)
if "int" in str(ddf[col].dtype):
ddf[col] = ddf[col].astype("int32")
if "float" in str(ddf[col].dtype):
ddf[col] = ddf[col].astype("float32")
ddf[col] = ddf[col].fillna(-1)
return ddf
taxi_df = taxi_df.map_partitions(clean, must_haves, meta=must_haves)
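Dask evaluates lazily, so the call above only records the cleaning step in the task graph. A quick sanity check that materializes a small piece of the data:
# check the resulting column dtypes (metadata only, no computation)
taxi_df.dtypes
# pull a few rows from the first partition to verify the cleaning ran
taxi_df.head()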
2. Add Features#
We will add new features to the dataframe:
Split the datetime column to get year, month, day, hour, and day-of-week columns, and compute the difference between the pickup and dropoff times.
Compute the Haversine distance between the pickup and dropoff coordinates.
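For reference, the Haversine (great-circle) distance between two points $(\varphi_1, \lambda_1)$ and $(\varphi_2, \lambda_2)$ (latitude and longitude in radians) on a sphere of radius $r$ is

$$d = 2r \arcsin\left(\sqrt{\sin^2\frac{\varphi_2 - \varphi_1}{2} + \cos\varphi_1 \cos\varphi_2 \sin^2\frac{\lambda_2 - \lambda_1}{2}}\right)$$

cuspatial.haversine_distance evaluates this on the GPU, so we do not need to implement it by hand.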
## add features
taxi_df["hour"] = taxi_df["pickup_datetime"].dt.hour.astype("int32")
taxi_df["year"] = taxi_df["pickup_datetime"].dt.year.astype("int32")
taxi_df["month"] = taxi_df["pickup_datetime"].dt.month.astype("int32")
taxi_df["day"] = taxi_df["pickup_datetime"].dt.day.astype("int32")
taxi_df["day_of_week"] = taxi_df["pickup_datetime"].dt.weekday.astype("int32")
taxi_df["is_weekend"] = (taxi_df["day_of_week"] >= 5).astype("int32")
# calculate the time difference between dropoff and pickup.
taxi_df["diff"] = taxi_df["dropoff_datetime"].astype("int32") - taxi_df[
"pickup_datetime"
].astype("int32")
taxi_df["diff"] = (taxi_df["diff"] / 1000).astype("int32")
taxi_df["pickup_latitude_r"] = taxi_df["pickup_latitude"] // 0.01 * 0.01
taxi_df["pickup_longitude_r"] = taxi_df["pickup_longitude"] // 0.01 * 0.01
taxi_df["dropoff_latitude_r"] = taxi_df["dropoff_latitude"] // 0.01 * 0.01
taxi_df["dropoff_longitude_r"] = taxi_df["dropoff_longitude"] // 0.01 * 0.01
taxi_df = taxi_df.drop("pickup_datetime", axis=1)
taxi_df = taxi_df.drop("dropoff_datetime", axis=1)
def haversine_dist(df):
import cuspatial
pickup = cuspatial.GeoSeries.from_points_xy(
df[["pickup_longitude", "pickup_latitude"]].interleave_columns()
)
dropoff = cuspatial.GeoSeries.from_points_xy(
df[["dropoff_longitude", "dropoff_latitude"]].interleave_columns()
)
df["h_distance"] = cuspatial.haversine_distance(pickup, dropoff)
df["h_distance"] = df["h_distance"].astype("float32")
return df
taxi_df = taxi_df.map_partitions(haversine_dist)
3. Split Data#
# Split into training and validation sets
X, y = taxi_df.drop(["fare_amount"], axis=1).astype("float32"), taxi_df[
"fare_amount"
].astype("float32")
X_train, X_test, y_train, y_test = train_test_split(X, y, shuffle=True)
workers = client.has_what().keys()
X_train, X_test, y_train, y_test = dask_utils.persist_across_workers(
client, [X_train, X_test, y_train, y_test], workers=workers
)
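persist_across_workers returns immediately while the data loads in the background; to block until the splits are actually resident on the workers before fitting, dask.distributed.wait can be used:
from dask.distributed import wait

# block until the persisted splits are materialized on the workers
wait([X_train, X_test, y_train, y_test])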
4. Create and Fit a Random Forest Model#
# create cuml.dask RF regressor
cu_dask_rf = RandomForestRegressor(ignore_empty_partitions=True)
# fit RF model
cu_dask_rf = cu_dask_rf.fit(X_train, y_train)
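The model above relies on the library defaults; hyperparameters can be passed through in the usual way. A sketch with illustrative (not tuned) values:
# illustrative hyperparameters -- not tuned for this dataset
cu_dask_rf = RandomForestRegressor(
    n_estimators=100,  # total number of trees
    max_depth=16,  # limit tree depth to control memory use
    ignore_empty_partitions=True,
)
cu_dask_rf = cu_dask_rf.fit(X_train, y_train)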
5. Predict on the Validation Set#
# predict on validation set
y_pred = cu_dask_rf.predict(X_test)
6. Compute the RMSE#
# compute RMSE
score = mean_squared_error(y_pred.compute().to_numpy(), y_test.compute().to_numpy())
print("Workflow Complete - RMSE: ", np.sqrt(score))
Clean Up Resources#
# Clean up resources
client.close()
cluster.close()