公有成员函数 | 所有成员列表
cudf::io::external::kafka::kafka_consumer 类参考

Apache Kafka 的 libcudf 数据源 更多...

#include <kafka_consumer.hpp>

cudf::io::external::kafka::kafka_consumer 的继承图
cudf::io::datasource

公有成员函数

 kafka_consumer (std::map< std::string, std::string > configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper)
 创建处于半就绪状态的 Kafka 消费者对象实例。 更多...
 
 kafka_consumer (std::map< std::string, std::string > configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper, std::string const &topic_name, int partition, int64_t start_offset, int64_t end_offset, int batch_timeout, std::string const &delimiter)
 实例化一个 Kafka 消费者对象。librdkafka 配置文档可在 https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md 找到。 更多...
 
std::unique_ptr< cudf::io::datasource::bufferhost_read (size_t offset, size_t size) override
 返回包含 Kafka 主题数据子集的缓冲区。 更多...
 
size_t size () const override
 返回 Kafka 缓冲区中数据的大小。 更多...
 
size_t host_read (size_t offset, size_t size, uint8_t *dst) override
 将选定范围的数据读入预分配的缓冲区。 更多...
 
void commit_offset (std::string const &topic, int partition, int64_t offset)
 提交偏移量到指定的 Kafka 主题/分区实例。 更多...
 
std::map< std::string, int64_t > get_watermark_offset (std::string const &topic, int partition, int timeout, bool cached)
 获取主题/分区的水位偏移量值。 更多...
 
std::map< std::string, std::string > current_configs ()
 获取当前 Kafka 客户端配置。 更多...
 
int64_t get_committed_offset (std::string const &topic, int partition)
 获取成功提交到 Kafka 代理的最新偏移量。 更多...
 
std::map< std::string, std::vector< int32_t > > list_topics (std::string specific_topic)
 查询 Kafka 代理以获取某个主题的分区列表。如果未指定主题,则将获取代理中所有主题的分区。 更多...
 
void close (int timeout)
 关闭到底层 Kafka 的 socket 连接并清理系统资源。 更多...
 
void unsubscribe ()
 停止所有活跃的消费并移除消费者对主题/分区实例的订阅。 更多...
 
- 继承自 cudf::io::datasource 的公有成员函数
virtual ~datasource ()=default
 基类析构函数。
 
virtual std::future< std::unique_ptr< datasource::buffer > > host_read_async (size_t offset, size_t size)
 从数据源异步读取指定部分数据。 更多...
 
virtual std::future< size_t > host_read_async (size_t offset, size_t size, uint8_t *dst)
 从数据源异步读取数据到提供的uyang 主机内存缓冲区。 更多...
 
virtual bool supports_device_read () const
 此数据源是否支持直接读入设备内存。 更多...
 
virtual bool is_device_read_preferred (size_t size) const
 估计对于给定大小的数据,直接进行设备读取是否更优。 更多...
 
virtual std::unique_ptr< datasource::bufferdevice_read (size_t offset, size_t size, rmm::cuda_stream_view stream)
 返回包含数据源数据子集的设备缓冲区。 更多...
 
virtual size_t device_read (size_t offset, size_t size, uint8_t *dst, rmm::cuda_stream_view stream)
 将选定范围的数据读入预分配的设备缓冲区。 更多...
 
virtual std::future< size_t > device_read_async (size_t offset, size_t size, uint8_t *dst, rmm::cuda_stream_view stream)
 异步将选定范围的数据读入预分配的设备缓冲区。 更多...
 
virtual bool is_empty () const
 返回数据源是否包含任何数据。 更多...
 

其他继承成员

- 继承自 cudf::io::datasource 的静态公有成员函数
static std::unique_ptr< datasourcecreate (std::string const &filepath, size_t offset=0, size_t max_size_estimate=0)
 从文件路径创建数据源。 更多...
 
static std::unique_ptr< datasourcecreate (host_buffer const &buffer)
 从主机内存缓冲区创建数据源。 更多...
 
static std::unique_ptr< datasourcecreate (cudf::host_span< std::byte const > buffer)
 从主机内存缓冲区创建数据源。 更多...
 
static std::unique_ptr< datasourcecreate (cudf::device_span< std::byte const > buffer)
 从设备内存缓冲区创建数据源。 更多...
 
static std::unique_ptr< datasourcecreate (datasource *source)
 从用户实现的数据源对象创建数据源。 更多...
 
template<typename T >
static std::vector< std::unique_ptr< datasource > > create (std::vector< T > const &args)
 创建数据源向量,输入向量中的每个元素对应一个数据源。 更多...
 

详细描述

Apache Kafka 的 libcudf 数据源

定义于文件 kafka_consumer.hpp40 行。

构造函数与析构函数文档

◆ kafka_consumer() [1/2]

cudf::io::external::kafka::kafka_consumer::kafka_consumer ( std::map< std::string, std::string >  configs,
python_callable_type  python_callable,
kafka_oauth_callback_wrapper_type  callable_wrapper 
)

创建处于半就绪状态的 Kafka 消费者对象实例。

处于半就绪状态的消费者不具备成功与 Kafka 代理交互所需的所有参数。但在半就绪状态下,仍可进行 Kafka 元数据操作。这对于仅计划使用这些元数据操作的客户端非常有用。当分区和主题的延迟分配需求无法提前获知且需要尽可能延迟时,这种状态也很有用。librdkafka 配置文档可在 https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md 找到

参数
configs将传递给 librdkafka 客户端的键/值对 librdkafka 配置
python_callablepython_callable_type 指向 Python functools.partial 对象的指针
callable_wrapperkafka_oauth_callback_wrapper_type 将用于调用 python_callable 的 Cython 包装器。此包装器的作用是避免在 libcudf_kafka 中链接 Python 开发库。

◆ kafka_consumer() [2/2]

cudf::io::external::kafka::kafka_consumer::kafka_consumer ( std::map< std::string, std::string >  configs,
python_callable_type  python_callable,
kafka_oauth_callback_wrapper_type  callable_wrapper,
std::string const &  topic_name,
int  partition,
int64_t  start_offset,
int64_t  end_offset,
int  batch_timeout,
std::string const &  delimiter 
)

实例化一个 Kafka 消费者对象。librdkafka 配置文档可在 https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md 找到。

参数
configs将传递给 librdkafka 客户端的键/值对 librdkafka 配置
python_callablepython_callable_type 指向 Python functools.partial 对象的指针
callable_wrapperkafka_oauth_callback_wrapper_type 将用于调用 python_callable 的 Cython 包装器。此包装器的作用是避免在 libcudf_kafka 中链接 Python 开发库。
topic_name要消费的 Kafka 主题名称
partition要消费的分区索引,范围在 0TOPIC_NUM_PARTITIONS - 1(包含)之间
start_offset指定 TOPPAR(主题/分区组合)的查找位置
end_offset在指定的 TOPPAR 中要读取到的位置
batch_timeout允许的最大读取时间(毫秒)。如果在达到 batch_timeout 之前未达到 end_offset,则将返回一个较小的子集
delimiter要在 Kafka 消息之间插入到输出中的可选分隔符,例如:"\n"

成员函数文档

◆ close()

void cudf::io::external::kafka::kafka_consumer::close ( int  timeout)

关闭到底层 Kafka 的 socket 连接并清理系统资源。

异常
cudf::logic_error关闭连接失败时
参数
timeout等待响应的最大毫秒数

◆ commit_offset()

void cudf::io::external::kafka::kafka_consumer::commit_offset ( std::string const &  topic,
int  partition,
int64_t  offset 
)

提交偏移量到指定的 Kafka 主题/分区实例。

异常
cudf::logic_error提交分区偏移量失败时
参数
[in]topic应设置偏移量的 Kafka 主题名称
[in]partition指定主题上应使用的分区
[in]offset应为主题/分区对设置的偏移量

◆ current_configs()

std::map<std::string, std::string> cudf::io::external::kafka::kafka_consumer::current_configs ( )

获取当前 Kafka 客户端配置。

返回值
Map<string, string> 形式的当前客户端配置的键/值对

◆ get_committed_offset()

int64_t cudf::io::external::kafka::kafka_consumer::get_committed_offset ( std::string const &  topic,
int  partition 
)

获取成功提交到 Kafka 代理的最新偏移量。

参数
[in]topic主题/分区对的主题名称
[in]partition主题/分区对的分区号
返回值
指定主题/分区对的最新偏移量

◆ get_watermark_offset()

std::map<std::string, int64_t> cudf::io::external::kafka::kafka_consumer::get_watermark_offset ( std::string const &  topic,
int  partition,
int  timeout,
bool  cached 
)

获取主题/分区的水位偏移量值。

参数
[in]topic应检索水位偏移量的 Kafka 主题名称
[in]partition指定主题上应使用的分区
[in]timeout等待 Kafka 代理响应的最大毫秒数
[in]cached如果为 True,则使用上次从 Kafka 代理检索到的值;如果为 False,则通过网络请求从 Kafka 代理检索最新值。
返回值
指定主题/分区的的水位偏移量值

◆ host_read() [1/2]

std::unique_ptr<cudf::io::datasource::buffer> cudf::io::external::kafka::kafka_consumer::host_read ( size_t  offset,
size_t  size 
)
overridevirtual

返回包含 Kafka 主题数据子集的缓冲区。

参数
[in]offset从开头开始的字节数
[in]size要读取的字节数
返回值
数据缓冲区

实现 cudf::io::datasource

◆ host_read() [2/2]

size_t cudf::io::external::kafka::kafka_consumer::host_read ( size_t  offset,
size_t  size,
uint8_t *  dst 
)
overridevirtual

将选定范围的数据读入预分配的缓冲区。

参数
[in]offset从开头开始的字节数
[in]size要读取的字节数
[in]dst现有主机内存的地址
返回值
读取的字节数(可能小于 size)

实现 cudf::io::datasource

◆ list_topics()

std::map<std::string, std::vector<int32_t> > cudf::io::external::kafka::kafka_consumer::list_topics ( std::string  specific_topic)

查询 Kafka 代理以获取某个主题的分区列表。如果未指定主题,则将获取代理中所有主题的分区。

参数
[in]specific_topic要检索分区的特定主题名称。如果为空,则将检索所有主题的分区。
返回值
Kafka 主题名称及其对应主题分区值列表的 Map。

◆ size()

size_t cudf::io::external::kafka::kafka_consumer::size ( ) const
overridevirtual

返回 Kafka 缓冲区中数据的大小。

返回值
size_t 源数据的字节大小

实现 cudf::io::datasource

◆ unsubscribe()

void cudf::io::external::kafka::kafka_consumer::unsubscribe ( )

停止所有活跃的消费并移除消费者对主题/分区实例的订阅。

异常
cudf::logic_error取消订阅活跃分区分配失败时。

本类的文档生成自以下文件