在YARN上运行#

Mars可以在 YARN 集群上部署。您可以使用 mars.deploy.yarn 在Hadoop环境中启动Mars集群。

基本步骤#

Mars使用Skein将自身部署到YARN集群中。这个库连接了YARN应用程序的Java接口和Python接口。

在YARN中启动Mars之前,您需要先检查您的环境。由于Skein仅支持Linux,您需要在Linux客户端上工作,否则您需要自己修复和编译多个软件包。客户端还需要Skein库。您可以使用conda安装Skein。

conda install -c conda-forge skein

或使用pip安装

pip install skein

然后你需要检查你集群内的Python环境。如果在你的YARN节点上安装了Python环境,并且所有必要的包都已安装,这将为你启动集群节省很多时间。否则,你需要打包你的本地环境并将其指定给Mars。

当您使用 Conda 时,可以使用 conda-pack 打包您的环境:

conda activate local-env
conda install -c conda-forge conda-pack
conda-pack

或者在使用虚拟环境时使用 venv-pack 来打包您的环境:

source local-env/bin/activate
pip install venv-pack
venv-pack

这两个命令将创建一个 tar.gz 归档文件,你可以在部署你的 Mars 集群时使用它。

然后是时候启动你的Mars集群。当你从现有的conda环境、虚拟环境、Python可执行文件或预打包的环境归档启动时,选择不同的选项:

import os
from mars.deploy.yarn import new_cluster

# specify location of Hadoop and JDK on client side
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-1.8.0-openjdk'
os.environ['HADOOP_HOME'] = '/usr/local/hadoop'
os.environ['PATH'] = '/usr/local/hadoop:' + os.environ['PATH']

# use a conda environment at /path/to/remote/conda/env
cluster = new_cluster(environment='conda:///path/to/remote/conda/env')

# use a virtual environment at /path/to/remote/virtual/env
cluster = new_cluster(environment='venv:///path/to/remote/virtual/env')

# use a remote python executable
cluster = new_cluster(environment='python:///path/to/remote/python')

# use a local packed environment archive
cluster = new_cluster(environment='path/to/local/env/pack.tar.gz')

# get web endpoint, may be used elsewhere
print(cluster.session.endpoint)

# new cluster will start a session and set it as default one
# execute will then run in the local cluster
a = mt.random.rand(10, 10)
a.dot(a.T).execute()

# after all jobs executed, you can turn off the cluster
cluster.stop()

自定义集群#

new_cluster 函数提供了几个关键字参数供用户定义集群。您可以使用参数 app_name 来自定义 Yarn 应用程序的名称,或者使用参数 timeout 来指定集群创建的超时时间。扩展集群的参数也可用。

监督者的参数:

参数

描述

supervisor_num

集群中的主管数量,默认值为1

supervisor_cpu

每个监督者的中央处理器数量

supervisor_mem

集群中监督者的内存大小,以字节或大小单位表示,例如 1g

supervisor_extra_env

要在监督者中设置的环境变量字典

工人的参数:

参数

描述

worker_num

集群中的工作节点数量,默认为1

worker_cpu

每个工作线程的CPU数量,必填。

worker_mem

集群中工作节点的内存大小,单位为字节或大小单位,如 1g,是必需的。

worker_spill_paths

工作节点在主机上的溢出路径列表

worker_cache_mem

每个工作线程的共享内存的大小或比例。有关Mars工作线程的内存管理的详细信息,请参阅内存调优部分。

最小工作线程数

返回 new_cluster 所需的最小就绪工作线程数,默认值为 worker_num

worker_extra_env

在工作进程中设置的环境变量字典。

例如,如果您想创建一个Mars集群,其中有1个主管和100个工作节点,每个工作节点有4个核心和16GB内存,并在95个工作节点准备好时停止等待,您可以使用下面的代码:

import os
from mars.deploy.yarn import new_cluster

os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-1.8.0-openjdk'
os.environ['HADOOP_HOME'] = '/usr/local/hadoop'

cluster = new_cluster('path/to/env/pack.tar.gz', supervisor_num=1, web_num=1,
                      worker_num=100, worker_cpu=4, worker_mem='16g',
                      min_worker_num=95)