JSON 文件

Spark SQL可以自动推断JSON数据集的模式并将其加载为DataFrame。 这种转换可以通过 SparkSession.read.json 在JSON文件上完成。

请注意,提供的文件作为 json 文件 并不是一个典型的 JSON 文件。每行必须包含一个独立的、自包含的有效 JSON 对象。有关更多信息,请参见 JSON Lines 文本格式,也称为换行分隔的 JSON

对于常规的多行JSON文件,将 multiLine 参数设置为 True

# spark 来自之前的例子。
sc = spark.sparkContext
# 一个 JSON 数据集由路径指向。
# 该路径可以是一个单一的文本文件或存储文本文件的目录
path = "examples/src/main/resources/people.json"
peopleDF = spark.read.json(path)
# 推断出的模式可以使用 printSchema() 方法进行可视化
peopleDF.printSchema()
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)

# 使用 DataFrame 创建一个临时视图
peopleDF.createOrReplaceTempView("people")
# 可以使用 spark 提供的 sql 方法运行 SQL 语句
teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
# +------+
# |  name|
# +------+
# |Justin|
# +------+

# 另外,可以为由
# 一个 RDD[String] 表示的 JSON 数据集创建 DataFrame,每个字符串存储一个 JSON 对象
jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']
otherPeopleRDD = sc.parallelize(jsonStrings)
otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()
# +---------------+----+
# |        address|name|
# +---------------+----+
# |[Columbus,Ohio]| Yin|
# +---------------+----+
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.

Spark SQL可以自动推断JSON数据集的架构并将其加载为一个 Dataset[Row] 。 此转换可以使用 SparkSession.read.json() Dataset[String] 或JSON文件上进行。

请注意,作为 一个 json 文件 提供的文件不是典型的 JSON 文件。每一行必须包含一个单独的、自包含的有效 JSON 对象。有关更多信息,请参见 JSON Lines 文本格式,也称为换行分隔 JSON

对于常规多行 JSON 文件,将 multiLine 选项设置为 true

// 原始类型 (Int, String, 等) 和 产品类型 (案例类) 的编码器
// 通过在创建数据集时导入该内容来支持。
import spark.implicits._
// 通过路径指向一个 JSON 数据集。
// 路径可以是单个文本文件或存储文本文件的目录
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)
// 可以使用 printSchema() 方法可视化推断的模式
peopleDF.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)
// 使用 DataFrame 创建一个临时视图
peopleDF.createOrReplaceTempView("people")
// 可以使用 spark 提供的 sql 方法运行 SQL 语句
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// |  name|
// +------+
// |Justin|
// +------+
// 或者,可以为一个由字符串表示的 JSON 数据集创建 DataFrame
// 每个字符串存储一个 JSON 对象
val otherPeopleDataset = spark.createDataset(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.

Spark SQL可以自动推断JSON数据集的模式并将其加载为一个 Dataset 。 此转换可以使用 SparkSession.read().json() Dataset 或者JSON文件上完成。

请注意,提供的文件作为 json文件 并不是一个典型的JSON文件。每行必须包含一个独立的、自给自足的有效JSON对象。有关更多信息,请参见 JSON Lines文本格式,也称为换行分隔的JSON

对于常规的多行JSON文件,将 multiLine 选项设置为 true

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// JSON 数据集由路径指向。
// 路径可以是单个文本文件或存储文本文件的目录
Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json");
// 推断的模式可以使用 printSchema() 方法进行可视化
people.printSchema();
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)
// 使用 DataFrame 创建一个临时视图
people.createOrReplaceTempView("people");
// 可以通过使用 spark 提供的 sql 方法运行 SQL 语句
Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
namesDF.show();
// +------+
// |  name|
// +------+
// |Justin|
// +------+
// 或者,可以为一个由每个字符串表示一个 JSON 对象的 Dataset 创建 DataFrame。
List<String> jsonData = Arrays.asList(
"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
anotherPeople.show();
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.

Spark SQL 可以自动推断 JSON 数据集的结构并将其加载为 DataFrame。使用 read.json() 函数,它从 JSON 文件目录加载数据,每行文件都是一个 JSON 对象。

请注意,作为 json文件 提供的文件并不是典型的JSON文件。每一行必须包含一个独立的、自包含的有效JSON对象。有关更多信息,请参见 JSON Lines文本格式,也称为换行分隔的JSON

对于常规的多行JSON文件,将命名参数 multiLine 设置为 TRUE

# 一个 JSON 数据集由路径指向。
# 路径可以是单个文本文件或存储文本文件的目录。
path <- "examples/src/main/resources/people.json"
# 从由路径指向的文件创建一个 DataFrame
people <- read.json(path)
# 推断的模式可以使用 printSchema() 方法可视化。
printSchema(people)
## root
##  |-- age: long (nullable = true)
##  |-- name: string (nullable = true)
# 将此 DataFrame 注册为一个表。
createOrReplaceTempView(people, "people")
# 可以使用 sql 方法运行 SQL 语句。
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
##     name
## 1 Justin
Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.
CREATE TEMPORARY VIEW jsonTable
USING org.apache.spark.sql.json
OPTIONS (
path "examples/src/main/resources/people.json"
)
SELECT * FROM jsonTable

数据源选项

JSON的数据源选项可以通过以下方式设置:

属性名称 默认值 含义 范围
timeZone ( spark.sql.session.timeZone 配置的值) 设置一个字符串,表示用于格式化JSON数据源或分区值中的时间戳的时区ID。支持以下格式的 timeZone
  • 基于区域的区域ID:应具有'区域/城市'的形式,例如'America/Los_Angeles'。
  • 区域偏移:应采用'(+|-)HH:mm'格式,例如'-08:00'或'+01:00'。'UTC'和'Z'也支持作为'+00:00'的别名。
其他短名称如'CST'不推荐使用,因为它们可能存在歧义。
读/写
primitivesAsString false 将所有原始值推断为字符串类型。
prefersDecimal false 将所有浮点值推断为十进制类型。如果值不适合十进制,则将其推断为双精度。
allowComments false 忽略JSON记录中的Java/C++样式注释。
allowUnquotedFieldNames false 允许不带引号的JSON字段名称。
allowSingleQuotes true 除了双引号外,还允许单引号。
allowNumericLeadingZeros false 允许数字中的前导零(例如:00012)。
allowBackslashEscapingAnyCharacter false 允许使用反斜杠转义机制对所有字符进行引用。
mode PERMISSIVE 允许处理解析过程中损坏记录的模式。
  • PERMISSIVE :遇到损坏记录时,将错误字符串放入由 columnNameOfCorruptRecord 配置的字段中,并将错误字段设置为 null 。要保留损坏记录,用户可以在用户定义的模式中设置一个名为 columnNameOfCorruptRecord 的字符串类型字段。如果模式中没有该字段,则在解析过程中会丢弃损坏记录。在推断模式时,隐式地在输出模式中添加 columnNameOfCorruptRecord 字段。
  • DROPMALFORMED :忽略所有损坏的记录。此模式在JSON内置函数中不受支持。
  • FAILFAST :遇到损坏记录时抛出异常。
columnNameOfCorruptRecord ( spark.sql.columnNameOfCorruptRecord 配置的值) 允许重命名由 PERMISSIVE 模式创建的包含错误字符串的新字段。这会覆盖spark.sql.columnNameOfCorruptRecord。
dateFormat yyyy-MM-dd 设置一个字符串,表示日期格式。自定义日期格式遵循 日期时间模式 中的格式。这适用于日期类型。 读/写
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] 设置一个字符串,表示时间戳格式。自定义日期格式遵循 日期时间模式 中的格式。这适用于时间戳类型。 读/写
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] 设置一个字符串,表示无时区的时间戳格式。自定义日期格式遵循 日期时间模式 中的格式。这适用于没有时区类型的时间戳,注意在写入或读取此数据类型时不支持区域偏移和时区组件。 读/写
enableDateTimeParsingFallback 如果时间解析器策略具有遗留设置或未提供自定义日期或时间戳模式,则启用。 如果值不匹配设置的模式,则允许回退到向后兼容(Spark 1.x和2.0)解析日期和时间戳的行为。
multiLine false 每个文件解析一条记录,该记录可能跨多行。JSON内置函数忽略此选项。
allowUnquotedControlChars false 允许JSON字符串包含不带引号的控制字符(值小于32的ASCII字符,包括制表符和换行符)或不允许。
encoding multiLine 设置为 true 时自动检测(用于读取), UTF-8 (用于写入) 用于读取,允许强制设置JSON文件的标准基本或扩展编码之一。例如UTF-16BE,UTF-32LE。用于写入,指定保存的json文件的编码(字符集)。JSON内置函数忽略此选项。 读/写
lineSep \r , \r\n , \n (用于读取), \n (用于写入) 定义应在解析时使用的行分隔符。JSON内置函数忽略此选项。 读/写
samplingRatio 1.0 定义用于模式推断的输入JSON对象的比例。
dropFieldIfAllNull false 在模式推断期间是否忽略所有null值或空数组的列。
locale en-US 使用IETF BCP 47格式设置区域作为语言标签。例如,在解析日期和时间戳时使用 locale
allowNonNumericNumbers true 允许JSON解析器将“非数字”(NaN)标记集识别为合法浮点数值。
  • +INF :表示正无穷大,以及 +Infinity Infinity 的别名。
  • -INF :表示负无穷大,别名 -Infinity
  • NaN :表示其他非数字,例如除以零的结果。
compression (无) 保存到文件时使用的压缩编解码器。这可以是已知的不区分大小写的缩写名称之一(无,bzip2,gzip,lz4,snappy和deflate)。JSON内置函数忽略此选项。
ignoreNullFields ( spark.sql.jsonGenerator.ignoreNullFields 配置的值) 生成JSON对象时是否忽略null字段。

其他通用选项可以在 通用文件源选项 中找到。