Multi-node multi-GPU XGBoost example on Azure using dask-cloudprovider#
Dask Cloud Provider is a native cloud integration library for Dask. It helps manage Dask clusters on different cloud platforms. In this notebook, we will look at how to use the package to set up an Azure cluster and run a multi-node multi-GPU (MNMG) example with RAPIDS. RAPIDS provides a suite of libraries that accelerate data science pipelines entirely on GPUs. As we will see in this notebook, this can be scaled to multiple nodes with Dask.
For the purposes of this demo, we will use a part of the NYC Taxi dataset (only the files for the 2014 calendar year are used here). The goal is to predict the fare amount for a given trip, given the times and coordinates of the taxi trip. We will download the data from Azure Open Datasets, where the dataset is publicly hosted by Microsoft.
Note
In this notebook, we will explore two possible approaches to running workloads on a cluster of Azure VMs using dask-cloudprovider:
Option 1: Use the Azure Marketplace images provided free of charge by NVIDIA. The RAPIDS container is downloaded after the VMs boot.
Option 2: Use packer to create a custom VM image for the cluster. This image will include the RAPIDS container; baking the container into the image should speed up the cluster provisioning process.
You can use either Option 1 or Option 2.
Step 0: Set up Azure credentials and the CLI#
Before running the notebook, run the following commands in a terminal to set up the Azure CLI:
curl -sL https://aka.ms/InstallAzureCLIDeb | sudo bash
az login
Then follow the instructions in the prompts to complete your account setup. If you are running the notebook inside a Docker container, you can drop the sudo.
!az login
A web browser has been opened at https://login.microsoftonline.com/organizations/oauth2/v2.0/authorize. Please continue the login in the web browser. If no web browser is available or if the web browser fails to open, use device code flow with `az login --use-device-code`.
[
{
"cloudName": "AzureCloud",
"homeTenantId": "43083d15-7273-40c1-b7db-39efd9ccc17a",
"id": "fc4f4a6b-4041-4b1c-8249-854d68edcf62",
"isDefault": true,
"managedByTenants": [
{
"tenantId": "2f4a9838-26b7-47ee-be60-ccc1fdec5953"
}
],
"name": "NV-AI-Infra",
"state": "Enabled",
"tenantId": "43083d15-7273-40c1-b7db-39efd9ccc17a",
"user": {
"name": "skirui@nvidia.com",
"type": "user"
}
}
]
Step 1: Import necessary packages#
# Run the following installs at the beginning if the libraries are not already present.
# adlfs is needed to read from Azure Data Lake.
! pip install adlfs
! pip install "dask-cloudprovider[azure]" --upgrade
import json
from timeit import default_timer as timer
import dask
import dask_cudf
import numpy as np
import xgboost as xgb
from cuml.metrics import mean_squared_error
from dask.distributed import Client, wait
from dask_cloudprovider.azure import AzureVMCluster
from dask_ml.model_selection import train_test_split
Step 2: Set up the Azure VM cluster#
Now we will set up a Dask cluster on Azure Virtual Machines using AzureVMCluster from Dask Cloud Provider, following these instructions.
To do this, you will first need to set up a resource group, a virtual network and a security group on Azure. Learn more about how to set these up. Note that you can also set them up via the Azure portal.
Once they are set up, fill in the names of the entities you created in the cell below.
We need to pass in the docker argument docker_args = '--shm-size=256m' to allow a larger shared memory size, so that multiple docker containers can run successfully in the same VM. This is the case when each VM hosts more than one worker. Even if that does not apply to you, the larger shared memory does no harm. Finally, note that we build the VMs with the RAPIDS docker image and use dask_cuda.CUDAWorker to run inside the VMs. This runs the workers from the docker image with GPU capabilities rather than CPU only.
location = "West US 2" resource_group = "rapidsai-deployment" vnet = "rapidsai-deployment-vnet" security_group = "rapidsaiclouddeploymenttest-nsg" vm_size = "Standard_NC12s_v3" # or choose a different GPU enabled VM type docker_image = "nvcr.io/nvidia/rapidsai/base:25.04-cuda12.8-py3.12" docker_args = "--shm-size=256m" worker_class = "dask_cuda.CUDAWorker"
Option 1: Use an Azure Marketplace VM image#
In this approach we use the Azure Marketplace VMs provided by NVIDIA free of charge. These VM images come with all the necessary dependencies and NVIDIA drivers preinstalled. NVIDIA offers these images as a plug-and-play solution to reduce cluster setup time for data scientists. Conveniently, dask-cloudprovider makes it simple to pass in the Marketplace VM information, and it will then use the selected VM image instead of a generic one.
We will use the following image: NVIDIA GPU-Optimized Image for AI and HPC.
Note
Make sure your version of dask-cloudprovider is 2021.6.0 or later. Older versions do not support Marketplace VMs on Azure.
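To confirm this before launching anything, you can check the installed version (a small optional sketch; it assumes dask_cloudprovider exposes __version__ and that the packaging library is available in your environment):
# Optional version check for the Marketplace VM support mentioned above
from packaging.version import Version

import dask_cloudprovider

assert Version(dask_cloudprovider.__version__) >= Version("2021.6.0"), (
    "Please upgrade dask-cloudprovider to 2021.6.0 or later"
)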
Set the Marketplace VM information and clear the default dask configuration#
dask.config.set(
{
"logging.distributed": "info",
"cloudprovider.azure.azurevm.marketplace_plan": {
"publisher": "nvidia",
"name": "ngc-base-version-23_03_0",
"product": "ngc_azure_17_11",
"version": "23.03.0",
},
}
)
vm_image = ""
config = dask.config.get("cloudprovider.azure.azurevm", {})
config
{'vnet': None,
'security_group': None,
'public_ingress': True,
'vm_size': 'Standard_DS1_v2',
'disk_size': 50,
'scheduler_vm_size': None,
'docker_image': 'daskdev/dask:latest',
'vm_image': {'publisher': 'Canonical',
'offer': 'UbuntuServer',
'sku': '18.04-LTS',
'version': 'latest'},
'bootstrap': True,
'auto_shutdown': True,
'marketplace_plan': {'publisher': 'nvidia',
'name': 'ngc-base-version-23_03_0',
'product': 'ngc_azure_17_11',
'version': '23.03.0'}}
If necessary, uncomment and accept the Azure Marketplace image terms so that the image can be used to create VMs.
! az vm image terms accept --urn "nvidia:ngc_azure_17_11:ngc-base-version-23_03_0:23.03.0" --verbose
{
"accepted": true,
"id": "/subscriptions/fc4f4a6b-4041-4b1c-8249-854d68edcf62/providers/Microsoft.MarketplaceOrdering/offerTypes/Microsoft.MarketplaceOrdering/offertypes/publishers/nvidia/offers/ngc_azure_17_11/plans/ngc-base-version-23_03_0/agreements/current",
"licenseTextLink": "https://mpcprodsa.blob.core.windows.net/legalterms/3E5ED_legalterms_NVIDIA%253a24NGC%253a5FAZURE%253a5F17%253a5F11%253a24NGC%253a2DBASE%253a2DVERSION%253a2D23%253a5F03%253a5F0%253a24KJVKRIWKTRQ3CIEPNL6YTG4AVORBHHPZCDQDVWX7JPPDEF6UM7R4XO76VDRHXCNTQYATKLGYYW3KA7DSIKTYXBZ3HJ2FMWYCINEY4WQ.txt",
"marketplaceTermsLink": "https://mpcprodsa.blob.core.windows.net/marketplaceterms/3EDEF_marketplaceterms_VIRTUALMACHINE%253a24AAK2OAIZEAWW5H4MSP5KSTVB6NDKKRTUBAU23BRFTWN4YC2MQLJUB5ZEYUOUJBVF3YK34CIVPZL2HWYASPGDUY5O2FWEGRBYOXWZE5Y.txt",
"name": "ngc-base-version-23_03_0",
"plan": "ngc-base-version-23_03_0",
"privacyPolicyLink": "https://www.nvidia.com/en-us/about-nvidia/privacy-policy/",
"product": "ngc_azure_17_11",
"publisher": "nvidia",
"retrieveDatetime": "2023-10-02T08:17:40.3203275Z",
"signature": "SWCKS7PPTL3XIBGBE2IZCMF43KBRDLSIZ7XLXXTLI6SXDCPCXY53BAISH6DNIELVV63GPZ44AOMMMZ6RV2AL5ARNM6XWHXRJ4HDNTJI",
"systemData": {
"createdAt": "2023-10-02T08:17:43.219827+00:00",
"createdBy": "fc4f4a6b-4041-4b1c-8249-854d68edcf62",
"createdByType": "ManagedIdentity",
"lastModifiedAt": "2023-10-02T08:17:43.219827+00:00",
"lastModifiedBy": "fc4f4a6b-4041-4b1c-8249-854d68edcf62",
"lastModifiedByType": "ManagedIdentity"
},
"type": "Microsoft.MarketplaceOrdering/offertypes"
}
Command ran in 7.879 seconds (init: 0.159, invoke: 7.720)
Now that you have set up the configuration needed to use the NVIDIA VM image, go directly to Step 2.1 to start the AzureVMCluster.
Option 2: Set up an Azure customized VM#
If you already have a customized VM and know its resource ID, skip to step f. of Option 2.
Normally, if we create a cluster from a generic image, we have to wait for the new VMs to be fully provisioned with all the dependencies. The provisioning step does several things, such as setting up the VM with the required libraries, setting up Docker, installing the NVIDIA drivers, and pulling and decompressing the RAPIDS container. This typically takes around 10-15 minutes, depending on the cloud provider. If you want to spin up clusters quickly, setting up the VMs from a generic image every time may not be optimal.
Also, as detailed in Option 1, we could choose NVIDIA's Marketplace VMs instead. However, we would still need to download and decompress the RAPIDS container, so the setup time needed to start the workers and the scheduler would still be around 8-10 minutes.
Fortunately, we can improve on this by building our own customized VM that already contains all the necessary packages, drivers, containers and dependencies. That way, the time needed to spin up a cluster from the customized VM is minimal.
In this example, we will use a tool called packer to create our custom VM image. Packer automates the process of building and customizing VMs across all major cloud providers.
Now, to create the custom VM image, follow steps a. through f.
a. Install packer#
Follow the getting started guide to download the binary required for your platform and install it.
b. Authenticate packer with Azure#
There are several ways to authenticate packer for use with Azure (see here for details). However, since we already installed the Azure CLI (az) at the beginning of the notebook, authenticating packer via the az CLI is the simplest option. We will let packer use the Azure credentials from the az CLI, so there is nothing more you need to do for this step.
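If you want to double-check which subscription and credentials packer will pick up, you can inspect the current az CLI session (an optional check):
! az account show --output table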
c. Generate the cloud init script for customizing the VM image#
packer can use a cloud-init script to initialize a VM. The cloud init script contains the set of commands that will set up the environment of our custom VM. We will pass it to the packer command as an external file via the configuration file.
The cloud init file cloud_init.yaml.j2 is located in the configs folder. If you want to add or modify any configuration, edit cloud_init.yaml.j2 before moving on to the next step.
d. Write the packer configuration to a config file#
Now we need to give packer a build file containing the platform-specific and cloud-init configurations. packer will use this file to create the custom VM.
In this example, we will create a single custom VM image accessible only to the user. We will take the Ubuntu Server 18.04 base image and customize it. Later, we will instantiate all the VMs from this custom VM image.
If you are curious about what else can be configured, have a look at all the available Azure build parameters for packer.
Note
In this example, our resource group already exists, so we simply pass its name in the required parameters managed_image_resource_group_name and build_resource_group_name.
custom_vm_image_name = "FILL-THIS-IN"
packer_config = {
"builders": [
{
"type": "azure-arm",
"use_azure_cli_auth": True,
"managed_image_resource_group_name": resource_group,
"managed_image_name": custom_vm_image_name,
"custom_data_file": "./configs/cloud_init.yaml.j2",
"os_type": "Linux",
"image_publisher": "Canonical",
"image_offer": "UbuntuServer",
"image_sku": "18.04-LTS",
"azure_tags": {
"dept": "RAPIDS-CSP",
"task": "RAPIDS Custom Image deployment",
},
"build_resource_group_name": resource_group,
"vm_size": vm_size,
}
],
"provisioners": [
{
"inline": [
(
"echo 'Waiting for cloud-init'; "
"while [ ! -f /var/lib/cloud/instance/boot-finished ]; "
"do sleep 1; done; echo 'Done'"
)
],
"type": "shell",
}
],
}
with open("packer_config.json", "w") as fh:
fh.write(json.dumps(packer_config))
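Before building, you can optionally ask packer to check the template for errors; packer validate is a standard packer subcommand (left commented out here, like the build command below):
# # Uncomment the following line to validate the template before building
# ! packer validate packer_config.json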
e. Run packer build and create the image#
# # Uncomment the following line and run to create the custom image
# ! packer build packer_config.json
This will take around 15 minutes. Go grab a coffee or watch an episode of your favorite show and come back. Remember, though, you only need to do this once, unless you want to update the packages baked into the VM. In other words, you can create this customized image once and keep reusing it hundreds of times.
While packer is building the image, you will see output similar to the one shown below.
$ packer build packer_config.json
azure-arm: output will be in this color.
==> azure-arm: Running builder ...
==> azure-arm: Getting tokens using Azure CLI
==> azure-arm: Getting tokens using Azure CLI
azure-arm: Creating Azure Resource Manager (ARM) client ...
==> azure-arm: Using existing resource group ...
==> azure-arm: -> ResourceGroupName : <your resource group>
==> azure-arm: -> Location : <some chosen location>
==> azure-arm: Validating deployment template ...
==> azure-arm: -> ResourceGroupName : <your resource group>
==> azure-arm: -> DeploymentName : 'pkrdp04rrahxkg9'
==> azure-arm: Deploying deployment template ...
==> azure-arm: -> ResourceGroupName : <your resource group>
==> azure-arm: -> DeploymentName : 'pkrdp04rrahxkg9'
==> azure-arm:
==> azure-arm: Getting the VM's IP address ...
==> azure-arm: -> ResourceGroupName : <your resource group>
==> azure-arm: -> PublicIPAddressName : 'pkrip04rrahxkg9'
==> azure-arm: -> NicName : 'pkrni04rrahxkg9'
==> azure-arm: -> Network Connection : 'PublicEndpoint'
==> azure-arm: -> IP Address : '40.77.62.118'
==> azure-arm: Waiting for SSH to become available...
==> azure-arm: Connected to SSH!
==> azure-arm: Provisioning with shell script: /tmp/packer-shell614221056
azure-arm: Waiting for cloud-init
azure-arm: Done
==> azure-arm: Querying the machine's properties ...
==> azure-arm: -> ResourceGroupName : <your resource group>
==> azure-arm: -> ComputeName : 'pkrvm04rrahxkg9'
==> azure-arm: -> Managed OS Disk : '/subscriptions/<your subscription id>/resourceGroups/<your resource group>/providers/Microsoft.Compute/disks/pkros04rrahxkg9'
==> azure-arm: Querying the machine's additional disks properties ...
==> azure-arm: -> ResourceGroupName : <your resource group>
==> azure-arm: -> ComputeName : 'pkrvm04rrahxkg9'
==> azure-arm: Powering off machine ...
==> azure-arm: -> ResourceGroupName : <your resource group>
==> azure-arm: -> ComputeName : 'pkrvm04rrahxkg9'
==> azure-arm: Capturing image ...
==> azure-arm: -> Compute ResourceGroupName : <your resource group>
==> azure-arm: -> Compute Name : 'pkrvm04rrahxkg9'
==> azure-arm: -> Compute Location : <some chosen location>
==> azure-arm: -> Image ResourceGroupName : <your resource group>
==> azure-arm: -> Image Name : <your chosen custom image name>
==> azure-arm: -> Image Location : <some chosen location>
==> azure-arm:
==> azure-arm: Deleting individual resources ...
==> azure-arm: Adding to deletion queue -> Microsoft.Compute/virtualMachines : 'pkrvm04rrahxkg9'
==> azure-arm: Adding to deletion queue -> Microsoft.Network/networkInterfaces : 'pkrni04rrahxkg9'
==> azure-arm: Adding to deletion queue -> Microsoft.Network/publicIPAddresses : 'pkrip04rrahxkg9'
==> azure-arm: Adding to deletion queue -> Microsoft.Network/virtualNetworks : 'pkrvn04rrahxkg9'
==> azure-arm: Attempting deletion -> Microsoft.Network/networkInterfaces : 'pkrni04rrahxkg9'
==> azure-arm: Waiting for deletion of all resources...
==> azure-arm: Attempting deletion -> Microsoft.Network/publicIPAddresses : 'pkrip04rrahxkg9'
==> azure-arm: Attempting deletion -> Microsoft.Compute/virtualMachines : 'pkrvm04rrahxkg9'
==> azure-arm: Attempting deletion -> Microsoft.Network/virtualNetworks : 'pkrvn04rrahxkg9'
.
.
.
.
.
.
.
.
==> azure-arm: Deleting -> Microsoft.Compute/disks : '/subscriptions/<your subscription id>/resourceGroups/<your resource group>/providers/Microsoft.Compute/disks/pkros04rrahxkg9'
==> azure-arm: Removing the created Deployment object: 'pkrdp04rrahxkg9'
==> azure-arm:
==> azure-arm: The resource group was not created by Packer, not deleting ...
Build 'azure-arm' finished after 16 minutes 22 seconds.
==> Wait completed after 16 minutes 22 seconds
==> Builds finished. The artifacts of successful builds are:
--> azure-arm: Azure.ResourceManagement.VMImage:
OSType: Linux
ManagedImageResourceGroupName: <your resource group>
ManagedImageName: <your chosen custom image name>
ManagedImageId: /subscriptions/<your subscription id>/resourceGroups/<your resource group>/providers/Microsoft.Compute/images/<your chosen custom image name>
ManagedImageLocation: <some chosen location>
When packer finishes, you will see information similar to the following at the bottom of the output:
ManagedImageResourceGroupName: <your resource group>
ManagedImageName: <your chosen custom image name>
ManagedImageId: /subscriptions/<your subscription id>/resourceGroups/<your resource group>/providers/Microsoft.Compute/images/<your chosen custom image name>
ManagedImageLocation: <some chosen location>
Make a note of the ManagedImageId. This is the resource ID of the custom image we will use.
As shown above, the ManagedImageId will look something like: /subscriptions/12345/resourceGroups/myown-rg/providers/Microsoft.Compute/images/myCustomImage
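If you created the image in an earlier session and no longer have the packer output at hand, you can look the ID up again with the Azure CLI (an optional sketch; replace the placeholders with the resource group and image name you used in the packer config):
# # Uncomment and fill in the placeholders to retrieve the image resource ID
# ! az image show --resource-group <your resource group> --name <your chosen custom image name> --query id --output tsv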
f. Set the custom VM information and clear the default dask configuration#
Once you have the resource ID of the custom VM image, reset the default VM image information in dask.config. The default image loaded in dask.config is the plain Ubuntu Server 18.04 LTS (i.e. the base you have already customized). If you do not reset it, dask will try to use that image instead of the one you customized.
# Fill this in with the ManagedImageId value from above,
# or with the resource ID of a customized VM you created in a previous run.
ManagedImageId = "FILL-THIS-IN"
dask.config.set({"cloudprovider.azure.azurevm.vm_image": {}})
config = dask.config.get("cloudprovider.azure.azurevm", {})
print(config)
vm_image = {"id": ManagedImageId}
print(vm_image)
Step 2.1: Start the VM cluster in Azure#
Here, if you used Option 1, i.e. the NVIDIA VM image, pass an empty string as the vm_image information.
For Option 2, pass the vm_image information obtained from the output of the packer run as an argument to AzureVMCluster.
In both cases, also switch off bootstrapping of the VMs by passing bootstrap=False. This disables the installation of the dependencies when the VMs are instantiated, since in both cases they are already present on the chosen VM image.
Note
The rest of the notebook is the same regardless of whether you chose Option 1 or Option 2.
Note
The actual number of workers in our cluster is not always equal to the number of VMs created, i.e. the \(n\_workers\) value passed in. If the number of GPUs in the chosen vm_size is \(G\) and the number of VMs created is \(n\_workers\), then the actual number of workers is \(W = n\_workers \times G\). For example, for Standard_NC12s_v3 VMs with 2 V100 GPUs each, we have \(W = 2 \times 2 = 4\) when \(n\_workers = 2\), as in the small sketch below.
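As a quick sanity check of that arithmetic (an optional sketch; the GPU count is an assumption for Standard_NC12s_v3):
# Expected total number of Dask workers for the cluster created below
gpus_per_vm = 2  # Standard_NC12s_v3 has 2 V100 GPUs per VM
n_vms = 2  # the n_workers value passed to AzureVMCluster below
print(f"Expecting {n_vms * gpus_per_vm} workers in total")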
%%time
cluster = AzureVMCluster(
location=location,
resource_group=resource_group,
vnet=vnet,
security_group=security_group,
vm_image=vm_image,
vm_size=vm_size,
disk_size=200,
docker_image=docker_image,
worker_class=worker_class,
n_workers=2,
security=True,
docker_args=docker_args,
debug=False,
bootstrap=False, # This is to prevent the cloud init jinja2 script from running in the custom VM.
)
client = Client(cluster)
client
Client
Client-b8982284-60fe-11ee-a1e9-80e82cd32958
Connection method: Cluster object | Cluster type: dask_cloudprovider.AzureVMCluster |
Dashboard: http://4.155.2.188:8787/status |
Cluster Info
AzureVMCluster
e0c95e38
Dashboard: http://4.155.2.188:8787/status | Workers: 4 |
Total threads: 4 | Total memory: 440.42 GiB |
Scheduler Info
Scheduler
Scheduler-3bae5a4d-29d1-4317-bbfc-931e97a077fb
Comm: tls://10.5.0.42:8786 | Workers: 4 |
Dashboard: http://10.5.0.42:8787/status | Total threads: 4 |
Started: 12 minutes ago | Total memory: 440.42 GiB |
Workers
Worker: dask-92c5978e-worker-54f8d057-0
Comm: tls://10.5.0.43:38107 | Total threads: 1 |
Dashboard: http://10.5.0.43:45421/status | Memory: 110.11 GiB |
Nanny: tls://10.5.0.43:33657 | |
Local directory: /tmp/dask-scratch-space/worker-ix8y4_eg | |
GPU: Tesla V100-PCIE-16GB | GPU memory: 16.00 GiB |
Worker: dask-92c5978e-worker-54f8d057-1
Comm: tls://10.5.0.43:36201 | Total threads: 1 |
Dashboard: http://10.5.0.43:44817/status | Memory: 110.11 GiB |
Nanny: tls://10.5.0.43:42265 | |
Local directory: /tmp/dask-scratch-space/worker-6bghw_yx | |
GPU: Tesla V100-PCIE-16GB | GPU memory: 16.00 GiB |
Worker: dask-92c5978e-worker-9f9a9c9b-0
Comm: tls://10.5.0.44:37791 | Total threads: 1 |
Dashboard: http://10.5.0.44:32965/status | Memory: 110.11 GiB |
Nanny: tls://10.5.0.44:36779 | |
Local directory: /tmp/dask-scratch-space/worker-7y8g_hu7 | |
GPU: Tesla V100-PCIE-16GB | GPU memory: 16.00 GiB |
Tasks executing | Tasks in memory |
Tasks ready | Tasks in flight |
CPU usage: 0.0% | Last seen: 6 minutes ago |
Memory usage: 329.71 MiB | Spilled bytes: 0 B |
Read bytes: 0.0 B | Write bytes: 0.0 B |
Worker: dask-92c5978e-worker-9f9a9c9b-1
Comm: tls://10.5.0.44:34087 | Total threads: 1 |
Dashboard: http://10.5.0.44:36073/status | Memory: 110.11 GiB |
Nanny: tls://10.5.0.44:37979 | |
Local directory: /tmp/dask-scratch-space/worker-1d7vbddw | |
GPU: Tesla V100-PCIE-16GB | GPU memory: 16.00 GiB |
Tasks executing | Tasks in memory |
Tasks ready | Tasks in flight |
CPU usage: 0.0% | Last seen: 6 minutes ago |
Memory usage: 329.50 MiB | Spilled bytes: 0 B |
Read bytes: 0.0 B | Write bytes: 0.0 B |
%%time
client.wait_for_workers(2)
CPU times: user 0 ns, sys: 6.1 ms, total: 6.1 ms
Wall time: 29 ms
# Uncomment if you only have the scheduler with n_workers=0 and want to scale the workers separately.
# %%time
# client.cluster.scale(n_workers)
Wait for all the workers to launch. This waits until the n_workers VMs have started.
Before starting the training process, let's take a quick look at the details of the GPUs on the workers we will be using.
import pprint
pp = pprint.PrettyPrinter()
pp.pprint(
client.scheduler_info()
) # will show some information of the GPUs of the workers
{'address': 'tls://10.5.0.42:8786',
'id': 'Scheduler-3bae5a4d-29d1-4317-bbfc-931e97a077fb',
'services': {'dashboard': 8787},
'started': 1696235012.5914223,
'type': 'Scheduler',
'workers': {'tls://10.5.0.43:36201': {'gpu': {'memory-total': 17179869184,
'name': 'Tesla V100-PCIE-16GB'},
'host': '10.5.0.43',
'id': 'dask-92c5978e-worker-54f8d057-1',
'last_seen': 1696235778.2340653,
'local_directory': '/tmp/dask-scratch-space/worker-6bghw_yx',
'memory_limit': 118225670144,
'metrics': {'bandwidth': {'total': 100000000,
'types': {},
'workers': {}},
'cpu': 4.0,
'digests_total_since_heartbeat': {'latency': 0.004627227783203125,
'tick-duration': 0.5006744861602783},
'event_loop_interval': 0.019985613822937013,
'gpu': {'memory-used': 598867968,
'utilization': 0},
'gpu_memory_used': 598867968,
'gpu_utilization': 0,
'host_disk_io': {'read_bps': 0.0,
'write_bps': 0.0},
'host_net_io': {'read_bps': 612.42422993883,
'write_bps': 3346.3180145677247},
'managed_bytes': 0,
'memory': 623116288,
'num_fds': 86,
'rmm': {'rmm-total': 0,
'rmm-used': 0},
'spilled_bytes': {'disk': 0,
'memory': 0},
'task_counts': {},
'time': 1696235777.730071,
'transfer': {'incoming_bytes': 0,
'incoming_count': 0,
'incoming_count_total': 0,
'outgoing_bytes': 0,
'outgoing_count': 0,
'outgoing_count_total': 0}},
'name': 'dask-92c5978e-worker-54f8d057-1',
'nanny': 'tls://10.5.0.43:42265',
'nthreads': 1,
'resources': {},
'services': {'dashboard': 44817},
'status': 'running',
'type': 'Worker'},
'tls://10.5.0.43:38107': {'gpu': {'memory-total': 17179869184,
'name': 'Tesla V100-PCIE-16GB'},
'host': '10.5.0.43',
'id': 'dask-92c5978e-worker-54f8d057-0',
'last_seen': 1696235778.2329032,
'local_directory': '/tmp/dask-scratch-space/worker-ix8y4_eg',
'memory_limit': 118225670144,
'metrics': {'bandwidth': {'total': 100000000,
'types': {},
'workers': {}},
'cpu': 2.0,
'digests_total_since_heartbeat': {'latency': 0.004603147506713867,
'tick-duration': 0.4996976852416992},
'event_loop_interval': 0.019999494552612306,
'gpu': {'memory-used': 598867968,
'utilization': 0},
'gpu_memory_used': 598867968,
'gpu_utilization': 0,
'host_disk_io': {'read_bps': 0.0,
'write_bps': 0.0},
'host_net_io': {'read_bps': 611.5250712835996,
'write_bps': 3341.404964660714},
'managed_bytes': 0,
'memory': 623882240,
'num_fds': 86,
'rmm': {'rmm-total': 0,
'rmm-used': 0},
'spilled_bytes': {'disk': 0,
'memory': 0},
'task_counts': {},
'time': 1696235777.729443,
'transfer': {'incoming_bytes': 0,
'incoming_count': 0,
'incoming_count_total': 0,
'outgoing_bytes': 0,
'outgoing_count': 0,
'outgoing_count_total': 0}},
'name': 'dask-92c5978e-worker-54f8d057-0',
'nanny': 'tls://10.5.0.43:33657',
'nthreads': 1,
'resources': {},
'services': {'dashboard': 45421},
'status': 'running',
'type': 'Worker'},
'tls://10.5.0.44:34087': {'gpu': {'memory-total': 17179869184,
'name': 'Tesla V100-PCIE-16GB'},
'host': '10.5.0.44',
'id': 'dask-92c5978e-worker-9f9a9c9b-1',
'last_seen': 1696235778.5268767,
'local_directory': '/tmp/dask-scratch-space/worker-1d7vbddw',
'memory_limit': 118225670144,
'metrics': {'bandwidth': {'total': 100000000,
'types': {},
'workers': {}},
'cpu': 0.0,
'digests_total_since_heartbeat': {'latency': 0.004075765609741211,
'tick-duration': 0.4998819828033447},
'event_loop_interval': 0.02001068115234375,
'gpu': {'memory-used': 598867968,
'utilization': 0},
'gpu_memory_used': 598867968,
'gpu_utilization': 0,
'host_disk_io': {'read_bps': 0.0,
'write_bps': 12597732.652975753},
'host_net_io': {'read_bps': 612.7208378808626,
'write_bps': 3347.938695871903},
'managed_bytes': 0,
'memory': 624406528,
'num_fds': 86,
'rmm': {'rmm-total': 0,
'rmm-used': 0},
'spilled_bytes': {'disk': 0,
'memory': 0},
'task_counts': {},
'time': 1696235778.023989,
'transfer': {'incoming_bytes': 0,
'incoming_count': 0,
'incoming_count_total': 0,
'outgoing_bytes': 0,
'outgoing_count': 0,
'outgoing_count_total': 0}},
'name': 'dask-92c5978e-worker-9f9a9c9b-1',
'nanny': 'tls://10.5.0.44:37979',
'nthreads': 1,
'resources': {},
'services': {'dashboard': 36073},
'status': 'running',
'type': 'Worker'},
'tls://10.5.0.44:37791': {'gpu': {'memory-total': 17179869184,
'name': 'Tesla V100-PCIE-16GB'},
'host': '10.5.0.44',
'id': 'dask-92c5978e-worker-9f9a9c9b-0',
'last_seen': 1696235778.528408,
'local_directory': '/tmp/dask-scratch-space/worker-7y8g_hu7',
'memory_limit': 118225670144,
'metrics': {'bandwidth': {'total': 100000000,
'types': {},
'workers': {}},
'cpu': 0.0,
'digests_total_since_heartbeat': {'latency': 0.003975629806518555,
'tick-duration': 0.4994323253631592},
'event_loop_interval': 0.020001530647277832,
'gpu': {'memory-used': 598867968,
'utilization': 0},
'gpu_memory_used': 598867968,
'gpu_utilization': 0,
'host_disk_io': {'read_bps': 0.0,
'write_bps': 12589746.67130889},
'host_net_io': {'read_bps': 612.3324205749067,
'write_bps': 3345.8163634027583},
'managed_bytes': 0,
'memory': 623104000,
'num_fds': 86,
'rmm': {'rmm-total': 0,
'rmm-used': 0},
'spilled_bytes': {'disk': 0,
'memory': 0},
'task_counts': {},
'time': 1696235778.0250378,
'transfer': {'incoming_bytes': 0,
'incoming_count': 0,
'incoming_count_total': 0,
'outgoing_bytes': 0,
'outgoing_count': 0,
'outgoing_count_total': 0}},
'name': 'dask-92c5978e-worker-9f9a9c9b-0',
'nanny': 'tls://10.5.0.44:36779',
'nthreads': 1,
'resources': {},
'services': {'dashboard': 32965},
'status': 'running',
'type': 'Worker'}}}
Step 3: Data setup, cleanup and enhancement#
Step 3.a: Set up the workers to read parquet files from the Azure Data Lake endpoint#
We will now enable all the workers to read the parquet files directly from the Azure Data Lake endpoint. This requires the adlfs Python library on the workers. We install the package on every worker by registering Dask's PipInstall worker plugin with the client, as shown in the next cell.
from dask.distributed import PipInstall
client.register_worker_plugin(PipInstall(packages=["adlfs"]))
{'tls://10.5.0.43:36201': {'status': 'OK'},
'tls://10.5.0.43:38107': {'status': 'OK'},
'tls://10.5.0.44:34087': {'status': 'OK'},
'tls://10.5.0.44:37791': {'status': 'OK'}}
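Optionally, you can confirm that adlfs is now importable on every worker (a small sketch using client.run):
# Report the installed adlfs version from each worker
client.run(lambda: __import__("adlfs").__version__)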
Step 3.b: Data cleanup, enhancement and persisting scripts#
The data first needs some cleaning: we drop a few columns we are not interested in and define the data type each column should be read as.
We also add some new features to the dataframe via custom functions, namely:
Haversine distance: used to compute the total trip distance.
Day of the week: potentially useful information for determining the fare.
The add_features function combines the two and generates a new dataframe with the added features.
Note
In the function persist_train_infer_split, we also persist the test dataset on the workers. If X_infer (i.e. the test dataset) were small enough, we could call compute() on it to bring it to the local machine and run the prediction there. In general, however, a large X_infer may not fit in the GPU of the local machine, and the large data movement would also add to the prediction latency. It is therefore better to persist the test dataset on the dask workers, call the prediction on the individual workers, and finally collect the prediction results from them.
Add features functions#
import math
from math import asin, cos, pi, sin, sqrt
def haversine_distance_kernel(
pickup_latitude_r,
pickup_longitude_r,
dropoff_latitude_r,
dropoff_longitude_r,
h_distance,
radius,
):
for i, (x_1, y_1, x_2, y_2) in enumerate(
zip(
pickup_latitude_r,
pickup_longitude_r,
dropoff_latitude_r,
dropoff_longitude_r,
)
):
x_1 = pi / 180 * x_1
y_1 = pi / 180 * y_1
x_2 = pi / 180 * x_2
y_2 = pi / 180 * y_2
dlon = y_2 - y_1
dlat = x_2 - x_1
a = sin(dlat / 2) ** 2 + cos(x_1) * cos(x_2) * sin(dlon / 2) ** 2
c = 2 * asin(sqrt(a))
# radius = 6371 # Radius of earth in kilometers # currently passed as input arguments
h_distance[i] = c * radius
def day_of_the_week_kernel(day, month, year, day_of_week):
for i, (_, _, _) in enumerate(zip(day, month, year)):
if month[i] < 3:
shift = month[i]
else:
shift = 0
Y = year[i] - (month[i] < 3)
y = Y - 2000
c = 20
d = day[i]
m = month[i] + shift + 1
day_of_week[i] = (d + math.floor(m * 2.6) + y + (y // 4) + (c // 4) - 2 * c) % 7
def add_features(df):
df["hour"] = df["tpepPickupDateTime"].dt.hour
df["year"] = df["tpepPickupDateTime"].dt.year
df["month"] = df["tpepPickupDateTime"].dt.month
df["day"] = df["tpepPickupDateTime"].dt.day
df["diff"] = (
df["tpepDropoffDateTime"] - df["tpepPickupDateTime"]
).dt.seconds # convert difference between pickup and dropoff into seconds
df["pickup_latitude_r"] = df["startLat"] // 0.01 * 0.01
df["pickup_longitude_r"] = df["startLon"] // 0.01 * 0.01
df["dropoff_latitude_r"] = df["endLat"] // 0.01 * 0.01
df["dropoff_longitude_r"] = df["endLon"] // 0.01 * 0.01
df = df.drop("tpepDropoffDateTime", axis=1)
df = df.drop("tpepPickupDateTime", axis=1)
df = df.apply_rows(
haversine_distance_kernel,
incols=[
"pickup_latitude_r",
"pickup_longitude_r",
"dropoff_latitude_r",
"dropoff_longitude_r",
],
outcols=dict(h_distance=np.float32),
kwargs=dict(radius=6371),
)
df = df.apply_rows(
day_of_the_week_kernel,
incols=["day", "month", "year"],
outcols=dict(day_of_week=np.float32),
kwargs=dict(),
)
df["is_weekend"] = df["day_of_week"] < 2
return df
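Before wiring these functions into the distributed pipeline, here is a tiny local smoke test of add_features (an optional sketch; it assumes a local GPU with cudf available and uses a single made-up trip):
# Optional local smoke test of add_features on one made-up trip
import cudf

sample = cudf.DataFrame(
    {
        "startLat": [40.7685],
        "startLon": [-73.9850],
        "endLat": [40.7193],
        "endLon": [-74.0088],
    }
)
sample["tpepPickupDateTime"] = cudf.to_datetime(cudf.Series(["2014-11-01 08:00:00"]))
sample["tpepDropoffDateTime"] = cudf.to_datetime(cudf.Series(["2014-11-01 08:15:00"]))
print(add_features(sample)[["h_distance", "day_of_week", "is_weekend"]])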
Functions to clean the data and persist it on the workers.
def persist_train_infer_split(
client,
df,
response_dtype,
response_id,
infer_frac=1.0,
random_state=42,
shuffle=True,
):
workers = client.has_what().keys()
X, y = df.drop([response_id], axis=1), df[response_id].astype("float32")
infer_frac = max(0, min(infer_frac, 1.0))
X_train, X_infer, y_train, y_infer = train_test_split(
X, y, shuffle=True, random_state=random_state, test_size=infer_frac
)
with dask.annotate(workers=set(workers)):
X_train, y_train = client.persist(collections=[X_train, y_train])
if infer_frac != 1.0:
with dask.annotate(workers=set(workers)):
X_infer, y_infer = client.persist(collections=[X_infer, y_infer])
wait([X_train, y_train, X_infer, y_infer])
else:
X_infer = X_train
y_infer = y_train
wait([X_train, y_train])
return X_train, y_train, X_infer, y_infer
def clean(df_part, must_haves):
"""
This function performs the various clean up tasks for the data
and returns the cleaned dataframe.
"""
# iterate through columns in this df partition
for col in df_part.columns:
# drop anything not in our expected list
if col not in must_haves:
df_part = df_part.drop(col, axis=1)
continue
# fixes datetime error found by Ty Mckercher and fixed by Paul Mahler
if df_part[col].dtype == "object" and col in [
"tpepPickupDateTime",
"tpepDropoffDateTime",
]:
df_part[col] = df_part[col].astype("datetime64[ms]")
continue
# if column was read as a string, recast as float
if df_part[col].dtype == "object":
df_part[col] = df_part[col].str.fillna("-1")
df_part[col] = df_part[col].astype("float32")
else:
# downcast from 64bit to 32bit types
# Tesla T4 are faster on 32bit ops
if "int" in str(df_part[col].dtype):
df_part[col] = df_part[col].astype("int32")
if "float" in str(df_part[col].dtype):
df_part[col] = df_part[col].astype("float32")
df_part[col] = df_part[col].fillna(-1)
return df_part
def taxi_data_loader(
client,
adlsaccount,
adlspath,
response_dtype=np.float32,
infer_frac=1.0,
random_state=0,
):
# create a list of columns & dtypes the df must have
must_haves = {
"tpepPickupDateTime": "datetime64[ms]",
"tpepDropoffDateTime": "datetime64[ms]",
"passengerCount": "int32",
"tripDistance": "float32",
"startLon": "float32",
"startLat": "float32",
"rateCodeId": "int32",
"endLon": "float32",
"endLat": "float32",
"fareAmount": "float32",
}
workers = client.has_what().keys()
response_id = "fareAmount"
storage_options = {"account_name": adlsaccount}
taxi_data = dask_cudf.read_parquet(
adlspath,
storage_options=storage_options,
chunksize=25e6,
npartitions=len(workers),
)
taxi_data = clean(taxi_data, must_haves)
taxi_data = taxi_data.map_partitions(add_features)
# Drop NaN values and convert to float32
taxi_data = taxi_data.dropna()
fields = [
"passengerCount",
"tripDistance",
"startLon",
"startLat",
"rateCodeId",
"endLon",
"endLat",
"fareAmount",
"diff",
"h_distance",
"day_of_week",
"is_weekend",
]
taxi_data = taxi_data.astype("float32")
taxi_data = taxi_data[fields]
taxi_data = taxi_data.reset_index()
return persist_train_infer_split(
client, taxi_data, response_dtype, response_id, infer_frac, random_state
)
Step 3.c: Get the split data and persist it across the workers#
For demo purposes, we will use the data from the months of November and December 2014.
tic = timer()
X_train, y_train, X_infer, y_infer = taxi_data_loader(
client,
adlsaccount="azureopendatastorage",
adlspath="az://nyctlc/yellow/puYear=2014/puMonth=1*/*.parquet",
infer_frac=0.1,
random_state=42,
)
toc = timer()
print(f"Wall clock time taken for ETL and persisting : {toc-tic} s")
X_train.shape[0].compute()
48817562
Our training dataset has around 49 million rows. Let's look at the data locally to get a sense of what we are working with. We see columns for the pickup and dropoff latitudes and longitudes, the passenger count, the trip distance, the day of the week, and so on. This is the information we will use to estimate the trip fare amount.
X_train.head()
  | index | passengerCount | tripDistance | startLon | startLat | rateCodeId | endLon | endLat | diff | h_distance | day_of_week | is_weekend |
---|---|---|---|---|---|---|---|---|---|---|---|---|
300446 | 300446 | 1.0 | 4.00 | -73.984955 | 40.768543 | 1.0 | -74.008789 | 40.719330 | 1324.0 | 5.809536e+00 | 6.0 | 0.0 |
163817 | 163817 | 3.0 | 1.93 | -74.008179 | 40.722198 | 1.0 | -73.992989 | 40.739151 | 840.0 | 1.395489e+00 | 2.0 | 0.0 |
236958 | 236958 | 5.0 | 1.10 | -73.987595 | 40.775360 | 1.0 | -73.976494 | 40.785755 | 360.0 | 1.394768e+00 | 5.0 | 0.0 |
73461 | 73461 | 1.0 | 0.76 | -73.994698 | 40.725929 | 1.0 | -73.994698 | 40.725929 | 180.0 | 1.005159e-13 | 5.0 | 0.0 |
294464 | 294464 | 1.0 | 0.60 | -73.974342 | 40.748165 | 1.0 | -73.982536 | 40.750767 | 229.0 | 1.395336e+00 | 6.0 | 0.0 |
X_infer
  | index | passengerCount | tripDistance | startLon | startLat | rateCodeId | endLon | endLat | diff | h_distance | day_of_week | is_weekend |
---|---|---|---|---|---|---|---|---|---|---|---|---|
npartitions=84 | ||||||||||||
int64 | float32 | float32 | float32 | float32 | float32 | float32 | float32 | float32 | float32 | float32 | float32 | |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
Step 4: Train an XGBoost model#
We are now ready to train an XGBoost model on the data and then predict the fare for each trip.
Step 4.a: Set the training parameters#
In this training example we will use RMSE as the evaluation metric. It is also worth noting that performing HPO would yield a more optimal set of hyperparameters.
For how to perform HPO on Azure, refer to the notebook HPO-RAPIDS in this repository.
params = {
"learning_rate": 0.15,
"max_depth": 8,
"objective": "reg:squarederror",
"subsample": 0.7,
"colsample_bytree": 0.7,
"min_child_weight": 1,
"gamma": 1,
"silent": True,
"verbose_eval": True,
"booster": "gbtree", # 'gblinear' not implemented in dask
"debug_synchronize": True,
"eval_metric": "rmse",
"tree_method": "gpu_hist",
"num_boost_rounds": 100,
}
Step 4.b: Train the XGBoost model#
Since the data is already persisted on the dask workers of the cluster, the steps below should not take much time.
data_train = xgb.dask.DaskDMatrix(client, X_train, y_train)
tic = timer()
xgboost_output = xgb.dask.train(
client, params, data_train, num_boost_round=params["num_boost_rounds"]
)
xgb_gpu_model = xgboost_output["booster"]
toc = timer()
print(f"Wall clock time taken for this cell : {toc-tic} s")
Wall clock time taken for this cell : 9.483002611901611 s
Step 4.c: Save the trained model to local disk#
xgb_gpu_model
<xgboost.core.Booster at 0x7fcee8055bd0>
model_filename = "trained-model_nyctaxi.xgb"
xgb_gpu_model.save_model(model_filename)
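As a quick check (an optional sketch), the saved file can be loaded back into a fresh Booster locally:
# Reload the saved model into a new Booster object to confirm the file is valid
reloaded_model = xgb.Booster()
reloaded_model.load_model(model_filename)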
Step 5: Predict and score using native XGBoost predict#
Here we will use the predict and inplace_predict methods provided by the xgboost.dask library directly. Later, we will also predict with the Forest Inference Library (FIL).
_y_test = y_infer.compute()
wait(_y_test)
DoneAndNotDoneFutures(done=set(), not_done=set())
d_test = xgb.dask.DaskDMatrix(client, X_infer)
tic = timer()
y_pred = xgb.dask.predict(client, xgb_gpu_model, d_test)
y_pred = y_pred.compute()
wait(y_pred)
toc = timer()
print(f"Wall clock time taken for xgb.dask.predict : {toc-tic} s")
Wall clock time taken for xgb.dask.predict : 1.5550181320868433 s
Inference with the inplace_predict method of dask XGBoost#
tic = timer()
y_pred = xgb.dask.inplace_predict(client, xgb_gpu_model, X_infer)
y_pred = y_pred.compute()
wait(y_pred)
toc = timer()
print(f"Wall clock time taken for inplace inference : {toc-tic} s")
Wall clock time taken for inplace inference : 1.8849179210374132 s
tic = timer()
print("Calculating MSE")
score = mean_squared_error(y_pred, _y_test)
print("Workflow Complete - RMSE: ", np.sqrt(score))
toc = timer()
print(f"Wall clock time taken for this cell : {toc-tic} s")
Calculating MSE
Workflow Complete - RMSE: 2.2968235
Wall clock time taken for this cell : 0.009336891933344305 s
Step 6: Predict and score with FIL, the Forest Inference Library#
The Forest Inference Library (FIL) provides GPU-accelerated inference capabilities for tree models. We will import the FIL functionality from the cuML library.
It accepts a trained tree model in treelite format (currently LightGBM, XGBoost and SKLearn GBDT and random forest models are supported). In general, FIL gives faster inference when a large number of workers is used, and the latency benefit becomes more pronounced as the dataset grows.
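Before the distributed version below, here is a minimal local FIL sketch (an optional illustration; it assumes cuML is installed in the local environment and reuses the model file and X_infer created above):
# Optional local FIL sketch: load the saved model and predict on a small sample
from cuml import ForestInference

local_fil_model = ForestInference.load(model_filename, model_type="xgboost")
sample_preds = local_fil_model.predict(X_infer.head(100))  # head() returns a local cudf DataFrame
print(sample_preds[:5])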
Step 6.a: Use compute to predict on a single worker if the test dataset is small.#
As discussed in step 3.b, if the test dataset is large, it makes more sense to call the prediction on the dask workers individually than to bring the whole test dataset to the local machine.
To run the prediction on the dask workers individually, each dask worker needs to load the XGB model with FIL. However, the dask workers are remote and have no access to the locally saved model, so we need to send the locally saved XGB model to the dask workers.
Persist the local model on the remote dask workers#
# the code below will read the locally saved xgboost model
# in binary format and write a copy of it to all dask workers
def read_model(path):
"""Read model file into memory."""
with open(path, "rb") as fh:
return fh.read()
def write_model(path, data):
"""Write model file to disk."""
with open(path, "wb") as fh:
fh.write(data)
return path
model_data = read_model("trained-model_nyctaxi.xgb")
# Tell all the workers to write the model to disk
client.run(write_model, "/tmp/model.dat", model_data)
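As a quick sanity check (an optional sketch), you can confirm that every worker now has the model file on disk:
# Ask each worker whether /tmp/model.dat now exists
client.run(lambda: __import__("os").path.exists("/tmp/model.dat"))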
# this code reads the binary file in worker directory
# and loads the model via FIL for prediction
def predict_model(input_df):
from cuml import ForestInference
# load xgboost model using FIL and make prediction
fm = ForestInference.load("/tmp/model.dat", model_type="xgboost")
print(fm)
pred = fm.predict(input_df)
return pred
Distributed inference with FIL#
tic = timer()
predictions = X_infer.map_partitions(
    predict_model, meta="float"
)  # run predict_model on every partition, on the worker that holds it
y_pred = predictions.compute()
wait(y_pred)
toc = timer()
print(f"Wall clock time taken for this cell : {toc-tic} s")
rows_csv = X_infer.iloc[:, 0].shape[0].compute()
print(
f"It took {toc-tic} seconds to predict on {rows_csv} rows using FIL distributedly on each worker"
)
It took 5.638823717948981 seconds to predict on 5426301 rows using FIL distributedly on each worker
tic = timer()
score = mean_squared_error(y_pred, _y_test)
toc = timer()
print("Final - RMSE: ", np.sqrt(score))
Final - RMSE: 2.2968235
Step 7: Clean up#
client.close()
cluster.close()
Terminated VM dask-92c5978e-worker-54f8d057
Terminated VM dask-92c5978e-worker-9f9a9c9b
Removed disks for VM dask-92c5978e-worker-54f8d057
Removed disks for VM dask-92c5978e-worker-9f9a9c9b
Deleted network interface
Deleted network interface
Terminated VM dask-92c5978e-scheduler
Removed disks for VM dask-92c5978e-scheduler
Deleted network interface
Unassigned public IP