入门指南
- 起始点:SparkSession
- 创建 DataFrames
- 未类型化数据集操作(即 DataFrame 操作)
- 以编程方式运行 SQL 查询
- 全局临时视图
- 创建数据集
- 与 RDDs 互操作
- 标量函数
- 聚合函数
起始点:SparkSession
进入Spark所有功能的入口是
SparkSession
类。要创建一个基本的
SparkSession
,只需使用
SparkSession.builder
:
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL 基本示例") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
进入Spark所有功能的入口是
SparkSession
类。要创建一个基本的
SparkSession
,只需使用
SparkSession.builder()
:
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL 基本示例")
.config("spark.some.config.option", "some-value")
.getOrCreate()
进入Spark所有功能的入口是
SparkSession
类。要创建一个基本的
SparkSession
,只需使用
SparkSession.builder()
:
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL 基础示例")
.config("spark.some.config.option", "some-value")
.getOrCreate();
进入Spark所有功能的入口是
SparkSession
类。要初始化一个基本的
SparkSession
,只需调用
sparkR.session()
:
sparkR.session(appName = "R Spark SQL 基本示例", sparkConfig = list(spark.some.config.option = "some-value"))
注意,当第一次调用时,
sparkR.session()
初始化一个全局的
SparkSession
单例实例,并在后续调用中始终返回对该实例的引用。通过这种方式,用户只需初始化一次
SparkSession
,然后像
read.df
这样的 SparkR 函数将能够隐式访问这个全局实例,用户不需要四处传递
SparkSession
实例。
SparkSession
在 Spark 2.0 中提供了对 Hive 特性的内置支持,包括使用 HiveQL 编写查询、访问 Hive UDFs,以及从 Hive 表中读取数据的能力。要使用这些功能,您不需要有现有的 Hive 设置。
创建数据框
通过一个
SparkSession
,应用程序可以从一个
现有的
RDD
、从一个 Hive 表,或者从
Spark 数据源
创建 DataFrames。
作为一个例子,以下内容基于JSON文件的内容创建一个DataFrame:
# spark 是一个现有的 SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# 将 DataFrame 的内容显示到 stdout
df.show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
通过一个
SparkSession
,应用程序可以从一个
现有的
RDD
、从 Hive 表,或从
Spark 数据源
创建 DataFrames。
作为示例,以下内容基于JSON文件的内容创建一个DataFrame:
val df = spark.read.json("examples/src/main/resources/people.json")
// 将 DataFrame 的内容显示到标准输出
df.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
通过
SparkSession
,应用程序可以从一个
现有的
RDD
、从 Hive 表,或者从
Spark 数据源
创建 DataFrame。
作为一个示例,以下内容基于JSON文件的内容创建一个DataFrame:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");
// 将DataFrame的内容显示到stdout
df.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
通过一个
SparkSession
,应用程序可以从本地 R data.frame、从 Hive 表或从
Spark 数据源
创建 DataFrames。
作为示例,以下代码基于JSON文件的内容创建一个DataFrame:
df <- read.json("examples/src/main/resources/people.json")
# 显示数据框的内容
head(df)
## 年龄 姓名
## 1 NA Michael
## 2 30 Andy
## 3 19 Justin
# 另一种方法打印前几行,并可选择截断长值的打印
showDF(df)
## +----+-------+
## | 年龄| 姓名|
## +----+-------+
## |null|Michael|
## | 30| Andy|
## | 19| Justin|
## +----+-------+
未类型数据集操作(即数据框操作)
DataFrames 提供了一种特定领域的语言,用于在 Scala 、 Java 、 Python 和 R 中进行结构化数据操作。
如上所述,在Spark 2.0中,DataFrames只是Scala和Java API中的
Row
的Dataset。这些操作被称为“非类型转换”,与具有强类型的Scala/Java Datasets的“类型转换”形成对比。
在这里,我们包含了一些使用数据集的结构化数据处理的基本示例:
在 Python 中,可以通过属性(
df.age
)或索引(
df['age']
)访问 DataFrame 的列。前者在互动数据探索中很方便,但强烈建议用户使用后者,这种形式对未来是兼容的,并且在列名也作为 DataFrame 类的属性时不会出错。
# spark, df来自之前的示例
# 以树形格式打印模式
df.printSchema()
# root
# |-- age: long (可为空 = true)
# |-- name: string (可为空 = true)
# 仅选择"name"列
df.select("name").show()
# +-------+
# | name|
# +-------+
# |Michael|
# | Andy|
# | Justin|
# +-------+
# 选择所有人,但将年龄加1
df.select(df['name'], df['age'] + 1).show()
# +-------+---------+
# | name|(age + 1)|
# +-------+---------+
# |Michael| null|
# | Andy| 31|
# | Justin| 20|
# +-------+---------+
# 选择年龄大于21的人
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+
# 按年龄计数
df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# | 19| 1|
# |null| 1|
# | 30| 1|
# +----+-----+
有关可以在DataFrame上执行的操作类型的完整列表,请参考 API文档 。
除了简单的列引用和表达式,DataFrames 还拥有丰富的函数库,包括字符串处理、日期算法、常见数学运算等。完整的列表可在 DataFrame 函数参考 中找到。
// 此导入用于使用 $-符号
import spark.implicits._
// 以树形格式打印模式
df.printSchema()
// root
// |-- age: long (可为空 = true)
// |-- name: string (可为空 = true)
// 只选择 "name" 列
df.select("name").show()
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+
// 选择所有人,但将年龄增加 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+
// 选择年龄大于 21 的人
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// 按年龄计数
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+
有关可以对数据集执行的操作类型的完整列表,请参阅 API文档 。
除了简单的列引用和表达式外,数据集还拥有丰富的函数库,包括字符串操作、日期运算、常见数学运算等。完整列表可在 DataFrame 函数参考 中找到。
// col("...") 优于 df.col("...")
import static org.apache.spark.sql.functions.col;
// 以树形格式打印模式
df.printSchema();
// 根
// |-- age: long (可为空 = true)
// |-- name: string (可为空 = true)
// 仅选择 "name" 列
df.select("name").show();
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+
// 选择所有人,但将年龄增加 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+
// 选择年龄大于 21 的人
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// 按年龄统计人数
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+
有关可以对数据集执行的操作类型的完整列表,请参考 API 文档 。
除了简单的列引用和表达式,数据集还拥有丰富的函数库,包括字符串操作、日期运算、常见数学操作等。完整列表可在 DataFrame 函数参考 中找到。
# 创建 DataFrame
df <- read.json("examples/src/main/resources/people.json")
# 显示 DataFrame 的内容
head(df)
## age name
## 1 NA Michael
## 2 30 Andy
## 3 19 Justin
# 以树形格式打印 schema
printSchema(df)
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)
# 仅选择 "name" 列
head(select(df, "name"))
## name
## 1 Michael
## 2 Andy
## 3 Justin
# 选择所有人,但将年龄加 1
head(select(df, df$name, df$age + 1))
## name (age + 1.0)
## 1 Michael NA
## 2 Andy 31
## 3 Justin 20
# 选择年龄大于 21 的人
head(where(df, df$age > 21))
## age name
## 1 30 Andy
# 按年龄统计人数
head(count(groupBy(df, "age")))
## age count
## 1 19 1
## 2 NA 1
## 3 30 1
有关可以在 DataFrame 上执行的操作类型的完整列表,请参阅 API Documentation 。
除了简单的列引用和表达式,DataFrame 还拥有丰富的函数库,包括字符串操作、日期运算、常见的数学运算等。完整列表可在 DataFrame 函数参考 中找到。
以编程方式运行 SQL 查询
在
SparkSession
上的
sql
函数使应用程序能够以编程方式运行 SQL 查询,并将结果返回为
DataFrame
。
# 注册 DataFrame 为 SQL 临时视图
df.createOrReplaceTempView("people")
sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
在
SparkSession
上的
sql
函数使应用程序能够以编程方式运行SQL查询,并将结果作为
DataFrame
返回。
// 将 DataFrame 注册为 SQL 临时视图
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
在
SparkSession
上的
sql
函数使应用程序能够以编程方式运行 SQL 查询,并将结果返回为
Dataset
。
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// 注册 DataFrame 为 SQL 临时视图
df.createOrReplaceTempView("people");
Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | 年龄| 姓名|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
该
sql
函数使应用程序能够以编程方式运行 SQL 查询,并将结果返回为
SparkDataFrame
。
df <- sql("选择 * 从 表格")
全局临时视图
在Spark SQL中,临时视图是会话范围的,如果创建它的会话终止,临时视图将会消失。如果您想要一个在所有会话之间共享并在Spark应用程序终止之前保持活跃的临时视图,可以创建一个全局临时视图。全局临时视图与系统保留的数据库
global_temp
相关联,我们必须使用合格的名称来引用它,例如
SELECT * FROM global_temp.view1
。
# 注册 DataFrame 为全局临时视图
df.createGlobalTempView("people")
# 全局临时视图与系统保留的数据库 `global_temp` 相关联
spark.sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
# 全局临时视图是跨会话的
spark.newSession().sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+
// 将 DataFrame 注册为全局临时视图
df.createGlobalTempView("people")
// 全局临时视图与系统保留的数据库 `global_temp` 相关联
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// 全局临时视图跨会话可用
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// 将DataFrame注册为全局临时视图
df.createGlobalTempView("people");
// 全局临时视图与系统保存的数据库 `global_temp` 相关联
spark.sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// 全局临时视图是跨会话的
spark.newSession().sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
创建 全局 临时 视图 temp_view 作为 选择 a + 1, b * 2 来自 tbl
选择 * 来自 global_temp.temp_view
创建数据集
数据集类似于 RDDs,但它们使用专门的 Encoder 来序列化对象,而不是使用 Java 序列化或 Kryo,以便于处理或通过网络传输。虽然编码器和标准序列化都负责将对象转换为字节,但编码器是动态生成的代码,并使用一种格式,使 Spark 能够在不将字节反序列化回对象的情况下执行许多操作,如过滤、排序和哈希。
case class Person(name: String, age: Long)
// 为案例类创建编码器
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+
// 导入 spark.implicits._ 可以自动提供大多数常见类型的编码器
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // 返回: Array(2, 3, 4)
// DataFrame 可以通过提供类转换为 Dataset。映射将根据名称进行
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
public static class Person implements Serializable {
private String name;
private long age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public long getAge() {
return age;
}
public void setAge(long age) {
this.age = age;
}
}
// 创建一个 Bean 类的实例
Person person = new Person();
person.setName("Andy");
person.setAge(32);
// 为 Java beans 创建编码器
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
Collections.singletonList(person),
personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+
// 为最常见类型提供了编码器在 Encoders 类中
Encoder<Long> longEncoder = Encoders.LONG();
Dataset<Long> primitiveDS = spark.createDataset(Arrays.asList(1L, 2L, 3L), longEncoder);
Dataset<Long> transformedDS = primitiveDS.map(
(MapFunction<Long, Long>) value -> value + 1L,
longEncoder);
transformedDS.collect(); // 返回 [2, 3, 4]
// DataFrame 可以通过提供类来转换为 Dataset。基于名称的映射
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
与RDDs的互操作
Spark SQL支持两种不同的方法将现有的RDD转换为Datasets。第一种方法使用反射来推断包含特定类型对象的RDD的模式。这种基于反射的方法导致代码更加简洁,并且在编写Spark应用程序时您已经知道模式时效果很好。
创建数据集的第二种方法是通过一个编程接口,这使您能够构建一个模式,然后将其应用到现有的 RDD。尽管这种方法更冗长,但它允许您在运行时不知道列及其类型的情况下构建数据集。
使用反射推断架构
Spark SQL 可以将 Row 对象的 RDD 转换为 DataFrame,并推断数据类型。Row 是通过将一组键/值对作为 kwargs 传递给 Row 类构造的。该列表的键定义了表的列名,类型通过对整个数据集进行采样推断,类似于在 JSON 文件上执行的推断。
from pyspark.sql import Row
sc = spark.sparkContext
# 加载一个文本文件并将每一行转换为 Row。
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
# 推断模式,并将 DataFrame 注册为表。
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")
# 可以在已注册为表的 DataFrame 上运行 SQL。
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
# SQL 查询的结果是 DataFrame 对象。
# rdd 返回内容作为 :class:`pyspark.RDD` 的 :class:`Row`。
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
print(name)
# Name: Justin
Scala接口支持将包含案例类的RDD自动转换为DataFrame。案例类定义了表的模式。通过反射读取案例类的参数名称,并将其作为列的名称。案例类还可以是嵌套的或包含复杂类型,比如
Seq
s 或
Array
s。这个RDD可以被隐式转换为DataFrame,然后注册为表。表可以在后续的SQL语句中使用。
// 从RDD隐式转换为DataFrame
import spark.implicits._
// 从文本文件创建一个Person对象的RDD,并转换为DataFrame
val peopleDF = spark.sparkContext
.textFile("examples/src/main/resources/people.txt")
.map(_.split(","))
.map(attributes => Person(attributes(0), attributes(1).trim.toInt))
.toDF()
// 将DataFrame注册为临时视图
peopleDF.createOrReplaceTempView("people")
// 通过使用Spark提供的sql方法可以运行SQL语句
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
// 可以通过字段索引访问结果行的列
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// 或者通过字段名称
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// 对于Dataset[Map[K,V]]没有预定义的编码器,需要明确定义
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// 原始类型和案例类也可以定义为
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()
// row.getValuesMap[T] 一次检索多个列到Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))
Spark SQL 支持自动将一个 RDD 的
JavaBeans
转换为 DataFrame。
使用反射获取的
BeanInfo
定义了表的模式。目前,Spark SQL
不支持包含
Map
字段的 JavaBeans。不过,支持嵌套 JavaBeans 和
List
或
Array
字段。您可以通过创建一个实现 Serializable 的类,并为其所有字段提供 getter 和 setter 来创建一个 JavaBean。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
// 从文本文件创建一个 Person 对象的 RDD
JavaRDD<Person> peopleRDD = spark.read()
.textFile("examples/src/main/resources/people.txt")
.javaRDD()
.map(line -> {
String[] parts = line.split(",");
Person person = new Person();
person.setName(parts[0]);
person.setAge(Integer.parseInt(parts[1].trim()));
return person;
});
// 为 JavaBean 的 RDD 应用一个架构以获取 DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// 将 DataFrame 注册为临时视图
peopleDF.createOrReplaceTempView("people");
// 可以通过使用 spark 提供的 sql 方法运行 SQL 语句
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
// 可以通过字段索引访问结果中一行的列
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
// 或通过字段名称
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
程序matically指定架构
当无法提前定义一个kwargs字典时(例如,记录的结构编码在一个字符串中,或者一个文本数据集将被解析,并且字段将为不同的用户投影不同),可以通过三个步骤程序matically创建一个
DataFrame
。
- 从原始 RDD 创建一个元组或列表的 RDD;
-
创建一个由
StructType
表示的架构,与步骤 1 中创建的 RDD 中元组或列表的结构相匹配。 -
通过
SparkSession
提供的createDataFrame
方法将架构应用于 RDD。
例如:
# 导入数据类型
from pyspark.sql.types import StringType, StructType, StructField
sc = spark.sparkContext
# 加载文本文件并将每一行转换为Row。
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# 每一行被转换为一个元组。
people = parts.map(lambda p: (p[0], p[1].strip()))
# 模式编码在一个字符串中。
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
# 将模式应用到 RDD。
schemaPeople = spark.createDataFrame(people, schema)
# 使用 DataFrame 创建临时视图
schemaPeople.createOrReplaceTempView("people")
# 可以在已注册为表的 DataFrame 上运行 SQL。
results = spark.sql("SELECT name FROM people")
results.show()
# +-------+
# | name|
# +-------+
# |Michael|
# | Andy|
# | Justin|
# +-------+
当案例类无法提前定义时(例如,记录的结构编码在一个字符串中,或者一个文本数据集将被解析,并且字段将为不同的用户以不同的方式投影),可以通过三个步骤以编程方式创建一个
DataFrame
。
-
从原始 RDD 创建一个
Row
的 RDD; -
创建一个由
StructType
表示的模式,该模式与第 1 步中创建的 RDD 中的Row
的结构匹配。 -
通过
SparkSession
提供的createDataFrame
方法将模式应用于Row
的 RDD。
例如:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// 创建一个 RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
// 模式编码在一个字符串中
val schemaString = "name age"
// 根据模式字符串生成模式
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// 将 RDD(人)的记录转换为 Rows
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes => Row(attributes(0), attributes(1).trim))
// 将模式应用于 RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
// 使用 DataFrame 创建临时视图
peopleDF.createOrReplaceTempView("people")
// 可以在使用 DataFrames 创建的临时视图上运行 SQL
val results = spark.sql("SELECT name FROM people")
// SQL 查询的结果是 DataFrames,并支持所有正常的 RDD 操作
// 可以通过字段索引或字段名称访问结果中行的列
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// | value|
// +-------------+
// |Name: Michael|
// | Name: Andy|
// | Name: Justin|
// +-------------+
当JavaBean类无法提前定义时(例如,记录的结构编码在一个字符串中,或者文本数据集将被解析并且字段将为不同的用户以不同的方式投影),可以通过三个步骤以编程方式创建一个
Dataset
。
-
从原始 RDD 创建一个
Row
的 RDD; -
创建一个由
StructType
表示的模式,匹配步骤 1 中 RDD 中Row
的结构。 -
通过
SparkSession
提供的createDataFrame
方法,将模式应用于Row
的 RDD。
例如:
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// 创建一个 RDD
JavaRDD<String> peopleRDD = spark.sparkContext()
.textFile("examples/src/main/resources/people.txt", 1)
.toJavaRDD();
// 模式是以字符串编码的
String schemaString = "name age";
// 根据模式字符串生成模式
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);
// 将 RDD(人)中的记录转换为 Rows
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
String[] attributes = record.split(",");
return RowFactory.create(attributes[0], attributes[1].trim());
});
// 将模式应用于 RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);
// 使用 DataFrame 创建临时视图
peopleDataFrame.createOrReplaceTempView("people");
// 可以在使用 DataFrames 创建的临时视图上运行 SQL
Dataset<Row> results = spark.sql("SELECT name FROM people");
// SQL 查询的结果是 DataFrame,并支持所有正常的 RDD 操作
// 可以通过字段索引或字段名称访问结果中行的列
Dataset<String> namesDS = results.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
Encoders.STRING());
namesDS.show();
// +-------------+
// | value|
// +-------------+
// |Name: Michael|
// | Name: Andy|
// | Name: Justin|
// +-------------+
标量函数
标量函数是每行返回一个单一值的函数,与返回一组行的值的聚合函数相对。Spark SQL 支持多种 内置标量函数 。它还支持 用户定义的标量函数 。
聚合函数
聚合函数是对一组行返回单个值的函数。
内置聚合函数
提供了常见的聚合,如
count()
、
count_distinct()
、
avg()
、
max()
、
min()
等。用户不仅限于预定义的聚合函数,还可以创建自己的聚合函数。有关用户定义聚合函数的更多细节,请参阅
用户定义聚合函数
的文档。