注意
RAFT 中的向量搜索和聚类算法正在迁移到一个新的专注于向量搜索的库,该库名为 cuVS。在此迁移过程中,我们将继续支持 RAFT 中的向量搜索算法,但在 RAPIDS 24.06(6月)发布后将不再更新它们。我们计划在 RAPIDS 24.10(10月)发布时完成迁移,并在 24.12(12月)发布时将它们完全从 RAFT 中移除。
使用 RAFT 通信#
RAFT 提供了一个通信抽象,用于编写可扩展到多个 GPU 和多个节点的分布式算法。该通信抽象主要基于 MPI 和 NCCL,允许用户将算法设计与其执行环境解耦,从而实现“一次编写,随处部署”的语义。目前,cuGraph 和 cuML 中的分布式算法正在部署在 MPI 和 Dask 集群中,而 cuML 的分布式算法也部署在 Apache Spark 集群中的 GPU 上。这是一个强大的概念,因为分布式算法的编写可能很复杂,因此通过尽可能提高复用性,可以简化维护并使 bug 修复惠及更多地方。
RAFT 通信层的用户只需安装 MPI 并使用 mpirun
运行其应用程序,就可以很大程度上免费获得 MPI 集成;同时,raft-dask
Python 包提供了一种机制,用于在 Dask 集群中执行使用 RAFT 通信层编写的算法。通过一个小示例来了解如何使用 RAFT 的通信层构建算法将有所帮助。
首先,通过 raft::resources
实例传递一个 raft::comms_t
实例,并根据需要编写代码来利用集合通信和/或点对点通信。
#include <raft/core/comms.hpp>
#include <raft/core/device_mdspan.hpp>
#include <raft/util/cudart_utils.hpp>
void test_allreduce(raft::resources const &handle, int root) {
raft::comms::comms_t const& communicator = resource::get_comms(handle);
cudaStream_t stream = resource::get_cuda_stream(handle);
raft::device_scalar<int> temp_scalar(stream);
int to_send = 1;
raft::copy(temp_scalar.data(), &to_send, 1, stream);
communicator.allreduce(temp_scalar.data(), temp_scalar.data(), 1,
raft::comms::opt_t::SUM, stream);
resource::sync_stream(handle);
}
这个完全相同的函数现在可以在几种不同类型的 GPU 集群中执行。例如,可以通过使用 MPI_Comm
初始化一个 raft::comms::mpi_comms
实例来使用 MPI 执行它。
#include <raft/core/mpi_comms.hpp>
#include <raft/core/resources.hpp>
raft::resources resource_handle;
// ...
// initialize MPI_Comm
// ...
raft::comms::initialize_mpi_comms(resource_handle, mpi_comm);
// ...
test_allreduce(resource_handle, 0);
在 Dask 中部署我们的 `test_allreduce` 函数需要一个轻量级的 Python 接口,我们可以通过使用 pylibraft
并通过 Cython 暴露该函数来实现。
from pylibraft.common.handle cimport device_resources
from cython.operator cimport dereference as deref
cdef extern from “allreduce_test.hpp”:
void test_allreduce(device_resources const &handle, int root) except +
def run_test_allreduce(handle, root):
cdef const device_resources* h = <device_resources*><size_t>handle.getHandle()
test_allreduce(deref(h), root)
最后,我们可以使用 raft_dask
在 Dask 集群中执行我们的新算法(请注意,这还使用了 RAPIDS dask-cuda 库中的 LocalCUDACluster
)。
from raft_dask.common import Comms, local_handle
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
cluster = LocalCUDACluster()
client = Client(cluster)
# Create and initialize Comms instance
comms = Comms(client=client)
comms.init()
def func_run_allreduce(sessionId, root):
handle = local_handle(sessionId)
run_test_allreduce(handle, root)
# Invoke run_test_allreduce on all workers
dfs = [
client.submit(
func_run_allreduce,
comms.sessionId,
0,
pure=False,
workers=[w]
)
for w in comms.worker_addresses
]
# Wait until processing is done
wait(dfs, timeout=5)
comms.destroy()
client.close()
cluster.close()