任务的旅程

我们通过用户界面、调度器、工作节点,再回到原点,完成一个任务。希望这有助于说明系统的内部工作原理。

用户代码

用户计算集群上已有的两个变量的加法,然后将结果拉回到本地进程。

client = Client('host:port')
x = client.submit(...)
y = client.submit(...)

z = client.submit(add, x, y)  # we follow z

print(z.result())

步骤 1: 客户端

z 在其生命周期开始时,当 Client.submit 函数向 Scheduler 发送以下消息时:

{'op': 'update-graph',
 'tasks': {'z': (add, x, y)},
 'keys': ['z']}

客户端随后使用键 'z' 创建一个 Future 对象,并将该对象返回给用户。这一过程发生在消息被调度器接收之前。未来的状态显示为 'pending'

步骤 2:到达调度器

几毫秒后,调度器在一个打开的套接字上接收到这条消息。

调度器通过这个小图更新其状态,该图展示了如何计算 z:

scheduler.update_graph(tasks=msg['tasks'], keys=msg['keys'])

调度器还更新了 大量 其他状态。值得注意的是,它必须识别 xy 本身是变量,并将所有这些依赖关系连接起来。这是一个漫长且注重细节的过程,涉及更新大约10个集合和字典。感兴趣的读者应研究 distributed/scheduler.py::update_graph()。虽然这相当复杂且描述起来繁琐,但请放心,这一切都在常数时间内完成,大约在一毫秒内。

步骤 3:选择一个工人

一旦 xy 中的后者完成,调度器注意到 z 的所有依赖项都已在内存中,并且 z 本身现在可以运行。z 应该选择哪个工作线程?我们考虑一系列标准:

  1. 首先,我们快速筛选出那些在本地内存中拥有 xy 的工作者。

  2. 然后,我们选择需要收集最少字节数的worker,以便在本地获取 xy 。例如,如果两个不同的worker拥有 xy ,并且 y 占用的字节数比 x 多,那么我们选择持有 y 的机器,这样我们就不需要进行太多的通信。

  3. 如果有多个工作线程需要最少的通信字节数,那么我们选择最不繁忙的工作线程。

z 考虑了工人们,并根据上述标准选择了一个。在常见情况下,经过第一步后选择就相当明显了。z 等待与所选工人相关联的堆栈。尽管如此,工人可能仍然忙碌,因此 z 可能需要等待一段时间。

注意:此政策正在变动中,本文件的这一部分很可能已经过时。

步骤 4:传输到工作节点

最终,工作者完成了一项任务,有一个空闲的核心,并且 z 发现自己位于栈顶(注意,如果在此期间其他任务将自己置于工作者的栈顶,这可能是在最后一个部分之后的某个时间。)

我们将 z 放入与该工作线程关联的 worker_queue 中,并且 worker_core 协程将其取出。 z 的功能、与其参数关联的键,以及持有这些键的工作线程的位置被打包成如下所示的消息:

{'op': 'compute',
 'function': execute_task,
 'args': ((add, 'x', 'y'),),
 'who_has': {'x': {(worker_host, port)},
             'y': {(worker_host, port), (worker_host, port)}},
 'key': 'z'}

此消息被序列化并通过 TCP 套接字发送给工作进程。

步骤 5:在 Worker 上执行

工作者解包消息,并注意到它需要 xy。如果工作者还没有这两个数据,那么它会从消息中的 who_has 字典中列出的工作者那里收集它们。对于每个它没有的键,它会从 who_has 中随机选择一个有效的工作者,并从该工作者那里收集数据。

在这段交流之后,工作者既有了 x 的值,也有了 y 的值。因此,它在本地 ThreadPoolExecutor 中启动计算 add(x, y) 并等待结果。

与此同时,工人并行地为其他任务重复这个过程。没有任何阻塞。

最终计算完成。Worker 将此结果存储在其本地内存中:

data['z'] = ...

并返回一个成功信息,以及结果的字节数:

Worker: Hey Scheduler, 'z' worked great.
        I'm holding onto it.
        It takes up 64 bytes.

worker 不会将 z 的实际值传回。

步骤 6: 调度器后续处理

调度器接收到此消息并执行以下操作:

  1. 它指出该工作线程有一个空闲的核心,并在有可用任务时发送另一个任务。

  2. 如果不再需要 xy ,则会向相关工作人员发送消息,要求从本地内存中删除它们。

  3. 它向所有客户端发送消息,表示 z 已准备就绪,因此所有当前正在等待的客户端 Future 对象应唤醒。特别是,这将唤醒用户最初执行的 z.result() 命令。

步骤 7: 收集

当用户调用 z.result() 时,他们等待计算完成并将计算结果通过网络发送回本地进程。通常情况下,这不是必要的,通常你不想将数据移回本地进程,而是希望将其保留在集群中。

但也许用户真的想知道这个值,所以他们调用了 z.result()

调度器检查谁拥有 z 并向他们发送一条消息请求结果。这条消息不会在队列中等待或等待其他作业完成,它会立即开始。值被序列化,通过TCP发送,然后反序列化并返回给用户(在此过程中可能会经过一两个队列。)

步骤 8: 垃圾回收

用户离开这部分代码,局部变量 z 超出作用域。Python 垃圾收集器会清理它。这会触发客户端的引用计数减少(我们没有提到这一点,但当我们创建 Future 时,我们也启动了一个引用计数。)如果这是唯一一个指向 z 的 Future 实例,那么我们会向调度器发送一条消息,表示可以释放 z。用户不再需要它持久存在。

调度器接收到这个消息,如果没有计算可能在不久的将来依赖于 z,它将从本地调度器状态中移除这个键的元素,并将该键添加到一个定期删除的键列表中。每500毫秒,调度器会向相关的工作者发送消息,告诉他们可以从本地内存中删除哪些键。创建 z 结果的图/配方在调度器中永久保存。

开销

用户在约10毫秒内体验到这一点,具体取决于网络延迟。