Io 数据源#

group 数据
class datasource#
#include <datasource.hpp>

为读取器提供输入数据的接口类。

cudf::io::external::kafka::kafka_consumer 继承

公有函数

virtual ~datasource() = default#

基类析构函数。

virtual std::unique_ptr<datasource::buffer> host_read(size_t offset, size_t size) = 0#

返回包含源数据子集的缓冲区。

参数:
  • offset[in] 从开始位置的字节偏移量

  • size[in] 要读取的字节数

返回:

数据缓冲区(可能小于 size)

virtual std::future<std::unique_ptr<datasource::buffer>> host_read_async(size_t offset, size_t size)#

从数据源异步读取指定部分数据。

此函数启动一个异步读取操作,从数据源中给定 offset 位置开始读取 size 字节的数据。根据具体数据源的实现,读取操作可能会延迟到等待返回的 future 时才执行。

参数:
  • offset – 数据源中开始读取的位置。

  • size – 从数据源读取的字节数。

返回:

一个 std::future 对象,操作完成后将包含指向 datasource::buffer 的 unique pointer,其中包含读取的数据。

virtual size_t host_read(size_t offset, size_t size, uint8_t *dst) = 0#

将选定的范围读取到预分配的缓冲区中。

参数:
  • offset[in] 从开始位置的字节偏移量

  • size[in] 要读取的字节数

  • dst[in] 现有主机内存的地址

返回:

读取的字节数(可能小于 size)

virtual std::future<size_t> host_read_async(size_t offset, size_t size, uint8_t *dst)#

将数据从数据源异步读取到提供的主机内存缓冲区中。

此函数从数据源指定偏移量处开始启动一个异步读取操作,并将指定字节数读取到目标缓冲区中。根据具体数据源的实现,读取操作可能会延迟,并将在等待返回的 future 时执行。

参数:
  • offset – 数据源中开始读取的位置。

  • size – 从数据源读取的字节数。

  • dst – 指向目标缓冲区的指针,读取的数据将存储在此。

返回:

一个 std::future 对象,操作完成后将包含读取的字节数。

inline virtual bool supports_device_read() const#

此数据源是否支持直接读取到设备内存。

如果此函数返回 true,当读取器在设备上处理数据时,数据源将接收对 device_read() 的调用,而不是 host_read()。大多数读取器仍会进行 host_read() 调用,用于处理输入中在主机上处理的部分(例如元数据)。

不支持直接设备读取的数据源实现无需覆盖此函数。支持的实现应覆盖此函数并返回 false。

返回:

bool 此源是否支持 device_read() 调用

inline virtual bool is_device_read_preferred(size_t size) const#

估计给定大小的直接设备读取是否更优。

参数:

size – 要读取的字节数

返回:

给定大小的设备读取预计是否性能更好

inline virtual std::unique_ptr<datasource::buffer> device_read(size_t offset, size_t size, rmm::cuda_stream_view stream)#

返回包含源数据子集的设备缓冲区。

为获得最佳性能,仅当 is_device_read_preferred 返回 true 时才应调用此函数。不支持直接设备读取的数据源实现无需覆盖此函数。

抛出:

cudf::logic_error – 如果对象不支持直接设备读取,即 supports_device_read 返回 false

参数:
  • offset – 从开始位置的字节数

  • size – 要读取的字节数

  • stream – 要使用的 CUDA 流

返回:

设备内存中的数据缓冲区

inline virtual size_t device_read(size_t offset, size_t size, uint8_t *dst, rmm::cuda_stream_view stream)#

将选定范围读取到预分配的设备缓冲区中。

为获得最佳性能,仅当 is_device_read_preferred 返回 true 时才应调用此函数。不支持直接设备读取的数据源实现无需覆盖此函数。

抛出:

cudf::logic_error – 当对象不支持直接设备读取时,即 supports_device_read 返回 false

参数:
  • offset – 从开始位置的字节数

  • size – 要读取的字节数

  • dst – 现有设备内存的地址

  • stream – 要使用的 CUDA 流

返回:

读取的字节数(可能小于 size)

inline virtual std::future<size_t> device_read_async(size_t offset, size_t size, uint8_t *dst, rmm::cuda_stream_view stream)#

将选定范围异步读取到预分配的设备缓冲区中。

返回包含读取字节数的 future 值。调用返回值的 get() 方法将同步此函数。

为获得最佳性能,仅当 is_device_read_preferred 返回 true 时才应调用此函数。不支持直接设备读取的数据源实现无需覆盖此函数。

抛出:

cudf::logic_error – 当对象不支持直接设备读取时,即 supports_device_read 返回 false

参数:
  • offset – 从开始位置的字节数

  • size – 要读取的字节数

  • dst – 现有设备内存的地址

  • stream – 要使用的 CUDA 流

返回:

作为 future 值的读取字节数(可能小于 size)

virtual size_t size() const = 0#

返回源数据的大小。

返回:

源数据的大小(字节)

inline virtual bool is_empty() const#

返回源是否包含任何数据。

返回:

如果存在数据,返回 True,否则返回 False

公有静态函数

static std::unique_ptr<datasource> create(std::string const &filepath, size_t offset = 0, size_t max_size_estimate = 0)#

从文件路径创建数据源。

参数 offsetmax_size_estimate 是给 datasource 实现的提示,表明预期读取的数据范围。实现可以使用这些提示来优化读取操作。这些参数通常基于字节范围选项。在这种情况下,max_size_estimate 可以包括字节范围之后的填充,以包含处理可能需要的额外数据。

参数:
  • filepath[in] 要使用的文件路径

  • offset[in] 数据开始读取的字节偏移量(默认为零)

  • max_size_estimate[in] 预期读取数据范围的上限估计值(默认为零,表示 offset 之后整个文件)

返回:

构造的数据源对象

static std::unique_ptr<datasource> create(host_buffer const &buffer)#

从主机内存缓冲区创建数据源。

@已弃用 自 23.04 版本#

参数:

buffer[in] 主机缓冲区对象

返回:

构造的数据源对象

static std::unique_ptr<datasource> create(cudf::host_span<std::byte const> buffer)#

从主机内存缓冲区创建数据源。

参数:

buffer[in] 主机缓冲区对象

返回:

构造的数据源对象

static std::unique_ptr<datasource> create(cudf::device_span<std::byte const> buffer)#

从设备内存缓冲区创建数据源。

参数:

buffer – 设备缓冲区对象

返回:

构造的数据源对象

static std::unique_ptr<datasource> create(datasource *source)#

从用户实现的数据源对象创建数据源。

参数:

source[in] 指向数据源对象的非所有权指针

返回:

构造的数据源对象

template<typename T>
static inline std::vector<std::unique_ptr<datasource>> create(std::vector<T> const &args)#

创建数据源向量,输入向量中的每个元素对应一个数据源。

参数:

args[in] 参数向量

返回:

构造的数据源对象向量

class buffer#
#include <datasource.hpp>

数据源返回给调用者的缓冲区的接口类。

提供返回数据地址和大小的基本接口。

cudf::io::datasource::non_owning_buffer, cudf::io::datasource::owning_buffer< Container > 继承

公有函数

virtual size_t size() const = 0#

返回缓冲区的大小(字节)。

返回:

缓冲区大小(字节)

virtual uint8_t const *data() const = 0#

返回缓冲区中数据的地址。

返回:

缓冲区中数据的地址

virtual ~buffer() = default#

基类析构函数。

公有静态函数

template<typename Container>
static inline std::unique_ptr<buffer> create(Container &&data_owner)#

工厂函数,用于从容器构造数据源缓冲区对象。

模板参数:

Container – 用于构造缓冲区的容器类型

参数:

data_owner – 用于构造缓冲区的容器(所有权转移)

返回:

构造的缓冲区对象

class non_owning_buffer : public cudf::io::datasource::buffer#
#include <datasource.hpp>

非拥有缓冲区的实现,数据源持有缓冲区直到销毁。

公有函数

inline non_owning_buffer(uint8_t const *data, size_t size)#

构造一个新的非拥有缓冲区对象。

参数:
  • data – 数据缓冲区

  • size – 数据缓冲区的大小

inline virtual size_t size() const override#

返回缓冲区的大小。

返回:

缓冲区的大小(字节)

inline virtual uint8_t const *data() const override#

返回指向缓冲区的指针。

返回:

指向缓冲区的指针

template<typename Container>
class owning_buffer : public cudf::io::datasource::buffer#
#include <datasource.hpp>

拥有数据的 buffer 派生实现。

可以使用不同的容器类型来持有数据缓冲区。

模板参数:

Container – 拥有数据的容器对象类型

公有函数

inline owning_buffer(Container &&moved_data_owner)#

将输入容器移动到新创建的对象中。

参数:

moved_data_owner – 用于构造缓冲区的容器。调用者应显式传递 std::move(data_owner) 到此函数以转移所有权。

inline owning_buffer(Container &&moved_data_owner, uint8_t const *data_ptr, size_t size)#

将输入容器移动到新创建的对象中,并暴露缓冲区的子 span。

参数:
  • moved_data_owner – 用于构造缓冲区的容器。调用者应显式传递 std::move(data_owner) 到此函数以转移所有权。

  • data_ptr – 子 span 的起始指针

  • size – 子 span 的大小

inline virtual size_t size() const override#

返回缓冲区的大小。

返回:

缓冲区的大小(字节)

inline virtual uint8_t const *data() const override#

返回缓冲区中数据的指针。

返回:

缓冲区中数据的指针

class kafka_consumer : public cudf::io::datasource#
#include <kafka_consumer.hpp>

libcudf 的 Apache Kafka 数据源

公有函数

kafka_consumer(std::map<std::string, std::string> configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper)#

创建处于半就绪状态的 Kafka 消费者对象实例。

处于半就绪状态的消费者不具备成功与 Kafka broker 交互所需的所有参数。但在半就绪状态下,Kafka 元数据操作仍然可能。这对于仅计划使用这些元数据操作的客户端很有用。当不需要提前知道延迟分区和主题分配,并且需要尽可能延迟时,这很有用。librdkafka 配置的文档可在 edenhill/librdkafka 中找到。

参数:
  • configs – 将传递给 librdkafka 客户端的 librdkafka 配置的键/值对

  • python_callable – 指向 Python functools.partial 对象的 python_callable_type 指针

  • callable_wrapper – 用于调用 python_callablekafka_oauth_callback_wrapper_type Cython 包装器。此包装器旨在避免在 libcudf_kafka 中链接 Python 开发库。

kafka_consumer(std::map<std::string, std::string> configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper, std::string const &topic_name, int partition, int64_t start_offset, int64_t end_offset, int batch_timeout, std::string const &delimiter)#

实例化一个 Kafka 消费者对象。librdkafka 配置的文档可在 edenhill/librdkafka 中找到。

参数:
  • configs – 将传递给 librdkafka 客户端的 librdkafka 配置的键/值对

  • python_callable – 指向 Python functools.partial 对象的 python_callable_type 指针

  • callable_wrapper – 用于调用 python_callablekafka_oauth_callback_wrapper_type Cython 包装器。此包装器旨在避免在 libcudf_kafka 中链接 Python 开发库。

  • topic_name – 要消费的 Kafka 主题名称

  • partition – 要消费的分区索引,范围为 0TOPIC_NUM_PARTITIONS - 1(包含)

  • start_offset – 指定 TOPPAR(主题/分区组合)的起始偏移量

  • end_offset – 指定 TOPPAR 的读取结束位置

  • batch_timeout – 允许的最大读取时间(毫秒)。如果在 batch_timeout 前未达到 end_offset,则返回较小的子集。

  • delimiter – 在 Kafka 消息之间插入的可选分隔符,例如:“\n”

virtual std::unique_ptr<cudf::io::datasource::buffer> host_read(size_t offset, size_t size) override#

返回包含 Kafka 主题数据子集的缓冲区。

参数:
  • offset[in] 从开始位置的字节偏移量

  • size[in] 要读取的字节数

返回:

数据缓冲区

virtual size_t size() const override#

返回 Kafka 缓冲区中数据的大小。

返回:

size_t 源数据的大小(字节)

virtual size_t host_read(size_t offset, size_t size, uint8_t *dst) override#

将选定的范围读取到预分配的缓冲区中。

参数:
  • offset[in] 从开始位置的字节偏移量

  • size[in] 要读取的字节数

  • dst[in] 现有主机内存的地址

返回:

读取的字节数(可能小于 size)

void commit_offset(std::string const &topic, int partition, int64_t offset)#

将偏移量提交到指定的 Kafka 主题/分区实例。

抛出:

cudf::logic_error – 提交分区偏移量失败时抛出

参数:
  • topic[in] 应该设置偏移量的 Kafka 主题名称

  • partition[in] 指定主题上应该使用的分区

  • offset[in] 应该为主题/分区对设置的偏移量

std::map<std::string, int64_t> get_watermark_offset(std::string const &topic, int partition, int timeout, bool cached)#

检索主题/分区的水位偏移量值。

参数:
  • topic[in] 应该检索水位的主题名称

  • partition[in] 指定主题上应该使用的分区

  • timeout[in] 等待 Kafka broker 响应的最大毫秒数

  • cached[in] 如果为 True,则使用从 Kafka broker 检索到的最后一个值;如果为 False,则通过网络请求从 Kafka broker 检索最新值。

返回:

指定主题/分区的偏移量值

std::map<std::string, std::string> current_configs()#

检索当前的 Kafka 客户端配置。

返回:

当前客户端配置的键/值对的 Map<string, string>

int64_t get_committed_offset(std::string const &topic, int partition)#

获取成功提交到 Kafka broker 的最新偏移量。

参数:
  • topic[in] 主题/分区对的主题名称

  • partition[in] 主题/分区对的分区号

返回:

指定主题/分区对的最新偏移量

std::map<std::string, std::vector<int32_t>> list_topics(std::string specific_topic)#

查询 Kafka broker 以获取某个主题的分区列表。如果没有指定主题,则检索 broker 中所有主题的分区。

参数:

specific_topic[in] 要检索分区的主题名称。如果为空,则检索所有主题的分区。

返回:

Kafka 主题名称及其对应的主题分区值列表的 Map。

void close(int timeout)#

关闭到底层 Kafka socket 连接并清理系统资源。

抛出:

cudf::logic_error – 关闭连接失败时抛出

参数:

timeout – 等待响应的最大毫秒数

void unsubscribe()#

停止所有活动消费并取消消费者对主题/分区实例的订阅。

抛出:

cudf::logic_error – 取消订阅活动分区分配失败时抛出。