API
集群
- class dask_cuda.LocalCUDACluster(CUDA_VISIBLE_DEVICES=None, n_workers=None, threads_per_worker=1, memory_limit='auto', device_memory_limit=0.8, enable_cudf_spill=False, cudf_spill_stats=0, data=None, local_directory=None, shared_filesystem=None, protocol=None, enable_tcp_over_ucx=None, enable_infiniband=None, enable_nvlink=None, enable_rdmacm=None, rmm_pool_size=None, rmm_maximum_pool_size=None, rmm_managed_memory=False, rmm_async=False, rmm_allocator_external_lib_list=None, rmm_release_threshold=None, rmm_log_directory=None, rmm_track_allocations=False, jit_unspill=None, log_spilling=False, worker_class=None, pre_import=None, **kwargs)[source]
dask.distributed.LocalCluster
的一个变体,每个进程使用一个 GPU。这将为每个 Dask 工作节点进程分配不同的
CUDA_VISIBLE_DEVICES
环境变量。对于具有复杂架构映射 CPU、GPU 和网络硬件的机器,例如 NVIDIA DGX-1 和 DGX-2,此类会创建一个本地集群,尽量尊重这些硬件特性。
每个工作节点进程会自动分配正确的 CPU 核心和网络接口卡以最大化性能。如果 UCX 和 UCX-Py 可用,则可以使用 InfiniBand 和 NVLink 连接来优化数据传输性能。
- 参数:
- CUDA_VISIBLE_DEVICES字符串、整数列表或 None,默认为 None
限制活动使用的 GPU。可以是字符串(如
"0,1,2,3"
)、列表(如[0, 1, 2, 3]
),或None
以使用所有可用的 GPU。- n_workers整数或 None,默认为 None
工作节点数量。可以是整数,或
None
以回退到CUDA_VISIBLE_DEVICES
指定的 GPU。当CUDA_VISIBLE_DEVICES
被指定时,n_workers
的值必须小于或等于其中指定的 GPU 数量,如果小于,则只使用前n_workers
个 GPU。- threads_per_worker整数,默认为 1
每个 Dask 工作节点进程使用的线程数量。
- memory_limit整数、浮点数、字符串或 None,默认为“auto”
主机 LRU 缓存的大小,用于确定工作节点何时开始溢出到磁盘(如果启用了 JIT-Unspill 则不可用)。可以是整数(字节)、浮点数(总系统内存的分数)、字符串(如
"5GB"
或"5000M"
),或"auto"
、0 或None
以禁用内存管理。- device_memory_limit整数、浮点数、字符串或 None,默认为 0.8
CUDA 设备 LRU 缓存的大小,用于确定工作节点何时开始溢出到主机内存。可以是整数(字节)、浮点数(总设备内存的分数)、字符串(如
"5GB"
或"5000M"
),或"auto"
、0 或None
以禁用溢出到主机(即允许完全使用设备内存)。- enable_cudf_spill布尔值,默认为 False
启用 cuDF 自动内存溢出功能。
警告
请勿与 JIT-Unspill 一起使用。
- cudf_spill_stats整数,默认为 0
设置 cuDF 内存溢出统计级别。如果
enable_cudf_spill=False
,此选项无效。- local_directory字符串或 None,默认为 None
本地机器上用于存储临时文件的路径。可以是字符串(如
"path/to/files"
),或None
以回退到本地 Dask 配置中dask.temporary-directory
的值,如果未设置则使用当前工作目录。- shared_filesystem: 布尔值或 None,默认为 None
上面的 local_directory 是否在所有工作节点之间共享。如果为
None
,则使用“jit-unspill-shared-fs”配置值,该值默认为 True。注意,在所有其他情况下此选项默认为 False,但在本地集群上默认为 True - 我们假定所有工作节点使用相同的文件系统。- protocol字符串或 None,默认为 None
用于通信的协议。可以是字符串(如
"tcp"
或"ucx"
),或None
以自动选择正确的协议。- enable_tcp_over_ucx布尔值,默认为 None
设置环境变量以启用 TCP over UCX,即使 InfiniBand 和 NVLink 不受支持或已禁用。
- enable_infiniband布尔值,默认为 None
设置环境变量以启用 UCX over InfiniBand,需要
protocol="ucx"
并在为True
时隐含enable_tcp_over_ucx=True
。- enable_nvlink布尔值,默认为 None
设置环境变量以启用 UCX over NVLink,需要
protocol="ucx"
并在为True
时隐含enable_tcp_over_ucx=True
。- enable_rdmacm布尔值,默认为 None
设置环境变量以启用 UCX RDMA 连接管理器支持,需要
protocol="ucx"
和enable_infiniband=True
。- rmm_pool_size整数、字符串或 None,默认为 None
用于初始化每个工作节点的 RMM 池大小。可以是整数(字节)、浮点数(总设备内存的分数)、字符串(如
"5GB"
或"5000M"
),或None
以禁用 RMM 池。注意
此大小是按工作节点配置的,而非集群范围的配置。
- rmm_maximum_pool_size整数、字符串或 None,默认为 None
当设置了
rmm_pool_size
时,此参数指示最大池大小。可以是整数(字节)、浮点数(总设备内存的分数)、字符串(如"5GB"
或"5000M"
)或None
。默认情况下,使用 GPU 上的总可用内存。必须指定rmm_pool_size
才能使用 RMM 池并设置最大池大小。注意
与 --enable-rmm-async 搭配使用时,由于碎片化,无法保证最大大小。
注意
此大小是按工作节点配置的,而非集群范围的配置。
- rmm_managed_memory布尔值,默认为 False
使用 RMM 初始化每个工作节点并将其设置为使用托管内存。如果禁用,仍然可以通过指定
rmm_pool_size
来使用 RMM。警告
托管内存目前与 NVLink 不兼容。尝试同时启用两者将导致异常。
- rmm_async: 布尔值,默认为 False
使用 RMM 初始化每个工作节点并将其设置为使用 RMM 的异步分配器。更多信息请参阅
rmm.mr.CudaAsyncMemoryResource
。警告
异步分配器需要 CUDA Toolkit 11.2 或更新版本。它也与 RMM 池和托管内存不兼容,尝试同时启用两者将导致异常。
- rmm_allocator_external_lib_list: 字符串、列表或 None,默认为 None
外部库列表,将 RMM 设置为其分配器。支持的选项有:
["torch", "cupy"]
。可以是逗号分隔的字符串(如"torch,cupy"
)或字符串列表(如["torch", "cupy"]
)。如果为None
,则没有外部库将 RMM 用作其分配器。- rmm_release_threshold: 整数、字符串或 None,默认为 None
当
rmm.async is True
且池大小超出此值时,池中持有的未使用内存将在下一个同步点释放。可以是整数(字节)、浮点数(总设备内存的分数)、字符串(如"5GB"
或"5000M"
),或None
。默认情况下,此功能处于禁用状态。注意
此大小是按工作节点配置的,而非集群范围的配置。
- rmm_log_directory字符串或 None,默认为 None
写入每个工作节点的 RMM 日志文件的目录。客户端和调度器不会在此处记录日志。可以是字符串(如
"/path/to/logs/"
),或None
以禁用日志记录。注意
只有在指定了
rmm_pool_size
或rmm_managed_memory=True
时,才会启用日志记录。- rmm_track_allocations布尔值,默认为 False
如果为 True,则使用
rmm.mr.TrackingResourceAdaptor
包装每个工作节点使用的内存资源,该适配器会跟踪分配的内存量。注意
此选项允许 Dask 面板收集和报告附加诊断信息。但是,这会带来显著的开销,应仅用于调试和内存分析。
- jit_unspill布尔值或 None,默认为 None
启用即时反溢出。可以是布尔值,或
None
以回退到本地 Dask 配置中dask.jit-unspill
的值,如果未设置则禁用反溢出。注意
这是实验性功能,不支持内存溢出到磁盘。更多信息请参阅
proxy_object.ProxyObject
和proxify_host_file.ProxifyHostFile
。- log_spilling布尔值,默认为 True
启用将溢出操作直接记录到
distributed.Worker
,日志级别为INFO
。- pre_import字符串、列表或 None,默认为 None
作为工作节点插件预导入库,以防止导入时间过长影响后续 Dask 操作。应为逗号分隔的名称列表,例如“cudf,rmm”,或字符串列表,例如 [“cudf”, “rmm”]。
- 引发:
- TypeError
如果启用 InfiniBand 或 NVLink 且
protocol!="ucx"
。- ValueError
如果请求了 RMM 池、RMM 托管内存或 RMM 异步分配器但无法导入 RMM。如果同时启用了 RMM 托管内存和异步分配器。如果设置了 RMM 最大池大小但未设置 RMM 池大小。如果设置了 RMM 最大池大小但使用了 RMM 异步分配器。如果设置了 RMM 释放阈值但未使用 RMM 异步分配器。
另请参阅
LocalCluster
示例
>>> from dask_cuda import LocalCUDACluster >>> from dask.distributed import Client >>> cluster = LocalCUDACluster() >>> client = Client(cluster)
命令行界面
工作节点
dask cuda
启动一个连接到现有调度器的、附加了 GPU 的分布式工作节点。
可以通过 SCHEDULER
参数传递 URI 或通过 --scheduler-file
选项传递调度器文件来指定调度器。
更多信息请参阅 https://docs.rapids.org.cn/api/dask-cuda/stable/quickstart.html#dask-cuda-worker。
dask cuda [OPTIONS] [SCHEDULER] [PRELOAD_ARGV]...
选项
- --host <host>
服务主机 IP 地址;应能被调度器和其他工作节点看到。可以是字符串(如
"127.0.0.1"
),或None
以回退到--interface
指定的接口地址或默认接口地址。
- --nthreads <nthreads>
每个 Dask 工作节点进程使用的线程数量。
- 默认:
1
- --name <name>
工作节点的唯一名称。可以是字符串(如
"worker-1"
),或None
表示无名工作节点。
- --memory-limit <memory_limit>
主机 LRU 缓存的大小,用于确定工作节点何时开始溢出到磁盘(如果启用了 JIT-Unspill 则不可用)。可以是整数(字节)、浮点数(总系统内存的分数)、字符串(如
"5GB"
或"5000M"
),或"auto"
、0 或None
以禁用内存管理。- 默认:
'auto'
- --device-memory-limit <device_memory_limit>
CUDA 设备 LRU 缓存的大小,用于确定工作节点何时开始溢出到主机内存。可以是整数(字节)、浮点数(总设备内存的分数)、字符串(如
"5GB"
或"5000M"
),或"auto"
或 0 以禁用溢出到主机(即允许完全使用全部设备内存)。- 默认:
'0.8'
- --enable-cudf-spill, --disable-cudf-spill
启用 cuDF 自动内存溢出功能。警告:请勿与 JIT-Unspill 一起使用。
- 默认:
False
- --cudf-spill-stats <cudf_spill_stats>
设置 cuDF 内存溢出统计级别。如果未指定 --enable-cudf-spill,此选项无效。
- --rmm-pool-size <rmm_pool_size>
用于初始化每个工作节点的 RMM 池大小。可以是整数(字节)、浮点数(总设备内存的分数)、字符串(如
"5GB"
或"5000M"
),或None
以禁用 RMM 池。注意
此大小是按工作节点配置的,而非集群范围的配置。
- --rmm-maximum-pool-size <rmm_maximum_pool_size>
指定
--rmm-pool-size
时,此参数指示最大池大小。可以是整数(字节)、浮点数(总设备内存的分数)、字符串(如"5GB"
或"5000M"
),或None
。默认情况下,使用 GPU 上的总可用内存。必须指定rmm_pool_size
才能使用 RMM 池并设置最大池大小。注意
与 --enable-rmm-async 搭配使用时,由于碎片化,无法保证最大大小。
注意
此大小是按工作节点配置的,而非集群范围的配置。
- --rmm-managed-memory, --no-rmm-managed-memory
使用 RMM 初始化每个工作节点并将其设置为使用托管内存。如果禁用,仍然可以通过指定
--rmm-pool-size
来使用 RMM。警告
托管内存目前与 NVLink 不兼容。尝试同时启用两者将导致失败。
- 默认:
False
- --rmm-async, --no-rmm-async
使用 RMM 初始化每个工作节点并将其设置为使用 RMM 的异步分配器。更多信息请参阅
rmm.mr.CudaAsyncMemoryResource
。警告
异步分配器需要 CUDA Toolkit 11.2 或更新版本。它也与 RMM 池和托管内存不兼容,尝试同时启用两者将导致失败。
- 默认:
False
- --set-rmm-allocator-for-libs <rmm_allocator_external_lib_list>
将 RMM 设置为外部库的分配器。提供要设置的库的逗号分隔列表,例如“torch,cupy”。
- 选项:
cupy | torch
- --rmm-release-threshold <rmm_release_threshold>
当
rmm.async
为True
且池大小超出此值时,池中持有的未使用内存将在下一个同步点释放。可以是整数(字节)、浮点数(总设备内存的分数)、字符串(如"5GB"
或"5000M"
),或None
。默认情况下,此功能处于禁用状态。注意
此大小是按工作节点配置的,而非集群范围的配置。
- --rmm-log-directory <rmm_log_directory>
写入每个工作节点的 RMM 日志文件的目录。客户端和调度器不会在此处记录日志。可以是字符串(如
"/path/to/logs/"
),或None
以禁用日志记录。注意
只有指定了
--rmm-pool-size
或--rmm-managed-memory
时,才会启用日志记录。
- --rmm-track-allocations, --no-rmm-track-allocations
跟踪 RMM 进行的内存分配。如果为
True
,则使用rmm.mr.TrackingResourceAdaptor
包装每个工作节点的内存资源,该适配器允许查询 RMM 分配的内存量。- 默认:
False
- --pid-file <pid_file>
写入进程 PID 的文件。
- --resources <resources>
用于任务约束的资源,例如
"GPU=2 MEM=10e9"
。
- --dashboard, --no-dashboard
启动面板。
- 默认:
True
- --dashboard-address <dashboard_address>
服务面板(如果启用)的相对地址。
- 默认:
':0'
- --local-directory <local_directory>
本地机器上用于存储临时文件的路径。可以是字符串(如
"path/to/files"
),或None
以回退到本地 Dask 配置中dask.temporary-directory
的值,如果未设置则使用当前工作目录。
如果指定了 --shared-filesystem,则通知 JIT-Unspill local_directory 是所有工作节点可用的共享文件系统,而 --no-shared-filesystem 则通知它可能不是共享文件系统。如果两者都没有指定,JIT-Unspill 将根据 Dask 配置值 “jit-unspill-shared-fs” 进行判断。注意,共享文件系统必须支持 os.link() 操作。
- --scheduler-file <scheduler_file>
JSON 编码的调度器信息文件名。与等效的
dask scheduler
选项配合使用。
- --protocol <protocol>
协议,例如 tcp、tls 或 ucx
- --interface <interface>
用于连接调度器的外部接口。通常使用以太网接口进行连接,而不是 InfiniBand 接口(如果可用)。可以是字符串(如 NVLink 的
"eth0"
或 InfiniBand 的"ib0"
),或None
以回退到默认接口。
- --preload <preload>
每个工作节点进程应加载的模块,例如
"foo.bar"
或"/path/to/foo.py"
。
- --death-timeout <death_timeout>
关闭前等待调度器的秒数
- --dashboard-prefix <dashboard_prefix>
面板前缀。可以是字符串(例如…),或
None
表示无前缀。
- --tls-ca-file <tls_ca_file>
用于 TLS 的 CA 证书文件(PEM 格式)。可以是字符串(如
"path/to/certs"
),或None
表示无证书。
- --tls-cert <tls_cert>
用于 TLS 的证书文件(PEM 格式)。可以是字符串(如
"path/to/certs"
),或None
表示无证书。
- --tls-key <tls_key>
用于 TLS 的私钥文件(PEM 格式)。可以是字符串(如
"path/to/certs"
),或None
表示无私钥。
- --enable-tcp-over-ucx, --disable-tcp-over-ucx
设置环境变量以启用 TCP over UCX,即使 InfiniBand 和 NVLink 不受支持或已禁用。
- --enable-infiniband, --disable-infiniband
设置环境变量以启用 UCX over InfiniBand,启用时隐含
--enable-tcp-over-ucx
。
- --enable-nvlink, --disable-nvlink
设置环境变量以启用 UCX over NVLink,启用时隐含
--enable-tcp-over-ucx
。
- --enable-rdmacm, --disable-rdmacm
根据用户参数创建 CUDA 上下文并初始化 UCX-Py。
- 有时,在启动创建各种线程的 Dask 工作节点进程之前初始化 CUDA 上下文会很方便。
启用即时反溢出。可以是布尔值,或
None
以回退到本地 Dask 配置中dask.jit-unspill
的值,如果未设置则禁用反溢出。注意
这是实验性功能,不支持内存溢出到磁盘。更多信息请参阅
proxy_object.ProxyObject
和proxify_host_file.ProxifyHostFile
。
-
为确保 UCX 正常工作,使用正确的选项对其进行初始化非常重要。这对于客户端尤其重要,客户端无法使用
LocalCUDACluster
和dask cuda worker
等参数配置使用 UCX。此函数将确保根据用户传递的标志和选项为它们提供 UCX 配置。 此函数也可用于工作节点预加载脚本中,以便对主流的 Dask.distributed 进行 UCX 配置。 https://docs.dask.org.cn/en/latest/setup/custom-startup.html
- 您可以使用以下 YAML 将其添加到全局配置中
有关 Dask 配置的更多信息,请参阅 https://docs.dask.org.cn/en/latest/configuration.html。
- --multiprocessing-method <multiprocessing_method>
使用 multiprocessing 启动新进程的方法
- 选项:
spawn | fork | forkserver
参数
- SCHEDULER
可选参数
- PRELOAD_ARGV
可选参数
集群配置
dask cuda
查询现有 GPU 集群的配置。
可以通过 SCHEDULER
参数传递 URI 或通过 --scheduler-file
选项传递调度器文件来指定集群。
dask cuda [OPTIONS] [SCHEDULER] [PRELOAD_ARGV]...
选项
- --scheduler-file <scheduler_file>
JSON 编码的调度器信息文件名。与等效的
dask scheduler
选项配合使用。
- --tls-ca-file <tls_ca_file>
用于 TLS 的 CA 证书文件(PEM 格式)。可以是字符串(如
"path/to/certs"
),或None
表示无证书。
- --tls-cert <tls_cert>
用于 TLS 的证书文件(PEM 格式)。可以是字符串(如
"path/to/certs"
),或None
表示无证书。
- --tls-key <tls_key>
用于 TLS 的私钥文件(PEM 格式)。可以是字符串(如
"path/to/certs"
),或None
表示无私钥。
参数
- SCHEDULER
可选参数
- PRELOAD_ARGV
可选参数
客户端初始化
- 在给定名称下暂存工作节点上的键
在显式通信任务中,使用 pop_staging_area(…, name) 访问暂存的键及关联的数据。
有时初始化 CUDA 上下文很方便,尤其是在启动创建各种线程的 Dask 工作进程之前。
为确保 UCX 正常工作,重要的是确保使用正确的选项对其进行初始化。这对于客户端尤其重要,客户端无法使用诸如
LocalCUDACluster
和dask cuda worker
等参数配置 UCX。此函数将根据用户传递的标志和选项,确保它们获得 UCX 配置。此函数还可以在工作进程预加载脚本中使用,用于配置 Dask.distributed 的 UCX。 https://docs.dask.org.cn/en/latest/setup/custom-startup.html
您可以使用以下 YAML 将其添加到您的全局配置中
distributed: worker: preload: - dask_cuda.initialize
有关 Dask 配置的更多信息,请参阅 https://docs.dask.org.cn/en/latest/configuration.html。
- 参数:
- 在单个工作节点上运行协程
worker: 字符串
- enable_tcp_over_ucx布尔值,默认为 None
设置环境变量以启用 TCP over UCX,即使 InfiniBand 和 NVLink 不受支持或已禁用。
- enable_infiniband布尔值,默认为 None
运行
coroutine
的工作节点- enable_nvlink布尔值,默认为 None
在工作节点上运行的函数
- enable_rdmacm布尔值,默认为 None
wait: boolean, 可选
如果为 True,则在返回前等待协程完成。
- 对 DataFrame 的分区进行排序,以便列中的所有值对齐
这通过显式通信实现了基于任务的 shuffle。它需要完整的数据集读取、序列化和 shuffle。这是昂贵的。如果可能,应避免进行 shuffle。
- 参数:
- 这不会保留有意义的索引/分区方案。如果并行执行,这不是确定性的。
需要一个活跃的客户端。
- run(coroutine, *args, workers=None, lock_workers=False)[source]
在多个工作节点上运行协程
协程将工作节点的状态字典作为第一个参数,
*args
作为后续参数。- 参数:
- coroutine: 协程
在每个工作节点上运行的函数
- *args
coroutine
的参数- workers: list, 可选
工作节点列表。默认为所有工作节点
- lock_workers: bool, 可选
使用 distributed.MultiLock 获取对工作节点的独占访问权。使用此标志支持并行运行。
- 返回:
- ret: 列表
每个工作节点的输出列表
- stage_keys(name: str, keys: Iterable[Hashable]) Dict[int, set] [source]
在指定名称下在工作进程上暂存键
在显式通信任务中,使用 pop_staging_area(…, name) 访问暂存的键和相关数据。
- 参数:
- name: 字符串
暂存区域的名称
- keys: 可迭代对象
要暂存的键
- 返回:
- dict
字典,将每个工作节点排名映射到工作节点暂存键集合
注意
在显式通信的上下文中,暂存是复制 Dask 键职责的行为。当暂存一个键时,拥有该键的工作节点(由 Dask 调度器分配)将其对键和关联数据的引用保存到其本地暂存区域。从那时起,如果调度器取消该键,工作节点(以及在该工作节点上运行的任务)现在拥有对键和关联数据的独占访问权。通过这种方式,暂存使得长时间运行的显式通信任务能够尽快释放输入数据。
- submit(worker, coroutine, *args, wait=False)[source]
在单个工作进程上运行协程
协程将工作节点的状态字典作为第一个参数,
*args
作为后续参数。- 参数:
- worker: str
运行
coroutine
的工作进程- coroutine: 协程
在工作进程上运行的函数
- *args
coroutine
的参数- wait: boolean,可选
如果为 True,则等待协程完成再返回。
- 返回:
- ret: object or Future
如果 wait=True,则为 coroutine 的结果;如果 wait=False,则为稍后可以等待的 Future。
- dask_cuda.explicit_comms.dataframe.shuffle.shuffle(df: DataFrame, column_names: List[str], npartitions: int | None = None, ignore_index: bool = False, batchsize: int | None = None) DataFrame [source]
对 DataFrame 的分块进行排序,以使列中的所有值对齐
这使用显式通信执行基于任务的混洗。它需要完全读取数据集、序列化和混洗。这是昂贵的。如果可能,应避免混洗。
这不会保留有意义的索引/分区方案。如果并行执行,这不是确定性的。
需要一个活动的客户端。
- 参数:
- df: dask.dataframe.DataFrame
要进行 shuffle 的 Dataframe
- column_names: 字符串列表
要基于其进行拆分的列名列表。
- npartitions: 整数或 None
所需的输出分区数量。如果为 None,则输出分区数量等于 df.npartitions
- ignore_index: 布尔值
在 shuffle 期间忽略索引。如果为 True,性能可能会提高,但索引值将不会被保留。
- batchsize: 整数
一个 shuffle 由多个轮次组成,每个工作节点在每个轮次中对其 dataframe 的一定数量的分区进行分区,然后进行all-to-all通信。批量大小是每个工作节点在每个轮次中处理的分区数量。如果为 -1,每个工作节点将在单轮中处理其所有分区,并且所有减少内存使用的方法都会被禁用,这在内存压力不是问题时可能更快。如果为 None,则使用 DASK_EXPLICIT_COMMS_BATCHSIZE 的值,如果未设置则使用 1,因此默认情况下,我们优先考虑鲁棒性而非性能。
- 返回:
- df: dask.dataframe.DataFrame
经过 shuffle 的 dataframe