可插拔的shuffle和可插拔的排序功能允许用替代实现来替换内置的shuffle和排序逻辑。该功能的典型用例包括:使用HTTP以外的不同应用协议(如RDMA)将数据从Map节点shuffle到Reducer节点;或者用支持哈希聚合和Limit-N查询的自定义算法来替换排序逻辑。
重要提示:可插拔的shuffle和可插拔的排序功能目前处于实验阶段且不稳定。这意味着所提供的API可能会在Hadoop的未来版本中发生变更并破坏兼容性。
自定义shuffle实现需要在NodeManager中运行一个org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices.AuxiliaryService
实现类,并在Reducer任务中运行一个org.apache.hadoop.mapred.ShuffleConsumerPlugin
实现类。
Hadoop提供的默认实现可用作参考:
org.apache.hadoop.mapred.ShuffleHandler
org.apache.hadoop.mapreduce.task.reduce.Shuffle
自定义排序实现需要在Mapper任务中运行一个org.apache.hadoop.mapred.MapOutputCollector
实现类,以及(可选地,取决于排序实现)在Reducer任务中运行一个org.apache.hadoop.mapred.ShuffleConsumerPlugin
实现类。
Hadoop提供的默认实现可以用作参考:
org.apache.hadoop.mapred.MapTask$MapOutputBuffer
org.apache.hadoop.mapreduce.task.reduce.Shuffle
除了在NodeManager中运行以服务shuffle的辅助服务(默认为ShuffleHandler
),所有可插拔组件都在作业任务中运行。这意味着它们可以基于每个作业进行配置。为Shuffle提供服务的辅助服务必须在NodeManager的配置中进行设置。
属性 | 默认值 | 说明 |
---|---|---|
mapreduce.job.reduce.shuffle.consumer.plugin.class |
org.apache.hadoop.mapreduce.task.reduce.Shuffle |
The ShuffleConsumerPlugin implementation to use |
mapreduce.job.map.output.collector.class |
org.apache.hadoop.mapred.MapTask$MapOutputBuffer |
The MapOutputCollector implementation(s) to use |
这些属性也可以在mapred-site.xml
中设置,以更改所有作业的默认值。
收集器类配置可以指定一个以逗号分隔的收集器实现列表。在这种情况下,map任务将依次尝试实例化每个实现,直到其中一个实现成功初始化。例如,如果某个给定的收集器实现仅兼容特定类型的键或值,这种配置方式会非常有用。
yarn-site.xml
:有两种方式可以配置辅助服务:通过清单文件或通过配置(旧方法)。如果使用清单文件,则不会从配置中读取辅助服务的配置。
如果使用清单文件,必须通过在yarn-site.xml中将属性yarn.nodemanager.aux-services.manifest.enabled
设置为true来启用该功能。文件路径可以在yarn-site.xml中通过属性yarn.nodemanager.aux-services.manifest
设置,或者通过向端点http://nm-http-address:port/ws/v1/node/auxiliaryservices
发送PUT请求将文件发送到每个节点管理器。如果在配置中设置了文件路径,节点管理器将按照yarn.nodemanager.aux-services.manifest.reload-ms
指定的间隔(默认为0;设置间隔<=0表示不会自动重新加载)检查该文件是否有新修改。
否则,请设置以下属性以通过配置来配置辅助服务。
属性 | 默认值 | 说明 |
---|---|---|
yarn.nodemanager.aux-services |
...,mapreduce_shuffle |
The auxiliary service name |
yarn.nodemanager.aux-services.mapreduce_shuffle.class |
org.apache.hadoop.mapred.ShuffleHandler |
The auxiliary service class to use |
yarn.nodemanager.aux-services.%s.classpath |
NONE | local directory which includes the related jar file as well as all the dependencies’ jar file. We could specify the single jar file or use /dep/* to load all jars under the dep directory. |
yarn.nodemanager.aux-services.%s.remote-classpath |
NONE | The remote absolute or relative path to jar file |
使用清单:
{ "services": [ { "name": "mapreduce_shuffle", "version": "1", "configuration": { "properties": { "class.name": "org.apache.hadoop.mapred.ShuffleHandler" } } }, { "name": "AuxServiceFromHDFS", "version": "1", "configuration": { "properties": { "class.name": "org.apache.auxtest.AuxServiceFromHDFS2" }, "files": [ { "src_file": "hdfs:///aux/test/aux-service-hdfs.jar", "type": "STATIC" } ] } } ] }
或者使用配置:
<configuration> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle,AuxServiceFromHDFS</value> </property> <property> <name>yarn.nodemanager.aux-services.AuxServiceFromHDFS.remote-classpath</name> <value>/aux/test/aux-service-hdfs.jar</value> </property> <property> <name>yarn.nodemanager.aux-services.AuxServiceFromHDFS.class</name> <value>org.apache.auxtest.AuxServiceFromHDFS2</value> </property> </configuration>
使用清单:
{ "services": [ { "name": "mapreduce_shuffle", "version": "1", "configuration": { "properties": { "class.name": "org.apache.hadoop.mapred.ShuffleHandler" } } }, { "name": "AuxServiceFromHDFS", "version": "1", "configuration": { "properties": { "class.name": "org.apache.auxtest.AuxServiceFromHDFS2" }, "files": [ { "src_file": "file:///aux/test/aux-service-hdfs.jar", "type": "STATIC" } ] } } ] }
或者使用配置:
<configuration> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle,AuxServiceFromHDFS</value> </property> <property> <name>yarn.nodemanager.aux-services.AuxServiceFromHDFS.classpath</name> <value>/aux/test/aux-service-hdfs.jar</value> </property> <property> <name>yarn.nodemanager.aux-services.AuxServiceFromHDFS.class</name> <value>org.apache.auxtest.AuxServiceFromHDFS2</value> </property> </configuration>
重要提示: 如果除了默认的mapreduce_shuffle
服务外还需要设置辅助服务,则应在yarn.nodemanager.aux-services
属性中添加新的服务键,例如mapred.shufflex
。然后定义对应类的属性必须为yarn.nodemanager.aux-services.mapreduce_shufflex.class
。另外,如果使用辅助服务清单文件,则应将服务添加到服务列表中。