18 #include "kafka_callback.hpp"
22 #include <librdkafka/rdkafkacpp.h>
62 python_callable_type python_callable,
63 kafka_oauth_callback_wrapper_type callable_wrapper);
86 python_callable_type python_callable,
87 kafka_oauth_callback_wrapper_type callable_wrapper,
88 std::string
const& topic_name,
93 std::string
const& delimiter);
103 std::unique_ptr<cudf::io::datasource::buffer>
host_read(
size_t offset,
size_t size)
override;
133 void commit_offset(std::string
const& topic,
int partition, int64_t offset);
177 std::map<std::string, std::vector<int32_t>>
list_topics(std::string specific_topic);
198 std::unique_ptr<RdKafka::Conf> kafka_conf;
199 std::unique_ptr<RdKafka::KafkaConsumer> consumer;
201 std::map<std::string, std::string> configs;
202 python_callable_type python_callable_;
203 kafka_oauth_callback_wrapper_type callable_wrapper_;
205 std::string topic_name;
207 int64_t start_offset;
210 int default_timeout = 10000;
211 std::string delimiter;
216 RdKafka::ErrorCode update_consumer_topic_partition_assignment(std::string
const& topic,
225 void consume_to_buffer();
用于 Apache Kafka 的 libcudf 数据源
void unsubscribe()
停止所有活动的消费并取消消费者对主题/分区实例的订阅。
std::unique_ptr< cudf::io::datasource::buffer > host_read(size_t offset, size_t size) override
返回包含 Kafka 主题中部分数据的缓冲区。
std::map< std::string, std::string > current_configs()
检索当前 Kafka 客户端配置。
std::map< std::string, std::vector< int32_t > > list_topics(std::string specific_topic)
向 Kafka 代理查询某个主题的分区列表。如果未指定主题,则...
kafka_consumer(std::map< std::string, std::string > configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper)
创建一个处于半就绪状态的 Kafka 消费者对象实例。
size_t host_read(size_t offset, size_t size, uint8_t *dst) override
将选定的范围读入预分配的缓冲区。
void commit_offset(std::string const &topic, int partition, int64_t offset)
将偏移量提交到指定的 Kafka 主题/分区实例。
kafka_consumer(std::map< std::string, std::string > configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper, std::string const &topic_name, int partition, int64_t start_offset, int64_t end_offset, int batch_timeout, std::string const &delimiter)
实例化一个 Kafka 消费者对象。有关 librdkafka 配置的文档可在以下位置找到 http...
void close(int timeout)
关闭与 Kafka 的底层套接字连接并清理系统资源。
size_t size() const override
返回 Kafka 缓冲区中的数据大小。
int64_t get_committed_offset(std::string const &topic, int partition)
获取成功提交到 Kafka 代理的最新偏移量。
std::map< std::string, int64_t > get_watermark_offset(std::string const &topic, int partition, int timeout, bool cached)
检索主题/分区的最高/最低偏移量值。