pyspark.sql.DataFrame

class pyspark.sql. DataFrame ( jdf : py4j.java_gateway.JavaObject , sql_ctx : Union [ SQLContext , SparkSession ] ) [source]

一个分布式的数据集合,按命名列分组。

新增于版本 1.3.0。

在版本 3.4.0 中更改: 支持 Spark Connect。

注释

DataFrame 应仅如上所述创建。不应通过使用构造函数直接创建。

示例

一个 DataFrame 等同于 Spark SQL 中的一个关系表, 并且可以通过 SparkSession 中的各种函数来创建:

>>> people = spark.createDataFrame([
...     {"deptId": 1, "age": 40, "name": "Hyukjin Kwon", "gender": "M", "salary": 50},
...     {"deptId": 1, "age": 50, "name": "Takuya Ueshin", "gender": "M", "salary": 100},
...     {"deptId": 2, "age": 60, "name": "Xinrong Meng", "gender": "F", "salary": 150},
...     {"deptId": 3, "age": 20, "name": "Haejoon Lee", "gender": "M", "salary": 200}
... ])

一旦创建,可以使用在以下定义的各种领域特定语言(DSL)函数进行操作: DataFrame Column

要从 DataFrame 中选择一列,请使用 apply 方法:

>>> age_col = people.age

一个更具体的例子:

>>> # To create DataFrame using SparkSession
... department = spark.createDataFrame([
...     {"id": 1, "name": "PySpark"},
...     {"id": 2, "name": "ML"},
...     {"id": 3, "name": "Spark SQL"}
... ])
>>> people.filter(people.age > 30).join(
...     department, people.deptId == department.id).groupBy(
...     department.name, "gender").agg({"salary": "avg", "age": "max"}).show()
+-------+------+-----------+--------+
|   name|gender|avg(salary)|max(age)|
+-------+------+-----------+--------+
|     ML|     F|      150.0|      60|
|PySpark|     M|       75.0|      50|
+-------+------+-----------+--------+

方法

agg (*exprs)

在整个 DataFrame 上进行聚合,不进行分组( df.groupBy().agg() 的简写)。

alias (别名)

返回一个设置了别名的新 DataFrame

approxQuantile (列, 概率, 相对误差)

计算 DataFrame 数值列的近似分位数。

cache ()

使用默认存储级别( MEMORY_AND_DISK_DESER )持久化 DataFrame

checkpoint ([eager])

返回此 DataFrame 的检查点版本。

coalesce (numPartitions)

返回一个具有确切 numPartitions 分区的新 DataFrame

colRegex (colName)

根据作为正则表达式指定的列名选择列,并将其返回为 Column

collect ()

返回所有记录作为 Row 的列表。

corr (col1, col2[, method])

计算 DataFrame 中两列的相关性,返回一个双精度值。

count ()

返回此 DataFrame 中的行数。

cov (col1, col2)

计算给定列的样本协方差,这些列由它们的名称指定,结果为双精度值。

createGlobalTempView (名称)

使用此 DataFrame 创建一个全局临时视图。

createOrReplaceGlobalTempView (名称)

使用给定的名称创建或替换一个全局临时视图。

createOrReplaceTempView (名称)

创建或替换一个本地临时视图,使用这个 DataFrame

createTempView (名称)

使用此 DataFrame 创建一个本地临时视图。

crossJoin (其他)

返回与另一个 DataFrame 的笛卡尔积。

crosstab (列1, 列2)

计算给定列的对频率表。

cube (*cols)

为当前的 DataFrame 使用指定的列创建一个多维立方体,以便我们可以对它们运行聚合操作。

describe (*cols)

计算数值和字符串列的基本统计信息。

distinct ()

返回一个新的 DataFrame ,包含此 DataFrame 中的不同行。

drop (*cols)

返回一个新的 DataFrame ,不包含指定的列。

dropDuplicates ([subset])

返回一个新的 DataFrame ,其中删除了重复的行,可以选择仅考虑某些列。

dropDuplicatesWithinWatermark ([subset])

返回一个新的 DataFrame ,其中删除了重复的行,

drop_duplicates ([subset])

drop_duplicates() dropDuplicates() 的别名。

dropna ([how, thresh, subset])

返回一个新的 DataFrame ,省略包含空值的行。

exceptAll (other)

返回一个新的 DataFrame ,包含在此 DataFrame 中但不在另一个 DataFrame 中的行,同时保留重复项。

explain ([extended, mode])

打印逻辑和物理计划到控制台以进行调试。

fillna (值[, 子集])

替换空值, na.fill() 的别名。

filter (条件)

使用给定的条件过滤行。

first ()

返回第一行作为 Row

foreach (f)

f 函数应用于这个 DataFrame 的所有 Row

foreachPartition (f)

f 函数应用于这个 DataFrame 的每个分区。

freqItems (cols[, support])

查找列中的频繁项,可能带有误报。

groupBy (*cols)

使用指定的列对 DataFrame 进行分组,以便我们可以对其运行聚合操作。

groupby (*cols)

groupby() groupBy() 的别名。

head ([n])

返回前 n 行。

hint (名称, *参数)

指定当前 DataFrame 的一些提示。

inputFiles ()

返回一个尽力而为的快照,该快照由组成此 DataFrame 的文件组成。

intersect (其他)

返回一个新的 DataFrame ,其中仅包含此 DataFrame 和另一个 DataFrame 中都存在的行。

intersectAll (其他)

返回一个新的 DataFrame ,包含此 DataFrame 和另一个 DataFrame 中的所有行,同时保留重复项。

isEmpty ()

检查 DataFrame 是否为空并返回一个布尔值。

isLocal ()

如果可以在本地运行(不需要任何Spark执行器),则返回 True collect() take() 方法。

join (其他[, on, how])

与另一个 DataFrame 进行连接,使用给定的连接表达式。

limit (数量)

将结果数量限制为指定的数量。

localCheckpoint ([eager])

返回此 DataFrame 的本地检查点版本。

mapInArrow (函数, 模式[, 屏障])

使用一个Python原生函数映射当前 DataFrame 中的批次迭代器,该函数接受并输出PyArrow的 RecordBatch ,并返回结果作为 DataFrame

mapInPandas (func, schema[, barrier])

使用一个Python原生函数映射当前 DataFrame 中的批次迭代器,该函数接受并输出一个pandas DataFrame,并返回结果作为 DataFrame

melt (ids, values, variableColumnName, …)

将DataFrame从宽格式透视为长格式,可以选择保留标识符列集。

observe (观察, *表达式)

定义(命名)要在 DataFrame 上观察的指标。

offset (num)

返回一个新的 :class: DataFrame ,通过跳过前 n 行。

orderBy (*cols, **kwargs)

返回一个新的 DataFrame ,按指定的列排序。

pandas_api ([index_col])

将现有的 DataFrame 转换为 pandas-on-Spark DataFrame。

persist ([storageLevel])

设置存储级别以在第一次计算后跨操作持久化 DataFrame 的内容。

printSchema ([level])

以树形格式打印出模式。

randomSplit (权重[, 种子])

随机拆分此 DataFrame 并提供权重。

registerTempTable (名称)

将此 DataFrame 使用给定的名称注册为临时表。

repartition (numPartitions, *cols)

返回一个新的 DataFrame ,由给定的分区表达式进行分区。

repartitionByRange (numPartitions, *cols)

返回一个新的 DataFrame ,由给定的分区表达式进行分区。

replace (要替换的值[, 替换值, 子集])

返回一个新的 DataFrame ,用另一个值替换某个值。

rollup (*cols)

为当前的 DataFrame 创建一个多维度的汇总,使用指定的列,以便我们可以对它们进行聚合。

sameSemantics (其他)

当两个 DataFrame 中的逻辑查询计划相等时,返回 True ,因此它们返回相同的结果。

sample ([withReplacement, fraction, seed])

返回此 DataFrame 的采样子集。

sampleBy (列, 比例[, 种子])

返回基于每个层级给定比例的分层抽样,且不进行替换。

select (*cols)

将一组表达式投影并返回一个新的 DataFrame

selectExpr (*expr)

将一组SQL表达式投影并返回一个新的 DataFrame

semanticHash ()

返回对此 DataFrame 的逻辑查询计划的哈希码。

show ([n, truncate, vertical])

打印前 n 行到控制台。

sort (*cols, **kwargs)

返回一个新的 DataFrame ,按指定的列排序。

sortWithinPartitions (*cols, **kwargs)

返回一个新的 DataFrame ,其中每个分区按指定的列进行排序。

subtract (其他)

返回一个新的 DataFrame ,包含在此 DataFrame 中但不在另一个 DataFrame 中的行。

summary (*统计数据)

计算数值和字符串列的指定统计信息。

tail (数量)

返回最后 num 行作为 list Row

take (数量)

返回前 num 行作为一个 list Row

to (schema)

返回一个新的 DataFrame ,其中每一行都经过调整以匹配指定的模式。

toDF (*cols)

返回一个新的 DataFrame ,其中包含指定的新列名

toJSON ([use_unicode])

将一个 DataFrame 转换为 RDD 的字符串。

toLocalIterator ([prefetchPartitions])

返回一个包含此 DataFrame 中所有行的迭代器。

toPandas ()

返回此 DataFrame 作为Pandas pandas.DataFrame

to_koalas ([index_col])

to_pandas_on_spark ([index_col])

transform (func, *args, **kwargs)

返回一个新的 DataFrame

union (其他)

返回一个新的 DataFrame ,包含此数据框和另一个 DataFrame 中的所有行。

unionAll (其他)

返回一个新的 DataFrame ,包含此数据框和另一个 DataFrame 中的所有行。

unionByName (其他[, 允许缺失列])

返回一个新的 DataFrame ,包含此数据框和另一个 DataFrame 中的所有行。

unpersist ([blocking])

DataFrame 标记为非持久化,并从内存和磁盘中移除其所有块。

unpivot (ids, values, variableColumnName, …)

将DataFrame从宽格式透视为长格式,可以选择保留标识符列集。

where (条件)

where() filter() 的别名。

withColumn (列名, 列)

通过添加一列或替换具有相同名称的现有列,返回一个新的 DataFrame

withColumnRenamed (现有列名, 新列名)

通过重命名现有列返回一个新的 DataFrame

withColumns (*colsMap)

通过添加多个列或替换具有相同名称的现有列,返回一个新的 DataFrame

withColumnsRenamed (colsMap)

通过重命名多个列返回一个新的 DataFrame

withMetadata (columnName, metadata)

通过更新现有列的元数据返回一个新的 DataFrame

withWatermark (eventTime, delayThreshold)

为此 DataFrame 定义一个事件时间水印。

writeTo (表)

创建一个用于v2源的写配置构建器。

属性

columns

检索 DataFrame 中所有列的名称,并将其作为列表返回。

dtypes

返回所有列名及其数据类型作为列表。

isStreaming

如果此 DataFrame 包含一个或多个持续返回数据的数据源,则返回 True

na

返回一个用于处理缺失值的 DataFrameNaFunctions

rdd

返回内容作为 pyspark.RDD Row

schema

返回此 DataFrame 的架构为 pyspark.sql.types.StructType

sparkSession

返回创建此 DataFrame 的Spark会话。

sql_ctx

stat

返回一个用于统计函数的 DataFrameStatFunctions

storageLevel

获取 DataFrame 的当前存储级别。

write

用于将非流式 DataFrame 的内容保存到外部存储的接口。

writeStream

用于将流式 DataFrame 的内容保存到外部存储的接口。