模式演进

Lance支持模式演进:可以添加、删除和修改数据集中的列。大多数这些操作都无需重写数据集中的数据文件,因此是非常高效的操作。

通常情况下,模式变更会与大多数并发写入操作产生冲突。例如,当您修改数据集模式的同时有其他用户正在追加数据,根据操作顺序的不同,要么您的模式变更会失败,要么对方的追加操作会失败。因此,建议在没有其他写入操作时执行模式变更。

重命名列

可以使用lance.LanceDataset.alter_columns()方法重命名列。

table = pa.table({"id": pa.array([1, 2, 3])})
dataset = lance.write_dataset(table, "ids")
dataset.alter_columns({"path": "id", "name": "new_id"})
print(dataset.to_table().to_pandas())
   new_id
0       1
1       2
2       3

这同样适用于嵌套列。要访问嵌套列,请使用点号 (.)来分隔嵌套层级。例如:

data = [
  {"meta": {"id": 1, "name": "Alice"}},
  {"meta": {"id": 2, "name": "Bob"}},
]
schema = pa.schema([
    ("meta", pa.struct([
        ("id", pa.int32()),
        ("name", pa.string()),
    ]))
])
dataset = lance.write_dataset(data, "nested_rename")
dataset.alter_columns({"path": "meta.id", "name": "new_id"})
print(dataset.to_table().to_pandas())
                             meta
0  {'new_id': 1, 'name': 'Alice'}
1    {'new_id': 2, 'name': 'Bob'}

转换列数据类型

除了更改列名外,您还可以使用lance.LanceDataset.alter_columns()方法更改列的数据类型。这需要将该列重写到新的数据文件中,但不需要重写其他列。

注意

如果列已建立索引,当列类型变更时,该索引将被删除。

此方法可用于更改列中的向量类型。例如,我们可以将float32嵌入列转换为float16列,以牺牲精度为代价节省磁盘空间:

table = pa.table({
   "id": pa.array([1, 2, 3]),
   "embedding": pa.FixedShapeTensorArray.from_numpy_ndarray(
       np.random.rand(3, 128).astype("float32"))
})
dataset = lance.write_dataset(table, "embeddings")
dataset.alter_columns({"path": "embedding",
                       "data_type": pa.list_(pa.float16(), 128)})
print(dataset.schema)
id: int64
embedding: fixed_size_list<item: halffloat>[128]
  child 0, item: halffloat

添加新列

可以通过lance.LanceDataset.add_columns()方法在单个操作中添加并填充新列。有两种方式可以指定如何填充新列:第一种是为每个新列提供SQL表达式,第二种是提供一个函数来生成新列数据。

SQL表达式既可以是独立表达式,也可以引用现有列。SQL字面值可用于为所有现有行设置单一值。

table = pa.table({"name": pa.array(["Alice", "Bob", "Carla"])})
dataset = lance.write_dataset(table, "names")
dataset.add_columns({
    "hash": "sha256(name)",
    "status": "'active'",
})
print(dataset.to_table().to_pandas())
    name                                               hash  status
0  Alice  b';\xc5\x10b\x97<E\x8dZo-\x8dd\xa0#$cT\xad~\x0...  active
1    Bob  b'\xcd\x9f\xb1\xe1H\xcc\xd8D.Z\xa7I\x04\xccs\x...  active
2  Carla  b'\xad\x8d\x83\xff\xd8+Z\x8e\xd4)\xe8Y+\\\xb3\...  active

你也可以提供一个Python函数来生成新列的数据。例如,这可以用于计算一个新的嵌入列。该函数应接收一个PyArrow RecordBatch,并返回一个PyArrow RecordBatch或Pandas DataFrame。该函数将对数据集中的每个批次调用一次。

如果函数计算成本高昂且可能失败,建议在UDF中设置一个检查点文件。该检查点文件会在每次调用后保存UDF的状态,这样如果UDF失败,可以从最后一个检查点重新启动。请注意,这个文件可能会变得相当大,因为它需要存储多达整个数据文件的未保存结果。

import lance
import pyarrow as pa
import numpy as np

table = pa.table({"id": pa.array([1, 2, 3])})
dataset = lance.write_dataset(table, "ids")

@lance.batch_udf(checkpoint_file="embedding_checkpoint.sqlite")
def add_random_vector(batch):
    embeddings = np.random.rand(batch.num_rows, 128).astype("float32")
    return pd.DataFrame({"embedding": embeddings})
dataset.add_columns(add_random_vector)

仅通过Schema添加新列

我们在生产环境中常见的一个用例是向数据集添加新列但不填充数据。这对于后续运行大型分布式作业来惰性填充该列非常有用。要实现这一点,您可以使用lance.LanceDataset.add_columns()方法,通过pyarrow.Fieldpyarrow.Schema来添加列。

table = pa.table({"id": pa.array([1, 2, 3])})
dataset = lance.write_dataset(table, "null_columns")

# With pyarrow Field
dataset.add_columns(pa.field("embedding", pa.list_(pa.float32(), 128)))
assert dataset.schema == pa.schema([
    ("id", pa.int64()),
    ("embedding", pa.list_(pa.float32(), 128)),
])

# With pyarrow Schema
dataset.add_columns(pa.schema([
    ("label", pa.string()),
    ("score", pa.float32()),
]))
assert dataset.schema == pa.schema([
    ("id", pa.int64()),
    ("embedding", pa.list_(pa.float32(), 128)),
    ("label", pa.string()),
    ("score", pa.float32()),
])

此操作非常快速,因为它仅更新数据集的元数据。

使用merge添加新列

如果您已经预先计算了一个或多个新列,可以使用lance.LanceDataset.merge()方法将它们添加到现有数据集中。这样可以在无需重写整个数据集的情况下填充额外的列。

要使用merge方法,需要提供一个包含要添加列的新数据集,以及一个用于将新数据与现有数据集连接的列名。

例如,假设我们有一个包含嵌入向量和ID的数据集:

table = pa.table({
   "id": pa.array([1, 2, 3]),
   "embedding": pa.array([np.array([1, 2, 3]), np.array([4, 5, 6]),
                          np.array([7, 8, 9])])
})
dataset = lance.write_dataset(table, "embeddings", mode="overwrite")

现在如果我们想添加一列已生成的标签,可以通过合并一个新表来实现:

new_data = pa.table({
   "id": pa.array([1, 2, 3]),
   "label": pa.array(["horse", "rabbit", "cat"])
})
dataset.merge(new_data, "id")
print(dataset.to_table().to_pandas())
   id  embedding   label
0   1  [1, 2, 3]   horse
1   2  [4, 5, 6]  rabbit
2   3  [7, 8, 9]     cat

删除列

最后,你可以使用lance.LanceDataset.drop_columns()方法从数据集中删除列。这是一个仅修改元数据的操作,不会删除磁盘上的数据,因此执行速度非常快。

>>> table = pa.table({"id": pa.array([1, 2, 3]),
...                  "name": pa.array(["Alice", "Bob", "Carla"])})
>>> dataset = lance.write_dataset(table, "names", mode="overwrite")
>>> dataset.drop_columns(["name"])
>>> dataset.schema
id: int64

要从磁盘上实际删除数据,必须重写文件以移除列,然后删除旧文件。这可以通过使用lance.dataset.DatasetOptimizer.compact_files()后接lance.LanceDataset.cleanup_old_versions()来完成。