API

集群

class dask_cuda.LocalCUDACluster(CUDA_VISIBLE_DEVICES=None, n_workers=None, threads_per_worker=1, memory_limit='auto', device_memory_limit=0.8, enable_cudf_spill=False, cudf_spill_stats=0, data=None, local_directory=None, shared_filesystem=None, protocol=None, enable_tcp_over_ucx=None, enable_infiniband=None, enable_nvlink=None, enable_rdmacm=None, rmm_pool_size=None, rmm_maximum_pool_size=None, rmm_managed_memory=False, rmm_async=False, rmm_allocator_external_lib_list=None, rmm_release_threshold=None, rmm_log_directory=None, rmm_track_allocations=False, jit_unspill=None, log_spilling=False, worker_class=None, pre_import=None, **kwargs)[source]

dask.distributed.LocalCluster 的一个变体,每个进程使用一个 GPU。

这将为每个 Dask 工作节点进程分配不同的 CUDA_VISIBLE_DEVICES 环境变量。

对于具有复杂架构映射 CPU、GPU 和网络硬件的机器,例如 NVIDIA DGX-1 和 DGX-2,此类会创建一个本地集群,尽量尊重这些硬件特性。

每个工作节点进程会自动分配正确的 CPU 核心和网络接口卡以最大化性能。如果 UCX 和 UCX-Py 可用,则可以使用 InfiniBand 和 NVLink 连接来优化数据传输性能。

参数:
CUDA_VISIBLE_DEVICES字符串、整数列表或 None,默认为 None

限制活动使用的 GPU。可以是字符串(如 "0,1,2,3")、列表(如 [0, 1, 2, 3]),或 None 以使用所有可用的 GPU。

n_workers整数或 None,默认为 None

工作节点数量。可以是整数,或 None 以回退到 CUDA_VISIBLE_DEVICES 指定的 GPU。当 CUDA_VISIBLE_DEVICES 被指定时,n_workers 的值必须小于或等于其中指定的 GPU 数量,如果小于,则只使用前 n_workers 个 GPU。

threads_per_worker整数,默认为 1

每个 Dask 工作节点进程使用的线程数量。

memory_limit整数、浮点数、字符串或 None,默认为“auto”

主机 LRU 缓存的大小,用于确定工作节点何时开始溢出到磁盘(如果启用了 JIT-Unspill 则不可用)。可以是整数(字节)、浮点数(总系统内存的分数)、字符串(如 "5GB""5000M"),或 "auto"、0 或 None 以禁用内存管理。

device_memory_limit整数、浮点数、字符串或 None,默认为 0.8

CUDA 设备 LRU 缓存的大小,用于确定工作节点何时开始溢出到主机内存。可以是整数(字节)、浮点数(总设备内存的分数)、字符串(如 "5GB""5000M"),或 "auto"、0 或 None 以禁用溢出到主机(即允许完全使用设备内存)。

enable_cudf_spill布尔值,默认为 False

启用 cuDF 自动内存溢出功能。

警告

请勿与 JIT-Unspill 一起使用。

cudf_spill_stats整数,默认为 0

设置 cuDF 内存溢出统计级别。如果 enable_cudf_spill=False,此选项无效。

local_directory字符串或 None,默认为 None

本地机器上用于存储临时文件的路径。可以是字符串(如 "path/to/files"),或 None 以回退到本地 Dask 配置中 dask.temporary-directory 的值,如果未设置则使用当前工作目录。

shared_filesystem: 布尔值或 None,默认为 None

上面的 local_directory 是否在所有工作节点之间共享。如果为 None,则使用“jit-unspill-shared-fs”配置值,该值默认为 True。注意,在所有其他情况下此选项默认为 False,但在本地集群上默认为 True - 我们假定所有工作节点使用相同的文件系统。

protocol字符串或 None,默认为 None

用于通信的协议。可以是字符串(如 "tcp""ucx"),或 None 以自动选择正确的协议。

enable_tcp_over_ucx布尔值,默认为 None

设置环境变量以启用 TCP over UCX,即使 InfiniBand 和 NVLink 不受支持或已禁用。

enable_infiniband布尔值,默认为 None

设置环境变量以启用 UCX over InfiniBand,需要 protocol="ucx" 并在为 True 时隐含 enable_tcp_over_ucx=True

enable_nvlink布尔值,默认为 None

设置环境变量以启用 UCX over NVLink,需要 protocol="ucx" 并在为 True 时隐含 enable_tcp_over_ucx=True

enable_rdmacm布尔值,默认为 None

设置环境变量以启用 UCX RDMA 连接管理器支持,需要 protocol="ucx"enable_infiniband=True

rmm_pool_size整数、字符串或 None,默认为 None

用于初始化每个工作节点的 RMM 池大小。可以是整数(字节)、浮点数(总设备内存的分数)、字符串(如 "5GB""5000M"),或 None 以禁用 RMM 池。

注意

此大小是按工作节点配置的,而非集群范围的配置。

rmm_maximum_pool_size整数、字符串或 None,默认为 None

当设置了 rmm_pool_size 时,此参数指示最大池大小。可以是整数(字节)、浮点数(总设备内存的分数)、字符串(如 "5GB""5000M")或 None。默认情况下,使用 GPU 上的总可用内存。必须指定 rmm_pool_size 才能使用 RMM 池并设置最大池大小。

注意

--enable-rmm-async 搭配使用时,由于碎片化,无法保证最大大小。

注意

此大小是按工作节点配置的,而非集群范围的配置。

rmm_managed_memory布尔值,默认为 False

使用 RMM 初始化每个工作节点并将其设置为使用托管内存。如果禁用,仍然可以通过指定 rmm_pool_size 来使用 RMM。

警告

托管内存目前与 NVLink 不兼容。尝试同时启用两者将导致异常。

rmm_async: 布尔值,默认为 False

使用 RMM 初始化每个工作节点并将其设置为使用 RMM 的异步分配器。更多信息请参阅 rmm.mr.CudaAsyncMemoryResource

警告

异步分配器需要 CUDA Toolkit 11.2 或更新版本。它也与 RMM 池和托管内存不兼容,尝试同时启用两者将导致异常。

rmm_allocator_external_lib_list: 字符串、列表或 None,默认为 None

外部库列表,将 RMM 设置为其分配器。支持的选项有:["torch", "cupy"]。可以是逗号分隔的字符串(如 "torch,cupy")或字符串列表(如 ["torch", "cupy"])。如果为 None,则没有外部库将 RMM 用作其分配器。

rmm_release_threshold: 整数、字符串或 None,默认为 None

rmm.async is True 且池大小超出此值时,池中持有的未使用内存将在下一个同步点释放。可以是整数(字节)、浮点数(总设备内存的分数)、字符串(如 "5GB""5000M"),或 None。默认情况下,此功能处于禁用状态。

注意

此大小是按工作节点配置的,而非集群范围的配置。

rmm_log_directory字符串或 None,默认为 None

写入每个工作节点的 RMM 日志文件的目录。客户端和调度器不会在此处记录日志。可以是字符串(如 "/path/to/logs/"),或 None 以禁用日志记录。

注意

只有在指定了 rmm_pool_sizermm_managed_memory=True 时,才会启用日志记录。

rmm_track_allocations布尔值,默认为 False

如果为 True,则使用 rmm.mr.TrackingResourceAdaptor 包装每个工作节点使用的内存资源,该适配器会跟踪分配的内存量。

注意

此选项允许 Dask 面板收集和报告附加诊断信息。但是,这会带来显著的开销,应仅用于调试和内存分析。

jit_unspill布尔值或 None,默认为 None

启用即时反溢出。可以是布尔值,或 None 以回退到本地 Dask 配置中 dask.jit-unspill 的值,如果未设置则禁用反溢出。

注意

这是实验性功能,不支持内存溢出到磁盘。更多信息请参阅 proxy_object.ProxyObjectproxify_host_file.ProxifyHostFile

log_spilling布尔值,默认为 True

启用将溢出操作直接记录到 distributed.Worker,日志级别为 INFO

pre_import字符串、列表或 None,默认为 None

作为工作节点插件预导入库,以防止导入时间过长影响后续 Dask 操作。应为逗号分隔的名称列表,例如“cudf,rmm”,或字符串列表,例如 [“cudf”, “rmm”]。

引发:
TypeError

如果启用 InfiniBand 或 NVLink 且 protocol!="ucx"

ValueError

如果请求了 RMM 池、RMM 托管内存或 RMM 异步分配器但无法导入 RMM。如果同时启用了 RMM 托管内存和异步分配器。如果设置了 RMM 最大池大小但未设置 RMM 池大小。如果设置了 RMM 最大池大小但使用了 RMM 异步分配器。如果设置了 RMM 释放阈值但未使用 RMM 异步分配器。

另请参阅

LocalCluster

示例

>>> from dask_cuda import LocalCUDACluster
>>> from dask.distributed import Client
>>> cluster = LocalCUDACluster()
>>> client = Client(cluster)
new_worker_spec()[source]

返回下一个工作节点的名称和规范

返回:
d: 字典,将名称映射到工作节点规范

另请参阅

扩展

命令行界面

工作节点

dask cuda

启动一个连接到现有调度器的、附加了 GPU 的分布式工作节点。

可以通过 SCHEDULER 参数传递 URI 或通过 --scheduler-file 选项传递调度器文件来指定调度器。

更多信息请参阅 https://docs.rapids.org.cn/api/dask-cuda/stable/quickstart.html#dask-cuda-worker

dask cuda [OPTIONS] [SCHEDULER] [PRELOAD_ARGV]...

选项

--host <host>

服务主机 IP 地址;应能被调度器和其他工作节点看到。可以是字符串(如 "127.0.0.1"),或 None 以回退到 --interface 指定的接口地址或默认接口地址。

--nthreads <nthreads>

每个 Dask 工作节点进程使用的线程数量。

默认:

1

--name <name>

工作节点的唯一名称。可以是字符串(如 "worker-1"),或 None 表示无名工作节点。

--memory-limit <memory_limit>

主机 LRU 缓存的大小,用于确定工作节点何时开始溢出到磁盘(如果启用了 JIT-Unspill 则不可用)。可以是整数(字节)、浮点数(总系统内存的分数)、字符串(如 "5GB""5000M"),或 "auto"、0 或 None 以禁用内存管理。

默认:

'auto'

--device-memory-limit <device_memory_limit>

CUDA 设备 LRU 缓存的大小,用于确定工作节点何时开始溢出到主机内存。可以是整数(字节)、浮点数(总设备内存的分数)、字符串(如 "5GB""5000M"),或 "auto" 或 0 以禁用溢出到主机(即允许完全使用全部设备内存)。

默认:

'0.8'

--enable-cudf-spill, --disable-cudf-spill

启用 cuDF 自动内存溢出功能。警告:请勿与 JIT-Unspill 一起使用。

默认:

False

--cudf-spill-stats <cudf_spill_stats>

设置 cuDF 内存溢出统计级别。如果未指定 --enable-cudf-spill,此选项无效。

--rmm-pool-size <rmm_pool_size>

用于初始化每个工作节点的 RMM 池大小。可以是整数(字节)、浮点数(总设备内存的分数)、字符串(如 "5GB""5000M"),或 None 以禁用 RMM 池。

注意

此大小是按工作节点配置的,而非集群范围的配置。

--rmm-maximum-pool-size <rmm_maximum_pool_size>

指定 --rmm-pool-size 时,此参数指示最大池大小。可以是整数(字节)、浮点数(总设备内存的分数)、字符串(如 "5GB""5000M"),或 None。默认情况下,使用 GPU 上的总可用内存。必须指定 rmm_pool_size 才能使用 RMM 池并设置最大池大小。

注意

--enable-rmm-async 搭配使用时,由于碎片化,无法保证最大大小。

注意

此大小是按工作节点配置的,而非集群范围的配置。

--rmm-managed-memory, --no-rmm-managed-memory

使用 RMM 初始化每个工作节点并将其设置为使用托管内存。如果禁用,仍然可以通过指定 --rmm-pool-size 来使用 RMM。

警告

托管内存目前与 NVLink 不兼容。尝试同时启用两者将导致失败。

默认:

False

--rmm-async, --no-rmm-async

使用 RMM 初始化每个工作节点并将其设置为使用 RMM 的异步分配器。更多信息请参阅 rmm.mr.CudaAsyncMemoryResource

警告

异步分配器需要 CUDA Toolkit 11.2 或更新版本。它也与 RMM 池和托管内存不兼容,尝试同时启用两者将导致失败。

默认:

False

--set-rmm-allocator-for-libs <rmm_allocator_external_lib_list>

将 RMM 设置为外部库的分配器。提供要设置的库的逗号分隔列表,例如“torch,cupy”。

选项:

cupy | torch

--rmm-release-threshold <rmm_release_threshold>

rmm.asyncTrue 且池大小超出此值时,池中持有的未使用内存将在下一个同步点释放。可以是整数(字节)、浮点数(总设备内存的分数)、字符串(如 "5GB""5000M"),或 None。默认情况下,此功能处于禁用状态。

注意

此大小是按工作节点配置的,而非集群范围的配置。

--rmm-log-directory <rmm_log_directory>

写入每个工作节点的 RMM 日志文件的目录。客户端和调度器不会在此处记录日志。可以是字符串(如 "/path/to/logs/"),或 None 以禁用日志记录。

注意

只有指定了 --rmm-pool-size--rmm-managed-memory 时,才会启用日志记录。

--rmm-track-allocations, --no-rmm-track-allocations

跟踪 RMM 进行的内存分配。如果为 True,则使用 rmm.mr.TrackingResourceAdaptor 包装每个工作节点的内存资源,该适配器允许查询 RMM 分配的内存量。

默认:

False

--pid-file <pid_file>

写入进程 PID 的文件。

--resources <resources>

用于任务约束的资源,例如 "GPU=2 MEM=10e9"

--dashboard, --no-dashboard

启动面板。

默认:

True

--dashboard-address <dashboard_address>

服务面板(如果启用)的相对地址。

默认:

':0'

--local-directory <local_directory>

本地机器上用于存储临时文件的路径。可以是字符串(如 "path/to/files"),或 None 以回退到本地 Dask 配置中 dask.temporary-directory 的值,如果未设置则使用当前工作目录。

--shared-filesystem, --no-shared-filesystem

如果指定了 --shared-filesystem,则通知 JIT-Unspill local_directory 是所有工作节点可用的共享文件系统,而 --no-shared-filesystem 则通知它可能不是共享文件系统。如果两者都没有指定,JIT-Unspill 将根据 Dask 配置值 “jit-unspill-shared-fs” 进行判断。注意,共享文件系统必须支持 os.link() 操作。

--scheduler-file <scheduler_file>

JSON 编码的调度器信息文件名。与等效的 dask scheduler 选项配合使用。

--protocol <protocol>

协议,例如 tcp、tls 或 ucx

--interface <interface>

用于连接调度器的外部接口。通常使用以太网接口进行连接,而不是 InfiniBand 接口(如果可用)。可以是字符串(如 NVLink 的 "eth0" 或 InfiniBand 的 "ib0"),或 None 以回退到默认接口。

--preload <preload>

每个工作节点进程应加载的模块,例如 "foo.bar""/path/to/foo.py"

--death-timeout <death_timeout>

关闭前等待调度器的秒数

--dashboard-prefix <dashboard_prefix>

面板前缀。可以是字符串(例如…),或 None 表示无前缀。

--tls-ca-file <tls_ca_file>

用于 TLS 的 CA 证书文件(PEM 格式)。可以是字符串(如 "path/to/certs"),或 None 表示无证书。

--tls-cert <tls_cert>

用于 TLS 的证书文件(PEM 格式)。可以是字符串(如 "path/to/certs"),或 None 表示无证书。

--tls-key <tls_key>

用于 TLS 的私钥文件(PEM 格式)。可以是字符串(如 "path/to/certs"),或 None 表示无私钥。

--enable-tcp-over-ucx, --disable-tcp-over-ucx

设置环境变量以启用 TCP over UCX,即使 InfiniBand 和 NVLink 不受支持或已禁用。

--enable-infiniband, --disable-infiniband

设置环境变量以启用 UCX over InfiniBand,启用时隐含 --enable-tcp-over-ucx

设置环境变量以启用 UCX over NVLink,启用时隐含 --enable-tcp-over-ucx

--enable-rdmacm, --disable-rdmacm

根据用户参数创建 CUDA 上下文并初始化 UCX-Py。

有时,在启动创建各种线程的 Dask 工作节点进程之前初始化 CUDA 上下文会很方便。

启用即时反溢出。可以是布尔值,或 None 以回退到本地 Dask 配置中 dask.jit-unspill 的值,如果未设置则禁用反溢出。

注意

这是实验性功能,不支持内存溢出到磁盘。更多信息请参阅 proxy_object.ProxyObjectproxify_host_file.ProxifyHostFile

为确保 UCX 正常工作,使用正确的选项对其进行初始化非常重要。这对于客户端尤其重要,客户端无法使用 LocalCUDAClusterdask cuda worker 等参数配置使用 UCX。此函数将确保根据用户传递的标志和选项为它们提供 UCX 配置。

此函数也可用于工作节点预加载脚本中,以便对主流的 Dask.distributed 进行 UCX 配置。 https://docs.dask.org.cn/en/latest/setup/custom-startup.html

您可以使用以下 YAML 将其添加到全局配置中

有关 Dask 配置的更多信息,请参阅 https://docs.dask.org.cn/en/latest/configuration.html

--multiprocessing-method <multiprocessing_method>

使用 multiprocessing 启动新进程的方法

选项:

spawn | fork | forkserver

参数

SCHEDULER

可选参数

PRELOAD_ARGV

可选参数

集群配置

dask cuda

查询现有 GPU 集群的配置。

可以通过 SCHEDULER 参数传递 URI 或通过 --scheduler-file 选项传递调度器文件来指定集群。

dask cuda [OPTIONS] [SCHEDULER] [PRELOAD_ARGV]...

选项

--scheduler-file <scheduler_file>

JSON 编码的调度器信息文件名。与等效的 dask scheduler 选项配合使用。

--tls-ca-file <tls_ca_file>

用于 TLS 的 CA 证书文件(PEM 格式)。可以是字符串(如 "path/to/certs"),或 None 表示无证书。

--tls-cert <tls_cert>

用于 TLS 的证书文件(PEM 格式)。可以是字符串(如 "path/to/certs"),或 None 表示无证书。

--tls-key <tls_key>

用于 TLS 的私钥文件(PEM 格式)。可以是字符串(如 "path/to/certs"),或 None 表示无私钥。

参数

SCHEDULER

可选参数

PRELOAD_ARGV

可选参数

客户端初始化

在给定名称下暂存工作节点上的键

在显式通信任务中,使用 pop_staging_area(…, name) 访问暂存的键及关联的数据。

有时初始化 CUDA 上下文很方便,尤其是在启动创建各种线程的 Dask 工作进程之前。

为确保 UCX 正常工作,重要的是确保使用正确的选项对其进行初始化。这对于客户端尤其重要,客户端无法使用诸如 LocalCUDAClusterdask cuda worker 等参数配置 UCX。此函数将根据用户传递的标志和选项,确保它们获得 UCX 配置。

此函数还可以在工作进程预加载脚本中使用,用于配置 Dask.distributed 的 UCX。 https://docs.dask.org.cn/en/latest/setup/custom-startup.html

您可以使用以下 YAML 将其添加到您的全局配置中

distributed:
  worker:
    preload:
      - dask_cuda.initialize

有关 Dask 配置的更多信息,请参阅 https://docs.dask.org.cn/en/latest/configuration.html

参数:
在单个工作节点上运行协程

worker: 字符串

enable_tcp_over_ucx布尔值,默认为 None

设置环境变量以启用 TCP over UCX,即使 InfiniBand 和 NVLink 不受支持或已禁用。

enable_infiniband布尔值,默认为 None

运行 coroutine 的工作节点

enable_nvlink布尔值,默认为 None

在工作节点上运行的函数

enable_rdmacm布尔值,默认为 None

wait: boolean, 可选

如果为 True,则在返回前等待协程完成。

对 DataFrame 的分区进行排序,以便列中的所有值对齐

这通过显式通信实现了基于任务的 shuffle。它需要完整的数据集读取、序列化和 shuffle。这是昂贵的。如果可能,应避免进行 shuffle。

参数:
这不会保留有意义的索引/分区方案。如果并行执行,这不是确定性的。

需要一个活跃的客户端。

run(coroutine, *args, workers=None, lock_workers=False)[source]

在多个工作节点上运行协程

协程将工作节点的状态字典作为第一个参数,*args 作为后续参数。

参数:
coroutine: 协程

在每个工作节点上运行的函数

*args

coroutine 的参数

workers: list, 可选

工作节点列表。默认为所有工作节点

lock_workers: bool, 可选

使用 distributed.MultiLock 获取对工作节点的独占访问权。使用此标志支持并行运行。

返回:
ret: 列表

每个工作节点的输出列表

stage_keys(name: str, keys: Iterable[Hashable]) Dict[int, set][source]

在指定名称下在工作进程上暂存键

在显式通信任务中,使用 pop_staging_area(…, name) 访问暂存的键和相关数据。

参数:
name: 字符串

暂存区域的名称

keys: 可迭代对象

要暂存的键

返回:
dict

字典,将每个工作节点排名映射到工作节点暂存键集合

注意

在显式通信的上下文中,暂存是复制 Dask 键职责的行为。当暂存一个键时,拥有该键的工作节点(由 Dask 调度器分配)将其对键和关联数据的引用保存到其本地暂存区域。从那时起,如果调度器取消该键,工作节点(以及在该工作节点上运行的任务)现在拥有对键和关联数据的独占访问权。通过这种方式,暂存使得长时间运行的显式通信任务能够尽快释放输入数据。

submit(worker, coroutine, *args, wait=False)[source]

在单个工作进程上运行协程

协程将工作节点的状态字典作为第一个参数,*args 作为后续参数。

参数:
worker: str

运行 coroutine 的工作进程

coroutine: 协程

在工作进程上运行的函数

*args

coroutine 的参数

wait: boolean,可选

如果为 True,则等待协程完成再返回。

返回:
ret: object or Future

如果 wait=True,则为 coroutine 的结果;如果 wait=False,则为稍后可以等待的 Future。

dask_cuda.explicit_comms.dataframe.shuffle.shuffle(df: DataFrame, column_names: List[str], npartitions: int | None = None, ignore_index: bool = False, batchsize: int | None = None) DataFrame[source]

对 DataFrame 的分块进行排序,以使列中的所有值对齐

这使用显式通信执行基于任务的混洗。它需要完全读取数据集、序列化和混洗。这是昂贵的。如果可能,应避免混洗。

这不会保留有意义的索引/分区方案。如果并行执行,这不是确定性的。

需要一个活动的客户端。

参数:
df: dask.dataframe.DataFrame

要进行 shuffle 的 Dataframe

column_names: 字符串列表

要基于其进行拆分的列名列表。

npartitions: 整数或 None

所需的输出分区数量。如果为 None,则输出分区数量等于 df.npartitions

ignore_index: 布尔值

在 shuffle 期间忽略索引。如果为 True,性能可能会提高,但索引值将不会被保留。

batchsize: 整数

一个 shuffle 由多个轮次组成,每个工作节点在每个轮次中对其 dataframe 的一定数量的分区进行分区,然后进行all-to-all通信。批量大小是每个工作节点在每个轮次中处理的分区数量。如果为 -1,每个工作节点将在单轮中处理其所有分区,并且所有减少内存使用的方法都会被禁用,这在内存压力不是问题时可能更快。如果为 None,则使用 DASK_EXPLICIT_COMMS_BATCHSIZE 的值,如果未设置则使用 1,因此默认情况下,我们优先考虑鲁棒性而非性能。

返回:
df: dask.dataframe.DataFrame

经过 shuffle 的 dataframe