特征提取与转换 - 基于RDD的API

词频-逆文档频率

注意 我们推荐使用基于DataFrame的API,详细信息请参见 ML用户指南中的TF-IDF

词频-逆文档频率 (TF-IDF) 是一种特征向量化方法,广泛用于文本挖掘,以反映一个术语对语料库中文档的重要性。 用 $t$ 表示一个术语,用 $d$ 表示一个文档,用 $D$ 表示语料库。 术语频率 $TF(t, d)$ 是术语 $t$ 出现在文档 $d$ 中的次数, 而文档频率 $DF(t, D)$ 是包含术语 $t$ 的文档数量。 如果我们仅使用术语频率来衡量重要性,很容易过分强调那些出现频率很高但对文档的信息量很少的术语,例如:“a”、“the”和“of”。 如果一个术语在语料库中频繁出现,这意味着它对特定文档没有特殊信息。 逆文档频率是测量一个术语提供信息量的数值: \[ IDF(t, D) = \log \frac{|D| + 1}{DF(t, D) + 1}, \] 其中 $|D|$ 是语料库中文档的总数。 由于使用了对数,如果一个术语出现在所有文档中,则其 IDF 值变为 0。 注意,为了避免对语料库外的术语进行零除法,应用了平滑项。 TF-IDF 测量简单地是 TF 和 IDF 的乘积: \[ TFIDF(t, d, D) = TF(t, d) \cdot IDF(t, D). \] 对于术语频率和文档频率的定义有几种变体。 在 spark.mllib 中,我们将 TF 和 IDF 分开,以使其灵活。

我们对词频的实现利用了 哈希技巧 。 一个原始特征通过应用哈希函数被映射到一个索引(术语)。 然后根据映射的索引计算词频。 这种方法避免了计算全球术语到索引映射的需要, 对于大型语料库,这可能是昂贵的,但它可能面临哈希碰撞的潜在问题, 其中不同的原始特征在哈希后可能变成相同的术语。 为了减少碰撞的机会,我们可以增加目标特征维度,即 哈希表的桶数量。 默认特征维度是 $2^{20} = 1,048,576$

注意: spark.mllib 不提供文本分割的工具。我们建议用户访问 斯坦福NLP小组 scalanlp/chalk

TF和IDF在 HashingTF IDF 中实现。 HashingTF 将一个列表的RDD作为输入。每条记录可以是一个字符串或其他类型的可迭代对象。

有关API的详细信息,请参阅 HashingTF Python文档

from pyspark.mllib.feature import HashingTF, IDF
# 加载文档(每行一个)。
documents = sc.textFile("data/mllib/kmeans_data.txt").map(lambda line: line.split(" "))
hashingTF = HashingTF()
tf = hashingTF.transform(documents)
# 在应用 HashingTF 时,只需要对数据进行一次遍历,而应用 IDF 需要两次遍历:
# 第一次计算 IDF 向量,第二次通过 IDF 缩放词频。
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)
# spark.mllib 的 IDF 实现提供了一个忽略术语的选项
# 该术语在少于最小文档数的情况下出现。
# 在这种情况下,这些术语的 IDF 设置为 0。
# 可以通过将 minDocFreq 值传递给 IDF 构造函数来使用此功能。
idfIgnore = IDF(minDocFreq=2).fit(tf)
tfidfIgnore = idfIgnore.transform(tf)
Find full example code at "examples/src/main/python/mllib/tf_idf_example.py" in the Spark repo.

TF 和 IDF 在 HashingTF IDF 中实现。 HashingTF RDD[Iterable[_]] 作为输入。 每条记录可以是一个字符串或其他类型的可迭代对象。

请参考 HashingTF 的Scala文档 以获取有关API的详细信息。

import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
// 加载文档(每行一个)。
val documents: RDD[Seq[String]] = sc.textFile("data/mllib/kmeans_data.txt")
.map(_.split(" ").toSeq)
val hashingTF = new HashingTF()
val tf: RDD[Vector] = hashingTF.transform(documents)
// 在应用 HashingTF 时只需要对数据进行一次传递,而应用 IDF 需要两次传递:
// 首先计算 IDF 向量,其次通过 IDF 缩放词频。
tf.cache()
val idf = new IDF().fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)
// spark.mllib IDF 实现提供了一个选项,可以忽略出现在少于
// 最小文档数的术语。在这种情况下,这些术语的 IDF 设置为 0。
// 通过将 minDocFreq 值传递给 IDF 构造函数可以使用此功能。
val idfIgnore = new IDF(minDocFreq = 2).fit(tf)
val tfidfIgnore: RDD[Vector] = idfIgnore.transform(tf)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/mllib/TFIDFExample.scala" in the Spark repo.

词向量模型

Word2Vec 计算单词的分布式向量表示。 分布式表示的主要优点是相似的单词在向量空间中靠近,这使得对新模式的泛化更容易,模型估计更稳健。分布式向量表示已被证明在许多自然语言处理应用中是有用的,例如命名实体识别、消歧义、解析、标记和机器翻译。

模型

在我们对Word2Vec的实现中,我们使用了跳字模型。跳字模型的训练目标是学习能够很好地预测同一句子中的上下文的词向量表示。 从数学上讲,给定一组训练词 $w_1, w_2, \dots, w_T$ ,跳字模型的目标是最大化平均对数似然 \[ \frac{1}{T} \sum_{t = 1}^{T}\sum_{j=-k}^{j=k} \log p(w_{t+j} | w_t) \] 其中 $k$ 是训练窗口的大小。

在跳字模型中,每个单词 $w$ 都与两个向量 $u_w$ 和 $v_w$ 相关联,分别表示单词和上下文的向量表示。给定单词 $w_j$ 正确预测单词 $w_i$ 的概率由 softmax 模型决定,其形式为 \[ p(w_i | w_j ) = \frac{\exp(u_{w_i}^{\top}v_{w_j})}{\sum_{l=1}^{V} \exp(u_l^{\top}v_{w_j})} \] 其中 $V$ 是词汇大小。

带有softmax的skip-gram模型代价高昂,因为计算$\log p(w_i | w_j)$的成本与$V$成正比,而$V$的数量级通常可以达到百万级。为了加速Word2Vec的训练,我们使用了层次softmax,这将计算$\log p(w_i | w_j)$的复杂度降低到了$O(\log(V))$

示例

下面的示例演示了如何加载一个文本文件,将其解析为一个 Seq[String] 的 RDD, 构建一个 Word2Vec 实例,然后用输入数据拟合一个 Word2VecModel 。最后, 我们展示指定单词的前 40 个同义词。要运行此示例,请首先下载 text8 数据并将其提取到您选择的目录。 在这里,我们假设提取的文件是 text8 ,并且与您运行 spark shell 的目录相同。

有关API的更多详细信息,请参考 Word2Vec Python文档

from pyspark.mllib.feature import Word2Vec
inp = sc.textFile("data/mllib/sample_lda_data.txt").map(lambda row: row.split(" "))
word2vec = Word2Vec()
model = word2vec.fit(inp)
synonyms = model.findSynonyms('1', 5)
for word, cosine_distance in synonyms:
print("{}: {}".format(word, cosine_distance))
Find full example code at "examples/src/main/python/mllib/word2vec_example.py" in the Spark repo.

请参考 Word2Vec Scala文档 获取有关API的详细信息。

import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
val input = sc.textFile("data/mllib/sample_lda_data.txt").map(line => line.split(" ").toSeq)
val word2vec = new Word2Vec()
val model = word2vec.fit(input)
val synonyms = model.findSynonyms("1", 5)
for((synonym, cosineSimilarity) <- synonyms) {
println(s"$synonym $cosineSimilarity")
}
// 保存和加载模型
model.save(sc, "myModelPath")
val sameModel = Word2VecModel.load(sc, "myModelPath")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/mllib/Word2VecExample.scala" in the Spark repo.

标准化缩放器

通过对训练集中样本的列总结统计进行缩放至单位方差和/或去除均值来标准化特征。这是一个非常常见的预处理步骤。

例如,支持向量机的RBF核或L1和L2正则化线性模型通常在所有特征具有单位方差和/或零均值时效果更好。

标准化可以提高优化过程中的收敛速度,并且防止具有非常大方差的特征在模型训练过程中产生过大的影响。

模型拟合

StandardScaler 在构造函数中具有以下参数:

我们在 fit 方法中提供了 StandardScaler ,该方法可以接受 RDD[Vector] 的输入,学习摘要统计信息,然后 返回一个模型,该模型可以将输入数据集转换为单位标准差和/或零均值特征,具体取决于我们如何配置 StandardScaler

该模型实现了 VectorTransformer ,可以对 Vector 进行标准化,生成一个转换后的 Vector ,或者对 RDD[Vector] 进行标准化,生成一个转换后的 RDD[Vector]

请注意,如果一个特征的方差为零,它将在该特征的 Vector 中返回默认的 0.0 值。

例子

下面的示例演示了如何加载libsvm格式的数据集,并标准化特征,以便新的特征具有单位标准差和/或零均值。

有关API的更多详细信息,请参阅 StandardScaler Python文档

from pyspark.mllib.feature import StandardScaler
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.util import MLUtils
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
label = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)
scaler1 = StandardScaler().fit(features)
scaler2 = StandardScaler(withMean=True, withStd=True).fit(features)
# data1 将具有单位方差。
data1 = label.zip(scaler1.transform(features))
# data2 将具有单位方差和零均值。
data2 = label.zip(scaler2.transform(features.map(lambda x: Vectors.dense(x.toArray()))))
Find full example code at "examples/src/main/python/mllib/standard_scaler_example.py" in the Spark repo.

请参阅 StandardScaler Scala 文档 以获取有关 API 的详细信息。

import org.apache.spark.mllib.feature.{StandardScaler, StandardScalerModel}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val scaler1 = new StandardScaler().fit(data.map(x => x.features))
val scaler2 = new StandardScaler(withMean = true, withStd = true).fit(data.map(x => x.features))
// scaler3 是与 scaler2 相同的模型,将产生相同的转换
val scaler3 = new StandardScalerModel(scaler2.std, scaler2.mean)
// data1 将具有单位方差。
val data1 = data.map(x => (x.label, scaler1.transform(x.features)))
// data2 将具有单位方差和零均值。
val data2 = data.map(x => (x.label, scaler2.transform(Vectors.dense(x.features.toArray))))
Find full example code at "examples/src/main/scala/org/apache/spark/examples/mllib/StandardScalerExample.scala" in the Spark repo.

标准化器

归一化将单个样本缩放到单位 $L^p$ 范数。这是文本分类或聚类的常见操作。例如,两个 $L^2$ 归一化的 TF-IDF 向量的点积是向量的余弦相似度。

Normalizer 在构造函数中具有以下参数:

Normalizer 实现了 VectorTransformer ,它可以对 Vector 应用标准化,以产生一个变换后的 Vector ,或者对 RDD[Vector] 应用标准化,以产生一个变换后的 RDD[Vector]

注意,如果输入的范数为零,它将返回输入向量。

示例

下面的示例演示了如何加载libsvm格式的数据集,并使用$L^2$范数和$L^\infty$范数对特征进行归一化。

有关API的更多详细信息,请参阅 Normalizer Python 文档

from pyspark.mllib.feature import Normalizer
from pyspark.mllib.util import MLUtils
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
labels = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)
normalizer1 = Normalizer()
normalizer2 = Normalizer(p=float("inf"))
# data1 中的每个样本将使用 $L^2$ 范数进行标准化。
data1 = labels.zip(normalizer1.transform(features))
# data2 中的每个样本将使用 $L^\infty$ 范数进行标准化。
data2 = labels.zip(normalizer2.transform(features))
Find full example code at "examples/src/main/python/mllib/normalizer_example.py" in the Spark repo.

有关API的详细信息,请参考 Normalizer Scala 文档

import org.apache.spark.mllib.feature.Normalizer
import org.apache.spark.mllib.util.MLUtils
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val normalizer1 = new Normalizer()
val normalizer2 = new Normalizer(p = Double.PositiveInfinity)
// data1中的每个样本将使用 $L^2$ 范数进行标准化。
val data1 = data.map(x => (x.label, normalizer1.transform(x.features)))
// data2中的每个样本将使用 $L^\infty$ 范数进行标准化。
val data2 = data.map(x => (x.label, normalizer2.transform(x.features)))
Find full example code at "examples/src/main/scala/org/apache/spark/examples/mllib/NormalizerExample.scala" in the Spark repo.

卡方选择器

特征选择 尝试识别用于模型构建的相关特征。它减少了特征空间的大小,这可以提高速度和统计学习行为。

ChiSqSelector 实现 卡方特征选择。它在具有分类特征的标记数据上运行。ChiSqSelector 使用 卡方独立性检验 来决定选择哪些 特征。它支持五种选择方法: numTopFeatures percentile fpr fdr fwe

默认情况下,选择方法是 numTopFeatures ,默认的顶级特征数量设置为 50。 用户可以使用 setSelectorType 选择一种选择方法。

可以使用保留的验证集来调整选择的特征数量。

模型拟合

fit 方法接受 一个具有分类特征的 RDD[LabeledPoint] 作为输入,学习统计摘要,然后 返回一个 ChiSqSelectorModel ,可以将输入数据集转化为减少后的特征空间。 ChiSqSelectorModel 可以应用于 Vector 以生成减少后的 Vector ,或者应用于 RDD[Vector] 以生成减少后的 RDD[Vector]

请注意,用户也可以通过提供一个排序的特征索引数组(必须按升序排列)手动构建一个 ChiSqSelectorModel

示例

以下示例展示了ChiSqSelector的基本用法。使用的数据集具有特征矩阵,由每个特征的灰度值组成,这些值在0到255之间变化。

有关API的详细信息,请参阅 ChiSqSelector Scala 文档

import org.apache.spark.mllib.feature.ChiSqSelector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
// 加载一些libsvm格式的数据
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// 将数据离散化为16个相等的区间,因为ChiSqSelector需要分类特征
// 尽管特征是双精度数,ChiSqSelector将每个唯一值视为一个类别
val discretizedData = data.map { lp =>
LabeledPoint(lp.label, Vectors.dense(lp.features.toArray.map { x => (x / 16).floor }))
}
// 创建ChiSqSelector,该选择器将选择692个特征中的前50个
val selector = new ChiSqSelector(50)
// 创建ChiSqSelector模型(选择特征)
val transformer = selector.fit(discretizedData)
// 从每个特征向量中过滤前50个特征
val filteredData = discretizedData.map { lp =>
LabeledPoint(lp.label, transformer.transform(lp.features))
}
Find full example code at "examples/src/main/scala/org/apache/spark/examples/mllib/ChiSqSelectorExample.scala" in the Spark repo.

有关API的详细信息,请参阅 ChiSqSelector Java文档

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.feature.ChiSqSelector;
import org.apache.spark.mllib.feature.ChiSqSelectorModel;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
JavaRDD<LabeledPoint> points = MLUtils.loadLibSVMFile(jsc.sc(),
"data/mllib/sample_libsvm_data.txt").toJavaRDD().cache();
// 将数据分为16个相等的区间,因为ChiSqSelector需要分类特征
// 尽管特征是双精度数,但ChiSqSelector将每个唯一值视为一个类别
JavaRDD<LabeledPoint> discretizedData = points.map(lp -> {
double[] discretizedFeatures = new double[lp.features().size()];
for (int i = 0; i < lp.features().size(); ++i) {
discretizedFeatures[i] = Math.floor(lp.features().apply(i) / 16);
}
return new LabeledPoint(lp.label(), Vectors.dense(discretizedFeatures));
});
// 创建ChiSqSelector,选择692个特征中的前50个
ChiSqSelector selector = new ChiSqSelector(50);
// 创建ChiSqSelector模型(选择特征)
ChiSqSelectorModel transformer = selector.fit(discretizedData.rdd());
// 从每个特征向量中过滤出前50个特征
JavaRDD<LabeledPoint> filteredData = discretizedData.map(lp ->
new LabeledPoint(lp.label(), transformer.transform(lp.features())));
Find full example code at "examples/src/main/java/org/apache/spark/examples/mllib/JavaChiSqSelectorExample.java" in the Spark repo.

元素级乘法

ElementwiseProduct 将每个输入向量乘以提供的“权重”向量,使用逐元素相乘。换句话说,它通过标量乘数缩放数据集的每一列。这表示输入向量 v 和变换向量 scalingVec 之间的 Hadamard 积 ,以产生结果向量。

scalingVec 表示为“ w ”,此转换可以写为:

\[ \begin{pmatrix} v_1 \\ \vdots \\ v_N \end{pmatrix} \circ \begin{pmatrix} w_1 \\ \vdots \\ w_N \end{pmatrix} = \begin{pmatrix} v_1 w_1 \\ \vdots \\ v_N w_N \end{pmatrix} \]

ElementwiseProduct 在构造函数中具有以下参数:

ElementwiseProduct 实现了 VectorTransformer ,可以对 Vector 应用加权,从而生成一个转换后的 Vector ,或者对 RDD[Vector] 应用加权,从而生成一个转换后的 RDD[Vector]

示例

下面的例子演示了如何使用转换向量值来转换向量。

请参考 ElementwiseProduct Python 文档 以获取有关该API的更多详细信息。

from pyspark.mllib.feature import ElementwiseProduct
from pyspark.mllib.linalg import Vectors
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda x: [float(t) for t in x.split(" ")])
# 创建权重向量
transformingVector = Vectors.dense([0.0, 1.0, 2.0])
transformer = ElementwiseProduct(transformingVector)
# 批量转换
transformedData = transformer.transform(parsedData)
# 单行转换
transformedData2 = transformer.transform(parsedData.first())
Find full example code at "examples/src/main/python/mllib/elementwise_product_example.py" in the Spark repo.

有关API的详细信息,请参阅 ElementwiseProduct Scala文档

import org.apache.spark.mllib.feature.ElementwiseProduct
import org.apache.spark.mllib.linalg.Vectors
// 创建一些向量数据;对稀疏向量也适用
val data = sc.parallelize(Seq(Vectors.dense(1.0, 2.0, 3.0), Vectors.dense(4.0, 5.0, 6.0)))
val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct(transformingVector)
// 批量转换和逐行转换给出相同的结果:
val transformedData = transformer.transform(data)
val transformedData2 = data.map(x => transformer.transform(x))
Find full example code at "examples/src/main/scala/org/apache/spark/examples/mllib/ElementwiseProductExample.scala" in the Spark repo.

有关API的详细信息,请参阅 ElementwiseProduct Java文档

import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.feature.ElementwiseProduct;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// 创建一些向量数据;也适用于稀疏向量
JavaRDD<Vector> data = jsc.parallelize(Arrays.asList(
Vectors.dense(1.0, 2.0, 3.0), Vectors.dense(4.0, 5.0, 6.0)));
Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);
ElementwiseProduct transformer = new ElementwiseProduct(transformingVector);
// 批处理转换和逐行转换给出相同的结果:
JavaRDD<Vector> transformedData = transformer.transform(data);
JavaRDD<Vector> transformedData2 = data.map(transformer::transform);
Find full example code at "examples/src/main/java/org/apache/spark/examples/mllib/JavaElementwiseProductExample.java" in the Spark repo.

主成分分析

一个特征转换器,使用PCA将向量投影到低维空间。您可以在 降维 中阅读详细信息。