调优Spark

由于大多数 Spark 计算的内存特性,Spark 程序可能会受到集群中任何资源的瓶颈:CPU、网络带宽或内存。最常见的是,如果数据适合内存,则瓶颈是网络带宽,但有时您还需要进行一些调整,例如 以序列化形式存储 RDD ,以减少内存使用。 本指南将涵盖两个主要主题:数据序列化,这对良好的网络性能至关重要,并且可以减少内存使用,以及内存调优。我们还简单介绍几个较小的主题。

数据序列化

序列化在任何分布式应用程序的性能中扮演着重要角色。慢速序列化对象的格式,或消耗大量字节的格式,会极大地减缓计算速度。通常,这是您应该首先调优以优化 Spark 应用程序的内容。Spark 的目标是在便利性(允许您在操作中使用任何 Java 类型)和性能之间取得平衡。它提供了两个序列化库:

您可以通过使用 SparkConf 初始化您的作业来切换到使用 Kryo,并调用 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 。此设置配置了用于在工作节点之间交换数据的序列化器,以及在将 RDD 序列化到磁盘时使用的序列化器。Kryo 不是默认情况下使用的唯一原因是它需要自定义注册,但我们建议在任何网络密集型应用程序中尝试使用它。自 Spark 2.0.0 起,我们在对具有简单类型、简单类型数组或字符串类型的 RDD 进行交换时,内部使用 Kryo 序列化器。

Spark 自动包含了用于许多常用核心 Scala 类的 Kryo 序列化器,这些类在 Twitter chill 库的 AllScalaRegistrar 中被涵盖。

要注册您自己的自定义类到Kryo,请使用 registerKryoClasses 方法。

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)

Kryo 文档 描述了更高级的注册选项,如添加自定义序列化代码。

如果你的对象很大,你可能还需要增加 spark.kryoserializer.buffer 配置 。这个值需要足够大 以容纳你将序列化的 最大 对象。

最后,如果您不注册自定义类,Kryo 仍然可以工作,但它将不得不为每个对象存储完整的类名,这很浪费。

内存调优

调整内存使用时有三个考虑因素:你的对象使用的内存的 数量 (你可能希望你的整个数据集能够放入内存中),访问这些对象的 成本 ,以及 垃圾回收 的开销(如果你的对象有高周转率的话)。

默认情况下,Java对象的访问速度很快,但其占用的空间可能比其字段内部的“原始”数据多出2-5倍。这是由于几个原因造成的:

本节将首先概述Spark中的内存管理,然后讨论用户可以采取的具体策略,以更有效地利用他/她应用程序中的内存。特别是,我们将描述如何确定对象的内存使用情况,以及如何改善它——要么通过更改数据结构,要么通过以序列化格式存储数据。随后,我们将介绍调整Spark的缓存大小和Java垃圾收集器。

内存管理概述

在Spark中,内存使用主要分为两类:执行和存储。执行内存指用于计算的内存,包括洗牌、连接、排序和聚合,而存储内存则用于缓存和在集群中传播内部数据。在Spark中,执行和存储共享一个统一的区域( M )。当没有使用执行内存时,存储可以占用所有可用内存,反之亦然。如果有必要,执行可能会逐出存储,但仅在总存储内存使用量低于某个阈值( R )之前。换句话说, R 描述了 M 内的一个子区域,在该区域中缓存的块不会被逐出。由于实现的复杂性,存储可能不会逐出执行。

此设计确保了几个理想的属性。首先,不使用缓存的应用程序可以使用整个空间进行执行,从而避免不必要的磁盘溢出。其次,使用缓存的应用程序可以保留一个最小的存储空间(R),在此空间内它们的数据块不会被驱逐。最后,这种方法为各种工作负载提供了合理的开箱即用性能,而无需用户了解内存是如何内部划分的。

虽然有两个相关的配置,但典型用户不需要进行调整,因为默认值适用于大多数工作负载:

要使此堆内存量能舒适地适应JVM的老年代或“已终止”代,应该设置 spark.memory.fraction 的值。有关详细信息,请参阅下面有关高级GC调优的讨论。

确定内存消耗

确定数据集所需内存消耗的最佳方法是创建一个 RDD,将其放入缓存中,然后查看网页 UI 中的“Storage”页面。该页面将告诉您 RDD 占用了多少内存。

要估算特定对象的内存消耗,可以使用 SizeEstimator estimate 方法。这对于尝试不同的数据布局以减少内存使用,以及确定广播变量在每个执行者堆上占用的空间量非常有用。

调优数据结构

减少内存消耗的第一种方法是避免增加开销的Java特性,例如基于指针的数据结构和包装对象。有几种方法可以实现这一点:

  1. 设计你的数据结构时,优先选择对象数组和原始数据类型,而不是标准的Java或Scala集合类(例如 HashMap )。 fastutil 库提供了与Java标准库兼容的原始数据类型的方便集合类。
  2. 尽量避免使用嵌套结构来存储大量的小对象和指针。
  3. 考虑使用数字ID或枚举对象作为键,而不是字符串。
  4. 如果你的RAM少于32 GiB,请设置JVM标志 -XX:+UseCompressedOops ,使指针占用四个字节而不是八个字节。你可以在 spark-env.sh 中添加这些选项。

序列化 RDD 存储

当您的对象仍然过大而无法高效存储时,一种更简单的方法是以 序列化 形式存储它们,使用 RDD 持久化 API 中的序列化存储级别,例如 MEMORY_ONLY_SER 。Spark 将每个 RDD 分区作为一个大型字节数组进行存储。以序列化形式存储数据的唯一缺点是访问时间较慢,因为必须实时反序列化每个对象。如果您希望以序列化形式缓存数据,我们强烈建议 使用 Kryo ,因为它比 Java 序列化(当然比原始 Java 对象)要小得多。

垃圾回收调优

JVM 垃圾收集在你有大量“变动”的 RDD 存储时可能会成为一个问题。(在只读取一次 RDD 然后进行多次操作的程序中,通常没有问题。)当 Java 需要驱逐旧对象以腾出空间给新对象时,它需要遍历所有 Java 对象并找到未使用的对象。这里需要记住的主要一点是 垃圾收集的成本与 Java 对象的数量成正比 ,因此使用较少对象的数据结构(例如,使用一个 Int 数组而不是 LinkedList )可以大大降低这项成本。更好的方法是将对象以序列化形式持久化,如上所述:现在每个 RDD 分区只有 一个 对象(一个字节数组)。在尝试其他技术之前,如果 GC 是一个问题,首先要尝试的方法是使用 序列化缓存

GC 也可能是一个问题,因为您的任务的工作内存(运行任务所需的空间)与缓存于节点上的 RDD 之间存在干扰。我们将讨论如何控制分配给 RDD 缓存的空间以减轻这个问题。

测量 GC 的影响

GC 调优的第一步是收集垃圾回收发生的频率和花费的时间的统计信息。这可以通过将 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 添加到 Java 选项中来实现。(有关将 Java 选项传递给 Spark 作业的信息,请参见 配置指南 。) 下次运行 Spark 作业时,您将在工作节点的日志中看到每次垃圾回收发生时打印的消息。请注意,这些日志将位于集群的工作节点上(在它们工作目录中的 stdout 文件中), 而不是 在您的驱动程序程序上。

高级GC调优

要进一步调整垃圾回收,我们首先需要了解一些关于JVM内存管理的基本信息:

在Spark中,GC调优的目标是确保只有长期存在的RDD被存储在老生代中,并且年轻代的大小足够存储短期存在的对象。这将有助于避免在任务执行过程中收集临时对象的完全垃圾回收(GC)。一些可能有用的步骤包括:

我们的经验表明,GC调优的效果依赖于您的应用程序和可用的内存量。 网上描述了 更多的调优选项 , 但从高层来看,管理全GC发生的频率可以帮助减少开销。

对于执行器的GC调优标志,可以通过在作业配置中设置 spark.executor.defaultJavaOptions spark.executor.extraJavaOptions 来指定。

其他考虑

并行级别

除非为每个操作设置足够高的并行度,否则群集将不会被充分利用。Spark 会根据文件的大小自动设置每个文件上运行的“map”任务数量(尽管您可以通过可选参数控制 SparkContext.textFile 等),而对于分布式的“reduce”操作,例如 groupByKey reduceByKey ,它使用最大的父 RDD 的分区数量。您可以将并行度作为第二个参数传递(请参见 spark.PairRDDFunctions 文档),或者设置配置属性 spark.default.parallelism 来更改默认值。一般来说,我们建议每个 CPU 核心设置 2-3 个任务。

输入路径上的并行列出

有时候,当作业输入有大量目录时,您可能需要增加目录列表的并行度,否则该过程可能会花费很长时间,尤其是在针对像 S3 这样的对象存储时。如果您的作业在使用 Hadoop 输入格式的 RDD 上工作(例如,通过 SparkContext.sequenceFile ),则并行度通过 spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads 控制(当前默认值为 1)。

对于基于文件的数据源的 Spark SQL,您可以调整 spark.sql.sources.parallelPartitionDiscovery.threshold spark.sql.sources.parallelPartitionDiscovery.parallelism 来提高列举的并行性。有关更多详细信息,请参考 Spark SQL 性能调优指南

减少任务的内存使用

有时,你会遇到 OutOfMemoryError,并不是因为你的 RDD 不适合放在内存中,而是因为你某个任务的工作集,例如 groupByKey 中的一个 reduce 任务,太大。Spark 的 shuffle 操作( sortByKey groupByKey reduceByKey join 等)在每个任务内部构建一个哈希表来执行分组,这个哈希表通常会很大。这里最简单的解决方法是 增加并行度 ,使每个任务的输入集更小。Spark 可以高效支持短至 200 毫秒的任务,因为它在许多任务之间重用一个 executor JVM,并且任务启动成本低,因此你可以安全地将并行度增加到超过你的集群核心数量。

广播大变量

使用 广播功能 ,可以显著减少每个序列化任务的大小以及在集群上启动作业的成本。如果你的任务在内部使用了来自驱动程序的任何大型对象(例如静态查找表),考虑将其变为广播变量。Spark会在主节点上打印每个任务的序列化大小,因此你可以查看这些信息来判断你的任务是否过大;通常,超过约20 KiB的任务可能值得进行优化。

数据本地性

数据本地性会对Spark作业的性能产生重大影响。如果数据与操作它的代码在一起,那么计算往往会很快。但是如果代码和数据分开了,就必须进行移动。通常,从一个地方到另一个地方传输序列化代码比传输一块数据要快,因为代码的大小要比数据小得多。Spark根据数据本地性的这一一般原则构建其调度策略。

数据局部性是指数据与处理它的代码的接近程度。根据数据的当前地点,有几个局部性级别。从最近到最远的顺序如下:

Spark倾向于在最佳本地性级别上调度所有任务,但这并不总是可能。在没有任何空闲执行器上有未处理数据的情况下,Spark会切换到较低的本地性级别。有两个选项:a) 等待繁忙的CPU释放以便在同一服务器上开始一个任务,或者b) 立即在更远的地方开始一个新任务,这需要将数据移动到那里。

Spark 通常所做的是稍等一下,希望忙碌的 CPU 释放出来。一旦超时时间到期,它就开始将数据从远处移动到空闲的 CPU。每个级别之间的备用等待超时时间可以单独配置,也可以在一个参数中全部配置;有关详细信息,请参见 spark.locality 参数,在 配置页面 上。 如果你的任务很长并且局部性差,则应增加这些设置,但默认设置通常效果很好。

总结

这是一个简短的指南,旨在指出您在调优 Spark 应用程序时应该了解的主要问题 – 最重要的是数据序列化和内存调优。对于大多数程序,切换到 Kryo 序列化并以序列化形式持久化数据将解决大多数常见的性能问题。如有其他调优最佳实践,请随时在 Spark 邮件列表 上询问。