设备溢出

默认情况下,当 GPU 内存利用率达到 80% 时,Dask-CUDA 会启用从 GPU 到主机内存的溢出。可以通过显式设置 device_memory_limit 来更改此设置以适应工作负载需求,或者完全禁用。此参数接受表示内存大小的整数或字符串,或表示 GPU 总内存百分比的浮点数。

from dask_cuda import LocalCUDACluster

cluster = LocalCUDACluster(device_memory_limit=50000)  # spilling after 50000 bytes
cluster = LocalCUDACluster(device_memory_limit="5GB")  # spilling after 5 GB
cluster = LocalCUDACluster(device_memory_limit=0.3)    # spilling after 30% memory utilization

通过将 device_memory_limit 设置为 0 可以禁用内存溢出。

cluster = LocalCUDACluster(device_memory_limit=0)  # spilling disabled

dask cuda worker 同样适用,可以通过设置 --device-memory-limit 来控制溢出。

$ dask scheduler
distributed.scheduler - INFO -   Scheduler at:  tcp://127.0.0.1:8786

$ dask cuda worker --device-memory-limit 50000
$ dask cuda worker --device-memory-limit 5GB
$ dask cuda worker --device-memory-limit 0.3
$ dask cuda worker --device-memory-limit 0

JIT-反溢出

Dask 和 Dask-CUDA 中的常规溢出存在一些显著问题。它跟踪的是任务输出,而不是跟踪单个对象。这意味着返回 CUDA 对象集合的任务要么溢出所有 CUDA 对象,要么都不溢出。其他问题包括对象重复错误的溢出顺序以及未跟踪共享设备缓冲区参见讨论)。

为了解决所有这些问题,Dask-CUDA 引入了 JIT-反溢出,它可以显著提高性能和内存使用。对于需要显著溢出的工作负载(例如在内存可用量少于数据的基础设施上进行大型连接),我们经常看到超过 50% 的改进(即,需要 300 秒的操作可能只需要 110 秒)。对于不需要溢出的工作负载,我们预期不会看到太大差异。

为了启用 JIT-反溢出,使用 jit_unspill 参数。

>>> import dask​
>>> from distributed import Client​
>>> from dask_cuda import LocalCUDACluster​

>>> cluster = LocalCUDACluster(n_workers=10, device_memory_limit="1GB", jit_unspill=True)​
>>> client = Client(cluster)​

>>> with dask.config.set(jit_unspill=True):​
...   cluster = LocalCUDACluster(n_workers=10, device_memory_limit="1GB")​
...   client = Client(cluster)

或者设置工作器参数 --enable-jit-unspill​

$ dask scheduler
distributed.scheduler - INFO - Scheduler at:  tcp://127.0.0.1:8786

$ dask cuda worker --enable-jit-unspill​

或者设置环境变量 DASK_JIT_UNSPILL=True

$ dask scheduler
distributed.scheduler - INFO -   Scheduler at:  tcp://127.0.0.1:8786

$ DASK_JIT_UNSPILL=True dask cuda worker​

限制

JIT-反溢出将 CUDA 对象(例如 cudf.Dataframe)包装在 ProxyObject 中。由 ProxyObject 实例代理的对象在访问时将被 JIT 反序列化。该实例的行为与被代理的对象相同,可以像使用被代理的对象一样进行访问/使用。

ProxyObject 有一些限制,不能完美地模仿被代理的对象。最值得注意的是,使用 instance() 进行类型检查按预期工作,但直接类型检查不行。

>>> import numpy as np
>>> from dask_cuda.proxy_object import asproxy
>>> x = np.arange(3)
>>> isinstance(asproxy(x), type(x))
True
>>>  type(asproxy(x)) is type(x)
False

因此,如果遇到问题,请记住始终可以使用 unproxy() 直接访问被代理的对象,或者设置 DASK_JIT_UNSPILL_COMPATIBILITY_MODE=True 来启用兼容模式,该模式会自动对所有函数输入调用 unproxy()

cuDF 溢出

在使用 Dask cuDF(即 Dask DataFrame)执行 ETL 工作流时,通常最好利用 cuDF 中的原生溢出支持

原生 cuDF 溢出比上述其他方法具有重要优势。当使用 JIT-反溢出或默认溢出时,工作器只能溢出任务的输入或输出。这意味着在任务执行完成之前,任务中创建的任何数据都是完全受限的。然而,当使用 cuDF 溢出时,可以在任务执行期间根据需要对单个设备缓冲区进行溢出/反溢出。

部署 LocalCUDACluster 时,可以通过 enable_cudf_spill 参数启用 cuDF 溢出。

>>> from distributed import Client​
>>> from dask_cuda import LocalCUDACluster​

>>> cluster = LocalCUDACluster(n_workers=10, enable_cudf_spill=True)​
>>> client = Client(cluster)​

dask cuda worker 同样适用。

$ dask scheduler
distributed.scheduler - INFO -   Scheduler at:  tcp://127.0.0.1:8786

$ dask cuda worker --enable-cudf-spill

统计信息

启用 cuDF 溢出后,也可以让 cuDF 收集基本的溢出统计信息。收集这些信息是理解使用 cuDF 的内存密集型工作流性能的有用方法。

部署 LocalCUDACluster 时,可以通过 cudf_spill_stats 参数启用 cuDF 溢出统计信息。

>>> cluster = LocalCUDACluster(n_workers=10, enable_cudf_spill=True, cudf_spill_stats=1)​

dask cuda worker 同样适用。

$ dask cuda worker --enable-cudf-spill --cudf-spill-stats 1

要让每个 dask-cuda 工作器在工作流中打印溢出统计信息,可以这样做:

def spill_info():
    from cudf.core.buffer.spill_manager import get_global_manager
    print(get_global_manager().statistics)
client.submit(spill_info)

有关可用溢出统计信息选项的更多信息,请参阅 cuDF 溢出文档

限制

尽管 cuDF 溢出是大多数使用 Dask cuDF 的 ETL 工作流的最佳选择,但如果工作流在 cudf.DataFrame 和其他数据格式(例如 cupy.ndarray)之间进行转换,其效果会差很多。一旦底层设备缓冲区“暴露”给外部内存引用,它们就变得 cuDF “不可溢出”了。在这种情况下(例如,Dask-CUDA + XGBoost),JIT-反溢出通常是更好的选择。