kafka_consumer.hpp
1 /*
2  * Copyright (c) 2020-2022, NVIDIA CORPORATION.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17 
#include "kafka_callback.hpp"

#include <cudf/io/datasource.hpp>

#include <librdkafka/rdkafkacpp.h>

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <vector>
29 
30 namespace cudf {
31 namespace io {
32 namespace external {
33 namespace kafka {
34 
41  public
61  kafka_consumer(std::map<std::string, std::string> configs,
62  python_callable_type python_callable,
63  kafka_oauth_callback_wrapper_type callable_wrapper);
64 
85  kafka_consumer(std::map<std::string, std::string> configs,
86  python_callable_type python_callable,
87  kafka_oauth_callback_wrapper_type callable_wrapper,
88  std::string const& topic_name,
89  int partition,
90  int64_t start_offset,
91  int64_t end_offset,
92  int batch_timeout,
93  std::string const& delimiter);
94 
103  std::unique_ptr<cudf::io::datasource::buffer> host_read(size_t offset, size_t size) override;
104 
110  size_t size() const override;
111 
121  size_t host_read(size_t offset, size_t size, uint8_t* dst) override;
122 
/**
 * @brief Commits an offset to the specified Kafka topic/partition instance.
 *
 * @param topic Name of the Kafka topic
 * @param partition Partition of @p topic
 * @param offset Offset to commit
 */
void commit_offset(std::string const& topic, int partition, int64_t offset);
134 
/**
 * @brief Retrieves the high/low watermark offset values for a topic/partition.
 *
 * @param topic Name of the Kafka topic
 * @param partition Partition of @p topic
 * @param timeout Query timeout (units not shown here; presumably milliseconds — verify)
 * @param cached Presumably selects locally cached watermarks instead of querying the broker
 *        — verify against the definition
 * @return Map from a watermark label to its offset (key names not shown here;
 *         presumably "low"/"high" — verify)
 */
std::map<std::string, int64_t> get_watermark_offset(std::string const& topic,
                                                    int partition,
                                                    int timeout,
                                                    bool cached);
150 
/**
 * @brief Retrieves the current Kafka client configurations.
 *
 * @return Key/value map of configuration properties
 */
std::map<std::string, std::string> current_configs();
157 
/**
 * @brief Gets the latest offset that was successfully committed to the Kafka broker.
 *
 * @param topic Name of the Kafka topic
 * @param partition Partition of @p topic
 * @return Latest committed offset (sentinel value when nothing is committed not shown here — verify)
 */
int64_t get_committed_offset(std::string const& topic, int partition);
167 
/**
 * @brief Queries the Kafka broker for the list of partitions of a topic.
 *
 * @param specific_topic Topic to query. NOTE(review): the behavior when no topic is given
 *        is truncated in the rendered docs; presumably partitions of all topics are
 *        returned — verify against the definition.
 * @return Map from topic name to that topic's partition IDs
 */
std::map<std::string, std::vector<int32_t>> list_topics(std::string specific_topic);
178 
/**
 * @brief Closes the underlying socket connection to Kafka and cleans up system resources.
 *
 * @param timeout Close timeout (units not shown here; presumably milliseconds — verify)
 */
void close(int timeout);
186 
/**
 * @brief Stops all active consumption and removes the consumer's subscriptions to
 *        topic/partition instances.
 */
void unsubscribe();
194 
195  virtual ~kafka_consumer(){};
196 
197  private
198  std::unique_ptr<RdKafka::Conf> kafka_conf; // RDKafka 配置对象
199  std::unique_ptr<RdKafka::KafkaConsumer> consumer;
200 
201  std::map<std::string, std::string> configs;
202  python_callable_type python_callable_;
203  kafka_oauth_callback_wrapper_type callable_wrapper_;
204 
205  std::string topic_name;
206  int partition;
207  int64_t start_offset;
208  int64_t end_offset;
209  int batch_timeout;
210  int default_timeout = 10000; // 毫秒
211  std::string delimiter;
212 
213  std::string buffer;
214 
215  private
216  RdKafka::ErrorCode update_consumer_topic_partition_assignment(std::string const& topic,
217  int partition,
218  int64_t offset);
219 
223  int64_t now();
224 
225  void consume_to_buffer();
226 };
227 
228 } // namespace kafka
229 } // namespace external
230 } // namespace io
231 } // namespace cudf
为读取器提供输入数据的接口类。
用于 Apache Kafka 的 libcudf 数据源
void unsubscribe()
停止所有活动的消费并取消消费者对主题/分区实例的订阅。
std::unique_ptr< cudf::io::datasource::buffer > host_read(size_t offset, size_t size) override
返回包含 Kafka 主题中部分数据的缓冲区。
std::map< std::string, std::string > current_configs()
检索当前 Kafka 客户端配置。
std::map< std::string, std::vector< int32_t > > list_topics(std::string specific_topic)
向 Kafka 代理查询某个主题的分区列表。如果未指定主题，则检索代理中所有主题的分区。
kafka_consumer(std::map< std::string, std::string > configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper)
创建一个处于半就绪状态的 Kafka 消费者对象实例。
size_t host_read(size_t offset, size_t size, uint8_t *dst) override
将选定的范围读入预分配的缓冲区。
void commit_offset(std::string const &topic, int partition, int64_t offset)
将偏移量提交到指定的 Kafka 主题/分区实例。
kafka_consumer(std::map< std::string, std::string > configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper, std::string const &topic_name, int partition, int64_t start_offset, int64_t end_offset, int batch_timeout, std::string const &delimiter)
实例化一个 Kafka 消费者对象。有关 librdkafka 配置的文档可在以下位置找到：https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
void close(int timeout)
关闭与 Kafka 的底层套接字连接并清理系统资源。
size_t size() const override
返回 Kafka 缓冲区中的数据大小。
int64_t get_committed_offset(std::string const &topic, int partition)
获取成功提交到 Kafka 代理的最新偏移量。
std::map< std::string, int64_t > get_watermark_offset(std::string const &topic, int partition, int timeout, bool cached)
检索主题/分区的最高/最低偏移量值。
cuDF 接口
定义: host_udf.hpp:37