特征提取与转换 - 基于RDD的API
词频-逆文档频率
注意 我们推荐使用基于DataFrame的API,详细信息请参见 ML用户指南中的TF-IDF 。
词频-逆文档频率 (TF-IDF)
是一种特征向量化方法,广泛用于文本挖掘,以反映一个术语对语料库中文档的重要性。
用
$t$
表示一个术语,用
$d$
表示一个文档,用
$D$
表示语料库。
术语频率
$TF(t, d)$
是术语
$t$
出现在文档
$d$
中的次数,
而文档频率
$DF(t, D)$
是包含术语
$t$
的文档数量。
如果我们仅使用术语频率来衡量重要性,很容易过分强调那些出现频率很高但对文档的信息量很少的术语,例如:“a”、“the”和“of”。
如果一个术语在语料库中频繁出现,这意味着它对特定文档没有特殊信息。
逆文档频率是测量一个术语提供信息量的数值:
\[
IDF(t, D) = \log \frac{|D| + 1}{DF(t, D) + 1},
\]
其中
$|D|$
是语料库中文档的总数。
由于使用了对数,如果一个术语出现在所有文档中,则其 IDF 值变为 0。
注意,为了避免对语料库外的术语进行零除法,应用了平滑项。
TF-IDF 测量简单地是 TF 和 IDF 的乘积:
\[
TFIDF(t, d, D) = TF(t, d) \cdot IDF(t, D).
\]
对于术语频率和文档频率的定义有几种变体。
在
spark.mllib
中,我们将 TF 和 IDF 分开,以使其灵活。
我们对词频的实现利用了
哈希技巧
。
一个原始特征通过应用哈希函数被映射到一个索引(术语)。
然后根据映射的索引计算词频。
这种方法避免了计算全球术语到索引映射的需要,
对于大型语料库,这可能是昂贵的,但它可能面临哈希碰撞的潜在问题,
其中不同的原始特征在哈希后可能变成相同的术语。
为了减少碰撞的机会,我们可以增加目标特征维度,即
哈希表的桶数量。
默认特征维度是
$2^{20} = 1,048,576$
。
注意:
spark.mllib
不提供文本分割的工具。我们建议用户访问
斯坦福NLP小组
和
scalanlp/chalk
。
TF和IDF在
HashingTF
和
IDF
中实现。
HashingTF
将一个列表的RDD作为输入。每条记录可以是一个字符串或其他类型的可迭代对象。
有关API的详细信息,请参阅
HashingTF
Python文档
。
from pyspark.mllib.feature import HashingTF, IDF
# 加载文档(每行一个)。
documents = sc.textFile("data/mllib/kmeans_data.txt").map(lambda line: line.split(" "))
hashingTF = HashingTF()
tf = hashingTF.transform(documents)
# 在应用 HashingTF 时,只需要对数据进行一次遍历,而应用 IDF 需要两次遍历:
# 第一次计算 IDF 向量,第二次通过 IDF 缩放词频。
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)
# spark.mllib 的 IDF 实现提供了一个忽略术语的选项
# 该术语在少于最小文档数的情况下出现。
# 在这种情况下,这些术语的 IDF 设置为 0。
# 可以通过将 minDocFreq 值传递给 IDF 构造函数来使用此功能。
idfIgnore = IDF(minDocFreq=2).fit(tf)
tfidfIgnore = idfIgnore.transform(tf)
TF 和 IDF 在
HashingTF
和
IDF
中实现。
HashingTF
以
RDD[Iterable[_]]
作为输入。
每条记录可以是一个字符串或其他类型的可迭代对象。
请参考
HashingTF
的Scala文档
以获取有关API的详细信息。
import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
// 加载文档(每行一个)。
val documents: RDD[Seq[String]] = sc.textFile("data/mllib/kmeans_data.txt")
.map(_.split(" ").toSeq)
val hashingTF = new HashingTF()
val tf: RDD[Vector] = hashingTF.transform(documents)
// 在应用 HashingTF 时只需要对数据进行一次传递,而应用 IDF 需要两次传递:
// 首先计算 IDF 向量,其次通过 IDF 缩放词频。
tf.cache()
val idf = new IDF().fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)
// spark.mllib IDF 实现提供了一个选项,可以忽略出现在少于
// 最小文档数的术语。在这种情况下,这些术语的 IDF 设置为 0。
// 通过将 minDocFreq 值传递给 IDF 构造函数可以使用此功能。
val idfIgnore = new IDF(minDocFreq = 2).fit(tf)
val tfidfIgnore: RDD[Vector] = idfIgnore.transform(tf)
词向量模型
Word2Vec 计算单词的分布式向量表示。 分布式表示的主要优点是相似的单词在向量空间中靠近,这使得对新模式的泛化更容易,模型估计更稳健。分布式向量表示已被证明在许多自然语言处理应用中是有用的,例如命名实体识别、消歧义、解析、标记和机器翻译。
模型
在我们对Word2Vec的实现中,我们使用了跳字模型。跳字模型的训练目标是学习能够很好地预测同一句子中的上下文的词向量表示。
从数学上讲,给定一组训练词
$w_1, w_2, \dots, w_T$
,跳字模型的目标是最大化平均对数似然
\[
\frac{1}{T} \sum_{t = 1}^{T}\sum_{j=-k}^{j=k} \log p(w_{t+j} | w_t)
\]
其中 $k$ 是训练窗口的大小。
在跳字模型中,每个单词 $w$ 都与两个向量 $u_w$ 和 $v_w$ 相关联,分别表示单词和上下文的向量表示。给定单词 $w_j$ 正确预测单词 $w_i$ 的概率由 softmax 模型决定,其形式为
\[
p(w_i | w_j ) = \frac{\exp(u_{w_i}^{\top}v_{w_j})}{\sum_{l=1}^{V} \exp(u_l^{\top}v_{w_j})}
\]
其中 $V$ 是词汇大小。
带有softmax的skip-gram模型代价高昂,因为计算$\log p(w_i | w_j)$的成本与$V$成正比,而$V$的数量级通常可以达到百万级。为了加速Word2Vec的训练,我们使用了层次softmax,这将计算$\log p(w_i | w_j)$的复杂度降低到了$O(\log(V))$
示例
下面的示例演示了如何加载一个文本文件,将其解析为一个
Seq[String]
的 RDD,
构建一个
Word2Vec
实例,然后用输入数据拟合一个
Word2VecModel
。最后,
我们展示指定单词的前 40 个同义词。要运行此示例,请首先下载
text8
数据并将其提取到您选择的目录。
在这里,我们假设提取的文件是
text8
,并且与您运行 spark shell 的目录相同。
有关API的更多详细信息,请参考
Word2Vec
Python文档
。
from pyspark.mllib.feature import Word2Vec
inp = sc.textFile("data/mllib/sample_lda_data.txt").map(lambda row: row.split(" "))
word2vec = Word2Vec()
model = word2vec.fit(inp)
synonyms = model.findSynonyms('1', 5)
for word, cosine_distance in synonyms:
print("{}: {}".format(word, cosine_distance))
请参考
Word2Vec
Scala文档
获取有关API的详细信息。
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
val input = sc.textFile("data/mllib/sample_lda_data.txt").map(line => line.split(" ").toSeq)
val word2vec = new Word2Vec()
val model = word2vec.fit(input)
val synonyms = model.findSynonyms("1", 5)
for((synonym, cosineSimilarity) <- synonyms) {
println(s"$synonym $cosineSimilarity")
}
// 保存和加载模型
model.save(sc, "myModelPath")
val sameModel = Word2VecModel.load(sc, "myModelPath")
标准化缩放器
通过对训练集中样本的列总结统计进行缩放至单位方差和/或去除均值来标准化特征。这是一个非常常见的预处理步骤。
例如,支持向量机的RBF核或L1和L2正则化线性模型通常在所有特征具有单位方差和/或零均值时效果更好。
标准化可以提高优化过程中的收敛速度,并且防止具有非常大方差的特征在模型训练过程中产生过大的影响。
模型拟合
StandardScaler
在构造函数中具有以下参数:
-
withMean默认值为 False。在缩放之前使用均值对数据进行中心化。它将生成稠密输出,因此在应用于稀疏输入时要小心。 -
withStd默认值为 True。将数据缩放到单位标准差。
我们在
fit
方法中提供了
StandardScaler
,该方法可以接受
RDD[Vector]
的输入,学习摘要统计信息,然后
返回一个模型,该模型可以将输入数据集转换为单位标准差和/或零均值特征,具体取决于我们如何配置
StandardScaler
。
该模型实现了
VectorTransformer
,可以对
Vector
进行标准化,生成一个转换后的
Vector
,或者对
RDD[Vector]
进行标准化,生成一个转换后的
RDD[Vector]
。
请注意,如果一个特征的方差为零,它将在该特征的
Vector
中返回默认的
0.0
值。
例子
下面的示例演示了如何加载libsvm格式的数据集,并标准化特征,以便新的特征具有单位标准差和/或零均值。
有关API的更多详细信息,请参阅
StandardScaler
Python文档
。
from pyspark.mllib.feature import StandardScaler
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.util import MLUtils
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
label = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)
scaler1 = StandardScaler().fit(features)
scaler2 = StandardScaler(withMean=True, withStd=True).fit(features)
# data1 将具有单位方差。
data1 = label.zip(scaler1.transform(features))
# data2 将具有单位方差和零均值。
data2 = label.zip(scaler2.transform(features.map(lambda x: Vectors.dense(x.toArray()))))
请参阅
StandardScaler
Scala 文档
以获取有关 API 的详细信息。
import org.apache.spark.mllib.feature.{StandardScaler, StandardScalerModel}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val scaler1 = new StandardScaler().fit(data.map(x => x.features))
val scaler2 = new StandardScaler(withMean = true, withStd = true).fit(data.map(x => x.features))
// scaler3 是与 scaler2 相同的模型,将产生相同的转换
val scaler3 = new StandardScalerModel(scaler2.std, scaler2.mean)
// data1 将具有单位方差。
val data1 = data.map(x => (x.label, scaler1.transform(x.features)))
// data2 将具有单位方差和零均值。
val data2 = data.map(x => (x.label, scaler2.transform(Vectors.dense(x.features.toArray))))
标准化器
归一化将单个样本缩放到单位 $L^p$ 范数。这是文本分类或聚类的常见操作。例如,两个 $L^2$ 归一化的 TF-IDF 向量的点积是向量的余弦相似度。
Normalizer
在构造函数中具有以下参数:
-
p$L^p$ 空间中的归一化,默认情况下 $p = 2$。
Normalizer
实现了
VectorTransformer
,它可以对
Vector
应用标准化,以产生一个变换后的
Vector
,或者对
RDD[Vector]
应用标准化,以产生一个变换后的
RDD[Vector]
。
注意,如果输入的范数为零,它将返回输入向量。
示例
下面的示例演示了如何加载libsvm格式的数据集,并使用$L^2$范数和$L^\infty$范数对特征进行归一化。
有关API的更多详细信息,请参阅
Normalizer
Python 文档
。
from pyspark.mllib.feature import Normalizer
from pyspark.mllib.util import MLUtils
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
labels = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)
normalizer1 = Normalizer()
normalizer2 = Normalizer(p=float("inf"))
# data1 中的每个样本将使用 $L^2$ 范数进行标准化。
data1 = labels.zip(normalizer1.transform(features))
# data2 中的每个样本将使用 $L^\infty$ 范数进行标准化。
data2 = labels.zip(normalizer2.transform(features))
有关API的详细信息,请参考
Normalizer
Scala 文档
。
import org.apache.spark.mllib.feature.Normalizer
import org.apache.spark.mllib.util.MLUtils
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val normalizer1 = new Normalizer()
val normalizer2 = new Normalizer(p = Double.PositiveInfinity)
// data1中的每个样本将使用 $L^2$ 范数进行标准化。
val data1 = data.map(x => (x.label, normalizer1.transform(x.features)))
// data2中的每个样本将使用 $L^\infty$ 范数进行标准化。
val data2 = data.map(x => (x.label, normalizer2.transform(x.features)))
卡方选择器
特征选择 尝试识别用于模型构建的相关特征。它减少了特征空间的大小,这可以提高速度和统计学习行为。
ChiSqSelector
实现
卡方特征选择。它在具有分类特征的标记数据上运行。ChiSqSelector 使用
卡方独立性检验
来决定选择哪些
特征。它支持五种选择方法:
numTopFeatures
、
percentile
、
fpr
、
fdr
、
fwe
:
-
numTopFeatures根据卡方检验选择固定数量的顶部特征。这类似于选择具有最高预测能力的特征。 -
percentile类似于numTopFeatures,但选择的是所有特征的一部分,而不是固定数量。 -
fpr选择所有 p 值低于阈值的特征,从而控制选择的假阳性率。 -
fdr使用 Benjamini-Hochberg 程序 选择所有假发现率低于阈值的特征。 -
fwe选择所有 p 值低于阈值的特征。阈值由 1/numFeatures 缩放,从而控制选择的家庭错误率。
默认情况下,选择方法是
numTopFeatures
,默认的顶级特征数量设置为 50。 用户可以使用
setSelectorType
选择一种选择方法。
可以使用保留的验证集来调整选择的特征数量。
模型拟合
该
fit
方法接受
一个具有分类特征的
RDD[LabeledPoint]
作为输入,学习统计摘要,然后
返回一个
ChiSqSelectorModel
,可以将输入数据集转化为减少后的特征空间。
ChiSqSelectorModel
可以应用于
Vector
以生成减少后的
Vector
,或者应用于
RDD[Vector]
以生成减少后的
RDD[Vector]
。
请注意,用户也可以通过提供一个排序的特征索引数组(必须按升序排列)手动构建一个
ChiSqSelectorModel
。
示例
以下示例展示了ChiSqSelector的基本用法。使用的数据集具有特征矩阵,由每个特征的灰度值组成,这些值在0到255之间变化。
有关API的详细信息,请参阅
ChiSqSelector
Scala 文档
。
import org.apache.spark.mllib.feature.ChiSqSelector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
// 加载一些libsvm格式的数据
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// 将数据离散化为16个相等的区间,因为ChiSqSelector需要分类特征
// 尽管特征是双精度数,ChiSqSelector将每个唯一值视为一个类别
val discretizedData = data.map { lp =>
LabeledPoint(lp.label, Vectors.dense(lp.features.toArray.map { x => (x / 16).floor }))
}
// 创建ChiSqSelector,该选择器将选择692个特征中的前50个
val selector = new ChiSqSelector(50)
// 创建ChiSqSelector模型(选择特征)
val transformer = selector.fit(discretizedData)
// 从每个特征向量中过滤前50个特征
val filteredData = discretizedData.map { lp =>
LabeledPoint(lp.label, transformer.transform(lp.features))
}
有关API的详细信息,请参阅
ChiSqSelector
Java文档
。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.feature.ChiSqSelector;
import org.apache.spark.mllib.feature.ChiSqSelectorModel;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
JavaRDD<LabeledPoint> points = MLUtils.loadLibSVMFile(jsc.sc(),
"data/mllib/sample_libsvm_data.txt").toJavaRDD().cache();
// 将数据分为16个相等的区间,因为ChiSqSelector需要分类特征
// 尽管特征是双精度数,但ChiSqSelector将每个唯一值视为一个类别
JavaRDD<LabeledPoint> discretizedData = points.map(lp -> {
double[] discretizedFeatures = new double[lp.features().size()];
for (int i = 0; i < lp.features().size(); ++i) {
discretizedFeatures[i] = Math.floor(lp.features().apply(i) / 16);
}
return new LabeledPoint(lp.label(), Vectors.dense(discretizedFeatures));
});
// 创建ChiSqSelector,选择692个特征中的前50个
ChiSqSelector selector = new ChiSqSelector(50);
// 创建ChiSqSelector模型(选择特征)
ChiSqSelectorModel transformer = selector.fit(discretizedData.rdd());
// 从每个特征向量中过滤出前50个特征
JavaRDD<LabeledPoint> filteredData = discretizedData.map(lp ->
new LabeledPoint(lp.label(), transformer.transform(lp.features())));
元素级乘法
ElementwiseProduct
将每个输入向量乘以提供的“权重”向量,使用逐元素相乘。换句话说,它通过标量乘数缩放数据集的每一列。这表示输入向量
v
和变换向量
scalingVec
之间的
Hadamard 积
,以产生结果向量。
将
scalingVec
表示为“
w
”,此转换可以写为:
\[ \begin{pmatrix}
v_1 \\
\vdots \\
v_N
\end{pmatrix} \circ \begin{pmatrix}
w_1 \\
\vdots \\
w_N
\end{pmatrix}
= \begin{pmatrix}
v_1 w_1 \\
\vdots \\
v_N w_N
\end{pmatrix}
\]
ElementwiseProduct
在构造函数中具有以下参数:
-
scalingVec: 变换向量。
ElementwiseProduct
实现了
VectorTransformer
,可以对
Vector
应用加权,从而生成一个转换后的
Vector
,或者对
RDD[Vector]
应用加权,从而生成一个转换后的
RDD[Vector]
。
示例
下面的例子演示了如何使用转换向量值来转换向量。
请参考
ElementwiseProduct
Python 文档
以获取有关该API的更多详细信息。
from pyspark.mllib.feature import ElementwiseProduct
from pyspark.mllib.linalg import Vectors
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda x: [float(t) for t in x.split(" ")])
# 创建权重向量
transformingVector = Vectors.dense([0.0, 1.0, 2.0])
transformer = ElementwiseProduct(transformingVector)
# 批量转换
transformedData = transformer.transform(parsedData)
# 单行转换
transformedData2 = transformer.transform(parsedData.first())
有关API的详细信息,请参阅
ElementwiseProduct
Scala文档
。
import org.apache.spark.mllib.feature.ElementwiseProduct
import org.apache.spark.mllib.linalg.Vectors
// 创建一些向量数据;对稀疏向量也适用
val data = sc.parallelize(Seq(Vectors.dense(1.0, 2.0, 3.0), Vectors.dense(4.0, 5.0, 6.0)))
val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct(transformingVector)
// 批量转换和逐行转换给出相同的结果:
val transformedData = transformer.transform(data)
val transformedData2 = data.map(x => transformer.transform(x))
有关API的详细信息,请参阅
ElementwiseProduct
Java文档
。
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.feature.ElementwiseProduct;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// 创建一些向量数据;也适用于稀疏向量
JavaRDD<Vector> data = jsc.parallelize(Arrays.asList(
Vectors.dense(1.0, 2.0, 3.0), Vectors.dense(4.0, 5.0, 6.0)));
Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);
ElementwiseProduct transformer = new ElementwiseProduct(transformingVector);
// 批处理转换和逐行转换给出相同的结果:
JavaRDD<Vector> transformedData = transformer.transform(data);
JavaRDD<Vector> transformedData2 = data.map(transformer::transform);
主成分分析
一个特征转换器,使用PCA将向量投影到低维空间。您可以在 降维 中阅读详细信息。