Delta Kernel Java 用户指南

什么是Delta Kernel?

Delta Kernel 是一个用于操作 Delta 表的库。具体来说,它提供了简单而专注的 API 来读写 Delta 表,而无需理解 Delta 协议的细节。您可以使用该库执行以下操作:

为您的项目设置Delta Kernel

你需要io.delta:delta-kernel-apiio.delta:delta-kernel-defaults依赖项。以下是Maven pom文件依赖列表的示例。

delta-kernel-api 模块包含了Kernel的核心部分,它抽象出Delta协议以实现对Delta表的读写操作。该模块利用由连接器传递给Kernel API的Engine接口来执行繁重操作,例如读写Parquet或JSON文件、评估表达式或执行文件系统操作(如列出Delta日志目录内容等)。Kernel在delta-kernel-defaults模块中提供了Engine的默认实现。连接器可以实现自己的Engine版本,以利用其原生实现Engine提供的功能。例如:连接器可以使用自己的Parquet读取器,而不是使用DefaultEngine中的读取器。更多细节将在later中介绍。

<dependencies>
  <dependency>
    <groupId>io.delta</groupId>
    <artifactId>delta-kernel-api</artifactId>
    <version>${delta-kernel.version}</version>
  </dependency>

  <dependency>
    <groupId>io.delta</groupId>
    <artifactId>delta-kernel-defaults</artifactId>
    <version>${delta-kernel.version}</version>
  </dependency>
</dependencies>

如果你的连接器没有使用Kernel提供的`DefaultEngine`,则可以跳过上述列表中的delta-kernel-defaults依赖项。

在单进程中读取Delta表

在本节中,我们将逐步介绍如何构建一个非常简单的单进程Delta连接器,该连接器可以使用Delta Kernel提供的默认`Engine`实现来读取Delta表。

您可以在项目中自行编写这段代码,或者使用Delta代码仓库中提供的示例

步骤1:对Delta表进行全表扫描

主要入口点是`io.delta.kernel.Table`,它是Delta表的编程表示形式。假设您在目录myTablePath下有一个Delta表。您可以按如下方式创建Table对象:

import io.delta.kernel.*;
import io.delta.kernel.defaults.*;
import org.apache.hadoop.conf.Configuration;

String myTablePath = <my-table-path>; // fully qualified table path. Ex: file:/user/tables/myTable
Configuration hadoopConf = new Configuration();
Engine myEngine = DefaultEngine.create(hadoopConf);
Table myTable = Table.forPath(myEngine, myTablePath);

注意我们创建的默认`Engine`用于初始化myTable对象。该对象允许您插入自己的库来处理计算密集型操作,如Parquet文件读取、JSON解析等。目前您可以暂时忽略它。稍后当我们讨论如何为分布式处理引擎构建更复杂的连接器时,会进一步探讨这个话题。

通过这个myTable对象,您可以创建一个`Snapshot`对象,它表示表在特定版本中的一致状态(也称为快照一致性)。

Snapshot mySnapshot = myTable.getLatestSnapshot(myEngine);

现在我们有了表格的一致快照视图,可以查询关于表格的更多详细信息。例如,您可以获取此快照的版本和架构。

long version = mySnapshot.getVersion(myEngine);
StructType tableSchema = mySnapshot.getSchema(myEngine);

接下来,为了读取表数据,我们需要构建一个`Scan`对象。要构建Scan对象,需要先创建一个`ScanBuilder`对象,该对象可选地允许选择要读取的列子集或设置查询过滤器。目前,我们先忽略这些可选设置。

Scan myScan = mySnapshot.getScanBuilder(myEngine).build()

// Common information about scanning for all data files to read.
Row scanState = myScan.getScanState(myEngine)

// Information about the list of scan files to read
CloseableIterator<FilteredColumnarBatch> scanFiles = myScan.getScanFiles(myEngine)

这个`Scan`对象包含了开始读取表所需的所有元数据。要从表中读取文件数据,需要两个关键信息。

  • myScan.getScanFiles(Engine): 返回扫描文件作为列式批次(表示为FilteredColumnarBatch的迭代器,稍后会详细介绍),其中批次中的每个选定行都包含有关存储表数据的单个文件的信息。

  • myScan.getScanState(Engine): 返回读取任何文件所需的快照级别信息。请注意,这是单行数据且对所有扫描文件通用。

对于每个扫描文件,必须从文件中读取物理数据。要读取的列在扫描文件状态中指定。一旦读取了物理数据,您需要调用`ScanFile.transformPhysicalData(…)`,并传入扫描状态和从扫描文件读取的物理数据。该API负责将物理数据(例如添加分区列)转换为表的逻辑数据。以下是一个在单线程中读取所有表数据的示例。

CloserableIterator<FilteredColumnarBatch> fileIter = scanObject.getScanFiles(myEngine);

Row scanStateRow = scanObject.getScanState(myEngine);

while(fileIter.hasNext()) {
  FilteredColumnarBatch scanFileColumnarBatch = fileIter.next();

  // Get the physical read schema of columns to read from the Parquet data files
  StructType physicalReadSchema =
    ScanStateRow.getPhysicalDataReadSchema(engine, scanStateRow);

  try (CloseableIterator<Row> scanFileRows = scanFileColumnarBatch.getRows()) {
    while (scanFileRows.hasNext()) {
      Row scanFileRow = scanFileRows.next();

      // From the scan file row, extract the file path, size and modification time metadata
      // needed to read the file.
      FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFileRow);

      // Open the scan file which is a Parquet file using connector's own
      // Parquet reader or default Parquet reader provided by the Kernel (which
      // is used in this example).
      CloseableIterator<ColumnarBatch> physicalDataIter =
        engine.getParquetHandler().readParquetFiles(
          singletonCloseableIterator(fileStatus),
          physicalReadSchema,
          Optional.empty() /* optional predicate the connector can apply to filter data from the reader */
        );

      // Now the physical data read from the Parquet data file is converted to a table
      // logical data. Logical data may include the addition of partition columns and/or
      // subset of rows deleted
      try (
         CloseableIterator<FilteredColumnarBatch> transformedData =
           Scan.transformPhysicalData(
             engine,
             scanStateRow,
             scanFileRow,
             physicalDataIter)) {
        while (transformedData.hasNext()) {
          FilteredColumnarBatch logicalData = transformedData.next();
          ColumnarBatch dataBatch = logicalData.getData();

          // Not all rows in `dataBatch` are in the selected output.
          // An optional selection vector determines whether a row with a
          // specific row index is in the final output or not.
          Optional<ColumnVector> selectionVector = dataReadResult.getSelectionVector();

          // access the data for the column at ordinal 0
          ColumnVector column0 = dataBatch.getColumnVector(0);
          for (int rowIndex = 0; rowIndex < column0.getSize(); rowIndex++) {
            // check if the row is selected or not
            if (!selectionVector.isPresent() || // there is no selection vector, all records are selected
               (!selectionVector.get().isNullAt(rowId) && selectionVector.get().getBoolean(rowId)))  {
              // Assuming the column type is String.
              // If it is a different type, call the relevant function on the `ColumnVector`
              System.out.println(column0.getString(rowIndex));
            }
          }

          // access the data for column at ordinal 1
          ColumnVector column1 = dataBatch.getColumnVector(1);
          for (int rowIndex = 0; rowIndex < column1.getSize(); rowIndex++) {
            // check if the row is selected or not
            if (!selectionVector.isPresent() || // there is no selection vector, all records are selected
               (!selectionVector.get().isNullAt(rowId) && selectionVector.get().getBoolean(rowId)))  {
              // Assuming the column type is Long.
              // If it is a different type, call the relevant function on the `ColumnVector`
              System.out.println(column1.getLong(rowIndex));
            }
          }
          // .. more ..
        }
      }
    }
  }
}

这里提供了一些在单个进程中读取Delta表的实际工作示例here

[!重要提示] 所有Delta协议级别的细节都编码在Scan.getScanFiles API返回的行中,但您无需理解这些细节即可正确读取表数据。您只需要从每个扫描文件行中获取Parquet文件状态,并将Parquet文件中的数据读取到ColumnarBatch格式中。物理数据通过`Scan.transformPhysicalData`转换为表的逻辑数据。转换为逻辑数据的过程由协议、表的元数据以及扫描文件决定。随着Delta协议的发展,这一转换步骤也会相应演进,而您的代码无需更改以适应协议变更。这正是Delta Kernel提供的抽象层的主要优势。

[!注意] 请注意,每当调用Delta Kernel API时,同一个Engine实例myEngine会被多次传递。之所以在每次调用时都传递这个实例,是因为它是连接器上下文,应该由连接器在Delta Kernel API之外进行维护,以便连接器能够控制Engine

步骤2:利用文件跳过功能提升扫描性能

我们已经探讨了如何进行全表扫描。然而,使用Delta格式的真正优势在于您可以通过查询过滤器跳过文件。为了实现这一点,Delta Kernel提供了一个表达式框架来编码您的过滤器,并将其提供给Delta Kernel以便在生成扫描文件时跳过文件。例如,假设您的表按columnX分区,您只想查询分区columnX=1。您可以生成表达式并按如下方式使用它来构建扫描:

import io.delta.kernel.expressions.*;

import io.delta.kernel.defaults.engine.*;

Engine myEngine = DefaultEngine.create(new Configuration());

Predicate filter = new Predicate(
  "=",
  Arrays.asList(new Column("columnX"), Literal.ofInt(1)));

Scan myFilteredScan = mySnapshot.buildScan(engine)
  .withFilter(myEngine, filter)
  .build()

// Subset of the given filter that is not guaranteed to be satisfied by
// Delta Kernel when it returns data. This filter is used by Delta Kernel
// to do data skipping as much as possible. The connector should use this filter
// on top of the data returned by Delta Kernel in order for further filtering.
Optional<Predicate> remainingFilter = myFilteredScan.getRemainingFilter();

通过myFilteredScan.getScanFiles(myEngine)返回的扫描文件将仅包含所需分区的文件行。同样地,您可以为非分区列提供筛选条件,如果表中的数据通过这些列进行了良好的聚类,那么Delta Kernel将能够尽可能跳过文件。

创建Delta表

在本节中,我们将逐步介绍如何构建一个Delta连接器,该连接器可以使用Delta Kernel提供的默认`Engine`实现来创建Delta表。

您可以在项目中自行编写这段代码,或者使用Delta代码仓库中提供的示例

主要入口点是`io.delta.kernel.Table`,它是Delta表的编程表示。假设你想在目录myTablePath下创建Delta表,可以按如下方式创建Table对象:

package io.delta.kernel.examples;

import io.delta.kernel.*;
import io.delta.kernel.types.*;
import io.delta.kernel.utils.CloseableIterable;

String myTablePath = <my-table-path>;
Configuration hadoopConf = new Configuration();
Engine myEngine = DefaultEngine.create(hadoopConf);
Table myTable = Table.forPath(myEngine, myTablePath);

注意我们创建的默认`Engine`用于初始化myTable对象。该对象允许您插入自己的库来处理计算密集型操作,如Parquet文件读取、JSON解析等。目前您可以暂时忽略它。稍后当我们讨论如何为分布式处理引擎构建更复杂的连接器时,会进一步探讨这个话题。

从这个myTable对象你可以创建一个`TransactionBuilder`对象,它允许你构建一个`Transaction`对象

TransactionBuilder txnBuilder =
  myTable.createTransactionBuilder(
    myEngine,
    "Examples", /* engineInfo - connector can add its own identifier which is noted in the Delta Log */
    Operation.CREATE_TABLE /* What is the operation we are trying to perform. This is noted in the Delta Log */
  );

现在你已经有了TransactionBuilder对象,可以设置表的模式和分区列。

StructType mySchema = new StructType()
  .add("id", IntegerType.INTEGER)
  .add("name", StringType.STRING)
  .add("city", StringType.STRING)
  .add("salary", DoubleType.DOUBLE);

// Partition columns are optional. Use it only if you are creating a partitioned table.
List<String> myPartitionColumns = Collections.singletonList("city");

// Set the schema of the new table on the transaction builder
txnBuilder = txnBuilder
  .withSchema(engine, mySchema);

// Set the partition columns of the new table only if you are creating
// a partitioned table; otherwise, this step can be skipped.
txnBuilder = txnBuilder
  .withPartitionColumns(engine, examplePartitionColumns);

TransactionBuilder 允许设置表的附加属性,例如启用特定的Delta功能或为幂等写入设置标识符。我们将在接下来的章节中探讨这些内容。下一步是从TransactionBuilder对象构建Transaction

// Build the transaction
Transaction txn = txnBuilder.build(engine);

Transaction 对象允许连接器可选地添加任何数据并最终提交事务。成功提交确保表以给定的模式创建。在此示例中,我们只是创建一个表,而不向表中添加任何数据。

// Commit the transaction.
// As we are just creating the table and not adding any data, the `dataActions` is empty.
TransactionCommitResult commitResult =
  txn.commit(
    engine,
    CloseableIterable.emptyIterable() /* dataActions */
  );

`TransactionCommitResult` 包含事务提交的版本号以及表是否准备好进行检查点。由于我们正在创建表,版本号将是 0。我们稍后将讨论什么是检查点以及表准备好进行检查点的含义。

这里提供了一些创建分区和非分区Delta表的工作示例here

创建表并插入数据

在本节中,我们将逐步介绍如何使用Delta Kernel提供的默认`Engine`实现来构建一个Delta连接器,该连接器可以创建Delta表并向表中插入数据(类似于SQL中的CREATE TABLE

AS 结构)。

您可以在项目中自行编写这段代码,或者使用Delta代码仓库中提供的示例

第一步是构建一个Transaction。以下是相关代码。如需了解代码每个步骤的详细含义,请阅读create table部分。

package io.delta.kernel.examples;

import io.delta.kernel.*;
import io.delta.kernel.types.*;
import io.delta.kernel.utils.CloseableIterable;

String myTablePath = <my-table-path>;
Configuration hadoopConf = new Configuration();
Engine myEngine = DefaultEngine.create(hadoopConf);
Table myTable = Table.forPath(myEngine, myTablePath);

StructType mySchema = new StructType()
  .add("id", IntegerType.INTEGER)
  .add("name", StringType.STRING)
  .add("city", StringType.STRING)
  .add("salary", DoubleType.DOUBLE);

// Partition columns are optional. Use it only if you are creating a partitioned table.
List<String> myPartitionColumns = Collections.singletonList("city");

TransactionBuilder txnBuilder =
  myTable.createTransactionBuilder(
    myEngine,
    "Examples", /* engineInfo - connector can add its own identifier which is noted in the Delta Log */
    Operation.WRITE /* What is the operation we are trying to perform? This is noted in the Delta Log */
  );

// Set the schema of the new table on the transaction builder
txnBuilder = txnBuilder
  .withSchema(engine, mySchema);

// Set the partition columns of the new table only if you are creating
// a partitioned table; otherwise, this step can be skipped.
txnBuilder = txnBuilder
  .withPartitionColumns(engine, examplePartitionColumns);

// Build the transaction
Transaction txn = txnBuilder.build(engine);

现在我们有了`Transaction`对象,下一步是生成确认表模式并按表分区进行分区的数据。

StructType dataSchema = txn.getSchema(engine)

// Optional for un-partitioned tables
List<String> partitionColumnNames = txn.getPartitionColumns(engine)

连接器可以利用数据模式和分区列名来规划查询并生成数据。在实际拥有需要写入表的数据的任务中,连接器可以请求内核将表模式中给定的数据转换为可实际写入Parquet数据文件的物理数据。对于分区表,数据需要先按分区列进行分区,然后连接器应请求内核分别转换每个分区的数据。之所以需要分区步骤,是因为Delta表中任何给定的数据文件都只包含属于一个确切分区的数据。

获取事务的状态。事务状态包含如何将表模式中的数据转换为需要写入的物理数据的信息。这些转换取决于表所采用的协议和功能特性。

Row txnState = txn.getTransactionState(engine);

准备数据。

// The data generated by the connector to write into a table
CloseableIterator<FilteredColumnarBatch> data = ...

// Create partition value map
Map<String, Literal> partitionValues =
  Collections.singletonMap(
    "city", // partition column name
     // partition value. Depending upon the partition column type, the
     // partition value should be created. In this example, the partition
     // column is of type StringType, so we are creating a string literal.
     Literal.ofString(city)
  );

连接器数据以`FilteredColumnarBatch`的迭代器形式传递。每个FilteredColumnarBatch包含一个`ColumnarBatch`,其中实际存储了列式访问格式的数据,以及一个可选的选区向量,允许连接器指定将ColumnarBatch中的哪些行写入表中。

分区值以分区列名到分区值的映射形式传递。对于未分区的表,映射应为空,因为它没有分区列。

// Transform the logical data to physical data that needs to be written to the Parquet
// files
CloseableIterator<FilteredColumnarBatch> physicalData =
  Transaction.transformLogicalData(engine, txnState, data, partitionValues);

上述代码将给定的分区数据转换为需要写入Parquet数据文件的FilteredColumnarBatch迭代器。为了写入数据文件,连接器需要从Kernel获取`WriteContext`,它会告知连接器数据文件的写入位置以及从每个数据文件收集统计信息的列。

// Get the write context
DataWriteContext writeContext = Transaction.getWriteContext(engine, txnState, partitionValues);

现在,连接器已拥有需要写入Parquet数据文件的物理数据,并且知道这些文件应该写入的位置,可以开始写入数据文件了。

CloseableIterator<DataFileStatus> dataFiles = engine.getParquetHandler()
  .writeParquetFiles(
    writeContext.getTargetDirectory(),
    physicalData,
    writeContext.getStatisticsColumns()
  );

在上述代码中,连接器使用了Engine提供的ParquetHandler来写入数据,但连接器也可以选择自己的Parquet文件写入器来写入数据。还需注意的是,上述调用的返回值是一个迭代器,包含每个已写入数据文件的`DataFileStatus`。它基本上包含文件路径、文件元数据,以及由`WriteContext.getStatisticsColumns()`指定的列的可选文件级统计信息。

将每个DataFileStatus转换为可以写入Delta表日志的Delta日志操作。

CloseableIterator<Row> dataActions =
  Transaction.generateAppendActions(engine, txnState, dataFiles, writeContext);

下一步是将上述生成的所有Delta日志操作构建为`CloseableIterable`。构建Iterable的原因是事务提交需要多次访问Delta日志操作列表(以解决表存在多并发写入时的冲突)。Kernel提供了一个实用方法来创建内存版的CloseableIterable。该接口还允许连接器实现自定义方案,当数据操作内容过大无法存入内存时,可将其溢出写入磁盘。

// Create a iterable out of the data actions. If the contents are too big to fit in memory,
// the connector may choose to write the data actions to a temporary file and return an
// iterator that reads from the file.
CloseableIterable<Row> dataActionsIterable = CloseableIterable.inMemoryIterable(dataActions);

最后一步是提交事务!

TransactionCommitStatus commitStatus = txn.commit(engine, dataActionsIterable)

`TransactionCommitResult` 包含事务提交的版本号以及表是否准备好进行检查点。由于我们正在创建表,版本号将是 0。我们稍后将讨论什么是检查点以及表准备好进行检查点的含义。

这里提供了一些工作示例,用于创建并向分区和非分区Delta表中插入数据,详情请见此处

向现有Delta表进行盲追加

在本节中,我们将逐步介绍如何使用Delta Kernel提供的默认`Engine`实现来构建一个Delta连接器,该连接器可将数据插入现有Delta表(类似于SQL中的INSERT INTO

结构)。

您可以选择在项目中自行编写这段代码,或者使用Delta代码仓库中提供的示例。具体步骤与创建表并插入数据完全相同,唯一的区别是在构建TransactionBuilder时不需要提供任何schema或分区列。

// Create a `Table` object with the given destination table path
Table table = Table.forPath(engine, tablePath);

// Create a transaction builder to build the transaction
TransactionBuilder txnBuilder =
  table.createTransactionBuilder(
    engine,
    "Examples", /* engineInfo */
    Operation.WRITE
  );

/ Build the transaction - no need to provide the schema as the table already exists.
Transaction txn = txnBuilder.build(engine);

// Get the transaction state
Row txnState = txn.getTransactionState(engine);

List<Row> dataActions = new ArrayList<>();

// Generate the sample data for three partitions. Process each partition separately.
// This is just an example. In a real-world scenario, the data may come from different
// partitions. Connectors already have the capability to partition by partition values
// before writing to the table

// In the test data `city` is a partition column
for (String city : Arrays.asList("San Francisco", "Campbell", "San Jose")) {
  FilteredColumnarBatch batch1 = generatedPartitionedDataBatch(
            5 /* offset */, city /* partition value */);
  FilteredColumnarBatch batch2 = generatedPartitionedDataBatch(
            5 /* offset */, city /* partition value */);
  FilteredColumnarBatch batch3 = generatedPartitionedDataBatch(
            10 /* offset */, city /* partition value */);

    CloseableIterator<FilteredColumnarBatch> data =
            toCloseableIterator(Arrays.asList(batch1, batch2, batch3).iterator());

    // Create partition value map
    Map<String, Literal> partitionValues =
            Collections.singletonMap(
                    "city", // partition column name
                    // partition value. Depending upon the parition column type, the
                    // partition value should be created. In this example, the partition
                    // column is of type StringType, so we are creating a string literal.
                    Literal.ofString(city));


    // First transform the logical data to physical data that needs to be written
    // to the Parquet
    // files
    CloseableIterator<FilteredColumnarBatch> physicalData =
            Transaction.transformLogicalData(engine, txnState, data, partitionValues);

    // Get the write context
    DataWriteContext writeContext =
            Transaction.getWriteContext(engine, txnState, partitionValues);


    // Now write the physical data to Parquet files
    CloseableIterator<DataFileStatus> dataFiles = engine.getParquetHandler()
            .writeParquetFiles(
                    writeContext.getTargetDirectory(),
                    physicalData,
                    writeContext.getStatisticsColumns());


    // Now convert the data file status to data actions that needs to be written to the Delta
    // table log
    CloseableIterator<Row> partitionDataActions = Transaction.generateAppendActions(
            engine,
            txnState,
            dataFiles,
            writeContext);

    // Now add all the partition data actions to the main data actions list. In a
    // distributed query engine, the partition data is written to files at tasks on executor
    // nodes. The data actions are collected at the driver node and then written to the
    // Delta table log using the `Transaction.commit`
    while (partitionDataActions.hasNext()) {
        dataActions.add(partitionDataActions.next());
    }
}

// Create a iterable out of the data actions. If the contents are too big to fit in memory,
// the connector may choose to write the data actions to a temporary file and return an
// iterator that reads from the file.
CloseableIterable<Row> dataActionsIterable = CloseableIterable.inMemoryIterable(
        toCloseableIterator(dataActions.iterator()));

// Commit the transaction.
TransactionCommitResult commitResult = txn.commit(engine, dataActionsIterable);

对Delta表的幂等盲追加

幂等写入允许连接器确保属于特定事务版本和应用ID的数据最多只被插入表中一次。在增量处理系统(如流处理系统)中,使用其自身应用特定版本来跟踪进度时,需要记录已完成哪些进度,以避免在写入过程中遇到故障和重试时重复数据。通过设置事务标识符,Delta表可以确保具有相同标识符的数据不会被多次写入。更多信息请参阅Delta协议章节Transaction Identifiers

为了使数据追加操作具有幂等性,请在`TransactionBuilder`上设置事务标识符

// Set the transaction identifiers for idempotent writes
// Delta/Kernel makes sure that there exists only one transaction in the Delta log
// with the given application id and txn version
txnBuilder =
  txnBuilder.withTransactionId(
    engine,
    "my app id", /* application id */
    100 /* monotonically increasing txn version with each new data insert */
  );

这就是连接器为实现幂等性盲追加所需做的全部工作。

对Delta表进行检查点操作

检查点是Delta Log中的一项优化功能,旨在更快地构建Delta表的状态。它本质上包含了创建检查点版本时表的状态。Delta Kernel允许连接器选择性地创建检查点。该功能会为表中的每若干次提交(可配置的表属性)创建检查点。

Transaction.commit 的结果返回一个 TransactionCommitResult,其中包含事务提交的版本号以及表是否被读取用于检查点。创建检查点需要时间,因为它需要构建表的完整状态。如果连接器不想自行创建检查点,而是使用其他能更快创建检查点的连接器,则可以跳过检查点步骤。

如果需要进行检查点操作,Table对象提供了检查点表的API接口。

TransactionCommitResult commitResult = txn.commit(engine, dataActionsIterable);

if (commitResult.isReadyForCheckpoint()) {
  // Checkpoint the table
  Table.forPath(engine, tablePath).checkpoint(engine, commitResult.getVersion());
}

为分布式处理引擎构建Delta连接器

与仅通过单一进程读取表的简单应用程序不同,为Apache Spark™和Trino等复杂处理引擎构建连接器可能需要相当多的额外工作。例如,要为SQL引擎构建连接器,您必须执行以下操作

  • 了解引擎提供的API以构建连接器,以及如何使用Delta Kernel为连接器和引擎提供操作Delta表所需的信息。

  • 决定使用哪些库来执行计算密集型操作,如读取Parquet文件、解析JSON、计算表达式等。Delta Kernel提供了所有扩展点,允许您插入任何库,而无需理解Delta协议的所有底层细节。

  • 处理分布式引擎特有的细节。例如,

    • 由Delta Kernel提供的Delta表元数据序列化。

    • 高效地将从Parquet读取的数据转换为引擎内存处理格式。

在本节中,我们将概述构建连接器所需的步骤。

步骤0:验证先决条件

在前一节展示如何读取简单表格时,我们简要介绍了`Engine`。这是主要的扩展点,您可以在此插入计算密集型操作的自定义实现,例如读取Parquet文件、解析JSON等。在简单场景中,我们使用了适用于大多数情况的默认帮助程序实现。然而,若要为复杂处理引擎构建高性能连接器,您很可能需要使用与引擎兼容的库来提供自己的实现。因此在开始构建连接器之前,理解这些需求并规划构建自己的引擎非常重要。

以下是构建能够读取Delta表连接器所需的库/功能

  • 从您的存储/文件系统执行文件列表和文件读取操作。

  • 以列式数据格式读取Parquet文件,优先采用内存中的列式格式。

  • 解析JSON数据

  • 读取JSON文件

  • 在内存列式批次上评估表达式

对于每一项功能,您可以选择自行构建实现或复用默认实现。

步骤1:在您的连接器项目中设置Delta Kernel

在Delta Kernel项目中,您可以选择依赖多个不同的组件。

  1. Delta Kernel核心API - 这是一个必备依赖项,包含所有主要API如表(Table)、快照(Snapshot)和扫描(Scan),您将使用这些API来访问Delta表的元数据和数据。它具有极少的依赖项,降低了与连接器和引擎中任何依赖项发生冲突的可能性。同时它还提供了`Engine`接口,允许您插入计算密集型操作的实现,但该接口本身不提供任何实现。

  2. Delta Kernel 默认实现 - 它有一个名为 `DefaultEngine` 的默认实现,以及额外的依赖项如 Hadoop。如果您希望复用全部或部分此实现,可以选择依赖此模块。

设置Java项目

如上所述,您可以按以下方式导入一个或两个构件:

<!-- Must have dependency -->
<dependency>
  <groupId>io.delta</groupId>
  <artifactId>delta-kernel-api</artifactId>
  <version>${delta-kernel.version}</version>
</dependency>

<!-- Optional depdendency -->
<dependency>
  <groupId>io.delta</groupId>
  <artifactId>delta-kernel-defaults</artifactId>
  <version>${delta-kernel.version}</version>
</dependency>

步骤2:构建你自己的引擎

在本节中,我们将探讨`Engine`接口,并逐步讲解如何实现您自己的实现方案,以便您可以接入特定连接器/引擎的计算密集型操作、线程模型、资源管理等实现。

[!重要提示] 在验证过程中,如果您认为默认Engine实现的所有依赖项都能与您的连接器和引擎兼容,则可以跳过此步骤,直接进入使用默认引擎实现连接器的第3步。如果后续需要为您的连接器自定义辅助工具,可以重新执行此步骤。

步骤2.1:实现Engine接口

`Engine`接口整合了多个子接口,每个子接口都有特定的用途。以下是这些子接口的简要概述。更详细的内容请参阅API文档(Java)。

interface Engine {
  /**
   * Get the connector provided {@link ExpressionHandler}.
   * @return An implementation of {@link ExpressionHandler}.
  */
  ExpressionHandler getExpressionHandler();

  /**
   * Get the connector provided {@link JsonHandler}.
   * @return An implementation of {@link JsonHandler}.
   */
  JsonHandler getJsonHandler();

  /**
   * Get the connector provided {@link FileSystemClient}.
   * @return An implementation of {@link FileSystemClient}.
   */
  FileSystemClient getFileSystemClient();

  /**
   * Get the connector provided {@link ParquetHandler}.
   * @return An implementation of {@link ParquetHandler}.
   */
  ParquetHandler getParquetHandler();
}

要构建您自己的Engine实现,您可以选择使用每个子接口的默认实现,或者完全从头开始构建每一个。

class MyEngine extends DefaultEngine {

  FileSystemClient getFileSystemClient() {
    // Build a new implementation from scratch
    return new MyFileSystemClient();
  }

  // For all other sub-clients, use the default implementations provided by the `DefaultEngine`.
}

接下来,我们将逐步介绍如何实现每个接口。

步骤2.2:实现FileSystemClient接口

`FileSystemClient` 接口包含基本的文件系统操作,如列出目录、将路径解析为完全限定路径以及从文件中读取字节。在实现此接口与S3、Hadoop或ADLS等存储系统交互时,必须注意以下几点:

  • 凭证和权限:连接器必须使用必要的配置和凭证填充其FileSystemClient,以便客户端从存储系统中检索所需数据。例如,基于Hadoop的FileSystem抽象的实现可以通过Hadoop配置传递S3凭证。

  • 解密:如果文件系统对象被加密,则实现必须在返回数据之前对数据进行解密。

步骤2.3: 实现 ParquetHandler

顾名思义,该接口包含与读写Parquet文件相关的所有功能。其设计允许连接器接入多种实现方式,从简单的单线程读取器到具备预取功能和高级连接器特定表达式下推的多线程高级读取器。接下来我们将探讨需要实现的方法及其相关保证。

方法 readParquetFiles(CloseableIterator fileIter, StructType physicalSchema, java.util.Optional predicate)

方法接收包含Parquet文件元数据(如文件路径、大小等)的FileStatus作为输入参数。需要从Parquet文件中读取的列由物理模式定义。要实现此方法,您可能需要先实现自己的`ColumnarBatch``ColumnVector`,它们用于表示从Parquet文件生成的内存数据。

在识别要读取的列时,请注意物理模式中存在多种类型的列(表示为`StructType`)。

  • 数据列:预期从Parquet文件中读取的列。根据定义列的StructField对象,读取Parquet文件中名称或字段ID匹配的列。如果列具有字段ID(存储在StructField元数据中的parquet.field.id),则应使用该字段ID来匹配Parquet文件中的列。否则,应使用列名进行匹配。

  • 元数据列:这些是特殊的列,必须使用关于Parquet文件的元数据进行填充(`StructField#isMetadataColumn` 用于判断 StructType 中的某一列是否为元数据列)。要了解如何填充此类列,首先将列名与标准元数据列名常量集进行匹配。例如,

    • StructFileld#isMetadataColumn() 返回 true 且列名为 StructField.METADATA_ROW_INDEX_COLUMN_NAME 时,必须生成一个列向量,其中填充的是 Parquet 文件中每行的实际索引(即不考虑 Parquet 数据跳过可能返回的行子集后的索引)。

需求与保证

任何实现都必须遵循以下保证。

  • 返回的ColumnarBatches的模式必须与物理模式匹配。

    • 如果找不到数据列且StructField.isNullable = true,则返回一个ColumnVector的空值向量。如果该列不可为空,则抛出错误。

  • 输出迭代器必须保持与输入迭代器相同的顺序。也就是说,如果输入迭代器中file1file2之前,那么在输出迭代器中file1的列式批次也必须位于file2的列式批次之前。

方法 writeParquetFiles(String directoryPath, CloseableIterator dataIter, java.util.List statsColumns)

method接收给定数据并将其写入指定目录中的一个或多个Parquet文件。数据以FilteredColumnarBatches迭代器的形式提供,其中包含一个ColumnarBatch和一个可选的选择向量,该向量为ColumnarBatch中的每一行包含一个条目,用于指示该行是否被选中。ColumnarBatch还包含数据的模式。此模式应转换为Parquet模式,包括每个列StructField中存在的任何字段ID`FieldMetadata`

还有一个参数statsColumns,它向Parquet写入器提示需要为每个文件收集哪些列的统计信息。这些统计信息包括statsColumns列表中每列的minmaxnull_count。统计信息收集是可选的,但当存在时,Kernel会使用这些统计信息作为Delta表提交的一部分持久化存储。这将帮助读取查询根据查询谓词修剪不需要的数据文件。

对于每个写入的数据文件,调用者期望获取一个`DataFileStatus`对象。该对象包含数据文件路径、大小、修改时间以及可选的列统计信息。

方法 writeParquetFileAtomically(String filePath, CloseableIterator data)

方法将给定的data写入位于filePath路径的Parquet文件。此写入是原子性写入,即要么创建包含所有给定内容的Parquet文件,要么完全不创建Parquet文件。这不应创建包含部分内容的文件。

默认实现利用来自`delta-storage`模块的`LogStore`实现来确保原子性。想要实现自己版本ParquetHandler的连接器可以参考默认实现获取详细信息。

性能优化建议
  • 将数据表示为ColumnVectorColumnarBatch会对查询性能产生重大影响,最佳做法是直接将Parquet文件数据读取为引擎原生格式的向量和批次,以避免潜在的高成本内存数据格式转换。围绕引擎原生格式的等效类创建Kernel ColumnVectorColumnarBatch包装器。

步骤2.4: 实现ExpressionHandler接口

`ExpressionHandler` 接口包含了处理可应用于列式数据表达式所需的所有方法。

方法 getEvaluator(StructType batchSchema, Expression expresion, DataType outputType)

这个方法生成一个`ExpressionEvaluator`类型的对象,该对象可以在一批行数据上评估expression以产生单列向量的结果。为了生成这个函数,getEvaluator()方法接收表达式和ColumnarBatch数据模式作为输入,这些表达式将应用于该数据模式。同一个对象可用于评估具有相同模式和表达式的多个输入列式批次,该评估器就是为此创建的。

方法 getPredicateEvaluator(StructType inputSchema, Predicate predicate)

方法用于为Predicate类型的表达式创建表达式求值器。Predicate类型的表达式会返回一个布尔值作为输出。

返回的对象类型为`PredicateEvaluator`。这是一个特殊接口,用于对输入批次执行谓词评估,返回一个选择向量,其中包含输入批次中每行对应的值,指示该行是否通过谓词判断。可选地,它可以接收一个现有的选择向量与输入批次一起进行评估。结果选择向量会与给定的现有选择向量结合,并返回一个新的选择向量。这种机制允许通过多次谓词评估处理输入批次,而无需在每次谓词评估后重写输入批次来移除未通过谓词的行。新的选择向量应该与现有选择向量相同或更具选择性。例如,如果某行在现有选择向量中被标记为未选中,那么即使给定谓词对该行返回true,它在返回的选择向量中也应保持未选中状态。

方法 createSelectionVector(boolean[] values, int from, int to)

方法允许为输入的布尔类型值创建ColumnVector。这使得连接器能够以所需的内存格式维护所有创建的ColumnVector

需求与保证

任何实现都必须遵循以下保证。

  • 实现必须处理表达式的所有可能变体。如果实现遇到不知道如何处理表达式类型,则必须抛出特定于语言的异常。

  • 生成的ExpressionEvaluator将要使用的ColumnarBatch保证具有生成期间提供的模式。因此,将表达式求值逻辑绑定到列序号而不是列名是安全的,从而使实际求值更快。

步骤2.5:实现JsonHandler

This 引擎接口允许连接器使用插件自身的JSON处理代码,并将其暴露给Delta Kernel。

方法 readJsonFiles(CloseableIterator fileIter, StructType physicalSchema, java.util.Optional predicate)

方法以JSON文件的FileStatus作为输入,并返回一系列列式批次的数据。需要从JSON文件中读取的列由物理模式定义,返回的批次必须与该模式匹配。要实现此方法,您可能需要先实现自己的`ColumnarBatch``ColumnVector`,用于表示从JSON文件生成的内存数据。

在识别要读取的列时,请注意物理模式中存在多种类型的列(表示为`StructType`)。

方法 parseJson(ColumnVector jsonStringVector, StructType outputSchema, java.util.Optional selectionVector)

方法允许将JSON格式的字符串值ColumnVector解析为outputSchema指定的输出格式。如果在outputSchema中未找到给定列,则返回空值。它可选地接受一个选择向量,该向量指示要解析输入字符串ColumnVector中的哪些条目。如果未选择某个条目,则会在输出中为该特定条目返回null值作为解析输出。

方法 deserializeStructType(String structTypeJson)

method允许将JSON编码(遵循Delta模式序列化规则)的StructType模式解析为StructType。大多数JsonHandler的实现无需重写此方法,可直接使用默认`JsonHandler`实现中的版本。

方法 writeJsonFileAtomically(String filePath, CloseableIterator data, boolean overwrite)

方法将给定的data写入位于filePath路径的JSON文件。此写入是原子写入,即要么创建包含所有给定内容的JSON文件,要么完全不创建Parquet文件。这不应创建包含部分内容的文件。

默认实现利用`delta-storage`模块中的`LogStore`实现来确保原子性。想要实现自定义JsonHandler版本的连接器可以参考默认实现的细节。

该实现预期将按照API Javadoc中描述的序列化规则(将Row对象转换为JSON字符串)进行处理。

步骤2.6: 实现 ColumnarBatchColumnVector

`ColumnarBatch``ColumnVector` 是两个用于表示从文件读取到内存中数据的接口。这种表示方式对查询性能有重大影响。每个引擎通常都有其原生内存数据表示方式,用于执行数据转换操作。例如在 Apache Spark™ 中,行数据在内部表示为 UnsafeRow 以实现高效处理。因此最佳做法是将 Parquet 文件数据直接读取为原生格式的向量和批次,以避免潜在的高成本内存数据格式转换。推荐的方法是构建扩展这两个接口的包装类,但在内部使用引擎原生类来存储数据。当连接器需要将从内核接收的列式批次转发给引擎时,它必须足够智能以跳过已经采用引擎原生格式的向量和批次转换。

步骤3:在您的连接器中构建读取支持

在本节中,我们将逐步介绍连接器读取表时可能需要调用的Kernel API调用序列。连接器在与引擎交互时进行这些调用的确切时机完全取决于引擎-连接器API,因此不在本指南讨论范围内。不过,我们将尽量提供可能(但不保证)适用于您连接器-引擎设置的通用指导原则。为此,我们假设引擎在处理读取/扫描查询时会经历以下阶段 - 逻辑计划分析、物理计划生成和物理计划执行。基于这些通用特征,读取Delta表的典型控制和数据流如下:

header-rows

1

widths

30 70

步骤 此步骤发生的典型查询阶段 解析要查询的表快照 当需要解析和验证计划的模式及其他细节时的逻辑计划分析阶段 根据查询参数解析要扫描的文件 物理计划生成阶段,此时扫描的最终参数已确定。例如: 修剪掉未使用的列后要读取的数据模式 过滤器重新排列后要应用的查询过滤器 将文件信息分发给工作节点 仅当使用分布式引擎时的物理计划执行阶段 使用文件信息读取列式数据 当引擎正在处理数据时的物理计划执行阶段

让我们详细了解每个步骤的细节。

步骤3.1:解析要查询的表快照

第一步是解析一致的快照及其关联的模式。连接器/引擎通常需要这一步来解析和验证扫描查询的逻辑计划(如果您的引擎中存在逻辑计划的概念)。为了实现这一点,连接器必须执行以下操作。

  • 从查询中解析表路径:如果路径直接可用,则这很简单。否则,如果它是基于目录表的查询(例如,在Hive Metastore中定义的Delta表),那么连接器必须从目录中解析表路径。

  • 初始化Engine对象:创建您在步骤2中选择的Engine新实例。

  • 初始化Kernel对象并获取表结构:假设查询是基于表的最新可用版本/快照,可以按以下方式获取表结构:

import io.delta.kernel.*;
import io.delta.kernel.defaults.engine.*;

Engine myEngine = new MyEngine();
Table myTable = Table.forPath(myTablePath);
Snapshot mySnapshot = myTable.getLatestSnapshot(myEngine);
StructType mySchema = mySnapshot.getSchema(myEngine);

如果你想查询表的特定版本(即非当前模式),那么你可以通过myTable.getSnapshot(version)获取所需的快照。

步骤3.2:解析要扫描的文件

接下来,我们需要利用查询中的更多信息构建一个Scan对象。这里我们假设连接器/引擎已能够从查询中提取以下详细信息(例如,在优化逻辑计划之后):

  • 读取模式:查询需要读取的表中的列。这可能是完整的列集合,也可能是列的子集。

  • 查询过滤器:可用于跳过读取表数据的分区或数据列上的过滤器。

要向Kernel提供这些信息,您需要执行以下操作:

  • 将引擎特定的模式和过滤表达式转换为Kernel模式和表达式:对于模式,您需要创建一个StructType对象。对于过滤器,您需要使用Expression的所有可用子类来创建一个Expression对象。

  • 使用转换后的信息构建扫描:按以下方式构建扫描:

import io.delta.kernel.expressions.*;
import io.delta.kernel.types.*;

StructType readSchema = ... ;  // convert engine schema
Predicate filterExpr = ... ;   // convert engine filter expression

Scan myScan = mySnapshot.buildScan(engine)
  .withFilter(myEngine, filterExpr)
  .withReadSchema(myEngine, readSchema)
  .build()
  • 解析文件读取所需的信息:生成的Scan对象包含两组信息。

    • 扫描文件:myScan.getScanFiles()返回一个ColumnarBatch的迭代器。迭代器中的每个批次包含多行数据,每行数据包含基于查询过滤器选中的单个文件的信息。

    • 扫描状态:myScan.getScanState() 返回一个包含所有需要读取文件共有信息的 Row

Row myScanStateRow = myScan.getScanState();
CloseableIterator<FilteredColumnarBatch> myScanFilesAsBatches = myScan.getScanFiles();

while (myScanFilesAsBatches.hasNext()) {
  FilteredColumnarBatch scanFileBatch = myScanFilesAsBatches.next();

  CloseableIterator<Row> myScanFilesAsRows = scanFileBatch.getRows();
}

正如我们即将看到的,从选定的文件中读取列式数据需要同时使用扫描状态行和包含文件信息的扫描文件行。

需求与保证

以下是定义此扫描时需要确保的详细信息。

  • 提供的readSchema必须与引擎执行查询时期望的数据模式完全一致。在查询规划阶段定义的模式与查询执行时模式之间的任何不匹配都将导致运行时失败。因此,您必须在引擎完成所有优化(如列裁剪)并最终确定逻辑计划后,才能使用该readSchema构建扫描。

  • 在适用的情况下(例如使用Java Kernel API时),您必须确保在消费扫描文件的ColumnarBatch时调用close()方法(即要么序列化行,要么使用它们来读取表数据)。

步骤3.3:将文件信息分发给工作节点

如果您正在为Spark/Presto/Trino/Flink等分布式引擎构建连接器,那么您的连接器需要将查询规划机(以下称为驱动程序)上的所有扫描元数据发送到任务执行机(以下称为工作节点)。您必须对扫描状态和扫描文件行进行序列化和反序列化处理。连接器的职责是为`Row`实现序列化和反序列化工具。如果连接器希望将一个扫描文件的读取拆分为多个任务,它可以向任务添加额外的连接器特定拆分上下文。在任务执行时,连接器可以使用自己的Parquet读取器来读取拆分信息所指示的文件特定部分。

自定义Row序列化器/反序列化器

以下是构建自定义序列化器/反序列化器的步骤,使其能够处理任何模式的`Row`

  • 序列化

    • 首先序列化行模式,即StructType对象。

    • 然后,使用该模式识别Row中每列/序号的类型,并据此逐个序列化所有值。

  • 反序列化

    • 定义一个继承Row接口的自定义类。它必须能够处理复杂类型,如数组、嵌套结构和映射。

    • 首先反序列化模式。

    • 然后,使用该模式反序列化这些值,并将它们放入自定义Row类的实例中。

import io.delta.kernel.utils.*;

// In the driver where query planning is being done
Byte[] scanStateRowBytes = RowUtils.serialize(scanStateRow);
Byte[] scanFileRowBytes = RowUtils.serialize(scanFileRow);

// Optionally the connector adds a split info to the task (scan file, scan state) to
// split reading of a Parquet file into multiple tasks. The task gets split info
// along with the scan file row and scan state row.
Split split = ...; // connector specific class, not related to Kernel

// Send these over to the worker

// In the worker when data will be read, after rowBytes have been sent over
Row scanStateRow = RowUtils.deserialize(scanStateRowBytes);
Row scanFileRow = RowUtils.deserialize(scanFileRowBytes);
Split split = ... deserialize split info ...;

步骤3.4:读取列式数据

最后,我们准备好读取列式数据了。您需要执行以下操作:

  • 根据扫描文件行、扫描状态以及可选的拆分信息,从Parquet文件中读取物理数据

  • 使用Kernel的API将物理数据转换为表的逻辑数据。

Row scanStateRow = ... ;
Row scanFileRow = ... ;
Split split = ...;

// Additional option predicate such as dynamic filters the connector wants to
// pass to the reader when reading files.
Predicate optPredicate = ...;

// Get the physical read schema of columns to read from the Parquet data files
StructType physicalReadSchema =
  ScanStateRow.getPhysicalDataReadSchema(engine, scanStateRow);

// From the scan file row, extract the file path, size and modification metadata
// needed to read the file.
FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFileRow);

// Open the scan file which is a Parquet file using connector's own
// Parquet reader which supports reading specific parts (split) of the file.
// If the connector doesn't have its own Parquet reader, it can use the
// default Parquet reader provider which at the moment doesn't support reading
// a specific part of the file, but reads the entire file from the beginning.
CloseableIterator<ColumnarBatch> physicalDataIter =
  connectParquetReader.readParquetFile(
    fileStatus
    physicalReadSchema,
    split, // what part of the Parquet file to read data from
    optPredicate /* additional predicate the connector can apply to filter data from the reader */
  );

// Now the physical data read from the Parquet data file is converted to logical data
// the table represents.
// Logical data may include the addition of partition columns and/or
// subset of rows deleted
CloseableIterator<FilteredColumnarBatch> transformedData =
  Scan.transformPhysicalData(
    engine,
    scanState,
    scanFileRow,
    physicalDataIter));
  • 按批次解析数据:每个`FilteredColumnarBatch`包含两个组件:

    • 列式批次(由FilteredColumnarBatch.getData()返回):这是从文件中读取的数据,其模式与在先前步骤构建Scan对象时提供的readSchema相匹配。

    • 可选的选择向量(由FilteredColumnarBatch.getSelectionVector()返回):可选地,一个布尔向量,用于定义批次中哪些行是有效的,应该被引擎处理。

如果存在选择向量,则需要将其应用到批次中以解析最终可消费数据。

  • 转换为引擎特定的数据格式:每个连接器/引擎都有其本地的行/列批处理格式和接口。为了将读取的数据批次返回给引擎,您需要将它们转换为适合这些引擎特定格式和/或接口的形式。以下是一些可以提高效率的技巧。

    • 匹配引擎特定格式:某些引擎可能期望数据采用内存中的格式,这可能与getData()生成的数据不同。因此,您需要根据需要为批次中的每个列向量进行数据转换。

    • 匹配引擎特定接口:您可能需要实现扩展引擎特定接口的包装类,并适当封装行数据。

为了获得最佳性能,您可以实现自己的Parquet读取器和其他Engine实现,以确保生成的每个ColumnVector已经是引擎原生格式,从而无需进行任何转换。

现在您应该能够正确读取Delta表了。

步骤4:在您的连接器中构建追加支持

在本节中,我们将逐步介绍您的连接器可能需要调用的Kernel API序列,以便向表中追加数据。在连接器与引擎交互的上下文中,这些调用的确切时机完全取决于引擎-连接器API,因此不在本指南的讨论范围内。不过,我们将尽量提供可能适用于(但不保证)您的连接器-引擎设置的通用指导原则。为此,我们假设引擎在处理写入查询时会经历以下阶段——逻辑计划分析、物理计划生成和物理计划执行。基于这些通用特征,读取Delta表的典型控制和数据流如下:

步骤

此步骤发生的典型查询阶段

确定需要写入表的数据结构。当表尚不存在时,模式(schema)会从现有表或查询计划中write操作符的父操作派生而来。

逻辑计划分析阶段,当需要解析和验证计划的模式(write操作符模式与表模式匹配等)及其他细节时。

根据表模式和分区列(无论是现有表还是查询计划中的新表)确定数据的物理分区

物理计划生成阶段,确定写入任务数量、数据模式和分区方式

将写入任务定义(包含事务状态)分发给工作节点。

物理计划执行,仅当它是分布式引擎时。

任务将数据写入数据文件并将数据文件信息发送给驱动程序。

物理计划执行,即数据实际写入表位置时

完成查询。在此阶段,任务写入的数据文件的所有信息将被汇总,并提交到物理执行开始时创建的事务中。

完成查询。这发生在启动查询的驱动程序上。

让我们详细了解每个步骤的细节。

步骤4.1:确定需要写入表的数据模式

第一步是解析输出数据的模式。连接器/引擎通常需要这一步来解析和验证查询的逻辑计划(如果您的引擎中存在逻辑计划的概念)。为实现这一点,连接器需要执行以下操作。从高层次来看,查询计划是一个操作符树,其中叶级操作符从存储/表中生成或读取数据,并将其向上传递给父操作符节点。这种数据传输一直持续到到达根操作符节点,在那里查询最终完成(结果发送给客户端或数据写入另一个表)。

  • 创建Table对象

  • 尝试从Table对象获取schema。

    • 如果找不到该表

      • 查询包括创建表(例如,CREATE TABLE AS SQL查询);

        • 模式源自于向write操作符提供数据的上游操作符。

      • 如果查询不包含创建新表的操作,则会抛出表未找到的异常

    • 如果表已存在

      • 从表中获取模式并检查是否与write操作符的模式匹配。如果不匹配则抛出异常。

  • 创建一个TransactionBuilder - 这基本上开始了事务构建的步骤。

import io.delta.kernel.*;
import io.delta.kernel.defaults.engine.*;

Engine myEngine = new MyEngine();
Table myTable = Table.forPath(myTablePath);

StructType writeOperatorSchema = // ... derived from the query operator tree ...
StructType dataSchema;
boolean isNewTable = false;

try {
  Snapshot mySnapshot = myTable.getLatestSnapshot(myEngine);
  dataSchema = mySnapshot.getSchema(myEngine);

  // .. check dataSchema and writeOperatorSchema match ...
} catch(TableNotFoundException e) {
  isNewTable = true;
  dataSchema = writeOperatorSchema;
}

TransactionBuilder txnBuilder =
  myTable.createTransactionBuilder(
    myEngine,
    "Examples", /* engineInfo - connector can add its own identifier which is noted in the Delta Log */
    Operation /* What is the operation we are trying to perform? This is noted in the Delta Log */
  );

if (isNewTable) {
  // For a new table set the table schema in the transaction builder
  txnBuilder = txnBuilder.withSchema(engine, dataSchema)
}

步骤4.2:根据表结构和分区列确定数据的物理分区

分区列可以从查询中获取(对于新表,查询定义了分区列)或从现有表中获取。

TransactionBuilder txnBuilder = ... from the last step ...
Transaction txn;

List<String> partitionColumns = ...
if (newTable) {
  partitionColumns = ... derive from the query parameters (ex. PARTITION BY clause in SQL) ...
  txnBuilder = txnBuilder.withPartitionColumns(engine, partitionColumns);
  txn = txnBuilder.build(engine);
} else {
  txn = txnBuilder.build(engine);
  partitionColumns = txn.getPartitionColumns(engine);
}

在这一步骤结束时,我们获得了要生成数据的Transaction和模式及其分区信息。

步骤4.3:将包含事务状态的写入任务定义分发到工作节点

如果您正在为Spark/Presto/Trino/Flink等分布式引擎构建连接器,那么您的连接器需要将所有的写入器元数据从查询规划机器(以下称为driver)发送到任务执行机器(以下称为workers)。您必须对事务状态进行序列化和反序列化。连接器的工作是实现`Row`的序列化和反序列化工具。关于自定义Row序列化/反序列化的更多细节可以在这里找到。

Row txnState = txn.getState(engine);

String jsonTxnState = serializeToJson(txnState);

步骤4.4:任务将数据写入数据文件,并将数据文件信息发送给驱动程序。

在这一步骤中(在每个任务内部的工作节点上执行):

  • 反序列化事务状态

  • 任务中的Writer操作符从其父操作符获取数据。

  • 数据被转换为FilteredColumnarBatch。每个`FilteredColumnarBatch`包含两个组件:

    • 列式批次(由FilteredColumnarBatch.getData()返回):这是从文件中读取的数据,其模式与在先前步骤中构建Scan对象时提供的readSchema相匹配。

    • 可选的选择向量(由FilteredColumnarBatch.getSelectionVector()返回):可选地,一个布尔向量,用于定义批次中哪些行是有效的,应该被引擎消费。

  • 该连接器可以围绕其自身内存格式中的数据创建FilteredColumnBatch包装器。

  • 检查数据是否已分区。如果未分区,则按分区值对数据进行分区。

  • 为每个分区生成分区列到分区值的映射

  • 使用Kernel将分区数据转换为应存入数据文件的物理数据

  • 将物理数据写入一个或多个数据文件。

  • 将数据文件状态转换为Delta日志操作

  • 序列化Delta日志操作Row对象并将其发送到驱动节点

Row txnState = ... deserialize from JSON string sent by the driver ...

CloseableIterator<FilteredColumnarBatch> data = ... generate data ...

// If the table is un-partitioned then this is an empty map
Map<String, Literal> partitionValues = ... prepare the partition values ...


// First transform the logical data to physical data that needs to be written
// to the Parquet files
CloseableIterator<FilteredColumnarBatch> physicalData =
  Transaction.transformLogicalData(engine, txnState, data, partitionValues);

// Get the write context
DataWriteContext writeContext = Transaction.getWriteContext(engine, txnState, partitionValues);

// Now write the physical data to Parquet files
CloseableIterator<DataFileStatus> dataFiles =
  engine.getParquetHandler()
    .writeParquetFiles(
      writeContext.getTargetDirectory(),
      physicalData,
      writeContext.getStatisticsColumns());

// Now convert the data file status to data actions that needs to be written to the Delta table log
CloseableIterator<Row> partitionDataActions =
  Transaction.generateAppendActions(
    engine,
    txnState,
    dataFiles,
    writeContext);

.... serialize `partitionDataActions` and send them to driver node

步骤4.5:完成查询。

在驱动节点上,会接收来自所有任务的Delta日志操作并将其提交到事务中。这些任务将Delta日志操作以序列化的JSON格式发送,并将其反序列化回Row对象。

// Create a iterable out of the data actions. If the contents are too big to fit in memory,
// the connector may choose to write the data actions to a temporary file and return an
// iterator that reads from the file.
CloseableIterable<Row> dataActionsIterable = CloseableIterable.inMemoryIterable(
        toCloseableIterator(dataActions.iterator()));

// Commit the transaction.
TransactionCommitResult commitResult = txn.commit(engine, dataActionsIterable);

// Optional step
if (commitResult.isReadyForCheckpoint()) {
  // Checkpoint the table
  Table.forPath(engine, tablePath).checkpoint(engine, commitResult.getVersion());
}

就是这样。现在你应该能够使用Kernel API向Delta表追加数据了。

迁移指南

Kernel API仍在不断演进,新功能正在持续添加。内核开发者会尽力确保每个新版本中的API变更尽可能向后兼容,但有时对于一个快速发展的项目来说,维持向后兼容性确实存在挑战。

本节提供关于如何将您的连接器迁移至Delta Kernel最新版本的指导。随着每个新版本的发布,示例都会保持与最新API变更同步。您可以参考这些示例来了解如何使用新API。

从Delta Lake 3.1.0版本迁移到3.2.0版本

以下是Delta Kernel 3.2.0中的API变更,可能需要您对连接器进行相应修改。

TableClient 重命名为 `Engine`

TableClient 接口已更名为 Engine。这是本版本中最重大的API变更。TableClient 这个接口名称并不能准确体现其提供的功能。从高层次来看,它提供了读取Parquet文件、JSON文件、对数据进行表达式求值以及文件系统功能等能力。这些基本上都是Kernel所依赖的核心操作,作为一个独立接口允许连接器替换它们自己对相同功能的定制实现(例如自定义的Parquet读取器)。本质上,这些功能就是engine功能的核心。通过更名为Engine,我们用一个更易于理解的恰当名称来体现该接口的功能。

DefaultTableClient 已更名为 `DefaultEngine`

`Table.forPath(Engine engine, String tablePath)` 行为变更

之前当传入一个不存在的表路径时,API会抛出TableNotFoundException异常。现在它不再抛出异常,而是返回一个Table对象。当尝试从该表对象获取Snapshot时,才会抛出TableNotFoundException异常。

`FileSystemClient.resolvePath` 行为变更

之前当传入一个不存在的路径时,API会抛出FileNotFoundException异常。现在它不再抛出该异常,但仍会将给定路径解析为完全限定路径。