Apache Kafka 的 libcudf 数据源 更多...
#include <kafka_consumer.hpp>
公有成员函数 | |
| kafka_consumer (std::map< std::string, std::string > configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper) | |
| 创建处于半就绪状态的 Kafka 消费者对象实例。 更多... | |
| kafka_consumer (std::map< std::string, std::string > configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper, std::string const &topic_name, int partition, int64_t start_offset, int64_t end_offset, int batch_timeout, std::string const &delimiter) | |
| 实例化一个 Kafka 消费者对象。librdkafka 配置文档可在 https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md 找到。 更多... | |
| std::unique_ptr< cudf::io::datasource::buffer > | host_read (size_t offset, size_t size) override |
| 返回包含 Kafka 主题数据子集的缓冲区。 更多... | |
| size_t | size () const override |
| 返回 Kafka 缓冲区中数据的大小。 更多... | |
| size_t | host_read (size_t offset, size_t size, uint8_t *dst) override |
| 将选定范围的数据读入预分配的缓冲区。 更多... | |
| void | commit_offset (std::string const &topic, int partition, int64_t offset) |
| 提交偏移量到指定的 Kafka 主题/分区实例。 更多... | |
| std::map< std::string, int64_t > | get_watermark_offset (std::string const &topic, int partition, int timeout, bool cached) |
| 获取主题/分区的水位偏移量值。 更多... | |
| std::map< std::string, std::string > | current_configs () |
| 获取当前 Kafka 客户端配置。 更多... | |
| int64_t | get_committed_offset (std::string const &topic, int partition) |
| 获取成功提交到 Kafka 代理的最新偏移量。 更多... | |
| std::map< std::string, std::vector< int32_t > > | list_topics (std::string specific_topic) |
| 查询 Kafka 代理以获取某个主题的分区列表。如果未指定主题,则将获取代理中所有主题的分区。 更多... | |
| void | close (int timeout) |
| 关闭到底层 Kafka 的 socket 连接并清理系统资源。 更多... | |
| void | unsubscribe () |
| 停止所有活跃的消费并移除消费者对主题/分区实例的订阅。 更多... | |
继承自 cudf::io::datasource 的公有成员函数 | |
| virtual | ~datasource ()=default |
| 基类析构函数。 | |
| virtual std::future< std::unique_ptr< datasource::buffer > > | host_read_async (size_t offset, size_t size) |
| 从数据源异步读取指定部分数据。 更多... | |
| virtual std::future< size_t > | host_read_async (size_t offset, size_t size, uint8_t *dst) |
| 从数据源异步读取数据到提供的uyang 主机内存缓冲区。 更多... | |
| virtual bool | supports_device_read () const |
| 此数据源是否支持直接读入设备内存。 更多... | |
| virtual bool | is_device_read_preferred (size_t size) const |
| 估计对于给定大小的数据,直接进行设备读取是否更优。 更多... | |
| virtual std::unique_ptr< datasource::buffer > | device_read (size_t offset, size_t size, rmm::cuda_stream_view stream) |
| 返回包含数据源数据子集的设备缓冲区。 更多... | |
| virtual size_t | device_read (size_t offset, size_t size, uint8_t *dst, rmm::cuda_stream_view stream) |
| 将选定范围的数据读入预分配的设备缓冲区。 更多... | |
| virtual std::future< size_t > | device_read_async (size_t offset, size_t size, uint8_t *dst, rmm::cuda_stream_view stream) |
| 异步将选定范围的数据读入预分配的设备缓冲区。 更多... | |
| virtual bool | is_empty () const |
| 返回数据源是否包含任何数据。 更多... | |
其他继承成员 | |
继承自 cudf::io::datasource 的静态公有成员函数 | |
| static std::unique_ptr< datasource > | create (std::string const &filepath, size_t offset=0, size_t max_size_estimate=0) |
| 从文件路径创建数据源。 更多... | |
| static std::unique_ptr< datasource > | create (host_buffer const &buffer) |
| 从主机内存缓冲区创建数据源。 更多... | |
| static std::unique_ptr< datasource > | create (cudf::host_span< std::byte const > buffer) |
| 从主机内存缓冲区创建数据源。 更多... | |
| static std::unique_ptr< datasource > | create (cudf::device_span< std::byte const > buffer) |
| 从设备内存缓冲区创建数据源。 更多... | |
| static std::unique_ptr< datasource > | create (datasource *source) |
| 从用户实现的数据源对象创建数据源。 更多... | |
| template<typename T > | |
| static std::vector< std::unique_ptr< datasource > > | create (std::vector< T > const &args) |
| 创建数据源向量,输入向量中的每个元素对应一个数据源。 更多... | |
Apache Kafka 的 libcudf 数据源
定义于文件 kafka_consumer.hpp 的 40 行。
| cudf::io::external::kafka::kafka_consumer::kafka_consumer | ( | std::map< std::string, std::string > | configs, |
| python_callable_type | python_callable, | ||
| kafka_oauth_callback_wrapper_type | callable_wrapper | ||
| ) |
创建处于半就绪状态的 Kafka 消费者对象实例。
处于半就绪状态的消费者不具备成功与 Kafka 代理交互所需的所有参数。但在半就绪状态下,仍可进行 Kafka 元数据操作。这对于仅计划使用这些元数据操作的客户端非常有用。当分区和主题的延迟分配需求无法提前获知且需要尽可能延迟时,这种状态也很有用。librdkafka 配置文档可在 https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md 找到
| configs | 将传递给 librdkafka 客户端的键/值对 librdkafka 配置 |
| python_callable | python_callable_type 指向 Python functools.partial 对象的指针 |
| callable_wrapper | kafka_oauth_callback_wrapper_type 将用于调用 python_callable 的 Cython 包装器。此包装器的作用是避免在 libcudf_kafka 中链接 Python 开发库。 |
| cudf::io::external::kafka::kafka_consumer::kafka_consumer | ( | std::map< std::string, std::string > | configs, |
| python_callable_type | python_callable, | ||
| kafka_oauth_callback_wrapper_type | callable_wrapper, | ||
| std::string const & | topic_name, | ||
| int | partition, | ||
| int64_t | start_offset, | ||
| int64_t | end_offset, | ||
| int | batch_timeout, | ||
| std::string const & | delimiter | ||
| ) |
实例化一个 Kafka 消费者对象。librdkafka 配置文档可在 https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md 找到。
| configs | 将传递给 librdkafka 客户端的键/值对 librdkafka 配置 |
| python_callable | python_callable_type 指向 Python functools.partial 对象的指针 |
| callable_wrapper | kafka_oauth_callback_wrapper_type 将用于调用 python_callable 的 Cython 包装器。此包装器的作用是避免在 libcudf_kafka 中链接 Python 开发库。 |
| topic_name | 要消费的 Kafka 主题名称 |
| partition | 要消费的分区索引,范围在 0 到 TOPIC_NUM_PARTITIONS - 1(包含)之间 |
| start_offset | 指定 TOPPAR(主题/分区组合)的查找位置 |
| end_offset | 在指定的 TOPPAR 中要读取到的位置 |
| batch_timeout | 允许的最大读取时间(毫秒)。如果在达到 batch_timeout 之前未达到 end_offset,则将返回一个较小的子集 |
| delimiter | 要在 Kafka 消息之间插入到输出中的可选分隔符,例如:"\n" |
| void cudf::io::external::kafka::kafka_consumer::close | ( | int | timeout | ) |
| void cudf::io::external::kafka::kafka_consumer::commit_offset | ( | std::string const & | topic, |
| int | partition, | ||
| int64_t | offset | ||
| ) |
提交偏移量到指定的 Kafka 主题/分区实例。
| cudf::logic_error | 提交分区偏移量失败时 |
| [in] | topic | 应设置偏移量的 Kafka 主题名称 |
| [in] | partition | 指定主题上应使用的分区 |
| [in] | offset | 应为主题/分区对设置的偏移量 |
| std::map<std::string, std::string> cudf::io::external::kafka::kafka_consumer::current_configs | ( | ) |
获取当前 Kafka 客户端配置。
| int64_t cudf::io::external::kafka::kafka_consumer::get_committed_offset | ( | std::string const & | topic, |
| int | partition | ||
| ) |
获取成功提交到 Kafka 代理的最新偏移量。
| [in] | topic | 主题/分区对的主题名称 |
| [in] | partition | 主题/分区对的分区号 |
| std::map<std::string, int64_t> cudf::io::external::kafka::kafka_consumer::get_watermark_offset | ( | std::string const & | topic, |
| int | partition, | ||
| int | timeout, | ||
| bool | cached | ||
| ) |
获取主题/分区的水位偏移量值。
| [in] | topic | 应检索水位偏移量的 Kafka 主题名称 |
| [in] | partition | 指定主题上应使用的分区 |
| [in] | timeout | 等待 Kafka 代理响应的最大毫秒数 |
| [in] | cached | 如果为 True,则使用上次从 Kafka 代理检索到的值;如果为 False,则通过网络请求从 Kafka 代理检索最新值。 |
|
overridevirtual |
|
overridevirtual |
将选定范围的数据读入预分配的缓冲区。
| [in] | offset | 从开头开始的字节数 |
| [in] | size | 要读取的字节数 |
| [in] | dst | 现有主机内存的地址 |
| std::map<std::string, std::vector<int32_t> > cudf::io::external::kafka::kafka_consumer::list_topics | ( | std::string | specific_topic | ) |
查询 Kafka 代理以获取某个主题的分区列表。如果未指定主题,则将获取代理中所有主题的分区。
| [in] | specific_topic | 要检索分区的特定主题名称。如果为空,则将检索所有主题的分区。 |
|
overridevirtual |
| void cudf::io::external::kafka::kafka_consumer::unsubscribe | ( | ) |
停止所有活跃的消费并移除消费者对主题/分区实例的订阅。
| cudf::logic_error | 取消订阅活跃分区分配失败时。 |