基础

在阅读本文档之前,您应该先阅读 快速入门

分布式计算之所以困难,有两个原因:

  1. 分布式系统的协调一致需要精巧的设计。

  2. 并发网络编程是棘手的且容易出错的

dask.distributed 的基础提供了抽象来隐藏并发网络编程的一些复杂性(#2)。这些抽象使得在更安全的环境中构建复杂的并行系统(#1)变得更加容易。然而,与所有分层抽象一样,我们的抽象也有缺陷。欢迎提供关键反馈。

使用 Tornado 协程的并发

Worker 和 Scheduler 节点并发运行。它们同时处理多个重叠的请求和执行多个重叠的计算,而不会阻塞。并发编程有多种方法,我们选择使用 Tornado 的原因如下:

  1. 在没有线程的情况下,开发和调试更加舒适

  2. Tornado 的文档 非常出色

  3. Stackoverflow 的覆盖范围非常出色

  4. 性能令人满意

端到端通信

各种分布式端点(客户端、调度器、工作者)通过相互发送任意Python对象进行通信。编码、发送然后解码这些对象是 通信层 的工作。

然而,诸如基于Bokeh的Web界面等辅助服务,具有自己的实现和语义。

协议处理

虽然抽象通信层可以传输任意 Python 对象(只要它们是可序列化的),但 distributed 集群中的参与者具体遵循分布式 协议 ,该协议使用定义良好的消息格式指定请求-响应语义。

distributed 中的专用基础设施处理协议的各个方面,例如调度端点支持的各种操作。

服务器

Worker、Scheduler 和 Nanny 对象都继承自 Server 类。

class distributed.core.Server(handlers, blocked_handlers=None, stream_handlers=None, connection_limit=512, deserialize=True, serializers=None, deserializers=None, connection_args=None, timeout=None, io_loop=None, local_directory=None, needs_workdir=True)[源代码]

Dask 分布式服务器

分布式集群中端点的超类,例如 Worker 和 Scheduler 对象。

处理程序

服务器通过 handlers 字典将操作名称映射到函数来定义操作。处理函数的第一个参数将是一个与客户端建立通信的 Comm。其他参数将从传入消息的键接收输入,该消息始终是一个字典。

>>> def pingpong(comm):
...     return b'pong'
>>> def add(comm, x, y):
...     return x + y
>>> handlers = {'ping': pingpong, 'add': add}
>>> server = Server(handlers)  
>>> server.listen('tcp://0.0.0.0:8000')  

消息格式

服务器期望消息为带有特殊键 ‘op’ 的字典,该键对应操作的名称,并且其他键值对根据函数要求提供。

所以在上面的例子中,以下是好的信息。

  • {'op': 'ping'}

  • {'op': 'add', 'x': 10, 'y': 20}

RPC

要与远程服务器交互,我们通常使用 rpc 对象,这些对象提供了一个熟悉的方法调用接口来调用远程操作。

class distributed.core.rpc(arg=None, comm=None, deserialize=True, timeout=None, connection_args=None, serializers=None, deserializers=None)[源代码]

方便地与远程服务器交互

>>> remote = rpc(address)  
>>> response = await remote.add(x=10, y=20)  

一个 rpc 对象可以被用于多次交互。此外,该对象根据需要创建和销毁许多通信,因此在多个重叠通信中使用是安全的。

完成后,明确关闭通信。

>>> remote.close_comms()  

示例

以下是一个使用 distributed.core 创建并与之交互的自定义服务器的小示例。

服务器端

import asyncio
from distributed.core import Server

def add(comm, x=None, y=None):  # simple handler, just a function
    return x + y

async def stream_data(comm, interval=1):  # complex handler, multiple responses
    data = 0
    while True:
        await asyncio.sleep(interval)
        data += 1
        await comm.write(data)

s = Server({'add': add, 'stream_data': stream_data})
s.listen('tcp://:8888')   # listen on TCP port 8888

asyncio.get_event_loop().run_forever()

客户端

import asyncio
from distributed.core import connect

async def f():
    comm = await connect('tcp://127.0.0.1:8888')
    await comm.write({'op': 'add', 'x': 1, 'y': 2})
    result = await comm.read()
    await comm.close()
    print(result)

>>> asyncio.get_event_loop().run_until_complete(f())
3

async def g():
    comm = await connect('tcp://127.0.0.1:8888')
    await comm.write({'op': 'stream_data', 'interval': 1})
    while True:
        result = await comm.read()
        print(result)

>>> asyncio.get_event_loop().run_until_complete(g())
1
2
3
...

使用 rpc 的客户端

RPC 提供了一个更 Pythonic 的接口。它还提供了其他好处,例如在并发情况下使用多个流。大多数分布式代码使用 rpc。例外情况是我们需要执行多次读取或写入操作,如上述流数据情况。

import asyncio
from distributed.core import rpc

async def f():
    # comm = await connect('tcp://127.0.0.1', 8888)
    # await comm.write({'op': 'add', 'x': 1, 'y': 2})
    # result = await comm.read()
    async with rpc('tcp://127.0.0.1:8888') as r:
        result = await r.add(x=1, y=2)

    print(result)

>>> asyncio.get_event_loop().run_until_complete(f())
3