注意

RAFT 中的向量搜索和聚类算法正在迁移到一个新的专门用于向量搜索的库,称为 cuVS。在迁移期间,我们将继续支持 RAFT 中的向量搜索算法,但在 RAPIDS 24.06(6月)发布后将不再更新。我们计划在 RAPIDS 24.10(10月)发布时完成迁移,并在 24.12(12月)发布时将其完全从 RAFT 中移除。

多节点多 GPU#

RAFT 包含 C++ 基础设施,用于在编写可在多个节点和多个 GPU 上扩展的应用程序时抽象通信层。该基础设施假设 OPG(每个 GPU 一个进程)架构,其中多个物理并行单元(进程、rank 或 worker)可能同时执行代码,但每个并行单元仅与一个 GPU 通信,并且是与每个 GPU 通信的唯一进程。

RAFT 中的 comms 层旨在为屏障同步集合通信提供外观 API,允许用户使用单个抽象层编写算法,并在许多不同类型的系统中部署。目前,RAFT 通信代码已部署在 MPI、Dask 和 Spark 集群中。

通用类型#

#include <raft/core/comms.hpp>

namespace raft::comms

enum class datatype_t#

enumerator CHAR#
enumerator UINT8#
enumerator INT32#
enumerator UINT32#
enumerator INT64#
enumerator UINT64#
enumerator FLOAT32#
enumerator FLOAT64#
enum class op_t#

enumerator SUM#
enumerator PROD#
enumerator MIN#
enumerator MAX#
enum class status_t#

分布式流同步的结果状态

enumerator SUCCESS#
enumerator ERROR#
enumerator ABORT#
typedef unsigned int request_t#
template<typename value_t>
datatype_t get_type()#
template<>
datatype_t get_type<char>()#
template<>
datatype_t get_type<uint8_t>()#
template<>
datatype_t get_type<int>()#
template<>
datatype_t get_type<uint32_t>()#
template<>
datatype_t get_type<int64_t>()#
template<>
datatype_t get_type<uint64_t>()#
template<>
datatype_t get_type<float>()#
template<>
datatype_t get_type<double>()#

Comms 接口#

class comms_t#
#include <comms.hpp>

公共函数

inline virtual ~comms_t()#

虚析构函数以启用多态性

inline int get_size() const#

返回通信器组的大小

inline int get_rank() const#

返回本地 rank

inline std::unique_ptr<comms_iface> comm_split(int color, int key) const#

根据给定的 color 和 key 将当前通信器组拆分为子组

参数:
  • color – 具有相同 color 的 rank 放在同一个通信器中

  • key – 控制 rank 分配

inline void barrier() const#

执行集体屏障同步

inline status_t sync_stream(cudaStream_t stream) const#

某些集合通信实现(例如 NCCL)可能使用需要显式同步的异步集合操作。始终使用此方法进行同步很重要,以便传播故障,而不是使用 cudaStreamSynchronize(),以防止潜在的死锁。

参数:

stream – 用于同步集合操作的 cuda 流

template<typename value_t>
inline void isend(const value_t *buf, size_t size, int dest, int tag, request_t *request) const#

执行异步点对点发送

模板参数:

value_t – 要发送的数据类型

参数:
  • buf – 指向要发送数据数组的指针

  • size – buf 中的元素数量

  • dest – 目标 rank

  • tag – 接收方用于过滤的标签

  • request – 指向用于存储返回的 request_t 对象的指针。这将在 waitall() 中使用,以同步直到消息发送成功(或失败)。

template<typename value_t>
inline void irecv(value_t *buf, size_t size, int source, int tag, request_t *request) const#

执行异步点对点接收

模板参数:

value_t – 要接收的数据类型

参数:
  • buf – 指向将存储接收到的数据(已初始化)数组的指针

  • size – buf 中的元素数量

  • source – 源 rank

  • tag – 用于消息过滤的标签

  • request – 指向用于存储返回的 request_t 对象的指针。这将在 waitall() 中使用,以同步直到消息发送成功(或失败)。

inline void waitall(int count, request_t array_of_requests[]) const#

对 isend/irecv 返回的 request_t 对象数组进行同步

参数:
  • count – 要同步的请求数量

  • array_of_requests – isend/irecv 返回的 request_t 对象数组

template<typename value_t>
inline void allreduce(const value_t *sendbuff, value_t *recvbuff, size_t count, op_t op, cudaStream_t stream) const#

执行 allreduce 集合操作

模板参数:

value_t – 底层缓冲区的 datatype

参数:
  • sendbuff – 要规约的数据

  • recvbuff – 存储规约结果的缓冲区

  • count – sendbuff 中的元素数量

  • op – 要执行的规约操作

  • stream – 同步操作的 CUDA 流

template<typename value_t>
inline void bcast(value_t *buff, size_t count, int root, cudaStream_t stream) const#

将数据从一个 rank 广播到其余所有 rank

模板参数:

value_t – 底层缓冲区的 datatype

参数:
  • buff – 要发送的缓冲区

  • count – buff 中的元素数量

  • root – 发起广播的 rank

  • stream – 同步操作的 CUDA 流

template<typename value_t>
inline void bcast(const value_t *sendbuff, value_t *recvbuff, size_t count, int root, cudaStream_t stream) const#

将数据从一个 rank 广播到其余所有 rank

模板参数:

value_t – 底层缓冲区的 datatype

参数:
  • sendbuff – 包含要广播数据的缓冲区(仅在 root 上使用)

  • recvbuff – 接收广播数据的缓冲区

  • count – buff 中的元素数量

  • root – 发起广播的 rank

  • stream – 同步操作的 CUDA 流

template<typename value_t>
inline void reduce(const value_t *sendbuff, value_t *recvbuff, size_t count, op_t op, int root, cudaStream_t stream) const#

将来自多个 rank 的数据规约到单个 rank

模板参数:

value_t – 底层缓冲区的 datatype

参数:
  • sendbuff – 包含要规约数据的缓冲区

  • recvbuff – 包含规约数据的缓冲区(仅需要在 root 上初始化)

  • count – sendbuff 中的元素数量

  • op – 要执行的规约操作

  • root – 存储结果的 rank

  • stream – 同步操作的 CUDA 流

template<typename value_t>
inline void allgather(const value_t *sendbuff, value_t *recvbuff, size_t sendcount, cudaStream_t stream) const#

将每个 rank 的数据收集到所有 rank 上

模板参数:

value_t – 底层缓冲区的 datatype

参数:
  • sendbuff – 包含要收集数据的缓冲区

  • recvbuff – 包含从所有 rank 收集数据的缓冲区

  • sendcount – 发送缓冲区中的元素数量

  • stream – 同步操作的 CUDA 流

template<typename value_t>
inline void allgatherv(const value_t *sendbuf, value_t *recvbuf, const size_t *recvcounts, const size_t *displs, cudaStream_t stream) const#

收集所有 rank 的数据并将合并后的数据分发给所有 rank

模板参数:

value_t – 底层缓冲区的 datatype

参数:
  • sendbuf – 包含要发送数据的缓冲区

  • recvbuf – 包含要接收数据的缓冲区

  • recvcounts – 指向一个数组(长度为 num_ranks 大小),其中包含从每个 rank 接收的元素数量

  • displs – 指向一个数组(长度为 num_ranks 大小),用于指定放置来自每个 rank 的接收数据的位置(相对于 recvbuf 的偏移量)

  • stream – 同步操作的 CUDA 流

template<typename value_t>
inline void gather(const value_t *sendbuff, value_t *recvbuff, size_t sendcount, int root, cudaStream_t stream) const#

将每个 rank 的数据收集到所有 rank 上

模板参数:

value_t – 底层缓冲区的 datatype

参数:
  • sendbuff – 包含要收集数据的缓冲区

  • recvbuff – 包含从所有 rank 收集数据的缓冲区

  • sendcount – 发送缓冲区中的元素数量

  • root – 存储结果的 rank

  • stream – 同步操作的 CUDA 流

template<typename value_t>
inline void gatherv(const value_t *sendbuf, value_t *recvbuf, size_t sendcount, const size_t *recvcounts, const size_t *displs, int root, cudaStream_t stream) const#

收集所有 rank 的数据并将合并后的数据分发给所有 rank

模板参数:

value_t – 底层缓冲区的 datatype

参数:
  • sendbuf – 包含要发送数据的缓冲区

  • recvbuf – 包含要接收数据的缓冲区

  • sendcount – 发送缓冲区中的元素数量

  • recvcounts – 指向一个数组(长度为 num_ranks 大小),其中包含从每个 rank 接收的元素数量

  • displs – 指向一个数组(长度为 num_ranks 大小),用于指定放置来自每个 rank 的接收数据的位置(相对于 recvbuf 的偏移量)

  • root – 存储结果的 rank

  • stream – 同步操作的 CUDA 流

template<typename value_t>
inline void reducescatter(const value_t *sendbuff, value_t *recvbuff, size_t recvcount, op_t op, cudaStream_t stream) const#

对所有 rank 的数据进行规约,然后将结果分散到各个 rank

模板参数:

value_t – 底层缓冲区的 datatype

参数:
  • sendbuff – 包含要发送数据的缓冲区(大小为 recvcount * num_ranks)

  • recvbuff – 包含接收数据的缓冲区

  • recvcount – 要接收的项数量

  • op – 要执行的规约操作

  • stream – 同步操作的 CUDA 流

template<typename value_t>
inline void device_send(const value_t *buf, size_t size, int dest, cudaStream_t stream) const#

执行点对点发送

如果一个线程同时进行发送和接收,请使用 device_sendrecv 以避免死锁。

模板参数:

value_t – 要发送的数据类型

参数:
  • buf – 指向要发送数据数组的指针

  • size – buf 中的元素数量

  • dest – 目标 rank

  • stream – 同步操作的 CUDA 流

template<typename value_t>
inline void device_recv(value_t *buf, size_t size, int source, cudaStream_t stream) const#

执行点对点接收

如果一个线程同时进行发送和接收,请使用 device_sendrecv 以避免死锁。

模板参数:

value_t – 要接收的数据类型

参数:
  • buf – 指向将存储接收到的数据(已初始化)数组的指针

  • size – buf 中的元素数量

  • source – 源 rank

  • stream – 同步操作的 CUDA 流

template<typename value_t>
inline void device_sendrecv(const value_t *sendbuf, size_t sendsize, int dest, value_t *recvbuf, size_t recvsize, int source, cudaStream_t stream) const#

执行点对点发送/接收

模板参数:

value_t – 要发送和接收的数据类型

参数:
  • sendbuf – 指向要发送数据数组的指针

  • sendsize – sendbuf 中的元素数量

  • dest – 目标 rank

  • recvbuf – 指向将存储接收到的数据(已初始化)数组的指针

  • recvsize – recvbuf 中的元素数量

  • source – 源 rank

  • stream – 同步操作的 CUDA 流

template<typename value_t>
inline void device_multicast_sendrecv(const value_t *sendbuf, std::vector<size_t> const &sendsizes, std::vector<size_t> const &sendoffsets, std::vector<int> const &dests, value_t *recvbuf, std::vector<size_t> const &recvsizes, std::vector<size_t> const &recvoffsets, std::vector<int> const &sources, cudaStream_t stream) const#

执行组播发送/接收

模板参数:

value_t – 要发送和接收的数据类型

参数:
  • sendbuf – 指向要发送数据数组的指针

  • sendsizes – 要发送的元素数量

  • sendoffsets – 相对于 sendbuf 的元素数量偏移量

  • dests – 目标 rank

  • recvbuf – 指向将存储接收到的数据(已初始化)数组的指针

  • recvsizes – 要接收的元素数量

  • recvoffsets – 相对于 recvbuf 的元素数量偏移量

  • sources – 源 rank

  • stream – 同步操作的 CUDA 流

inline void group_start() const#

放在 group_start()group_end() 之间的多个集合操作和设备发送/接收操作会被合并为一个大型操作。在内部,此函数是 ncclGroupStart() 的封装。

inline void group_end() const#

放在 group_start()group_end() 之间的多个集合操作和设备发送/接收操作会被合并为一个大型操作。在内部,此函数是 ncclGroupEnd() 的封装。

MPI Comms#

inline void initialize_mpi_comms(resources *handle, MPI_Comm comm)#

给定一个正确初始化的 MPI_Comm,构造 RAFT 的 MPI 通信器实例并将其注入给定的 RAFT handle 实例中

#include <raft/comms/mpi_comms.hpp>
#include <raft/core/device_mdarray.hpp>

MPI_Comm mpi_comm;
raft::raft::resources handle;

initialize_mpi_comms(&handle, mpi_comm);
...
const auto& comm = resource::get_comms(handle);
auto gather_data = raft::make_device_vector<float>(handle, comm.get_size());
...
comm.allgather((gather_data.data_handle())[comm.get_rank()],
               gather_data.data_handle(),
               1,
               resource::get_cuda_stream(handle));

comm.sync_stream(resource::get_cuda_stream(handle));

参数:
  • handle – 用于管理昂贵资源的 raft handle

  • comm – 已初始化的 MPI 通信器

NCCL+UCX Comms#

void build_comms_nccl_only(raft::resources *handle, ncclComm_t nccl_comm, int num_ranks, int rank)#

用于构造 RAFT NCCL 通信器并将其注入到 RAFT handle 的工厂函数。

#include <raft/comms/std_comms.hpp>
#include <raft/core/device_mdarray.hpp>

ncclComm_t nccl_comm;
raft::resources handle;

build_comms_nccl_only(&handle, nccl_comm, 5, 0);
...
const auto& comm = resource::get_comms(handle);
auto gather_data = raft::make_device_vector<float>(handle, comm.get_size());
...
comm.allgather((gather_data.data_handle())[comm.get_rank()],
               gather_data.data_handle(),
               1,
               resource::get_cuda_stream(handle));

comm.sync_stream(resource::get_cuda_stream(handle));

参数:
  • handle – 用于注入 comms 的 raft::resources

  • nccl_comm – 用于集合操作的已初始化 NCCL 通信器

  • num_ranks – 通信器组中的 rank 数量

  • rank – 本地实例的 rank

void build_comms_nccl_ucx(resources *handle, ncclComm_t nccl_comm, bool is_ucxx, void *ucp_worker, void *eps, int num_ranks, int rank)#

用于构造 RAFT NCCL+UCX 并将其注入到 RAFT handle 的工厂函数。

#include <raft/comms/std_comms.hpp>
#include <raft/core/device_mdarray.hpp>

ncclComm_t nccl_comm;
raft::resources handle;
ucp_worker_h ucp_worker;
ucp_ep_h *ucp_endpoints_arr;

build_comms_nccl_ucx(&handle, nccl_comm, &ucp_worker, ucp_endpoints_arr, 5, 0);
...
const auto& comm = resource::get_comms(handle);
auto gather_data = raft::make_device_vector<float>(handle, comm.get_size());
...
comm.allgather((gather_data.data_handle())[comm.get_rank()],
               gather_data.data_handle(),
               1,
               resource::get_cuda_stream(handle));

comm.sync_stream(resource::get_cuda_stream(handle));

参数:
  • handle – 用于注入 comms 的 raft::resources

  • nccl_comm – 用于集合操作的已初始化 NCCL 通信器

  • is_ucxxucp_workereps 对象是 UCXX (true) 还是纯 UCX (false)。

  • ucp_worker – 本地进程的 worker。注意:此处有意保留为 void* 类型,以便 ucp_worker_h 无需通过 cython 层暴露。

  • eps – ucp_ep_h 实例数组。注意:此处有意保留为 void* 类型,以便 ucp_ep_h 无需通过 cython 层暴露。

  • num_ranks – 通信器组中的 rank 数量

  • rank – 本地实例的 rank