Tasks

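Ray enables arbitrary functions to be executed asynchronously on separate Python workers. Such functions are called Ray remote functions, and their asynchronous invocations are called Ray tasks. Here is an example.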

import ray
import time


# A regular Python function.
def normal_function():
    return 1


# By adding the `@ray.remote` decorator, a regular Python function
# becomes a Ray remote function.
@ray.remote
def my_function():
    return 1


# To invoke this remote function, use the `remote` method.
# This will immediately return an object ref (a future) and then create
# a task that will be executed on a worker process.
obj_ref = my_function.remote()

# The result can be retrieved with ``ray.get``.
assert ray.get(obj_ref) == 1


@ray.remote
def slow_function():
    time.sleep(10)
    return 1


# Ray tasks are executed in parallel.
# All computation is performed in the background, driven by Ray's internal event loop.
for _ in range(4):
    # This doesn't block.
    slow_function.remote()

See the ray.remote API for more details.

public class MyRayApp {
  // A regular Java static method.
  public static int myFunction() {
    return 1;
  }
}

// Invoke the above method as a Ray task.
// This will immediately return an object ref (a future) and then create
// a task that will be executed on a worker process.
ObjectRef<Integer> res = Ray.task(MyRayApp::myFunction).remote();

// The result can be retrieved with ``ObjectRef::get``.
Assert.assertTrue(res.get() == 1);

public class MyRayApp {
  public static int slowFunction() throws InterruptedException {
    TimeUnit.SECONDS.sleep(10);
    return 1;
  }
}

// Ray tasks are executed in parallel.
// All computation is performed in the background, driven by Ray's internal event loop.
for(int i = 0; i < 4; i++) {
  // This doesn't block.
  Ray.task(MyRayApp::slowFunction).remote();
}
// A regular C++ function.
int MyFunction() {
  return 1;
}
// Register as a remote function by `RAY_REMOTE`.
RAY_REMOTE(MyFunction);

// Invoke the above method as a Ray task.
// This will immediately return an object ref (a future) and then create
// a task that will be executed on a worker process.
auto res = ray::Task(MyFunction).Remote();

// The result can be retrieved with ``ray::ObjectRef::Get``.
assert(*res.Get() == 1);

int SlowFunction() {
  std::this_thread::sleep_for(std::chrono::seconds(10));
  return 1;
}
RAY_REMOTE(SlowFunction);

// Ray tasks are executed in parallel.
// All computation is performed in the background, driven by Ray's internal event loop.
for(int i = 0; i < 4; i++) {
  // This doesn't block.
  ray::Task(SlowFunction).Remote();
}
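To build intuition for the future-based calling pattern without a cluster, here is a local analogy that uses Python's standard-library concurrent.futures, not Ray: ThreadPoolExecutor.submit plays the role of .remote(), and Future.result() plays the role of ray.get. Threads in a single process stand in for Ray's worker processes, so this only illustrates "returns immediately, runs in parallel, blocks on get", not distribution across machines.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_function():
    time.sleep(0.5)  # shortened from 10 s to keep the sketch quick
    return 1


with ThreadPoolExecutor(max_workers=4) as pool:
    start = time.monotonic()
    # submit() returns a Future right away, much like .remote() returns an object ref.
    futures = [pool.submit(slow_function) for _ in range(4)]
    submit_seconds = time.monotonic() - start
    # result() blocks until the value is ready, much like ray.get().
    results = [f.result() for f in futures]
    total_seconds = time.monotonic() - start

# The four sleeps overlap, so the total is close to 0.5 s rather than 2 s.
print(results, round(submit_seconds, 3), round(total_seconds, 1))
```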

Use ray summary tasks from the State API to see running and finished tasks and their counts:

# This API is only available when you download Ray via `pip install "ray[default]"`
ray summary tasks
======== Tasks Summary: 2023-05-26 11:09:32.092546 ========
Stats:
------------------------------------
total_actor_scheduled: 0
total_actor_tasks: 0
total_tasks: 5


Table (group by func_name):
------------------------------------
    FUNC_OR_CLASS_NAME    STATE_COUNTS    TYPE
0   slow_function         RUNNING: 4      NORMAL_TASK
1   my_function           FINISHED: 1     NORMAL_TASK
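The table above is essentially a group-by over task states. A minimal sketch of that aggregation, assuming a hypothetical list of task records (the func_name/state dictionaries below are illustrative and are not the State API's actual output format):

```python
from collections import Counter, defaultdict

# Hypothetical task records mirroring the run above: one finished my_function,
# four running slow_function tasks. Field names are assumptions.
tasks = [
    {"func_name": "my_function", "state": "FINISHED"},
    *({"func_name": "slow_function", "state": "RUNNING"} for _ in range(4)),
]


def summarize(records):
    """Group task records by function name and count how many are in each state."""
    counts = defaultdict(Counter)
    for record in records:
        counts[record["func_name"]][record["state"]] += 1
    return {name: dict(states) for name, states in counts.items()}


summary = summarize(tasks)
print(len(tasks), summary)
```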

Specifying required resources

You can specify resource requirements for tasks (see Resource requirements for more details).

# Specify required resources.
@ray.remote(num_cpus=4, num_gpus=2)
def my_function():
    return 1


# Override the default resource requirements.
my_function.options(num_cpus=3).remote()
// Specify required resources.
Ray.task(MyRayApp::myFunction).setResource("CPU", 4.0).setResource("GPU", 2.0).remote();
// Specify required resources.
ray::Task(MyFunction).SetResource("CPU", 4.0).SetResource("GPU", 2.0).Remote();
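In the Python example, .options(num_cpus=3) changes only the CPU request; the num_gpus=2 from the decorator still applies. The sketch below models that field-by-field override together with the basic rule that a node must have at least the requested amount of every resource. DEFAULT_REQUEST, with_options, and fits are hypothetical helpers for illustration, not Ray's internal API, and real scheduling considers more than this check.

```python
# Hypothetical model of a task's resource request; not Ray's internal API.
DEFAULT_REQUEST = {"CPU": 4, "GPU": 2}  # like @ray.remote(num_cpus=4, num_gpus=2)


def with_options(defaults, **overrides):
    """Return a new request in which only the overridden resources change."""
    request = dict(defaults)
    request.update(overrides)
    return request


def fits(request, available):
    """A node can run the task only if every requested resource is available."""
    return all(available.get(name, 0) >= amount for name, amount in request.items())


request = with_options(DEFAULT_REQUEST, CPU=3)  # like .options(num_cpus=3)
print(request, fits(request, {"CPU": 8, "GPU": 1}), fits(request, {"CPU": 8, "GPU": 2}))
```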

Passing object refs to Ray tasks

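In addition to values, object refs can also be passed into remote functions. When the task executes, the argument inside the function body is the underlying value, not the ref. For example, consider this function: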

@ray.remote
def function_with_an_argument(value):
    return value + 1


obj_ref1 = my_function.remote()
assert ray.get(obj_ref1) == 1

# You can pass an object ref as an argument to another Ray task.
obj_ref2 = function_with_an_argument.remote(obj_ref1)
assert ray.get(obj_ref2) == 2
public class MyRayApp {
    public static int functionWithAnArgument(int value) {
        return value + 1;
    }
}

ObjectRef<Integer> objRef1 = Ray.task(MyRayApp::myFunction).remote();
Assert.assertTrue(objRef1.get() == 1);

// You can pass an object ref as an argument to another Ray task.
ObjectRef<Integer> objRef2 = Ray.task(MyRayApp::functionWithAnArgument, objRef1).remote();
Assert.assertTrue(objRef2.get() == 2);
static int FunctionWithAnArgument(int value) {
    return value + 1;
}
RAY_REMOTE(FunctionWithAnArgument);

auto obj_ref1 = ray::Task(MyFunction).Remote();
assert(*obj_ref1.Get() == 1);

// You can pass an object ref as an argument to another Ray task.
auto obj_ref2 = ray::Task(FunctionWithAnArgument).Remote(obj_ref1);
assert(*obj_ref2.Get() == 2);

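Note the following behaviors:

  • Because the second task depends on the output of the first task, Ray will not execute the second task until the first task has finished.

  • If the two tasks are scheduled on different machines, the output of the first task (the value corresponding to obj_ref1/objRef1) will be sent over the network to the machine where the second task is scheduled.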
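The first behavior can be imitated locally with concurrent.futures (standard library, not Ray). The hypothetical helper call_resolving_futures replaces each Future argument with its value before calling the function, so function_with_an_argument receives a plain int and cannot start before my_function has returned. One difference worth noting: here a worker thread blocks while it waits, whereas Ray does not dispatch the dependent task to a worker until its arguments are ready.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor

events = []  # records the order in which things happen


def my_function():
    time.sleep(0.1)
    events.append("my_function done")
    return 1


def function_with_an_argument(value):
    events.append("function_with_an_argument start")
    return value + 1


def call_resolving_futures(fn, *args):
    """Swap Future arguments for their values (blocking until ready), then call fn."""
    values = [a.result() if isinstance(a, Future) else a for a in args]
    return fn(*values)


with ThreadPoolExecutor(max_workers=2) as pool:
    obj_ref1 = pool.submit(my_function)
    # Passing the Future itself, like passing obj_ref1 to a Ray task.
    obj_ref2 = pool.submit(call_resolving_futures, function_with_an_argument, obj_ref1)
    result = obj_ref2.result()

print(result, events)
```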

Waiting for partial results

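Calling ray.get on a Ray task's result blocks until the task has finished executing. After launching a number of tasks, you may want to know which ones have finished without blocking on all of them. You can do this with ray.wait(), which works as follows.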

object_refs = [slow_function.remote() for _ in range(2)]
# Return as soon as one of the tasks finished execution.
ready_refs, remaining_refs = ray.wait(object_refs, num_returns=1, timeout=None)
WaitResult<Integer> waitResult = Ray.wait(objectRefs, /*num_returns=*/0, /*timeoutMs=*/1000);
System.out.println(waitResult.getReady());  // List of ready objects.
System.out.println(waitResult.getUnready());  // List of unready objects.
ray::WaitResult<int> wait_result = ray::Wait(object_refs, /*num_objects=*/0, /*timeout_ms=*/1000);
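The semantics of ray.wait(object_refs, num_returns=1, timeout=None) can be sketched with the standard library's concurrent.futures.as_completed. The wait helper below is hypothetical and operates on Future objects rather than Ray object refs. Like ray.wait, it returns a (ready, not_ready) pair, keeps ready items in their input order, and treats timeout=None as "no time limit".

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError, as_completed


def wait(futures, num_returns=1, timeout=None):
    """Return (ready, not_ready) once num_returns futures finish or the timeout expires."""
    finished = set()
    try:
        for fut in as_completed(futures, timeout=timeout):
            finished.add(fut)
            if len(finished) == num_returns:
                break
    except TimeoutError:
        pass  # return whatever finished before the deadline
    ready = [f for f in futures if f in finished]  # preserve input order
    not_ready = [f for f in futures if f not in finished]
    return ready, not_ready


def sleep_then_return(seconds):
    time.sleep(seconds)
    return seconds


with ThreadPoolExecutor(max_workers=2) as pool:
    refs = [pool.submit(sleep_then_return, s) for s in (1.0, 0.1)]
    ready, remaining = wait(refs, num_returns=1)
    first_value = ready[0].result()  # the 0.1 s task finishes first

print(first_value, len(ready), len(remaining))
```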

Generators

Ray is compatible with Python generator syntax. See Ray Generators for more details.

Multiple returns

By default, a Ray task returns only a single object ref. However, you can configure a Ray task to return multiple object refs by setting the num_returns option.

# By default, a Ray task only returns a single Object Ref.
@ray.remote
def return_single():
    return 0, 1, 2


object_ref = return_single.remote()
assert ray.get(object_ref) == (0, 1, 2)


# However, you can configure Ray tasks to return multiple Object Refs.
@ray.remote(num_returns=3)
def return_multiple():
    return 0, 1, 2


object_ref0, object_ref1, object_ref2 = return_multiple.remote()
assert ray.get(object_ref0) == 0
assert ray.get(object_ref1) == 1
assert ray.get(object_ref2) == 2

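For tasks that return multiple objects, Ray also supports remote generators, which let a task return one object at a time to reduce memory usage on the worker. Ray also supports an option to set the number of return values dynamically, which is useful when the caller does not know in advance how many return values to expect. See the user guide for more details on these use cases.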

@ray.remote(num_returns=3)
def return_multiple_as_generator():
    for i in range(3):
        yield i


# NOTE: Similar to normal functions, these objects will not be available
# until the full task is complete and all returns have been generated.
a, b, c = return_multiple_as_generator.remote()

Cancelling tasks

Ray tasks can be cancelled by calling ray.cancel() on the returned object ref.

@ray.remote
def blocking_operation():
    time.sleep(10e6)


obj_ref = blocking_operation.remote()
ray.cancel(obj_ref)

try:
    ray.get(obj_ref)
except ray.exceptions.TaskCancelledError:
    print("Object reference was cancelled.")
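A local analogy with concurrent.futures (standard library, not Ray) shows the simplest case: cancelling a task that is still queued means it never runs, and fetching its result raises an error instead of returning a value. This only models a pending task. According to the ray.cancel documentation, a task that is already running is interrupted with a KeyboardInterrupt by default, and force=True makes the worker exit; Future.cancel() cannot stop a running thread at all.

```python
import threading
from concurrent.futures import CancelledError, ThreadPoolExecutor

gate = threading.Event()


def blocking_operation():
    gate.wait()  # occupies the only worker until the gate is opened


def queued_operation():
    return "ran"


with ThreadPoolExecutor(max_workers=1) as pool:
    running = pool.submit(blocking_operation)
    pending = pool.submit(queued_operation)  # queued behind blocking_operation
    cancelled = pending.cancel()  # succeeds because the task has not started
    try:
        pending.result()
        outcome = "completed"
    except CancelledError:  # analogous to ray.exceptions.TaskCancelledError
        outcome = "cancelled"
    gate.set()  # release the worker so the pool can shut down cleanly

print(cancelled, outcome)
```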

Scheduling

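For each task, Ray chooses a node to run it on. The scheduling decision is based on several factors, such as the task's resource requirements, the specified scheduling strategy, and the locations of the task's arguments. See Ray scheduling for more details.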
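To make two of those factors concrete, here is a toy model that is not Ray's actual algorithm: it keeps only the nodes that satisfy the resource request, then prefers the one already holding the most bytes of the task's arguments, which is the intuition behind locality-aware placement. pick_node and its inputs are hypothetical, and the sketch ignores scheduling strategies entirely.

```python
def pick_node(request, nodes, arg_locations):
    """Toy node picker (hypothetical; far simpler than Ray's scheduler).

    request:        resources the task needs, e.g. {"CPU": 2}
    nodes:          node name -> currently available resources
    arg_locations:  node name -> bytes of the task's arguments stored on that node
    """
    feasible = [
        name
        for name, available in nodes.items()
        if all(available.get(r, 0) >= amount for r, amount in request.items())
    ]
    if not feasible:
        return None  # no node can run the task right now
    # Among feasible nodes, prefer the one that already holds the most argument data.
    return max(feasible, key=lambda name: arg_locations.get(name, 0))


nodes = {"node-a": {"CPU": 4}, "node-b": {"CPU": 1}, "node-c": {"CPU": 8, "GPU": 1}}
# node-b holds the most argument data but has too few CPUs, so node-c wins.
print(pick_node({"CPU": 2}, nodes, {"node-b": 10_000, "node-c": 500}))
```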

Fault tolerance

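By default, Ray retries tasks that fail due to system failures and specified application-level failures. You can change this behavior by setting the max_retries and retry_exceptions options in ray.remote() and .options(). See Ray fault tolerance for more details.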
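A minimal model of how these two options interact, assuming the semantics described in Ray's fault-tolerance guide: max_retries bounds the number of retries after the first attempt (-1 meaning unlimited), system failures are retried, and application exceptions are retried only when retry_exceptions is True or lists the exception's type. SystemFailure, run_with_retries, and make_flaky are hypothetical stand-ins; Ray detects system failures itself and does not expose such a helper.

```python
class SystemFailure(Exception):
    """Hypothetical stand-in for a system-level failure such as a worker crash."""


def run_with_retries(fn, max_retries=3, retry_exceptions=False):
    """Call fn, retrying per the rules above; returns (result, attempts)."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return fn(), attempts
        except Exception as exc:
            if isinstance(exc, SystemFailure):
                retryable = True
            elif isinstance(retry_exceptions, (list, tuple)):
                retryable = isinstance(exc, tuple(retry_exceptions))
            else:
                retryable = bool(retry_exceptions)
            out_of_retries = max_retries != -1 and attempts > max_retries
            if not retryable or out_of_retries:
                raise  # give up and surface the last error


def make_flaky(failures, exc_type):
    """Build a function that raises exc_type for its first `failures` calls."""
    calls = {"n": 0}

    def flaky():
        calls["n"] += 1
        if calls["n"] <= failures:
            raise exc_type("transient")
        return "ok"

    return flaky


ok_system = run_with_retries(make_flaky(2, SystemFailure))
ok_listed = run_with_retries(make_flaky(2, ValueError), retry_exceptions=[ValueError])
try:
    run_with_retries(make_flaky(2, ValueError))  # application error, not retried
    app_error_raised = False
except ValueError:
    app_error_raised = True
try:
    run_with_retries(make_flaky(5, SystemFailure), max_retries=3)  # needs 6 attempts
    gave_up = False
except SystemFailure:
    gave_up = True

print(ok_system, ok_listed, app_error_raised, gave_up)
```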

Task events

By default, Ray traces the execution of tasks, reporting task status events and profiling events that the Ray Dashboard and the State API consume.

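You can change this behavior by setting the enable_task_events option in ray.remote() and .options() to disable task events. This reduces the overhead of task execution and the amount of data each task sends to the Ray Dashboard. Nested tasks don't inherit the task events setting from their parent task, so you need to set it separately for each task.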

More about Ray tasks