pyspark.SparkContext.newAPIHadoopFile ¶
-
SparkContext.
newAPIHadoopFile
( path : str , inputFormatClass : str , keyClass : str , valueClass : str , keyConverter : Optional [ str ] = None , valueConverter : Optional [ str ] = None , conf : Optional [ Dict [ str , str ] ] = None , batchSize : int = 0 ) → pyspark.rdd.RDD [ Tuple [ T , U ] ] [source] ¶ -
从HDFS、本地文件系统(在所有节点上可用)或任何支持Hadoop的文件系统URI中读取带有任意键和值类的新API Hadoop InputFormat。机制与 SparkContext.sequenceFile 相同。
Hadoop 配置可以作为 Python 字典传递。这将转换为 Java 中的 Configuration
新增于版本 1.1.0。
- Parameters
-
- path str
-
Hadoop文件的路径
- inputFormatClass str
-
Hadoop InputFormat 的完全限定类名 (例如 “org.apache.hadoop.mapreduce.lib.input.TextInputFormat”)
- keyClass str
-
键Writable类的完全限定类名 (例如“org.apache.hadoop.io.Text”)
- valueClass str
-
值 Writable 类的完全限定类名(例如“org.apache.hadoop.io.LongWritable”)
- keyConverter str, optional
-
返回键 WritableConverter 的函数的完全限定名称 默认为 None
- valueConverter str, optional
-
返回值 WritableConverter 的函数的完全限定名称 默认为 None
- conf dict, optional
-
Hadoop 配置,以字典形式传入 默认为 None
- batchSize int, optional, default 0
-
表示为单个Java对象的Python对象的数量。(默认值为0,自动选择batchSize)
- Returns
-
-
RDD
-
键和对应值的元组RDD
-
另请参阅
示例
>>> import os >>> import tempfile
设置相关类
>>> output_format_class = "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat" >>> input_format_class = "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat" >>> key_class = "org.apache.hadoop.io.IntWritable" >>> value_class = "org.apache.hadoop.io.Text"
>>> with tempfile.TemporaryDirectory() as d: ... path = os.path.join(d, "new_hadoop_file") ... ... # Write a temporary Hadoop file ... rdd = sc.parallelize([(1, ""), (1, "a"), (3, "x")]) ... rdd.saveAsNewAPIHadoopFile(path, output_format_class, key_class, value_class) ... ... loaded = sc.newAPIHadoopFile(path, input_format_class, key_class, value_class) ... collected = sorted(loaded.collect())
>>> collected [(1, ''), (1, 'a'), (3, 'x')]