Tasks
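Ray enables arbitrary functions to be executed asynchronously on separate Python workers. Such functions are called Ray remote functions, and their asynchronous invocations are called Ray tasks. Here is an example.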
import ray
import time
# A regular Python function.
def normal_function():
    return 1
# By adding the `@ray.remote` decorator, a regular Python function
# becomes a Ray remote function.
@ray.remote
def my_function():
    return 1
# To invoke this remote function, use the `remote` method.
# This will immediately return an object ref (a future) and then create
# a task that will be executed on a worker process.
obj_ref = my_function.remote()
# The result can be retrieved with ``ray.get``.
assert ray.get(obj_ref) == 1
@ray.remote
def slow_function():
    time.sleep(10)
    return 1
# Ray tasks are executed in parallel.
# All computation is performed in the background, driven by Ray's internal event loop.
for _ in range(4):
    # This doesn't block.
    slow_function.remote()
See the ray.remote API for more details.
public class MyRayApp {
  // A regular Java static method.
  public static int myFunction() {
    return 1;
  }
}
// Invoke the above method as a Ray task.
// This will immediately return an object ref (a future) and then create
// a task that will be executed on a worker process.
ObjectRef<Integer> res = Ray.task(MyRayApp::myFunction).remote();
// The result can be retrieved with ``ObjectRef::get``.
Assert.assertTrue(res.get() == 1);
public class MyRayApp {
  public static int slowFunction() throws InterruptedException {
    TimeUnit.SECONDS.sleep(10);
    return 1;
  }
}
// Ray tasks are executed in parallel.
// All computation is performed in the background, driven by Ray's internal event loop.
for(int i = 0; i < 4; i++) {
  // This doesn't block.
  Ray.task(MyRayApp::slowFunction).remote();
}
// A regular C++ function.
int MyFunction() {
  return 1;
}
// Register as a remote function by `RAY_REMOTE`.
RAY_REMOTE(MyFunction);
// Invoke the above method as a Ray task.
// This will immediately return an object ref (a future) and then create
// a task that will be executed on a worker process.
auto res = ray::Task(MyFunction).Remote();
// The result can be retrieved with ``ray::ObjectRef::Get``.
assert(*res.Get() == 1);
int SlowFunction() {
  std::this_thread::sleep_for(std::chrono::seconds(10));
  return 1;
}
RAY_REMOTE(SlowFunction);
// Ray tasks are executed in parallel.
// All computation is performed in the background, driven by Ray's internal event loop.
for(int i = 0; i < 4; i++) {
  // This doesn't block.
  ray::Task(SlowFunction).Remote();
}
Use ray summary tasks from the State API to see running and finished tasks and their counts:
# This API is only available when you download Ray via `pip install "ray[default]"`
ray summary tasks
======== Tasks Summary: 2023-05-26 11:09:32.092546 ========
Stats:
------------------------------------
total_actor_scheduled: 0
total_actor_tasks: 0
total_tasks: 5
Table (group by func_name):
------------------------------------
    FUNC_OR_CLASS_NAME    STATE_COUNTS    TYPE
0   slow_function         RUNNING: 4      NORMAL_TASK
1   my_function           FINISHED: 1     NORMAL_TASK
Specifying required resources
You can specify resource requirements in tasks (see Resource requirements for more details).
# Specify required resources.
@ray.remote(num_cpus=4, num_gpus=2)
def my_function():
    return 1
# Override the default resource requirements.
my_function.options(num_cpus=3).remote()
// Specify required resources.
Ray.task(MyRayApp::myFunction).setResource("CPU", 4.0).setResource("GPU", 2.0).remote();
// Specify required resources.
ray::Task(MyFunction).SetResource("CPU", 4.0).SetResource("GPU", 2.0).Remote();
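Passing object refs to Ray tasks
In addition to values, object refs can also be passed into remote functions. When the task gets executed, the argument inside the function body will be the underlying value. For example, take this function: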
@ray.remote
def function_with_an_argument(value):
    return value + 1
obj_ref1 = my_function.remote()
assert ray.get(obj_ref1) == 1
# You can pass an object ref as an argument to another Ray task.
obj_ref2 = function_with_an_argument.remote(obj_ref1)
assert ray.get(obj_ref2) == 2
public class MyRayApp {
  public static int functionWithAnArgument(int value) {
    return value + 1;
  }
}
ObjectRef<Integer> objRef1 = Ray.task(MyRayApp::myFunction).remote();
Assert.assertTrue(objRef1.get() == 1);
// You can pass an object ref as an argument to another Ray task.
ObjectRef<Integer> objRef2 = Ray.task(MyRayApp::functionWithAnArgument, objRef1).remote();
Assert.assertTrue(objRef2.get() == 2);
static int FunctionWithAnArgument(int value) {
  return value + 1;
}
RAY_REMOTE(FunctionWithAnArgument);
auto obj_ref1 = ray::Task(MyFunction).Remote();
assert(*obj_ref1.Get() == 1);
// You can pass an object ref as an argument to another Ray task.
auto obj_ref2 = ray::Task(FunctionWithAnArgument).Remote(obj_ref1);
assert(*obj_ref2.Get() == 2);
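Note the following behaviors:
As the second task depends on the output of the first task, Ray will not execute the second task until the first task has finished.
If the two tasks are scheduled on different machines, the output of the first task (the value corresponding to obj_ref1/objRef1) will be sent over the network to the machine where the second task is scheduled.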
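Waiting for partial results
Calling ray.get on Ray task results blocks until the task has finished execution. After launching a number of tasks, you may want to know which ones have finished executing without blocking on all of them. This can be achieved with ray.wait(). The function works as follows.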
object_refs = [slow_function.remote() for _ in range(2)]
# Return as soon as one of the tasks finished execution.
ready_refs, remaining_refs = ray.wait(object_refs, num_returns=1, timeout=None)
WaitResult<Integer> waitResult = Ray.wait(objectRefs, /*num_returns=*/0, /*timeoutMs=*/1000);
System.out.println(waitResult.getReady()); // List of ready objects.
System.out.println(waitResult.getUnready()); // list of unready objects.
ray::WaitResult<int> wait_result = ray::Wait(object_refs, /*num_objects=*/0, /*timeout_ms=*/1000);
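Generators
Ray is compatible with Python generator syntax. See Ray Generators for more details.
Multiple returns
By default, a Ray task only returns a single object ref. However, you can configure Ray tasks to return multiple object refs by setting the num_returns option.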
# By default, a Ray task only returns a single Object Ref.
@ray.remote
def return_single():
    return 0, 1, 2
object_ref = return_single.remote()
assert ray.get(object_ref) == (0, 1, 2)
# However, you can configure Ray tasks to return multiple Object Refs.
@ray.remote(num_returns=3)
def return_multiple():
    return 0, 1, 2
object_ref0, object_ref1, object_ref2 = return_multiple.remote()
assert ray.get(object_ref0) == 0
assert ray.get(object_ref1) == 1
assert ray.get(object_ref2) == 2
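For tasks that return multiple objects, Ray also supports remote generators, which allow a task to return one object at a time to reduce memory usage on the worker. Ray also supports an option to set the number of return values dynamically, which is useful when the task caller does not know how many return values to expect. See the user guide for more details on these use cases.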
@ray.remote(num_returns=3)
def return_multiple_as_generator():
    for i in range(3):
        yield i
# NOTE: Similar to normal functions, these objects will not be available
# until the full task is complete and all returns have been generated.
a, b, c = return_multiple_as_generator.remote()
Cancelling tasks
Ray tasks can be cancelled by calling ray.cancel() on the returned object ref.
@ray.remote
def blocking_operation():
    time.sleep(10e6)
obj_ref = blocking_operation.remote()
ray.cancel(obj_ref)
try:
    ray.get(obj_ref)
except ray.exceptions.TaskCancelledError:
    print("Object reference was cancelled.")
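Scheduling
For each task, Ray chooses a node to run it on. The scheduling decision is based on several factors, such as the task's resource requirements, the specified scheduling strategy, and the locations of the task's arguments. See Ray scheduling for more details.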
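Fault tolerance
By default, Ray retries tasks that fail due to system failures and specified application-level failures. You can change this behavior by setting the max_retries and retry_exceptions options in ray.remote() and .options(). See Ray fault tolerance for more details.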
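Task events
By default, Ray traces the execution of tasks and reports task status events and profiling events, which the Ray Dashboard and the State API use.
You can disable task events by setting the enable_task_events option in ray.remote() and .options(). This reduces the overhead of task execution and the amount of data the task sends to the Ray Dashboard. Nested tasks don't inherit the task events setting from the parent task, so you need to set it for each task separately.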