决策树 - 基于RDD的API
Decision trees 及其集成方法是机器学习分类和回归任务中常用的方法。决策树被广泛使用,因为它们易于解释,能够处理分类特征,可以扩展到多类分类设置,不需要特征缩放,并且能够捕捉非线性和特征交互。树集成算法如随机森林和提升方法在分类和回归任务中都是表现最佳的算法之一。
spark.mllib
支持用于二元和多类分类以及回归的决策树,使用连续和分类特征。该实现按行对数据进行分区,允许使用数百万个实例进行分布式训练。
树的集成(随机森林和梯度提升树)在 集成指南 中进行了描述。
基础算法
决策树是一种贪心算法,它对特征空间进行递归的二分划分。树为每个最底层(叶子)分区预测相同的标签。每个分区通过从一组可能的分割中选择
最佳划分
来贪心地选择,以便最大化树节点的信息增益。换句话说,在每个树节点选择的划分是从集合
$\underset{s}{\operatorname{argmax}} IG(D,s)$
中选出的,其中
$IG(D,s)$
是当对数据集
$D$
应用划分
$s$
时的信息增益。
节点不纯度和信息增益
节点 杂质 是节点标签的同质性度量。当前的实现为分类提供了两种杂质度量(基尼杂质和熵)以及一种回归的杂质度量(方差)。
| 杂质 | 任务 | 公式 | 描述 |
|---|---|---|---|
| 基尼杂质 | 分类 | $\sum_{i=1}^{C} f_i(1-f_i)$ | $f_i$是节点上标签$i$的频率,$C$是唯一标签的数量。 |
| 熵 | 分类 | $\sum_{i=1}^{C} -f_ilog(f_i)$ | $f_i$是节点上标签$i$的频率,$C$是唯一标签的数量。 |
| 方差 | 回归 | $\frac{1}{N} \sum_{i=1}^{N} (y_i - \mu)^2$ | $y_i$是一个实例的标签,$N$是实例的数量,$\mu$是由$\frac{1}{N} \sum_{i=1}^N y_i$给出的均值。 |
信息增益是父节点杂质与两个子节点杂质加权和之间的差异。假设一个分裂
$s$
将大小为
$N$
的数据集
$D$
划分为两个数据集
$D_{left}$
和
$D_{right}$
,其大小分别为
$N_{left}$
和
$N_{right}$
,信息增益为:
$IG(D,s) = Impurity(D) - \frac{N_{left}}{N} Impurity(D_{left}) - \frac{N_{right}}{N} Impurity(D_{right})$
拆分候选人
连续特征
对于单机实现中的小数据集,每个连续特征的分裂候选通常是特征的唯一值。一些实现会对特征值进行排序,然后使用有序的唯一值作为分裂候选,以加快树的计算速度。
对大型分布式数据集进行排序特征值是非常耗费资源的。
此实现通过对数据的一个抽样部分进行分位数计算,来计算一组近似的分割候选。
有序的分割创建“箱”,并且可以使用
maxBins
参数指定最大箱数。
请注意,箱子的数量不能大于实例的数量
$N$
(这是一种罕见的情况,因为默认的
maxBins
值是 32)。如果条件不满足,树算法会自动减少箱子的数量。
分类特征
对于一个拥有
$M$
个可能值(类别)的分类特征,可以产生
$2^{M-1}-1$
个拆分候选。对于二元(0/1)分类和回归,我们可以通过按平均标签对分类特征值进行排序,减少拆分候选的数量到
$M-1$
。 (有关详细信息,请参见
Elements of Statistical Machine Learning
的第9.2.4节。)例如,对于一个具有三个类别A、B和C的分类特征的二元分类问题,它们对应的标签1的比例分别为0.2、0.6和0.4,分类特征的顺序为A、C、B。两个拆分候选为A | C, B 和 A , C | B,其中 | 表示拆分。
在多类别分类中,所有
$2^{M-1}-1$
可能的分裂会尽可能地使用。当
$2^{M-1}-1$
大于
maxBins
参数时,我们使用一种(启发式)方法,类似于用于二元分类和回归的方法。
$M$
个类别特征值按杂质排序,结果
$M-1$
个分裂候选被考虑。
停止规则
当满足以下任一条件时,递归树构建将在节点处停止:
-
节点深度等于
maxDepth训练参数。 -
没有拆分候选者导致的信息增益大于
minInfoGain。 -
没有拆分候选者产生的子节点每个至少有
minInstancesPerNode个训练实例。
使用技巧
我们包含了一些使用决策树的指南,通过讨论各种参数。 参数按重要性降序排列。 新用户应该主要考虑“问题规格参数”部分和
maxDepth
参数。
问题规范参数
这些参数描述了您想要解决的问题以及您的数据集。它们应被指定并且不需要调整。
-
algo: 决策树的类型,分为Classification或Regression。 -
numClasses: 类别的数量(仅适用于Classification)。 -
categoricalFeaturesInfo: 指定哪些特征是类别特征,以及这些特征可以取的类别值的数量。 这作为特征索引到特征种类(类别数量)的映射给出。 任何不在此映射中的特征将被视为连续特征。-
例如,
Map(0 -> 2, 4 -> 10)指定特征0是二元的(取值0或1),而特征4有 10 个类别(取值{0, 1, ..., 9})。 注意特征索引是从 0 开始的:特征0和4是实例特征向量的第 1 个和第 5 个元素。 -
请注意,您不必指定
categoricalFeaturesInfo。算法仍然可以运行并可能得到合理的结果。 但是,如果正确指定了类别特征,性能应该会更好。
-
例如,
停止标准
这些参数决定了树何时停止构建(添加新节点)。在调整这些参数时,请务必在保留的测试数据上进行验证,以避免过拟合。
-
maxDepth: 树的最大深度。更深的树更具表现力(可能允许更高的准确性),但训练成本更高,更容易产生过拟合。 -
minInstancesPerNode: 为了进一步分裂一个节点,每个子节点必须至少接收这个数量的训练实例。这通常与 RandomForest 一起使用,因为这些通常比单独的树训练得更深。 -
minInfoGain: 为了进一步分裂一个节点,分裂必须至少改善这个数量(在信息增益方面)。
可调参数
这些参数可以进行调整。在调优时请注意在保留的测试数据上进行验证,以避免过拟合。
-
maxBins: 用于离散化连续特征的箱子的数量。-
增加
maxBins允许算法考虑更多的切分候选,并做出细粒度的切分决策。然而,这也会增加计算和通信的开销。 -
注意,
maxBins参数必须至少等于任何分类特征的最大类别数量$M$。
-
增加
-
maxMemoryInMB: 用于收集足够统计信息的内存量。-
默认值谨慎选择为 256 MiB,以允许决策算法在大多数场景中工作。增加
maxMemoryInMB可以通过减少对数据的遍历次数来加快训练速度(如果有足够的内存)。然而,随着maxMemoryInMB的增加,可能会出现递减收益,因为每次迭代中通信的量可能与maxMemoryInMB成正比。 -
实现细节
:为了更快的处理,决策树算法收集关于要切分的节点组的统计信息(而不是一个节点一个节点地处理)。可以在一个组中处理的节点数量由内存需求决定(这会因特征而异)。
maxMemoryInMB参数指定每个工作节点可以用于这些统计信息的内存限制(以兆字节为单位)。
-
默认值谨慎选择为 256 MiB,以允许决策算法在大多数场景中工作。增加
-
subsamplingRate: 用于学习决策树的训练数据的比例。该参数对于训练树的集成(使用RandomForest和GradientBoostedTrees)最为相关,在这种情况下,可以对子样本进行抽样。对于训练单个决策树,此参数的实用性较低,因为训练实例的数量通常不是主要限制。 -
impurity: 用于在候选切分之间选择的纯度度量(如上所述)。该度量必须与algo参数匹配。
缓存和检查点
MLlib 1.2 添加了多个功能,以支持更大(更深)树和树集成的扩展。当
maxDepth
设置为较大时,启用节点 ID 缓存和检查点可能会很有用。这些参数在
RandomForest
的
numTrees
设置为较大时也很有用。
-
useNodeIdCache: 如果将此设置为 true,算法将在每次迭代中避免将当前模型(树或树的集合)传递给执行器。- 这对于深树(加速工作者上的计算)和大型随机森林(减少每次迭代的通信)很有用。
- 实施细节 : 默认情况下,算法将当前模型传达给执行器,以便执行器可以将训练实例与树节点匹配。当此设置开启时,算法将缓存这些信息。
节点ID缓存产生了一系列RDD(每次迭代一个)。这个较长的遗传可以导致性能问题,但对中间RDD进行检查点可以缓解这些问题。
请注意,只有在
useNodeIdCache
被设置为true时,检查点才适用。
-
checkpointDir: 检查点节点ID缓存RDD的目录。 -
checkpointInterval: 检查点节点ID缓存RDD的频率。将此设置得过低会导致写入HDFS时产生额外开销;如果设置得过高,则在执行器失败且需要重新计算RDD时可能会导致问题。
缩放
计算的规模大约与训练实例的数量、特征的数量以及
maxBins
参数呈线性关系。通信的规模大约与特征的数量和
maxBins
呈线性关系。
实现的算法能够读取稀疏和密集数据。然而,它并不针对稀疏输入进行优化。
示例
分类
下面的示例演示了如何加载一个
LIBSVM 数据文件
,将其解析为一个
LabeledPoint
的 RDD,然后使用 Gini impurity 作为不纯度度量,通过决策树进行分类,最大树深度为 5。测试误差被计算出来以衡量算法的准确性。
有关API的更多详细信息,请参阅
DecisionTree
Python文档
和
DecisionTreeModel
Python文档
。
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils
# 加载并解析数据文件为LabeledPoint的RDD。
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# 将数据分为训练集和测试集(30%用于测试)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# 训练一个决策树模型。
# 空的categoricalFeaturesInfo表示所有特征都是连续的。
model = DecisionTree.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
impurity='gini', maxDepth=5, maxBins=32)
# 在测试实例上评估模型并计算测试错误
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(
lambda lp: lp[0] != lp[1]).count() / float(testData.count())
print('测试错误 = ' + str(testErr))
print('学习到的分类树模型:')
print(model.toDebugString())
# 保存和加载模型
model.save(sc, "target/tmp/myDecisionTreeClassificationModel")
sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeClassificationModel")
请参阅
DecisionTree
的Scala文档
和
DecisionTreeModel
的Scala文档
以获取API的详细信息。
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.util.MLUtils
// 加载并解析数据文件。
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// 将数据拆分为训练集和测试集(30%用于测试)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
// 训练一个决策树模型。
// 空的 categoricalFeaturesInfo 表示所有特征都是连续的。
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val impurity = "gini"
val maxDepth = 5
val maxBins = 32
val model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,
impurity, maxDepth, maxBins)
// 在测试实例上评估模型并计算测试错误
val labelAndPreds = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val testErr = labelAndPreds.filter(r => r._1 != r._2).count().toDouble / testData.count()
println(s"测试错误 = $testErr")
println(s"学习到的分类树模型:\n ${model.toDebugString}")
// 保存和加载模型
model.save(sc, "target/tmp/myDecisionTreeClassificationModel")
val sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeClassificationModel")
请参考
DecisionTree
Java 文档
和
DecisionTreeModel
Java 文档
以获取有关API的详细信息。
import java.util.HashMap;
import java.util.Map;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.DecisionTree;
import org.apache.spark.mllib.tree.model.DecisionTreeModel;
import org.apache.spark.mllib.util.MLUtils;
SparkConf sparkConf = new SparkConf().setAppName("JavaDecisionTreeClassificationExample");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// 加载并解析数据文件。
String datapath = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), datapath).toJavaRDD();
// 将数据分为训练集和测试集(30%用于测试)
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.7, 0.3});
JavaRDD<LabeledPoint> trainingData = splits[0];
JavaRDD<LabeledPoint> testData = splits[1];
// 设置参数。
// 空的 categoricalFeaturesInfo 表示所有特征都是连续的。
int numClasses = 2;
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
String impurity = "gini";
int maxDepth = 5;
int maxBins = 32;
// 训练决策树模型进行分类。
DecisionTreeModel model = DecisionTree.trainClassifier(trainingData, numClasses,
categoricalFeaturesInfo, impurity, maxDepth, maxBins);
// 在测试实例上评估模型并计算测试错误
JavaPairRDD<Double, Double> predictionAndLabel =
testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
double testErr =
predictionAndLabel.filter(pl -> !pl._1().equals(pl._2())).count() / (double) testData.count();
System.out.println("测试错误: " + testErr);
System.out.println("学习到的分类树模型:\n" + model.toDebugString());
// 保存和加载模型
model.save(jsc.sc(), "target/tmp/myDecisionTreeClassificationModel");
DecisionTreeModel sameModel = DecisionTreeModel
.load(jsc.sc(), "target/tmp/myDecisionTreeClassificationModel");
回归
下面的示例演示了如何加载一个
LIBSVM 数据文件
,
将其解析为
LabeledPoint
的 RDD,然后
使用决策树进行回归,选择方差作为不纯度度量,最大树深度为 5。最后计算均方误差 (MSE) 以评估
拟合优度
。
请参阅
DecisionTree
Python 文档
和
DecisionTreeModel
Python 文档
以获取有关该 API 的更多详细信息。
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils
# 加载并解析数据文件到LabeledPoint的RDD中。
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# 将数据拆分为训练集和测试集(30%保留用于测试)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# 训练一个决策树模型。
# 空的categoricalFeaturesInfo表示所有特征都是连续的。
model = DecisionTree.trainRegressor(trainingData, categoricalFeaturesInfo={},
impurity='variance', maxDepth=5, maxBins=32)
# 在测试实例上评估模型并计算测试误差
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testMSE = labelsAndPredictions.map(lambda lp: (lp[0] - lp[1]) * (lp[0] - lp[1])).sum() /\
float(testData.count())
print('测试均方误差 = ' + str(testMSE))
print('学习到的回归树模型:')
print(model.toDebugString())
# 保存和加载模型
model.save(sc, "target/tmp/myDecisionTreeRegressionModel")
sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeRegressionModel")
请参阅
DecisionTree
的Scala文档
和
DecisionTreeModel
的Scala文档
以获取API的详细信息。
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.util.MLUtils
// 加载并解析数据文件。
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// 将数据分为训练集和测试集(30%用作测试)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
// 训练决策树模型。
// 空的 categoricalFeaturesInfo 表示所有特征都是连续的。
val categoricalFeaturesInfo = Map[Int, Int]()
val impurity = "variance"
val maxDepth = 5
val maxBins = 32
val model = DecisionTree.trainRegressor(trainingData, categoricalFeaturesInfo, impurity,
maxDepth, maxBins)
// 在测试实例上评估模型并计算测试误差
val labelsAndPredictions = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val testMSE = labelsAndPredictions.map{ case (v, p) => math.pow(v - p, 2) }.mean()
println(s"测试均方误差 = $testMSE")
println(s"学习到的回归树模型:\n ${model.toDebugString}")
// 保存和加载模型
model.save(sc, "target/tmp/myDecisionTreeRegressionModel")
val sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeRegressionModel")
有关API的详细信息,请参阅
DecisionTree
Java 文档
和
DecisionTreeModel
Java 文档
。
import java.util.HashMap;
import java.util.Map;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.DecisionTree;
import org.apache.spark.mllib.tree.model.DecisionTreeModel;
import org.apache.spark.mllib.util.MLUtils;
SparkConf sparkConf = new SparkConf().setAppName("JavaDecisionTreeRegressionExample");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// 加载并解析数据文件。
String <span class="n