Dask 调度器需要 GPU 吗?#
用户在部署 Dask 集群时常见的一个问题是,调度器与 worker 是否有不同的最低要求。在使用 RAPIDS 和 GPU 时,这个问题更加复杂。
警告
本指南概述了我们目前关于调度器硬件要求的建议,但这可能会有所变化。
简而言之:强烈建议您的 Dask 调度器与集群中的其他组件具有匹配的硬件/软件功能。
因此,如果您的 worker 拥有 GPU 并安装了 RAPIDS 库,我们建议您的调度器也具备这些。但是,连接到调度器的 GPU 不需要像 worker 上的 GPU 那样强大,只要它具有相同的功能和驱动程序/CUDA 版本即可。
调度器使用 GPU 做什么?#
Dask 客户端生成它想要执行的操作的任务图,并序列化任何需要发送给 worker 的数据。调度器负责将这些任务分配给各个 Dask worker,并在两者之间来回传递序列化数据。Worker 反序列化数据,执行计算,序列化结果并将其传回。
这可能导致用户合乎逻辑地问调度器是否需要与 worker/客户端相同的功能。它不处理实际数据或执行任何用户计算,它只决定工作应该去哪里。
更进一步,您甚至可以问:“Dask 调度器甚至需要用 Python 编写吗?”几年前,甚至有人尝试用 Rust 实现调度器。
我们建议调度器具有相同功能的主要原因有两个
在一些边缘情况下,调度器确实会反序列化数据。
一些调度器优化需要将高层图在客户端进行 pickle 操作,并在调度器上进行 unpickle 操作。
如果您的工作负载没有触发任何边缘情况,并且您没有使用高层图优化,那么您很可能可以不使用 GPU。但您很可能会最终遇到问题,而且故障模式可能难以调试。
已知边缘情况#
调用 client.submit
并直接将数据传递给函数时,整个图将被序列化并发送到调度器。为了让调度器弄清楚如何处理它,图将被反序列化。如果数据使用 GPU,这可能导致调度器导入 RAPIDS 库,尝试实例化 CUDA 上下文并将数据填充到 GPU 内存中。如果这些库丢失和/或没有 GPU,这将导致调度器失败。
许多 Dask 集合也具有一个 meta 对象,它代表整个集合,但不包含任何数据。例如,一个 Dask Dataframe 有一个 meta Pandas Dataframe,它具有相同的元属性,并在调度期间使用。如果底层数据是 cuDF Dataframe,那么 meta 对象也将是 cuDF Dataframe,它将在调度器上反序列化。
示例故障模式#
当使用默认的 TCP 通信协议时,调度器通常不检查客户端和 worker 之间通信的数据,因此许多工作流不会引发故障。例如,假设我们设置了一个 Dask 集群,并且没有为调度器提供 GPU。以下使用 CuPy 支持的 Dask 数组的简单计算可以成功完成
import cupy
from distributed import Client, wait
import dask.array as da
client = Client(scheduler_file="scheduler.json")
x = cupy.arange(10)
y = da.arange(1000, like=x)
z = (y * 2).persist()
wait(z)
# Now let's look at some results
print(z[:10].compute())
我们可以运行这段代码,调度器无需访问 GPU
$ CUDA_VISIBLE_DEVICES="" dask scheduler --protocol tcp --scheduler-file scheduler.json &
$ dask cuda worker --protocol tcp --scheduler-file scheduler.json &
$ python test.py
...
[ 0 2 4 6 8 10 12 14 16 18]
...
相比之下,如果您配置了启用 InfiniBand 的系统并希望利用高性能网络,您将需要使用 UCX 协议,而不是 TCP。在没有 GPU 的调度器上使用此类设置将不会成功。当客户端或 worker 与调度器通信时,任何 GPU 分配的缓冲区将直接在 GPU 之间发送(避免往返主机内存)。这更高效,但如果调度器没有 GPU,则不会成功。运行上面相同的示例,但这次使用 UCX,我们将获得一个错误
$ CUDA_VISIBLE_DEVICES="" dask scheduler --protocol ucx --scheduler-file scheduler.json &
$ dask cuda worker --protocol ucx --scheduler-file scheduler.json &
$ python test.py
$ CUDA_VISIBLE_DEVICES="" dask scheduler --protocol ucx --scheduler-file foo.json &
$ dask-cuda-worker --protocol ucx --scheduler-file scheduler.json &
$ python test.py
...
2023-01-27 11:01:28,263 - distributed.core - ERROR - CUDA error at: .../rmm/include/rmm/cuda_device.hpp:56: cudaErrorNoDevice no CUDA-capable device is detected
Traceback (most recent call last):
File ".../distributed/distributed/utils.py", line 741, in wrapper
return await func(*args, **kwargs)
File ".../distributed/distributed/comm/ucx.py", line 372, in read
frames = [
File ".../distributed/distributed/comm/ucx.py", line 373, in <listcomp>
device_array(each_size) if is_cuda else host_array(each_size)
File ".../distributed/distributed/comm/ucx.py", line 171, in device_array
return rmm.DeviceBuffer(size=n)
File "device_buffer.pyx", line 85, in rmm._lib.device_buffer.DeviceBuffer.__cinit__
RuntimeError: CUDA error at: .../rmm/include/rmm/cuda_device.hpp:56: cudaErrorNoDevice no CUDA-capable device is detected
2023-01-27 11:01:28,263 - distributed.core - ERROR - Exception while handling op gather
Traceback (most recent call last):
File ".../distributed/distributed/core.py", line 820, in _handle_comm
result = await result
File ".../distributed/distributed/scheduler.py", line 5687, in gather
data, missing_keys, missing_workers = await gather_from_workers(
File ".../distributed/distributed/utils_comm.py", line 80, in gather_from_workers
r = await c
File ".../distributed/distributed/worker.py", line 2872, in get_data_from_worker
return await retry_operation(_get_data, operation="get_data_from_worker")
File ".../distributed/distributed/utils_comm.py", line 419, in retry_operation
return await retry(
File ".../distributed/distributed/utils_comm.py", line 404, in retry
return await coro()
File ".../distributed/distributed/worker.py", line 2852, in _get_data
response = await send_recv(
File ".../distributed/distributed/core.py", line 986, in send_recv
response = await comm.read(deserializers=deserializers)
File ".../distributed/distributed/utils.py", line 741, in wrapper
return await func(*args, **kwargs)
File ".../distributed/distributed/comm/ucx.py", line 372, in read
frames = [
File ".../distributed/distributed/comm/ucx.py", line 373, in <listcomp>
device_array(each_size) if is_cuda else host_array(each_size)
File ".../distributed/distributed/comm/ucx.py", line 171, in device_array
return rmm.DeviceBuffer(size=n)
File "device_buffer.pyx", line 85, in rmm._lib.device_buffer.DeviceBuffer.__cinit__
RuntimeError: CUDA error at: .../rmm/include/rmm/cuda_device.hpp:56: cudaErrorNoDevice no CUDA-capable device is detected
Traceback (most recent call last):
File "test.py", line 15, in <module>
print(z[:10].compute())
File ".../dask/dask/base.py", line 314, in compute
(result,) = compute(self, traverse=False, **kwargs)
File ".../dask/dask/base.py", line 599, in compute
results = schedule(dsk, keys, **kwargs)
File ".../distributed/distributed/client.py", line 3144, in get
results = self.gather(packed, asynchronous=asynchronous, direct=direct)
File ".../distributed/distributed/client.py", line 2313, in gather
return self.sync(
File ".../distributed/distributed/utils.py", line 338, in sync
return sync(
File ".../distributed/distributed/utils.py", line 405, in sync
raise exc.with_traceback(tb)
File ".../distributed/distributed/utils.py", line 378, in f
result = yield future
File ".../tornado/gen.py", line 769, in run
value = future.result()
File ".../distributed/distributed/client.py", line 2205, in _gather
response = await future
File ".../distributed/distributed/client.py", line 2256, in _gather_remote
response = await retry_operation(self.scheduler.gather, keys=keys)
File ".../distributed/distributed/utils_comm.py", line 419, in retry_operation
return await retry(
File ".../distributed/distributed/utils_comm.py", line 404, in retry
return await coro()
File ".../distributed/distributed/core.py", line 1221, in send_recv_from_rpc
return await send_recv(comm=comm, op=key, **kwargs)
File ".../distributed/distributed/core.py", line 1011, in send_recv
raise exc.with_traceback(tb)
File ".../distributed/distributed/core.py", line 820, in _handle_comm
result = await result
File ".../distributed/distributed/scheduler.py", line 5687, in gather
data, missing_keys, missing_workers = await gather_from_workers(
File ".../distributed/distributed/utils_comm.py", line 80, in gather_from_workers
r = await c
File ".../distributed/distributed/worker.py", line 2872, in get_data_from_worker
return await retry_operation(_get_data, operation="get_data_from_worker")
File ".../distributed/distributed/utils_comm.py", line 419, in retry_operation
return await retry(
File ".../distributed/distributed/utils_comm.py", line 404, in retry
return await coro()
File ".../distributed/distributed/worker.py", line 2852, in _get_data
response = await send_recv(
File ".../distributed/distributed/core.py", line 986, in send_recv
response = await comm.read(deserializers=deserializers)
File ".../distributed/distributed/utils.py", line 741, in wrapper
return await func(*args, **kwargs)
File ".../distributed/distributed/comm/ucx.py", line 372, in read
frames = [
File ".../distributed/distributed/comm/ucx.py", line 373, in <listcomp>
device_array(each_size) if is_cuda else host_array(each_size)
File ".../distributed/distributed/comm/ucx.py", line 171, in device_array
return rmm.DeviceBuffer(size=n)
File "device_buffer.pyx", line 85, in rmm._lib.device_buffer.DeviceBuffer.__cinit__
RuntimeError: CUDA error at: .../rmm/include/rmm/cuda_device.hpp:56: cudaErrorNoDevice no CUDA-capable device is detected
...
关键错误来自 RMM,我们试图在调度器上分配一个 DeviceBuffer
,但没有可用的 GPU 来执行此操作
File ".../distributed/distributed/comm/ucx.py", line 171, in device_array
return rmm.DeviceBuffer(size=n)
File "device_buffer.pyx", line 85, in rmm._lib.device_buffer.DeviceBuffer.__cinit__
RuntimeError: CUDA error at: .../rmm/include/rmm/cuda_device.hpp:56: cudaErrorNoDevice no CUDA-capable device is detected
调度器优化和高层图#
Dask 社区正在积极致力于实现高层图,这将加速客户端 -> 调度器通信,并允许调度器进行高级优化,例如谓词下推。
为了使用现有序列化策略来通信 HLG,已经付出了很多努力,但这被证明实现起来非常困难。目前的计划是简化 HighLevelGraph/Layer,以便可以在客户端将整个 HLG pickle,作为一个单一的二进制 blob 发送到调度器,然后在调度器上进行 unpickle/实例化 (HLG->dict)。这个新计划的问题在于,pickle/un-pickle 约定将要求调度器与客户端具有相同的环境。如果任何 Layer 逻辑也需要设备分配,那么这种方法也要求调度器能够访问 GPU。
那么调度器的最低要求是什么?#
从软件角度来看,我们建议客户端、调度器和 worker 上的 Python 环境都匹配。考虑到用户需要确保 worker 与客户端具有相同的环境,确保调度器也具有相同的环境并不是很大的负担。
从硬件角度来看,我们建议调度器具有相同的功能,但不一定需要相同数量的资源。因此,如果 worker 拥有一个或多个 GPU,我们建议调度器能够访问一个具有匹配的 NVIDIA 驱动程序和 CUDA 版本的 GPU。在云平台上的大型多节点集群部署中,这可能意味着 worker 在拥有 8 个 GPU 的 VM 上启动,而调度器在拥有一个 GPU 的较小 VM 上启动。您也可以为调度器选择一个功能较弱的 GPU,例如用于推理的 T4,前提是它具有相同的 CUDA 功能、NVIDIA 驱动程序版本和 CUDA/CUDA 工具包版本。
这种平衡意味着我们可以保证一切按预期运行,同时降低成本,因为将调度器放在一个 8 个 GPU 的节点上是资源浪费。