基础
内容
基础¶
在阅读本文档之前,您应该先阅读 快速入门。
分布式计算之所以困难,有两个原因:
分布式系统的协调一致需要精巧的设计。
并发网络编程是棘手的且容易出错的
dask.distributed 的基础提供了抽象来隐藏并发网络编程的一些复杂性(#2)。这些抽象使得在更安全的环境中构建复杂的并行系统(#1)变得更加容易。然而,与所有分层抽象一样,我们的抽象也有缺陷。欢迎提供关键反馈。
使用 Tornado 协程的并发¶
Worker 和 Scheduler 节点并发运行。它们同时处理多个重叠的请求和执行多个重叠的计算,而不会阻塞。并发编程有多种方法,我们选择使用 Tornado 的原因如下:
在没有线程的情况下,开发和调试更加舒适
Tornado 的文档 非常出色
Stackoverflow 的覆盖范围非常出色
性能令人满意
端到端通信¶
各种分布式端点(客户端、调度器、工作者)通过相互发送任意Python对象进行通信。编码、发送然后解码这些对象是 通信层 的工作。
然而,诸如基于Bokeh的Web界面等辅助服务,具有自己的实现和语义。
协议处理¶
虽然抽象通信层可以传输任意 Python 对象(只要它们是可序列化的),但 distributed 集群中的参与者具体遵循分布式 协议 ,该协议使用定义良好的消息格式指定请求-响应语义。
在 distributed 中的专用基础设施处理协议的各个方面,例如调度端点支持的各种操作。
服务器¶
Worker、Scheduler 和 Nanny 对象都继承自 Server 类。
- class distributed.core.Server(handlers, blocked_handlers=None, stream_handlers=None, connection_limit=512, deserialize=True, serializers=None, deserializers=None, connection_args=None, timeout=None, io_loop=None, local_directory=None, needs_workdir=True)[源代码]¶
Dask 分布式服务器
分布式集群中端点的超类,例如 Worker 和 Scheduler 对象。
处理程序
服务器通过
handlers字典将操作名称映射到函数来定义操作。处理函数的第一个参数将是一个与客户端建立通信的Comm。其他参数将从传入消息的键接收输入,该消息始终是一个字典。>>> def pingpong(comm): ... return b'pong'
>>> def add(comm, x, y): ... return x + y
>>> handlers = {'ping': pingpong, 'add': add} >>> server = Server(handlers) >>> server.listen('tcp://0.0.0.0:8000')
消息格式
服务器期望消息为带有特殊键 ‘op’ 的字典,该键对应操作的名称,并且其他键值对根据函数要求提供。
所以在上面的例子中,以下是好的信息。
{'op': 'ping'}{'op': 'add', 'x': 10, 'y': 20}
RPC¶
要与远程服务器交互,我们通常使用 rpc 对象,这些对象提供了一个熟悉的方法调用接口来调用远程操作。
- class distributed.core.rpc(arg=None, comm=None, deserialize=True, timeout=None, connection_args=None, serializers=None, deserializers=None)[源代码]¶
方便地与远程服务器交互
>>> remote = rpc(address) >>> response = await remote.add(x=10, y=20)
一个 rpc 对象可以被用于多次交互。此外,该对象根据需要创建和销毁许多通信,因此在多个重叠通信中使用是安全的。
完成后,明确关闭通信。
>>> remote.close_comms()
示例¶
以下是一个使用 distributed.core 创建并与之交互的自定义服务器的小示例。
服务器端¶
import asyncio
from distributed.core import Server
def add(comm, x=None, y=None): # simple handler, just a function
return x + y
async def stream_data(comm, interval=1): # complex handler, multiple responses
data = 0
while True:
await asyncio.sleep(interval)
data += 1
await comm.write(data)
s = Server({'add': add, 'stream_data': stream_data})
s.listen('tcp://:8888') # listen on TCP port 8888
asyncio.get_event_loop().run_forever()
客户端¶
import asyncio
from distributed.core import connect
async def f():
comm = await connect('tcp://127.0.0.1:8888')
await comm.write({'op': 'add', 'x': 1, 'y': 2})
result = await comm.read()
await comm.close()
print(result)
>>> asyncio.get_event_loop().run_until_complete(f())
3
async def g():
comm = await connect('tcp://127.0.0.1:8888')
await comm.write({'op': 'stream_data', 'interval': 1})
while True:
result = await comm.read()
print(result)
>>> asyncio.get_event_loop().run_until_complete(g())
1
2
3
...
使用 rpc 的客户端¶
RPC 提供了一个更 Pythonic 的接口。它还提供了其他好处,例如在并发情况下使用多个流。大多数分布式代码使用 rpc。例外情况是我们需要执行多次读取或写入操作,如上述流数据情况。
import asyncio
from distributed.core import rpc
async def f():
# comm = await connect('tcp://127.0.0.1', 8888)
# await comm.write({'op': 'add', 'x': 1, 'y': 2})
# result = await comm.read()
async with rpc('tcp://127.0.0.1:8888') as r:
result = await r.add(x=1, y=2)
print(result)
>>> asyncio.get_event_loop().run_until_complete(f())
3