注意

RAFT 中的向量搜索和聚类算法正在迁移到名为 cuVS 的专用向量搜索新库中。在迁移期间,我们将继续支持 RAFT 中的向量搜索算法,但在 RAPIDS 24.06 (June) 版本之后将不再更新它们。我们计划在 RAPIDS 24.10 (October) 版本之前完成迁移,并在 24.12 (December) 版本中将其完全从 RAFT 中移除。

RAFT Dask API#

基于 Dask 的多节点多 GPU 通信器#

class raft_dask.common.Comms(comms_p2p=False, client=None, verbose=False, streams_per_handle=0, nccl_root_location='scheduler')[source]#

初始化并管理 Dask 集群的 worker 上的底层 NCCL 和 UCX 通信句柄。期望显式调用 init()。当不再需要通信时,建议同时调用 destroy() 以便清理底层资源。此类不是线程安全的。

方法

destroy()

关闭已初始化的通信并清理资源。

init([workers])

初始化底层通信。

worker_info(workers)

构建一个字典 { (worker_address, worker_port)

create_nccl_uniqueid

示例

# The following code block assumes we have wrapped a C++
# function in a Python function called `run_algorithm`,
# which takes a `raft::handle_t` as a single argument.
# Once the `Comms` instance is successfully initialized,
# the underlying `raft::handle_t` will contain an instance
# of `raft::comms::comms_t`

from dask_cuda import LocalCUDACluster
from dask.distributed import Client

from raft.dask.common import Comms, local_handle

cluster = LocalCUDACluster()
client = Client(cluster)

def _use_comms(sessionId):
    return run_algorithm(local_handle(sessionId))

comms = Comms(client=client)
comms.init()

futures = [client.submit(_use_comms,
                         comms.sessionId,
                         workers=[w],
                         pure=False) # Don't memoize
               for w in cb.worker_addresses]
wait(dfs, timeout=5)

comms.destroy()
client.close()
cluster.close()
destroy()[source]#

关闭已初始化的通信并清理资源。这将由 Comms 析构函数自动调用,但也可以提前调用以节省资源。

init(workers=None)[source]#

初始化底层通信。NCCL 是必需的,但 UCX 仅在 comms_p2p == True 时才初始化。

参数:
workersSequence

用于初始化通信的唯一 worker 集合。

worker_info(workers)[source]#
构建一个字典 { (worker_address, worker_port)

(worker_rank, worker_port ) }