用户指南#

注意

版本 0.4.1 的新功能

Mars远程API提供了一种简单而强大的方式来并行运行Python函数。

Mars远程的主要API是 mars.remote.spawn()。它返回一个Mars对象,而尚未发生任何执行。当 .execute() 被调用时,已生成的函数将被提交给Mars进行执行,因此如果多个已生成的函数一起执行,它们可能会并行运行。

>>> import mars.remote as mr
>>> def inc(x):
>>>     return x + 1
>>>
>>> result = mr.spawn(inc, args=(0,))
>>> result
Object <op=RemoteFunction, key=e0b31261d70dd9b1e00da469666d72d9>
>>> result.execute().fetch()
1

生成的函数列表可以转换为 mars.remote.ExecutableTuple,并且可以调用.execute()来一起运行 这些函数。

>>> results = [mr.spawn(inc, args=(i,)) for i in range(10)]
>>> mr.ExecutableTuple(results).execute().fetch()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Mars通过 mars.remote.spawn() 返回的对象可以作为其他生成函数的参数处理。

>>> results = [mr.spawn(inc, args=(i,)) for i in range(10)]   # list of spawned functions
>>> def sum_all(xs):
        return sum(xs)
>>> mr.spawn(sum_all, args=(results,)).execute().fetch()
55

Mars确保只有在前10个inc调用完成后,才能调用sum_all。用户无需担心依赖的数据,例如,当sum_all被调用时,参数xs已被前一个inc函数的真实输出替换。

对于分布式设置,10 inc 函数可以分配给不同的 工作者。用户无需关心函数是如何分配的,以及生成的函数的输出是如何在工作者之间移动的。

用户还可以在已生成的函数内部生成新的函数。

>>> def driver():
>>>     results = [mr.spawn(inc, args=(i,)) for i in range(10)]
>>>     return mr.ExecutableTuple(results).execute().fetch()
>>>
>>> mr.spawn(driver).execute().fetch()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Mars张量、数据框等也可以在生成的函数中使用。

>>> import mars.tensor as mt
>>> def driver2():
>>>     t = mt.random.rand(10, 10)
>>>     return t.sum().to_numpy()
>>>
>>> mr.spawn(driver2).execute().fetch()
52.47844223908132

参数 n_output 可以指示生成的函数将返回的输出数量。这在将不同的输出传递给不同的函数时很重要。

>>> def triage(alist):
>>>     ret = [], []
>>>     for i in alist:
>>>         if i < 0.5:
>>>             ret[0].append(i)
>>>         else:
>>>             ret[1].append(i)
>>>     return ret
>>>
>>> def sum_all(xs):
>>>     return sum(xs)
>>>
>>> l = [0.4, 0.7, 0.2, 0.8]
>>> la, lb = mr.spawn(triage, args=(l,), n_output=2)
>>>
>>> sa = mr.spawn(sum_all, args=(la,))
>>> sb = mr.spawn(sum_all, args=(lb,))
>>> mr.ExecutableTuple([sa, sb]).execute().fetch()
>>> [0.6000000000000001, 1.5]