调优Spark
由于大多数 Spark 计算的内存特性,Spark 程序可能会受到集群中任何资源的瓶颈:CPU、网络带宽或内存。最常见的是,如果数据适合内存,则瓶颈是网络带宽,但有时您还需要进行一些调整,例如 以序列化形式存储 RDD ,以减少内存使用。 本指南将涵盖两个主要主题:数据序列化,这对良好的网络性能至关重要,并且可以减少内存使用,以及内存调优。我们还简单介绍几个较小的主题。
数据序列化
序列化在任何分布式应用程序的性能中扮演着重要角色。慢速序列化对象的格式,或消耗大量字节的格式,会极大地减缓计算速度。通常,这是您应该首先调优以优化 Spark 应用程序的内容。Spark 的目标是在便利性(允许您在操作中使用任何 Java 类型)和性能之间取得平衡。它提供了两个序列化库:
-
Java 序列化
:
默认情况下,Spark 使用 Java 的
ObjectOutputStream框架序列化对象,并且可以与您创建的任何实现了java.io.Serializable的类一起工作。 您还可以通过扩展java.io.Externalizable更加精确地控制序列化的性能。 Java 序列化灵活,但通常速度相当慢,并且对于许多类会导致较大的序列化格式。 -
Kryo 序列化
:Spark 还可以使用 Kryo 库(版本 4)更快地序列化对象。Kryo 在速度和紧凑性上都显著快于 Java 序列化(通常快出 10 倍),但不支持所有
Serializable类型,并要求您提前 注册 在程序中将使用的类,以获得最佳性能。
您可以通过使用
SparkConf
初始化您的作业来切换到使用 Kryo,并调用
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
。此设置配置了用于在工作节点之间交换数据的序列化器,以及在将 RDD 序列化到磁盘时使用的序列化器。Kryo 不是默认情况下使用的唯一原因是它需要自定义注册,但我们建议在任何网络密集型应用程序中尝试使用它。自 Spark 2.0.0 起,我们在对具有简单类型、简单类型数组或字符串类型的 RDD 进行交换时,内部使用 Kryo 序列化器。
Spark 自动包含了用于许多常用核心 Scala 类的 Kryo 序列化器,这些类在 Twitter chill 库的 AllScalaRegistrar 中被涵盖。
要注册您自己的自定义类到Kryo,请使用
registerKryoClasses
方法。
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
该 Kryo 文档 描述了更高级的注册选项,如添加自定义序列化代码。
如果你的对象很大,你可能还需要增加
spark.kryoserializer.buffer
配置
。这个值需要足够大
以容纳你将序列化的
最大
对象。
最后,如果您不注册自定义类,Kryo 仍然可以工作,但它将不得不为每个对象存储完整的类名,这很浪费。
内存调优
调整内存使用时有三个考虑因素:你的对象使用的内存的 数量 (你可能希望你的整个数据集能够放入内存中),访问这些对象的 成本 ,以及 垃圾回收 的开销(如果你的对象有高周转率的话)。
默认情况下,Java对象的访问速度很快,但其占用的空间可能比其字段内部的“原始”数据多出2-5倍。这是由于几个原因造成的:
-
每个不同的Java对象都有一个“对象头”,大约16字节,包含诸如指向其类的指针等信息。对于一个数据很少的对象(例如一个
Int字段),这可能比数据本身更大。 -
Java
String的开销约为40字节,超过原始字符串数据(因为它将数据存储在Char数组中,并保留额外的数据,如长度),并且由于String内部使用UTF-16编码,每个字符存储为 两个 字节。因此,一个10字符的字符串可能会轻易消耗60字节。 -
常见的集合类,如
HashMap和LinkedList,使用链式数据结构,每个条目都有一个“包装”对象(例如Map.Entry)。该对象不仅有一个头部,还有指向列表中下一个对象的指针(通常每个8字节)。 -
原始类型的集合通常将其存储为“包装”对象,例如
java.lang.Integer。
本节将首先概述Spark中的内存管理,然后讨论用户可以采取的具体策略,以更有效地利用他/她应用程序中的内存。特别是,我们将描述如何确定对象的内存使用情况,以及如何改善它——要么通过更改数据结构,要么通过以序列化格式存储数据。随后,我们将介绍调整Spark的缓存大小和Java垃圾收集器。
内存管理概述
在Spark中,内存使用主要分为两类:执行和存储。执行内存指用于计算的内存,包括洗牌、连接、排序和聚合,而存储内存则用于缓存和在集群中传播内部数据。在Spark中,执行和存储共享一个统一的区域(
M
)。当没有使用执行内存时,存储可以占用所有可用内存,反之亦然。如果有必要,执行可能会逐出存储,但仅在总存储内存使用量低于某个阈值(
R
)之前。换句话说,
R
描述了
M
内的一个子区域,在该区域中缓存的块不会被逐出。由于实现的复杂性,存储可能不会逐出执行。
此设计确保了几个理想的属性。首先,不使用缓存的应用程序可以使用整个空间进行执行,从而避免不必要的磁盘溢出。其次,使用缓存的应用程序可以保留一个最小的存储空间(R),在此空间内它们的数据块不会被驱逐。最后,这种方法为各种工作负载提供了合理的开箱即用性能,而无需用户了解内存是如何内部划分的。
虽然有两个相关的配置,但典型用户不需要进行调整,因为默认值适用于大多数工作负载:
-
spark.memory.fraction表示M的大小占 (JVM 堆空间 - 300MiB) 的比例(默认为 0.6)。剩余的空间(40%)保留给用户数据结构、Spark 内部元数据,以及在稀疏和异常大的记录情况下防止 OOM 错误。 -
spark.memory.storageFraction表示R的大小占M的比例(默认为 0.5)。R是M内部用于缓存块的存储空间,这些缓存块不会被执行驱逐。
要使此堆内存量能舒适地适应JVM的老年代或“已终止”代,应该设置
spark.memory.fraction
的值。有关详细信息,请参阅下面有关高级GC调优的讨论。
确定内存消耗
确定数据集所需内存消耗的最佳方法是创建一个 RDD,将其放入缓存中,然后查看网页 UI 中的“Storage”页面。该页面将告诉您 RDD 占用了多少内存。
要估算特定对象的内存消耗,可以使用
SizeEstimator
的
estimate
方法。这对于尝试不同的数据布局以减少内存使用,以及确定广播变量在每个执行者堆上占用的空间量非常有用。
调优数据结构
减少内存消耗的第一种方法是避免增加开销的Java特性,例如基于指针的数据结构和包装对象。有几种方法可以实现这一点:
-
设计你的数据结构时,优先选择对象数组和原始数据类型,而不是标准的Java或Scala集合类(例如
HashMap)。 fastutil 库提供了与Java标准库兼容的原始数据类型的方便集合类。 - 尽量避免使用嵌套结构来存储大量的小对象和指针。
- 考虑使用数字ID或枚举对象作为键,而不是字符串。
-
如果你的RAM少于32 GiB,请设置JVM标志
-XX:+UseCompressedOops,使指针占用四个字节而不是八个字节。你可以在spark-env.sh中添加这些选项。
序列化 RDD 存储
当您的对象仍然过大而无法高效存储时,一种更简单的方法是以
序列化
形式存储它们,使用
RDD 持久化 API
中的序列化存储级别,例如
MEMORY_ONLY_SER
。Spark 将每个 RDD 分区作为一个大型字节数组进行存储。以序列化形式存储数据的唯一缺点是访问时间较慢,因为必须实时反序列化每个对象。如果您希望以序列化形式缓存数据,我们强烈建议
使用 Kryo
,因为它比 Java 序列化(当然比原始 Java 对象)要小得多。
垃圾回收调优
JVM 垃圾收集在你有大量“变动”的 RDD 存储时可能会成为一个问题。(在只读取一次 RDD 然后进行多次操作的程序中,通常没有问题。)当 Java 需要驱逐旧对象以腾出空间给新对象时,它需要遍历所有 Java 对象并找到未使用的对象。这里需要记住的主要一点是
垃圾收集的成本与 Java 对象的数量成正比
,因此使用较少对象的数据结构(例如,使用一个
Int
数组而不是
LinkedList
)可以大大降低这项成本。更好的方法是将对象以序列化形式持久化,如上所述:现在每个 RDD 分区只有
一个
对象(一个字节数组)。在尝试其他技术之前,如果 GC 是一个问题,首先要尝试的方法是使用
序列化缓存
。
GC 也可能是一个问题,因为您的任务的工作内存(运行任务所需的空间)与缓存于节点上的 RDD 之间存在干扰。我们将讨论如何控制分配给 RDD 缓存的空间以减轻这个问题。
测量 GC 的影响
GC 调优的第一步是收集垃圾回收发生的频率和花费的时间的统计信息。这可以通过将
-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
添加到 Java 选项中来实现。(有关将 Java 选项传递给 Spark 作业的信息,请参见
配置指南
。) 下次运行 Spark 作业时,您将在工作节点的日志中看到每次垃圾回收发生时打印的消息。请注意,这些日志将位于集群的工作节点上(在它们工作目录中的
stdout
文件中),
而不是
在您的驱动程序程序上。
高级GC调优
要进一步调整垃圾回收,我们首先需要了解一些关于JVM内存管理的基本信息:
-
Java堆空间分为两个区域:年轻代和老年代。年轻代用于存放短寿命对象,而老年代则用于存放寿命较长的对象。
-
年轻代进一步分为三个区域 [Eden, Survivor1, Survivor2]。
-
垃圾回收过程的简要描述:当Eden满时,会对Eden进行一次小垃圾回收(minor GC),并将Eden和Survivor1中存活的对象复制到Survivor2。Survivor区域会交换。如果一个对象足够老或者Survivor2已满,则将其移动到老年代。最后,当老年代接近满时,会调用一次完全垃圾回收(full GC)。
在Spark中,GC调优的目标是确保只有长期存在的RDD被存储在老生代中,并且年轻代的大小足够存储短期存在的对象。这将有助于避免在任务执行过程中收集临时对象的完全垃圾回收(GC)。一些可能有用的步骤包括:
-
通过收集GC统计信息检查是否存在过多的垃圾回收。如果在任务完成之前多次调用了完整的GC,这意味着没有足够的内存可用于执行任务。
-
如果有过多的次要收集但主要GC不多,为Eden分配更多内存将有帮助。您可以将Eden的大小设置为每个任务所需内存的一个高估值。如果Eden的大小确定为
E,则可以使用选项-Xmn=4/3*E来设置年轻代的大小。(按4/3的比例增长是为了考虑幸存者区域所占用的空间。) -
在打印的GC统计信息中,如果OldGen接近满,则通过降低
spark.memory.fraction来减少用于缓存的内存量;缓存较少的对象比降低任务执行速度要好。或者,可以考虑减少年轻代的大小。这意味着如果您已按上述设置,则降低-Xmn。如果没有,请尝试更改JVM的NewRatio参数的值。许多JVM的默认值为2,这意味着老年代占用堆的2/3。它应该足够大,以使这个比例超过spark.memory.fraction。 -
尝试使用
-XX:+UseG1GC的G1GC垃圾回收器。在某些垃圾回收是瓶颈的情况下,它可以提高性能。请注意,对于大执行器堆大小,可能需要通过-XX:G1HeapRegionSize增加 G1区域大小 。 -
作为一个例子,如果您的任务正在从HDFS读取数据,可以使用从HDFS读取的数据块的大小来估计任务使用的内存量。请注意,解压缩的块大小通常是块大小的2倍或3倍。因此,如果我们希望有3或4个任务的工作空间,而HDFS块大小为128 MiB,我们可以估计Eden的大小为
4*3*128MiB。 -
监测新的设置如何改变垃圾回收的频率和所用时间。
我们的经验表明,GC调优的效果依赖于您的应用程序和可用的内存量。 网上描述了 更多的调优选项 , 但从高层来看,管理全GC发生的频率可以帮助减少开销。
对于执行器的GC调优标志,可以通过在作业配置中设置
spark.executor.defaultJavaOptions
或
spark.executor.extraJavaOptions
来指定。
其他考虑
并行级别
除非为每个操作设置足够高的并行度,否则群集将不会被充分利用。Spark 会根据文件的大小自动设置每个文件上运行的“map”任务数量(尽管您可以通过可选参数控制
SparkContext.textFile
等),而对于分布式的“reduce”操作,例如
groupByKey
和
reduceByKey
,它使用最大的父 RDD 的分区数量。您可以将并行度作为第二个参数传递(请参见
spark.PairRDDFunctions
文档),或者设置配置属性
spark.default.parallelism
来更改默认值。一般来说,我们建议每个 CPU 核心设置 2-3 个任务。
输入路径上的并行列出
有时候,当作业输入有大量目录时,您可能需要增加目录列表的并行度,否则该过程可能会花费很长时间,尤其是在针对像 S3 这样的对象存储时。如果您的作业在使用 Hadoop 输入格式的 RDD 上工作(例如,通过
SparkContext.sequenceFile
),则并行度通过
spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads
控制(当前默认值为 1)。
对于基于文件的数据源的 Spark SQL,您可以调整
spark.sql.sources.parallelPartitionDiscovery.threshold
和
spark.sql.sources.parallelPartitionDiscovery.parallelism
来提高列举的并行性。有关更多详细信息,请参考
Spark SQL 性能调优指南
。
减少任务的内存使用
有时,你会遇到 OutOfMemoryError,并不是因为你的 RDD 不适合放在内存中,而是因为你某个任务的工作集,例如
groupByKey
中的一个 reduce 任务,太大。Spark 的 shuffle 操作(
sortByKey
、
groupByKey
、
reduceByKey
、
join
等)在每个任务内部构建一个哈希表来执行分组,这个哈希表通常会很大。这里最简单的解决方法是
增加并行度
,使每个任务的输入集更小。Spark 可以高效支持短至 200 毫秒的任务,因为它在许多任务之间重用一个 executor JVM,并且任务启动成本低,因此你可以安全地将并行度增加到超过你的集群核心数量。
广播大变量
使用 广播功能 ,可以显著减少每个序列化任务的大小以及在集群上启动作业的成本。如果你的任务在内部使用了来自驱动程序的任何大型对象(例如静态查找表),考虑将其变为广播变量。Spark会在主节点上打印每个任务的序列化大小,因此你可以查看这些信息来判断你的任务是否过大;通常,超过约20 KiB的任务可能值得进行优化。
数据本地性
数据本地性会对Spark作业的性能产生重大影响。如果数据与操作它的代码在一起,那么计算往往会很快。但是如果代码和数据分开了,就必须进行移动。通常,从一个地方到另一个地方传输序列化代码比传输一块数据要快,因为代码的大小要比数据小得多。Spark根据数据本地性的这一一般原则构建其调度策略。
数据局部性是指数据与处理它的代码的接近程度。根据数据的当前地点,有几个局部性级别。从最近到最远的顺序如下:
-
PROCESS_LOCAL数据在与运行代码相同的JVM中。这是最佳的局部性。 -
NODE_LOCAL数据在同一节点上。 例如,可以在HDFS中的同一节点上,或在同一节点上的另一个执行器中。这比PROCESS_LOCAL慢一些,因为数据需要在进程之间传输。 -
NO_PREF数据从任何地方访问速度相同,且没有局部性偏好。 -
RACK_LOCAL数据在同一机架的服务器上。数据在同一机架上的不同服务器上,因此需要通过网络发送,通常通过一个交换机。 -
ANY数据则在网络的其他地方,不在同一机架上。
Spark倾向于在最佳本地性级别上调度所有任务,但这并不总是可能。在没有任何空闲执行器上有未处理数据的情况下,Spark会切换到较低的本地性级别。有两个选项:a) 等待繁忙的CPU释放以便在同一服务器上开始一个任务,或者b) 立即在更远的地方开始一个新任务,这需要将数据移动到那里。
Spark 通常所做的是稍等一下,希望忙碌的 CPU 释放出来。一旦超时时间到期,它就开始将数据从远处移动到空闲的 CPU。每个级别之间的备用等待超时时间可以单独配置,也可以在一个参数中全部配置;有关详细信息,请参见
spark.locality
参数,在
配置页面
上。 如果你的任务很长并且局部性差,则应增加这些设置,但默认设置通常效果很好。
总结
这是一个简短的指南,旨在指出您在调优 Spark 应用程序时应该了解的主要问题 – 最重要的是数据序列化和内存调优。对于大多数程序,切换到 Kryo 序列化并以序列化形式持久化数据将解决大多数常见的性能问题。如有其他调优最佳实践,请随时在 Spark 邮件列表 上询问。