JSON 文件
Spark SQL可以自动推断JSON数据集的模式并将其加载为DataFrame。 这种转换可以通过
SparkSession.read.json
在JSON文件上完成。
请注意,提供的文件作为 json 文件 并不是一个典型的 JSON 文件。每行必须包含一个独立的、自包含的有效 JSON 对象。有关更多信息,请参见 JSON Lines 文本格式,也称为换行分隔的 JSON 。
对于常规的多行JSON文件,将
multiLine
参数设置为
True
。
# spark 来自之前的例子。
sc = spark.sparkContext
# 一个 JSON 数据集由路径指向。
# 该路径可以是一个单一的文本文件或存储文本文件的目录
path = "examples/src/main/resources/people.json"
peopleDF = spark.read.json(path)
# 推断出的模式可以使用 printSchema() 方法进行可视化
peopleDF.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)
# 使用 DataFrame 创建一个临时视图
peopleDF.createOrReplaceTempView("people")
# 可以使用 spark 提供的 sql 方法运行 SQL 语句
teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
# +------+
# | name|
# +------+
# |Justin|
# +------+
# 另外,可以为由
# 一个 RDD[String] 表示的 JSON 数据集创建 DataFrame,每个字符串存储一个 JSON 对象
jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']
otherPeopleRDD = sc.parallelize(jsonStrings)
otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()
# +---------------+----+
# | address|name|
# +---------------+----+
# |[Columbus,Ohio]| Yin|
# +---------------+----+
Spark SQL可以自动推断JSON数据集的架构并将其加载为一个
Dataset[Row]
。
此转换可以使用
SparkSession.read.json()
在
Dataset[String]
或JSON文件上进行。
请注意,作为 一个 json 文件 提供的文件不是典型的 JSON 文件。每一行必须包含一个单独的、自包含的有效 JSON 对象。有关更多信息,请参见 JSON Lines 文本格式,也称为换行分隔 JSON 。
对于常规多行 JSON 文件,将
multiLine
选项设置为
true
。
// 原始类型 (Int, String, 等) 和 产品类型 (案例类) 的编码器
// 通过在创建数据集时导入该内容来支持。
import spark.implicits._
// 通过路径指向一个 JSON 数据集。
// 路径可以是单个文本文件或存储文本文件的目录
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)
// 可以使用 printSchema() 方法可视化推断的模式
peopleDF.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// 使用 DataFrame 创建一个临时视图
peopleDF.createOrReplaceTempView("people")
// 可以使用 spark 提供的 sql 方法运行 SQL 语句
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// | name|
// +------+
// |Justin|
// +------+
// 或者,可以为一个由字符串表示的 JSON 数据集创建 DataFrame
// 每个字符串存储一个 JSON 对象
val otherPeopleDataset = spark.createDataset(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// | address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
Spark SQL可以自动推断JSON数据集的模式并将其加载为一个
Dataset
。
此转换可以使用
SparkSession.read().json()
在
Dataset
或者JSON文件上完成。
请注意,提供的文件作为 json文件 并不是一个典型的JSON文件。每行必须包含一个独立的、自给自足的有效JSON对象。有关更多信息,请参见 JSON Lines文本格式,也称为换行分隔的JSON 。
对于常规的多行JSON文件,将
multiLine
选项设置为
true
。
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// JSON 数据集由路径指向。
// 路径可以是单个文本文件或存储文本文件的目录
Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json");
// 推断的模式可以使用 printSchema() 方法进行可视化
people.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// 使用 DataFrame 创建一个临时视图
people.createOrReplaceTempView("people");
// 可以通过使用 spark 提供的 sql 方法运行 SQL 语句
Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
namesDF.show();
// +------+
// | name|
// +------+
// |Justin|
// +------+
// 或者,可以为一个由每个字符串表示一个 JSON 对象的 Dataset 创建 DataFrame。
List<String> jsonData = Arrays.asList(
"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
anotherPeople.show();
// +---------------+----+
// | address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
Spark SQL 可以自动推断 JSON 数据集的结构并将其加载为 DataFrame。使用
read.json()
函数,它从 JSON 文件目录加载数据,每行文件都是一个 JSON 对象。
请注意,作为 json文件 提供的文件并不是典型的JSON文件。每一行必须包含一个独立的、自包含的有效JSON对象。有关更多信息,请参见 JSON Lines文本格式,也称为换行分隔的JSON 。
对于常规的多行JSON文件,将命名参数
multiLine
设置为
TRUE
。
# 一个 JSON 数据集由路径指向。
# 路径可以是单个文本文件或存储文本文件的目录。
path <- "examples/src/main/resources/people.json"
# 从由路径指向的文件创建一个 DataFrame
people <- read.json(path)
# 推断的模式可以使用 printSchema() 方法可视化。
printSchema(people)
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)
# 将此 DataFrame 注册为一个表。
createOrReplaceTempView(people, "people")
# 可以使用 sql 方法运行 SQL 语句。
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
## name
## 1 Justin
CREATE TEMPORARY VIEW jsonTable
USING org.apache.spark.sql.json
OPTIONS (
path "examples/src/main/resources/people.json"
)
SELECT * FROM jsonTable
数据源选项
JSON的数据源选项可以通过以下方式设置:
-
的
.option
/.options
方法-
DataFrameReader
-
DataFrameWriter
-
DataStreamReader
-
DataStreamWriter
-
-
以下内置函数
-
from_json
-
to_json
-
schema_of_json
-
-
OPTIONS
子句在 CREATE TABLE USING DATA_SOURCE
属性名称 | 默认值 | 含义 | 范围 |
---|---|---|---|
timeZone
|
(
spark.sql.session.timeZone
配置的值)
|
设置一个字符串,表示用于格式化JSON数据源或分区值中的时间戳的时区ID。支持以下格式的
timeZone
:
|
读/写 |
primitivesAsString
|
false
|
将所有原始值推断为字符串类型。 | 读 |
prefersDecimal
|
false
|
将所有浮点值推断为十进制类型。如果值不适合十进制,则将其推断为双精度。 | 读 |
allowComments
|
false
|
忽略JSON记录中的Java/C++样式注释。 | 读 |
allowUnquotedFieldNames
|
false
|
允许不带引号的JSON字段名称。 | 读 |
allowSingleQuotes
|
true
|
除了双引号外,还允许单引号。 | 读 |
allowNumericLeadingZeros
|
false
|
允许数字中的前导零(例如:00012)。 | 读 |
allowBackslashEscapingAnyCharacter
|
false
|
允许使用反斜杠转义机制对所有字符进行引用。 | 读 |
mode
|
PERMISSIVE
|
允许处理解析过程中损坏记录的模式。
|
读 |
columnNameOfCorruptRecord
|
(
spark.sql.columnNameOfCorruptRecord
配置的值)
|
允许重命名由
PERMISSIVE
模式创建的包含错误字符串的新字段。这会覆盖spark.sql.columnNameOfCorruptRecord。
|
读 |
dateFormat
|
yyyy-MM-dd
|
设置一个字符串,表示日期格式。自定义日期格式遵循 日期时间模式 中的格式。这适用于日期类型。 | 读/写 |
timestampFormat
|
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]
|
设置一个字符串,表示时间戳格式。自定义日期格式遵循 日期时间模式 中的格式。这适用于时间戳类型。 | 读/写 |
timestampNTZFormat
|
yyyy-MM-dd'T'HH:mm:ss[.SSS] | 设置一个字符串,表示无时区的时间戳格式。自定义日期格式遵循 日期时间模式 中的格式。这适用于没有时区类型的时间戳,注意在写入或读取此数据类型时不支持区域偏移和时区组件。 | 读/写 |
enableDateTimeParsingFallback
|
如果时间解析器策略具有遗留设置或未提供自定义日期或时间戳模式,则启用。 | 如果值不匹配设置的模式,则允许回退到向后兼容(Spark 1.x和2.0)解析日期和时间戳的行为。 | 读 |
multiLine
|
false
|
每个文件解析一条记录,该记录可能跨多行。JSON内置函数忽略此选项。 | 读 |
allowUnquotedControlChars
|
false
|
允许JSON字符串包含不带引号的控制字符(值小于32的ASCII字符,包括制表符和换行符)或不允许。 | 读 |
encoding
|
在
multiLine
设置为
true
时自动检测(用于读取),
UTF-8
(用于写入)
|
用于读取,允许强制设置JSON文件的标准基本或扩展编码之一。例如UTF-16BE,UTF-32LE。用于写入,指定保存的json文件的编码(字符集)。JSON内置函数忽略此选项。 | 读/写 |
lineSep
|
\r
,
\r\n
,
\n
(用于读取),
\n
(用于写入)
|
定义应在解析时使用的行分隔符。JSON内置函数忽略此选项。 | 读/写 |
samplingRatio
|
1.0
|
定义用于模式推断的输入JSON对象的比例。 | 读 |
dropFieldIfAllNull
|
false
|
在模式推断期间是否忽略所有null值或空数组的列。 | 读 |
locale
|
en-US
|
使用IETF BCP 47格式设置区域作为语言标签。例如,在解析日期和时间戳时使用
locale
。
|
读 |
allowNonNumericNumbers
|
true
|
允许JSON解析器将“非数字”(NaN)标记集识别为合法浮点数值。
|
读 |
compression
|
(无) | 保存到文件时使用的压缩编解码器。这可以是已知的不区分大小写的缩写名称之一(无,bzip2,gzip,lz4,snappy和deflate)。JSON内置函数忽略此选项。 | 写 |
ignoreNullFields
|
(
spark.sql.jsonGenerator.ignoreNullFields
配置的值)
|
生成JSON对象时是否忽略null字段。 | 写 |
其他通用选项可以在 通用文件源选项 中找到。