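Note
The vector search and clustering algorithms in RAFT are being migrated to a new library dedicated to vector search called cuVS. We will continue to support the vector search algorithms in RAFT during the migration, but will no longer update them after the RAPIDS 24.06 (June) release. We plan to complete the migration by the RAPIDS 24.10 (October) release, and the algorithms will be removed from RAFT entirely in the 24.12 (December) release.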
RAFT Dask API
Dask-based Multi-Node Multi-GPU Comms
- class raft_dask.common.Comms(comms_p2p=False, client=None, verbose=False, streams_per_handle=0, nccl_root_location='scheduler')
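Initializes and manages the underlying NCCL and UCX comms handles on the workers of a Dask cluster. init() is expected to be called explicitly. It is also recommended to call destroy() once the comms are no longer needed, so that the underlying resources are cleaned up. This class is not thread-safe.

Methods
- destroy(): Shuts down the initialized comms and cleans up resources.
- init([workers]): Initializes the underlying comms.
- worker_info(workers): Builds a dictionary { (worker_address, worker_port)
- create_nccl_uniqueid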
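Because init() must be called explicitly and destroy() is only recommended, an algorithm that raises an exception can easily skip the cleanup step. The following is a minimal sketch, assuming only that the object exposes the init() and destroy() methods listed above. It wraps the pair in a context manager so that destroy() runs even when the body raises. `managed_comms` and `SupportsInitDestroy` are hypothetical names, not part of raft_dask, and the demo uses a stand-in object instead of a real GPU cluster.

```python
from contextlib import contextmanager
from typing import Iterator, List, Protocol


class SupportsInitDestroy(Protocol):
    """Hypothetical protocol: anything with the init()/destroy() lifecycle."""

    def init(self) -> None: ...

    def destroy(self) -> None: ...


@contextmanager
def managed_comms(comms: SupportsInitDestroy) -> Iterator[SupportsInitDestroy]:
    """Hypothetical helper: call init() on entry and always call destroy() on exit."""
    comms.init()
    try:
        yield comms
    finally:
        # Runs on normal exit and when the body raises, so resources are released.
        comms.destroy()


if __name__ == "__main__":

    class _FakeComms:
        """Stand-in that records calls, used only to exercise the helper."""

        def __init__(self) -> None:
            self.calls: List[str] = []

        def init(self) -> None:
            self.calls.append("init")

        def destroy(self) -> None:
            self.calls.append("destroy")

    fake = _FakeComms()
    try:
        with managed_comms(fake):
            raise RuntimeError("algorithm failed")
    except RuntimeError:
        pass
    print(fake.calls)  # ['init', 'destroy']
```

With a real cluster, `with managed_comms(Comms(client=client)) as comms:` would take the place of the explicit comms.init() and comms.destroy() calls in the example below. The helper only guarantees cleanup; it does not make Comms thread-safe.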
Examples
# The following code block assumes we have wrapped a C++
# function in a Python function called `run_algorithm`,
# which takes a `raft::handle_t` as a single argument.
# Once the `Comms` instance is successfully initialized,
# the underlying `raft::handle_t` will contain an instance
# of `raft::comms::comms_t`.

from dask_cuda import LocalCUDACluster
from dask.distributed import Client, wait

from raft_dask.common import Comms, local_handle

cluster = LocalCUDACluster()
client = Client(cluster)


def _use_comms(sessionId):
    # Fetch this worker's handle for the session and run the wrapped algorithm
    return run_algorithm(local_handle(sessionId))


comms = Comms(client=client)
comms.init()

# Submit one task pinned to each worker in the comms session
futures = [client.submit(_use_comms,
                         comms.sessionId,
                         workers=[w],
                         pure=False)  # Don't memoize
           for w in comms.worker_addresses]

wait(futures, timeout=5)

comms.destroy()
client.close()
cluster.close()