光线生成器#
Python 生成器 是行为类似于迭代器的函数,每次迭代产生一个值。Ray 也支持生成器 API。
任何用 ray.remote 装饰的生成器函数都变成一个 Ray 生成器任务。生成器任务在任务完成前将输出流回给调用者。
+import ray
import time
# Takes 25 seconds to finish.
+@ray.remote
def f():
for i in range(5):
time.sleep(5)
yield i
-for obj in f():
+for obj_ref in f.remote():
# Prints every 5 seconds and stops after 25 seconds.
- print(obj)
+ print(ray.get(obj_ref))
上述 Ray 生成器每 5 秒产生一次输出,共 5 次。使用普通的 Ray 任务,你必须等待 25 秒才能访问输出。使用 Ray 生成器,调用者可以在任务 f 完成之前访问对象引用。
Ray 生成器在以下情况中非常有用
您希望通过在任务完成前产生输出并进行垃圾回收(GC)来减少堆内存或对象存储内存的使用。
你熟悉Python生成器,并希望找到等效的编程模型。
Ray 库使用 Ray 生成器来支持流式使用场景
Ray 生成器与现有的 Ray API 无缝协作
你可以在actor任务和非actor任务中使用Ray生成器。
光线生成器与内置的 容错功能 如重试或血统重建一起工作。
Ray 生成器与 Ray API 如 ray.wait、ray.cancel 等一起工作。
入门#
定义一个 Python 生成器函数并用 ray.remote 装饰它以创建一个 Ray 生成器。
import ray
import time
@ray.remote
def task():
for i in range(5):
time.sleep(5)
yield i
Ray 生成器任务返回一个 ObjectRefGenerator 对象,该对象兼容生成器和异步生成器 API。你可以从该类访问 next、__iter__、__anext__、__aiter__ API。
每当任务调用 yield 时,相应的输出会作为 Ray 对象引用从生成器中准备好并可用。你可以调用 next(gen) 来获取一个对象引用。如果 next 没有更多项可以生成,它会引发 StopIteration。如果 __anext__ 没有更多项可以生成,它会引发 StopAsyncIteration。
next API 会阻塞线程,直到任务通过 yield 生成下一个对象引用。由于 ObjectRefGenerator 只是一个 Python 生成器,你也可以使用 for 循环来迭代对象引用。
如果你想避免阻塞线程,你可以使用 asyncio 或者 ray.wait API。
gen = task.remote()
# Blocks for 5 seconds.
ref = next(gen)
# return 0
ray.get(ref)
# Blocks for 5 seconds.
ref = next(gen)
# Return 1
ray.get(ref)
# Returns 2~4 every 5 seconds.
for ref in gen:
print(ray.get(ref))
备注
对于一个普通的 Python 生成器,生成器函数在调用生成器的 next 函数时暂停和恢复。Ray 会急切地执行生成器任务直到完成,无论调用者是否正在轮询部分结果。
错误处理#
如果生成器任务失败(由于应用程序异常或系统错误,如意外的节点故障),next(gen) 返回一个包含异常的对象引用。当你调用 ray.get 时,Ray 会引发该异常。
@ray.remote
def task():
for i in range(5):
time.sleep(1)
if i == 1:
raise ValueError
yield i
gen = task.remote()
# it's okay.
ray.get(next(gen))
# Raises an exception
try:
ray.get(next(gen))
except ValueError as e:
print(f"Exception is raised when i == 1 as expected {e}")
在上面的例子中,如果一个应用程序未能完成任务,Ray 会按正确的顺序返回带有异常的对象引用。例如,如果 Ray 在第二次 yield 后引发异常,第三次 next(gen) 将始终返回一个带有异常的对象引用。如果系统错误导致任务失败(例如,节点故障或工作进程故障),next(gen) 将随时返回包含系统级异常的对象引用,而无需保证顺序。这意味着当你有 N 次 yield 时,生成器可以在发生故障时创建从 1 到 N + 1 个对象引用(N 个输出 + 带有系统级异常的引用)。
从Actor任务生成的生成器#
Ray 生成器兼容 所有执行模型。它可以无缝地与常规执行器、异步执行器 和 线程化执行器 一起工作。
@ray.remote
class Actor:
def f(self):
for i in range(5):
yield i
@ray.remote
class AsyncActor:
async def f(self):
for i in range(5):
yield i
@ray.remote(max_concurrency=5)
class ThreadedActor:
def f(self):
for i in range(5):
yield i
actor = Actor.remote()
for ref in actor.f.remote():
print(ray.get(ref))
actor = AsyncActor.remote()
for ref in actor.f.remote():
print(ray.get(ref))
actor = ThreadedActor.remote()
for ref in actor.f.remote():
print(ray.get(ref))
使用 Ray 生成器与 asyncio#
返回的 ObjectRefGenerator 也兼容 asyncio。你可以使用 __anext__ 或 async for 循环。
import asyncio
@ray.remote
def task():
for i in range(5):
time.sleep(1)
yield i
async def main():
async for ref in task.remote():
print(await ref)
asyncio.run(main())
对象引用的垃圾回收#
从 next(generator) 返回的 ref 只是一个普通的 Ray 对象引用,并且以相同的方式进行分布式引用计数。如果引用没有通过 next API 从生成器中消费,当生成器被垃圾回收(GC)时,这些引用将被垃圾回收。
@ray.remote
def task():
for i in range(5):
time.sleep(1)
yield i
gen = task.remote()
ref1 = next(gen)
del gen
在下面的例子中,Ray 将 ref1 视为一个普通的 Ray 对象引用,在 Ray 返回它之后。其他未通过 next(gen) 消耗的引用在生成器被垃圾回收时会被移除。在这个例子中,垃圾回收发生在您调用 del gen 时。
容错#
容错功能 适用于 Ray 生成器任务和参与者任务。例如;
任务容错功能:
max_retries,retry_exceptionsActor 容错特性:
max_restarts,max_task_retries对象容错功能: 对象重建
取消#
The ray.cancel() 函数同时适用于 Ray 生成器任务和 actor 任务。从语义上讲,取消生成器任务与取消常规任务没有区别。当你取消一个任务时,next(gen) 可以返回包含 TaskCancelledError 的引用,而没有任何特殊的顺序保证。
如何在不阻塞线程的情况下等待生成器(兼容 ray.wait 和 ray.get)#
当使用生成器时,next API 会阻塞其线程,直到下一个对象引用可用。然而,您可能并不总是希望这种行为。您可能希望在不阻塞线程的情况下等待生成器。使用 Ray 生成器可以通过以下方式实现非阻塞等待:
等待生成器任务完成
ObjectRefGenerator 有一个 API completed。它返回一个对象引用,该引用在生成器任务完成或出错时可用。例如,你可以执行 ray.get(<generator_instance>.completed()) 来等待任务完成。注意,不允许使用 ray.get 来获取 ObjectRefGenerator。
使用 asyncio 和 await
ObjectRefGenerator 与 asyncio 兼容。你可以创建多个 asyncio 任务来创建生成器任务并等待它,以避免阻塞线程。
import asyncio
@ray.remote
def task():
for i in range(5):
time.sleep(1)
yield i
async def async_task():
async for ref in task.remote():
print(await ref)
async def main():
t1 = async_task()
t2 = async_task()
await asyncio.gather(t1, t2)
asyncio.run(main())
使用 ray.wait
你可以将 ObjectRefGenerator 作为输入传递给 ray.wait。如果 下一个项目 可用,则生成器是“就绪”的。一旦 Ray 从就绪列表中找到,next(gen) 将立即返回下一个对象引用,而不会阻塞。有关更多详细信息,请参见下面的示例。
@ray.remote
def task():
for i in range(5):
time.sleep(5)
yield i
gen = task.remote()
# Because it takes 5 seconds to make the first yield,
# with 0 timeout, the generator is unready.
ready, unready = ray.wait([gen], timeout=0)
print("timeout 0, nothing is ready.")
print(ready)
assert len(ready) == 0
assert len(unready) == 1
# Without a timeout argument, ray.wait waits until the given argument
# is ready. When a next item is ready, it returns.
ready, unready = ray.wait([gen])
print("Wait for 5 seconds. The next item is ready.")
assert len(ready) == 1
assert len(unready) == 0
next(gen)
# Because the second yield hasn't happened yet,
ready, unready = ray.wait([gen], timeout=0)
print("Wait for 0 seconds. The next item is not ready.")
print(ready, unready)
assert len(ready) == 0
assert len(unready) == 1
所有输入参数(如 timeout、num_returns 和 fetch_local)从 ray.wait 与生成器一起工作。
ray.wait 可以将常规的 Ray 对象引用与输入的生成器混合使用。在这种情况下,应用程序应处理来自 ray.wait 的所有输入参数(如 timeout、num_returns 和 fetch_local)与生成器一起工作。
from ray._raylet import ObjectRefGenerator
@ray.remote
def generator_task():
for i in range(5):
time.sleep(5)
yield i
@ray.remote
def regular_task():
for i in range(5):
time.sleep(5)
return
gen = [generator_task.remote()]
ref = [regular_task.remote()]
ready, unready = [], [*gen, *ref]
result = []
while unready:
ready, unready = ray.wait(unready)
for r in ready:
if isinstance(r, ObjectRefGenerator):
try:
ref = next(r)
result.append(ray.get(ref))
except StopIteration:
pass
else:
unready.append(r)
else:
result.append(ray.get(r))
线程安全#
ObjectRefGenerator 对象不是线程安全的。
限制#
光线生成器不支持这些功能:
throw、send和closeAPI。生成器的
return语句。将
ObjectRefGenerator传递给另一个任务或角色。