Multi-node multi-GPU example on AWS using dask-cloudprovider#

Dask Cloud Provider is a native cloud integration for Dask. It helps manage Dask clusters on different cloud platforms. In this notebook, we will look at how to use the package to set up an AWS cluster and run a multi-node multi-GPU (MNMG) example with RAPIDS. RAPIDS provides a suite of libraries to accelerate data science pipelines entirely on GPUs. As we will see in this notebook, that acceleration can be scaled out to multiple nodes with Dask.

Create Your Cluster#

Note

First follow the full instructions on launching a multi-node GPU cluster with Dask Cloud Provider.

Once your cluster object is up and running, come back here and continue.

from dask_cloudprovider.aws import EC2Cluster

cluster = EC2Cluster(...)
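
For illustration only, a filled-in configuration might look like the sketch below. Every argument value here is an assumption (region, instance type, Docker image, worker count), not a recommendation; use the settings from the launch instructions linked above.

# Illustrative sketch only -- replace all values with the settings from the
# launch instructions (region, instance type, RAPIDS image, worker count, ...)
cluster = EC2Cluster(
    region="us-east-1",                   # assumed AWS region
    instance_type="g4dn.12xlarge",        # assumed multi-GPU instance type
    docker_image="rapidsai/base:latest",  # assumed RAPIDS container image
    worker_class="dask_cuda.CUDAWorker",  # starts one worker per GPU on each instance
    n_workers=2,                          # assumed number of instances to launch
    security=False,
)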

Client setup#

Now we can create a Dask client with the cluster we just defined.

from dask.distributed import Client

client = Client(cluster)
client
Optionally, we can wait for all of the workers to be up and running.

We can do so by adding the following:

# n_workers is the number of GPUs your cluster will have
client.wait_for_workers(n_workers)  
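
If you want to check how many workers have joined so far, one quick way (a sketch using the standard client API) is to inspect the scheduler information:

# Count the workers currently registered with the scheduler
n_connected = len(client.scheduler_info()["workers"])
print(f"{n_connected} workers connected")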

Machine Learning Workflow#

Once the workers are available, we can run the rest of the workflow:

  • Read and clean the data

  • Add features

  • Split into training and validation sets

  • Fit a Random Forest model

  • Predict on the validation set

  • Compute the RMSE

Let's import the rest of our dependencies.

import dask_cudf
import numpy as np
from cuml.dask.common import utils as dask_utils
from cuml.dask.ensemble import RandomForestRegressor
from cuml.metrics import mean_squared_error
from dask_ml.model_selection import train_test_split

1. Read and Clean Data#

The data needs to be cleaned up before it can be used in a meaningful way. We verify that the columns have the appropriate datatypes so the data is ready for computation with cuML.

# create a list of all columns & dtypes the df must have for reading
col_dtype = {
    "VendorID": "int32",
    "tpep_pickup_datetime": "datetime64[ms]",
    "tpep_dropoff_datetime": "datetime64[ms]",
    "passenger_count": "int32",
    "trip_distance": "float32",
    "pickup_longitude": "float32",
    "pickup_latitude": "float32",
    "RatecodeID": "int32",
    "store_and_fwd_flag": "int32",
    "dropoff_longitude": "float32",
    "dropoff_latitude": "float32",
    "payment_type": "int32",
    "fare_amount": "float32",
    "extra": "float32",
    "mta_tax": "float32",
    "tip_amount": "float32",
    "total_amount": "float32",
    "tolls_amount": "float32",
    "improvement_surcharge": "float32",
}
taxi_df = dask_cudf.read_csv(
    "https://storage.googleapis.com/anaconda-public-data/nyc-taxi/csv/2016/yellow_tripdata_2016-02.csv",
    dtype=col_dtype,
)
# Dictionary of required columns and their datatypes
must_haves = {
    "pickup_datetime": "datetime64[ms]",
    "dropoff_datetime": "datetime64[ms]",
    "passenger_count": "int32",
    "trip_distance": "float32",
    "pickup_longitude": "float32",
    "pickup_latitude": "float32",
    "rate_code": "int32",
    "dropoff_longitude": "float32",
    "dropoff_latitude": "float32",
    "fare_amount": "float32",
}
def clean(ddf, must_haves):
    # strip extraneous spaces from the column names and lowercase them
    tmp = {col: col.strip().lower() for col in list(ddf.columns)}
    ddf = ddf.rename(columns=tmp)

    ddf = ddf.rename(
        columns={
            "tpep_pickup_datetime": "pickup_datetime",
            "tpep_dropoff_datetime": "dropoff_datetime",
            "ratecodeid": "rate_code",
        }
    )

    ddf["pickup_datetime"] = ddf["pickup_datetime"].astype("datetime64[ms]")
    ddf["dropoff_datetime"] = ddf["dropoff_datetime"].astype("datetime64[ms]")

    for col in ddf.columns:
        if col not in must_haves:
            ddf = ddf.drop(columns=col)
            continue
        if ddf[col].dtype == "object":
            # Fixing error: could not convert arg to str
            ddf = ddf.drop(columns=col)
        else:
            # downcast from 64-bit to 32-bit types
            # (Tesla T4 GPUs are faster on 32-bit ops)
            if "int" in str(ddf[col].dtype):
                ddf[col] = ddf[col].astype("int32")
            if "float" in str(ddf[col].dtype):
                ddf[col] = ddf[col].astype("float32")
            ddf[col] = ddf[col].fillna(-1)

    return ddf
taxi_df = taxi_df.map_partitions(clean, must_haves, meta=must_haves)
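
As an optional sanity check, we can confirm that the cleaned dataframe now carries the expected column names and dtypes; inspecting dtypes only touches the dataframe's metadata, so no computation is triggered.

# Optional: inspect the column dtypes of the cleaned dataframe
taxi_df.dtypes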

2. Add Features#

We will add new features to the dataframe:

  1. We can split the datetime columns to get year, month, day, hour, and day-of-week columns, and compute the difference between the pickup and drop-off times.

  2. The haversine distance between the pickup and drop-off coordinates (see the formula after this list).
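
For reference, the haversine great-circle distance computed by cuspatial.haversine_distance for two points with latitudes $\varphi_1, \varphi_2$ and longitudes $\lambda_1, \lambda_2$ is

$$d = 2r \arcsin\left(\sqrt{\sin^2\left(\tfrac{\varphi_2 - \varphi_1}{2}\right) + \cos\varphi_1 \cos\varphi_2 \sin^2\left(\tfrac{\lambda_2 - \lambda_1}{2}\right)}\right)$$

where $r$ is the Earth's radius; cuspatial returns the distance in kilometers.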

## add features

taxi_df["hour"] = taxi_df["pickup_datetime"].dt.hour.astype("int32")
taxi_df["year"] = taxi_df["pickup_datetime"].dt.year.astype("int32")
taxi_df["month"] = taxi_df["pickup_datetime"].dt.month.astype("int32")
taxi_df["day"] = taxi_df["pickup_datetime"].dt.day.astype("int32")
taxi_df["day_of_week"] = taxi_df["pickup_datetime"].dt.weekday.astype("int32")
taxi_df["is_weekend"] = (taxi_df["day_of_week"] >= 5).astype("int32")

# calculate the time difference between dropoff and pickup.
# cast the datetime columns to int64 (milliseconds since epoch) to avoid
# 32-bit overflow, then store the difference in seconds as int32
taxi_df["diff"] = taxi_df["dropoff_datetime"].astype("int64") - taxi_df[
    "pickup_datetime"
].astype("int64")
taxi_df["diff"] = (taxi_df["diff"] / 1000).astype("int32")

taxi_df["pickup_latitude_r"] = taxi_df["pickup_latitude"] // 0.01 * 0.01
taxi_df["pickup_longitude_r"] = taxi_df["pickup_longitude"] // 0.01 * 0.01
taxi_df["dropoff_latitude_r"] = taxi_df["dropoff_latitude"] // 0.01 * 0.01
taxi_df["dropoff_longitude_r"] = taxi_df["dropoff_longitude"] // 0.01 * 0.01

taxi_df = taxi_df.drop("pickup_datetime", axis=1)
taxi_df = taxi_df.drop("dropoff_datetime", axis=1)


def haversine_dist(df):
    import cuspatial

    pickup = cuspatial.GeoSeries.from_points_xy(
        df[["pickup_longitude", "pickup_latitude"]].interleave_columns()
    )
    dropoff = cuspatial.GeoSeries.from_points_xy(
        df[["dropoff_longitude", "dropoff_latitude"]].interleave_columns()
    )
    df["h_distance"] = cuspatial.haversine_distance(pickup, dropoff)
    df["h_distance"] = df["h_distance"].astype("float32")
    return df


taxi_df = taxi_df.map_partitions(haversine_dist)

3. Split Data#

# Split into training and validation sets
X, y = taxi_df.drop(["fare_amount"], axis=1).astype("float32"), taxi_df[
    "fare_amount"
].astype("float32")
X_train, X_test, y_train, y_test = train_test_split(X, y, shuffle=True)
workers = client.has_what().keys()
X_train, X_test, y_train, y_test = dask_utils.persist_across_workers(
    client, [X_train, X_test, y_train, y_test], workers=workers
)
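
Optionally, we can block until the persisted partitions are actually materialized in worker memory before fitting the model. A small sketch using the standard dask.distributed wait helper:

from dask.distributed import wait

# Block until the training and validation partitions are resident on the workers
wait([X_train, X_test, y_train, y_test])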

4. Create and Fit a Random Forest Model#

# create cuml.dask RF regressor
cu_dask_rf = RandomForestRegressor(ignore_empty_partitions=True)
# fit RF model
cu_dask_rf = cu_dask_rf.fit(X_train, y_train)

5. Predict on the Validation Set#

# predict on validation set
y_pred = cu_dask_rf.predict(X_test)

6. Compute the RMSE#

# compute RMSE
score = mean_squared_error(y_pred.compute().to_numpy(), y_test.compute().to_numpy())
print("Workflow Complete - RMSE: ", np.sqrt(score))

Clean Up Resources#

# Clean up resources
client.close()
cluster.close()

Learn More#