CSV 文件
Spark SQL提供
spark.read().csv("file_name")
从CSV格式的文件或文件夹中读取数据到Spark DataFrame,并使用
dataframe.write().csv("path")
将数据写入CSV文件。函数
option()
可以用来定制读取或写入的行为,例如控制头部、分隔符字符、字符集等。
# spark 来自前面的示例
sc = spark.sparkContext
# CSV 数据集由路径指向。
# 路径可以是单个 CSV 文件或 CSV 文件的目录
path = "examples/src/main/resources/people.csv"
df = spark.read.csv(path)
df.show()
# +------------------+
# | _c0|
# +------------------+
# | name;age;job|
# |Jorge;30;Developer|
# | Bob;32;Developer|
# +------------------+
# 读取带有分隔符的 CSV,默认分隔符为 ","
df2 = spark.read.option("delimiter", ";").csv(path)
df2.show()
# +-----+---+---------+
# | _c0|_c1| _c2|
# +-----+---+---------+
# | name|age| job|
# |Jorge| 30|Developer|
# | Bob| 32|Developer|
# +-----+---+---------+
# 读取带有分隔符和表头的 CSV
df3 = spark.read.option("delimiter", ";").option("header", True).csv(path)
df3.show()
# +-----+---+---------+
# | name|age| job|
# +-----+---+---------+
# |Jorge| 30|Developer|
# | Bob| 32|Developer|
# +-----+---+---------+
# 你还可以使用 options() 来使用多个选项
df4 = spark.read.options(delimiter=";", header=True).csv(path)
# "output" 是一个包含多个 CSV 文件和一个 _SUCCESS 文件的文件夹。
df3.write.csv("output")
# 读取文件夹中的所有文件,请确保文件夹中只存在 CSV 文件。
folderPath = "examples/src/main/resources"
df5 = spark.read.csv(folderPath)
df5.show()
# 错误的模式,因为读取了非 CSV 文件
# +-----------+
# | _c0|
# +-----------+
# |238val_238|
# | 86val_86|
# |311val_311|
# | 27val_27|
# |165val_165|
# +-----------+
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.
// 一个 CSV 数据集由路径指向。
// 路径可以是单个 CSV 文件或 CSV 文件的目录。
val path = "examples/src/main/resources/people.csv"
val df = spark.read.csv(path)
df.show()
// +------------------+
// | _c0|
// +------------------+
// | name;age;job|
// |Jorge;30;Developer|
// | Bob;32;Developer|
// +------------------+
// 读取带有分隔符的 CSV,默认分隔符为 ","
val df2 = spark.read.option("delimiter", ";").csv(path)
df2.show()
// +-----+---+---------+
// | _c0|_c1| _c2|
// +-----+---+---------+
// | name|age| job|
// |Jorge| 30|Developer|
// | Bob| 32|Developer|
// +-----+---+---------+
// 读取带有分隔符和表头的 CSV
val df3 = spark.read.option("delimiter", ";").option("header", "true").csv(path)
df3.show()
// +-----+---+---------+
// | name|age| job|
// +-----+---+---------+
// |Jorge| 30|Developer|
// | Bob| 32|Developer|
// +-----+---+---------+
// 还可以使用 options() 来使用多个选项
val df4 = spark.read.options(Map("delimiter"->";", "header"->"true")).csv(path)
// "output" 是一个文件夹,包含多个 CSV 文件和一个 _SUCCESS 文件。
df3.write.csv("output")
// 读取文件夹中的所有文件,请确保该文件夹中只存在 CSV 文件。
val folderPath = "examples/src/main/resources";
val df5 = spark.read.csv(folderPath);
df5.show();
// 错误的模式,因为读取了非 CSV 文件
// +-----------+
// | _c0|
// +-----------+
// |238val_238|
// | 86val_86|
// |311val_311|
// | 27val_27|
// |165val_165|
// +-----------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// CSV 数据集由路径指向。
// 路径可以是单个 CSV 文件或 CSV 文件的目录。
String path = "examples/src/main/resources/people.csv";
Dataset<Row> df = spark.read().csv(path);
df.show();
// +------------------+
// | _c0|
// +------------------+
// | name;age;job|
// |Jorge;30;Developer|
// | Bob;32;Developer|
// +------------------+
// 读取带分隔符的 CSV,默认分隔符为 ","
Dataset<Row> df2 = spark.read().option("delimiter", ";").csv(path);
df2.show();
// +-----+---+---------+
// | _c0|_c1| _c2|
// +-----+---+---------+
// | name|age| job|
// |Jorge| 30|Developer|
// | Bob| 32|Developer|
// +-----+---+---------+
// 读取带分隔符和表头的 CSV
Dataset<Row> df3 = spark.read().option("delimiter", ";").option("header", "true").csv(path);
df3.show();
// +-----+---+---------+
// | name|age| job|
// +-----+---+---------+
// |Jorge| 30|Developer|
// | Bob| 32|Developer|
// +-----+---+---------+
// 您也可以使用 options() 来使用多个选项
java.util.Map<String, String> optionsMap = new java.util.HashMap<String, String>();
optionsMap.put("delimiter",";");
optionsMap.put("header","true");
Dataset<Row> df4 = spark.read().options(optionsMap).csv(path);
// "output" 是一个包含多个 CSV 文件和一个 _SUCCESS 文件的文件夹。
df3.write().csv("output");
// 读取文件夹中的所有文件,请确保文件夹中只包含 CSV 文件。
String folderPath = "examples/src/main/resources";
Dataset<Row> df5 = spark.read().csv(folderPath);
df5.show();
// 错误的schema,因为读取了非CSV文件
// +-----------+
// | _c0|
// +-----------+
// |238val_238|
// | 86val_86|
// |311val_311|
// | 27val_27|
// |165val_165|
// +-----------+
Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.
数据源选项
可以通过以下方式设置CSV的数据源选项:
-
的
.option
/.options
方法-
DataFrameReader
-
DataFrameWriter
-
DataStreamReader
-
DataStreamWriter
-
-
下面的内置函数
-
from_csv
-
to_csv
-
schema_of_csv
-
-
OPTIONS
子句在 创建表使用数据源
属性名称 | 默认值 | 含义 | 范围 |
---|---|---|---|
sep
|
, | 为每个字段和值设置一个分隔符。这个分隔符可以是一个或多个字符。 | 读/写 |
encoding
|
UTF-8 | 用于读取时,按照给定的编码类型解码CSV文件。用于写入时,指定保存的CSV文件的编码(字符集)。CSV内置函数忽略此选项。 | 读/写 |
quote
|
" |
设置用于转义被引用值的单个字符,其中分隔符可能是值的一部分。对于读取,如果希望关闭引号,需设置为空字符串,而不是
null
。对于写入,如果设置为空字符串,将使用
u0000
(空字符)。
|
读/写 |
quoteAll
|
false | 一个指示所有值是否应该始终用引号括起来的标志。默认情况下仅转义包含引号字符的值。 | 写 |
escape
|
\ | 设置用于在已经被引号括起来的值内部转义引号的单个字符。 | 读/写 |
escapeQuotes
|
true | 一个指示包含引号的值是否应始终用引号括起来的标志。默认情况下转义所有包含引号字符的值。 | 写 |
comment
|
设置用于跳过以此字符开头的行的单个字符。默认情况下,此功能是禁用的。 | 读 | |
header
|
false | 对于读取,将第一行用作列名。对于写入,将列名写为第一行。注意,如果给定的路径是字符串的RDD,则此标题选项将在存在时删除与标题相同的所有行。CSV内置函数忽略此选项。 | 读/写 |
inferSchema
|
false | 自动从数据推断输入模式。它需要遍历数据一次。CSV内置函数忽略此选项。 | 读 |
preferDate
|
true |
在模式推断(
inferSchema
)期间,尝试将包含日期的字符串列推断为
Date
,如果值满足
dateFormat
选项或默认日期格式。对于包含日期和时间戳的混合列,尝试将其推断为
TimestampType
,如果未指定时间戳格式,否则推断为
StringType
。
|
读 |
enforceSchema
|
true |
如果设置为
true
,则指定或推断的模式将被强制应用于数据源文件,CSV文件中的头部将被忽略。如果选项设置为
false
,则在
header
选项设置为
true
时,模式将根据CSV文件中的所有头部进行验证。模式中的字段名称和CSV头中的列名称按其位置进行检查,考虑
spark.sql.caseSensitive
。尽管默认值为true,但建议禁用
enforceSchema
选项以避免不正确的结果。CSV内置函数忽略此选项。
|
读 |
ignoreLeadingWhiteSpace
|
false
(读取时),
true
(写入时)
|
一个指示在读取/写入时是否应跳过值的前导空白的标志。 | 读/写 |
ignoreTrailingWhiteSpace
|
false
(读取时),
true
(写入时)
|
一个指示在读取/写入时是否应跳过值的尾随空白的标志。 | 读/写 |
nullValue
|
设置null值的字符串表示。从2.0.1开始,此
nullValue
参数适用于包括字符串类型在内的所有支持类型。
|
读/写 | |
nanValue
|
NaN | 设置非数字值的字符串表示。 | 读 |
positiveInf
|
Inf | 设置正无穷大值的字符串表示。 | 读 |
negativeInf
|
-Inf | 设置负无穷大值的字符串表示。 | 读 |
dateFormat
|
yyyy-MM-dd | 设置指示日期格式的字符串。自定义日期格式遵循 日期时间模式 中的格式。这适用于日期类型。 | 读/写 |
timestampFormat
|
yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] | 设置指示时间戳格式的字符串。自定义日期格式遵循 日期时间模式 中的格式。这适用于时间戳类型。 | 读/写 |
timestampNTZFormat
|
yyyy-MM-dd'T'HH:mm:ss[.SSS] | 设置指示无时区时间戳格式的字符串。自定义日期格式遵循 日期时间模式 中的格式。这适用于无时区的时间戳类型,请注意,在写入或读取此数据类型时不支持区偏移和时区组件。 | 读/写 |
enableDateTimeParsingFallback
|
如果时间解析器策略具有遗留设置或未提供自定义日期或时间戳模式,则启用。 | 允许在值不匹配设置模式时,回退到向后兼容(Spark 1.x和2.0)解析日期和时间戳的行为。 | 读 |
maxColumns
|
20480 | 定义一条记录可以拥有的最大列数。 | 读 |
maxCharsPerColumn
|
-1 | 定义读取的任何给定值允许的最大字符数。默认情况下,-1表示无限长度 | 读 |
mode
|
PERMISSIVE |
允许在解析过程中处理损坏记录的模式。支持以下大小写不敏感的模式。注意,Spark在列修剪下仅尝试解析CSV中的必需列。因此,损坏记录可能基于所需的字段集各不相同。此行为可以通过
spark.sql.csv.parser.columnPruning.enabled
(默认启用)进行控制。
|
读 |
columnNameOfCorruptRecord
|
(
spark.sql.columnNameOfCorruptRecord
配置的值)
|
允许重命名由
PERMISSIVE
模式创建的包含错误格式字符串的新字段。这将覆盖
spark.sql.columnNameOfCorruptRecord
。
|
读 |
multiLine
|
false | 每个文件解析一条可能跨越多行的记录。CSV内置函数忽略此选项。 | 读 |
charToEscapeQuoteEscaping
|
escape
或
\0
|
设置用于转义引号字符转义的单个字符。当转义字符和引号字符不同时,默认值为转义字符,否则为
\0
。
|
读/写 |
samplingRatio
|
1.0 | 定义用于模式推断的行的比例。CSV内置函数忽略此选项。 | 读 |
emptyValue
|
(读取时),
""
(写入时)
|
设置空值的字符串表示。 | 读/写 |
locale
|
en-US | 设置IETF BCP 47格式的语言标签作为区域设置。例如,在解析日期和时间戳时使用。 | 读 |
lineSep
|
\r
、
\r\n
和
\n
(读取时),
\n
(写入时)
|
定义用于解析/写入的行分隔符。最大长度为1个字符。CSV内置函数忽略此选项。 | 读/写 |
unescapedQuoteHandling
|
STOP_AT_DELIMITER |
定义CsvParser如何处理带有未转义引号的值。
|
读 |
compression
|
(无) |
在保存到文件时使用的压缩编解码器。可以是已知的小写不敏感缩写名称之一(
none
、
bzip2
、
gzip
、
lz4
、
snappy
和
deflate
)。CSV内置函数忽略此选项。
|
写 |
其他通用选项可以在 通用文件源选项 中找到。