在Ray上运行#
Mars 还与 Ray 深度集成,可以高效原生地运行在 Ray 上。
基本步骤#
在本地安装Ray:
pip install ray
(可选)启动一个Ray集群或Mars自动启动一个Ray集群:
import ray
ray.init()
(可选) 或使用 Ray client 连接到现有的 Ray 集群:
import ray
ray.init(address='ray://<head_node_host>:10001')
在Ray集群中创建一个Mars运行时并进行计算:
import mars
import mars.tensor as mt
import mars.dataframe as md
# This driver is the Mars supervisor.
session = mars.new_session(backend='ray')
mt.random.RandomState(0).rand(1000_0000, 5).sum().execute()
df = md.DataFrame(
mt.random.rand(1000_0000, 4, chunk_size=500_0000),
columns=list('abcd'))
print(df.sum().execute())
print(df.describe().execute())
# Convert mars dataframe to ray dataset
ds = md.to_ray_dataset(df)
print(ds.schema(), ds.count())
ds.filter(lambda row: row['a'] > 0.5).show(5)
# Convert ray dataset to mars dataframe
df2 = md.read_ray_dataset(ds)
print(df2.head(5).execute())
停止在Ray运行时创建的Mars:
session.stop_server()
自定义集群#
有两种方法可以初始化Mars on Ray会话:
- mars.new_session(…) # 在当前进程中启动Mars主管。
建议用于大多数使用案例。
- mars.new_ray_session(…) # 为Mars主管启动一个Ray actor。
推荐用于大规模计算或通过Ray客户端进行计算。
为Mars主管启动一个Ray演员:
import mars
# Start a Ray actor for Mars supervisor.
session = mars.new_ray_session(backend='ray')
连接到创建的Mars在Ray运行时并进行计算,监管器虚拟地址是Mars监管器的Ray演员名称, 例如 ray://ray-cluster-1672904753/0/0。
import mars
import mars.tensor as mt
# Be aware that `mars.new_ray_session()` connects to an existing Mars
# cluster requires Ray runtime.
# e.g. Current process is a initialized Ray driver, client or worker.
session = mars.new_ray_session(
address='ray://<supervisor virtual address>',
session_id='abcd',
backend='ray',
default=True)
session.execute(mt.random.RandomState(0).rand(100, 5).sum())
该new_ray_session函数提供了几个关键字参数,以供用户定义集群。
监督者的参数:
参数 |
描述 |
|---|---|
supervisor_cpu |
监督者的CPU数量,默认值为1。 |
supervisor_mem |
监控器的内存大小(以字节为单位),默认1G。 |
工人的参数:
参数 |
描述 |
|---|---|
worker_cpu |
每个工作线程的CPU数量,默认为2。 |
worker_mem |
工作线程的内存大小,以字节为单位,默认为2G。 |
例如,如果您想创建一个带有独立监控器的Mars集群,可以使用下面的代码(在这个例子中,一个Ray节点总共有16个CPU):
import mars
session = mars.new_ray_session(supervisor_cpu=16)