快速入门
本教程提供了使用Spark的快速介绍。我们将首先通过Spark的交互式shell(使用Python或Scala)介绍API,然后展示如何在Java、Scala和Python中编写应用程序。
要跟随本指南,首先从 Spark 网站 下载一个打包好的 Spark 发行版。由于我们不使用 HDFS,您可以下载任何版本 Hadoop 的包。
注意,在Spark 2.0之前,Spark的主要编程接口是弹性分布式数据集(RDD)。在Spark 2.0之后,RDD被数据集(Dataset)取代,数据集与RDD一样是强类型的,但在底层有更丰富的优化。RDD接口仍然被支持,您可以在 RDD编程指南 中获取更详细的参考。然而,我们强烈建议您切换使用数据集,因为它的性能优于RDD。请参阅 SQL编程指南 以获取有关数据集的更多信息。
使用 Spark Shell 进行交互式分析
基础
Spark的shell提供了一种简单的方式来学习API,以及一个强大的工具来交互式地分析数据。它可以在Scala(运行在Java VM上,因此是使用现有Java库的好方法)或Python中使用。通过在Spark目录中运行以下命令来启动它:
./bin/pyspark
或者如果在您当前的环境中使用pip安装了PySpark:
pyspark
Spark的主要抽象是一个称为Dataset的分布式集合。可以通过Hadoop InputFormats(例如HDFS文件)创建Datasets,或通过转换其他Datasets来创建。由于Python的动态特性,我们不需要在Python中强类型Dataset。因此,Python中的所有Datasets都是Dataset[Row],我们称其为
DataFrame
,以与Pandas和R中的数据框概念保持一致。让我们从Spark源目录中的README文件文本创建一个新的DataFrame:
>>> textFile = spark.read.text("README.md")
您可以直接从 DataFrame 获取值,通过调用一些操作,或者转换 DataFrame 以获得一个新的 DataFrame。更多详细信息,请阅读 API 文档 。
>>> textFile.count() # 该 DataFrame 中的行数
126
>>> textFile.first() # 该 DataFrame 中的第一行
Row(value=u'# Apache Spark')
现在让我们将这个DataFrame转换为一个新的。我们调用
filter
来返回一个包含文件中一部分行的新DataFrame。
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
我们可以将变换和操作链接在一起:
>>> textFile.filter(textFile.value.contains("Spark")).count() # 有多少行包含 "Spark"?
15
./bin/spark-shell
Spark的主要抽象是一个名为Dataset的分布式项目集合。可以从Hadoop InputFormats(例如HDFS文件)创建Datasets,也可以通过转换其他Datasets来创建。让我们从Spark源目录中的README文件的文本创建一个新的Dataset:
scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: 字符串]
您可以直接从数据集获取值,通过调用一些操作,或者转换数据集以获取新的数据集。有关更多详细信息,请阅读 API文档 。
scala> textFile.count() // 此数据集中的项数
res0: Long = 126 // 可能与你的不同,因为 README.md 会随着时间变化,类似于其他输出
scala> textFile.first() // 此数据集中的第一个项
res1: String = # Apache Spark
现在让我们将这个数据集转化为一个新的数据集。我们调用
filter
返回一个包含文件中部分项目的新数据集。
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
我们可以将转换和操作链在一起:
scala> textFile.filter(line => line.contains("Spark")).count() // 有多少行包含 "Spark"?
res3: Long = 15
关于数据集操作的更多信息
数据集操作和转换可以用于更复杂的计算。假设我们想找到单词最多的行:
>>> from pyspark.sql import functions as sf
>>> textFile.select(sf.size(sf.split(textFile.value, "\s+")).name("numWords")).agg(sf.max(sf.col("numWords"))).collect()
[Row(max(numWords)=15)]
首先将一行映射为一个整数值,并将其别名为“numWords”,创建一个新的DataFrame。
agg
在该DataFrame上被调用以找到最大的单词计数。
select
和
agg
的参数都是
Column
,我们可以使用
df.colName
从DataFrame中获取一列。我们还可以导入 pyspark.sql.functions,它提供了许多方便的函数,以从旧列构建新列。
一种常见的数据流模式是MapReduce,由Hadoop推广。Spark可以方便地实现MapReduce流程:
>>> wordCounts = textFile.select(sf.explode(sf.split(textFile.value, "\s+")).alias("word")).groupBy("word").count()
在这里,我们使用
explode
函数在
select
中,将行的 Dataset 转换为单词的 Dataset,然后结合
groupBy
和
count
来计算文件中每个单词的计数,生成一个包含 2 列:“word”和“count”的 DataFrame。为了在我们的 shell 中收集单词计数,我们可以调用
collect
:
>>> wordCounts.collect()
[Row(word=u'在线', count=1), Row(word=u'图表', count=1), ...]
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Int = 15
这首先将一行映射到一个整数值,创建一个新的数据集。
reduce
被调用在那个数据集上以找到最大的单词计数。
map
和
reduce
的参数是 Scala 函数字面量(闭包),可以使用任何语言特性或 Scala/Java 库。例如,我们可以轻松调用在其他地方声明的函数。我们将使用
Math.max()
函数来使这段代码更容易理解:
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15
一种常见的数据流模式是MapReduce,由Hadoop推广。Spark可以方便地实现MapReduce流程:
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [值: string, 计数(1): bigint]
在这里,我们调用
flatMap
将一个行的 Dataset 转换为一个单词的 Dataset,然后结合
groupByKey
和
count
来计算文件中每个单词的计数,结果为一个 (String, Long) 对的 Dataset。为了在我们的 shell 中收集单词计数,我们可以调用
collect
:
scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((意味着,1), (在下面,2), (这个,3), (因为,1), (Python,2), (同意,1), (集群.,1), ...)
缓存
Spark还支持将数据集拉取到集群范围内的内存缓存中。当数据被重复访问时,这非常有用,例如在查询一个小的“热点”数据集或运行像PageRank这样的迭代算法时。作为一个简单的示例,让我们标记我们的
linesWithSpark
数据集以进行缓存:
>>> linesWithSpark.cache()
>>> linesWithSpark.count()
15
>>> linesWithSpark.count()
15
使用Spark来探索和缓存一个100行的文本文件似乎很傻。然而有趣的是,这些相同的功能可以用于非常大的数据集,即使它们被分布在数十或数百个节点上。您还可以通过将
bin/pyspark
连接到集群来进行交互式操作,如
RDD编程指南
中所述。
scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]
scala> linesWithSpark.count()
res8: Long = 15
scala> linesWithSpark.count()
res9: Long = 15
使用Spark来探索和缓存一个100行的文本文件可能看起来很傻。 有趣的是,这些相同的功能可以在非常大的数据集上使用,即使它们分布在十个或一百个节点上。 您还可以通过将
bin/spark-shell
连接到集群以交互方式执行此操作,如在
RDD编程指南
中所述。
自包含应用程序
假设我们希望使用Spark API编写一个独立的应用程序。我们将通过一个简单的应用程序来演示Scala(使用sbt)、Java(使用Maven)和Python(pip)。
现在我们将演示如何使用Python API(PySpark)编写应用程序。
如果您正在构建一个打包的 PySpark 应用程序或库,您可以将其添加到您的 setup.py 文件中,如下所示:
install_requires=[
'pyspark==3.5.3'
]
作为一个例子,我们将创建一个简单的 Spark 应用程序,
SimpleApp.py
:
"""SimpleApp.py"""
from pyspark.sql import SparkSession
logFile = "YOUR_SPARK_HOME/README.md" # 应该是您系统上的某个文件
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()
numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()
print("包含 a 的行数: %i, 包含 b 的行数: %i" % (numAs, numBs))
spark.stop()
这个程序仅计算文本文件中包含‘a’的行数和包含‘b’的行数。 请注意,您需要将 YOUR_SPARK_HOME 替换为安装 Spark 的位置。 与 Scala 和 Java 示例一样,我们使用 SparkSession 创建数据集。 对于使用自定义类或第三方库的应用程序,我们还可以通过将代码依赖项打包到 .zip 文件中,通过其
--py-files
参数将其添加到
spark-submit
中(有关详细信息,请参见
spark-submit --help
)。
SimpleApp
简单到我们不需要指定任何代码依赖项。
我们可以使用
bin/spark-submit
脚本运行这个应用程序:
# 使用 spark-submit 来运行你的应用程序
$ YOUR_SPARK_HOME/bin/spark-submit \
--master local[4] \
SimpleApp.py
...
Lines with a: 46, Lines with b: 23
如果您在环境中安装了 PySpark(例如,
pip install pyspark
),您可以使用常规的 Python 解释器运行您的应用程序,或者根据您的喜好使用提供的 ‘spark-submit’。
# 使用 Python 解释器运行你的应用程序
$ python SimpleApp.py
...
Lines with a: 46, Lines with b: 23
我们将创建一个非常简单的Spark应用程序,使用Scala语言——实际上是如此简单,以至于它被命名为
SimpleApp.scala
:
/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession
object SimpleApp {
def main(args: Array[String]): Unit = {
val logFile = "YOUR_SPARK_HOME/README.md" // 应该是你系统上的某个文件
val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
val logData = spark.read.textFile(logFile).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println(s"Lines with a: $numAs, Lines with b: $numBs")
spark.stop()
}
}
请注意,应用程序应该定义一个
main()
方法,而不是扩展
scala.App
。
scala.App
的子类可能无法正确工作。
此程序仅计算包含‘a’的行数和包含‘b’的行数在Spark README中的数量。请注意,您需要用Spark安装位置替换YOUR_SPARK_HOME。与之前使用Spark shell的例子不同,后者会初始化自己的SparkSession,我们在程序中作为一部分初始化SparkSession。
我们调用
SparkSession.builder
来构建一个
SparkSession
,然后设置应用名称,最后调用
getOrCreate
来获取
SparkSession
实例。
我们的应用程序依赖于Spark API,因此我们还将包括一个sbt配置文件,
build.sbt
,该文件说明了Spark是一个依赖项。该文件还添加了一个
Spark所依赖的仓库:
name := "简单项目"
version := "1.0"
scalaVersion := "2.12.18"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.3"
为了使sbt正常工作,我们需要将
SimpleApp.scala
和
build.sbt
按照典型的目录结构进行布局。一旦就位,我们就可以创建一个包含应用程序代码的JAR包,然后使用
spark-submit
脚本来运行我们的程序。
# 你的目录结构应该像这样
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
# 打包一个包含你的应用程序的jar
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.12/simple-project_2.12-1.0.jar
# 使用spark-submit来运行你的应用程序
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/scala-2.12/simple-project_2.12-1.0.jar
...
Lines with a: 46, Lines with b: 23
这个示例将使用Maven来编译一个应用程序JAR,但任何类似的构建系统都可以工作。
我们将创建一个非常简单的 Spark 应用程序,
SimpleApp.java
:
/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
public class SimpleApp {
public static void main(String[] args) {
String logFile = "YOUR_SPARK_HOME/README.md"; // 应该是您系统上的某个文件
SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
Dataset<String> logData = spark.read().textFile(logFile).cache();
long numAs = logData.filter(s -> s.contains("a")).count();
long numBs = logData.filter(s -> s.contains("b")).count();
System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
spark.stop();
}
}
该程序仅计算包含‘a’的行数和包含‘b’的行数在Spark README中。请注意,您需要将YOUR_SPARK_HOME替换为Spark安装的位置。与之前的Spark shell示例不同,后者初始化自己的SparkSession,我们作为程序的一部分初始化一个SparkSession。
为了构建程序,我们还编写了一个 Maven
pom.xml
文件,该文件将 Spark 列为依赖项。
请注意,Spark 工件标记了 Scala 版本。
edu.berkeley
simple-project
4.0.0
简单项目
jar
1.0
org.apache.spark
spark-sql_2.12
3.5.3
provided
我们根据规范的Maven目录结构来布局这些文件:
$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java
现在,我们可以使用Maven打包应用程序,并通过
./bin/spark-submit
执行它。
# 打包一个包含你的应用程序的JAR
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar
# 使用spark-submit来运行你的应用程序
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23
其他依赖管理工具如 Conda 和 pip 也可以用于自定义类或第三方库。另见 Python 包管理 。
接下来去哪里
祝贺你运行你的第一个 Spark 应用程序!
- 有关 API 的深入概述,请从 RDD 编程指南 和 SQL 编程指南 开始,或查看“编程指南”菜单以获取其他组件。
- 要在集群上运行应用程序,请前往 部署概览 。
-
最后,Spark 在
examples目录中包含了几个示例 ( Scala , Java , Python , R )。 您可以按照以下方式运行它们:
# 对于Scala和Java,使用run-example:
./bin/run-example SparkPi
# 对于Python示例,直接使用spark-submit:
./bin/spark-submit examples/src/main/python/pi.py
# 对于R示例,直接使用spark-submit:
./bin/spark-submit examples/src/main/r/dataframe.R