Hadoop: 可插拔的Shuffle与可插拔的Sort

简介

可插拔的shuffle和可插拔的排序功能允许用替代实现来替换内置的shuffle和排序逻辑。该功能的典型用例包括:使用HTTP以外的不同应用协议(如RDMA)将数据从Map节点shuffle到Reducer节点;或者用支持哈希聚合和Limit-N查询的自定义算法来替换排序逻辑。

重要提示:可插拔的shuffle和可插拔的排序功能目前处于实验阶段且不稳定。这意味着所提供的API可能会在Hadoop的未来版本中发生变更并破坏兼容性。

实现自定义Shuffle和自定义Sort

自定义shuffle实现需要在NodeManager中运行一个org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices.AuxiliaryService实现类,并在Reducer任务中运行一个org.apache.hadoop.mapred.ShuffleConsumerPlugin实现类。

Hadoop提供的默认实现可用作参考:

  • org.apache.hadoop.mapred.ShuffleHandler
  • org.apache.hadoop.mapreduce.task.reduce.Shuffle

自定义排序实现需要在Mapper任务中运行一个org.apache.hadoop.mapred.MapOutputCollector实现类,以及(可选地,取决于排序实现)在Reducer任务中运行一个org.apache.hadoop.mapred.ShuffleConsumerPlugin实现类。

Hadoop提供的默认实现可以用作参考:

  • org.apache.hadoop.mapred.MapTask$MapOutputBuffer
  • org.apache.hadoop.mapreduce.task.reduce.Shuffle

配置

除了在NodeManager中运行以服务shuffle的辅助服务(默认为ShuffleHandler),所有可插拔组件都在作业任务中运行。这意味着它们可以基于每个作业进行配置。为Shuffle提供服务的辅助服务必须在NodeManager的配置中进行设置。

作业配置属性(基于每个作业):

属性 默认值 说明
mapreduce.job.reduce.shuffle.consumer.plugin.class org.apache.hadoop.mapreduce.task.reduce.Shuffle The ShuffleConsumerPlugin implementation to use
mapreduce.job.map.output.collector.class org.apache.hadoop.mapred.MapTask$MapOutputBuffer The MapOutputCollector implementation(s) to use

这些属性也可以在mapred-site.xml中设置,以更改所有作业的默认值。

收集器类配置可以指定一个以逗号分隔的收集器实现列表。在这种情况下,map任务将依次尝试实例化每个实现,直到其中一个实现成功初始化。例如,如果某个给定的收集器实现仅兼容特定类型的键或值,这种配置方式会非常有用。

NodeManager 配置属性,所有节点中的 yarn-site.xml

有两种方式可以配置辅助服务:通过清单文件或通过配置(旧方法)。如果使用清单文件,则不会从配置中读取辅助服务的配置。

如果使用清单文件,必须通过在yarn-site.xml中将属性yarn.nodemanager.aux-services.manifest.enabled设置为true来启用该功能。文件路径可以在yarn-site.xml中通过属性yarn.nodemanager.aux-services.manifest设置,或者通过向端点http://nm-http-address:port/ws/v1/node/auxiliaryservices发送PUT请求将文件发送到每个节点管理器。如果在配置中设置了文件路径,节点管理器将按照yarn.nodemanager.aux-services.manifest.reload-ms指定的间隔(默认为0;设置间隔<=0表示不会自动重新加载)检查该文件是否有新修改。

否则,请设置以下属性以通过配置来配置辅助服务。

属性 默认值 说明
yarn.nodemanager.aux-services ...,mapreduce_shuffle The auxiliary service name
yarn.nodemanager.aux-services.mapreduce_shuffle.class org.apache.hadoop.mapred.ShuffleHandler The auxiliary service class to use
yarn.nodemanager.aux-services.%s.classpath NONE local directory which includes the related jar file as well as all the dependencies’ jar file. We could specify the single jar file or use /dep/* to load all jars under the dep directory.
yarn.nodemanager.aux-services.%s.remote-classpath NONE The remote absolute or relative path to jar file

从HDFS加载jar文件的示例:

使用清单:

{
  "services": [
    {
      "name": "mapreduce_shuffle",
      "version": "1",
      "configuration": {
        "properties": {
          "class.name": "org.apache.hadoop.mapred.ShuffleHandler"
        }
      }
    },
    {
      "name": "AuxServiceFromHDFS",
      "version": "1",
      "configuration": {
        "properties": {
          "class.name": "org.apache.auxtest.AuxServiceFromHDFS2"
        },
        "files": [
          {
            "src_file": "hdfs:///aux/test/aux-service-hdfs.jar",
            "type": "STATIC"
          }
        ]
      }
    }
  ]
}

或者使用配置:

<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle,AuxServiceFromHDFS</value>
    </property>

    <property>
        <name>yarn.nodemanager.aux-services.AuxServiceFromHDFS.remote-classpath</name>
        <value>/aux/test/aux-service-hdfs.jar</value>
    </property>

    <property>
        <name>yarn.nodemanager.aux-services.AuxServiceFromHDFS.class</name>
        <value>org.apache.auxtest.AuxServiceFromHDFS2</value>
    </property>
</configuration>

从本地文件系统加载jar文件的示例:

使用清单:

{
  "services": [
    {
      "name": "mapreduce_shuffle",
      "version": "1",
      "configuration": {
        "properties": {
          "class.name": "org.apache.hadoop.mapred.ShuffleHandler"
        }
      }
    },
    {
      "name": "AuxServiceFromHDFS",
      "version": "1",
      "configuration": {
        "properties": {
          "class.name": "org.apache.auxtest.AuxServiceFromHDFS2"
        },
        "files": [
          {
            "src_file": "file:///aux/test/aux-service-hdfs.jar",
            "type": "STATIC"
          }
        ]
      }
    }
  ]
}

或者使用配置:

<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle,AuxServiceFromHDFS</value>
    </property>

    <property>
        <name>yarn.nodemanager.aux-services.AuxServiceFromHDFS.classpath</name>
        <value>/aux/test/aux-service-hdfs.jar</value>
    </property>

    <property>
        <name>yarn.nodemanager.aux-services.AuxServiceFromHDFS.class</name>
        <value>org.apache.auxtest.AuxServiceFromHDFS2</value>
    </property>
</configuration>

重要提示: 如果除了默认的mapreduce_shuffle服务外还需要设置辅助服务,则应在yarn.nodemanager.aux-services属性中添加新的服务键,例如mapred.shufflex。然后定义对应类的属性必须为yarn.nodemanager.aux-services.mapreduce_shufflex.class。另外,如果使用辅助服务清单文件,则应将服务添加到服务列表中。