MapReduce教程

目的

本文档全面介绍了Hadoop MapReduce框架的所有面向用户的方面,并作为教程使用。

先决条件

确保Hadoop已安装、配置并正在运行。更多详情:

概述

Hadoop MapReduce是一个软件框架,用于轻松编写应用程序,以可靠、容错的方式在大型商用硬件集群(数千个节点)上并行处理海量数据(多TB数据集)。

一个MapReduce 作业通常将输入数据集分割成独立的块,这些块由映射任务以完全并行的方式处理。框架会对映射的输出进行排序,然后将其作为归约任务的输入。通常,作业的输入和输出都存储在文件系统中。框架负责调度任务、监控任务并重新执行失败的任务。

通常计算节点和存储节点是相同的,也就是说,MapReduce框架和Hadoop分布式文件系统(参见HDFS架构指南)运行在同一组节点上。这种配置允许框架在已存在数据的节点上高效调度任务,从而在整个集群中实现极高的聚合带宽。

MapReduce框架由一个主节点ResourceManager、每个集群节点上的工作节点NodeManager以及每个应用程序对应的MRAppMaster组成(参见YARN架构指南)。

至少,应用程序需要指定输入/输出位置,并通过实现适当的接口和/或抽象类来提供mapreduce函数。这些参数以及其他作业参数共同构成了作业配置

Hadoop的作业客户端随后将作业(jar/可执行文件等)和配置提交给ResourceManager,后者负责将软件/配置分发给工作节点、调度任务并监控它们,同时向作业客户端提供状态和诊断信息。

尽管Hadoop框架是用Java™实现的,但MapReduce应用程序并不需要用Java编写。

  • Hadoop Streaming 是一个实用工具,允许用户使用任何可执行文件(例如shell工具)作为mapper和/或reducer来创建和运行作业。

  • Hadoop Pipes 是一个兼容 SWIG 的 C++ API,用于实现 MapReduce 应用程序(非基于 JNI™)。

输入与输出

MapReduce框架专门处理键值对,也就是说,该框架将作业的输入视为一组键值对,并生成一组可能类型不同的键值对作为作业输出。

keyvalue类必须能被框架序列化,因此需要实现Writable接口。此外,key类还必须实现WritableComparable接口以便框架进行排序。

MapReduce作业的输入和输出类型:

(输入) -> map -> -> combine -> -> reduce -> (输出)

示例:WordCount v1.0

在深入细节之前,让我们通过一个MapReduce应用示例来了解它们的工作原理。

WordCount 是一个简单的应用程序,用于统计给定输入集中每个单词的出现次数。

这适用于本地独立、伪分布式或完全分布式Hadoop安装(Single Node Setup)。

源代码

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

使用说明

假设环境变量设置如下:

export JAVA_HOME=/usr/java/default
export PATH=${JAVA_HOME}/bin:${PATH}
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar

编译 WordCount.java 并创建 jar 包:

$ bin/hadoop com.sun.tools.javac.Main WordCount.java
$ jar cf wc.jar WordCount*.class

假设:

  • /user/joe/wordcount/input - HDFS中的输入目录
  • /user/joe/wordcount/output - HDFS中的输出目录

作为输入的示例文本文件:

$ bin/hadoop fs -ls /user/joe/wordcount/input/
/user/joe/wordcount/input/file01
/user/joe/wordcount/input/file02

$ bin/hadoop fs -cat /user/joe/wordcount/input/file01
Hello World Bye World

$ bin/hadoop fs -cat /user/joe/wordcount/input/file02
Hello Hadoop Goodbye Hadoop

运行应用程序:

$ bin/hadoop jar wc.jar WordCount /user/joe/wordcount/input /user/joe/wordcount/output

输出:

$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2

应用程序可以通过-files选项指定一个逗号分隔的路径列表,这些路径将出现在任务的当前工作目录中。-libjars选项允许应用程序将jar文件添加到map和reduce任务的类路径中。-archives选项则允许传递逗号分隔的归档文件列表作为参数。这些归档文件会被解压,并在任务的当前工作目录中创建以归档文件名命名的链接。更多关于命令行选项的详细信息请参阅Commands Guide

使用-libjars-files-archives运行wordcount示例:

bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -files cachefile.txt -libjars mylib.jar -archives myarchive.zip input output

在这里,myarchive.zip 将被放置并解压到一个名为“myarchive.zip”的目录中。

用户可以通过在-files-archives选项中使用#符号,为传递的文件和归档指定不同的符号名称。

例如,

bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -files dir1/dict.txt#dict1,dir2/dict.txt#dict2 -archives mytar.tgz#tgzdir input output

在这里,任务可以通过符号名称dict1和dict2分别访问文件dir1/dict.txt和dir2/dict.txt。归档文件mytar.tgz将被放置并解压到一个名为"tgzdir"的目录中。

应用程序可以通过在命令行中使用选项-Dmapreduce.map.env、-Dmapreduce.reduce.env和-Dyarn.app.mapreduce.am.env分别为mapper、reducer和application master任务指定环境变量。

例如,以下设置会为mappers和reducers设置环境变量FOO_VAR=bar和LIST_VAR=a,b,c,

bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -Dmapreduce.map.env.FOO_VAR=bar -Dmapreduce.map.env.LIST_VAR=a,b,c -Dmapreduce.reduce.env.FOO_VAR=bar -Dmapreduce.reduce.env.LIST_VAR=a,b,c input output

操作指南

WordCount应用程序非常简单明了。

public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
  StringTokenizer itr = new StringTokenizer(value.toString());
  while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    context.write(word, one);
  }
}

Mapper实现通过map方法,按照指定的TextInputFormat格式逐行处理数据。然后使用StringTokenizer将每行按空格分割成多个词元,并输出< , 1>格式的键值对。

对于给定的示例输入,第一个map发出:

< Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>

第二个map发出:

< Hello, 1>
< Hadoop, 1>
< Goodbye, 1>
< Hadoop, 1>

我们将在教程稍后部分详细了解为给定作业生成的映射任务数量,以及如何以细粒度方式控制它们。

    job.setCombinerClass(IntSumReducer.class);

WordCount还指定了一个combiner。因此,每个map的输出在按键排序后,会通过本地combiner(根据作业配置,该combiner与Reducer相同)进行本地聚合。

第一个map的输出:

< Bye, 1>
< Hello, 1>
< World, 2>

第二个map的输出:

< Goodbye, 1>
< Hadoop, 2>
< Hello, 1>
public void reduce(Text key, Iterable<IntWritable> values,
                   Context context
                   ) throws IOException, InterruptedException {
  int sum = 0;
  for (IntWritable val : values) {
    sum += val.get();
  }
  result.set(sum);
  context.write(key, result);
}

Reducer实现通过reduce方法对值进行求和,这些值是每个键(在本例中为单词)的出现次数。

因此,该作业的输出为:

< Bye, 1>
< Goodbye, 1>
< Hadoop, 2>
< Hello, 2>
< World, 2>

main方法在Job中指定作业的各个方面,例如输入/输出路径(通过命令行传递)、键/值类型、输入/输出格式等。然后调用job.waitForCompletion提交作业并监控其进度。

我们将在本教程稍后部分更详细地学习JobInputFormatOutputFormat以及其他接口和类。

MapReduce - 用户界面

本节详细介绍了MapReduce框架中所有面向用户的方面。这将帮助用户以细粒度的方式实现、配置和优化他们的作业。但请注意,每个类/接口的javadoc仍然是最全面的文档;本文档仅作为教程使用。

首先让我们来看MapperReducer接口。应用程序通常通过实现这些接口来提供mapreduce方法。

接下来我们将讨论其他核心接口,包括JobPartitionerInputFormatOutputFormat等。

最后,我们将总结讨论该框架的一些有用功能,例如DistributedCacheIsolationRunner等。

有效载荷

应用程序通常实现MapperReducer接口来提供mapreduce方法。这些构成了任务的核心。

Mapper

Mapper 将输入的键/值对映射为一组中间键/值对。

Map是将输入记录转换为中间记录的独立任务。转换后的中间记录不必与输入记录类型相同。一个给定的输入键值对可以映射为零个或多个输出键值对。

Hadoop MapReduce框架会为作业的InputFormat生成的每个InputSplit生成一个map任务。

总体而言,mapper实现通过Job.setMapperClass(Class)方法传递给作业。框架随后会为任务中的InputSplit里的每个键/值对调用map(WritableComparable, Writable, Context)。应用程序可以重写cleanup(Context)方法来执行任何必要的清理操作。

输出键值对不需要与输入键值对类型相同。一个给定的输入键值对可以映射到零个或多个输出键值对。通过调用context.write(WritableComparable, Writable)来收集输出键值对。

应用程序可以使用Counter来报告其统计信息。

与给定输出键关联的所有中间值随后由框架进行分组,并传递给Reducer以确定最终输出。用户可以通过Job.setGroupingComparatorClass(Class)指定Comparator来控制分组过程。

Mapper的输出会被排序,然后按Reducer进行分区。分区总数与作业的reduce任务数相同。用户可以通过实现自定义的Partitioner来控制哪些键(以及记录)发送到哪个Reducer

用户可以选择性地通过Job.setCombinerClass(Class)指定一个combiner,用于对中间输出进行本地聚合,这有助于减少从Mapper传输到Reducer的数据量。

中间排序后的输出始终以简单的(key-len, key, value-len, value)格式存储。应用程序可以控制是否压缩中间输出以及通过Configuration指定要使用的CompressionCodec压缩编解码器。

需要多少个Map任务?

map的数量通常由输入数据的总大小决定,也就是输入文件块的总数。

对于map任务来说,每个节点合适的并行度大约在10-100个map之间,尽管对于CPU负载很轻的map任务可以设置到300个。任务启动需要一些时间,因此最好让每个map任务至少执行一分钟。

因此,如果您预计有10TB的输入数据且块大小为128MB,最终将产生82,000个映射任务,除非使用Configuration.set(MRJobConfig.NUM_MAPS, int)(该设置仅作为框架的参考提示)将其设置得更高。

Reducer

Reducer 将共享同一个键的一组中间值缩减为更小的一组值。

作业的reduce数量由用户通过Job.setNumReduceTasks(int)设置。

总体而言,Reducer实现会通过Job.setReducerClass(Class)方法接收作业的Job对象,并可重写该方法进行自身初始化。框架随后会对分组输入中的每个键值对调用reduce(WritableComparable, Iterable, Context)方法。应用程序还可以重写cleanup(Context)方法来执行必要的清理操作。

Reducer 包含3个主要阶段:shuffle(洗牌)、sort(排序)和reduce(归约)。

Shuffle

Reducer的输入是经过排序的mapper输出。在此阶段,框架通过HTTP获取所有mapper输出的相关分区。

排序

在此阶段,该框架按键对Reducer的输入进行分组(因为不同的映射器可能输出了相同的键)。

shuffle和排序阶段同时进行;在获取map输出时会对它们进行合并。

二次排序

如果需要将中间键的分组等价规则与归约前的键分组规则设为不同,则可以通过Job.setSortComparatorClass(Class)指定一个Comparator。由于Job.setGroupingComparatorClass(Class)可用于控制中间键的分组方式,因此这两者可以结合使用来模拟值的二次排序

Reduce

在此阶段,对于分组输入中的每个键值对,都会调用reduce(WritableComparable, Iterable, Context)方法。

reduce任务的输出通常通过Context.write(WritableComparable, Writable)写入FileSystem

应用程序可以使用Counter来报告其统计信息。

Reducer的输出结果未经排序

需要多少个Reduce任务?

正确的reduce任务数量似乎是0.951.75乘以(<节点数量> * <每个节点的最大容器数>)。

当设置为0.95时,所有reduce任务可以立即启动,并在map任务完成后开始传输map输出结果。当设置为1.75时,速度较快的节点将完成第一轮reduce任务,并启动第二轮reduce任务,从而实现更好的负载均衡效果。

增加reduce任务数量会增加框架开销,但能提高负载均衡并降低故障成本。

上述缩放因子略小于整数,以便在框架中保留一些reduce槽位用于推测任务和失败任务。

Reducer 无

如果不希望进行归约操作,将reduce任务的数量设置为是合法的。

在这种情况下,map任务的输出直接写入FileSystem,进入由FileOutputFormat.setOutputPath(Job, Path)设置的输出路径。框架在将map输出写入FileSystem之前不会对其进行排序。

分区器

Partitioner 对键空间进行分区。

分区器(Partitioner)控制着中间map输出键的分区方式。通常通过哈希函数,使用键(或键的子集)来推导分区。分区总数与作业的reduce任务数相同。因此这决定了中间键(以及记录)会被发送到哪个m个reduce任务中进行归约处理。

HashPartitioner 是默认的 Partitioner

计数器

Counter 是MapReduce应用程序用于报告其统计数据的工具。

MapperReducer 实现可以使用 Counter 来报告统计信息。

Hadoop MapReduce 附带了一个,其中包含通常有用的映射器、归约器和分区器。

作业配置

Job 表示一个MapReduce作业的配置。

Job 是用户向Hadoop框架描述MapReduce作业以供执行的主要接口。框架会尽力按照Job的描述忠实地执行作业,但是:

Job 通常用于指定 Mapper、合并器(如果有)、PartitionerReducerInputFormatOutputFormat 的实现。FileInputFormat 用于指定输入文件集(通过FileInputFormat.setInputPaths(Job, Path…)/ FileInputFormat.addInputPath(Job, Path))以及(FileInputFormat.setInputPaths(Job, String…)/ FileInputFormat.addInputPaths(Job, String))),并确定输出文件的写入位置(FileOutputFormat.setOutputPath(Path))。

可选地,Job用于指定作业的其他高级选项,例如要使用的Comparator、放入DistributedCache的文件、是否压缩中间和/或作业输出(以及如何压缩)、作业任务是否以推测方式执行(setMapSpeculativeExecution(boolean))/ setReduceSpeculativeExecution(boolean))、每个任务的最大尝试次数(setMaxMapAttempts(int)/ setMaxReduceAttempts(int))等。

当然,用户可以使用Configuration.set(String, String)/Configuration.get(String)来设置/获取应用程序所需的任意参数。但对于大量(只读)数据,请使用DistributedCache

任务执行与环境

MRAppMaster 在单独的 JVM 中以子进程形式执行 Mapper/Reducer 任务

子任务会继承父任务MRAppMaster的运行环境。用户可以通过mapreduce.{map|reduce}.java.opts参数和Job配置项为子JVM指定额外选项,例如通过-Djava.library.path=<>设置运行时链接器搜索共享库的非标准路径等。如果mapreduce.{map|reduce}.java.opts参数中包含符号@taskid@,该符号会被替换为MapReduce任务的taskid实际值。

以下是一个包含多个参数和替换的示例,展示了JVM垃圾回收日志记录,以及启动一个无需密码的JVM JMX智能体,以便能够连接jconsole等工具来监控子进程内存、线程状态并获取线程转储。同时,该示例将map和reduce子JVM的最大堆内存分别设置为512MB和1024MB。此外,它还向子JVM的java.library.path添加了一个额外路径。

<property>
  <name>mapreduce.map.java.opts</name>
  <value>
  -Xmx512M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
  -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
  </value>
</property>

<property>
  <name>mapreduce.reduce.java.opts</name>
  <value>
  -Xmx1024M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
  -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
  </value>
</property>

内存管理

用户/管理员还可以使用mapreduce.{map|reduce}.memory.mb来指定启动的子任务及其递归启动的任何子进程的最大虚拟内存。请注意,此处设置的值是每个进程的限制。mapreduce.{map|reduce}.memory.mb的值应以兆字节(MB)为单位指定。此外,该值必须大于或等于传递给JavaVM的-Xmx参数,否则虚拟机可能无法启动。

注意:mapreduce.{map|reduce}.java.opts仅用于配置从MRAppMaster启动的子任务。守护进程的内存选项配置请参阅Configuring the Environment of the Hadoop Daemons

框架某些部分可用的内存也是可配置的。在map和reduce任务中,通过调整影响操作并发性和数据落盘频率的参数,可能会对性能产生影响。监控作业的文件系统计数器——尤其是与map输出字节数和reduce输入字节数相关的指标——对于调优这些参数至关重要。

Map参数

从map任务发出的记录会被序列化到一个缓冲区中,元数据则存入统计缓冲区。如下文选项所述,当序列化缓冲区或元数据的容量超过阈值时,缓冲区内容将在后台进行排序并写入磁盘,同时map任务继续输出记录。若在溢出过程中任一缓冲区完全填满,map线程将被阻塞。当map任务完成时,所有剩余记录会被写入磁盘,所有磁盘上的分段将合并为单个文件。减少磁盘溢出次数可以缩短map任务时间,但更大的缓冲区也会减少mapper可用的内存。

名称 类型 描述
mapreduce.task.io.sort.mb int 存储从map任务发出的记录的序列化和统计缓冲区的累积大小,单位为兆字节。
mapreduce.map.sort.spill.percent float 序列化缓冲区中的软限制阈值。一旦达到该值,后台线程就会开始将内容溢出到磁盘。

其他注意事项

  • 如果在溢出过程中任一溢出阈值被超过,收集操作将持续到溢出完成。例如,如果mapreduce.map.sort.spill.percent设置为0.33,且在溢出运行时缓冲区的剩余部分被填满,下一次溢出将包含所有已收集的记录(即缓冲区的0.66部分),而不会产生额外的溢出。换句话说,这些阈值定义的是触发条件,而非阻塞条件。

  • 大于序列化缓冲区的记录将首先触发溢出,然后被溢出到一个单独的文件中。该记录是否会先经过合并器是未定义的。

Shuffle/Reduce 参数

如前所述,每个reduce任务通过HTTP获取由Partitioner分配给它的输出到内存中,并定期将这些输出合并到磁盘。如果启用了map输出的中间压缩,每个输出会被解压到内存中。以下选项会影响reduce之前合并到磁盘的频率以及在reduce期间分配给map输出的内存。

名称 类型 描述
mapreduce.task.io.soft.factor int 指定磁盘上同时合并的段数量。该参数限制了合并过程中打开的文件和压缩编解码器的数量。如果文件数量超过此限制,合并将分多次进行。虽然此限制也适用于map阶段,但大多数作业应配置为不太可能在map阶段触及此限制。
mapreduce.reduce.merge.inmem.thresholds int 在合并到磁盘之前,被提取到内存中的已排序map输出数量。与前面说明中的溢出阈值类似,这不是定义分区单位,而是定义触发器。实际上,这个值通常设置得非常高(1000)或禁用(0),因为合并内存中的段通常比从磁盘合并成本更低(参见本表后面的说明)。此阈值仅影响shuffle期间内存合并的频率。
mapreduce.reduce.shuffle.merge.percent float 启动内存合并前已获取的map输出内存阈值,表示为分配给内存存储map输出的百分比。由于无法放入内存的map输出可能会被阻塞,设置过高可能会降低获取与合并之间的并行度。相反,对于输入能完全放入内存的reduce任务,设置为1.0这样的高值已被证明是有效的。此参数仅影响shuffle期间内存合并的频率。
mapreduce.reduce.shuffle.input.buffer.percent float The percentage of memory- relative to the maximum heapsize as typically specified in mapreduce.reduce.java.opts- that can be allocated to storing map outputs during the shuffle. Though some memory should be set aside for the framework, in general it is advantageous to set this high enough to store large and numerous map outputs.
mapreduce.reduce.input.buffer.percent float 在reduce阶段可以保留map输出结果的内存百分比(相对于最大堆内存)。当reduce开始时,map输出结果会被合并到磁盘,直到剩余部分低于此参数定义的资源限制。默认情况下,所有map输出都会在reduce开始前合并到磁盘,以最大化reduce可用的内存。对于内存消耗较少的reduce任务,可以增加此值以避免磁盘读写操作。

其他注意事项

  • 如果map输出的大小超过分配给复制map输出的内存的25%,它将直接写入磁盘,而无需先通过内存暂存。

  • 当运行带有combiner时,关于高合并阈值和大缓冲区的推理可能不成立。对于在所有map输出获取完成前启动的合并操作,combiner会在数据溢出到磁盘时运行。在某些情况下,通过投入资源来合并map输出(使磁盘溢出量变小并实现溢出与获取的并行化),而不是激进地增加缓冲区大小,可以获得更好的reduce时间。

  • 当将内存中的map输出合并到磁盘以开始reduce操作时,如果由于存在待溢出的分段且磁盘上已有至少mapreduce.task.io.sort.factor个分段而需要进行中间合并,则内存中的map输出将参与该中间合并过程。

已配置参数

以下属性在每个任务执行的作业配置中被本地化:

名称 类型 描述
mapreduce.job.id String 作业ID
mapreduce.job.jar String job.jar在作业目录中的位置
mapreduce.job.local.dir String 作业特定的共享临时空间
mapreduce.task.id String 任务ID
mapreduce.task.attempt.id String 任务尝试ID
mapreduce.task.is.map boolean 这是否是一个map任务
mapreduce.task.partition int 作业中任务的ID
mapreduce.map.input.file String map任务正在读取的源文件名
mapreduce.map.input.start long map输入分片的起始偏移量
mapreduce.map.input.length long map输入分片的字节数
mapreduce.task.output.dir String 任务的临时输出目录

注意:在流式作业执行期间,"mapreduce"参数的名称会被转换。点号(.)会变成下划线(_)。例如,mapreduce.job.id会变成mapreduce_job_id,mapreduce.job.jar会变成mapreduce_job_jar。要在流式作业的mapper/reducer中获取这些值,请使用带下划线的参数名称。

任务日志

任务的标准输出(stdout)、错误输出(stderr)流以及系统日志由NodeManager读取,并记录到${HADOOP_LOG_DIR}/userlogs目录中。

分发库

DistributedCache 也可用于分发jar包和本地库,供map和/或reduce任务使用。子JVM始终会将其当前工作目录添加到java.library.pathLD_LIBRARY_PATH中。因此可以通过System.loadLibrarySystem.load加载缓存的库。关于如何通过分布式缓存加载共享库的更多细节,请参阅Native Libraries文档。

作业提交与监控

Job 是用户作业与 ResourceManager 交互的主要接口。

Job 提供了提交作业、跟踪进度、访问组件任务报告和日志、获取MapReduce集群状态信息等功能。

作业提交过程包括:

  1. 检查作业的输入和输出规范。

  2. 计算作业的InputSplit值。

  3. 如有必要,为作业的DistributedCache设置必要的记账信息。

  4. 将作业的jar包和配置文件复制到FileSystem上的MapReduce系统目录中。

  5. 将作业提交到ResourceManager并可选地监控其状态。

作业历史文件也会记录到用户指定的目录 mapreduce.jobhistory.intermediate-done-dirmapreduce.jobhistory.done-dir,默认情况下是作业输出目录。

用户可以通过以下命令查看指定目录中的历史日志摘要 $ mapred job -history output.jhist 该命令将打印作业详情、失败和被终止的任务详情。如需查看更详细的作业信息(例如每个任务的成功尝试和任务尝试次数),可以使用以下命令 $ mapred job -history all output.jhist

通常用户使用Job来创建应用程序,描述作业的各个方面,提交作业并监控其进度。

作业控制

用户可能需要串联多个MapReduce作业来完成单个MapReduce作业无法处理的复杂任务。这相当容易实现,因为作业的输出通常会写入分布式文件系统,而这些输出又可以被用作下一个作业的输入。

然而,这也意味着确保作业完成(成功/失败)的责任完全落在客户端身上。在这种情况下,各种作业控制选项包括:

作业输入

InputFormat 描述了MapReduce作业的输入规范。

MapReduce框架依赖于作业的InputFormat来实现:

  1. 验证作业的输入规范。

  2. 将输入文件分割成逻辑上的InputSplit实例,每个实例随后会被分配给单独的Mapper

  3. 提供RecordReader实现,用于从逻辑InputSplit中提取输入记录以供Mapper处理。

基于文件的InputFormat实现的默认行为(通常是FileInputFormat的子类)是根据输入文件的总字节大小将输入分割为逻辑InputSplit实例。然而,输入文件的FileSystem块大小被视为输入分割的上限。分割大小的下限可以通过mapreduce.input.fileinputformat.split.minsize来设置。

显然,基于输入大小的逻辑分片对许多应用来说是不够的,因为必须遵守记录边界。在这种情况下,应用程序应该实现一个RecordReader,它负责遵守记录边界,并向单个任务提供面向记录的逻辑InputSplit视图。

TextInputFormat 是默认的 InputFormat

如果TextInputFormat是某个作业的InputFormat,框架会检测带有.gz扩展名的输入文件,并使用适当的CompressionCodec自动解压它们。但需要注意的是,具有上述扩展名的压缩文件无法被分割,每个压缩文件都由单个映射器完整处理。

InputSplit

InputSplit 表示由单个 Mapper 处理的数据。

通常InputSplit提供面向字节的输入视图,而RecordReader负责处理并呈现面向记录的视图。

FileSplit 是默认的 InputSplit。它会将 mapreduce.map.input.file 设置为逻辑分片对应的输入文件路径。

RecordReader

RecordReaderInputSplit 中读取 键值对。

通常,RecordReader会将由InputSplit提供的面向字节的输入视图转换为面向记录的视图,供Mapper实现处理。因此,RecordReader承担了处理记录边界的职责,并为任务提供键和值。

作业输出

OutputFormat 描述了MapReduce作业的输出规范。

MapReduce框架依赖于作业的OutputFormat来实现:

  1. 验证作业的输出规范;例如,检查输出目录是否已存在。

  2. 提供用于写入作业输出文件的RecordWriter实现。输出文件存储在FileSystem中。

TextOutputFormat 是默认的 OutputFormat

OutputCommitter

OutputCommitter 描述了MapReduce作业中任务输出的提交过程。

MapReduce框架依赖于作业的OutputCommitter来实现以下功能:

  1. 在初始化期间设置作业。例如,在作业初始化期间为作业创建临时输出目录。当作业处于PREP状态并在初始化任务之后,作业设置由一个单独的任务完成。一旦设置任务完成,作业将移动到RUNNING状态。

  2. 在作业完成后进行清理工作。例如,在作业完成后移除临时输出目录。作业清理由作业结束时的一个独立任务完成。作业在清理任务完成后被声明为成功/失败/已终止。

  3. 设置任务的临时输出。任务设置作为同一任务的一部分,在任务初始化期间完成。

  4. 检查任务是否需要提交。这是为了避免在任务不需要提交时执行提交流程。

  5. 提交任务输出。任务完成后,如有需要,该任务将提交其输出。

  6. 放弃任务提交。如果任务已经失败/被终止,其输出将被清理。如果任务无法清理(在异常块中),将启动一个具有相同尝试ID的单独任务来执行清理。

FileOutputCommitter是默认的OutputCommitter。作业设置/清理任务会占用NodeManager上可用的map或reduce容器。其中JobCleanup任务、TaskCleanup任务和JobSetup任务具有最高优先级,且优先级按此顺序排列。

任务副作用文件

在某些应用中,组件任务需要创建和/或写入辅助文件,这些文件与实际的工作输出文件不同。

在这种情况下,可能会出现同一个MapperReducer的两个实例同时运行(例如推测性任务)并尝试在FileSystem上打开和/或写入同一文件(路径)的问题。因此,应用程序编写者必须为每个任务尝试(例如使用attemptid,如attempt_200709221812_0001_m_000000_0)选择唯一名称,而不仅仅是为每个任务选择。

为避免这些问题,当OutputCommitterFileOutputCommitter时,MapReduce框架会在FileSystem上为每个任务尝试维护一个特殊的${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}子目录(可通过${mapreduce.task.output.dir}访问),用于存储该任务尝试的输出。当任务尝试成功完成时,${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}中的文件(仅限该目录)会被提升${mapreduce.output.fileoutputformat.outputdir}。当然,框架会丢弃失败任务尝试的子目录。这个过程对应用程序完全透明。

应用程序编写者可以利用这一特性,通过在任务执行期间使用FileOutputFormat.getWorkOutputPath(Conext)${mapreduce.task.output.dir}中创建所需的任何辅助文件,框架会为成功的任务尝试同样地提升这些文件,从而消除了为每个任务尝试选择唯一路径的需要。

注意:在执行特定任务尝试期间,${mapreduce.task.output.dir}的实际值为${mapreduce.output.fileoutputformat.outputdir}/_temporary/_{$taskid},该值由MapReduce框架设置。因此,只需在MapReduce任务中通过FileOutputFormat.getWorkOutputPath(Conext)返回的路径中创建任何辅助文件,即可利用此功能。

整个讨论同样适用于reducer=NONE(即0个reducer)的作业映射,因为在这种情况下,映射的输出会直接写入HDFS。

RecordWriter

RecordWriter 将输出的 键值对写入输出文件。

RecordWriter实现将作业输出写入FileSystem

其他实用功能

向队列提交作业

用户将作业提交到队列。队列作为作业的集合,使系统能够提供特定功能。例如,队列使用ACL来控制哪些用户可以提交作业。队列主要供Hadoop调度器使用。

Hadoop默认配置了一个名为'default'的强制性队列。队列名称在Hadoop站点配置的mapreduce.job.queuename属性中定义。某些作业调度器(如Capacity Scheduler)支持多队列配置。

作业通过mapreduce.job.queuename属性或Configuration.set(MRJobConfig.QUEUE_NAME, String) API定义需要提交到的队列。设置队列名称是可选的。如果作业提交时未关联队列名称,则会被提交到'default'默认队列。

计数器

Counters代表全局计数器,由MapReduce框架或应用程序定义。每个Counter可以是任意Enum类型。特定Enum的计数器会被分组到Counters.Group类型的组中。

应用程序可以定义任意的Counters(类型为Enum),并通过在map和/或reduce方法中调用Counters.incrCounter(Enum, long)或Counters.incrCounter(String, String, long)来更新它们。这些计数器随后会被框架进行全局聚合。

DistributedCache

DistributedCache 高效地分发应用程序特定的大型只读文件。

DistributedCache 是MapReduce框架提供的一种机制,用于缓存应用程序所需的文件(文本、归档文件、jar包等)。

应用程序通过Job中的URL(hdfs://)指定需要缓存的文件。DistributedCache假定通过hdfs:// URL指定的文件已经存在于FileSystem上。

该框架会在任何任务在该工作节点上执行之前,将必要文件复制到工作节点。其高效性源于每个作业仅需复制一次文件的能力,以及可以缓存这些在worker节点上解压的归档文件。

DistributedCache会跟踪缓存文件的修改时间戳。显然,在作业执行期间,应用程序或外部程序不应修改这些缓存文件。

DistributedCache 可用于分发简单的只读数据/文本文件以及更复杂的类型,如压缩包和jar文件。压缩包(zip、tar、tgz和tar.gz文件)在工作节点上会被解压。文件会被设置执行权限

可以通过设置属性mapreduce.job.cache.{files |archives}来分发文件/归档文件。如果需要分发多个文件/归档文件,可以用逗号分隔的路径来添加。这些属性也可以通过APIJob.addCacheFile(URI)/Job.addCacheArchive(URI)Job.setCacheFiles(URI[])/Job.setCacheArchives(URI[])来设置,其中URI的格式为hdfs://host:port/absolute-path#link-name。在Streaming中,可以通过命令行选项-cacheFile/-cacheArchive来分发文件。

DistributedCache也可以作为基础软件分发机制,用于map和/或reduce任务中。它可用于分发jar包和本地库。Job.addArchiveToClassPath(Path)Job.addFileToClassPath(Path)接口可用于缓存文件/jar包,并将它们添加到子JVM的classpath中。同样可以通过设置配置属性mapreduce.job.classpath.{files |archives}来实现。类似地,被符号链接到任务工作目录的缓存文件可用于分发本地库并加载它们。

私有与公共分布式缓存文件

DistributedCache文件可以是私有的或公共的,这决定了它们在工作节点上的共享方式。

  • "私有" DistributedCache 文件会被缓存在用户专属的本地目录中,只有该用户的任务才能使用这些文件。这些文件仅由特定用户的所有任务和作业共享,工作节点上的其他用户作业无法访问。DistributedCache 文件的私有性取决于其在文件系统(通常是HDFS)上的权限设置。如果文件没有全局可读权限,或者文件所在目录路径没有全局可执行权限以供查找,则该文件将成为私有文件。

  • "公共" DistributedCache 文件会被缓存在全局目录中,其文件访问权限设置为对所有用户公开可见。这些文件可以被工作节点上所有用户的任务和作业共享。DistributedCache 文件通过在文件上传的存储系统(通常是 HDFS)上的权限设置而成为公共文件。如果文件具有全局可读权限,并且文件所在路径的目录具有全局可执行权限(用于查找),则该文件将成为公共文件。换句话说,如果用户希望让所有用户都能访问某个文件,必须将该文件的权限设置为全局可读,并且文件路径上的目录权限必须设置为全局可执行。

性能分析

性能分析是一种实用工具,用于获取内置Java分析器在部分map和reduce任务中的代表性样本(2到3个)。

用户可以通过设置配置属性mapreduce.task.profile来指定系统是否应收集作业中某些任务的性能分析信息。该值可通过API Configuration.set(MRJobConfig.TASK_PROFILE, boolean)进行设置。若该值设为true,则启用任务性能分析。分析信息将存储在用户日志目录中。默认情况下,作业不启用性能分析功能。

一旦用户配置需要进行分析,她/他可以使用配置属性mapreduce.task.profile.{maps|reduces}来设置要分析的MapReduce任务范围。该值可以通过API Configuration.set(MRJobConfig.NUM_{MAP|REDUCE}_PROFILES, String)进行设置。默认情况下,指定的范围是0-2

用户还可以通过设置配置属性mapreduce.task.profile.params来指定性能分析器的配置参数。该值可以使用API Configuration.set(MRJobConfig.TASK_PROFILE_PARAMS, String)进行设置。如果字符串中包含%s,在任务运行时它将被替换为性能分析输出文件的名称。这些参数会通过命令行传递给任务子JVM。性能分析参数的默认值为-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s

调试

MapReduce框架提供了一种运行用户自定义脚本进行调试的功能。当MapReduce任务失败时,用户可以运行调试脚本,例如处理任务日志。该脚本可以访问任务的stdout和stderr输出、系统日志以及jobconf。调试脚本的stdout和stderr输出会显示在控制台诊断信息中,同时也作为作业用户界面的一部分展示。

在以下章节中,我们将讨论如何为作业提交调试脚本。该脚本文件需要分发并提交至框架。

如何分发脚本文件:

用户需要使用DistributedCache分发符号链接脚本文件。

如何提交脚本:

快速提交调试脚本的方法是为属性mapreduce.map.debug.scriptmapreduce.reduce.debug.script设置值,分别用于调试map和reduce任务。这些属性也可以通过APIConfiguration.set(MRJobConfig.MAP_DEBUG_SCRIPT, String)Configuration.set(MRJobConfig.REDUCE_DEBUG_SCRIPT, String)来设置。在流模式下,可以使用命令行选项-mapdebug-reducedebug提交调试脚本,分别用于调试map和reduce任务。

脚本的参数包括任务的标准输出、标准错误、系统日志和作业配置文件。调试命令在MapReduce任务失败的节点上运行,格式为:
$script $stdout $stderr $syslog $jobconf

Pipes程序将C++程序名称作为命令的第五个参数。因此对于pipes程序,命令为
$script $stdout $stderr $syslog $jobconf $program

默认行为:

对于管道,默认会运行一个脚本在gdb下处理核心转储,打印堆栈跟踪并提供有关运行线程的信息。

数据压缩

Hadoop MapReduce为应用程序开发者提供了对中间map输出和作业输出(即reduce的输出)进行压缩的功能。它还内置了对zlib压缩算法的CompressionCodec实现。同时支持gzipbzip2snappylz4文件格式。

出于性能(zlib)和Java库不可用的原因,Hadoop还提供了上述压缩编解码器的本地实现。关于它们的使用和可用性的更多详细信息请参见此处

中间输出

应用程序可以通过Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS, boolean) API控制中间map输出的压缩,并通过Configuration.set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, Class) API设置要使用的CompressionCodec

作业输出

应用程序可以通过FileOutputFormat.setCompressOutput(Job, boolean) API控制作业输出的压缩,并且可以通过FileOutputFormat.setOutputCompressorClass(Job, Class) API指定要使用的CompressionCodec

如果作业输出需要存储在SequenceFileOutputFormat中,可以通过SequenceFileOutputFormat.setOutputCompressionType(Job, SequenceFile.CompressionType) API指定所需的SequenceFile.CompressionType(即RECORD/BLOCK - 默认为RECORD)。

跳过错误记录

Hadoop提供了一个选项,可以在处理map输入时跳过一组特定的错误输入记录。应用程序可以通过SkipBadRecords类来控制此功能。

当map任务在某些输入上确定性地崩溃时,可以使用此功能。这通常是由于map函数中的错误导致的。通常情况下,用户需要修复这些错误。然而,有时这是不可能的。例如,错误可能存在于无法获取源代码的第三方库中。在这种情况下,即使经过多次尝试,任务也无法成功完成,作业将失败。通过此功能,只有坏记录周围的一小部分数据会丢失,这对于某些应用程序(例如对非常大的数据进行统计分析)可能是可以接受的。

默认情况下此功能是禁用的。如需启用,请参考SkipBadRecords.setMapperMaxSkipRecords(Configuration, long)SkipBadRecords.setReducerMaxSkipGroups(Configuration, long)

启用此功能后,框架在经历一定数量的map失败后会进入"跳过模式"。更多详情请参阅SkipBadRecords.setAttemptsToStartSkipping(Configuration, int)。在"跳过模式"下,map任务会维护正在处理的记录范围。为此,框架依赖于已处理记录计数器。参见SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDSSkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS。该计数器使框架能够知道有多少记录已成功处理,从而确定导致任务崩溃的记录范围。在后续尝试中,将跳过这些记录范围。

跳过的记录数量取决于应用程序增加处理记录计数器的频率。建议在每条记录处理完成后增加该计数器。对于通常采用批量处理方式的应用,这可能无法实现。在这种情况下,框架可能会跳过坏记录周围的额外记录。用户可以通过SkipBadRecords.setMapperMaxSkipRecords(Configuration, long)SkipBadRecords.setReducerMaxSkipGroups(Configuration, long)控制跳过的记录数量。框架会尝试使用类似二分查找的方法缩小跳过记录的范围。跳过的范围会被分成两半,只执行其中一半。在后续失败时,框架会确定哪一半包含坏记录。任务将被重新执行,直到达到可接受的跳过值或用尽所有任务尝试次数。要增加任务尝试次数,可使用Job.setMaxMapAttempts(int)Job.setMaxReduceAttempts(int)

跳过的记录以序列文件格式写入HDFS,供后续分析。可以通过SkipBadRecords.setSkipOutputPath(JobConf, Path)更改存储位置。

示例:WordCount v2.0

这里是一个更完整的WordCount示例,它使用了我们目前讨论过的MapReduce框架提供的许多功能。

这需要HDFS处于运行状态,特别是对于DistributedCache相关的功能。因此它仅适用于伪分布式完全分布式的Hadoop安装。

源代码

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;

public class WordCount2 {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    static enum CountersEnum { INPUT_WORDS }

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    private boolean caseSensitive;
    private Set<String> patternsToSkip = new HashSet<String>();

    private Configuration conf;
    private BufferedReader fis;

    @Override
    public void setup(Context context) throws IOException,
        InterruptedException {
      conf = context.getConfiguration();
      caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
      if (conf.getBoolean("wordcount.skip.patterns", false)) {
        URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
        for (URI patternsURI : patternsURIs) {
          Path patternsPath = new Path(patternsURI.getPath());
          String patternsFileName = patternsPath.getName().toString();
          parseSkipFile(patternsFileName);
        }
      }
    }

    private void parseSkipFile(String fileName) {
      try {
        fis = new BufferedReader(new FileReader(fileName));
        String pattern = null;
        while ((pattern = fis.readLine()) != null) {
          patternsToSkip.add(pattern);
        }
      } catch (IOException ioe) {
        System.err.println("Caught exception while parsing the cached file '"
            + StringUtils.stringifyException(ioe));
      }
    }

    @Override
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      String line = (caseSensitive) ?
          value.toString() : value.toString().toLowerCase();
      for (String pattern : patternsToSkip) {
        line = line.replaceAll(pattern, "");
      }
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
        Counter counter = context.getCounter(CountersEnum.class.getName(),
            CountersEnum.INPUT_WORDS.toString());
        counter.increment(1);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
    String[] remainingArgs = optionParser.getRemainingArgs();
    if ((remainingArgs.length != 2) && (remainingArgs.length != 4)) {
      System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount2.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    List<String> otherArgs = new ArrayList<String>();
    for (int i=0; i < remainingArgs.length; ++i) {
      if ("-skip".equals(remainingArgs[i])) {
        job.addCacheFile(new Path(remainingArgs[++i]).toUri());
        job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
      } else {
        otherArgs.add(remainingArgs[i]);
      }
    }
    FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

示例运行

作为输入的示例文本文件:

$ bin/hadoop fs -ls /user/joe/wordcount/input/
/user/joe/wordcount/input/file01
/user/joe/wordcount/input/file02

$ bin/hadoop fs -cat /user/joe/wordcount/input/file01
Hello World, Bye World!

$ bin/hadoop fs -cat /user/joe/wordcount/input/file02
Hello Hadoop, Goodbye to hadoop.

运行应用程序:

$ bin/hadoop jar wc.jar WordCount2 /user/joe/wordcount/input /user/joe/wordcount/output

输出:

$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
Bye 1
Goodbye 1
Hadoop, 1
Hello 2
World! 1
World, 1
hadoop. 1
to 1

请注意,这里的输入与我们最初看到的版本有所不同,并且这些差异如何影响输出结果。

现在,让我们通过DistributedCache插入一个模式文件,该文件列出了需要忽略的单词模式。

$ bin/hadoop fs -cat /user/joe/wordcount/patterns.txt
\.
\,
\!
to

再次运行它,这次使用更多选项:

$ bin/hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=true /user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt

如预期所示,输出:

$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
Bye 1
Goodbye 1
Hadoop 1
Hello 2
World 2
hadoop 1

再运行一次,这次关闭大小写敏感:

$ bin/hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=false /user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt

果然,输出如下:

$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
bye 1
goodbye 1
hadoop 2
hello 2
world 2

亮点

第二个版本的WordCount通过利用MapReduce框架提供的某些特性改进了前一个版本:

  • 演示应用程序如何在Mapper(和Reducer)实现的setup方法中访问配置参数。

  • 演示如何使用DistributedCache来分发作业所需的只读数据。在此示例中,它允许用户在计数时指定要跳过的单词模式。

  • 展示GenericOptionsParser在处理Hadoop通用命令行选项方面的实用性。

  • 演示应用程序如何使用Counters计数器,以及如何设置传递给map映射(和reduce归约)方法的应用程序特定状态信息。

Java和JNI是Oracle America, Inc.在美国及其他国家的商标或注册商标。