光线生成器#

Python 生成器 是行为类似于迭代器的函数,每次迭代产生一个值。Ray 也支持生成器 API。

任何用 ray.remote 装饰的生成器函数都变成一个 Ray 生成器任务。生成器任务在任务完成前将输出流回给调用者。

+import ray
 import time

 # Takes 25 seconds to finish.
+@ray.remote
 def f():
     for i in range(5):
         time.sleep(5)
         yield i

-for obj in f():
+for obj_ref in f.remote():
     # Prints every 5 seconds and stops after 25 seconds.
-    print(obj)
+    print(ray.get(obj_ref))

上述 Ray 生成器每 5 秒产生一次输出,共 5 次。使用普通的 Ray 任务,你必须等待 25 秒才能访问输出。使用 Ray 生成器,调用者可以在任务 f 完成之前访问对象引用。

Ray 生成器在以下情况中非常有用

  • 您希望通过在任务完成前产生输出并进行垃圾回收(GC)来减少堆内存或对象存储内存的使用。

  • 你熟悉Python生成器,并希望找到等效的编程模型。

Ray 库使用 Ray 生成器来支持流式使用场景

  • Ray Serve 使用 Ray 生成器来支持 流式响应

  • Ray Data 是一个流数据处理库,它使用 Ray 生成器来控制和减少并发内存使用。

Ray 生成器与现有的 Ray API 无缝协作

入门#

定义一个 Python 生成器函数并用 ray.remote 装饰它以创建一个 Ray 生成器。

import ray
import time

@ray.remote
def task():
    for i in range(5):
        time.sleep(5)
        yield i

Ray 生成器任务返回一个 ObjectRefGenerator 对象,该对象兼容生成器和异步生成器 API。你可以从该类访问 next__iter____anext____aiter__ API。

每当任务调用 yield 时,相应的输出会作为 Ray 对象引用从生成器中准备好并可用。你可以调用 next(gen) 来获取一个对象引用。如果 next 没有更多项可以生成,它会引发 StopIteration。如果 __anext__ 没有更多项可以生成,它会引发 StopAsyncIteration

next API 会阻塞线程,直到任务通过 yield 生成下一个对象引用。由于 ObjectRefGenerator 只是一个 Python 生成器,你也可以使用 for 循环来迭代对象引用。

如果你想避免阻塞线程,你可以使用 asyncio 或者 ray.wait API

gen = task.remote()
# Blocks for 5 seconds.
ref = next(gen)
# return 0
ray.get(ref)
# Blocks for 5 seconds.
ref = next(gen)
# Return 1
ray.get(ref)

# Returns 2~4 every 5 seconds.
for ref in gen:
    print(ray.get(ref))

备注

对于一个普通的 Python 生成器,生成器函数在调用生成器的 next 函数时暂停和恢复。Ray 会急切地执行生成器任务直到完成,无论调用者是否正在轮询部分结果。

错误处理#

如果生成器任务失败(由于应用程序异常或系统错误,如意外的节点故障),next(gen) 返回一个包含异常的对象引用。当你调用 ray.get 时,Ray 会引发该异常。

@ray.remote
def task():
    for i in range(5):
        time.sleep(1)
        if i == 1:
            raise ValueError
        yield i

gen = task.remote()
# it's okay.
ray.get(next(gen))

# Raises an exception
try:
    ray.get(next(gen))
except ValueError as e:
    print(f"Exception is raised when i == 1 as expected {e}")

在上面的例子中,如果一个应用程序未能完成任务,Ray 会按正确的顺序返回带有异常的对象引用。例如,如果 Ray 在第二次 yield 后引发异常,第三次 next(gen) 将始终返回一个带有异常的对象引用。如果系统错误导致任务失败(例如,节点故障或工作进程故障),next(gen) 将随时返回包含系统级异常的对象引用,而无需保证顺序。这意味着当你有 N 次 yield 时,生成器可以在发生故障时创建从 1 到 N + 1 个对象引用(N 个输出 + 带有系统级异常的引用)。

从Actor任务生成的生成器#

Ray 生成器兼容 所有执行模型。它可以无缝地与常规执行器、异步执行器线程化执行器 一起工作。

@ray.remote
class Actor:
    def f(self):
        for i in range(5):
            yield i

@ray.remote
class AsyncActor:
    async def f(self):
        for i in range(5):
            yield i

@ray.remote(max_concurrency=5)
class ThreadedActor:
    def f(self):
        for i in range(5):
            yield i

actor = Actor.remote()
for ref in actor.f.remote():
    print(ray.get(ref))

actor = AsyncActor.remote()
for ref in actor.f.remote():
    print(ray.get(ref))

actor = ThreadedActor.remote()
for ref in actor.f.remote():
    print(ray.get(ref))

使用 Ray 生成器与 asyncio#

返回的 ObjectRefGenerator 也兼容 asyncio。你可以使用 __anext__async for 循环。

import asyncio

@ray.remote
def task():
    for i in range(5):
        time.sleep(1)
        yield i


async def main():
    async for ref in task.remote():
        print(await ref)

asyncio.run(main())

对象引用的垃圾回收#

next(generator) 返回的 ref 只是一个普通的 Ray 对象引用,并且以相同的方式进行分布式引用计数。如果引用没有通过 next API 从生成器中消费,当生成器被垃圾回收(GC)时,这些引用将被垃圾回收。

@ray.remote
def task():
    for i in range(5):
        time.sleep(1)
        yield i

gen = task.remote()
ref1 = next(gen)
del gen

在下面的例子中,Ray 将 ref1 视为一个普通的 Ray 对象引用,在 Ray 返回它之后。其他未通过 next(gen) 消耗的引用在生成器被垃圾回收时会被移除。在这个例子中,垃圾回收发生在您调用 del gen 时。

容错#

容错功能 适用于 Ray 生成器任务和参与者任务。例如;

取消#

The ray.cancel() 函数同时适用于 Ray 生成器任务和 actor 任务。从语义上讲,取消生成器任务与取消常规任务没有区别。当你取消一个任务时,next(gen) 可以返回包含 TaskCancelledError 的引用,而没有任何特殊的顺序保证。

如何在不阻塞线程的情况下等待生成器(兼容 ray.wait 和 ray.get)#

当使用生成器时,next API 会阻塞其线程,直到下一个对象引用可用。然而,您可能并不总是希望这种行为。您可能希望在不阻塞线程的情况下等待生成器。使用 Ray 生成器可以通过以下方式实现非阻塞等待:

等待生成器任务完成

ObjectRefGenerator 有一个 API completed。它返回一个对象引用,该引用在生成器任务完成或出错时可用。例如,你可以执行 ray.get(<generator_instance>.completed()) 来等待任务完成。注意,不允许使用 ray.get 来获取 ObjectRefGenerator

使用 asyncio 和 await

ObjectRefGenerator 与 asyncio 兼容。你可以创建多个 asyncio 任务来创建生成器任务并等待它,以避免阻塞线程。

import asyncio

@ray.remote
def task():
    for i in range(5):
        time.sleep(1)
        yield i


async def async_task():
    async for ref in task.remote():
        print(await ref)

async def main():
    t1 = async_task()
    t2 = async_task()
    await asyncio.gather(t1, t2)

asyncio.run(main())

使用 ray.wait

你可以将 ObjectRefGenerator 作为输入传递给 ray.wait。如果 下一个项目 可用,则生成器是“就绪”的。一旦 Ray 从就绪列表中找到,next(gen) 将立即返回下一个对象引用,而不会阻塞。有关更多详细信息,请参见下面的示例。

@ray.remote
def task():
    for i in range(5):
        time.sleep(5)
        yield i

gen = task.remote()

# Because it takes 5 seconds to make the first yield,
# with 0 timeout, the generator is unready.
ready, unready = ray.wait([gen], timeout=0)
print("timeout 0, nothing is ready.")
print(ready)
assert len(ready) == 0
assert len(unready) == 1

# Without a timeout argument, ray.wait waits until the given argument
# is ready. When a next item is ready, it returns.
ready, unready = ray.wait([gen])
print("Wait for 5 seconds. The next item is ready.")
assert len(ready) == 1
assert len(unready) == 0
next(gen)

# Because the second yield hasn't happened yet,
ready, unready = ray.wait([gen], timeout=0)
print("Wait for 0 seconds. The next item is not ready.")
print(ready, unready)
assert len(ready) == 0
assert len(unready) == 1

所有输入参数(如 timeoutnum_returnsfetch_local)从 ray.wait 与生成器一起工作。

ray.wait 可以将常规的 Ray 对象引用与输入的生成器混合使用。在这种情况下,应用程序应处理来自 ray.wait 的所有输入参数(如 timeoutnum_returnsfetch_local)与生成器一起工作。

from ray._raylet import ObjectRefGenerator

@ray.remote
def generator_task():
    for i in range(5):
        time.sleep(5)
        yield i

@ray.remote
def regular_task():
    for i in range(5):
        time.sleep(5)
    return

gen = [generator_task.remote()]
ref = [regular_task.remote()]
ready, unready = [], [*gen, *ref]
result = []

while unready:
    ready, unready = ray.wait(unready)
    for r in ready:
        if isinstance(r, ObjectRefGenerator):
            try:
                ref = next(r)
                result.append(ray.get(ref))
            except StopIteration:
                pass
            else:
                unready.append(r)
        else:
            result.append(ray.get(r))

线程安全#

ObjectRefGenerator 对象不是线程安全的。

限制#

光线生成器不支持这些功能:

  • throwsendclose API。

  • 生成器的 return 语句。

  • ObjectRefGenerator 传递给另一个任务或角色。

  • Ray 客户端