- class lance.BlobColumn(blob_column: 数组 | ChunkedArray)¶
一个实用工具,用于封装Pyarrow二进制列,并以类似文件对象的方式逐行迭代。
这对于处理需要与期望文件类对象的API接口交互的中小型二进制对象非常有用。对于非常大的二进制对象(每个值4-8MB或更大),最好创建一个blob列并使用
lance.Dataset.take_blobs()来访问blob数据。
- class lance.BlobFile(inner: LanceBlobFile)¶
将Lance数据集中的blob表示为类似文件的对象。
- close() None¶
刷新并关闭IO对象。
如果文件已经关闭,此方法将不起作用。
- property closed : bool¶
- readable() bool¶
返回对象是否已打开以供读取。
如果为False,read()将抛出OSError异常。
- readall() bytes¶
读取直到文件结束(EOF),使用多次read()调用。
-
seek(offset: int, 何时: int =
0) int¶ 将流位置更改为给定的字节偏移量。
- offset
流位置,相对于‘whence’。
- whence
相对于当前位置的偏移量。
偏移量是相对于whence所指示的位置进行解释的。 whence的取值包括:
os.SEEK_SET 或 0 - 流的开始位置(默认值);偏移量应为零或正数
os.SEEK_CUR 或 1 – 当前流位置;偏移量可为负值
os.SEEK_END 或 2 – 流的末尾;偏移量通常为负数
返回新的绝对位置。
- seekable() bool¶
返回对象是否支持随机访问。
如果为False,seek()、tell()和truncate()将引发OSError。 此方法可能需要进行测试seek()。
- size() int¶
返回blob的大小(以字节为单位)。
- tell() int¶
返回当前流的位置。
- class lance.DataStatistics(字段: FieldStatistics)¶
数据集中的数据统计信息
- fields : FieldStatistics¶
数据集中字段的统计信息
-
class lance.FFILanceTableProvider(dataset, *, with_row_id=
False, with_row_addr=False)¶
- class lance.FieldStatistics(id: int, bytes_on_disk: int)¶
数据集中某个字段的统计信息
- bytes_on_disk : int¶
(可能经过压缩的)磁盘上用于存储该字段的字节
- id : int¶
字段的ID
-
class lance.FragmentMetadata(id: int, 文件: list[DataFile], physical_rows: int, deletion_file: DeletionFile | None =
None, row_id_meta: RowIdMeta | None =None)¶ 片段的元数据。
- id¶
片段的ID。
- Type:
整数
- physical_rows¶
该片段最初的行数。这是删除前数据文件中的行数。
- Type:
int
- deletion_file¶
删除文件(如果有的话)。
- Type:
可选[DeletionFile]
-
deletion_file : DeletionFile | None =
None¶
- static from_json(json_data: str) FragmentMetadata¶
- id : int¶
- property num_deletions : int¶
已从此片段中删除的行数。
- property num_rows : int¶
删除操作后此片段中的行数。
- physical_rows : int¶
- to_json() dict¶
获取这个作为简单的JSON可序列化字典。
-
class lance.LanceDataset(uri: str | Path, 版本: int | str | None =
None, block_size: int | None =None, index_cache_size: int | None =None, metadata_cache_size: int | None =None, commit_lock: CommitLock | None =None, storage_options: dict[str, str] | None =None, serialized_manifest: bytes | None =None, default_scan_options: dict[str, Any] | None =None)¶ Lance数据集采用Lance格式,数据存储在给定的uri位置。
-
add_columns(转换: dict[str, str] | BatchUDF | ReaderLike | pyarrow.Field | list[pyarrow.Field] | pyarrow.Schema, read_columns: list[str] | None =
None, reader_schema: pa.Schema | None =None, batch_size: int | None =None)¶ 使用定义的值添加新列。
有几种方式可以指定新列。首先,可以为每个新列提供SQL表达式。其次,可以提供一个UDF(用户定义函数),该函数接收一批现有数据并返回包含新列的新数据批次。这些新列将被追加到数据集中。
你也可以提供一个RecordBatchReader,它将从某个外部源读取新列的值。当新列的值已经暂存到文件中(通常由某个分布式进程完成)时,这通常很有用。
有关编写UDF的更多信息,请参阅
lance.add_columns_udf()装饰器。- Parameters:
- transforms : dict or AddColumnsUDF or ReaderLike¶
如果这是一个字典,那么键是新列的名称,值则是SQL表达式字符串。这些字符串可以引用数据集中的现有列。 如果这是一个AddColumnsUDF,那么它是一个用户定义函数,接收一批现有数据并返回包含新列的新数据批次。 如果这是
pyarrow.Field或pyarrow.Schema,则会以仅元数据操作的方式添加所有具有给定模式的NULL列。- read_columns : list of str, optional¶
UDF将读取的列名。如果为None,则UDF将读取所有列。仅当transforms是UDF时使用此参数。否则,读取的列将从SQL表达式中推断得出。
- reader_schema : pa.Schema, optional¶
仅当transforms是ReaderLike对象时有效。这将用于确定读取器的模式。
- batch_size : int, optional¶
在应用转换时,每次从源数据集中读取的行数。如果数据集是v1版本,则忽略此参数。
示例
>>> import lance >>> import pyarrow as pa >>> table = pa.table({"a": [1, 2, 3]}) >>> dataset = lance.write_dataset(table, "my_dataset") >>> @lance.batch_udf() ... def double_a(batch): ... df = batch.to_pandas() ... return pd.DataFrame({'double_a': 2 * df['a']}) >>> dataset.add_columns(double_a) >>> dataset.to_table().to_pandas() a double_a 0 1 2 1 2 4 2 3 6 >>> dataset.add_columns({"triple_a": "a * 3"}) >>> dataset.to_table().to_pandas() a double_a triple_a 0 1 2 3 1 2 4 6 2 3 6 9另请参阅
LanceDataset.merge将一组预先计算好的列合并到数据集中。
- alter_columns(*修改: Iterable[AlterColumn])¶
修改列名、数据类型和可空性。
重命名的列可以保留其上的任何索引。如果某列具有IVF_PQ索引,即使该列被转换为其他类型,索引仍可保留。但目前其他类型的索引尚不支持类型转换。
列类型可以进行向上转型(例如从int32转为int64)或向下转型(例如从int64转为int32)。但若存在无法用新类型表示的值时,向下转型会失败。通常来说,列可以转型为相同的大类类型:整型转整型、浮点型转浮点型、字符串转字符串。不过字符串、二进制和列表列可以在其大小变体之间进行转型。例如:字符串转大字符串、二进制转大二进制、列表转大列表。
重命名的列可以保留其上的任何索引。但是,如果将该列转换为不同类型,其索引将被删除。
- Parameters:
- alterations : Iterable[Dict[str, Any]]¶
一个字典序列,每个字典包含以下键:
- ”path”: str
要修改的列路径。对于顶级列,这是列名。 对于嵌套列,这是点分隔的路径,例如"a.b.c"。
- ”name”: str, optional
列的新名称。如果未指定,则列名保持不变。
- ”nullable”: bool, optional
列是否可为空。如果未指定,则列的可空性不变。 只能将非空列改为可空列。目前不能将可空列改为非空列。
- ”data_type”: pyarrow.DataType, optional
要将列转换到的新数据类型。如果未指定,则列的数据类型保持不变。
示例
>>> import lance >>> import pyarrow as pa >>> schema = pa.schema([pa.field('a', pa.int64()), ... pa.field('b', pa.string(), nullable=False)]) >>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]}) >>> dataset = lance.write_dataset(table, "example") >>> dataset.alter_columns({"path": "a", "name": "x"}, ... {"path": "b", "nullable": True}) >>> dataset.to_table().to_pandas() x b 0 1 a 1 2 b 2 3 c >>> dataset.alter_columns({"path": "x", "data_type": pa.int32()}) >>> dataset.schema x: int32 b: string
- checkout_version(版本: int | str) LanceDataset¶
加载指定版本的数据集。
与
dataset()构造函数不同,此操作将重用当前缓存。如果数据集已处于指定版本,则此操作无效。- Parameters:
- version : int | str,¶
要检出的版本号。可以提供一个版本号(int)或标签(str)。
- Return type:
-
cleanup_old_versions(older_than: timedelta | None =
None, *, delete_unverified: bool =False, error_if_tagged_old_versions: bool =True) CleanupStats¶ 清理数据集的旧版本。
某些数据集变更(如覆盖操作)会遗留未被最新版本引用的数据。这些旧数据会被保留,以便将数据集回滚到之前的版本。
该方法将移除旧版本及其引用的所有数据文件。 一旦清理任务执行完毕,您将无法检出或恢复这些旧版本。
- Parameters:
- older_than : timedelta, optional¶
仅早于此版本的记录将被删除。如果未指定,默认保留两周内的记录。
- delete_unverified : bool, default False¶
事务失败后遗留的文件可能看起来像是正在进行中的操作(例如追加新数据)的一部分,这些文件只有在至少7天后才会被删除。如果delete_unverified为True,则无论文件存在多久都会被删除。
只有在您能确保当前没有其他进程正在处理此数据集时,才应将此设置为True。否则数据集可能会进入损坏状态。
- error_if_tagged_old_versions : bool, default True¶
某些版本可能关联了标签。带标签的版本不会被清理,无论它们存在多久。如果该参数设为True(默认值),当任何带标签版本匹配参数时会抛出异常。否则,带标签版本将被静默忽略,仅清理未加标签的版本。
-
static commit(base_uri: str | Path | LanceDataset, 操作: LanceOperation.BaseOperation | 事务, blobs_op: LanceOperation.BaseOperation | None =
None, read_version: int | None =None, commit_lock: CommitLock | None =None, storage_options: dict[str, str] | None =None, enable_v2_manifest_paths: bool | None =None, 分离: bool | None =False, max_retries: int =20) LanceDataset¶ 创建数据集的新版本
该方法是一个高级方法,允许用户描述对数据文件所做的更改。当使用Lance应用变更时(例如使用
LanceDataset或write_dataset()),则不需要此方法。它当前的目的是允许在分布式环境中进行更改,其中没有单一进程完成所有工作。例如,分布式批量更新或分布式批量修改操作。
一旦完成所有更改,可以调用此方法通过更新数据集清单使更改生效。
警告
这是一个高级API,不提供与其他API相同级别的验证。例如,调用者有责任确保片段对模式有效。
- Parameters:
- base_uri : str, Path, or LanceDataset¶
数据集的基URI,或数据集对象本身。使用数据集对象可能更高效,因为它可以复用文件元数据缓存。
- operation : BaseOperation¶
应用于数据集的操作。这描述了已进行的更改。可用的操作请参见
LanceOperation。- read_version : int, optional¶
作为变更基础的数据集版本。 覆盖或恢复操作不需要此版本。
- commit_lock : CommitLock, optional¶
自定义提交锁。仅当您的对象存储不支持原子提交时才需要。详情请参阅用户指南。
- storage_options : optional, dict¶
针对特定存储连接的额外选项。这用于存储连接参数,如凭证、端点等。
- enable_v2_manifest_paths : bool, optional¶
如果为True,且这是一个新数据集,则使用新的V2清单路径。 这些路径能更高效地在对象存储上打开包含多个版本的数据集。如果数据集已存在,此参数将不起作用。要迁移现有数据集,请改用
migrate_manifest_paths_v2()方法。默认为False。警告: 启用此选项将使旧版Lance(0.17.0之前版本)无法读取该数据集。- detached : bool, optional¶
如果为True,则该提交不会成为数据集谱系的一部分。它将永远不会显示为最新数据集,未来唯一查看它的方式是通过指定版本号检出。该版本将是一个随机版本号,仅在分离提交中保持唯一。调用方应将其存储在某处,因为未来将无法通过其他方式获取它。
- max_retries : int¶
提交数据集时执行的最大重试次数。
- Returns:
Lance数据集的新版本。
- Return type:
示例
使用
LanceOperation.Overwrite操作创建新数据集:>>> import lance >>> import pyarrow as pa >>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]}) >>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]}) >>> fragment1 = lance.fragment.LanceFragment.create("example", tab1) >>> fragment2 = lance.fragment.LanceFragment.create("example", tab2) >>> fragments = [fragment1, fragment2] >>> operation = lance.LanceOperation.Overwrite(tab1.schema, fragments) >>> dataset = lance.LanceDataset.commit("example", operation) >>> dataset.to_table().to_pandas() a b 0 1 a 1 2 b 2 3 c 3 4 d
-
static commit_batch(dest: str | Path | LanceDataset, 交易: collections.abc.Sequence[事务], commit_lock: CommitLock | None =
None, storage_options: dict[str, str] | None =None, enable_v2_manifest_paths: bool | None =None, 分离: bool | None =False, max_retries: int =20) BulkCommitResult¶ 使用多个事务创建数据集的新版本。
此方法是一个高级方法,允许用户描述对数据文件所做的更改。当使用Lance应用更改时(例如使用
LanceDataset或write_dataset()),则不需要此方法。- Parameters:
- dest : str, Path, or LanceDataset¶
数据集的基础URI,或者数据集对象本身。使用数据集对象可能更高效,因为它可以复用文件元数据缓存。
- transactions : Iterable[Transaction]¶
要应用于数据集的事务操作。这些操作将被合并为单个事务并应用到数据集中。注意:目前仅支持追加事务,其他类型的事务将在未来版本中支持。
- commit_lock : CommitLock, optional¶
自定义提交锁。仅当您的对象存储不支持原子提交时才需要。详情请参阅用户指南。
- storage_options : optional, dict¶
针对特定存储连接的额外选项。这用于存储连接参数,如凭证、端点等。
- enable_v2_manifest_paths : bool, optional¶
如果为True,且这是一个新数据集,则使用新的V2清单路径。 这些路径能更高效地在对象存储上打开包含多个版本的数据集。如果数据集已存在,此参数将不起作用。要迁移现有数据集,请改用
migrate_manifest_paths_v2()方法。默认为False。警告: 启用此选项将使旧版Lance(0.17.0之前版本)无法读取该数据集。- detached : bool, optional¶
如果为True,则该提交不会成为数据集谱系的一部分。它将永远不会显示为最新数据集,未来唯一查看它的方式是通过指定版本号检出。该版本将是一个随机版本号,仅在分离提交中保持唯一。调用方应将其存储在某处,因为未来将无法通过其他方式获取它。
- max_retries : int¶
提交数据集时执行的最大重试次数。
- Returns:
- dataset: LanceDataset
Lance数据集的新版本。
- merged: Transaction
应用于数据集的合并事务。
- Return type:
包含键的字典
-
count_rows(filter: 表达式 | str | None =
None, **kwargs) int¶ 统计符合扫描器筛选条件的行数。
- Parameters:
- **kwargs : dict, optional¶
完整参数描述请参见 py:method:scanner 方法。
- Returns:
count – 数据集中的总行数。
- Return type:
int
-
create_index(列: str | list[str], index_type: str, name: str | None =
None, metric: str ='L2', replace: bool =False, num_partitions: int | None =None, ivf_centroids: np.ndarray | pa.FixedSizeListArray | pa.FixedShapeTensorArray | None =None, pq_codebook: np.ndarray | pa.FixedSizeListArray | pa.FixedShapeTensorArray | None =None, num_sub_vectors: int | None =None, accelerator: str | 'torch.Device' | None =None, index_cache_size: int | None =None, shuffle_partition_batches: int | None =None, shuffle_partition_concurrency: int | None =None, ivf_centroids_file: str | None =None, precomputed_partition_dataset: str | None =None, storage_options: dict[str, str] | None =None, filter_nan: bool =True, one_pass_ivfpq: bool =False, **kwargs) LanceDataset¶ 在列上创建索引。
实验性API
- Parameters:
- column : str¶
要建立索引的列。
- index_type : str¶
索引的类型。
"IVF_PQ, IVF_HNSW_PQ 和 IVF_HNSW_SQ"目前支持。- name : str, optional¶
索引名称。如果未提供,将根据列名自动生成。
- metric : str¶
距离度量类型,即“L2”(“euclidean”的别名)、“cosine”或“dot”(点积)。默认为“L2”。
- replace : bool¶
如果索引已存在,则替换现有索引。
- num_partitions : int, optional¶
IVF(倒排文件索引)的分区数量。
- ivf_centroids : optional¶
它可以是
np.ndarray、pyarrow.FixedSizeListArray或pyarrow.FixedShapeTensorArray。 一个num_partitions x dimension维度的现有K均值中心点数组, 用于IVF聚类。如果未提供,将训练一个新的KMeans模型。- pq_codebook : optional,¶
它可以是
np.ndarray、pyarrow.FixedSizeListArray, 或pyarrow.FixedShapeTensorArray。 一个num_sub_vectors x (2 ^ nbits * dimensions // num_sub_vectors)数组,表示PQ码本的K均值中心点。注意:目前
nbits始终为8。 如果未提供,将训练一个新的PQ模型。- num_sub_vectors : int, optional¶
PQ(乘积量化)的子向量数量。
- accelerator: str | 'torch.Device' | None =
None¶ 如果设置,将使用加速器来加快训练过程。 支持的加速器包括:"cuda"(英伟达GPU)和"mps"(苹果硅GPU)。 如果未设置,则使用CPU。
- index_cache_size : int, optional¶
索引缓存的大小,以条目数表示。默认值为256。
- shuffle_partition_batches : int, optional¶
批次数,使用数据集的row group大小,决定每个shuffle分区包含的数量。默认值为10240。
假设row group大小为1024,每个shuffle分区将包含10240 * 1024 = 10,485,760行。减小此值会减少shuffle操作的内存消耗但会增加完成时间,反之亦然。
- shuffle_partition_concurrency : int, optional¶
并发处理的shuffle分区数量。默认值为2
减小该值可以减少shuffle操作的内存消耗,但会增加完成时间,反之亦然。
- storage_options : optional, dict¶
针对特定存储连接的额外选项。这用于存储连接参数,如凭证、端点等。
- filter_nan : bool¶
默认为True。False是不安全的,如果存在任何null/nan值会导致崩溃(否则不会)。禁用用于可空列的空值过滤器。可获得小幅速度提升。
- one_pass_ivfpq : bool¶
默认为False。如果启用,索引类型必须为“IVF_PQ”。可减少磁盘IO。
- **kwargs¶
传递给索引构建过程的参数。
SQ(标量量化)仅适用于
IVF_HNSW_SQ索引类型,这种量化方法用于减少索引的内存占用,它将浮点向量映射为整数向量,每个整数占用num_bits位,目前仅支持8位。- If
index_typeis “IVF_*”, then the following parameters are required: num_partitions
- If
index_typeis with “PQ”, then the following parameters are required: 子向量数量
IVF_PQ的可选参数:
- ivf_centroids
用于IVF聚类的现有K均值中心点。
- num_bits
PQ(乘积量化)的位数。默认为8。 仅支持4和8。
- Optional parameters for IVF_HNSW_*:
- max_level
Int,图中最大层级数。
- m
Int,图中每个节点的边数。
- ef_construction
Int,构建过程中需要检查的节点数量。
示例
import lance dataset = lance.dataset("/tmp/sift.lance") dataset.create_index( "vector", "IVF_PQ", num_partitions=256, num_sub_vectors=16 )import lance dataset = lance.dataset("/tmp/sift.lance") dataset.create_index( "vector", "IVF_HNSW_SQ", num_partitions=256, )实验性加速器(GPU)支持:
- accelerate: 使用GPU训练IVF分区。
目前仅支持CUDA(Nvidia)或MPS(Apple)。 需要安装PyTorch。
import lance dataset = lance.dataset("/tmp/sift.lance") dataset.create_index( "vector", "IVF_PQ", num_partitions=256, num_sub_vectors=16, accelerator="cuda" )参考文献
-
create_scalar_index(列: str, index_type: 'BTREE' | 'BITMAP' | 'LABEL_LIST' | 'INVERTED' | 'FTS' | 'NGRAM', name: str | None =
None, *, 替换: bool =True, **kwargs)¶ 在列上创建标量索引。
标量索引与向量索引类似,可用于加速扫描。当扫描包含对已索引列的过滤表达式时,标量索引能显著提升查询速度。例如,若
my_col列建有标量索引,以下扫描操作将执行得更快:import lance dataset = lance.dataset("/tmp/images.lance") my_table = dataset.scanner(filter="my_col != 7").to_table()带有预过滤器的向量搜索也可以从标量索引中受益。例如,
import lance dataset = lance.dataset("/tmp/images.lance") my_table = dataset.scanner( nearest=dict( column="vector", q=[1, 2, 3, 4], k=10, ) filter="my_col != 7", prefilter=True )目前有5种标量索引类型可供选择。
BTREE。最常见的类型是BTREE。该索引的灵感来源于btree数据结构,尽管只有btree的前几层会被缓存在内存中。它将在具有大量唯一值且每个值对应行数较少的列上表现良好。BITMAP。该索引为列中的每个唯一值存储一个位图。这种索引适用于具有少量唯一值且每个值对应多行数据的列。LABEL_LIST. 一种特殊索引,用于对值基数较小的列表列进行索引。例如,包含标签列表的列(如["tag1", "tag2", "tag3"])可以使用LABEL_LIST索引。该索引只能加速带有array_has_any或array_has_all过滤器的查询。NGRAM. 一种用于索引字符串列的特殊索引。该索引会为字符串中的每个n元语法创建位图,默认使用三元语法。当前该索引可以加速在过滤器中使用contains函数的查询。FTS/INVERTED. 该索引用于文档列。这种索引可以进行全文搜索。例如,一个包含查询字符串"hello world"中任意单词的列。结果将按BM25排序。
请注意,可以使用环境变量
LANCE_BYPASS_SPILLING来绕过磁盘溢出。将其设置为true可以避免内存耗尽问题(更多信息请参阅https://github.com/apache/datafusion/issues/10073)。实验性API
- Parameters:
- column : str¶
要建立索引的列。必须是布尔型、整型、浮点型或字符串类型的列。
- index_type : str¶
索引的类型。可选值为
"BTREE","BITMAP","LABEL_LIST","NGRAM","FTS"或"INVERTED"。- name : str, optional¶
索引名称。如果未提供,将根据列名自动生成。
- replace : bool, default True¶
如果索引已存在,则替换现有索引。
- with_position : bool, default True
这是针对
INVERTED索引的。如果设为True,索引将存储文档中单词的位置信息,以便支持短语查询。这将显著增加索引大小。即使设置为True,也不会影响非短语查询的性能。- base_tokenizer : str, default "simple"
这是针对
INVERTED索引的配置。指定使用的基础分词器,可选值包括: * "simple":根据空白字符和标点符号进行分词。 * "whitespace":仅根据空白字符进行分词。 * "raw":不进行分词处理。- language : str, default "English"
这是针对
INVERTED索引的。用于词干提取和停用词处理的语言。仅当stem或remove_stop_words为true时使用- max_token_length : Optional[int], default 40
这是针对
INVERTED索引的设置。表示最大令牌长度。任何超过此长度的令牌将被移除。- lower_case : bool, default True
这是针对
INVERTED索引的。如果设为True,索引会将所有文本转换为小写。- stem : bool, default False
这是针对
INVERTED索引的。如果为True,索引将对词干进行提取。- remove_stop_words : bool, default False
这是针对
INVERTED索引的。如果为True,该索引将移除停用词。- ascii_folding : bool, default False
这是针对
INVERTED索引的。如果设为True,索引会尽可能将非ASCII字符转换为ASCII字符。例如会将带重音符号的字母如"é"转换为"e"。
示例
import lance dataset = lance.dataset("/tmp/images.lance") dataset.create_index( "category", "BTREE", )标量索引只能加速使用等值、比较、范围(例如
my_col BETWEEN 0 AND 100)和集合成员(例如my_col IN (0, 1, 2))等基础过滤条件的扫描当筛选条件包含多个索引列且这些条件通过AND或OR逻辑连接时,可以使用标量索引 (例如
my_col < 0 AND other_col> 100)如果过滤条件包含未建立索引的列,虽然可以使用标量索引,但根据过滤条件的结构,可能无法实际使用。例如,若列
not_indexed没有标量索引,那么过滤条件my_col = 0 OR not_indexed = 1将无法利用my_col上的任何标量索引。要判断扫描是否使用了标量索引,可以使用
explain_plan查看lancedb生成的查询计划。使用标量索引的查询将包含ScalarIndexQuery关系或MaterializeIndex操作符。
- property data_storage_version : str¶
该数据集所使用的数据存储格式版本
- delete(predicate: str | 表达式)¶
从数据集中删除行。
这会将行标记为已删除,但不会从文件中物理移除它们。这样可以保持现有索引仍然有效。
- Parameters:
- predicate : str or pa.compute.Expression¶
用于选择要删除行的谓词。可以是SQL字符串或pyarrow表达式。
示例
>>> import lance >>> import pyarrow as pa >>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]}) >>> dataset = lance.write_dataset(table, "example") >>> dataset.delete("a = 1 or b in ('a', 'b')") >>> dataset.to_table() pyarrow.Table a: int64 b: string ---- a: [[3]] b: [["c"]]
-
static drop(base_uri: str | Path, storage_options: dict[str, str] | None =
None) None¶
- drop_columns(columns: list[str])¶
从数据集中删除一列或多列
这是一个仅涉及元数据的操作,不会从底层存储中删除实际数据。如需删除数据,您必须随后调用
compact_files来重写不包含被删除列的数据,然后调用cleanup_old_versions来移除旧文件。- Parameters:
- columns : list of str¶
要删除的列名。这些可以是嵌套列引用(例如“a.b.c”)或顶级列名(例如“a”)。
示例
>>> import lance >>> import pyarrow as pa >>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]}) >>> dataset = lance.write_dataset(table, "example") >>> dataset.drop_columns(["a"]) >>> dataset.to_table().to_pandas() b 0 a 1 b 2 c
- drop_index(name: str)¶
从数据集中删除索引
注意:索引是通过"索引名称"删除的。这与字段名称不同。如果在创建索引时未指定名称,则会自动生成一个名称。您可以使用list_indices方法来获取索引的名称。
- get_fragment(fragment_id: int) LanceFragment | None¶
根据片段ID获取对应的片段。
-
get_fragments(filter: Expression | None =
None) list[LanceFragment]¶ 从数据集中获取所有片段。
注意:过滤器功能暂不支持。
- property has_index¶
- index_statistics(index_name: str) dict[str, Any]¶
-
insert(数据: ReaderLike, *, mode=
'append', **kwargs)¶ 将数据插入数据集。
- Parameters:
- data_obj : Reader-like
要写入的数据。可接受的类型包括: - Pandas DataFrame、Pyarrow Table、Dataset、Scanner 或 RecordBatchReader - Huggingface 数据集
- mode : str, default 'append'¶
- 写入数据时使用的模式。可选选项有:
create - 创建新数据集(如果uri已存在则会报错)。 overwrite - 创建新的快照版本 append - 创建新版本,该版本是输入数据与最新版本的合并(如果uri不存在则会报错)
- **kwargs : dict, optional¶
传递给
write_dataset()的其他关键字参数。
-
join(right_dataset, keys, right_keys=
None, join_type='left outer', left_suffix=None, right_suffix=None, coalesce_keys=True, use_threads=True)¶ 未实现(仅覆盖pyarrow数据集以防止段错误)
- property lance_schema : LanceSchema¶
该数据集的LanceSchema
- property latest_version : int¶
返回数据集的最新版本。
- property max_field_id : int¶
清单中的max_field_id
-
merge(data_obj: ReaderLike, left_on: str, right_on: str | None =
None, schema=None)¶ 将另一个数据集合并到当前数据集中。
执行左连接操作,其中数据集作为左表,data_obj作为右表。存在于数据集但不在左表中的行将被填充为null值,除非Lance不支持某些类型的null值,这种情况下会抛出错误。
- Parameters:
示例
>>> import lance >>> import pyarrow as pa >>> df = pa.table({'x': [1, 2, 3], 'y': ['a', 'b', 'c']}) >>> dataset = lance.write_dataset(df, "dataset") >>> dataset.to_table().to_pandas() x y 0 1 a 1 2 b 2 3 c >>> new_df = pa.table({'x': [1, 2, 3], 'z': ['d', 'e', 'f']}) >>> dataset.merge(new_df, 'x') >>> dataset.to_table().to_pandas() x y z 0 1 a d 1 2 b e 2 3 c f另请参阅
LanceDataset.add_columns通过逐批计算添加新列。
- merge_insert(开启: str | Iterable[str]) MergeInsertBuilder¶
返回一个构建器,可用于创建“合并插入”操作
该操作可以在单个事务中添加行、更新行和删除行。这是一个非常通用的工具,可用于实现诸如“如果不存在则插入”、“更新或插入(即upsert)”等行为,甚至可以用新数据替换部分现有数据(例如替换所有月份为“一月”的数据)。
合并插入操作通过使用连接将源表中的新数据与目标表中的现有数据相结合。记录分为三类。
"匹配"记录是指同时存在于源表和目标表中的记录。"未匹配"记录仅存在于源表中(例如这些是新数据)。"源未匹配"记录仅存在于目标表中(这是旧数据)。
此方法返回的构建器可用于自定义每种数据类别应执行的操作。
请注意,此操作会导致数据重新排序。这是因为更新的行会从数据集中删除,然后以新值重新插入到末尾。由于内部使用了哈希连接操作,新插入行的顺序可能会随机波动。
- Parameters:
- on : Union[str, Iterable[str]]¶
要连接的列(或多列)。这是源表和目标表中记录匹配的方式。通常这是某种键或ID列。
示例
使用when_matched_update_all()和when_not_matched_insert_all()来执行"upsert"操作。这将更新数据集中已存在的行,并插入不存在的行。
>>> import lance >>> import pyarrow as pa >>> table = pa.table({"a": [2, 1, 3], "b": ["a", "b", "c"]}) >>> dataset = lance.write_dataset(table, "example") >>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]}) >>> # Perform a "upsert" operation >>> dataset.merge_insert("a") \ ... .when_matched_update_all() \ ... .when_not_matched_insert_all() \ ... .execute(new_table) {'num_inserted_rows': 1, 'num_updated_rows': 2, 'num_deleted_rows': 0} >>> dataset.to_table().sort_by("a").to_pandas() a b 0 1 b 1 2 x 2 3 y 3 4 z使用when_not_matched_insert_all()执行"不存在则插入"操作。这只会插入数据集中尚不存在的行。
>>> import lance >>> import pyarrow as pa >>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]}) >>> dataset = lance.write_dataset(table, "example2") >>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]}) >>> # Perform an "insert if not exists" operation >>> dataset.merge_insert("a") \ ... .when_not_matched_insert_all() \ ... .execute(new_table) {'num_inserted_rows': 1, 'num_updated_rows': 0, 'num_deleted_rows': 0} >>> dataset.to_table().sort_by("a").to_pandas() a b 0 1 a 1 2 b 2 3 c 3 4 z您不需要提供所有列。如果只想更新部分列,可以省略不想更新的列。被省略的列在更新时会保留现有值,如果是插入操作则会设为null。
>>> import lance >>> import pyarrow as pa >>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"], \ ... "c": ["x", "y", "z"]}) >>> dataset = lance.write_dataset(table, "example3") >>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]}) >>> # Perform an "upsert" operation, only updating column "a" >>> dataset.merge_insert("a") \ ... .when_matched_update_all() \ ... .when_not_matched_insert_all() \ ... .execute(new_table) {'num_inserted_rows': 1, 'num_updated_rows': 2, 'num_deleted_rows': 0} >>> dataset.to_table().sort_by("a").to_pandas() a b c 0 1 a x 1 2 x y 2 3 y z 3 4 z None
- migrate_manifest_paths_v2()¶
将清单路径迁移到新格式。
这将更新清单以使用新的v2格式路径。
该函数具有幂等性,可以多次运行而不会改变对象存储的状态。
警告:当其他并发操作正在进行时,不应运行此操作。 并且该操作应在完成后再恢复其他操作。
- property optimize : DatasetOptimizer¶
- property partition_expression¶
未实现(仅覆盖pyarrow数据集以防止段错误)
- prewarm_index(name: str)¶
预热索引
这将把整个索引加载到内存中。这有助于避免索引查询时的冷启动问题。如果索引无法放入索引缓存中,则会导致I/O浪费。
- Parameters:
- name : str¶
需要预热的索引名称。
- replace_field_metadata(field_name: str, new_metadata: dict[str, str])¶
替换模式中某个字段的元数据
- replace_schema(schema: 模式/架构)¶
未实现(仅覆盖pyarrow数据集以防止段错误)
参见 :py:method:`replace_schema_metadata` 或 :py:method:`replace_field_metadata`
- replace_schema_metadata(new_metadata: dict[str, str])¶
替换数据集的结构元数据
- Parameters:
- new_metadata : dict¶
要设置的新元数据
- restore()¶
将当前检出的版本恢复为数据集的最新版本。
这将创建一个新的提交。
-
sample(num_rows: int, columns: list[str] | dict[str, str] | None =
None, randomize_order: bool =True, **kwargs) 表格¶ 随机选取数据样本
-
scanner(columns: list[str] | dict[str, str] | None =
None, filter: 表达式 | str | None =None, limit: int | None =None, offset: int | None =None, nearest: dict | None =None, batch_size: int | None =None, batch_readahead: int | None =None, fragment_readahead: int | None =None, scan_in_order: bool | None =None, 片段: Iterable[LanceFragment] | None =None, full_text_query: str | dict | FullTextQuery | None =None, *, prefilter: bool | None =None, with_row_id: bool | None =None, with_row_address: bool | None =None, use_stats: bool | None =None, fast_search: bool | None =None, io_buffer_size: int | None =None, late_materialization: bool | list[str] | None =None, use_scalar_index: bool | None =None, include_deleted_rows: bool | None =None, scan_stats_callback: Callable[[ScanStatistics], None] | None =None, strict_batch_size: bool | None =None) LanceScanner¶ 返回一个支持各种下推操作的Scanner。
- Parameters:
- columns : list of str, or dict of str to str default None¶
要获取的列名列表。 或者列名到SQL表达式的字典。 如果为None或未指定,则获取所有列。
- filter : pa.compute.Expression or str¶
表达式或字符串,必须是一个有效的SQL where子句。有关有效的SQL表达式,请参阅 Lance filter pushdown 。
- limit : int, default None¶
最多获取这么多行。如果为None或未指定,则获取所有行。
- offset : int, default None¶
从这一行开始获取。如果未指定则为0。
- nearest : dict, default None¶
获取与K个最相似向量对应的行。示例:
{ "column": <embedding col name>, "q": <query vector as pa.Float32Array>, "k": 10, "minimum_nprobes": 20, "maximum_nprobes": 50, "refine_factor": 1 }- batch_size : int, default None¶
返回批次的目标大小。在某些情况下,批次大小可能达到此值的两倍(但绝不会超过)。而在其他情况下,批次大小可能小于此值。
- io_buffer_size : int, default None¶
IO缓冲区的大小。有关更多信息,请参阅
ScannerBuilder.io_buffer_size。- batch_readahead : int, optional¶
预读取的批次数量。
- fragment_readahead : int, optional¶
预读取的片段数量。
- scan_in_order : bool, default True¶
是否按顺序读取片段和批次。如果为false,吞吐量可能会更高,但批次将无序返回,内存使用可能会增加。
- fragments : iterable of LanceFragment, default None¶
如果指定,则仅扫描这些片段。如果scan_in_order为True,那么片段将按照给定的顺序进行扫描。
- prefilter : bool, default False¶
如果为True,则在运行向量查询之前应用过滤器。 这将生成更准确的结果,但查询成本可能更高。 通常在过滤器具有高选择性时效果良好。
如果为False,则在运行向量查询之后应用过滤器。 这样性能会更好,但如果最接近查询的行不匹配过滤器, 结果可能会少于请求的行数(或为空)。 通常在过滤器选择性不高时效果良好。
- use_scalar_index : bool, default True¶
Lance会自动使用标量索引来优化查询。在某些极端情况下,这可能会使查询性能变差,此时可以通过该参数禁用标量索引。
- late_materialization : bool or List[str], default None¶
允许自定义控制延迟物化。延迟物化会在过滤操作后通过take操作获取非查询列,这在结果较少或列数据量非常大时非常有用。
当结果较多或列数据非常窄时,早期物化可能更优。
如果设为True,则所有列都采用延迟物化; 如果设为False,则所有列都采用早期物化; 如果是字符串列表,则只有列表中的列会延迟物化。
默认采用启发式算法,假设过滤器会筛选出约0.1%的行。如果您的过滤器选择性更强(例如按id查找),可能需要设为True;如果过滤器选择性较弱(例如匹配20%的行),可能需要设为False。
- full_text_query : str or dict, optional¶
要搜索的查询字符串,结果将按BM25算法排序。 例如"hello world"会匹配包含"hello"或"world"的文档。 或者使用包含以下键的字典:
- columns: list[str]
要搜索的列, 目前columns列表中仅支持单个列。
- query: str
要搜索的查询字符串。
- fast_search : bool, default False¶
如果为True,则搜索将仅在索引数据上执行,这样可以缩短搜索时间。
- scan_stats_callback : Callable[[ScanStatistics], None], default None¶
一个回调函数,在扫描完成后将调用该函数并传入扫描统计信息。回调函数引发的错误将被记录但不会重新抛出。
- include_deleted_rows : bool, default False¶
如果为True,则已被删除但仍存在于片段中的行将被返回。这些行的_rowid列将被设为null。所有其他列将反映磁盘上存储的值,可能不为null。
注意:如果是搜索操作或take操作(包括标量索引扫描),则无法返回已删除的行。
注意
目前,如果同时指定了filter和nearest,那么:
nearest 会优先执行。
结果会在之后进行过滤。
为了调试近似最近邻(ANN)结果,您可以选择即使存在索引也不使用它,只需指定
use_index=False。例如,以下代码将始终返回精确的KNN结果:dataset.to_table(nearest={ "column": "vector", "k": 10, "q": <query vector>, "use_index": False }
- session() _Session¶
返回数据集会话,该会话保存了数据集的状态。
- property stats : LanceStats¶
实验性API
- property tags : 标签¶
数据集标签管理。
与Git类似,标签是一种向数据集特定版本添加元数据的方式。
示例
ds = lance.open("dataset.lance") ds.tags.create("v2-prod-20250203", 10) tags = ds.tags.list()
-
take(索引: list[int] | 数组, columns: list[str] | dict[str, str] | None =
None) 表格¶ 按索引选择数据行。
- Parameters:
- Returns:
表格
- Return type:
-
take_blobs(blob_column: str, ids: list[int] | 数组 | None =
None, 地址: list[int] | 数组 | None =None, 索引: list[int] | 数组 | None =None) list[BlobFile]¶ 按行ID选择二进制大对象。
无需在处理前将大型二进制blob数据加载到内存中,该API允许您将二进制blob数据作为常规的Python类文件对象打开。更多详情请参阅
lance.BlobFile。必须且只能指定ids、addresses或indices中的一个参数。 :param blob_column: 要选择的blob列名称。 :type blob_column: str :param ids: 数据集中要选择的行ID。 :type ids: Integer Array或类似数组 :param addresses: 数据集中要选择行的(不稳定)内存地址。 :type addresses: Integer Array或类似数组 :param indices: 数据集中行的偏移量/索引。 :type indices: Integer Array或类似数组
- Returns:
blob_files
- Return type:
列表[BlobFile]
-
to_batches(columns: list[str] | dict[str, str] | None =
None, filter: 表达式 | str | None =None, limit: int | None =None, offset: int | None =None, nearest: dict | None =None, batch_size: int | None =None, batch_readahead: int | None =None, fragment_readahead: int | None =None, scan_in_order: bool | None =None, *, prefilter: bool | None =None, with_row_id: bool | None =None, with_row_address: bool | None =None, use_stats: bool | None =None, full_text_query: str | dict | None =None, io_buffer_size: int | None =None, late_materialization: bool | list[str] | None =None, use_scalar_index: bool | None =None, strict_batch_size: bool | None =None, **kwargs) Iterator[RecordBatch]¶ 将数据集读取为物化的记录批次。
- Parameters:
- Returns:
record_batches
- Return type:
RecordBatch的迭代器
-
to_table(columns: list[str] | dict[str, str] | None =
None, filter: 表达式 | str | None =None, limit: int | None =None, offset: int | None =None, nearest: dict | None =None, batch_size: int | None =None, batch_readahead: int | None =None, fragment_readahead: int | None =None, scan_in_order: bool | None =None, *, prefilter: bool | None =None, with_row_id: bool | None =None, with_row_address: bool | None =None, use_stats: bool | None =None, fast_search: bool | None =None, full_text_query: str | dict | FullTextQuery | None =None, io_buffer_size: int | None =None, late_materialization: bool | list[str] | None =None, use_scalar_index: bool | None =None, include_deleted_rows: bool | None =None) 表格¶ 将数据作为
pyarrow.Table读入内存- Parameters:
- columns : list of str, or dict of str to str default None¶
要获取的列名列表。 或者列名到SQL表达式的字典。 如果为None或未指定,则获取所有列。
- filter : pa.compute.Expression or str¶
表达式或字符串,必须是一个有效的SQL where子句。有关有效的SQL表达式,请参阅 Lance filter pushdown 。
- limit : int, default None¶
最多获取这么多行。如果为None或未指定,则获取所有行。
- offset : int, default None¶
从这一行开始获取。如果未指定则为0。
- nearest : dict, default None¶
获取与K个最相似向量对应的行。示例:
{ "column": <embedding col name>, "q": <query vector as pa.Float32Array>, "k": 10, "metric": "cosine", "minimum_nprobes": 20, "maximum_nprobes": 50, "refine_factor": 1 }- batch_size : int, optional¶
每次读取的行数。
- io_buffer_size : int, default None¶
IO缓冲区的大小。有关更多信息,请参阅
ScannerBuilder.io_buffer_size。- batch_readahead : int, optional¶
预读取的批次数量。
- fragment_readahead : int, optional¶
预读取的片段数量。
- scan_in_order : bool, optional, default True¶
是否按顺序读取片段和批次。如果为false,吞吐量可能会更高,但批次将无序返回,内存使用可能会增加。
- prefilter : bool, optional, default False¶
在向量搜索之前运行过滤器。
- late_materialization : bool or List[str], default None¶
允许自定义控制延迟物化。更多信息请参阅
ScannerBuilder.late_materialization。- use_scalar_index : bool, default True¶
允许自定义控制标量索引的使用。更多信息请参见
ScannerBuilder.use_scalar_index。- with_row_id : bool, optional, default False¶
返回行ID。
- with_row_address : bool, optional, default False¶
返回行地址
- use_stats : bool, optional, default True¶
在过滤过程中使用统计下推。
- fast_search : bool, optional, default False¶
- full_text_query : str or dict, optional¶
用于搜索的查询字符串,结果将按BM25算法排序。 例如:"hello world"会匹配包含"hello"或"world"的文档。 或者使用包含以下键的字典:
- columns: list[str]
要搜索的列, 目前仅支持在列列表中指定单个列。
- query: str
要搜索的查询字符串。
- include_deleted_rows : bool, optional, default False¶
如果为True,则已被删除但仍存在于片段中的行将被返回。这些行的_rowid列将被设为null。所有其他列将反映磁盘上存储的值,可能不为null。
注意:如果是搜索操作或take操作(包括标量索引扫描),则无法返回已删除的行。
笔记
如果同时指定了filter和nearest,那么:
nearest 会优先执行。
除非将pre-filter设置为True,否则结果会在之后进行过滤。
-
update(更新: dict[str, str], where: str | None =
None) UpdateResult¶ 更新符合where条件的行的列值。
- Parameters:
- Returns:
updates – 包含更新行数的字典。
- Return type:
字典
示例
>>> import lance >>> import pyarrow as pa >>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]}) >>> dataset = lance.write_dataset(table, "example") >>> update_stats = dataset.update(dict(a = 'a + 2'), where="b != 'a'") >>> update_stats["num_updated_rows"] = 2 >>> dataset.to_table().to_pandas() a b 0 1 a 1 4 b 2 5 c
- property uri : str¶
数据的位置
- validate()¶
验证数据集。
此操作会检查数据集的完整性,如果数据集已损坏则会抛出异常。
- property version : int¶
返回当前检出的数据集版本
- versions()¶
返回此数据集中的所有版本。
-
add_columns(转换: dict[str, str] | BatchUDF | ReaderLike | pyarrow.Field | list[pyarrow.Field] | pyarrow.Schema, read_columns: list[str] | None =
-
class lance.LanceFragment(dataset: LanceDataset, fragment_id: int | None, *, 片段: _Fragment | None =
None)¶ - count_rows(self, 表达式 filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, bool cache_metadata=True, MemoryPool memory_pool=None)¶
统计符合扫描器筛选条件的行数。
- Parameters:
- filter : Expression, default None
Scan 将仅返回与筛选条件匹配的行。 如果可能,谓词将被下推以利用数据源中的分区信息或内部元数据,例如 Parquet 统计信息。否则在生成 RecordBatches 之前会对加载的数据进行筛选。
- batch_size : int, default 131_072
扫描记录批次的最大行数。如果扫描的记录批次超出内存限制,可以调用此方法来减小其大小。
- batch_readahead : int, default 16
文件中预读取的批次数量。这可能不适用于所有文件格式。增加此数值将提高内存使用量,但也可能提升IO利用率。
- fragment_readahead : int, default 4
预读取的文件数量。增加此数值将提高内存使用率,但也可能提升IO利用率。
- fragment_scan_options : FragmentScanOptions, default None
特定于某次扫描和片段类型的选项,这些选项在同个数据集的不同扫描之间可能会发生变化。
- use_threads : bool, default True
如果启用,则将根据可用CPU核心数量使用最大并行度。
- cache_metadata : bool, default True
如果启用,扫描时可能会缓存元数据以加速重复扫描。
- memory_pool : MemoryPool, default None
如需内存分配,可在此指定。若未指定,则使用默认池。
- Returns:
计数
- Return type:
int
-
static create(dataset_uri: str | Path, 数据: ReaderLike, fragment_id: int | None =
None, schema: pa.Schema | None =None, max_rows_per_group: int =1024, 进度: FragmentWriteProgress | None =None, mode: str ='append', *, data_storage_version: str | None =None, use_legacy_format: bool | None =None, storage_options: dict[str, str] | None =None) FragmentMetadata¶ 从给定数据创建
FragmentMetadata。如果数据集尚未创建,可以使用此功能。
警告
内部API。该方法不面向终端用户使用。
- Parameters:
- dataset_uri : str¶
数据集的URI。
- fragment_id : int¶
片段的ID。
- data : pa.Table or pa.RecordBatchReader¶
要写入片段的数据。
- schema : pa.Schema, optional¶
数据的模式。如果未指定,将从数据中推断模式。
- max_rows_per_group : int, default 1024¶
数据文件中每个分组的最大行数。
- progress : FragmentWriteProgress, optional¶
实验性API。用于跟踪片段写入进度的功能。传入一个自定义类,该类需定义在开始写入每个片段和完成写入时调用的钩子函数。
- mode : str, default "append"¶
写入模式。如果指定为“append”,数据将与现有数据集的模式进行校验。否则,传递“create”或“overwrite”将为模式分配新的字段ID。
- data_storage_version : optional, str, default None¶
要使用的数据存储格式版本。较新的版本效率更高,但需要较新版本的lance才能读取。默认值(None)将使用最新的稳定版本。更多详情请参阅用户指南。
- use_legacy_format : bool, default None¶
已弃用的参数。请改用 data_storage_version。
- storage_options : optional, dict¶
针对特定存储连接的额外选项。这用于存储连接参数,如凭证、端点等。
另请参阅
lance.dataset.LanceOperation.Overwrite该操作用于创建一个新数据集或覆盖现有数据集,使用此API生成的片段。有关使用此API的示例,请参阅文档页面。
lance.dataset.LanceOperation.Append该操作用于将通过此API创建的片段追加到现有数据集中。查看文档页面了解使用此API的示例。
- Return type:
- static create_from_file(文件名: str, dataset: LanceDataset, fragment_id: int) FragmentMetadata¶
从给定的数据文件URI创建片段。
如果数据文件从数据集中丢失,可以使用此功能。
警告
内部API。该方法不面向终端用户使用。
- Parameters:
- filename : str¶
数据文件的文件名。
- dataset : LanceDataset¶
该片段所属的数据集。
- fragment_id : int¶
片段的ID。
- data_files()¶
返回此片段的数据文件。
- delete(predicate: str) FragmentMetadata | None¶
从此片段中删除行。
这将添加或更新该片段的删除文件。它不会修改或删除该片段的数据文件。如果删除后没有剩余行,该方法将返回None。
警告
内部API。该方法不面向终端用户使用。
- Parameters:
- predicate : str¶
指定要删除行的SQL谓词。
- Returns:
包含新删除文件的新片段,如果没有剩余行则为None。
- Return type:
FragmentMetadata 或 None
示例
>>> import lance >>> import pyarrow as pa >>> tab = pa.table({"a": [1, 2, 3], "b": [4, 5, 6]}) >>> dataset = lance.write_dataset(tab, "dataset") >>> frag = dataset.get_fragment(0) >>> frag.delete("a > 1") FragmentMetadata(id=0, files=[DataFile(path='...', fields=[0, 1], ...), ...) >>> frag.delete("a > 0") is None True另请参阅
lance.dataset.LanceOperation.Delete用于将这些更改提交到数据集的操作。有关使用此API的示例,请参阅文档页面。
- deletion_file()¶
返回删除文件(如果有的话)
- property fragment_id¶
- head(self, int num_rows, columns=None, 表达式 filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, bool cache_metadata=True, MemoryPool memory_pool=None)¶
加载片段的前N行。
- Parameters:
- num_rows : int
要加载的行数。
- columns : list of str, default None
要投影的列。这可以是一个包含列名的列表(顺序和重复项将被保留),或者一个字典,其中包含{新列名: 表达式}值用于更高级的投影。
列或表达式的列表可以使用特殊字段: __batch_index(批次在片段内的索引), __fragment_index(片段在数据集内的索引), __last_in_fragment(批次是否是片段中的最后一个),以及 __filename(源文件名或源片段的描述)。
这些列将被传递到数据集和相应的数据片段,以避免加载、复制和反序列化计算链后续不需要的列。 默认情况下,所有可用列都会被投影。如果引用的列名在数据集的Schema中不存在,则会引发异常。
- filter : Expression, default None
Scan将仅返回与筛选条件匹配的行。 如果可能,谓词将被下推以利用数据源中的分区信息或内部元数据,例如Parquet统计信息。否则在生成RecordBatches之前会对已加载的记录批次进行过滤。
- batch_size : int, default 131_072
扫描记录批次的最大行数。如果扫描的记录批次超出内存限制,可以调用此方法来减小其大小。
- batch_readahead : int, default 16
文件中预读取的批次数量。这可能不适用于所有文件格式。增加此数值将提高内存使用量,但也可能提升IO利用率。
- fragment_readahead : int, default 4
预读取的文件数量。增加此数值将提高内存使用率,但也可能提升IO利用率。
- fragment_scan_options : FragmentScanOptions, default None
特定于某次扫描和片段类型的选项,这些选项在同个数据集的不同扫描之间可能会发生变化。
- use_threads : bool, default True
如果启用,则将根据可用CPU核心数量使用最大并行度。
- cache_metadata : bool, default True
如果启用,扫描时可能会缓存元数据以加速重复扫描。
- memory_pool : MemoryPool, default None
如需内存分配,可在此指定。若未指定,则使用默认池。
- Return type:
表格
-
merge(data_obj: ReaderLike, left_on: str, right_on: str | None =
None, schema=None) tuple[FragmentMetadata, LanceSchema]¶ 将另一个数据集合并到此片段中。
执行左连接操作,其中片段作为左表,data_obj作为右表。存在于数据集中但不在左表的行将被填充为null值,除非Lance不支持某些类型的null值,这种情况下会抛出错误。
- Parameters:
示例
>>> import lance >>> import pyarrow as pa >>> df = pa.table({'x': [1, 2, 3], 'y': ['a', 'b', 'c']}) >>> dataset = lance.write_dataset(df, "dataset") >>> dataset.to_table().to_pandas() x y 0 1 a 1 2 b 2 3 c >>> fragments = dataset.get_fragments() >>> new_df = pa.table({'x': [1, 2, 3], 'z': ['d', 'e', 'f']}) >>> merged = [] >>> schema = None >>> for f in fragments: ... f, schema = f.merge(new_df, 'x') ... merged.append(f) >>> merge = lance.LanceOperation.Merge(merged, schema) >>> dataset = lance.LanceDataset.commit("dataset", merge, read_version=1) >>> dataset.to_table().to_pandas() x y z 0 1 a d 1 2 b e 2 3 c f另请参阅
LanceDataset.merge_columns向此片段添加列。
- Returns:
包含合并列和最终模式的新片段。
- Return type:
元组[FragmentMetadata, LanceSchema]
-
merge_columns(value_func: dict[str, str] | BatchUDF | ReaderLike | collections.abc.Callable[[pa.RecordBatch], pa.RecordBatch], columns: list[str] | None =
None, batch_size: int | None =None, reader_schema: pa.Schema | None =None) tuple[FragmentMetadata, LanceSchema]¶ 向此片段添加列。
警告
内部API。该方法不面向终端用户使用。
参数及其解释与
lance.dataset.LanceDataset.add_columns()操作中的相同。唯一的区别在于,不是修改数据集,而是创建一个新的片段。新片段的模式也会被返回。这些可以在后续操作中用于将更改提交到数据集。
另请参阅
lance.dataset.LanceOperation.Merge用于将这些更改提交到数据集的操作。有关使用此API的示例,请参阅文档页面。
- Returns:
一个包含新增列和最终模式的新片段。
- Return type:
元组[FragmentMetadata, LanceSchema]
- property metadata : FragmentMetadata¶
返回此片段的元数据。
- Return type:
- property num_deletions : int¶
返回此片段中已删除的行数。
- property physical_rows : int¶
返回此片段中原始的行数。
要获取删除后的行数,请改用
count_rows()。
-
scanner(*, columns: list[str] | dict[str, str] | None =
None, batch_size: int | None =None, filter: str | pa.compute.Expression | None =None, limit: int | None =None, offset: int | None =None, with_row_id: bool =False, with_row_address: bool =False, batch_readahead: int =16) LanceScanner¶ 详情请参阅 Dataset::scanner
- take(self, 索引, columns=None, 表达式 filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, bool cache_metadata=True, MemoryPool memory_pool=None)¶
按索引选择数据行。
- Parameters:
- indices : Array or array-like¶
数据集中要选择的行索引。
- columns : list of str, default None
要投影的列。这可以是要包含的列名列表(顺序和重复项将保留),或者用于更高级投影的{新列名: 表达式}字典。
列或表达式列表可以使用特殊字段: __batch_index(批次在片段内的索引), __fragment_index(片段在数据集内的索引), __last_in_fragment(批次是否为片段中的最后一个),以及 __filename(源文件名或源片段的描述)。
这些列将被传递到数据集和相应的数据片段,以避免加载、复制和反序列化计算链后续不需要的列。 默认情况下会投影所有可用列。如果引用的任何列名在数据集的Schema中不存在,则会引发异常。
- filter : Expression, default None
Scan 将仅返回与筛选条件匹配的行。 如果可能,谓词将被下推以利用数据源中的分区信息或内部元数据,例如 Parquet 统计信息。否则在生成 RecordBatches 之前会对加载的数据进行筛选。
- batch_size : int, default 131_072
扫描记录批次的最大行数。如果扫描的记录批次超出内存限制,可以调用此方法来减小其大小。
- batch_readahead : int, default 16
文件中预读取的批次数量。这可能不适用于所有文件格式。增加此数值将提高内存使用量,但也可能提升IO利用率。
- fragment_readahead : int, default 4
预读取的文件数量。增加此数值将提高内存使用率,但也可能提升IO利用率。
- fragment_scan_options : FragmentScanOptions, default None
特定于某次扫描和片段类型的选项,这些选项在同个数据集的不同扫描之间可能会发生变化。
- use_threads : bool, default True
如果启用,则将根据可用CPU核心数量使用最大并行度。
- cache_metadata : bool, default True
如果启用,扫描时可能会缓存元数据以加速重复扫描。
- memory_pool : MemoryPool, default None
如需内存分配,可在此指定。若未指定,则使用默认池。
- Return type:
表格
- to_batches(self, Schema schema=None, columns=None, 表达式 filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, bool cache_metadata=True, MemoryPool memory_pool=None)¶
将片段读取为物化的记录批次。
- Parameters:
- schema : Schema, optional
用于扫描的具体模式。
- columns : list of str, default None
要投影的列。这可以是一个包含列名的列表(顺序和重复项将被保留),或者一个字典,其中包含{新列名: 表达式}值用于更高级的投影。
列或表达式的列表可以使用特殊字段: __batch_index(批次在片段内的索引), __fragment_index(片段在数据集内的索引), __last_in_fragment(批次是否为片段中的最后一个),以及 __filename(源文件名或源片段的描述)。
这些列将被传递到数据集和相应的数据片段,以避免加载、复制和反序列化计算链后续不需要的列。 默认情况下会投影所有可用列。如果引用的任何列名在数据集的Schema中不存在,则会引发异常。
- filter : Expression, default None
Scan 将仅返回与筛选条件匹配的行。 如果可能,谓词将被下推以利用数据源中的分区信息或内部元数据,例如 Parquet 统计信息。否则在生成 RecordBatches 之前会对加载的数据进行筛选。
- batch_size : int, default 131_072
扫描记录批次的最大行数。如果扫描的记录批次超出内存限制,可以调用此方法来减小其大小。
- batch_readahead : int, default 16
文件中预读取的批次数量。这可能不适用于所有文件格式。增加此数值将提高内存使用量,但也可能提升IO利用率。
- fragment_readahead : int, default 4
预读取的文件数量。增加此数值将提高内存使用率,但也可能提升IO利用率。
- fragment_scan_options : FragmentScanOptions, default None
特定于某次扫描和片段类型的选项,这些选项在同个数据集的不同扫描之间可能会发生变化。
- use_threads : bool, default True
如果启用,则将根据可用CPU核心数量使用最大并行度。
- cache_metadata : bool, default True
如果启用,扫描时可能会缓存元数据以加速重复扫描。
- memory_pool : MemoryPool, default None
如需内存分配,可在此指定。若未指定,则使用默认池。
- Returns:
record_batches
- Return type:
RecordBatch的迭代器
- to_table(self, 模式 schema=None, columns=None, 表达式 filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, bool cache_metadata=True, MemoryPool memory_pool=None)¶
将此片段转换为表格。
请谨慎使用此便捷工具。它会在创建表之前,将扫描结果序列化并全部加载到内存中。
- Parameters:
- schema : Schema, optional
用于扫描的具体模式。
- columns : list of str, default None
要投影的列。这可以是一个要包含的列名列表(顺序和重复项将被保留),或者是一个字典,包含{新列名: 表达式}值用于更高级的投影。
列或表达式列表可以使用特殊字段: __batch_index(批次在片段内的索引), __fragment_index(片段在数据集内的索引), __last_in_fragment(批次是否为片段中的最后一个),以及 __filename(源文件名或源片段的描述)。
这些列将被传递到数据集和相应的数据片段,以避免加载、复制和反序列化计算链后续不需要的列。 默认情况下会投影所有可用列。如果引用的列名在数据集的Schema中不存在,则会引发异常。
- filter : Expression, default None
Scan将仅返回与筛选条件匹配的行。 如果可能,谓词将被下推以利用数据源中的分区信息或内部元数据,例如Parquet统计信息。否则在生成RecordBatches之前会对已加载的记录批次进行过滤。
- batch_size : int, default 131_072
扫描记录批次的最大行数。如果扫描的记录批次超出内存限制,可以调用此方法来减小其大小。
- batch_readahead : int, default 16
文件中预读取的批次数量。这可能不适用于所有文件格式。增加此数值将提高内存使用量,但也可能提升IO利用率。
- fragment_readahead : int, default 4
预读取的文件数量。增加此数值将提高内存使用率,但也可能提升IO利用率。
- fragment_scan_options : FragmentScanOptions, default None
特定于某次扫描和片段类型的选项,这些选项在同个数据集的不同扫描之间可能会发生变化。
- use_threads : bool, default True
如果启用,则将根据可用CPU核心数量使用最大并行度。
- cache_metadata : bool, default True
如果启用,扫描时可能会缓存元数据以加速重复扫描。
- memory_pool : MemoryPool, default None
如需内存分配,可在此指定。若未指定,则使用默认池。
- Returns:
表格
- Return type:
表格
- class lance.LanceOperation¶
- class Append(片段: Iterable[FragmentMetadata])¶
向数据集追加新行。
- fragments¶
包含新行的片段。
- Type:
列表[FragmentMetadata]
警告
这是一个用于分布式操作的高级API。若要在单台机器上向数据集追加数据,请使用
lance.write_dataset()。示例
要向数据集追加新行,首先使用
lance.fragment.LanceFragment.create()创建片段。然后 将片段元数据收集到列表中并传递给此类。 最后,将该操作传递给LanceDataset.commit()方法来创建新数据集。>>> import lance >>> import pyarrow as pa >>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]}) >>> dataset = lance.write_dataset(tab1, "example") >>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]}) >>> fragment = lance.fragment.LanceFragment.create("example", tab2) >>> operation = lance.LanceOperation.Append([fragment]) >>> dataset = lance.LanceDataset.commit("example", operation, ... read_version=dataset.version) >>> dataset.to_table().to_pandas() a b 0 1 a 1 2 b 2 3 c 3 4 d- fragments : Iterable[FragmentMetadata]¶
- class BaseOperation¶
可应用于数据集的操作基类。
请参阅
LanceOperation下的可用操作。
- class CreateIndex(uuid: str, name: str, 字段: list[int], dataset_version: int, fragment_ids: set[int], index_version: int)¶
在数据集上创建索引的操作。
- dataset_version : int¶
- fields : List[int]¶
- fragment_ids : Set[int]¶
- index_version : int¶
- name : str¶
- uuid : str¶
- class DataReplacement(replacements: list[DataReplacementGroup])¶
该操作用于替换数据集中的现有数据文件。
- replacements : List[DataReplacementGroup]¶
- class DataReplacementGroup(fragment_id: int, new_file: 数据文件)¶
数据替换组
- fragment_id : int¶
- class Delete(updated_fragments: Iterable[FragmentMetadata], deleted_fragment_ids: Iterable[int], predicate: str)¶
从数据集中移除片段或行。
- updated_fragments¶
已更新为包含新删除向量的片段。
- Type:
列表[FragmentMetadata]
- deleted_fragment_ids¶
已完全删除的片段ID。这些是调用
LanceFragment.delete()返回None的片段。- Type:
整数列表
- predicate¶
用于选择要删除行的原始SQL谓词。
- Type:
字符串
警告
这是一个用于分布式操作的高级API。若要在单台机器上从数据集中删除行,请使用
lance.LanceDataset.delete()。示例
要从数据集中删除行,请在每个片段上调用
lance.fragment.LanceFragment.delete()。如果返回一个新片段,则将其添加到updated_fragments列表中。如果返回None,则表示整个片段已被删除,因此将该片段ID添加到deleted_fragment_ids中。最后,将该操作传递给LanceDataset.commit()方法以完成删除操作。>>> import lance >>> import pyarrow as pa >>> table = pa.table({"a": [1, 2], "b": ["a", "b"]}) >>> dataset = lance.write_dataset(table, "example") >>> table = pa.table({"a": [3, 4], "b": ["c", "d"]}) >>> dataset = lance.write_dataset(table, "example", mode="append") >>> dataset.to_table().to_pandas() a b 0 1 a 1 2 b 2 3 c 3 4 d >>> predicate = "a >= 2" >>> updated_fragments = [] >>> deleted_fragment_ids = [] >>> for fragment in dataset.get_fragments(): ... new_fragment = fragment.delete(predicate) ... if new_fragment is not None: ... updated_fragments.append(new_fragment) ... else: ... deleted_fragment_ids.append(fragment.fragment_id) >>> operation = lance.LanceOperation.Delete(updated_fragments, ... deleted_fragment_ids, ... predicate) >>> dataset = lance.LanceDataset.commit("example", operation, ... read_version=dataset.version) >>> dataset.to_table().to_pandas() a b 0 1 a- deleted_fragment_ids : Iterable[int]¶
- predicate : str¶
- updated_fragments : Iterable[FragmentMetadata]¶
- class Merge(片段: Iterable[FragmentMetadata], schema: LanceSchema | 模式/架构)¶
添加列的操作。与覆盖不同,此操作不应改变片段的结构,从而保留现有的索引。
- fragments¶
构成新数据集的片段。
- Type:
FragmentMetadata的可迭代对象
- schema¶
新数据集的模式。建议传递LanceSchema,传递pyarrow.Schema已弃用。
- Type:
LanceSchema 或 pyarrow.Schema
警告
这是一个用于分布式操作的高级API。若要在单机上覆盖或创建新数据集,请使用
lance.write_dataset()。示例
要向数据集添加新列,首先定义一个方法,该方法将基于现有列创建新列。然后使用
lance.fragment.LanceFragment.add_columns()>>> import lance >>> import pyarrow as pa >>> import pyarrow.compute as pc >>> table = pa.table({"a": [1, 2, 3, 4], "b": ["a", "b", "c", "d"]}) >>> dataset = lance.write_dataset(table, "example") >>> dataset.to_table().to_pandas() a b 0 1 a 1 2 b 2 3 c 3 4 d >>> def double_a(batch: pa.RecordBatch) -> pa.RecordBatch: ... doubled = pc.multiply(batch["a"], 2) ... return pa.record_batch([doubled], ["a_doubled"]) >>> fragments = [] >>> for fragment in dataset.get_fragments(): ... new_fragment, new_schema = fragment.merge_columns(double_a, ... columns=['a']) ... fragments.append(new_fragment) >>> operation = lance.LanceOperation.Merge(fragments, new_schema) >>> dataset = lance.LanceDataset.commit("example", operation, ... read_version=dataset.version) >>> dataset.to_table().to_pandas() a b a_doubled 0 1 a 2 1 2 b 4 2 3 c 6 3 4 d 8- fragments : Iterable[FragmentMetadata]¶
- class Overwrite(new_schema: LanceSchema | 模式/架构, 片段: Iterable[FragmentMetadata])¶
覆盖或创建新的数据集。
- new_schema¶
新数据集的模式结构。
- Type:
- fragments¶
构成新数据集的片段。
- Type:
列表[FragmentMetadata]
警告
这是一个用于分布式操作的高级API。若要在单机上覆盖或创建新数据集,请使用
lance.write_dataset()。示例
要创建或覆盖数据集,首先使用
lance.fragment.LanceFragment.create()创建片段。然后将这些片段的元数据收集到列表中,并与模式一起传递给这个类。最后,将该操作传递给LanceDataset.commit()方法来创建新数据集。>>> import lance >>> import pyarrow as pa >>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]}) >>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]}) >>> fragment1 = lance.fragment.LanceFragment.create("example", tab1) >>> fragment2 = lance.fragment.LanceFragment.create("example", tab2) >>> fragments = [fragment1, fragment2] >>> operation = lance.LanceOperation.Overwrite(tab1.schema, fragments) >>> dataset = lance.LanceDataset.commit("example", operation) >>> dataset.to_table().to_pandas() a b 0 1 a 1 2 b 2 3 c 3 4 d- fragments : Iterable[FragmentMetadata]¶
- class Project(schema: LanceSchema)¶
用于投影列的操作。 可使用此运算符删除列或重命名/交换列。
- schema¶
新数据集的lance模式。
- Type:
LanceSchema
示例
使用投影运算符交换列:
>>> import lance >>> import pyarrow as pa >>> import pyarrow.compute as pc >>> from lance.schema import LanceSchema >>> table = pa.table({"a": [1, 2], "b": ["a", "b"], "b1": ["c", "d"]}) >>> dataset = lance.write_dataset(table, "example") >>> dataset.to_table().to_pandas() a b b1 0 1 a c 1 2 b d >>> >>> ## rename column `b` into `b0` and rename b1 into `b` >>> table = pa.table({"a": [3, 4], "b0": ["a", "b"], "b": ["c", "d"]}) >>> lance_schema = LanceSchema.from_pyarrow(table.schema) >>> operation = lance.LanceOperation.Project(lance_schema) >>> dataset = lance.LanceDataset.commit("example", operation, read_version=1) >>> dataset.to_table().to_pandas() a b0 b 0 1 a c 1 2 b d- schema : LanceSchema¶
- class Rewrite(groups: Iterable[RewriteGroup], rewritten_indices: Iterable[RewrittenIndex])¶
将操作重写为一个或多个文件和索引合并为一个或多个文件和索引。
- groups¶
已被重写的文件组。
- Type:
列表[RewriteGroup]
- rewritten_indices¶
已被重写的索引。
- Type:
列表[RewrittenIndex]
警告
这是一个高级API,不适用于一般用途。
- groups : Iterable[RewriteGroup]¶
- rewritten_indices : Iterable[RewrittenIndex]¶
- class RewriteGroup(old_fragments: Iterable[FragmentMetadata], new_fragments: Iterable[FragmentMetadata])¶
重写文件的集合
- new_fragments : Iterable[FragmentMetadata]¶
- old_fragments : Iterable[FragmentMetadata]¶
- class Update(removed_fragment_ids: list[int], updated_fragments: list[FragmentMetadata], new_fragments: list[FragmentMetadata], fields_modified: list[int])¶
更新数据集中行的操作。
- removed_fragment_ids¶
已完全移除的片段ID。
- Type:
整数列表
- updated_fragments¶
已更新为包含新删除向量的片段。
- Type:
列表[FragmentMetadata]
- new_fragments¶
包含新行的片段。
- Type:
列表[FragmentMetadata]
- fields_modified¶
如果在updated_fragments中有任何字段被修改,那么必须在此列出这些字段,以便从覆盖这些字段的索引中移除这些片段。
- Type:
整数列表
- fields_modified : List[int]¶
- new_fragments : List[FragmentMetadata]¶
- removed_fragment_ids : List[int]¶
- updated_fragments : List[FragmentMetadata]¶
- class lance.LanceScanner(scanner: _Scanner, dataset: LanceDataset)¶
- analyze_plan() str¶
执行此扫描器的计划并显示运行时指标。
- Parameters:
- verbose : bool, default False
使用详细输出格式。
- Returns:
计划
- Return type:
字符串
- count_rows()¶
统计符合扫描器筛选条件的行数。
- Returns:
计数
- Return type:
int
-
explain_plan(verbose=
False) str¶ 返回此扫描器的执行计划。
- Parameters:
- verbose : bool, default False¶
使用详细输出格式。
- Returns:
计划
- Return type:
字符串
- scan_batches()¶
以记录批次形式消费Scanner,并附带对应的片段。
- Returns:
record_batches
- Return type:
TaggedRecordBatch的迭代器
- class lance.MergeInsertBuilder(dataset, 开启)¶
- conflict_retries(max_retries: int) MergeInsertBuilder¶
设置操作在出现争用时的重试次数。
如果该值设置为大于0,则操作将保留输入数据的副本(根据数据大小存储在内存或磁盘上),并在出现争用时重试操作。
默认值为10。
-
execute(data_obj: ReaderLike, *, schema: pa.Schema | None =
None)¶ 执行合并插入操作
此函数会更新原始数据集,并返回一个包含合并统计信息的字典——例如插入、更新和删除的行数。
- Parameters:
- data_obj : ReaderLike¶
作为操作源表使用的新数据。该参数可以是任何数据源(例如表/数据集),只要
write_dataset()能够接受即可。- schema : Optional[pa.Schema]¶
数据的模式。仅当数据源是某种生成器时才需要提供此信息。
-
execute_uncommitted(data_obj: ReaderLike, *, schema: pa.Schema | None =
None) tuple[事务, dict[str, Any]]¶ 执行合并插入操作但不提交
此函数会更新原始数据集,并返回一个包含合并统计信息的字典——例如插入、更新和删除的行数。
- Parameters:
- data_obj : ReaderLike¶
作为操作源表使用的新数据。该参数可以是任何数据源(例如表/数据集),只要
write_dataset()能够接受即可。- schema : Optional[pa.Schema]¶
数据的模式。仅当数据源是某种生成器时才需要提供此信息。
- retry_timeout(timeout: timedelta) MergeInsertBuilder¶
设置用于限制重试的超时时间。
这是放弃操作前允许的最大耗时。无论完成需要多长时间,至少会进行一次尝试。一旦达到此超时时间,后续尝试将被取消。如果在第一次尝试期间已达到超时,操作将在进行第二次尝试前立即取消。
默认值为30秒。
-
when_matched_update_all(condition: str | None =
None) MergeInsertBuilder¶ 配置更新匹配行的操作
调用此方法后,当执行合并插入操作时,任何同时匹配源表和目标表的行将被更新。目标表中的行将被移除,而源表中的行将被添加。
可以指定一个可选条件。这应该是一个SQL过滤器,如果存在,则只有同时满足此过滤器的匹配行才会被更新。SQL过滤器应使用前缀target.来引用目标表中的列,使用前缀source.来引用源表中的列。例如,source.last_update < target.last_update。
如果指定了条件且行不满足该条件,则这些行将不会被更新。不满足筛选条件不会导致"匹配"行变为"不匹配"行。
-
when_not_matched_by_source_delete(expr: str | None =
None) MergeInsertBuilder¶ 配置操作以删除不匹配的源行
调用此方法后,当执行合并插入操作时,仅存在于目标表中的行将被删除。可以指定一个可选过滤器来限制删除操作的范围。如果提供了过滤器(作为SQL过滤器),则只有匹配过滤器的行会被删除。
- when_not_matched_insert_all() MergeInsertBuilder¶
配置操作以插入不匹配的行
调用此方法后,当执行合并插入操作时,仅存在于源表中的所有行将被插入到目标表中。
- class lance.ScanStatistics¶
关于扫描的统计信息。
- bytes_read¶
从磁盘读取的字节数
- indices_loaded¶
已加载的索引数量
- iops¶
执行的IO操作数量。由于存在合并I/O的情况,该数值可能略高于实际数量
- parts_loaded¶
已加载的索引分区数量
-
class lance.Transaction(读取版本: 'int', 操作: 'LanceOperation.BaseOperation', uuid: 'str' =
, blobs_op: 'Optional[LanceOperation.BaseOperation]' = None)¶ -
blobs_op : BaseOperation | None =
None¶
- operation : BaseOperation¶
- read_version : int¶
- uuid : str¶
-
blobs_op : BaseOperation | None =
-
lance.batch_udf(output_schema=
None, checkpoint_file=None)¶ 创建一个用户自定义函数(UDF),用于向数据集添加列。
此函数用于向数据集添加列。它接收一个函数作为参数,该函数接受单个参数(一个RecordBatch)并返回一个RecordBatch。对于数据集中的每个批次,该函数会被调用一次。函数不应修改输入批次,而应创建一个添加了新列的新批次。
- Parameters:
- Return type:
AddColumnsUDF
- lance.bytes_read_counter()¶
-
lance.dataset(uri: str | Path, 版本: int | str | None =
None, asof: ts_types | None =None, block_size: int | None =None, commit_lock: CommitLock | None =None, index_cache_size: int | None =None, storage_options: dict[str, str] | None =None, default_scan_options: dict[str, str] | None =None) LanceDataset¶ 从指定地址打开Lance数据集。
- Parameters:
- uri : str¶
Lance数据集的地址。可以是本地文件路径/tmp/data.lance,也可以是云对象存储URI,例如s3://bucket/data.lance。
- version : optional, int | str¶
如果指定,则加载Lance数据集的特定版本。否则,加载最新版本。可以提供一个版本号(int)或标签(str)。
- asof : optional, datetime or str¶
如果指定此参数,则查找在给定参数值当天或之前创建的最新版本。如果已指定版本号,则忽略此参数。
- block_size : optional, int¶
块大小(以字节为单位)。为最小I/O请求的大小提供提示。
- commit_lock : optional, lance.commit.CommitLock¶
自定义提交锁。仅当您的对象存储不支持原子提交时才需要。详情请参阅用户指南。
- index_cache_size : optional, int¶
索引缓存大小。索引缓存是一个带有TTL的LRU缓存。该数值指定了 在主机内存中缓存的索引页数量,例如IVF分区数。默认值为
256。粗略来说,对于一个包含
n行的IVF_PQ分区,每个索引页的大小等于 pq编码(nd.array([n,pq], dtype=uint8)))和行ID(nd.array([n], dtype=uint64))的组合。 近似计算为n = 总行数 / IVF分区数量。pq = PQ子向量数量。- storage_options : optional, dict¶
针对特定存储连接的额外选项。这用于存储连接参数,如凭证、端点等。
- default_scan_options : optional, dict¶
扫描数据集时使用的默认扫描选项。这接受与
lance.LanceDataset.scanner()中描述的相同参数。这些参数将应用于任何扫描操作。这对于为常见参数(如
batch_size)提供默认值非常有用。它还可用于创建包含元字段(如
_rowid或_rowaddr)的数据集视图。如果提供了default_scan_options,那么当设置了适当的扫描选项时,lance.LanceDataset.schema()返回的模式将包含这些字段。
- lance.iops_counter()¶
- lance.json_to_schema(schema_json: dict[str, Any]) 模式/架构¶
将JSON字符串转换为PyArrow模式。
- Parameters:
- schema_json : Dict[str, Any]¶
要转换为PyArrow模式的JSON负载。
-
lance.set_logger(file_path=
'pylance.log', name='pylance', level=20, format_string=None, log_handler=None)¶
-
lance.write_dataset(data_obj: ReaderLike, uri: str | Path | LanceDataset, schema: pa.Schema | None =
None, mode: str ='create', *, max_rows_per_file: int =1048576, max_rows_per_group: int =1024, max_bytes_per_file: int =96636764160, commit_lock: CommitLock | None =None, 进度: FragmentWriteProgress | None =None, storage_options: dict[str, str] | None =None, data_storage_version: str | None =None, use_legacy_format: bool | None =None, enable_v2_manifest_paths: bool =False, enable_move_stable_row_ids: bool =False, auto_cleanup_options: AutoCleanupConfig | None =None) LanceDataset¶ 将给定的data_obj写入指定的uri
- Parameters:
- data_obj : Reader-like¶
要写入的数据。可接受的类型包括: - Pandas DataFrame、Pyarrow Table、Dataset、Scanner 或 RecordBatchReader - Huggingface 数据集
- uri : str, Path, or LanceDataset¶
数据集写入的目标位置(目录)。如果传入的是LanceDataset,则会复用现有会话。
- schema : Schema, optional¶
如果指定且输入为pandas DataFrame,则使用此模式替代默认的pandas到arrow表的转换。
- mode : str¶
create - 创建新数据集(如果uri已存在则会报错)。 overwrite - 创建新的快照版本 append - 创建新版本,该版本是输入数据与最新版本的合并(如果uri不存在则会报错)
- max_rows_per_file : int, default 1024 * 1024¶
在开始新文件之前要写入的最大行数
- max_rows_per_group : int, default 1024¶
在同一个文件中开始新分组前的最大行数
- max_bytes_per_file : int, default 90 * 1024 * 1024 * 1024¶
在开始新文件之前要写入的最大字节数。这是一个软性限制。该限制在每组数据写入后进行检查,这意味着较大的组可能会导致此限制被显著超出。默认值为90 GB,因为我们在对象存储上对每个文件有100 GB的硬性限制。
- commit_lock : CommitLock, optional¶
自定义提交锁。仅当您的对象存储不支持原子提交时才需要。详情请参阅用户指南。
- progress : FragmentWriteProgress, optional¶
实验性API。用于跟踪片段写入进度的功能。传入一个自定义类,该类需定义在开始写入每个片段和完成写入时调用的钩子函数。
- storage_options : optional, dict¶
针对特定存储连接的额外选项。这用于存储连接参数,如凭证、端点等。
- data_storage_version : optional, str, default None¶
要使用的数据存储格式版本。较新的版本效率更高,但需要较新版本的lance才能读取。默认值(None)将使用最新的稳定版本。更多详情请参阅用户指南。
- use_legacy_format : optional, bool, default None¶
已弃用的设置数据存储版本的方法。请改用data_storage_version参数。
- enable_v2_manifest_paths : bool, optional¶
如果为True,且这是一个新数据集,则使用新的V2清单路径。 这些路径能更高效地打开存储在对象存储上具有多个版本的数据集。如果数据集已存在,此参数将不起作用。要迁移现有数据集,请改用
LanceDataset.migrate_manifest_paths_v2()方法。默认为False。- enable_move_stable_row_ids : bool, optional¶
实验性参数:如果设置为true,写入器将使用移动稳定的行ID。 这些行ID在压缩操作后保持稳定,但在更新后不稳定。 这使得压缩更加高效,因为使用稳定的行ID时, 无需更新二级索引来指向新的行ID。
- auto_cleanup_options : optional, AutoCleanupConfig¶
数据集自动清理的配置选项。 如果设置此选项且这是一个新数据集,旧数据集版本将根据此参数自动清理。 要为现有数据集添加自动清理功能,请使用Dataset::update_config设置 lance.auto_cleanup.interval和lance.auto_cleanup.older_than。 必须同时设置这两个参数才能启用自动清理功能。 如果不设置此参数(默认行为), 则不会执行自动清理。 注意:此选项仅在创建新数据集时生效, 对现有数据集没有影响。
- class lance.dataset.BulkCommitResult¶
- dataset : LanceDataset¶
- class lance.dataset.DataStatistics(字段: FieldStatistics)¶
数据集中的数据统计信息
- fields : FieldStatistics¶
数据集中字段的统计信息
- class lance.dataset.DatasetOptimizer(dataset: LanceDataset)¶
-
compact_files(*, target_rows_per_fragment: int =
1048576, max_rows_per_group: int =1024, max_bytes_per_file: int | None =None, materialize_deletions: bool =True, materialize_deletions_threshold: float =0.1, num_threads: int | None =None, batch_size: int | None =None) CompactionMetrics¶ 压缩数据集中的小文件,减少文件总数。
- This does a few things:
从片段中移除已删除的行
从片段中移除已删除的列
将小片段合并为更大的片段
该方法会保留数据集的插入顺序。这意味着如果存在的小片段不与需要压缩的其他相邻片段相连,它们可能会保留在数据集中。例如,如果您有行数为500万、100和500万的片段,中间的片段将不会被压缩,因为与其相邻的片段不需要压缩。
- Parameters:
- target_rows_per_fragment : int, default 1024*1024¶
每个片段的目标行数。这是压缩后每个片段中包含的行数。
- max_rows_per_group : int, default 1024¶
每个分组的最大行数。这不会影响哪些片段需要压缩,但如果被选中,会影响它们的重写方式。
此设置仅影响使用旧版存储格式的数据集。新格式不需要行分组。
- max_bytes_per_file : Optional[int], default None¶
单个文件的最大字节数。这不会影响哪些片段需要压缩,但会影响它们被选中时的重写方式。如果该值设置过小,可能会导致片段大小小于target_rows_per_fragment。
默认值将使用
write_dataset中的默认设置。- materialize_deletions : bool, default True¶
是否压缩包含软删除行的片段,使其不再出现在文件中。
- materialize_deletions_threshold : float, default 0.1¶
在片段成为压缩候选之前,其中被软删除的原始行所占的比例。
- num_threads : int, optional¶
执行压缩操作时使用的线程数。如果未指定,默认使用机器上的核心数。
- batch_size : int, optional¶
The batch size to use when scanning input fragments. You may want to reduce this if you are running out of memory during compaction.
The default will use the same default from
scanner.
- Returns:
关于压缩过程的指标
- Return type:
压缩指标
另请参阅
lance.optimize.Compaction
- optimize_indices(**kwargs)¶
优化索引性能。
当新数据到来时,它不会自动添加到现有索引中。 在搜索时,我们需要对旧数据执行索引搜索,同时 对新数据执行昂贵的非索引搜索。随着新增 未索引数据量的增长,这可能会影响搜索延迟。 此功能会将新数据添加到现有索引中,从而恢复 性能。此功能不会重新训练索引,它仅将 新数据分配到现有分区。这意味着更新比 重新训练整个索引要快得多,但准确性可能较低 (特别是如果新数据表现出新模式、概念或趋势)
- Parameters:
- num_indices_to_merge : int, default 1
要合并的索引数量。 如果设置为0,将创建新的增量索引。
- index_names : List[str], default None
需要优化的索引名称。 如果为None,则将优化所有索引。
- retrain : bool, default False
是否重新训练整个索引。 如果为true,将基于当前数据重新训练索引, num_indices_to_merge将被忽略, 所有索引将被合并为一个。
当数据分布发生显著变化时这很有用, 我们希望通过重新训练索引来提高搜索质量。 这比从头开始重建索引更快。
-
compact_files(*, target_rows_per_fragment: int =
- class lance.dataset.DatasetStats¶
- num_deleted_rows : int¶
- num_fragments : int¶
- num_small_files : int¶
- class lance.dataset.ExecuteResult¶
- num_deleted_rows : int¶
- num_inserted_rows : int¶
- num_updated_rows : int¶
- class lance.dataset.FieldStatistics(id: int, bytes_on_disk: int)¶
数据集中某个字段的统计信息
- bytes_on_disk : int¶
(可能经过压缩的)磁盘上用于存储该字段的字节
- id : int¶
字段的ID
- class lance.dataset.Index¶
- fields : List[str]¶
- fragment_ids : Set[int]¶
- name : str¶
- type : str¶
- uuid : str¶
- version : int¶
-
class lance.dataset.LanceDataset(uri: str | Path, 版本: int | str | None =
None, block_size: int | None =None, index_cache_size: int | None =None, metadata_cache_size: int | None =None, commit_lock: CommitLock | None =None, storage_options: dict[str, str] | None =None, serialized_manifest: bytes | None =None, default_scan_options: dict[str, Any] | None =None)¶ Lance数据集采用Lance格式,数据存储在给定的uri位置。
-
add_columns(转换: dict[str, str] | BatchUDF | ReaderLike | pyarrow.Field | list[pyarrow.Field] | pyarrow.Schema, read_columns: list[str] | None =
None, reader_schema: pa.Schema | None =None, batch_size: int | None =None)¶ 使用定义的值添加新列。
有几种方式可以指定新列。首先,可以为每个新列提供SQL表达式。其次,可以提供一个UDF(用户定义函数),该函数接收一批现有数据并返回包含新列的新数据批次。这些新列将被追加到数据集中。
你也可以提供一个RecordBatchReader,它将从某个外部源读取新列的值。当新列的值已经暂存到文件中(通常由某个分布式进程完成)时,这通常很有用。
有关编写UDF的更多信息,请参阅
lance.add_columns_udf()装饰器。- Parameters:
- transforms : dict or AddColumnsUDF or ReaderLike¶
如果这是一个字典,那么键是新列的名称,值则是SQL表达式字符串。这些字符串可以引用数据集中的现有列。 如果这是一个AddColumnsUDF,那么它是一个用户定义函数,接收一批现有数据并返回包含新列的新数据批次。 如果这是
pyarrow.Field或pyarrow.Schema,则会以仅元数据操作的方式添加所有具有给定模式的NULL列。- read_columns : list of str, optional¶
UDF将读取的列名。如果为None,则UDF将读取所有列。仅当transforms是UDF时使用此参数。否则,读取的列将从SQL表达式中推断得出。
- reader_schema : pa.Schema, optional¶
仅当transforms是ReaderLike对象时有效。这将用于确定读取器的模式。
- batch_size : int, optional¶
在应用转换时,每次从源数据集中读取的行数。如果数据集是v1版本,则忽略此参数。
示例
>>> import lance >>> import pyarrow as pa >>> table = pa.table({"a": [1, 2, 3]}) >>> dataset = lance.write_dataset(table, "my_dataset") >>> @lance.batch_udf() ... def double_a(batch): ... df = batch.to_pandas() ... return pd.DataFrame({'double_a': 2 * df['a']}) >>> dataset.add_columns(double_a) >>> dataset.to_table().to_pandas() a double_a 0 1 2 1 2 4 2 3 6 >>> dataset.add_columns({"triple_a": "a * 3"}) >>> dataset.to_table().to_pandas() a double_a triple_a 0 1 2 3 1 2 4 6 2 3 6 9另请参阅
LanceDataset.merge将一组预先计算好的列合并到数据集中。
- alter_columns(*修改: Iterable[AlterColumn])¶
修改列名、数据类型和可空性。
重命名的列可以保留其上的任何索引。如果某列具有IVF_PQ索引,即使该列被转换为其他类型,索引仍可保留。但目前其他类型的索引尚不支持类型转换。
列类型可以进行向上转型(例如从int32转为int64)或向下转型(例如从int64转为int32)。但若存在无法用新类型表示的值时,向下转型会失败。通常来说,列可以转型为相同的大类类型:整型转整型、浮点型转浮点型、字符串转字符串。不过字符串、二进制和列表列可以在其大小变体之间进行转型。例如:字符串转大字符串、二进制转大二进制、列表转大列表。
重命名的列可以保留其上的任何索引。但是,如果将该列转换为不同类型,其索引将被删除。
- Parameters:
- alterations : Iterable[Dict[str, Any]]¶
一个字典序列,每个字典包含以下键:
- ”path”: str
要修改的列路径。对于顶级列,这是列名。 对于嵌套列,这是点分隔的路径,例如"a.b.c"。
- ”name”: str, optional
列的新名称。如果未指定,则列名保持不变。
- ”nullable”: bool, optional
列是否可为空。如果未指定,则列的可空性不变。 只能将非空列改为可空列。目前不能将可空列改为非空列。
- ”data_type”: pyarrow.DataType, optional
要将列转换到的新数据类型。如果未指定,则列的数据类型保持不变。
示例
>>> import lance >>> import pyarrow as pa >>> schema = pa.schema([pa.field('a', pa.int64()), ... pa.field('b', pa.string(), nullable=False)]) >>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]}) >>> dataset = lance.write_dataset(table, "example") >>> dataset.alter_columns({"path": "a", "name": "x"}, ... {"path": "b", "nullable": True}) >>> dataset.to_table().to_pandas() x b 0 1 a 1 2 b 2 3 c >>> dataset.alter_columns({"path": "x", "data_type": pa.int32()}) >>> dataset.schema x: int32 b: string
- checkout_version(版本: int | str) LanceDataset¶
加载指定版本的数据集。
与
dataset()构造函数不同,此操作将重用当前缓存。如果数据集已处于指定版本,则此操作无效。- Parameters:
- version : int | str,¶
要检出的版本号。可以提供一个版本号(int)或标签(str)。
- Return type:
-
cleanup_old_versions(older_than: timedelta | None =
None, *, delete_unverified: bool =False, error_if_tagged_old_versions: bool =True) CleanupStats¶ 清理数据集的旧版本。
某些数据集变更(如覆盖操作)会遗留未被最新版本引用的数据。这些旧数据会被保留,以便将数据集回滚到之前的版本。
该方法将移除旧版本及其引用的所有数据文件。 一旦清理任务执行完毕,您将无法检出或恢复这些旧版本。
- Parameters:
- older_than : timedelta, optional¶
仅早于此版本的记录将被删除。如果未指定,默认保留两周内的记录。
- delete_unverified : bool, default False¶
事务失败后遗留的文件可能看起来像是正在进行中的操作(例如追加新数据)的一部分,这些文件只有在至少7天后才会被删除。如果delete_unverified为True,则无论文件存在多久都会被删除。
只有在您能确保当前没有其他进程正在处理此数据集时,才应将此设置为True。否则数据集可能会进入损坏状态。
- error_if_tagged_old_versions : bool, default True¶
某些版本可能关联了标签。带标签的版本不会被清理,无论它们存在多久。如果该参数设为True(默认值),当任何带标签版本匹配参数时会抛出异常。否则,带标签版本将被静默忽略,仅清理未加标签的版本。
-
static commit(base_uri: str | Path | LanceDataset, 操作: LanceOperation.BaseOperation | 事务, blobs_op: LanceOperation.BaseOperation | None =
None, read_version: int | None =None, commit_lock: CommitLock | None =None, storage_options: dict[str, str] | None =None, enable_v2_manifest_paths: bool | None =None, 分离: bool | None =False, max_retries: int =20) LanceDataset¶ 创建数据集的新版本
此方法是一个高级方法,允许用户描述对数据文件所做的更改。当使用Lance应用更改时(例如使用
LanceDataset或write_dataset()),则不需要此方法。它当前的目的是允许在分布式环境中进行更改,其中没有单一进程完成所有工作。例如,分布式批量更新或分布式批量修改操作。
一旦完成所有更改,可以调用此方法通过更新数据集清单使更改生效。
警告
这是一个高级API,不提供与其他API相同级别的验证。例如,调用者有责任确保片段对模式有效。
- Parameters:
- base_uri : str, Path, or LanceDataset¶
数据集的基础URI,或者数据集对象本身。使用数据集对象可能更高效,因为它可以复用文件元数据缓存。
- operation : BaseOperation¶
应用于数据集的操作。这描述了已进行的更改。可用的操作请参见
LanceOperation。- read_version : int, optional¶
作为变更基础的数据集版本。 覆盖或恢复操作不需要此版本。
- commit_lock : CommitLock, optional¶
自定义提交锁。仅当您的对象存储不支持原子提交时才需要。详情请参阅用户指南。
- storage_options : optional, dict¶
针对特定存储连接的额外选项。这用于存储连接参数,如凭证、端点等。
- enable_v2_manifest_paths : bool, optional¶
如果为True,且这是一个新数据集,则使用新的V2清单路径。 这些路径能更高效地在对象存储上打开包含多个版本的数据集。如果数据集已存在,此参数将不起作用。要迁移现有数据集,请改用
migrate_manifest_paths_v2()方法。默认为False。警告: 启用此选项将使旧版Lance(0.17.0之前版本)无法读取该数据集。- detached : bool, optional¶
如果为True,则该提交不会成为数据集谱系的一部分。它将永远不会显示为最新数据集,未来唯一查看它的方式是通过指定版本号检出。该版本将是一个随机版本号,仅在分离提交中保持唯一。调用方应将其存储在某处,因为未来将无法通过其他方式获取它。
- max_retries : int¶
提交数据集时执行的最大重试次数。
- Returns:
Lance数据集的新版本。
- Return type:
示例
使用
LanceOperation.Overwrite操作创建新数据集:>>> import lance >>> import pyarrow as pa >>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]}) >>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]}) >>> fragment1 = lance.fragment.LanceFragment.create("example", tab1) >>> fragment2 = lance.fragment.LanceFragment.create("example", tab2) >>> fragments = [fragment1, fragment2] >>> operation = lance.LanceOperation.Overwrite(tab1.schema, fragments) >>> dataset = lance.LanceDataset.commit("example", operation) >>> dataset.to_table().to_pandas() a b 0 1 a 1 2 b 2 3 c 3 4 d
-
static commit_batch(dest: str | Path | LanceDataset, 交易: collections.abc.Sequence[事务], commit_lock: CommitLock | None =
None, storage_options: dict[str, str] | None =None, enable_v2_manifest_paths: bool | None =None, 分离: bool | None =False, max_retries: int =20) BulkCommitResult¶ 使用多个事务创建数据集的新版本。
此方法是一个高级方法,允许用户描述对数据文件所做的更改。当使用Lance应用更改时(例如使用
LanceDataset或write_dataset()),则不需要此方法。- Parameters:
- dest : str, Path, or LanceDataset¶
数据集的基础URI,或者数据集对象本身。使用数据集对象可能更高效,因为它可以复用文件元数据缓存。
- transactions : Iterable[Transaction]¶
要应用于数据集的事务操作。这些操作将被合并为单个事务并应用到数据集中。注意:目前仅支持追加事务,其他类型的事务将在未来版本中支持。
- commit_lock : CommitLock, optional¶
自定义提交锁。仅当您的对象存储不支持原子提交时才需要。详情请参阅用户指南。
- storage_options : optional, dict¶
针对特定存储连接的额外选项。这用于存储连接参数,如凭证、端点等。
- enable_v2_manifest_paths : bool, optional¶
如果为True,且这是一个新数据集,则使用新的V2清单路径。 这些路径能更高效地在对象存储上打开包含多个版本的数据集。如果数据集已存在,此参数将不起作用。要迁移现有数据集,请改用
migrate_manifest_paths_v2()方法。默认为False。警告: 启用此选项将使旧版Lance(0.17.0之前版本)无法读取该数据集。- detached : bool, optional¶
如果为True,则该提交不会成为数据集谱系的一部分。它将永远不会显示为最新数据集,未来唯一查看它的方式是通过指定版本号检出。该版本将是一个随机版本号,仅在分离提交中保持唯一。调用方应将其存储在某处,因为未来将无法通过其他方式获取它。
- max_retries : int¶
提交数据集时执行的最大重试次数。
- Returns:
- dataset: LanceDataset
Lance数据集的新版本。
- merged: Transaction
应用于数据集的合并事务。
- Return type:
包含键的字典
-
count_rows(filter: 表达式 | str | None =
None, **kwargs) int¶ 统计符合扫描器筛选条件的行数。
- Parameters:
- **kwargs : dict, optional¶
完整参数描述请参见 py:method:scanner 方法。
- Returns:
count – 数据集中的总行数。
- Return type:
int
-
create_index(列: str | list[str], index_type: str, name: str | None =
None, metric: str ='L2', 替换: bool =False, num_partitions: int | None =None, ivf_centroids: np.ndarray | pa.FixedSizeListArray | pa.FixedShapeTensorArray | None =None, pq_codebook: np.ndarray | pa.FixedSizeListArray | pa.FixedShapeTensorArray | None =None, num_sub_vectors: int | None =None, accelerator: str | 'torch.Device' | None =None, index_cache_size: int | None =None, shuffle_partition_batches: int | None =None, shuffle_partition_concurrency: int | None =None, ivf_centroids_file: str | None =None, precomputed_partition_dataset: str | None =None, storage_options: dict[str, str] | None =None, filter_nan: bool =True, one_pass_ivfpq: bool =False, **kwargs) LanceDataset¶ 在列上创建索引。
实验性API
- Parameters:
- column : str¶
要建立索引的列。
- index_type : str¶
索引的类型。
"IVF_PQ, IVF_HNSW_PQ 和 IVF_HNSW_SQ"目前支持。- name : str, optional¶
索引名称。如果未提供,将根据列名自动生成。
- metric : str¶
距离度量类型,即“L2”(“euclidean”的别名)、“cosine”或“dot”(点积)。默认为“L2”。
- replace : bool¶
如果索引已存在,则替换现有索引。
- num_partitions : int, optional¶
IVF(倒排文件索引)的分区数量。
- ivf_centroids : optional¶
它可以是
np.ndarray、pyarrow.FixedSizeListArray或pyarrow.FixedShapeTensorArray。 一个num_partitions x dimension维度的现有K均值中心点数组, 用于IVF聚类。如果未提供,将训练一个新的KMeans模型。- pq_codebook : optional,¶
它可以是
np.ndarray、pyarrow.FixedSizeListArray, 或pyarrow.FixedShapeTensorArray。 一个num_sub_vectors x (2 ^ nbits * dimensions // num_sub_vectors)数组,表示PQ码本的K均值中心点。注意:目前
nbits始终为8。 如果未提供,将训练一个新的PQ模型。- num_sub_vectors : int, optional¶
PQ(乘积量化)的子向量数量。
- accelerator: str | 'torch.Device' | None =
None¶ 如果设置,将使用加速器来加快训练过程。 支持的加速器包括:"cuda"(英伟达GPU)和"mps"(苹果硅GPU)。 如果未设置,则使用CPU。
- index_cache_size : int, optional¶
索引缓存的大小,以条目数表示。默认值为256。
- shuffle_partition_batches : int, optional¶
批次数,使用数据集的row group大小,决定每个shuffle分区包含的数量。默认值为10240。
假设row group大小为1024,每个shuffle分区将包含10240 * 1024 = 10,485,760行。减小此值会减少shuffle操作的内存消耗但会增加完成时间,反之亦然。
- shuffle_partition_concurrency : int, optional¶
并发处理的shuffle分区数量。默认值为2
减小该值可以减少shuffle操作的内存消耗,但会增加完成时间,反之亦然。
- storage_options : optional, dict¶
针对特定存储连接的额外选项。这用于存储连接参数,如凭证、端点等。
- filter_nan : bool¶
默认为True。False是不安全的,如果存在任何null/nan值会导致崩溃(否则不会)。禁用用于可空列的空值过滤器。可获得小幅速度提升。
- one_pass_ivfpq : bool¶
默认为False。如果启用,索引类型必须为“IVF_PQ”。可减少磁盘IO。
- **kwargs¶
传递给索引构建过程的参数。
SQ(标量量化)仅适用于
IVF_HNSW_SQ索引类型,这种量化方法用于减少索引的内存占用,它将浮点向量映射为整数向量,每个整数占用num_bits位,目前仅支持8位。- If
index_typeis “IVF_*”, then the following parameters are required: num_partitions
- If
index_typeis with “PQ”, then the following parameters are required: 子向量数量
IVF_PQ的可选参数:
- ivf_centroids
用于IVF聚类的现有K均值中心点。
- num_bits
PQ(乘积量化)的位数。默认为8。 仅支持4和8。
- Optional parameters for IVF_HNSW_*:
- max_level
Int,图中最大层级数。
- m
Int,图中每个节点的边数。
- ef_construction
Int,构建过程中需要检查的节点数量。
示例
import lance dataset = lance.dataset("/tmp/sift.lance") dataset.create_index( "vector", "IVF_PQ", num_partitions=256, num_sub_vectors=16 )import lance dataset = lance.dataset("/tmp/sift.lance") dataset.create_index( "vector", "IVF_HNSW_SQ", num_partitions=256, )实验性加速器(GPU)支持:
- accelerate: 使用GPU训练IVF分区。
目前仅支持CUDA(英伟达)或MPS(苹果)平台。 需要安装PyTorch。
import lance dataset = lance.dataset("/tmp/sift.lance") dataset.create_index( "vector", "IVF_PQ", num_partitions=256, num_sub_vectors=16, accelerator="cuda" )参考文献
-
create_scalar_index(列: str, index_type: 'BTREE' | 'BITMAP' | 'LABEL_LIST' | 'INVERTED' | 'FTS' | 'NGRAM', name: str | None =
None, *, 替换: bool =True, **kwargs)¶ 在列上创建标量索引。
标量索引与向量索引类似,可用于加速扫描。当扫描包含对已索引列的过滤表达式时,标量索引能显著提升查询速度。例如,若
my_col列建有标量索引,以下扫描操作将执行得更快:import lance dataset = lance.dataset("/tmp/images.lance") my_table = dataset.scanner(filter="my_col != 7").to_table()带有预过滤器的向量搜索也可以从标量索引中受益。例如,
import lance dataset = lance.dataset("/tmp/images.lance") my_table = dataset.scanner( nearest=dict( column="vector", q=[1, 2, 3, 4], k=10, ) filter="my_col != 7", prefilter=True )目前有5种标量索引类型可供选择。
BTREE。最常见的类型是BTREE。该索引的灵感来源于btree数据结构,尽管只有btree的前几层会被缓存在内存中。它将在具有大量唯一值且每个值对应行数较少的列上表现良好。BITMAP。该索引为列中的每个唯一值存储一个位图。这种索引适用于具有少量唯一值且每个值对应多行数据的列。LABEL_LIST. 一种特殊索引,用于对值基数较小的列表列进行索引。例如,包含标签列表的列(如["tag1", "tag2", "tag3"])可以使用LABEL_LIST索引。该索引只能加速带有array_has_any或array_has_all过滤器的查询。NGRAM. 一种用于索引字符串列的特殊索引。该索引会为字符串中的每个n元语法创建位图。默认情况下我们使用三元语法。当前该索引可以加速在过滤器中使用contains函数的查询。FTS/INVERTED. 该索引用于对文档列进行索引。这种索引可以进行全文搜索。例如,一个包含查询字符串"hello world"中任意单词的列。结果将按BM25算法排序。
请注意,可以使用环境变量
LANCE_BYPASS_SPILLING来绕过磁盘溢出。将其设置为true可以避免内存耗尽问题(更多信息请参阅https://github.com/apache/datafusion/issues/10073)。实验性API
- Parameters:
- column : str¶
要建立索引的列。必须是布尔型、整型、浮点型或字符串类型的列。
- index_type : str¶
索引的类型。可选值为
"BTREE","BITMAP","LABEL_LIST","NGRAM","FTS"或"INVERTED"。- name : str, optional¶
索引名称。如果未提供,将根据列名自动生成。
- replace : bool, default True¶
如果索引已存在,则替换现有索引。
- with_position : bool, default True
这是针对
INVERTED索引的。如果设为True,索引将存储文档中单词的位置信息,以便支持短语查询。这将显著增加索引大小。即使设置为True,也不会影响非短语查询的性能。- base_tokenizer : str, default "simple"
这是针对
INVERTED索引的配置。指定使用的基础分词器,可选值包括: * "simple":根据空白字符和标点符号进行分词。 * "whitespace":仅根据空白字符进行分词。 * "raw":不进行分词处理。- language : str, default "English"
这是针对
INVERTED索引的。用于词干提取和停用词处理的语言。仅当stem或remove_stop_words为true时使用- max_token_length : Optional[int], default 40
这是针对
INVERTED索引的设置。表示最大令牌长度。任何超过此长度的令牌将被移除。- lower_case : bool, default True
这是针对
INVERTED索引的。如果设为True,索引会将所有文本转换为小写。- stem : bool, default False
这是针对
INVERTED索引的。如果为True,索引将对词干进行提取。- remove_stop_words : bool, default False
这是针对
INVERTED索引的。如果为True,该索引将移除停用词。- ascii_folding : bool, default False
这是针对
INVERTED索引的。如果设为True,索引会尽可能将非ASCII字符转换为ASCII字符。例如会将带重音符号的字母如"é"转换为"e"。
示例
import lance dataset = lance.dataset("/tmp/images.lance") dataset.create_index( "category", "BTREE", )标量索引只能加速使用等值、比较、范围(例如
my_col BETWEEN 0 AND 100)和集合成员(例如my_col IN (0, 1, 2))等基础过滤条件的扫描当筛选条件包含多个索引列且这些条件通过AND或OR逻辑连接时,可以使用标量索引 (例如
my_col < 0 AND other_col> 100)如果过滤条件包含未建立索引的列,虽然可以使用标量索引,但根据过滤条件的结构,可能无法实际使用。例如,若列
not_indexed没有标量索引,那么过滤条件my_col = 0 OR not_indexed = 1将无法利用my_col上的任何标量索引。要判断扫描是否使用了标量索引,可以使用
explain_plan查看lancedb生成的查询计划。使用标量索引的查询将包含ScalarIndexQuery关系或MaterializeIndex操作符。
- property data_storage_version : str¶
该数据集所使用的数据存储格式版本
- delete(predicate: str | 表达式)¶
从数据集中删除行。
这会将行标记为已删除,但不会从文件中物理移除它们。这样可以保持现有索引仍然有效。
- Parameters:
- predicate : str or pa.compute.Expression¶
用于选择要删除行的谓词。可以是SQL字符串或pyarrow表达式。
示例
>>> import lance >>> import pyarrow as pa >>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]}) >>> dataset = lance.write_dataset(table, "example") >>> dataset.delete("a = 1 or b in ('a', 'b')") >>> dataset.to_table() pyarrow.Table a: int64 b: string ---- a: [[3]] b: [["c"]]
-
static drop(base_uri: str | Path, storage_options: dict[str, str] | None =
None) None¶
- drop_columns(columns: list[str])¶
从数据集中删除一列或多列
这是一个仅涉及元数据的操作,不会从底层存储中删除实际数据。如需删除数据,您必须随后调用
compact_files来重写不包含被删除列的数据,然后调用cleanup_old_versions来移除旧文件。- Parameters:
- columns : list of str¶
要删除的列名。这些可以是嵌套列引用(例如“a.b.c”)或顶级列名(例如“a”)。
示例
>>> import lance >>> import pyarrow as pa >>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]}) >>> dataset = lance.write_dataset(table, "example") >>> dataset.drop_columns(["a"]) >>> dataset.to_table().to_pandas() b 0 a 1 b 2 c
- drop_index(name: str)¶
从数据集中删除索引
注意:索引是通过"索引名称"删除的。这与字段名称不同。如果在创建索引时未指定名称,则会自动生成一个名称。您可以使用list_indices方法来获取索引的名称。
- get_fragment(fragment_id: int) LanceFragment | None¶
获取指定片段ID的片段。
-
get_fragments(filter: Expression | None =
None) list[LanceFragment]¶ 从数据集中获取所有片段。
注意:过滤器功能暂不支持。
- property has_index¶
- index_statistics(index_name: str) dict[str, Any]¶
-
insert(数据: ReaderLike, *, mode=
'append', **kwargs)¶ 将数据插入数据集。
- Parameters:
- data_obj : Reader-like
要写入的数据。可接受的类型包括: - Pandas DataFrame、Pyarrow Table、Dataset、Scanner 或 RecordBatchReader - Huggingface 数据集
- mode : str, default 'append'¶
- 写入数据时使用的模式。可选选项有:
create - 创建新数据集(如果uri已存在则会报错)。 overwrite - 创建新的快照版本 append - 创建新版本,该版本是输入数据与最新版本的合并(如果uri不存在则会报错)
- **kwargs : dict, optional¶
传递给
write_dataset()的其他关键字参数。
-
join(right_dataset, keys, right_keys=
None, join_type='left outer', left_suffix=None, right_suffix=None, coalesce_keys=True, use_threads=True)¶ 未实现(仅覆盖pyarrow数据集以防止段错误)
- property lance_schema : LanceSchema¶
该数据集的LanceSchema
- property latest_version : int¶
返回数据集的最新版本。
- property max_field_id : int¶
清单中的max_field_id
-
merge(data_obj: ReaderLike, left_on: str, right_on: str | None =
None, schema=None)¶ 将另一个数据集合并到当前数据集中。
执行左连接操作,其中数据集作为左表,data_obj作为右表。存在于数据集但不在左表中的行将被填充为null值,除非Lance不支持某些类型的null值,这种情况下会抛出错误。
- Parameters:
示例
>>> import lance >>> import pyarrow as pa >>> df = pa.table({'x': [1, 2, 3], 'y': ['a', 'b', 'c']}) >>> dataset = lance.write_dataset(df, "dataset") >>> dataset.to_table().to_pandas() x y 0 1 a 1 2 b 2 3 c >>> new_df = pa.table({'x': [1, 2, 3], 'z': ['d', 'e', 'f']}) >>> dataset.merge(new_df, 'x') >>> dataset.to_table().to_pandas() x y z 0 1 a d 1 2 b e 2 3 c f另请参阅
LanceDataset.add_columns通过逐批计算添加新列。
- merge_insert(开启: str | Iterable[str]) MergeInsertBuilder¶
返回一个构建器,可用于创建“合并插入”操作
该操作可以在单个事务中添加行、更新行和删除行。这是一个非常通用的工具,可用于实现诸如“如果不存在则插入”、“更新或插入(即upsert)”等行为,甚至可以用新数据替换部分现有数据(例如替换所有月份为“一月”的数据)。
合并插入操作通过使用连接将源表中的新数据与目标表中的现有数据相结合。记录分为三类。
"匹配"记录是指同时存在于源表和目标表中的记录。"未匹配"记录仅存在于源表中(例如这些是新数据)。"源未匹配"记录仅存在于目标表中(这是旧数据)。
此方法返回的构建器可用于自定义每种数据类别应执行的操作。
请注意,此操作会导致数据重新排序。这是因为更新的行会从数据集中删除,然后以新值重新插入到末尾。由于内部使用了哈希连接操作,新插入行的顺序可能会随机波动。
- Parameters:
- on : Union[str, Iterable[str]]¶
要连接的列(或多列)。这是源表和目标表中记录匹配的方式。通常这是某种键或ID列。
示例
使用when_matched_update_all()和when_not_matched_insert_all()来执行"upsert"操作。这将更新数据集中已存在的行,并插入不存在的行。
>>> import lance >>> import pyarrow as pa >>> table = pa.table({"a": [2, 1, 3], "b": ["a", "b", "c"]}) >>> dataset = lance.write_dataset(table, "example") >>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]}) >>> # Perform a "upsert" operation >>> dataset.merge_insert("a") \ ... .when_matched_update_all() \ ... .when_not_matched_insert_all() \ ... .execute(new_table) {'num_inserted_rows': 1, 'num_updated_rows': 2, 'num_deleted_rows': 0} >>> dataset.to_table().sort_by("a").to_pandas() a b 0 1 b 1 2 x 2 3 y 3 4 z使用when_not_matched_insert_all()执行"不存在则插入"操作。这只会插入数据集中尚不存在的行。
>>> import lance >>> import pyarrow as pa >>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]}) >>> dataset = lance.write_dataset(table, "example2") >>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]}) >>> # Perform an "insert if not exists" operation >>> dataset.merge_insert("a") \ ... .when_not_matched_insert_all() \ ... .execute(new_table) {'num_inserted_rows': 1, 'num_updated_rows': 0, 'num_deleted_rows': 0} >>> dataset.to_table().sort_by("a").to_pandas() a b 0 1 a 1 2 b 2 3 c 3 4 z您不需要提供所有列。如果只想更新部分列,可以省略不想更新的列。被省略的列在更新时会保留现有值,如果是插入操作则会设为null。
>>> import lance >>> import pyarrow as pa >>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"], \ ... "c": ["x", "y", "z"]}) >>> dataset = lance.write_dataset(table, "example3") >>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]}) >>> # Perform an "upsert" operation, only updating column "a" >>> dataset.merge_insert("a") \ ... .when_matched_update_all() \ ... .when_not_matched_insert_all() \ ... .execute(new_table) {'num_inserted_rows': 1, 'num_updated_rows': 2, 'num_deleted_rows': 0} >>> dataset.to_table().sort_by("a").to_pandas() a b c 0 1 a x 1 2 x y 2 3 y z 3 4 z None
- migrate_manifest_paths_v2()¶
将清单路径迁移到新格式。
这将更新清单以使用新的v2格式路径。
该函数具有幂等性,可以多次运行而不会改变对象存储的状态。
警告:当其他并发操作正在进行时,不应运行此操作。 并且该操作应在完成后再恢复其他操作。
- property optimize : DatasetOptimizer¶
- property partition_expression¶
未实现(仅覆盖pyarrow数据集以防止段错误)
- prewarm_index(name: str)¶
预热索引
这将把整个索引加载到内存中。这有助于避免索引查询时的冷启动问题。如果索引无法放入索引缓存中,则会导致I/O浪费。
- Parameters:
- name : str¶
需要预热的索引名称。
- replace_field_metadata(field_name: str, new_metadata: dict[str, str])¶
替换模式中某个字段的元数据
- replace_schema(schema: 模式/架构)¶
未实现(仅覆盖pyarrow数据集以防止段错误)
参见 :py:method:`replace_schema_metadata` 或 :py:method:`replace_field_metadata`
- replace_schema_metadata(new_metadata: dict[str, str])¶
替换数据集的结构元数据
- Parameters:
- new_metadata : dict¶
要设置的新元数据
- restore()¶
将当前检出的版本恢复为数据集的最新版本。
这将创建一个新的提交。
-
sample(num_rows: int, columns: list[str] | dict[str, str] | None =
None, randomize_order: bool =True, **kwargs) 表格¶ 随机选取数据样本
-
scanner(columns: list[str] | dict[str, str] | None =
None, filter: 表达式 | str | None =None, limit: int | None =None, offset: int | None =None, nearest: dict | None =None, batch_size: int | None =None, batch_readahead: int | None =None, fragment_readahead: int | None =None, scan_in_order: bool | None =None, 片段: Iterable[LanceFragment] | None =None, full_text_query: str | dict | FullTextQuery | None =None, *, prefilter: bool | None =None, with_row_id: bool | None =None, with_row_address: bool | None =None, use_stats: bool | None =None, fast_search: bool | None =None, io_buffer_size: int | None =None, late_materialization: bool | list[str] | None =None, use_scalar_index: bool | None =None, include_deleted_rows: bool | None =None, scan_stats_callback: Callable[[ScanStatistics], None] | None =None, strict_batch_size: bool | None =None) LanceScanner¶ 返回一个支持各种下推操作的Scanner。
- Parameters:
- columns : list of str, or dict of str to str default None¶
要获取的列名列表。 或者列名到SQL表达式的字典。 如果为None或未指定,则获取所有列。
- filter : pa.compute.Expression or str¶
表达式或字符串,必须是一个有效的SQL where子句。有关有效的SQL表达式,请参阅 Lance filter pushdown 。
- limit : int, default None¶
最多获取这么多行。如果为None或未指定,则获取所有行。
- offset : int, default None¶
从这一行开始获取。如果未指定则为0。
- nearest : dict, default None¶
获取与K个最相似向量对应的行。示例:
{ "column": <embedding col name>, "q": <query vector as pa.Float32Array>, "k": 10, "minimum_nprobes": 20, "maximum_nprobes": 50, "refine_factor": 1 }- batch_size : int, default None¶
返回批次的目标大小。在某些情况下,批次大小可能达到此值的两倍(但绝不会超过)。而在其他情况下,批次大小可能小于此值。
- io_buffer_size : int, default None¶
IO缓冲区的大小。有关更多信息,请参阅
ScannerBuilder.io_buffer_size。- batch_readahead : int, optional¶
预读取的批次数量。
- fragment_readahead : int, optional¶
预读取的片段数量。
- scan_in_order : bool, default True¶
是否按顺序读取片段和批次。如果为false,吞吐量可能会更高,但批次将无序返回,内存使用可能会增加。
- fragments : iterable of LanceFragment, default None¶
如果指定,则仅扫描这些片段。如果scan_in_order为True,那么片段将按照给定的顺序进行扫描。
- prefilter : bool, default False¶
如果为True,则在运行向量查询之前应用过滤器。 这将生成更准确的结果,但查询成本可能更高。 通常在过滤器具有高选择性时效果良好。
如果为False,则在运行向量查询之后应用过滤器。 这样性能会更好,但如果最接近查询的行不匹配过滤器, 结果可能会少于请求的行数(或为空)。 通常在过滤器选择性不高时效果良好。
- use_scalar_index : bool, default True¶
Lance会自动使用标量索引来优化查询。在某些极端情况下,这可能会使查询性能变差,此时可以通过该参数禁用标量索引。
- late_materialization : bool or List[str], default None¶
允许自定义控制延迟物化。延迟物化会在过滤操作后通过take操作获取非查询列,这在结果较少或列数据量非常大时非常有用。
当结果较多或列数据非常窄时,早期物化可能更优。
如果设为True,则所有列都采用延迟物化; 如果设为False,则所有列都采用早期物化; 如果是字符串列表,则只有列表中的列会延迟物化。
默认采用启发式算法,假设过滤器会筛选出约0.1%的行。如果您的过滤器选择性更强(例如按id查找),可能需要设为True;如果过滤器选择性较弱(例如匹配20%的行),可能需要设为False。
- full_text_query : str or dict, optional¶
要搜索的查询字符串,结果将按BM25算法排序。 例如"hello world"会匹配包含"hello"或"world"的文档。 或者使用包含以下键的字典:
- columns: list[str]
要搜索的列, 目前columns列表中仅支持单个列。
- query: str
要搜索的查询字符串。
- fast_search : bool, default False¶
如果为True,则搜索将仅在索引数据上执行,这样可以缩短搜索时间。
- scan_stats_callback : Callable[[ScanStatistics], None], default None¶
一个回调函数,在扫描完成后将调用该函数并传入扫描统计信息。回调函数引发的错误将被记录但不会重新抛出。
- include_deleted_rows : bool, default False¶
如果为True,则已被删除但仍存在于片段中的行将被返回。这些行的_rowid列将被设为null。所有其他列将反映磁盘上存储的值,可能不为null。
注意:如果是搜索操作或take操作(包括标量索引扫描),则无法返回已删除的行。
注意
目前,如果同时指定了filter和nearest,那么:
nearest 会优先执行。
结果会在之后进行过滤。
为了调试近似最近邻(ANN)结果,您可以选择即使存在索引也不使用它,只需指定
use_index=False。例如,以下代码将始终返回精确的KNN结果:dataset.to_table(nearest={ "column": "vector", "k": 10, "q": <query vector>, "use_index": False }
- session() _Session¶
返回数据集会话,该会话保存了数据集的状态。
- property stats : LanceStats¶
实验性API
- property tags : 标签¶
数据集标签管理。
与Git类似,标签是一种向数据集特定版本添加元数据的方式。
示例
ds = lance.open("dataset.lance") ds.tags.create("v2-prod-20250203", 10) tags = ds.tags.list()
-
take(索引: list[int] | 数组, columns: list[str] | dict[str, str] | None =
None) 表格¶ 按索引选择数据行。
- Parameters:
- Returns:
表格
- Return type:
-
take_blobs(blob_column: str, ids: list[int] | 数组 | None =
None, 地址: list[int] | 数组 | None =None, 索引: list[int] | 数组 | None =None) list[BlobFile]¶ 按行ID选择二进制大对象。
无需在处理前将大型二进制blob数据加载到内存中,该API允许您将二进制blob数据作为常规的Python类文件对象打开。更多详情请参阅
lance.BlobFile。必须且只能指定ids、addresses或indices中的一个参数。 :param blob_column: 要选择的blob列名称。 :type blob_column: str :param ids: 数据集中要选择的行ID。 :type ids: Integer Array或类似数组 :param addresses: 数据集中要选择行的(不稳定)内存地址。 :type addresses: Integer Array或类似数组 :param indices: 数据集中行的偏移量/索引。 :type indices: Integer Array或类似数组
- Returns:
blob_files
- Return type:
列表[BlobFile]
-
to_batches(columns: list[str] | dict[str, str] | None =
None, filter: 表达式 | str | None =None, limit: int | None =None, offset: int | None =None, nearest: dict | None =None, batch_size: int | None =None, batch_readahead: int | None =None, fragment_readahead: int | None =None, scan_in_order: bool | None =None, *, prefilter: bool | None =None, with_row_id: bool | None =None, with_row_address: bool | None =None, use_stats: bool | None =None, full_text_query: str | dict | None =None, io_buffer_size: int | None =None, late_materialization: bool | list[str] | None =None, use_scalar_index: bool | None =None, strict_batch_size: bool | None =None, **kwargs) Iterator[RecordBatch]¶ 将数据集读取为物化的记录批次。
- Parameters:
- Returns:
record_batches
- Return type:
RecordBatch的迭代器
-
to_table(columns: list[str] | dict[str, str] | None =
None, filter: 表达式 | str | None =None, limit: int | None =None, offset: int | None =None, nearest: dict | None =None, batch_size: int | None =None, batch_readahead: int | None =None, fragment_readahead: int | None =None, scan_in_order: bool | None =None, *, prefilter: bool | None =None, with_row_id: bool | None =None, with_row_address: bool | None =None, use_stats: bool | None =None, fast_search: bool | None =None, full_text_query: str | dict | FullTextQuery | None =None, io_buffer_size: int | None =None, late_materialization: bool | list[str] | None =None, use_scalar_index: bool | None =None, include_deleted_rows: bool | None =None) 表格¶ 将数据作为
pyarrow.Table读入内存- Parameters:
- columns : list of str, or dict of str to str default None¶
要获取的列名列表。 或者列名到SQL表达式的字典。 如果为None或未指定,则获取所有列。
- filter : pa.compute.Expression or str¶
表达式或字符串,必须是一个有效的SQL where子句。有关有效的SQL表达式,请参阅 Lance filter pushdown 。
- limit : int, default None¶
最多获取这么多行。如果为None或未指定,则获取所有行。
- offset : int, default None¶
从这一行开始获取。如果未指定则为0。
- nearest : dict, default None¶
获取与K个最相似向量对应的行。示例:
{ "column": <embedding col name>, "q": <query vector as pa.Float32Array>, "k": 10, "metric": "cosine", "minimum_nprobes": 20, "maximum_nprobes": 50, "refine_factor": 1 }- batch_size : int, optional¶
每次读取的行数。
- io_buffer_size : int, default None¶
IO缓冲区的大小。有关更多信息,请参阅
ScannerBuilder.io_buffer_size。- batch_readahead : int, optional¶
预读取的批次数量。
- fragment_readahead : int, optional¶
预读取的片段数量。
- scan_in_order : bool, optional, default True¶
是否按顺序读取片段和批次。如果为false,吞吐量可能会更高,但批次将无序返回,内存使用可能会增加。
- prefilter : bool, optional, default False¶
在向量搜索之前运行过滤器。
- late_materialization : bool or List[str], default None¶
允许自定义控制延迟物化。更多信息请参阅
ScannerBuilder.late_materialization。- use_scalar_index : bool, default True¶
允许自定义控制标量索引的使用。更多信息请参见
ScannerBuilder.use_scalar_index。- with_row_id : bool, optional, default False¶
返回行ID。
- with_row_address : bool, optional, default False¶
返回行地址
- use_stats : bool, optional, default True¶
在过滤过程中使用统计下推。
- fast_search : bool, optional, default False¶
- full_text_query : str or dict, optional¶
用于搜索的查询字符串,结果将按BM25算法排序。 例如:"hello world"会匹配包含"hello"或"world"的文档。 或者使用包含以下键的字典:
- columns: list[str]
要搜索的列, 目前仅支持在列列表中指定单个列。
- query: str
要搜索的查询字符串。
- include_deleted_rows : bool, optional, default False¶
如果为True,则已被删除但仍存在于片段中的行将被返回。这些行的_rowid列将被设为null。所有其他列将反映磁盘上存储的值,可能不为null。
注意:如果是搜索操作或take操作(包括标量索引扫描),则无法返回已删除的行。
笔记
如果同时指定了filter和nearest,那么:
nearest 会优先执行。
除非将pre-filter设置为True,否则结果会在之后进行过滤。
-
update(更新: dict[str, str], where: str | None =
None) UpdateResult¶ 更新符合where条件的行的列值。
- Parameters:
- Returns:
updates – 包含更新行数的字典。
- Return type:
字典
示例
>>> import lance >>> import pyarrow as pa >>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]}) >>> dataset = lance.write_dataset(table, "example") >>> update_stats = dataset.update(dict(a = 'a + 2'), where="b != 'a'") >>> update_stats["num_updated_rows"] = 2 >>> dataset.to_table().to_pandas() a b 0 1 a 1 4 b 2 5 c
- property uri : str¶
数据的位置
- validate()¶
验证数据集。
此操作会检查数据集的完整性,如果数据集已损坏则会抛出异常。
- property version : int¶
返回当前检出的数据集版本
- versions()¶
返回此数据集中的所有版本。
-
add_columns(转换: dict[str, str] | BatchUDF | ReaderLike | pyarrow.Field | list[pyarrow.Field] | pyarrow.Schema, read_columns: list[str] | None =
- class lance.dataset.LanceOperation¶
- class Append(片段: Iterable[FragmentMetadata])¶
向数据集追加新行。
- fragments¶
包含新行的片段。
- Type:
列表[FragmentMetadata]
警告
这是一个用于分布式操作的高级API。若要在单台机器上向数据集追加数据,请使用
lance.write_dataset()。示例
要向数据集追加新行,首先使用
lance.fragment.LanceFragment.create()创建片段。然后 将片段元数据收集到列表中并传递给此类。 最后,将该操作传递给LanceDataset.commit()方法来创建新数据集。>>> import lance >>> import pyarrow as pa >>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]}) >>> dataset = lance.write_dataset(tab1, "example") >>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]}) >>> fragment = lance.fragment.LanceFragment.create("example", tab2) >>> operation = lance.LanceOperation.Append([fragment]) >>> dataset = lance.LanceDataset.commit("example", operation, ... read_version=dataset.version) >>> dataset.to_table().to_pandas() a b 0 1 a 1 2 b 2 3 c 3 4 d- fragments : Iterable[FragmentMetadata]¶
- class BaseOperation¶
可应用于数据集的操作基类。
请参阅
LanceOperation下的可用操作。
- class CreateIndex(uuid: str, name: str, 字段: list[int], dataset_version: int, fragment_ids: set[int], index_version: int)¶
在数据集上创建索引的操作。
- dataset_version : int¶
- fields : List[int]¶
- fragment_ids : Set[int]¶
- index_version : int¶
- name : str¶
- uuid : str¶
- class DataReplacement(replacements: list[DataReplacementGroup])¶
该操作用于替换数据集中的现有数据文件。
- replacements : List[DataReplacementGroup]¶
- class DataReplacementGroup(fragment_id: int, new_file: 数据文件)¶
数据替换组
- fragment_id : int¶
- class Delete(updated_fragments: Iterable[FragmentMetadata], deleted_fragment_ids: Iterable[int], predicate: str)¶
从数据集中移除片段或行。
- updated_fragments¶
已更新为包含新删除向量的片段。
- Type:
列表[FragmentMetadata]
- deleted_fragment_ids¶
已完全删除的片段ID。这些是
LanceFragment.delete()返回None的片段。- Type:
整数列表
- predicate¶
用于选择要删除行的原始SQL谓词。
- Type:
字符串
警告
这是一个用于分布式操作的高级API。若要在单台机器上从数据集中删除行,请使用
lance.LanceDataset.delete()。示例
要从数据集中删除行,请在每个片段上调用
lance.fragment.LanceFragment.delete()。如果返回一个新片段,则将其添加到updated_fragments列表中。如果返回None,则表示整个片段已被删除,因此将该片段ID添加到deleted_fragment_ids中。最后,将该操作传递给LanceDataset.commit()方法以完成删除操作。>>> import lance >>> import pyarrow as pa >>> table = pa.table({"a": [1, 2], "b": ["a", "b"]}) >>> dataset = lance.write_dataset(table, "example") >>> table = pa.table({"a": [3, 4], "b": ["c", "d"]}) >>> dataset = lance.write_dataset(table, "example", mode="append") >>> dataset.to_table().to_pandas() a b 0 1 a 1 2 b 2 3 c 3 4 d >>> predicate = "a >= 2" >>> updated_fragments = [] >>> deleted_fragment_ids = [] >>> for fragment in dataset.get_fragments(): ... new_fragment = fragment.delete(predicate) ... if new_fragment is not None: ... updated_fragments.append(new_fragment) ... else: ... deleted_fragment_ids.append(fragment.fragment_id) >>> operation = lance.LanceOperation.Delete(updated_fragments, ... deleted_fragment_ids, ... predicate) >>> dataset = lance.LanceDataset.commit("example", operation, ... read_version=dataset.version) >>> dataset.to_table().to_pandas() a b 0 1 a- deleted_fragment_ids : Iterable[int]¶
- predicate : str¶
- updated_fragments : Iterable[FragmentMetadata]¶
- class Merge(片段: Iterable[FragmentMetadata], schema: LanceSchema | 模式/架构)¶
添加列的操作。与覆盖不同,此操作不应改变片段的结构,从而保留现有的索引。
- fragments¶
构成新数据集的片段。
- Type:
FragmentMetadata的可迭代对象
- schema¶
新数据集的模式。建议传递LanceSchema,传递pyarrow.Schema已弃用。
- Type:
LanceSchema 或 pyarrow.Schema
警告
这是一个用于分布式操作的高级API。若要在单机上覆盖或创建新数据集,请使用
lance.write_dataset()。示例
要向数据集添加新列,首先定义一个方法,该方法将基于现有列创建新列。然后使用
lance.fragment.LanceFragment.add_columns()>>> import lance >>> import pyarrow as pa >>> import pyarrow.compute as pc >>> table = pa.table({"a": [1, 2, 3, 4], "b": ["a", "b", "c", "d"]}) >>> dataset = lance.write_dataset(table, "example") >>> dataset.to_table().to_pandas() a b 0 1 a 1 2 b 2 3 c 3 4 d >>> def double_a(batch: pa.RecordBatch) -> pa.RecordBatch: ... doubled = pc.multiply(batch["a"], 2) ... return pa.record_batch([doubled], ["a_doubled"]) >>> fragments = [] >>> for fragment in dataset.get_fragments(): ... new_fragment, new_schema = fragment.merge_columns(double_a, ... columns=['a']) ... fragments.append(new_fragment) >>> operation = lance.LanceOperation.Merge(fragments, new_schema) >>> dataset = lance.LanceDataset.commit("example", operation, ... read_version=dataset.version) >>> dataset.to_table().to_pandas() a b a_doubled 0 1 a 2 1 2 b 4 2 3 c 6 3 4 d 8- fragments : Iterable[FragmentMetadata]¶
- class Overwrite(new_schema: LanceSchema | 模式/架构, 片段: Iterable[FragmentMetadata])¶
覆盖或创建新的数据集。
- new_schema¶
新数据集的模式结构。
- Type:
- fragments¶
构成新数据集的片段。
- Type:
列表[FragmentMetadata]
警告
这是一个用于分布式操作的高级API。若要在单机上覆盖或创建新数据集,请使用
lance.write_dataset()。示例
要创建或覆盖数据集,首先使用
lance.fragment.LanceFragment.create()创建片段。然后将这些片段的元数据收集到列表中,并与模式一起传递给这个类。最后,将该操作传递给LanceDataset.commit()方法来创建新数据集。>>> import lance >>> import pyarrow as pa >>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]}) >>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]}) >>> fragment1 = lance.fragment.LanceFragment.create("example", tab1) >>> fragment2 = lance.fragment.LanceFragment.create("example", tab2) >>> fragments = [fragment1, fragment2] >>> operation = lance.LanceOperation.Overwrite(tab1.schema, fragments) >>> dataset = lance.LanceDataset.commit("example", operation) >>> dataset.to_table().to_pandas() a b 0 1 a 1 2 b 2 3 c 3 4 d- fragments : Iterable[FragmentMetadata]¶
- class Project(schema: LanceSchema)¶
用于投影列的操作。 可使用此运算符删除列或重命名/交换列。
- schema¶
新数据集的lance模式。
- Type:
LanceSchema
示例
使用投影运算符交换列:
>>> import lance >>> import pyarrow as pa >>> import pyarrow.compute as pc >>> from lance.schema import LanceSchema >>> table = pa.table({"a": [1, 2], "b": ["a", "b"], "b1": ["c", "d"]}) >>> dataset = lance.write_dataset(table, "example") >>> dataset.to_table().to_pandas() a b b1 0 1 a c 1 2 b d >>> >>> ## rename column `b` into `b0` and rename b1 into `b` >>> table = pa.table({"a": [3, 4], "b0": ["a", "b"], "b": ["c", "d"]}) >>> lance_schema = LanceSchema.from_pyarrow(table.schema) >>> operation = lance.LanceOperation.Project(lance_schema) >>> dataset = lance.LanceDataset.commit("example", operation, read_version=1) >>> dataset.to_table().to_pandas() a b0 b 0 1 a c 1 2 b d- schema : LanceSchema¶
- class Rewrite(groups: Iterable[RewriteGroup], rewritten_indices: Iterable[RewrittenIndex])¶
将操作重写为一个或多个文件和索引合并为一个或多个文件和索引。
- groups¶
已被重写的文件组。
- Type:
列表[RewriteGroup]
- rewritten_indices¶
已被重写的索引。
- Type:
列表[RewrittenIndex]
警告
这是一个高级API,不适用于一般用途。
- groups : Iterable[RewriteGroup]¶
- rewritten_indices : Iterable[RewrittenIndex]¶
- class RewriteGroup(old_fragments: Iterable[FragmentMetadata], new_fragments: Iterable[FragmentMetadata])¶
重写文件的集合
- new_fragments : Iterable[FragmentMetadata]¶
- old_fragments : Iterable[FragmentMetadata]¶
- class Update(removed_fragment_ids: list[int], updated_fragments: list[FragmentMetadata], new_fragments: list[FragmentMetadata], fields_modified: list[int])¶
更新数据集中行的操作。
- removed_fragment_ids¶
已完全移除的片段ID。
- Type:
整数列表
- updated_fragments¶
已更新为包含新删除向量的片段。
- Type:
列表[FragmentMetadata]
- new_fragments¶
包含新行的片段。
- Type:
列表[FragmentMetadata]
- fields_modified¶
如果在updated_fragments中有任何字段被修改,那么必须在此列出这些字段,以便从覆盖这些字段的索引中移除这些片段。
- Type:
整数列表
- fields_modified : List[int]¶
- new_fragments : List[FragmentMetadata]¶
- removed_fragment_ids : List[int]¶
- updated_fragments : List[FragmentMetadata]¶
- class lance.dataset.LanceScanner(scanner: _Scanner, dataset: LanceDataset)¶
- analyze_plan() str¶
执行此扫描器的计划并显示运行时指标。
- Parameters:
- verbose : bool, default False
使用详细输出格式。
- Returns:
计划
- Return type:
字符串
- count_rows()¶
统计符合扫描器筛选条件的行数。
- Returns:
计数
- Return type:
int
-
explain_plan(verbose=
False) str¶ 返回此扫描器的执行计划。
- Parameters:
- verbose : bool, default False¶
使用详细输出格式。
- Returns:
计划
- Return type:
字符串
- scan_batches()¶
以记录批次形式消费Scanner,并附带对应的片段。
- Returns:
record_batches
- Return type:
TaggedRecordBatch的迭代器
- class lance.dataset.LanceStats(dataset: _Dataset)¶
关于LanceDataset的统计信息。
- data_stats() DataStatistics¶
关于数据集中数据的统计信息。
-
dataset_stats(max_rows_per_group: int =
1024) DatasetStats¶ 关于数据集的相关统计信息。
- index_stats(index_name: str) dict[str, Any]¶
关于索引的统计信息。
- Parameters:
- index_name : str¶
要获取统计信息的索引名称。
- class lance.dataset.MergeInsertBuilder(dataset, 开启)¶
- conflict_retries(max_retries: int) MergeInsertBuilder¶
设置操作在出现争用时的重试次数。
如果该值设置为大于0,则操作将保留输入数据的副本(根据数据大小存储在内存或磁盘上),并在出现争用时重试操作。
默认值为10。
-
execute(data_obj: ReaderLike, *, schema: pa.Schema | None =
None)¶ 执行合并插入操作
此函数会更新原始数据集,并返回一个包含合并统计信息的字典——例如插入、更新和删除的行数。
- Parameters:
- data_obj : ReaderLike¶
作为操作源表使用的新数据。该参数可以是任何数据源(例如表/数据集),只要
write_dataset()能够接受即可。- schema : Optional[pa.Schema]¶
数据的模式。仅当数据源是某种生成器时才需要提供此信息。
-
execute_uncommitted(data_obj: ReaderLike, *, schema: pa.Schema | None =
None) tuple[事务, dict[str, Any]]¶ 执行合并插入操作但不提交
此函数会更新原始数据集,并返回一个包含合并统计信息的字典——例如插入、更新和删除的行数。
- Parameters:
- data_obj : ReaderLike¶
作为操作源表使用的新数据。该参数可以是任何数据源(例如表/数据集),只要
write_dataset()能够接受即可。- schema : Optional[pa.Schema]¶
数据的模式。仅当数据源是某种生成器时才需要提供此信息。
- retry_timeout(timeout: timedelta) MergeInsertBuilder¶
设置用于限制重试的超时时间。
这是放弃操作前允许的最大耗时。无论完成需要多长时间,至少会进行一次尝试。一旦达到此超时时间,后续尝试将被取消。如果在第一次尝试期间已达到超时,操作将在进行第二次尝试前立即取消。
默认值为30秒。
-
when_matched_update_all(condition: str | None =
None) MergeInsertBuilder¶ 配置更新匹配行的操作
调用此方法后,当执行合并插入操作时,任何同时匹配源表和目标表的行将被更新。目标表中的行将被移除,而源表中的行将被添加。
可以指定一个可选条件。这应该是一个SQL过滤器,如果存在,则只有同时满足此过滤器的匹配行才会被更新。SQL过滤器应使用前缀target.来引用目标表中的列,使用前缀source.来引用源表中的列。例如,source.last_update < target.last_update。
如果指定了条件且行不满足该条件,则这些行将不会被更新。不满足筛选条件不会导致"匹配"行变为"不匹配"行。
-
when_not_matched_by_source_delete(expr: str | None =
None) MergeInsertBuilder¶ 配置操作以删除不匹配的源行
调用此方法后,当执行合并插入操作时,仅存在于目标表中的行将被删除。可以指定一个可选过滤器来限制删除操作的范围。如果提供了过滤器(作为SQL过滤器),则只有匹配过滤器的行会被删除。
- when_not_matched_insert_all() MergeInsertBuilder¶
配置操作以插入不匹配的行
调用此方法后,当执行合并插入操作时,仅存在于源表中的所有行将被插入到目标表中。
- class lance.dataset.ScannerBuilder(ds: LanceDataset)¶
- apply_defaults(default_opts: dict[str, Any]) ScannerBuilder¶
-
batch_readahead(nbatches: int | None =
None) ScannerBuilder¶ 读取v2文件时忽略此参数
- batch_size(batch_size: int) ScannerBuilder¶
设置扫描器的批量大小
-
columns(cols: list[str] | dict[str, str] | None =
None) ScannerBuilder¶
- fast_search(flag: bool) ScannerBuilder¶
启用快速搜索,仅对已索引的数据执行搜索。
用户可以使用Table::optimize()或create_index()将新数据包含到索引中,从而使新数据可被搜索。
- filter(filter: str | 表达式) ScannerBuilder¶
-
fragment_readahead(nfragments: int | None =
None) ScannerBuilder¶
-
full_text_search(查询: str | FullTextQuery, columns: list[str] | None =
None) ScannerBuilder¶ 通过全文搜索筛选行。实验性API,在我们支持在类似SQL的filter表达式中实现此功能后,可能会移除该API
在搜索之前必须在给定列上创建倒排索引,
- include_deleted_rows(flag: bool) ScannerBuilder¶
包含已删除的行
已被删除但仍存在于片段中的行将被返回。这些行的所有列(除_rowaddr外)都将设置为null
- io_buffer_size(io_buffer_size: int) ScannerBuilder¶
设置扫描器的I/O缓冲区大小
这是为处理来自存储的I/O数据而预留的RAM容量。它用于控制扫描器使用的内存量。如果缓冲区已满,扫描器将阻塞,直到缓冲区被处理完毕。
通常这个值应该与并发I/O线程的数量成比例。默认值为2GiB,这个大小可以轻松为32到256个并发I/O线程提供足够的空间。
这个值并不是扫描器使用内存的硬性上限。部分内存空间用于计算(可通过批量大小控制),且Lance在内存返回给用户后不会继续追踪其使用情况。
目前,如果存在单个数据批次大于IO缓冲区大小的情况,扫描器将会发生死锁。这是一个已知问题,将在未来的版本中修复。
该参数仅在读取v2文件时使用
- late_materialization(late_materialization: bool | list[str]) ScannerBuilder¶
-
limit(n: int | None =
None) ScannerBuilder¶
-
nearest(列: str, q: QueryVectorLike, k: int | None =
None, metric: str | None =None, nprobes: int | None =None, minimum_nprobes: int | None =None, maximum_nprobes: int | None =None, refine_factor: int | None =None, use_index: bool =True, ef: int | None =None) ScannerBuilder¶
-
offset(n: int | None =
None) ScannerBuilder¶
- prefilter(prefilter: bool) ScannerBuilder¶
-
scan_in_order(scan_in_order: bool =
True) ScannerBuilder¶ 是否按片段和批次的顺序扫描数据集。
如果设置为False,扫描器可能会并发读取片段并乱序返回批次。这可以提高性能,因为允许扫描过程中更多的并发操作,但也可能占用更多内存。
使用v2文件格式时,此参数将被忽略。在v2文件格式中,按顺序扫描不会产生任何性能损耗,因此所有扫描操作都会按顺序执行。
- scan_stats_callback(callback: Callable[[ScanStatistics], None]) ScannerBuilder¶
设置一个回调函数,该函数将在扫描完成后被调用并传入扫描统计信息。回调函数引发的错误将被记录但不会重新抛出。
-
strict_batch_size(strict_batch_size: bool =
False) ScannerBuilder¶ 如果为True,则除最后一批外的所有批次都将严格包含batch_size行记录。 默认为false。 若启用此选项,则小批次数据需要合并处理,这将导致数据复制并产生(通常非常小的)性能损耗。
- to_scanner() LanceScanner¶
-
use_scalar_index(use_scalar_index: bool =
True) ScannerBuilder¶ 设置查询时是否应使用标量索引
当存在标量索引时,扫描将利用它们来优化带过滤条件的查询。但在某些边缘情况下,标量索引可能导致性能下降。该参数允许用户在此类情况下禁用标量索引。
-
use_stats(use_stats: bool =
True) ScannerBuilder¶ 启用统计信息用于查询规划。
禁用统计功能通常用于调试和基准测试场景。 在正常使用情况下应保持开启。
- with_fragments(片段: Iterable[LanceFragment] | None) ScannerBuilder¶
-
with_row_address(with_row_address: bool =
True) ScannerBuilder¶ 支持返回带有行地址的结果。
行地址是数据集中每行记录的唯一但不稳定的标识符,它由片段ID(高32位)和片段内的行偏移量(低32位)组成。通常更推荐使用行ID,因为当行记录被修改或压缩时,行ID不会发生变化。不过,在某些高级使用场景中,行地址可能仍然有用。
-
with_row_id(with_row_id: bool =
True) ScannerBuilder¶ 启用返回行ID的功能。
- class lance.dataset.Transaction(读取版本: 'int', 操作: 'LanceOperation.BaseOperation', uuid: 'str' = <factory>, blobs_op: 'Optional[LanceOperation.BaseOperation]' = None)¶
-
blobs_op : BaseOperation | None =
None¶
- operation : BaseOperation¶
- read_version : int¶
- uuid : str¶
-
blobs_op : BaseOperation | None =
- class lance.dataset.VectorIndexReader(dataset: LanceDataset, index_name: str)¶
该类允许您初始化特定向量索引的读取器,获取分区数量,访问索引的质心,并读取索引的特定分区。
- Parameters:
- dataset : LanceDataset¶
包含索引的数据集。
- index_name : str¶
要读取的向量索引名称。
示例
import lance from lance.dataset import VectorIndexReader import numpy as np import pyarrow as pa vectors = np.random.rand(256, 2) data = pa.table({"vector": pa.array(vectors.tolist(), type=pa.list_(pa.float32(), 2))}) dataset = lance.write_dataset(data, "/tmp/index_reader_demo") dataset.create_index("vector", index_type="IVF_PQ", num_partitions=4, num_sub_vectors=2) reader = VectorIndexReader(dataset, "vector_idx") assert reader.num_partitions() == 4 partition = reader.read_partition(0) assert "_rowid" in partition.column_names异常¶
- ValueError
如果指定的索引不是向量索引。
- num_partitions() int¶
返回数据集中的分区数量。
- Returns:
分区数量。
- Return type:
int
-
read_partition(partition_id: int, *, with_vector: bool =
False) 表格¶ 返回给定IVF分区的pyarrow表
-
lance.dataset.write_dataset(data_obj: ReaderLike, uri: str | Path | LanceDataset, schema: pa.Schema | None =
None, mode: str ='create', *, max_rows_per_file: int =1048576, max_rows_per_group: int =1024, max_bytes_per_file: int =96636764160, commit_lock: CommitLock | None =None, 进度: FragmentWriteProgress | None =None, storage_options: dict[str, str] | None =None, data_storage_version: str | None =None, use_legacy_format: bool | None =None, enable_v2_manifest_paths: bool =False, enable_move_stable_row_ids: bool =False, auto_cleanup_options: AutoCleanupConfig | None =None) LanceDataset¶ 将给定的data_obj写入指定的uri
- Parameters:
- data_obj : Reader-like¶
要写入的数据。可接受的类型包括: - Pandas DataFrame、Pyarrow Table、Dataset、Scanner 或 RecordBatchReader - Huggingface 数据集
- uri : str, Path, or LanceDataset¶
数据集写入的目标位置(目录)。如果传入的是LanceDataset,则会复用现有会话。
- schema : Schema, optional¶
如果指定且输入为pandas DataFrame,则使用此模式替代默认的pandas到arrow表的转换。
- mode : str¶
create - 创建新数据集(如果uri已存在则会报错)。 overwrite - 创建新的快照版本 append - 创建新版本,该版本是输入数据与最新版本的合并(如果uri不存在则会报错)
- max_rows_per_file : int, default 1024 * 1024¶
在开始新文件之前要写入的最大行数
- max_rows_per_group : int, default 1024¶
在同一个文件中开始新分组前的最大行数
- max_bytes_per_file : int, default 90 * 1024 * 1024 * 1024¶
在开始新文件之前要写入的最大字节数。这是一个软性限制。该限制在每组数据写入后进行检查,这意味着较大的组可能会导致此限制被显著超出。默认值为90 GB,因为我们在对象存储上对每个文件有100 GB的硬性限制。
- commit_lock : CommitLock, optional¶
自定义提交锁。仅当您的对象存储不支持原子提交时才需要。详情请参阅用户指南。
- progress : FragmentWriteProgress, optional¶
实验性API。用于跟踪片段写入进度的功能。传入一个自定义类,该类需定义在开始写入每个片段和完成写入时调用的钩子函数。
- storage_options : optional, dict¶
针对特定存储连接的额外选项。这用于存储连接参数,如凭证、端点等。
- data_storage_version : optional, str, default None¶
要使用的数据存储格式版本。较新的版本效率更高,但需要较新版本的lance才能读取。默认值(None)将使用最新的稳定版本。更多详情请参阅用户指南。
- use_legacy_format : optional, bool, default None¶
已弃用的设置数据存储版本的方法。请改用data_storage_version参数。
- enable_v2_manifest_paths : bool, optional¶
如果为True,且这是一个新数据集,则使用新的V2清单路径。 这些路径能更高效地打开存储在对象存储上具有多个版本的数据集。如果数据集已存在,此参数将不起作用。要迁移现有数据集,请改用
LanceDataset.migrate_manifest_paths_v2()方法。默认为False。- enable_move_stable_row_ids : bool, optional¶
实验性参数:如果设置为true,写入器将使用移动稳定的行ID。 这些行ID在压缩操作后保持稳定,但在更新后不稳定。 这使得压缩更加高效,因为使用稳定的行ID时, 无需更新二级索引来指向新的行ID。
- auto_cleanup_options : optional, AutoCleanupConfig¶
数据集自动清理的配置选项。 如果设置此选项且这是一个新数据集,旧数据集版本将根据此参数自动清理。 要为现有数据集添加自动清理功能,请使用Dataset::update_config设置 lance.auto_cleanup.interval和lance.auto_cleanup.older_than。 必须同时设置这两个参数才能启用自动清理功能。 如果不设置此参数(默认行为), 则不会执行自动清理。 注意:此选项仅在创建新数据集时生效, 对现有数据集没有影响。
数据集片段
-
class lance.fragment.DataFile(路径: str, 字段: list[int], column_indices: list[int] =
None, file_major_version: int =0, file_minor_version: int =0, file_size_bytes: int | None =None)¶ 片段中的数据文件。
- path¶
数据文件的路径。
- Type:
字符串
- fields¶
该文件中各列的字段ID。
- Type:
整数列表
- column_indices¶
字段在文件中存储的列索引。其长度将与fields相同。
- Type:
整数列表
- file_major_version¶
数据存储格式的主版本号。
- Type:
int
- file_minor_version¶
数据存储格式的次要版本。
- Type:
int
- file_size_bytes¶
数据文件的大小(以字节为单位),如果可用的话。
- Type:
可选[int]
- column_indices : List[int]¶
- field_ids() list[int]¶
- fields : List[int]¶
-
file_major_version : int =
0¶
-
file_minor_version : int =
0¶
-
file_size_bytes : int | None =
None¶
- property path : str¶
- class lance.fragment.DeletionFile(read_version, id, 文件类型, num_deleted_rows)¶
- asdict()¶
- file_type¶
- id¶
- json()¶
- num_deleted_rows¶
-
path(fragment_id, base_uri=
None)¶
- read_version¶
-
class lance.fragment.FragmentMetadata(id: int, 文件: list[数据文件], physical_rows: int, deletion_file: DeletionFile | None =
None, row_id_meta: RowIdMeta | None =None)¶ 片段的元数据。
- id¶
片段的ID。
- Type:
int
- physical_rows¶
该片段最初的行数。这是删除前数据文件中的行数。
- Type:
int
- deletion_file¶
删除文件(如果有的话)。
- Type:
可选[DeletionFile]
-
deletion_file : DeletionFile | None =
None¶
- static from_json(json_data: str) FragmentMetadata¶
- id : int¶
- property num_deletions : int¶
已从此片段中删除的行数。
- property num_rows : int¶
删除操作后此片段中的行数。
- physical_rows : int¶
- to_json() dict¶
获取这个作为简单的JSON可序列化字典。
-
class lance.fragment.LanceFragment(dataset: LanceDataset, fragment_id: int | None, *, 片段: _Fragment | None =
None)¶ - count_rows(self, 表达式 filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, bool cache_metadata=True, MemoryPool memory_pool=None)¶
统计符合扫描器筛选条件的行数。
- Parameters:
- filter : Expression, default None
Scan将仅返回与筛选条件匹配的行。 如果可能,谓词将被下推以利用数据源中的分区信息或内部元数据,例如Parquet统计信息。否则在生成RecordBatches之前会对已加载的记录批次进行过滤。
- batch_size : int, default 131_072
扫描记录批次的最大行数。如果扫描的记录批次超出内存限制,可以调用此方法来减小其大小。
- batch_readahead : int, default 16
文件中预读取的批次数量。这可能不适用于所有文件格式。增加此数值将提高内存使用量,但也可能提升IO利用率。
- fragment_readahead : int, default 4
预读取的文件数量。增加此数值将提高内存使用率,但也可能提升IO利用率。
- fragment_scan_options : FragmentScanOptions, default None
特定于某次扫描和片段类型的选项,这些选项在同个数据集的不同扫描之间可能会发生变化。
- use_threads : bool, default True
如果启用,则将根据可用CPU核心数量使用最大并行度。
- cache_metadata : bool, default True
如果启用,扫描时可能会缓存元数据以加速重复扫描。
- memory_pool : MemoryPool, default None
如需内存分配,可在此指定。若未指定,则使用默认池。
- Returns:
计数
- Return type:
int
-
static create(dataset_uri: str | Path, 数据: ReaderLike, fragment_id: int | None =
None, schema: pa.Schema | None =None, max_rows_per_group: int =1024, 进度: FragmentWriteProgress | None =None, mode: str ='append', *, data_storage_version: str | None =None, use_legacy_format: bool | None =None, storage_options: dict[str, str] | None =None) FragmentMetadata¶ 从给定数据创建
FragmentMetadata。如果数据集尚未创建,可以使用此功能。
警告
内部API。该方法不面向终端用户使用。
- Parameters:
- dataset_uri : str¶
数据集的URI。
- fragment_id : int¶
片段的ID。
- data : pa.Table or pa.RecordBatchReader¶
要写入片段的数据。
- schema : pa.Schema, optional¶
数据的模式。如果未指定,将从数据中推断模式。
- max_rows_per_group : int, default 1024¶
数据文件中每个分组的最大行数。
- progress : FragmentWriteProgress, optional¶
实验性API。用于跟踪片段写入进度的功能。传入一个自定义类,该类需定义在开始写入每个片段和完成写入时调用的钩子函数。
- mode : str, default "append"¶
写入模式。如果指定为“append”,数据将与现有数据集的模式进行校验。否则,传递“create”或“overwrite”将为模式分配新的字段ID。
- data_storage_version : optional, str, default None¶
要使用的数据存储格式版本。较新的版本效率更高,但需要较新版本的lance才能读取。默认值(None)将使用最新的稳定版本。更多详情请参阅用户指南。
- use_legacy_format : bool, default None¶
已弃用的参数。请改用 data_storage_version。
- storage_options : optional, dict¶
针对特定存储连接的额外选项。这用于存储连接参数,如凭证、端点等。
另请参阅
lance.dataset.LanceOperation.Overwrite该操作用于创建一个新数据集或覆盖现有数据集,使用此API生成的片段。有关使用此API的示例,请参阅文档页面。
lance.dataset.LanceOperation.Append该操作用于将通过此API创建的片段追加到现有数据集中。查看文档页面了解使用此API的示例。
- Return type:
- static create_from_file(文件名: str, dataset: LanceDataset, fragment_id: int) FragmentMetadata¶
从给定的数据文件URI创建片段。
如果数据文件从数据集中丢失,可以使用此功能。
警告
内部API。该方法不面向终端用户使用。
- Parameters:
- filename : str¶
数据文件的文件名。
- dataset : LanceDataset¶
该片段所属的数据集。
- fragment_id : int¶
片段的ID。
- data_files()¶
返回此片段的数据文件。
- delete(predicate: str) FragmentMetadata | None¶
从此片段中删除行。
这将添加或更新该片段的删除文件。它不会修改或删除该片段的数据文件。如果删除后没有剩余行,该方法将返回None。
警告
内部API。该方法不面向终端用户使用。
- Parameters:
- predicate : str¶
指定要删除行的SQL谓词。
- Returns:
包含新删除文件的新片段,如果没有剩余行则为None。
- Return type:
FragmentMetadata 或 None
示例
>>> import lance >>> import pyarrow as pa >>> tab = pa.table({"a": [1, 2, 3], "b": [4, 5, 6]}) >>> dataset = lance.write_dataset(tab, "dataset") >>> frag = dataset.get_fragment(0) >>> frag.delete("a > 1") FragmentMetadata(id=0, files=[DataFile(path='...', fields=[0, 1], ...), ...) >>> frag.delete("a > 0") is None True另请参阅
lance.dataset.LanceOperation.Delete用于将这些更改提交到数据集的操作。有关使用此API的示例,请参阅文档页面。
- deletion_file()¶
返回删除文件(如果有的话)
- property fragment_id¶
- head(self, int num_rows, columns=None, 表达式 filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, bool cache_metadata=True, MemoryPool memory_pool=None)¶
加载片段的前N行。
- Parameters:
- num_rows : int
要加载的行数。
- columns : list of str, default None
要投影的列。这可以是一个要包含的列名列表(顺序和重复项将被保留),或者是一个字典,包含{新列名: 表达式}值用于更高级的投影。
列或表达式列表可以使用特殊字段: __batch_index(批次在片段内的索引), __fragment_index(片段在数据集内的索引), __last_in_fragment(批次是否为片段中的最后一个),以及 __filename(源文件名或源片段的描述)。
这些列将被传递到数据集和相应的数据片段,以避免加载、复制和反序列化计算链后续不需要的列。 默认情况下会投影所有可用列。如果引用的列名在数据集的Schema中不存在,则会引发异常。
- filter : Expression, default None
Scan将仅返回与筛选条件匹配的行。 如果可能,谓词将被下推以利用数据源中的分区信息或内部元数据,例如Parquet统计信息。否则在生成RecordBatches之前会对已加载的记录批次进行过滤。
- batch_size : int, default 131_072
扫描记录批次的最大行数。如果扫描的记录批次超出内存限制,可以调用此方法来减小其大小。
- batch_readahead : int, default 16
文件中预读取的批次数量。这可能不适用于所有文件格式。增加此数值将提高内存使用量,但也可能提升IO利用率。
- fragment_readahead : int, default 4
预读取的文件数量。增加此数值将提高内存使用率,但也可能提升IO利用率。
- fragment_scan_options : FragmentScanOptions, default None
特定于某次扫描和片段类型的选项,这些选项在同个数据集的不同扫描之间可能会发生变化。
- use_threads : bool, default True
如果启用,则将根据可用CPU核心数量使用最大并行度。
- cache_metadata : bool, default True
如果启用,扫描时可能会缓存元数据以加速重复扫描。
- memory_pool : MemoryPool, default None
如需内存分配,可在此指定。若未指定,则使用默认池。
- Return type:
表格
-
merge(data_obj: ReaderLike, left_on: str, right_on: str | None =
None, schema=None) tuple[FragmentMetadata, LanceSchema]¶ 将另一个数据集合并到此片段中。
执行左连接操作,其中片段作为左表,data_obj作为右表。存在于数据集中但不在左表的行将被填充为null值,除非Lance不支持某些类型的null值,这种情况下会抛出错误。
- Parameters:
示例
>>> import lance >>> import pyarrow as pa >>> df = pa.table({'x': [1, 2, 3], 'y': ['a', 'b', 'c']}) >>> dataset = lance.write_dataset(df, "dataset") >>> dataset.to_table().to_pandas() x y 0 1 a 1 2 b 2 3 c >>> fragments = dataset.get_fragments() >>> new_df = pa.table({'x': [1, 2, 3], 'z': ['d', 'e', 'f']}) >>> merged = [] >>> schema = None >>> for f in fragments: ... f, schema = f.merge(new_df, 'x') ... merged.append(f) >>> merge = lance.LanceOperation.Merge(merged, schema) >>> dataset = lance.LanceDataset.commit("dataset", merge, read_version=1) >>> dataset.to_table().to_pandas() x y z 0 1 a d 1 2 b e 2 3 c f另请参阅
LanceDataset.merge_columns向此片段添加列。
- Returns:
包含合并列和最终模式的新片段。
- Return type:
元组[FragmentMetadata, LanceSchema]
-
merge_columns(value_func: dict[str, str] | BatchUDF | ReaderLike | collections.abc.Callable[[pa.RecordBatch], pa.RecordBatch], columns: list[str] | None =
None, batch_size: int | None =None, reader_schema: pa.Schema | None =None) tuple[FragmentMetadata, LanceSchema]¶ 向此片段添加列。
警告
内部API。该方法不面向终端用户使用。
参数及其解释与
lance.dataset.LanceDataset.add_columns()操作中的相同。唯一的区别在于,不是修改数据集,而是创建一个新的片段。新片段的模式也会被返回。这些可以在后续操作中用于将更改提交到数据集。
另请参阅
lance.dataset.LanceOperation.Merge用于将这些更改提交到数据集的操作。有关使用此API的示例,请参阅文档页面。
- Returns:
一个包含新增列和最终模式的新片段。
- Return type:
元组[FragmentMetadata, LanceSchema]
- property metadata : FragmentMetadata¶
返回此片段的元数据。
- Return type:
- property num_deletions : int¶
返回此片段中已删除的行数。
- property physical_rows : int¶
返回此片段中原始的行数。
要获取删除后的行数,请改用
count_rows()。
-
scanner(*, columns: list[str] | dict[str, str] | None =
None, batch_size: int | None =None, filter: str | pa.compute.Expression | None =None, limit: int | None =None, offset: int | None =None, with_row_id: bool =False, with_row_address: bool =False, batch_readahead: int =16) LanceScanner¶ 详情请参阅 Dataset::scanner
- take(self, 索引, columns=None, 表达式 filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, bool cache_metadata=True, MemoryPool memory_pool=None)¶
按索引选择数据行。
- Parameters:
- indices : Array or array-like¶
数据集中要选择的行索引。
- columns : list of str, default None
要投影的列。这可以是一个要包含的列名列表(顺序和重复项将被保留),或者是一个字典,包含{新列名: 表达式}值用于更高级的投影。
列或表达式列表可以使用特殊字段: __batch_index(批次在片段内的索引), __fragment_index(片段在数据集内的索引), __last_in_fragment(批次是否为片段中的最后一个),以及 __filename(源文件名或源片段的描述)。
这些列将被传递到数据集和相应的数据片段,以避免加载、复制和反序列化计算链后续不需要的列。 默认情况下会投影所有可用列。如果引用的列名在数据集的Schema中不存在,则会引发异常。
- filter : Expression, default None
Scan将仅返回与筛选条件匹配的行。 如果可能,谓词将被下推以利用数据源中的分区信息或内部元数据,例如Parquet统计信息。否则在生成RecordBatches之前会对已加载的记录批次进行过滤。
- batch_size : int, default 131_072
扫描记录批次的最大行数。如果扫描的记录批次超出内存限制,可以调用此方法来减小其大小。
- batch_readahead : int, default 16
文件中预读取的批次数量。这可能不适用于所有文件格式。增加此数值将提高内存使用量,但也可能提升IO利用率。
- fragment_readahead : int, default 4
预读取的文件数量。增加此数值将提高内存使用率,但也可能提升IO利用率。
- fragment_scan_options : FragmentScanOptions, default None
特定于某次扫描和片段类型的选项,这些选项在同个数据集的不同扫描之间可能会发生变化。
- use_threads : bool, default True
如果启用,则将根据可用CPU核心数量使用最大并行度。
- cache_metadata : bool, default True
如果启用,扫描时可能会缓存元数据以加速重复扫描。
- memory_pool : MemoryPool, default None
如需内存分配,可在此指定。若未指定,则使用默认池。
- Return type:
表格
- to_batches(self, Schema schema=None, columns=None, 表达式 filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, bool cache_metadata=True, MemoryPool memory_pool=None)¶
将片段读取为物化的记录批次。
- Parameters:
- schema : Schema, optional
用于扫描的具体模式。
- columns : list of str, default None
要投影的列。这可以是一个要包含的列名列表(顺序和重复项将被保留),或者是一个字典,包含{新列名: 表达式}值用于更高级的投影。
列或表达式列表可以使用特殊字段: __batch_index(批次在片段内的索引), __fragment_index(片段在数据集内的索引), __last_in_fragment(批次是否为片段中的最后一个),以及 __filename(源文件名或源片段的描述)。
这些列将被传递到数据集和相应的数据片段,以避免加载、复制和反序列化计算链后续不需要的列。 默认情况下会投影所有可用列。如果引用的列名在数据集的Schema中不存在,则会引发异常。
- filter : Expression, default None
Scan将仅返回与筛选条件匹配的行。 如果可能,谓词将被下推以利用数据源中的分区信息或内部元数据,例如Parquet统计信息。否则在生成RecordBatches之前会对已加载的记录批次进行过滤。
- batch_size : int, default 131_072
扫描记录批次的最大行数。如果扫描的记录批次超出内存限制,可以调用此方法来减小其大小。
- batch_readahead : int, default 16
文件中预读取的批次数量。这可能不适用于所有文件格式。增加此数值将提高内存使用量,但也可能提升IO利用率。
- fragment_readahead : int, default 4
预读取的文件数量。增加此数值将提高内存使用率,但也可能提升IO利用率。
- fragment_scan_options : FragmentScanOptions, default None
特定于某次扫描和片段类型的选项,这些选项在同个数据集的不同扫描之间可能会发生变化。
- use_threads : bool, default True
如果启用,则将根据可用CPU核心数量使用最大并行度。
- cache_metadata : bool, default True
如果启用,扫描时可能会缓存元数据以加速重复扫描。
- memory_pool : MemoryPool, default None
如需内存分配,可在此指定。若未指定,则使用默认池。
- Returns:
record_batches
- Return type:
RecordBatch的迭代器
- to_table(self, Schema schema=None, columns=None, 表达式 filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, bool cache_metadata=True, MemoryPool memory_pool=None)¶
将此片段转换为表格。
请谨慎使用此便捷工具。它会在创建表之前,将扫描结果序列化并全部加载到内存中。
- Parameters:
- schema : Schema, optional
用于扫描的具体模式。
- columns : list of str, default None
要投影的列。这可以是一个要包含的列名列表(顺序和重复项将被保留),或者是一个字典,包含{新列名: 表达式}值用于更高级的投影。
列或表达式列表可以使用特殊字段: __batch_index(批次在片段内的索引), __fragment_index(片段在数据集内的索引), __last_in_fragment(批次是否为片段中的最后一个),以及 __filename(源文件名或源片段的描述)。
这些列将被传递到数据集和相应的数据片段,以避免加载、复制和反序列化计算链后续不需要的列。 默认情况下会投影所有可用列。如果引用的列名在数据集的Schema中不存在,则会引发异常。
- filter : Expression, default None
Scan将仅返回与筛选条件匹配的行。 如果可能,谓词将被下推以利用数据源中的分区信息或内部元数据,例如Parquet统计信息。否则在生成RecordBatches之前会对已加载的记录批次进行过滤。
- batch_size : int, default 131_072
扫描记录批次的最大行数。如果扫描的记录批次超出内存限制,可以调用此方法来减小其大小。
- batch_readahead : int, default 16
文件中预读取的批次数量。这可能不适用于所有文件格式。增加此数值将提高内存使用量,但也可能提升IO利用率。
- fragment_readahead : int, default 4
预读取的文件数量。增加此数值将提高内存使用率,但也可能提升IO利用率。
- fragment_scan_options : FragmentScanOptions, default None
特定于某次扫描和片段类型的选项,这些选项在同个数据集的不同扫描之间可能会发生变化。
- use_threads : bool, default True
如果启用,则将根据可用CPU核心数量使用最大并行度。
- cache_metadata : bool, default True
如果启用,扫描时可能会缓存元数据以加速重复扫描。
- memory_pool : MemoryPool, default None
如需内存分配,可在此指定。若未指定,则使用默认池。
- Returns:
表格
- Return type:
表格
-
lance.fragment.write_fragments(数据: ReaderLike, dataset_uri: str | Path | LanceDataset, schema: 模式/架构 | None =
None, *, return_transaction: True, mode: str ='append', max_rows_per_file: int =1024 * 1024, max_rows_per_group: int =1024, max_bytes_per_file: int =DEFAULT_MAX_BYTES_PER_FILE, 进度: FragmentWriteProgress | None =None, data_storage_version: str | None =None, use_legacy_format: bool | None =None, storage_options: dict[str, str] | None =None, enable_move_stable_row_ids: bool =False) 事务¶ -
lance.fragment.write_fragments(数据: ReaderLike, dataset_uri: str | Path | LanceDataset, schema: 模式/架构 | None =
None, *, return_transaction: False =False, mode: str ='append', max_rows_per_file: int =1024 * 1024, max_rows_per_group: int =1024, max_bytes_per_file: int =DEFAULT_MAX_BYTES_PER_FILE, 进度: FragmentWriteProgress | None =None, data_storage_version: str | None =None, use_legacy_format: bool | None =None, storage_options: dict[str, str] | None =None, enable_move_stable_row_ids: bool =False) list[FragmentMetadata] 将数据写入一个或多个片段。
警告
这是一个底层API,专为手动实现分布式写入而设计。对于大多数用户,推荐使用
lance.write_dataset()API。- Parameters:
- data : pa.Table or pa.RecordBatchReader¶
要写入片段的数据。
- dataset_uri : str, Path, or LanceDataset¶
数据集或数据集对象的URI。
- schema : pa.Schema, optional¶
数据的模式。如果未指定,将从数据中推断模式。
- return_transaction : bool, default False¶
如果为真,事务将被返回。
- mode : str, default "append"¶
写入模式。如果指定为“append”,数据将与现有数据集的模式进行校验。否则,传递“create”或“overwrite”将为模式分配新的字段ID。
- max_rows_per_file : int, default 1024 * 1024¶
每个数据文件的最大行数。
- max_rows_per_group : int, default 1024¶
数据文件中每个分组的最大行数。
- max_bytes_per_file : int, default 90 * 1024 * 1024 * 1024¶
在开始新文件之前要写入的最大字节数。这是一个软性限制。该限制在每组数据写入后进行检查,这意味着较大的组可能会导致此限制被显著超出。默认值为90 GB,因为我们在对象存储上对每个文件有100 GB的硬性限制。
- progress : FragmentWriteProgress, optional¶
实验性API。用于跟踪片段写入进度的功能。传入一个自定义类,该类需定义在开始写入每个片段和完成写入时调用的钩子函数。
- data_storage_version : optional, str, default None¶
要使用的数据存储格式版本。较新的版本效率更高,但需要较新版本的lance才能读取。默认值(None)将使用2.0版本。更多详情请参阅用户指南。
- use_legacy_format : optional, bool, default None¶
已弃用的设置数据存储版本的方法。请改用data_storage_version参数。
- storage_options : Optional[Dict[str, str]]¶
针对特定存储连接的额外选项。这用于存储连接参数,如凭证、端点等。
- enable_move_stable_row_ids : bool¶
实验性功能:如果设置为true,写入器将使用移动稳定的行ID。这些行ID在压缩操作后保持稳定,但在更新后不稳定。这使得压缩操作更高效,因为使用稳定的行ID时,无需更新二级索引来指向新的行ID。
- Returns:
- 如果return_transaction为False:
返回已写入分片的
FragmentMetadata列表。此时分片ID保持为零值,表示尚未分配具体ID。这些ID将在分片提交到数据集时被分配。- 如果return_transaction为True:
返回写入事务对象。事务类型将对应指定的mode参数。该事务对象可传递给
LanceDataset.commit()方法。
- Return type:
列表[FragmentMetadata] | Transaction