测试 PySpark ¶
本指南是编写强健的 PySpark 代码测试的参考。
要查看 PySpark 测试工具的文档,请查看这里。要查看 PySpark 内置测试工具的代码,请查看 Spark 仓库这里。要查看 PySpark 测试框架的 JIRA 板块工单,请查看这里。
构建一个PySpark应用程序 ¶
这里是如何启动一个PySpark应用程序的示例。如果您已经有一个准备测试的应用程序,可以随意跳到下一节“测试您的PySpark应用程序”。
首先,启动您的 Spark 会话。
[3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Create a SparkSession
spark = SparkSession.builder.appName("Testing PySpark Example").getOrCreate()
接下来,创建一个 DataFrame。
[5]:
sample_data = [{"name": "John D.", "age": 30},
{"name": "Alice G.", "age": 25},
{"name": "Bob T.", "age": 35},
{"name": "Eve A.", "age": 28}]
df = spark.createDataFrame(sample_data)
现在,让我们为我们的DataFrame定义并应用一个转换函数。
[7]:
from pyspark.sql.functions import col, regexp_replace
# Remove additional spaces in name
def remove_extra_spaces(df, column_name):
# Remove extra spaces from the specified column
df_transformed = df.withColumn(column_name, regexp_replace(col(column_name), "\\s+", " "))
return df_transformed
transformed_df = remove_extra_spaces(df, "name")
transformed_df.show()
+---+--------+
|age| name|
+---+--------+
| 30| John D.|
| 25|Alice G.|
| 35| Bob T.|
| 28| Eve A.|
+---+--------+
测试你的PySpark应用 ¶
现在让我们测试我们的PySpark转换函数。
一个选择是简单地通过眼睛观察结果数据框。然而,对于大型数据框或输入大小,这可能不切实际。
更好的方法是编写测试。以下是我们如何测试代码的一些示例。以下示例适用于 Spark 3.5 及更高版本。
请注意,这些示例并不是详尽无遗的,因为还有许多其他测试框架的替代方案,您可以使用它们来代替
unittest
或
pytest
。内置的 PySpark 测试工具函数是独立的,这意味着它们可以与任何测试框架或 CI 测试管道兼容。
选项 1: 仅使用 PySpark 内置测试实用程序函数 ¶
对于简单的临时验证案例,可以在独立的环境中使用PySpark测试工具,如
assertDataFrameEqual
和
assertSchemaEqual
。您可以在笔记本会话中轻松测试PySpark代码。例如,假设您想要验证两个DataFrame之间的相等性:
[10]:
import pyspark.testing
from pyspark.testing.utils import assertDataFrameEqual
# Example 1
df1 = spark.createDataFrame(data=[("1", 1000), ("2", 3000)], schema=["id", "amount"])
df2 = spark.createDataFrame(data=[("1", 1000), ("2", 3000)], schema=["id", "amount"])
assertDataFrameEqual(df1, df2) # pass, DataFrames are identical
[11]:
# Example 2
df1 = spark.createDataFrame(data=[("1", 0.1), ("2", 3.23)], schema=["id", "amount"])
df2 = spark.createDataFrame(data=[("1", 0.109), ("2", 3.23)], schema=["id", "amount"])
assertDataFrameEqual(df1, df2, rtol=1e-1) # pass, DataFrames are approx equal by rtol
您还可以简单地比较两个DataFrame的结构:
[13]:
from pyspark.testing.utils import assertSchemaEqual
from pyspark.sql.types import StructType, StructField, ArrayType, DoubleType
s1 = StructType([StructField("names", ArrayType(DoubleType(), True), True)])
s2 = StructType([StructField("names", ArrayType(DoubleType(), True), True)])
assertSchemaEqual(s1, s2) # pass, schemas are identical
选项 2:使用 单元测试 ¶
对于更复杂的测试场景,您可能希望使用一个测试框架。
最受欢迎的测试框架选项之一是单元测试。让我们逐步了解如何使用内置的Python
unittest
库编写PySpark测试。有关
unittest
库的更多信息,请参见这里:
https://docs.python.org/3/library/unittest.html
。
首先,您需要一个Spark会话。您可以使用来自
@classmethod
装饰器来自
unittest
包来处理Spark会话的设置和拆除。
[15]:
import unittest
class PySparkTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.spark = SparkSession.builder.appName("Testing PySpark Example").getOrCreate()
@classmethod
def tearDownClass(cls):
cls.spark.stop()
现在让我们编写一个
unittest
类。
[17]:
from pyspark.testing.utils import assertDataFrameEqual
class TestTranformation(PySparkTestCase):
def test_single_space(self):
sample_data = [{"name": "John D.", "age": 30},
{"name": "Alice G.", "age": 25},
{"name": "Bob T.", "age": 35},
{"name": "Eve A.", "age": 28}]
# Create a Spark DataFrame
original_df = spark.createDataFrame(sample_data)
# Apply the transformation function from before
transformed_df = remove_extra_spaces(original_df, "name")
expected_data = [{"name": "John D.", "age": 30},
{"name": "Alice G.", "age": 25},
{"name": "Bob T.", "age": 35},
{"name": "Eve A.", "age": 28}]
expected_df = spark.createDataFrame(expected_data)
assertDataFrameEqual(transformed_df, expected_df)
运行时,
unittest
将会选取所有以“test.”开头的函数。
选项 3:使用 Pytest ¶
我们也可以使用
pytest
编写我们的测试,它是最受欢迎的 Python 测试框架之一。有关
pytest
的更多信息,请参见文档:
https://docs.pytest.org/en/7.1.x/contents.html
。
使用
pytest
夹具允许我们在测试之间共享一个 Spark 会话,并在测试完成后将其删除。
[20]:
import pytest
@pytest.fixture
def spark_fixture():
spark = SparkSession.builder.appName("Testing PySpark Example").getOrCreate()
yield spark
我们可以这样定义我们的测试:
[22]:
import pytest
from pyspark.testing.utils import assertDataFrameEqual
def test_single_space(spark_fixture):
sample_data = [{"name": "John D.", "age": 30},
{"name": "Alice G.", "age": 25},
{"name": "Bob T.", "age": 35},
{"name": "Eve A.", "age": 28}]
# Create a Spark DataFrame
original_df = spark.createDataFrame(sample_data)
# Apply the transformation function from before
transformed_df = remove_extra_spaces(original_df, "name")
expected_data = [{"name": "John D.", "age": 30},
{"name": "Alice G.", "age": 25},
{"name": "Bob T.", "age": 35},
{"name": "Eve A.", "age": 28}]
expected_df = spark.createDataFrame(expected_data)
assertDataFrameEqual(transformed_df, expected_df)
当你使用
pytest
命令运行测试文件时,它将识别所有名称以“test.”开头的函数。
将所有内容结合在一起! ¶
让我们一起看看所有步骤,以单元测试示例为例。
[25]:
# pkg/etl.py
import unittest
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import regexp_replace
from pyspark.testing.utils import assertDataFrameEqual
# Create a SparkSession
spark = SparkSession.builder.appName("Sample PySpark ETL").getOrCreate()
sample_data = [{"name": "John D.", "age": 30},
{"name": "Alice G.", "age": 25},
{"name": "Bob T.", "age": 35},
{"name": "Eve A.", "age": 28}]
df = spark.createDataFrame(sample_data)
# Define DataFrame transformation function
def remove_extra_spaces(df, column_name):
# Remove extra spaces from the specified column using regexp_replace
df_transformed = df.withColumn(column_name, regexp_replace(col(column_name), "\\s+", " "))
return df_transformed
[26]:
# pkg/test_etl.py
import unittest
from pyspark.sql import SparkSession
# Define unit test base class
class PySparkTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.spark = SparkSession.builder.appName("Sample PySpark ETL").getOrCreate()
@classmethod
def tearDownClass(cls):
cls.spark.stop()
# Define unit test
class TestTranformation(PySparkTestCase):
def test_single_space(self):
sample_data = [{"name": "John D.", "age": 30},
{"name": "Alice G.", "age": 25},
{"name": "Bob T.", "age": 35},
{"name": "Eve A.", "age": 28}]
# Create a Spark DataFrame
original_df = spark.createDataFrame(sample_data)
# Apply the transformation function from before
transformed_df = remove_extra_spaces(original_df, "name")
expected_data = [{"name": "John D.", "age": 30},
{"name": "Alice G.", "age": 25},
{"name": "Bob T.", "age": 35},
{"name": "Eve A.", "age": 28}]
expected_df = spark.createDataFrame(expected_data)
assertDataFrameEqual(transformed_df, expected_df)
[27]:
unittest.main(argv=[''], verbosity=0, exit=False)
Ran 1 test in 1.734s
OK
[27]:
<unittest.main.TestProgram at 0x174539db0>