Apache Spark上的LightGBM
LightGBM
LightGBM 是一个开源的、分布式的、高性能的梯度提升(GBDT, GBRT, GBM, 或 MART)框架。该框架专注于创建高质量且支持GPU的决策树算法,用于排序、分类以及许多其他机器学习任务。LightGBM 是微软 DMTK 项目的一部分。
通过SynapseML使用LightGBM的优势
- 可组合性: LightGBM 模型可以集成到现有的 SparkML 管道中,并用于批处理、流处理和服务工作负载。
- 性能: 在Higgs数据集上,Spark上的LightGBM比SparkML快10-30%,并且AUC提高了15%。并行实验已验证,在特定设置下,LightGBM可以通过使用多台机器进行训练实现线性加速。
- 功能: LightGBM 提供了大量的可调参数,可以用来定制决策树系统。Spark 上的 LightGBM 还支持新类型的问题,如分位数回归。
- 跨平台 LightGBM on Spark 可在 Spark、PySpark 和 SparklyR 上使用
用法
在 PySpark 中,你可以通过以下方式运行 LightGBMClassifier
:
from synapse.ml.lightgbm import LightGBMClassifier
model = LightGBMClassifier(learningRate=0.3,
numIterations=100,
numLeaves=31).fit(train)
同样地,你可以通过设置application
和alpha
参数来运行LightGBMRegressor
:
from synapse.ml.lightgbm import LightGBMRegressor
model = LightGBMRegressor(application='quantile',
alpha=0.3,
learningRate=0.3,
numIterations=100,
numLeaves=31).fit(train)
对于一个端到端的应用程序,请查看LightGBM notebook示例。
参数/参数
SynapseML 为许多常见的 LightGBM 参数提供了 getter/setter 方法。 在 Python 中,您可以使用属性-值对,或者在 Scala 中使用 流式设置器。本节展示了这两种方法的示例。
import com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier
val classifier = new LightGBMClassifier()
.setLearningRate(0.2)
.setNumLeaves(50)
LightGBM 拥有比 SynapseML 暴露的参数多得多的参数。对于需要设置一些 SynapseML 未提供设置器的参数的情况,可以使用 passThroughArgs。这个参数只是一个自由字符串,您可以用它来向 SynapseML 发送的命令中添加额外的参数以配置 LightGBM。
在 Python 中:
from synapse.ml.lightgbm import LightGBMClassifier
model = LightGBMClassifier(passThroughArgs="force_row_wise=true min_sum_hessian_in_leaf=2e-3",
numIterations=100,
numLeaves=31).fit(train)
在Scala中:
import com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier
val classifier = new LightGBMClassifier()
.setPassThroughArgs("force_row_wise=true min_sum_hessian_in_leaf=2e-3")
.setLearningRate(0.2)
.setNumLeaves(50)
有关格式化选项和特定参数文档,请参阅 LightGBM 文档。SynapseML 专门为 Spark 分布式环境设置了一些 参数,这些参数不应更改。某些参数仅适用于 CLI 模式,在 Spark 中不起作用。
你可以混合使用passThroughArgs和显式参数,如示例所示。SynapseML 将它们合并以创建一个参数字符串发送给LightGBM。如果你在两个地方都设置了参数, passThroughArgs将优先。
架构
Spark上的LightGBM使用简单包装器和接口生成器(SWIG)为LightGBM添加Java支持。这些Java绑定使用Java本地接口调用分布式C++ API。
我们通过在MapPartitions调用中使用Spark执行器调用LGBM_NetworkInit
来初始化LightGBM。然后,我们将每个工作节点的分区传递给LightGBM,以创建LightGBM的内存分布式数据集。然后,我们可以训练LightGBM以生成一个模型,该模型可以用于推理。
LightGBMClassifier
和 LightGBMRegressor
使用 SparkML API,
继承自相同的基类,与 SparkML 管道集成,
并可以使用 SparkML 的交叉验证器 进行调整。
构建的模型可以使用saveNativeModel()
保存为带有原生LightGBM模型的SparkML管道。此外,它们与PMML完全兼容,并且可以通过JPMML-SparkML-LightGBM插件转换为PMML格式。
动态分配限制
原生的LightGBM库具有分布式模式,允许算法在多个机器上工作。SynapseML 使用此模式从Spark调用LightGBM。SynapseML首先收集所有Spark执行器的网络信息,将其传递给LightGBM,然后 等待LightGBM完成其工作。然而,原生的LightGBM算法实现假设所有网络在单个 训练或评分会话期间是恒定的。原生的LightGBM分布式模式是这样设计的,并不是SynapseML本身的限制。
如果在数据处理过程中Spark执行器发生变化,动态计算变化可能会导致LightGBM出现问题。Spark自然可以利用集群自动扩展,并且可以动态替换任何失败的执行器,但LightGBM无法处理这些网络变化。特别是大数据集会受到影响,因为它们更有可能导致执行器扩展或在单次处理过程中出现单个执行器失败。
如果您由于执行器更改而遇到通过SynapseML暴露的LightGBM问题(例如,偶尔的任务失败或网络挂起),有几种选择。
- 在Spark平台上,关闭您已配置的集群上的任何自动扩展。
- 手动设置numTasks为较小的值,以便使用较少的执行器(减少单个执行器失败的概率)。
- 在笔记本单元格中使用配置关闭动态执行器扩展。在Synapse和Fabric中,您可以使用:
%%configure
{
"conf":
{
"spark.dynamicAllocation.enabled": "false"
}
}
注意:如果您的计算平台利用“实时池”来提高笔记本性能,设置任何自定义配置可能会影响集群启动时间。
如果您仍然遇到问题,可以考虑使用numBatches将数据分割成更小的部分。分割成多个批次会增加总处理时间,但可能有助于提高可靠性。
数据传输模式
SynapseML 在将控制权交给实际的 LightGBM 执行代码进行训练和推理之前,必须将数据从 Spark 分区传递到 LightGBM 原生数据集。SynapseML 有两种模式控制数据的传输方式:流式和批量。此模式不会影响训练,但可能会影响内存使用和整体拟合/转换时间。
批量执行模式
"Bulk"模式较旧,需要在创建Datasets之前将所有数据累积在执行器内存中。对于大数据,这种模式可能会导致OOM错误,特别是因为数据必须以原始的未压缩双格式大小累积。目前,"bulk"模式是默认的,因为"streaming"是新的,但SynapseML最终将使流式传输成为默认模式。
对于批量模式,原生的LightGBM数据集可以按分区创建(useSingleDatasetMode=false),或者按执行器创建(useSingleDatasetMode=true)。通常,每个执行器一个数据集更高效,因为它减少了LightGBM在训练或拟合期间的网络规模和复杂性。它还避免了在同一执行器节点上的分区上使用慢速网络协议。
流式执行模式
"streaming" 执行模式使用专为 SynapseML 创建的新原生 LightGBM API,这些 API 不需要将数据的额外副本加载到内存中。特别是,数据直接从分区传递到数据集,以小的“微批次”形式传递,类似于 Spark 流式处理。microBatchSize
参数控制这些微批次的大小。较小的微批次大小减少了内存开销,但较大的大小避免了重复将数据传输到原生层的开销。默认值为 100,由于一次只加载 100 行数据,因此比批量模式使用更少的内存。如果您的数据集列数较少,可以增加批次大小。或者,如果您的数据集列数较多,可以减少微批次大小以避免 OOM 问题。
LightGBM中的这些新流式API是线程安全的,允许同一执行器中的所有分区并行将数据推送到共享的Dataset中。因此,流式模式始终使用更高效的"useSingleDatasetMode=true",每个执行器只创建一个Dataset。
您可以明确指定执行模式和微批处理大小作为参数。
val lgbm = new LightGBMClassifier()
.setExecutionMode("streaming")
.setMicroBatchSize(100)
.setLabelCol(labelColumn)
.setObjective("binary")
...
<train classifier>
对于流模式,每个分区只创建一个Dataset,因此useSingleDataMode没有效果。它实际上总是为真。
数据采样
为了使LightGBM算法工作,它必须首先创建一组用于优化的分箱边界。它通过在开始任何训练或推理之前对数据进行采样来完成此计算。(LightGBM文档)。使用的样本数量通过binSampleCount设置,它必须是数据的最小百分比,否则LightGBM会拒绝它。
对于批量模式,这种采样会自动在整个数据上进行,每个执行器使用自己的分区仅计算一部分特征的样本。这种分布式采样可能会产生微妙的影响,因为分区可能会影响计算的分箱。此外,无论什么情况,所有数据都会被采样。
对于流式模式,有更明确的用户控制来进行这种采样,所有操作都在驱动程序中完成。 samplingMode属性控制行为。这些方法的效率从第一个到最后一个逐渐提高。
- global - 类似于批量模式,随机样本是通过遍历整个数据来计算的(因此数据会被遍历两次)
- subset - (默认)仅从第一个samplingSubsetSize元素中采样。假设此子集具有代表性。
- fixed - 没有随机样本。使用前binSampleSize行。假设数据是随机的。 对于大量行数,subset和fixed模式可以节省对整个数据的第一次迭代。
参考数据集
每次fit调用时都会对数据进行采样以计算分箱边界。 如果多次重复拟合(例如,超参数调优),这种计算是重复的工作。
对于流式模式,客户端可以设置一个优化选项,以使用先前计算的区间边界。采样计算会生成一个参考数据集,该数据集可以重复使用。拟合后,估计器上将有一个referenceDataset属性,该属性是计算并用于该拟合的。如果在下一个估计器上设置该属性(或者您重用相同的估计器),它将使用该属性而不是重新采样数据。
from synapse.ml.lightgbm import LightGBMClassifier
classifier = LightGBMClassifier(learningRate=0.3,
numIterations=100,
numLeaves=31)
model1 = classifier.fit(train)
classifier.learningRate = 0.4
model2 = classifier.fit(train)
对 'fit' 的 'model2' 调用不会重新采样数据,并使用与 'model1' 相同的分箱边界。
注意:某些参数实际上会影响分箱边界的计算,并且每次都需要使用新的参考数据集。 这些参数包括可以从SynapseML设置的isEnableSparse、useMissing和zeroAsMissing。如果您手动设置 一些参数与passThroughArgs,您应该查看LightGBM文档以了解它们是否会影响分箱边界。如果您设置了 任何影响分箱边界的参数并重用相同的估计器,您应该在调用之间将referenceDataset设置为空数组。
屏障执行模式
默认情况下,LightGBM 使用常规的 Spark 范式来启动任务,并与驱动程序通信以协调任务执行。 驱动程序线程会聚合所有任务的主机:端口信息,然后将完整列表传回给工作节点,以便调用 NetworkInit。 此过程要求驱动程序知道有多少任务,预期任务数量与实际数量之间的不匹配会导致初始化死锁。
如果您遇到网络问题,可以尝试使用Spark的barrier执行模式。SynapseML提供了一个UseBarrierExecutionMode
标志,
以使用Apache Spark的barrier()
阶段来确保所有任务同时执行。
Barrier执行模式改变了逻辑,以同步方式在所有任务中聚合host:port
信息。
要在scala中使用它,您可以调用setUseBarrierExecutionMode(true),例如:
val lgbm = new LightGBMClassifier()
.setLabelCol(labelColumn)
.setObjective(binaryObjective)
.setUseBarrierExecutionMode(true)
...
<train classifier>
注意:屏障执行模式也可能导致复杂的问题,因此仅在需要时使用。