PySpark集成

本页面概述了使用PySpark构建Kedro管道时的一些最佳实践。内容假设读者已具备Kedro和PySpark的基础知识。

conf/base/spark.yml中集中Spark配置

Spark允许您指定多种不同的配置选项。我们建议将所有选项存储在conf/base/spark.yml文件中。以下是一个示例文件内容,用于指定Spark驱动程序的maxResultSize以及使用FAIR调度器:

spark.driver.maxResultSize: 3g
spark.scheduler.mode: FAIR

注意

Spark的最佳配置取决于您的Spark集群设置。

使用钩子初始化SparkSession

在执行任何PySpark操作之前,您应该使用after_context_created hook来初始化您的SparkSession。这样可以确保在运行Kedro管道之前已经初始化了SparkSession

以下是一个示例实现,通过读取上一节创建的spark.yml配置文件,在src//hooks.py中初始化SparkSession

from kedro.framework.hooks import hook_impl
from pyspark import SparkConf
from pyspark.sql import SparkSession


class SparkHooks:
    @hook_impl
    def after_context_created(self, context) -> None:
        """Initialises a SparkSession using the config
        defined in project's conf folder.
        """

        # Load the spark configuration in spark.yaml using the config loader
        parameters = context.config_loader["spark"]
        spark_conf = SparkConf().setAll(parameters.items())

        # Initialise the spark session
        spark_session_conf = (
            SparkSession.builder.appName(context.project_path.name)
            .enableHiveSupport()
            .config(conf=spark_conf)
        )
        _spark_session = spark_session_conf.getOrCreate()
        _spark_session.sparkContext.setLogLevel("WARN")

您应修改此代码以适应集群的设置,例如如果在YARN上运行Spark,则将master设置为yarn

在流水线中任何位置调用SparkSession.builder.getOrCreate()来获取SparkSessionSparkSession.builder.getOrCreate()是一个全局的单例

我们不建议将Spark会话存储在上下文对象上,因为它无法被序列化,从而会阻止某些插件初始化上下文。

你还需要通过更新src//settings.py中的HOOKS变量来注册SparkHooks,具体如下:

from <package_name>.hooks import SparkHooks

HOOKS = (SparkHooks(),)

使用Kedro内置的Spark数据集来加载和保存原始数据

我们建议使用Kedro内置的Spark数据集将原始数据加载到Spark的DataFrame中,同时也能将其写回存储。我们提供的一些内置Spark数据集包括:

以下示例展示了如何在conf/base/catalog.yml中使用spark.SparkDataset将位于S3中的CSV文件读取到DataFrame中:

weather:
  type: spark.SparkDataset
  filepath: s3a://your_bucket/data/01_raw/weather*
  file_format: csv
  load_args:
    header: True
    inferSchema: True
  save_args:
    sep: '|'
    header: True

或者使用Python API:

import pyspark.sql
from kedro.io import DataCatalog
from kedro_datasets.spark import SparkDataset

spark_ds = SparkDataset(
    filepath="s3a://your_bucket/data/01_raw/weather*",
    file_format="csv",
    load_args={"header": True, "inferSchema": True},
    save_args={"sep": "|", "header": True},
)
catalog = DataCatalog({"weather": spark_ds})

df = catalog.load("weather")
assert isinstance(df, pyspark.sql.DataFrame)

Spark与Delta Lake交互

Delta Lake 是一个开源项目,可在数据湖之上构建湖仓一体架构。它在现有数据湖(如S3、ADLS、GCS和HDFS)的基础上提供ACID事务支持,并统一流式和批处理数据处理。 要配置支持Delta Lake的PySpark环境,请参考Delta Lake文档中的建议。您可能需要更新项目中的src//hooks.py文件里的SparkHooks,以配置支持Delta Lake的SparkSession

from kedro.framework.hooks import hook_impl
from pyspark import SparkConf
from pyspark.sql import SparkSession
+ from delta import configure_spark_with_delta_pip

class SparkHooks:
    @hook_impl
    def after_context_created(self, context) -> None:
        """Initialises a SparkSession using the config
        defined in project's conf folder.
        """

        # Load the spark configuration in spark.yaml using the config loader
        parameters = context.config_loader["spark"]
        spark_conf = SparkConf().setAll(parameters.items())

        # Initialise the spark session
        spark_session_conf = (
            SparkSession.builder.appName(context.project_path.name)
            .enableHiveSupport()
            .config(conf=spark_conf)
        )
-       _spark_session = spark_session_conf.getOrCreate()
+       _spark_session = configure_spark_with_delta_pip(spark_session_conf).getOrCreate()
        _spark_session.sparkContext.setLogLevel("WARN")

请参阅Delta Lake集成指南中关于Kedro与Delta Lake集成的更详细章节。

对中间DataFrame使用MemoryDataset

对于操作DataFrame但不需要执行Spark操作(例如将DataFrame写入存储)的节点,我们建议使用默认的MemoryDataset来保存DataFrame。换句话说,无需在DataCatalogcatalog.yml中指定它。这使您能够利用Spark的优化器和惰性求值特性。

对非DataFrame的Spark对象使用MemoryDataset并设置copy_mode="assign"

有时,您可能希望在管道中使用非DataFrame的Spark对象作为输入和输出。例如,假设您有一个train_model节点用于使用Spark ML的RandomForrestClassifier训练分类器,以及一个predict节点用于使用该分类器进行预测。在这种情况下,train_model节点将输出一个RandomForestClassifier对象,该对象随后成为predict节点的输入。以下是该管道的代码:

from typing import Any, Dict

from kedro.pipeline import node, pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.sql import DataFrame


def train_model(training_data: DataFrame) -> RandomForestClassifier:
    """Node for training a random forest model to classify the data."""
    classifier = RandomForestClassifier(numTrees=10)
    return classifier.fit(training_data)


def predict(model: RandomForestClassifier, testing_data: DataFrame) -> DataFrame:
    """Node for making predictions given a pre-trained model and a testing dataset."""
    predictions = model.transform(testing_data)
    return predictions


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(train_model, inputs=["training_data"], outputs="example_classifier"),
            node(
                predict,
                inputs=dict(model="example_classifier", testing_data="testing_data"),
                outputs="example_predictions",
            ),
        ]
    )

要使管道正常工作,您需要在catalog.yml中按如下方式指定example_classifier

example_classifier:
  type: MemoryDataset
  copy_mode: assign

assign复制模式确保MemoryDataset将被直接赋予Spark对象本身,而非其深拷贝版本,因为深拷贝通常不适用于Spark对象。

使用ThreadRunner最大化并发性的技巧

在底层实现中,每个执行Spark操作(如savecollect)的Kedro节点都会通过同一个SparkSession实例提交到Spark集群作为Spark作业运行。如果这些作业由不同线程提交,它们可能会并发执行。为了实现这一点,您需要使用ThreadRunner来运行Kedro管道:

kedro run --runner=ThreadRunner

为了进一步提高并发级别,如果您使用的是Spark >= 0.8版本,还可以通过启用公平共享机制,让每个节点大致均等地分享Spark集群资源,从而获得大致相同的并发执行机会。默认情况下,任务是以先进先出(FIFO)方式执行的,这意味着如果某个作业占用过多资源,可能会阻塞其他作业的执行。要启用公平共享机制,请将以下配置添加到conf/base/spark.yml文件中(该文件在初始化SparkSession章节中创建):

spark.scheduler.mode: FAIR

更多信息,请参阅Spark文档中关于应用程序内的作业调度的内容。