为什么我的工作进程会死亡?

Dask 工作节点可能因多种原因停止运行。这些原因可以分为以下几类:

  • 工作者选择退出

  • 在工作进程中发生了一个不可恢复的异常

  • 工作进程被某些外部操作关闭

下面将详细描述这些情况。当这些事情发生时,您将经历的症状范围从工作不再完成,到与本地客户端交互时出现各种异常,例如 KilledWorkerTimeoutErrorCommClosedError

注意 KilledWorker 的特殊情况:这意味着某个任务在一个工作节点上尝试执行,但该节点死亡,然后相同的任务被发送到另一个工作节点,该节点也死亡。在可配置的死亡次数(配置键 distributed.scheduler.allowed-failures)之后,Dask 决定归咎于任务本身,并返回此异常。请注意,任务有可能被不公平地归咎——工作节点在任务活跃时恰好死亡,可能是由于另一个线程——这使得诊断变得复杂。

在每种情况下,查找更多信息的首要位置是给定工作者的日志,这些日志很可能会完整描述发生了什么。这些日志由工作者打印到其“标准错误”,可能会出现在您启动工作者的文本控制台中,或者由集群基础设施维护的某些日志系统中。观察诊断仪表板以查找内存峰值也很有帮助,但当然这只有在工作者仍然存活时才可能。

在所有情况下,调度器会注意到工作节点已经离开,这可能是因为显式的注销,或者是因为工作节点不再产生心跳,应该可以将任务重新路由到其他工作节点,并使系统继续运行。

场景

Worker 选择退出

工作者可能在正常运行中退出,因为他们被要求这样做,例如,他们收到了键盘中断(^C),或者调度器缩减了集群。在这种情况下,工作者正在处理的工作将被重定向到其他工作者,如果还有剩余的话。

你应该期望在工作日志的末尾看到以下消息:

distributed.dask_worker - INFO - End worker

在这些情况下,通常不需要做任何事情,因为行为是预期的。

不可恢复的异常

worker 是一个 Python 进程,和其他代码一样,可能会发生异常导致进程退出。一个典型的例子可能是客户端和 worker 之间的包版本不匹配,导致发送给 worker 的消息在解包时出错。需要匹配的包有很多,不仅仅是 daskdistributed

在这种情况下,你应该会在工作者的日志中看到完整的Python回溯信息。如果出现版本不匹配的情况,这可能会抱怨导入错误或缺少属性。然而,其他致命异常也是可能的,例如尝试分配比系统可用内存更多的内存,或在没有适当权限的情况下写入临时文件。

为了确保版本匹配,您应该运行(较新版本的distributed可能会自动执行此操作)

client.get_versions(check=True)

对于其他错误,如果可能的话,您可能希望在本地客户端中运行计算,或者尝试仅获取出错的任务并使用 recreate_error_locally(),就像处理任务执行期间发生的普通异常一样。

特别是对于连接问题(例如,工作日志中的超时异常),您需要诊断您的网络基础设施,这比这里描述的要复杂得多。通常,这可能涉及登录到运行受影响工作者的机器。

被保姆杀害

Dask 的“保姆”是一个监控工作进程的进程,必要时会重启工作进程。它还会跟踪工作进程的内存使用情况,如果内存使用量超过总内存的指定比例,工作进程也会被重启,中断任何正在进行的工作。日志中会显示类似的消息。

Worker exceeded X memory budget. Restarting

其中 X 是内存分数。您可以使用配置设置此关键分数,请参阅 Worker 内存管理。如果您有一个由集群基础设施(HPC、Kubernetes 等)提供的外部系统来监控内存使用情况,那么关闭此内存限制可能是合理的。实际上,在这些情况下,重启可能也会由系统处理,因此您完全可以不需要保姆(--no-nanny CLI 选项或等效配置)。

突然退出

工作进程可能会在没有通知的情况下停止工作。这可能是因为工作进程内部的问题,例如内存违规(如果与编译代码接口,这种情况很常见),或者是因为外部因素,例如 kill 命令,或者停止运行工作进程的容器或机器。

在最好的情况下,您可能会在操作系统日志中看到一行关于工作进程被关闭的信息,例如单个词“killed”或更详细的描述。在这些情况下,问题很可能出在您的代码中,您可能可以使用与上一节中相同的调试工具。

然而,如果操作是由外部框架启动的,那么工作线程将没有时间留下日志消息,死亡*可能*与工作线程当时正在做的事情无关。例如,如果kubernetes决定驱逐一个pod,或者你的ec2实例因维护而关闭,工作线程并无过错。希望系统能在进程输出中提供一个合理的关于发生了什么的信息。但是,如果内存分配(或其他资源)超过容忍度,那么就是代码的错误——尽管你可能可以通过更好地配置Dask自身的限制,或者简单地使用更大的集群来修复。无论如何,你的部署框架有自己的日志系统,你应该在那里查找dask工作线程被关闭的原因。

具体到内存问题,请参考 最佳实践 中的内存部分。