机器学习管道

\[ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \]

在本节中,我们介绍 机器学习管道 的概念。 机器学习管道提供了一组统一的高级API,这些API构建在 数据框架 之上,帮助用户创建和调整实际的 机器学习管道。

目录

管道中的主要概念

MLlib 标准化了机器学习算法的 API,以便更容易将多个算法组合成单个管道或工作流。 本节涵盖了 Pipelines API 引入的关键概念,其中管道概念主要受到 scikit-learn 项目的启发。

数据框

机器学习可以应用于多种数据类型,例如向量、文本、图像和结构化数据。 这个API采用了来自Spark SQL的 DataFrame 以支持多种数据类型。

DataFrame 支持许多基本和结构化类型;请参阅 Spark SQL 数据类型参考 获取支持的类型列表。 除了 Spark SQL 指南中列出的类型, DataFrame 还可以使用 ML Vector 类型。

一个 DataFrame 可以从一个普通的 RDD 隐式或显式地创建。请参见下面的代码示例和 Spark SQL 编程指南 以获取示例。

在一个 DataFrame 中,列是有名称的。下面的代码示例使用了“text”、“features”和“label”等名称。

管道组件

变压器

A Transformer 是一个包含特征变换器和学习模型的抽象。 从技术上讲, Transformer 实现了一个方法 transform() ,该方法将一个 DataFrame 转换为 另一个,通常是通过附加一个或多个列。 例如:

估计量

一个 Estimator 抽象了学习算法或任何适合或在数据上训练的算法的概念。技术上,一个 Estimator 实现了一个方法 fit() ,该方法接受一个 DataFrame 并生成一个 Model ,该模型是一个 Transformer 。例如,像 LogisticRegression 这样的学习算法是一个 Estimator ,调用 fit() 会训练一个 LogisticRegressionModel ,该模型是一个 Model ,因此也是一个 Transformer

管道组件的属性

Transformer.transform() Estimator.fit() 都是无状态的。未来可能会通过其他概念支持有状态算法。

每个 Transformer Estimator 的实例都有一个唯一的 ID,这在指定参数时很有用(下面将讨论)。

管道

在机器学习中,通常会运行一系列算法来处理和学习数据。 例如,一个简单的文本文件处理工作流程可能包括几个阶段:

MLlib 将这样的工作流表示为一个 Pipeline ,它由一系列顺序执行的 PipelineStage Transformer Estimator )组成。 我们将在本节中使用这个简单的工作流作为示例。

它是如何工作的

A Pipeline 被定义为一系列阶段,每个阶段要么是 Transformer ,要么是 Estimator 。 这些阶段按顺序运行,输入 DataFrame 在经过每个阶段时都会被转换。 对于 Transformer 阶段,在 DataFrame 上调用 transform() 方法。 对于 Estimator 阶段,调用 fit() 方法以生成一个 Transformer (该 Transformer 成为 PipelineModel 或已拟合的 Pipeline 的一部分),然后在 DataFrame 上调用该 Transformer transform() 方法。

我们为简单文本文档工作流说明这一点。下面的图是有关 训练时间 使用的 Pipeline

机器学习流程示例

上面,最上行表示一个 Pipeline ,具有三个阶段。 前两个阶段( Tokenizer HashingTF )是 Transformer s(蓝色),第三个阶段( LogisticRegression )是一个 Estimator (红色)。 底行表示数据流经管道,其中圆柱体表示 DataFrame 。 对原始 DataFrame 调用 Pipeline.fit() 方法,该 DataFrame 包含原始文本文档和标签。 Tokenizer.transform() 方法将原始文本文档拆分为单词,在 DataFrame 中添加一个包含单词的新列。 HashingTF.transform() 方法将单词列转换为特征向量,在 DataFrame 中添加一个包含这些向量的新列。 现在,由于 LogisticRegression 是一个 Estimator ,因此 Pipeline 首先调用 LogisticRegression.fit() 来生成 LogisticRegressionModel 。 如果 Pipeline 有更多的 Estimator ,它会在将 DataFrame 传递给下一个阶段之前,调用 LogisticRegressionModel transform() 方法。

A Pipeline 是一个 Estimator 。因此,在 Pipeline fit() 方法运行后,它会生成一个 PipelineModel ,这是一个 Transformer 。这个 PipelineModel 测试时 使用;下图说明了这种用法。

机器学习管道模型示例

在上面的图中, PipelineModel 的阶段数量与原始的 Pipeline 相同,但原始 Pipeline 中的所有 Estimator 都变成了 Transformer 。 当调用 PipelineModel transform() 方法时,会在测试数据集上依次通过拟合的管道。 每个阶段的 transform() 方法更新数据集并将其传递给下一个阶段。

Pipeline PipelineModel 有助于确保训练数据和测试数据经历相同的特征处理步骤。

详情

DAG Pipeline s Pipeline 的阶段被指定为一个有序数组。这里给出的示例都是线性 Pipeline ,即 Pipeline 中的每个阶段使用前一个阶段生成的数据。只要数据流图形成一个有向无环图(DAG),就可以创建非线性 Pipeline 。该图目前是基于每个阶段的输入和输出列名(通常作为参数指定)隐式地指定的。如果 Pipeline 形成 DAG,则阶段必须按拓扑顺序指定。

运行时检查 :由于 Pipeline 可以处理具有不同类型的 DataFrame ,因此它们无法使用编译时类型检查。 Pipeline PipelineModel 在实际运行 Pipeline 之前进行运行时检查。此类型检查是使用 DataFrame 模式 进行的,这是对 DataFrame 中列的数据类型的描述。

唯一的管道阶段 :一个 Pipeline 的阶段应该是唯一的实例。例如,不能将相同的实例 myHashingTF 插入到 Pipeline 中两次,因为 Pipeline 阶段必须具有唯一的 ID。然而,不同的实例 myHashingTF1 myHashingTF2 (都是类型为 HashingTF )可以放入同一个 Pipeline ,因为不同的实例将会创建具有不同的 ID。

参数

MLlib Estimator Transformer 使用统一的API来指定参数。

A Param 是一个具有自包含文档的命名参数。 A ParamMap 是一组 (参数, 值) 对。

有两种主要方式将参数传递给算法:

  1. 为实例设置参数。例如,如果 lr LogisticRegression 的一个实例,可以调用 lr.setMaxIter(10) 使 lr.fit() 最多使用 10 次迭代。这个 API 类似于 spark.mllib 包中使用的 API。
  2. ParamMap 传递给 fit() transform() ParamMap 中的任何参数都将覆盖之前通过 setter 方法指定的参数。

参数属于特定实例的 Estimator Transformer 。例如,如果我们有两个 LogisticRegression 实例 lr1 lr2 ,那么我们可以构建一个 ParamMap ,同时指定两个 maxIter 参数: ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20) 。如果在一个 Pipeline 中有两个算法具有 maxIter 参数,这就很有用。

机器学习持久性:保存和加载管道

通常,将模型或管道保存到磁盘以备后用是值得的。在Spark 1.6中,向Pipeline API添加了模型导入/导出功能。 从Spark 2.3开始,基于DataFrame的API在 spark.ml pyspark.ml 中得到了完整的支持。

机器学习持久化在Scala、Java和Python中工作。然而,R目前使用了修改过的格式,因此在R中保存的模型只能在R中加载;这个问题将在未来解决,并在 SPARK-15572 中进行跟踪。

机器学习持久性向后兼容性

一般来说,MLlib 保持 ML 持久性的向后兼容性。也就是说,如果您在一个版本的 Spark 中保存了一个 ML 模型或管道,那么您应该能够在未来的版本中加载并使用它。然而,也有一些罕见的例外,下面将进行描述。

模型持久化:在Spark版本X中使用Apache Spark ML持久化保存的模型或Pipeline能够被Spark版本Y加载吗?

模型行为:在Spark版本Y中,模型或Pipeline在Spark版本X中是否表现完全相同?

对于模型持久性和模型行为,任何在小版本或补丁版本之间的重大更改都将在Spark版本发布说明中报告。如果在发布说明中没有报告故障,那么它应被视为一个需要修复的错误。

代码示例

本节提供了代码示例,阐明了上述讨论的功能。有关更多信息,请参阅API文档( Scala Java ,和 Python )。

示例:估算器、转换器和参数

此示例涵盖了 Estimator Transformer Param 的概念。

请参考 Estimator Python 文档 Transformer Python 文档 Params Python 文档 以获取有关 API 的更多详细信息。

from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
# 从 (标签, 特征) 元组列表准备训练数据。
training = spark.createDataFrame([
(1.0, Vectors.dense([0.0, 1.1, 0.1])),
(0.0, Vectors.dense([2.0, 1.0, -1.0])),
(0.0, Vectors.dense([2.0, 1.3, 1.0])),
(1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])
# 创建一个 LogisticRegression 实例。此实例是一个估计器。
lr = LogisticRegression(maxIter=10, regParam=0.01)
# 打印参数、文档和任何默认值。
print("LogisticRegression 参数:\n" + lr.explainParams() + "\n")
# 学习 LogisticRegression 模型。这使用存储在 lr 中的参数。
model1 = lr.fit(training)
# 因为 model1 是一个模型(即由估计器生成的变换器),
# 我们可以查看它在 fit() 期间使用的参数。
# 这将打印参数(名称: 值)对,其中名称是此
# LogisticRegression 实例的唯一 ID。
print("模型 1 使用的参数: ")
print(model1.extractParamMap())
# 我们也可以使用 Python 字典指定参数作为 paramMap
paramMap = {lr.maxIter: 20}
paramMap[lr.maxIter] = 30 # 指定 1 个参数,覆盖原来的 maxIter。
# 指定多个参数。
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55}) # type: ignore

# 您可以组合 paramMaps,它们是 Python 字典。
# 更改输出列名称
paramMap2 = {lr.probabilityCol: "myProbability"}
paramMapCombined = paramMap.copy()
paramMapCombined.update(paramMap2) # type: ignore

# 现在使用 paramMapCombined 参数学习新模型。
# paramMapCombined 覆盖之前通过 lr.set* 方法设置的所有参数。
model2 = lr.fit(training, paramMapCombined)
print("模型 2 使用的参数: ")
print(model2.extractParamMap())
# 准备测试数据
test = spark.createDataFrame([
(1.0, Vectors.dense([-1.0, 1.5, 1.3])),
(0.0, Vectors.dense([3.0, 2.0, -0.1])),
(1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])
# 使用 Transformer.transform() 方法对测试数据进行预测。
# LogisticRegression.transform 将仅使用 'features' 列。
# 注意 model2.transform() 输出一个 "myProbability" 列,而不是通常的
# 'probability' 列,因为我们之前重命名了 lr.probabilityCol 参数。
prediction = model2.transform(test)
result = prediction.select("features", "label", "myProbability", "prediction") \
    .collect()
for row in result:
print("features=%s, label=%s -> prob=%s, prediction=%s"
% (row.features, row.label, row.myProbability, row.prediction))
Find full example code at "examples/src/main/python/ml/estimator_transformer_param_example.py" in the Spark repo.

有关API的详细信息,请参阅 Estimator Scala文档 Transformer Scala文档 Params Scala文档

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row
// 从(label, features)元组的列表中准备训练数据。
val training = spark.createDataFrame(Seq(
(1.0, Vectors.dense(0.0, 1.1, 0.1)),
(0.0, Vectors.dense(2.0, 1.0, -1.0)),
(0.0, Vectors.dense(2.0, 1.3, 1.0)),
(1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")
// 创建一个LogisticRegression实例。这个实例是一个估计器。
val lr = new LogisticRegression()
// 输出参数、文档及任何默认值。
println(s"LogisticRegression参数:\n ${lr.explainParams()}\n")
// 我们可以使用设置方法来设置参数。
lr.setMaxIter(10)
.setRegParam(0.01)
// 学习一个LogisticRegression模型。 这使用存储在lr中的参数。
val model1 = lr.fit(training)
// 由于model1是一个模型(即由估计器生成的转换器),
// 我们可以查看它在fit()期间使用的参数。
// 这将打印参数(名称:值)对,名称为此
// LogisticRegression实例的唯一ID。
println(s"模型1是采用以下参数训练的: ${model1.parent.extractParamMap}")
// 我们也可以使用ParamMap指定参数,
// ParamMap支持多种指定参数的方法。
val paramMap = ParamMap(lr.maxIter -> 20)
.put(lr.maxIter, 30) // 指定1个参数。这将覆盖原来的maxIter。
.put(lr.regParam -> 0.1, lr.threshold -> 0.55) // 指定多个参数。
// 也可以组合多个ParamMaps。
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // 更改输出列名。
val paramMapCombined = paramMap ++ paramMap2
// 现在使用paramMapCombined参数学习新的模型。
// paramMapCombined会覆盖之前通过lr.set*方法设置的所有参数。
val model2 = lr.fit(training, paramMapCombined)
println(s"模型2是采用以下参数训练的: ${model2.parent.extractParamMap}")
// 准备测试数据。
val test = spark.createDataFrame(Seq(
(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
(0.0, Vectors.dense(3.0, 2.0, -0.1)),
(1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")
// 使用Transformer.transform()方法对测试数据进行预测。
// LogisticRegression.transform将仅使用"features"列。
// 请注意model2.transform()输出的是'myProbability'列,而不是通常的
// 'probability'列,因为我们之前重命名了lr.probabilityCol参数。
model2.transform(test)
.select("features", "label", "myProbability", "prediction")
.collect()
.foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
println(s"($features, $label) -> prob=$prob, prediction=$prediction")
}
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/EstimatorTransformerParamExample.scala" in the Spark repo.

请参阅 Estimator Java文档 , Transformer Java文档 Params Java文档 以获取API的详细信息。

import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// 准备训练数据。
List<Row> dataTraining = Arrays.asList(
RowFactory.create(1.0, Vectors.dense(0.0, 1.1, 0.1)),
RowFactory.create(0.0, Vectors.dense(2.0, 1.0, -1.0)),
RowFactory.create(0.0, Vectors.dense(2.0, 1.3, 1.0)),
RowFactory.create(1.0, Vectors.dense(0.0, 1.2, -0.5))
);
StructType schema = new StructType(new StructField[]{
new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> training = spark.createDataFrame(dataTraining, schema);
// 创建一个 LogisticRegression 实例。这个实例是一个估计器。
LogisticRegression lr = new LogisticRegression();
// 打印参数,文档和任何默认值。
System.out.println("LogisticRegression 参数:\n" + lr.explainParams() + "\n");
// 我们可以使用设置方法设置参数。
lr.setMaxIter(10).setRegParam(0.01);
// 学习一个 LogisticRegression 模型。 这使用储存在 lr 中的参数。
LogisticRegressionModel model1 = lr.fit(training);
// 由于 model1 是一个模型(即由估计器生成的转换器),
// 我们可以查看它在 fit() 期间使用的参数。
// 这将打印参数(名称:值)对,其中名称是此
// LogisticRegression 实例的唯一 ID。
System.out.println("Model 1 是使用参数拟合的: " + model1.parent().extractParamMap());
// 我们也可以通过 ParamMap 来指定参数。
ParamMap paramMap = new ParamMap()
.put(lr.maxIter().w(20)) // 指定 1 个参数。
.put(lr.maxIter(), 30) // 这将覆盖原始的 maxIter。
.put(lr.regParam().w(0.1), lr.threshold().w(0.55)); // 指定多个参数。
// 还可以组合 ParamMaps。
ParamMap paramMap2 = new ParamMap()
.put(lr.probabilityCol().w("myProbability")); // 更改输出列名称
ParamMap paramMapCombined = paramMap.$plus$plus(paramMap2);
// 现在使用 paramMapCombined 参数学习一个新模型。
// paramMapCombined 覆盖了之前通过 lr.set* 方法设置的所有参数。
LogisticRegressionModel model2 = lr.fit(training, paramMapCombined);
System.out.println("Model 2 是使用参数拟合的: " + model2.parent().extractParamMap());
// 准备测试文档。
List<Row> dataTest = Arrays.asList(
RowFactory.create(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
RowFactory.create(0.0, Vectors.dense(3.0, 2.0, -0.1)),
RowFactory.create(1.0, Vectors.dense(0.0, 2.2, -1.5))
);
Dataset<Row> test = spark.createDataFrame(dataTest, schema);
// 使用 Transformer.transform() 方法对测试文档进行预测。
// LogisticRegression.transform 将只使用 'features' 列。
// 请注意 model2.transform() 输出 'myProbability' 列,而不是通常的
// 'probability' 列,因为我们之前重命名了 lr.probabilityCol 参数。
Dataset<Row> results = model2.transform(test);
Dataset<Row> rows = results.select("features", "label", "myProbability", "prediction");
for (Row r: rows.collectAsList()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2)
+ ", prediction=" + r.get(3));
}
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaEstimatorTransformerParamExample.java" in the Spark repo.

示例:管道

此示例遵循上述图中的简单文本文档 Pipeline

有关API的更多详细信息,请参考 Pipeline Python文档

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
# 从 (id, text, label) 元组列表准备训练文档。
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])
# 配置一个机器学习管道,它由三个阶段组成:分词器、HashingTF 和逻辑回归。
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
# 将管道拟合到训练文档。
model = pipeline.fit(training)
# 准备测试文档,这些文档没有标签 (id, text) 元组。
test = spark.createDataFrame([
(4, "spark i j k"),
(5, "l m n"),
(6, "spark hadoop spark"),
(7, "apache hadoop")
], ["id", "text"])
# 对测试文档进行预测并打印感兴趣的列。
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
rid, text, prob, prediction = row
print(
"(%d, %s) --> prob=%s, prediction=%f" % (
rid, text, str(prob), prediction # type: ignore
 )
)
Find full example code at "examples/src/main/python/ml/pipeline_example.py" in the Spark repo.

有关API的详细信息,请参阅 Pipeline Scala文档

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
// 从一个 (id, text, label) 元组的列表中准备训练文档。
val training = spark.createDataFrame(Seq(
(0L, "a b c d e spark", 1.0),
(1L, "b d", 0.0),
(2L, "spark f g h", 1.0),
(3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")
// 配置一个 ML 管道,它由三个阶段组成:分词器、hashingTF 和 lr。
val tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("features")
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.001)
val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))
// 将管道拟合到训练文档上。
val model = pipeline.fit(training)
// 现在我们可以选择将拟合的管道保存到磁盘
model.write.overwrite().save("/tmp/spark-logistic-regression-model")
// 我们也可以将这个未拟合的管道保存到磁盘
pipeline.write.overwrite().save("/tmp/unfit-lr-model")
// 并在生产中重新加载它
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")
// 准备测试文档,未标记的 (id, text) 元组。
val test = spark.createDataFrame(Seq(
(4L, "spark i j k"),
(5L, "l m n"),
(6L, "spark hadoop spark"),
(7L, "apache hadoop")
)).toDF("id", "text")
// 对测试文档进行预测。
model.transform(test)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
println(s"($id, $text) --> prob=$prob, prediction=$prediction")
}
Find full example code at "examples/src/main/scala/org/apache/spark/examples/ml/PipelineExample.scala" in the Spark repo.

有关API的详细信息,请参阅 Pipeline Java文档

import java.util.Arrays;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// 准备训练文档,这些文档带有标签。
Dataset<Row> training = spark.createDataFrame(Arrays.asList(
new JavaLabeledDocument(0L, "a b c d e spark", 1.0),
new JavaLabeledDocument(1L, "b d", 0.0),
new JavaLabeledDocument(2L, "spark f g h", 1.0),
new JavaLabeledDocument(3L, "hadoop mapreduce", 0.0)
), JavaLabeledDocument.class);
// 配置一个机器学习管道,该管道由三个阶段组成:分词器、HashingTF 和 lr。
Tokenizer tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words");
HashingTF hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol())
.setOutputCol("features");
LogisticRegression lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.001);
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[] {tokenizer, hashingTF, lr});
// 将管道拟合到训练文档上。
PipelineModel model = pipeline.fit(training);
// 准备测试文档,这些文档没有标签。
Dataset<Row> test = spark.createDataFrame(Arrays.asList(
new JavaDocument(4L, "spark i j k"),
new JavaDocument(5L, "l m n"),
new JavaDocument(6L, "spark hadoop spark"),
new JavaDocument(7L, "apache hadoop")
), JavaDocument.class);
// 对测试文档进行预测。
Dataset<Row> predictions = model.transform(test);
for (Row r : predictions.select("id", "text", "probability", "prediction").collectAsList()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> prob=" + r.get(2)
+ ", prediction=" + r.get(3));
}
Find full example code at "examples/src/main/java/org/apache/spark/examples/ml/JavaPipelineExample.java" in the Spark repo.

模型选择(超参数调优)

使用ML管道的一个重要好处是超参数优化。有关自动模型选择的更多信息,请参阅 ML调优指南