快速入门:数据框 ¶
这是PySpark DataFrame API的简短介绍和快速入门。PySpark DataFrame是懒惰求值的。它们是在
RDD
之上实现的。当Spark
转换
数据时,它并不会立即计算转换,而是计划如何稍后计算。当调用类似
collect()
的
操作
时,计算才开始。这个笔记本展示了DataFrame的基本用法,主要面向新用户。您可以在
快速入门页面
的“实时笔记本:DataFrame”中自行运行这些示例的最新版本。
在Apache Spark文档网站中还有其他有用的信息,请参阅 Spark SQL和DataFrames 的最新版本, RDD编程指南 , 结构化流编程指南 , Spark流编程指南 和 机器学习库(MLlib)指南 。
PySpark 应用程序以初始化
SparkSession
开始,这是 PySpark 的入口点,如下所示。 如果通过 pyspark 可执行文件在 PySpark shell 中运行,它会自动为用户在变量 spark 中创建会话。
[1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
数据框创建 ¶
可以通过
pyspark.sql.SparkSession.createDataFrame
创建一个 PySpark DataFrame,通常是通过传递一个列表的列表、元组、字典和
pyspark.sql.Row
,一个
pandas DataFrame
以及一个由此类列表组成的 RDD 来实现。
pyspark.sql.SparkSession.createDataFrame
接受
schema
参数以指定 DataFrame 的模式。当省略时,PySpark 会通过从数据中获取一个样本来推断相应的模式。
首先,您可以从一组行创建一个PySpark DataFrame
[2]:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row
df = spark.createDataFrame([
Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df
[2]:
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]
创建一个具有明确架构的 PySpark DataFrame。
[3]:
df = spark.createDataFrame([
(1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
(3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df
[3]:
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]
从 pandas DataFrame 创建一个 PySpark DataFrame
[4]:
pandas_df = pd.DataFrame({
'a': [1, 2, 3],
'b': [2., 3., 4.],
'c': ['string1', 'string2', 'string3'],
'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
df
[4]:
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]
以上创建的DataFrame都具有相同的结果和模式。
[6]:
# All DataFrames above result same.
df.show()
df.printSchema()
+---+---+-------+----------+-------------------+
| a| b| c| d| e|
+---+---+-------+----------+-------------------+
| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
| 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
| 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+
root
|-- a: long (nullable = true)
|-- b: double (nullable = true)
|-- c: string (nullable = true)
|-- d: date (nullable = true)
|-- e: timestamp (nullable = true)
查看数据 ¶
可以使用
DataFrame.show()
显示 DataFrame 的顶部行。
[7]:
df.show(1)
+---+---+-------+----------+-------------------+
| a| b| c| d| e|
+---+---+-------+----------+-------------------+
| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+
only showing top 1 row
或者,您可以为笔记本(例如 Jupyter)中 PySpark DataFrame 的即时评估启用
spark.sql.repl.eagerEval.enabled
配置。可以通过
spark.sql.repl.eagerEval.maxNumRows
配置来控制显示的行数。
[8]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df
[8]:
a | b | c | d | e |
---|---|---|---|---|
1 | 2.0 | string1 | 2000-01-01 | 2000-01-01 12:00:00 |
2 | 3.0 | string2 | 2000-02-01 | 2000-01-02 12:00:00 |
3 | 4.0 | string3 | 2000-03-01 | 2000-01-03 12:00:00 |
行也可以垂直显示。当行太长而无法水平显示时,这很有用。
[9]:
df.show(1, vertical=True)
-RECORD 0------------------
a | 1
b | 2.0
c | string1
d | 2000-01-01
e | 2000-01-01 12:00:00
only showing top 1 row
您可以看到DataFrame的模式和列名称如下:
[10]:
df.columns
[10]:
['a', 'b', 'c', 'd', 'e']
[11]:
df.printSchema()
root
|-- a: long (nullable = true)
|-- b: double (nullable = true)
|-- c: string (nullable = true)
|-- d: date (nullable = true)
|-- e: timestamp (nullable = true)
显示数据框的摘要
[12]:
df.select("a", "b", "c").describe().show()
+-------+---+---+-------+
|summary| a| b| c|
+-------+---+---+-------+
| count| 3| 3| 3|
| mean|2.0|3.0| null|
| stddev|1.0|1.0| null|
| min| 1|2.0|string1|
| max| 3|4.0|string3|
+-------+---+---+-------+
DataFrame.collect()
将分布式数据收集到驱动端作为Python中的本地数据。请注意,当数据集太大以至于无法适应驱动端时,这可能会引发内存溢出错误,因为它将所有数据从执行器收集到驱动端。
[13]:
df.collect()
[13]:
[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),
Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),
Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]
为了避免抛出内存溢出异常,使用
DataFrame.take()
或
DataFrame.tail()
。
[14]:
df.take(1)
[14]:
[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0))]
PySpark DataFrame 还提供将数据转换回
pandas DataFrame
以利用 pandas API。请注意,
toPandas
也会将所有数据收集到驱动程序端,当数据太大而无法适应驱动程序端时,这可能会导致内存溢出错误。
[15]:
df.toPandas()
[15]:
a | b | c | d | e | |
---|---|---|---|---|---|
0 | 1 | 2.0 | 字符串1 | 2000-01-01 | 2000-01-01 12:00:00 |
1 | 2 | 3.0 | 字符串2 | 2000-02-01 | 2000-01-02 12:00:00 |
2 | 3 | 4.0 | 字符串3 | 2000-03-01 | 2000-01-03 12:00:00 |
选择和访问数据 ¶
PySpark DataFrame 是懒惰求值的,简单选择一个列并不会触发计算,而是返回一个
Column
实例。
[16]:
df.a
[16]:
Column<b'a'>
实际上,大多数按列操作返回
Column
。
[17]:
from pyspark.sql import Column
from pyspark.sql.functions import upper
type(df.c) == type(upper(df.c)) == type(df.c.isNull())
[17]:
True
这些
Column
可以用来从 DataFrame 中选择列。例如,
DataFrame.select()
接受返回另一个 DataFrame 的
Column
实例。
[18]:
df.select(df.c).show()
+-------+
| c|
+-------+
|string1|
|string2|
|string3|
+-------+
指派新的
Column
实例。
[19]:
df.withColumn('upper_c', upper(df.c)).show()
+---+---+-------+----------+-------------------+-------+
| a| b| c| d| e|upper_c|
+---+---+-------+----------+-------------------+-------+
| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
| 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
| 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+
要选择一部分行,请使用
DataFrame.filter()
.
[20]:
df.filter(df.a == 1).show()
+---+---+-------+----------+-------------------+
| a| b| c| d| e|
+---+---+-------+----------+-------------------+
| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+
应用函数 ¶
PySpark支持各种UDF和API,允许用户执行Python本机函数。另请参阅最新的 Pandas UDFs 和 Pandas函数API 。例如,下面的示例允许用户在Python本机函数中直接使用 一个pandas Series 的API。
[21]:
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
# Simply plus one by using pandas Series.
return series + 1
df.select(pandas_plus_one(df.a)).show()
+------------------+
|pandas_plus_one(a)|
+------------------+
| 2|
| 3|
| 4|
+------------------+
另一个例子是
DataFrame.mapInPandas
,它允许用户在
pandas DataFrame
中直接使用API,而没有任何限制,例如结果长度。
[22]:
def pandas_filter_func(iterator):
for pandas_df in iterator:
yield pandas_df[pandas_df.a == 1]
df.mapInPandas(pandas_filter_func, schema=df.schema).show()
+---+---+-------+----------+-------------------+
| a| b| c| d| e|
+---+---+-------+----------+-------------------+
| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+
数据分组 ¶
PySpark DataFrame 还提供了一种通过使用常见的方法,即分割-应用-合并策略来处理分组数据。它根据某个条件对数据进行分组,对每个组应用一个函数,然后将它们重新组合回 DataFrame。
[23]:
df = spark.createDataFrame([
['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
df.show()
+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
| red|banana| 1| 10|
| blue|banana| 2| 20|
| red|carrot| 3| 30|
| blue| grape| 4| 40|
| red|carrot| 5| 50|
|black|carrot| 6| 60|
| red|banana| 7| 70|
| red| grape| 8| 80|
+-----+------+---+---+
对分组进行分组,然后对结果组应用
avg()
函数。
[24]:
df.groupby('color').avg().show()
+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
| red| 4.8| 48.0|
|black| 6.0| 60.0|
| blue| 3.0| 30.0|
+-----+-------+-------+
您还可以通过使用 pandas API 对每个组应用 Python 原生函数。
[25]:
def plus_mean(pandas_df):
return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())
df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()
+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
| red|banana| -3| 10|
| red|carrot| -1| 30|
| red|carrot| 0| 50|
| red|banana| 2| 70|
| red| grape| 3| 80|
|black|carrot| 0| 60|
| blue|banana| -1| 20|
| blue| grape| 1| 40|
+-----+------+---+---+
共同分组和应用一个函数。
[26]:
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
('time', 'id', 'v1'))
df2 = spark.createDataFrame(
[(20000101, 1, 'x'), (20000101, 2, 'y')],
('time', 'id', 'v2'))
def merge_ordered(l, r):
return pd.merge_ordered(l, r)
df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas(
merge_ordered, schema='time int, id int, v1 double, v2 string').show()
+--------+---+---+---+
| time| id| v1| v2|
+--------+---+---+---+
|20000101| 1|1.0| x|
|20000102| 1|3.0| x|
|20000101| 2|2.0| y|
|20000102| 2|4.0| y|
+--------+---+---+---+
获取数据进/出 ¶
CSV简单易用。Parquet和ORC是高效且紧凑的文件格式,可以更快地读写。
在PySpark中还有许多其他数据源可用,例如JDBC、文本、binaryFile、Avro等。另请参阅Apache Spark文档中的最新 Spark SQL、DataFrames和Datasets指南 。
CSV ¶
[27]:
df.write.csv('foo.csv', header=True)
spark.read.csv('foo.csv', header=True).show()
+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
| red|banana| 1| 10|
| blue|banana| 2| 20|
| red|carrot| 3| 30|
| blue| grape| 4| 40|
| red|carrot| 5| 50|
|black|carrot| 6| 60|
| red|banana| 7| 70|
| red| grape| 8| 80|
+-----+------+---+---+
Parquet ¶
[28]:
df.write.parquet('bar.parquet')
spark.read.parquet('bar.parquet').show()
+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
| red|banana| 1| 10|
| blue|banana| 2| 20|
| red|carrot| 3| 30|
| blue| grape| 4| 40|
| red|carrot| 5| 50|
|black|carrot| 6| 60|
| red|banana| 7| 70|
| red| grape| 8| 80|
+-----+------+---+---+
ORC ¶
[29]:
df.write.orc('zoo.orc')
spark.read.orc('zoo.orc').show()
+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
| red|banana| 1| 10|
| blue|banana| 2| 20|
| red|carrot| 3| 30|
| blue| grape| 4| 40|
| red|carrot| 5| 50|
|black|carrot| 6| 60|
| red|banana| 7| 70|
| red| grape| 8| 80|
+-----+------+---+---+
使用SQL ¶
DataFrame和Spark SQL共享相同的执行引擎,因此它们可以无缝地互换使用。例如,您可以将DataFrame注册为一个表,并像下面这样轻松运行SQL:
[30]:
df.createOrReplaceTempView("tableA")
spark.sql("SELECT count(*) from tableA").show()
+--------+
|count(1)|
+--------+
| 8|
+--------+
此外,UDF可以在SQL中开箱即用地注册和调用:
[31]:
@pandas_udf("integer")
def add_one(s: pd.Series) -> pd.Series:
return s + 1
spark.udf.register("add_one", add_one)
spark.sql("SELECT add_one(v1) FROM tableA").show()
+-----------+
|add_one(v1)|
+-----------+
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+-----------+
这些SQL表达式可以直接混合并用作PySpark列。
[32]:
from pyspark.sql.functions import expr
df.selectExpr('add_one(v1)').show()
df.select(expr('count(*)') > 0).show()
+-----------+
|add_one(v1)|
+-----------+
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+-----------+
+--------------+
|(count(1) > 0)|
+--------------+
| true|
+--------------+