RDD 编程指南
概述
在高层次上,每个Spark应用程序由一个运行用户的
driver program
的
main
函数,并在集群上执行各种
并行操作
。Spark提供的主要抽象是
弹性分布式数据集
(RDD),它是一个跨集群节点分区的元素集合,可以并行操作。RDD是通过从Hadoop文件系统中的文件(或任何其他Hadoop支持的文件系统)开始,或在驱动程序中使用现有的Scala集合并对其进行转换来创建的。用户还可以请求Spark在内存中
持久化
一个RDD,从而允许它在并行操作中高效重用。最后,RDD自动从节点故障中恢复。
Spark中的第二个抽象是 共享变量 ,可以在并行操作中使用。默认情况下,当Spark在不同节点上以一组任务并行运行一个函数时,它会将函数中使用的每个变量的副本发送到每个任务。有时,变量需要在任务之间,或在任务与驱动程序之间共享。Spark支持两种类型的共享变量: 广播变量 ,可以用于在所有节点的内存中缓存一个值,以及 累加器 ,这些是仅“添加”的变量,例如计数器和总和。
本指南展示了每种Spark支持的语言中的这些特性。如果您启动Spark的交互式Shell-即Scala Shell的
bin/spark-shell
或Python的
bin/pyspark
,将最容易跟随。
与Spark连接
Spark 3.5.3 与 Python 3.8+ 兼容。它可以使用标准的 CPython 解释器,因此可以使用像 NumPy 这样的 C 库。它也支持 PyPy 7.3.6+。
Python中的Spark应用程序可以通过包含Spark的
bin/spark-submit
脚本在运行时运行,或者通过在您的setup.py中包含它,如下所示:
install_requires=[
'pyspark==3.5.3'
]
要在Python中运行Spark应用程序而无需使用pip安装PySpark,请使用位于Spark目录中的
bin/spark-submit
脚本。
该脚本将加载Spark的Java/Scala库,并允许您将应用程序提交到集群中。
您还可以使用
bin/pyspark
启动交互式Python shell。
如果您希望访问 HDFS 数据,您需要使用连接到您版本的 HDFS 的 PySpark 构建。 预构建包 也可以在 Spark 主页上找到,用于常见的 HDFS 版本。
最后,您需要将一些Spark类导入到您的程序中。添加以下行:
from pyspark import SparkContext, SparkConf
PySpark要求驱动程序和工作程序使用相同的小版本的Python。它使用PATH中的默认python版本,您可以通过
PYSPARK_PYTHON
指定要使用的Python版本,例如:
$ PYSPARK_PYTHON=python3.8 bin/pyspark
$ PYSPARK_PYTHON=/path-to-your-pypy/pypy bin/spark-submit examples/src/main/python/pi.py
Spark 3.5.3 默认构建并分发以与 Scala 2.12 一起工作。 (Spark 也可以构建为与其他版本的 Scala 一起工作。) 要在 Scala 中编写应用程序,您需要使用兼容的 Scala 版本(例如 2.12.X)。
要编写一个Spark应用程序,您需要在Spark上添加Maven依赖项。Spark可以通过Maven Central获取:
groupId = org.apache.spark
artifactId = spark-core_2.12
version = 3.5.3
此外,如果您希望访问HDFS集群,则需要为您的HDFS版本添加对
hadoop-client
的依赖。
groupId = org.apache.hadoop
artifactId = hadoop-client
version =
最后,您需要将一些Spark类导入到您的程序中。添加以下行:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
(在Spark 1.3.0之前,您需要显式地
import org.apache.spark.SparkContext._
以启用基本的隐式转换。)
Spark 3.5.3 支持 lambda 表达式 用于简洁地编写函数,另外您可以使用 org.apache.spark.api.java.function 包中的类。
请注意,在Spark 2.2.0中移除了对Java 7的支持。
要在Java中编写Spark应用程序,您需要添加对Spark的依赖。Spark可以通过Maven Central获取,地址为:
groupId = org.apache.spark
artifactId = spark-core_2.12
version = 3.5.3
此外,如果您希望访问HDFS集群,则需要为您的HDFS版本添加对
hadoop-client
的依赖。
groupId = org.apache.hadoop
artifactId = hadoop-client
version =
最后,您需要将一些Spark类导入到您的程序中。添加以下行:
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
初始化 Spark
一个Spark程序的第一件事是创建一个
SparkContext
对象,这个对象告诉Spark如何访问集群。要创建一个
SparkContext
,你首先需要构建一个
SparkConf
对象,该对象包含有关你应用程序的信息。
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
一个Spark程序必须做的第一件事是创建一个
SparkContext
对象,它告诉Spark如何访问集群。要创建一个
SparkContext
,您首先需要构建一个
SparkConf
对象,该对象包含有关您应用程序的信息。
每个JVM中只能有一个活动的SparkContext。在创建新SparkContext之前,您必须
stop()
活动的SparkContext。
val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)
一个Spark程序必须做的第一件事是创建一个
JavaSparkContext
对象,它告诉Spark如何访问集群。要创建一个
SparkContext
,您首先需要构建一个
SparkConf
对象,该对象包含有关您应用程序的信息。
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);
appName
参数是您要在集群 UI 上显示的应用程序名称。
master
是一个
Spark, Mesos 或 YARN 集群 URL
,或者一个特殊的“local”字符串,用于在本地模式下运行。实际上,当在集群上运行时,您不想在程序中硬编码
master
,而是
使用
spark-submit
启动应用程序
并在那里接收它。然而,对于本地测试和单元测试,您可以传递“local”以在进程中运行 Spark。
使用终端
在PySpark shell中,已经为您创建了一个特殊的解释器感知的SparkContext,存储在名为
sc
的变量中。自己创建SparkContext将无效。您可以使用
--master
参数设置上下文连接的主节点,并通过传递以逗号分隔的列表将Python .zip、.egg或.py文件添加到运行时路径中,使用
--py-files
。有关第三方Python依赖项,请参见
Python包管理
。您还可以通过向
--packages
参数提供以逗号分隔的Maven坐标列表,将依赖项(例如Spark包)添加到您的shell会话中。任何可能存在依赖项的附加仓库(例如Sonatype)都可以传递给
--repositories
参数。例如,要在恰好四个内核上运行
bin/pyspark
,使用:
$ ./bin/pyspark --master local[4]
或者,为了将
code.py
也添加到搜索路径中(以便后续能够
import code
),使用:
$ ./bin/pyspark --master local[4] --py-files code.py
要获取完整的选项列表,请运行
pyspark --help
。在后台,
pyspark
调用更通用的
spark-submit
脚本
。
还可以在
IPython
中启动 PySpark shell,IPython 是增强版的 Python 解释器。PySpark 与 IPython 1.0.0 及更高版本兼容。要使用 IPython,在运行
bin/pyspark
时将
PYSPARK_DRIVER_PYTHON
变量设置为
ipython
:
$ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark
要使用 Jupyter Notebook(之前称为 IPython Notebook),
$ PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark
您可以通过设置
PYSPARK_DRIVER_PYTHON_OPTS
自定义
ipython
或
jupyter
命令。
在Jupyter Notebook服务器启动后,您可以从“文件”标签创建一个新笔记本。在笔记本中,您可以输入命令
%pylab inline
,作为您在开始尝试从Jupyter笔记本使用Spark之前的一部分。
在Spark Shell中,已经为您创建了一个特殊的解释器感知的SparkContext,存储在名为
sc
的变量中。自己创建SparkContext将不起作用。您可以使用
--master
参数设置上下文连接的主节点,并通过传递一个以逗号分隔的列表将JAR添加到类路径中,使用
--jars
参数。您还可以通过提供一个以逗号分隔的Maven坐标列表,将依赖项(例如Spark包)添加到您的Shell会话中,使用
--packages
参数。任何可能存在依赖项的其他存储库(例如Sonatype)都可以通过
--repositories
参数传递。例如,要在恰好四个核心上运行
bin/spark-shell
,请使用:
$ ./bin/spark-shell --master local[4]
或者,要将
code.jar
也添加到其类路径中,请使用:
$ ./bin/spark-shell --master local[4] --jars code.jar
要使用 Maven 坐标包含一个依赖项:
$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"
有关选项的完整列表,请运行
spark-shell --help
。在后台,
spark-shell
调用更通用的
spark-submit
脚本
。
弹性分布式数据集 (RDDs)
Spark围绕 弹性分布式数据集 (RDD)的概念展开,这是一种可以并行操作的容错元素集合。创建RDD有两种方法: 并行化 您驱动程序中的现有集合,或引用外部存储系统中的数据集,例如共享文件系统、HDFS、HBase或提供Hadoop InputFormat的任何数据源。
并行化集合
并行收集是通过在您的驱动程序程序中调用
SparkContext
的
parallelize
方法在现有的可迭代对象或集合上创建的。集合的元素被复制以形成一个可以并行操作的分布式数据集。例如,以下是如何创建一个包含数字1到5的并行集合:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
一旦创建,分布式数据集 (
distData
) 可以并行操作。例如,我们可以调用
distData.reduce(lambda a, b: a + b)
来将列表的元素相加。
我们将在后面描述对分布式数据集的操作。
并行化集合是通过调用
SparkContext
的
parallelize
方法,在您的驱动程序中的现有集合(Scala
Seq
)上创建的。集合的元素被复制,以形成一个可以并行操作的分布式数据集。例如,这里是如何创建一个包含数字1到5的并行化集合:
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
一旦创建,分布式数据集 (
distData
) 可以并行操作。例如,我们可以调用
distData.reduce((a, b) => a + b)
来计算数组元素的总和。我们稍后会描述对分布式数据集的操作。
并行集合是通过在驱动程序中调用
JavaSparkContext
的
parallelize
方法来创建的,该方法适用于现有的
Collection
。集合的元素被复制以形成一个可以并行操作的分布式数据集。例如,以下是如何创建一个包含数字1到5的并行集合:
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
一旦创建,分布式数据集 (
distData
) 可以并行操作。例如,我们可以调用
distData.reduce((a, b) -> a + b)
来将列表的元素相加。我们稍后会描述对分布式数据集的操作。
并行集合的一个重要参数是将数据集切分成的
分区
数量。Spark 会为集群的每个分区运行一个任务。通常,您希望每个 CPU 有 2-4 个分区。通常,Spark 会根据您的集群自动设置分区数量。然而,您也可以通过将其作为第二个参数传递给
parallelize
手动设置它(例如
sc.parallelize(data, 10)
)。注意:代码中的某些地方使用切片一词(分区的同义词)以保持向后兼容性。
外部数据集
PySpark可以从任何Hadoop支持的存储源创建分布式数据集,包括您的本地文件系统、HDFS、Cassandra、HBase、 Amazon S3 等。Spark支持文本文件、 SequenceFiles ,以及任何其他Hadoop InputFormat 。
文本文件 RDD 可以使用
SparkContext
的
textFile
方法创建。此方法接受文件的 URI(可以是机器上的本地路径,或
hdfs://
,
s3a://
等 URI)并将其作为行的集合进行读取。以下是一个示例调用:
>>> distFile = sc.textFile("data.txt")
一旦创建,
distFile
可以通过数据集操作进行处理。例如,我们可以使用
map
和
reduce
操作将所有行的大小加起来,如下所示:
distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)
。
关于使用Spark读取文件的一些说明:
-
如果使用本地文件系统上的路径,则该文件必须在工作节点上的相同路径下也可访问。可以将文件复制到所有工作节点,或者使用网络挂载的共享文件系统。
-
Spark的所有基于文件的输入方法,包括
textFile
,也支持在目录、压缩文件和通配符上运行。例如,您可以使用textFile("/my/directory")
,textFile("/my/directory/*.txt")
,和textFile("/my/directory/*.gz")
。 -
textFile
方法还可以接受一个可选的第二个参数来控制文件的分区数。默认情况下,Spark为文件的每个块创建一个分区(在HDFS中,块大小默认是128MB),但您也可以通过传递更大的值来请求更多的分区。请注意,您不能有少于块的分区。
除了文本文件,Spark的Python API还支持几种其他数据格式:
-
SparkContext.wholeTextFiles
允许您读取包含多个小文本文件的目录,并将每个文件作为 (文件名, 内容) 对返回。这与textFile
相对,它将返回每个文件中的每一行作为一条记录。 -
RDD.saveAsPickleFile
和SparkContext.pickleFile
支持以简单格式保存 RDD,该格式由序列化的 Python 对象组成。序列化时使用批处理,默认批量大小为 10。 -
SequenceFile 和 Hadoop 输入/输出格式
注意
该功能目前标记为
实验性
,旨在供高级用户使用。未来可能会被基于 Spark SQL 的读/写支持替代,在这种情况下,Spark SQL 将是首选方法。
可写支持
PySpark SequenceFile 支持在 Java 中加载键值对的 RDD,将 Writables 转换为基本的 Java 类型,并使用 pickle 对结果的 Java 对象进行序列化。当将键值对的 RDD 保存到 SequenceFile 时,PySpark 会执行相反的操作。它将 Python 对象反序列化为 Java 对象,然后将它们转换为 Writables。以下 Writables 会被自动转换:
可写类型 | Python类型 |
---|---|
文本 | str |
整型可写 | int |
浮点型可写 | float |
双精度可写 | float |
布尔型可写 | bool |
字节可写 | bytearray |
空值可写 | None |
映射可写 | dict |
数组不能直接处理。用户在读取或写入时需要指定自定义
ArrayWritable
子类型。在写入时,用户还需要指定将数组转换成自定义
ArrayWritable
子类型的自定义转换器。在读取时,默认转换器将自定义
ArrayWritable
子类型转换为 Java
Object[]
,然后被序列化为 Python 元组。为了获得原始类型数组的 Python
array.array
,用户需要指定自定义转换器。
保存和加载序列文件
与文本文件类似,可以通过指定路径保存和加载SequenceFiles。可以指定键和值类,但对于标准Writable这是不需要的。
>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]
保存和加载其他Hadoop输入/输出格式
PySpark 也可以读取任何 Hadoop InputFormat 或写入任何 Hadoop OutputFormat,适用于“新”和“旧”的 Hadoop MapReduce API。如果需要,Hadoop 配置可以作为 Python 字典传入。以下是使用 Elasticsearch ESInputFormat 的示例:
$ ./bin/pyspark --jars /path/to/elasticsearch-hadoop.jar
>>> conf = {"es.resource" : "index/type"} # 假设Elasticsearch在localhost默认运行
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
"org.apache.hadoop.io.NullWritable",
"org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=conf)
>>> rdd.first() # 结果是一个MapWritable,转换为Python字典
(u'Elasticsearch ID',
{u: True,
u: u,
u: 12345})
请注意,如果 InputFormat 仅依赖于 Hadoop 配置和/或输入路径,并且键和值类可以根据上述表格轻松转换,那么这种方法在此类情况下应该效果良好。
如果你有自定义的序列化二进制数据(例如从Cassandra / HBase加载数据),那么你首先需要在Scala/Java端将这些数据转换为可由pickle的pickler处理的格式。提供了一个
Converter
特征。只需扩展此特征并在
convert
方法中实现你的转换代码。请记得确保此类以及访问你的
InputFormat
所需的任何依赖项都被打包到你的Spark作业jar中,并包含在PySpark的classpath中。
查看
Python 示例
和
转换器示例
,了解如何使用 Cassandra / HBase
InputFormat
和
OutputFormat
与自定义转换器。
Spark 可以从任何 Hadoop 支持的存储源创建分布式数据集,包括您的本地文件系统、HDFS、Cassandra、HBase、 Amazon S3 等。Spark 支持文本文件、 SequenceFiles 以及其他任何 Hadoop InputFormat 。
文本文件 RDD 可以使用
SparkContext
的
textFile
方法创建。此方法接受文件的 URI(可以是机器上的本地路径,或
hdfs://
,
s3a://
等 URI)并将其作为行的集合进行读取。以下是一个示例调用:
scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] 在 textFile 在 <控制台>:26
一旦创建,
distFile
可以被数据集操作执行。例如,我们可以使用
map
和
reduce
操作来累加所有行的大小,如下所示:
distFile.map(s => s.length).reduce((a, b) => a + b)
。
关于使用Spark读取文件的一些说明:
-
如果使用本地文件系统中的路径,则该文件在工作节点上也必须能通过相同的路径访问。要么将文件复制到所有工作节点,要么使用网络挂载的共享文件系统。
-
所有Spark的基于文件的输入方法,包括
textFile
,都支持在目录、压缩文件和通配符上运行。例如,您可以使用textFile("/my/directory")
,textFile("/my/directory/*.txt")
,以及textFile("/my/directory/*.gz")
。当读取多个文件时,分区的顺序取决于文件系统返回文件的顺序。例如,它可能会遵循文件路径的字典顺序,也可能不会。在一个分区内,元素根据它们在底层文件中的顺序进行排序。 -
textFile
方法还接受一个可选的第二个参数,用于控制文件的分区数。默认情况下,Spark为文件的每个块创建一个分区(块在HDFS中默认为128MB),但您也可以通过传递更大的值请求更高的分区数。请注意,分区数不能少于块数。
除了文本文件,Spark 的 Scala API 还支持其他几种数据格式:
-
SparkContext.wholeTextFiles
允许你读取包含多个小文本文件的目录,并将每个文件作为 (文件名, 内容) 对返回。这与textFile
不同,后者会返回每行一个记录。分区由数据局部性决定,某些情况下可能导致分区过少。对于这些情况,wholeTextFiles
提供了一个可选的第二个参数,用于控制最小分区数。 -
对于 SequenceFiles ,使用 SparkContext 的
sequenceFile[K, V]
方法,其中K
和V
是文件中键和值的类型。这些应该是 Hadoop 的 Writable 接口的子类,如 IntWritable 和 Text 。此外,Spark 允许你为一些常见的 Writables 指定本地类型;例如,sequenceFile[Int, String]
将自动读取 IntWritables 和 Texts。 -
对于其他 Hadoop InputFormats,你可以使用
SparkContext.hadoopRDD
方法,它接受一个任意的JobConf
和输入格式类、键类和值类。按你为输入源的 Hadoop 作业设置这些。你还可以使用SparkContext.newAPIHadoopRDD
访问基于“新” MapReduce API(org.apache.hadoop.mapreduce
)的 InputFormats。 -
RDD.saveAsObjectFile
和SparkContext.objectFile
支持以简单格式保存 RDD,该格式由序列化的 Java 对象组成。虽然这不如专门的格式(如 Avro)高效,但它提供了一种保存任何 RDD 的简单方法。
Spark 可以从任何 Hadoop 支持的存储源创建分布式数据集,包括您的本地文件系统、HDFS、Cassandra、HBase、 Amazon S3 等。Spark 支持文本文件、 SequenceFiles 以及其他任何 Hadoop InputFormat 。
文本文件 RDD 可以使用
SparkContext
的
textFile
方法创建。此方法接受文件的 URI(可以是机器上的本地路径,或
hdfs://
,
s3a://
等 URI)并将其作为行的集合进行读取。以下是一个示例调用:
JavaRDD<String> distFile = sc.textFile("data.txt");
创建后,
distFile
可以通过数据集操作进行处理。例如,我们可以使用
map
和
reduce
操作将所有行的大小相加,如下所示:
distFile.map(s -> s.length()).reduce((a, b) -> a + b)
。
关于使用Spark读取文件的一些说明:
-
如果使用本地文件系统上的路径,则该文件必须在工作节点上的相同路径下也可访问。可以将文件复制到所有工作节点,或者使用网络挂载的共享文件系统。
-
Spark的所有基于文件的输入方法,包括
textFile
,也支持在目录、压缩文件和通配符上运行。例如,您可以使用textFile("/my/directory")
,textFile("/my/directory/*.txt")
,和textFile("/my/directory/*.gz")
。 -
textFile
方法还可以接受一个可选的第二个参数来控制文件的分区数。默认情况下,Spark为文件的每个块创建一个分区(在HDFS中,块大小默认是128MB),但您也可以通过传递更大的值来请求更多的分区。请注意,您不能有少于块的分区。
除了文本文件,Spark的Java API还支持其他几种数据格式:
-
JavaSparkContext.wholeTextFiles
允许您读取包含多个小文本文件的目录,并将每个文件作为 (文件名, 内容) 对返回。这与textFile
相对,后者会返回每个文件中每行一条记录。 -
对于 SequenceFiles ,使用 SparkContext 的
sequenceFile[K, V]
方法,其中K
和V
是文件中键和值的类型。这些应该是 Hadoop 的 Writable 接口的子类,如 IntWritable 和 Text 。 -
对于其他 Hadoop InputFormats,您可以使用
JavaSparkContext.hadoopRDD
方法,它接受任意的JobConf
和输入格式类、键类和值类。设置这些与您用于输入源的 Hadoop 作业相同。您还可以对基于“新” MapReduce API (org.apache.hadoop.mapreduce
) 的 InputFormats 使用JavaSparkContext.newAPIHadoopRDD
。 -
JavaRDD.saveAsObjectFile
和JavaSparkContext.objectFile
支持以简单格式保存 RDD,该格式由序列化的 Java 对象组成。虽然这不如像 Avro 这样的专用格式高效,但它提供了一种简单的方法来保存任何 RDD。
RDD 操作
RDD支持两种类型的操作:
转换
,它从现有数据集中创建一个新数据集,以及
行动
,它在对数据集进行计算后将值返回给驱动程序。例如,
map
是一种转换,它通过一个函数将每个数据集元素传递,并返回一个代表结果的新RDD。另一方面,
reduce
是一种行动,它使用某个函数聚合RDD的所有元素,并将最终结果返回给驱动程序(尽管还有一个并行的
reduceByKey
,它返回一个分布式数据集)。
在Spark中,所有的转换都是
延迟
的,因为它们不会立即计算结果。相反,它们仅仅记住应用于某个基本数据集(例如文件)的转换。只有当某个操作需要返回结果给驱动程序时,转换才会被计算。这种设计使Spark能够更高效地运行。例如,我们可以意识到通过
map
创建的数据集将在
reduce
中使用,并且只将
reduce
的结果返回给驱动程序,而不是返回更大的映射数据集。
默认情况下,每次对转化后的RDD执行操作时,可能会重新计算。然而,您也可以通过使用
persist
(或
cache
)方法将RDD持久化到内存中,在这种情况下,Spark将保持集群中的元素,以便下次查询时可以更快地访问。还支持将RDD持久化到磁盘,或在多个节点之间复制。
基础
为了说明RDD的基本概念,考虑下面的简单程序:
lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
第一行从外部文件定义了一个基础RDD。这个数据集并没有被加载到内存中或以其他方式进行操作:
lines
仅仅是指向文件的一个指针。第二行将
lineLengths
定义为
map
转换的结果。同样,由于惰性,
lineLengths
是
不
会立即计算的。最后,我们运行
reduce
,这是一个动作。在这一点上,Spark 将计算分解为任务在不同的机器上运行,每台机器运行其部分的映射和本地缩减,仅将其答案返回给驱动程序。
如果我们还想稍后再次使用
lineLengths
,我们可以添加:
lineLengths.persist()
在
reduce
之前,这将导致
lineLengths
在第一次计算后被保存在内存中。
为了说明RDD的基本概念,考虑下面的简单程序:
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
第一行从外部文件定义了一个基础RDD。这个数据集并没有被加载到内存中或者以其他方式被操作:
lines
仅仅是指向文件的指针。第二行将
lineLengths
定义为
map
转换的结果。同样,由于惰性,
lineLengths
并没有
立即计算。最后,我们运行
reduce
,这是一种操作。此时,Spark将计算分解为任务,在不同的机器上运行,每台机器同时运行其部分的映射和本地归约,只返回其答案给驱动程序。
如果我们之后还想再次使用
lineLengths
,我们可以添加:
lineLengths.persist()
在
reduce
之前,这会导致
lineLengths
在第一次计算后被保存在内存中。
为了说明RDD的基本概念,考虑下面的简单程序:
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);
第一行从外部文件定义了一个基础RDD。这个数据集并没有加载到内存中或者
以其他方式被操作:
lines
只是一个指向文件的指针。
第二行将
lineLengths
定义为
map
转换的结果。同样,
lineLengths
没有
立即计算,因其惰性特性。
最后,我们运行
reduce
,这是一个动作。在这一点上,Spark将计算拆分为任务
在不同的机器上运行,且每台机器同时运行其map部分和一个本地减少,
仅将其答案返回给驱动程序。
如果我们还想稍后再次使用
lineLengths
,我们可以添加:
lineLengths.persist(StorageLevel.MEMORY_ONLY());
在
reduce
之前,这将导致
lineLengths
在第一次计算后被存储在内存中。
将函数传递给Spark
Spark的API很大程度上依赖于将函数传递到驱动程序中以在集群上运行。 有三种推荐的方法来实现这一点:
- Lambda 表达式 ,用于可以写成表达式的简单函数。(Lambda 不支持多语句函数或不返回值的语句。)
-
在调用 Spark 的函数内部的本地
def
,用于较长的代码。 - 模块中的顶级函数。
例如,要传递一个较长的函数,这超出了使用
lambda
可以支持的范围,请考虑以下代码:
"""MyScript.py"""
if __name__ == "__main__":
def myFunc(s):
words = s.split(" ")
return len(words)
sc = SparkContext(...)
sc.textFile("file.txt").map(myFunc)
请注意,虽然也可以传递对类实例中方法的引用(与单例对象相对),但这需要同时传递包含该类的对象以及方法。比如,考虑:
class MyClass(object):
def func(self, s):
return s
def doStuff(self, rdd):
return rdd.map(self.func)
在这里,如果我们创建一个
new MyClass
并在其上调用
doStuff
,里面的
map
引用的是
func
方法
该
MyClass
实例的
,因此整个对象需要被发送到集群。
以类似的方式,访问外部对象的字段将引用整个对象:
class MyClass(object):
def __init__(self):
self.field = "Hello"
def doStuff(self, rdd):
return rdd.map(lambda s: self.field + s)
为避免这个问题,最简单的方法是将
field
复制到一个局部变量中,而不是从外部访问它:
def doStuff(self, rdd):
field = self.field
return rdd.map(lambda s: field + s)
Spark的API在驱动程序中依赖于传递函数以在集群上运行。 有两种推荐的方法来实现这一点:
- 匿名函数语法 ,可用于短代码块。
-
全局单例对象中的静态方法。例如,你可以定义
object MyFunctions
然后传递MyFunctions.func1
,如下所示:
object MyFunctions {
def func1(s: String): String = { ... }
}
myRdd.map(MyFunctions.func1)
请注意,虽然也可以传递对类实例中方法的引用(与单例对象相对),但这需要同时传递包含该类的对象以及方法。比如,考虑:
class MyClass {
def func1(s: String): String = { ... }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
在这里,如果我们创建一个新的
MyClass
实例并调用
doStuff
,内部的
map
引用了
func1
方法
该
MyClass
实例的
,因此整个对象需要被发送到集群。这类似于写
rdd.map(x => this.func1(x))
。
以类似的方式,访问外部对象的字段将引用整个对象:
class MyClass {
val field = "你好"
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
相当于写
rdd.map(x => this.field + x)
,它引用了所有的
this
。为避免此问题,最简单的方法是将
field
复制到一个局部变量中,而不是从外部访问它:
def doStuff(rdd: RDD[String]): RDD[String] = {
val field_ = this.field
rdd.map(x => field_ + x)
}
Spark的API在程序驱动程序中依赖于传递函数以在集群上运行。 在Java中,函数由实现 org.apache.spark.api.java.function 包中的接口的类表示。 创建此类函数有两种方法:
- 在您自己的类中实现函数接口,既可以作为匿名内部类,也可以作为命名类,并将其实例传递给Spark。
- 使用 lambda 表达式 简洁地定义实现。
虽然本指南的大部分内容使用了lambda语法以简洁,但实际上可以用长形式轻松使用所有相同的API。例如,我们可以将上面的代码写成如下:
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer a, Integer b) { return a + b; }
});
或者,如果内联编写函数显得笨拙:
class GetLength implements Function<String, Integer> {
public Integer call(String s) { return s.length(); }
}
class Sum implements Function2<Integer, Integer, Integer> {
public Integer call(Integer a, Integer b) { return a + b; }
}
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());
请注意,Java中的匿名内部类也可以访问封闭范围内的变量,只要它们被标记为
final
。Spark会将这些变量的副本发送到每个工作节点,就像对待其他语言一样。
理解闭包
关于Spark,更难的事情之一是理解在集群中执行代码时变量和方法的范围和生命周期。修改其范围外变量的RDD操作可能是一个常见的混淆来源。在下面的示例中,我们将查看使用
foreach()
来递增计数器的代码,但其他操作也可能出现类似的问题。
示例
考虑下面这个简单的 RDD 元素和相加,它的行为可能会有所不同,具体取决于执行是否在同一个 JVM 内部。一个常见的例子是当在
local
模式下运行 Spark(
--master = local[n]
)与将 Spark 应用程序部署到集群时(例如,通过 spark-submit 到 YARN):
counter = 0
rdd = sc.parallelize(data)
# 错误:不要这样做!!
def increment_counter(x):
global counter
counter += x
rdd.foreach(increment_counter)
print("计数器值: ", counter)
var counter = 0
var rdd = sc.parallelize(data)
// 错误:不要这样做!!
rdd.foreach(x => counter += x)
println("计数器值: " + counter)
int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);
// 错误:不要这样做!!
rdd.foreach(x -> counter += x);
println("Counter value: " + counter);
本地模式与集群模式
上述代码的行为是未定义的,可能无法按预期工作。为了执行作业,Spark 将 RDD 操作的处理分解为任务,每个任务由一个执行器执行。在执行之前,Spark 计算任务的
闭包
。闭包是指必须对执行器可见的变量和方法,以便其对 RDD 进行计算(在此情况下为
foreach()
)。该闭包被序列化并发送到每个执行器。
发送到每个执行器的闭包中的变量现在是副本,因此,当
counter
在
foreach
函数中被引用时,它不再是驱动节点上的
counter
。驱动节点的内存中仍然有一个
counter
,但这对执行器来说不再可见!执行器只看到来自序列化闭包的副本。因此,最终的
counter
值仍然是零,因为对
counter
的所有操作都是引用序列化闭包中的值。
在本地模式下,在某些情况下,
foreach
函数实际上将在与驱动程序相同的 JVM 中执行,并将引用相同的原始
counter
,并可能实际更新它。
为了确保在这些情况下有明确的行为,应使用一个
Accumulator
。Spark中的累加器专门用于提供一个在集群中的工作节点间分割执行时安全更新变量的机制。本指南的累加器部分将更详细地讨论这些内容。
一般来说,闭包——像循环或局部定义的方法,不应该被用来改变某些全局状态。Spark 不定义或保证从闭包外引用的对象的变更行为。一些这样做的代码在本地模式下可能会正常工作,但这只是偶然,且这样的代码在分布式模式下不会按预期行为。 如果需要某种全局聚合,请改用累加器。
打印RDD的元素
另一个常见的习语是尝试使用
rdd.foreach(println)
或
rdd.map(println)
打印 RDD 的元素。在单台机器上,这将生成预期的输出并打印所有 RDD 的元素。然而,在
cluster
模式下,被执行者调用的
stdout
现在写入的是执行者的
stdout
,而不是驱动程序上的,因此驱动程序上的
stdout
不会显示这些!要在驱动程序上打印所有元素,可以使用
collect()
方法首先将 RDD 带到驱动节点,如此:
rdd.collect().foreach(println)
。不过,这可能导致驱动程序内存耗尽,因为
collect()
将整个 RDD 获取到单台机器上;如果你只需要打印 RDD 的几个元素,使用
take()
更安全:
rdd.take(100).foreach(println)
。
处理键值对
虽然大多数Spark操作可以在包含任何类型对象的RDD上进行,但一些特殊操作仅在键值对的RDD上可用。 最常见的操作是分布式“洗牌”操作,例如按键对元素进行分组或聚合。
在Python中,这些操作作用于包含内置Python元组的RDD,例如
(1, 2)
。只需创建这样的元组,然后调用所需的操作。
例如,以下代码使用
reduceByKey
操作对键值对进行计数,以统计文本文件中每行出现的次数:
lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
我们还可以使用
counts.sortByKey()
,例如,按字母顺序排序这些对,最后使用
counts.collect()
将它们作为对象列表返回给驱动程序。
虽然大多数Spark操作可以在包含任何类型对象的RDD上工作,但有一些特殊操作仅在键值对的RDD上可用。最常见的操作是分布式“洗牌”操作,比如通过键对元素进行分组或聚合。
在Scala中,这些操作在包含
Tuple2
对象的RDD上是自动可用的(语言中的内置元组,通过简单地编写
(a, b)
创建)。键值对操作在
PairRDDFunctions
类中可用,该类自动包装一个元组的RDD。
例如,以下代码使用
reduceByKey
操作在键值对上统计每行文本在文件中出现的次数:
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
我们也可以使用
counts.sortByKey()
,例如,按字母顺序对这些对进行排序,最后使用
counts.collect()
将它们作为对象数组带回驱动程序。
注意:
当使用自定义对象作为键值对操作中的键时,您必须确保自定义
equals()
方法与一个匹配的
hashCode()
方法相伴随。有关完整细节,请参阅
Object.hashCode()文档
中概述的合同。
虽然大多数Spark操作可以在包含任何类型对象的RDD上进行,但一些特殊操作仅在键值对的RDD上可用。 最常见的操作是分布式“洗牌”操作,例如按键对元素进行分组或聚合。
在Java中,键值对使用
scala.Tuple2
类
来自Scala标准库。您可以简单地调用
new Tuple2(a, b)
来创建一个元组,并稍后使用
tuple._1()
和
tuple._2()
访问其字段。
键值对的RDD由
JavaPairRDD
类表示。你可以使用特殊版本的
map
操作,从JavaRDD构建JavaPairRDD,比如
mapToPair
和
flatMapToPair
。JavaPairRDD将同时具有标准RDD函数和特殊的键值对函数。
例如,以下代码使用
reduceByKey
操作在键值对上统计每行文本在文件中出现的次数:
JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
我们也可以使用
counts.sortByKey()
,例如,按字母顺序对这些对进行排序,最后使用
counts.collect()
将它们作为对象数组带回驱动程序。
注意:
当使用自定义对象作为键值对操作中的键时,您必须确保自定义
equals()
方法伴随着匹配的
hashCode()
方法。有关详细信息,请参见
Object.hashCode() 文档
中概述的合同。
转换
下表列出了Spark支持的一些常见转换。请参考 RDD API文档 ( Scala , Java , Python , R ) 以及配对RDD函数文档 ( Scala , Java ) 以获取详细信息。
操作
下表列出了 Spark 支持的一些常见操作。请参阅 RDD API 文档 ( Scala , Java , Python , R )
和配对 RDD 函数文档 ( Scala , Java ) 详细信息。
操作 | 含义 |
---|---|
reduce ( func ) | 使用一个函数 func (接受两个参数并返回一个)对数据集的元素进行聚合。该函数应该是可交换和可结合的,以便能够正确地并行计算。 |
collect () | 将数据集的所有元素作为数组返回到驱动程序。这通常在过滤或其他返回足够小的数据子集的操作之后是有用的。 |
count () | 返回数据集中的元素数量。 |
first () | 返回数据集的第一个元素(类似于 take(1))。 |
take ( n ) | 返回数据集中前 n 个元素的数组。 |
takeSample ( withReplacement , num , [ seed ]) | 返回数据集中随机抽取的 num 个元素的数组,可以选择是否允许重复抽样,并可选择预先指定随机数生成器种子。 |
takeOrdered ( n , [ordering] ) | 使用自然顺序或自定义比较器返回前 n 个 RDD 元素。 |
saveAsTextFile ( path ) | 将数据集的元素写入指定目录的文本文件(或文本文件集)中,可以是本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统。Spark 将对每个元素调用 toString 方法,将其转换为文件中的文本行。 |
saveAsSequenceFile
(
path
)
(Java 和 Scala) |
将数据集的元素写入指定路径的 Hadoop SequenceFile,可以是本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统。这适用于实现 Hadoop 的 Writable 接口的键值对的 RDD。在 Scala 中,也适用于可隐式转换为 Writable 的类型(Spark 为基本类型如 Int、Double、String 等提供了转换)。 |
saveAsObjectFile
(
path
)
(Java 和 Scala) |
使用 Java 序列化将数据集的元素写入简单格式,随后可以使用
SparkContext.objectFile()
加载。
|
countByKey () | 仅在类型为 (K, V) 的 RDD 上可用。返回每个键的 (K, Int) 对的 hashmap,包含每个键的计数。 |
foreach ( func ) |
在数据集的每个元素上运行函数
func
。这通常用于副作用,例如更新
累加器
或与外部存储系统交互。
注意 :在
foreach()
外部修改累加器以外的变量可能会导致不可定义的行为。有关更多详细信息,请参见
理解闭包
。
|
Spark RDD API 还暴露了一些操作的异步版本,比如
foreachAsync
代替
foreach
,它立即返回一个
FutureAction
给调用者,而不是在操作完成时阻塞。这可以用来管理或等待操作的异步执行。
洗牌操作
在Spark中某些操作会触发一个事件,称为洗牌(shuffle)。洗牌是Spark重新分配数据的机制,以使数据在分区之间以不同方式分组。这通常涉及在执行器和机器之间复制数据,使得洗牌成为一个复杂且昂贵的操作。
背景
要理解洗牌期间发生的事情,我们可以考虑
reduceByKey
操作的例子。
reduceByKey
操作生成一个新的RDD,其中单个键的所有值都组合成一个元组 - 键和对与该键关联的所有值执行减少函数的结果。挑战在于,单个键的所有值并不一定驻留在同一个分区,甚至同一台机器上,但它们必须位于同一位置才能计算结果。
在Spark中,数据通常不会分布在分区中以满足特定操作的必要位置。在计算过程中,单个任务将操作一个单独的分区 - 因此,为了组织所有数据以便单个
reduceByKey
减少任务执行,Spark需要执行全对全操作。它必须从所有分区读取,以找到所有键的所有值,然后将跨分区的值汇总,以计算每个键的最终结果 - 这被称为
洗牌
。
尽管每个分区中新洗牌数据中的元素集合是确定的,分区的顺序也是如此,但这些元素的顺序则不是。如果希望在洗牌后获得可预测的有序数据,则可以使用:
-
mapPartitions
用于对每个分区进行排序,例如.sorted
-
repartitionAndSortWithinPartitions
用于在重新分区的同时有效地对分区进行排序 -
sortBy
用于创建一个全局有序的 RDD
可能导致洗牌的操作包括
重新分区
操作,例如
repartition
和
coalesce
,
按键
操作(除了计数)例如
groupByKey
和
reduceByKey
,以及
连接
操作,例如
cogroup
和
join
。
性能影响
洗牌
是一项昂贵的操作,因为它涉及到磁盘输入/输出、数据序列化和网络输入/输出。为了组织洗牌的数据,Spark 生成了一组任务 -
映射
任务来组织数据,以及一组
归约
任务来汇总数据。这个名称源自 MapReduce,并不直接与 Spark 的
map
和
reduce
操作相关。
在内部,来自单个映射任务的结果会保留在内存中,直到无法容纳为止。然后,这些结果会根据目标分区进行排序,并写入到一个单一的文件中。在归约端,任务读取相关的排序块。
某些洗牌操作可能消耗大量的堆内存,因为它们使用内存中的数据结构来组织记录,在传输之前或之后。具体来说,
reduceByKey
和
aggregateByKey
在映射端创建这些结构,而
'ByKey
操作则在归约端生成这些结构。当数据不适合内存时,Spark 会将这些表溢出到磁盘,增加磁盘 I/O 的额外开销和垃圾回收的增加。
洗牌还会在磁盘上生成大量的中间文件。从Spark 1.3开始,这些文件在相应的RDD不再使用并被垃圾回收之前会被保留。这是为了避免在重新计算血缘关系时需要重新创建洗牌文件。垃圾回收可能只在很长一段时间后发生,如果应用程序保留对这些RDD的引用,或者如果垃圾回收不经常启动。这意味着长时间运行的Spark作业可能会消耗大量的磁盘空间。临时存储目录由
spark.local.dir
配置参数在配置Spark上下文时指定。
通过调整各种配置参数可以调整洗牌行为。请参见 Spark 配置指南 中的“洗牌行为”部分。
RDD 持久性
在Spark中,最重要的能力之一是 持久化 (或 缓存 )一个数据集在内存中,以便跨操作使用。当你持久化一个RDD时,每个节点将存储它计算的任何分区在内存中,并在对该数据集(或从中派生的数据集)进行其他操作时重用它们。这允许未来的操作变得更快(通常快于10倍)。缓存是迭代算法和快速交互使用的关键工具。
您可以使用
persist()
或
cache()
方法将 RDD 标记为持久化。第一次在一个操作中计算时,它将保存在节点的内存中。Spark 的缓存是容错的 - 如果 RDD 的任何分区丢失,它将自动使用最初创建它的转换进行重新计算。
此外,每个持久化的 RDD 可以使用不同的
存储级别
进行存储,例如,允许您将数据集持久化到磁盘,将其持久化在内存中,但作为序列化的 Java 对象(以节省空间),在节点之间进行复制。这些级别是通过传递一个
StorageLevel
对象 (
Scala
,
Java
,
Python
) 到
persist()
来设置的。
cache()
方法是使用默认存储级别的简写,默认存储级别是
StorageLevel.MEMORY_ONLY
(在内存中存储反序列化的对象)。所有的存储级别包括:
存储级别 | 含义 |
---|---|
MEMORY_ONLY | 将RDD作为反序列化的Java对象存储在JVM中。如果RDD不能完全放入内存,某些分区将不会被缓存,每次需要时都会动态重新计算。这是默认级别。 |
MEMORY_AND_DISK | 将RDD作为反序列化的Java对象存储在JVM中。如果RDD不能完全放入内存,将不适合的分区存储在硬盘上,并在需要时从那里读取。 |
MEMORY_ONLY_SER
(Java和Scala) |
将RDD存储为 序列化 的Java对象(每个分区一个字节数组)。 这通常比反序列化对象更节省空间,尤其是在使用 快速序列化器 时,但读取时需要更多的CPU。 |
MEMORY_AND_DISK_SER
(Java和Scala) |
类似于MEMORY_ONLY_SER,但将不适合内存的分区溢出到硬盘,而不是每次需要时动态重新计算。 |
DISK_ONLY | 仅将RDD分区存储在硬盘上。 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等 | 与上述级别相同,但在两个集群节点上复制每个分区。 |
OFF_HEAP (实验性) | 类似于MEMORY_ONLY_SER,但将数据存储在 堆外内存 中。这要求启用堆外内存。 |
注意:
在Python中,存储的对象将始终通过
Pickle
库进行序列化,因此选择序列化级别并不重要。Python中可用的存储级别包括
MEMORY_ONLY
,
MEMORY_ONLY_2
,
MEMORY_AND_DISK
,
MEMORY_AND_DISK_2
,
DISK_ONLY
,
DISK_ONLY_2
,和
DISK_ONLY_3
。
Spark还会在洗牌操作中自动保存一些中间数据(例如
reduceByKey
),即使用户没有调用
persist
。这样做是为了避免在洗牌过程中,如果某个节点失败,会重新计算整个输入。如果用户计划重用生成的RDD,我们仍然建议调用
persist
。
选择哪个存储级别?
Spark的存储级别旨在提供内存使用和CPU效率之间的不同权衡。我们推荐以下过程来选择一个:
-
如果您的 RDD 适合默认存储级别 (
MEMORY_ONLY
), 就保持原样。这是最节省 CPU 的选项,可以让 RDD 上的操作运行得尽可能快。 -
如果不适合,请尝试使用
MEMORY_ONLY_SER
并 选择一个快速的序列化库 ,使对象更加节省空间,但仍然合理快速地访问。(Java 和 Scala) -
除非计算数据集的函数代价高,或者过滤了大量数据,否则不要溢出到磁盘。否则,重新计算一个分区的速度可能与从磁盘读取它一样快。
-
如果您希望快速恢复故障,请使用复制的存储级别(例如,如果使用 Spark 为 Web 应用程序提供请求服务)。 所有 存储级别通过重新计算丢失的数据提供完全的容错,但复制级别允许您继续在 RDD 上运行任务,而无需等待重新计算丢失的分区。
删除数据
Spark 会自动监控每个节点上的缓存使用情况,并以最少最近使用(LRU)的方式删除旧的数据分区。如果您希望手动删除 RDD 而不是等待它从缓存中被删除,请使用
RDD.unpersist()
方法。请注意,该方法默认情况下不会阻塞。要阻塞直到资源被释放,请在调用此方法时指定
blocking=true
。
共享变量
通常,当一个函数传递给Spark操作(如
map
或
reduce
)在远程集群节点上执行时,它是在函数中使用的所有变量的单独副本上进行工作的。这些变量被复制到每台机器上,远程机器上的变量没有更新会传播回驱动程序。支持跨任务的一般读写共享变量将是低效的。然而,Spark确实提供了两种有限类型的
共享变量
以适应两种常见的使用模式:广播变量和累加器。
广播变量
广播变量允许程序员在每台机器上保留一个只读变量的缓存,而不是将其副本随任务一起发送。例如,它们可以用于以高效的方式给每个节点提供一个大型输入数据集的副本。Spark还尝试使用高效的广播算法来分配广播变量,以减少通信成本。
Spark 操作通过一组阶段执行,这些阶段通过分布式“洗牌”操作分隔开。Spark 自动广播每个阶段中任务所需的公共数据。以这种方式广播的数据以序列化形式缓存,并在运行每个任务之前反序列化。这意味着,显式创建广播变量仅在跨多个阶段的任务需要相同数据或在以反序列化形式缓存数据时很重要。
广播变量是通过调用
SparkContext.broadcast(v)
从变量
v
创建的。
广播变量是
v
的一个包装器,其值可以通过调用
value
方法来访问。下面的代码展示了这一点:
>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast 对象 在 0x102789f10>
>>> broadcastVar.value
[1, 2, 3]
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
broadcastVar.value();
// 返回 [1, 2, 3]
在广播变量创建后,应该在集群上运行的任何函数中使用它,而不是值
v
,以确保
v
不会被多次传送到节点。此外,广播后对象
v
不应该被修改,以确保所有节点获得相同的广播变量值(例如,如果该变量稍后被传送到新节点)。
要释放广播变量复制到执行者上的资源,请调用
.unpersist()
。如果之后再次使用广播,它将被重新广播。要永久释放广播变量使用的所有资源,请调用
.destroy()
。在此之后广播变量无法使用。请注意,这些方法默认不阻塞。要阻塞直到资源被释放,请在调用时指定
blocking=true
。
累加器
累加器是通过关联性和交换性操作“添加”的变量,因此可以高效地在并行中支持它们。它们可以用于实现计数器(如在MapReduce中)或求和。Spark本地支持数值类型的累加器,程序员可以为新类型添加支持。
作为用户,您可以创建有名或无名的累加器。如下图所示,一个有名的累加器(在这个实例中是
counter
)将显示在修改该累加器的阶段的网页用户界面中。Spark在“任务”表中显示每个被任务修改的累加器的值。
在用户界面中跟踪累加器可以帮助理解正在运行的阶段的进展(注意:这在Python中尚不支持)。
累加器是通过调用
SparkContext.accumulator(v)
从初始值
v
创建的。运行在集群上的任务可以使用
add
方法或
+=
操作符向其添加值。然而,它们无法读取其值。只有驱动程序可以读取累加器的值,使用其
value
方法。
下面的代码演示了如何使用累加器来求数组元素的和:
>>> accum = sc.accumulator(0)
>>> accum
累加器<id=0, value=0>
>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
...
10/09/29 18:41:08 信息 SparkContext: 任务 完成 在 0.317106 秒
>>> accum.value
10
虽然这段代码使用了对 Int 类型的累加器的内置支持,但程序员也可以通过子类化
AccumulatorParam
创建自己的类型。AccumulatorParam 接口有两个方法:
zero
用于提供数据类型的“零值”,以及
addInPlace
用于将两个值相加。例如,假设我们有一个
Vector
类表示数学向量,我们可以这样写:
class VectorAccumulatorParam(AccumulatorParam):
def zero(self, initialValue):
return Vector.zeros(initialValue.size)
def addInPlace(self, v1, v2):
v1 += v2
return v1
# 然后,创建一个这种类型的累加器:
vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())
可以通过调用
SparkContext.longAccumulator()
或
SparkContext.doubleAccumulator()
来创建一个数字累加器,分别用于累加 Long 或 Double 类型的值。在集群上运行的任务可以使用
add
方法向其添加值。然而,它们无法读取其值。只有驱动程序可以读取累加器的值,使用其
value
方法。
下面的代码显示了一个累加器用来将数组的元素相加:
scala> val accum = sc.longAccumulator("我的累加器")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(我的 累加器), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 信息 SparkContext: 任务 完成 在 0.317106 秒
scala> accum.value
res2: Long = 10
虽然这段代码使用了内置对类型 Long 的累计器的支持,程序员也可以通过子类化
AccumulatorV2
创建自己的类型。AccumulatorV2 抽象类有几个必须重写的方法:
reset
用于将累计器重置为零,
add
用于向累计器添加另一个值,
merge
用于将另一个相同类型的累计器合并到这个累计器中。必须重写的其他方法包含在
API 文档
中。例如,假设我们有一个
MyVector
类表示数学向量,我们可以这样编写:
class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
private val myVector: MyVector = MyVector.createZeroVector
def reset(): Unit = {
myVector.reset()
}
def add(v: MyVector): Unit = {
myVector.add(v)
}
...
}
// 然后,创建这种类型的累加器:
val myVectorAcc = new VectorAccumulatorV2
// 然后,将其注册到spark上下文中:
sc.register(myVectorAcc, "MyVectorAcc1")
请注意,当程序员定义他们自己的 AccumulatorV2 类型时,生成的类型可能与添加的元素类型不同。
可以通过调用
SparkContext.longAccumulator()
或
SparkContext.doubleAccumulator()
创建一个数字累加器,分别用于累加 Long 或 Double 类型的值。然后,运行在集群上的任务可以使用
add
方法向其添加值。然而,它们无法读取其值。只有驱动程序可以读取累加器的值,使用其
value
方法。
下面的代码展示了一个累加器被用来将数组的元素相加:
LongAccumulator accum = jsc.sc().longAccumulator();
sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 信息 SparkContext: 任务在 0.317106 秒内完成
accum.value();
// 返回 10
虽然这段代码使用了内置的对 Long 类型累加器的支持,但程序员也可以通过子类化
AccumulatorV2
来创建自己的类型。AccumulatorV2 抽象类有几个必须重写的方法:
reset
用于将累加器重置为零,
add
用于向累加器添加另一个值,
merge
用于将另一个相同类型的累加器合并到这个累加器中。其他必须重写的方法包含在
API 文档
中。例如,假设我们有一个
MyVector
类,表示数学向量,我们可以写:
class VectorAccumulatorV2 implements AccumulatorV2<MyVector, MyVector> {
private MyVector myVector = MyVector.createZeroVector();
public void reset() {
myVector.reset();
}
public void add(MyVector v) {
myVector.add(v);
}
...
}
// 然后,创建这种类型的累加器:
VectorAccumulatorV2 myVectorAcc = new VectorAccumulatorV2();
// 然后,将其注册到 spark 上下文中:
jsc.sc().register(myVectorAcc, "MyVectorAcc1");
请注意,当程序员定义自己的AccumulatorV2类型时,结果类型可能与添加的元素类型不同。
警告 : 当一个Spark任务完成时,Spark会尝试将该任务中累积的更新合并到一个累加器中。 如果失败,Spark将忽略此失败,仍然将任务标记为成功,并继续运行其他任务。因此, 一个有缺陷的累加器不会影响Spark作业,但尽管Spark作业成功,它可能不会被正确更新。
对于仅在 操作 内执行的累加器更新,Spark 保证每个任务对累加器的更新只会应用一次,即重新启动的任务不会更新该值。在转换中,用户应当注意,如果任务或作业阶段被重新执行,则每个任务的更新可能会被应用多次。
累加器并不会改变Spark的惰性求值模型。如果它们在RDD的操作中被更新,它们的值仅在该RDD作为一个操作的一部分被计算时更新。因此,累加器的更新在像
map()
这样的惰性转换中并不保证会执行。下面的代码片段演示了这一特性:
accum = sc.accumulator(0)
def g(x):
accum.add(x)
return f(x)
data.map(g)
# 在这里,accum 仍然是 0,因为没有任何操作导致 `map` 被计算。
val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// 在这里,accum仍然是0,因为没有操作导致map操作被计算。
LongAccumulator accum = jsc.sc().longAccumulator();
data.map(x -> { accum.add(x); return f(x); });
// 在这里,累加器仍然是0,因为没有操作导致`map`被计算。
部署到集群
该
申请提交指南
描述了如何向集群提交应用程序。简而言之,一旦您将应用程序打包成一个JAR(用于Java/Scala)或一组
.py
或
.zip
文件(用于Python),
bin/spark-submit
脚本允许您将其提交给任何支持的集群管理器。
从 Java / Scala 启动 Spark 任务
org.apache.spark.launcher 包提供了用于使用简单的Java API作为子进程启动Spark作业的类。
单元测试
Spark 对任何流行的单元测试框架都很友好。只需在测试中创建一个
SparkContext
,并将主 URL 设置为
local
,运行你的操作,然后调用
SparkContext.stop()
来拆除它。确保在
finally
块或测试框架的
tearDown
方法中停止上下文,因为 Spark 不支持在同一程序中同时运行两个上下文。
从这里去哪里
您可以在Spark网站上查看一些
示例Spark程序
。此外,Spark在
examples
目录中包含几个示例(
Scala
,
Java
,
Python
,
R
)。您可以通过将类名传递给Spark的
bin/run-example
脚本来运行Java和Scala示例;例如:
./bin/run-example SparkPi
对于Python示例,请使用
spark-submit
替代:
./bin/spark-submit examples/src/main/python/pi.py
对于 R 示例,请使用
spark-submit
:
./bin/spark-submit examples/src/main/r/dataframe.R
有关优化程序的帮助, 配置 和 调优 指南提供最佳实践的信息。它们对于确保您的数据以有效格式存储在内存中尤为重要。有关部署的帮助, 集群模式概述 描述了分布式操作中涉及的组件和支持的集群管理器。