Delta Standalone (已弃用)
警告
Delta Standalone 已弃用,并将在未来的版本中移除。我们推荐使用 Delta Kernel API。
Delta Standalone库是一个单节点Java库,可用于读取和写入Delta表。具体来说,该库提供了与事务日志中表元数据交互的API,通过实现Delta事务日志协议来实现Delta Lake格式的事务保证。值得注意的是,该项目不依赖于Apache Spark,并且只有少量的传递依赖。因此,任何处理引擎或应用程序都可以使用它来访问Delta表。
-
-
-
6.1. 读取Parquet数据(分布式)
6.2. 读取Parquet数据(单JVM环境)
使用案例
Delta Standalone针对您希望使用非Spark引擎读写Delta表的场景进行了优化。它是一个"底层"库,我们鼓励开发者为所需引擎贡献开源的高级别连接器,这些连接器使用Delta Standalone进行所有Delta Lake元数据交互。您可以在Delta Lake仓库中找到Hive源连接器和Flink接收器/源连接器。其他连接器正在开发中。
注意事项
Delta Standalone minimizes memory usage in the JVM by loading the Delta Lake transaction log incrementally, using an iterator. However, Delta Standalone runs in a single JVM, and is limited to the processing and memory capabilities of that JVM. Users must configure the JVM to avoid out of memory (OOM) issues.
Delta Standalone 提供了读取 Parquet 数据的基本 API,但不包含写入 Parquet 数据的 API。用户必须自行写出新的 Parquet 数据文件,然后使用 Delta Standalone 将这些更改提交到 Delta 表,使新数据对读取者可见。
API接口
Delta Standalone 提供了用于读取数据、查询元数据和提交事务日志的类和实体。这里重点介绍其中几个及其关键接口。完整类和实体请参阅Java API文档。
DeltaLog
DeltaLog 是用于以编程方式与Delta表事务日志中的元数据进行交互的主要接口。
使用
DeltaLog.forTable(hadoopConf, path)实例化一个DeltaLog,并传入Delta表根目录的path路径。通过
DeltaLog::snapshot访问当前快照。通过
DeltaLog::update获取最新快照,包括添加到日志中的任何新数据文件。使用
DeltaLog::getSnapshotForTimestampAsOf或DeltaLog::getSnapshotForVersionAsOf获取日志在某个历史状态的快照。使用
DeltaLog::startTransaction启动一个新事务以提交到事务日志。使用
DeltaLog::getChanges获取所有元数据操作而无需计算完整的快照。
快照
Snapshot 表示表在特定版本时的状态。
使用
Snapshot::getAllFiles获取元数据文件列表。如需对元数据文件进行内存优化的迭代操作,可使用
Snapshot::scan获取DeltaScan(后续会详述),也可选择性地传入predicate参数进行分区过滤。使用
Snapshot::open读取实际数据,它会返回Delta表行的迭代器。
OptimisticTransaction
向事务日志提交一组更新的主要类是OptimisticTransaction。在事务期间,所有读取操作都必须通过OptimisticTransaction实例而非DeltaLog进行,以便检测逻辑冲突和并发更新。
在事务期间使用
OptimisticTransaction::markFilesAsRead读取元数据文件,该方法会返回一个与readPredicate匹配的文件DeltaScan。使用
OptimisticTransaction::commit提交到事务日志。通过
OptimisticTransaction::txnVersion获取给定应用程序ID的最新提交版本(例如用于幂等性)。(注意此API要求用户提交SetTransaction操作。)在提交时通过
OptimisticTransaction::updateMetadata更新表的元数据。
DeltaScan
DeltaScan 是一个封装类,用于处理 Snapshot 中符合给定 readPredicate 条件的文件。
使用
DeltaScan::getFiles访问与readPredicate分区过滤条件匹配的文件。该方法返回一个针对表中元数据文件进行内存优化的迭代器。要进一步在非分区列上筛选返回的文件,可以获取未应用DeltaScan::getResidualPredicate的输入谓词部分。
API兼容性
目前Delta Standalone提供的唯一公共API位于io.delta.standalone包中。io.delta.standalone.internal包中的类和方法被视为内部实现,可能会在次要版本和补丁版本之间发生变化。
项目设置
您可以通过使用您偏好的构建工具将Delta Standalone库添加为依赖项。Delta Standalone依赖于hadoop-client和parquet-hadoop包。以下部分列出了示例构建文件。
构建文件
Maven
将hadoop-client的版本替换为您正在使用的版本。
Scala 2.12:
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-standalone_2.12</artifactId>
<version>0.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.0</version>
</dependency>
Scala 2.11:
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-standalone_2.11</artifactId>
<version>0.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.0</version>
</dependency>
存储配置
Delta Lake的ACID保证基于存储系统的原子性和持久性保证。并非所有存储系统都能提供所有必要的保证。
由于存储系统本身不一定能提供所有这些保证,Delta Lake的事务操作通常通过LogStore API而非直接访问存储系统来实现。为了为不同的存储系统提供ACID保证,您可能需要使用不同的LogStore实现。本节介绍如何为各种存储系统配置Delta Standalone。存储系统主要分为两类:
内置支持的存储系统:对于某些存储系统,您无需额外配置。Delta Standalone 使用路径的方案(即
s3a中的s3a://path)来动态识别存储系统,并使用提供事务保证的相应LogStore实现。不过对于 S3,并发写入存在额外注意事项。详情请参阅 关于 S3 的部分。其他存储系统:与Apache Spark类似,
LogStore使用Hadoop的FileSystemAPI进行读写操作。Delta Standalone支持在任何提供FileSystemAPI实现的存储系统上进行并发读取。对于具有事务保证的并发写入,根据FileSystem实现提供的保证分为两种情况:如果该实现提供一致的列表功能和原子性重命名且不覆盖(即rename(... , overwrite = false)会原子性地生成目标文件,或在文件已存在时抛出java.nio.file.FileAlreadyExistsException失败),那么默认使用重命名的LogStore实现将支持具有保证的并发写入。否则,您必须通过在使用DeltaLog.forTable(hadoopConf, path)实例化DeltaLog时设置以下Hadoop配置来配置自定义的LogStore实现:delta.logStore.<scheme>.impl=<full-qualified-class-name>
在这里,
表示您存储系统路径的协议方案。该配置使Delta Standalone能够动态地为这些路径使用指定的LogStore实现。您可以在应用程序中为不同协议方案配置多个此类设置,从而使其能够同时从不同存储系统读取和写入数据。注意
在0.5.0版本之前,Delta Standalone支持通过设置
io.delta.standalone.LOG_STORE_CLASS_KEY来配置LogStores。这种方法现已弃用。设置此配置将为所有路径使用配置的LogStore,从而禁用基于动态方案的委托。
Amazon S3配置
Delta Standalone 支持以两种不同模式对 S3 进行读写操作:单集群模式和多集群模式。
单集群 |
多集群 |
|
|---|---|---|
配置 |
开箱即用 |
处于实验阶段,需要额外配置 |
读取 |
支持从多个集群并发读取 |
支持从多个集群并发读取 |
写入 |
支持来自单个集群的并发写入 |
支持多集群写入 |
权限 |
S3凭证 |
S3和DynamoDB操作权限 |
单集群设置(默认)
默认情况下,Delta Standalone支持从多个集群并发读取。但是,对S3的并发写入必须来自单个集群才能提供事务保证。这是因为S3目前不提供互斥机制,也就是说,无法确保只有一个写入者能够创建文件。
警告
多个Spark驱动程序并发写入同一个Delta表可能导致数据丢失。
要在S3上使用Delta Standalone,您必须满足以下要求。如果使用访问密钥进行身份验证和授权,在通过DeltaLog.forTable(hadoopConf, path)实例化DeltaLog时,必须按如下方式配置Hadoop Configuration。
需求 (S3单集群)
S3凭证:IAM角色(推荐)或访问密钥。
与Delta Standalone编译版本对应的Hadoop AWS connector (hadoop-aws)。
配置 (S3 单集群)
在类路径中包含
hadoop-awsJAR。设置S3凭证。我们推荐使用IAM角色进行认证和授权。但如果想使用密钥,请按以下方式配置你的
org.apache.hadoop.conf.Configuration:conf.set("fs.s3a.access.key", "
" ); conf.set("fs.s3a.secret.key", "" );
多集群设置
注意
此支持是新的且处于实验阶段。
此模式支持从多个集群并发写入S3。通过配置Delta Standalone使用正确的LogStore实现来启用多集群支持。该实现使用DynamoDB来提供互斥机制。
警告
当从多个集群写入时,所有驱动程序都必须使用此LogStore实现以及相同的DynamoDB表和区域。如果某些驱动程序使用默认的LogStore,而其他驱动程序使用此实验性的LogStore,则可能导致数据丢失。
需求 (S3多集群)
满足需求(S3单集群)部分列出的所有要求
除了S3凭证外,您还需要DynamoDB操作权限
配置 (S3 多集群)
创建DynamoDB表。有关自行创建表(推荐)或自动创建表的更多详情,请参阅创建DynamoDB表。
按照配置(S3单集群)部分列出的配置步骤进行操作。
在类路径中包含
delta-storage-s3-dynamodbJAR 文件。配置
LogStore的实现。首先,为方案
s3配置这个LogStore实现。您也可以为方案s3a和s3n复制此命令。conf.set("delta.logStore.s3.impl", "io.delta.storage.S3DynamoDBLogStore");
配置键
描述
默认值
io.delta.storage.S3DynamoDBLogStore.ddb.tableName
要使用的DynamoDB表名称
delta_log
io.delta.storage.S3DynamoDBLogStore.ddb.region
客户端使用的区域
us-east-1
io.delta.storage.S3DynamoDBLogStore.credentials.provider
客户端使用的AWSCredentialsProvider*
io.delta.storage.S3DynamoDBLogStore.provisionedThroughput.rcu
(仅表创建时**) 读取容量单位
5
io.delta.storage.S3DynamoDBLogStore.provisionedThroughput.wcu
(仅表创建**) 写入容量单位
5
*有关AWS凭证提供程序的更多详细信息,请参阅AWS文档。
**这些配置仅在给定的DynamoDB表尚不存在且需要自动创建时使用。
生产环境配置 (S3多集群)
至此,这个多集群设置已经完全可运行。不过,在生产环境中运行时,您还可以进行额外的配置以提高性能并优化存储。详情请参阅Delta Lake文档。
Microsoft Azure 配置
Delta Standalone 支持从多个集群并发读写,并为各种 Azure 存储系统提供完整的事务保证。要使用 Azure 存储系统,您必须满足以下要求,并在实例化 DeltaLog 时按照指定配置 Hadoop 配置,如 DeltaLog.forTable(hadoopConf, path) 所示。
Azure Blob 存储
要求 (Azure Blob存储)
一个共享密钥或共享访问签名(SAS)。
Hadoop的Azure Blob Storage库,用于与Delta Standalone编译时所使用的Hadoop版本兼容的版本。
Hadoop 2的2.9.1+版本
Hadoop 3的3.0.1+版本
Azure Data Lake Storage Gen1
要求 (ADLS Gen 1)
用于OAuth 2.0访问的服务主体。
Hadoop的Azure Data Lake Storage Gen1库,用于与编译Delta Standalone时使用的Hadoop版本兼容的版本。
Hadoop 2的2.9.1+版本
Hadoop 3的3.0.1+版本
配置 (ADLS Gen 1)
在类路径中包含
hadoop-azure-datalakeJAR。设置Azure Data Lake Storage Gen1凭证。配置
org.apache.hadoop.conf.Configuration:conf.set("dfs.adls.oauth2.access.token.provider.type", "ClientCredential"); conf.set("dfs.adls.oauth2.client.id", "
" ); conf.set("dfs.adls.oauth2.credential", "" ); conf.set("dfs.adls.oauth2.refresh.url", "https://login.microsoftonline.com//oauth2/token" );
Azure Data Lake Storage Gen2
要求 (ADLS Gen 2)
在Azure Data Lake Storage Gen2中创建的账户。
服务主体已创建并被分配存储Blob数据贡献者角色给存储账户。
记录下主体的存储账户名称、目录ID(也称为租户ID)、应用ID和密码。这些将用于配置。
Hadoop的Azure Data Lake Storage Gen2库版本3.2+以及使用Hadoop 3.2+编译的Delta Standalone。
配置 (ADLS Gen 2)
在类路径中包含
hadoop-azure-datalakeJAR。此外,您可能还需要包含Maven构件hadoop-azure和wildfly-openssl的JAR。设置Azure Data Lake Storage Gen2凭据。使用以下配置您的
org.apache.hadoop.conf.Configuration:conf.set("fs.azure.account.auth.type.
.dfs.core.windows.net" , "OAuth"); conf.set("fs.azure.account.oauth.provider.type..dfs.core.windows.net" , "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"); conf.set("fs.azure.account.oauth2.client.id..dfs.core.windows.net" , "" ); conf.set("fs.azure.account.oauth2.client.secret..dfs.core.windows.net" ,"" ); conf.set("fs.azure.account.oauth2.client.endpoint..dfs.core.windows.net" , "https://login.microsoftonline.com//oauth2/token" );其中
、、和是我们之前设置为需求的服务主体详细信息。
HDFS
Delta Standalone 内置支持 HDFS,可在多集群并发读写时提供完整的事务保证。有关配置凭据的信息,请参阅Hadoop文档。
Google 云存储
需求 (GCS)
GCS Connector (gcs-connector) Maven工件的JAR包。
Google Cloud Storage 账户和凭证
配置 (GCS)
在类路径中包含
gcs-connector的JAR文件。有关如何使用GCS连接器配置项目的详细信息,请参阅文档。
使用方法
这个示例展示了如何使用Delta Standalone来:
查找parquet文件。
写入parquet数据。
提交到事务日志。
从事务日志中读取。
读取回Parquet数据。
请注意,此示例使用虚构的非Spark引擎Zappy来实际写入parquet数据,因为Delta Standalone不提供任何数据写入API。相反,Delta Standalone Writer允许您在写入数据后将元数据提交到Delta日志。这就是为什么Delta Standalone能与众多连接器(如Flink、Presto、Trino等)良好协作的原因,因为这些连接器本身提供了parquet写入功能。
1. SBT配置
使用以下SBT项目配置:
// <project-root>/build.sbt
scalaVersion := "2.12.8"
libraryDependencies ++= Seq(
"io.delta" %% "delta-standalone" % "0.5.0",
"org.apache.hadoop" % "hadoop-client" % "3.1.0")
2. 模拟场景
我们有一个Delta表Sales存储销售数据,但发现2021年11月为客户XYZ写入的所有数据中的total_cost值都不正确。因此,我们需要用正确的值更新所有这些记录。我们将使用一个虚构的分布式引擎Zappy和Delta Standalone来更新我们的Delta表。
销售表的架构如下所示。
Sales
|-- year: int // partition column
|-- month: int // partition column
|-- day: int // partition column
|-- customer: string
|-- sale_id: string
|-- total_cost: float
3. 启动事务并查找相关文件
由于我们必须读取现有数据才能执行所需的更新操作,因此必须使用OptimisticTransaction::markFilesAsRead来自动检测对读取分区进行的任何并发修改。由于Delta Standalone仅支持分区剪枝,我们必须应用剩余谓词来进一步过滤返回的文件。
import io.delta.standalone.DeltaLog;
import io.delta.standalone.DeltaScan;
import io.delta.standalone.OptimisticTransaction;
import io.delta.standalone.actions.AddFile;
import io.delta.standalone.data.CloseableIterator;
import io.delta.standalone.expressions.And;
import io.delta.standalone.expressions.EqualTo;
import io.delta.standalone.expressions.Literal;
DeltaLog log = DeltaLog.forTable(new Configuration(), "/data/sales");
OptimisticTransaction txn = log.startTransaction();
DeltaScan scan = txn.markFilesAsRead(
new And(
new And(
new EqualTo(schema.column("year"), Literal.of(2021)), // partition filter
new EqualTo(schema.column("month"), Literal.of(11))), // partition filter
new EqualTo(schema.column("customer"), Literal.of("XYZ")) // non-partition filter
)
);
CloseableIterator<AddFile> iter = scan.getFiles();
Map<String, AddFile> addFileMap = new HashMap<String, AddFile>(); // partition filtered files: year=2021, month=11
while (iter.hasNext()) {
AddFile addFile = iter.next();
addFileMap.put(addFile.getPath(), addFile);
}
iter.close();
List<String> filteredFiles = ZappyReader.filterFiles( // fully filtered files: year=2021, month=11, customer=XYZ
addFileMap.keySet(),
toZappyExpression(scan.getResidualPredicate())
);
4. 写入更新后的Parquet数据
由于Delta Standalone不提供任何Parquet数据写入API,我们使用Zappy来写入数据。
ZappyDataFrame correctedSaleIdToTotalCost = ...;
ZappyDataFrame invalidSales = ZappyReader.readParquet(filteredFiles);
ZappyDataFrame correctedSales = invalidSales.join(correctedSaleIdToTotalCost, "id");
ZappyWriteResult dataWriteResult = ZappyWritter.writeParquet("/data/sales", correctedSales);
前面代码写入的数据文件将具有类似以下的层级结构:
$ tree /data/sales
.
├── _delta_log
│ └── ...
│ └── 00000000000000001082.json
│ └── 00000000000000001083.json
├── year=2019
│ └── month=1
...
├── year=2020
│ └── month=1
│ └── day=1
│ └── part-00000-195768ae-bad8-4c53-b0c2-e900e0f3eaee-c000.snappy.parquet // previous
│ └── part-00001-53c3c553-f74b-4384-b9b5-7aa45bc2291b-c000.snappy.parquet // new
| ...
│ └── day=2
│ └── part-00000-b9afbcf5-b90d-4f92-97fd-a2522aa2d4f6-c000.snappy.parquet // previous
│ └── part-00001-c0569730-5008-42fa-b6cb-5a152c133fde-c000.snappy.parquet // new
| ...
5. 提交到我们的Delta表
现在我们已经写入了正确的数据,需要提交到事务日志来添加新文件,并移除旧的不正确文件。
import io.delta.standalone.Operation;
import io.delta.standalone.actions.RemoveFile;
import io.delta.standalone.exceptions.DeltaConcurrentModificationException;
import io.delta.standalone.types.StructType;
List<RemoveFile> removeOldFiles = filteredFiles.stream()
.map(path -> addFileMap.get(path).remove())
.collect(Collectors.toList());
List<AddFile> addNewFiles = dataWriteResult.getNewFiles()
.map(file ->
new AddFile(
file.getPath(),
file.getPartitionValues(),
file.getSize(),
System.currentTimeMillis(),
true, // isDataChange
null, // stats
null // tags
);
).collect(Collectors.toList());
List<Action> totalCommitFiles = new ArrayList<>();
totalCommitFiles.addAll(removeOldFiles);
totalCommitFiles.addAll(addNewFiles);
try {
txn.commit(totalCommitFiles, new Operation(Operation.Name.UPDATE), "Zippy/1.0.0");
} catch (DeltaConcurrentModificationException e) {
// handle exception here
}
6. 从Delta表读取数据
Delta Standalone 提供了读取元数据和数据的API,如下所示。
6.1. 读取Parquet数据(分布式)
对于大多数使用场景,尤其是处理大量数据时,我们建议您使用Delta Standalone库作为仅元数据读取器,然后自行执行Parquet数据读取操作,通常以分布式方式进行。
Delta Standalone 提供了两种API来读取给定表快照中的文件。Snapshot::getAllFiles返回内存中的列表。从0.3.0版本开始,我们还提供了Snapshot::scan(filter)::getFiles,它支持分区剪枝和优化的内部迭代器实现。这里我们将使用后者。
import io.delta.standalone.Snapshot;
DeltaLog log = DeltaLog.forTable(new Configuration(), "/data/sales");
Snapshot latestSnapshot = log.update();
StructType schema = latestSnapshot.getMetadata().getSchema();
DeltaScan scan = latestSnapshot.scan(
new And(
new And(
new EqualTo(schema.column("year"), Literal.of(2021)),
new EqualTo(schema.column("month"), Literal.of(11))),
new EqualTo(schema.column("customer"), Literal.of("XYZ"))
)
);
CloseableIterator<AddFile> iter = scan.getFiles();
try {
while (iter.hasNext()) {
AddFile addFile = iter.next();
// Zappy engine to handle reading data in `addFile.getPath()` and apply any `scan.getResidualPredicate()`
}
} finally {
iter.close();
}
6.2. 读取Parquet数据(单JVM环境)
Delta Standalone 允许直接读取 Parquet 数据,使用 Snapshot::open。
import io.delta.standalone.data.RowRecord;
CloseableIterator<RowRecord> dataIter = log.update().open();
try {
while (dataIter.hasNext()) {
RowRecord row = dataIter.next();
int year = row.getInt("year");
String customer = row.getString("customer");
float totalCost = row.getFloat("total_cost");
}
} finally {
dataIter.close();
}
报告问题
我们使用GitHub Issues来跟踪社区报告的问题。您也可以通过联系社区获取解答。
贡献指南
我们欢迎对Delta Lake仓库的贡献。我们使用GitHub Pull Requests来接受变更。