Mars 文档#
Mars是一个基于张量的统一框架,用于大规模数据计算,它扩展了numpy、pandas、scikit-learn和许多其他库。
架构概述#
开始使用#
通过以下方式在本地启动新的运行时:
>>> import mars
>>> mars.new_session()
或者连接到一个已经初始化的Mars集群。
>>> import mars
>>> mars.new_session('http://<web_ip>:<ui_port>')
Mars 张量#
Mars tensor 提供了一个类似于 Numpy 的熟悉接口。
Numpy |
Mars 张量 |
import numpy as np
N = 200_000_000
a = np.random.uniform(-1, 1, size=(N, 2))
print((np.linalg.norm(a, axis=1) < 1)
.sum() * 4 / N)
|
import mars.tensor as mt
N = 200_000_000
a = mt.random.uniform(-1, 1, size=(N, 2))
print(((mt.linalg.norm(a, axis=1) < 1)
.sum() * 4 / N).execute())
|
3.14174502
CPU 时间: 用户 11.6 秒, 系统: 8.22 秒,
总计: 19.9 秒
实际 时间: 22.5 秒
|
3.14161908
CPU 时间: 用户 966 毫秒, 系统: 544 毫秒,
总计: 1.51 秒
实际 时间: 3.77 秒
|
Mars 可以利用多个核心,甚至在笔记本电脑上,在分布式环境下可能会更快。
Mars 数据框#
Mars DataFrame 提供了一个类似于 pandas 的熟悉接口。
熊猫 |
Mars 数据框 |
import numpy as np
import pandas as pd
df = pd.DataFrame(
np.random.rand(100000000, 4),
columns=list('abcd'))
print(df.sum())
|
import mars.tensor as mt
import mars.dataframe as md
df = md.DataFrame(
mt.random.rand(100000000, 4),
columns=list('abcd'))
print(df.sum().execute())
|
CPU 时间: 用户 10.9 秒, 系统: 2.69 秒,
总计: 13.6 秒
实际 时间: 11 秒
|
CPU 时间: 用户 1.21 秒, 系统: 212 毫秒,
总计: 1.42 秒
实际 时间: 2.75 秒
|
Mars 学习#
Mars learn提供了一个类似于scikit-learn的熟悉界面。
Scikit-learn |
Mars 学习 |
from sklearn.datasets import make_blobs
from sklearn.decomposition import PCA
X, y = make_blobs(
n_samples=100000000, n_features=3,
centers=[[3, 3, 3], [0, 0, 0],
[1, 1, 1], [2, 2, 2]],
cluster_std=[0.2, 0.1, 0.2, 0.2],
random_state=9)
pca = PCA(n_components=3)
pca.fit(X)
print(pca.explained_variance_ratio_)
print(pca.explained_variance_)
|
from mars.learn.datasets import make_blobs
from mars.learn.decomposition import PCA
X, y = make_blobs(
n_samples=100000000, n_features=3,
centers=[[3, 3, 3], [0, 0, 0],
[1, 1, 1], [2, 2, 2]],
cluster_std=[0.2, 0.1, 0.2, 0.2],
random_state=9)
pca = PCA(n_components=3)
pca.fit(X)
print(pca.explained_variance_ratio_)
print(pca.explained_variance_)
|
Mars学习还与许多库集成:
Mars 远程#
Mars远程允许用户并行执行函数。
import numpy as np
def calc_chunk(n, i):
rs = np.random.RandomState(i)
a = rs.uniform(-1, 1, size=(n, 2))
d = np.linalg.norm(a, axis=1)
return (d < 1).sum()
def calc_pi(fs, N):
return sum(fs) * 4 / N
N = 200_000_000
n = 10_000_000
fs = [calc_chunk(n, i)
for i in range(N // n)]
pi = calc_pi(fs, N)
print(pi)
|
import numpy as np
import mars.remote as mr
def calc_chunk(n, i):
rs = np.random.RandomState(i)
a = rs.uniform(-1, 1, size=(n, 2))
d = np.linalg.norm(a, axis=1)
return (d < 1).sum()
def calc_pi(fs, N):
return sum(fs) * 4 / N
N = 200_000_000
n = 10_000_000
fs = [mr.spawn(calc_chunk, args=(n, i))
for i in range(N // n)]
pi = mr.spawn(calc_pi, args=(fs, N))
print(pi.execute().fetch())
|
3.1416312
CPU 时间: 用户 32.2 秒, 系统: 4.86 秒,
总计: 37.1 秒
墙 时间: 12.4 秒
|
3.1416312
CPU 时间: 用户 616 毫秒, 系统: 307 毫秒,
总计: 923 毫秒
墙 时间: 3.99 秒
|
Mars上的DASK#
参考Mars上的DASK。
Mars在Ray上#
请参阅 Mars on Ray。
易于扩展和扩展#
Mars可以缩放到单台机器,也可以扩展到数百台机器的集群。 本地和分布式版本共享同一段代码,从单台机器迁移到集群相对简单,以处理更多数据或获得更好的性能。
Mars 可以以几种方式运行: