IO 数据源#
- group io_datasources
-
class datasource#
- #include <datasource.hpp>
为读者提供输入数据的接口类。
由 cudf::io::external::kafka::kafka_consumer 子类化
公共函数
-
inline virtual ~datasource()#
基类析构函数。
-
virtual std::unique_ptr<datasource::buffer> host_read(size_t offset, size_t size) = 0#
返回一个包含源数据子集的缓冲区。
- Parameters:
offset – [in] 从开始的字节数
size – [in] 要读取的字节数
- Returns:
数据缓冲区(可以小于大小)
-
virtual size_t host_read(size_t offset, size_t size, uint8_t *dst) = 0#
将选定的范围读取到预先分配的缓冲区中。
- Parameters:
offset – [in] 从开始的字节数
size – [in] 要读取的字节数
dst – [in] 现有主机内存的地址
- Returns:
读取的字节数(可能小于大小)
-
inline virtual bool supports_device_read() const#
此源是否支持直接读取到设备内存。
如果此函数返回true,当读取器在设备上处理数据时,数据源将接收对device_read()的调用,而不是host_read()。大多数读取器仍然会进行host_read()调用,用于在主机上处理的部分输入(例如元数据)。
不支持直接设备读取的数据源实现不需要重写此函数。支持直接设备读取的实现应重写此函数以返回false。
- Returns:
bool 此源是否支持 device_read() 调用
-
inline virtual bool is_device_read_preferred(size_t size) const#
估计直接设备读取对于给定大小是否更优。
- Parameters:
size – 要读取的字节数
- Returns:
设备读取是否预期对给定大小更高效
-
inline virtual std::unique_ptr<datasource::buffer> device_read(size_t offset, size_t size, rmm::cuda_stream_view stream)#
返回一个包含源数据子集的设备缓冲区。
为了获得最佳性能,只有在
is_device_read_preferred返回true时才应调用此函数。不支持直接设备读取的数据源实现不需要重写此函数。- Throws:
cudf::logic_error – 该对象不支持直接设备读取,即
supports_device_read返回false。- Parameters:
offset – 从起始位置开始的字节数
size – 要读取的字节数
stream – 使用的CUDA流
- Returns:
设备内存中的数据缓冲区
-
inline virtual size_t device_read(size_t offset, size_t size, uint8_t *dst, rmm::cuda_stream_view stream)#
将选定的范围读取到预分配的设备缓冲区中。
为了获得最佳性能,只有在
is_device_read_preferred返回true时才应调用此函数。不支持直接设备读取的数据源实现不需要重写此函数。- Throws:
cudf::logic_error – 当对象不支持直接设备读取时,即
supports_device_read返回false。- Parameters:
offset – 从起始位置开始的字节数
size – 要读取的字节数
dst – 现有设备内存的地址
stream – 使用的CUDA流
- Returns:
读取的字节数(可能小于大小)
-
inline virtual std::future<size_t> device_read_async(size_t offset, size_t size, uint8_t *dst, rmm::cuda_stream_view stream)#
异步读取选定范围到预分配的设备缓冲区。
返回一个包含读取字节数的未来值。调用返回值的
get()方法会同步此函数。为了获得最佳性能,只有在
is_device_read_preferred返回true时才应调用此函数。不支持直接设备读取的数据源实现不需要重写此函数。- Throws:
cudf::logic_error – 当对象不支持直接设备读取时,即
supports_device_read返回false。- Parameters:
offset – 从起始位置开始的字节数
size – 要读取的字节数
dst – 现有设备内存的地址
stream – 使用的CUDA流
- Returns:
读取的字节数作为未来值(可能小于大小)
-
virtual size_t size() const = 0#
返回源数据的大小。
- Returns:
源数据的大小(以字节为单位)
-
inline virtual bool is_empty() const#
返回源是否包含任何数据。
- Returns:
如果有数据则为真,否则为假
公共静态函数
-
static std::unique_ptr<datasource> create(std::string const &filepath, size_t offset = 0, size_t max_size_estimate = 0)#
从文件路径创建一个源。
参数
offset和max_size_estimate是对datasource实现的提示,关于将要读取的数据的预期范围。实现可以使用这些提示来优化读取操作。这些参数通常基于字节范围选项。在这种情况下,max_size_estimate可以包括字节范围之后的填充,以包含处理可能需要的额外数据。- Parameters:
filepath – [in] 要使用的文件的路径
offset – [in] 起始字节偏移量,从该位置开始读取数据(默认为零)
max_size_estimate – [in] 将要读取的数据范围的上限估计(默认值为零,表示在
offset之后的整个文件)
- Returns:
构建的数据源对象
-
static std::unique_ptr<datasource> create(host_buffer const &buffer)#
从主机内存缓冲区创建一个源。
@deprecated 自23.04起#
- Parameters:
buffer – [in] 主机缓冲区对象
- Returns:
构建的数据源对象
-
static std::unique_ptr<datasource> create(cudf::host_span<std::byte const> buffer)#
从主机内存缓冲区创建一个源。
- Parameters:
buffer – [in] 主机缓冲区对象
- Returns:
构建的数据源对象
-
static std::unique_ptr<datasource> create(cudf::device_span<std::byte const> buffer)#
从设备内存缓冲区创建一个源。
- Parameters:
buffer – 设备缓冲区对象
- Returns:
构建的数据源对象
-
static std::unique_ptr<datasource> create(datasource *source)#
从用户实现的数据源对象创建一个源。
- Parameters:
source – [in] 指向数据源对象的非拥有指针
- Returns:
构建的数据源对象
-
template<typename T>
static inline std::vector<std::unique_ptr<datasource>> create(std::vector<T> const &args)# 创建一个数据源向量,输入向量中的每个元素对应一个数据源。
- Parameters:
args – [in] 参数向量
- Returns:
构建的数据源对象向量
-
class buffer#
- #include <datasource.hpp>
数据源返回给调用者的缓冲区的接口类。
提供一个基本接口来返回数据地址和大小。
由 cudf::io::datasource::non_owning_buffer, cudf::io::datasource::owning_buffer< Container > 子类化
-
class non_owning_buffer : public cudf::io::datasource::buffer#
- #include <datasource.hpp>
非拥有缓冲区的实现,其中数据源在销毁之前持有缓冲区。
-
template<typename Container>
class owning_buffer : public cudf::io::datasource::buffer# - #include <datasource.hpp>
buffer的派生实现,拥有数据。可以使用不同的容器类型来保存数据缓冲区。
- Template Parameters:
容器 – 拥有数据的容器对象的类型
公共函数
-
inline owning_buffer(Container &&moved_data_owner)#
将输入容器移动到新创建的对象中。
- Parameters:
moved_data_owner – 用于构造缓冲区的容器。调用者应显式传递 std::move(data_owner) 给此函数以转移所有权。
-
inline owning_buffer(Container &&moved_data_owner, uint8_t const *data_ptr, size_t size)#
将输入容器移动到新创建的对象中,并暴露缓冲区的一个子范围。
- Parameters:
moved_data_owner – 用于构造缓冲区的容器。调用者应显式传递 std::move(data_owner) 给此函数以转移所有权。
data_ptr – 指向子跨度起始位置的指针
size – 子跨度的大小
-
inline virtual size_t size() const override#
返回缓冲区的大小。
- Returns:
缓冲区的大小(以字节为单位)
-
inline virtual uint8_t const *data() const override#
返回指向缓冲区中数据的指针。
- Returns:
指向缓冲区中数据的指针
-
inline virtual ~datasource()#
-
class kafka_consumer : public cudf::io::datasource#
- #include <kafka_consumer.hpp>
用于Apache Kafka的libcudf数据源
公共函数
-
kafka_consumer(std::map<std::string, std::string> configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper)#
创建一个处于半就绪状态的Kafka消费者对象的实例。
处于半就绪状态的消费者没有所有必需的参数来与Kafka代理成功进行消费者交互。然而,在半就绪状态下,Kafka元数据操作仍然是可能的。这对于计划仅使用这些元数据操作的客户端非常有用。这在需要延迟分区和主题分配的情况下非常有用,因为这种需求无法提前预知,需要尽可能延迟。librdkafka配置的文档可以在edenhill/librdkafka找到。
- Parameters:
configs – 将传递给 librdkafka 客户端的 librdkafka 配置的键/值对
python_callable –
python_callable_type指向一个 Python functools.partial 对象的指针callable_wrapper –
kafka_oauth_callback_wrapper_typeCython 包装器,将用于调用python_callable。此包装器的目的是防止我们在 libcudf_kafka 中链接 Python 开发库。
-
kafka_consumer(std::map<std::string, std::string> configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper, std::string const &topic_name, int partition, int64_t start_offset, int64_t end_offset, int batch_timeout, std::string const &delimiter)#
实例化一个Kafka消费者对象。有关librdkafka配置的文档可以在edenhill/librdkafka找到。
- Parameters:
configs – 将传递给 librdkafka 客户端的 librdkafka 配置的键/值对
python_callable –
python_callable_type指向一个 Python functools.partial 对象的指针callable_wrapper –
kafka_oauth_callback_wrapper_typeCython 包装器,将用于调用python_callable。此包装器的目的是防止我们在 libcudf_kafka 中链接 Python 开发库。topic_name – 要消费的Kafka主题的名称
partition – 要消费的分区索引,范围在
0到TOPIC_NUM_PARTITIONS - 1之间,包括两端start_offset – 指定TOPPAR(主题/分区组合)的查找位置
end_offset – 在指定的TOPPAR中读取到的位置
batch_timeout – 允许的最大读取时间(毫秒)。如果在 batch_timeout 之前未达到 end_offset,将返回较小的子集
delimiter – 可选的分隔符,用于在kafka消息之间插入到输出中,例如:“\n”
-
virtual std::unique_ptr<cudf::io::datasource::buffer> host_read(size_t offset, size_t size) override#
返回一个包含来自Kafka主题数据子集的缓冲区。
- Parameters:
offset – [in] 从开始的字节数
size – [in] 要读取的字节数
- Returns:
数据缓冲区
-
virtual size_t size() const override#
返回Kafka缓冲区中数据的大小。
- Returns:
size_t 源数据的大小,以字节为单位
-
virtual size_t host_read(size_t offset, size_t size, uint8_t *dst) override#
将选定的范围读取到预先分配的缓冲区中。
- Parameters:
offset – [in] 从开始的字节数
size – [in] 要读取的字节数
dst – [in] 现有主机内存的地址
- Returns:
读取的字节数(可能小于大小)
-
void commit_offset(std::string const &topic, int partition, int64_t offset)#
将偏移量提交到指定的Kafka主题/分区实例。
- Throws:
cudf::logic_error – 在提交分区偏移失败时
- Parameters:
topic – [in] 应设置偏移量的Kafka主题名称
partition – [in] 应使用的指定主题的分区
offset – [in] 应为主题/分区对设置的偏移量
-
std::map<std::string, int64_t> get_watermark_offset(std::string const &topic, int partition, int timeout, bool cached)#
检索主题/分区的水印偏移值。
- Parameters:
topic – [in] 应该检索水印的Kafka主题的名称
partition – [in] 应使用的指定主题的分区
timeout – [in] 等待Kafka代理响应的最大毫秒数
cached – [in] 如果为True,则使用从Kafka代理最后检索的值,如果为False,则通过发出网络请求从Kafka代理检索最新值。
- Returns:
指定主题/分区的水印偏移值
-
std::map<std::string, std::string> current_configs()#
检索当前的Kafka客户端配置。
- Returns:
当前客户端配置的键/值对的Map
-
int64_t get_committed_offset(std::string const &topic, int partition)#
获取已成功提交到Kafka代理的最新偏移量。
- Parameters:
topic – [in] 主题/分区对的主题名称
partition – [in] 主题/分区对的分区编号
- Returns:
指定主题/分区对的最新偏移量
-
std::map<std::string, std::vector<int32_t>> list_topics(std::string specific_topic)#
查询Kafka代理以获取某个主题的主题分区列表。如果未指定主题,则将检索代理中所有主题的分区。
- Parameters:
specific_topic – [in] 要检索分区的主题名称。如果为空,则将检索所有主题的分区。
- Returns:
Kafka主题名称及其对应的主题分区值列表的映射。
-
void close(int timeout)#
关闭与Kafka的底层套接字连接并清理系统资源。
- Throws:
cudf::logic_error – 在连接关闭失败时
- Parameters:
timeout – 等待响应的最大毫秒数
-
void unsubscribe()#
停止所有活动的消费并移除消费者对主题/分区实例的订阅。
- Throws:
cudf::logic_error – 在取消订阅活动分区分配失败时。
-
kafka_consumer(std::map<std::string, std::string> configs, python_callable_type python_callable, kafka_oauth_callback_wrapper_type callable_wrapper)#
-
class datasource#