读取和写入数据

写入Lance数据集

如果您熟悉Apache PyArrow, 您会发现创建Lance数据集非常简单。 首先使用lance.write_dataset()函数编写一个pyarrow.Table

>>> import lance
>>> import pyarrow as pa

>>> table = pa.Table.from_pylist([{"name": "Alice", "age": 20},
...                               {"name": "Bob", "age": 30}])
>>> ds = lance.write_dataset(table, "./alice_and_bob.lance")

如果数据集过大无法完全加载到内存中,您可以使用lance.write_dataset()流式传输数据,该方法也支持pyarrow.RecordBatchIterator。在这种情况下,您需要为数据集提供一个pyarrow.Schema

>>> def producer() -> Iterator[pa.RecordBatch]:
...     """An iterator of RecordBatches."""
...     yield pa.RecordBatch.from_pylist([{"name": "Alice", "age": 20}])
...     yield pa.RecordBatch.from_pylist([{"name": "Bob", "age": 30}])

>>> schema = pa.schema([
...     ("name", pa.string()),
...     ("age", pa.int32()),
... ])

>>> ds = lance.write_dataset(producer(),
...                          "./alice_and_bob.lance",
...                          schema=schema, mode="overwrite")
>>> ds.count_rows()
2

lance.write_dataset() 支持写入 pyarrow.Tablepandas.DataFramepyarrow.dataset.Dataset 以及 Iterator[pyarrow.RecordBatch]

添加行

要将数据插入到您的数据集中,您可以使用LanceDataset.insertwrite_dataset()并设置mode=append参数。

>>> import lance
>>> import pyarrow as pa

>>> table = pa.Table.from_pylist([{"name": "Alice", "age": 20},
...                               {"name": "Bob", "age": 30}])
>>> ds = lance.write_dataset(table, "./insert_example.lance")

>>> new_table = pa.Table.from_pylist([{"name": "Carla", "age": 37}])
>>> ds.insert(new_table)
>>> ds.to_table().to_pandas()
    name  age
0  Alice   20
1    Bob   30
2  Carla   37

>>> new_table2 = pa.Table.from_pylist([{"name": "David", "age": 42}])
>>> ds = lance.write_dataset(new_table2, ds, mode="append")
>>> ds.to_table().to_pandas()
    name  age
0  Alice   20
1    Bob   30
2  Carla   37
3  David   42

删除行

Lance支持使用SQL过滤器从数据集中删除行,如filter-push-down中所述。 例如,要从上述数据集中删除Bob的行,可以使用:

>>> import lance

>>> dataset = lance.dataset("./alice_and_bob.lance")
>>> dataset.delete("name = 'Bob'")
>>> dataset2 = lance.dataset("./alice_and_bob.lance")
>>> dataset2.to_table().to_pandas()
    name  age
0  Alice   20

注意

Lance格式是不可变的。每次写入操作都会创建数据集的新版本,因此用户必须重新打开数据集才能看到更改。同样,行删除是通过在单独的删除索引中标记它们为已删除来实现的,而不是重写文件。这种方法速度更快,并且避免了使引用文件的任何索引失效,确保后续查询不会返回已删除的行。

更新行

Lance支持通过lance.LanceDataset.update()方法使用SQL表达式更新行。例如,如果我们发现数据集中Bob的名字有时被写成Blob,我们可以这样修复:

import lance

dataset = lance.dataset("./alice_and_bob.lance")
dataset.update({"name": "'Bob'"}, where="name = 'Blob'")

更新值是SQL表达式,这就是为什么'Bob'要用单引号包裹。这意味着我们可以根据需要引用现有列的复杂表达式。例如,如果两年过去了,我们希望在同一示例中更新Alice和Bob的年龄,可以这样写:

import lance

dataset = lance.dataset("./alice_and_bob.lance")
dataset.update({"age": "age + 2"})

如果您需要更新一组单独的行并赋予新值,通常使用下面描述的合并插入操作会更高效。

import lance

# Change the ages of both Alice and Bob
new_table = pa.Table.from_pylist([{"name": "Alice", "age": 30},
                                  {"name": "Bob", "age": 20}])

# This works, but is inefficient, see below for a better approach
dataset = lance.dataset("./alice_and_bob.lance")
for idx in range(new_table.num_rows):
  name = new_table[0][idx].as_py()
  new_age = new_table[1][idx].as_py()
  dataset.update({"age": new_age}, where=f"name='{name}'")

合并插入

LanceDB支持合并插入操作。该功能可用于批量添加新数据,同时(可能)与现有数据进行匹配。此操作适用于多种不同的使用场景。

批量更新

lance.LanceDataset.update() 方法适用于基于筛选条件更新行数据。但如果需要用新行完全替换现有行,那么 lance.LanceDataset.merge_insert() 操作会更高效:

>>> import lance

>>> dataset = lance.dataset("./alice_and_bob.lance")
>>> dataset.to_table().to_pandas()
    name  age
0  Alice   20
1    Bob   30
>>> # Change the ages of both Alice and Bob
>>> new_table = pa.Table.from_pylist([{"name": "Alice", "age": 2},
...                                   {"name": "Bob", "age": 3}])
>>> # This will use `name` as the key for matching rows.  Merge insert
>>> # uses a JOIN internally and so you typically want this column to
>>> # be a unique key or id of some kind.
>>> rst = dataset.merge_insert("name") \
...        .when_matched_update_all() \
...        .execute(new_table)
>>> dataset.to_table().to_pandas()
    name  age
0  Alice    2
1    Bob    3

请注意,与更新操作类似,被修改的行将被移除并重新插入表中,其位置会变到末尾。此外,由于内部使用了哈希连接操作,这些行的相对顺序可能会发生变化。

如果不存在则插入

有时我们只想在之前未插入过的情况下才插入数据。例如,当我们有一批数据但不确定哪些行是之前添加过的,同时又不希望创建重复行时,这种情况就会发生。我们可以使用合并插入操作来实现这一需求:

>>> # Bob is already in the table, but Carla is new
>>> new_table = pa.Table.from_pylist([{"name": "Bob", "age": 30},
...                                   {"name": "Carla", "age": 37}])
>>>
>>> dataset = lance.dataset("./alice_and_bob.lance")
>>>
>>> # This will insert Carla but leave Bob unchanged
>>> _ = dataset.merge_insert("name") \
...        .when_not_matched_insert_all() \
...        .execute(new_table)
>>> # Verify that Carla was added but Bob remains unchanged
>>> dataset.to_table().to_pandas()
    name  age
0  Alice   20
1    Bob   30
2  Carla   37

更新或插入 (Upsert)

有时我们希望结合上述两种行为。如果某行已存在,我们希望更新它;如果该行不存在,我们希望添加它。这种操作有时被称为"upsert"。我们也可以使用合并插入操作来实现:

>>> import lance
>>> import pyarrow as pa
>>>
>>> # Change Carla's age and insert David
>>> new_table = pa.Table.from_pylist([{"name": "Carla", "age": 27},
...                                   {"name": "David", "age": 42}])
>>>
>>> dataset = lance.dataset("./alice_and_bob.lance")
>>>
>>> # This will update Carla and insert David
>>> _ = dataset.merge_insert("name") \
...        .when_matched_update_all() \
...        .when_not_matched_insert_all() \
...        .execute(new_table)
>>> # Verify the results
>>> dataset.to_table().to_pandas()
    name  age
0  Alice   20
1    Bob   30
2  Carla   27
3  David   42

替换部分数据

一种不太常见但仍然有用的行为是,用新数据替换现有行的某些区域(由过滤器定义)。这类似于在单个事务中同时执行删除和插入操作。例如:

>>> import lance
>>> import pyarrow as pa
>>>
>>> new_table = pa.Table.from_pylist([{"name": "Edgar", "age": 46},
...                                   {"name": "Francene", "age": 44}])
>>>
>>> dataset = lance.dataset("./alice_and_bob.lance")
>>> dataset.to_table().to_pandas()
      name  age
0    Alice   20
1      Bob   30
2  Charlie   45
3    Donna   50
>>>
>>> # This will remove anyone above 40 and insert our new data
>>> _ = dataset.merge_insert("name") \
...        .when_not_matched_insert_all() \
...        .when_not_matched_by_source_delete("age >= 40") \
...        .execute(new_table)
>>> # Verify the results - people over 40 replaced with new data
>>> dataset.to_table().to_pandas()
       name  age
0     Alice   20
1       Bob   30
2     Edgar   46
3  Francene   44

读取Lance数据集

要打开一个Lance数据集,请使用lance.dataset()函数:

import lance
ds = lance.dataset("s3://bucket/path/imagenet.lance")
# Or local path
ds = lance.dataset("./imagenet.lance")

注意

Lance目前支持本地文件系统、AWS s3和Google云存储(gs)作为存储后端。更多信息请参阅`Object Store Configuration`_

读取Lance数据集最直接的方法是使用lance.LanceDataset.to_table()方法将整个数据集加载到内存中。

table = ds.to_table()

由于Lance是一种高性能的列式存储格式,它能够通过利用列投影(column projection)下推和谓词过滤(predicate filtering)下推技术,实现对数据集子集的高效读取。

table = ds.to_table(
    columns=["image", "label"],
    filter="label = 2 AND text IS NOT NULL",
    limit=1000,
    offset=3000)

Lance理解读取诸如image等重量级列的成本。因此,它采用优化的查询计划来高效执行操作。

如果数据集过大无法全部加载到内存中,您可以使用lance.LanceDataset.to_batches()方法分批读取:

for batch in ds.to_batches(columns=["image"], filter="label = 10"):
    # do something with batch
    compute_on_batch(batch)

不出所料,to_batches() 函数接收与 to_table() 相同的参数。

Lance支持使用标准SQL表达式作为数据集过滤的谓词条件。 通过将SQL谓词直接下推到存储系统, 扫描过程中的整体I/O负载得以显著降低。

目前,Lance支持越来越多的表达式。

  • >, >=, <, <=, =

  • AND, OR, NOT

  • IS NULL, IS NOT NULL

  • IS TRUE, IS NOT TRUE, IS FALSE, IS NOT FALSE

  • IN

  • LIKE, NOT LIKE

  • regexp_match(column, pattern)

  • CAST

例如,以下筛选字符串是可接受的:

((label IN [10, 20]) AND (note['email'] IS NOT NULL))
    OR NOT note['created']

嵌套字段可以通过下标访问。结构体字段可以使用字段名作为下标,而列表字段可以使用索引作为下标。

如果您的列名包含特殊字符或是SQL关键字,可以使用反引号(`)进行转义。对于嵌套字段,路径的每个部分都必须用反引号包裹。

`CUBE` = 10 AND `column name with space` IS NOT NULL
  AND `nested with space`.`inner with space` < 2

警告

不支持包含句点(.)的字段名称。

日期、时间戳和小数的字面量可以通过在类型名称后写入字符串值来表示。例如

date_col = date '2021-01-01'
and timestamp_col = timestamp '2021-01-01 00:00:00'
and decimal_col = decimal(8,3) '1.000'

对于时间戳列,可以在类型参数中指定精度值。微秒级精度(6)是默认设置。

SQL

时间单位

timestamp(0)

timestamp(3)

毫秒

timestamp(6)

微秒

timestamp(9)

纳秒

Lance内部以Arrow格式存储数据。SQL类型到Arrow的映射关系如下:

SQL 类型

Arrow 类型

boolean

Boolean

tinyint / tinyint unsigned

Int8 / UInt8

smallint / smallint unsigned

Int16 / UInt16

intinteger / int unsignedinteger unsigned

Int32 / UInt32

bigint / bigint unsigned

Int64 / UInt64

float

Float32

double

Float64

decimal(precision, scale)

Decimal128

date

Date32

timestamp

时间戳 (1)

string

Utf8

binary

二进制

  1. 请参阅前表中的精度映射。

Lance作为列式格式的一个独特特性是,它允许您快速读取随机样本。

# Access the 2nd, 101th and 501th rows
data = ds.take([1, 100, 500], columns=["image", "label"])

实现快速随机访问单行数据的能力,在促进机器学习训练中的随机采样和洗牌等各种工作流程中发挥着关键作用。此外,它还使用户能够构建二级索引,从而快速执行查询以提高性能。

表维护

随着时间的推移,某些操作会导致Lance数据集出现布局不佳的情况。例如,多次小规模追加会产生大量小片段。或者删除大量行会导致查询变慢,因为需要过滤掉已删除的行。

为了解决这个问题,Lance提供了优化数据集布局的方法。

可以重写数据文件以减少文件数量。当向lance.dataset.DatasetOptimizer.compact_files()传递target_rows_per_fragment参数时,Lance会跳过已超过该行数的片段,并重写其他片段。片段将根据其ID进行合并,从而保留数据的固有顺序。

注意

压缩会创建表的新版本。它不会删除表的旧版本及其引用的文件。

import lance

dataset = lance.dataset("./alice_and_bob.lance")
dataset.optimize.compact_files(target_rows_per_fragment=1024 * 1024)

在压缩过程中,Lance还可以移除已删除的行。重写后的片段将不再包含删除文件。这可以提升扫描性能,因为在扫描过程中无需跳过软删除的行。

当文件被重写时,原始行地址将失效。这意味着如果这些文件之前属于任何ANN索引,现在将不再包含在内。因此,建议在重建索引之前先重写文件。