CSV 文件

Spark SQL提供 spark.read().csv("file_name") 从CSV格式的文件或文件夹中读取数据到Spark DataFrame,并使用 dataframe.write().csv("path") 将数据写入CSV文件。函数 option() 可以用来定制读取或写入的行为,例如控制头部、分隔符字符、字符集等。

# spark 来自前面的示例
sc = spark.sparkContext
# CSV 数据集由路径指向。
# 路径可以是单个 CSV 文件或 CSV 文件的目录
path = "examples/src/main/resources/people.csv"
df = spark.read.csv(path)
df.show()
# +------------------+
# |               _c0|
# +------------------+
# |      name;age;job|
# |Jorge;30;Developer|
# |  Bob;32;Developer|
# +------------------+

# 读取带有分隔符的 CSV,默认分隔符为 ","
df2 = spark.read.option("delimiter", ";").csv(path)
df2.show()
# +-----+---+---------+
# |  _c0|_c1|      _c2|
# +-----+---+---------+
# | name|age|      job|
# |Jorge| 30|Developer|
# |  Bob| 32|Developer|
# +-----+---+---------+

# 读取带有分隔符和表头的 CSV
df3 = spark.read.option("delimiter", ";").option("header", True).csv(path)
df3.show()
# +-----+---+---------+
# | name|age|      job|
# +-----+---+---------+
# |Jorge| 30|Developer|
# |  Bob| 32|Developer|
# +-----+---+---------+

# 你还可以使用 options() 来使用多个选项
df4 = spark.read.options(delimiter=";", header=True).csv(path)
# "output" 是一个包含多个 CSV 文件和一个 _SUCCESS 文件的文件夹。
df3.write.csv("output")
# 读取文件夹中的所有文件,请确保文件夹中只存在 CSV 文件。
folderPath = "examples/src/main/resources"
df5 = spark.read.csv(folderPath)
df5.show()
# 错误的模式,因为读取了非 CSV 文件
# +-----------+
# |        _c0|
# +-----------+
# |238val_238|
# |  86val_86|
# |311val_311|
# |  27val_27|
# |165val_165|
# +-----------+
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.
// 一个 CSV 数据集由路径指向。
// 路径可以是单个 CSV 文件或 CSV 文件的目录。
val path = "examples/src/main/resources/people.csv"
val df = spark.read.csv(path)
df.show()
// +------------------+
// |               _c0|
// +------------------+
// |      name;age;job|
// |Jorge;30;Developer|
// |  Bob;32;Developer|
// +------------------+
// 读取带有分隔符的 CSV,默认分隔符为 ","
val df2 = spark.read.option("delimiter", ";").csv(path)
df2.show()
// +-----+---+---------+
// |  _c0|_c1|      _c2|
// +-----+---+---------+
// | name|age|      job|
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+
// 读取带有分隔符和表头的 CSV
val df3 = spark.read.option("delimiter", ";").option("header", "true").csv(path)
df3.show()
// +-----+---+---------+
// | name|age|      job|
// +-----+---+---------+
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+
// 还可以使用 options() 来使用多个选项
val df4 = spark.read.options(Map("delimiter"->";", "header"->"true")).csv(path)
// "output" 是一个文件夹,包含多个 CSV 文件和一个 _SUCCESS 文件。
df3.write.csv("output")
// 读取文件夹中的所有文件,请确保该文件夹中只存在 CSV 文件。
val folderPath = "examples/src/main/resources";
val df5 = spark.read.csv(folderPath);
df5.show();
// 错误的模式,因为读取了非 CSV 文件
// +-----------+
// |        _c0|
// +-----------+
// |238val_238|
// |  86val_86|
// |311val_311|
// |  27val_27|
// |165val_165|
// +-----------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// CSV 数据集由路径指向。
// 路径可以是单个 CSV 文件或 CSV 文件的目录。
String path = "examples/src/main/resources/people.csv";
Dataset<Row> df = spark.read().csv(path);
df.show();
// +------------------+
// |               _c0|
// +------------------+
// |      name;age;job|
// |Jorge;30;Developer|
// |  Bob;32;Developer|
// +------------------+
// 读取带分隔符的 CSV,默认分隔符为 ","
Dataset<Row> df2 = spark.read().option("delimiter", ";").csv(path);
df2.show();
// +-----+---+---------+
// |  _c0|_c1|      _c2|
// +-----+---+---------+
// | name|age|      job|
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+
// 读取带分隔符和表头的 CSV
Dataset<Row> df3 = spark.read().option("delimiter", ";").option("header", "true").csv(path);
df3.show();
// +-----+---+---------+
// | name|age|      job|
// +-----+---+---------+
// |Jorge| 30|Developer|
// |  Bob| 32|Developer|
// +-----+---+---------+
// 您也可以使用 options() 来使用多个选项
java.util.Map<String, String> optionsMap = new java.util.HashMap<String, String>();
optionsMap.put("delimiter",";");
optionsMap.put("header","true");
Dataset<Row> df4 = spark.read().options(optionsMap).csv(path);
// "output" 是一个包含多个 CSV 文件和一个 _SUCCESS 文件的文件夹。
df3.write().csv("output");
// 读取文件夹中的所有文件,请确保文件夹中只包含 CSV 文件。
String folderPath = "examples/src/main/resources";
Dataset<Row> df5 = spark.read().csv(folderPath);
df5.show();
// 错误的schema,因为读取了非CSV文件
// +-----------+
// |        _c0|
// +-----------+
// |238val_238|
// |  86val_86|
// |311val_311|
// |  27val_27|
// |165val_165|
// +-----------+
Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.

数据源选项

可以通过以下方式设置CSV的数据源选项:

属性名称 默认值 含义 范围
sep , 为每个字段和值设置一个分隔符。这个分隔符可以是一个或多个字符。 读/写
encoding UTF-8 用于读取时,按照给定的编码类型解码CSV文件。用于写入时,指定保存的CSV文件的编码(字符集)。CSV内置函数忽略此选项。 读/写
quote " 设置用于转义被引用值的单个字符,其中分隔符可能是值的一部分。对于读取,如果希望关闭引号,需设置为空字符串,而不是 null 。对于写入,如果设置为空字符串,将使用 u0000 (空字符)。 读/写
quoteAll false 一个指示所有值是否应该始终用引号括起来的标志。默认情况下仅转义包含引号字符的值。
escape \ 设置用于在已经被引号括起来的值内部转义引号的单个字符。 读/写
escapeQuotes true 一个指示包含引号的值是否应始终用引号括起来的标志。默认情况下转义所有包含引号字符的值。
comment 设置用于跳过以此字符开头的行的单个字符。默认情况下,此功能是禁用的。
header false 对于读取,将第一行用作列名。对于写入,将列名写为第一行。注意,如果给定的路径是字符串的RDD,则此标题选项将在存在时删除与标题相同的所有行。CSV内置函数忽略此选项。 读/写
inferSchema false 自动从数据推断输入模式。它需要遍历数据一次。CSV内置函数忽略此选项。
preferDate true 在模式推断( inferSchema )期间,尝试将包含日期的字符串列推断为 Date ,如果值满足 dateFormat 选项或默认日期格式。对于包含日期和时间戳的混合列,尝试将其推断为 TimestampType ,如果未指定时间戳格式,否则推断为 StringType
enforceSchema true 如果设置为 true ,则指定或推断的模式将被强制应用于数据源文件,CSV文件中的头部将被忽略。如果选项设置为 false ,则在 header 选项设置为 true 时,模式将根据CSV文件中的所有头部进行验证。模式中的字段名称和CSV头中的列名称按其位置进行检查,考虑 spark.sql.caseSensitive 。尽管默认值为true,但建议禁用 enforceSchema 选项以避免不正确的结果。CSV内置函数忽略此选项。
ignoreLeadingWhiteSpace false (读取时), true (写入时) 一个指示在读取/写入时是否应跳过值的前导空白的标志。 读/写
ignoreTrailingWhiteSpace false (读取时), true (写入时) 一个指示在读取/写入时是否应跳过值的尾随空白的标志。 读/写
nullValue 设置null值的字符串表示。从2.0.1开始,此 nullValue 参数适用于包括字符串类型在内的所有支持类型。 读/写
nanValue NaN 设置非数字值的字符串表示。
positiveInf Inf 设置正无穷大值的字符串表示。
negativeInf -Inf 设置负无穷大值的字符串表示。
dateFormat yyyy-MM-dd 设置指示日期格式的字符串。自定义日期格式遵循 日期时间模式 中的格式。这适用于日期类型。 读/写
timestampFormat yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] 设置指示时间戳格式的字符串。自定义日期格式遵循 日期时间模式 中的格式。这适用于时间戳类型。 读/写
timestampNTZFormat yyyy-MM-dd'T'HH:mm:ss[.SSS] 设置指示无时区时间戳格式的字符串。自定义日期格式遵循 日期时间模式 中的格式。这适用于无时区的时间戳类型,请注意,在写入或读取此数据类型时不支持区偏移和时区组件。 读/写
enableDateTimeParsingFallback 如果时间解析器策略具有遗留设置或未提供自定义日期或时间戳模式,则启用。 允许在值不匹配设置模式时,回退到向后兼容(Spark 1.x和2.0)解析日期和时间戳的行为。
maxColumns 20480 定义一条记录可以拥有的最大列数。
maxCharsPerColumn -1 定义读取的任何给定值允许的最大字符数。默认情况下,-1表示无限长度
mode PERMISSIVE 允许在解析过程中处理损坏记录的模式。支持以下大小写不敏感的模式。注意,Spark在列修剪下仅尝试解析CSV中的必需列。因此,损坏记录可能基于所需的字段集各不相同。此行为可以通过 spark.sql.csv.parser.columnPruning.enabled (默认启用)进行控制。
  • PERMISSIVE :当遇到损坏记录时,将错误格式的字符串放入由 columnNameOfCorruptRecord 配置的字段中,并将错误格式的字段设置为 null 。为了保留损坏记录,用户可以在用户定义的模式中设置名为 columnNameOfCorruptRecord 的字符串类型字段。如果模式中没有该字段,则在解析期间将删除损坏记录。具有少于/多于模式令牌的记录不会被认为是CSV的损坏记录。当遇到的记录的令牌少于模式的长度时,将 null 设置为额外字段。当记录的令牌多于模式的长度时,丢弃额外的令牌。
  • DROPMALFORMED :忽略整个损坏记录。此模式在CSV内置函数中不受支持。
  • FAILFAST :当遇到损坏记录时抛出异常。
columnNameOfCorruptRecord ( spark.sql.columnNameOfCorruptRecord 配置的值) 允许重命名由 PERMISSIVE 模式创建的包含错误格式字符串的新字段。这将覆盖 spark.sql.columnNameOfCorruptRecord
multiLine false 每个文件解析一条可能跨越多行的记录。CSV内置函数忽略此选项。
charToEscapeQuoteEscaping escape \0 设置用于转义引号字符转义的单个字符。当转义字符和引号字符不同时,默认值为转义字符,否则为 \0 读/写
samplingRatio 1.0 定义用于模式推断的行的比例。CSV内置函数忽略此选项。
emptyValue (读取时), "" (写入时) 设置空值的字符串表示。 读/写
locale en-US 设置IETF BCP 47格式的语言标签作为区域设置。例如,在解析日期和时间戳时使用。
lineSep \r \r\n \n (读取时), \n (写入时) 定义用于解析/写入的行分隔符。最大长度为1个字符。CSV内置函数忽略此选项。 读/写
unescapedQuoteHandling STOP_AT_DELIMITER 定义CsvParser如何处理带有未转义引号的值。
  • STOP_AT_CLOSING_QUOTE :如果在输入中发现未转义引号,累积引号字符并将该值当作带引号的值进行解析,直到找到闭合引号。
  • BACK_TO_DELIMITER :如果在输入中发现未转义引号,将该值视为未带引号的值。这将使解析器累积当前解析值的所有字符,直到找到分隔符。如果在值中未找到分隔符,则解析器将继续累积输入中的字符,直到找到分隔符或行结束。
  • STOP_AT_DELIMITER :如果在输入中发现未转义引号,将该值视为未带引号的值。这将使解析器累积所有字符,直到在输入中找到分隔符或行结束。
  • SKIP_VALUE :如果在输入中发现未转义引号,将跳过为给定值解析的内容,而是产生在nullValue中设置的值。
  • RAISE_ERROR :如果在输入中发现未转义引号,将抛出TextParsingException。
compression (无) 在保存到文件时使用的压缩编解码器。可以是已知的小写不敏感缩写名称之一( none bzip2 gzip lz4 snappy deflate )。CSV内置函数忽略此选项。

其他通用选项可以在 通用文件源选项 中找到。