通用调试#

分布式应用程序比非分布式应用程序更强大但也更复杂。Ray 的一些行为可能会让用户感到意外,但这些设计选择可能有充分的理由。

本页列出了用户可能会遇到的一些常见问题。特别是,用户认为 Ray 是在他们的本地机器上运行,虽然有时确实如此,但这会导致许多问题。

环境变量不会从驱动进程传递到工作进程#

问题:如果在命令行设置了一个环境变量(在你运行驱动程序的地方),如果集群之前已经启动,它不会传递给集群中所有正在运行的工作节点。

示例:如果你在运行 Ray 的目录中有一个文件 baz.py,并且你运行以下命令:

import ray
import os

ray.init()


@ray.remote
def myfunc():
    myenv = os.environ.get("FOO")
    print(f"myenv is {myenv}")
    return 1


ray.get(myfunc.remote())
# this prints: "myenv is None"

预期行为:大多数人会期望(就像它是单台机器上的单个进程一样)所有 Worker 中的环境变量都是相同的。但实际上不会。

修复: 使用运行时环境来显式传递环境变量。如果你调用 ray.init(runtime_env=...),那么工作进程将设置该环境变量。

ray.init(runtime_env={"env_vars": {"FOO": "bar"}})


@ray.remote
def myfunc():
    myenv = os.environ.get("FOO")
    print(f"myenv is {myenv}")
    return 1


ray.get(myfunc.remote())
# this prints: "myenv is bar"

文件名有时工作,有时不工作#

问题:如果你在任务或角色中通过文件名引用文件,它有时会成功,有时会失败。这是因为如果任务或角色在集群的头节点上运行,它会成功,但如果任务或角色在另一台机器上运行,它将不会成功。

示例: 假设我们执行以下命令:

% touch /tmp/foo.txt

我还有这段代码:

import os
import ray

@ray.remote
def check_file():
  foo_exists = os.path.exists("/tmp/foo.txt")
  return foo_exists

futures = []
for _ in range(1000):
  futures.append(check_file.remote())

print(ray.get(futures))

那么你将得到一个 True 和 False 的混合。如果 check_file() 在主节点上运行,或者我们在本地运行,它就能正常工作。但如果它在工作节点上运行,它将返回 False

预期行为:大多数人会期望这个要么失败,要么一致成功。毕竟这是相同的代码。

修复

  • 对于此类应用程序,仅使用共享路径——例如,如果您使用的是网络文件系统,则可以使用它,或者文件可以位于 S3 上。

  • 不要依赖文件路径的一致性。

放置组不可组合#

问题:如果你有一个任务是从运行在放置组中的某个东西调用的,资源永远不会被分配,并且它会挂起。

示例:您正在使用 Ray Tune,它会创建放置组,并且您希望将其应用于目标函数,但该目标函数本身使用了 Ray 任务,例如。

import ray
from ray import tune

def create_task_that_uses_resources():
  @ray.remote(num_cpus=10)
  def sample_task():
    print("Hello")
    return

  return ray.get([sample_task.remote() for i in range(10)])

def objective(config):
  create_task_that_uses_resources()

tuner = tune.Tuner(objective, param_space={"a": 1})
tuner.fit()

这将报错并显示消息:

  ValueError: Cannot schedule create_task_that_uses_resources.<locals>.sample_task with the placement group
  because the resource request {'CPU': 10} cannot fit into any bundles for the placement group, [{'CPU': 1.0}].

预期行为:上述执行。

修复:在 create_task_that_uses_resources() 调用的任务的 @ray.remote 声明中,包含一个 scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=None)

def create_task_that_uses_resources():
+     @ray.remote(num_cpus=10, scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=None))
-     @ray.remote(num_cpus=10)

过时的函数定义#

由于Python的细微差别,如果你重新定义了一个远程函数,你可能不会总是得到预期的行为。在这种情况下,可能是Ray没有运行最新版本的函数。

假设你定义了一个远程函数 f ,然后重新定义它。Ray 应该使用最新版本。

import ray

@ray.remote
def f():
    return 1

@ray.remote
def f():
    return 2

print(ray.get(f.remote()))  # This should be 2.
2

然而,以下情况修改远程函数不会更新Ray到新版本(至少在不停止并重启Ray的情况下)。

  • 该函数是从外部文件导入的: 在这种情况下,f 是在某个外部文件 file.py 中定义的。如果你 import file,在 file.py 中更改 f 的定义,然后重新 import file,函数 f 将不会更新。

    这是因为第二次导入被视为无操作而被忽略,因此 f 仍然由第一次导入定义。

    解决这个问题的方法是使用 reload(file) 而不是第二次 import file。重新加载会导致 f 的新定义被重新执行,并将其导出到其他机器。注意,在 Python 3 中,你需要执行 from importlib import reload

  • 该函数依赖于外部文件中的辅助函数: 在这种情况下,f 可以在您的 Ray 应用程序中定义,但它依赖于在某个外部文件 file.py 中定义的辅助函数 h。如果 file.py 中的 h 定义发生变化,重新定义 f 不会更新 Ray 以使用 h 的新版本。

    这是因为当 f 首次被定义时,其定义会被发送到所有 Worker 进程,并被解封。在解封过程中,file.py 在 Worker 中被导入。然后当 f 被重新定义时,其定义再次被发送到所有 Worker 并解封。但由于 file.py 已经在 Worker 中被导入,它被视为第二次导入并被忽略,不做任何操作。

    不幸的是,在驱动程序上重新加载不会更新 h,因为重新加载需要在工作进程上进行。

    解决这个问题的方法是重新定义 f ,使其在调用 h 之前重新加载 file.py 。例如,如果 file.py 内部有

    def h():
        return 1
    

    并且你将远程函数 f 定义为

    @ray.remote
    def f():
        return file.h()
    

    你可以如下重新定义 f

    @ray.remote
    def f():
        reload(file)
        return file.h()
    

    这会根据需要在 Workers 上强制执行重新加载。请注意,在 Python 3 中,你需要执行 from importlib import reload

本文档讨论了人们在使用 Ray 时遇到的一些常见问题以及一些已知问题。如果你遇到其他问题,告诉我们