机器学习管道
\[
\newcommand{\R}{\mathbb{R}}
\newcommand{\E}{\mathbb{E}}
\newcommand{\x}{\mathbf{x}}
\newcommand{\y}{\mathbf{y}}
\newcommand{\wv}{\mathbf{w}}
\newcommand{\av}{\mathbf{\alpha}}
\newcommand{\bv}{\mathbf{b}}
\newcommand{\N}{\mathbb{N}}
\newcommand{\id}{\mathbf{I}}
\newcommand{\ind}{\mathbf{1}}
\newcommand{\0}{\mathbf{0}}
\newcommand{\unit}{\mathbf{e}}
\newcommand{\one}{\mathbf{1}}
\newcommand{\zero}{\mathbf{0}}
\]
在本节中,我们介绍 机器学习管道 的概念。 机器学习管道提供了一组统一的高级API,这些API构建在 数据框架 之上,帮助用户创建和调整实际的 机器学习管道。
目录
管道中的主要概念
MLlib 标准化了机器学习算法的 API,以便更容易将多个算法组合成单个管道或工作流。 本节涵盖了 Pipelines API 引入的关键概念,其中管道概念主要受到 scikit-learn 项目的启发。
-
DataFrame: 该机器学习API使用来自Spark SQL的DataFrame作为机器学习数据集,可以容纳多种数据类型。例如,一个DataFrame可以有不同的列来存储文本、特征向量、真实标签和预测结果。 -
Transformer:Transformer是一种算法,可以将一个DataFrame转换为另一个DataFrame。例如,一个机器学习模型是一个Transformer,它将包含特征的DataFrame转换为包含预测的DataFrame。 -
Estimator:Estimator是一种算法,可以在DataFrame上进行拟合,以生成一个Transformer。例如,学习算法是一个Estimator,它在DataFrame上进行训练并生成一个模型。 -
Pipeline:Pipeline将多个Transformer和Estimator连接在一起,以指定机器学习工作流。 -
Parameter: 所有Transformer和Estimator现在共享一个通用的API来指定参数。
数据框
机器学习可以应用于多种数据类型,例如向量、文本、图像和结构化数据。 这个API采用了来自Spark SQL的
DataFrame
以支持多种数据类型。
DataFrame
支持许多基本和结构化类型;请参阅
Spark SQL 数据类型参考
获取支持的类型列表。
除了 Spark SQL 指南中列出的类型,
DataFrame
还可以使用 ML
Vector
类型。
一个
DataFrame
可以从一个普通的
RDD
隐式或显式地创建。请参见下面的代码示例和
Spark SQL 编程指南
以获取示例。
在一个
DataFrame
中,列是有名称的。下面的代码示例使用了“text”、“features”和“label”等名称。
管道组件
变压器
A
Transformer
是一个包含特征变换器和学习模型的抽象。
从技术上讲,
Transformer
实现了一个方法
transform()
,该方法将一个
DataFrame
转换为
另一个,通常是通过附加一个或多个列。
例如:
-
特征转换器可能会接受一个
DataFrame,读取一列(例如,文本),将其映射到一个新列(例如,特征向量),并输出一个新的DataFrame,该映射列附加在后面。 -
学习模型可能会接受一个
DataFrame,读取包含特征向量的列,为每个特征向量预测标签,并输出一个新的DataFrame,将预测标签作为一列附加在后面。
估计量
一个
Estimator
抽象了学习算法或任何适合或在数据上训练的算法的概念。技术上,一个
Estimator
实现了一个方法
fit()
,该方法接受一个
DataFrame
并生成一个
Model
,该模型是一个
Transformer
。例如,像
LogisticRegression
这样的学习算法是一个
Estimator
,调用
fit()
会训练一个
LogisticRegressionModel
,该模型是一个
Model
,因此也是一个
Transformer
。
管道组件的属性
Transformer.transform()
和
Estimator.fit()
都是无状态的。未来可能会通过其他概念支持有状态算法。
每个
Transformer
或
Estimator
的实例都有一个唯一的 ID,这在指定参数时很有用(下面将讨论)。
管道
在机器学习中,通常会运行一系列算法来处理和学习数据。 例如,一个简单的文本文件处理工作流程可能包括几个阶段:
- 将每个文档的文本拆分为单词。
- 将每个文档的单词转换为数值特征向量。
- 使用特征向量和标签学习一个预测模型。
MLlib 将这样的工作流表示为一个
Pipeline
,它由一系列顺序执行的
PipelineStage
(
Transformer
和
Estimator
)组成。
我们将在本节中使用这个简单的工作流作为示例。
它是如何工作的
A
Pipeline
被定义为一系列阶段,每个阶段要么是
Transformer
,要么是
Estimator
。
这些阶段按顺序运行,输入
DataFrame
在经过每个阶段时都会被转换。
对于
Transformer
阶段,在
DataFrame
上调用
transform()
方法。
对于
Estimator
阶段,调用
fit()
方法以生成一个
Transformer
(该
Transformer
成为
PipelineModel
或已拟合的
Pipeline
的一部分),然后在
DataFrame
上调用该
Transformer
的
transform()
方法。
我们为简单文本文档工作流说明这一点。下面的图是有关
训练时间
使用的
Pipeline
。
上面,最上行表示一个
Pipeline
,具有三个阶段。
前两个阶段(
Tokenizer
和
HashingTF
)是
Transformer
s(蓝色),第三个阶段(
LogisticRegression
)是一个
Estimator
(红色)。
底行表示数据流经管道,其中圆柱体表示
DataFrame
。
对原始
DataFrame
调用
Pipeline.fit()
方法,该
DataFrame
包含原始文本文档和标签。
Tokenizer.transform()
方法将原始文本文档拆分为单词,在
DataFrame
中添加一个包含单词的新列。
HashingTF.transform()
方法将单词列转换为特征向量,在
DataFrame
中添加一个包含这些向量的新列。
现在,由于
LogisticRegression
是一个
Estimator
,因此
Pipeline
首先调用
LogisticRegression.fit()
来生成
LogisticRegressionModel
。
如果
Pipeline
有更多的
Estimator
,它会在将
DataFrame
传递给下一个阶段之前,调用
LogisticRegressionModel
的
transform()
方法。
A
Pipeline
是一个
Estimator
。因此,在
Pipeline
的
fit()
方法运行后,它会生成一个
PipelineModel
,这是一个
Transformer
。这个
PipelineModel
在
测试时
使用;下图说明了这种用法。
在上面的图中,
PipelineModel
的阶段数量与原始的
Pipeline
相同,但原始
Pipeline
中的所有
Estimator
都变成了
Transformer
。
当调用
PipelineModel
的
transform()
方法时,会在测试数据集上依次通过拟合的管道。
每个阶段的
transform()
方法更新数据集并将其传递给下一个阶段。
Pipeline
和
PipelineModel
有助于确保训练数据和测试数据经历相同的特征处理步骤。
详情
DAG
Pipeline
s
:
Pipeline
的阶段被指定为一个有序数组。这里给出的示例都是线性
Pipeline
,即
Pipeline
中的每个阶段使用前一个阶段生成的数据。只要数据流图形成一个有向无环图(DAG),就可以创建非线性
Pipeline
。该图目前是基于每个阶段的输入和输出列名(通常作为参数指定)隐式地指定的。如果
Pipeline
形成 DAG,则阶段必须按拓扑顺序指定。
运行时检查
:由于
Pipeline
可以处理具有不同类型的
DataFrame
,因此它们无法使用编译时类型检查。
Pipeline
和
PipelineModel
在实际运行
Pipeline
之前进行运行时检查。此类型检查是使用
DataFrame
模式
进行的,这是对
DataFrame
中列的数据类型的描述。
唯一的管道阶段
:一个
Pipeline
的阶段应该是唯一的实例。例如,不能将相同的实例
myHashingTF
插入到
Pipeline
中两次,因为
Pipeline
阶段必须具有唯一的 ID。然而,不同的实例
myHashingTF1
和
myHashingTF2
(都是类型为
HashingTF
)可以放入同一个
Pipeline
,因为不同的实例将会创建具有不同的 ID。
参数
MLlib
Estimator
和
Transformer
使用统一的API来指定参数。
A
Param
是一个具有自包含文档的命名参数。
A
ParamMap
是一组 (参数, 值) 对。
有两种主要方式将参数传递给算法:
-
为实例设置参数。例如,如果
lr是LogisticRegression的一个实例,可以调用lr.setMaxIter(10)使lr.fit()最多使用 10 次迭代。这个 API 类似于spark.mllib包中使用的 API。 -
将
ParamMap传递给fit()或transform()。ParamMap中的任何参数都将覆盖之前通过 setter 方法指定的参数。
参数属于特定实例的
Estimator
和
Transformer
。例如,如果我们有两个
LogisticRegression
实例
lr1
和
lr2
,那么我们可以构建一个
ParamMap
,同时指定两个
maxIter
参数:
ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20)
。如果在一个
Pipeline
中有两个算法具有
maxIter
参数,这就很有用。
机器学习持久性:保存和加载管道
通常,将模型或管道保存到磁盘以备后用是值得的。在Spark 1.6中,向Pipeline API添加了模型导入/导出功能。
从Spark 2.3开始,基于DataFrame的API在
spark.ml
和
pyspark.ml
中得到了完整的支持。
机器学习持久化在Scala、Java和Python中工作。然而,R目前使用了修改过的格式,因此在R中保存的模型只能在R中加载;这个问题将在未来解决,并在 SPARK-15572 中进行跟踪。
机器学习持久性向后兼容性
一般来说,MLlib 保持 ML 持久性的向后兼容性。也就是说,如果您在一个版本的 Spark 中保存了一个 ML 模型或管道,那么您应该能够在未来的版本中加载并使用它。然而,也有一些罕见的例外,下面将进行描述。
模型持久化:在Spark版本X中使用Apache Spark ML持久化保存的模型或Pipeline能够被Spark版本Y加载吗?
- 主要版本:没有保证,但会尽力而为。
- 次要版本和补丁版本:是的;这些是向后兼容的。
- 关于格式的说明:对于稳定的持久化格式没有保证,但模型加载本身被设计为向后兼容。
模型行为:在Spark版本Y中,模型或Pipeline在Spark版本X中是否表现完全相同?
- 主要版本:没有保证,但会尽力而为。
- 次要版本和补丁版本:行为相同,除了修复错误。
对于模型持久性和模型行为,任何在小版本或补丁版本之间的重大更改都将在Spark版本发布说明中报告。如果在发布说明中没有报告故障,那么它应被视为一个需要修复的错误。
代码示例
本节提供了代码示例,阐明了上述讨论的功能。有关更多信息,请参阅API文档( Scala , Java ,和 Python )。
示例:估算器、转换器和参数
此示例涵盖了
Estimator
、
Transformer
和
Param
的概念。
请参考
Estimator
Python 文档
,
Transformer
Python 文档
和
Params
Python 文档
以获取有关 API 的更多详细信息。
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
# 从 (标签, 特征) 元组列表准备训练数据。
training = spark.createDataFrame([
(1.0, Vectors.dense([0.0, 1.1, 0.1])),
(0.0, Vectors.dense([2.0, 1.0, -1.0])),
(0.0, Vectors.dense([2.0, 1.3, 1.0])),
(1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])
# 创建一个 LogisticRegression 实例。此实例是一个估计器。
lr = LogisticRegression(maxIter=10, regParam=0.01)
# 打印参数、文档和任何默认值。
print("LogisticRegression 参数:\n" + lr.explainParams() + "\n")
# 学习 LogisticRegression 模型。这使用存储在 lr 中的参数。
model1 = lr.fit(training)
# 因为 model1 是一个模型(即由估计器生成的变换器),
# 我们可以查看它在 fit() 期间使用的参数。
# 这将打印参数(名称: 值)对,其中名称是此
# LogisticRegression 实例的唯一 ID。
print("模型 1 使用的参数: ")
print(model1.extractParamMap())
# 我们也可以使用 Python 字典指定参数作为 paramMap
paramMap = {lr.maxIter: 20}
paramMap[lr.maxIter] = 30 # 指定 1 个参数,覆盖原来的 maxIter。
# 指定多个参数。
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55}) # type: ignore
# 您可以组合 paramMaps,它们是 Python 字典。
# 更改输出列名称
paramMap2 = {lr.probabilityCol: "myProbability"}
paramMapCombined = paramMap.copy()
paramMapCombined.update(paramMap2) # type: ignore
# 现在使用 paramMapCombined 参数学习新模型。
# paramMapCombined 覆盖之前通过 lr.set* 方法设置的所有参数。
model2 = lr.fit(training, paramMapCombined)
print("模型 2 使用的参数: ")
print(model2.extractParamMap())
# 准备测试数据
test = spark.createDataFrame([
(1.0, Vectors.dense([-1.0, 1.5, 1.3])),
(0.0, Vectors.dense([3.0, 2.0, -0.1])),
(1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])
# 使用 Transformer.transform() 方法对测试数据进行预测。
# LogisticRegression.transform 将仅使用 'features' 列。
# 注意 model2.transform() 输出一个 "myProbability" 列,而不是通常的
# 'probability' 列,因为我们之前重命名了 lr.probabilityCol 参数。
prediction = model2.transform(test)
result = prediction.select("features", "label", "myProbability", "prediction") \
.collect()
for row in result:
print("features=%s, label=%s -> prob=%s, prediction=%s"
% (row.features, row.label, row.myProbability, row.prediction))
有关API的详细信息,请参阅
Estimator
Scala文档
、
Transformer
Scala文档
和
Params
Scala文档
。
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row
// 从(label, features)元组的列表中准备训练数据。
val training = spark.createDataFrame(Seq(
(1.0, Vectors.dense(0.0, 1.1, 0.1)),
(0.0, Vectors.dense(2.0, 1.0, -1.0)),
(0.0, Vectors.dense(2.0, 1.3, 1.0)),
(1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")
// 创建一个LogisticRegression实例。这个实例是一个估计器。
val lr = new LogisticRegression()
// 输出参数、文档及任何默认值。
println(s"LogisticRegression参数:\n ${lr.explainParams()}\n")
// 我们可以使用设置方法来设置参数。
lr.setMaxIter(10)
.setRegParam(0.01)
// 学习一个LogisticRegression模型。 这使用存储在lr中的参数。
val model1 = lr.fit(training)
// 由于model1是一个模型(即由估计器生成的转换器),
// 我们可以查看它在fit()期间使用的参数。
// 这将打印参数(名称:值)对,名称为此
// LogisticRegression实例的唯一ID。
println(s"模型1是采用以下参数训练的: ${model1.parent.extractParamMap}")
// 我们也可以使用ParamMap指定参数,
// ParamMap支持多种指定参数的方法。
val paramMap = ParamMap(lr.maxIter -> 20)
.put(lr.maxIter, 30) // 指定1个参数。这将覆盖原来的maxIter。
.put(lr.regParam -> 0.1, lr.threshold -> 0.55) // 指定多个参数。
// 也可以组合多个ParamMaps。
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // 更改输出列名。
val paramMapCombined = paramMap ++ paramMap2
// 现在使用paramMapCombined参数学习新的模型。
// paramMapCombined会覆盖之前通过lr.set*方法设置的所有参数。
val model2 = lr.fit(training, paramMapCombined)
println(s"模型2是采用以下参数训练的: ${model2.parent.extractParamMap}")
// 准备测试数据。
val test = spark.createDataFrame(Seq(
(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
(0.0, Vectors.dense(3.0, 2.0, -0.1)),
(1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")
// 使用Transformer.transform()方法对测试数据进行预测。
// LogisticRegression.transform将仅使用"features"列。
// 请注意model2.transform()输出的是'myProbability'列,而不是通常的
// 'probability'列,因为我们之前重命名了lr.probabilityCol参数。
model2.transform(test)
.select("features", "label", "myProbability", "prediction")
.collect()
.foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
println(s"($features, $label) -> prob=$prob, prediction=$prediction")
}
请参阅
Estimator
Java文档
,
Transformer
Java文档
和
Params
Java文档
以获取API的详细信息。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// 准备训练数据。
List<Row> dataTraining = Arrays.asList(
RowFactory.create(1.0, Vectors.dense(0.0, 1.1, 0.1)),
RowFactory.create(0.0, Vectors.dense(2.0, 1.0, -1.0)),
RowFactory.create(0.0, Vectors.dense(2.0, 1.3, 1.0)),
RowFactory.create(1.0, Vectors.dense(0.0, 1.2, -0.5))
);
StructType schema = new StructType(new StructField[]{
new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> training = spark.createDataFrame(dataTraining, schema);
// 创建一个 LogisticRegression 实例。这个实例是一个估计器。
LogisticRegression lr = new LogisticRegression();
// 打印参数,文档和任何默认值。
System.out.println("LogisticRegression 参数:\n" + lr.explainParams() + "\n");
// 我们可以使用设置方法设置参数。
lr.setMaxIter(10).setRegParam(0.01);
// 学习一个 LogisticRegression 模型。 这使用储存在 lr 中的参数。
LogisticRegressionModel model1 = lr.fit(training);
// 由于 model1 是一个模型(即由估计器生成的转换器),
// 我们可以查看它在 fit() 期间使用的参数。
// 这将打印参数(名称:值)对,其中名称是此
// LogisticRegression 实例的唯一 ID。
System.out.println("Model 1 是使用参数拟合的: " + model1.parent().extractParamMap());
// 我们也可以通过 ParamMap 来指定参数。
ParamMap paramMap = new ParamMap()
.put(lr.maxIter().w(20)) // 指定 1 个参数。
.put(lr.maxIter(), 30) // 这将覆盖原始的 maxIter。
.put(lr.regParam().w(0.1), lr.threshold().w(0.55)); // 指定多个参数。
// 还可以组合 ParamMaps。
ParamMap paramMap2 = new ParamMap()
.put(lr.probabilityCol().w("myProbability")); // 更改输出列名称
ParamMap paramMapCombined = paramMap.$plus$plus(paramMap2);
// 现在使用 paramMapCombined 参数学习一个新模型。
// paramMapCombined 覆盖了之前通过 lr.set* 方法设置的所有参数。
LogisticRegressionModel model2 = lr.fit(training, paramMapCombined);
System.out.println("Model 2 是使用参数拟合的: " + model2.parent().extractParamMap());
// 准备测试文档。
List<Row> dataTest = Arrays.asList(
RowFactory.create(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
RowFactory.create(0.0, Vectors.dense(3.0, 2.0, -0.1)),
RowFactory.create(1.0, Vectors.dense(0.0, 2.2, -1.5))
);
Dataset<Row> test = spark.createDataFrame(dataTest, schema);
// 使用 Transformer.transform() 方法对测试文档进行预测。
// LogisticRegression.transform 将只使用 'features' 列。
// 请注意 model2.transform() 输出 'myProbability' 列,而不是通常的
// 'probability' 列,因为我们之前重命名了 lr.probabilityCol 参数。
Dataset<Row> results = model2.transform(test);
Dataset<Row> rows = results.select("features", "label", "myProbability", "prediction");
for (Row r: rows.collectAsList()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2)
+ ", prediction=" + r.get(3));
}
示例:管道
此示例遵循上述图中的简单文本文档
Pipeline
。
有关API的更多详细信息,请参考
Pipeline
Python文档
。
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
# 从 (id, text, label) 元组列表准备训练文档。
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])
# 配置一个机器学习管道,它由三个阶段组成:分词器、HashingTF 和逻辑回归。
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
# 将管道拟合到训练文档。
model = pipeline.fit(training)
# 准备测试文档,这些文档没有标签 (id, text) 元组。
test = spark.createDataFrame([
(4, "spark i j k"),
(5, "l m n"),
(6, "spark hadoop spark"),
(7, "apache hadoop")
], ["id", "text"])
# 对测试文档进行预测并打印感兴趣的列。
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
rid, text, prob, prediction = row
print(
"(%d, %s) --> prob=%s, prediction=%f" % (
rid, text, str(prob), prediction # type: ignore
)
)
有关API的详细信息,请参阅
Pipeline
Scala文档
。
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
// 从一个 (id, text, label) 元组的列表中准备训练文档。
val training = spark.createDataFrame(Seq(
(0L, "a b c d e spark", 1.0),
(1L, "b d", 0.0),
(2L, "spark f g h", 1.0),
(3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")
// 配置一个 ML 管道,它由三个阶段组成:分词器、hashingTF 和 lr。
val tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("features")
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.001)
val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))
// 将管道拟合到训练文档上。
val model = pipeline.fit(training)
// 现在我们可以选择将拟合的管道保存到磁盘
model.write.overwrite().save("/tmp/spark-logistic-regression-model")
// 我们也可以将这个未拟合的管道保存到磁盘
pipeline.write.overwrite().save("/tmp/unfit-lr-model")
// 并在生产中重新加载它
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")
// 准备测试文档,未标记的 (id, text) 元组。
val test = spark.createDataFrame(Seq(
(4L, "spark i j k"),
(5L, "l m n"),
(6L, "spark hadoop spark"),
(7L, "apache hadoop")
)).toDF("id", "text")
// 对测试文档进行预测。
model.transform(test)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
println(s"($id, $text) --> prob=$prob, prediction=$prediction")
}
有关API的详细信息,请参阅
Pipeline
Java文档
。
import java.util.Arrays;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// 准备训练文档,这些文档带有标签。
Dataset<Row> training = spark.createDataFrame(Arrays.asList(
new JavaLabeledDocument(0L, "a b c d e spark", 1.0),
new JavaLabeledDocument(1L, "b d", 0.0),
new JavaLabeledDocument(2L, "spark f g h", 1.0),
new JavaLabeledDocument(3L, "hadoop mapreduce", 0.0)
), JavaLabeledDocument.class);
// 配置一个机器学习管道,该管道由三个阶段组成:分词器、HashingTF 和 lr。
Tokenizer tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words");
HashingTF hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol())
.setOutputCol("features");
LogisticRegression lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.001);
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[] {tokenizer, hashingTF, lr});
// 将管道拟合到训练文档上。
PipelineModel model = pipeline.fit(training);
// 准备测试文档,这些文档没有标签。
Dataset<Row> test = spark.createDataFrame(Arrays.asList(
new JavaDocument(4L, "spark i j k"),
new JavaDocument(5L, "l m n"),
new JavaDocument(6L, "spark hadoop spark"),
new JavaDocument(7L, "apache hadoop")
), JavaDocument.class);
// 对测试文档进行预测。
Dataset<Row> predictions = model.transform(test);
for (Row r : predictions.select("id", "text", "probability", "prediction").collectAsList()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> prob=" + r.get(2)
+ ", prediction=" + r.get(3));
}
模型选择(超参数调优)
使用ML管道的一个重要好处是超参数优化。有关自动模型选择的更多信息,请参阅 ML调优指南 。