IO 数据源#

group io_datasources
class datasource#
#include <datasource.hpp>

为读者提供输入数据的接口类。

cudf::io::external::kafka::kafka_consumer 子类化

公共函数

inline virtual ~datasource()#

基类析构函数。

virtual std::unique_ptr<datasource::buffer> host_read(size_t offset, size_t size) = 0#

返回一个包含源数据子集的缓冲区。

Parameters:
  • offset[in] 从开始的字节数

  • size[in] 要读取的字节数

Returns:

数据缓冲区(可以小于大小)

virtual size_t host_read(size_t offset, size_t size, uint8_t *dst) = 0#

将选定的范围读取到预先分配的缓冲区中。

Parameters:
  • offset[in] 从开始的字节数

  • size[in] 要读取的字节数

  • dst[in] 现有主机内存的地址

Returns:

读取的字节数(可能小于大小)

inline virtual bool supports_device_read() const#

此源是否支持直接读取到设备内存。

如果此函数返回true,当读取器在设备上处理数据时,数据源将接收对device_read()的调用,而不是host_read()。大多数读取器仍然会进行host_read()调用,用于在主机上处理的部分输入(例如元数据)。

不支持直接设备读取的数据源实现不需要重写此函数。支持直接设备读取的实现应重写此函数以返回false。

Returns:

bool 此源是否支持 device_read() 调用

inline virtual bool is_device_read_preferred(size_t size) const#

估计直接设备读取对于给定大小是否更优。

Parameters:

size – 要读取的字节数

Returns:

设备读取是否预期对给定大小更高效

inline virtual std::unique_ptr<datasource::buffer> device_read(size_t offset, size_t size, rmm::cuda_stream_view stream)#

返回一个包含源数据子集的设备缓冲区。

为了获得最佳性能,只有在is_device_read_preferred返回true时才应调用此函数。不支持直接设备读取的数据源实现不需要重写此函数。

Throws:

cudf::logic_error – 该对象不支持直接设备读取,即 supports_device_read 返回 false

Parameters:
  • offset – 从起始位置开始的字节数

  • size – 要读取的字节数

  • stream – 使用的CUDA流

Returns:

设备内存中的数据缓冲区

inline virtual size_t device_read(size_t offset, size_t size, uint8_t *dst, rmm::cuda_stream_view stream)#

将选定的范围读取到预分配的设备缓冲区中。

为了获得最佳性能,只有在is_device_read_preferred返回true时才应调用此函数。不支持直接设备读取的数据源实现不需要重写此函数。

Throws:

cudf::logic_error – 当对象不支持直接设备读取时,即 supports_device_read 返回 false

Parameters:
  • offset – 从起始位置开始的字节数

  • size – 要读取的字节数

  • dst – 现有设备内存的地址

  • stream – 使用的CUDA流

Returns:

读取的字节数(可能小于大小)

inline virtual std::future<size_t> device_read_async(size_t offset, size_t size, uint8_t *dst, rmm::cuda_stream_view stream)#

异步读取选定范围到预分配的设备缓冲区。

返回一个包含读取字节数的未来值。调用返回值的get()方法会同步此函数。

为了获得最佳性能,只有在is_device_read_preferred返回true时才应调用此函数。不支持直接设备读取的数据源实现不需要重写此函数。

Throws:

cudf::logic_error – 当对象不支持直接设备读取时,即 supports_device_read 返回 false

Parameters:
  • offset – 从起始位置开始的字节数

  • size – 要读取的字节数

  • dst – 现有设备内存的地址

  • stream – 使用的CUDA流

Returns:

读取的字节数作为未来值(可能小于大小)

virtual size_t size() const = 0#

返回源数据的大小。

Returns:

源数据的大小(以字节为单位)

inline virtual bool is_empty() const#

返回源是否包含任何数据。

Returns:

如果有数据则为真,否则为假

公共静态函数

static std::unique_ptr<datasource> create(std::string const &filepath, size_t offset = 0, size_t max_size_estimate = 0)#

从文件路径创建一个源。

参数 offsetmax_size_estimate 是对 datasource 实现的提示,关于将要读取的数据的预期范围。实现可以使用这些提示来优化读取操作。这些参数通常基于字节范围选项。在这种情况下,max_size_estimate 可以包括字节范围之后的填充,以包含处理可能需要的额外数据。

Parameters:
  • filepath[in] 要使用的文件的路径

  • offset[in] 起始字节偏移量,从该位置开始读取数据(默认为零)

  • max_size_estimate[in] 将要读取的数据范围的上限估计(默认值为零,表示在offset之后的整个文件)

Returns:

构建的数据源对象

static std::unique_ptr<datasource> create(host_buffer const &buffer)#

从主机内存缓冲区创建一个源。

@deprecated 自23.04起#

Parameters:

buffer[in] 主机缓冲区对象

Returns:

构建的数据源对象

static std::unique_ptr<datasource> create(cudf::host_span<std::byte const> buffer)#

从主机内存缓冲区创建一个源。

Parameters:

buffer[in] 主机缓冲区对象

Returns:

构建的数据源对象

static std::unique_ptr<datasource> create(cudf::device_span<std::byte const> buffer)#

从设备内存缓冲区创建一个源。

Parameters:

buffer – 设备缓冲区对象

Returns:

构建的数据源对象

static std::unique_ptr<datasource> create(datasource *source)#

从用户实现的数据源对象创建一个源。

Parameters:

source[in] 指向数据源对象的非拥有指针

Returns:

构建的数据源对象

template<typename T>
static inline std::vector<std::unique_ptr<datasource>> create(std::vector<T> const &args)#

创建一个数据源向量,输入向量中的每个元素对应一个数据源。

Parameters:

args[in] 参数向量

Returns:

构建的数据源对象向量

class buffer#
#include <datasource.hpp>

数据源返回给调用者的缓冲区的接口类。

提供一个基本接口来返回数据地址和大小。

cudf::io::datasource::non_owning_buffer, cudf::io::datasource::owning_buffer< Container > 子类化

公共函数

virtual size_t size() const = 0#

返回缓冲区大小(以字节为单位)。

Returns:

缓冲区大小(以字节为单位)

virtual uint8_t const *data() const = 0#

返回缓冲区中数据的地址。

Returns:

缓冲区中数据的地址

inline virtual ~buffer()#

基类析构函数。

公共静态函数

template<typename Container>
static inline std::unique_ptr<buffer> create(Container &&data_owner)#

从容器构造数据源缓冲对象的工厂。

Template Parameters:

容器 – 用于构建缓冲区的容器类型

Parameters:

data_owner – 从中构造缓冲区的容器(所有权被转移)

Returns:

构建的缓冲区对象

class non_owning_buffer : public cudf::io::datasource::buffer#
#include <datasource.hpp>

非拥有缓冲区的实现,其中数据源在销毁之前持有缓冲区。

公共函数

inline non_owning_buffer(uint8_t const *data, size_t size)#

构造一个新的非拥有缓冲区对象。

Parameters:
  • data – 数据缓冲区

  • size – 数据缓冲区的大小

inline virtual size_t size() const override#

返回缓冲区的大小。

Returns:

缓冲区的大小(以字节为单位)

inline virtual uint8_t const *data() const override#

返回指向缓冲区的指针。

Returns:

指向缓冲区的指针

template<typename Container>
class owning_buffer : public cudf::io::datasource::buffer#
#include <datasource.hpp>

buffer 的派生实现,拥有数据。

可以使用不同的容器类型来保存数据缓冲区。

Template Parameters:

容器 – 拥有数据的容器对象的类型

公共函数

inline owning_buffer(Container &&moved_data_owner)#

将输入容器移动到新创建的对象中。

Parameters:

moved_data_owner – 用于构造缓冲区的容器。调用者应显式传递 std::move(data_owner) 给此函数以转移所有权。

inline owning_buffer(Container &&moved_data_owner, uint8_t const *data_ptr, size_t size)#

将输入容器移动到新创建的对象中,并暴露缓冲区的一个子范围。

Parameters:
  • moved_data_owner – 用于构造缓冲区的容器。调用者应显式传递 std::move(data_owner) 给此函数以转移所有权。

  • data_ptr – 指向子跨度起始位置的指针

  • size – 子跨度的大小

inline virtual size_t size() const override#

返回缓冲区的大小。

Returns:

缓冲区的大小(以字节为单位)

inline virtual uint8_t const *data() const override#

返回指向缓冲区中数据的指针。

Returns:

指向缓冲区中数据的指针

class kafka_consumer : public cudf::io::datasource#
#include <kafka_consumer.hpp>

用于Apache Kafka的libcudf数据源

公共函数

kafka_consumer(std::map<std::string, std::string> configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper)#

创建一个处于半就绪状态的Kafka消费者对象的实例。

处于半就绪状态的消费者没有所有必需的参数来与Kafka代理成功进行消费者交互。然而,在半就绪状态下,Kafka元数据操作仍然是可能的。这对于计划仅使用这些元数据操作的客户端非常有用。这在需要延迟分区和主题分配的情况下非常有用,因为这种需求无法提前预知,需要尽可能延迟。librdkafka配置的文档可以在edenhill/librdkafka找到。

Parameters:
  • configs – 将传递给 librdkafka 客户端的 librdkafka 配置的键/值对

  • python_callablepython_callable_type 指向一个 Python functools.partial 对象的指针

  • callable_wrapperkafka_oauth_callback_wrapper_type Cython 包装器,将用于调用 python_callable。此包装器的目的是防止我们在 libcudf_kafka 中链接 Python 开发库。

kafka_consumer(std::map<std::string, std::string> configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper, std::string const &topic_name, int partition, int64_t start_offset, int64_t end_offset, int batch_timeout, std::string const &delimiter)#

实例化一个Kafka消费者对象。有关librdkafka配置的文档可以在edenhill/librdkafka找到。

Parameters:
  • configs – 将传递给 librdkafka 客户端的 librdkafka 配置的键/值对

  • python_callablepython_callable_type 指向一个 Python functools.partial 对象的指针

  • callable_wrapperkafka_oauth_callback_wrapper_type Cython 包装器,将用于调用 python_callable。此包装器的目的是防止我们在 libcudf_kafka 中链接 Python 开发库。

  • topic_name – 要消费的Kafka主题的名称

  • partition – 要消费的分区索引,范围在 0TOPIC_NUM_PARTITIONS - 1 之间,包括两端

  • start_offset – 指定TOPPAR(主题/分区组合)的查找位置

  • end_offset – 在指定的TOPPAR中读取到的位置

  • batch_timeout – 允许的最大读取时间(毫秒)。如果在 batch_timeout 之前未达到 end_offset,将返回较小的子集

  • delimiter – 可选的分隔符,用于在kafka消息之间插入到输出中,例如:“\n”

virtual std::unique_ptr<cudf::io::datasource::buffer> host_read(size_t offset, size_t size) override#

返回一个包含来自Kafka主题数据子集的缓冲区。

Parameters:
  • offset[in] 从开始的字节数

  • size[in] 要读取的字节数

Returns:

数据缓冲区

virtual size_t size() const override#

返回Kafka缓冲区中数据的大小。

Returns:

size_t 源数据的大小,以字节为单位

virtual size_t host_read(size_t offset, size_t size, uint8_t *dst) override#

将选定的范围读取到预先分配的缓冲区中。

Parameters:
  • offset[in] 从开始的字节数

  • size[in] 要读取的字节数

  • dst[in] 现有主机内存的地址

Returns:

读取的字节数(可能小于大小)

void commit_offset(std::string const &topic, int partition, int64_t offset)#

将偏移量提交到指定的Kafka主题/分区实例。

Throws:

cudf::logic_error – 在提交分区偏移失败时

Parameters:
  • topic[in] 应设置偏移量的Kafka主题名称

  • partition[in] 应使用的指定主题的分区

  • offset[in] 应为主题/分区对设置的偏移量

std::map<std::string, int64_t> get_watermark_offset(std::string const &topic, int partition, int timeout, bool cached)#

检索主题/分区的水印偏移值。

Parameters:
  • topic[in] 应该检索水印的Kafka主题的名称

  • partition[in] 应使用的指定主题的分区

  • timeout[in] 等待Kafka代理响应的最大毫秒数

  • cached[in] 如果为True,则使用从Kafka代理最后检索的值,如果为False,则通过发出网络请求从Kafka代理检索最新值。

Returns:

指定主题/分区的水印偏移值

std::map<std::string, std::string> current_configs()#

检索当前的Kafka客户端配置。

Returns:

当前客户端配置的键/值对的Map

int64_t get_committed_offset(std::string const &topic, int partition)#

获取已成功提交到Kafka代理的最新偏移量。

Parameters:
  • topic[in] 主题/分区对的主题名称

  • partition[in] 主题/分区对的分区编号

Returns:

指定主题/分区对的最新偏移量

std::map<std::string, std::vector<int32_t>> list_topics(std::string specific_topic)#

查询Kafka代理以获取某个主题的主题分区列表。如果未指定主题,则将检索代理中所有主题的分区。

Parameters:

specific_topic[in] 要检索分区的主题名称。如果为空,则将检索所有主题的分区。

Returns:

Kafka主题名称及其对应的主题分区值列表的映射。

void close(int timeout)#

关闭与Kafka的底层套接字连接并清理系统资源。

Throws:

cudf::logic_error – 在连接关闭失败时

Parameters:

timeout – 等待响应的最大毫秒数

void unsubscribe()#

停止所有活动的消费并移除消费者对主题/分区实例的订阅。

Throws:

cudf::logic_error – 在取消订阅活动分区分配失败时。