评估指标 - 基于RDD的API

spark.mllib 提供了多种机器学习算法,可以用于从数据中学习和进行预测。当这些算法被应用于构建机器学习模型时,需要根据应用和其要求评估模型的性能。 spark.mllib 还提供了一套用于评估机器学习模型性能的指标。

特定的机器学习算法属于更广泛的机器学习应用类型,如分类、回归、聚类等。每种类型都有成熟的性能评估指标,而当前在 spark.mllib 中可用的指标在本节中详细介绍。

分类模型评估

虽然有许多不同类型的分类算法,但分类模型的评估都有相似的原则。在一个 监督分类问题 中,对于每个数据点,存在一个真实输出和一个模型生成的预测输出。因此,每个数据点的结果可以分配到四个类别中的一个:

这四个数字是大多数分类器评估指标的基本构建块。在考虑分类器评估时,一个基本点是纯准确性(即预测是正确的还是错误的)一般不是一个好的指标。原因在于数据集可能高度不平衡。例如,如果一个模型旨在从一个95%数据点为 不是欺诈 和5%数据点为 欺诈 的数据集中预测欺诈,那么一个天真的分类器无论输入是什么都会预测 不是欺诈 ,这样它的准确率将是95%。因此,像 精确度和召回率 这样的指标通常被使用,因为它们考虑了错误的 类型 。在大多数应用中,精确度和召回率之间存在某种期望的平衡,这可以通过将两者结合成一个单一的指标来捕捉,这个指标被称为 F-measure

二分类

二分类器 用于将给定数据集中的元素分为两组中的一个(例如:欺诈或非欺诈),并且是多类别分类的一个特例。大多数二分类分类指标可以推广到多类别分类指标。

阈值调节

理解许多分类模型实际上输出一个“分数”(通常是概率)对于每个类别非常重要,其中较高的分数表示更高的可能性。在二元情况下,模型可能会为每个类别输出一个概率:$P(Y=1|X)$ 和 $P(Y=0|X)$。而不是简单地选择更高的概率,在某些情况下,模型可能需要调整,以便仅在概率非常高时预测一个类别(例如,仅在模型预测欺诈的概率超过90%时阻止信用卡交易)。因此,有一个预测 阈值 ,它根据模型输出的概率来决定预测的类别将是什么。

调整预测阈值将会改变模型的精确度和召回率,并且是模型优化的重要部分。为了可视化精确度、召回率和其他指标如何随着阈值的变化而变化,通常会将竞争指标绘制在一起,并根据阈值进行参数化。P-R 曲线绘制了不同阈值下的 (精确度,召回率) 点,而 接收者操作特征 ,或称 ROC 曲线则绘制了 (召回率,假阳性率) 点。

可用的指标

指标 定义
精确度(正预测值) $PPV=\frac{TP}{TP + FP}$
召回率(真正率) $TPR=\frac{TP}{P}=\frac{TP}{TP + FN}$
F-measure $F(\beta) = \left(1 + \beta^2\right) \cdot \left(\frac{PPV \cdot TPR} {\beta^2 \cdot PPV + TPR}\right)$
接收者操作特征(ROC) $FPR(T)=\int^\infty_{T} P_0(T)\,dT \\ TPR(T)=\int^\infty_{T} P_1(T)\,dT$
ROC曲线下面积 $AUROC=\int^1_{0} \frac{TP}{P} d\left(\frac{FP}{N}\right)$
精确率-召回率曲线下面积 $AUPRC=\int^1_{0} \frac{TP}{TP+FP} d\left(\frac{TP}{P}\right)$

示例

The following code snippets illustrate how to load a sample dataset, train a binary classification algorithm on the data, and evaluate the performance of the algorithm by several binary evaluation metrics.

请参阅 BinaryClassificationMetrics Python 文档 LogisticRegressionWithLBFGS Python 文档 获取有关 API 的更多详细信息。

from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.util import MLUtils
# 当前在scala中可用的几个方法在pyspark中缺失
# 以LIBSVM格式加载训练数据
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_binary_classification_data.txt")
# 将数据拆分为训练集(60%)和测试集(40%)
training, test = data.randomSplit([0.6, 0.4], seed=11)
training.cache()
# 运行训练算法以构建模型
model = LogisticRegressionWithLBFGS.train(training)
# 计算测试集上的原始分数
predictionAndLabels = test.map(lambda lp: (float(model.predict(lp.features)), lp.label))
# 实例化指标对象
metrics = BinaryClassificationMetrics(predictionAndLabels)
# 精确率-召回率曲线下面积
print("PR下面积 = %s" % metrics.areaUnderPR)
# ROC曲线下面积
print("ROC下面积 = %s" % metrics.areaUnderROC)
Find full example code at "examples/src/main/python/mllib/binary_classification_metrics_example.py" in the Spark repo.

请参考 LogisticRegressionWithLBFGS 的Scala文档 BinaryClassificationMetrics 的Scala文档 ,获取有关API的详细信息。

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
// 以 LIBSVM 格式加载训练数据
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_binary_classification_data.txt")
// 将数据拆分为训练集(60%)和测试集(40%)
val Array(training, test) = data.randomSplit(Array(0.6, 0.4), seed = 11L)
training.cache()
// 运行训练算法来构建模型
val model = new LogisticRegressionWithLBFGS()
.setNumClasses(2)
.run(training)
// 清除预测阈值,以便模型将返回概率
model.clearThreshold
// 在测试集上计算原始分数
val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
val prediction = model.predict(features)
(prediction, label)
}
// 实例化指标对象
val metrics = new BinaryClassificationMetrics(predictionAndLabels)
// 按阈值计算精确度
val precision = metrics.precisionByThreshold
precision.collect.foreach { case (t, p) =>
println(s"阈值: $t, 精确度: $p")
}
// 按阈值计算召回率
val recall = metrics.recallByThreshold
recall.collect.foreach { case (t, r) =>
println(s"阈值: $t, 召回率: $r")
}
// 精确度-召回率曲线
val PRC = metrics.pr
// F-measure
val f1Score = metrics.fMeasureByThreshold
f1Score.collect.foreach { case (t, f) =>
println(s"阈值: $t, F-score: $f, Beta = 1")
}
val beta = 0.5
val fScore = metrics.fMeasureByThreshold(beta)
fScore.collect.foreach { case (t, f) =>
println(s"阈值: $t, F-score: $f, Beta = 0.5")
}
// AUPRC
val auPRC = metrics.areaUnderPR
println(s"精确率-召回率曲线下的面积 = $auPRC")
// 计算 ROC 和 PR 曲线中使用的阈值
val thresholds = precision.map(_._1)
// ROC 曲线
val roc = metrics.roc
// AUROC
val auROC = metrics.areaUnderROC
println(s"ROC 曲线下的面积 = $auROC")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassificationMetricsExample.scala" in the Spark repo.

请参考 LogisticRegressionModel Java 文档 LogisticRegressionWithLBFGS Java 文档 以获取 API 的详细信息。

import scala.Tuple2;
import org.apache.spark.api.java.*;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS;
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
String path = "data/mllib/sample_binary_classification_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD();
// 将初始 RDD 分成两部分... [60% 训练数据, 40% 测试数据].
JavaRDD<LabeledPoint>[] splits =
data.randomSplit(new double[]{0.6, 0.4}, 11L);
JavaRDD<LabeledPoint> training = splits[0].cache();
JavaRDD<LabeledPoint> test = splits[1];
// 运行训练算法以构建模型.
LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
.setNumClasses(2)
.run(training.rdd());
// 清除预测阈值,以便模型将返回概率
model.clearThreshold();
// 在测试集上计算原始分数.
JavaPairRDD<Object, Object> predictionAndLabels = test.mapToPair(p ->
new Tuple2<>(model.predict(p.features()), p.label()));
// 获取评估指标.
BinaryClassificationMetrics metrics =
new BinaryClassificationMetrics(predictionAndLabels.rdd());
// 根据阈值的精确度
JavaRDD<Tuple2<Object, Object>> precision = metrics.precisionByThreshold().toJavaRDD();
System.out.println("根据阈值的精确度: " + precision.collect());
// 根据阈值的召回率
JavaRDD recall = metrics.recallByThreshold().toJavaRDD();
System.out.println("根据阈值的召回率: " + recall.collect());
// 根据阈值的 F 分数
JavaRDD f1Score = metrics.fMeasureByThreshold().toJavaRDD();
System.out.println("根据阈值的 F1 分数: " + f1Score.collect());
JavaRDD f2Score = metrics.fMeasureByThreshold(2.0).toJavaRDD();
System.out.println("根据阈值的 F2 分数: " + f2Score.collect());
// 精确度-召回率曲线
JavaRDD prc = metrics.pr().toJavaRDD();
System.out.println("精确度-召回率曲线: " + prc.collect());
// 阈值
JavaRDD<Double> thresholds = precision.map(t -> Double.parseDouble(t._1().toString()));
// ROC 曲线
JavaRDD roc = metrics.roc().toJavaRDD();
System.out.println("ROC 曲线: " + roc.collect());
// AUPRC
System.out.println("精确度-召回率曲线下面积 = " + metrics.areaUnderPR());
// AUROC
System.out.println("ROC 曲线下面积 = " + metrics.areaUnderROC());
// 保存和加载模型
model.save(sc, "target/tmp/LogisticRegressionModel");
LogisticRegressionModel.load(sc, "target/tmp/LogisticRegressionModel");
Find full example code at "examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java" in the Spark repo.

多类分类

A multiclass classification 描述了一个分类问题,其中每个数据点有 $M \gt 2$ 个可能的标签($M=2$ 的情况是二元分类问题)。例如,将手写样本分类为数字 0 到 9,有 10 个可能的类别。

对于多类别指标,正例和负例的概念略有不同。预测和标签仍然可以是正的或负的,但它们必须在特定类别的上下文中进行考虑。每个标签和预测都采用多个类别中的一个值,因此它们对其特定类别而言被称为正例,对所有其他类别则被称为负例。因此,当预测和标签匹配时,就会发生真正的正例,而当预测和标签均不取某一特定类别的值时,就会发生真正的负例。根据这一约定,对于给定的数据样本,可以存在多个真正的负例。从以前定义的正标签和负标签扩展出假负例和假正例是直接的。

基于标签的指标

与仅有两个可能标签的二元分类相对,多类分类问题有许多可能的标签,因此引入了基于标签的指标的概念。准确率衡量所有标签的精确度 - 任何类别被正确预测的次数(真正例)与数据点数量的比率。按标签的精确度仅考虑一个类别,并衡量特定标签被正确预测的次数与该标签在输出中出现的次数的比率。

可用指标

定义类或标签,设置为

\[L = \{\ell_0, \ell_1, \ldots, \ell_{M-1} \}\]

真实的输出向量 $\mathbf{y}$ 由 $N$ 个元素组成

\[\mathbf{y}_0, \mathbf{y}_1, \ldots, \mathbf{y}_{N-1} \in L\]

多类预测算法生成一个包含 $N$ 个元素的预测向量 $\hat{\mathbf{y}}$

\[\hat{\mathbf{y}}_0, \hat{\mathbf{y}}_1, \ldots, \hat{\mathbf{y}}_{N-1} \in L\]

在本节中,一个修改过的德尔塔函数 $\hat{\delta}(x)$ 将会变得有用

\[\hat{\delta}(x) = \begin{cases}1 & \text{if $x = 0$}, \\ 0 & \text{otherwise}.\end{cases}\]
指标 定义
混淆矩阵 $C_{ij} = \sum_{k=0}^{N-1} \hat{\delta}(\mathbf{y}_k-\ell_i) \cdot \hat{\delta}(\hat{\mathbf{y}}_k - \ell_j)\\ \\ \left( \begin{array}{ccc} \sum_{k=0}^{N-1} \hat{\delta}(\mathbf{y}_k-\ell_1) \cdot \hat{\delta}(\hat{\mathbf{y}}_k - \ell_1) & \ldots & \sum_{k=0}^{N-1} \hat{\delta}(\mathbf{y}_k-\ell_1) \cdot \hat{\delta}(\hat{\mathbf{y}}_k - \ell_N) \\ \vdots & \ddots & \vdots \\ \sum_{k=0}^{N-1} \hat{\delta}(\mathbf{y}_k-\ell_N) \cdot \hat{\delta}(\hat{\mathbf{y}}_k - \ell_1) & \ldots & \sum_{k=0}^{N-1} \hat{\delta}(\mathbf{y}_k-\ell_N) \cdot \hat{\delta}(\hat{\mathbf{y}}_k - \ell_N) \end{array} \right)$
准确率 $ACC = \frac{TP}{TP + FP} = \frac{1}{N}\sum_{i=0}^{N-1} \hat{\delta}\left(\hat{\mathbf{y}}_i - \mathbf{y}_i\right)$
按标签计算的精确率 $PPV(\ell) = \frac{TP}{TP + FP} = \frac{\sum_{i=0}^{N-1} \hat{\delta}(\hat{\mathbf{y}}_i - \ell) \cdot \hat{\delta}(\mathbf{y}_i - \ell)} {\sum_{i=0}^{N-1} \hat{\delta}(\hat{\mathbf{y}}_i - \ell)}$
按标签计算的召回率 $TPR(\ell)=\frac{TP}{P} = \frac{\sum_{i=0}^{N-1} \hat{\delta}(\hat{\mathbf{y}}_i - \ell) \cdot \hat{\delta}(\mathbf{y}_i - \ell)} {\sum_{i=0}^{N-1} \hat{\delta}(\mathbf{y}_i - \ell)}$
按标签计算的F值 $F(\beta, \ell) = \left(1 + \beta^2\right) \cdot \left(\frac{PPV(\ell) \cdot TPR(\ell)} {\beta^2 \cdot PPV(\ell) + TPR(\ell)}\right)$
加权精确率 $PPV_{w}= \frac{1}{N} \sum\nolimits_{\ell \in L} PPV(\ell) \cdot \sum_{i=0}^{N-1} \hat{\delta}(\mathbf{y}_i-\ell)$
加权召回率 $TPR_{w}= \frac{1}{N} \sum\nolimits_{\ell \in L} TPR(\ell) \cdot \sum_{i=0}^{N-1} \hat{\delta}(\mathbf{y}_i-\ell)$
加权F值 $F_{w}(\beta)= \frac{1}{N} \sum\nolimits_{\ell \in L} F(\beta, \ell) \cdot \sum_{i=0}^{N-1} \hat{\delta}(\mathbf{y}_i-\ell)$

示例

The following code snippets illustrate how to load a sample dataset, train a multiclass classification algorithm on the data, and evaluate the performance of the algorithm by several multiclass classification evaluation metrics.

有关API的更多详情,请参阅 MulticlassMetrics Python文档

from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.util import MLUtils
from pyspark.mllib.evaluation import MulticlassMetrics
# 以LIBSVM格式加载训练数据
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_multiclass_classification_data.txt")
# 将数据分为训练集 (60%) 和测试集 (40%)
training, test = data.randomSplit([0.6, 0.4], seed=11)
training.cache()
# 运行训练算法以构建模型
model = LogisticRegressionWithLBFGS.train(training, numClasses=3)
# 计算测试集上的原始分数
predictionAndLabels = test.map(lambda lp: (float(model.predict(lp.features)), lp.label))
# 实例化指标对象
metrics = MulticlassMetrics(predictionAndLabels)
# 整体统计信息
precision = metrics.precision(1.0)
recall = metrics.recall(1.0)
f1Score = metrics.fMeasure(1.0)
print("Summary Stats")
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)
# 按类别统计
labels = data.map(lambda lp: lp.label).distinct().collect()
for label in sorted(labels):
print("Class %s precision = %s" % (label, metrics.precision(label)))
print("Class %s recall = %s" % (label, metrics.recall(label)))
print("Class %s F1 Measure = %s" % (label, metrics.fMeasure(label, beta=1.0)))
# 加权统计
print("Weighted recall = %s" % metrics.weightedRecall)
print("Weighted precision = %s" % metrics.weightedPrecision)
print("Weighted F(1) Score = %s" % metrics.weightedFMeasure())
print("Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5))
print("Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate)
Find full example code at "examples/src/main/python/mllib/multi_class_metrics_example.py" in the Spark repo.

有关API的详细信息,请参考 MulticlassMetrics Scala文档

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
// 以LIBSVM格式加载训练数据
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_multiclass_classification_data.txt")
// 将数据分为训练集(60%)和测试集(40%)
val Array(training, test) = data.randomSplit(Array(0.6, 0.4), seed = 11L)
training.cache()
// 运行训练算法以构建模型
val model = new LogisticRegressionWithLBFGS()
.setNumClasses(3)
.run(training)
// 在测试集上计算原始分数
val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
val prediction = model.predict(features)
(prediction, label)
}
// 实例化指标对象
val metrics = new MulticlassMetrics(predictionAndLabels)
// 混淆矩阵
println("混淆矩阵:")
println(metrics.confusionMatrix)
// 总体统计
val accuracy = metrics.accuracy
println("摘要统计")
println(s"准确率 = $accuracy")
// 按标签的精确度
val labels = metrics.labels
labels.foreach { l =>
println(s"精确度($l) = " + metrics.precision(l))
}
// 按标签的召回率
labels.foreach { l =>
println(s"召回率($l) = " + metrics.recall(l))
}
// 按标签的假阳性率
labels.foreach { l =>
println(s"假阳性率($l) = " + metrics.falsePositiveRate(l))
}
// 按标签的F值
labels.foreach { l =>
println(s"F1-分数($l) = " + metrics.fMeasure(l))
}
// 加权统计
println(s"加权精确度: ${metrics.weightedPrecision}")
println(s"加权召回率: ${metrics.weightedRecall}")
println(s"加权F1分数: ${metrics.weightedFMeasure}")
println(s"加权假阳性率: ${metrics.weightedFalsePositiveRate}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/mllib/MulticlassMetricsExample.scala" in the Spark repo.

有关API的详细信息,请参阅 MulticlassMetrics Java文档

import scala.Tuple2;
import org.apache.spark.api.java.*;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS;
import org.apache.spark.mllib.evaluation.MulticlassMetrics;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.mllib.linalg.Matrix;
String path = "data/mllib/sample_multiclass_classification_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD();
// 将初始 RDD 分割成两个... [60% 训练数据,40% 测试数据]。
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.6, 0.4}, 11L);
JavaRDD<LabeledPoint> training = splits[0].cache();
JavaRDD<LabeledPoint> test = splits[1];
// 运行训练算法来构建模型。
LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
.setNumClasses(3)
.run(training.rdd());
// 计算测试集上的原始分数。
JavaPairRDD<Object, Object> predictionAndLabels = test.mapToPair(p ->
new Tuple2<>(model.predict(p.features()), p.label()));
// 获取评估指标。
MulticlassMetrics metrics = new MulticlassMetrics(predictionAndLabels.rdd());
// 混淆矩阵
Matrix confusion = metrics.confusionMatrix();
System.out.println("Confusion matrix: \n" + confusion);
// 总体统计
System.out.println("Accuracy = " + metrics.accuracy());
// 按标签统计
for (int i = 0; i < metrics.labels().length; i++) {
System.out.format("Class %f precision = %f\n", metrics.labels()[i],metrics.precision(
metrics.labels()[i]));
System.out.format("Class %f recall = %f\n", metrics.labels()[i], metrics.recall(
metrics.labels()[i]));
System.out.format("Class %f F1 score = %f\n", metrics.labels()[i], metrics.fMeasure(
metrics.labels()[i]));
}
// 加权统计
System.out.format("Weighted precision = %f\n", metrics.weightedPrecision());
System.out.format("Weighted recall = %f\n", metrics.weightedRecall());
System.out.format("Weighted F1 score = %f\n", metrics.weightedFMeasure());
System.out.format("Weighted false positive rate = %f\n", metrics.weightedFalsePositiveRate());
// 保存和加载模型
model.save(sc, "target/tmp/LogisticRegressionModel");
LogisticRegressionModel sameModel = LogisticRegressionModel.load(sc,
"target/tmp/LogisticRegressionModel");
Find full example code at "examples/src/main/java/org/apache/spark/examples/mllib/JavaMulticlassClassificationMetricsExample.java" in the Spark repo.

多标签分类

一个 多标签分类 问题涉及将数据集中每个样本映射到一组类标签。在这种类型的分类问题中,标签并不互斥。例如,当将一组新闻文章分类到主题时,单篇文章可能既是科学类又是政治类。

由于标签不是互斥的,因此预测值和真实标签现在是标签 集合 的向量,而不是标签的向量。因此,多标签指标将精确度、召回率等基本概念扩展到集合操作中。例如,对于给定类别,真正例现在发生在该类别存在于预测集合中并且存在于真实标签集合中时,对于特定数据点。

可用指标

在这里我们定义一个文档集合 $D$,包含 $N$ 个文档

\[D = \left\{d_0, d_1, ..., d_{N-1}\right\}\]

定义 $L_0, L_1, …, L_{N-1}$ 为一组标签集,$P_0, P_1, …, P_{N-1}$ 为一组预测集,其中 $L_i$ 和 $P_i$ 分别是与文档 $d_i$ 相对应的标签集和预测集。

所有唯一标签的集合为

\[L = \bigcup_{k=0}^{N-1} L_k\]

以下对集合 $A$ 上指示函数 $I_A(x)$ 的定义是必要的

\[I_A(x) = \begin{cases}1 & \text{if $x \in A$}, \\ 0 & \text{otherwise}.\end{cases}\]
指标 定义
精确率 $\frac{1}{N} \sum_{i=0}^{N-1} \frac{\left|P_i \cap L_i\right|}{\left|P_i\right|}$
召回率 $\frac{1}{N} \sum_{i=0}^{N-1} \frac{\left|L_i \cap P_i\right|}{\left|L_i\right|}$
准确率 $\frac{1}{N} \sum_{i=0}^{N - 1} \frac{\left|L_i \cap P_i \right|} {\left|L_i\right| + \left|P_i\right| - \left|L_i \cap P_i \right|}$
按标签的精确率 $PPV(\ell)=\frac{TP}{TP + FP}= \frac{\sum_{i=0}^{N-1} I_{P_i}(\ell) \cdot I_{L_i}(\ell)} {\sum_{i=0}^{N-1} I_{P_i}(\ell)}$
按标签的召回率 $TPR(\ell)=\frac{TP}{P}= \frac{\sum_{i=0}^{N-1} I_{P_i}(\ell) \cdot I_{L_i}(\ell)} {\sum_{i=0}^{N-1} I_{L_i}(\ell)}$
按标签的F1测量 $F1(\ell) = 2 \cdot \left(\frac{PPV(\ell) \cdot TPR(\ell)} {PPV(\ell) + TPR(\ell)}\right)$
汉明损失 $\frac{1}{N \cdot \left|L\right|} \sum_{i=0}^{N - 1} \left|L_i\right| + \left|P_i\right| - 2\left|L_i \cap P_i\right|$
子集准确率 $\frac{1}{N} \sum_{i=0}^{N-1} I_{\{L_i\}}(P_i)$
F1测量 $\frac{1}{N} \sum_{i=0}^{N-1} 2 \frac{\left|P_i \cap L_i\right|}{\left|P_i\right| \cdot \left|L_i\right|}$
微观精确率 $\frac{TP}{TP + FP}=\frac{\sum_{i=0}^{N-1} \left|P_i \cap L_i\right|} {\sum_{i=0}^{N-1} \left|P_i \cap L_i\right| + \sum_{i=0}^{N-1} \left|P_i - L_i\right|}$
微观召回率 $\frac{TP}{TP + FN}=\frac{\sum_{i=0}^{N-1} \left|P_i \cap L_i\right|} {\sum_{i=0}^{N-1} \left|P_i \cap L_i\right| + \sum_{i=0}^{N-1} \left|L_i - P_i\right|}$
微观F1测量 $2 \cdot \frac{TP}{2 \cdot TP + FP + FN}=2 \cdot \frac{\sum_{i=0}^{N-1} \left|P_i \cap L_i\right|}{2 \cdot \sum_{i=0}^{N-1} \left|P_i \cap L_i\right| + \sum_{i=0}^{N-1} \left|L_i - P_i\right| + \sum_{i=0}^{N-1} \left|P_i - L_i\right|}$

示例

以下代码示例说明了如何评估多标签分类器的性能。示例使用了下面显示的虚假预测和标签数据进行多标签分类。

文档预测:

预测类别:

真实类别:

有关API的更多详细信息,请参阅 MultilabelMetrics Python文档

from pyspark.mllib.evaluation import MultilabelMetrics
scoreAndLabels = sc.parallelize([
([0.0, 1.0], [0.0, 2.0]),
([0.0, 2.0], [0.0, 1.0]),
([], [0.0]),
([2.0], [2.0]),
([2.0, 0.0], [2.0, 0.0]),
([0.0, 1.0, 2.0], [0.0, 1.0]),
([1.0], [1.0, 2.0])])
# 实例化度量对象
metrics = MultilabelMetrics(scoreAndLabels)
# 概要统计
print("召回率 = %s" % metrics.recall())
print("精确度 = %s" % metrics.precision())
print("F1 值 = %s" % metrics.f1Measure())
print("准确率 = %s" % metrics.accuracy)
# 单个标签统计
labels = scoreAndLabels.flatMap(lambda x: x[1]).distinct().collect()
for label in labels:
print("类别 %s 精确度 = %s" % (label, metrics.precision(label)))
print("类别 %s 召回率 = %s" % (label, metrics.recall(label)))
print("类别 %s F1 值 = %s" % (label, metrics.f1Measure(label)))
# 微观统计
print("微观精确度 = %s" % metrics.microPrecision)
print("微观召回率 = %s" % metrics.microRecall)
print("微观 F1 值 = %s" % metrics.microF1Measure)
# 汉明损失
print("汉明损失 = %s" % metrics.hammingLoss)
# 子集准确率
print("子集准确率 = %s" % metrics.subsetAccuracy)
Find full example code at "examples/src/main/python/mllib/multi_label_metrics_example.py" in the Spark repo.

有关API的详细信息,请参阅 MultilabelMetrics Scala文档

import org.apache.spark.mllib.evaluation.MultilabelMetrics
import org.apache.spark.rdd.RDD
val scoreAndLabels: RDD[(Array[Double], Array[Double])] = sc.parallelize(
Seq((Array(0.0, 1.0), Array(0.0, 2.0)),
(Array(0.0, 2.0), Array(0.0, 1.0)),
(Array.empty[Double], Array(0.0)),
(Array(2.0), Array(2.0)),
(Array(2.0, 0.0), Array(2.0, 0.0)),
(Array(0.0, 1.0, 2.0), Array(0.0, 1.0)),
(Array(1.0), Array(1.0, 2.0))), 2)
// 实例化指标对象
val metrics = new MultilabelMetrics(scoreAndLabels)
// 总结统计
println(s"召回率 = ${metrics.recall}")
println(s"精确率 = ${metrics.precision}")
println(s"F1 评分 = ${metrics.f1Measure}")
println(s"准确率 = ${metrics.accuracy}")
// 各个标签统计
metrics.labels.foreach(label =>
println(s"类 $label 精确率 = ${metrics.precision(label)}"))
metrics.labels.foreach(label => println(s"类 $label 召回率 = ${metrics.recall(label)}"))
metrics.labels.foreach(label => println(s"类 $label F1 分数 = ${metrics.f1Measure(label)}"))
// 微观统计
println(s"微观召回率 = ${metrics.microRecall}")
println(s"微观精确率 = ${metrics.microPrecision}")
println(s"微观 F1 评分 = ${metrics.microF1Measure}")
// 汉明损失
println(s"汉明损失 = ${metrics.hammingLoss}")
// 子集准确率
println(s"子集准确率 = ${metrics.subsetAccuracy}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/mllib/MultiLabelMetricsExample.scala" in the Spark repo.

有关API的详细信息,请参阅 MultilabelMetrics Java文档

import java.util.Arrays;
import java.util.List;
import scala.Tuple2;
import org.apache.spark.api.java.*;
import org.apache.spark.mllib.evaluation.MultilabelMetrics;
import org.apache.spark.SparkConf;
List<Tuple2double[], double[]>> data = Arrays.asList(
new Tuple2<>(new double[]{0.0, 1.0}, new double[]{0.0, 2.0}),
new Tuple2<>(new double[]{0.0, 2.0}, new double[]{0.0, 1.0}),
new Tuple2<>(new double[]{}, new double[]{0.0}),
new Tuple2<>(new double[]{2.0}, new double[]{2.0}),
new Tuple2<>(new double[]{2.0, 0.0}, new double[]{2.0, 0.0}),
new Tuple2<>(new double[]{0.0, 1.0, 2.0}, new double[]{0.0, 1.0}),
new Tuple2<>(new double[]{1.0}, new double[]{1.0, 2.0})
);
JavaRDD<Tuple2<double[], double[]>> scoreAndLabels = sc.parallelize(data);
// 实例化指标对象
MultilabelMetrics metrics = new MultilabelMetrics(scoreAndLabels.rdd());
// 汇总统计
System.out.format("召回率 = %f\n", metrics.recall());
System.out.format("精确度 = %f\n", metrics.precision());
System.out.format("F1 值 = %f\n", metrics.f1Measure());
System.out.format("准确率 = %f\n", metrics.accuracy());
// 按标签统计
for (int i = 0; i < metrics.labels().length - 1; i++) {
System.out.format("类别 %1.1f 精确度 = %f\n", metrics.labels()[i], metrics.precision(
metrics.labels()[i]));
System.out.format("类别 %1.1f 召回率 = %f\n", metrics.labels()[i], metrics.recall(
metrics.labels()[i]));
System.out.format("类别 %1.1f F1 分数 = %f\n", metrics.labels()[i], metrics.f1Measure(
metrics.labels()[i]));
}
// 微观统计
System.out.format("微观召回率 = %f\n", metrics.microRecall());
System.out.format("微观精确度 = %f\n", metrics.microPrecision());
System.out.format("微观 F1 值 = %f\n", metrics.microF1Measure());
// 汉明损失
System.out.format("汉明损失 = %f\n", metrics.hammingLoss());
// 子集准确率
System.out.format("子集准确率 = %f\n", metrics.subsetAccuracy());
Find full example code at "examples/src/main/java/org/apache/spark/examples/mllib/JavaMultiLabelClassificationMetricsExample.java" in the Spark repo.

排名系统

排名算法的作用(通常被认为是一个 推荐系统 )是根据一些训练数据返回给用户一组相关的项目或文档。相关性的定义可能会有所不同,并且通常是特定于应用的。排名系统指标旨在量化这些排名或推荐在不同上下文中的有效性。一些指标将一组推荐文档与一个真实的相关文档集合进行比较,而其他指标可能明确地结合数值评级。

可用的指标

一个排名系统通常处理一组 $M$ 用户

\[U = \left\{u_0, u_1, ..., u_{M-1}\right\}\]

每个用户 ($u_i$) 具有一组 $N_i$ 相关的真实文档

\[D_i = \left\{d_0, d_1, ..., d_{N_i-1}\right\}\]

以及一份按相关性递减顺序排列的 $Q_i$ 推荐文档列表

\[R_i = \left[r_0, r_1, ..., r_{Q_i-1}\right]\]

排名系统的目标是为每个用户生成最相关的文档集合。集合的相关性和算法的有效性可以使用下面列出的指标进行测量。

有必要定义一个函数,该函数接受一个推荐文档和一组相关的真实文档,并返回推荐文档的相关性分数。

\[rel_D(r) = \begin{cases}1 & \text{if $r \in D$}, \\ 0 & \text{otherwise}.\end{cases}\]
指标 定义 备注
k 时的准确率 $p(k)=\frac{1}{M} \sum_{i=0}^{M-1} {\frac{1}{k} \sum_{j=0}^{\text{min}(Q_i, k) - 1} rel_{D_i}(R_i(j))}$ k 时的准确率 是一个衡量 前 k 个推荐文档中有多少在真实相关文档集合中的指标,平均计算所有用户的情况。此指标不考虑推荐的顺序。
平均精确度 $MAP=\frac{1}{M} \sum_{i=0}^{M-1} {\frac{1}{N_i} \sum_{j=0}^{Q_i-1} \frac{rel_{D_i}(R_i(j))}{j + 1}}$ MAP 是一个衡量 推荐文档中有多少在真实相关文档集合中的指标,推荐的顺序会被考虑在内(即,会对高度相关文档施加更高的惩罚)。
标准化折扣累积增益 $NDCG(k)=\frac{1}{M} \sum_{i=0}^{M-1} {\frac{1}{IDCG(D_i, k)}\sum_{j=0}^{n-1} \frac{rel_{D_i}(R_i(j))}{\text{log}(j+2)}} \\ \text{其中} \\ \hspace{5 mm} n = \text{min}\left(\text{max}\left(Q_i, N_i\right),k\right) \\ \hspace{5 mm} IDCG(D, k) = \sum_{j=0}^{\text{min}(\left|D\right|, k) - 1} \frac{1}{\text{log}(j+2)}$ k 时的 NDCG 是一个 衡量前 k 个推荐文档中有多少在真实相关文档集合中的指标,平均计算所有用户的情况。与 k 时的准确率不同,此指标考虑推荐的顺序 (假设文档按相关性递减的顺序排列)。

示例

以下代码片段说明了如何加载一个示例数据集,使用交替最小二乘推荐模型对数据进行训练,并通过几个排名指标评估推荐系统的性能。下面提供了方法论的简要总结。

MovieLens 评分在 1-5 的范围内:

因此,如果预测评分低于3,我们不应该推荐一部电影。 为了将评分映射到信心分数,我们使用:

这个映射意味着未观察到的条目通常介于“可以接受”和“相当糟糕”之间。在这个扩展的非正权重世界中,0的语义是“与从未交互过完全相同。”

有关API的更多详细信息,请参阅 RegressionMetrics Python文档 RankingMetrics Python文档

from pyspark.mllib.recommendation import ALS, Rating
from pyspark.mllib.evaluation import RegressionMetrics
# 读取评分数据
lines = sc.textFile("data/mllib/sample_movielens_data.txt")
def parseLine(line):
fields = line.split("::")
return Rating(int(fields[0]), int(fields[1]), float(fields[2]) - 2.5)
ratings = lines.map(lambda r: parseLine(r))
# 训练模型以预测用户-产品评分
model = ALS.train(ratings, 10, 10, 0.01)
# 对所有现有的用户-产品对获取预测评分
testData = ratings.map(lambda p: (p.user, p.product))
predictions = model.predictAll(testData).map(lambda r: ((r.user, r.product), r.rating))
ratingsTuple = ratings.map(lambda r: ((r.user, r.product), r.rating))
scoreAndLabels = predictions.join(ratingsTuple).map(lambda tup: tup[1])
# 实例化回归指标以比较预测和实际评分
metrics = RegressionMetrics(scoreAndLabels)
# 均方根误差
print("RMSE = %s" % metrics.rootMeanSquaredError)
# R平方
print("R-squared = %s" % metrics.r2)
Find full example code at "examples/src/main/python/mllib/ranking_metrics_example.py" in the Spark repo.

请参考 RegressionMetrics Scala 文档 RankingMetrics Scala 文档 获取有关 API 的详细信息。

import org.apache.spark.mllib.evaluation.{RankingMetrics, RegressionMetrics}
import org.apache.spark.mllib.recommendation.{ALS, Rating}
// 读取评分数据
val ratings = spark.read.textFile("data/mllib/sample_movielens_data.txt").rdd.map { line =>
val fields = line.split("::")
Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble - 2.5)
}.cache()
// 将评分映射为1或0,1表示应推荐的电影
val binarizedRatings = ratings.map(r => Rating(r.user, r.product,
if (r.rating > 0) 1.0 else 0.0)).cache()
// 总结评分
val numRatings = ratings.count()
val numUsers = ratings.map(_.user).distinct().count()
val numMovies = ratings.map(_.product).distinct().count()
println(s"从 $numUsers 用户获取了 $numRatings 条评分,共 $numMovies 部电影.")
// 构建模型
val numIterations = 10
val rank = 10
val lambda = 0.01
val model = ALS.train(ratings, rank, numIterations, lambda)
// 定义一个函数,将评分缩放到0到1之间
def scaledRating(r: Rating): Rating = {
val scaledRating = math.max(math.min(r.rating, 1.0), 0.0)
Rating(r.user, r.product, scaledRating)
}
// 获取每个用户的前十个推荐并将其缩放至[0, 1]
val userRecommended = model.recommendProductsForUsers(10).map { case (user, recs) =>
(user, recs.map(scaledRating))
}
// 假设用户评分3或更高的电影(映射为1)是相关文档
// 和前十个最相关的文档进行比较
val userMovies = binarizedRatings.groupBy(_.user)
val relevantDocuments = userMovies.join(userRecommended).map { case (user, (actual,
predictions)) =>
(predictions.map(_.product), actual.filter(_.rating > 0.0).map(_.product).toArray)
}
// 实例化指标对象
val metrics = new RankingMetrics(relevantDocuments)
// K的精度
Array(1, 3, 5).foreach { k =>
println(s"在 $k 时的精度 = ${metrics.precisionAt(k)}")
}
// 平均精度
println(s"平均精度 = ${metrics.meanAveragePrecision}")
// K的平均精度
println(s"在2时的平均精度 = ${metrics.meanAveragePrecisionAt(2)}")
// 规范化折扣累计增益
Array(1, 3, 5).foreach { k =>
println(s"在 $k 时的NDCG = ${metrics.ndcgAt(k)}")
}
// K的召回率
Array(1, 3, 5).foreach { k =>
println(s"在 $k 时的召回率 = ${metrics.recallAt(k)}")
}
// 获取每个数据点的预测
val allPredictions = model.predict(ratings.map(r => (r.user, r.product))).map(r => ((r.user,
r.product), r.rating))
val allRatings = ratings.map(r => ((r.user, r.product), r.rating))
val predictionsAndLabels = allPredictions.join(allRatings).map { case ((user, product),
(predicted, actual)) =>
(predicted, actual)
}
// 使用回归指标获取RMSE
val regressionMetrics = new RegressionMetrics(predictionsAndLabels)
println(s"RMSE = ${regressionMetrics.rootMeanSquaredError}")
// R平方
println(s"R平方 = ${regressionMetrics.r2}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/mllib/RankingMetricsExample.scala" in the Spark repo.

请参阅 RegressionMetrics Java 文档 RankingMetrics Java 文档 以获得有关 API 的详细信息。

import java.util.*;
import scala.Tuple2;
import org.apache.spark.api.java.*;
import org.apache.spark.mllib.evaluation.RegressionMetrics;
import org.apache.spark.mllib.evaluation.RankingMetrics;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
String path = "data/mllib/sample_movielens_data.txt";
JavaRDD<String> data = sc.textFile(path);
JavaRDD<Rating> ratings = data.map(line -> {
String[] parts = line.split("::");
return new Rating(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]), Double
.parseDouble(parts[2]) - 2.5);
});
ratings.cache();
// 训练一个 ALS 模型
MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), 10, 10, 0.01);
// 为每个用户获取前 10 个推荐并将评分从 0 缩放到 1
JavaRDD<Tuple2<Object, Rating[]>> userRecs = model.recommendProductsForUsers(10).toJavaRDD();
JavaRDD<Tuple2<Object, Rating[]>> userRecsScaled = userRecs.map(t -> {
Rating[] scaledRatings = new Rating[t._2().length];
for (int i = 0; i < scaledRatings.length; i++) {
double newRating = Math.max(Math.min(t._2()[i].rating(), 1.0), 0.0);
scaledRatings[i] = new Rating(t._2()[i].user(), t._2()[i].product(), newRating);
}
return new Tuple2<>(t._1(), scaledRatings);
});
JavaPairRDD<Object, Rating[]> userRecommended = JavaPairRDD.fromJavaRDD(userRecsScaled);
// 将评分映射为 1 或 0,1 表示应该推荐的电影
JavaRDD<Rating> binarizedRatings = ratings.map(r -> {
double binaryRating;
if (r.rating() > 0.0) {
binaryRating = 1.0;
} else {
binaryRating = 0.0;
}
return new Rating(r.user(), r.product(), binaryRating);
});
// 按共同用户分组评分
JavaPairRDD<Object, Iterable<Rating>> userMovies = binarizedRatings.groupBy(Rating::user);
// 从所有用户评分中获取真实相关文档
JavaPairRDD<Object, ListInteger>> userMoviesList = userMovies.mapValues(docs -> {
List<Integer> products = new ArrayList<>();
for (Rating r : docs) {
if (r.rating() > 0.0) {
products.add(r.product());
}
}
return products;
});
// 从每个推荐中提取产品 ID
JavaPairRDD<Object, List<Integer>> userRecommendedList = userRecommended.mapValues(docs -> {
List<Integer> products = new ArrayList<>();
for (Rating r : docs) {
products.add(r.product());
}
return products;
});
JavaRDD<Tuple2<List<Integer>, List<Integer>>> relevantDocs = userMoviesList.join(
userRecommendedList).values();
// 实例化度量对象
RankingMetrics<Integer> metrics = RankingMetrics.of(relevantDocs);
// 精确度,NDCG 和召回率在 k
Integer[] kVector = {1, 3, 5};
for (Integer k : kVector) {
System.out.format("Precision at %d = %f\n", k, metrics.precisionAt(k));
System.out.format("NDCG at %d = %f\n", k, metrics.ndcgAt(k));
System.out.format("Recall at %d = %f\n", k, metrics.recallAt(k));
}
// 平均精度
System.out.format("Mean average precision = %f\n", metrics.meanAveragePrecision());
// k 的平均精度
System.out.format("Mean average precision at 2 = %f\n", metrics.meanAveragePrecisionAt(2));
// 使用数值评分和回归度量评估模型
JavaRDD<Tuple2<Object, Object>> userProducts =
ratings.map(r -> new Tuple2<>(r.user(), r.product()));
JavaPairRDD<Tuple2<Integer, Integer>, Object> predictions = JavaPairRDD.fromJavaRDD(
model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map(r ->
new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating())));
JavaRDD<Tuple2<Object, Object>> ratesAndPreds =
JavaPairRDD.fromJavaRDD(ratings.map(r ->
new Tuple2<Tuple2<Integer, Integer>, Object>(
new Tuple2<>(r.user(), r.product()),
r.rating())
)).join(predictions).values();
// 创建回归度量对象
RegressionMetrics regressionMetrics = new RegressionMetrics(ratesAndPreds.rdd());
// 均方根误差
System.out.format("RMSE = %f\n", regressionMetrics.rootMeanSquaredError());
// R 平方
System.out.format("R-squared = %f\n", regressionMetrics.r2());
Find full example code at "examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java" in the Spark repo.

回归模型评估

回归分析 用于根据多个自变量预测连续输出变量。

可用指标

指标 定义
均方误差 (MSE) $MSE = \frac{\sum_{i=0}^{N-1} (\mathbf{y}_i - \hat{\mathbf{y}}_i)^2}{N}$
均方根误差 (RMSE) $RMSE = \sqrt{\frac{\sum_{i=0}^{N-1} (\mathbf{y}_i - \hat{\mathbf{y}}_i)^2}{N}}$
平均绝对误差 (MAE) $MAE=\frac{1}{N}\sum_{i=0}^{N-1} \left|\mathbf{y}_i - \hat{\mathbf{y}}_i\right|$
决定系数 $(R^2)$ $R^2=1 - \frac{MSE}{\text{VAR}(\mathbf{y}) \cdot (N-1)}=1-\frac{\sum_{i=0}^{N-1} (\mathbf{y}_i - \hat{\mathbf{y}}_i)^2}{\sum_{i=0}^{N-1}(\mathbf{y}_i-\bar{\mathbf{y}})^2}$
解释方差 $1 - \frac{\text{VAR}(\mathbf{y} - \mathbf{\hat{y}})}{\text{VAR}(\mathbf{y})}$