Apache Kafka 的 libcudf 数据源 更多...
#include <kafka_consumer.hpp>
公有成员函数 | |
kafka_consumer (std::map< std::string, std::string > configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper) | |
创建处于半就绪状态的 Kafka 消费者对象实例。 更多... | |
kafka_consumer (std::map< std::string, std::string > configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper, std::string const &topic_name, int partition, int64_t start_offset, int64_t end_offset, int batch_timeout, std::string const &delimiter) | |
实例化一个 Kafka 消费者对象。librdkafka 配置文档可在 https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md 找到。 更多... | |
std::unique_ptr< cudf::io::datasource::buffer > | host_read (size_t offset, size_t size) override |
返回包含 Kafka 主题数据子集的缓冲区。 更多... | |
size_t | size () const override |
返回 Kafka 缓冲区中数据的大小。 更多... | |
size_t | host_read (size_t offset, size_t size, uint8_t *dst) override |
将选定范围的数据读入预分配的缓冲区。 更多... | |
void | commit_offset (std::string const &topic, int partition, int64_t offset) |
提交偏移量到指定的 Kafka 主题/分区实例。 更多... | |
std::map< std::string, int64_t > | get_watermark_offset (std::string const &topic, int partition, int timeout, bool cached) |
获取主题/分区的水位偏移量值。 更多... | |
std::map< std::string, std::string > | current_configs () |
获取当前 Kafka 客户端配置。 更多... | |
int64_t | get_committed_offset (std::string const &topic, int partition) |
获取成功提交到 Kafka 代理的最新偏移量。 更多... | |
std::map< std::string, std::vector< int32_t > > | list_topics (std::string specific_topic) |
查询 Kafka 代理以获取某个主题的分区列表。如果未指定主题,则将获取代理中所有主题的分区。 更多... | |
void | close (int timeout) |
关闭到底层 Kafka 的 socket 连接并清理系统资源。 更多... | |
void | unsubscribe () |
停止所有活跃的消费并移除消费者对主题/分区实例的订阅。 更多... | |
![]() | |
virtual | ~datasource ()=default |
基类析构函数。 | |
virtual std::future< std::unique_ptr< datasource::buffer > > | host_read_async (size_t offset, size_t size) |
从数据源异步读取指定部分数据。 更多... | |
virtual std::future< size_t > | host_read_async (size_t offset, size_t size, uint8_t *dst) |
从数据源异步读取数据到提供的uyang 主机内存缓冲区。 更多... | |
virtual bool | supports_device_read () const |
此数据源是否支持直接读入设备内存。 更多... | |
virtual bool | is_device_read_preferred (size_t size) const |
估计对于给定大小的数据,直接进行设备读取是否更优。 更多... | |
virtual std::unique_ptr< datasource::buffer > | device_read (size_t offset, size_t size, rmm::cuda_stream_view stream) |
返回包含数据源数据子集的设备缓冲区。 更多... | |
virtual size_t | device_read (size_t offset, size_t size, uint8_t *dst, rmm::cuda_stream_view stream) |
将选定范围的数据读入预分配的设备缓冲区。 更多... | |
virtual std::future< size_t > | device_read_async (size_t offset, size_t size, uint8_t *dst, rmm::cuda_stream_view stream) |
异步将选定范围的数据读入预分配的设备缓冲区。 更多... | |
virtual bool | is_empty () const |
返回数据源是否包含任何数据。 更多... | |
其他继承成员 | |
![]() | |
static std::unique_ptr< datasource > | create (std::string const &filepath, size_t offset=0, size_t max_size_estimate=0) |
从文件路径创建数据源。 更多... | |
static std::unique_ptr< datasource > | create (host_buffer const &buffer) |
从主机内存缓冲区创建数据源。 更多... | |
static std::unique_ptr< datasource > | create (cudf::host_span< std::byte const > buffer) |
从主机内存缓冲区创建数据源。 更多... | |
static std::unique_ptr< datasource > | create (cudf::device_span< std::byte const > buffer) |
从设备内存缓冲区创建数据源。 更多... | |
static std::unique_ptr< datasource > | create (datasource *source) |
从用户实现的数据源对象创建数据源。 更多... | |
template<typename T > | |
static std::vector< std::unique_ptr< datasource > > | create (std::vector< T > const &args) |
创建数据源向量,输入向量中的每个元素对应一个数据源。 更多... | |
Apache Kafka 的 libcudf 数据源
定义于文件 kafka_consumer.hpp 的 40 行。
cudf::io::external::kafka::kafka_consumer::kafka_consumer | ( | std::map< std::string, std::string > | configs, |
python_callable_type | python_callable, | ||
kafka_oauth_callback_wrapper_type | callable_wrapper | ||
) |
创建处于半就绪状态的 Kafka 消费者对象实例。
处于半就绪状态的消费者不具备成功与 Kafka 代理交互所需的所有参数。但在半就绪状态下,仍可进行 Kafka 元数据操作。这对于仅计划使用这些元数据操作的客户端非常有用。当分区和主题的延迟分配需求无法提前获知且需要尽可能延迟时,这种状态也很有用。librdkafka 配置文档可在 https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md 找到
configs | 将传递给 librdkafka 客户端的键/值对 librdkafka 配置 |
python_callable | python_callable_type 指向 Python functools.partial 对象的指针 |
callable_wrapper | kafka_oauth_callback_wrapper_type 将用于调用 python_callable 的 Cython 包装器。此包装器的作用是避免在 libcudf_kafka 中链接 Python 开发库。 |
cudf::io::external::kafka::kafka_consumer::kafka_consumer | ( | std::map< std::string, std::string > | configs, |
python_callable_type | python_callable, | ||
kafka_oauth_callback_wrapper_type | callable_wrapper, | ||
std::string const & | topic_name, | ||
int | partition, | ||
int64_t | start_offset, | ||
int64_t | end_offset, | ||
int | batch_timeout, | ||
std::string const & | delimiter | ||
) |
实例化一个 Kafka 消费者对象。librdkafka 配置文档可在 https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md 找到。
configs | 将传递给 librdkafka 客户端的键/值对 librdkafka 配置 |
python_callable | python_callable_type 指向 Python functools.partial 对象的指针 |
callable_wrapper | kafka_oauth_callback_wrapper_type 将用于调用 python_callable 的 Cython 包装器。此包装器的作用是避免在 libcudf_kafka 中链接 Python 开发库。 |
topic_name | 要消费的 Kafka 主题名称 |
partition | 要消费的分区索引,范围在 0 到 TOPIC_NUM_PARTITIONS - 1 (包含)之间 |
start_offset | 指定 TOPPAR(主题/分区组合)的查找位置 |
end_offset | 在指定的 TOPPAR 中要读取到的位置 |
batch_timeout | 允许的最大读取时间(毫秒)。如果在达到 batch_timeout 之前未达到 end_offset,则将返回一个较小的子集 |
delimiter | 要在 Kafka 消息之间插入到输出中的可选分隔符,例如:"\n" |
void cudf::io::external::kafka::kafka_consumer::close | ( | int | timeout | ) |
void cudf::io::external::kafka::kafka_consumer::commit_offset | ( | std::string const & | topic, |
int | partition, | ||
int64_t | offset | ||
) |
提交偏移量到指定的 Kafka 主题/分区实例。
cudf::logic_error | 提交分区偏移量失败时 |
[in] | topic | 应设置偏移量的 Kafka 主题名称 |
[in] | partition | 指定主题上应使用的分区 |
[in] | offset | 应为主题/分区对设置的偏移量 |
std::map<std::string, std::string> cudf::io::external::kafka::kafka_consumer::current_configs | ( | ) |
获取当前 Kafka 客户端配置。
int64_t cudf::io::external::kafka::kafka_consumer::get_committed_offset | ( | std::string const & | topic, |
int | partition | ||
) |
获取成功提交到 Kafka 代理的最新偏移量。
[in] | topic | 主题/分区对的主题名称 |
[in] | partition | 主题/分区对的分区号 |
std::map<std::string, int64_t> cudf::io::external::kafka::kafka_consumer::get_watermark_offset | ( | std::string const & | topic, |
int | partition, | ||
int | timeout, | ||
bool | cached | ||
) |
获取主题/分区的水位偏移量值。
[in] | topic | 应检索水位偏移量的 Kafka 主题名称 |
[in] | partition | 指定主题上应使用的分区 |
[in] | timeout | 等待 Kafka 代理响应的最大毫秒数 |
[in] | cached | 如果为 True,则使用上次从 Kafka 代理检索到的值;如果为 False,则通过网络请求从 Kafka 代理检索最新值。 |
|
overridevirtual |
|
overridevirtual |
将选定范围的数据读入预分配的缓冲区。
[in] | offset | 从开头开始的字节数 |
[in] | size | 要读取的字节数 |
[in] | dst | 现有主机内存的地址 |
std::map<std::string, std::vector<int32_t> > cudf::io::external::kafka::kafka_consumer::list_topics | ( | std::string | specific_topic | ) |
查询 Kafka 代理以获取某个主题的分区列表。如果未指定主题,则将获取代理中所有主题的分区。
[in] | specific_topic | 要检索分区的特定主题名称。如果为空,则将检索所有主题的分区。 |
|
overridevirtual |
void cudf::io::external::kafka::kafka_consumer::unsubscribe | ( | ) |
停止所有活跃的消费并移除消费者对主题/分区实例的订阅。
cudf::logic_error | 取消订阅活跃分区分配失败时。 |