class lance.BlobColumn(blob_column: 数组 | ChunkedArray)

一个实用工具,用于封装Pyarrow二进制列,并以类似文件对象的方式逐行迭代。

这对于处理需要与期望文件类对象的API接口交互的中小型二进制对象非常有用。对于非常大的二进制对象(每个值4-8MB或更大),最好创建一个blob列并使用lance.Dataset.take_blobs()来访问blob数据。

class lance.BlobFile(inner: LanceBlobFile)

将Lance数据集中的blob表示为类似文件的对象。

close() None

刷新并关闭IO对象。

如果文件已经关闭,此方法将不起作用。

property closed : bool
readable() bool

返回对象是否已打开以供读取。

如果为False,read()将抛出OSError异常。

readall() bytes

读取直到文件结束(EOF),使用多次read()调用。

readinto(b: bytearray) int
seek(offset: int, 何时: int = 0) int

将流位置更改为给定的字节偏移量。

offset

流位置,相对于‘whence’。

whence

相对于当前位置的偏移量。

偏移量是相对于whence所指示的位置进行解释的。 whence的取值包括:

  • os.SEEK_SET 或 0 - 流的开始位置(默认值);偏移量应为零或正数

  • os.SEEK_CUR 或 1 – 当前流位置;偏移量可为负值

  • os.SEEK_END 或 2 – 流的末尾;偏移量通常为负数

返回新的绝对位置。

seekable() bool

返回对象是否支持随机访问。

如果为False,seek()、tell()和truncate()将引发OSError。 此方法可能需要进行测试seek()。

size() int

返回blob的大小(以字节为单位)。

tell() int

返回当前流的位置。

class lance.DataStatistics(字段: FieldStatistics)

数据集中的数据统计信息

fields : FieldStatistics

数据集中字段的统计信息

class lance.FFILanceTableProvider(dataset, *, with_row_id=False, with_row_addr=False)
class lance.FieldStatistics(id: int, bytes_on_disk: int)

数据集中某个字段的统计信息

bytes_on_disk : int

(可能经过压缩的)磁盘上用于存储该字段的字节

id : int

字段的ID

class lance.FragmentMetadata(id: int, 文件: list[DataFile], physical_rows: int, deletion_file: DeletionFile | None = None, row_id_meta: RowIdMeta | None = None)

片段的元数据。

id

片段的ID。

Type:

整数

files

片段的数据文件。每个数据文件必须具有相同的行数。每个文件存储不同列的子集。

Type:

列表[DataFile]

physical_rows

该片段最初的行数。这是删除前数据文件中的行数。

Type:

int

deletion_file

删除文件(如果有的话)。

Type:

可选[DeletionFile]

row_id_meta

行ID元数据(如果有的话)。

Type:

可选[RowIdMeta]

data_files() list[DataFile]
deletion_file : DeletionFile | None = None
files : List[数据文件]
static from_json(json_data: str) FragmentMetadata
id : int
property num_deletions : int

已从此片段中删除的行数。

property num_rows : int

删除操作后此片段中的行数。

physical_rows : int
row_id_meta : RowIdMeta | None = None
to_json() dict

获取这个作为简单的JSON可序列化字典。

class lance.LanceDataset(uri: str | Path, 版本: int | str | None = None, block_size: int | None = None, index_cache_size: int | None = None, metadata_cache_size: int | None = None, commit_lock: CommitLock | None = None, storage_options: dict[str, str] | None = None, serialized_manifest: bytes | None = None, default_scan_options: dict[str, Any] | None = None)

Lance数据集采用Lance格式,数据存储在给定的uri位置。

add_columns(转换: dict[str, str] | BatchUDF | ReaderLike | pyarrow.Field | list[pyarrow.Field] | pyarrow.Schema, read_columns: list[str] | None = None, reader_schema: pa.Schema | None = None, batch_size: int | None = None)

使用定义的值添加新列。

有几种方式可以指定新列。首先,可以为每个新列提供SQL表达式。其次,可以提供一个UDF(用户定义函数),该函数接收一批现有数据并返回包含新列的新数据批次。这些新列将被追加到数据集中。

你也可以提供一个RecordBatchReader,它将从某个外部源读取新列的值。当新列的值已经暂存到文件中(通常由某个分布式进程完成)时,这通常很有用。

有关编写UDF的更多信息,请参阅lance.add_columns_udf()装饰器。

Parameters:
transforms : dict or AddColumnsUDF or ReaderLike

如果这是一个字典,那么键是新列的名称,值则是SQL表达式字符串。这些字符串可以引用数据集中的现有列。 如果这是一个AddColumnsUDF,那么它是一个用户定义函数,接收一批现有数据并返回包含新列的新数据批次。 如果这是pyarrow.Fieldpyarrow.Schema,则会以仅元数据操作的方式添加所有具有给定模式的NULL列。

read_columns : list of str, optional

UDF将读取的列名。如果为None,则UDF将读取所有列。仅当transforms是UDF时使用此参数。否则,读取的列将从SQL表达式中推断得出。

reader_schema : pa.Schema, optional

仅当transforms是ReaderLike对象时有效。这将用于确定读取器的模式。

batch_size : int, optional

在应用转换时,每次从源数据集中读取的行数。如果数据集是v1版本,则忽略此参数。

示例

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3]})
>>> dataset = lance.write_dataset(table, "my_dataset")
>>> @lance.batch_udf()
... def double_a(batch):
...     df = batch.to_pandas()
...     return pd.DataFrame({'double_a': 2 * df['a']})
>>> dataset.add_columns(double_a)
>>> dataset.to_table().to_pandas()
   a  double_a
0  1         2
1  2         4
2  3         6
>>> dataset.add_columns({"triple_a": "a * 3"})
>>> dataset.to_table().to_pandas()
   a  double_a  triple_a
0  1         2         3
1  2         4         6
2  3         6         9

另请参阅

LanceDataset.merge

将一组预先计算好的列合并到数据集中。

alter_columns(*修改: Iterable[AlterColumn])

修改列名、数据类型和可空性。

重命名的列可以保留其上的任何索引。如果某列具有IVF_PQ索引,即使该列被转换为其他类型,索引仍可保留。但目前其他类型的索引尚不支持类型转换。

列类型可以进行向上转型(例如从int32转为int64)或向下转型(例如从int64转为int32)。但若存在无法用新类型表示的值时,向下转型会失败。通常来说,列可以转型为相同的大类类型:整型转整型、浮点型转浮点型、字符串转字符串。不过字符串、二进制和列表列可以在其大小变体之间进行转型。例如:字符串转大字符串、二进制转大二进制、列表转大列表。

重命名的列可以保留其上的任何索引。但是,如果将该列转换为不同类型,其索引将被删除。

Parameters:
alterations : Iterable[Dict[str, Any]]

一个字典序列,每个字典包含以下键:

  • ”path”: str

    要修改的列路径。对于顶级列,这是列名。 对于嵌套列,这是点分隔的路径,例如"a.b.c"。

  • ”name”: str, optional

    列的新名称。如果未指定,则列名保持不变。

  • ”nullable”: bool, optional

    列是否可为空。如果未指定,则列的可空性不变。 只能将非空列改为可空列。目前不能将可空列改为非空列。

  • ”data_type”: pyarrow.DataType, optional

    要将列转换到的新数据类型。如果未指定,则列的数据类型保持不变。

示例

>>> import lance
>>> import pyarrow as pa
>>> schema = pa.schema([pa.field('a', pa.int64()),
...                     pa.field('b', pa.string(), nullable=False)])
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.alter_columns({"path": "a", "name": "x"},
...                       {"path": "b", "nullable": True})
>>> dataset.to_table().to_pandas()
   x  b
0  1  a
1  2  b
2  3  c
>>> dataset.alter_columns({"path": "x", "data_type": pa.int32()})
>>> dataset.schema
x: int32
b: string
checkout_version(版本: int | str) LanceDataset

加载指定版本的数据集。

dataset()构造函数不同,此操作将重用当前缓存。如果数据集已处于指定版本,则此操作无效。

Parameters:
version : int | str,

要检出的版本号。可以提供一个版本号(int)或标签(str)。

Return type:

LanceDataset

cleanup_old_versions(older_than: timedelta | None = None, *, delete_unverified: bool = False, error_if_tagged_old_versions: bool = True) CleanupStats

清理数据集的旧版本。

某些数据集变更(如覆盖操作)会遗留未被最新版本引用的数据。这些旧数据会被保留,以便将数据集回滚到之前的版本。

该方法将移除旧版本及其引用的所有数据文件。 一旦清理任务执行完毕,您将无法检出或恢复这些旧版本。

Parameters:
older_than : timedelta, optional

仅早于此版本的记录将被删除。如果未指定,默认保留两周内的记录。

delete_unverified : bool, default False

事务失败后遗留的文件可能看起来像是正在进行中的操作(例如追加新数据)的一部分,这些文件只有在至少7天后才会被删除。如果delete_unverified为True,则无论文件存在多久都会被删除。

只有在您能确保当前没有其他进程正在处理此数据集时,才应将此设置为True。否则数据集可能会进入损坏状态。

error_if_tagged_old_versions : bool, default True

某些版本可能关联了标签。带标签的版本不会被清理,无论它们存在多久。如果该参数设为True(默认值),当任何带标签版本匹配参数时会抛出异常。否则,带标签版本将被静默忽略,仅清理未加标签的版本。

static commit(base_uri: str | Path | LanceDataset, 操作: LanceOperation.BaseOperation | 事务, blobs_op: LanceOperation.BaseOperation | None = None, read_version: int | None = None, commit_lock: CommitLock | None = None, storage_options: dict[str, str] | None = None, enable_v2_manifest_paths: bool | None = None, 分离: bool | None = False, max_retries: int = 20) LanceDataset

创建数据集的新版本

该方法是一个高级方法,允许用户描述对数据文件所做的更改。当使用Lance应用变更时(例如使用LanceDatasetwrite_dataset()),则不需要此方法。

它当前的目的是允许在分布式环境中进行更改,其中没有单一进程完成所有工作。例如,分布式批量更新或分布式批量修改操作。

一旦完成所有更改,可以调用此方法通过更新数据集清单使更改生效。

警告

这是一个高级API,不提供与其他API相同级别的验证。例如,调用者有责任确保片段对模式有效。

Parameters:
base_uri : str, Path, or LanceDataset

数据集的基URI,或数据集对象本身。使用数据集对象可能更高效,因为它可以复用文件元数据缓存。

operation : BaseOperation

应用于数据集的操作。这描述了已进行的更改。可用的操作请参见LanceOperation

read_version : int, optional

作为变更基础的数据集版本。 覆盖或恢复操作不需要此版本。

commit_lock : CommitLock, optional

自定义提交锁。仅当您的对象存储不支持原子提交时才需要。详情请参阅用户指南。

storage_options : optional, dict

针对特定存储连接的额外选项。这用于存储连接参数,如凭证、端点等。

enable_v2_manifest_paths : bool, optional

如果为True,且这是一个新数据集,则使用新的V2清单路径。 这些路径能更高效地在对象存储上打开包含多个版本的数据集。如果数据集已存在,此参数将不起作用。要迁移现有数据集,请改用migrate_manifest_paths_v2()方法。默认为False。警告: 启用此选项将使旧版Lance(0.17.0之前版本)无法读取该数据集。

detached : bool, optional

如果为True,则该提交不会成为数据集谱系的一部分。它将永远不会显示为最新数据集,未来唯一查看它的方式是通过指定版本号检出。该版本将是一个随机版本号,仅在分离提交中保持唯一。调用方应将其存储在某处,因为未来将无法通过其他方式获取它。

max_retries : int

提交数据集时执行的最大重试次数。

Returns:

Lance数据集的新版本。

Return type:

LanceDataset

示例

使用LanceOperation.Overwrite操作创建新数据集:

>>> import lance
>>> import pyarrow as pa
>>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> fragment1 = lance.fragment.LanceFragment.create("example", tab1)
>>> fragment2 = lance.fragment.LanceFragment.create("example", tab2)
>>> fragments = [fragment1, fragment2]
>>> operation = lance.LanceOperation.Overwrite(tab1.schema, fragments)
>>> dataset = lance.LanceDataset.commit("example", operation)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
static commit_batch(dest: str | Path | LanceDataset, 交易: collections.abc.Sequence[事务], commit_lock: CommitLock | None = None, storage_options: dict[str, str] | None = None, enable_v2_manifest_paths: bool | None = None, 分离: bool | None = False, max_retries: int = 20) BulkCommitResult

使用多个事务创建数据集的新版本。

此方法是一个高级方法,允许用户描述对数据文件所做的更改。当使用Lance应用更改时(例如使用LanceDatasetwrite_dataset()),则不需要此方法。

Parameters:
dest : str, Path, or LanceDataset

数据集的基础URI,或者数据集对象本身。使用数据集对象可能更高效,因为它可以复用文件元数据缓存。

transactions : Iterable[Transaction]

要应用于数据集的事务操作。这些操作将被合并为单个事务并应用到数据集中。注意:目前仅支持追加事务,其他类型的事务将在未来版本中支持。

commit_lock : CommitLock, optional

自定义提交锁。仅当您的对象存储不支持原子提交时才需要。详情请参阅用户指南。

storage_options : optional, dict

针对特定存储连接的额外选项。这用于存储连接参数,如凭证、端点等。

enable_v2_manifest_paths : bool, optional

如果为True,且这是一个新数据集,则使用新的V2清单路径。 这些路径能更高效地在对象存储上打开包含多个版本的数据集。如果数据集已存在,此参数将不起作用。要迁移现有数据集,请改用migrate_manifest_paths_v2()方法。默认为False。警告: 启用此选项将使旧版Lance(0.17.0之前版本)无法读取该数据集。

detached : bool, optional

如果为True,则该提交不会成为数据集谱系的一部分。它将永远不会显示为最新数据集,未来唯一查看它的方式是通过指定版本号检出。该版本将是一个随机版本号,仅在分离提交中保持唯一。调用方应将其存储在某处,因为未来将无法通过其他方式获取它。

max_retries : int

提交数据集时执行的最大重试次数。

Returns:

dataset: LanceDataset

Lance数据集的新版本。

merged: Transaction

应用于数据集的合并事务。

Return type:

包含键的字典

count_rows(filter: 表达式 | str | None = None, **kwargs) int

统计符合扫描器筛选条件的行数。

Parameters:
**kwargs : dict, optional

完整参数描述请参见 py:method:scanner 方法。

Returns:

count – 数据集中的总行数。

Return type:

int

create_index(: str | list[str], index_type: str, name: str | None = None, metric: str = 'L2', replace: bool = False, num_partitions: int | None = None, ivf_centroids: np.ndarray | pa.FixedSizeListArray | pa.FixedShapeTensorArray | None = None, pq_codebook: np.ndarray | pa.FixedSizeListArray | pa.FixedShapeTensorArray | None = None, num_sub_vectors: int | None = None, accelerator: str | 'torch.Device' | None = None, index_cache_size: int | None = None, shuffle_partition_batches: int | None = None, shuffle_partition_concurrency: int | None = None, ivf_centroids_file: str | None = None, precomputed_partition_dataset: str | None = None, storage_options: dict[str, str] | None = None, filter_nan: bool = True, one_pass_ivfpq: bool = False, **kwargs) LanceDataset

在列上创建索引。

实验性API

Parameters:
column : str

要建立索引的列。

index_type : str

索引的类型。 "IVF_PQ, IVF_HNSW_PQ IVF_HNSW_SQ" 目前支持。

name : str, optional

索引名称。如果未提供,将根据列名自动生成。

metric : str

距离度量类型,即“L2”(“euclidean”的别名)、“cosine”或“dot”(点积)。默认为“L2”。

replace : bool

如果索引已存在,则替换现有索引。

num_partitions : int, optional

IVF(倒排文件索引)的分区数量。

ivf_centroids : optional

它可以是np.ndarraypyarrow.FixedSizeListArraypyarrow.FixedShapeTensorArray。 一个num_partitions x dimension维度的现有K均值中心点数组, 用于IVF聚类。如果未提供,将训练一个新的KMeans模型。

pq_codebook : optional,

它可以是np.ndarraypyarrow.FixedSizeListArray, 或pyarrow.FixedShapeTensorArray。 一个num_sub_vectors x (2 ^ nbits * dimensions // num_sub_vectors) 数组,表示PQ码本的K均值中心点。

注意:目前nbits始终为8。 如果未提供,将训练一个新的PQ模型。

num_sub_vectors : int, optional

PQ(乘积量化)的子向量数量。

accelerator: str | 'torch.Device' | None = None

如果设置,将使用加速器来加快训练过程。 支持的加速器包括:"cuda"(英伟达GPU)和"mps"(苹果硅GPU)。 如果未设置,则使用CPU。

index_cache_size : int, optional

索引缓存的大小,以条目数表示。默认值为256。

shuffle_partition_batches : int, optional

批次数,使用数据集的row group大小,决定每个shuffle分区包含的数量。默认值为10240。

假设row group大小为1024,每个shuffle分区将包含10240 * 1024 = 10,485,760行。减小此值会减少shuffle操作的内存消耗但会增加完成时间,反之亦然。

shuffle_partition_concurrency : int, optional

并发处理的shuffle分区数量。默认值为2

减小该值可以减少shuffle操作的内存消耗,但会增加完成时间,反之亦然。

storage_options : optional, dict

针对特定存储连接的额外选项。这用于存储连接参数,如凭证、端点等。

filter_nan : bool

默认为True。False是不安全的,如果存在任何null/nan值会导致崩溃(否则不会)。禁用用于可空列的空值过滤器。可获得小幅速度提升。

one_pass_ivfpq : bool

默认为False。如果启用,索引类型必须为“IVF_PQ”。可减少磁盘IO。

**kwargs

传递给索引构建过程的参数。

SQ(标量量化)仅适用于IVF_HNSW_SQ索引类型,这种量化方法用于减少索引的内存占用,它将浮点向量映射为整数向量,每个整数占用num_bits位,目前仅支持8位。

If index_type is “IVF_*”, then the following parameters are required:

num_partitions

If index_type is with “PQ”, then the following parameters are required:

子向量数量

IVF_PQ的可选参数:

  • ivf_centroids

    用于IVF聚类的现有K均值中心点。

  • num_bits

    PQ(乘积量化)的位数。默认为8。 仅支持4和8。

Optional parameters for IVF_HNSW_*:
max_level

Int,图中最大层级数。

m

Int,图中每个节点的边数。

ef_construction

Int,构建过程中需要检查的节点数量。

示例

import lance

dataset = lance.dataset("/tmp/sift.lance")
dataset.create_index(
    "vector",
    "IVF_PQ",
    num_partitions=256,
    num_sub_vectors=16
)
import lance

dataset = lance.dataset("/tmp/sift.lance")
dataset.create_index(
    "vector",
    "IVF_HNSW_SQ",
    num_partitions=256,
)

实验性加速器(GPU)支持:

  • accelerate: 使用GPU训练IVF分区。

    目前仅支持CUDA(Nvidia)或MPS(Apple)。 需要安装PyTorch。

import lance

dataset = lance.dataset("/tmp/sift.lance")
dataset.create_index(
    "vector",
    "IVF_PQ",
    num_partitions=256,
    num_sub_vectors=16,
    accelerator="cuda"
)

参考文献

create_scalar_index(: str, index_type: 'BTREE' | 'BITMAP' | 'LABEL_LIST' | 'INVERTED' | 'FTS' | 'NGRAM', name: str | None = None, *, 替换: bool = True, **kwargs)

在列上创建标量索引。

标量索引与向量索引类似,可用于加速扫描。当扫描包含对已索引列的过滤表达式时,标量索引能显著提升查询速度。例如,若my_col列建有标量索引,以下扫描操作将执行得更快:

import lance

dataset = lance.dataset("/tmp/images.lance")
my_table = dataset.scanner(filter="my_col != 7").to_table()

带有预过滤器的向量搜索也可以从标量索引中受益。例如,

import lance

dataset = lance.dataset("/tmp/images.lance")
my_table = dataset.scanner(
    nearest=dict(
       column="vector",
       q=[1, 2, 3, 4],
       k=10,
    )
    filter="my_col != 7",
    prefilter=True
)

目前有5种标量索引类型可供选择。

  • BTREE。最常见的类型是BTREE。该索引的灵感来源于btree数据结构,尽管只有btree的前几层会被缓存在内存中。它将在具有大量唯一值且每个值对应行数较少的列上表现良好。

  • BITMAP。该索引为列中的每个唯一值存储一个位图。这种索引适用于具有少量唯一值且每个值对应多行数据的列。

  • LABEL_LIST. 一种特殊索引,用于对值基数较小的列表列进行索引。例如,包含标签列表的列(如 ["tag1", "tag2", "tag3"])可以使用 LABEL_LIST 索引。该索引只能加速带有 array_has_anyarray_has_all 过滤器的查询。

  • NGRAM. 一种用于索引字符串列的特殊索引。该索引会为字符串中的每个n元语法创建位图,默认使用三元语法。当前该索引可以加速在过滤器中使用contains函数的查询。

  • FTS/INVERTED. 该索引用于文档列。这种索引可以进行全文搜索。例如,一个包含查询字符串"hello world"中任意单词的列。结果将按BM25排序。

请注意,可以使用环境变量LANCE_BYPASS_SPILLING来绕过磁盘溢出。将其设置为true可以避免内存耗尽问题(更多信息请参阅https://github.com/apache/datafusion/issues/10073)。

实验性API

Parameters:
column : str

要建立索引的列。必须是布尔型、整型、浮点型或字符串类型的列。

index_type : str

索引的类型。可选值为 "BTREE", "BITMAP", "LABEL_LIST", "NGRAM", "FTS""INVERTED"

name : str, optional

索引名称。如果未提供,将根据列名自动生成。

replace : bool, default True

如果索引已存在,则替换现有索引。

with_position : bool, default True

这是针对INVERTED索引的。如果设为True,索引将存储文档中单词的位置信息,以便支持短语查询。这将显著增加索引大小。即使设置为True,也不会影响非短语查询的性能。

base_tokenizer : str, default "simple"

这是针对INVERTED索引的配置。指定使用的基础分词器,可选值包括: * "simple":根据空白字符和标点符号进行分词。 * "whitespace":仅根据空白字符进行分词。 * "raw":不进行分词处理。

language : str, default "English"

这是针对INVERTED索引的。用于词干提取和停用词处理的语言。仅当stemremove_stop_words为true时使用

max_token_length : Optional[int], default 40

这是针对INVERTED索引的设置。表示最大令牌长度。任何超过此长度的令牌将被移除。

lower_case : bool, default True

这是针对INVERTED索引的。如果设为True,索引会将所有文本转换为小写。

stem : bool, default False

这是针对INVERTED索引的。如果为True,索引将对词干进行提取。

remove_stop_words : bool, default False

这是针对INVERTED索引的。如果为True,该索引将移除停用词。

ascii_folding : bool, default False

这是针对INVERTED索引的。如果设为True,索引会尽可能将非ASCII字符转换为ASCII字符。例如会将带重音符号的字母如"é"转换为"e"。

示例

import lance

dataset = lance.dataset("/tmp/images.lance")
dataset.create_index(
    "category",
    "BTREE",
)

标量索引只能加速使用等值、比较、范围(例如my_col BETWEEN 0 AND 100)和集合成员(例如my_col IN (0, 1, 2))等基础过滤条件的扫描

当筛选条件包含多个索引列且这些条件通过AND或OR逻辑连接时,可以使用标量索引 (例如 my_col < 0 AND other_col> 100)

如果过滤条件包含未建立索引的列,虽然可以使用标量索引,但根据过滤条件的结构,可能无法实际使用。例如,若列not_indexed没有标量索引,那么过滤条件my_col = 0 OR not_indexed = 1将无法利用my_col上的任何标量索引。

要判断扫描是否使用了标量索引,可以使用explain_plan查看lancedb生成的查询计划。使用标量索引的查询将包含ScalarIndexQuery关系或MaterializeIndex操作符。

property data_storage_version : str

该数据集所使用的数据存储格式版本

delete(predicate: str | 表达式)

从数据集中删除行。

这会将行标记为已删除,但不会从文件中物理移除它们。这样可以保持现有索引仍然有效。

Parameters:
predicate : str or pa.compute.Expression

用于选择要删除行的谓词。可以是SQL字符串或pyarrow表达式。

示例

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.delete("a = 1 or b in ('a', 'b')")
>>> dataset.to_table()
pyarrow.Table
a: int64
b: string
----
a: [[3]]
b: [["c"]]
static drop(base_uri: str | Path, storage_options: dict[str, str] | None = None) None
drop_columns(columns: list[str])

从数据集中删除一列或多列

这是一个仅涉及元数据的操作,不会从底层存储中删除实际数据。如需删除数据,您必须随后调用compact_files来重写不包含被删除列的数据,然后调用cleanup_old_versions来移除旧文件。

Parameters:
columns : list of str

要删除的列名。这些可以是嵌套列引用(例如“a.b.c”)或顶级列名(例如“a”)。

示例

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.drop_columns(["a"])
>>> dataset.to_table().to_pandas()
   b
0  a
1  b
2  c
drop_index(name: str)

从数据集中删除索引

注意:索引是通过"索引名称"删除的。这与字段名称不同。如果在创建索引时未指定名称,则会自动生成一个名称。您可以使用list_indices方法来获取索引的名称。

get_fragment(fragment_id: int) LanceFragment | None

根据片段ID获取对应的片段。

get_fragments(filter: Expression | None = None) list[LanceFragment]

从数据集中获取所有片段。

注意:过滤器功能暂不支持。

property has_index
head(num_rows, **kwargs)

加载数据集的前N行。

Parameters:
num_rows : int

要加载的行数。

**kwargs : dict, optional

完整参数描述请参见scanner()方法。

Returns:

表格

Return type:

表格

index_statistics(index_name: str) dict[str, Any]
insert(数据: ReaderLike, *, mode='append', **kwargs)

将数据插入数据集。

Parameters:
data_obj : Reader-like

要写入的数据。可接受的类型包括: - Pandas DataFrame、Pyarrow Table、Dataset、Scanner 或 RecordBatchReader - Huggingface 数据集

mode : str, default 'append'

写入数据时使用的模式。可选选项有:

create - 创建新数据集(如果uri已存在则会报错)。 overwrite - 创建新的快照版本 append - 创建新版本,该版本是输入数据与最新版本的合并(如果uri不存在则会报错)

**kwargs : dict, optional

传递给write_dataset()的其他关键字参数。

join(right_dataset, keys, right_keys=None, join_type='left outer', left_suffix=None, right_suffix=None, coalesce_keys=True, use_threads=True)

未实现(仅覆盖pyarrow数据集以防止段错误)

property lance_schema : LanceSchema

该数据集的LanceSchema

property latest_version : int

返回数据集的最新版本。

list_indices() list[索引]
property max_field_id : int

清单中的max_field_id

merge(data_obj: ReaderLike, left_on: str, right_on: str | None = None, schema=None)

将另一个数据集合并到当前数据集中。

执行左连接操作,其中数据集作为左表,data_obj作为右表。存在于数据集但不在左表中的行将被填充为null值,除非Lance不支持某些类型的null值,这种情况下会抛出错误。

Parameters:
data_obj : Reader-like

要合并的数据。可接受的类型包括: - Pandas DataFrame、Pyarrow Table、Dataset、Scanner、 Iterator[RecordBatch] 或 RecordBatchReader

left_on : str

数据集中用于连接的列名。

right_on : str or None

数据对象中用于连接的列名。如果为None,则默认为left_on。

示例

>>> import lance
>>> import pyarrow as pa
>>> df = pa.table({'x': [1, 2, 3], 'y': ['a', 'b', 'c']})
>>> dataset = lance.write_dataset(df, "dataset")
>>> dataset.to_table().to_pandas()
   x  y
0  1  a
1  2  b
2  3  c
>>> new_df = pa.table({'x': [1, 2, 3], 'z': ['d', 'e', 'f']})
>>> dataset.merge(new_df, 'x')
>>> dataset.to_table().to_pandas()
   x  y  z
0  1  a  d
1  2  b  e
2  3  c  f

另请参阅

LanceDataset.add_columns

通过逐批计算添加新列。

merge_insert(开启: str | Iterable[str]) MergeInsertBuilder

返回一个构建器,可用于创建“合并插入”操作

该操作可以在单个事务中添加行、更新行和删除行。这是一个非常通用的工具,可用于实现诸如“如果不存在则插入”、“更新或插入(即upsert)”等行为,甚至可以用新数据替换部分现有数据(例如替换所有月份为“一月”的数据)。

合并插入操作通过使用连接将源表中的新数据与目标表中的现有数据相结合。记录分为三类。

"匹配"记录是指同时存在于源表和目标表中的记录。"未匹配"记录仅存在于源表中(例如这些是新数据)。"源未匹配"记录仅存在于目标表中(这是旧数据)。

此方法返回的构建器可用于自定义每种数据类别应执行的操作。

请注意,此操作会导致数据重新排序。这是因为更新的行会从数据集中删除,然后以新值重新插入到末尾。由于内部使用了哈希连接操作,新插入行的顺序可能会随机波动。

Parameters:
on : Union[str, Iterable[str]]

要连接的列(或多列)。这是源表和目标表中记录匹配的方式。通常这是某种键或ID列。

示例

使用when_matched_update_all()when_not_matched_insert_all()来执行"upsert"操作。这将更新数据集中已存在的行,并插入不存在的行。

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [2, 1, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
>>> # Perform a "upsert" operation
>>> dataset.merge_insert("a")     \
...             .when_matched_update_all()     \
...             .when_not_matched_insert_all() \
...             .execute(new_table)
{'num_inserted_rows': 1, 'num_updated_rows': 2, 'num_deleted_rows': 0}
>>> dataset.to_table().sort_by("a").to_pandas()
   a  b
0  1  b
1  2  x
2  3  y
3  4  z

使用when_not_matched_insert_all()执行"不存在则插入"操作。这只会插入数据集中尚不存在的行。

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example2")
>>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
>>> # Perform an "insert if not exists" operation
>>> dataset.merge_insert("a")     \
...             .when_not_matched_insert_all() \
...             .execute(new_table)
{'num_inserted_rows': 1, 'num_updated_rows': 0, 'num_deleted_rows': 0}
>>> dataset.to_table().sort_by("a").to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  z

您不需要提供所有列。如果只想更新部分列,可以省略不想更新的列。被省略的列在更新时会保留现有值,如果是插入操作则会设为null。

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"], \
...                   "c": ["x", "y", "z"]})
>>> dataset = lance.write_dataset(table, "example3")
>>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
>>> # Perform an "upsert" operation, only updating column "a"
>>> dataset.merge_insert("a")     \
...             .when_matched_update_all()     \
...             .when_not_matched_insert_all() \
...             .execute(new_table)
{'num_inserted_rows': 1, 'num_updated_rows': 2, 'num_deleted_rows': 0}
>>> dataset.to_table().sort_by("a").to_pandas()
   a  b     c
0  1  a     x
1  2  x     y
2  3  y     z
3  4  z  None
migrate_manifest_paths_v2()

将清单路径迁移到新格式。

这将更新清单以使用新的v2格式路径。

该函数具有幂等性,可以多次运行而不会改变对象存储的状态。

警告:当其他并发操作正在进行时,不应运行此操作。 并且该操作应在完成后再恢复其他操作。

property optimize : DatasetOptimizer
property partition_expression

未实现(仅覆盖pyarrow数据集以防止段错误)

prewarm_index(name: str)

预热索引

这将把整个索引加载到内存中。这有助于避免索引查询时的冷启动问题。如果索引无法放入索引缓存中,则会导致I/O浪费。

Parameters:
name : str

需要预热的索引名称。

replace_field_metadata(field_name: str, new_metadata: dict[str, str])

替换模式中某个字段的元数据

Parameters:
field_name : str

要替换元数据的字段名称

new_metadata : dict

要设置的新元数据

replace_schema(schema: 模式/架构)

未实现(仅覆盖pyarrow数据集以防止段错误)

参见 :py:method:`replace_schema_metadata`:py:method:`replace_field_metadata`

replace_schema_metadata(new_metadata: dict[str, str])

替换数据集的结构元数据

Parameters:
new_metadata : dict

要设置的新元数据

restore()

将当前检出的版本恢复为数据集的最新版本。

这将创建一个新的提交。

sample(num_rows: int, columns: list[str] | dict[str, str] | None = None, randomize_order: bool = True, **kwargs) 表格

随机选取数据样本

Parameters:
num_rows : int

要检索的行数

columns : list of str, or dict of str to str default None

要获取的列名列表。 或者列名到SQL表达式的字典。 如果为None或未指定,则获取所有列。

**kwargs : dict, optional

完整参数描述请参见scanner()方法。

Returns:

表格

Return type:

表格

scanner(columns: list[str] | dict[str, str] | None = None, filter: 表达式 | str | None = None, limit: int | None = None, offset: int | None = None, nearest: dict | None = None, batch_size: int | None = None, batch_readahead: int | None = None, fragment_readahead: int | None = None, scan_in_order: bool | None = None, 片段: Iterable[LanceFragment] | None = None, full_text_query: str | dict | FullTextQuery | None = None, *, prefilter: bool | None = None, with_row_id: bool | None = None, with_row_address: bool | None = None, use_stats: bool | None = None, fast_search: bool | None = None, io_buffer_size: int | None = None, late_materialization: bool | list[str] | None = None, use_scalar_index: bool | None = None, include_deleted_rows: bool | None = None, scan_stats_callback: Callable[[ScanStatistics], None] | None = None, strict_batch_size: bool | None = None) LanceScanner

返回一个支持各种下推操作的Scanner。

Parameters:
columns : list of str, or dict of str to str default None

要获取的列名列表。 或者列名到SQL表达式的字典。 如果为None或未指定,则获取所有列。

filter : pa.compute.Expression or str

表达式或字符串,必须是一个有效的SQL where子句。有关有效的SQL表达式,请参阅 Lance filter pushdown

limit : int, default None

最多获取这么多行。如果为None或未指定,则获取所有行。

offset : int, default None

从这一行开始获取。如果未指定则为0。

nearest : dict, default None

获取与K个最相似向量对应的行。示例:

{
    "column": <embedding col name>,
    "q": <query vector as pa.Float32Array>,
    "k": 10,
    "minimum_nprobes": 20,
    "maximum_nprobes": 50,
    "refine_factor": 1
}

batch_size : int, default None

返回批次的目标大小。在某些情况下,批次大小可能达到此值的两倍(但绝不会超过)。而在其他情况下,批次大小可能小于此值。

io_buffer_size : int, default None

IO缓冲区的大小。有关更多信息,请参阅ScannerBuilder.io_buffer_size

batch_readahead : int, optional

预读取的批次数量。

fragment_readahead : int, optional

预读取的片段数量。

scan_in_order : bool, default True

是否按顺序读取片段和批次。如果为false,吞吐量可能会更高,但批次将无序返回,内存使用可能会增加。

fragments : iterable of LanceFragment, default None

如果指定,则仅扫描这些片段。如果scan_in_order为True,那么片段将按照给定的顺序进行扫描。

prefilter : bool, default False

如果为True,则在运行向量查询之前应用过滤器。 这将生成更准确的结果,但查询成本可能更高。 通常在过滤器具有高选择性时效果良好。

如果为False,则在运行向量查询之后应用过滤器。 这样性能会更好,但如果最接近查询的行不匹配过滤器, 结果可能会少于请求的行数(或为空)。 通常在过滤器选择性不高时效果良好。

use_scalar_index : bool, default True

Lance会自动使用标量索引来优化查询。在某些极端情况下,这可能会使查询性能变差,此时可以通过该参数禁用标量索引。

late_materialization : bool or List[str], default None

允许自定义控制延迟物化。延迟物化会在过滤操作后通过take操作获取非查询列,这在结果较少或列数据量非常大时非常有用。

当结果较多或列数据非常窄时,早期物化可能更优。

如果设为True,则所有列都采用延迟物化; 如果设为False,则所有列都采用早期物化; 如果是字符串列表,则只有列表中的列会延迟物化。

默认采用启发式算法,假设过滤器会筛选出约0.1%的行。如果您的过滤器选择性更强(例如按id查找),可能需要设为True;如果过滤器选择性较弱(例如匹配20%的行),可能需要设为False。

full_text_query : str or dict, optional

要搜索的查询字符串,结果将按BM25算法排序。 例如"hello world"会匹配包含"hello"或"world"的文档。 或者使用包含以下键的字典:

  • columns: list[str]

    要搜索的列, 目前columns列表中仅支持单个列。

  • query: str

    要搜索的查询字符串。

如果为True,则搜索将仅在索引数据上执行,这样可以缩短搜索时间。

scan_stats_callback : Callable[[ScanStatistics], None], default None

一个回调函数,在扫描完成后将调用该函数并传入扫描统计信息。回调函数引发的错误将被记录但不会重新抛出。

include_deleted_rows : bool, default False

如果为True,则已被删除但仍存在于片段中的行将被返回。这些行的_rowid列将被设为null。所有其他列将反映磁盘上存储的值,可能不为null。

注意:如果是搜索操作或take操作(包括标量索引扫描),则无法返回已删除的行。

注意

目前,如果同时指定了filter和nearest,那么:

  1. nearest 会优先执行。

  2. 结果会在之后进行过滤。

为了调试近似最近邻(ANN)结果,您可以选择即使存在索引也不使用它,只需指定use_index=False。例如,以下代码将始终返回精确的KNN结果:

dataset.to_table(nearest={
    "column": "vector",
    "k": 10,
    "q": <query vector>,
    "use_index": False
}
property schema : 模式/架构

该数据集的pyarrow模式

session() _Session

返回数据集会话,该会话保存了数据集的状态。

property stats : LanceStats

实验性API

property tags : 标签

数据集标签管理。

与Git类似,标签是一种向数据集特定版本添加元数据的方式。

警告

带标签的版本不受cleanup_old_versions()清理过程的影响。

要删除已标记的版本,您必须首先delete()关联的标记。

示例

ds = lance.open("dataset.lance")
ds.tags.create("v2-prod-20250203", 10)

tags = ds.tags.list()
take(索引: list[int] | 数组, columns: list[str] | dict[str, str] | None = None) 表格

按索引选择数据行。

Parameters:
indices : Array or array-like

数据集中要选择的行索引。

columns : list of str, or dict of str to str default None

要获取的列名列表。 或者列名到SQL表达式的字典。 如果为None或未指定,则获取所有列。

Returns:

表格

Return type:

pyarrow.Table

take_blobs(blob_column: str, ids: list[int] | 数组 | None = None, 地址: list[int] | 数组 | None = None, 索引: list[int] | 数组 | None = None) list[BlobFile]

按行ID选择二进制大对象。

无需在处理前将大型二进制blob数据加载到内存中,该API允许您将二进制blob数据作为常规的Python类文件对象打开。更多详情请参阅lance.BlobFile

必须且只能指定ids、addresses或indices中的一个参数。 :param blob_column: 要选择的blob列名称。 :type blob_column: str :param ids: 数据集中要选择的行ID。 :type ids: Integer Array或类似数组 :param addresses: 数据集中要选择行的(不稳定)内存地址。 :type addresses: Integer Array或类似数组 :param indices: 数据集中行的偏移量/索引。 :type indices: Integer Array或类似数组

Returns:

blob_files

Return type:

列表[BlobFile]

to_batches(columns: list[str] | dict[str, str] | None = None, filter: 表达式 | str | None = None, limit: int | None = None, offset: int | None = None, nearest: dict | None = None, batch_size: int | None = None, batch_readahead: int | None = None, fragment_readahead: int | None = None, scan_in_order: bool | None = None, *, prefilter: bool | None = None, with_row_id: bool | None = None, with_row_address: bool | None = None, use_stats: bool | None = None, full_text_query: str | dict | None = None, io_buffer_size: int | None = None, late_materialization: bool | list[str] | None = None, use_scalar_index: bool | None = None, strict_batch_size: bool | None = None, **kwargs) Iterator[RecordBatch]

将数据集读取为物化的记录批次。

Parameters:
**kwargs : dict, optional

scanner()的参数。

Returns:

record_batches

Return type:

RecordBatch的迭代器

to_table(columns: list[str] | dict[str, str] | None = None, filter: 表达式 | str | None = None, limit: int | None = None, offset: int | None = None, nearest: dict | None = None, batch_size: int | None = None, batch_readahead: int | None = None, fragment_readahead: int | None = None, scan_in_order: bool | None = None, *, prefilter: bool | None = None, with_row_id: bool | None = None, with_row_address: bool | None = None, use_stats: bool | None = None, fast_search: bool | None = None, full_text_query: str | dict | FullTextQuery | None = None, io_buffer_size: int | None = None, late_materialization: bool | list[str] | None = None, use_scalar_index: bool | None = None, include_deleted_rows: bool | None = None) 表格

将数据作为pyarrow.Table读入内存

Parameters:
columns : list of str, or dict of str to str default None

要获取的列名列表。 或者列名到SQL表达式的字典。 如果为None或未指定,则获取所有列。

filter : pa.compute.Expression or str

表达式或字符串,必须是一个有效的SQL where子句。有关有效的SQL表达式,请参阅 Lance filter pushdown

limit : int, default None

最多获取这么多行。如果为None或未指定,则获取所有行。

offset : int, default None

从这一行开始获取。如果未指定则为0。

nearest : dict, default None

获取与K个最相似向量对应的行。示例:

{
    "column": <embedding col name>,
    "q": <query vector as pa.Float32Array>,
    "k": 10,
    "metric": "cosine",
    "minimum_nprobes": 20,
    "maximum_nprobes": 50,
    "refine_factor": 1
}

batch_size : int, optional

每次读取的行数。

io_buffer_size : int, default None

IO缓冲区的大小。有关更多信息,请参阅ScannerBuilder.io_buffer_size

batch_readahead : int, optional

预读取的批次数量。

fragment_readahead : int, optional

预读取的片段数量。

scan_in_order : bool, optional, default True

是否按顺序读取片段和批次。如果为false,吞吐量可能会更高,但批次将无序返回,内存使用可能会增加。

prefilter : bool, optional, default False

在向量搜索之前运行过滤器。

late_materialization : bool or List[str], default None

允许自定义控制延迟物化。更多信息请参阅 ScannerBuilder.late_materialization

use_scalar_index : bool, default True

允许自定义控制标量索引的使用。更多信息请参见 ScannerBuilder.use_scalar_index

with_row_id : bool, optional, default False

返回行ID。

with_row_address : bool, optional, default False

返回行地址

use_stats : bool, optional, default True

在过滤过程中使用统计下推。

full_text_query : str or dict, optional

用于搜索的查询字符串,结果将按BM25算法排序。 例如:"hello world"会匹配包含"hello"或"world"的文档。 或者使用包含以下键的字典:

  • columns: list[str]

    要搜索的列, 目前仅支持在列列表中指定单个列。

  • query: str

    要搜索的查询字符串。

include_deleted_rows : bool, optional, default False

如果为True,则已被删除但仍存在于片段中的行将被返回。这些行的_rowid列将被设为null。所有其他列将反映磁盘上存储的值,可能不为null。

注意:如果是搜索操作或take操作(包括标量索引扫描),则无法返回已删除的行。

笔记

如果同时指定了filter和nearest,那么:

  1. nearest 会优先执行。

  2. 除非将pre-filter设置为True,否则结果会在之后进行过滤。

update(更新: dict[str, str], where: str | None = None) UpdateResult

更新符合where条件的行的列值。

Parameters:
updates : dict of str to str

列名到SQL表达式的映射关系。

where : str, optional

一个SQL谓词,指示应更新哪些行。

Returns:

updates – 包含更新行数的字典。

Return type:

字典

示例

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> update_stats = dataset.update(dict(a = 'a + 2'), where="b != 'a'")
>>> update_stats["num_updated_rows"] = 2
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  4  b
2  5  c
property uri : str

数据的位置

validate()

验证数据集。

此操作会检查数据集的完整性,如果数据集已损坏则会抛出异常。

property version : int

返回当前检出的数据集版本

versions()

返回此数据集中的所有版本。

class lance.LanceFragment(dataset: LanceDataset, fragment_id: int | None, *, 片段: _Fragment | None = None)
count_rows(self, 表达式 filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, bool cache_metadata=True, MemoryPool memory_pool=None)

统计符合扫描器筛选条件的行数。

Parameters:
filter : Expression, default None

Scan 将仅返回与筛选条件匹配的行。 如果可能,谓词将被下推以利用数据源中的分区信息或内部元数据,例如 Parquet 统计信息。否则在生成 RecordBatches 之前会对加载的数据进行筛选。

batch_size : int, default 131_072

扫描记录批次的最大行数。如果扫描的记录批次超出内存限制,可以调用此方法来减小其大小。

batch_readahead : int, default 16

文件中预读取的批次数量。这可能不适用于所有文件格式。增加此数值将提高内存使用量,但也可能提升IO利用率。

fragment_readahead : int, default 4

预读取的文件数量。增加此数值将提高内存使用率,但也可能提升IO利用率。

fragment_scan_options : FragmentScanOptions, default None

特定于某次扫描和片段类型的选项,这些选项在同个数据集的不同扫描之间可能会发生变化。

use_threads : bool, default True

如果启用,则将根据可用CPU核心数量使用最大并行度。

cache_metadata : bool, default True

如果启用,扫描时可能会缓存元数据以加速重复扫描。

memory_pool : MemoryPool, default None

如需内存分配,可在此指定。若未指定,则使用默认池。

Returns:

计数

Return type:

int

static create(dataset_uri: str | Path, 数据: ReaderLike, fragment_id: int | None = None, schema: pa.Schema | None = None, max_rows_per_group: int = 1024, 进度: FragmentWriteProgress | None = None, mode: str = 'append', *, data_storage_version: str | None = None, use_legacy_format: bool | None = None, storage_options: dict[str, str] | None = None) FragmentMetadata

从给定数据创建FragmentMetadata

如果数据集尚未创建,可以使用此功能。

警告

内部API。该方法不面向终端用户使用。

Parameters:
dataset_uri : str

数据集的URI。

fragment_id : int

片段的ID。

data : pa.Table or pa.RecordBatchReader

要写入片段的数据。

schema : pa.Schema, optional

数据的模式。如果未指定,将从数据中推断模式。

max_rows_per_group : int, default 1024

数据文件中每个分组的最大行数。

progress : FragmentWriteProgress, optional

实验性API。用于跟踪片段写入进度的功能。传入一个自定义类,该类需定义在开始写入每个片段和完成写入时调用的钩子函数。

mode : str, default "append"

写入模式。如果指定为“append”,数据将与现有数据集的模式进行校验。否则,传递“create”或“overwrite”将为模式分配新的字段ID。

data_storage_version : optional, str, default None

要使用的数据存储格式版本。较新的版本效率更高,但需要较新版本的lance才能读取。默认值(None)将使用最新的稳定版本。更多详情请参阅用户指南。

use_legacy_format : bool, default None

已弃用的参数。请改用 data_storage_version。

storage_options : optional, dict

针对特定存储连接的额外选项。这用于存储连接参数,如凭证、端点等。

另请参阅

lance.dataset.LanceOperation.Overwrite

该操作用于创建一个新数据集或覆盖现有数据集,使用此API生成的片段。有关使用此API的示例,请参阅文档页面。

lance.dataset.LanceOperation.Append

该操作用于将通过此API创建的片段追加到现有数据集中。查看文档页面了解使用此API的示例。

Return type:

FragmentMetadata

static create_from_file(文件名: str, dataset: LanceDataset, fragment_id: int) FragmentMetadata

从给定的数据文件URI创建片段。

如果数据文件从数据集中丢失,可以使用此功能。

警告

内部API。该方法不面向终端用户使用。

Parameters:
filename : str

数据文件的文件名。

dataset : LanceDataset

该片段所属的数据集。

fragment_id : int

片段的ID。

data_files()

返回此片段的数据文件。

delete(predicate: str) FragmentMetadata | None

从此片段中删除行。

这将添加或更新该片段的删除文件。它不会修改或删除该片段的数据文件。如果删除后没有剩余行,该方法将返回None。

警告

内部API。该方法不面向终端用户使用。

Parameters:
predicate : str

指定要删除行的SQL谓词。

Returns:

包含新删除文件的新片段,如果没有剩余行则为None。

Return type:

FragmentMetadata 或 None

示例

>>> import lance
>>> import pyarrow as pa
>>> tab = pa.table({"a": [1, 2, 3], "b": [4, 5, 6]})
>>> dataset = lance.write_dataset(tab, "dataset")
>>> frag = dataset.get_fragment(0)
>>> frag.delete("a > 1")
FragmentMetadata(id=0, files=[DataFile(path='...', fields=[0, 1], ...), ...)
>>> frag.delete("a > 0") is None
True

另请参阅

lance.dataset.LanceOperation.Delete

用于将这些更改提交到数据集的操作。有关使用此API的示例,请参阅文档页面。

deletion_file()

返回删除文件(如果有的话)

property fragment_id
head(self, int num_rows, columns=None, 表达式 filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, bool cache_metadata=True, MemoryPool memory_pool=None)

加载片段的前N行。

Parameters:
num_rows : int

要加载的行数。

columns : list of str, default None

要投影的列。这可以是一个包含列名的列表(顺序和重复项将被保留),或者一个字典,其中包含{新列名: 表达式}值用于更高级的投影。

列或表达式的列表可以使用特殊字段: __batch_index(批次在片段内的索引), __fragment_index(片段在数据集内的索引), __last_in_fragment(批次是否是片段中的最后一个),以及 __filename(源文件名或源片段的描述)。

这些列将被传递到数据集和相应的数据片段,以避免加载、复制和反序列化计算链后续不需要的列。 默认情况下,所有可用列都会被投影。如果引用的列名在数据集的Schema中不存在,则会引发异常。

filter : Expression, default None

Scan将仅返回与筛选条件匹配的行。 如果可能,谓词将被下推以利用数据源中的分区信息或内部元数据,例如Parquet统计信息。否则在生成RecordBatches之前会对已加载的记录批次进行过滤。

batch_size : int, default 131_072

扫描记录批次的最大行数。如果扫描的记录批次超出内存限制,可以调用此方法来减小其大小。

batch_readahead : int, default 16

文件中预读取的批次数量。这可能不适用于所有文件格式。增加此数值将提高内存使用量,但也可能提升IO利用率。

fragment_readahead : int, default 4

预读取的文件数量。增加此数值将提高内存使用率,但也可能提升IO利用率。

fragment_scan_options : FragmentScanOptions, default None

特定于某次扫描和片段类型的选项,这些选项在同个数据集的不同扫描之间可能会发生变化。

use_threads : bool, default True

如果启用,则将根据可用CPU核心数量使用最大并行度。

cache_metadata : bool, default True

如果启用,扫描时可能会缓存元数据以加速重复扫描。

memory_pool : MemoryPool, default None

如需内存分配,可在此指定。若未指定,则使用默认池。

Return type:

表格

merge(data_obj: ReaderLike, left_on: str, right_on: str | None = None, schema=None) tuple[FragmentMetadata, LanceSchema]

将另一个数据集合并到此片段中。

执行左连接操作,其中片段作为左表,data_obj作为右表。存在于数据集中但不在左表的行将被填充为null值,除非Lance不支持某些类型的null值,这种情况下会抛出错误。

Parameters:
data_obj : Reader-like

要合并的数据。可接受的类型包括: - Pandas DataFrame、Pyarrow Table、Dataset、Scanner、 Iterator[RecordBatch] 或 RecordBatchReader

left_on : str

数据集中用于连接的列名。

right_on : str or None

数据对象中用于连接的列名。如果为None,则默认为left_on。

示例

>>> import lance
>>> import pyarrow as pa
>>> df = pa.table({'x': [1, 2, 3], 'y': ['a', 'b', 'c']})
>>> dataset = lance.write_dataset(df, "dataset")
>>> dataset.to_table().to_pandas()
   x  y
0  1  a
1  2  b
2  3  c
>>> fragments = dataset.get_fragments()
>>> new_df = pa.table({'x': [1, 2, 3], 'z': ['d', 'e', 'f']})
>>> merged = []
>>> schema = None
>>> for f in fragments:
...     f, schema = f.merge(new_df, 'x')
...     merged.append(f)
>>> merge = lance.LanceOperation.Merge(merged, schema)
>>> dataset = lance.LanceDataset.commit("dataset", merge, read_version=1)
>>> dataset.to_table().to_pandas()
   x  y  z
0  1  a  d
1  2  b  e
2  3  c  f

另请参阅

LanceDataset.merge_columns

向此片段添加列。

Returns:

包含合并列和最终模式的新片段。

Return type:

元组[FragmentMetadata, LanceSchema]

merge_columns(value_func: dict[str, str] | BatchUDF | ReaderLike | collections.abc.Callable[[pa.RecordBatch], pa.RecordBatch], columns: list[str] | None = None, batch_size: int | None = None, reader_schema: pa.Schema | None = None) tuple[FragmentMetadata, LanceSchema]

向此片段添加列。

警告

内部API。该方法不面向终端用户使用。

参数及其解释与lance.dataset.LanceDataset.add_columns()操作中的相同。

唯一的区别在于,不是修改数据集,而是创建一个新的片段。新片段的模式也会被返回。这些可以在后续操作中用于将更改提交到数据集。

另请参阅

lance.dataset.LanceOperation.Merge

用于将这些更改提交到数据集的操作。有关使用此API的示例,请参阅文档页面。

Returns:

一个包含新增列和最终模式的新片段。

Return type:

元组[FragmentMetadata, LanceSchema]

property metadata : FragmentMetadata

返回此片段的元数据。

Return type:

FragmentMetadata

property num_deletions : int

返回此片段中已删除的行数。

property partition_expression : 模式/架构

一个表达式,对于该片段查看的所有数据评估为真。

property physical_rows : int

返回此片段中原始的行数。

要获取删除后的行数,请改用count_rows()

property physical_schema : 模式/架构

返回此片段的物理模式。该模式可能与数据集读取模式不同。

scanner(*, columns: list[str] | dict[str, str] | None = None, batch_size: int | None = None, filter: str | pa.compute.Expression | None = None, limit: int | None = None, offset: int | None = None, with_row_id: bool = False, with_row_address: bool = False, batch_readahead: int = 16) LanceScanner

详情请参阅 Dataset::scanner

property schema : 模式/架构

返回此片段的模式。

take(self, 索引, columns=None, 表达式 filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, bool cache_metadata=True, MemoryPool memory_pool=None)

按索引选择数据行。

Parameters:
indices : Array or array-like

数据集中要选择的行索引。

columns : list of str, default None

要投影的列。这可以是要包含的列名列表(顺序和重复项将保留),或者用于更高级投影的{新列名: 表达式}字典。

列或表达式列表可以使用特殊字段: __batch_index(批次在片段内的索引), __fragment_index(片段在数据集内的索引), __last_in_fragment(批次是否为片段中的最后一个),以及 __filename(源文件名或源片段的描述)。

这些列将被传递到数据集和相应的数据片段,以避免加载、复制和反序列化计算链后续不需要的列。 默认情况下会投影所有可用列。如果引用的任何列名在数据集的Schema中不存在,则会引发异常。

filter : Expression, default None

Scan 将仅返回与筛选条件匹配的行。 如果可能,谓词将被下推以利用数据源中的分区信息或内部元数据,例如 Parquet 统计信息。否则在生成 RecordBatches 之前会对加载的数据进行筛选。

batch_size : int, default 131_072

扫描记录批次的最大行数。如果扫描的记录批次超出内存限制,可以调用此方法来减小其大小。

batch_readahead : int, default 16

文件中预读取的批次数量。这可能不适用于所有文件格式。增加此数值将提高内存使用量,但也可能提升IO利用率。

fragment_readahead : int, default 4

预读取的文件数量。增加此数值将提高内存使用率,但也可能提升IO利用率。

fragment_scan_options : FragmentScanOptions, default None

特定于某次扫描和片段类型的选项,这些选项在同个数据集的不同扫描之间可能会发生变化。

use_threads : bool, default True

如果启用,则将根据可用CPU核心数量使用最大并行度。

cache_metadata : bool, default True

如果启用,扫描时可能会缓存元数据以加速重复扫描。

memory_pool : MemoryPool, default None

如需内存分配,可在此指定。若未指定,则使用默认池。

Return type:

表格

to_batches(self, Schema schema=None, columns=None, 表达式 filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, bool cache_metadata=True, MemoryPool memory_pool=None)

将片段读取为物化的记录批次。

Parameters:
schema : Schema, optional

用于扫描的具体模式。

columns : list of str, default None

要投影的列。这可以是一个包含列名的列表(顺序和重复项将被保留),或者一个字典,其中包含{新列名: 表达式}值用于更高级的投影。

列或表达式的列表可以使用特殊字段: __batch_index(批次在片段内的索引), __fragment_index(片段在数据集内的索引), __last_in_fragment(批次是否为片段中的最后一个),以及 __filename(源文件名或源片段的描述)。

这些列将被传递到数据集和相应的数据片段,以避免加载、复制和反序列化计算链后续不需要的列。 默认情况下会投影所有可用列。如果引用的任何列名在数据集的Schema中不存在,则会引发异常。

filter : Expression, default None

Scan 将仅返回与筛选条件匹配的行。 如果可能,谓词将被下推以利用数据源中的分区信息或内部元数据,例如 Parquet 统计信息。否则在生成 RecordBatches 之前会对加载的数据进行筛选。

batch_size : int, default 131_072

扫描记录批次的最大行数。如果扫描的记录批次超出内存限制,可以调用此方法来减小其大小。

batch_readahead : int, default 16

文件中预读取的批次数量。这可能不适用于所有文件格式。增加此数值将提高内存使用量,但也可能提升IO利用率。

fragment_readahead : int, default 4

预读取的文件数量。增加此数值将提高内存使用率,但也可能提升IO利用率。

fragment_scan_options : FragmentScanOptions, default None

特定于某次扫描和片段类型的选项,这些选项在同个数据集的不同扫描之间可能会发生变化。

use_threads : bool, default True

如果启用,则将根据可用CPU核心数量使用最大并行度。

cache_metadata : bool, default True

如果启用,扫描时可能会缓存元数据以加速重复扫描。

memory_pool : MemoryPool, default None

如需内存分配,可在此指定。若未指定,则使用默认池。

Returns:

record_batches

Return type:

RecordBatch的迭代器

to_table(self, 模式 schema=None, columns=None, 表达式 filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, bool cache_metadata=True, MemoryPool memory_pool=None)

将此片段转换为表格。

请谨慎使用此便捷工具。它会在创建表之前,将扫描结果序列化并全部加载到内存中。

Parameters:
schema : Schema, optional

用于扫描的具体模式。

columns : list of str, default None

要投影的列。这可以是一个要包含的列名列表(顺序和重复项将被保留),或者是一个字典,包含{新列名: 表达式}值用于更高级的投影。

列或表达式列表可以使用特殊字段: __batch_index(批次在片段内的索引), __fragment_index(片段在数据集内的索引), __last_in_fragment(批次是否为片段中的最后一个),以及 __filename(源文件名或源片段的描述)。

这些列将被传递到数据集和相应的数据片段,以避免加载、复制和反序列化计算链后续不需要的列。 默认情况下会投影所有可用列。如果引用的列名在数据集的Schema中不存在,则会引发异常。

filter : Expression, default None

Scan将仅返回与筛选条件匹配的行。 如果可能,谓词将被下推以利用数据源中的分区信息或内部元数据,例如Parquet统计信息。否则在生成RecordBatches之前会对已加载的记录批次进行过滤。

batch_size : int, default 131_072

扫描记录批次的最大行数。如果扫描的记录批次超出内存限制,可以调用此方法来减小其大小。

batch_readahead : int, default 16

文件中预读取的批次数量。这可能不适用于所有文件格式。增加此数值将提高内存使用量,但也可能提升IO利用率。

fragment_readahead : int, default 4

预读取的文件数量。增加此数值将提高内存使用率,但也可能提升IO利用率。

fragment_scan_options : FragmentScanOptions, default None

特定于某次扫描和片段类型的选项,这些选项在同个数据集的不同扫描之间可能会发生变化。

use_threads : bool, default True

如果启用,则将根据可用CPU核心数量使用最大并行度。

cache_metadata : bool, default True

如果启用,扫描时可能会缓存元数据以加速重复扫描。

memory_pool : MemoryPool, default None

如需内存分配,可在此指定。若未指定,则使用默认池。

Returns:

表格

Return type:

表格

class lance.LanceOperation
class Append(片段: Iterable[FragmentMetadata])

向数据集追加新行。

fragments

包含新行的片段。

Type:

列表[FragmentMetadata]

警告

这是一个用于分布式操作的高级API。若要在单台机器上向数据集追加数据,请使用lance.write_dataset()

示例

要向数据集追加新行,首先使用 lance.fragment.LanceFragment.create()创建片段。然后 将片段元数据收集到列表中并传递给此类。 最后,将该操作传递给LanceDataset.commit() 方法来创建新数据集。

>>> import lance
>>> import pyarrow as pa
>>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> dataset = lance.write_dataset(tab1, "example")
>>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> fragment = lance.fragment.LanceFragment.create("example", tab2)
>>> operation = lance.LanceOperation.Append([fragment])
>>> dataset = lance.LanceDataset.commit("example", operation,
...                                     read_version=dataset.version)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
fragments : Iterable[FragmentMetadata]
class BaseOperation

可应用于数据集的操作基类。

请参阅LanceOperation下的可用操作。

class CreateIndex(uuid: str, name: str, 字段: list[int], dataset_version: int, fragment_ids: set[int], index_version: int)

在数据集上创建索引的操作。

dataset_version : int
fields : List[int]
fragment_ids : Set[int]
index_version : int
name : str
uuid : str
class DataReplacement(replacements: list[DataReplacementGroup])

该操作用于替换数据集中的现有数据文件。

replacements : List[DataReplacementGroup]
class DataReplacementGroup(fragment_id: int, new_file: 数据文件)

数据替换组

fragment_id : int
new_file : 数据文件
class Delete(updated_fragments: Iterable[FragmentMetadata], deleted_fragment_ids: Iterable[int], predicate: str)

从数据集中移除片段或行。

updated_fragments

已更新为包含新删除向量的片段。

Type:

列表[FragmentMetadata]

deleted_fragment_ids

已完全删除的片段ID。这些是调用LanceFragment.delete()返回None的片段。

Type:

整数列表

predicate

用于选择要删除行的原始SQL谓词。

Type:

字符串

警告

这是一个用于分布式操作的高级API。若要在单台机器上从数据集中删除行,请使用lance.LanceDataset.delete()

示例

要从数据集中删除行,请在每个片段上调用lance.fragment.LanceFragment.delete()。如果返回一个新片段,则将其添加到updated_fragments列表中。如果返回None,则表示整个片段已被删除,因此将该片段ID添加到deleted_fragment_ids中。最后,将该操作传递给LanceDataset.commit()方法以完成删除操作。

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> dataset = lance.write_dataset(table, "example")
>>> table = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> dataset = lance.write_dataset(table, "example", mode="append")
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
>>> predicate = "a >= 2"
>>> updated_fragments = []
>>> deleted_fragment_ids = []
>>> for fragment in dataset.get_fragments():
...     new_fragment = fragment.delete(predicate)
...     if new_fragment is not None:
...         updated_fragments.append(new_fragment)
...     else:
...         deleted_fragment_ids.append(fragment.fragment_id)
>>> operation = lance.LanceOperation.Delete(updated_fragments,
...                                         deleted_fragment_ids,
...                                         predicate)
>>> dataset = lance.LanceDataset.commit("example", operation,
...                                     read_version=dataset.version)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
deleted_fragment_ids : Iterable[int]
predicate : str
updated_fragments : Iterable[FragmentMetadata]
class Merge(片段: Iterable[FragmentMetadata], schema: LanceSchema | 模式/架构)

添加列的操作。与覆盖不同,此操作不应改变片段的结构,从而保留现有的索引。

fragments

构成新数据集的片段。

Type:

FragmentMetadata的可迭代对象

schema

新数据集的模式。建议传递LanceSchema,传递pyarrow.Schema已弃用。

Type:

LanceSchema 或 pyarrow.Schema

警告

这是一个用于分布式操作的高级API。若要在单机上覆盖或创建新数据集,请使用lance.write_dataset()

示例

要向数据集添加新列,首先定义一个方法,该方法将基于现有列创建新列。然后使用lance.fragment.LanceFragment.add_columns()

>>> import lance
>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> table = pa.table({"a": [1, 2, 3, 4], "b": ["a", "b", "c", "d"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
>>> def double_a(batch: pa.RecordBatch) -> pa.RecordBatch:
...     doubled = pc.multiply(batch["a"], 2)
...     return pa.record_batch([doubled], ["a_doubled"])
>>> fragments = []
>>> for fragment in dataset.get_fragments():
...     new_fragment, new_schema = fragment.merge_columns(double_a,
...                                                       columns=['a'])
...     fragments.append(new_fragment)
>>> operation = lance.LanceOperation.Merge(fragments, new_schema)
>>> dataset = lance.LanceDataset.commit("example", operation,
...                                     read_version=dataset.version)
>>> dataset.to_table().to_pandas()
   a  b  a_doubled
0  1  a          2
1  2  b          4
2  3  c          6
3  4  d          8
fragments : Iterable[FragmentMetadata]
schema : LanceSchema | 模式/架构
class Overwrite(new_schema: LanceSchema | 模式/架构, 片段: Iterable[FragmentMetadata])

覆盖或创建新的数据集。

new_schema

新数据集的模式结构。

Type:

pyarrow.Schema

fragments

构成新数据集的片段。

Type:

列表[FragmentMetadata]

警告

这是一个用于分布式操作的高级API。若要在单机上覆盖或创建新数据集,请使用lance.write_dataset()

示例

要创建或覆盖数据集,首先使用 lance.fragment.LanceFragment.create()创建片段。然后将这些片段的元数据收集到列表中,并与模式一起传递给这个类。最后,将该操作传递给 LanceDataset.commit()方法来创建新数据集。

>>> import lance
>>> import pyarrow as pa
>>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> fragment1 = lance.fragment.LanceFragment.create("example", tab1)
>>> fragment2 = lance.fragment.LanceFragment.create("example", tab2)
>>> fragments = [fragment1, fragment2]
>>> operation = lance.LanceOperation.Overwrite(tab1.schema, fragments)
>>> dataset = lance.LanceDataset.commit("example", operation)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
fragments : Iterable[FragmentMetadata]
new_schema : LanceSchema | 模式/架构
class Project(schema: LanceSchema)

用于投影列的操作。 可使用此运算符删除列或重命名/交换列。

schema

新数据集的lance模式。

Type:

LanceSchema

示例

使用投影运算符交换列:

>>> import lance
>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> from lance.schema import LanceSchema
>>> table = pa.table({"a": [1, 2], "b": ["a", "b"], "b1": ["c", "d"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.to_table().to_pandas()
   a  b b1
0  1  a  c
1  2  b  d
>>>
>>> ## rename column `b` into `b0` and rename b1 into `b`
>>> table = pa.table({"a": [3, 4], "b0": ["a", "b"], "b": ["c", "d"]})
>>> lance_schema = LanceSchema.from_pyarrow(table.schema)
>>> operation = lance.LanceOperation.Project(lance_schema)
>>> dataset = lance.LanceDataset.commit("example", operation, read_version=1)
>>> dataset.to_table().to_pandas()
   a b0  b
0  1  a  c
1  2  b  d
schema : LanceSchema
class Restore(版本: int)

恢复数据集先前版本的操作。

version : int
class Rewrite(groups: Iterable[RewriteGroup], rewritten_indices: Iterable[RewrittenIndex])

将操作重写为一个或多个文件和索引合并为一个或多个文件和索引。

groups

已被重写的文件组。

Type:

列表[RewriteGroup]

rewritten_indices

已被重写的索引。

Type:

列表[RewrittenIndex]

警告

这是一个高级API,不适用于一般用途。

groups : Iterable[RewriteGroup]
rewritten_indices : Iterable[RewrittenIndex]
class RewriteGroup(old_fragments: Iterable[FragmentMetadata], new_fragments: Iterable[FragmentMetadata])

重写文件的集合

new_fragments : Iterable[FragmentMetadata]
old_fragments : Iterable[FragmentMetadata]
class RewrittenIndex(old_id: str, new_id: str)

一个已被重写的索引

new_id : str
old_id : str
class Update(removed_fragment_ids: list[int], updated_fragments: list[FragmentMetadata], new_fragments: list[FragmentMetadata], fields_modified: list[int])

更新数据集中行的操作。

removed_fragment_ids

已完全移除的片段ID。

Type:

整数列表

updated_fragments

已更新为包含新删除向量的片段。

Type:

列表[FragmentMetadata]

new_fragments

包含新行的片段。

Type:

列表[FragmentMetadata]

fields_modified

如果在updated_fragments中有任何字段被修改,那么必须在此列出这些字段,以便从覆盖这些字段的索引中移除这些片段。

Type:

整数列表

fields_modified : List[int]
new_fragments : List[FragmentMetadata]
removed_fragment_ids : List[int]
updated_fragments : List[FragmentMetadata]
class lance.LanceScanner(scanner: _Scanner, dataset: LanceDataset)
analyze_plan() str

执行此扫描器的计划并显示运行时指标。

Parameters:
verbose : bool, default False

使用详细输出格式。

Returns:

计划

Return type:

字符串

count_rows()

统计符合扫描器筛选条件的行数。

Returns:

计数

Return type:

int

property dataset_schema : 模式/架构

从片段中读取批次时使用的模式。

explain_plan(verbose=False) str

返回此扫描器的执行计划。

Parameters:
verbose : bool, default False

使用详细输出格式。

Returns:

计划

Return type:

字符串

static from_batches(*args, **kwargs)

未实现

static from_dataset(*args, **kwargs)

未实现

static from_fragment(*args, **kwargs)

未实现

head(num_rows)

加载数据集的前N行。

Parameters:
num_rows : int

要加载的行数。

Return type:

表格

property projected_schema : 模式/架构

数据的物化模式,考虑了投影。

这是从扫描器返回的任何数据的模式。

scan_batches()

以记录批次形式消费Scanner,并附带对应的片段。

Returns:

record_batches

Return type:

TaggedRecordBatch的迭代器

take(索引)

未实现

to_batches(self)

以记录批次形式消费扫描器数据。

Returns:

record_batches

Return type:

RecordBatch的迭代器

to_reader(self)

将此扫描器作为RecordBatchReader使用。

Return type:

RecordBatchReader

to_table() 表格

将数据读入内存并返回一个pyarrow表格。

class lance.MergeInsertBuilder(dataset, 开启)
conflict_retries(max_retries: int) MergeInsertBuilder

设置操作在出现争用时的重试次数。

如果该值设置为大于0,则操作将保留输入数据的副本(根据数据大小存储在内存或磁盘上),并在出现争用时重试操作。

默认值为10。

execute(data_obj: ReaderLike, *, schema: pa.Schema | None = None)

执行合并插入操作

此函数会更新原始数据集,并返回一个包含合并统计信息的字典——例如插入、更新和删除的行数。

Parameters:
data_obj : ReaderLike

作为操作源表使用的新数据。该参数可以是任何数据源(例如表/数据集),只要write_dataset()能够接受即可。

schema : Optional[pa.Schema]

数据的模式。仅当数据源是某种生成器时才需要提供此信息。

execute_uncommitted(data_obj: ReaderLike, *, schema: pa.Schema | None = None) tuple[事务, dict[str, Any]]

执行合并插入操作但不提交

此函数会更新原始数据集,并返回一个包含合并统计信息的字典——例如插入、更新和删除的行数。

Parameters:
data_obj : ReaderLike

作为操作源表使用的新数据。该参数可以是任何数据源(例如表/数据集),只要write_dataset()能够接受即可。

schema : Optional[pa.Schema]

数据的模式。仅当数据源是某种生成器时才需要提供此信息。

retry_timeout(timeout: timedelta) MergeInsertBuilder

设置用于限制重试的超时时间。

这是放弃操作前允许的最大耗时。无论完成需要多长时间,至少会进行一次尝试。一旦达到此超时时间,后续尝试将被取消。如果在第一次尝试期间已达到超时,操作将在进行第二次尝试前立即取消。

默认值为30秒。

when_matched_update_all(condition: str | None = None) MergeInsertBuilder

配置更新匹配行的操作

调用此方法后,当执行合并插入操作时,任何同时匹配源表和目标表的行将被更新。目标表中的行将被移除,而源表中的行将被添加。

可以指定一个可选条件。这应该是一个SQL过滤器,如果存在,则只有同时满足此过滤器的匹配行才会被更新。SQL过滤器应使用前缀target.来引用目标表中的列,使用前缀source.来引用源表中的列。例如,source.last_update < target.last_update

如果指定了条件且行不满足该条件,则这些行将不会被更新。不满足筛选条件不会导致"匹配"行变为"不匹配"行。

when_not_matched_by_source_delete(expr: str | None = None) MergeInsertBuilder

配置操作以删除不匹配的源行

调用此方法后,当执行合并插入操作时,仅存在于目标表中的行将被删除。可以指定一个可选过滤器来限制删除操作的范围。如果提供了过滤器(作为SQL过滤器),则只有匹配过滤器的行会被删除。

when_not_matched_insert_all() MergeInsertBuilder

配置操作以插入不匹配的行

调用此方法后,当执行合并插入操作时,仅存在于源表中的所有行将被插入到目标表中。

class lance.ScanStatistics

关于扫描的统计信息。

bytes_read

从磁盘读取的字节数

indices_loaded

已加载的索引数量

iops

执行的IO操作数量。由于存在合并I/O的情况,该数值可能略高于实际数量

parts_loaded

已加载的索引分区数量

class lance.Transaction(读取版本: 'int', 操作: 'LanceOperation.BaseOperation', uuid: 'str' = , blobs_op: 'Optional[LanceOperation.BaseOperation]' = None)
blobs_op : BaseOperation | None = None
operation : BaseOperation
read_version : int
uuid : str
lance.batch_udf(output_schema=None, checkpoint_file=None)

创建一个用户自定义函数(UDF),用于向数据集添加列。

此函数用于向数据集添加列。它接收一个函数作为参数,该函数接受单个参数(一个RecordBatch)并返回一个RecordBatch。对于数据集中的每个批次,该函数会被调用一次。函数不应修改输入批次,而应创建一个添加了新列的新批次。

Parameters:
output_schema : Schema, optional

输出RecordBatch的模式。这用于验证函数的输出。如果未提供,则将使用第一个输出RecordBatch的模式。

checkpoint_file : str or Path, optional

如果指定,该文件将用作此UDF未保存结果的缓存。如果进程失败,并且您再次使用同一文件调用add_columns,它将从最后保存的状态恢复。这对于可能失败并需要恢复的长时间运行进程非常有用。该文件可能会变得非常大。它将在磁盘上保存多达整个数据文件大小的结果,可能达到数GB的数据量。

Return type:

AddColumnsUDF

lance.bytes_read_counter()
lance.dataset(uri: str | Path, 版本: int | str | None = None, asof: ts_types | None = None, block_size: int | None = None, commit_lock: CommitLock | None = None, index_cache_size: int | None = None, storage_options: dict[str, str] | None = None, default_scan_options: dict[str, str] | None = None) LanceDataset

从指定地址打开Lance数据集。

Parameters:
uri : str

Lance数据集的地址。可以是本地文件路径/tmp/data.lance,也可以是云对象存储URI,例如s3://bucket/data.lance

version : optional, int | str

如果指定,则加载Lance数据集的特定版本。否则,加载最新版本。可以提供一个版本号(int)或标签(str)。

asof : optional, datetime or str

如果指定此参数,则查找在给定参数值当天或之前创建的最新版本。如果已指定版本号,则忽略此参数。

block_size : optional, int

块大小(以字节为单位)。为最小I/O请求的大小提供提示。

commit_lock : optional, lance.commit.CommitLock

自定义提交锁。仅当您的对象存储不支持原子提交时才需要。详情请参阅用户指南。

index_cache_size : optional, int

索引缓存大小。索引缓存是一个带有TTL的LRU缓存。该数值指定了 在主机内存中缓存的索引页数量,例如IVF分区数。默认值为256

粗略来说,对于一个包含n行的IVF_PQ分区,每个索引页的大小等于 pq编码(nd.array([n,pq], dtype=uint8)))和行ID(nd.array([n], dtype=uint64))的组合。 近似计算为n = 总行数 / IVF分区数量pq = PQ子向量数量

storage_options : optional, dict

针对特定存储连接的额外选项。这用于存储连接参数,如凭证、端点等。

default_scan_options : optional, dict

扫描数据集时使用的默认扫描选项。这接受与lance.LanceDataset.scanner()中描述的相同参数。这些参数将应用于任何扫描操作。

这对于为常见参数(如batch_size)提供默认值非常有用。

它还可用于创建包含元字段(如_rowid_rowaddr)的数据集视图。如果提供了default_scan_options,那么当设置了适当的扫描选项时,lance.LanceDataset.schema()返回的模式将包含这些字段。

lance.iops_counter()
lance.json_to_schema(schema_json: dict[str, Any]) 模式/架构

将JSON字符串转换为PyArrow模式。

Parameters:
schema_json : Dict[str, Any]

要转换为PyArrow模式的JSON负载。

lance.schema_to_json(schema: 模式/架构) dict[str, Any]

将pyarrow模式转换为JSON字符串。

lance.set_logger(file_path='pylance.log', name='pylance', level=20, format_string=None, log_handler=None)
lance.write_dataset(data_obj: ReaderLike, uri: str | Path | LanceDataset, schema: pa.Schema | None = None, mode: str = 'create', *, max_rows_per_file: int = 1048576, max_rows_per_group: int = 1024, max_bytes_per_file: int = 96636764160, commit_lock: CommitLock | None = None, 进度: FragmentWriteProgress | None = None, storage_options: dict[str, str] | None = None, data_storage_version: str | None = None, use_legacy_format: bool | None = None, enable_v2_manifest_paths: bool = False, enable_move_stable_row_ids: bool = False, auto_cleanup_options: AutoCleanupConfig | None = None) LanceDataset

将给定的data_obj写入指定的uri

Parameters:
data_obj : Reader-like

要写入的数据。可接受的类型包括: - Pandas DataFrame、Pyarrow Table、Dataset、Scanner 或 RecordBatchReader - Huggingface 数据集

uri : str, Path, or LanceDataset

数据集写入的目标位置(目录)。如果传入的是LanceDataset,则会复用现有会话。

schema : Schema, optional

如果指定且输入为pandas DataFrame,则使用此模式替代默认的pandas到arrow表的转换。

mode : str

create - 创建新数据集(如果uri已存在则会报错)。 overwrite - 创建新的快照版本 append - 创建新版本,该版本是输入数据与最新版本的合并(如果uri不存在则会报错)

max_rows_per_file : int, default 1024 * 1024

在开始新文件之前要写入的最大行数

max_rows_per_group : int, default 1024

在同一个文件中开始新分组前的最大行数

max_bytes_per_file : int, default 90 * 1024 * 1024 * 1024

在开始新文件之前要写入的最大字节数。这是一个软性限制。该限制在每组数据写入后进行检查,这意味着较大的组可能会导致此限制被显著超出。默认值为90 GB,因为我们在对象存储上对每个文件有100 GB的硬性限制。

commit_lock : CommitLock, optional

自定义提交锁。仅当您的对象存储不支持原子提交时才需要。详情请参阅用户指南。

progress : FragmentWriteProgress, optional

实验性API。用于跟踪片段写入进度的功能。传入一个自定义类,该类需定义在开始写入每个片段和完成写入时调用的钩子函数。

storage_options : optional, dict

针对特定存储连接的额外选项。这用于存储连接参数,如凭证、端点等。

data_storage_version : optional, str, default None

要使用的数据存储格式版本。较新的版本效率更高,但需要较新版本的lance才能读取。默认值(None)将使用最新的稳定版本。更多详情请参阅用户指南。

use_legacy_format : optional, bool, default None

已弃用的设置数据存储版本的方法。请改用data_storage_version参数。

enable_v2_manifest_paths : bool, optional

如果为True,且这是一个新数据集,则使用新的V2清单路径。 这些路径能更高效地打开存储在对象存储上具有多个版本的数据集。如果数据集已存在,此参数将不起作用。要迁移现有数据集,请改用 LanceDataset.migrate_manifest_paths_v2()方法。默认为False。

enable_move_stable_row_ids : bool, optional

实验性参数:如果设置为true,写入器将使用移动稳定的行ID。 这些行ID在压缩操作后保持稳定,但在更新后不稳定。 这使得压缩更加高效,因为使用稳定的行ID时, 无需更新二级索引来指向新的行ID。

auto_cleanup_options : optional, AutoCleanupConfig

数据集自动清理的配置选项。 如果设置此选项且这是一个新数据集,旧数据集版本将根据此参数自动清理。 要为现有数据集添加自动清理功能,请使用Dataset::update_config设置 lance.auto_cleanup.interval和lance.auto_cleanup.older_than。 必须同时设置这两个参数才能启用自动清理功能。 如果不设置此参数(默认行为), 则不会执行自动清理。 注意:此选项仅在创建新数据集时生效, 对现有数据集没有影响。

class lance.dataset.AlterColumn
data_type : 数据类型 | None
name : str | None
nullable : bool | None
path : str
class lance.dataset.AutoCleanupConfig
interval : int
older_than_seconds : int
class lance.dataset.BulkCommitResult
dataset : LanceDataset
merged : 事务
class lance.dataset.DataStatistics(字段: FieldStatistics)

数据集中的数据统计信息

fields : FieldStatistics

数据集中字段的统计信息

class lance.dataset.DatasetOptimizer(dataset: LanceDataset)
compact_files(*, target_rows_per_fragment: int = 1048576, max_rows_per_group: int = 1024, max_bytes_per_file: int | None = None, materialize_deletions: bool = True, materialize_deletions_threshold: float = 0.1, num_threads: int | None = None, batch_size: int | None = None) CompactionMetrics

压缩数据集中的小文件,减少文件总数。

This does a few things:
  • 从片段中移除已删除的行

  • 从片段中移除已删除的列

  • 将小片段合并为更大的片段

该方法会保留数据集的插入顺序。这意味着如果存在的小片段不与需要压缩的其他相邻片段相连,它们可能会保留在数据集中。例如,如果您有行数为500万、100和500万的片段,中间的片段将不会被压缩,因为与其相邻的片段不需要压缩。

Parameters:
target_rows_per_fragment : int, default 1024*1024

每个片段的目标行数。这是压缩后每个片段中包含的行数。

max_rows_per_group : int, default 1024

每个分组的最大行数。这不会影响哪些片段需要压缩,但如果被选中,会影响它们的重写方式。

此设置仅影响使用旧版存储格式的数据集。新格式不需要行分组。

max_bytes_per_file : Optional[int], default None

单个文件的最大字节数。这不会影响哪些片段需要压缩,但会影响它们被选中时的重写方式。如果该值设置过小,可能会导致片段大小小于target_rows_per_fragment

默认值将使用write_dataset中的默认设置。

materialize_deletions : bool, default True

是否压缩包含软删除行的片段,使其不再出现在文件中。

materialize_deletions_threshold : float, default 0.1

在片段成为压缩候选之前,其中被软删除的原始行所占的比例。

num_threads : int, optional

执行压缩操作时使用的线程数。如果未指定,默认使用机器上的核心数。

batch_size : int, optional

The batch size to use when scanning input fragments. You may want to reduce this if you are running out of memory during compaction.

The default will use the same default from scanner.

Returns:

关于压缩过程的指标

Return type:

压缩指标

另请参阅

lance.optimize.Compaction

optimize_indices(**kwargs)

优化索引性能。

当新数据到来时,它不会自动添加到现有索引中。 在搜索时,我们需要对旧数据执行索引搜索,同时 对新数据执行昂贵的非索引搜索。随着新增 未索引数据量的增长,这可能会影响搜索延迟。 此功能会将新数据添加到现有索引中,从而恢复 性能。此功能不会重新训练索引,它仅将 新数据分配到现有分区。这意味着更新比 重新训练整个索引要快得多,但准确性可能较低 (特别是如果新数据表现出新模式、概念或趋势)

Parameters:
num_indices_to_merge : int, default 1

要合并的索引数量。 如果设置为0,将创建新的增量索引。

index_names : List[str], default None

需要优化的索引名称。 如果为None,则将优化所有索引。

retrain : bool, default False

是否重新训练整个索引。 如果为true,将基于当前数据重新训练索引, num_indices_to_merge将被忽略, 所有索引将被合并为一个。

当数据分布发生显著变化时这很有用, 我们希望通过重新训练索引来提高搜索质量。 这比从头开始重建索引更快。

class lance.dataset.DatasetStats
num_deleted_rows : int
num_fragments : int
num_small_files : int
class lance.dataset.ExecuteResult
num_deleted_rows : int
num_inserted_rows : int
num_updated_rows : int
class lance.dataset.FieldStatistics(id: int, bytes_on_disk: int)

数据集中某个字段的统计信息

bytes_on_disk : int

(可能经过压缩的)磁盘上用于存储该字段的字节

id : int

字段的ID

class lance.dataset.Index
fields : List[str]
fragment_ids : Set[int]
name : str
type : str
uuid : str
version : int
class lance.dataset.LanceDataset(uri: str | Path, 版本: int | str | None = None, block_size: int | None = None, index_cache_size: int | None = None, metadata_cache_size: int | None = None, commit_lock: CommitLock | None = None, storage_options: dict[str, str] | None = None, serialized_manifest: bytes | None = None, default_scan_options: dict[str, Any] | None = None)

Lance数据集采用Lance格式,数据存储在给定的uri位置。

add_columns(转换: dict[str, str] | BatchUDF | ReaderLike | pyarrow.Field | list[pyarrow.Field] | pyarrow.Schema, read_columns: list[str] | None = None, reader_schema: pa.Schema | None = None, batch_size: int | None = None)

使用定义的值添加新列。

有几种方式可以指定新列。首先,可以为每个新列提供SQL表达式。其次,可以提供一个UDF(用户定义函数),该函数接收一批现有数据并返回包含新列的新数据批次。这些新列将被追加到数据集中。

你也可以提供一个RecordBatchReader,它将从某个外部源读取新列的值。当新列的值已经暂存到文件中(通常由某个分布式进程完成)时,这通常很有用。

有关编写UDF的更多信息,请参阅lance.add_columns_udf()装饰器。

Parameters:
transforms : dict or AddColumnsUDF or ReaderLike

如果这是一个字典,那么键是新列的名称,值则是SQL表达式字符串。这些字符串可以引用数据集中的现有列。 如果这是一个AddColumnsUDF,那么它是一个用户定义函数,接收一批现有数据并返回包含新列的新数据批次。 如果这是pyarrow.Fieldpyarrow.Schema,则会以仅元数据操作的方式添加所有具有给定模式的NULL列。

read_columns : list of str, optional

UDF将读取的列名。如果为None,则UDF将读取所有列。仅当transforms是UDF时使用此参数。否则,读取的列将从SQL表达式中推断得出。

reader_schema : pa.Schema, optional

仅当transforms是ReaderLike对象时有效。这将用于确定读取器的模式。

batch_size : int, optional

在应用转换时,每次从源数据集中读取的行数。如果数据集是v1版本,则忽略此参数。

示例

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3]})
>>> dataset = lance.write_dataset(table, "my_dataset")
>>> @lance.batch_udf()
... def double_a(batch):
...     df = batch.to_pandas()
...     return pd.DataFrame({'double_a': 2 * df['a']})
>>> dataset.add_columns(double_a)
>>> dataset.to_table().to_pandas()
   a  double_a
0  1         2
1  2         4
2  3         6
>>> dataset.add_columns({"triple_a": "a * 3"})
>>> dataset.to_table().to_pandas()
   a  double_a  triple_a
0  1         2         3
1  2         4         6
2  3         6         9

另请参阅

LanceDataset.merge

将一组预先计算好的列合并到数据集中。

alter_columns(*修改: Iterable[AlterColumn])

修改列名、数据类型和可空性。

重命名的列可以保留其上的任何索引。如果某列具有IVF_PQ索引,即使该列被转换为其他类型,索引仍可保留。但目前其他类型的索引尚不支持类型转换。

列类型可以进行向上转型(例如从int32转为int64)或向下转型(例如从int64转为int32)。但若存在无法用新类型表示的值时,向下转型会失败。通常来说,列可以转型为相同的大类类型:整型转整型、浮点型转浮点型、字符串转字符串。不过字符串、二进制和列表列可以在其大小变体之间进行转型。例如:字符串转大字符串、二进制转大二进制、列表转大列表。

重命名的列可以保留其上的任何索引。但是,如果将该列转换为不同类型,其索引将被删除。

Parameters:
alterations : Iterable[Dict[str, Any]]

一个字典序列,每个字典包含以下键:

  • ”path”: str

    要修改的列路径。对于顶级列,这是列名。 对于嵌套列,这是点分隔的路径,例如"a.b.c"。

  • ”name”: str, optional

    列的新名称。如果未指定,则列名保持不变。

  • ”nullable”: bool, optional

    列是否可为空。如果未指定,则列的可空性不变。 只能将非空列改为可空列。目前不能将可空列改为非空列。

  • ”data_type”: pyarrow.DataType, optional

    要将列转换到的新数据类型。如果未指定,则列的数据类型保持不变。

示例

>>> import lance
>>> import pyarrow as pa
>>> schema = pa.schema([pa.field('a', pa.int64()),
...                     pa.field('b', pa.string(), nullable=False)])
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.alter_columns({"path": "a", "name": "x"},
...                       {"path": "b", "nullable": True})
>>> dataset.to_table().to_pandas()
   x  b
0  1  a
1  2  b
2  3  c
>>> dataset.alter_columns({"path": "x", "data_type": pa.int32()})
>>> dataset.schema
x: int32
b: string
checkout_version(版本: int | str) LanceDataset

加载指定版本的数据集。

dataset()构造函数不同,此操作将重用当前缓存。如果数据集已处于指定版本,则此操作无效。

Parameters:
version : int | str,

要检出的版本号。可以提供一个版本号(int)或标签(str)。

Return type:

LanceDataset

cleanup_old_versions(older_than: timedelta | None = None, *, delete_unverified: bool = False, error_if_tagged_old_versions: bool = True) CleanupStats

清理数据集的旧版本。

某些数据集变更(如覆盖操作)会遗留未被最新版本引用的数据。这些旧数据会被保留,以便将数据集回滚到之前的版本。

该方法将移除旧版本及其引用的所有数据文件。 一旦清理任务执行完毕,您将无法检出或恢复这些旧版本。

Parameters:
older_than : timedelta, optional

仅早于此版本的记录将被删除。如果未指定,默认保留两周内的记录。

delete_unverified : bool, default False

事务失败后遗留的文件可能看起来像是正在进行中的操作(例如追加新数据)的一部分,这些文件只有在至少7天后才会被删除。如果delete_unverified为True,则无论文件存在多久都会被删除。

只有在您能确保当前没有其他进程正在处理此数据集时,才应将此设置为True。否则数据集可能会进入损坏状态。

error_if_tagged_old_versions : bool, default True

某些版本可能关联了标签。带标签的版本不会被清理,无论它们存在多久。如果该参数设为True(默认值),当任何带标签版本匹配参数时会抛出异常。否则,带标签版本将被静默忽略,仅清理未加标签的版本。

static commit(base_uri: str | Path | LanceDataset, 操作: LanceOperation.BaseOperation | 事务, blobs_op: LanceOperation.BaseOperation | None = None, read_version: int | None = None, commit_lock: CommitLock | None = None, storage_options: dict[str, str] | None = None, enable_v2_manifest_paths: bool | None = None, 分离: bool | None = False, max_retries: int = 20) LanceDataset

创建数据集的新版本

此方法是一个高级方法,允许用户描述对数据文件所做的更改。当使用Lance应用更改时(例如使用LanceDatasetwrite_dataset()),则不需要此方法。

它当前的目的是允许在分布式环境中进行更改,其中没有单一进程完成所有工作。例如,分布式批量更新或分布式批量修改操作。

一旦完成所有更改,可以调用此方法通过更新数据集清单使更改生效。

警告

这是一个高级API,不提供与其他API相同级别的验证。例如,调用者有责任确保片段对模式有效。

Parameters:
base_uri : str, Path, or LanceDataset

数据集的基础URI,或者数据集对象本身。使用数据集对象可能更高效,因为它可以复用文件元数据缓存。

operation : BaseOperation

应用于数据集的操作。这描述了已进行的更改。可用的操作请参见LanceOperation

read_version : int, optional

作为变更基础的数据集版本。 覆盖或恢复操作不需要此版本。

commit_lock : CommitLock, optional

自定义提交锁。仅当您的对象存储不支持原子提交时才需要。详情请参阅用户指南。

storage_options : optional, dict

针对特定存储连接的额外选项。这用于存储连接参数,如凭证、端点等。

enable_v2_manifest_paths : bool, optional

如果为True,且这是一个新数据集,则使用新的V2清单路径。 这些路径能更高效地在对象存储上打开包含多个版本的数据集。如果数据集已存在,此参数将不起作用。要迁移现有数据集,请改用migrate_manifest_paths_v2()方法。默认为False。警告: 启用此选项将使旧版Lance(0.17.0之前版本)无法读取该数据集。

detached : bool, optional

如果为True,则该提交不会成为数据集谱系的一部分。它将永远不会显示为最新数据集,未来唯一查看它的方式是通过指定版本号检出。该版本将是一个随机版本号,仅在分离提交中保持唯一。调用方应将其存储在某处,因为未来将无法通过其他方式获取它。

max_retries : int

提交数据集时执行的最大重试次数。

Returns:

Lance数据集的新版本。

Return type:

LanceDataset

示例

使用LanceOperation.Overwrite操作创建新数据集:

>>> import lance
>>> import pyarrow as pa
>>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> fragment1 = lance.fragment.LanceFragment.create("example", tab1)
>>> fragment2 = lance.fragment.LanceFragment.create("example", tab2)
>>> fragments = [fragment1, fragment2]
>>> operation = lance.LanceOperation.Overwrite(tab1.schema, fragments)
>>> dataset = lance.LanceDataset.commit("example", operation)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
static commit_batch(dest: str | Path | LanceDataset, 交易: collections.abc.Sequence[事务], commit_lock: CommitLock | None = None, storage_options: dict[str, str] | None = None, enable_v2_manifest_paths: bool | None = None, 分离: bool | None = False, max_retries: int = 20) BulkCommitResult

使用多个事务创建数据集的新版本。

此方法是一个高级方法,允许用户描述对数据文件所做的更改。当使用Lance应用更改时(例如使用LanceDatasetwrite_dataset()),则不需要此方法。

Parameters:
dest : str, Path, or LanceDataset

数据集的基础URI,或者数据集对象本身。使用数据集对象可能更高效,因为它可以复用文件元数据缓存。

transactions : Iterable[Transaction]

要应用于数据集的事务操作。这些操作将被合并为单个事务并应用到数据集中。注意:目前仅支持追加事务,其他类型的事务将在未来版本中支持。

commit_lock : CommitLock, optional

自定义提交锁。仅当您的对象存储不支持原子提交时才需要。详情请参阅用户指南。

storage_options : optional, dict

针对特定存储连接的额外选项。这用于存储连接参数,如凭证、端点等。

enable_v2_manifest_paths : bool, optional

如果为True,且这是一个新数据集,则使用新的V2清单路径。 这些路径能更高效地在对象存储上打开包含多个版本的数据集。如果数据集已存在,此参数将不起作用。要迁移现有数据集,请改用migrate_manifest_paths_v2()方法。默认为False。警告: 启用此选项将使旧版Lance(0.17.0之前版本)无法读取该数据集。

detached : bool, optional

如果为True,则该提交不会成为数据集谱系的一部分。它将永远不会显示为最新数据集,未来唯一查看它的方式是通过指定版本号检出。该版本将是一个随机版本号,仅在分离提交中保持唯一。调用方应将其存储在某处,因为未来将无法通过其他方式获取它。

max_retries : int

提交数据集时执行的最大重试次数。

Returns:

dataset: LanceDataset

Lance数据集的新版本。

merged: Transaction

应用于数据集的合并事务。

Return type:

包含键的字典

count_rows(filter: 表达式 | str | None = None, **kwargs) int

统计符合扫描器筛选条件的行数。

Parameters:
**kwargs : dict, optional

完整参数描述请参见 py:method:scanner 方法。

Returns:

count – 数据集中的总行数。

Return type:

int

create_index(: str | list[str], index_type: str, name: str | None = None, metric: str = 'L2', 替换: bool = False, num_partitions: int | None = None, ivf_centroids: np.ndarray | pa.FixedSizeListArray | pa.FixedShapeTensorArray | None = None, pq_codebook: np.ndarray | pa.FixedSizeListArray | pa.FixedShapeTensorArray | None = None, num_sub_vectors: int | None = None, accelerator: str | 'torch.Device' | None = None, index_cache_size: int | None = None, shuffle_partition_batches: int | None = None, shuffle_partition_concurrency: int | None = None, ivf_centroids_file: str | None = None, precomputed_partition_dataset: str | None = None, storage_options: dict[str, str] | None = None, filter_nan: bool = True, one_pass_ivfpq: bool = False, **kwargs) LanceDataset

在列上创建索引。

实验性API

Parameters:
column : str

要建立索引的列。

index_type : str

索引的类型。 "IVF_PQ, IVF_HNSW_PQ IVF_HNSW_SQ" 目前支持。

name : str, optional

索引名称。如果未提供,将根据列名自动生成。

metric : str

距离度量类型,即“L2”(“euclidean”的别名)、“cosine”或“dot”(点积)。默认为“L2”。

replace : bool

如果索引已存在,则替换现有索引。

num_partitions : int, optional

IVF(倒排文件索引)的分区数量。

ivf_centroids : optional

它可以是np.ndarraypyarrow.FixedSizeListArraypyarrow.FixedShapeTensorArray。 一个num_partitions x dimension维度的现有K均值中心点数组, 用于IVF聚类。如果未提供,将训练一个新的KMeans模型。

pq_codebook : optional,

它可以是np.ndarraypyarrow.FixedSizeListArray, 或pyarrow.FixedShapeTensorArray。 一个num_sub_vectors x (2 ^ nbits * dimensions // num_sub_vectors) 数组,表示PQ码本的K均值中心点。

注意:目前nbits始终为8。 如果未提供,将训练一个新的PQ模型。

num_sub_vectors : int, optional

PQ(乘积量化)的子向量数量。

accelerator: str | 'torch.Device' | None = None

如果设置,将使用加速器来加快训练过程。 支持的加速器包括:"cuda"(英伟达GPU)和"mps"(苹果硅GPU)。 如果未设置,则使用CPU。

index_cache_size : int, optional

索引缓存的大小,以条目数表示。默认值为256。

shuffle_partition_batches : int, optional

批次数,使用数据集的row group大小,决定每个shuffle分区包含的数量。默认值为10240。

假设row group大小为1024,每个shuffle分区将包含10240 * 1024 = 10,485,760行。减小此值会减少shuffle操作的内存消耗但会增加完成时间,反之亦然。

shuffle_partition_concurrency : int, optional

并发处理的shuffle分区数量。默认值为2

减小该值可以减少shuffle操作的内存消耗,但会增加完成时间,反之亦然。

storage_options : optional, dict

针对特定存储连接的额外选项。这用于存储连接参数,如凭证、端点等。

filter_nan : bool

默认为True。False是不安全的,如果存在任何null/nan值会导致崩溃(否则不会)。禁用用于可空列的空值过滤器。可获得小幅速度提升。

one_pass_ivfpq : bool

默认为False。如果启用,索引类型必须为“IVF_PQ”。可减少磁盘IO。

**kwargs

传递给索引构建过程的参数。

SQ(标量量化)仅适用于IVF_HNSW_SQ索引类型,这种量化方法用于减少索引的内存占用,它将浮点向量映射为整数向量,每个整数占用num_bits位,目前仅支持8位。

If index_type is “IVF_*”, then the following parameters are required:

num_partitions

If index_type is with “PQ”, then the following parameters are required:

子向量数量

IVF_PQ的可选参数:

  • ivf_centroids

    用于IVF聚类的现有K均值中心点。

  • num_bits

    PQ(乘积量化)的位数。默认为8。 仅支持4和8。

Optional parameters for IVF_HNSW_*:
max_level

Int,图中最大层级数。

m

Int,图中每个节点的边数。

ef_construction

Int,构建过程中需要检查的节点数量。

示例

import lance

dataset = lance.dataset("/tmp/sift.lance")
dataset.create_index(
    "vector",
    "IVF_PQ",
    num_partitions=256,
    num_sub_vectors=16
)
import lance

dataset = lance.dataset("/tmp/sift.lance")
dataset.create_index(
    "vector",
    "IVF_HNSW_SQ",
    num_partitions=256,
)

实验性加速器(GPU)支持:

  • accelerate: 使用GPU训练IVF分区。

    目前仅支持CUDA(英伟达)或MPS(苹果)平台。 需要安装PyTorch。

import lance

dataset = lance.dataset("/tmp/sift.lance")
dataset.create_index(
    "vector",
    "IVF_PQ",
    num_partitions=256,
    num_sub_vectors=16,
    accelerator="cuda"
)

参考文献

create_scalar_index(: str, index_type: 'BTREE' | 'BITMAP' | 'LABEL_LIST' | 'INVERTED' | 'FTS' | 'NGRAM', name: str | None = None, *, 替换: bool = True, **kwargs)

在列上创建标量索引。

标量索引与向量索引类似,可用于加速扫描。当扫描包含对已索引列的过滤表达式时,标量索引能显著提升查询速度。例如,若my_col列建有标量索引,以下扫描操作将执行得更快:

import lance

dataset = lance.dataset("/tmp/images.lance")
my_table = dataset.scanner(filter="my_col != 7").to_table()

带有预过滤器的向量搜索也可以从标量索引中受益。例如,

import lance

dataset = lance.dataset("/tmp/images.lance")
my_table = dataset.scanner(
    nearest=dict(
       column="vector",
       q=[1, 2, 3, 4],
       k=10,
    )
    filter="my_col != 7",
    prefilter=True
)

目前有5种标量索引类型可供选择。

  • BTREE。最常见的类型是BTREE。该索引的灵感来源于btree数据结构,尽管只有btree的前几层会被缓存在内存中。它将在具有大量唯一值且每个值对应行数较少的列上表现良好。

  • BITMAP。该索引为列中的每个唯一值存储一个位图。这种索引适用于具有少量唯一值且每个值对应多行数据的列。

  • LABEL_LIST. 一种特殊索引,用于对值基数较小的列表列进行索引。例如,包含标签列表的列(如["tag1", "tag2", "tag3"])可以使用LABEL_LIST索引。该索引只能加速带有array_has_anyarray_has_all过滤器的查询。

  • NGRAM. 一种用于索引字符串列的特殊索引。该索引会为字符串中的每个n元语法创建位图。默认情况下我们使用三元语法。当前该索引可以加速在过滤器中使用contains函数的查询。

  • FTS/INVERTED. 该索引用于对文档列进行索引。这种索引可以进行全文搜索。例如,一个包含查询字符串"hello world"中任意单词的列。结果将按BM25算法排序。

请注意,可以使用环境变量LANCE_BYPASS_SPILLING来绕过磁盘溢出。将其设置为true可以避免内存耗尽问题(更多信息请参阅https://github.com/apache/datafusion/issues/10073)。

实验性API

Parameters:
column : str

要建立索引的列。必须是布尔型、整型、浮点型或字符串类型的列。

index_type : str

索引的类型。可选值为 "BTREE", "BITMAP", "LABEL_LIST", "NGRAM", "FTS""INVERTED"

name : str, optional

索引名称。如果未提供,将根据列名自动生成。

replace : bool, default True

如果索引已存在,则替换现有索引。

with_position : bool, default True

这是针对INVERTED索引的。如果设为True,索引将存储文档中单词的位置信息,以便支持短语查询。这将显著增加索引大小。即使设置为True,也不会影响非短语查询的性能。

base_tokenizer : str, default "simple"

这是针对INVERTED索引的配置。指定使用的基础分词器,可选值包括: * "simple":根据空白字符和标点符号进行分词。 * "whitespace":仅根据空白字符进行分词。 * "raw":不进行分词处理。

language : str, default "English"

这是针对INVERTED索引的。用于词干提取和停用词处理的语言。仅当stemremove_stop_words为true时使用

max_token_length : Optional[int], default 40

这是针对INVERTED索引的设置。表示最大令牌长度。任何超过此长度的令牌将被移除。

lower_case : bool, default True

这是针对INVERTED索引的。如果设为True,索引会将所有文本转换为小写。

stem : bool, default False

这是针对INVERTED索引的。如果为True,索引将对词干进行提取。

remove_stop_words : bool, default False

这是针对INVERTED索引的。如果为True,该索引将移除停用词。

ascii_folding : bool, default False

这是针对INVERTED索引的。如果设为True,索引会尽可能将非ASCII字符转换为ASCII字符。例如会将带重音符号的字母如"é"转换为"e"。

示例

import lance

dataset = lance.dataset("/tmp/images.lance")
dataset.create_index(
    "category",
    "BTREE",
)

标量索引只能加速使用等值、比较、范围(例如my_col BETWEEN 0 AND 100)和集合成员(例如my_col IN (0, 1, 2))等基础过滤条件的扫描

当筛选条件包含多个索引列且这些条件通过AND或OR逻辑连接时,可以使用标量索引 (例如 my_col < 0 AND other_col> 100)

如果过滤条件包含未建立索引的列,虽然可以使用标量索引,但根据过滤条件的结构,可能无法实际使用。例如,若列not_indexed没有标量索引,那么过滤条件my_col = 0 OR not_indexed = 1将无法利用my_col上的任何标量索引。

要判断扫描是否使用了标量索引,可以使用explain_plan查看lancedb生成的查询计划。使用标量索引的查询将包含ScalarIndexQuery关系或MaterializeIndex操作符。

property data_storage_version : str

该数据集所使用的数据存储格式版本

delete(predicate: str | 表达式)

从数据集中删除行。

这会将行标记为已删除,但不会从文件中物理移除它们。这样可以保持现有索引仍然有效。

Parameters:
predicate : str or pa.compute.Expression

用于选择要删除行的谓词。可以是SQL字符串或pyarrow表达式。

示例

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.delete("a = 1 or b in ('a', 'b')")
>>> dataset.to_table()
pyarrow.Table
a: int64
b: string
----
a: [[3]]
b: [["c"]]
static drop(base_uri: str | Path, storage_options: dict[str, str] | None = None) None
drop_columns(columns: list[str])

从数据集中删除一列或多列

这是一个仅涉及元数据的操作,不会从底层存储中删除实际数据。如需删除数据,您必须随后调用compact_files来重写不包含被删除列的数据,然后调用cleanup_old_versions来移除旧文件。

Parameters:
columns : list of str

要删除的列名。这些可以是嵌套列引用(例如“a.b.c”)或顶级列名(例如“a”)。

示例

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.drop_columns(["a"])
>>> dataset.to_table().to_pandas()
   b
0  a
1  b
2  c
drop_index(name: str)

从数据集中删除索引

注意:索引是通过"索引名称"删除的。这与字段名称不同。如果在创建索引时未指定名称,则会自动生成一个名称。您可以使用list_indices方法来获取索引的名称。

get_fragment(fragment_id: int) LanceFragment | None

获取指定片段ID的片段。

get_fragments(filter: Expression | None = None) list[LanceFragment]

从数据集中获取所有片段。

注意:过滤器功能暂不支持。

property has_index
head(num_rows, **kwargs)

加载数据集的前N行。

Parameters:
num_rows : int

要加载的行数。

**kwargs : dict, optional

完整参数描述请参见scanner()方法。

Returns:

表格

Return type:

表格

index_statistics(index_name: str) dict[str, Any]
insert(数据: ReaderLike, *, mode='append', **kwargs)

将数据插入数据集。

Parameters:
data_obj : Reader-like

要写入的数据。可接受的类型包括: - Pandas DataFrame、Pyarrow Table、Dataset、Scanner 或 RecordBatchReader - Huggingface 数据集

mode : str, default 'append'

写入数据时使用的模式。可选选项有:

create - 创建新数据集(如果uri已存在则会报错)。 overwrite - 创建新的快照版本 append - 创建新版本,该版本是输入数据与最新版本的合并(如果uri不存在则会报错)

**kwargs : dict, optional

传递给write_dataset()的其他关键字参数。

join(right_dataset, keys, right_keys=None, join_type='left outer', left_suffix=None, right_suffix=None, coalesce_keys=True, use_threads=True)

未实现(仅覆盖pyarrow数据集以防止段错误)

property lance_schema : LanceSchema

该数据集的LanceSchema

property latest_version : int

返回数据集的最新版本。

list_indices() list[索引]
property max_field_id : int

清单中的max_field_id

merge(data_obj: ReaderLike, left_on: str, right_on: str | None = None, schema=None)

将另一个数据集合并到当前数据集中。

执行左连接操作,其中数据集作为左表,data_obj作为右表。存在于数据集但不在左表中的行将被填充为null值,除非Lance不支持某些类型的null值,这种情况下会抛出错误。

Parameters:
data_obj : Reader-like

要合并的数据。可接受的类型包括: - Pandas DataFrame、Pyarrow Table、Dataset、Scanner、 Iterator[RecordBatch] 或 RecordBatchReader

left_on : str

数据集中用于连接的列名。

right_on : str or None

数据对象中用于连接的列名。如果为None,则默认为left_on。

示例

>>> import lance
>>> import pyarrow as pa
>>> df = pa.table({'x': [1, 2, 3], 'y': ['a', 'b', 'c']})
>>> dataset = lance.write_dataset(df, "dataset")
>>> dataset.to_table().to_pandas()
   x  y
0  1  a
1  2  b
2  3  c
>>> new_df = pa.table({'x': [1, 2, 3], 'z': ['d', 'e', 'f']})
>>> dataset.merge(new_df, 'x')
>>> dataset.to_table().to_pandas()
   x  y  z
0  1  a  d
1  2  b  e
2  3  c  f

另请参阅

LanceDataset.add_columns

通过逐批计算添加新列。

merge_insert(开启: str | Iterable[str]) MergeInsertBuilder

返回一个构建器,可用于创建“合并插入”操作

该操作可以在单个事务中添加行、更新行和删除行。这是一个非常通用的工具,可用于实现诸如“如果不存在则插入”、“更新或插入(即upsert)”等行为,甚至可以用新数据替换部分现有数据(例如替换所有月份为“一月”的数据)。

合并插入操作通过使用连接将源表中的新数据与目标表中的现有数据相结合。记录分为三类。

"匹配"记录是指同时存在于源表和目标表中的记录。"未匹配"记录仅存在于源表中(例如这些是新数据)。"源未匹配"记录仅存在于目标表中(这是旧数据)。

此方法返回的构建器可用于自定义每种数据类别应执行的操作。

请注意,此操作会导致数据重新排序。这是因为更新的行会从数据集中删除,然后以新值重新插入到末尾。由于内部使用了哈希连接操作,新插入行的顺序可能会随机波动。

Parameters:
on : Union[str, Iterable[str]]

要连接的列(或多列)。这是源表和目标表中记录匹配的方式。通常这是某种键或ID列。

示例

使用when_matched_update_all()when_not_matched_insert_all()来执行"upsert"操作。这将更新数据集中已存在的行,并插入不存在的行。

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [2, 1, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
>>> # Perform a "upsert" operation
>>> dataset.merge_insert("a")     \
...             .when_matched_update_all()     \
...             .when_not_matched_insert_all() \
...             .execute(new_table)
{'num_inserted_rows': 1, 'num_updated_rows': 2, 'num_deleted_rows': 0}
>>> dataset.to_table().sort_by("a").to_pandas()
   a  b
0  1  b
1  2  x
2  3  y
3  4  z

使用when_not_matched_insert_all()执行"不存在则插入"操作。这只会插入数据集中尚不存在的行。

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example2")
>>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
>>> # Perform an "insert if not exists" operation
>>> dataset.merge_insert("a")     \
...             .when_not_matched_insert_all() \
...             .execute(new_table)
{'num_inserted_rows': 1, 'num_updated_rows': 0, 'num_deleted_rows': 0}
>>> dataset.to_table().sort_by("a").to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  z

您不需要提供所有列。如果只想更新部分列,可以省略不想更新的列。被省略的列在更新时会保留现有值,如果是插入操作则会设为null。

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"], \
...                   "c": ["x", "y", "z"]})
>>> dataset = lance.write_dataset(table, "example3")
>>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
>>> # Perform an "upsert" operation, only updating column "a"
>>> dataset.merge_insert("a")     \
...             .when_matched_update_all()     \
...             .when_not_matched_insert_all() \
...             .execute(new_table)
{'num_inserted_rows': 1, 'num_updated_rows': 2, 'num_deleted_rows': 0}
>>> dataset.to_table().sort_by("a").to_pandas()
   a  b     c
0  1  a     x
1  2  x     y
2  3  y     z
3  4  z  None
migrate_manifest_paths_v2()

将清单路径迁移到新格式。

这将更新清单以使用新的v2格式路径。

该函数具有幂等性,可以多次运行而不会改变对象存储的状态。

警告:当其他并发操作正在进行时,不应运行此操作。 并且该操作应在完成后再恢复其他操作。

property optimize : DatasetOptimizer
property partition_expression

未实现(仅覆盖pyarrow数据集以防止段错误)

prewarm_index(name: str)

预热索引

这将把整个索引加载到内存中。这有助于避免索引查询时的冷启动问题。如果索引无法放入索引缓存中,则会导致I/O浪费。

Parameters:
name : str

需要预热的索引名称。

replace_field_metadata(field_name: str, new_metadata: dict[str, str])

替换模式中某个字段的元数据

Parameters:
field_name : str

要替换元数据的字段名称

new_metadata : dict

要设置的新元数据

replace_schema(schema: 模式/架构)

未实现(仅覆盖pyarrow数据集以防止段错误)

参见 :py:method:`replace_schema_metadata`:py:method:`replace_field_metadata`

replace_schema_metadata(new_metadata: dict[str, str])

替换数据集的结构元数据

Parameters:
new_metadata : dict

要设置的新元数据

restore()

将当前检出的版本恢复为数据集的最新版本。

这将创建一个新的提交。

sample(num_rows: int, columns: list[str] | dict[str, str] | None = None, randomize_order: bool = True, **kwargs) 表格

随机选取数据样本

Parameters:
num_rows : int

要检索的行数

columns : list of str, or dict of str to str default None

要获取的列名列表。 或者列名到SQL表达式的字典。 如果为None或未指定,则获取所有列。

**kwargs : dict, optional

完整参数描述请参见scanner()方法。

Returns:

表格

Return type:

表格

scanner(columns: list[str] | dict[str, str] | None = None, filter: 表达式 | str | None = None, limit: int | None = None, offset: int | None = None, nearest: dict | None = None, batch_size: int | None = None, batch_readahead: int | None = None, fragment_readahead: int | None = None, scan_in_order: bool | None = None, 片段: Iterable[LanceFragment] | None = None, full_text_query: str | dict | FullTextQuery | None = None, *, prefilter: bool | None = None, with_row_id: bool | None = None, with_row_address: bool | None = None, use_stats: bool | None = None, fast_search: bool | None = None, io_buffer_size: int | None = None, late_materialization: bool | list[str] | None = None, use_scalar_index: bool | None = None, include_deleted_rows: bool | None = None, scan_stats_callback: Callable[[ScanStatistics], None] | None = None, strict_batch_size: bool | None = None) LanceScanner

返回一个支持各种下推操作的Scanner。

Parameters:
columns : list of str, or dict of str to str default None

要获取的列名列表。 或者列名到SQL表达式的字典。 如果为None或未指定,则获取所有列。

filter : pa.compute.Expression or str

表达式或字符串,必须是一个有效的SQL where子句。有关有效的SQL表达式,请参阅 Lance filter pushdown

limit : int, default None

最多获取这么多行。如果为None或未指定,则获取所有行。

offset : int, default None

从这一行开始获取。如果未指定则为0。

nearest : dict, default None

获取与K个最相似向量对应的行。示例:

{
    "column": <embedding col name>,
    "q": <query vector as pa.Float32Array>,
    "k": 10,
    "minimum_nprobes": 20,
    "maximum_nprobes": 50,
    "refine_factor": 1
}

batch_size : int, default None

返回批次的目标大小。在某些情况下,批次大小可能达到此值的两倍(但绝不会超过)。而在其他情况下,批次大小可能小于此值。

io_buffer_size : int, default None

IO缓冲区的大小。有关更多信息,请参阅ScannerBuilder.io_buffer_size

batch_readahead : int, optional

预读取的批次数量。

fragment_readahead : int, optional

预读取的片段数量。

scan_in_order : bool, default True

是否按顺序读取片段和批次。如果为false,吞吐量可能会更高,但批次将无序返回,内存使用可能会增加。

fragments : iterable of LanceFragment, default None

如果指定,则仅扫描这些片段。如果scan_in_order为True,那么片段将按照给定的顺序进行扫描。

prefilter : bool, default False

如果为True,则在运行向量查询之前应用过滤器。 这将生成更准确的结果,但查询成本可能更高。 通常在过滤器具有高选择性时效果良好。

如果为False,则在运行向量查询之后应用过滤器。 这样性能会更好,但如果最接近查询的行不匹配过滤器, 结果可能会少于请求的行数(或为空)。 通常在过滤器选择性不高时效果良好。

use_scalar_index : bool, default True

Lance会自动使用标量索引来优化查询。在某些极端情况下,这可能会使查询性能变差,此时可以通过该参数禁用标量索引。

late_materialization : bool or List[str], default None

允许自定义控制延迟物化。延迟物化会在过滤操作后通过take操作获取非查询列,这在结果较少或列数据量非常大时非常有用。

当结果较多或列数据非常窄时,早期物化可能更优。

如果设为True,则所有列都采用延迟物化; 如果设为False,则所有列都采用早期物化; 如果是字符串列表,则只有列表中的列会延迟物化。

默认采用启发式算法,假设过滤器会筛选出约0.1%的行。如果您的过滤器选择性更强(例如按id查找),可能需要设为True;如果过滤器选择性较弱(例如匹配20%的行),可能需要设为False。

full_text_query : str or dict, optional

要搜索的查询字符串,结果将按BM25算法排序。 例如"hello world"会匹配包含"hello"或"world"的文档。 或者使用包含以下键的字典:

  • columns: list[str]

    要搜索的列, 目前columns列表中仅支持单个列。

  • query: str

    要搜索的查询字符串。

如果为True,则搜索将仅在索引数据上执行,这样可以缩短搜索时间。

scan_stats_callback : Callable[[ScanStatistics], None], default None

一个回调函数,在扫描完成后将调用该函数并传入扫描统计信息。回调函数引发的错误将被记录但不会重新抛出。

include_deleted_rows : bool, default False

如果为True,则已被删除但仍存在于片段中的行将被返回。这些行的_rowid列将被设为null。所有其他列将反映磁盘上存储的值,可能不为null。

注意:如果是搜索操作或take操作(包括标量索引扫描),则无法返回已删除的行。

注意

目前,如果同时指定了filter和nearest,那么:

  1. nearest 会优先执行。

  2. 结果会在之后进行过滤。

为了调试近似最近邻(ANN)结果,您可以选择即使存在索引也不使用它,只需指定use_index=False。例如,以下代码将始终返回精确的KNN结果:

dataset.to_table(nearest={
    "column": "vector",
    "k": 10,
    "q": <query vector>,
    "use_index": False
}
property schema : 模式/架构

该数据集的pyarrow模式

session() _Session

返回数据集会话,该会话保存了数据集的状态。

property stats : LanceStats

实验性API

property tags : 标签

数据集标签管理。

与Git类似,标签是一种向数据集特定版本添加元数据的方式。

警告

带标签的版本不受cleanup_old_versions()清理过程的影响。

要删除已标记的版本,您必须先delete()关联的标记。

示例

ds = lance.open("dataset.lance")
ds.tags.create("v2-prod-20250203", 10)

tags = ds.tags.list()
take(索引: list[int] | 数组, columns: list[str] | dict[str, str] | None = None) 表格

按索引选择数据行。

Parameters:
indices : Array or array-like

数据集中要选择的行索引。

columns : list of str, or dict of str to str default None

要获取的列名列表。 或者列名到SQL表达式的字典。 如果为None或未指定,则获取所有列。

Returns:

表格

Return type:

pyarrow.Table

take_blobs(blob_column: str, ids: list[int] | 数组 | None = None, 地址: list[int] | 数组 | None = None, 索引: list[int] | 数组 | None = None) list[BlobFile]

按行ID选择二进制大对象。

无需在处理前将大型二进制blob数据加载到内存中,该API允许您将二进制blob数据作为常规的Python类文件对象打开。更多详情请参阅lance.BlobFile

必须且只能指定ids、addresses或indices中的一个参数。 :param blob_column: 要选择的blob列名称。 :type blob_column: str :param ids: 数据集中要选择的行ID。 :type ids: Integer Array或类似数组 :param addresses: 数据集中要选择行的(不稳定)内存地址。 :type addresses: Integer Array或类似数组 :param indices: 数据集中行的偏移量/索引。 :type indices: Integer Array或类似数组

Returns:

blob_files

Return type:

列表[BlobFile]

to_batches(columns: list[str] | dict[str, str] | None = None, filter: 表达式 | str | None = None, limit: int | None = None, offset: int | None = None, nearest: dict | None = None, batch_size: int | None = None, batch_readahead: int | None = None, fragment_readahead: int | None = None, scan_in_order: bool | None = None, *, prefilter: bool | None = None, with_row_id: bool | None = None, with_row_address: bool | None = None, use_stats: bool | None = None, full_text_query: str | dict | None = None, io_buffer_size: int | None = None, late_materialization: bool | list[str] | None = None, use_scalar_index: bool | None = None, strict_batch_size: bool | None = None, **kwargs) Iterator[RecordBatch]

将数据集读取为物化的记录批次。

Parameters:
**kwargs : dict, optional

scanner()的参数。

Returns:

record_batches

Return type:

RecordBatch的迭代器

to_table(columns: list[str] | dict[str, str] | None = None, filter: 表达式 | str | None = None, limit: int | None = None, offset: int | None = None, nearest: dict | None = None, batch_size: int | None = None, batch_readahead: int | None = None, fragment_readahead: int | None = None, scan_in_order: bool | None = None, *, prefilter: bool | None = None, with_row_id: bool | None = None, with_row_address: bool | None = None, use_stats: bool | None = None, fast_search: bool | None = None, full_text_query: str | dict | FullTextQuery | None = None, io_buffer_size: int | None = None, late_materialization: bool | list[str] | None = None, use_scalar_index: bool | None = None, include_deleted_rows: bool | None = None) 表格

将数据作为pyarrow.Table读入内存

Parameters:
columns : list of str, or dict of str to str default None

要获取的列名列表。 或者列名到SQL表达式的字典。 如果为None或未指定,则获取所有列。

filter : pa.compute.Expression or str

表达式或字符串,必须是一个有效的SQL where子句。有关有效的SQL表达式,请参阅 Lance filter pushdown

limit : int, default None

最多获取这么多行。如果为None或未指定,则获取所有行。

offset : int, default None

从这一行开始获取。如果未指定则为0。

nearest : dict, default None

获取与K个最相似向量对应的行。示例:

{
    "column": <embedding col name>,
    "q": <query vector as pa.Float32Array>,
    "k": 10,
    "metric": "cosine",
    "minimum_nprobes": 20,
    "maximum_nprobes": 50,
    "refine_factor": 1
}

batch_size : int, optional

每次读取的行数。

io_buffer_size : int, default None

IO缓冲区的大小。有关更多信息,请参阅ScannerBuilder.io_buffer_size

batch_readahead : int, optional

预读取的批次数量。

fragment_readahead : int, optional

预读取的片段数量。

scan_in_order : bool, optional, default True

是否按顺序读取片段和批次。如果为false,吞吐量可能会更高,但批次将无序返回,内存使用可能会增加。

prefilter : bool, optional, default False

在向量搜索之前运行过滤器。

late_materialization : bool or List[str], default None

允许自定义控制延迟物化。更多信息请参阅 ScannerBuilder.late_materialization

use_scalar_index : bool, default True

允许自定义控制标量索引的使用。更多信息请参见 ScannerBuilder.use_scalar_index

with_row_id : bool, optional, default False

返回行ID。

with_row_address : bool, optional, default False

返回行地址

use_stats : bool, optional, default True

在过滤过程中使用统计下推。

full_text_query : str or dict, optional

用于搜索的查询字符串,结果将按BM25算法排序。 例如:"hello world"会匹配包含"hello"或"world"的文档。 或者使用包含以下键的字典:

  • columns: list[str]

    要搜索的列, 目前仅支持在列列表中指定单个列。

  • query: str

    要搜索的查询字符串。

include_deleted_rows : bool, optional, default False

如果为True,则已被删除但仍存在于片段中的行将被返回。这些行的_rowid列将被设为null。所有其他列将反映磁盘上存储的值,可能不为null。

注意:如果是搜索操作或take操作(包括标量索引扫描),则无法返回已删除的行。

笔记

如果同时指定了filter和nearest,那么:

  1. nearest 会优先执行。

  2. 除非将pre-filter设置为True,否则结果会在之后进行过滤。

update(更新: dict[str, str], where: str | None = None) UpdateResult

更新符合where条件的行的列值。

Parameters:
updates : dict of str to str

列名到SQL表达式的映射关系。

where : str, optional

一个SQL谓词,指示应更新哪些行。

Returns:

updates – 包含更新行数的字典。

Return type:

字典

示例

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
>>> dataset = lance.write_dataset(table, "example")
>>> update_stats = dataset.update(dict(a = 'a + 2'), where="b != 'a'")
>>> update_stats["num_updated_rows"] = 2
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  4  b
2  5  c
property uri : str

数据的位置

validate()

验证数据集。

此操作会检查数据集的完整性,如果数据集已损坏则会抛出异常。

property version : int

返回当前检出的数据集版本

versions()

返回此数据集中的所有版本。

class lance.dataset.LanceOperation
class Append(片段: Iterable[FragmentMetadata])

向数据集追加新行。

fragments

包含新行的片段。

Type:

列表[FragmentMetadata]

警告

这是一个用于分布式操作的高级API。若要在单台机器上向数据集追加数据,请使用lance.write_dataset()

示例

要向数据集追加新行,首先使用 lance.fragment.LanceFragment.create()创建片段。然后 将片段元数据收集到列表中并传递给此类。 最后,将该操作传递给LanceDataset.commit() 方法来创建新数据集。

>>> import lance
>>> import pyarrow as pa
>>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> dataset = lance.write_dataset(tab1, "example")
>>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> fragment = lance.fragment.LanceFragment.create("example", tab2)
>>> operation = lance.LanceOperation.Append([fragment])
>>> dataset = lance.LanceDataset.commit("example", operation,
...                                     read_version=dataset.version)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
fragments : Iterable[FragmentMetadata]
class BaseOperation

可应用于数据集的操作基类。

请参阅LanceOperation下的可用操作。

class CreateIndex(uuid: str, name: str, 字段: list[int], dataset_version: int, fragment_ids: set[int], index_version: int)

在数据集上创建索引的操作。

dataset_version : int
fields : List[int]
fragment_ids : Set[int]
index_version : int
name : str
uuid : str
class DataReplacement(replacements: list[DataReplacementGroup])

该操作用于替换数据集中的现有数据文件。

replacements : List[DataReplacementGroup]
class DataReplacementGroup(fragment_id: int, new_file: 数据文件)

数据替换组

fragment_id : int
new_file : 数据文件
class Delete(updated_fragments: Iterable[FragmentMetadata], deleted_fragment_ids: Iterable[int], predicate: str)

从数据集中移除片段或行。

updated_fragments

已更新为包含新删除向量的片段。

Type:

列表[FragmentMetadata]

deleted_fragment_ids

已完全删除的片段ID。这些是LanceFragment.delete()返回None的片段。

Type:

整数列表

predicate

用于选择要删除行的原始SQL谓词。

Type:

字符串

警告

这是一个用于分布式操作的高级API。若要在单台机器上从数据集中删除行,请使用lance.LanceDataset.delete()

示例

要从数据集中删除行,请在每个片段上调用lance.fragment.LanceFragment.delete()。如果返回一个新片段,则将其添加到updated_fragments列表中。如果返回None,则表示整个片段已被删除,因此将该片段ID添加到deleted_fragment_ids中。最后,将该操作传递给LanceDataset.commit()方法以完成删除操作。

>>> import lance
>>> import pyarrow as pa
>>> table = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> dataset = lance.write_dataset(table, "example")
>>> table = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> dataset = lance.write_dataset(table, "example", mode="append")
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
>>> predicate = "a >= 2"
>>> updated_fragments = []
>>> deleted_fragment_ids = []
>>> for fragment in dataset.get_fragments():
...     new_fragment = fragment.delete(predicate)
...     if new_fragment is not None:
...         updated_fragments.append(new_fragment)
...     else:
...         deleted_fragment_ids.append(fragment.fragment_id)
>>> operation = lance.LanceOperation.Delete(updated_fragments,
...                                         deleted_fragment_ids,
...                                         predicate)
>>> dataset = lance.LanceDataset.commit("example", operation,
...                                     read_version=dataset.version)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
deleted_fragment_ids : Iterable[int]
predicate : str
updated_fragments : Iterable[FragmentMetadata]
class Merge(片段: Iterable[FragmentMetadata], schema: LanceSchema | 模式/架构)

添加列的操作。与覆盖不同,此操作不应改变片段的结构,从而保留现有的索引。

fragments

构成新数据集的片段。

Type:

FragmentMetadata的可迭代对象

schema

新数据集的模式。建议传递LanceSchema,传递pyarrow.Schema已弃用。

Type:

LanceSchema 或 pyarrow.Schema

警告

这是一个用于分布式操作的高级API。若要在单机上覆盖或创建新数据集,请使用lance.write_dataset()

示例

要向数据集添加新列,首先定义一个方法,该方法将基于现有列创建新列。然后使用lance.fragment.LanceFragment.add_columns()

>>> import lance
>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> table = pa.table({"a": [1, 2, 3, 4], "b": ["a", "b", "c", "d"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
>>> def double_a(batch: pa.RecordBatch) -> pa.RecordBatch:
...     doubled = pc.multiply(batch["a"], 2)
...     return pa.record_batch([doubled], ["a_doubled"])
>>> fragments = []
>>> for fragment in dataset.get_fragments():
...     new_fragment, new_schema = fragment.merge_columns(double_a,
...                                                       columns=['a'])
...     fragments.append(new_fragment)
>>> operation = lance.LanceOperation.Merge(fragments, new_schema)
>>> dataset = lance.LanceDataset.commit("example", operation,
...                                     read_version=dataset.version)
>>> dataset.to_table().to_pandas()
   a  b  a_doubled
0  1  a          2
1  2  b          4
2  3  c          6
3  4  d          8
fragments : Iterable[FragmentMetadata]
schema : LanceSchema | 模式/架构
class Overwrite(new_schema: LanceSchema | 模式/架构, 片段: Iterable[FragmentMetadata])

覆盖或创建新的数据集。

new_schema

新数据集的模式结构。

Type:

pyarrow.Schema

fragments

构成新数据集的片段。

Type:

列表[FragmentMetadata]

警告

这是一个用于分布式操作的高级API。若要在单机上覆盖或创建新数据集,请使用lance.write_dataset()

示例

要创建或覆盖数据集,首先使用 lance.fragment.LanceFragment.create()创建片段。然后将这些片段的元数据收集到列表中,并与模式一起传递给这个类。最后,将该操作传递给 LanceDataset.commit()方法来创建新数据集。

>>> import lance
>>> import pyarrow as pa
>>> tab1 = pa.table({"a": [1, 2], "b": ["a", "b"]})
>>> tab2 = pa.table({"a": [3, 4], "b": ["c", "d"]})
>>> fragment1 = lance.fragment.LanceFragment.create("example", tab1)
>>> fragment2 = lance.fragment.LanceFragment.create("example", tab2)
>>> fragments = [fragment1, fragment2]
>>> operation = lance.LanceOperation.Overwrite(tab1.schema, fragments)
>>> dataset = lance.LanceDataset.commit("example", operation)
>>> dataset.to_table().to_pandas()
   a  b
0  1  a
1  2  b
2  3  c
3  4  d
fragments : Iterable[FragmentMetadata]
new_schema : LanceSchema | 模式/架构
class Project(schema: LanceSchema)

用于投影列的操作。 可使用此运算符删除列或重命名/交换列。

schema

新数据集的lance模式。

Type:

LanceSchema

示例

使用投影运算符交换列:

>>> import lance
>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> from lance.schema import LanceSchema
>>> table = pa.table({"a": [1, 2], "b": ["a", "b"], "b1": ["c", "d"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.to_table().to_pandas()
   a  b b1
0  1  a  c
1  2  b  d
>>>
>>> ## rename column `b` into `b0` and rename b1 into `b`
>>> table = pa.table({"a": [3, 4], "b0": ["a", "b"], "b": ["c", "d"]})
>>> lance_schema = LanceSchema.from_pyarrow(table.schema)
>>> operation = lance.LanceOperation.Project(lance_schema)
>>> dataset = lance.LanceDataset.commit("example", operation, read_version=1)
>>> dataset.to_table().to_pandas()
   a b0  b
0  1  a  c
1  2  b  d
schema : LanceSchema
class Restore(版本: int)

恢复数据集先前版本的操作。

version : int
class Rewrite(groups: Iterable[RewriteGroup], rewritten_indices: Iterable[RewrittenIndex])

将操作重写为一个或多个文件和索引合并为一个或多个文件和索引。

groups

已被重写的文件组。

Type:

列表[RewriteGroup]

rewritten_indices

已被重写的索引。

Type:

列表[RewrittenIndex]

警告

这是一个高级API,不适用于一般用途。

groups : Iterable[RewriteGroup]
rewritten_indices : Iterable[RewrittenIndex]
class RewriteGroup(old_fragments: Iterable[FragmentMetadata], new_fragments: Iterable[FragmentMetadata])

重写文件的集合

new_fragments : Iterable[FragmentMetadata]
old_fragments : Iterable[FragmentMetadata]
class RewrittenIndex(old_id: str, new_id: str)

一个已被重写的索引

new_id : str
old_id : str
class Update(removed_fragment_ids: list[int], updated_fragments: list[FragmentMetadata], new_fragments: list[FragmentMetadata], fields_modified: list[int])

更新数据集中行的操作。

removed_fragment_ids

已完全移除的片段ID。

Type:

整数列表

updated_fragments

已更新为包含新删除向量的片段。

Type:

列表[FragmentMetadata]

new_fragments

包含新行的片段。

Type:

列表[FragmentMetadata]

fields_modified

如果在updated_fragments中有任何字段被修改,那么必须在此列出这些字段,以便从覆盖这些字段的索引中移除这些片段。

Type:

整数列表

fields_modified : List[int]
new_fragments : List[FragmentMetadata]
removed_fragment_ids : List[int]
updated_fragments : List[FragmentMetadata]
class lance.dataset.LanceScanner(scanner: _Scanner, dataset: LanceDataset)
analyze_plan() str

执行此扫描器的计划并显示运行时指标。

Parameters:
verbose : bool, default False

使用详细输出格式。

Returns:

计划

Return type:

字符串

count_rows()

统计符合扫描器筛选条件的行数。

Returns:

计数

Return type:

int

property dataset_schema : 模式/架构

从片段中读取批次时使用的模式。

explain_plan(verbose=False) str

返回此扫描器的执行计划。

Parameters:
verbose : bool, default False

使用详细输出格式。

Returns:

计划

Return type:

字符串

static from_batches(*args, **kwargs)

未实现

static from_dataset(*args, **kwargs)

未实现

static from_fragment(*args, **kwargs)

未实现

head(num_rows)

加载数据集的前N行。

Parameters:
num_rows : int

要加载的行数。

Return type:

表格

property projected_schema : 模式/架构

数据的物化模式,考虑了投影。

这是从扫描器返回的任何数据的模式。

scan_batches()

以记录批次形式消费Scanner,并附带对应的片段。

Returns:

record_batches

Return type:

TaggedRecordBatch的迭代器

take(索引)

未实现

to_batches(self)

以记录批次形式消费扫描器数据。

Returns:

record_batches

Return type:

RecordBatch的迭代器

to_reader(self)

将此扫描器作为RecordBatchReader使用。

Return type:

RecordBatchReader

to_table() 表格

将数据读入内存并返回一个pyarrow表格。

class lance.dataset.LanceStats(dataset: _Dataset)

关于LanceDataset的统计信息。

data_stats() DataStatistics

关于数据集中数据的统计信息。

dataset_stats(max_rows_per_group: int = 1024) DatasetStats

关于数据集的相关统计信息。

index_stats(index_name: str) dict[str, Any]

关于索引的统计信息。

Parameters:
index_name : str

要获取统计信息的索引名称。

class lance.dataset.MergeInsertBuilder(dataset, 开启)
conflict_retries(max_retries: int) MergeInsertBuilder

设置操作在出现争用时的重试次数。

如果该值设置为大于0,则操作将保留输入数据的副本(根据数据大小存储在内存或磁盘上),并在出现争用时重试操作。

默认值为10。

execute(data_obj: ReaderLike, *, schema: pa.Schema | None = None)

执行合并插入操作

此函数会更新原始数据集,并返回一个包含合并统计信息的字典——例如插入、更新和删除的行数。

Parameters:
data_obj : ReaderLike

作为操作源表使用的新数据。该参数可以是任何数据源(例如表/数据集),只要write_dataset()能够接受即可。

schema : Optional[pa.Schema]

数据的模式。仅当数据源是某种生成器时才需要提供此信息。

execute_uncommitted(data_obj: ReaderLike, *, schema: pa.Schema | None = None) tuple[事务, dict[str, Any]]

执行合并插入操作但不提交

此函数会更新原始数据集,并返回一个包含合并统计信息的字典——例如插入、更新和删除的行数。

Parameters:
data_obj : ReaderLike

作为操作源表使用的新数据。该参数可以是任何数据源(例如表/数据集),只要write_dataset()能够接受即可。

schema : Optional[pa.Schema]

数据的模式。仅当数据源是某种生成器时才需要提供此信息。

retry_timeout(timeout: timedelta) MergeInsertBuilder

设置用于限制重试的超时时间。

这是放弃操作前允许的最大耗时。无论完成需要多长时间,至少会进行一次尝试。一旦达到此超时时间,后续尝试将被取消。如果在第一次尝试期间已达到超时,操作将在进行第二次尝试前立即取消。

默认值为30秒。

when_matched_update_all(condition: str | None = None) MergeInsertBuilder

配置更新匹配行的操作

调用此方法后,当执行合并插入操作时,任何同时匹配源表和目标表的行将被更新。目标表中的行将被移除,而源表中的行将被添加。

可以指定一个可选条件。这应该是一个SQL过滤器,如果存在,则只有同时满足此过滤器的匹配行才会被更新。SQL过滤器应使用前缀target.来引用目标表中的列,使用前缀source.来引用源表中的列。例如,source.last_update < target.last_update

如果指定了条件且行不满足该条件,则这些行将不会被更新。不满足筛选条件不会导致"匹配"行变为"不匹配"行。

when_not_matched_by_source_delete(expr: str | None = None) MergeInsertBuilder

配置操作以删除不匹配的源行

调用此方法后,当执行合并插入操作时,仅存在于目标表中的行将被删除。可以指定一个可选过滤器来限制删除操作的范围。如果提供了过滤器(作为SQL过滤器),则只有匹配过滤器的行会被删除。

when_not_matched_insert_all() MergeInsertBuilder

配置操作以插入不匹配的行

调用此方法后,当执行合并插入操作时,仅存在于源表中的所有行将被插入到目标表中。

class lance.dataset.ScannerBuilder(ds: LanceDataset)
apply_defaults(default_opts: dict[str, Any]) ScannerBuilder
batch_readahead(nbatches: int | None = None) ScannerBuilder

读取v2文件时忽略此参数

batch_size(batch_size: int) ScannerBuilder

设置扫描器的批量大小

columns(cols: list[str] | dict[str, str] | None = None) ScannerBuilder

启用快速搜索,仅对已索引的数据执行搜索。

用户可以使用Table::optimize()create_index()将新数据包含到索引中,从而使新数据可被搜索。

filter(filter: str | 表达式) ScannerBuilder
fragment_readahead(nfragments: int | None = None) ScannerBuilder

通过全文搜索筛选行。实验性API,在我们支持在类似SQL的filter表达式中实现此功能后,可能会移除该API

在搜索之前必须在给定列上创建倒排索引,

Parameters:
query : str | Query

如果是字符串,表示要搜索的查询字符串,将执行匹配查询。 如果是Query对象,表示要搜索的查询对象, 此时columns参数将被忽略。

columns : list of str, optional

要搜索的列。如果为None,则搜索所有已索引的列。

include_deleted_rows(flag: bool) ScannerBuilder

包含已删除的行

已被删除但仍存在于片段中的行将被返回。这些行的所有列(除_rowaddr外)都将设置为null

io_buffer_size(io_buffer_size: int) ScannerBuilder

设置扫描器的I/O缓冲区大小

这是为处理来自存储的I/O数据而预留的RAM容量。它用于控制扫描器使用的内存量。如果缓冲区已满,扫描器将阻塞,直到缓冲区被处理完毕。

通常这个值应该与并发I/O线程的数量成比例。默认值为2GiB,这个大小可以轻松为32到256个并发I/O线程提供足够的空间。

这个值并不是扫描器使用内存的硬性上限。部分内存空间用于计算(可通过批量大小控制),且Lance在内存返回给用户后不会继续追踪其使用情况。

目前,如果存在单个数据批次大于IO缓冲区大小的情况,扫描器将会发生死锁。这是一个已知问题,将在未来的版本中修复。

该参数仅在读取v2文件时使用

late_materialization(late_materialization: bool | list[str]) ScannerBuilder
limit(n: int | None = None) ScannerBuilder
nearest(: str, q: QueryVectorLike, k: int | None = None, metric: str | None = None, nprobes: int | None = None, minimum_nprobes: int | None = None, maximum_nprobes: int | None = None, refine_factor: int | None = None, use_index: bool = True, ef: int | None = None) ScannerBuilder
offset(n: int | None = None) ScannerBuilder
prefilter(prefilter: bool) ScannerBuilder
scan_in_order(scan_in_order: bool = True) ScannerBuilder

是否按片段和批次的顺序扫描数据集。

如果设置为False,扫描器可能会并发读取片段并乱序返回批次。这可以提高性能,因为允许扫描过程中更多的并发操作,但也可能占用更多内存。

使用v2文件格式时,此参数将被忽略。在v2文件格式中,按顺序扫描不会产生任何性能损耗,因此所有扫描操作都会按顺序执行。

scan_stats_callback(callback: Callable[[ScanStatistics], None]) ScannerBuilder

设置一个回调函数,该函数将在扫描完成后被调用并传入扫描统计信息。回调函数引发的错误将被记录但不会重新抛出。

strict_batch_size(strict_batch_size: bool = False) ScannerBuilder

如果为True,则除最后一批外的所有批次都将严格包含batch_size行记录。 默认为false。 若启用此选项,则小批次数据需要合并处理,这将导致数据复制并产生(通常非常小的)性能损耗。

to_scanner() LanceScanner
use_scalar_index(use_scalar_index: bool = True) ScannerBuilder

设置查询时是否应使用标量索引

当存在标量索引时,扫描将利用它们来优化带过滤条件的查询。但在某些边缘情况下,标量索引可能导致性能下降。该参数允许用户在此类情况下禁用标量索引。

use_stats(use_stats: bool = True) ScannerBuilder

启用统计信息用于查询规划。

禁用统计功能通常用于调试和基准测试场景。 在正常使用情况下应保持开启。

with_fragments(片段: Iterable[LanceFragment] | None) ScannerBuilder
with_row_address(with_row_address: bool = True) ScannerBuilder

支持返回带有行地址的结果。

行地址是数据集中每行记录的唯一但不稳定的标识符,它由片段ID(高32位)和片段内的行偏移量(低32位)组成。通常更推荐使用行ID,因为当行记录被修改或压缩时,行ID不会发生变化。不过,在某些高级使用场景中,行地址可能仍然有用。

with_row_id(with_row_id: bool = True) ScannerBuilder

启用返回行ID的功能。

class lance.dataset.Tag
manifest_size : int
version : int
class lance.dataset.Tags(dataset: _Dataset)

数据集标签管理器。

create(标签: str, 版本: int) None

为指定数据集版本创建标签。

Parameters:
tag : str,

要创建的标签名称。该名称在数据集的所有标签名称中必须是唯一的。

version : int,

要标记的数据集版本。

delete(标签: str) None

从数据集中删除标签。

Parameters:
tag : str,

要删除的标签名称。

list() dict[str, Tag]

列出所有数据集标签。

Returns:

一个将标签名映射到版本号的字典。

Return type:

字典[str, Tag]

update(标签: str, 版本: int) None

将标签更新至新版本。

Parameters:
tag : str,

要更新的标签名称。

version : int,

要标记的新数据集版本。

class lance.dataset.Transaction(读取版本: 'int', 操作: 'LanceOperation.BaseOperation', uuid: 'str' = <factory>, blobs_op: 'Optional[LanceOperation.BaseOperation]' = None)
blobs_op : BaseOperation | None = None
operation : BaseOperation
read_version : int
uuid : str
class lance.dataset.UpdateResult
num_rows_updated : int
class lance.dataset.VectorIndexReader(dataset: LanceDataset, index_name: str)

该类允许您初始化特定向量索引的读取器,获取分区数量,访问索引的质心,并读取索引的特定分区。

Parameters:
dataset : LanceDataset

包含索引的数据集。

index_name : str

要读取的向量索引名称。

示例

import lance
from lance.dataset import VectorIndexReader
import numpy as np
import pyarrow as pa
vectors = np.random.rand(256, 2)
data = pa.table({"vector": pa.array(vectors.tolist(),
    type=pa.list_(pa.float32(), 2))})
dataset = lance.write_dataset(data, "/tmp/index_reader_demo")
dataset.create_index("vector", index_type="IVF_PQ",
    num_partitions=4, num_sub_vectors=2)
reader = VectorIndexReader(dataset, "vector_idx")
assert reader.num_partitions() == 4
partition = reader.read_partition(0)
assert "_rowid" in partition.column_names

异常

ValueError

如果指定的索引不是向量索引。

centroids() ndarray

返回索引的质心

Returns:

IVF的质心,形状为(num_partitions, dim)

Return type:

np.ndarray

num_partitions() int

返回数据集中的分区数量。

Returns:

分区数量。

Return type:

int

read_partition(partition_id: int, *, with_vector: bool = False) 表格

返回给定IVF分区的pyarrow表

Parameters:
partition_id : int

要读取的分区ID

with_vector : bool, default False

是否在读取器中包含向量列,对于IVF_PQ来说,向量列就是PQ编码

Returns:

给定分区的pyarrow表,包含行ID和量化向量(如果with_vector为True)。

Return type:

pa.Table

class lance.dataset.Version
metadata : Dict[str, str]
timestamp : int | datetime
version : int
lance.dataset.write_dataset(data_obj: ReaderLike, uri: str | Path | LanceDataset, schema: pa.Schema | None = None, mode: str = 'create', *, max_rows_per_file: int = 1048576, max_rows_per_group: int = 1024, max_bytes_per_file: int = 96636764160, commit_lock: CommitLock | None = None, 进度: FragmentWriteProgress | None = None, storage_options: dict[str, str] | None = None, data_storage_version: str | None = None, use_legacy_format: bool | None = None, enable_v2_manifest_paths: bool = False, enable_move_stable_row_ids: bool = False, auto_cleanup_options: AutoCleanupConfig | None = None) LanceDataset

将给定的data_obj写入指定的uri

Parameters:
data_obj : Reader-like

要写入的数据。可接受的类型包括: - Pandas DataFrame、Pyarrow Table、Dataset、Scanner 或 RecordBatchReader - Huggingface 数据集

uri : str, Path, or LanceDataset

数据集写入的目标位置(目录)。如果传入的是LanceDataset,则会复用现有会话。

schema : Schema, optional

如果指定且输入为pandas DataFrame,则使用此模式替代默认的pandas到arrow表的转换。

mode : str

create - 创建新数据集(如果uri已存在则会报错)。 overwrite - 创建新的快照版本 append - 创建新版本,该版本是输入数据与最新版本的合并(如果uri不存在则会报错)

max_rows_per_file : int, default 1024 * 1024

在开始新文件之前要写入的最大行数

max_rows_per_group : int, default 1024

在同一个文件中开始新分组前的最大行数

max_bytes_per_file : int, default 90 * 1024 * 1024 * 1024

在开始新文件之前要写入的最大字节数。这是一个软性限制。该限制在每组数据写入后进行检查,这意味着较大的组可能会导致此限制被显著超出。默认值为90 GB,因为我们在对象存储上对每个文件有100 GB的硬性限制。

commit_lock : CommitLock, optional

自定义提交锁。仅当您的对象存储不支持原子提交时才需要。详情请参阅用户指南。

progress : FragmentWriteProgress, optional

实验性API。用于跟踪片段写入进度的功能。传入一个自定义类,该类需定义在开始写入每个片段和完成写入时调用的钩子函数。

storage_options : optional, dict

针对特定存储连接的额外选项。这用于存储连接参数,如凭证、端点等。

data_storage_version : optional, str, default None

要使用的数据存储格式版本。较新的版本效率更高,但需要较新版本的lance才能读取。默认值(None)将使用最新的稳定版本。更多详情请参阅用户指南。

use_legacy_format : optional, bool, default None

已弃用的设置数据存储版本的方法。请改用data_storage_version参数。

enable_v2_manifest_paths : bool, optional

如果为True,且这是一个新数据集,则使用新的V2清单路径。 这些路径能更高效地打开存储在对象存储上具有多个版本的数据集。如果数据集已存在,此参数将不起作用。要迁移现有数据集,请改用 LanceDataset.migrate_manifest_paths_v2()方法。默认为False。

enable_move_stable_row_ids : bool, optional

实验性参数:如果设置为true,写入器将使用移动稳定的行ID。 这些行ID在压缩操作后保持稳定,但在更新后不稳定。 这使得压缩更加高效,因为使用稳定的行ID时, 无需更新二级索引来指向新的行ID。

auto_cleanup_options : optional, AutoCleanupConfig

数据集自动清理的配置选项。 如果设置此选项且这是一个新数据集,旧数据集版本将根据此参数自动清理。 要为现有数据集添加自动清理功能,请使用Dataset::update_config设置 lance.auto_cleanup.interval和lance.auto_cleanup.older_than。 必须同时设置这两个参数才能启用自动清理功能。 如果不设置此参数(默认行为), 则不会执行自动清理。 注意:此选项仅在创建新数据集时生效, 对现有数据集没有影响。

数据集片段

class lance.fragment.DataFile(路径: str, 字段: list[int], column_indices: list[int] = None, file_major_version: int = 0, file_minor_version: int = 0, file_size_bytes: int | None = None)

片段中的数据文件。

path

数据文件的路径。

Type:

字符串

fields

该文件中各列的字段ID。

Type:

整数列表

column_indices

字段在文件中存储的列索引。其长度将与fields相同。

Type:

整数列表

file_major_version

数据存储格式的主版本号。

Type:

int

file_minor_version

数据存储格式的次要版本。

Type:

int

file_size_bytes

数据文件的大小(以字节为单位),如果可用的话。

Type:

可选[int]

column_indices : List[int]
field_ids() list[int]
fields : List[int]
file_major_version : int = 0
file_minor_version : int = 0
file_size_bytes : int | None = None
property path : str
class lance.fragment.DeletionFile(read_version, id, 文件类型, num_deleted_rows)
asdict()
file_type
static from_json(json)
id
json()
num_deleted_rows
path(fragment_id, base_uri=None)
read_version
class lance.fragment.FragmentMetadata(id: int, 文件: list[数据文件], physical_rows: int, deletion_file: DeletionFile | None = None, row_id_meta: RowIdMeta | None = None)

片段的元数据。

id

片段的ID。

Type:

int

files

片段的数据文件。每个数据文件必须具有相同的行数。每个文件存储不同列的子集。

Type:

列表[DataFile]

physical_rows

该片段最初的行数。这是删除前数据文件中的行数。

Type:

int

deletion_file

删除文件(如果有的话)。

Type:

可选[DeletionFile]

row_id_meta

行ID元数据(如果有的话)。

Type:

可选[RowIdMeta]

data_files() list[数据文件]
deletion_file : DeletionFile | None = None
files : List[数据文件]
static from_json(json_data: str) FragmentMetadata
id : int
property num_deletions : int

已从此片段中删除的行数。

property num_rows : int

删除操作后此片段中的行数。

physical_rows : int
row_id_meta : RowIdMeta | None = None
to_json() dict

获取这个作为简单的JSON可序列化字典。

class lance.fragment.LanceFragment(dataset: LanceDataset, fragment_id: int | None, *, 片段: _Fragment | None = None)
count_rows(self, 表达式 filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, bool cache_metadata=True, MemoryPool memory_pool=None)

统计符合扫描器筛选条件的行数。

Parameters:
filter : Expression, default None

Scan将仅返回与筛选条件匹配的行。 如果可能,谓词将被下推以利用数据源中的分区信息或内部元数据,例如Parquet统计信息。否则在生成RecordBatches之前会对已加载的记录批次进行过滤。

batch_size : int, default 131_072

扫描记录批次的最大行数。如果扫描的记录批次超出内存限制,可以调用此方法来减小其大小。

batch_readahead : int, default 16

文件中预读取的批次数量。这可能不适用于所有文件格式。增加此数值将提高内存使用量,但也可能提升IO利用率。

fragment_readahead : int, default 4

预读取的文件数量。增加此数值将提高内存使用率,但也可能提升IO利用率。

fragment_scan_options : FragmentScanOptions, default None

特定于某次扫描和片段类型的选项,这些选项在同个数据集的不同扫描之间可能会发生变化。

use_threads : bool, default True

如果启用,则将根据可用CPU核心数量使用最大并行度。

cache_metadata : bool, default True

如果启用,扫描时可能会缓存元数据以加速重复扫描。

memory_pool : MemoryPool, default None

如需内存分配,可在此指定。若未指定,则使用默认池。

Returns:

计数

Return type:

int

static create(dataset_uri: str | Path, 数据: ReaderLike, fragment_id: int | None = None, schema: pa.Schema | None = None, max_rows_per_group: int = 1024, 进度: FragmentWriteProgress | None = None, mode: str = 'append', *, data_storage_version: str | None = None, use_legacy_format: bool | None = None, storage_options: dict[str, str] | None = None) FragmentMetadata

从给定数据创建FragmentMetadata

如果数据集尚未创建,可以使用此功能。

警告

内部API。该方法不面向终端用户使用。

Parameters:
dataset_uri : str

数据集的URI。

fragment_id : int

片段的ID。

data : pa.Table or pa.RecordBatchReader

要写入片段的数据。

schema : pa.Schema, optional

数据的模式。如果未指定,将从数据中推断模式。

max_rows_per_group : int, default 1024

数据文件中每个分组的最大行数。

progress : FragmentWriteProgress, optional

实验性API。用于跟踪片段写入进度的功能。传入一个自定义类,该类需定义在开始写入每个片段和完成写入时调用的钩子函数。

mode : str, default "append"

写入模式。如果指定为“append”,数据将与现有数据集的模式进行校验。否则,传递“create”或“overwrite”将为模式分配新的字段ID。

data_storage_version : optional, str, default None

要使用的数据存储格式版本。较新的版本效率更高,但需要较新版本的lance才能读取。默认值(None)将使用最新的稳定版本。更多详情请参阅用户指南。

use_legacy_format : bool, default None

已弃用的参数。请改用 data_storage_version。

storage_options : optional, dict

针对特定存储连接的额外选项。这用于存储连接参数,如凭证、端点等。

另请参阅

lance.dataset.LanceOperation.Overwrite

该操作用于创建一个新数据集或覆盖现有数据集,使用此API生成的片段。有关使用此API的示例,请参阅文档页面。

lance.dataset.LanceOperation.Append

该操作用于将通过此API创建的片段追加到现有数据集中。查看文档页面了解使用此API的示例。

Return type:

FragmentMetadata

static create_from_file(文件名: str, dataset: LanceDataset, fragment_id: int) FragmentMetadata

从给定的数据文件URI创建片段。

如果数据文件从数据集中丢失,可以使用此功能。

警告

内部API。该方法不面向终端用户使用。

Parameters:
filename : str

数据文件的文件名。

dataset : LanceDataset

该片段所属的数据集。

fragment_id : int

片段的ID。

data_files()

返回此片段的数据文件。

delete(predicate: str) FragmentMetadata | None

从此片段中删除行。

这将添加或更新该片段的删除文件。它不会修改或删除该片段的数据文件。如果删除后没有剩余行,该方法将返回None。

警告

内部API。该方法不面向终端用户使用。

Parameters:
predicate : str

指定要删除行的SQL谓词。

Returns:

包含新删除文件的新片段,如果没有剩余行则为None。

Return type:

FragmentMetadata 或 None

示例

>>> import lance
>>> import pyarrow as pa
>>> tab = pa.table({"a": [1, 2, 3], "b": [4, 5, 6]})
>>> dataset = lance.write_dataset(tab, "dataset")
>>> frag = dataset.get_fragment(0)
>>> frag.delete("a > 1")
FragmentMetadata(id=0, files=[DataFile(path='...', fields=[0, 1], ...), ...)
>>> frag.delete("a > 0") is None
True

另请参阅

lance.dataset.LanceOperation.Delete

用于将这些更改提交到数据集的操作。有关使用此API的示例,请参阅文档页面。

deletion_file()

返回删除文件(如果有的话)

property fragment_id
head(self, int num_rows, columns=None, 表达式 filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, bool cache_metadata=True, MemoryPool memory_pool=None)

加载片段的前N行。

Parameters:
num_rows : int

要加载的行数。

columns : list of str, default None

要投影的列。这可以是一个要包含的列名列表(顺序和重复项将被保留),或者是一个字典,包含{新列名: 表达式}值用于更高级的投影。

列或表达式列表可以使用特殊字段: __batch_index(批次在片段内的索引), __fragment_index(片段在数据集内的索引), __last_in_fragment(批次是否为片段中的最后一个),以及 __filename(源文件名或源片段的描述)。

这些列将被传递到数据集和相应的数据片段,以避免加载、复制和反序列化计算链后续不需要的列。 默认情况下会投影所有可用列。如果引用的列名在数据集的Schema中不存在,则会引发异常。

filter : Expression, default None

Scan将仅返回与筛选条件匹配的行。 如果可能,谓词将被下推以利用数据源中的分区信息或内部元数据,例如Parquet统计信息。否则在生成RecordBatches之前会对已加载的记录批次进行过滤。

batch_size : int, default 131_072

扫描记录批次的最大行数。如果扫描的记录批次超出内存限制,可以调用此方法来减小其大小。

batch_readahead : int, default 16

文件中预读取的批次数量。这可能不适用于所有文件格式。增加此数值将提高内存使用量,但也可能提升IO利用率。

fragment_readahead : int, default 4

预读取的文件数量。增加此数值将提高内存使用率,但也可能提升IO利用率。

fragment_scan_options : FragmentScanOptions, default None

特定于某次扫描和片段类型的选项,这些选项在同个数据集的不同扫描之间可能会发生变化。

use_threads : bool, default True

如果启用,则将根据可用CPU核心数量使用最大并行度。

cache_metadata : bool, default True

如果启用,扫描时可能会缓存元数据以加速重复扫描。

memory_pool : MemoryPool, default None

如需内存分配,可在此指定。若未指定,则使用默认池。

Return type:

表格

merge(data_obj: ReaderLike, left_on: str, right_on: str | None = None, schema=None) tuple[FragmentMetadata, LanceSchema]

将另一个数据集合并到此片段中。

执行左连接操作,其中片段作为左表,data_obj作为右表。存在于数据集中但不在左表的行将被填充为null值,除非Lance不支持某些类型的null值,这种情况下会抛出错误。

Parameters:
data_obj : Reader-like

要合并的数据。可接受的类型包括: - Pandas DataFrame、Pyarrow Table、Dataset、Scanner、 Iterator[RecordBatch] 或 RecordBatchReader

left_on : str

数据集中用于连接的列名。

right_on : str or None

数据对象中用于连接的列名。如果为None,则默认为left_on。

示例

>>> import lance
>>> import pyarrow as pa
>>> df = pa.table({'x': [1, 2, 3], 'y': ['a', 'b', 'c']})
>>> dataset = lance.write_dataset(df, "dataset")
>>> dataset.to_table().to_pandas()
   x  y
0  1  a
1  2  b
2  3  c
>>> fragments = dataset.get_fragments()
>>> new_df = pa.table({'x': [1, 2, 3], 'z': ['d', 'e', 'f']})
>>> merged = []
>>> schema = None
>>> for f in fragments:
...     f, schema = f.merge(new_df, 'x')
...     merged.append(f)
>>> merge = lance.LanceOperation.Merge(merged, schema)
>>> dataset = lance.LanceDataset.commit("dataset", merge, read_version=1)
>>> dataset.to_table().to_pandas()
   x  y  z
0  1  a  d
1  2  b  e
2  3  c  f

另请参阅

LanceDataset.merge_columns

向此片段添加列。

Returns:

包含合并列和最终模式的新片段。

Return type:

元组[FragmentMetadata, LanceSchema]

merge_columns(value_func: dict[str, str] | BatchUDF | ReaderLike | collections.abc.Callable[[pa.RecordBatch], pa.RecordBatch], columns: list[str] | None = None, batch_size: int | None = None, reader_schema: pa.Schema | None = None) tuple[FragmentMetadata, LanceSchema]

向此片段添加列。

警告

内部API。该方法不面向终端用户使用。

参数及其解释与lance.dataset.LanceDataset.add_columns()操作中的相同。

唯一的区别在于,不是修改数据集,而是创建一个新的片段。新片段的模式也会被返回。这些可以在后续操作中用于将更改提交到数据集。

另请参阅

lance.dataset.LanceOperation.Merge

用于将这些更改提交到数据集的操作。有关使用此API的示例,请参阅文档页面。

Returns:

一个包含新增列和最终模式的新片段。

Return type:

元组[FragmentMetadata, LanceSchema]

property metadata : FragmentMetadata

返回此片段的元数据。

Return type:

FragmentMetadata

property num_deletions : int

返回此片段中已删除的行数。

property partition_expression : 模式/架构

一个表达式,对于该片段查看的所有数据评估为真。

property physical_rows : int

返回此片段中原始的行数。

要获取删除后的行数,请改用count_rows()

property physical_schema : 模式/架构

返回此片段的物理模式。该模式可能与数据集读取模式不同。

scanner(*, columns: list[str] | dict[str, str] | None = None, batch_size: int | None = None, filter: str | pa.compute.Expression | None = None, limit: int | None = None, offset: int | None = None, with_row_id: bool = False, with_row_address: bool = False, batch_readahead: int = 16) LanceScanner

详情请参阅 Dataset::scanner

property schema : 模式/架构

返回此片段的模式。

take(self, 索引, columns=None, 表达式 filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, bool cache_metadata=True, MemoryPool memory_pool=None)

按索引选择数据行。

Parameters:
indices : Array or array-like

数据集中要选择的行索引。

columns : list of str, default None

要投影的列。这可以是一个要包含的列名列表(顺序和重复项将被保留),或者是一个字典,包含{新列名: 表达式}值用于更高级的投影。

列或表达式列表可以使用特殊字段: __batch_index(批次在片段内的索引), __fragment_index(片段在数据集内的索引), __last_in_fragment(批次是否为片段中的最后一个),以及 __filename(源文件名或源片段的描述)。

这些列将被传递到数据集和相应的数据片段,以避免加载、复制和反序列化计算链后续不需要的列。 默认情况下会投影所有可用列。如果引用的列名在数据集的Schema中不存在,则会引发异常。

filter : Expression, default None

Scan将仅返回与筛选条件匹配的行。 如果可能,谓词将被下推以利用数据源中的分区信息或内部元数据,例如Parquet统计信息。否则在生成RecordBatches之前会对已加载的记录批次进行过滤。

batch_size : int, default 131_072

扫描记录批次的最大行数。如果扫描的记录批次超出内存限制,可以调用此方法来减小其大小。

batch_readahead : int, default 16

文件中预读取的批次数量。这可能不适用于所有文件格式。增加此数值将提高内存使用量,但也可能提升IO利用率。

fragment_readahead : int, default 4

预读取的文件数量。增加此数值将提高内存使用率,但也可能提升IO利用率。

fragment_scan_options : FragmentScanOptions, default None

特定于某次扫描和片段类型的选项,这些选项在同个数据集的不同扫描之间可能会发生变化。

use_threads : bool, default True

如果启用,则将根据可用CPU核心数量使用最大并行度。

cache_metadata : bool, default True

如果启用,扫描时可能会缓存元数据以加速重复扫描。

memory_pool : MemoryPool, default None

如需内存分配,可在此指定。若未指定,则使用默认池。

Return type:

表格

to_batches(self, Schema schema=None, columns=None, 表达式 filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, bool cache_metadata=True, MemoryPool memory_pool=None)

将片段读取为物化的记录批次。

Parameters:
schema : Schema, optional

用于扫描的具体模式。

columns : list of str, default None

要投影的列。这可以是一个要包含的列名列表(顺序和重复项将被保留),或者是一个字典,包含{新列名: 表达式}值用于更高级的投影。

列或表达式列表可以使用特殊字段: __batch_index(批次在片段内的索引), __fragment_index(片段在数据集内的索引), __last_in_fragment(批次是否为片段中的最后一个),以及 __filename(源文件名或源片段的描述)。

这些列将被传递到数据集和相应的数据片段,以避免加载、复制和反序列化计算链后续不需要的列。 默认情况下会投影所有可用列。如果引用的列名在数据集的Schema中不存在,则会引发异常。

filter : Expression, default None

Scan将仅返回与筛选条件匹配的行。 如果可能,谓词将被下推以利用数据源中的分区信息或内部元数据,例如Parquet统计信息。否则在生成RecordBatches之前会对已加载的记录批次进行过滤。

batch_size : int, default 131_072

扫描记录批次的最大行数。如果扫描的记录批次超出内存限制,可以调用此方法来减小其大小。

batch_readahead : int, default 16

文件中预读取的批次数量。这可能不适用于所有文件格式。增加此数值将提高内存使用量,但也可能提升IO利用率。

fragment_readahead : int, default 4

预读取的文件数量。增加此数值将提高内存使用率,但也可能提升IO利用率。

fragment_scan_options : FragmentScanOptions, default None

特定于某次扫描和片段类型的选项,这些选项在同个数据集的不同扫描之间可能会发生变化。

use_threads : bool, default True

如果启用,则将根据可用CPU核心数量使用最大并行度。

cache_metadata : bool, default True

如果启用,扫描时可能会缓存元数据以加速重复扫描。

memory_pool : MemoryPool, default None

如需内存分配,可在此指定。若未指定,则使用默认池。

Returns:

record_batches

Return type:

RecordBatch的迭代器

to_table(self, Schema schema=None, columns=None, 表达式 filter=None, int batch_size=_DEFAULT_BATCH_SIZE, int batch_readahead=_DEFAULT_BATCH_READAHEAD, int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, FragmentScanOptions fragment_scan_options=None, bool use_threads=True, bool cache_metadata=True, MemoryPool memory_pool=None)

将此片段转换为表格。

请谨慎使用此便捷工具。它会在创建表之前,将扫描结果序列化并全部加载到内存中。

Parameters:
schema : Schema, optional

用于扫描的具体模式。

columns : list of str, default None

要投影的列。这可以是一个要包含的列名列表(顺序和重复项将被保留),或者是一个字典,包含{新列名: 表达式}值用于更高级的投影。

列或表达式列表可以使用特殊字段: __batch_index(批次在片段内的索引), __fragment_index(片段在数据集内的索引), __last_in_fragment(批次是否为片段中的最后一个),以及 __filename(源文件名或源片段的描述)。

这些列将被传递到数据集和相应的数据片段,以避免加载、复制和反序列化计算链后续不需要的列。 默认情况下会投影所有可用列。如果引用的列名在数据集的Schema中不存在,则会引发异常。

filter : Expression, default None

Scan将仅返回与筛选条件匹配的行。 如果可能,谓词将被下推以利用数据源中的分区信息或内部元数据,例如Parquet统计信息。否则在生成RecordBatches之前会对已加载的记录批次进行过滤。

batch_size : int, default 131_072

扫描记录批次的最大行数。如果扫描的记录批次超出内存限制,可以调用此方法来减小其大小。

batch_readahead : int, default 16

文件中预读取的批次数量。这可能不适用于所有文件格式。增加此数值将提高内存使用量,但也可能提升IO利用率。

fragment_readahead : int, default 4

预读取的文件数量。增加此数值将提高内存使用率,但也可能提升IO利用率。

fragment_scan_options : FragmentScanOptions, default None

特定于某次扫描和片段类型的选项,这些选项在同个数据集的不同扫描之间可能会发生变化。

use_threads : bool, default True

如果启用,则将根据可用CPU核心数量使用最大并行度。

cache_metadata : bool, default True

如果启用,扫描时可能会缓存元数据以加速重复扫描。

memory_pool : MemoryPool, default None

如需内存分配,可在此指定。若未指定,则使用默认池。

Returns:

表格

Return type:

表格

class lance.fragment.RowIdMeta
asdict()
static from_json(json)
json()
lance.fragment.write_fragments(数据: ReaderLike, dataset_uri: str | Path | LanceDataset, schema: 模式/架构 | None = None, *, return_transaction: True, mode: str = 'append', max_rows_per_file: int = 1024 * 1024, max_rows_per_group: int = 1024, max_bytes_per_file: int = DEFAULT_MAX_BYTES_PER_FILE, 进度: FragmentWriteProgress | None = None, data_storage_version: str | None = None, use_legacy_format: bool | None = None, storage_options: dict[str, str] | None = None, enable_move_stable_row_ids: bool = False) 事务
lance.fragment.write_fragments(数据: ReaderLike, dataset_uri: str | Path | LanceDataset, schema: 模式/架构 | None = None, *, return_transaction: False = False, mode: str = 'append', max_rows_per_file: int = 1024 * 1024, max_rows_per_group: int = 1024, max_bytes_per_file: int = DEFAULT_MAX_BYTES_PER_FILE, 进度: FragmentWriteProgress | None = None, data_storage_version: str | None = None, use_legacy_format: bool | None = None, storage_options: dict[str, str] | None = None, enable_move_stable_row_ids: bool = False) list[FragmentMetadata]

将数据写入一个或多个片段。

警告

这是一个底层API,专为手动实现分布式写入而设计。对于大多数用户,推荐使用lance.write_dataset() API。

Parameters:
data : pa.Table or pa.RecordBatchReader

要写入片段的数据。

dataset_uri : str, Path, or LanceDataset

数据集或数据集对象的URI。

schema : pa.Schema, optional

数据的模式。如果未指定,将从数据中推断模式。

return_transaction : bool, default False

如果为真,事务将被返回。

mode : str, default "append"

写入模式。如果指定为“append”,数据将与现有数据集的模式进行校验。否则,传递“create”或“overwrite”将为模式分配新的字段ID。

max_rows_per_file : int, default 1024 * 1024

每个数据文件的最大行数。

max_rows_per_group : int, default 1024

数据文件中每个分组的最大行数。

max_bytes_per_file : int, default 90 * 1024 * 1024 * 1024

在开始新文件之前要写入的最大字节数。这是一个软性限制。该限制在每组数据写入后进行检查,这意味着较大的组可能会导致此限制被显著超出。默认值为90 GB,因为我们在对象存储上对每个文件有100 GB的硬性限制。

progress : FragmentWriteProgress, optional

实验性API。用于跟踪片段写入进度的功能。传入一个自定义类,该类需定义在开始写入每个片段和完成写入时调用的钩子函数。

data_storage_version : optional, str, default None

要使用的数据存储格式版本。较新的版本效率更高,但需要较新版本的lance才能读取。默认值(None)将使用2.0版本。更多详情请参阅用户指南。

use_legacy_format : optional, bool, default None

已弃用的设置数据存储版本的方法。请改用data_storage_version参数。

storage_options : Optional[Dict[str, str]]

针对特定存储连接的额外选项。这用于存储连接参数,如凭证、端点等。

enable_move_stable_row_ids : bool

实验性功能:如果设置为true,写入器将使用移动稳定的行ID。这些行ID在压缩操作后保持稳定,但在更新后不稳定。这使得压缩操作更高效,因为使用稳定的行ID时,无需更新二级索引来指向新的行ID。

Returns:

如果return_transaction为False:

返回已写入分片的FragmentMetadata列表。此时分片ID保持为零值,表示尚未分配具体ID。这些ID将在分片提交到数据集时被分配。

如果return_transaction为True:

返回写入事务对象。事务类型将对应指定的mode参数。该事务对象可传递给LanceDataset.commit()方法。

Return type:

列表[FragmentMetadata] | Transaction