Multi-node multi-GPU XGBoost example on Azure using dask-cloudprovider#

Dask Cloud Provider is a native cloud integration library for Dask. It helps manage Dask clusters on different cloud platforms. In this notebook, we will explore how to use this package to set up an Azure cluster and run a multi-node multi-GPU (MNMG) example with RAPIDS. RAPIDS provides a suite of libraries that accelerate data science pipelines entirely on GPUs. As we will see in this notebook, this can be scaled to multiple nodes with Dask.

For the purposes of this demonstration, we will use part of the NYC Taxi dataset (only the files from the 2014 calendar year are used here). The goal is to predict the fare amount of a given trip from the time and coordinates of the taxi trip. We will download the data from Azure Open Datasets, which is hosted publicly by Microsoft.

Note

In this notebook, we will explore two possible ways to use dask-cloudprovider to run workloads on a cluster of Azure VMs:

  1. Option 1: Use an Azure Marketplace image provided free of charge by NVIDIA. The RAPIDS container is downloaded after the VMs boot.

  2. Option 2: Use packer to create a custom VM image for the cluster. This image includes the RAPIDS container, and baking the container into the image should speed up the cluster provisioning process.

You can use either Option 1 or Option 2.

Step 0: Set up your Azure credentials and CLI#

Before running the notebook, run the following commands in a terminal to set up the Azure CLI:

curl -sL https://aka.ms/InstallAzureCLIDeb | sudo bash
az login

Then follow the instructions in the prompts to complete your account setup. If you are running the notebook inside a Docker container, you can drop the sudo.

!az login
A web browser has been opened at https://login.microsoftonline.com/organizations/oauth2/v2.0/authorize. Please continue the login in the web browser. If no web browser is available or if the web browser fails to open, use device code flow with `az login --use-device-code`.
[
  {
    "cloudName": "AzureCloud",
    "homeTenantId": "43083d15-7273-40c1-b7db-39efd9ccc17a",
    "id": "fc4f4a6b-4041-4b1c-8249-854d68edcf62",
    "isDefault": true,
    "managedByTenants": [
      {
        "tenantId": "2f4a9838-26b7-47ee-be60-ccc1fdec5953"
      }
    ],
    "name": "NV-AI-Infra",
    "state": "Enabled",
    "tenantId": "43083d15-7273-40c1-b7db-39efd9ccc17a",
    "user": {
      "name": "skirui@nvidia.com",
      "type": "user"
    }
  }
]

Step 1: Import the necessary packages#

# Install the following libraries at the beginning if they are not already present.
# adlfs is needed to read from Azure Data Lake.
! pip install adlfs
! pip install "dask-cloudprovider[azure]" --upgrade
import json
from timeit import default_timer as timer

import dask
import dask_cudf
import numpy as np
import xgboost as xgb
from cuml.metrics import mean_squared_error
from dask.distributed import Client, wait
from dask_cloudprovider.azure import AzureVMCluster
from dask_ml.model_selection import train_test_split

Step 2: Set up the Azure VM cluster#

Now we will set up a Dask cluster on Azure VMs using AzureVMCluster from Dask Cloud Provider, following these instructions.

To do this, you will first need to set up a resource group, a virtual network, and a security group on Azure. Learn more about how to set them up. Note that you can also set them up via the Azure portal.
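If you prefer the command line to the Azure portal, the same resources can be created with the Azure CLI roughly as follows (a sketch only; the names match the ones used later in this notebook, so adjust them to your subscription):

! az group create --name rapidsai-deployment --location "West US 2"
! az network vnet create --resource-group rapidsai-deployment --name rapidsai-deployment-vnet
! az network nsg create --resource-group rapidsai-deployment --name rapidsaiclouddeploymenttest-nsg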

Once they are set up, fill in the names of the entities you created in the cell below.

We need to pass the docker argument docker_args = '--shm-size=256m' to allow a larger shared memory, so that multiple docker containers can run successfully in the same VM. This is the case when each VM has more than one worker; even if that does not apply to you, a larger shared memory does no harm. Finally, note that we build the VMs with a RAPIDS docker image and use dask_cuda.CUDAWorker to run inside the VMs. This starts GPU-capable workers from the docker image rather than CPU workers.

location = "West US 2"
resource_group = "rapidsai-deployment"
vnet = "rapidsai-deployment-vnet"
security_group = "rapidsaiclouddeploymenttest-nsg"
vm_size = "Standard_NC12s_v3"  # or choose a different GPU enabled VM type

docker_image = "nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12"
docker_args = "--shm-size=256m"
worker_class = "dask_cuda.CUDAWorker"

Option 1: Use an Azure Marketplace VM image#

With this approach, we can use one of the Azure Marketplace VMs provided free of charge by NVIDIA. These VM images come with all the necessary dependencies and NVIDIA drivers preinstalled. NVIDIA offers these images as a plug-and-play solution to reduce cluster setup time for data scientists. Luckily for us, dask-cloudprovider makes it simple to pass in the Marketplace VM information, and it will use the selected VM image instead of a generic one.

We will use the following image: NVIDIA GPU-Optimized Image for AI and HPC

Note

Make sure your dask-cloudprovider version is 2021.6.0 or later. Older versions do not support Marketplace VMs in Azure.
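A quick way to confirm the installed version (this assumes the pip install cell at the beginning of the notebook succeeded):

# dask-cloudprovider must be >= 2021.6.0 for Marketplace VM support in Azure.
import dask_cloudprovider

print(dask_cloudprovider.__version__)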

Set up the Marketplace VM information and clear the default dask configuration.#

dask.config.set(
    {
        "logging.distributed": "info",
        "cloudprovider.azure.azurevm.marketplace_plan": {
            "publisher": "nvidia",
            "name": "ngc-base-version-23_03_0",
            "product": "ngc_azure_17_11",
            "version": "23.03.0",
        },
    }
)
vm_image = ""
config = dask.config.get("cloudprovider.azure.azurevm", {})
config
{'vnet': None,
 'security_group': None,
 'public_ingress': True,
 'vm_size': 'Standard_DS1_v2',
 'disk_size': 50,
 'scheduler_vm_size': None,
 'docker_image': 'daskdev/dask:latest',
 'vm_image': {'publisher': 'Canonical',
  'offer': 'UbuntuServer',
  'sku': '18.04-LTS',
  'version': 'latest'},
 'bootstrap': True,
 'auto_shutdown': True,
 'marketplace_plan': {'publisher': 'nvidia',
  'name': 'ngc-base-version-23_03_0',
  'product': 'ngc_azure_17_11',
  'version': '23.03.0'}}

If necessary, you will have to uncomment and accept the Azure Marketplace image terms so that the image can be used to create VMs.

! az vm image terms accept --urn "nvidia:ngc_azure_17_11:ngc-base-version-23_03_0:23.03.0" --verbose
{
  "accepted": true,
  "id": "/subscriptions/fc4f4a6b-4041-4b1c-8249-854d68edcf62/providers/Microsoft.MarketplaceOrdering/offerTypes/Microsoft.MarketplaceOrdering/offertypes/publishers/nvidia/offers/ngc_azure_17_11/plans/ngc-base-version-23_03_0/agreements/current",
  "licenseTextLink": "https://mpcprodsa.blob.core.windows.net/legalterms/3E5ED_legalterms_NVIDIA%253a24NGC%253a5FAZURE%253a5F17%253a5F11%253a24NGC%253a2DBASE%253a2DVERSION%253a2D23%253a5F03%253a5F0%253a24KJVKRIWKTRQ3CIEPNL6YTG4AVORBHHPZCDQDVWX7JPPDEF6UM7R4XO76VDRHXCNTQYATKLGYYW3KA7DSIKTYXBZ3HJ2FMWYCINEY4WQ.txt",
  "marketplaceTermsLink": "https://mpcprodsa.blob.core.windows.net/marketplaceterms/3EDEF_marketplaceterms_VIRTUALMACHINE%253a24AAK2OAIZEAWW5H4MSP5KSTVB6NDKKRTUBAU23BRFTWN4YC2MQLJUB5ZEYUOUJBVF3YK34CIVPZL2HWYASPGDUY5O2FWEGRBYOXWZE5Y.txt",
  "name": "ngc-base-version-23_03_0",
  "plan": "ngc-base-version-23_03_0",
  "privacyPolicyLink": "https://www.nvidia.com/en-us/about-nvidia/privacy-policy/",
  "product": "ngc_azure_17_11",
  "publisher": "nvidia",
  "retrieveDatetime": "2023-10-02T08:17:40.3203275Z",
  "signature": "SWCKS7PPTL3XIBGBE2IZCMF43KBRDLSIZ7XLXXTLI6SXDCPCXY53BAISH6DNIELVV63GPZ44AOMMMZ6RV2AL5ARNM6XWHXRJ4HDNTJI",
  "systemData": {
    "createdAt": "2023-10-02T08:17:43.219827+00:00",
    "createdBy": "fc4f4a6b-4041-4b1c-8249-854d68edcf62",
    "createdByType": "ManagedIdentity",
    "lastModifiedAt": "2023-10-02T08:17:43.219827+00:00",
    "lastModifiedBy": "fc4f4a6b-4041-4b1c-8249-854d68edcf62",
    "lastModifiedByType": "ManagedIdentity"
  },
  "type": "Microsoft.MarketplaceOrdering/offertypes"
}
Command ran in 7.879 seconds (init: 0.159, invoke: 7.720)

Now that you have the configuration needed to use the NVIDIA VM image, skip ahead to Step 2.1 to launch the AzureVMCluster.

Option 2: Set up an Azure customized VM#

If you already have a customized VM and know its resource ID, skip to step f. of Option 2.

Normally, if we create a cluster from a generic image, we have to wait for the new VMs to be fully provisioned with all the dependencies. The provisioning step performs several actions, such as setting up the VMs with the required libraries, setting up Docker, installing the NVIDIA drivers, and pulling and decompressing the RAPIDS container, among others. This usually takes around 10-15 minutes, depending on the cloud provider. If you want to spin up clusters quickly, setting up the VMs from a generic image every time may not be optimal.

Alternatively, as detailed in Option 1, we can choose to use NVIDIA's customized Marketplace VM. However, we would still need to download and decompress the RAPIDS container, so the setup time needed to get the workers and the scheduler up would still be around 8-10 minutes.

Luckily, we can improve on this. We can make our own customized VM with all the necessary packages, drivers, containers and dependencies baked in. That way, the time needed to spin up a cluster from the customized VM will be minimal.

In this example, we will use a tool called packer to create our custom VM image. Packer automates the process of building and customizing VMs across all major cloud providers.

Now, to create the custom VM image, follow steps a. through f.

a. Install packer#

Follow the getting started guide to download the binary for your platform and install it.

b. Authenticate packer with Azure#

There are several ways to authenticate packer for use with Azure (see here for details). However, since we already installed the Azure CLI (az) at the beginning of the notebook, letting packer authenticate through the az cli is the simplest option. packer will pick up the Azure credentials from the az cli, so there is nothing more for you to do in this step.

c. Generate the cloud-init script to customize the VM image#

packer can use a cloud-init script to initialize a VM. The cloud-init script contains the set of commands that will set up the environment of our custom VM. We will pass it to the packer command as an external file via the config file.

The cloud-init file cloud_init.yaml.j2 is in the configs folder. If you want to add or modify any configuration, edit cloud_init.yaml.j2 before moving on to the next step.
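To see exactly what will run on first boot, you can print the template from the notebook (assuming the configs folder sits next to this notebook):

# Inspect the cloud-init template that packer will pass to the VM.
with open("./configs/cloud_init.yaml.j2") as fh:
    print(fh.read())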

d. Write the packer configuration to a config file#

Next we need to give packer a build file with the platform-specific and cloud-init configuration. packer will use this file to create the custom VM.

In this example, we will create a single custom VM image accessible only to the user. We will start from an Ubuntu Server 18.04 base image and customize it. Later, we will instantiate all the VMs from this custom VM image.

If you are curious about what else can be configured, take a look at all the available Azure build parameters for packer.

Note

In this example, our resource group already exists. Therefore, we simply pass our resource group name in the required parameters managed_image_resource_group_name and build_resource_group_name.

custom_vm_image_name = "FILL-THIS-IN"
packer_config = {
    "builders": [
        {
            "type": "azure-arm",
            "use_azure_cli_auth": True,
            "managed_image_resource_group_name": resource_group,
            "managed_image_name": custom_vm_image_name,
            "custom_data_file": "./configs/cloud_init.yaml.j2",
            "os_type": "Linux",
            "image_publisher": "Canonical",
            "image_offer": "UbuntuServer",
            "image_sku": "18.04-LTS",
            "azure_tags": {
                "dept": "RAPIDS-CSP",
                "task": "RAPIDS Custom Image deployment",
            },
            "build_resource_group_name": resource_group,
            "vm_size": vm_size,
        }
    ],
    "provisioners": [
        {
            "inline": [
                (
                    "echo 'Waiting for cloud-init'; "
                    "while [ ! -f /var/lib/cloud/instance/boot-finished ]; "
                    "do sleep 1; done; echo 'Done'"
                )
            ],
            "type": "shell",
        }
    ],
}

with open("packer_config.json", "w") as fh:
    fh.write(json.dumps(packer_config))

e. Run the packer build and create the image#

# # Uncomment the following line and run to create the custom image
# ! packer build packer_config.json

This will take around 15 minutes. Go grab a coffee or watch an episode of your favorite TV show and come back. But remember, you only have to do this once, unless you want to update the packages in the VM. That means you can create this custom image once and keep using it hundreds of times.

While packer is building the image, you will see output similar to the one shown below.

$ packer build packer_config.json
azure-arm: output will be in this color.

==> azure-arm: Running builder ...
==> azure-arm: Getting tokens using Azure CLI
==> azure-arm: Getting tokens using Azure CLI
    azure-arm: Creating Azure Resource Manager (ARM) client ...
==> azure-arm: Using existing resource group ...
==> azure-arm:  -> ResourceGroupName : <your resource group>
==> azure-arm:  -> Location          : <some chosen location>
==> azure-arm: Validating deployment template ...
==> azure-arm:  -> ResourceGroupName : <your resource group>
==> azure-arm:  -> DeploymentName    : 'pkrdp04rrahxkg9'
==> azure-arm: Deploying deployment template ...
==> azure-arm:  -> ResourceGroupName : <your resource group>
==> azure-arm:  -> DeploymentName    : 'pkrdp04rrahxkg9'
==> azure-arm:
==> azure-arm: Getting the VM's IP address ...
==> azure-arm:  -> ResourceGroupName   : <your resource group>
==> azure-arm:  -> PublicIPAddressName : 'pkrip04rrahxkg9'
==> azure-arm:  -> NicName             : 'pkrni04rrahxkg9'
==> azure-arm:  -> Network Connection  : 'PublicEndpoint'
==> azure-arm:  -> IP Address          : '40.77.62.118'
==> azure-arm: Waiting for SSH to become available...
==> azure-arm: Connected to SSH!
==> azure-arm: Provisioning with shell script: /tmp/packer-shell614221056
    azure-arm: Waiting for cloud-init
    azure-arm: Done
==> azure-arm: Querying the machine's properties ...
==> azure-arm:  -> ResourceGroupName : <your resource group>
==> azure-arm:  -> ComputeName       : 'pkrvm04rrahxkg9'
==> azure-arm:  -> Managed OS Disk   : '/subscriptions/<your subscription id>/resourceGroups/<your resource group>/providers/Microsoft.Compute/disks/pkros04rrahxkg9'
==> azure-arm: Querying the machine's additional disks properties ...
==> azure-arm:  -> ResourceGroupName : <your resource group>
==> azure-arm:  -> ComputeName       : 'pkrvm04rrahxkg9'
==> azure-arm: Powering off machine ...
==> azure-arm:  -> ResourceGroupName : <your resource group>
==> azure-arm:  -> ComputeName       : 'pkrvm04rrahxkg9'
==> azure-arm: Capturing image ...
==> azure-arm:  -> Compute ResourceGroupName : <your resource group>
==> azure-arm:  -> Compute Name              : 'pkrvm04rrahxkg9'
==> azure-arm:  -> Compute Location          : <some chosen location>
==> azure-arm:  -> Image ResourceGroupName   : <your resource group>
==> azure-arm:  -> Image Name                : <your chosen custom image name>
==> azure-arm:  -> Image Location            : <some chosen location>
==> azure-arm: 
==> azure-arm: Deleting individual resources ...
==> azure-arm: Adding to deletion queue -> Microsoft.Compute/virtualMachines : 'pkrvm04rrahxkg9'
==> azure-arm: Adding to deletion queue -> Microsoft.Network/networkInterfaces : 'pkrni04rrahxkg9'
==> azure-arm: Adding to deletion queue -> Microsoft.Network/publicIPAddresses : 'pkrip04rrahxkg9'
==> azure-arm: Adding to deletion queue -> Microsoft.Network/virtualNetworks : 'pkrvn04rrahxkg9'
==> azure-arm: Attempting deletion -> Microsoft.Network/networkInterfaces : 'pkrni04rrahxkg9'
==> azure-arm: Waiting for deletion of all resources...
==> azure-arm: Attempting deletion -> Microsoft.Network/publicIPAddresses : 'pkrip04rrahxkg9'
==> azure-arm: Attempting deletion -> Microsoft.Compute/virtualMachines : 'pkrvm04rrahxkg9'
==> azure-arm: Attempting deletion -> Microsoft.Network/virtualNetworks : 'pkrvn04rrahxkg9'
    .
    .
    .
    .
    .
    .
    .
    .
==> azure-arm:  Deleting -> Microsoft.Compute/disks : '/subscriptions/<your subscription id>/resourceGroups/<your resource group>/providers/Microsoft.Compute/disks/pkros04rrahxkg9'
==> azure-arm: Removing the created Deployment object: 'pkrdp04rrahxkg9'
==> azure-arm: 
==> azure-arm: The resource group was not created by Packer, not deleting ...
Build 'azure-arm' finished after 16 minutes 22 seconds.

==> Wait completed after 16 minutes 22 seconds

==> Builds finished. The artifacts of successful builds are:
--> azure-arm: Azure.ResourceManagement.VMImage:

OSType: Linux
ManagedImageResourceGroupName: <your resource group>
ManagedImageName: <your chosen custom image name>
ManagedImageId: /subscriptions/<your subscription id>/resourceGroups/<your resource group>/providers/Microsoft.Compute/images/<your chosen custom image name>
ManagedImageLocation: <some chosen location>

At the bottom of the output, when packer finishes, you will see information similar to the following:

ManagedImageResourceGroupName: <your resource group>
ManagedImageName: <your chosen custom image name>
ManagedImageId: /subscriptions/<your subscription id>/resourceGroups/<your resource group>/providers/Microsoft.Compute/images/<your chosen custom image name>
ManagedImageLocation: <some chosen location>

Note down the ManagedImageId. This is the resource ID of the custom image we will use.

As shown above, the ManagedImageId will look something like: /subscriptions/12345/resourceGroups/myown-rg/providers/Microsoft.Compute/images/myCustomImage
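If you prefer to assemble the ID programmatically instead of copying it from the packer output, it follows a fixed pattern (the subscription ID and image name below are hypothetical; resource_group comes from Step 2):

# Hypothetical values shown; the pattern is what matters.
subscription_id = "12345"
image_name = "myCustomImage"
managed_image_id = (
    f"/subscriptions/{subscription_id}/resourceGroups/{resource_group}"
    f"/providers/Microsoft.Compute/images/{image_name}"
)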

f. Set the custom VM information and clear the default dask configuration#

Once you have the custom VM resource ID, you should reset the default VM image information in dask.config. The default image values loaded in dask.config point to a plain Ubuntu Server 18.04 LTS (i.e. the one you have just customized). If you do not reset them, dask will try to use that image instead of your customized one.

# Fill this in with the ManagedImageId value from above, or with the resource ID
# of the customized VM if you already have one from a previous run.
ManagedImageId = "FILL-THIS-IN"
dask.config.set({"cloudprovider.azure.azurevm.vm_image": {}})
config = dask.config.get("cloudprovider.azure.azurevm", {})
print(config)
vm_image = {"id": ManagedImageId}
print(vm_image)

Step 2.1: Start the VM cluster in Azure#

Here, if you used Option 1, i.e. the NVIDIA VM image, pass an empty string for the vm_image information.

For Option 2, pass the vm_image information obtained from the packer run output as an argument to AzureVMCluster.

Also, turn off bootstrapping of the VMs by passing bootstrap=False. This disables the installation of the dependencies when the VMs are instantiated, since in both cases they are already present in the custom VM.

Note

The rest of the notebook is the same regardless of whether you chose Option 1 or Option 2.

Note

The actual number of workers in our cluster is not always equal to the number of VMs created, i.e. the \(n\_workers\) value passed in. If the number of GPUs in the selected vm_size is \(G\) and the number of VMs created is \(n\_workers\), then the actual number of workers is \(W = n\_workers \times G\). For example, for Standard_NC12s_v3 VMs with 2 V100 GPUs per VM, \(n\_workers=2\) gives \(W = 2 \times 2 = 4\).
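The arithmetic is easy to check inline; the values below are illustrative and match the cluster created next:

# Illustrative worker-count check for the cluster created below.
gpus_per_vm = 2  # Standard_NC12s_v3 ships with 2 V100 GPUs
n_vms = 2  # the n_workers value passed to AzureVMCluster below
print(f"Expected dask-cuda workers: {n_vms * gpus_per_vm}")  # -> 4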

%%time

cluster = AzureVMCluster(
    location=location,
    resource_group=resource_group,
    vnet=vnet,
    security_group=security_group,
    vm_image=vm_image,
    vm_size=vm_size,
    disk_size=200,
    docker_image=docker_image,
    worker_class=worker_class,
    n_workers=2,
    security=True,
    docker_args=docker_args,
    debug=False,
    bootstrap=False,  # This is to prevent the cloud init jinja2 script from running in the custom VM.
)
client = Client(cluster)
client

Client

Client-b8982284-60fe-11ee-a1e9-80e82cd32958

Connection method: Cluster object Cluster type: dask_cloudprovider.AzureVMCluster
Dashboard: http://4.155.2.188:8787/status

Cluster Info

%%time
client.wait_for_workers(2)
CPU times: user 0 ns, sys: 6.1 ms, total: 6.1 ms
Wall time: 29 ms
# Uncomment if you only have the scheduler with n_workers=0 and want to scale the workers separately.
# %%time
# client.cluster.scale(n_workers)

Wait for all the workers to start. This will wait for n_workers VMs to come up.

Before starting the training process, let's take a quick look at the details of the GPUs on the workers we will be using.

import pprint

pp = pprint.PrettyPrinter()

pp.pprint(
    client.scheduler_info()
)  # will show some information of the GPUs of the workers
{'address': 'tls://10.5.0.42:8786',
 'id': 'Scheduler-3bae5a4d-29d1-4317-bbfc-931e97a077fb',
 'services': {'dashboard': 8787},
 'started': 1696235012.5914223,
 'type': 'Scheduler',
 'workers': {'tls://10.5.0.43:36201': {'gpu': {'memory-total': 17179869184,
                                               'name': 'Tesla V100-PCIE-16GB'},
                                       'host': '10.5.0.43',
                                       'id': 'dask-92c5978e-worker-54f8d057-1',
                                       'last_seen': 1696235778.2340653,
                                       'local_directory': '/tmp/dask-scratch-space/worker-6bghw_yx',
                                       'memory_limit': 118225670144,
                                       'metrics': {'bandwidth': {'total': 100000000,
                                                                 'types': {},
                                                                 'workers': {}},
                                                   'cpu': 4.0,
                                                   'digests_total_since_heartbeat': {'latency': 0.004627227783203125,
                                                                                     'tick-duration': 0.5006744861602783},
                                                   'event_loop_interval': 0.019985613822937013,
                                                   'gpu': {'memory-used': 598867968,
                                                           'utilization': 0},
                                                   'gpu_memory_used': 598867968,
                                                   'gpu_utilization': 0,
                                                   'host_disk_io': {'read_bps': 0.0,
                                                                    'write_bps': 0.0},
                                                   'host_net_io': {'read_bps': 612.42422993883,
                                                                   'write_bps': 3346.3180145677247},
                                                   'managed_bytes': 0,
                                                   'memory': 623116288,
                                                   'num_fds': 86,
                                                   'rmm': {'rmm-total': 0,
                                                           'rmm-used': 0},
                                                   'spilled_bytes': {'disk': 0,
                                                                     'memory': 0},
                                                   'task_counts': {},
                                                   'time': 1696235777.730071,
                                                   'transfer': {'incoming_bytes': 0,
                                                                'incoming_count': 0,
                                                                'incoming_count_total': 0,
                                                                'outgoing_bytes': 0,
                                                                'outgoing_count': 0,
                                                                'outgoing_count_total': 0}},
                                       'name': 'dask-92c5978e-worker-54f8d057-1',
                                       'nanny': 'tls://10.5.0.43:42265',
                                       'nthreads': 1,
                                       'resources': {},
                                       'services': {'dashboard': 44817},
                                       'status': 'running',
                                       'type': 'Worker'},
             'tls://10.5.0.43:38107': {'gpu': {'memory-total': 17179869184,
                                               'name': 'Tesla V100-PCIE-16GB'},
                                       'host': '10.5.0.43',
                                       'id': 'dask-92c5978e-worker-54f8d057-0',
                                       'last_seen': 1696235778.2329032,
                                       'local_directory': '/tmp/dask-scratch-space/worker-ix8y4_eg',
                                       'memory_limit': 118225670144,
                                       'metrics': {'bandwidth': {'total': 100000000,
                                                                 'types': {},
                                                                 'workers': {}},
                                                   'cpu': 2.0,
                                                   'digests_total_since_heartbeat': {'latency': 0.004603147506713867,
                                                                                     'tick-duration': 0.4996976852416992},
                                                   'event_loop_interval': 0.019999494552612306,
                                                   'gpu': {'memory-used': 598867968,
                                                           'utilization': 0},
                                                   'gpu_memory_used': 598867968,
                                                   'gpu_utilization': 0,
                                                   'host_disk_io': {'read_bps': 0.0,
                                                                    'write_bps': 0.0},
                                                   'host_net_io': {'read_bps': 611.5250712835996,
                                                                   'write_bps': 3341.404964660714},
                                                   'managed_bytes': 0,
                                                   'memory': 623882240,
                                                   'num_fds': 86,
                                                   'rmm': {'rmm-total': 0,
                                                           'rmm-used': 0},
                                                   'spilled_bytes': {'disk': 0,
                                                                     'memory': 0},
                                                   'task_counts': {},
                                                   'time': 1696235777.729443,
                                                   'transfer': {'incoming_bytes': 0,
                                                                'incoming_count': 0,
                                                                'incoming_count_total': 0,
                                                                'outgoing_bytes': 0,
                                                                'outgoing_count': 0,
                                                                'outgoing_count_total': 0}},
                                       'name': 'dask-92c5978e-worker-54f8d057-0',
                                       'nanny': 'tls://10.5.0.43:33657',
                                       'nthreads': 1,
                                       'resources': {},
                                       'services': {'dashboard': 45421},
                                       'status': 'running',
                                       'type': 'Worker'},
             'tls://10.5.0.44:34087': {'gpu': {'memory-total': 17179869184,
                                               'name': 'Tesla V100-PCIE-16GB'},
                                       'host': '10.5.0.44',
                                       'id': 'dask-92c5978e-worker-9f9a9c9b-1',
                                       'last_seen': 1696235778.5268767,
                                       'local_directory': '/tmp/dask-scratch-space/worker-1d7vbddw',
                                       'memory_limit': 118225670144,
                                       'metrics': {'bandwidth': {'total': 100000000,
                                                                 'types': {},
                                                                 'workers': {}},
                                                   'cpu': 0.0,
                                                   'digests_total_since_heartbeat': {'latency': 0.004075765609741211,
                                                                                     'tick-duration': 0.4998819828033447},
                                                   'event_loop_interval': 0.02001068115234375,
                                                   'gpu': {'memory-used': 598867968,
                                                           'utilization': 0},
                                                   'gpu_memory_used': 598867968,
                                                   'gpu_utilization': 0,
                                                   'host_disk_io': {'read_bps': 0.0,
                                                                    'write_bps': 12597732.652975753},
                                                   'host_net_io': {'read_bps': 612.7208378808626,
                                                                   'write_bps': 3347.938695871903},
                                                   'managed_bytes': 0,
                                                   'memory': 624406528,
                                                   'num_fds': 86,
                                                   'rmm': {'rmm-total': 0,
                                                           'rmm-used': 0},
                                                   'spilled_bytes': {'disk': 0,
                                                                     'memory': 0},
                                                   'task_counts': {},
                                                   'time': 1696235778.023989,
                                                   'transfer': {'incoming_bytes': 0,
                                                                'incoming_count': 0,
                                                                'incoming_count_total': 0,
                                                                'outgoing_bytes': 0,
                                                                'outgoing_count': 0,
                                                                'outgoing_count_total': 0}},
                                       'name': 'dask-92c5978e-worker-9f9a9c9b-1',
                                       'nanny': 'tls://10.5.0.44:37979',
                                       'nthreads': 1,
                                       'resources': {},
                                       'services': {'dashboard': 36073},
                                       'status': 'running',
                                       'type': 'Worker'},
             'tls://10.5.0.44:37791': {'gpu': {'memory-total': 17179869184,
                                               'name': 'Tesla V100-PCIE-16GB'},
                                       'host': '10.5.0.44',
                                       'id': 'dask-92c5978e-worker-9f9a9c9b-0',
                                       'last_seen': 1696235778.528408,
                                       'local_directory': '/tmp/dask-scratch-space/worker-7y8g_hu7',
                                       'memory_limit': 118225670144,
                                       'metrics': {'bandwidth': {'total': 100000000,
                                                                 'types': {},
                                                                 'workers': {}},
                                                   'cpu': 0.0,
                                                   'digests_total_since_heartbeat': {'latency': 0.003975629806518555,
                                                                                     'tick-duration': 0.4994323253631592},
                                                   'event_loop_interval': 0.020001530647277832,
                                                   'gpu': {'memory-used': 598867968,
                                                           'utilization': 0},
                                                   'gpu_memory_used': 598867968,
                                                   'gpu_utilization': 0,
                                                   'host_disk_io': {'read_bps': 0.0,
                                                                    'write_bps': 12589746.67130889},
                                                   'host_net_io': {'read_bps': 612.3324205749067,
                                                                   'write_bps': 3345.8163634027583},
                                                   'managed_bytes': 0,
                                                   'memory': 623104000,
                                                   'num_fds': 86,
                                                   'rmm': {'rmm-total': 0,
                                                           'rmm-used': 0},
                                                   'spilled_bytes': {'disk': 0,
                                                                     'memory': 0},
                                                   'task_counts': {},
                                                   'time': 1696235778.0250378,
                                                   'transfer': {'incoming_bytes': 0,
                                                                'incoming_count': 0,
                                                                'incoming_count_total': 0,
                                                                'outgoing_bytes': 0,
                                                                'outgoing_count': 0,
                                                                'outgoing_count_total': 0}},
                                       'name': 'dask-92c5978e-worker-9f9a9c9b-0',
                                       'nanny': 'tls://10.5.0.44:36779',
                                       'nthreads': 1,
                                       'resources': {},
                                       'services': {'dashboard': 32965},
                                       'status': 'running',
                                       'type': 'Worker'}}}

Step 3: Data setup, cleanup and enhancement#

Step 3.a: Set up the workers to read parquet files from the Azure Data Lake endpoint#

Now we will enable all the workers to read parquet files directly from the Azure Data Lake endpoint. This requires the adlfs python library on the workers. We use the PipInstall worker plugin, which installs the package on every worker.

from dask.distributed import PipInstall

client.register_worker_plugin(PipInstall(packages=["adlfs"]))
{'tls://10.5.0.43:36201': {'status': 'OK'},
 'tls://10.5.0.43:38107': {'status': 'OK'},
 'tls://10.5.0.44:34087': {'status': 'OK'},
 'tls://10.5.0.44:37791': {'status': 'OK'}}

Step 3.b: Data cleanup, enhancement and persistence scripts#

The data first needs some cleanup. We drop some columns we are not interested in, and we define the data type each column should be read as.

We also add some new features to our dataframe via some custom functions, namely:

  1. Haversine distance: used to compute the total trip distance.

  2. Day of the week: potentially useful information for determining the fare.

The add_features function combines the two and produces a new dataframe with the added features.

Note

In the function persist_train_infer_split, we also persist the test dataset on the workers. If X_infer (i.e. the test dataset) is small enough, we could call compute() on it to bring it to the local machine and run predictions there. But in general, if X_infer is large, it may not fit in the local machine's GPU, and moving that much data would also add to the prediction latency. It is therefore better to persist the test dataset on the dask workers, call prediction on the individual workers, and finally gather the prediction results from the workers.
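As a minimal sketch of that trade-off (illustrative only; it assumes X_infer is a dask_cudf DataFrame like the one produced later in this notebook):

# Bring the test set local only if it comfortably fits on one GPU.
fits_on_local_gpu = False
if fits_on_local_gpu:
    X_local = X_infer.compute()  # materialize the whole test set locally
else:
    X_infer = X_infer.persist()  # keep partitions resident on the workers
    wait(X_infer)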

Functions to add features#

import math
from math import asin, cos, pi, sin, sqrt


def haversine_distance_kernel(
    pickup_latitude_r,
    pickup_longitude_r,
    dropoff_latitude_r,
    dropoff_longitude_r,
    h_distance,
    radius,
):
    for i, (x_1, y_1, x_2, y_2) in enumerate(
        zip(
            pickup_latitude_r,
            pickup_longitude_r,
            dropoff_latitude_r,
            dropoff_longitude_r,
        )
    ):
        x_1 = pi / 180 * x_1
        y_1 = pi / 180 * y_1
        x_2 = pi / 180 * x_2
        y_2 = pi / 180 * y_2

        dlon = y_2 - y_1
        dlat = x_2 - x_1
        a = sin(dlat / 2) ** 2 + cos(x_1) * cos(x_2) * sin(dlon / 2) ** 2

        c = 2 * asin(sqrt(a))
        # radius = 6371 # Radius of earth in kilometers # currently passed as input arguments

        h_distance[i] = c * radius


def day_of_the_week_kernel(day, month, year, day_of_week):
    for i, (_, _, _) in enumerate(zip(day, month, year)):
        if month[i] < 3:
            shift = month[i]
        else:
            shift = 0
        Y = year[i] - (month[i] < 3)
        y = Y - 2000
        c = 20
        d = day[i]
        m = month[i] + shift + 1
        day_of_week[i] = (d + math.floor(m * 2.6) + y + (y // 4) + (c // 4) - 2 * c) % 7


def add_features(df):
    df["hour"] = df["tpepPickupDateTime"].dt.hour
    df["year"] = df["tpepPickupDateTime"].dt.year
    df["month"] = df["tpepPickupDateTime"].dt.month
    df["day"] = df["tpepPickupDateTime"].dt.day
    df["diff"] = (
        df["tpepDropoffDateTime"] - df["tpepPickupDateTime"]
    ).dt.seconds  # convert difference between pickup and dropoff into seconds

    df["pickup_latitude_r"] = df["startLat"] // 0.01 * 0.01
    df["pickup_longitude_r"] = df["startLon"] // 0.01 * 0.01
    df["dropoff_latitude_r"] = df["endLat"] // 0.01 * 0.01
    df["dropoff_longitude_r"] = df["endLon"] // 0.01 * 0.01

    df = df.drop("tpepDropoffDateTime", axis=1)
    df = df.drop("tpepPickupDateTime", axis=1)

    df = df.apply_rows(
        haversine_distance_kernel,
        incols=[
            "pickup_latitude_r",
            "pickup_longitude_r",
            "dropoff_latitude_r",
            "dropoff_longitude_r",
        ],
        outcols=dict(h_distance=np.float32),
        kwargs=dict(radius=6371),
    )

    df = df.apply_rows(
        day_of_the_week_kernel,
        incols=["day", "month", "year"],
        outcols=dict(day_of_week=np.float32),
        kwargs=dict(),
    )

    df["is_weekend"] = df["day_of_week"] < 2
    return df

Functions for cleaning and persisting the data on the workers.

def persist_train_infer_split(
    client,
    df,
    response_dtype,
    response_id,
    infer_frac=1.0,
    random_state=42,
    shuffle=True,
):
    workers = client.has_what().keys()
    X, y = df.drop([response_id], axis=1), df[response_id].astype("float32")
    infer_frac = max(0, min(infer_frac, 1.0))
    X_train, X_infer, y_train, y_infer = train_test_split(
        X, y, shuffle=True, random_state=random_state, test_size=infer_frac
    )

    with dask.annotate(workers=set(workers)):
        X_train, y_train = client.persist(collections=[X_train, y_train])

    if infer_frac != 1.0:
        with dask.annotate(workers=set(workers)):
            X_infer, y_infer = client.persist(collections=[X_infer, y_infer])

        wait([X_train, y_train, X_infer, y_infer])
    else:
        X_infer = X_train
        y_infer = y_train

        wait([X_train, y_train])

    return X_train, y_train, X_infer, y_infer


def clean(df_part, must_haves):
    """
    This function performs the various clean up tasks for the data
    and returns the cleaned dataframe.
    """
    # iterate through columns in this df partition
    for col in df_part.columns:
        # drop anything not in our expected list
        if col not in must_haves:
            df_part = df_part.drop(col, axis=1)
            continue

        # fixes datetime error found by Ty Mckercher and fixed by Paul Mahler
        if df_part[col].dtype == "object" and col in [
            "tpepPickupDateTime",
            "tpepDropoffDateTime",
        ]:
            df_part[col] = df_part[col].astype("datetime64[ms]")
            continue

        # if column was read as a string, recast as float
        if df_part[col].dtype == "object":
            df_part[col] = df_part[col].str.fillna("-1")
            df_part[col] = df_part[col].astype("float32")
        else:
            # downcast from 64bit to 32bit types
            # Tesla T4 are faster on 32bit ops
            if "int" in str(df_part[col].dtype):
                df_part[col] = df_part[col].astype("int32")
            if "float" in str(df_part[col].dtype):
                df_part[col] = df_part[col].astype("float32")
            df_part[col] = df_part[col].fillna(-1)

    return df_part


def taxi_data_loader(
    client,
    adlsaccount,
    adlspath,
    response_dtype=np.float32,
    infer_frac=1.0,
    random_state=0,
):
    # create a list of columns & dtypes the df must have
    must_haves = {
        "tpepPickupDateTime": "datetime64[ms]",
        "tpepDropoffDateTime": "datetime64[ms]",
        "passengerCount": "int32",
        "tripDistance": "float32",
        "startLon": "float32",
        "startLat": "float32",
        "rateCodeId": "int32",
        "endLon": "float32",
        "endLat": "float32",
        "fareAmount": "float32",
    }

    workers = client.has_what().keys()
    response_id = "fareAmount"
    storage_options = {"account_name": adlsaccount}
    taxi_data = dask_cudf.read_parquet(
        adlspath,
        storage_options=storage_options,
        chunksize=25e6,
        npartitions=len(workers),
    )
    taxi_data = clean(taxi_data, must_haves)
    taxi_data = taxi_data.map_partitions(add_features)
    # Drop NaN values and convert to float32
    taxi_data = taxi_data.dropna()
    fields = [
        "passengerCount",
        "tripDistance",
        "startLon",
        "startLat",
        "rateCodeId",
        "endLon",
        "endLat",
        "fareAmount",
        "diff",
        "h_distance",
        "day_of_week",
        "is_weekend",
    ]
    taxi_data = taxi_data.astype("float32")
    taxi_data = taxi_data[fields]
    taxi_data = taxi_data.reset_index()

    return persist_train_infer_split(
        client, taxi_data, response_dtype, response_id, infer_frac, random_state
    )

Step 3.c: Get the split data and persist it across the workers#

For demonstration purposes, we will use the data from November and December 2014.

tic = timer()
X_train, y_train, X_infer, y_infer = taxi_data_loader(
    client,
    adlsaccount="azureopendatastorage",
    adlspath="az://nyctlc/yellow/puYear=2014/puMonth=1*/*.parquet",
    infer_frac=0.1,
    random_state=42,
)
toc = timer()
print(f"Wall clock time taken for ETL and persisting : {toc-tic} s")
X_train.shape[0].compute()
48817562

Our training dataset is around 49 million rows. Let's take a look at the data locally to see what we are dealing with. We see columns for pickup and dropoff latitude/longitude, passenger count, trip distance, day of the week, and so on. This is the information we will use to estimate the trip fare amount.

X_train.head()
index passengerCount tripDistance startLon startLat rateCodeId endLon endLat diff h_distance day_of_week is_weekend
300446 300446 1.0 4.00 -73.984955 40.768543 1.0 -74.008789 40.719330 1324.0 5.809536e+00 6.0 0.0
163817 163817 3.0 1.93 -74.008179 40.722198 1.0 -73.992989 40.739151 840.0 1.395489e+00 2.0 0.0
236958 236958 5.0 1.10 -73.987595 40.775360 1.0 -73.976494 40.785755 360.0 1.394768e+00 5.0 0.0
73461 73461 1.0 0.76 -73.994698 40.725929 1.0 -73.994698 40.725929 180.0 1.005159e-13 5.0 0.0
294464 294464 1.0 0.60 -73.974342 40.748165 1.0 -73.982536 40.750767 229.0 1.395336e+00 6.0 0.0
X_infer
Dask DataFrame Structure
index passengerCount tripDistance startLon startLat rateCodeId endLon endLat diff h_distance day_of_week is_weekend
npartitions=84
int64 float32 float32 float32 float32 float32 float32 float32 float32 float32 float32 float32
... ... ... ... ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ... ... ... ...
Dask Name: split, 1 graph layer

Step 4: Train an XGBoost model#

We are now ready to train an XGBoost model on the data and then predict the fare for each trip.

Step 4.a: Set the training parameters#

In this training example, we will use RMSE as the evaluation metric. It is also worth noting that performing HPO would yield a more optimal set of hyperparameters.

For how to perform HPO on Azure, refer to the notebook HPO-RAPIDS in this repository.

params = {
    "learning_rate": 0.15,
    "max_depth": 8,
    "objective": "reg:squarederror",
    "subsample": 0.7,
    "colsample_bytree": 0.7,
    "min_child_weight": 1,
    "gamma": 1,
    "silent": True,
    "verbose_eval": True,
    "booster": "gbtree",  # 'gblinear' not implemented in dask
    "debug_synchronize": True,
    "eval_metric": "rmse",
    "tree_method": "gpu_hist",
    "num_boost_rounds": 100,
}

Step 4.b: Train the XGBoost model#

Since the data is already persisted on the dask workers of the cluster, the following steps should not take much time.

data_train = xgb.dask.DaskDMatrix(client, X_train, y_train)
tic = timer()
xgboost_output = xgb.dask.train(
    client, params, data_train, num_boost_round=params["num_boost_rounds"]
)
xgb_gpu_model = xgboost_output["booster"]
toc = timer()
print(f"Wall clock time taken for this cell : {toc-tic} s")
Wall clock time taken for this cell : 9.483002611901611 s

Step 4.c: Save the trained model to local disk#

xgb_gpu_model
<xgboost.core.Booster at 0x7fcee8055bd0>
model_filename = "trained-model_nyctaxi.xgb"
xgb_gpu_model.save_model(model_filename)
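If you want to verify the file written above, reloading it into a fresh Booster is a cheap sanity check (standard XGBoost API, not dask-specific):

# Optional: reload the saved model from disk to confirm it is readable.
reloaded_model = xgb.Booster()
reloaded_model.load_model(model_filename)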

Step 5: Predict and score using native XGBoost predict#

Here we will use the predict and inplace_predict methods provided by the xgboost.dask library directly. Later we will also predict using the Forest Inference Library (FIL).

_y_test = y_infer.compute()
wait(_y_test)
DoneAndNotDoneFutures(done=set(), not_done=set())
d_test = xgb.dask.DaskDMatrix(client, X_infer)
tic = timer()
y_pred = xgb.dask.predict(client, xgb_gpu_model, d_test)
y_pred = y_pred.compute()
wait(y_pred)
toc = timer()
print(f"Wall clock time taken for xgb.dask.predict : {toc-tic} s")
Wall clock time taken for xgb.dask.predict : 1.5550181320868433 s

Inference with the inplace predict method of dask XGBoost#

tic = timer()
y_pred = xgb.dask.inplace_predict(client, xgb_gpu_model, X_infer)
y_pred = y_pred.compute()
wait(y_pred)
toc = timer()
print(f"Wall clock time taken for inplace inference : {toc-tic} s")
Wall clock time taken for inplace inference : 1.8849179210374132 s
tic = timer()
print("Calculating MSE")
score = mean_squared_error(y_pred, _y_test)
print("Workflow Complete - RMSE: ", np.sqrt(score))
toc = timer()
print(f"Wall clock time taken for this cell : {toc-tic} s")
Calculating MSE
Workflow Complete - RMSE:  2.2968235
Wall clock time taken for this cell : 0.009336891933344305 s

Step 6: Predict and score using FIL, the Forest Inference Library#

The Forest Inference Library (FIL) provides GPU-accelerated inference capabilities for tree models. We will import the FIL functionality from the cuML library.

It accepts trained tree models in treelite format (LightGBM, XGBoost, and SKLearn GBDT and random forest models are currently supported). In general, inference with FIL is faster when a large number of workers are used, and the latency benefit grows with the size of the dataset.

Step 6.a: Predict with compute on a single worker if the test dataset is small#

As mentioned in Step 3.b, if the test dataset is large, calling prediction on the dask workers individually makes more sense than bringing the entire test dataset to the local machine.
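For the opposite, small-dataset case named in this step's heading, a purely local FIL prediction would look roughly like this (a sketch; it assumes X_infer fits in the local GPU's memory):

# Hypothetical local FIL prediction for a small test set.
from cuml import ForestInference

local_fil = ForestInference.load(model_filename, model_type="xgboost")
local_preds = local_fil.predict(X_infer.compute())  # materializes X_infer locally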

To perform predictions on the dask workers individually, every dask worker needs to load the XGB model with FIL. However, the dask workers are remote and have no access to the locally saved model, so we need to send the locally saved XGB model to the dask workers.

Persist the local model on the remote dask workers#

# the code below will read the locally saved xgboost model
# in binary format and write a copy of it to all dask workers
def read_model(path):
    """Read model file into memory."""
    with open(path, "rb") as fh:
        return fh.read()


def write_model(path, data):
    """Write model file to disk."""
    with open(path, "wb") as fh:
        fh.write(data)
    return path


model_data = read_model("trained-model_nyctaxi.xgb")

# Tell all the workers to write the model to disk
client.run(write_model, "/tmp/model.dat", model_data)


# this code reads the binary file in worker directory
# and loads the model via FIL for prediction
def predict_model(input_df):
    from cuml import ForestInference

    # load xgboost model using FIL and make prediction
    fm = ForestInference.load("/tmp/model.dat", model_type="xgboost")
    print(fm)
    pred = fm.predict(input_df)

    return pred

Distributed prediction inference with FIL#

tic = timer()
predictions = X_infer.map_partitions(
    predict_model, meta="float"
)  # runs the FIL prediction function on every partition across the workers
y_pred = predictions.compute()
wait(y_pred)
toc = timer()
print(f"Wall clock time taken for this cell : {toc-tic} s")
rows_csv = X_infer.iloc[:, 0].shape[0].compute()
print(
    f"It took {toc-tic} seconds to predict on {rows_csv} rows using FIL distributedly on each worker"
)
It took 5.638823717948981 seconds to predict on 5426301 rows using FIL distributedly on each worker
tic = timer()
score = mean_squared_error(y_pred, _y_test)
toc = timer()
print("Final - RMSE: ", np.sqrt(score))
Final - RMSE:  2.2968235

Step 7: Clean up#

client.close()
cluster.close()
Terminated VM dask-92c5978e-worker-54f8d057
Terminated VM dask-92c5978e-worker-9f9a9c9b
Removed disks for VM dask-92c5978e-worker-54f8d057
Removed disks for VM dask-92c5978e-worker-9f9a9c9b
Deleted network interface
Deleted network interface
Terminated VM dask-92c5978e-scheduler
Removed disks for VM dask-92c5978e-scheduler
Deleted network interface
Unassigned public IP