Io 数据源#
- group 数据 源
-
class datasource#
- #include <datasource.hpp>
为读取器提供输入数据的接口类。
被 cudf::io::external::kafka::kafka_consumer 继承
公有函数
-
virtual ~datasource() = default#
基类析构函数。
-
virtual std::unique_ptr<datasource::buffer> host_read(size_t offset, size_t size) = 0#
返回包含源数据子集的缓冲区。
- 参数:
offset – [in] 从开始位置的字节偏移量
size – [in] 要读取的字节数
- 返回:
数据缓冲区(可能小于 size)
-
virtual std::future<std::unique_ptr<datasource::buffer>> host_read_async(size_t offset, size_t size)#
从数据源异步读取指定部分数据。
此函数启动一个异步读取操作,从数据源中给定
offset
位置开始读取size
字节的数据。根据具体数据源的实现,读取操作可能会延迟到等待返回的 future 时才执行。- 参数:
offset – 数据源中开始读取的位置。
size – 从数据源读取的字节数。
- 返回:
一个 std::future 对象,操作完成后将包含指向 datasource::buffer 的 unique pointer,其中包含读取的数据。
-
virtual size_t host_read(size_t offset, size_t size, uint8_t *dst) = 0#
将选定的范围读取到预分配的缓冲区中。
- 参数:
offset – [in] 从开始位置的字节偏移量
size – [in] 要读取的字节数
dst – [in] 现有主机内存的地址
- 返回:
读取的字节数(可能小于 size)
-
virtual std::future<size_t> host_read_async(size_t offset, size_t size, uint8_t *dst)#
将数据从数据源异步读取到提供的主机内存缓冲区中。
此函数从数据源指定偏移量处开始启动一个异步读取操作,并将指定字节数读取到目标缓冲区中。根据具体数据源的实现,读取操作可能会延迟,并将在等待返回的 future 时执行。
- 参数:
offset – 数据源中开始读取的位置。
size – 从数据源读取的字节数。
dst – 指向目标缓冲区的指针,读取的数据将存储在此。
- 返回:
一个 std::future 对象,操作完成后将包含读取的字节数。
-
inline virtual bool supports_device_read() const#
此数据源是否支持直接读取到设备内存。
如果此函数返回 true,当读取器在设备上处理数据时,数据源将接收对 device_read() 的调用,而不是 host_read()。大多数读取器仍会进行 host_read() 调用,用于处理输入中在主机上处理的部分(例如元数据)。
不支持直接设备读取的数据源实现无需覆盖此函数。支持的实现应覆盖此函数并返回 false。
- 返回:
bool 此源是否支持 device_read() 调用
-
inline virtual bool is_device_read_preferred(size_t size) const#
估计给定大小的直接设备读取是否更优。
- 参数:
size – 要读取的字节数
- 返回:
给定大小的设备读取预计是否性能更好
-
inline virtual std::unique_ptr<datasource::buffer> device_read(size_t offset, size_t size, rmm::cuda_stream_view stream)#
返回包含源数据子集的设备缓冲区。
为获得最佳性能,仅当
is_device_read_preferred
返回true
时才应调用此函数。不支持直接设备读取的数据源实现无需覆盖此函数。- 抛出:
cudf::logic_error – 如果对象不支持直接设备读取,即
supports_device_read
返回false
。- 参数:
offset – 从开始位置的字节数
size – 要读取的字节数
stream – 要使用的 CUDA 流
- 返回:
设备内存中的数据缓冲区
-
inline virtual size_t device_read(size_t offset, size_t size, uint8_t *dst, rmm::cuda_stream_view stream)#
将选定范围读取到预分配的设备缓冲区中。
为获得最佳性能,仅当
is_device_read_preferred
返回true
时才应调用此函数。不支持直接设备读取的数据源实现无需覆盖此函数。- 抛出:
cudf::logic_error – 当对象不支持直接设备读取时,即
supports_device_read
返回false
。- 参数:
offset – 从开始位置的字节数
size – 要读取的字节数
dst – 现有设备内存的地址
stream – 要使用的 CUDA 流
- 返回:
读取的字节数(可能小于 size)
-
inline virtual std::future<size_t> device_read_async(size_t offset, size_t size, uint8_t *dst, rmm::cuda_stream_view stream)#
将选定范围异步读取到预分配的设备缓冲区中。
返回包含读取字节数的 future 值。调用返回值的
get()
方法将同步此函数。为获得最佳性能,仅当
is_device_read_preferred
返回true
时才应调用此函数。不支持直接设备读取的数据源实现无需覆盖此函数。- 抛出:
cudf::logic_error – 当对象不支持直接设备读取时,即
supports_device_read
返回false
。- 参数:
offset – 从开始位置的字节数
size – 要读取的字节数
dst – 现有设备内存的地址
stream – 要使用的 CUDA 流
- 返回:
作为 future 值的读取字节数(可能小于 size)
-
virtual size_t size() const = 0#
返回源数据的大小。
- 返回:
源数据的大小(字节)
-
inline virtual bool is_empty() const#
返回源是否包含任何数据。
- 返回:
如果存在数据,返回 True,否则返回 False
公有静态函数
-
static std::unique_ptr<datasource> create(std::string const &filepath, size_t offset = 0, size_t max_size_estimate = 0)#
从文件路径创建数据源。
参数
offset
和max_size_estimate
是给datasource
实现的提示,表明预期读取的数据范围。实现可以使用这些提示来优化读取操作。这些参数通常基于字节范围选项。在这种情况下,max_size_estimate
可以包括字节范围之后的填充,以包含处理可能需要的额外数据。- 参数:
filepath – [in] 要使用的文件路径
offset – [in] 数据开始读取的字节偏移量(默认为零)
max_size_estimate – [in] 预期读取数据范围的上限估计值(默认为零,表示 offset 之后整个文件)
- 返回:
构造的数据源对象
-
static std::unique_ptr<datasource> create(host_buffer const &buffer)#
从主机内存缓冲区创建数据源。
@已弃用 自 23.04 版本#
- 参数:
buffer – [in] 主机缓冲区对象
- 返回:
构造的数据源对象
-
static std::unique_ptr<datasource> create(cudf::host_span<std::byte const> buffer)#
从主机内存缓冲区创建数据源。
- 参数:
buffer – [in] 主机缓冲区对象
- 返回:
构造的数据源对象
-
static std::unique_ptr<datasource> create(cudf::device_span<std::byte const> buffer)#
从设备内存缓冲区创建数据源。
- 参数:
buffer – 设备缓冲区对象
- 返回:
构造的数据源对象
-
static std::unique_ptr<datasource> create(datasource *source)#
从用户实现的数据源对象创建数据源。
- 参数:
source – [in] 指向数据源对象的非所有权指针
- 返回:
构造的数据源对象
-
template<typename T>
static inline std::vector<std::unique_ptr<datasource>> create(std::vector<T> const &args)# 创建数据源向量,输入向量中的每个元素对应一个数据源。
- 参数:
args – [in] 参数向量
- 返回:
构造的数据源对象向量
-
class buffer#
- #include <datasource.hpp>
数据源返回给调用者的缓冲区的接口类。
提供返回数据地址和大小的基本接口。
被 cudf::io::datasource::non_owning_buffer, cudf::io::datasource::owning_buffer< Container > 继承
-
class non_owning_buffer : public cudf::io::datasource::buffer#
- #include <datasource.hpp>
非拥有缓冲区的实现,数据源持有缓冲区直到销毁。
-
template<typename Container>
class owning_buffer : public cudf::io::datasource::buffer# - #include <datasource.hpp>
拥有数据的
buffer
派生实现。可以使用不同的容器类型来持有数据缓冲区。
- 模板参数:
Container – 拥有数据的容器对象类型
公有函数
-
inline owning_buffer(Container &&moved_data_owner)#
将输入容器移动到新创建的对象中。
- 参数:
moved_data_owner – 用于构造缓冲区的容器。调用者应显式传递 std::move(data_owner) 到此函数以转移所有权。
-
inline owning_buffer(Container &&moved_data_owner, uint8_t const *data_ptr, size_t size)#
将输入容器移动到新创建的对象中,并暴露缓冲区的子 span。
- 参数:
moved_data_owner – 用于构造缓冲区的容器。调用者应显式传递 std::move(data_owner) 到此函数以转移所有权。
data_ptr – 子 span 的起始指针
size – 子 span 的大小
-
inline virtual size_t size() const override#
返回缓冲区的大小。
- 返回:
缓冲区的大小(字节)
-
inline virtual uint8_t const *data() const override#
返回缓冲区中数据的指针。
- 返回:
缓冲区中数据的指针
-
virtual ~datasource() = default#
-
class kafka_consumer : public cudf::io::datasource#
- #include <kafka_consumer.hpp>
libcudf 的 Apache Kafka 数据源
公有函数
-
kafka_consumer(std::map<std::string, std::string> configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper)#
创建处于半就绪状态的 Kafka 消费者对象实例。
处于半就绪状态的消费者不具备成功与 Kafka broker 交互所需的所有参数。但在半就绪状态下,Kafka 元数据操作仍然可能。这对于仅计划使用这些元数据操作的客户端很有用。当不需要提前知道延迟分区和主题分配,并且需要尽可能延迟时,这很有用。librdkafka 配置的文档可在 edenhill/librdkafka 中找到。
- 参数:
configs – 将传递给 librdkafka 客户端的 librdkafka 配置的键/值对
python_callable – 指向 Python functools.partial 对象的
python_callable_type
指针callable_wrapper – 用于调用
python_callable
的kafka_oauth_callback_wrapper_type
Cython 包装器。此包装器旨在避免在 libcudf_kafka 中链接 Python 开发库。
-
kafka_consumer(std::map<std::string, std::string> configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper, std::string const &topic_name, int partition, int64_t start_offset, int64_t end_offset, int batch_timeout, std::string const &delimiter)#
实例化一个 Kafka 消费者对象。librdkafka 配置的文档可在 edenhill/librdkafka 中找到。
- 参数:
configs – 将传递给 librdkafka 客户端的 librdkafka 配置的键/值对
python_callable – 指向 Python functools.partial 对象的
python_callable_type
指针callable_wrapper – 用于调用
python_callable
的kafka_oauth_callback_wrapper_type
Cython 包装器。此包装器旨在避免在 libcudf_kafka 中链接 Python 开发库。topic_name – 要消费的 Kafka 主题名称
partition – 要消费的分区索引,范围为
0
到TOPIC_NUM_PARTITIONS - 1
(包含)start_offset – 指定 TOPPAR(主题/分区组合)的起始偏移量
end_offset – 指定 TOPPAR 的读取结束位置
batch_timeout – 允许的最大读取时间(毫秒)。如果在 batch_timeout 前未达到 end_offset,则返回较小的子集。
delimiter – 在 Kafka 消息之间插入的可选分隔符,例如:“\n”
-
virtual std::unique_ptr<cudf::io::datasource::buffer> host_read(size_t offset, size_t size) override#
返回包含 Kafka 主题数据子集的缓冲区。
- 参数:
offset – [in] 从开始位置的字节偏移量
size – [in] 要读取的字节数
- 返回:
数据缓冲区
-
virtual size_t size() const override#
返回 Kafka 缓冲区中数据的大小。
- 返回:
size_t 源数据的大小(字节)
-
virtual size_t host_read(size_t offset, size_t size, uint8_t *dst) override#
将选定的范围读取到预分配的缓冲区中。
- 参数:
offset – [in] 从开始位置的字节偏移量
size – [in] 要读取的字节数
dst – [in] 现有主机内存的地址
- 返回:
读取的字节数(可能小于 size)
-
void commit_offset(std::string const &topic, int partition, int64_t offset)#
将偏移量提交到指定的 Kafka 主题/分区实例。
- 抛出:
cudf::logic_error – 提交分区偏移量失败时抛出
- 参数:
topic – [in] 应该设置偏移量的 Kafka 主题名称
partition – [in] 指定主题上应该使用的分区
offset – [in] 应该为主题/分区对设置的偏移量
-
std::map<std::string, int64_t> get_watermark_offset(std::string const &topic, int partition, int timeout, bool cached)#
检索主题/分区的水位偏移量值。
- 参数:
topic – [in] 应该检索水位的主题名称
partition – [in] 指定主题上应该使用的分区
timeout – [in] 等待 Kafka broker 响应的最大毫秒数
cached – [in] 如果为 True,则使用从 Kafka broker 检索到的最后一个值;如果为 False,则通过网络请求从 Kafka broker 检索最新值。
- 返回:
指定主题/分区的偏移量值
-
std::map<std::string, std::string> current_configs()#
检索当前的 Kafka 客户端配置。
- 返回:
当前客户端配置的键/值对的 Map<string, string>
-
int64_t get_committed_offset(std::string const &topic, int partition)#
获取成功提交到 Kafka broker 的最新偏移量。
- 参数:
topic – [in] 主题/分区对的主题名称
partition – [in] 主题/分区对的分区号
- 返回:
指定主题/分区对的最新偏移量
-
std::map<std::string, std::vector<int32_t>> list_topics(std::string specific_topic)#
查询 Kafka broker 以获取某个主题的分区列表。如果没有指定主题,则检索 broker 中所有主题的分区。
- 参数:
specific_topic – [in] 要检索分区的主题名称。如果为空,则检索所有主题的分区。
- 返回:
Kafka 主题名称及其对应的主题分区值列表的 Map。
-
void close(int timeout)#
关闭到底层 Kafka socket 连接并清理系统资源。
- 抛出:
cudf::logic_error – 关闭连接失败时抛出
- 参数:
timeout – 等待响应的最大毫秒数
-
void unsubscribe()#
停止所有活动消费并取消消费者对主题/分区实例的订阅。
- 抛出:
cudf::logic_error – 取消订阅活动分区分配失败时抛出。
-
kafka_consumer(std::map<std::string, std::string> configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper)#
-
class datasource#