动态批处理#

动态批处理允许将小型搜索请求分组到批次中,以提高设备占用率和吞吐量,同时将延迟保持在限制范围内。

#include <cuvs/neighbors/dynamic_batching.hpp>

namespace cuvs::neighbors::dynamic_batching

索引构建参数#

struct index_params : public cuvs::neighbors::index_params#
#include <dynamic_batching.hpp>

公共成员

int64_t k#

要搜索的邻居数量在构造时固定。

int64_t max_batch_size = 100#

提交给上游索引的最大批次大小。

size_t n_queues = 3#

独立请求队列的数量。

每个队列都与一个唯一的 CUDA 流和 IO 设备缓冲区相关联。如果并发请求数量很高,使用多个队列可以在一个队列忙时填充数据并准备批次。此外,队列是并发提交的;这可以通过隐藏内核启动延迟来更好地利用 GPU,有助于提高吞吐量。

bool conservative_dispatch = false#

默认情况下(conservative_dispatch = false),第一个将查询提交到批次的 CPU 线程会尽快分派上游搜索函数(在批次满之前)。在这种情况下,在调用上游搜索时不知道最终批次大小,因此每次都以最大批次大小运行上游搜索,即使批次中只有一个有效查询。这会减少延迟,但会浪费 GPU 资源。

另一种行为(conservative_dispatch = true)更为保守:分派线程启动收集输入查询的内核,但会等到批次满或等待时间超过。只有那时它才会获取实际批次大小并启动上游搜索。因此,以暴露上游搜索延迟为代价,可以减少 GPU 资源的浪费。

经验法则:对于较大的 max_batch_size,设置 conservative_dispatch = true,否则保持禁用状态。

索引搜索参数#

struct search_params : public cuvs::neighbors::search_params#
#include <dynamic_batching.hpp>

公共成员

double dispatch_timeout_ms = 1.0#

请求可以在队列中停留多久(毫秒)。注意,这仅影响分派时间,不反映完整的请求延迟;后者取决于上游搜索参数和批次大小。

索引#

template<typename T, typename IdxT>
struct index : public cuvs::neighbors::index#
#include <dynamic_batching.hpp>

轻量级动态批处理索引包装器。

一个轻量级动态批处理索引管理一个单一索引和一个单一搜索参数集。该结构应通过复制语义在多个用户之间共享:对底层实现的访问通过共享指针管理,并且参与者之间的并发搜索是线程安全的。

使用示例

using namespace cuvs::neighbors;
// When creating a dynamic batching index, k parameter has to be passed explicitly.
// The first empty braces default-initialize the parent `neighbors::index_params` (unused).
dynamic_batching::index_params dynb_index_params{{}, k};
// Construct the index by wrapping the upstream index and search parameters.
dynamic_batching::index<float, uint32_t> index{
    res, dynb_index_params, upstream_index, upstream_search_params
};
// Use default search parameters
dynamic_batching::search_params search_params;
// Search K nearest neighbours
auto neighbors = raft::make_device_matrix<uint32_t>(res, n_queries, k);
auto distances = raft::make_device_matrix<float>(res, n_queries, k);
dynamic_batching::search(
    res, search_params, index, queries, neighbors.view(), distances.view()
);

优先级队列

动态批处理索引对单个请求的优先级支持有限。批处理程序中只有一个队列池,没有优先处理某个批次的功能。每个请求中传递的 search_params::dispatch_timeout_ms 参数在内部汇总,并且批次的分派不会晚于任何一个超时时间的到来。在这种逻辑下,高优先级请求的处理时间永远不会早于任何更早提交的低优先级请求。

然而,动态批处理索引是轻量级的,不包含任何全局或静态状态。这意味着很容易组合多个批处理程序。例如,您可以为每个优先级类构建一个批处理索引

using namespace cuvs::neighbors;
// Large batch size (128), couple queues (2),
//   enabled conservative dispatch - all for better throughput
dynamic_batching::index_params low_priority_params{{}, k, 128, 2, true};
// Small batch size (16), more queues (4),
//   disabled conservative dispatch - to minimize latency with reasonable throughput
dynamic_batching::index_params high_priority_params{{}, k, 16, 4, false};
// Construct the indexes by wrapping the upstream index and search parameters.
dynamic_batching::index<float, uint32_t> low_priority_index{
    res, low_priority_params, upstream_index, upstream_search_params
};
dynamic_batching::index<float, uint32_t> high_priority_index{
    res, high_priority_params, upstream_index, upstream_search_params
};
// Define a combined search function with priority selection
double high_priority_threshold_ms = 0.1;
auto search_function =
   [low_priority_index, high_priority_index, high_priority_threshold_ms](
     raft::resources const &res,
     dynamic_batching::search_params search_params,
     raft::device_matrix_view<const float, int64_t> queries,
     raft::device_matrix_view<uint32_t, int64_t> neighbors,
     raft::device_matrix_view<float, int64_t> distances) {
   dynamic_batching::search(
       res,
       search_params,
       search_params.dispatch_timeout_ms < high_priority_threshold_ms
         ? high_priority_index : low_priority_index,
       queries,
       neighbors,
       distances
   );
};

模板参数:
  • T – 数据类型

  • IdxT – 索引类型

公共函数

template<typename Upstream>
index(
const raft::resources &res,
const cuvs::neighbors::dynamic_batching::index_params &params,
const Upstream &upstream_index,
const typename Upstream::search_params_type &upstream_params,
const cuvs::neighbors::filtering::base_filter *sample_filter = nullptr
)#

通过包装上游索引来构建动态批处理索引。

模板参数:

Upstream – 上游索引类型

参数:
  • res[in] raft 资源

  • params[in] 动态批处理参数

  • upstream_index[in] 执行搜索的原始索引(该引用在动态批处理索引的生命周期内必须保持有效)

  • upstream_params[in] 批次中所有查询的原始索引搜索参数(这些参数在动态批处理索引的生命周期内按值捕获)

  • sample_filter[in] 过滤函数(如果有),批次中所有请求必须相同(该指针在动态批处理索引的生命周期内必须保持有效)