Basic Statistics


Correlation

Calculating the correlation between two series of data is a common operation in statistics. In spark.ml we provide the flexibility to calculate pairwise correlations among many series. The supported correlation methods are currently Pearson's and Spearman's correlation.
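For reference (these are the standard statistical definitions, not anything specific to Spark), the Pearson correlation of two series $x$ and $y$ normalizes their covariance by the product of their standard deviations, and the Spearman correlation is the Pearson correlation computed on the ranks of the values:

\[
\rho_{\mathrm{Pearson}}(x, y) = \frac{\sum_{i=1}^{n}(x_i - \bar{x})(y_i - \bar{y})}{\sqrt{\sum_{i=1}^{n}(x_i - \bar{x})^2}\;\sqrt{\sum_{i=1}^{n}(y_i - \bar{y})^2}}, \qquad
\rho_{\mathrm{Spearman}}(x, y) = \rho_{\mathrm{Pearson}}\big(\operatorname{rank}(x), \operatorname{rank}(y)\big)
\]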

Correlation computes the correlation matrix for the input Dataset of Vectors using the specified method. The output will be a DataFrame that contains the correlation matrix of the column of vectors.

from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation

data = [(Vectors.sparse(4, [(0, 1.0), (3, -2.0)]),),
        (Vectors.dense([4.0, 5.0, 0.0, 3.0]),),
        (Vectors.dense([6.0, 7.0, 0.0, 8.0]),),
        (Vectors.sparse(4, [(0, 9.0), (3, 1.0)]),)]
df = spark.createDataFrame(data, ["features"])

r1 = Correlation.corr(df, "features").head()
print("Pearson correlation matrix:\n" + str(r1[0]))

r2 = Correlation.corr(df, "features", "spearman").head()
print("Spearman correlation matrix:\n" + str(r2[0]))
Find full example code at "examples/src/main/python/ml/correlation_example.py" in the Spark repo.

Correlation computes the correlation matrix for the input Dataset of Vectors using the specified method. The output will be a DataFrame that contains the correlation matrix of the column of vectors.

import org.apache.spark.ml.linalg.{Matrix, Vectors}
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.sql.Row

// assumes an active SparkSession available as `spark`
import spark.implicits._

val data = Seq(
  Vectors.sparse(4, Seq((0, 1.0), (3, -2.0))),
  Vectors.dense(4.0, 5.0, 0.0, 3.0),
  Vectors.dense(6.0, 7.0, 0.0, 8.0),
  Vectors.sparse(4, Seq((0, 9.0), (3, 1.0)))
)

val df = data.map(Tuple1.apply).toDF("features")
val Row(coeff1: Matrix) = Correlation.corr(df, "features").head
println(s"Pearson correlation matrix:\n $coeff1")

val Row(coeff2: Matrix) = Correlation.corr(df, "features", "spearman").head
println(s"Spearman correlation matrix:\n $coeff2")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/CorrelationExample.scala" in the Spark repo.

Correlation computes the correlation matrix for the input Dataset of Vectors using the specified method. The output will be a DataFrame that contains the correlation matrix of the column of vectors.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.stat.Correlation;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.sparse(4, new int[]{0, 3}, new double[]{1.0, -2.0})),
  RowFactory.create(Vectors.dense(4.0, 5.0, 0.0, 3.0)),
  RowFactory.create(Vectors.dense(6.0, 7.0, 0.0, 8.0)),
  RowFactory.create(Vectors.sparse(4, new int[]{0, 3}, new double[]{9.0, 1.0}))
);

StructType schema = new StructType(new StructField[]{
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});

Dataset<Row> df = spark.createDataFrame(data, schema);
Row r1 = Correlation.corr(df, "features").head();
System.out.println("Pearson correlation matrix:\n" + r1.get(0).toString());

Row r2 = Correlation.corr(df, "features", "spearman").head();
System.out.println("Spearman correlation matrix:\n" + r2.get(0).toString());
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaCorrelationExample.java" in the Spark repo.

Hypothesis testing

Hypothesis testing is a powerful tool in statistics to determine whether a result is statistically significant, i.e. whether it occurred by chance or not. spark.ml currently supports Pearson's Chi-squared ($\chi^2$) test for independence.

ChiSquareTest

ChiSquareTest conducts Pearson's independence test for every feature against the label. For each feature, the (feature, label) pairs are converted into a contingency matrix for which the Chi-squared statistic is computed. All label and feature values must be categorical.
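For reference, the test statistic is the standard Pearson chi-squared statistic: for an $r \times c$ contingency table with observed counts $O_{ij}$ and expected counts $E_{ij}$ (computed under the independence assumption),

\[
\chi^2 = \sum_{i=1}^{r}\sum_{j=1}^{c} \frac{(O_{ij} - E_{ij})^2}{E_{ij}},
\]

with $(r-1)(c-1)$ degrees of freedom; the p-value is obtained from the corresponding $\chi^2$ distribution.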

Refer to the ChiSquareTest Python docs for details on the API.

from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import ChiSquareTest

data = [(0.0, Vectors.dense(0.5, 10.0)),
        (0.0, Vectors.dense(1.5, 20.0)),
        (1.0, Vectors.dense(1.5, 30.0)),
        (0.0, Vectors.dense(3.5, 30.0)),
        (0.0, Vectors.dense(3.5, 40.0)),
        (1.0, Vectors.dense(3.5, 40.0))]
df = spark.createDataFrame(data, ["label", "features"])

r = ChiSquareTest.test(df, "features", "label").head()
print("pValues: " + str(r.pValues))
print("degreesOfFreedom: " + str(r.degreesOfFreedom))
print("statistics: " + str(r.statistics))
Find full example code at "examples/src/main/python/ml/chi_square_test_example.py" in the Spark repo.

Refer to the ChiSquareTest Scala docs for details on the API.

import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.stat.ChiSquareTest

// assumes an active SparkSession available as `spark`
import spark.implicits._

val data = Seq(
  (0.0, Vectors.dense(0.5, 10.0)),
  (0.0, Vectors.dense(1.5, 20.0)),
  (1.0, Vectors.dense(1.5, 30.0)),
  (0.0, Vectors.dense(3.5, 30.0)),
  (0.0, Vectors.dense(3.5, 40.0)),
  (1.0, Vectors.dense(3.5, 40.0))
)

val df = data.toDF("label", "features")
val chi = ChiSquareTest.test(df, "features", "label").head
println(s"pValues = ${chi.getAs[Vector](0)}")
println(s"degreesOfFreedom ${chi.getSeq[Int](1).mkString("[", ",", "]")}")
println(s"statistics ${chi.getAs[Vector](2)}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/ChiSquareTestExample.scala" in the Spark repo.

Refer to the ChiSquareTest Java docs for details on the API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.stat.ChiSquareTest;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;

List<Row> data = Arrays.asList(
  RowFactory.create(0.0, Vectors.dense(0.5, 10.0)),
  RowFactory.create(0.0, Vectors.dense(1.5, 20.0)),
  RowFactory.create(1.0, Vectors.dense(1.5, 30.0)),
  RowFactory.create(0.0, Vectors.dense(3.5, 30.0)),
  RowFactory.create(0.0, Vectors.dense(3.5, 40.0)),
  RowFactory.create(1.0, Vectors.dense(3.5, 40.0))
);

StructType schema = new StructType(new StructField[]{
  new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});

Dataset<Row> df = spark.createDataFrame(data, schema);
Row r = ChiSquareTest.test(df, "features", "label").head();
System.out.println("pValues: " + r.get(0).toString());
System.out.println("degreesOfFreedom: " + r.getList(1).toString());
System.out.println("statistics: " + r.get(2).toString());
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaChiSquareTestExample.java" in the Spark repo.

Summarizer

We provide vector column summary statistics for DataFrames through Summarizer. Available metrics are the column-wise max, min, mean, sum, variance, standard deviation, and number of nonzeros, as well as the total count.

Refer to the Summarizer Python docs for details on the API.

from pyspark.ml.stat import Summarizer
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

df = sc.parallelize([Row(weight=1.0, features=Vectors.dense(1.0, 1.0, 1.0)),
                     Row(weight=0.0, features=Vectors.dense(1.0, 2.0, 3.0))]).toDF()

# create summarizer for multiple metrics "mean" and "count"
summarizer = Summarizer.metrics("mean", "count")

# compute statistics for multiple metrics with weight
df.select(summarizer.summary(df.features, df.weight)).show(truncate=False)

# compute statistics for multiple metrics without weight
df.select(summarizer.summary(df.features)).show(truncate=False)

# compute statistics for single metric "mean" with weight
df.select(Summarizer.mean(df.features, df.weight)).show(truncate=False)

# compute statistics for single metric "mean" without weight
df.select(Summarizer.mean(df.features)).show(truncate=False)
Find full example code at "examples/src/main/python/ml/summarizer_example.py" in the Spark repo.

The following example demonstrates using Summarizer to compute the mean and variance for a vector column of the input DataFrame, with and without a weight column.

import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.stat.Summarizer

// assumes an active SparkSession available as `spark`
import spark.implicits._
import Summarizer._

val data = Seq(
  (Vectors.dense(2.0, 3.0, 5.0), 1.0),
  (Vectors.dense(4.0, 6.0, 7.0), 2.0)
)

val df = data.toDF("features", "weight")

val (meanVal, varianceVal) = df.select(metrics("mean", "variance")
  .summary($"features", $"weight").as("summary"))
  .select("summary.mean", "summary.variance")
  .as[(Vector, Vector)].first()

println(s"with weight: mean = ${meanVal}, variance = ${varianceVal}")

val (meanVal2, varianceVal2) = df.select(mean($"features"), variance($"features"))
  .as[(Vector, Vector)].first()

println(s"without weight: mean = ${meanVal2}, variance = ${varianceVal2}")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/SummarizerExample.scala" in the Spark repo.

The following example demonstrates using Summarizer to compute the mean and variance for a vector column of the input DataFrame, with and without a weight column.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.stat.Summarizer;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.dense(2.0, 3.0, 5.0), 1.0),
  RowFactory.create(Vectors.dense(4.0, 6.0, 7.0), 2.0)
);

StructType schema = new StructType(new StructField[]{
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
  new StructField("weight", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

Row result1 = df.select(Summarizer.metrics("mean", "variance")
  .summary(new Column("features"), new Column("weight")).as("summary"))
  .select("summary.mean", "summary.variance").first();

System.out.println("with weight: mean = " + result1.<Vector>getAs(0).toString() +
  ", variance = " + result1.<Vector>getAs(1).toString());

Row result2 = df.select(
  Summarizer.mean(new Column("features")),
  Summarizer.variance(new Column("features"))
).first();

System.out.println("without weight: mean = " + result2.<Vector>getAs(0).toString() +
  ", variance = " + result2.<Vector>getAs(1).toString());
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaSummarizerExample.java" in the Spark repo.