协同过滤 - 基于RDD的API
协同过滤
协同过滤
通常用于推荐系统。这些技术旨在填补用户-项目关联矩阵中的缺失条目。
spark.mllib
目前支持
基于模型的协同过滤,其中用户和产品由一小组潜在因素描述,这些因素可用于预测缺失条目。
spark.mllib
使用
交替最小二乘法
(ALS)
算法来学习这些潜在因素。
spark.mllib
中的实现具有以下参数:
- numBlocks 是用于并行计算的块数量(设置为 -1 以自动配置)。
- rank 是要使用的特征数量(也称为潜在因子的数量)。
- iterations 是要运行的 ALS 的迭代次数。ALS 通常在 20 次迭代或更少时收敛到合理的解决方案。
- lambda 指定 ALS 中的正则化参数。
- implicitPrefs 指定是否使用 显式反馈 的 ALS 变体或适应 隐式反馈 数据的变体。
- alpha 是适用于 ALS 的隐式反馈变体的参数,控制 基线 对偏好观察的置信度。
显式反馈与隐式反馈
基于矩阵分解的协同过滤的标准方法将用户-项目矩阵中的条目视为用户对项目的 显式 偏好,例如,用户对电影的评分。
在许多现实世界的使用案例中,常常只能获得
隐式反馈
(例如观看次数、点击次数、购买、点赞、分享等)。在
spark.mllib
中处理这些数据的方法来源于
隐式反馈数据集的协同过滤
。本质上,这种方法并不是直接建模评分矩阵,而是将数据视为代表用户行为观察中的
强度
的数字(例如点击次数或某人观看电影的累计时长)。这些数字与观察到的用户偏好的置信度级别相关,而不是对项目给出的明确评分。模型随后尝试找到潜在因素,以便预测用户对项目的预期偏好。
正则化参数的缩放
自v1.1以来,在解决每个最小二乘问题时,我们通过用户生成的评分数量来调整正则化参数
lambda
,在更新用户因素时,或者通过产品收到的评分数量来调整产品因素。在论文“
Large-Scale Parallel Collaborative Filtering for the Netflix Prize
”中讨论了这种方法,称为“ALS-WR”。它使得
lambda
对数据集规模的依赖性降低,因此我们可以将从样本子集中学习到的最佳参数应用于整个数据集,并期待类似的性能。
示例
在下面的示例中,我们加载评分数据。每一行由一个用户、一个产品和一个评分组成。 我们使用默认的 ALS.train() 方法,它假定评分是显式的。我们通过测量评分预测的均方误差来评估推荐。
有关API的更多详情,请参阅
ALS
Python文档
。
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
# 加载和解析数据
data = sc.textFile("data/mllib/als/test.data")
ratings = data.map(lambda l: l.split(','))\
.map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
# 使用交替最小二乘法构建推荐模型
rank = 10
numIterations = 10
model = ALS.train(ratings, rank, numIterations)
# 在训练数据上评估模型
testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("均方误差 = " + str(MSE))
# 保存和加载模型
model.save(sc, "target/tmp/myCollaborativeFilter")
sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
如果评分矩阵是从其他信息源得出的(即它是从其他信号推断的),您可以使用trainImplicit方法以获得更好的结果。
# 使用基于隐式评分的交替最小二乘法构建推荐模型
model = ALS.trainImplicit(ratings, rank, numIterations, alpha=0.01)
在下面的示例中,我们加载评分数据。每一行包含一个用户、一个产品和一个评分。 我们使用默认的 ALS.train() 方法,假设评分是显式的。我们通过测量评分预测的均方误差来评估推荐模型。
有关API的更多细节,请参阅
ALS
Scala文档
。
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.mllib.recommendation.Rating
// 加载和解析数据
val data = sc.textFile("data/mllib/als/test.data")
val ratings = data.map(_.split(',') match { case Array(user, item, rate) =>
Rating(user.toInt, item.toInt, rate.toDouble)
})
// 使用 ALS 构建推荐模型
val rank = 10
val numIterations = 10
val model = ALS.train(ratings, rank, numIterations, 0.01)
// 在评分数据上评估模型
val usersProducts = ratings.map { case Rating(user, product, rate) =>
(user, product)
}
val predictions =
model.predict(usersProducts).map { case Rating(user, product, rate) =>
((user, product), rate)
}
val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
((user, product), rate)
}.join(predictions)
val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
val err = (r1 - r2)
err * err
}.mean()
println(s"均方误差 = $MSE")
// 保存和加载模型
model.save(sc, "target/tmp/myCollaborativeFilter")
val sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
如果评分矩阵是从其他信息来源得出的(即,它是从其他信号推断出来的),您可以使用
trainImplicit
方法来获得更好的结果。
val alpha = 0.01
val lambda = 0.01
val model = ALS.trainImplicit(ratings, rank, numIterations, lambda, alpha)
MLlib的所有方法使用对Java友好的类型,因此您可以在Java中以与Scala相同的方式导入和调用它们。唯一的警告是,这些方法接受Scala RDD对象,而Spark Java API使用一个单独的
JavaRDD
类。您可以通过在您的
JavaRDD
对象上调用
.rdd()
将Java RDD转换为Scala RDD。下面给出一个自包含的应用示例,该示例等效于提供的Scala示例:
有关API的更多详细信息,请参阅
ALS
Java文档
。
import scala.Tuple2;
import org.apache.spark.api.java.*;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import org.apache.spark.SparkConf;
SparkConf conf = new SparkConf().setAppName("Java Collaborative Filtering Example");
JavaSparkContext jsc = new JavaSparkContext(conf);
// 加载并解析数据
String path = "data/mllib/als/test.data";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Rating> ratings = data.map(s -> {
String[] sarray = s.split(",");
return new Rating(Integer.parseInt(sarray[0]),
Integer.parseInt(sarray[1]),
Double.parseDouble(sarray[2]));
});
// 构建使用 ALS 的推荐模型
int rank = 10;
int numIterations = 10;
MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01);
// 在评分数据上评估模型
JavaRDD<Tuple2<Object, Object>> userProducts =
ratings.map(r -> new Tuple2<>(r.user(), r.product()));
JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = JavaPairRDD.fromJavaRDD(
model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD()
.map(r -> new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating()))
);
JavaRDD<Tuple2Double, Double>> ratesAndPreds = JavaPairRDD.fromJavaRDD(
ratings.map(r -> new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating())))
.join(predictions).values();
double MSE = ratesAndPreds.mapToDouble(pair -> {
double err = pair._1() - pair._2();
return err * err;
}).mean();
System.out.println("均方误差 = " + MSE);
// 保存和加载模型
model.save(jsc.sc(), "target/tmp/myCollaborativeFilter");
MatrixFactorizationModel sameModel = MatrixFactorizationModel.load(jsc.sc(),
"target/tmp/myCollaborativeFilter");
为了运行上述应用程序,请遵循Spark快速入门指南中 独立应用程序 部分提供的说明。确保在构建文件中将 spark-mllib 作为依赖项包含在内。
教程
来自Spark峰会2014的
训练练习
包括一个动手教程,
使用
spark.mllib
的个性化电影推荐
。