Apache Zeppelin 的 Spark 解释器
概述
Apache Spark 是一个快速且通用的集群计算系统。 它提供了Java、Scala、Python和R的高级API,以及一个支持通用执行图的优化引擎。 Apache Spark 在 Zeppelin 中通过 Spark 解释器组得到支持,该组由以下解释器组成。
| 名称 | 类别 | 描述 |
|---|---|---|
| %spark | SparkInterpreter | 创建一个SparkContext/SparkSession并提供Scala环境 |
| %spark.pyspark | PySparkInterpreter | 提供Python环境 |
| %spark.ipyspark | IPySparkInterpreter | 提供一个IPython环境 |
| %spark.r | SparkRInterpreter | 提供一个支持SparkR的原始R环境 |
| %spark.ir | SparkIRInterpreter | 提供基于Jupyter IRKernel的带有SparkR支持的R环境 |
| %spark.shiny | SparkShinyInterpreter | 用于创建支持SparkR的R shiny应用程序 |
| %spark.sql | SparkSQLInterpreter | 提供一个SQL环境 |
主要功能
| 功能 | 描述 |
|---|---|
| 支持多个版本的Spark | 您可以在一个Zeppelin实例中运行不同版本的Spark |
| 支持多个版本的Scala | 您可以在一个Zeppelin实例中运行不同版本的Scala(2.12/2.13)的Spark |
| 支持多种语言 | 支持Scala、SQL、Python、R,除此之外,您还可以跨语言协作,例如,您可以编写Scala UDF并在PySpark中使用它 |
| 支持多种执行模式 | Local | Standalone | Yarn | K8s |
| 交互式开发 | 交互式开发用户体验提高您的生产力 |
| 内联可视化 | 您可以使用Python/R的绘图库可视化Spark Dataset/DataFrame,甚至可以在Zeppelin中制作SparkR Shiny应用程序 | Multi-tenancy | Multiple user can work in one Zeppelin instance without affecting each other. | Rest API Support | You can not only submit Spark job via Zeppelin notebook UI, but also can do that via its rest api (You can use Zeppelin as Spark job server). |
在Zeppelin docker中玩转Spark
对于初学者,我们建议您在Zeppelin docker中玩转Spark。
在Zeppelin docker镜像中,我们已经安装了
miniconda和许多有用的python和R库
包括IPython和IRkernel的先决条件,因此%spark.pyspark将使用IPython,并且%spark.ir已启用。
无需任何额外配置,您可以直接运行Spark Tutorial文件夹下的大部分教程笔记。
首先你需要下载Spark,因为Zeppelin没有附带Spark的二进制发行版。
例如,这里我们下载Spark 3.1.2到/mnt/disk1/spark-3.1.2,
并将其挂载到Zeppelin docker容器中,并运行以下命令来启动Zeppelin docker容器。
docker run -u $(id -u) -p 8080:8080 -p 4040:4040 --rm -v /mnt/disk1/spark-3.1.2:/opt/spark -e SPARK_HOME=/opt/spark --name zeppelin apache/zeppelin:0.11.2
运行上述命令后,您可以打开http://localhost:8080在Zeppelin中玩转Spark。我们仅在Zeppelin docker中验证了spark本地模式,其他模式可能由于网络问题无法工作。
-p 4040:4040是为了暴露Spark web ui,以便您可以通过http://localhost:8081访问Spark web ui。
配置
The Spark interpreter can be configured with properties provided by Zeppelin. You can also set other Spark properties which are not listed in the table. For a list of additional properties, refer to Spark Available Properties.
| Property | Default | Description |
|---|---|---|
SPARK_HOME |
Location of spark distribution | |
| spark.master | local[*] | Spark master uri. e.g. spark://masterhost:7077 |
| spark.submit.deployMode | The deploy mode of Spark driver program, either "client" or "cluster", Which means to launch driver program locally ("client") or remotely ("cluster") on one of the nodes inside the cluster. | |
| spark.app.name | Zeppelin | The name of spark application. |
| spark.driver.cores | 1 | Number of cores to use for the driver process, only in cluster mode. |
| spark.driver.memory | 1g | Amount of memory to use for the driver process, i.e. where SparkContext is initialized, in the same format as JVM memory strings with a size unit suffix ("k", "m", "g" or "t") (e.g. 512m, 2g). |
| spark.executor.cores | 1 | The number of cores to use on each executor |
| spark.executor.memory | 1g | Executor memory per worker instance. e.g. 512m, 32g |
| spark.executor.instances | 2 | The number of executors for static allocation |
| spark.files | Comma-separated list of files to be placed in the working directory of each executor. Globs are allowed. | |
| spark.jars | Comma-separated list of jars to include on the driver and executor classpaths. Globs are allowed. | |
| spark.jars.packages | Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths. The coordinates should be groupId:artifactId:version. If spark.jars.ivySettings is given artifacts will be resolved according to the configuration in the file, otherwise artifacts will be searched for in the local maven repo, then maven central and finally any additional remote repositories given by the command-line option --repositories. | |
PYSPARK_PYTHON |
python | Python binary executable to use for PySpark in both driver and executors (default is python).
Property spark.pyspark.python take precedence if it is set |
PYSPARK_DRIVER_PYTHON |
python | Python binary executable to use for PySpark in driver only (default is PYSPARK_PYTHON).
Property spark.pyspark.driver.python take precedence if it is set |
| zeppelin.pyspark.useIPython | false | Whether use IPython when the ipython prerequisites are met in %spark.pyspark |
| zeppelin.R.cmd | R | R binary executable path. |
| zeppelin.spark.concurrentSQL | false | Execute multiple SQL concurrently if set true. |
| zeppelin.spark.concurrentSQL.max | 10 | Max number of SQL concurrently executed |
| zeppelin.spark.maxResult | 1000 | Max number rows of Spark SQL result to display. |
| zeppelin.spark.run.asLoginUser | true | Whether run spark job as the zeppelin login user, it is only applied when running spark job in hadoop yarn cluster and shiro is enabled. |
| zeppelin.spark.printREPLOutput | true | Print scala REPL output |
| zeppelin.spark.useHiveContext | true | Use HiveContext instead of SQLContext if it is true. Enable hive for SparkSession |
| zeppelin.spark.enableSupportedVersionCheck | true | Do not change - developer only setting, not for production use |
| zeppelin.spark.sql.interpolation | false | Enable ZeppelinContext variable interpolation into spark sql |
| zeppelin.spark.uiWebUrl | Overrides Spark UI default URL. Value should be a full URL (ex: http://{hostName}/{uniquePath}. In Kubernetes mode, value can be Jinja template string with 3 template variables PORT, SERVICENAME and SERVICEDOMAIN . (e.g.: http://{{PORT}}-{{SERVICENAME}}.{{SERVICEDOMAIN}} ). In yarn mode, value could be a knox url with {{applicationId}} as placeholder, (e.g.: https://knox-server:8443/gateway/yarnui/yarn/proxy/{{applicationId}}/) | |
| spark.webui.yarn.useProxy | false | whether use yarn proxy url as Spark weburl, e.g. http://localhost:8088/proxy/application1583396598068_0004 |
没有任何配置的情况下,Spark解释器可以在本地模式下直接使用。但如果您想连接到您的Spark集群,您需要按照以下两个简单的步骤操作。
- 设置 SPARK_HOME
- 设置主节点
设置 SPARK_HOME
有几种设置SPARK_HOME的选项。
- 在
zeppelin-env.sh中设置SPARK_HOME - 在解释器设置页面设置
SPARK_HOME - 通过内联通用配置设置
SPARK_HOME
在 zeppelin-env.sh 中设置 SPARK_HOME
如果你只使用一个版本的Spark,那么你可以在zeppelin-env.sh中设置SPARK_HOME,因为zeppelin-env.sh中的任何设置都是全局应用的。
例如
export SPARK_HOME=/usr/lib/spark
您可以选择在zeppelin-env.sh中设置更多的环境变量
# set hadoop conf dir
export HADOOP_CONF_DIR=/usr/lib/hadoop
在解释器设置页面设置 SPARK_HOME
如果你想使用多个版本的Spark,那么你需要创建多个Spark解释器并分别设置SPARK_HOME。例如,
为Spark 3.3创建一个新的Spark解释器spark33并在解释器设置页面中设置其SPARK_HOME,
为Spark 3.4创建一个新的Spark解释器spark34并在解释器设置页面中设置其SPARK_HOME。
通过内联通用配置设置SPARK_HOME
除了在解释器设置页面设置SPARK_HOME外,您还可以使用内联通用配置将配置与代码放在一起,以获得更大的灵活性。例如:
设置主节点
设置SPARK_HOME后,您需要在解释器设置页面或内联配置中设置spark.master属性。该值可能因您的Spark集群部署类型而异。
例如,
- local[*] 在本地模式下
- spark://master:7077 在独立集群中
- yarn-client 在 Yarn 客户端模式下(在 Spark 3.x 中不支持,请参阅下文了解如何在 Spark 3.x 中配置 yarn-client)
- yarn-cluster 在 Yarn 集群模式下(在 Spark 3.x 中不支持,请参考以下内容了解如何在 Spark 3.x 中配置 yarn-cluster)
- mesos://host:5050 在 Mesos 集群中
就是这样。Zeppelin 将以这种方式与任何版本的 Spark 和任何部署类型一起工作,而无需重新构建 Zeppelin。 有关 Spark 和 Zeppelin 版本兼容性的更多信息,请参阅 Zeppelin 下载页面 中的“可用解释器”部分。
请注意,如果不设置SPARK_HOME,它将以包含的Spark版本在本地模式下运行。包含的版本可能因构建配置文件而异。并且这个包含的Spark版本功能有限,因此始终建议设置SPARK_HOME。
Yarn客户端模式和本地模式将在与Zeppelin服务器相同的机器上运行驱动程序,这对于生产环境来说可能是危险的。因为当同时运行许多Spark解释器时,可能会耗尽内存。因此,我们建议您通过设置zeppelin.spark.only_yarn_cluster在zeppelin-site.xml中仅允许yarn-cluster模式。
为 Spark 3.x 配置 yarn 模式
在Spark 3.x中不再支持在spark.master中指定yarn-client和yarn-cluster,而是需要同时使用spark.master和spark.submit.deployMode。
| 模式 | spark.master | spark.submit.deployMode |
|---|---|---|
| Yarn 客户端 | yarn | client |
| Yarn集群 | yarn | cluster |
解释器绑定模式
默认的解释器绑定模式是globally shared。这意味着所有笔记共享同一个Spark解释器。
因此,我们建议您使用isolated per note,这意味着每个笔记都有自己的Spark解释器,互不影响。但如果创建了太多的Spark解释器,可能会耗尽您的机器资源,因此我们建议在生产环境中如果是在hadoop集群中运行Spark,始终使用yarn-cluster模式。您可以通过第一段中的%spark.conf使用内联配置来自定义您的Spark配置。
你也可以选择scoped模式。对于scoped每笔记模式,Zeppelin为每个笔记创建独立的scala编译器/python shell,但共享一个SparkContext/SqlContext/SparkSession。
SparkContext, SQLContext, SparkSession, ZeppelinContext
SparkContext、SQLContext、SparkSession(适用于spark 2.x、3.x)和ZeppelinContext会自动创建并分别作为变量名sc、sqlContext、spark和z在Scala、Python和R环境中暴露。
请注意,Scala/Python/R 环境共享相同的 SparkContext、SQLContext、SparkSession 和 ZeppelinContext 实例。
Yarn 模式
Zeppelin 支持 yarn client 和 yarn cluster 模式(yarn cluster 模式从 0.8.0 版本开始支持)。对于 yarn 模式,您必须指定 SPARK_HOME 和 HADOOP_CONF_DIR。通常您只有一个 hadoop 集群,因此您可以在 zeppelin-env.sh 中设置 HADOOP_CONF_DIR,这将应用于所有 Spark 解释器。如果您想针对多个 hadoop 集群使用 spark,那么您需要在解释器设置中或通过内联通用配置定义 HADOOP_CONF_DIR。
K8s 模式
关于如何在Zeppelin中在K8s上运行Spark,请查看此文档。
PySpark
在Zeppelin中使用PySpark有2种方式:
- Vanilla PySpark
- IPySpark
原版 PySpark(不推荐)
Vanilla PySpark 解释器与普通的 Python 解释器几乎相同,除了 Spark 解释器通过变量 sc、sqlContext、spark 注入 SparkContext、SQLContext、SparkSession。
默认情况下,当IPython可用时,Zeppelin会在%spark.pyspark中使用IPython(Zeppelin会检查是否满足ipython的先决条件),否则它将回退到普通的PySpark实现。
IPySpark(推荐)
你可以通过%spark.ipyspark显式地使用IPySpark。IPySpark解释器与IPython解释器几乎相同,除了Spark解释器通过变量sc、sqlContext、spark注入SparkContext、SQLContext、SparkSession。
关于IPython的功能,你可以参考文档Python Interpreter
SparkR
Zeppelin 通过 %spark.r、%spark.ir 和 %spark.shiny 支持 SparkR。以下是 SparkR 解释器的配置。
| Spark 属性 | 默认值 | 描述 |
|---|---|---|
| zeppelin.R.cmd | R | R 二进制可执行文件路径。 |
| zeppelin.R.knitr | true | 是否使用knitr。(建议安装knitr并在Zeppelin中使用) |
| zeppelin.R.image.width | 100% | R绘图图像宽度。 |
| zeppelin.R.render.options | out.format = 'html', comment = NA, echo = FALSE, results = 'asis', message = F, warning = F, fig.retina = 2 | R绘图选项。 |
| zeppelin.R.shiny.iframe_width | 100% | Shiny应用的IFrame宽度 |
| zeppelin.R.shiny.iframe_height | 500px | Shiny应用的IFrame高度 |
| zeppelin.R.shiny.portRange | : | Shiny 应用会在某个端口启动一个网页应用,此属性用于通过格式 ' |
参考R文档了解如何在Zeppelin中使用R。
SparkSql
Spark SQL 解释器与其他 Spark 解释器共享相同的 SparkContext/SparkSession。这意味着在 Scala、Python 或 R 代码中注册的任何表都可以通过 Spark SQL 访问。 例如:
%spark
case class People(name: String, age: Int)
var df = spark.createDataFrame(List(People("jeff", 23), People("andy", 20)))
df.createOrReplaceTempView("people")
%spark.sql
select * from people
你可以在一个段落中编写多个SQL语句。每个SQL语句用分号分隔。 一个段落中的SQL语句将按顺序运行。 但是,不同段落中的SQL语句可以通过以下配置并发运行。
- 将
zeppelin.spark.concurrentSQL设置为 true 以启用 SQL 并发功能,底层 zeppelin 将更改为使用 fairscheduler 进行 Spark。同时设置zeppelin.spark.concurrentSQL.max以控制并发运行的 SQL 语句的最大数量。 - 通过在你的
SPARK_CONF_DIR下创建fairscheduler.xml来配置池,查看官方spark文档配置池属性 通过设置段落本地属性来设置池属性。例如。
%spark(pool=pool1) sql statement
此池功能也适用于所有版本的Scala Spark和PySpark。对于SparkR,它仅从2.3.0版本开始可用。
依赖管理
对于Spark解释器,不建议使用Zeppelin的依赖管理来管理第三方依赖(%spark.dep在Zeppelin 0.9中也被移除了)。相反,你应该按照以下方式设置标准的Spark属性:
| Spark 属性 | Spark 提交参数 | 描述 |
|---|---|---|
| spark.files | --files | 逗号分隔的文件列表,这些文件将被放置在每个执行器的工作目录中。允许使用通配符。 |
| spark.jars | --jars | 逗号分隔的jar包列表,包含在驱动程序和执行程序的类路径中。允许使用通配符。 |
| spark.jars.packages | --packages | 逗号分隔的Maven坐标列表,用于包含在驱动程序和执行器类路径上的jar包。坐标应为groupId:artifactId:version。如果提供了spark.jars.ivySettings,则根据文件中的配置解析工件,否则将在本地maven仓库中搜索工件,然后在maven中央仓库中搜索,最后在命令行选项--repositories提供的任何其他远程仓库中搜索。 |
作为通用的Spark属性,您可以通过内联配置或解释器设置页面或在zeppelin-env.sh中通过环境变量SPARK_SUBMIT_OPTIONS来设置它们。
例如:
export SPARK_SUBMIT_OPTIONS="--files <my_file> --jars <my_jar> --packages <my_package>"
需要注意的是,SPARK_SUBMIT_OPTIONS 已被弃用,并将在未来的版本中移除。
ZeppelinContext
Zeppelin 自动在你的 Scala/Python 环境中将 ZeppelinContext 作为变量 z 注入。ZeppelinContext 提供了一些额外的功能和实用工具。
详情请参见 Zeppelin-Context。对于 Spark 解释器,你可以使用 z 来显示 Spark 的 Dataset/Dataframe。

设置带有Kerberos的Zeppelin
使用Zeppelin、Kerberos密钥分发中心(KDC)和YARN上的Spark的逻辑设置:

有几种方法可以让Spark在Zeppelin中与启用Kerberos的Hadoop集群一起工作。
共享一个单一的Hadoop集群。 在这种情况下,您只需要在zeppelin-site.xml中指定
zeppelin.server.kerberos.keytab和zeppelin.server.kerberos.principal,Spark解释器将默认使用这些设置。与多个Hadoop集群一起工作。 在这种情况下,您可以指定
spark.yarn.keytab和spark.yarn.principal来覆盖zeppelin.server.kerberos.keytab和zeppelin.server.kerberos.principal。
配置设置
在安装Zeppelin的服务器上,安装Kerberos客户端模块和配置文件krb5.conf。这是为了使服务器能够与KDC通信。
将以下两个属性添加到Spark配置中(
[SPARK_HOME]/conf/spark-defaults.conf):spark.yarn.principal spark.yarn.keytab
注意: 如果您没有权限访问上述的spark-defaults.conf文件,您可以选择通过Zeppelin UI中的Interpreter选项卡将上述行添加到Spark Interpreter设置中。
- 就是这样。开始使用 Zeppelin 吧!
用户模拟
在yarn模式下,启动zeppelin服务器的用户将用于启动Spark yarn应用程序。这不是一个好的做法。 大多数情况下,您会在Zeppelin中启用shiro,并希望使用登录用户来提交Spark yarn应用程序。为此, 您需要启用用户模拟以进行更多的安全控制。为了启用用户模拟,您需要执行以下步骤
步骤 1 启用用户模拟设置 hadoop 的 core-site.xml。例如,如果您使用用户 zeppelin 启动 Zeppelin,则将以下内容添加到 core-site.xml,然后重新启动 hdfs 和 yarn。
<property>
<name>hadoop.proxyuser.zeppelin.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.zeppelin.hosts</name>
<value>*</value>
</property>
步骤 2 在 Spark 解释器的解释器设置中启用解释器用户模拟。(当然首先需要启用 shiro)

步骤3(可选) 如果您使用的是kerberos集群,那么您需要在zeppelin-site.xml中设置zeppelin.server.kerberos.keytab和zeppelin.server.kerberos.principal为您想要模拟的用户(即步骤1中的用户)。
社区
Join our community 与其他人讨论。