Hadoop Streaming

Hadoop Streaming

Hadoop streaming是Hadoop发行版附带的一个实用工具。该工具允许您使用任何可执行文件或脚本作为mapper和/或reducer来创建和运行Map/Reduce作业。例如:

mapred streaming \
  -input myInputDirs \
  -output myOutputDir \
  -mapper /bin/cat \
  -reducer /usr/bin/wc

流式处理的工作原理

在上述示例中,mapper和reducer都是可执行程序,它们从标准输入(stdin)逐行读取数据,并将结果输出到标准输出(stdout)。该工具会创建一个Map/Reduce作业,将作业提交到合适的集群,并监控作业进度直至完成。

当为映射器指定可执行文件时,每个映射器任务在初始化时会将该可执行文件作为独立进程启动。在映射器任务运行时,它会将输入转换为行数据,并通过进程的标准输入(stdin)逐行传送。同时,映射器会从进程的标准输出(stdout)收集面向行的输出,并将每行转换为键/值对,作为映射器的输出收集。默认情况下,行中第一个制表符之前的内容被视为key,而该行剩余部分(不包括制表符)则作为value。如果行中不存在制表符,则整行被视为键,值为空。不过,这可以通过设置-inputformat命令选项来自定义,后文将对此进行讨论。

当为reducer指定可执行文件时,每个reducer任务会将该可执行文件作为独立进程启动,然后初始化reducer。在reducer任务运行时,它会将输入的键/值对转换为行数据,并通过进程的标准输入(stdin)进行传输。同时,reducer会从进程的标准输出(stdout)收集面向行的输出,将每行转换为键/值对,作为reducer的输出结果。默认情况下,一行中第一个制表符之前的内容作为键,其余部分(不包括制表符)作为值。但这一行为可以通过设置-outputformat命令行选项来自定义,具体将在后文讨论。

这是Map/Reduce框架与流式映射器/归约器之间通信协议的基础。

用户可以指定stream.non.zero.exit.is.failuretruefalse,分别将退出状态非零的流式任务标记为FailureSuccess。默认情况下,退出状态非零的流式任务会被视为失败任务。

流式命令选项

Streaming支持流式命令选项以及通用命令选项。通用命令行语法如下所示。

注意: 请确保将通用选项放在流式选项之前,否则命令将执行失败。示例请参阅Making Archives Available to Tasks

mapred streaming [genericOptions] [streamingOptions]

Hadoop流式处理命令选项如下:

参数 可选/必选 描述
-input 目录名或文件名 必填 mapper的输入位置
-output directoryname 必填 reducer的输出位置
-mapper 可执行文件或Java类名 可选 Mapper可执行文件。如果未指定,默认使用IdentityMapper
-reducer executable 或 JavaClassName 可选 Reducer可执行程序。如果未指定,默认使用IdentityReducer
-file filename 可选 使mapper、reducer或combiner可执行文件在计算节点上本地可用
-inputformat JavaClassName 可选 您提供的类应返回Text类的键/值对。如果未指定,默认使用TextInputFormat
-outputformat JavaClassName 可选 您提供的类应能处理Text类的键值对。如果未指定,默认使用TextOutputformat
-partitioner JavaClassName 可选 决定将键发送到哪个reduce的类
-combiner streamingCommand 或 JavaClassName 可选 用于map输出的Combiner可执行程序
-cmdenv name=value 可选 将环境变量传递给流处理命令
-inputreader 可选 用于向后兼容:指定记录读取器类(而非输入格式类)
-verbose 可选 详细输出
-lazyOutput 可选 延迟创建输出。例如,如果输出格式基于FileOutputFormat,则仅在首次调用Context.write时才会创建输出文件
-numReduceTasks 可选 指定reducer的数量
-mapdebug 可选 当map任务失败时调用的脚本
-reducedebug 可选 当reduce任务失败时调用的脚本

将Java类指定为Mapper/Reducer

你可以提供一个Java类作为mapper和/或reducer。

mapred streaming \
  -input myInputDirs \
  -output myOutputDir \
  -inputformat org.apache.hadoop.mapred.KeyValueTextInputFormat \
  -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
  -reducer /usr/bin/wc

您可以将stream.non.zero.exit.is.failure设置为truefalse,分别使以非零状态退出的流式任务被视为FailureSuccess。默认情况下,以非零状态退出的流式任务会被视为失败任务。

打包文件与作业提交

您可以指定任何可执行文件作为mapper和/或reducer。这些可执行文件不需要预先存在于集群中的机器上;但如果不存在,您需要使用"-file"选项来告知框架将您的可执行文件打包作为作业提交的一部分。例如:

mapred streaming \
  -input myInputDirs \
  -output myOutputDir \
  -mapper myPythonScript.py \
  -reducer /usr/bin/wc \
  -file myPythonScript.py

上述示例指定了一个用户自定义的Python可执行文件作为mapper。选项"-file myPythonScript.py"会使该Python可执行文件作为作业提交的一部分被分发到集群机器上。

除了可执行文件外,您还可以打包mapper和/或reducer可能使用的其他辅助文件(例如字典、配置文件等)。例如:

mapred streaming \
  -input myInputDirs \
  -output myOutputDir \
  -mapper myPythonScript.py \
  -reducer /usr/bin/wc \
  -file myPythonScript.py \
  -file myDictionary.txt

为作业指定其他插件

与普通的Map/Reduce作业一样,您可以为流式作业指定其他插件:

 -inputformat JavaClassName
 -outputformat JavaClassName
 -partitioner JavaClassName
 -combiner streamingCommand or JavaClassName

您为输入格式提供的类应返回Text类的键/值对。如果未指定输入格式类,则默认使用TextInputFormat。由于TextInputFormat返回LongWritable类的键(这些键实际上不属于输入数据的一部分),这些键将被丢弃;只有值会被传输到流式映射器。

您为输出格式提供的类应能处理Text类的键值对。如果未指定输出格式类,则默认使用TextOutputFormat。

设置环境变量

要在流式处理命令中设置环境变量,请使用:

 -cmdenv EXAMPLE_DIR=/home/example/dictionaries/

通用命令选项

Streaming支持流式命令选项以及通用命令选项。通用的命令行语法如下所示。

注意: 请确保将通用选项放在流式选项之前,否则命令将执行失败。具体示例请参阅Making Archives Available to Tasks

hadoop command [genericOptions] [streamingOptions]

这里列出了可以与流式处理一起使用的Hadoop通用命令选项:

参数 可选/必选 描述
-conf configuration_file 可选 指定应用程序配置文件
-D property=value 可选 为指定属性使用该值
-fs 主机:端口 或 本地 可选 指定一个namenode
-files 可选 指定要复制到Map/Reduce集群的逗号分隔文件
-libjars 可选 指定逗号分隔的jar文件以包含在类路径中
-archives 可选 指定要在计算节点上解压的以逗号分隔的归档文件

使用-D选项指定配置变量

您可以通过使用“-D =”来指定额外的配置变量。

指定目录

要更改本地临时目录,请使用:

 -D dfs.data.dir=/tmp

要指定额外的本地临时目录,请使用:

 -D mapred.local.dir=/tmp/local
 -D mapred.system.dir=/tmp/system
 -D mapred.temp.dir=/tmp/temp

注意:有关作业配置参数的更多详情,请参阅:mapred-default.xml

指定仅Map任务

通常,您可能只想使用映射函数处理输入数据。为此,只需将mapreduce.job.reduces设置为零。Map/Reduce框架将不会创建任何归约器任务。相反,映射器任务的输出将成为作业的最终输出。

 -D mapreduce.job.reduces=0

为了向后兼容,Hadoop Streaming还支持“-reducer NONE”选项,该选项等同于“-D mapreduce.job.reduces=0”。

指定Reducer数量

要指定reducer的数量,例如两个,请使用:

mapred streaming \
  -D mapreduce.job.reduces=2 \
  -input myInputDirs \
  -output myOutputDir \
  -mapper /bin/cat \
  -reducer /usr/bin/wc

自定义行如何分割为键/值对

如前所述,当Map/Reduce框架从映射器的标准输出中读取一行时,它会将该行拆分为键/值对。默认情况下,行中第一个制表符之前的前缀作为键,行的其余部分(不包括制表符)作为值。

不过,您可以自定义此默认设置。您可以指定除制表符(默认值)以外的字段分隔符,并且可以指定第n个(n >= 1)字符而非行首字符(默认值)作为键与值之间的分隔符。例如:

mapred streaming \
  -D stream.map.output.field.separator=. \
  -D stream.num.map.output.key.fields=4 \
  -input myInputDirs \
  -output myOutputDir \
  -mapper /bin/cat \
  -reducer /bin/cat

在上述示例中,“-D stream.map.output.field.separator=.”指定“.”作为映射输出的字段分隔符,行中第四个“.”之前的前缀将作为键,行的其余部分(不包括第四个“.”)将作为值。如果一行中少于四个“.”,则整行将作为键,值将是一个空的Text对象(类似于通过new Text("")创建的对象)。

同样,您可以使用“-D stream.reduce.output.field.separator=SEP”和“-D stream.num.reduce.output.fields=NUM”来指定reduce输出行中第n个字段分隔符作为键与值之间的分隔符。

同样,您可以将"stream.map.input.field.separator"和"stream.reduce.input.field.separator"指定为Map/Reduce输入的字段分隔符。默认情况下分隔符是制表符。

处理大文件和归档

-files 和 -archives 选项允许您向任务提供文件和归档资源。参数是您已上传至HDFS的文件或归档的URI地址。这些文件和归档会在多个作业间缓存。您可以从fs.default.name配置变量中获取host和fs_port的值。

注意: -files和-archives选项是通用选项。请确保将通用选项放在命令选项之前,否则命令将执行失败。

为任务提供文件访问

-files选项会在任务的当前工作目录中创建一个指向文件本地副本的符号链接。

在这个例子中,Hadoop会自动在任务的当前工作目录下创建一个名为testfile.txt的符号链接。该符号链接指向testfile.txt的本地副本。

-files hdfs://host:fs_port/user/testfile.txt

用户可以使用#为-files指定不同的符号链接名称。

-files hdfs://host:fs_port/user/testfile.txt#testfile

可以像这样指定多个条目:

-files hdfs://host:fs_port/user/testfile1.txt,hdfs://host:fs_port/user/testfile2.txt

为任务提供归档文件

-archives选项允许您将jar文件本地复制到任务的当前工作目录,并自动解压这些文件。

在此示例中,Hadoop会自动在任务的当前工作目录下创建一个名为testfile.jar的符号链接。该符号链接指向存储上传jar文件解压后内容的目录。

-archives hdfs://host:fs_port/user/testfile.jar

用户可以使用#为-archives指定不同的符号链接名称。

-archives hdfs://host:fs_port/user/testfile.tgz#tgzdir

在这个示例中,input.txt文件包含两行,分别指定了两个文件的名称:cachedir.jar/cache.txt和cachedir.jar/cache2.txt。"cachedir.jar"是指向归档目录的符号链接,该目录包含"cache.txt"和"cache2.txt"两个文件。

mapred streaming \
                -archives 'hdfs://hadoop-nn1.example.com/user/me/samples/cachefile/cachedir.jar' \
                -D mapreduce.job.maps=1 \
                -D mapreduce.job.reduces=1 \
                -D mapreduce.job.name="Experiment" \
                -input "/user/me/samples/cachefile/input.txt" \
                -output "/user/me/samples/cachefile/out" \
                -mapper "xargs cat" \
                -reducer "cat"

$ ls test_jar/
cache.txt  cache2.txt

$ jar cvf cachedir.jar -C test_jar/ .
added manifest
adding: cache.txt(in = 30) (out= 29)(deflated 3%)
adding: cache2.txt(in = 37) (out= 35)(deflated 5%)

$ hdfs dfs -put cachedir.jar samples/cachefile

$ hdfs dfs -cat /user/me/samples/cachefile/input.txt
cachedir.jar/cache.txt
cachedir.jar/cache2.txt

$ cat test_jar/cache.txt
This is just the cache string

$ cat test_jar/cache2.txt
This is just the second cache string

$ hdfs dfs -ls /user/me/samples/cachefile/out
Found 2 items
-rw-r--r-* 1 me supergroup        0 2013-11-14 17:00 /user/me/samples/cachefile/out/_SUCCESS
-rw-r--r-* 1 me supergroup       69 2013-11-14 17:00 /user/me/samples/cachefile/out/part-00000

$ hdfs dfs -cat /user/me/samples/cachefile/out/part-00000
This is just the cache string
This is just the second cache string

更多使用示例

Hadoop分区器类

Hadoop有一个库类KeyFieldBasedPartitioner,对许多应用程序非常有用。该类允许Map/Reduce框架基于特定的键字段(而非整个键)对映射输出进行分区。例如:

mapred streaming \
  -D stream.map.output.field.separator=. \
  -D stream.num.map.output.key.fields=4 \
  -D map.output.key.field.separator=. \
  -D mapreduce.partition.keypartitioner.options=-k1,2 \
  -D mapreduce.job.reduces=12 \
  -input myInputDirs \
  -output myOutputDir \
  -mapper /bin/cat \
  -reducer /bin/cat \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

这里,-D stream.map.output.field.separator=.-D stream.num.map.output.key.fields=4 与前面示例中的解释相同。这两个变量被streaming用来识别mapper的键/值对。

上述Map/Reduce作业的map输出键通常包含四个由"."分隔的字段。然而,Map/Reduce框架会使用-D mapred.text.key.partitioner.options=-k1,2选项根据键的前两个字段对map输出进行分区。此处,-D map.output.key.field.separator=.指定了分区使用的分隔符。这确保了所有键中前两个字段相同的键/值对都会被分配到同一个reducer。

这实际上等同于将前两个字段指定为主键,接下来的两个字段作为次键。主键用于分区,而主键和次键的组合则用于排序。 这里展示一个简单的示例:

map的输出(键)

11.12.1.2
11.14.2.3
11.11.4.1
11.12.1.1
11.14.2.2

分区为3个reducer(前2个字段用作分区键)

11.11.4.1
-----------
11.12.1.2
11.12.1.1
-----------
11.14.2.3
11.14.2.2

在每个分区内为reducer进行排序(所有4个字段都用于排序)

11.11.4.1
-----------
11.12.1.1
11.12.1.2
-----------
11.14.2.2
11.14.2.3

Hadoop Comparator 类

Hadoop有一个库类KeyFieldBasedComparator,对许多应用程序非常有用。该类提供了Unix/GNU排序工具的部分功能特性。例如:

mapred streaming \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
  -D stream.map.output.field.separator=. \
  -D stream.num.map.output.key.fields=4 \
  -D mapreduce.map.output.key.field.separator=. \
  -D mapreduce.partition.keycomparator.options=-k2,2nr \
  -D mapreduce.job.reduces=1 \
  -input myInputDirs \
  -output myOutputDir \
  -mapper /bin/cat \
  -reducer /bin/cat

上述Map/Reduce作业的map输出键通常包含四个由"."分隔的字段。然而,Map/Reduce框架会使用-D mapreduce.partition.keycomparator.options=-k2,2nr选项按键的第二个字段进行排序。其中,-n表示按数值排序,-r表示结果应倒序排列。下面是一个简单示例:

map的输出(键)

11.12.1.2
11.14.2.3
11.11.4.1
11.12.1.1
11.14.2.2

为reducer排序输出(其中第二个字段用于排序)

11.14.2.3
11.14.2.2
11.12.1.2
11.12.1.1
11.11.4.1

Hadoop聚合包

Hadoop有一个名为Aggregate的库包。Aggregate提供了一个特殊的reducer类和一个特殊的combiner类,以及一系列简单的聚合器,这些聚合器可以对一系列值执行"求和"、"最大值"、"最小值"等聚合操作。Aggregate允许您定义一个mapper插件类,该插件类将为mapper的每个输入键/值对生成"可聚合项"。combiner/reducer将通过调用适当的聚合器来聚合这些可聚合项。

要使用Aggregate,只需指定“-reducer aggregate”:

mapred streaming \
  -input myInputDirs \
  -output myOutputDir \
  -mapper myAggregatorForKeyCount.py \
  -reducer aggregate \
  -file myAggregatorForKeyCount.py

Python程序myAggregatorForKeyCount.py内容如下:

#!/usr/bin/python3

import sys

def generateLongCountToken(id):
    return "LongValueSum:" + id + "\t" + "1"

def main(argv):
    line = sys.stdin.readline()
    try:
        while line:
            line = line[:-1]
            fields = line.split("\t")
            print(generateLongCountToken(fields[0]))
            line = sys.stdin.readline()
    except "end of file":
        return None

if __name__ == "__main__":
     main(sys.argv)

Hadoop字段选择类

Hadoop有一个库类FieldSelectionMapReduce,它能像Unix的"cut"工具一样高效处理文本数据。该类中定义的map函数将每个输入键/值对视为字段列表。您可以指定字段分隔符(默认为制表符),选择任意字段列表作为map输出键,以及任意字段列表作为map输出值。同样地,该类中定义的reduce函数也将每个输入键/值对视为字段列表,您可以选择任意字段列表作为reduce输出键和reduce输出值。例如:

mapred streaming \
  -D mapreduce.map.output.key.field.separator=. \
  -D mapreduce.partition.keypartitioner.options=-k1,2 \
  -D mapreduce.fieldsel.data.field.separator=. \
  -D mapreduce.fieldsel.map.output.key.value.fields.spec=6,5,1-3:0- \
  -D mapreduce.fieldsel.reduce.output.key.value.fields.spec=0-2:5- \
  -D mapreduce.map.output.key.class=org.apache.hadoop.io.Text \
  -D mapreduce.job.reduces=12 \
  -input myInputDirs \
  -output myOutputDir \
  -mapper org.apache.hadoop.mapred.lib.FieldSelectionMapReduce \
  -reducer org.apache.hadoop.mapred.lib.FieldSelectionMapReduce \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

选项“-D mapreduce.fieldsel.map.output.key.value.fields.spec=6,5,1-3:0-”用于指定映射输出的键/值选择。键选择规范与值选择规范通过“:”分隔。在此情况下,映射输出键将包含字段6、5、1、2和3。映射输出值将包含所有字段(0-表示字段0及其后所有字段)。

选项“-D mapreduce.fieldsel.reduce.output.key.value.fields.spec=0-2:5-”指定了reduce输出的键/值选择。在这种情况下,reduce输出键将包含字段0、1、2(对应原始字段6、5、1)。reduce输出值将包含从字段5开始的所有字段(对应所有原始字段)。

常见问题

如何使用Hadoop Streaming运行一组任意的(半)独立任务?

通常你并不需要Map Reduce的全部功能,而只是需要运行同一程序的多个实例——要么处理数据的不同部分,要么使用不同参数处理相同数据。这时可以使用Hadoop Streaming来实现。

如何逐个处理文件,每个文件对应一个map任务?

举个例子,考虑在hadoop集群中对一组文件进行压缩(zip)的问题。你可以通过使用Hadoop Streaming和自定义映射脚本来实现:

  • 生成一个包含输入文件的完整HDFS路径的文件。每个map任务将获取一个文件名作为输入。

  • 创建一个映射器脚本,该脚本在给定文件名的情况下,会将文件获取到本地磁盘,对文件进行gzip压缩,并将其放回所需的输出目录中。

我应该使用多少个reducer?

详情请参阅MapReduce教程:Reducer

如果我在shell脚本中设置了别名,在-mapper之后还能生效吗?

例如,假设我执行:alias c1='cut -f1'。那么 -mapper "c1" 会生效吗?

使用别名将不起作用,但允许变量替换,如本示例所示:

$ hdfs dfs -cat /user/me/samples/student_marks
alice   50
bruce   70
charlie 80
dan     75

$ c2='cut -f2'; mapred streaming \
  -D mapreduce.job.name='Experiment' \
  -input /user/me/samples/student_marks \
  -output /user/me/samples/student_out \
  -mapper "$c2" -reducer 'cat'

$ hdfs dfs -cat /user/me/samples/student_out/part-00000
50
70
75
80

可以使用UNIX管道吗?

例如,命令 -mapper "cut -f1 | sed s/foo/bar/g" 能正常工作吗?

目前这无法正常工作,并出现“java.io.IOException: Broken pipe”错误。这很可能是一个需要调查的bug。

What do I do if I get the “No space left on device” error?

例如,当我通过-file选项分发大型可执行文件(例如3.6G)来运行流式作业时,会出现"设备上没有剩余空间"的错误。

jar包打包发生在由配置变量stream.tmpdir指向的目录中。stream.tmpdir的默认值为/tmp。建议将该值设置为具有更多空间的目录:

-D stream.tmpdir=/export/bigspace/…

如何指定多个输入目录?

您可以通过多个‘-input’选项指定多个输入目录:

mapred streaming \
  -input '/user/foo/dir1' -input '/user/foo/dir2' \
    (rest of the command)

如何生成gzip格式的输出文件?

除了纯文本文件外,您还可以生成gzip文件作为输出结果。在流式作业中传递以下选项:'-D mapreduce.output.fileoutputformat.compress=true -D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec'。

如何在使用流式处理时提供自定义的输入/输出格式?

您可以通过打包自定义类并将自定义jar包放入$HADOOP_CLASSPATH来指定自己的自定义类。

如何使用流式处理解析XML文档?

您可以使用记录读取器StreamXmlRecordReader来处理XML文档。

mapred streaming \
  -inputreader "StreamXmlRecord,begin=BEGIN_STRING,end=END_STRING" \
    (rest of the command)

在BEGIN_STRING和END_STRING之间找到的任何内容都将被视为map任务的一条记录。

StreamXmlRecordReader 理解的键值属性包括:

  • (字符串) 'begin' - 标记记录开头的字符,'end' - 标记记录结尾的字符。
  • (boolean) 'slowmatch' - 切换为在CDATA而非常规标签内查找起始和结束字符。默认为false。
  • (整数) 'lookahead' - 使用'slowmatch'时同步CDATA的最大前瞻字节数,应大于'maxrec'。默认为2*'maxrec'。
  • (整数) 'maxrec' - 在'slowmatch'期间每次匹配之间读取的最大记录大小。默认为50000字节。

如何在流式应用程序中更新计数器?

流式处理进程可以使用stderr来发送计数器信息。应将reporter:counter:,,发送到stderr以更新计数器。

如何在流式应用中更新状态?

流式处理进程可以使用stderr来发送状态信息。要设置状态,应将reporter:status:发送到stderr。

如何在流式作业的mapper/reducer中获取Job变量?

请参阅配置参数。在流式作业执行期间,"mapred"参数的名称会被转换。点号(.)会变为下划线(_)。例如,mapreduce.job.id变为mapreduce_job_id,mapreduce.job.jar变为mapreduce_job_jar。在代码中请使用带下划线的参数名称。

如果遇到“error=7, 参数列表过长”错误该怎么办

该任务会将整个配置复制到环境中。如果任务正在处理大量输入文件,将任务配置添加到环境中可能会导致环境溢出。环境中的任务配置副本对于运行任务并非必需,可以通过以下设置进行截断:

-D stream.jobconf.truncate.limit=20000

默认情况下,数值不会被截断(-1)。零值(0)仅复制名称而不复制数值。在绝大多数场景中,20000是一个能防止环境变量溢出的安全值。