设备溢出
默认情况下,当 GPU 内存利用率达到 80% 时,Dask-CUDA 会启用从 GPU 到主机内存的溢出。可以通过显式设置 device_memory_limit
来更改此设置以适应工作负载需求,或者完全禁用。此参数接受表示内存大小的整数或字符串,或表示 GPU 总内存百分比的浮点数。
from dask_cuda import LocalCUDACluster
cluster = LocalCUDACluster(device_memory_limit=50000) # spilling after 50000 bytes
cluster = LocalCUDACluster(device_memory_limit="5GB") # spilling after 5 GB
cluster = LocalCUDACluster(device_memory_limit=0.3) # spilling after 30% memory utilization
通过将 device_memory_limit
设置为 0 可以禁用内存溢出。
cluster = LocalCUDACluster(device_memory_limit=0) # spilling disabled
dask cuda worker
同样适用,可以通过设置 --device-memory-limit
来控制溢出。
$ dask scheduler
distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:8786
$ dask cuda worker --device-memory-limit 50000
$ dask cuda worker --device-memory-limit 5GB
$ dask cuda worker --device-memory-limit 0.3
$ dask cuda worker --device-memory-limit 0
JIT-反溢出
Dask 和 Dask-CUDA 中的常规溢出存在一些显著问题。它跟踪的是任务输出,而不是跟踪单个对象。这意味着返回 CUDA 对象集合的任务要么溢出所有 CUDA 对象,要么都不溢出。其他问题包括对象重复、错误的溢出顺序以及未跟踪共享设备缓冲区(参见讨论)。
为了解决所有这些问题,Dask-CUDA 引入了 JIT-反溢出,它可以显著提高性能和内存使用。对于需要显著溢出的工作负载(例如在内存可用量少于数据的基础设施上进行大型连接),我们经常看到超过 50% 的改进(即,需要 300 秒的操作可能只需要 110 秒)。对于不需要溢出的工作负载,我们预期不会看到太大差异。
为了启用 JIT-反溢出,使用 jit_unspill
参数。
>>> import dask
>>> from distributed import Client
>>> from dask_cuda import LocalCUDACluster
>>> cluster = LocalCUDACluster(n_workers=10, device_memory_limit="1GB", jit_unspill=True)
>>> client = Client(cluster)
>>> with dask.config.set(jit_unspill=True):
... cluster = LocalCUDACluster(n_workers=10, device_memory_limit="1GB")
... client = Client(cluster)
或者设置工作器参数 --enable-jit-unspill
。
$ dask scheduler
distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:8786
$ dask cuda worker --enable-jit-unspill
或者设置环境变量 DASK_JIT_UNSPILL=True
。
$ dask scheduler
distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:8786
$ DASK_JIT_UNSPILL=True dask cuda worker
限制
JIT-反溢出将 CUDA 对象(例如 cudf.Dataframe
)包装在 ProxyObject
中。由 ProxyObject
实例代理的对象在访问时将被 JIT 反序列化。该实例的行为与被代理的对象相同,可以像使用被代理的对象一样进行访问/使用。
ProxyObject 有一些限制,不能完美地模仿被代理的对象。最值得注意的是,使用 instance()
进行类型检查按预期工作,但直接类型检查不行。
>>> import numpy as np
>>> from dask_cuda.proxy_object import asproxy
>>> x = np.arange(3)
>>> isinstance(asproxy(x), type(x))
True
>>> type(asproxy(x)) is type(x)
False
因此,如果遇到问题,请记住始终可以使用 unproxy()
直接访问被代理的对象,或者设置 DASK_JIT_UNSPILL_COMPATIBILITY_MODE=True
来启用兼容模式,该模式会自动对所有函数输入调用 unproxy()
。
cuDF 溢出
在使用 Dask cuDF(即 Dask DataFrame)执行 ETL 工作流时,通常最好利用 cuDF 中的原生溢出支持。
原生 cuDF 溢出比上述其他方法具有重要优势。当使用 JIT-反溢出或默认溢出时,工作器只能溢出任务的输入或输出。这意味着在任务执行完成之前,任务中创建的任何数据都是完全受限的。然而,当使用 cuDF 溢出时,可以在任务执行期间根据需要对单个设备缓冲区进行溢出/反溢出。
部署 LocalCUDACluster
时,可以通过 enable_cudf_spill
参数启用 cuDF 溢出。
>>> from distributed import Client
>>> from dask_cuda import LocalCUDACluster
>>> cluster = LocalCUDACluster(n_workers=10, enable_cudf_spill=True)
>>> client = Client(cluster)
dask cuda worker
同样适用。
$ dask scheduler
distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:8786
$ dask cuda worker --enable-cudf-spill
统计信息
启用 cuDF 溢出后,也可以让 cuDF 收集基本的溢出统计信息。收集这些信息是理解使用 cuDF 的内存密集型工作流性能的有用方法。
部署 LocalCUDACluster
时,可以通过 cudf_spill_stats
参数启用 cuDF 溢出统计信息。
>>> cluster = LocalCUDACluster(n_workers=10, enable_cudf_spill=True, cudf_spill_stats=1)
dask cuda worker
同样适用。
$ dask cuda worker --enable-cudf-spill --cudf-spill-stats 1
要让每个 dask-cuda 工作器在工作流中打印溢出统计信息,可以这样做:
def spill_info():
from cudf.core.buffer.spill_manager import get_global_manager
print(get_global_manager().statistics)
client.submit(spill_info)
有关可用溢出统计信息选项的更多信息,请参阅 cuDF 溢出文档。
限制
尽管 cuDF 溢出是大多数使用 Dask cuDF 的 ETL 工作流的最佳选择,但如果工作流在 cudf.DataFrame
和其他数据格式(例如 cupy.ndarray
)之间进行转换,其效果会差很多。一旦底层设备缓冲区“暴露”给外部内存引用,它们就变得 cuDF “不可溢出”了。在这种情况下(例如,Dask-CUDA + XGBoost),JIT-反溢出通常是更好的选择。