pyspark.SparkContext.newAPIHadoopFile

SparkContext. newAPIHadoopFile ( path : str , inputFormatClass : str , keyClass : str , valueClass : str , keyConverter : Optional [ str ] = None , valueConverter : Optional [ str ] = None , conf : Optional [ Dict [ str , str ] ] = None , batchSize : int = 0 ) → pyspark.rdd.RDD [ Tuple [ T , U ] ] [source]

从HDFS、本地文件系统(在所有节点上可用)或任何支持Hadoop的文件系统URI中读取带有任意键和值类的新API Hadoop InputFormat。机制与 SparkContext.sequenceFile 相同。

Hadoop 配置可以作为 Python 字典传递。这将转换为 Java 中的 Configuration

新增于版本 1.1.0。

Parameters
path str

Hadoop文件的路径

inputFormatClass str

Hadoop InputFormat 的完全限定类名 (例如 “org.apache.hadoop.mapreduce.lib.input.TextInputFormat”)

keyClass str

键Writable类的完全限定类名 (例如“org.apache.hadoop.io.Text”)

valueClass str

值 Writable 类的完全限定类名(例如“org.apache.hadoop.io.LongWritable”)

keyConverter str, optional

返回键 WritableConverter 的函数的完全限定名称 默认为 None

valueConverter str, optional

返回值 WritableConverter 的函数的完全限定名称 默认为 None

conf dict, optional

Hadoop 配置,以字典形式传入 默认为 None

batchSize int, optional, default 0

表示为单个Java对象的Python对象的数量。(默认值为0,自动选择batchSize)

Returns
RDD

键和对应值的元组RDD

示例

>>> import os
>>> import tempfile

设置相关类

>>> output_format_class = "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"
>>> input_format_class = "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat"
>>> key_class = "org.apache.hadoop.io.IntWritable"
>>> value_class = "org.apache.hadoop.io.Text"
>>> with tempfile.TemporaryDirectory() as d:
...     path = os.path.join(d, "new_hadoop_file")
...
...     # Write a temporary Hadoop file
...     rdd = sc.parallelize([(1, ""), (1, "a"), (3, "x")])
...     rdd.saveAsNewAPIHadoopFile(path, output_format_class, key_class, value_class)
...
...     loaded = sc.newAPIHadoopFile(path, input_format_class, key_class, value_class)
...     collected = sorted(loaded.collect())
>>> collected
[(1, ''), (1, 'a'), (3, 'x')]