SparkR(Spark上的R)

概述

SparkR 是一个 R 包,提供了一个轻量级的前端,以便从 R 使用 Apache Spark。 在 Spark 3.5.3 中,SparkR 提供了一个分布式数据框实现,支持选择、过滤、聚合等操作(类似于 R 数据框, dplyr ),但适用于大数据集。SparkR 还支持使用 MLlib 的分布式机器学习。

SparkDataFrame

A SparkDataFrame 是组织成命名列的分布式数据集合。它在概念上等同于关系数据库中的表或 R 中的数据框,但在内部具有更丰富的优化。SparkDataFrames 可以从多种来源构建,例如:结构化数据文件、Hive 中的表、外部数据库或现有的本地 R 数据框。

此页面上的所有示例都使用R或Spark发行版中包含的示例数据,并且可以使用 ./bin/sparkR shell运行。

启动:SparkSession

SparkR的入口点是 SparkSession ,它将您的R程序连接到Spark集群。您可以使用 sparkR.session 创建一个 SparkSession ,并传入应用程序名称、依赖的spark包等选项。此外,您还可以通过 SparkSession 使用SparkDataFrames。如果您是从 sparkR shell工作, SparkSession 应该已经为您创建,您无需调用 sparkR.session

sparkR.session()

从 RStudio 启动

你也可以从RStudio启动SparkR。你可以通过RStudio、R shell、Rscript或其他R IDE将你的R程序连接到Spark集群。首先,确保在环境中设置了SPARK_HOME(你可以检查 Sys.getenv ),加载SparkR包,并调用 sparkR.session 如下。它将检查Spark的安装,如果未找到,将自动下载并缓存。或者,你也可以手动运行 install.spark

除了调用 sparkR.session ,你还可以指定某些 Spark 驱动程序属性。通常这些 应用程序属性 运行环境 不能以编程方式设置,因为驱动程序 JVM 进程已经启动,在这种情况下,SparkR 会为你处理此事。要设置它们,将它们作为其他配置属性传递给 sparkConfig 参数到 sparkR.session()

if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
Sys.setenv(SPARK_HOME = "/home/spark")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))

以下Spark驱动程序属性可以在 sparkConfig 中通过 sparkR.session 从RStudio进行设置:

属性名称 属性组 spark-submit 等效项
spark.master 应用程序属性 --master
spark.kerberos.keytab 应用程序属性 --keytab
spark.kerberos.principal 应用程序属性 --principal
spark.driver.memory 应用程序属性 --driver-memory
spark.driver.extraClassPath 运行时环境 --driver-class-path
spark.driver.extraJavaOptions 运行时环境 --driver-java-options
spark.driver.extraLibraryPath 运行时环境 --driver-library-path

创建 SparkDataFrames

通过一个 SparkSession ,应用程序可以从本地 R 数据框创建 SparkDataFrame ,从一个 Hive 表 ,或从其他 数据源

来自本地数据框

创建数据框的最简单方法是将本地 R 数据框转换为 SparkDataFrame。具体来说,我们可以使用 as.DataFrame createDataFrame 并传入本地 R 数据框来创建 SparkDataFrame。作为一个例子,以下代码基于 R 的 faithful 数据集创建一个 SparkDataFrame

df <- as.DataFrame(faithful)
# 显示 SparkDataFrame 的第一部分
head(df)
##  爆发等待时间
##1     3.600      79
##2     1.800      54
##3     3.333      74

来自数据源

SparkR 支持通过 SparkDataFrame 接口操作多种数据源。 本节描述了使用数据源加载和保存数据的一般方法。 您可以查看 Spark SQL 编程指南以获取有关内置数据源的更多 特定选项

从数据源创建SparkDataFrames的一般方法是 read.df 。该方法接受要加载的文件路径和数据源类型,并将自动使用当前活动的SparkSession。 SparkR支持本地读取JSON、CSV和Parquet文件,并通过可从 第三方项目 获得的包,您可以找到流行文件格式(如Avro)的数据源连接器。这些包可以通过在 spark-submit sparkR 命令中指定 --packages 来添加,或者在交互式R Shell或RStudio中使用 sparkPackages 参数初始化SparkSession。

sparkR.session(sparkPackages = "org.apache.spark:spark-avro_2.12:3.5.3")

我们可以通过一个示例 JSON 输入文件来查看如何使用数据源。请注意,这里使用的文件是 不是 一个典型的 JSON 文件。文件中的每一行必须包含一个独立的、自包含的有效 JSON 对象。有关更多信息,请参阅 JSON Lines 文本格式,也称为换行分隔的 JSON 。因此,常规的多行 JSON 文件通常会失败。

people <- read.df("./examples/src/main/resources/people.json", "json")
head(people)
##  年龄    姓名
##1  NA Michael
##2  30    Andy
##3  19  Justin
# SparkR 自动推断 JSON 文件的结构
printSchema(people)
# 根
#  |-- 年龄: long (可为空 = true)
#  |-- 姓名: string (可为空 = true)
# 同样,可以使用 read.json 读取多个文件
people <- read.json(c("./examples/src/main/resources/people.json", "./examples/src/main/resources/people2.json"))

数据源API原生支持CSV格式的输入文件。有关更多信息,请参阅SparkR read.df API文档。

df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")

The data sources API can also be used to save out SparkDataFrames into multiple file formats. For example, we can save the SparkDataFrame from the previous example to a Parquet file using write.df .

write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite")

来自 Hive 表

您还可以从Hive表创建SparkDataFrames。为此,我们需要创建一个具有Hive支持的SparkSession,以便访问Hive MetaStore中的表。请注意,Spark应该已经构建了 Hive支持 ,更多详细信息可以在 SQL编程指南 中找到。在SparkR中,默认情况下将尝试创建一个启用Hive支持的SparkSession( enableHiveSupport = TRUE )。

sparkR.session()
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# 查询可以用HiveQL表达。
results <- sql("FROM src SELECT key, value")
# 结果现在是一个SparkDataFrame
head(results)
##  key   value
## 1 238 val_238
## 2  86  val_86
## 3 311 val_311

SparkDataFrame 操作

SparkDataFrames支持多种函数以进行结构化数据处理。 在这里,我们包含了一些基本示例,完整列表可以在 API 文档中找到:

选择行,列

# 创建 SparkDataFrame
df <- as.DataFrame(faithful)
# 获取 SparkDataFrame 的基本信息
df
## SparkDataFrame[eruptions:double, waiting:double]
# 仅选择 "eruptions" 列
head(select(df, df$eruptions))
##  eruptions
##1     3.600
##2     1.800
##3     3.333
# 你也可以将列名作为字符串传入
head(select(df, "eruptions"))
# 过滤 SparkDataFrame,仅保留等待时间少于 50 分钟的行
head(filter(df, dfwaiting < 50))
##  eruptions waiting
##1     1.750      47
##2     1.750      47
##3     1.867      48

分组,聚合

SparkR 数据框支持在分组后聚合数据的许多常用函数。例如,我们可以计算如下所示的 faithful 数据集中的 waiting 时间的直方图

# 我们使用 `n` 操作符来计算每个等待时间出现的次数
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
##  waiting count
##1      70     4
##2      67     1
##3      69     2
# 我们还可以对聚合结果进行排序,以获取最常见的等待时间
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
##   waiting count
##1      78    15
##2      83    14
##3      81    13

除了标准聚合,SparkR 支持 OLAP cube 操作符 cube :

head(agg(cube(df, "cyl", "disp", "gear"), avg(df$mpg)))
##  cyl  disp gear avg(mpg)
##1  NA 140.8    4     22.8
##2   4  75.7    4     30.4
##3   8 400.0    3     19.2
##4   8 318.0    3     15.5
##5  NA 351.0   NA     15.8
##6  NA 275.8   NA     16.3

rollup :

head(agg(rollup(df, "cyl", "disp", "gear"), avg(df$mpg)))
##  cyl  disp gear avg(mpg)
##1   4  75.7    4     30.4
##2   8 400.0    3     19.2
##3   8 318.0    3     15.5
##4   4  78.7   NA     32.4
##5   8 304.0    3     15.2
##6   4  79.0   NA     27.3

对列的操作

SparkR 还提供了一些可以直接应用于列的数据处理和聚合中的函数。下面的示例展示了基本算术函数的使用。

# 将等待时间从小时转换为秒。
# 注意,我们可以将其分配给同一个 SparkDataFrame 中的新列
df$waiting_secs <- df$waiting * 60
head(df)
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440

应用用户自定义函数

在SparkR中,我们支持几种类型的用户自定义函数:

使用 dapply dapplyCollect 在大数据集上运行给定函数

分组应用函数

对每个 SparkDataFrame 的分区应用一个函数。要应用于每个 SparkDataFrame 分区的函数应该只有一个参数,传递给每个分区相应的 data.frame 。函数的输出应该是一个 data.frame 。模式指定了结果的 SparkDataFrame 的行格式。它必须与返回值的 数据类型 匹配。

# 将等待时间从小时转换为秒。
# 请注意,我们可以将 UDF 应用于 DataFrame。
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440
##4     2.283      62         3720
##5     4.533      85         5100
##6     2.883      55         3300
dapplyCollect

dapply 一样,将一个函数应用于每个 SparkDataFrame 的分区并收集结果。函数的输出应该是一个 data.frame 。但是,不需要传递Schema。请注意,如果UDF在所有分区上运行的输出无法传送到驱动程序并适合驱动程序内存, dapplyCollect 可能会失败。

# 将等待时间从小时转换为秒。
# 注意,我们可以将 UDF 应用于 DataFrame 并返回 R 的 data.frame
ldf <- dapplyCollect(
df,
function(x) {
x <- cbind(x, "waiting_secs" = x$waiting * 60)
})
head(ldf, 3)
##  喷发   等待  waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440

在大型数据集上运行给定函数,通过输入列进行分组并使用 gapply gapplyCollect

gapply

对每个 SparkDataFrame 的组应用一个函数。该函数将应用于 SparkDataFrame 的每个组,并且应该只有两个参数:分组键和对应于该键的 R data.frame 。组是从 SparkDataFrame 的列中选择的。函数的输出应该是一个 data.frame 。 schema 指定了生成的 SparkDataFrame 的行格式。它必须代表 R 函数的输出 schema,基于 Spark 数据类型 。返回的 data.frame 的列名由用户设置。

# 确定等待时间最长的六次喷发时间(以分钟为单位)。
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
},
schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
##    waiting   max_eruption
##1      64       5.100
##2      69       5.067
##3      71       5.033
##4      87       5.000
##5      63       4.933
##6      89       4.900
gapplyCollect

gapply 类似,将一个函数应用到 SparkDataFrame 的每个分区,并将结果收集回 R 的 data.frame。函数的输出应该是一个 data.frame 。但是,模式不需要被传递。请注意,如果在所有分区上运行的 UDF 的输出无法拉取到驱动程序并适合驱动程序内存,则 gapplyCollect 可能会失败。

# 确定等待时间最长的六次喷发时间(分钟)。
result <- gapplyCollect(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
colnames(y) <- c("waiting", "max_eruption")
y
})
head(result[order(result$max_eruption, decreasing = TRUE), ])
##    waiting   max_eruption
##1      64       5.100
##2      69       5.067
##3      71       5.033
##4      87       5.000
##5      63       4.933
##6      89       4.900

使用 spark.lapply 分布式运行本地 R 函数

spark.lapply

类似于 lapply 在原生 R 中, spark.lapply 在元素列表上运行一个函数并使用 Spark 分配计算。以类似于 doParallel lapply 的方式对列表元素应用一个函数。所有计算的结果应该适合在单个机器上。如果不是这样,它们可以执行类似 df <- createDataFrame(list) 的操作,然后使用 dapply

# 执行多个模型的分布式训练,使用 spark.lapply。这里,我们传递
# 一个只读参数列表,指定广义线性模型的族。
families <- c("gaussian", "poisson")
train <- function(family) {
model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
summary(model)
}
# 返回模型总结的列表
model.summaries <- spark.lapply(families, train)
# 打印每个模型的总结
print(model.summaries)

主动执行

如果启用即时执行,当创建 SparkDataFrame 时,数据会立即返回给 R 客户端。默认情况下,未启用即时执行,可以通过在启动 SparkSession 时将配置属性 spark.sql.repl.eagerEval.enabled 设置为 true 来启用。

要显示的最大行数和每列数据的最大字符数可以通过 spark.sql.repl.eagerEval.maxNumRows spark.sql.repl.eagerEval.truncate 配置属性进行控制。这些属性仅在启用急切执行时有效。如果这些属性没有被明确设置,默认情况下,将显示最多 20 行和每列最多 20 个字符。

# 启动启用了急切执行的 Spark 会话
sparkR.session(master = "local[*]",
sparkConfig = list(spark.sql.repl.eagerEval.enabled = "true",
spark.sql.repl.eagerEval.maxNumRows = as.integer(10)))
# 创建一个分组和排序的 SparkDataFrame
df <- createDataFrame(faithful)
df2 <- arrange(summarize(groupBy(df, df$waiting), count = n(df$waiting)), "waiting")
# 类似于 R 的 data.frame,显示返回的数据,而不是 SparkDataFrame 类字符串
df2
##+-------+-----+
##|waiting|count|
##+-------+-----+
##|   43.0|    1|
##|   45.0|    3|
##|   46.0|    5|
##|   47.0|    4|
##|   48.0|    3|
##|   49.0|    5|
##|   50.0|    5|
##|   51.0|    6|
##|   52.0|    5|
##|   53.0|    7|
##+-------+-----+
##仅显示前 10 行

请注意,要在 sparkR shell 中启用急切执行,请将 spark.sql.repl.eagerEval.enabled=true 配置属性添加到 --conf 选项中。

从SparkR运行SQL查询

A SparkDataFrame还可以在Spark SQL中注册为临时视图,这允许您对其数据运行SQL查询。
sql 函数使应用程序能够以编程方式运行SQL查询,并将结果作为 SparkDataFrame 返回。

# 加载一个 JSON 文件
people <- read.df("./examples/src/main/resources/people.json", "json")
# 注册这个 SparkDataFrame 作为一个临时视图。
createOrReplaceTempView(people, "people")
# 可以使用 sql 方法执行 SQL 语句
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
##    name
##1 Justin

机器学习

算法

SparkR 目前支持以下机器学习算法:

分类

回归

聚类

协同过滤

频繁模式挖掘

统计

在底层,SparkR使用MLlib来训练模型。请参考MLlib用户指南的相应部分获取示例代码。 用户可以调用 summary 来打印拟合模型的摘要, predict 来对新数据进行预测,以及 write.ml / read.ml 来保存/加载拟合模型。 SparkR支持一部分可用的R公式运算符用于模型拟合,包括‘~’, ‘.’, ‘:’, ‘+’, 和 ‘-’。

模型持久性

以下示例显示如何通过 SparkR 保存/加载 MLlib 模型。

training <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
# 使用spark.glm拟合一个"gaussian"族的广义线性模型
df_list <- randomSplit(training, c(7,3), 2)
gaussianDF <- df_list[[1]]
gaussianTestDF <- df_list[[2]]
gaussianGLM <- spark.glm(gaussianDF, label ~ features, family = "gaussian")
# 保存并加载一个拟合的MLlib模型
modelPath <- tempfile(pattern = "ml", fileext = ".tmp")
write.ml(gaussianGLM, modelPath)
gaussianGLM2 <- read.ml(modelPath)
# 检查模型摘要
summary(gaussianGLM2)
# 检查模型预测
gaussianPredictions <- predict(gaussianGLM2, gaussianTestDF)
head(gaussianPredictions)
unlink(modelPath)
Find full example code at "examples/src/main/r/ml/ml.R" in the Spark repo.

R和Spark之间的数据类型映射

R Spark
字节 字节
整数 整数
浮点数 浮点数
双精度 双精度
数值 双精度
字符 字符串
字符串 字符串
二进制 二进制
原始 二进制
逻辑 布尔
POSIXct 时间戳
POSIXlt 时间戳
Date 日期
数组 数组
列表 数组
环境 映射

结构化流

SparkR 支持结构化流处理 API。结构化流处理是建立在 Spark SQL 引擎之上的一个可扩展且具容错性的流处理引擎。有关更多信息,请参见 结构化流处理编程指南 上的 R API

Apache Arrow 在 SparkR 中

Apache Arrow 是一种内存中的列式数据格式,用于在 Spark 中有效地在 JVM 和 R 进程之间传输数据。另见已完成的 PySpark 优化, 使用 Apache Arrow 的 PySpark Pandas 使用指南 。本指南旨在解释如何在 SparkR 中使用 Arrow 优化,并包含一些关键点。

确保安装 Arrow

Arrow R 库可在 CRAN 上获得,可以按如下方式安装。

Rscript -e 'install.packages("arrow", repos="https://cloud.r-project.org/")'

有关更多详细信息,请参考 Apache Arrow 的官方文档

请注意,您必须确保在所有集群节点上安装并可用Arrow R包。目前支持的最低版本是1.0.0;然而,这可能会在次要版本之间变化,因为SparkR中的Arrow优化是实验性的。

启用R数据框的转换, dapply gapply

当使用调用 collect(spark_df) 将 Spark DataFrame 转换为 R DataFrame 时,可以使用箭头优化;当使用 createDataFrame(r_df) 从 R DataFrame 创建 Spark DataFrame 时;当通过 dapply(...) 对每个分区应用 R 原生函数时;以及当通过 gapply(...) 对分组数据应用 R 原生函数时。要在执行时使用箭头,用户需要首先将 Spark 配置 ‘spark.sql.execution.arrow.sparkr.enabled’ 设置为 ‘true’。默认情况下这是禁用的。

无论优化是否启用,SparkR 都会产生相同的结果。此外,当优化由于任何原因在实际计算之前失败时,Spark DataFrame 和 R DataFrame 之间的转换会自动回退到非 Arrow 优化实现。

# 启动启用 Arrow 优化的 spark 会话
sparkR.session(master = "local[*]",
sparkConfig = list(spark.sql.execution.arrow.sparkr.enabled = "true"))
# 将 Spark DataFrame 转换为 R DataFrame
spark_df <- createDataFrame(mtcars)
# 将 Spark DataFrame 收集为 R DataFrame
collect(spark_df)
# 对每个分区应用 R 原生函数。
collect(dapply(spark_df, function(rdf) { data.frame(rdf$gear + 1) }, structType("gear double")))
# 对分组数据应用 R 原生函数。
collect(gapply(spark_df,
"gear",
function(key, group) {
data.frame(gear = key[[1]], disp = mean(group$disp) > group$disp)
},
structType("gear double, disp boolean")))

请注意,即使使用Arrow, collect(spark_df) 也会将DataFrame中的所有记录收集到驱动程序中,这应在数据的小子集上进行。此外, gapply(...) dapply(...) 中指定的输出模式应与给定函数返回的R DataFrame相匹配。

支持的 SQL 类型

目前,所有Spark SQL数据类型都支持基于Arrow的转换,除了 FloatType , BinaryType , ArrayType , StructType MapType

R 函数名称冲突

在R中加载和附加一个新包时,可能会出现一个名称 冲突 ,其中一个函数遮蔽了另一个函数。

以下函数被SparkR包屏蔽:

掩蔽函数 如何访问
cov package:stats
stats::cov(x, y = NULL, use = "everything",
           method = c("pearson", "kendall", "spearman"))
filter package:stats
stats::filter(x, filter, method = c("convolution", "recursive"),
              sides = 2, circular = FALSE, init)
sample package:base base::sample(x, size, replace = FALSE, prob = NULL)

由于SparkR的部分内容是基于 dplyr 包建模的,SparkR中的某些函数与 dplyr 中的函数同名。根据两个包的加载顺序,第一个加载的包中的一些函数会被后加载的包中的函数屏蔽。在这种情况下,请在调用前加上包名,例如 SparkR::cume_dist(x) dplyr::cume_dist(x)

您可以使用 search() 检查 R 中的搜索路径

迁移指南

迁移指南现在已存档 在此页面