PySpark集成¶
本页面概述了使用PySpark构建Kedro管道时的一些最佳实践。内容假设读者已具备Kedro和PySpark的基础知识。
在conf/base/spark.yml中集中Spark配置¶
Spark允许您指定多种不同的配置选项。我们建议将所有选项存储在conf/base/spark.yml文件中。以下是一个示例文件内容,用于指定Spark驱动程序的maxResultSize以及使用FAIR调度器:
spark.driver.maxResultSize: 3g
spark.scheduler.mode: FAIR
注意
Spark的最佳配置取决于您的Spark集群设置。
使用钩子初始化SparkSession¶
在执行任何PySpark操作之前,您应该使用after_context_created hook来初始化您的SparkSession。这样可以确保在运行Kedro管道之前已经初始化了SparkSession。
以下是一个示例实现,通过读取上一节创建的spark.yml配置文件,在src/中初始化SparkSession:
from kedro.framework.hooks import hook_impl
from pyspark import SparkConf
from pyspark.sql import SparkSession
class SparkHooks:
@hook_impl
def after_context_created(self, context) -> None:
"""Initialises a SparkSession using the config
defined in project's conf folder.
"""
# Load the spark configuration in spark.yaml using the config loader
parameters = context.config_loader["spark"]
spark_conf = SparkConf().setAll(parameters.items())
# Initialise the spark session
spark_session_conf = (
SparkSession.builder.appName(context.project_path.name)
.enableHiveSupport()
.config(conf=spark_conf)
)
_spark_session = spark_session_conf.getOrCreate()
_spark_session.sparkContext.setLogLevel("WARN")
您应修改此代码以适应集群的设置,例如如果在YARN上运行Spark,则将master设置为yarn。
在流水线中任何位置调用SparkSession.builder.getOrCreate()来获取SparkSession。SparkSession.builder.getOrCreate()是一个全局的单例。
我们不建议将Spark会话存储在上下文对象上,因为它无法被序列化,从而会阻止某些插件初始化上下文。
你还需要通过更新src/中的HOOKS变量来注册SparkHooks,具体如下:
from <package_name>.hooks import SparkHooks
HOOKS = (SparkHooks(),)
使用Kedro内置的Spark数据集来加载和保存原始数据¶
我们建议使用Kedro内置的Spark数据集将原始数据加载到Spark的DataFrame中,同时也能将其写回存储。我们提供的一些内置Spark数据集包括:
以下示例展示了如何在conf/base/catalog.yml中使用spark.SparkDataset将位于S3中的CSV文件读取到DataFrame中:
weather:
type: spark.SparkDataset
filepath: s3a://your_bucket/data/01_raw/weather*
file_format: csv
load_args:
header: True
inferSchema: True
save_args:
sep: '|'
header: True
或者使用Python API:
import pyspark.sql
from kedro.io import DataCatalog
from kedro_datasets.spark import SparkDataset
spark_ds = SparkDataset(
filepath="s3a://your_bucket/data/01_raw/weather*",
file_format="csv",
load_args={"header": True, "inferSchema": True},
save_args={"sep": "|", "header": True},
)
catalog = DataCatalog({"weather": spark_ds})
df = catalog.load("weather")
assert isinstance(df, pyspark.sql.DataFrame)
Spark与Delta Lake交互¶
Delta Lake 是一个开源项目,可在数据湖之上构建湖仓一体架构。它在现有数据湖(如S3、ADLS、GCS和HDFS)的基础上提供ACID事务支持,并统一流式和批处理数据处理。
要配置支持Delta Lake的PySpark环境,请参考Delta Lake文档中的建议。您可能需要更新项目中的src/文件里的SparkHooks,以配置支持Delta Lake的SparkSession:
from kedro.framework.hooks import hook_impl
from pyspark import SparkConf
from pyspark.sql import SparkSession
+ from delta import configure_spark_with_delta_pip
class SparkHooks:
@hook_impl
def after_context_created(self, context) -> None:
"""Initialises a SparkSession using the config
defined in project's conf folder.
"""
# Load the spark configuration in spark.yaml using the config loader
parameters = context.config_loader["spark"]
spark_conf = SparkConf().setAll(parameters.items())
# Initialise the spark session
spark_session_conf = (
SparkSession.builder.appName(context.project_path.name)
.enableHiveSupport()
.config(conf=spark_conf)
)
- _spark_session = spark_session_conf.getOrCreate()
+ _spark_session = configure_spark_with_delta_pip(spark_session_conf).getOrCreate()
_spark_session.sparkContext.setLogLevel("WARN")
请参阅Delta Lake集成指南中关于Kedro与Delta Lake集成的更详细章节。
对中间DataFrame使用MemoryDataset¶
对于操作DataFrame但不需要执行Spark操作(例如将DataFrame写入存储)的节点,我们建议使用默认的MemoryDataset来保存DataFrame。换句话说,无需在DataCatalog或catalog.yml中指定它。这使您能够利用Spark的优化器和惰性求值特性。
对非DataFrame的Spark对象使用MemoryDataset并设置copy_mode="assign"¶
有时,您可能希望在管道中使用非DataFrame的Spark对象作为输入和输出。例如,假设您有一个train_model节点用于使用Spark ML的RandomForrestClassifier训练分类器,以及一个predict节点用于使用该分类器进行预测。在这种情况下,train_model节点将输出一个RandomForestClassifier对象,该对象随后成为predict节点的输入。以下是该管道的代码:
from typing import Any, Dict
from kedro.pipeline import node, pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.sql import DataFrame
def train_model(training_data: DataFrame) -> RandomForestClassifier:
"""Node for training a random forest model to classify the data."""
classifier = RandomForestClassifier(numTrees=10)
return classifier.fit(training_data)
def predict(model: RandomForestClassifier, testing_data: DataFrame) -> DataFrame:
"""Node for making predictions given a pre-trained model and a testing dataset."""
predictions = model.transform(testing_data)
return predictions
def create_pipeline(**kwargs) -> Pipeline:
return pipeline(
[
node(train_model, inputs=["training_data"], outputs="example_classifier"),
node(
predict,
inputs=dict(model="example_classifier", testing_data="testing_data"),
outputs="example_predictions",
),
]
)
要使管道正常工作,您需要在catalog.yml中按如下方式指定example_classifier:
example_classifier:
type: MemoryDataset
copy_mode: assign
assign复制模式确保MemoryDataset将被直接赋予Spark对象本身,而非其深拷贝版本,因为深拷贝通常不适用于Spark对象。
使用ThreadRunner最大化并发性的技巧¶
在底层实现中,每个执行Spark操作(如save、collect)的Kedro节点都会通过同一个SparkSession实例提交到Spark集群作为Spark作业运行。如果这些作业由不同线程提交,它们可能会并发执行。为了实现这一点,您需要使用ThreadRunner来运行Kedro管道:
kedro run --runner=ThreadRunner
为了进一步提高并发级别,如果您使用的是Spark >= 0.8版本,还可以通过启用公平共享机制,让每个节点大致均等地分享Spark集群资源,从而获得大致相同的并发执行机会。默认情况下,任务是以先进先出(FIFO)方式执行的,这意味着如果某个作业占用过多资源,可能会阻塞其他作业的执行。要启用公平共享机制,请将以下配置添加到conf/base/spark.yml文件中(该文件在初始化SparkSession章节中创建):
spark.scheduler.mode: FAIR
更多信息,请参阅Spark文档中关于应用程序内的作业调度的内容。