SparkR(Spark上的R)
- 概述
- SparkDataFrame
- 机器学习
- R 和 Spark 之间的数据类型映射
- 结构化流
- SparkR 中的 Apache Arrow
- R 函数名称冲突
- 迁移指南
概述
SparkR 是一个 R 包,提供了一个轻量级的前端,以便从 R 使用 Apache Spark。 在 Spark 3.5.3 中,SparkR 提供了一个分布式数据框实现,支持选择、过滤、聚合等操作(类似于 R 数据框, dplyr ),但适用于大数据集。SparkR 还支持使用 MLlib 的分布式机器学习。
SparkDataFrame
A SparkDataFrame 是组织成命名列的分布式数据集合。它在概念上等同于关系数据库中的表或 R 中的数据框,但在内部具有更丰富的优化。SparkDataFrames 可以从多种来源构建,例如:结构化数据文件、Hive 中的表、外部数据库或现有的本地 R 数据框。
此页面上的所有示例都使用R或Spark发行版中包含的示例数据,并且可以使用
./bin/sparkR
shell运行。
启动:SparkSession
SparkR的入口点是
SparkSession
,它将您的R程序连接到Spark集群。您可以使用
sparkR.session
创建一个
SparkSession
,并传入应用程序名称、依赖的spark包等选项。此外,您还可以通过
SparkSession
使用SparkDataFrames。如果您是从
sparkR
shell工作,
SparkSession
应该已经为您创建,您无需调用
sparkR.session
。
sparkR.session()
从 RStudio 启动
你也可以从RStudio启动SparkR。你可以通过RStudio、R shell、Rscript或其他R IDE将你的R程序连接到Spark集群。首先,确保在环境中设置了SPARK_HOME(你可以检查
Sys.getenv
),加载SparkR包,并调用
sparkR.session
如下。它将检查Spark的安装,如果未找到,将自动下载并缓存。或者,你也可以手动运行
install.spark
。
除了调用
sparkR.session
,你还可以指定某些 Spark 驱动程序属性。通常这些
应用程序属性
和
运行环境
不能以编程方式设置,因为驱动程序 JVM 进程已经启动,在这种情况下,SparkR 会为你处理此事。要设置它们,将它们作为其他配置属性传递给
sparkConfig
参数到
sparkR.session()
。
if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
Sys.setenv(SPARK_HOME = "/home/spark")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))
以下Spark驱动程序属性可以在
sparkConfig
中通过
sparkR.session
从RStudio进行设置:
| 属性名称 | 属性组 |
spark-submit
等效项
|
|---|---|---|
spark.master
|
应用程序属性 |
--master
|
spark.kerberos.keytab
|
应用程序属性 |
--keytab
|
spark.kerberos.principal
|
应用程序属性 |
--principal
|
spark.driver.memory
|
应用程序属性 |
--driver-memory
|
spark.driver.extraClassPath
|
运行时环境 |
--driver-class-path
|
spark.driver.extraJavaOptions
|
运行时环境 |
--driver-java-options
|
spark.driver.extraLibraryPath
|
运行时环境 |
--driver-library-path
|
创建 SparkDataFrames
通过一个
SparkSession
,应用程序可以从本地 R 数据框创建
SparkDataFrame
,从一个
Hive 表
,或从其他
数据源
。
来自本地数据框
创建数据框的最简单方法是将本地 R 数据框转换为 SparkDataFrame。具体来说,我们可以使用
as.DataFrame
或
createDataFrame
并传入本地 R 数据框来创建 SparkDataFrame。作为一个例子,以下代码基于 R 的
faithful
数据集创建一个
SparkDataFrame
。
df <- as.DataFrame(faithful)
# 显示 SparkDataFrame 的第一部分
head(df)
## 爆发等待时间
##1 3.600 79
##2 1.800 54
##3 3.333 74
来自数据源
SparkR 支持通过
SparkDataFrame
接口操作多种数据源。 本节描述了使用数据源加载和保存数据的一般方法。 您可以查看 Spark SQL 编程指南以获取有关内置数据源的更多
特定选项
。
从数据源创建SparkDataFrames的一般方法是
read.df
。该方法接受要加载的文件路径和数据源类型,并将自动使用当前活动的SparkSession。
SparkR支持本地读取JSON、CSV和Parquet文件,并通过可从
第三方项目
获得的包,您可以找到流行文件格式(如Avro)的数据源连接器。这些包可以通过在
spark-submit
或
sparkR
命令中指定
--packages
来添加,或者在交互式R Shell或RStudio中使用
sparkPackages
参数初始化SparkSession。
sparkR.session(sparkPackages = "org.apache.spark:spark-avro_2.12:3.5.3")
我们可以通过一个示例 JSON 输入文件来查看如何使用数据源。请注意,这里使用的文件是 不是 一个典型的 JSON 文件。文件中的每一行必须包含一个独立的、自包含的有效 JSON 对象。有关更多信息,请参阅 JSON Lines 文本格式,也称为换行分隔的 JSON 。因此,常规的多行 JSON 文件通常会失败。
people <- read.df("./examples/src/main/resources/people.json", "json")
head(people)
## 年龄 姓名
##1 NA Michael
##2 30 Andy
##3 19 Justin
# SparkR 自动推断 JSON 文件的结构
printSchema(people)
# 根
# |-- 年龄: long (可为空 = true)
# |-- 姓名: string (可为空 = true)
# 同样,可以使用 read.json 读取多个文件
people <- read.json(c("./examples/src/main/resources/people.json", "./examples/src/main/resources/people2.json"))
数据源API原生支持CSV格式的输入文件。有关更多信息,请参阅SparkR read.df API文档。
df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")
The data sources API can also be used to save out SparkDataFrames into multiple file formats. For example, we can save the SparkDataFrame from the previous example
to a Parquet file using
write.df
.
write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite")
来自 Hive 表
您还可以从Hive表创建SparkDataFrames。为此,我们需要创建一个具有Hive支持的SparkSession,以便访问Hive MetaStore中的表。请注意,Spark应该已经构建了
Hive支持
,更多详细信息可以在
SQL编程指南
中找到。在SparkR中,默认情况下将尝试创建一个启用Hive支持的SparkSession(
enableHiveSupport = TRUE
)。
sparkR.session()
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# 查询可以用HiveQL表达。
results <- sql("FROM src SELECT key, value")
# 结果现在是一个SparkDataFrame
head(results)
## key value
## 1 238 val_238
## 2 86 val_86
## 3 311 val_311
SparkDataFrame 操作
SparkDataFrames支持多种函数以进行结构化数据处理。 在这里,我们包含了一些基本示例,完整列表可以在 API 文档中找到:
选择行,列
# 创建 SparkDataFrame
df <- as.DataFrame(faithful)
# 获取 SparkDataFrame 的基本信息
df
## SparkDataFrame[eruptions:double, waiting:double]
# 仅选择 "eruptions" 列
head(select(df, df$eruptions))
## eruptions
##1 3.600
##2 1.800
##3 3.333
# 你也可以将列名作为字符串传入
head(select(df, "eruptions"))
# 过滤 SparkDataFrame,仅保留等待时间少于 50 分钟的行
head(filter(df, dfwaiting < 50))
## eruptions waiting
##1 1.750 47
##2 1.750 47
##3 1.867 48
分组,聚合
SparkR 数据框支持在分组后聚合数据的许多常用函数。例如,我们可以计算如下所示的
faithful
数据集中的
waiting
时间的直方图
# 我们使用 `n` 操作符来计算每个等待时间出现的次数
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
## waiting count
##1 70 4
##2 67 1
##3 69 2
# 我们还可以对聚合结果进行排序,以获取最常见的等待时间
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
## waiting count
##1 78 15
##2 83 14
##3 81 13
除了标准聚合,SparkR 支持
OLAP cube
操作符
cube
:
head(agg(cube(df, "cyl", "disp", "gear"), avg(df$mpg)))
## cyl disp gear avg(mpg)
##1 NA 140.8 4 22.8
##2 4 75.7 4 30.4
##3 8 400.0 3 19.2
##4 8 318.0 3 15.5
##5 NA 351.0 NA 15.8
##6 NA 275.8 NA 16.3
和
rollup
:
head(agg(rollup(df, "cyl", "disp", "gear"), avg(df$mpg)))
## cyl disp gear avg(mpg)
##1 4 75.7 4 30.4
##2 8 400.0 3 19.2
##3 8 318.0 3 15.5
##4 4 78.7 NA 32.4
##5 8 304.0 3 15.2
##6 4 79.0 NA 27.3
对列的操作
SparkR 还提供了一些可以直接应用于列的数据处理和聚合中的函数。下面的示例展示了基本算术函数的使用。
# 将等待时间从小时转换为秒。
# 注意,我们可以将其分配给同一个 SparkDataFrame 中的新列
df$waiting_secs <- df$waiting * 60
head(df)
## eruptions waiting waiting_secs
##1 3.600 79 4740
##2 1.800 54 3240
##3 3.333 74 4440
应用用户自定义函数
在SparkR中,我们支持几种类型的用户自定义函数:
使用
dapply
或
dapplyCollect
在大数据集上运行给定函数
分组应用函数
对每个
SparkDataFrame
的分区应用一个函数。要应用于每个
SparkDataFrame
分区的函数应该只有一个参数,传递给每个分区相应的
data.frame
。函数的输出应该是一个
data.frame
。模式指定了结果的
SparkDataFrame
的行格式。它必须与返回值的
数据类型
匹配。
# 将等待时间从小时转换为秒。
# 请注意,我们可以将 UDF 应用于 DataFrame。
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
## eruptions waiting waiting_secs
##1 3.600 79 4740
##2 1.800 54 3240
##3 3.333 74 4440
##4 2.283 62 3720
##5 4.533 85 5100
##6 2.883 55 3300
dapplyCollect
像
dapply
一样,将一个函数应用于每个
SparkDataFrame
的分区并收集结果。函数的输出应该是一个
data.frame
。但是,不需要传递Schema。请注意,如果UDF在所有分区上运行的输出无法传送到驱动程序并适合驱动程序内存,
dapplyCollect
可能会失败。
# 将等待时间从小时转换为秒。
# 注意,我们可以将 UDF 应用于 DataFrame 并返回 R 的 data.frame
ldf <- dapplyCollect(
df,
function(x) {
x <- cbind(x, "waiting_secs" = x$waiting * 60)
})
head(ldf, 3)
## 喷发 等待 waiting_secs
##1 3.600 79 4740
##2 1.800 54 3240
##3 3.333 74 4440
在大型数据集上运行给定函数,通过输入列进行分组并使用
gapply
或
gapplyCollect
gapply
对每个
SparkDataFrame
的组应用一个函数。该函数将应用于
SparkDataFrame
的每个组,并且应该只有两个参数:分组键和对应于该键的 R
data.frame
。组是从
SparkDataFrame
的列中选择的。函数的输出应该是一个
data.frame
。 schema 指定了生成的
SparkDataFrame
的行格式。它必须代表 R 函数的输出 schema,基于 Spark
数据类型
。返回的
data.frame
的列名由用户设置。
# 确定等待时间最长的六次喷发时间(以分钟为单位)。
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
},
schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
## waiting max_eruption
##1 64 5.100
##2 69 5.067
##3 71 5.033
##4 87 5.000
##5 63 4.933
##6 89 4.900
gapplyCollect
与
gapply
类似,将一个函数应用到
SparkDataFrame
的每个分区,并将结果收集回 R 的 data.frame。函数的输出应该是一个
data.frame
。但是,模式不需要被传递。请注意,如果在所有分区上运行的 UDF 的输出无法拉取到驱动程序并适合驱动程序内存,则
gapplyCollect
可能会失败。
# 确定等待时间最长的六次喷发时间(分钟)。
result <- gapplyCollect(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
colnames(y) <- c("waiting", "max_eruption")
y
})
head(result[order(result$max_eruption, decreasing = TRUE), ])
## waiting max_eruption
##1 64 5.100
##2 69 5.067
##3 71 5.033
##4 87 5.000
##5 63 4.933
##6 89 4.900
使用
spark.lapply
分布式运行本地 R 函数
spark.lapply
类似于
lapply
在原生 R 中,
spark.lapply
在元素列表上运行一个函数并使用 Spark 分配计算。以类似于
doParallel
或
lapply
的方式对列表元素应用一个函数。所有计算的结果应该适合在单个机器上。如果不是这样,它们可以执行类似
df <- createDataFrame(list)
的操作,然后使用
dapply
# 执行多个模型的分布式训练,使用 spark.lapply。这里,我们传递
# 一个只读参数列表,指定广义线性模型的族。
families <- c("gaussian", "poisson")
train <- function(family) {
model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
summary(model)
}
# 返回模型总结的列表
model.summaries <- spark.lapply(families, train)
# 打印每个模型的总结
print(model.summaries)
主动执行
如果启用即时执行,当创建
SparkDataFrame
时,数据会立即返回给 R 客户端。默认情况下,未启用即时执行,可以通过在启动
SparkSession
时将配置属性
spark.sql.repl.eagerEval.enabled
设置为
true
来启用。
要显示的最大行数和每列数据的最大字符数可以通过
spark.sql.repl.eagerEval.maxNumRows
和
spark.sql.repl.eagerEval.truncate
配置属性进行控制。这些属性仅在启用急切执行时有效。如果这些属性没有被明确设置,默认情况下,将显示最多 20 行和每列最多 20 个字符。
# 启动启用了急切执行的 Spark 会话
sparkR.session(master = "local[*]",
sparkConfig = list(spark.sql.repl.eagerEval.enabled = "true",
spark.sql.repl.eagerEval.maxNumRows = as.integer(10)))
# 创建一个分组和排序的 SparkDataFrame
df <- createDataFrame(faithful)
df2 <- arrange(summarize(groupBy(df, df$waiting), count = n(df$waiting)), "waiting")
# 类似于 R 的 data.frame,显示返回的数据,而不是 SparkDataFrame 类字符串
df2
##+-------+-----+
##|waiting|count|
##+-------+-----+
##| 43.0| 1|
##| 45.0| 3|
##| 46.0| 5|
##| 47.0| 4|
##| 48.0| 3|
##| 49.0| 5|
##| 50.0| 5|
##| 51.0| 6|
##| 52.0| 5|
##| 53.0| 7|
##+-------+-----+
##仅显示前 10 行
请注意,要在
sparkR
shell 中启用急切执行,请将
spark.sql.repl.eagerEval.enabled=true
配置属性添加到
--conf
选项中。
从SparkR运行SQL查询
A SparkDataFrame还可以在Spark SQL中注册为临时视图,这允许您对其数据运行SQL查询。
sql
函数使应用程序能够以编程方式运行SQL查询,并将结果作为
SparkDataFrame
返回。
# 加载一个 JSON 文件
people <- read.df("./examples/src/main/resources/people.json", "json")
# 注册这个 SparkDataFrame 作为一个临时视图。
createOrReplaceTempView(people, "people")
# 可以使用 sql 方法执行 SQL 语句
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
## name
##1 Justin
机器学习
算法
SparkR 目前支持以下机器学习算法:
分类
-
spark.logit:逻辑回归 -
spark.mlp:多层感知器 (MLP) -
spark.naiveBayes:朴素贝叶斯 -
spark.svmLinear:线性支持向量机 -
spark.fmClassifier:分解机分类器
回归
-
spark.survreg:加速失效时间 (AFT) 生存模型 -
spark.glmorglm:广义线性模型 (GLM) -
spark.isoreg:同调回归 -
spark.lm:线性回归 -
spark.fmRegressor:分解机器回归
树
-
spark.decisionTree:用于回归的决策树Regression和Classification -
spark.gbt:用于回归的梯度提升树Regression和Classification -
spark.randomForest:用于回归的随机森林Regression和Classification
聚类
-
spark.bisectingKmeans:二分k均值 -
spark.gaussianMixture:高斯混合模型 (GMM) -
spark.kmeans:K均值 -
spark.lda:潜在狄利克雷分配 (LDA) -
spark.powerIterationClustering (PIC):幂迭代聚类 (PIC)
协同过滤
频繁模式挖掘
统计
-
spark.kstest:科尔莫戈罗夫-史密诺夫检验
在底层,SparkR使用MLlib来训练模型。请参考MLlib用户指南的相应部分获取示例代码。
用户可以调用
summary
来打印拟合模型的摘要,
predict
来对新数据进行预测,以及
write.ml
/
read.ml
来保存/加载拟合模型。
SparkR支持一部分可用的R公式运算符用于模型拟合,包括‘~’, ‘.’, ‘:’, ‘+’, 和 ‘-’。
模型持久性
以下示例显示如何通过 SparkR 保存/加载 MLlib 模型。
training <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
# 使用spark.glm拟合一个"gaussian"族的广义线性模型
df_list <- randomSplit(training, c(7,3), 2)
gaussianDF <- df_list[[1]]
gaussianTestDF <- df_list[[2]]
gaussianGLM <- spark.glm(gaussianDF, label ~ features, family = "gaussian")
# 保存并加载一个拟合的MLlib模型
modelPath <- tempfile(pattern = "ml", fileext = ".tmp")
write.ml(gaussianGLM, modelPath)
gaussianGLM2 <- read.ml(modelPath)
# 检查模型摘要
summary(gaussianGLM2)
# 检查模型预测
gaussianPredictions <- predict(gaussianGLM2, gaussianTestDF)
head(gaussianPredictions)
unlink(modelPath)
R和Spark之间的数据类型映射
| R | Spark |
|---|---|
| 字节 | 字节 |
| 整数 | 整数 |
| 浮点数 | 浮点数 |
| 双精度 | 双精度 |
| 数值 | 双精度 |
| 字符 | 字符串 |
| 字符串 | 字符串 |
| 二进制 | 二进制 |
| 原始 | 二进制 |
| 逻辑 | 布尔 |
| POSIXct | 时间戳 |
| POSIXlt | 时间戳 |
| Date | 日期 |
| 数组 | 数组 |
| 列表 | 数组 |
| 环境 | 映射 |
结构化流
SparkR 支持结构化流处理 API。结构化流处理是建立在 Spark SQL 引擎之上的一个可扩展且具容错性的流处理引擎。有关更多信息,请参见 结构化流处理编程指南 上的 R API
Apache Arrow 在 SparkR 中
Apache Arrow 是一种内存中的列式数据格式,用于在 Spark 中有效地在 JVM 和 R 进程之间传输数据。另见已完成的 PySpark 优化, 使用 Apache Arrow 的 PySpark Pandas 使用指南 。本指南旨在解释如何在 SparkR 中使用 Arrow 优化,并包含一些关键点。
确保安装 Arrow
Arrow R 库可在 CRAN 上获得,可以按如下方式安装。
Rscript -e 'install.packages("arrow", repos="https://cloud.r-project.org/")'
有关更多详细信息,请参考 Apache Arrow 的官方文档 。
请注意,您必须确保在所有集群节点上安装并可用Arrow R包。目前支持的最低版本是1.0.0;然而,这可能会在次要版本之间变化,因为SparkR中的Arrow优化是实验性的。
启用R数据框的转换,
dapply
和
gapply
当使用调用
collect(spark_df)
将 Spark DataFrame 转换为 R DataFrame 时,可以使用箭头优化;当使用
createDataFrame(r_df)
从 R DataFrame 创建 Spark DataFrame 时;当通过
dapply(...)
对每个分区应用 R 原生函数时;以及当通过
gapply(...)
对分组数据应用 R 原生函数时。要在执行时使用箭头,用户需要首先将 Spark 配置 ‘spark.sql.execution.arrow.sparkr.enabled’ 设置为 ‘true’。默认情况下这是禁用的。
无论优化是否启用,SparkR 都会产生相同的结果。此外,当优化由于任何原因在实际计算之前失败时,Spark DataFrame 和 R DataFrame 之间的转换会自动回退到非 Arrow 优化实现。
# 启动启用 Arrow 优化的 spark 会话
sparkR.session(master = "local[*]",
sparkConfig = list(spark.sql.execution.arrow.sparkr.enabled = "true"))
# 将 Spark DataFrame 转换为 R DataFrame
spark_df <- createDataFrame(mtcars)
# 将 Spark DataFrame 收集为 R DataFrame
collect(spark_df)
# 对每个分区应用 R 原生函数。
collect(dapply(spark_df, function(rdf) { data.frame(rdf$gear + 1) }, structType("gear double")))
# 对分组数据应用 R 原生函数。
collect(gapply(spark_df,
"gear",
function(key, group) {
data.frame(gear = key[[1]], disp = mean(group$disp) > group$disp)
},
structType("gear double, disp boolean")))
请注意,即使使用Arrow,
collect(spark_df)
也会将DataFrame中的所有记录收集到驱动程序中,这应在数据的小子集上进行。此外,
gapply(...)
和
dapply(...)
中指定的输出模式应与给定函数返回的R DataFrame相匹配。
支持的 SQL 类型
目前,所有Spark SQL数据类型都支持基于Arrow的转换,除了
FloatType
,
BinaryType
,
ArrayType
,
StructType
和
MapType
。
R 函数名称冲突
在R中加载和附加一个新包时,可能会出现一个名称 冲突 ,其中一个函数遮蔽了另一个函数。
以下函数被SparkR包屏蔽:
| 掩蔽函数 | 如何访问 |
|---|---|
cov
在
package:stats
|
|
filter
在
package:stats
|
|
sample
在
package:base
|
base::sample(x, size, replace = FALSE, prob = NULL)
|
由于SparkR的部分内容是基于
dplyr
包建模的,SparkR中的某些函数与
dplyr
中的函数同名。根据两个包的加载顺序,第一个加载的包中的一些函数会被后加载的包中的函数屏蔽。在这种情况下,请在调用前加上包名,例如
SparkR::cume_dist(x)
或
dplyr::cume_dist(x)
。
您可以使用
search()
检查 R 中的搜索路径
迁移指南
迁移指南现在已存档 在此页面 。