注意

RAFT 中的向量搜索和聚类算法正在迁移到一个新的专注于向量搜索的库,该库名为 cuVS。在此迁移过程中,我们将继续支持 RAFT 中的向量搜索算法,但在 RAPIDS 24.06(6月)发布后将不再更新它们。我们计划在 RAPIDS 24.10(10月)发布时完成迁移,并在 24.12(12月)发布时将它们完全从 RAFT 中移除。

使用 RAFT 通信#

RAFT 提供了一个通信抽象,用于编写可扩展到多个 GPU 和多个节点的分布式算法。该通信抽象主要基于 MPI 和 NCCL,允许用户将算法设计与其执行环境解耦,从而实现“一次编写,随处部署”的语义。目前,cuGraph 和 cuML 中的分布式算法正在部署在 MPI 和 Dask 集群中,而 cuML 的分布式算法也部署在 Apache Spark 集群中的 GPU 上。这是一个强大的概念,因为分布式算法的编写可能很复杂,因此通过尽可能提高复用性,可以简化维护并使 bug 修复惠及更多地方。

RAFT 通信层的用户只需安装 MPI 并使用 mpirun 运行其应用程序,就可以很大程度上免费获得 MPI 集成;同时,raft-dask Python 包提供了一种机制,用于在 Dask 集群中执行使用 RAFT 通信层编写的算法。通过一个小示例来了解如何使用 RAFT 的通信层构建算法将有所帮助。

首先,通过 raft::resources 实例传递一个 raft::comms_t 实例,并根据需要编写代码来利用集合通信和/或点对点通信。

使用 RAFT 通信 API 编写的示例函数#
#include <raft/core/comms.hpp>
#include <raft/core/device_mdspan.hpp>
#include <raft/util/cudart_utils.hpp>

void test_allreduce(raft::resources const &handle, int root) {
  raft::comms::comms_t const& communicator = resource::get_comms(handle);
  cudaStream_t stream = resource::get_cuda_stream(handle);
  raft::device_scalar<int> temp_scalar(stream);

  int to_send = 1;
  raft::copy(temp_scalar.data(), &to_send, 1, stream);
  communicator.allreduce(temp_scalar.data(), temp_scalar.data(), 1,
                         raft::comms::opt_t::SUM, stream);
  resource::sync_stream(handle);
}

这个完全相同的函数现在可以在几种不同类型的 GPU 集群中执行。例如,可以通过使用 MPI_Comm 初始化一个 raft::comms::mpi_comms 实例来使用 MPI 执行它。

在 MPI 中运行 test_allreduce() 的示例#
#include <raft/core/mpi_comms.hpp>
#include <raft/core/resources.hpp>

raft::resources resource_handle;
// ...
// initialize MPI_Comm
// ...
raft::comms::initialize_mpi_comms(resource_handle,  mpi_comm);
// ...
test_allreduce(resource_handle, 0);

在 Dask 中部署我们的 `test_allreduce` 函数需要一个轻量级的 Python 接口,我们可以通过使用 pylibraft 并通过 Cython 暴露该函数来实现。

使用 cython 包装 test_allreduce() 的示例#
from pylibraft.common.handle cimport device_resources
from cython.operator cimport dereference as deref

cdef extern from allreduce_test.hpp:
    void test_allreduce(device_resources const &handle, int root) except +

def run_test_allreduce(handle, root):
    cdef const device_resources* h = <device_resources*><size_t>handle.getHandle()

test_allreduce(deref(h), root)

最后,我们可以使用 raft_dask 在 Dask 集群中执行我们的新算法(请注意,这还使用了 RAPIDS dask-cuda 库中的 LocalCUDACluster)。

在 Dask 中运行 test_allreduce() 的示例#
from raft_dask.common import Comms, local_handle
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
cluster = LocalCUDACluster()
client = Client(cluster)

# Create and initialize Comms instance
comms = Comms(client=client)
comms.init()

def func_run_allreduce(sessionId, root):
  handle = local_handle(sessionId)
  run_test_allreduce(handle, root)

# Invoke run_test_allreduce on all workers
dfs = [
  client.submit(
    func_run_allreduce,
    comms.sessionId,
    0,
    pure=False,
    workers=[w]
  )
  for w in comms.worker_addresses
]

# Wait until processing is done
wait(dfs, timeout=5)

comms.destroy()
client.close()
cluster.close()