快速入门:Spark上的Pandas API ¶
这是关于Spark上pandas API的简短介绍,主要面向新用户。本笔记本向您展示了pandas与Spark上pandas API之间的一些关键区别。您可以在 快速入门页面 中的“实时笔记本:pandas API on Spark”中自己运行这些示例。
通常,我们在Spark上导入pandas API,如下所示:
[1]:
import pandas as pd
import numpy as np
import pyspark.pandas as ps
from pyspark.sql import SparkSession
对象创建 ¶
通过传递值的列表创建一个在Spark上的pandas系列,让Spark上的pandas API创建一个默认整数索引:
[2]:
s = ps.Series([1, 3, 5, np.nan, 6, 8])
[3]:
s
[3]:
0 1.0
1 3.0
2 5.0
3 NaN
4 6.0
5 8.0
dtype: float64
通过传递可以转换为类似系列的对象字典来创建pandas-on-Spark DataFrame。
[4]:
psdf = ps.DataFrame(
{'a': [1, 2, 3, 4, 5, 6],
'b': [100, 200, 300, 400, 500, 600],
'c': ["one", "two", "three", "four", "five", "six"]},
index=[10, 20, 30, 40, 50, 60])
[5]:
psdf
[5]:
a | b | c | |
---|---|---|---|
10 | 1 | 100 | 一 |
20 | 2 | 200 | 二 |
30 | 3 | 300 | 三 |
40 | 4 | 400 | 四 |
50 | 5 | 500 | 五 |
60 | 6 | 600 | 六 |
通过传递numpy数组、使用datetime索引和带标签的列来创建pandas DataFrame:
[6]:
dates = pd.date_range('20130101', periods=6)
[7]:
dates
[7]:
DatetimeIndex(['2013-01-01', '2013-01-02', '2013-01-03', '2013-01-04',
'2013-01-05', '2013-01-06'],
dtype='datetime64[ns]', freq='D')
[8]:
pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))
[9]:
pdf
[9]:
A | B | C | D | |
---|---|---|---|---|
2013-01-01 | 0.912558 | -0.795645 | -0.289115 | 0.187606 |
2013-01-02 | -0.059703 | -1.233897 | 0.316625 | -1.226828 |
2013-01-03 | 0.332871 | -1.262010 | -0.434844 | -0.579920 |
2013-01-04 | 0.924016 | -1.022019 | -0.405249 | -1.036021 |
2013-01-05 | -0.772209 | -1.228099 | 0.068901 | 0.896679 |
2013-01-06 | 1.485582 | -0.709306 | -0.202637 | -0.248766 |
现在,这个 pandas DataFrame 可以被转换为 pandas-on-Spark DataFrame
[10]:
psdf = ps.from_pandas(pdf)
[11]:
type(psdf)
[11]:
pyspark.pandas.frame.DataFrame
它的外观和行为与 pandas DataFrame 相同。
[12]:
psdf
[12]:
A | B | C | D | |
---|---|---|---|---|
2013-01-01 | 0.912558 | -0.795645 | -0.289115 | 0.187606 |
2013-01-02 | -0.059703 | -1.233897 | 0.316625 | -1.226828 |
2013-01-03 | 0.332871 | -1.262010 | -0.434844 | -0.579920 |
2013-01-04 | 0.924016 | -1.022019 | -0.405249 | -1.036021 |
2013-01-05 | -0.772209 | -1.228099 | 0.068901 | 0.896679 |
2013-01-06 | 1.485582 | -0.709306 | -0.202637 | -0.248766 |
此外,可以轻松地从Spark DataFrame创建一个pandas-on-Spark DataFrame。
从pandas DataFrame创建Spark DataFrame
[13]:
spark = SparkSession.builder.getOrCreate()
[14]:
sdf = spark.createDataFrame(pdf)
[15]:
sdf.show()
+--------------------+-------------------+--------------------+--------------------+
| A| B| C| D|
+--------------------+-------------------+--------------------+--------------------+
| 0.91255803205208|-0.7956452608556638|-0.28911463069772175| 0.18760566615081622|
|-0.05970271470242...| -1.233896949308984| 0.3166246451758431| -1.2268284000402265|
| 0.33287106947536615|-1.2620100816441786| -0.4348444277082644| -0.5799199651437185|
| 0.9240158461589916|-1.0220190956326003| -0.4052488880650239| -1.0360212104348547|
| -0.7722090016558953|-1.2280986385313222| 0.0689011451939635| 0.8966790729426755|
| 1.4855822995785612|-0.7093056426018517| -0.2026366848847041|-0.24876619876451092|
+--------------------+-------------------+--------------------+--------------------+
从 Spark DataFrame 创建 pandas-on-Spark DataFrame。
[16]:
psdf = sdf.pandas_api()
[17]:
psdf
[17]:
A | B | C | D | |
---|---|---|---|---|
0 | 0.912558 | -0.795645 | -0.289115 | 0.187606 |
1 | -0.059703 | -1.233897 | 0.316625 | -1.226828 |
2 | 0.332871 | -1.262010 | -0.434844 | -0.579920 |
3 | 0.924016 | -1.022019 | -0.405249 | -1.036021 |
4 | -0.772209 | -1.228099 | 0.068901 | 0.896679 |
5 | 1.485582 | -0.709306 | -0.202637 | -0.248766 |
具有特定的 数据类型 。目前支持Spark和pandas都常用的类型。
[18]:
psdf.dtypes
[18]:
A float64
B float64
C float64
D float64
dtype: object
以下是如何显示下面框架中的前几行。
请注意,Spark 数据框中的数据默认不保持自然顺序。可以通过设置
compute.ordered_head
选项来保持自然顺序,但这会导致内部排序的性能开销。
[19]:
psdf.head()
[19]:
A | B | C | D | |
---|---|---|---|---|
0 | 0.912558 | -0.795645 | -0.289115 | 0.187606 |
1 | -0.059703 | -1.233897 | 0.316625 | -1.226828 |
2 | 0.332871 | -1.262010 | -0.434844 | -0.579920 |
3 | 0.924016 | -1.022019 | -0.405249 | -1.036021 |
4 | -0.772209 | -1.228099 | 0.068901 | 0.896679 |
显示索引、列以及底层的numpy数据。
[20]:
psdf.index
[20]:
Int64Index([0, 1, 2, 3, 4, 5], dtype='int64')
[21]:
psdf.columns
[21]:
Index(['A', 'B', 'C', 'D'], dtype='object')
[22]:
psdf.to_numpy()
[22]:
array([[ 0.91255803, -0.79564526, -0.28911463, 0.18760567],
[-0.05970271, -1.23389695, 0.31662465, -1.2268284 ],
[ 0.33287107, -1.26201008, -0.43484443, -0.57991997],
[ 0.92401585, -1.0220191 , -0.40524889, -1.03602121],
[-0.772209 , -1.22809864, 0.06890115, 0.89667907],
[ 1.4855823 , -0.70930564, -0.20263668, -0.2487662 ]])
显示您数据的快速统计摘要
[23]:
psdf.describe()
[23]:
A | B | C | D | |
---|---|---|---|---|
计数 | 6.000000 | 6.000000 | 6.000000 | 6.000000 |
均值 | 0.470519 | -1.041829 | -0.157720 | -0.334542 |
标准差 | 0.809428 | 0.241511 | 0.294520 | 0.793014 |
最小值 | -0.772209 | -1.262010 | -0.434844 | -1.226828 |
25% | -0.059703 | -1.233897 | -0.405249 | -1.036021 |
50% | 0.332871 | -1.228099 | -0.289115 | -0.579920 |
75% | 0.924016 | -0.795645 | 0.068901 | 0.187606 |
最大值 | 1.485582 | -0.709306 | 0.316625 | 0.896679 |
转置您的数据
[24]:
psdf.T
[24]:
0 | 1 | 2 | 3 | 4 | 5 | |
---|---|---|---|---|---|---|
A | 0.912558 | -0.059703 | 0.332871 | 0.924016 | -0.772209 | 1.485582 |
B | -0.795645 | -1.233897 | -1.262010 | -1.022019 | -1.228099 | -0.709306 |
C | -0.289115 | 0.316625 | -0.434844 | -0.405249 | 0.068901 | -0.202637 |
D | 0.187606 | -1.226828 | -0.579920 | -1.036021 | 0.896679 | -0.248766 |
按其索引排序
[25]:
psdf.sort_index(ascending=False)
[25]:
A | B | C | D | |
---|---|---|---|---|
5 | 1.485582 | -0.709306 | -0.202637 | -0.248766 |
4 | -0.772209 | -1.228099 | 0.068901 | 0.896679 |
3 | 0.924016 | -1.022019 | -0.405249 | -1.036021 |
2 | 0.332871 | -1.262010 | -0.434844 | -0.579920 |
1 | -0.059703 | -1.233897 | 0.316625 | -1.226828 |
0 | 0.912558 | -0.795645 | -0.289115 | 0.187606 |
按值排序
[26]:
psdf.sort_values(by='B')
[26]:
A | B | C | D | |
---|---|---|---|---|
2 | 0.332871 | -1.262010 | -0.434844 | -0.579920 |
1 | -0.059703 | -1.233897 | 0.316625 | -1.226828 |
4 | -0.772209 | -1.228099 | 0.068901 | 0.896679 |
3 | 0.924016 | -1.022019 | -0.405249 | -1.036021 |
0 | 0.912558 | -0.795645 | -0.289115 | 0.187606 |
5 | 1.485582 | -0.709306 | -0.202637 | -0.248766 |
缺失数据 ¶
Pandas API on Spark 主要使用值
np.nan
来表示缺失数据。它默认不包含在计算中。
[27]:
pdf1 = pdf.reindex(index=dates[0:4], columns=list(pdf.columns) + ['E'])
[28]:
pdf1.loc[dates[0]:dates[1], 'E'] = 1
[29]:
psdf1 = ps.from_pandas(pdf1)
[30]:
psdf1
[30]:
A | B | C | D | E | |
---|---|---|---|---|---|
2013-01-01 | 0.912558 | -0.795645 | -0.289115 | 0.187606 | 1.0 |
2013-01-02 | -0.059703 | -1.233897 | 0.316625 | -1.226828 | 1.0 |
2013-01-03 | 0.332871 | -1.262010 | -0.434844 | -0.579920 | NaN |
2013-01-04 | 0.924016 | -1.022019 | -0.405249 | -1.036021 | NaN |
删除任何缺失数据的行。
[31]:
psdf1.dropna(how='any')
[31]:
A | B | C | D | E | |
---|---|---|---|---|---|
2013-01-01 | 0.912558 | -0.795645 | -0.289115 | 0.187606 | 1.0 |
2013-01-02 | -0.059703 | -1.233897 | 0.316625 | -1.226828 | 1.0 |
填补缺失的数据。
[32]:
psdf1.fillna(value=5)
[32]:
A | B | C | D | E | |
---|---|---|---|---|---|
2013-01-01 | 0.912558 | -0.795645 | -0.289115 | 0.187606 | 1.0 |
2013-01-02 | -0.059703 | -1.233897 | 0.316625 | -1.226828 | 1.0 |
2013-01-03 | 0.332871 | -1.262010 | -0.434844 | -0.579920 | 5.0 |
2013-01-04 | 0.924016 | -1.022019 | -0.405249 | -1.036021 | 5.0 |
操作 ¶
Spark 配置 ¶
在PySpark中,可以在Spark的pandas API内部应用各种配置。比如,您可以启用Arrow优化,以大大加快内部pandas转换的速度。有关更多信息,请参阅PySpark文档中的Apache Arrow在Pandas中的用法指南。
[34]:
prev = spark.conf.get("spark.sql.execution.arrow.pyspark.enabled") # Keep its default value.
ps.set_option("compute.default_index_type", "distributed") # Use default index prevent overhead.
import warnings
warnings.filterwarnings("ignore") # Ignore warnings coming from Arrow optimizations.
[35]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)
%timeit ps.range(300000).to_pandas()
900 ms ± 186 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
[36]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", False)
%timeit ps.range(300000).to_pandas()
3.08 s ± 227 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
[37]:
ps.reset_option("compute.default_index_type")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", prev) # Set its default value back.
分组 ¶
通过“分组”,我们指的是一个涉及以下一个或多个步骤的过程:
-
根据一些标准将数据分成组
-
对每个组独立应用一个函数
-
将结果组合成一个数据结构
[38]:
psdf = ps.DataFrame({'A': ['foo', 'bar', 'foo', 'bar',
'foo', 'bar', 'foo', 'foo'],
'B': ['one', 'one', 'two', 'three',
'two', 'two', 'one', 'three'],
'C': np.random.randn(8),
'D': np.random.randn(8)})
[39]:
psdf
[39]:
A | B | C | D | |
---|---|---|---|---|
0 | foo | one | 1.039632 | -0.571950 |
1 | bar | one | 0.972089 | 1.085353 |
2 | foo | two | -1.931621 | -2.579164 |
3 | bar | three | -0.654371 | -0.340704 |
4 | foo | two | -0.157080 | 0.893736 |
5 | bar | two | 0.882795 | 0.024978 |
6 | foo | one | -0.149384 | 0.201667 |
7 | foo | three | -1.355136 | 0.693883 |
对分组进行聚合,然后将 sum() 函数应用于结果组。
[40]:
psdf.groupby('A').sum()
[40]:
C | D | |
---|---|---|
A | ||
bar | 1.200513 | 0.769627 |
foo | -2.553589 | -1.361828 |
按多个列分组形成层次索引,我们可以再次应用求和函数。
[41]:
psdf.groupby(['A', 'B']).sum()
[41]:
C | D | ||
---|---|---|---|
A | B | ||
foo | one | 0.890248 | -0.370283 |
two | -2.088701 | -1.685428 | |
bar | three | -0.654371 | -0.340704 |
foo | three | -1.355136 | 0.693883 |
bar | two | 0.882795 | 0.024978 |
one | 0.972089 | 1.085353 |
绘图 ¶
[42]:
pser = pd.Series(np.random.randn(1000),
index=pd.date_range('1/1/2000', periods=1000))
[43]:
psser = ps.Series(pser)
[44]:
psser = psser.cummax()
[45]:
psser.plot()
在 DataFrame 上, plot() 方法是一个方便的方法,用于绘制所有带标签的列:
[46]:
pdf = pd.DataFrame(np.random.randn(1000, 4), index=pser.index,
columns=['A', 'B', 'C', 'D'])
[47]:
psdf = ps.from_pandas(pdf)
[48]:
psdf = psdf.cummax()
[49]:
psdf.plot()
有关更多详细信息,请参阅 绘图 文档。
数据的进出 ¶
CSV ¶
CSV 非常简单且易于使用。请查看 这里 以写入 CSV 文件,以及 这里 以读取 CSV 文件。
[50]:
psdf.to_csv('foo.csv')
ps.read_csv('foo.csv').head(10)
[50]:
A | B | C | D | |
---|---|---|---|---|
0 | -1.187097 | -0.134645 | 0.377094 | -0.627217 |
1 | 0.331741 | 0.166218 | 0.377094 | -0.627217 |
2 | 0.331741 | 0.439450 | 0.377094 | 0.365970 |
3 | 0.621620 | 0.439450 | 1.190180 | 0.365970 |
4 | 0.621620 | 0.439450 | 1.190180 | 0.365970 |
5 | 2.169198 | 1.069183 | 1.395642 | 0.365970 |
6 | 2.755738 | 1.069183 | 1.395642 | 1.045868 |
7 | 2.755738 | 1.069183 | 1.395642 | 1.045868 |
8 | 2.755738 | 1.069183 | 1.395642 | 1.045868 |
9 | 2.755738 | 1.508732 | 1.395642 | 1.556933 |
Parquet ¶
Parquet是一种高效且紧凑的文件格式,可以更快地读取和写入。有关写入Parquet文件的信息,请参见 这里 ,有关读取Parquet文件的信息,请参见 这里 。
[51]:
psdf.to_parquet('bar.parquet')
ps.read_parquet('bar.parquet').head(10)
[51]:
A | B | C | D | |
---|---|---|---|---|
0 | -1.187097 | -0.134645 | 0.377094 | -0.627217 |
1 | 0.331741 | 0.166218 | 0.377094 | -0.627217 |
2 | 0.331741 | 0.439450 | 0.377094 | 0.365970 |
3 | 0.621620 | 0.439450 | 1.190180 | 0.365970 |
4 | 0.621620 | 0.439450 | 1.190180 | 0.365970 |
5 | 2.169198 | 1.069183 | 1.395642 | 0.365970 |
6 | 2.755738 | 1.069183 | 1.395642 | 1.045868 |
7 | 2.755738 | 1.069183 | 1.395642 | 1.045868 |
8 | 2.755738 | 1.069183 | 1.395642 | 1.045868 |
9 | 2.755738 | 1.508732 | 1.395642 | 1.556933 |
Spark IO ¶
此外,pandas API 在 Spark 上完全支持 Spark 的各种数据源,如 ORC 和外部数据源。请参阅 这里 以将其写入指定的数据源,以及 这里 以从数据源中读取。
[52]:
psdf.to_spark_io('zoo.orc', format="orc")
ps.read_spark_io('zoo.orc', format="orc").head(10)
[52]:
A | B | C | D | |
---|---|---|---|---|
0 | -1.187097 | -0.134645 | 0.377094 | -0.627217 |
1 | 0.331741 | 0.166218 | 0.377094 | -0.627217 |
2 | 0.331741 | 0.439450 | 0.377094 | 0.365970 |
3 | 0.621620 | 0.439450 | 1.190180 | 0.365970 |
4 | 0.621620 | 0.439450 | 1.190180 | 0.365970 |
5 | 2.169198 | 1.069183 | 1.395642 | 0.365970 |
6 | 2.755738 | 1.069183 | 1.395642 | 1.045868 |
7 | 2.755738 | 1.069183 | 1.395642 | 1.045868 |
8 | 2.755738 | 1.069183 | 1.395642 | 1.045868 |
9 | 2.755738 | 1.508732 | 1.395642 | 1.556933 |
有关更多详细信息,请参见 输入/输出 文档。