未来
内容
未来¶
Dask 支持一个实时任务框架,该框架扩展了 Python 的 concurrent.futures 接口。Dask 的未来(futures)允许你通过最小的代码更改,将通用的 Python 工作流扩展到 Dask 集群中。
这个接口适合用于像 dask.delayed 这样的任意任务调度,但它不是延迟执行的,而是立即执行的,这在计算可能会随时间变化的情况下提供了更多的灵活性。这些功能依赖于 dask.distributed 中的第二代任务调度器(尽管它的名字如此,但在单台机器上运行得非常好)。
示例¶
访问 http://www.aidoczh.com/dask-examples/ futures.html 以查看并运行使用 Dask 的 futures 示例。
启动 Dask 客户端¶
要使用 futures 接口,您必须启动一个 Client。这会在各个工作进程或线程之间跟踪状态:
from dask.distributed import Client
client = Client() # start local workers as processes
# or
client = Client(processes=False) # start local workers as threads
如果你安装了 Bokeh ,那么这将在 http://localhost:8787 启动一个诊断仪表盘。
提交任务¶
|
向调度器提交一个函数应用 |
|
将函数映射到参数序列上 |
|
等待计算完成,将结果收集到本地进程。 |
你可以使用 submit 方法提交单个任务:
def inc(x):
return x + 1
def add(x, y):
return x + y
a = client.submit(inc, 10) # calls inc(10) in background thread or process
b = client.submit(inc, 20) # calls inc(20) in background thread or process
submit 函数返回一个 Future,它指的是一个远程结果。这个结果可能尚未完成:
>>> a
<Future: status: pending, key: inc-b8aaf26b99466a7a1980efa1ade6701d>
最终它会完成。结果会保持在远程线程/进程/工作者中,直到你明确要求取回它:
>>> a
<Future: status: finished, type: int, key: inc-b8aaf26b99466a7a1980efa1ade6701d>
>>> a.result() # blocks until task completes and data arrives
11
你可以将 futures 作为输入传递给 submit。Dask 自动处理依赖关系跟踪;一旦所有输入 futures 完成,它们将被移动到一个工作节点(如果需要),然后依赖于它们的计算将被启动。你不需要在提交新任务之前等待输入完成;Dask 会自动处理这一点:
c = client.submit(add, a, b) # calls add on the results of a and b
类似于Python的 map,你可以使用 Client.map 来调用同一个函数并处理多个输入:
futures = client.map(inc, range(1000))
然而,请注意每个任务大约有1毫秒的开销。如果你想将一个函数映射到大量输入上,那么你可能需要考虑使用 dask.bag 或 dask.dataframe 。
移动数据¶
|
等待计算完成,将结果收集到本地进程。 |
|
从分布式内存中收集未来 |
|
将数据分散到分布式内存中 |
给定任何未来对象,你可以调用 .result 方法来收集结果。这将阻塞直到未来对象完成计算,并在必要时将结果传回你的本地进程:
>>> c.result()
32
你可以使用 Client.gather 方法同时收集多个结果。这比按顺序调用每个 future 的 .result() 方法更高效:
>>> # results = [future.result() for future in futures]
>>> results = client.gather(futures) # this can be faster
如果你有一些重要的本地数据想要包含在你的计算中,你可以将其作为正常输入包含在提交或映射调用中:
>>> df = pd.read_csv('training-data.csv')
>>> future = client.submit(my_function, df)
或者你可以 scatter 它明确地。分散操作会将你的数据移动到一个工作节点,并返回一个指向该数据的未来对象:
>>> remote_df = client.scatter(df)
>>> remote_df
<Future: status: finished, type: DataFrame, key: bbd0ca93589c56ea14af49cba470006e>
>>> future = client.submit(my_function, remote_df)
这两种方法都能达到相同的结果,但使用 scatter 有时会更快。特别是在使用进程或分布式工作器(需要数据传输)并且希望在多次计算中使用 df 时,这一点尤为明显。预先分散数据可以避免过多的数据移动。
对列表调用 scatter 会分散所有单独的元素。Dask 将以轮询方式将这些元素均匀分布到各个工作节点中:
>>> client.scatter([1, 2, 3])
[<Future: status: finished, type: int, key: c0a8a20f903a4915b94db8de3ea63195>,
<Future: status: finished, type: int, key: 58e78e1b34eb49a68c65b54815d1b158>,
<Future: status: finished, type: int, key: d3395e15f605bc35ab1bac6341a285e2>]
参考文献、取消和例外¶
|
取消运行此未来的请求 |
|
返回失败任务的异常 |
|
返回失败任务的回溯信息 |
|
取消正在运行的未来任务 如果尚未运行,这将阻止未来任务被调度,如果已经运行,则删除它们。 |
Dask 只会计算并保留那些有活跃 futures 的结果。通过这种方式,你的本地变量定义了 Dask 中什么是活跃的。当一个 future 被你的本地 Python 会话垃圾回收时,Dask 会自由地删除该数据或停止那些试图产生它的正在进行计算:
>>> del future # deletes remote data once future is garbage collected
你也可以使用 Future.cancel 或 Client.cancel 方法显式取消任务:
>>> future.cancel() # deletes data even if other futures point to it
如果一个未来失败,那么Dask会在你尝试获取结果时引发远程异常和回溯:
def div(x, y):
return x / y
>>> a = client.submit(div, 1, 0) # 1 / 0 raises a ZeroDivisionError
>>> a
<Future: status: error, key: div-3601743182196fb56339e584a2bf1039>
>>> a.result()
1 def div(x, y):
----> 2 return x / y
ZeroDivisionError: division by zero
所有依赖于出错未来的未来也会以相同的异常出错:
>>> b = client.submit(inc, a)
>>> b
<Future: status: error, key: inc-15e2e4450a0227fa38ede4d6b1a952db>
你可以使用 Future.exception 或 Future.traceback 方法显式地收集异常或回溯。
等待未来¶
|
按完成顺序返回期货 |
|
等待所有/任意未来完成 |
你可以使用 wait 函数等待一个 future 或 future 集合:
from dask.distributed import wait
>>> wait(futures)
此操作会阻塞,直到所有未来对象完成或出错。
你也可以使用 as_completed 函数来迭代那些已经完成的 future:
from dask.distributed import as_completed
futures = client.map(score, x_values)
best = -1
for future in as_completed(futures):
y = future.result()
if y > best:
best = y
为了提高效率,你也可以让 as_completed 在后台收集结果:
for future, result in as_completed(futures, with_results=True):
# y = future.result() # don't need this
...
或者收集自上次迭代以来到达的所有批次中的未来:
for batch in as_completed(futures, with_results=True).batches():
for future, result in batch:
...
此外,对于迭代算法,您可以在迭代 期间 向 as_completed 迭代器中添加更多未来对象:
seq = as_completed(futures)
for future in seq:
y = future.result()
if condition(y):
new_future = client.submit(...)
seq.add(new_future) # add back into the loop
或者使用 seq.update(futures) 一次性添加多个未来对象。
发射后不管¶
|
即使我们释放了未来,也要至少运行一次任务 |
有时我们并不关心任务的结果,而只关心它可能产生的副作用,比如将结果写入文件:
>>> a = client.submit(load, filename)
>>> b = client.submit(process, a)
>>> c = client.submit(write, b, out_filename)
如上所述,Dask 会停止没有活跃 futures 的工作。它认为因为没有人在关注这个数据,所以没有人关心。你可以使用 fire_and_forget 函数告诉 Dask 无论如何都要计算一个任务,即使没有活跃的 futures:
from dask.distributed import fire_and_forget
>>> fire_and_forget(c)
这在未来可能会超出作用域时特别有用,例如,作为函数的一部分:
def process(filename):
out_filename = 'out-' + filename
a = client.submit(load, filename)
b = client.submit(process, a)
c = client.submit(write, b, out_filename)
fire_and_forget(c)
return # here we lose the reference to c, but that's now ok
for filename in filenames:
process(filename)
提交任务并从不同进程检索结果¶
有时我们关心获取一个结果,但不一定来自同一个进程。
from distributed import Variable
var = Variable("my-result")
fut = client.submit(...)
var.set(fut)
使用 Variable 指令告诉 dask 记住这个任务的结果,并将其存储在给定的名称下,以便稍后可以检索它,而不必在此期间保持 Client 活动。
var = Variable("my-result")
fut = var.get()
result = fut.result()
从任务提交任务¶
|
在任务中获取客户端。 |
|
让这个线程重新加入 ThreadPoolExecutor |
|
让此任务从工作线程池中分离 |
这是一个高级功能,在常见情况下很少需要。
任务可以通过获取自己的客户端来启动其他任务。这使得复杂且高度动态的工作负载成为可能:
from dask.distributed import get_client
def my_function(x):
...
# Get locally created client
client = get_client()
# Do normal client operations, asking cluster for computation
a = client.submit(...)
b = client.submit(...)
a, b = client.gather([a, b])
return a + b
它还允许你设置长时间运行的任务,这些任务可以监视其他资源,如套接字或物理传感器:
def monitor(device):
client = get_client()
while True:
data = device.read_data()
future = client.submit(process, data)
fire_and_forget(future)
for device in devices:
fire_and_forget(client.submit(monitor))
然而,每个运行的任务占用一个线程,因此如果你启动许多会启动其他任务的任务,那么如果不小心,可能会导致系统死锁。你可以在任务中调用 secede 函数,使其从专用线程池中移除,进入一个不占用Dask worker中槽位的管理线程:
from dask.distributed import get_client, secede
def monitor(device):
client = get_client()
secede() # remove this task from the thread pool
while True:
data = device.read_data()
future = client.submit(process, data)
fire_and_forget(future)
如果你打算在等待客户端工作后在同一线程中进行更多工作,你可能希望显式地阻塞,直到线程能够 重新加入 线程池。这允许对创建的线程数量进行一些控制,并防止同时激活的线程过多,从而过度饱和你的硬件:
def f(n): # assume that this runs as a task
client = get_client()
secede() # secede while we wait for results to come back
futures = client.map(func, range(n))
results = client.gather(futures)
rejoin() # block until a slot is open in the thread pool
result = analyze(results)
return result
或者,你可以在任务中直接使用普通的 compute 函数。这将自动适当地调用 secede 和 rejoin:
def f(name, fn):
df = dd.read_csv(fn) # note that this is a dask collection
result = df[df.name == name].count()
# This calls secede
# Then runs the computation on the cluster (including this worker)
# Then blocks on rejoin, and finally delivers the answer
result = result.compute()
return result
协调原语¶
|
分布式队列 |
|
分布式全局变量 |
|
分布式集中锁 |
|
分布式集中事件等同于 asyncio.Event |
|
这个 信号量 将跟踪调度器上的租约,这些租约可以被该类的实例获取和释放。 |
|
使用发布-订阅模式发布数据 |
|
订阅发布/订阅主题 |
有时会出现任务、工作者或客户端需要以超出常规任务调度的未来方式相互协调的情况。在这些情况下,Dask 提供了额外的原语来帮助处理复杂情况。
Dask 提供了分布式版本的协调原语,如锁、事件、队列、全局变量和发布-订阅系统,这些原语在适当的情况下与其内存中的对应物相匹配。这些原语可用于控制对外部资源的访问、跟踪正在进行的计算的进度,或在多个工作者、客户端和任务之间通过侧通道合理地共享数据。
这些功能对于Dask的常见使用来说很少是必要的。我们建议初学者坚持使用上面提到的更简单的未来(如 Client.submit 和 Client.gather),而不是采用不必要的复杂技术。
队列¶
|
分布式队列 |
Dask 队列遵循标准 Python 队列的 API,但现在在客户端之间移动 futures 或小消息。队列在必要时可以合理地序列化并在远程客户端上重新连接自己:
from dask.distributed import Queue
def load_and_submit(filename):
data = load(filename)
client = get_client()
future = client.submit(process, data)
queue.put(future)
client = Client()
queue = Queue()
for filename in filenames:
future = client.submit(load_and_submit, filename)
fire_and_forget(future)
while True:
future = queue.get()
print(future.result())
队列也可以发送少量信息,任何可以被 msgpack 编码的内容(整数、字符串、布尔值、列表、字典等)。这对于发送小的分数或管理消息非常有用:
def func(x):
try:
...
except Exception as e:
error_queue.put(str(e))
error_queue = Queue()
队列由中央调度器管理,因此它们不适合发送大量数据(您发送的所有内容都将通过一个中央点进行路由)。它们非常适合移动小块元数据或期货。这些期货可能安全地指向更大的数据块:
>>> x = ... # my large numpy array
# Don't do this!
>>> q.put(x)
# Do this instead
>>> future = client.scatter(x)
>>> q.put(future)
# Or use futures for metadata
>>> q.put({'status': 'OK', 'stage=': 1234})
如果你需要在工作者之间移动大量数据,那么你可能还需要考虑下面几节中描述的发布/订阅系统。
全局变量¶
|
分布式全局变量 |
变量类似于队列,因为它们在客户端之间传递未来和小数据。然而,变量只保存一个单一的值。你可以在任何时候获取或设置这个值:
>>> var = Variable('stopping-criterion')
>>> var.set(False)
>>> var.get()
False
这通常用于在客户端之间发出停止标准或当前参数的信号。
如果你想分享大量信息,那么先分散数据:
>>> parameters = np.array(...)
>>> future = client.scatter(parameters)
>>> var.set(future)
锁¶
|
分布式集中锁 |
你也可以使用 Lock 对象来持有集群范围内的锁。Dask 锁与普通的 threading.Lock 对象具有相同的 API,只不过它们在集群范围内工作:
from dask.distributed import Lock
lock = Lock()
with lock:
# access protected resource
你可以同时管理多个锁。锁可以被赋予一个一致的名称,或者你可以传递锁对象本身。
当你想要锁定某些已知命名的资源时,使用一致的名称会很方便:
from dask.distributed import Lock
def load(fn):
with Lock('the-production-database'):
# read data from filename using some sensitive source
return ...
futures = client.map(load, filenames)
传递锁同样有效,并且在特定情况下需要创建短期锁时更为简单:
from dask.distributed import Lock
lock = Lock()
def load(fn, lock=None):
with lock:
# read data from filename using some sensitive source
return ...
futures = client.map(load, filenames, lock=lock)
如果你想控制对某些外部资源的并发访问,比如数据库或非线程安全的库,这可能会很有用。
事件¶
|
分布式集中事件等同于 asyncio.Event |
Dask 事件模仿 asyncio.Event 对象,但在集群范围内。它们持有一个单一的标志,可以被设置或清除。客户端可以等待直到事件标志被设置。与 Lock 不同,每个客户端都可以设置或清除标志,并且事件没有“所有权”。
你可以使用事件来例如同步多个客户端:
# One one client
from dask.distributed import Event
event = Event("my-event-1")
event.wait()
调用 wait 将会阻塞,直到事件被设置,例如在另一个客户端中
# In another client
from dask.distributed import Event
event = Event("my-event-1")
# do some work
event.set()
事件可以被设置、清除和多次等待。每个引用相同事件名称的等待者都会在事件设置时被通知(而不仅仅是在锁的情况下第一个等待者):
from dask.distributed import Event
def wait_for_event(x):
event = Event("my-event")
event.wait()
# at this point, all function calls
# are in sync once the event is set
futures = client.map(wait_for_event, range(10))
Event("my-event").set()
client.gather(futures)
信号量¶
|
这个 信号量 将跟踪调度器上的租约,这些租约可以被该类的实例获取和释放。 |
类似于单值的 Lock,也可以使用集群范围内的信号量来协调和限制对敏感资源(如数据库)的访问。
from dask.distributed import Semaphore
sem = Semaphore(max_leases=2, name="database")
def access_limited(val, sem):
with sem:
# Interact with the DB
return
futures = client.map(access_limited, range(10), sem=sem)
client.gather(futures)
sem.close()
演员¶
角色允许工作者在不与中央调度器协调的情况下管理快速变化的状态。这具有减少延迟(工作者之间的往返延迟约为1毫秒)、减轻中央调度器的压力(工作者可以完全相互协调角色)以及支持需要有状态或就地内存操作的工作流的优点。
然而,这些好处是有代价的。调度器不了解参与者,因此它们无法从诊断、负载均衡或弹性中受益。一旦一个参与者在一个工作节点上运行,它就永远与该工作节点绑定。如果该工作节点负担过重或崩溃,那么就没有机会恢复工作负载。
因为Actor避免了中央调度器,它们可以高性能,但不是弹性的。
示例:计数器¶
一个actor是一个包含状态和方法的类,它被提交给一个worker:
class Counter:
n = 0
def __init__(self):
self.n = 0
def increment(self):
self.n += 1
return self.n
from dask.distributed import Client
client = Client()
future = client.submit(Counter, actor=True)
counter = future.result()
>>> counter
<Actor: Counter, key=Counter-afa1cdfb6b4761e616fa2cfab42398c8>
对此对象的方法调用会产生 ActorFutures ,它们类似于普通的 Futures,但只与持有 Actor 的 worker 交互:
>>> future = counter.increment()
>>> future
<ActorFuture>
>>> future.result()
1
属性访问是同步且阻塞的:
>>> counter.n
1
示例:参数服务器¶
此示例将使用参数服务器执行以下最小化:
这是一个简单的最小化示例,将作为说明性例子。
Dask Actor 将作为参数服务器来保存模型。客户端将计算上述损失函数的梯度。
import numpy as np
from dask.distributed import Client
client = Client(processes=False)
class ParameterServer:
def __init__(self):
self.data = dict()
def put(self, key, value):
self.data[key] = value
def get(self, key):
return self.data[key]
def train(params, lr=0.1):
grad = 2 * (params - 1) # gradient of (params - 1)**2
new_params = params - lr * grad
return new_params
ps_future = client.submit(ParameterServer, actor=True)
ps = ps_future.result()
ps.put('parameters', np.random.default_rng().random(1000))
for k in range(20):
params = ps.get('parameters').result()
new_params = train(params)
ps.put('parameters', new_params)
print(new_params.mean())
# k=0: "0.5988202981316124"
# k=10: "0.9569236575164062"
这个例子是有效的,并且损失函数被最小化。上面的(简单)方程被最小化,因此每个 \(p_i\) 收敛到 1。如果需要,这个例子可以被调整为使用更复杂函数进行最小化的机器学习。
异步操作¶
所有需要与远程工作者通信的操作都是可等待的:
async def f():
future = client.submit(Counter, actor=True)
counter = await future # gather actor object locally
counter.increment() # send off a request asynchronously
await counter.increment() # or wait until it was received
n = await counter.n # attribute access also must be awaited
通常,所有触发计算的I/O操作(例如 to_parquet)应使用 compute=False 参数来完成,以避免异步阻塞:
await client.compute(ddf.to_parquet('/tmp/some.parquet', compute=False))
API¶
客户端
|
连接到 Dask 集群并提交计算 |
|
取消正在运行的未来任务 如果尚未运行,这将阻止未来任务被调度,如果已经运行,则删除它们。 |
|
在集群上计算 dask 集合 |
|
从分布式内存中收集未来 |
|
计算 dask 图 |
|
如果存在,从调度器获取命名数据集。 |
|
返回一个 concurrent.futures 执行器,用于在此客户端上提交任务 |
|
哪些键由哪些工作者持有 |
|
列出调度器上可用的命名数据集 |
|
将函数映射到参数序列上 |
|
每个工作节点上可用的线程/核心数 |
|
在集群上持久化dask集合 |
|
收集有关近期工作的统计分析信息 |
|
将命名数据集发布到调度器 |
|
在网络内重新平衡数据 |
|
在网络中设置期货的复制 |
|
重启所有工作进程。 |
|
在任务调度系统之外对所有工作节点运行一个函数 |
|
在调度器进程上运行一个函数 |
|
将数据分散到分布式内存中 |
关闭连接的调度器和工作节点 |
|
|
集群中工人的基本信息 |
|
向调度器提交一个函数应用 |
|
从调度器中移除命名数据集 |
|
将本地包上传到调度器和工作节点 |
|
存储每个未来数据的工人 |
未来
|
远程运行的计算 |
当未来完成时调用回调 |
|
|
取消运行此未来的请求 |
如果未来已被取消,则返回 True |
|
返回计算是否完成。 |
|
|
返回失败任务的异常 |
|
等待计算完成,将结果收集到本地进程。 |
|
返回失败任务的回溯信息 |
函数
|
按完成顺序返回期货 |
|
即使我们释放了未来,也要至少运行一次任务 |
|
在任务中获取客户端。 |
|
让此任务从工作线程池中分离 |
|
让这个线程重新加入 ThreadPoolExecutor |
|
等待所有/任意未来完成 |
|
一个内置 |
|
一个内置 |
- distributed.as_completed(futures=None, loop=None, with_results=False, raise_errors=True, *, timeout=None)[源代码]¶
按完成顺序返回期货
这将返回一个迭代器,按完成顺序生成输入的未来对象。在迭代器上调用
next将阻塞,直到下一个未来对象完成,无论顺序如何。此外,您还可以在计算过程中使用
.add方法向此对象添加更多功能- 参数
- futures: 未来集合
一个 Future 对象的列表,按照它们完成的顺序进行迭代
- with_results: bool (False)
是否等待并包含未来的结果;在这种情况下,
as_completed产生一个 (future, result) 的元组- raise_errors: bool (True)
当未来的结果引发异常时,我们是否应该抛出异常;仅在
with_results=True时影响行为。- timeout: int (可选)
如果调用
__next__()或__anext__()并且结果在从原始调用as_completed()起 timeout 秒后仍不可用,返回的迭代器将引发dask.distributed.TimeoutError。如果未指定 timeout 或为None,则等待时间没有限制。
示例
>>> x, y, z = client.map(inc, [1, 2, 3]) >>> for future in as_completed([x, y, z]): ... print(future.result()) 3 2 4
在计算过程中添加更多功能
>>> x, y, z = client.map(inc, [1, 2, 3]) >>> ac = as_completed([x, y, z]) >>> for future in ac: ... print(future.result()) ... if random.random() < 0.5: ... ac.add(c.submit(double, future)) 4 2 8 3 6 12 24
可选地等待结果也被收集完毕
>>> ac = as_completed([x, y, z], with_results=True) >>> for future, result in ac: ... print(result) 2 4 3
- distributed.fire_and_forget(obj)[源代码]¶
即使我们释放了未来,也要至少运行一次任务
在正常操作下,Dask 不会运行没有活动 future 的任务(这在许多情况下避免了不必要的工作)。然而,有时你只想启动一个任务,不跟踪它的 future,并期望它最终完成。你可以对 future 或 future 集合使用此函数,请求 Dask 即使没有活动客户端跟踪也要完成任务。
任务完成后,结果将不会保存在内存中(除非有活跃的未来),因此这仅对依赖副作用的任务有用。
- 参数
- obj未来, 列表, 字典, dask 集合
你希望至少运行一次的未来
示例
>>> fire_and_forget(client.submit(func, *args))
- distributed.get_client(address=None, timeout=None, resolve_address=True) Client[源代码]¶
在任务中获取客户端。
此客户端连接到与工作节点相同的调度器
- 参数
- 地址str, 可选
要连接的调度器的地址。默认为工作进程连接的调度器。
- 超时int 或 str
获取客户端的超时时间(以秒为单位)。默认为
distributed.comm.timeouts.connect配置值。- resolve_addressbool, 默认 True
是否将 address 解析为其规范形式。
- 返回
- 客户端
示例
>>> def f(): ... client = get_client(timeout="10s") ... futures = client.map(lambda x: x + 1, range(10)) # spawn many tasks ... results = client.gather(futures) ... return sum(results)
>>> future = client.submit(f) >>> future.result() 55
- distributed.secede()[源代码]¶
让此任务从工作线程池中分离
这为新任务打开了一个新的调度槽和一个新的线程。这使得客户端可以在这个节点上调度任务,这在等待其他作业完成时特别有用(例如,使用
client.gather)。示例
>>> def mytask(x): ... # do some work ... client = get_client() ... futures = client.map(...) # do some remote work ... secede() # while that work happens, remove ourself from the pool ... return client.gather(futures) # return gathered results
- distributed.rejoin()[源代码]¶
让这个线程重新加入 ThreadPoolExecutor
这将阻塞,直到执行器中有一个新的槽位打开。下一个完成任务的线程将离开池子,以允许这个线程加入。
参见
secede离开线程池
- distributed.wait(fs, timeout=None, return_when='ALL_COMPLETED')[源代码]¶
等待所有/任意未来完成
- 参数
- fsList[Future]
- 超时数字, 字符串, 可选
在引发
dask.distributed.TimeoutError之前的时间。可以是像"10 分钟"这样的字符串,或者是等待的秒数。- return_whenstr, 可选
ALL_COMPLETED 或 FIRST_COMPLETED 之一
- 返回
- 已完成、未完成的命名元组
- distributed.print(*args, sep: str | None = ' ', end: str | None = '\n', file: TextIO | None = None, flush: bool = False) None[源代码]¶
一个用于从工作节点向客户端进行远程打印的内置
print函数的替代品。如果在 dask 工作节点之外调用,其参数将直接传递给builtins.print()。如果由在工作节点上运行的代码调用,除了本地打印外,任何连接到管理此工作节点的调度器的客户端(可能远程)都将收到一个事件,指示它们将相同的输出打印到它们自己的标准输出或标准错误流中。例如,用户可以通过在提交的代码中包含对此print函数的调用来执行简单的远程计算调试,并在本地 Jupyter 笔记本或解释器会话中检查输出。所有参数的行为与
builtins.print()的参数相同,但file关键字参数除外,如果指定,必须是sys.stdout或sys.stderr;不允许使用任意类似文件的对象。所有非关键字参数都使用
str()转换为字符串并写入流中,由sep分隔并在end后跟随。sep和end都必须是字符串;它们也可以是None,这意味着使用默认值。如果没有给出对象,print()将只写入end。- 参数
- sepstr, 可选
插入在值之间的字符串,默认是一个空格。
- 结束str, 可选
在最后一个值之后附加的字符串,默认为换行符。
- 文件 :
sys.stdout或sys.stderr,可选sys.stdout 或 sys.stderr,可选 默认为当前的 sys.stdout。
- flushbool, 默认 False
是否强制刷新流。
示例
>>> from dask.distributed import Client, print >>> client = distributed.Client(...) >>> def worker_function(): ... print("Hello from worker!") >>> client.submit(worker_function) <Future: finished, type: NoneType, key: worker_function-...> Hello from worker!
- distributed.warn(message: str | Warning, category: type[Warning] | None = <class 'UserWarning'>, stacklevel: int = 1, source: typing.Any = None) None[源代码]¶
一个内置
warnings.warn()函数的即插即用替代品,用于从工作线程向客户端远程发出警告。如果从 dask 工作线程外部调用,其参数将直接传递给
warnings.warn()。如果由在工作线程上运行的代码调用,除了在本地发出警告外,任何连接(可能远程)到管理此工作线程的调度程序的客户端都将收到一个事件,指示它们发出相同的警告(受其本地过滤器等的影响)。在实现可能在工作线程上运行的计算时,用户可以调用此warn函数,以确保任何远程客户端会话将看到他们的警告,例如在 Jupyter 输出单元格中。虽然所有参数都被本地发出的警告所尊重(与
warnings.warn()中的含义相同),但stacklevel和source被客户端忽略,因为它们在客户端的线程中没有意义。示例
>>> from dask.distributed import Client, warn >>> client = Client() >>> def do_warn(): ... warn("A warning from a worker.") >>> client.submit(do_warn).result() /path/to/distributed/client.py:678: UserWarning: A warning from a worker.
- class distributed.Client(address=None, loop=None, timeout=_NoDefault.no_default, set_as_default=True, scheduler_file=None, security=None, asynchronous=False, name=None, heartbeat_interval=None, serializers=None, deserializers=None, extensions={'pubsub': <class 'distributed.pubsub.PubSubClientExtension'>}, direct_to_workers=None, connection_limit=512, **kwargs)[源代码]¶
连接到 Dask 集群并提交计算
客户端将用户连接到 Dask 集群。它提供了一个围绕函数和未来的异步用户界面。此类类似于
concurrent.futures中的执行器,但也允许在submit/map调用中使用Future对象。当实例化客户端时,默认情况下它会接管所有dask.compute和dask.persist调用。通常也可以在不指定调度器地址的情况下创建一个客户端,例如
Client()。在这种情况下,客户端会在后台创建一个LocalCluster并连接到它。任何额外的关键字参数在这种情况下都会从客户端传递给 LocalCluster。更多信息请参阅 LocalCluster 文档。- 参数
- 地址: 字符串, 或集群
这可以是
Scheduler服务器的地址,例如字符串'127.0.0.1:8786',或者是集群对象,例如LocalCluster()- 循环
事件循环
- timeout: int (默认为配置 ``distributed.comm.timeouts.connect``)
初始连接到调度器的超时时间
- set_as_default: bool (True)
将此客户端用作全局 dask 调度器
- scheduler_file: 字符串 (可选)
如果可用,指向包含调度器信息的文件的路径
- security: 安全或布尔值,可选
可选的安全信息。如果创建本地集群,也可以传入
True,在这种情况下,将自动创建临时的自签名凭证。- 异步: bool (默认值为 False)
如果在异步/等待函数或Tornado gen.coroutines中使用此客户端,请设置为True。否则,在正常使用时应保持为False。
- 名称: 字符串 (可选)
为客户端指定一个名称,该名称将包含在调度程序生成的与该客户端相关的日志中
- heartbeat_interval: int (可选)
心跳到调度器之间的时间(以毫秒为单位)
- 序列化器
在序列化对象时要使用的迭代方法。更多信息请参见 序列化。
- 反序列化器
反序列化对象时要使用的迭代方法。更多信息请参见 序列化。
- 扩展列表
扩展
- direct_to_workers: bool (可选)
是否直接连接到工作节点,或者要求调度器作为中介。
- 连接限制整数
连接池中同时维护的开放通信数量
- **kwargs:
如果你没有传递调度器地址,Client 将创建一个
LocalCluster对象,并传递任何额外的关键字参数。
示例
在初始化时提供集群的调度节点地址:
>>> client = Client('127.0.0.1:8786')
使用
submit方法将单个计算任务发送到集群>>> a = client.submit(add, 1, 2) >>> b = client.submit(add, 10, 20)
继续使用提交或映射结果来构建更大的计算
>>> c = client.submit(add, a, b)
使用
gather方法收集结果。>>> client.gather(c) 33
你也可以在没有参数的情况下调用 Client 来创建你自己的本地集群。
>>> client = Client() # makes your own local "cluster"
额外的关键词将直接传递给 LocalCluster
>>> client = Client(n_workers=2, threads_per_worker=4)
- property amm¶
方便访问器用于 Active Memory Manager
- as_current()[源代码]¶
线程本地、任务本地的上下文管理器,使得 Client.current 类方法返回 self。在此上下文管理器内反序列化的任何 Future 对象将自动附加到此 Client。
- benchmark_hardware() dict[源代码]¶
在工作人员上运行基准测试,以测试内存、磁盘和网络带宽
- 返回
- 结果: dict
一个映射字典,将名称“disk”、“memory”和“network”映射到将大小映射到带宽的字典。这些带宽是许多工作者在集群上运行计算的平均值。
- call_stack(futures=None, keys=None)[源代码]¶
所有相关键的主动运行调用栈
你可以通过在
futures=关键字中提供期货或集合,或在keys=关键字中提供显式键的列表来指定感兴趣的数据。如果两者都没有提供,则将返回所有调用栈。- 参数
- 未来列表(可选)
未来列表,默认为所有数据
- 键列表(可选)
关键名称列表,默认为所有数据
示例
>>> df = dd.read_parquet(...).persist() >>> client.call_stack(df) # call on collections
>>> client.call_stack() # Or call with no arguments for all activity
- cancel(futures, asynchronous=None, force=False, reason=None, msg=None)[源代码]¶
取消正在运行的未来任务 如果尚未运行,这将阻止未来任务被调度,如果已经运行,则删除它们。调用后,此结果及其所有依赖结果将不再可访问。
- 参数
- 未来List[Future]
期货列表
- 异步: bool
如果为真,客户端处于异步模式
- 强制布尔值 (False)
即使其他客户端希望,也要取消这个未来
- reason: str
取消期货的原因
- 消息str
将附加到已取消未来的消息
- close(timeout=_NoDefault.no_default)[源代码]¶
关闭此客户端
当你的Python会话结束时,客户端也会自动关闭。
如果你在没有参数的情况下启动了一个客户端,例如
Client(),那么这也会关闭同时启动的本地集群。- 参数
- 超时数字
在引发
dask.distributed.TimeoutError之前的秒数
- compute(collections, sync=False, optimize_graph=True, workers=None, allow_other_workers=False, resources=None, retries=0, priority=0, fifo_timeout='60s', actors=None, traverse=True, **kwargs)[源代码]¶
在集群上计算 dask 集合
- 参数
- 集合dask对象的可迭代对象或单个dask对象
像 dask.array 或 dataframe 或 dask.value 对象这样的集合
- 同步bool (可选)
如果为 False(默认)则返回 Futures,如果为 True 则返回具体值。
- 优化图布尔
是否优化底层图表
- 工人字符串或字符串的可迭代对象
一组可以执行计算的工作主机名。留空以默认使用所有工作节点(常见情况)
- 允许其他工作者bool (默认为 False)
与 workers 一起使用。指示计算是否可以在不属于 workers 集合的 worker 上执行。
- 重试int (默认为 0)
如果计算结果失败,允许的自动重试次数
- 优先级数字
任务的优先级可选。默认值为零。优先级越高,优先级越高。
- fifo_timeouttimedelta 字符串 (默认值为 ‘60s’)
允许的调用之间的时间量,以视为相同优先级
- 遍历bool (默认为 True)
默认情况下,dask 会遍历内置的 Python 集合,查找传递给
compute的 dask 对象。对于大型集合,这可能会很耗费资源。如果没有任何参数包含 dask 对象,请设置traverse=False以避免进行此遍历。- 资源dict (默认为 {})
定义了此映射任务的每个实例在工作节点上所需的 资源;例如
{'GPU': 2}。有关定义资源的详细信息,请参阅 工作节点资源。- 演员布尔值或字典(默认 None)
这些任务是否应在工作节点上作为有状态的执行者存在。可以在全局(True/False)或每个任务(
{'x': True, 'y': False})的基础上指定。有关更多详细信息,请参阅 Actors。- **kwargs
传递给图优化调用的选项
- 返回
- 如果输入是一个序列,则列出所有未来项;否则,列出单个未来项。
参见
Client.get普通的同步 dask.get 函数
示例
>>> from dask import delayed >>> from operator import add >>> x = delayed(add)(1, 2) >>> y = delayed(add)(x, x) >>> xx, yy = client.compute([x, y]) >>> xx <Future: status: finished, key: add-8f6e709446674bad78ea8aeecfee188e> >>> xx.result() 3 >>> yy.result() 6
也支持单个参数
>>> xx = client.compute(x)
- classmethod current(allow_global=True)[源代码]¶
在 as_client 的上下文中运行时,返回上下文本地的当前客户端。否则,返回最新初始化的客户端。如果没有客户端实例存在,则引发 ValueError。如果 allow_global 设置为 False,则在 as_client 上下文管理器之外运行时引发 ValueError。
- 参数
- allow_global布尔
如果为真,返回默认客户端
- 返回
- 客户端
当前客户端
- Raises
- ValueError
如果没有设置客户端,则会引发 ValueError。
参见
default_client
- property dashboard_link¶
链接到调度器的仪表板。
- 返回
- str
仪表板 URL。
示例
在默认的网络浏览器中打开仪表板:
>>> import webbrowser >>> from distributed import Client >>> client = Client() >>> webbrowser.open(client.dashboard_link)
- dump_cluster_state(filename: str = 'dask-cluster-dump', write_from_scheduler: bool | None = None, exclude: collections.abc.Collection[str] = ('run_spec',), format: Literal['msgpack', 'yaml'] = 'msgpack', **storage_options)[源代码]¶
提取整个集群状态的转储并持久化到磁盘或URL。这仅用于调试目的。
警告:调度器(以及客户端,如果本地写入转储)的内存使用量可能会很大。在大型或长时间运行的集群上,这可能需要几分钟时间。在处理转储时,调度器可能会无响应。
结果将存储在一个字典中:
{ "scheduler": {...}, # scheduler state "workers": { worker_addr: {...}, # worker state ... } "versions": { "scheduler": {...}, "workers": { worker_addr: {...}, ... } } }
- 参数
- 文件名:
要写入的路径或URL。适当的文件后缀(
.msgpack.gz或.yaml)将自动附加。必须是
fsspec.open()支持的路径(例如s3://my-bucket/cluster-dump或cluster-dumps/dump)。请参阅write_from_scheduler以控制转储是从调度器直接写入filename,还是通过网络发送回客户端,然后在本地写入。- write_from_scheduler:
如果为 None(默认),则根据
filename是否看起来像 URL 或本地路径来推断:如果文件名包含://``(例如 ``s3://my-bucket/cluster-dump),则为 True,否则为 False(例如local_dir/cluster-dump)。如果为真,直接从调度器将集群状态写入
filename。如果filename是本地路径,转储将被写入调度器文件系统上的该路径,因此如果调度器运行在临时硬件上,请小心。当调度器连接到网络文件系统或持久磁盘时,或者用于写入存储桶时,这很有用。如果为 False,则通过网络将集群状态从调度器传输回客户端,然后将其写入
filename。这对于大型转储来说效率较低,但在调度器无法访问任何持久存储时非常有用。- 排除:
一组属性名称,这些属性在转储时应被排除,例如排除代码、回溯、日志等。
默认排除
run_spec,这是序列化的用户代码。这通常不需要用于调试。要允许序列化此内容,请传递一个空元组。- 格式:
可以是
"msgpack"或"yaml"。如果使用 msgpack(默认),输出将以 msgpack 格式存储在 gzip 压缩文件中。阅读:
import gzip, msgpack with gzip.open("filename") as fd: state = msgpack.unpack(fd)
或者:
import yaml try: from yaml import CLoader as Loader except ImportError: from yaml import Loader with open("filename") as fd: state = yaml.load(fd, Loader=Loader)
- **存储选项:
在写入URL时,传递给
fsspec.open()的任何额外参数。
- forward_logging(logger_name=None, level=0)[源代码]¶
开始将给定的记录器(默认是根记录器)及其下的所有记录器从工作任务转发到客户端进程。每当命名记录器在工作端处理一个 LogRecord 时,该记录将被序列化,发送到客户端,并由客户端上同名的记录器处理。
请注意,工作端日志记录器只有在它们的级别设置得当的情况下才会处理 LogRecords,而客户端日志记录器只有在它自己的级别同样设置得当的情况下才会发出转发的 LogRecord。例如,如果你的提交任务将一条 DEBUG 消息记录到日志记录器“foo”,那么为了使
forward_logging()在你的客户端会话中发出该消息,你必须确保在工作进程和客户端进程中,日志记录器“foo”的级别都设置为 DEBUG(或更低)。- 参数
- logger_namestr, 可选
开始转发的记录器名称。
logging模块的分层命名系统的通常规则适用。例如,如果name是"foo",那么不仅"foo",还有"foo.bar"、"foo.baz"等也将被转发。如果name是None,这表示根记录器,因此 所有 记录器都将被转发。请注意,只有当记录器的级别足够处理给定的 LogRecord 时,记录器才会转发该 LogRecord。
- 级别str | int, 可选
可选地限制转发到此级别或更高级别的 LogRecords,即使转发日志记录器的自身级别较低。
示例
为了举例说明,假设我们像用户一样配置客户端日志:使用一个附加到根记录器的 StreamHandler,输出级别为 INFO,并使用简单的输出格式:
import logging import distributed import io, yaml TYPICAL_LOGGING_CONFIG = ''' version: 1 handlers: console: class : logging.StreamHandler formatter: default level : INFO formatters: default: format: '%(asctime)s %(levelname)-8s [worker %(worker)s] %(name)-15s %(message)s' datefmt: '%Y-%m-%d %H:%M:%S' root: handlers: - console ''' config = yaml.safe_load(io.StringIO(TYPICAL_LOGGING_CONFIG)) logging.config.dictConfig(config)
现在创建一个客户端,并开始将工作线程的根日志转发回我们的本地客户端进程。
>>> client = distributed.Client() >>> client.forward_logging() # forward the root logger at any handled level
然后提交一个在工作者上进行一些错误日志记录的任务。我们看到了来自客户端 StreamHandler 的输出。
>>> def do_error(): ... logging.getLogger("user.module").error("Hello error") ... return 42 >>> client.submit(do_error).result() 2022-11-09 03:43:25 ERROR [worker tcp://127.0.0.1:34783] user.module Hello error 42
注意,dask 还会向转发的 LogRecord 添加一个
"worker"属性,我们的自定义格式化程序会使用它。这对于准确识别哪个工作节点记录了错误非常有用。值得强调的一点细微差别:尽管我们的客户端根日志记录器配置了INFO级别,但工作端根日志记录器仍然保持其默认的ERROR级别,因为我们没有在工作端进行任何显式的日志记录配置。因此,工作端的INFO日志将*不会*被转发,因为它们从一开始就没有被处理。
>>> def do_info_1(): ... # no output on the client side ... logging.getLogger("user.module").info("Hello info the first time") ... return 84 >>> client.submit(do_info_1).result() 84
在处理并将信息消息转发到客户端之前,必须将客户端日志记录器的级别设置为INFO。换句话说,客户端转发日志记录的“有效”级别是每个日志记录器的客户端和工作者端级别的最大值。
>>> def do_info_2(): ... logger = logging.getLogger("user.module") ... logger.setLevel(logging.INFO) ... # now produces output on the client side ... logger.info("Hello info the second time") ... return 84 >>> client.submit(do_info_2).result() 2022-11-09 03:57:39 INFO [worker tcp://127.0.0.1:42815] user.module Hello info the second time 84
- gather(futures, errors='raise', direct=None, asynchronous=None)[源代码]¶
从分布式内存中收集未来
接受一个未来、未来的嵌套容器、迭代器或队列。返回类型将与输入类型匹配。
- 参数
- 未来未来集合
这可能是一个可能嵌套的 Future 对象集合。集合可以是列表、集合或字典。
- 错误字符串
如果一个 future 出错,我们应该 ‘raise’ 或者 ‘skip’,即抛出错误或跳过其在输出集合中的包含。
- 直接布尔
是否直接连接到工作节点,或者请求调度器作为中介。这也可以在创建客户端时设置。
- 异步: bool
如果为真,客户端处于异步模式
- 返回
- 结果:与输入类型相同的集合,但现在具有
- 收集的结果而不是未来
参见
Client.scatter将数据发送到集群
示例
>>> from operator import add >>> c = Client('127.0.0.1:8787') >>> x = c.submit(add, 1, 2) >>> c.gather(x) 3 >>> c.gather([x, [x], x]) # support lists and dicts [3, [3], 3]
- get(dsk, keys, workers=None, allow_other_workers=None, resources=None, sync=True, asynchronous=None, direct=None, retries=None, priority=0, fifo_timeout='60s', actors=None, **kwargs)[源代码]¶
计算 dask 图
- 参数
- dskdict
- 键对象,或对象的嵌套列表
- 工人字符串或字符串的可迭代对象
一组工作地址或主机名,可以在其上执行计算。留空以默认使用所有工作节点(常见情况)
- 允许其他工作者bool (默认为 False)
与
workers一起使用。指示计算是否可以在不属于 workers 集合的 worker 上执行。- 资源dict (默认为 {})
定义了此映射任务的每个实例在工作节点上所需的
资源;例如{'GPU': 2}。有关定义资源的详细信息,请参阅 工作节点资源。- 同步bool (可选)
如果为 False 则返回 Futures,如果为 True 则返回具体值(默认)。
- 异步: bool
如果为真,客户端处于异步模式
- 直接布尔
是否直接连接到工作节点,或者请求调度器作为中介。这也可以在创建客户端时设置。
- 重试int (默认为 0)
如果计算结果失败,允许的自动重试次数
- 优先级数字
任务的优先级可选。默认值为零。优先级越高,优先级越高。
- fifo_timeouttimedelta 字符串 (默认值为 ‘60s’)
允许的调用之间的时间量,以视为相同优先级
- 演员布尔值或字典(默认 None)
这些任务是否应在工作节点上作为有状态的执行者存在。可以在全局(True/False)或每个任务(
{'x': True, 'y': False})的基础上指定。有关更多详细信息,请参阅 Actors。
- 返回
- 结果
如果 ‘sync’ 为 True,则返回结果。否则,返回已知数据。如果 ‘sync’ 为 False,则返回已知数据。否则,返回结果。
参见
Client.compute计算异步集合
示例
>>> from operator import add >>> c = Client('127.0.0.1:8787') >>> c.get({'x': (add, 1, 2)}, 'x') 3
- get_dataset(name, default=_NoDefault.no_default, **kwargs)[源代码]¶
如果存在,从调度器获取命名数据集。如果不存在,返回默认值或引发 KeyError。
- 参数
- 名称str
要检索的数据集名称
- 默认str
可选,默认不设置 如果设置,当名称不存在时不会引发 KeyError,而是返回此默认值
- kwargsdict
传递给 _get_dataset 的额外关键字参数
- 返回
- 调度器的数据集,如果存在
- get_events(topic: str | None = None)[源代码]¶
获取结构化主题日志
- 参数
- 主题str, 可选
要检索事件的主题日志名称。如果没有提供
topic,则将返回所有主题的日志。
- get_executor(**kwargs)[源代码]¶
返回一个 concurrent.futures 执行器,用于在此客户端上提交任务
- 参数
- **kwargs
任何与 submit() 或 map() 兼容的参数,例如 workers 或 resources。
- 返回
- ClientExecutor
一个完全兼容 concurrent.futures API 的 Executor 对象。
- get_metadata(keys, default=_NoDefault.no_default)[源代码]¶
从调度器获取任意元数据
参见 set_metadata 以获取包含示例的完整文档字符串
- 参数
- 键键或列表
访问键。如果是列表,则获取嵌套集合中的内容。
- 默认可选的
如果键不存在,则返回此值。如果未提供,则当键不存在时会引发 KeyError。
- get_scheduler_logs(n=None)[源代码]¶
从调度器获取日志
- 参数
- n整数
要检索的日志数量。默认最大值为10000,可通过
distributed.admin.log-length配置值进行配置。
- 返回
- 按倒序记录日志(最新在前)
- get_task_stream(start=None, stop=None, count=None, plot=False, filename='task-stream.html', bokeh_resources=None)[源代码]¶
从调度器获取任务流数据
这收集了仪表板上诊断“任务流”图表中呈现的数据。它包括特定时间段内每个任务的开始、停止、传输和反序列化时间。
请注意,任务流诊断默认不运行。您可能希望在工作开始前调用此函数一次,以确保记录开始,然后在完成后再次调用。
- 参数
- 开始数字或字符串
当你想要开始记录时,如果是一个数字,它应该是调用 time() 的结果;如果是一个字符串,那么它应该是一个相对于现在的时间差,比如 ‘60s’ 或 ‘500 ms’。
- 停止数字或字符串
当你想要停止录制时
- 计数整数
所需记录的数量,如果同时指定了开始和结束,则忽略此项
- 绘图布尔值, 字符串
如果为真,则还返回一个 Bokeh 图形。如果 plot == ‘save’,则将图形保存到文件中。
- 文件名str (可选)
如果你设置
plot='save',保存的文件名- bokeh_resourcesbokeh.resources.Resources (可选)
指定资源组件是 INLINE 还是 CDN
- 返回
- L: List[Dict]
参见
get_task_stream此方法的上下文管理器版本
示例
>>> client.get_task_stream() # prime plugin if not already connected >>> x.compute() # do some work >>> client.get_task_stream() [{'task': ..., 'type': ..., 'thread': ..., ...}]
传递
plot=True或plot='save'关键字以返回一个 Bokeh 图形>>> data, figure = client.get_task_stream(plot='save', filename='myfile.html')
或者考虑上下文管理器
>>> from dask.distributed import get_task_stream >>> with get_task_stream() as ts: ... x.compute() >>> ts.data [...]
- get_versions(check: bool = False, packages: collections.abc.Sequence[str] | None = None) distributed.client.VersionsDict | collections.abc.Coroutine[Any, Any, distributed.client.VersionsDict][源代码]¶
返回调度器、所有工作节点以及我自己的版本信息
- 参数
- 检查
如果所有必需和可选的包不匹配,则引发 ValueError
- 包
额外需要检查的包名
示例
>>> c.get_versions()
>>> c.get_versions(packages=['sklearn', 'geopandas'])
- get_worker_logs(n=None, workers=None, nanny=False)[源代码]¶
从工作节点获取日志
- 参数
- n整数
要检索的日志数量。默认最大值为10000,可通过
distributed.admin.log-length配置值进行配置。- 工人可迭代对象
要检索的工人地址列表。默认情况下获取所有工人。
- 保姆bool, 默认 False
是否从工作进程(False)或保姆进程(True)获取日志。如果指定,workers 中的地址仍应为工作进程地址,而不是保姆进程地址。
- 返回
- 字典映射工作地址到日志。
- 日志以倒序返回(最新的在前)
- has_what(workers=None, **kwargs)[源代码]¶
哪些键由哪些工作者持有
这将返回每个工作进程内存中保存的数据的键。
- 参数
- 工人列表(可选)
工作地址列表,默认为所有地址
- **kwargsdict
远程函数的可选关键字参数
示例
>>> x, y, z = c.map(inc, [1, 2, 3]) >>> wait([x, y, z]) >>> c.has_what() {'192.168.1.141:46784': ['inc-1c8dd6be1c21646c71f76c16d09304ea', 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b', 'inc-1e297fc27658d7b67b3a758f16bcf47a']}
- log_event(topic: str | collections.abc.Collection[str], msg: Any)[源代码]¶
在给定主题下记录事件
- 参数
- 主题str, list[str]
记录事件的主题名称。要将同一事件记录在多个主题下,请传递一个主题名称列表。
- 消息
事件消息以记录。注意这必须是 msgpack 可序列化的。
示例
>>> from time import time >>> client.log_event("current-time", time())
- map(func: Callable, *iterables: collections.abc.Collection, key: str | list | None = None, workers: str | collections.abc.Iterable[str] | None = None, retries: int | None = None, resources: dict[str, Any] | None = None, priority: int = 0, allow_other_workers: bool = False, fifo_timeout: str = '100 ms', actor: bool = False, actors: bool = False, pure: bool = True, batch_size=None, **kwargs)[源代码]¶
将函数映射到参数序列上
参数可以是普通对象或 Futures
- 参数
- 函数可调用
要安排执行的可调用对象。如果
func返回一个协程,它将在工作线程的主事件循环中运行。否则,func将在工作线程的任务执行器池中运行(更多信息请参见Worker.executors)。- 可迭代对象可迭代对象
类似列表的对象用于映射。它们应该具有相同的长度。
- 关键str, list
如果为字符串,则为任务名称添加前缀。如果为列表,则为显式名称。
- 工人字符串或字符串的可迭代对象
一组可以执行计算的工作主机名。留空以默认使用所有工作节点(常见情况)
- 重试int (默认为 0)
任务失败时允许的自动重试次数
- 资源dict (默认为 {})
定义了此映射任务的每个实例在工作节点上所需的 资源;例如
{'GPU': 2}。有关定义资源的详细信息,请参阅 工作节点资源。- 优先级数字
任务的优先级可选。默认值为零。优先级越高,优先级越高。
- 允许其他工作者bool (默认为 False)
与 workers 一起使用。指示计算是否可以在不属于 workers 集合的 worker 上执行。
- fifo_timeoutstr timedelta (默认 ‘100ms’)
允许的调用之间的时间量,以视为相同优先级
- 演员bool (默认 False)
这些任务是否应在工作节点上作为有状态的执行者存在。有关更多详细信息,请参阅 执行者。
- 演员bool (默认 False)
actor 的别名
- 纯bool (默认为 True)
函数是否为纯函数。对于像
np.random.random这样的非纯函数,设置pure=False。请注意,如果actor和pure关键字参数都设置为 True,那么pure的值将被重置为 False,因为 actor 是有状态的。更多详情请参见 纯函数。- batch_sizeint, 可选 (默认: 只有一个批次,其大小为整个可迭代对象)
以(最多)``batch_size`` 为批次向调度器提交任务。批次大小的权衡在于,大批次可以避免更多的每批次开销,但过大的批次可能会花费很长时间提交,并不合理地延迟集群开始处理的时间。
- **kwargsdict
发送给函数的额外关键字参数。较大的值将显式包含在任务图中。
- 返回
- 根据类型的不同,可以是 future 的列表、迭代器或队列。
- 输入。
参见
Client.submit提交一个单一函数
注释
当前的任务图解析实现会搜索
key的出现,并将其替换为相应的Future结果。如果这些字符串作为任务的参数传递,并且这些字符串与集群上已存在的某些key匹配,这可能导致不希望的字符串替换。为了避免这种情况,如果手动设置key,则需要使用唯一值。请参阅 https://github.com/dask/dask/issues/9969 以跟踪此问题的解决进度。示例
>>> L = client.map(func, sequence)
- nbytes(keys=None, summary=True, **kwargs)[源代码]¶
集群中每个键占用的字节数
这是通过
sys.getsizeof测量的,可能无法准确反映真实成本。- 参数
- 键列表(可选)
键的列表,默认为所有键
- 摘要布尔值,(可选)
将键汇总为键类型
- **kwargsdict
远程函数的可选关键字参数
示例
>>> x, y, z = c.map(inc, [1, 2, 3]) >>> c.nbytes(summary=False) {'inc-1c8dd6be1c21646c71f76c16d09304ea': 28, 'inc-1e297fc27658d7b67b3a758f16bcf47a': 28, 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b': 28}
>>> c.nbytes(summary=True) {'inc': 84}
- ncores(workers=None, **kwargs)¶
每个工作节点上可用的线程/核心数
- 参数
- 工人列表(可选)
我们特别关心的工作人员列表。留空以接收所有工作人员的信息。
- **kwargsdict
远程函数的可选关键字参数
示例
>>> c.nthreads() {'192.168.1.141:46784': 8, '192.167.1.142:47548': 8, '192.167.1.143:47329': 8, '192.167.1.144:37297': 8}
- normalize_collection(collection)[源代码]¶
如果存在,用已存在的期货替换集合的任务
这将对集合任务图中的任务进行规范化,使其与调度器中的已知未来任务相匹配。它返回一个包含重叠未来任务的任务图的集合副本。
- 参数
- 集合dask 对象
类似 dask.array 或 dataframe 或 dask.value 对象的集合
- 返回
- 集合dask 对象
集合,其任务被任何现有的期货所替代。
参见
Client.persist触发集合任务的计算
示例
>>> len(x.__dask_graph__()) # x is a dask collection with 100 tasks 100 >>> set(client.futures).intersection(x.__dask_graph__()) # some overlap exists 10
>>> x = client.normalize_collection(x) >>> len(x.__dask_graph__()) # smaller computational graph 20
- nthreads(workers=None, **kwargs)[源代码]¶
每个工作节点上可用的线程/核心数
- 参数
- 工人列表(可选)
我们特别关心的工作人员列表。留空以接收所有工作人员的信息。
- **kwargsdict
远程函数的可选关键字参数
示例
>>> c.nthreads() {'192.168.1.141:46784': 8, '192.167.1.142:47548': 8, '192.167.1.143:47329': 8, '192.167.1.144:37297': 8}
- persist(collections, optimize_graph=True, workers=None, allow_other_workers=None, resources=None, retries=None, priority=0, fifo_timeout='60s', actors=None, **kwargs)[源代码]¶
在集群上持久化dask集合
在集群后台启动集合的计算。提供一个新的 dask 集合,它在语义上与之前的集合相同,但现在基于当前正在执行的 futures。
- 参数
- 集合序列或单个 dask 对象
像 dask.array 或 dataframe 或 dask.value 对象这样的集合
- 优化图布尔
是否优化底层图表
- 工人字符串或字符串的可迭代对象
一组可以执行计算的工作主机名。留空以默认使用所有工作节点(常见情况)
- 允许其他工作者bool (默认为 False)
与 workers 一起使用。指示计算是否可以在不属于 workers 集合的 worker 上执行。
- 重试int (默认为 0)
如果计算结果失败,允许的自动重试次数
- 优先级数字
任务的优先级可选。默认值为零。优先级越高,优先级越高。
- fifo_timeouttimedelta 字符串 (默认值为 ‘60s’)
允许的调用之间的时间量,以视为相同优先级
- 资源dict (默认为 {})
定义了此映射任务的每个实例在工作节点上所需的 资源;例如
{'GPU': 2}。有关定义资源的详细信息,请参阅 工作节点资源。- 演员布尔值或字典(默认 None)
这些任务是否应在工作节点上作为有状态的执行者存在。可以在全局(True/False)或每个任务(
{'x': True, 'y': False})的基础上指定。有关更多详细信息,请参阅 Actors。- **kwargs
传递给图优化调用的选项
- 返回
- 集合列表,或单一集合,取决于输入的类型。
示例
>>> xx = client.persist(x) >>> xx, yy = client.persist([x, y])
- processing(workers=None)[源代码]¶
每个工作节点上当前运行的任务
- 参数
- 工人列表(可选)
工作地址列表,默认为所有地址
示例
>>> x, y, z = c.map(inc, [1, 2, 3]) >>> c.processing() {'192.168.1.141:46784': ['inc-1c8dd6be1c21646c71f76c16d09304ea', 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b', 'inc-1e297fc27658d7b67b3a758f16bcf47a']}
- profile(key=None, start=None, stop=None, workers=None, merge_workers=True, plot=False, filename=None, server=False, scheduler=False)[源代码]¶
收集有关近期工作的统计分析信息
- 参数
- 关键str
选择的关键前缀,这通常是一个函数名,如 ‘inc’。留空为 None 以收集所有数据
- 开始时间
- 停止时间
- 工人列表
限制个人资料信息的员工列表
- 服务器布尔
如果为真,返回工作者管理线程的配置文件,而不是工作者线程的配置文件。这在分析Dask本身而不是用户代码时很有用。
- 调度器布尔
如果为真,从调度器的管理线程返回配置文件信息,而不是从工作线程返回。这在分析 Dask 的调度本身时很有用。
- 绘图布尔值或字符串
是否返回绘图对象
- 文件名str
保存图形的文件名
示例
>>> client.profile() # call on collections >>> client.profile(filename='dask-profile.html') # save to html file
- publish_dataset(*args, **kwargs)[源代码]¶
将命名数据集发布到调度器
这会在调度器上存储一个对 dask 集合或未来列表的命名引用。这些引用对其他客户端可用,它们可以通过
get_dataset下载集合或未来。数据集不会立即计算。在发布数据集之前,您可能希望调用
Client.persist。- 参数
- 参数要发布为名称的对象列表
- kwargsdict
命名集合以在调度器上发布
- 返回
- 无
示例
发布客户端:
>>> df = dd.read_csv('s3://...') >>> df = c.persist(df) >>> c.publish_dataset(my_dataset=df)
替代调用 >>> c.publish_dataset(df, name=’my_dataset’)
接收客户端:
>>> c.list_datasets() ['my_dataset'] >>> df2 = c.get_dataset('my_dataset')
- rebalance(futures=None, workers=None, **kwargs)[源代码]¶
在网络内重新平衡数据
在工作者之间移动数据以大致平衡内存负担。这根据关键字参数的影响,可能只影响部分键/工作者或整个网络。
有关算法的详细信息和配置选项,请参阅匹配的调度器端方法
rebalance()。警告
此操作通常未经过调度器正常操作的充分测试。不建议在等待计算时使用它。
- 参数
- 未来列表,可选
要平衡的未来列表,默认为所有数据
- 工人列表,可选
要在其上进行负载均衡的工人列表,默认为所有工人
- **kwargsdict
函数的可选关键字参数
- register_plugin(plugin: distributed.diagnostics.plugin.NannyPlugin | distributed.diagnostics.plugin.SchedulerPlugin | distributed.diagnostics.plugin.WorkerPlugin, name: str | None = None, idempotent: bool | None = None)[源代码]¶
注册一个插件。
请参阅 https://distributed.readthedocs.io/en/latest/plugins.html
- 参数
- 插件
要注册的保姆、调度器或工作插件。
- 名称
插件的名称;如果为 None,则从插件实例中获取名称,如果不存在则自动生成。
- 幂等
如果给定名称的插件已经存在,请勿重新注册。如果为 None,则使用
plugin.idempotent的定义值,否则为 False。
- register_scheduler_plugin(plugin: distributed.diagnostics.plugin.SchedulerPlugin, name: str | None = None, idempotent: bool | None = None)[源代码]¶
注册一个调度器插件。
2023.9.2 版后已移除: 请使用
Client.register_plugin()代替。参见 https://distributed.readthedocs.io/en/latest/plugins.html#scheduler-plugins
- 参数
- 插件
SchedulerPlugin 传递给调度器的 SchedulerPlugin 实例。
- 名称str
插件的名称;如果为 None,则从插件实例中获取名称,如果不存在则自动生成。
- 幂等布尔
如果已存在指定名称的插件,请勿重新注册。
- 插件
- register_worker_callbacks(setup=None)[源代码]¶
为所有当前和未来的工作线程注册一个设置回调函数。
这为集群中的工作线程注册了一个新的设置函数。该函数将立即在所有当前连接的工作线程上运行。它还将在未来添加的任何工作线程连接时运行。可以注册多个设置函数 - 这些函数将按照它们被添加的顺序调用。
如果函数接受一个名为
dask_worker的输入参数,那么该变量将被填充为工作节点本身。- 参数
- 设置callable(dask_worker: Worker) -> None
在所有工作节点上注册并运行的函数
- register_worker_plugin(plugin: distributed.diagnostics.plugin.NannyPlugin | distributed.diagnostics.plugin.WorkerPlugin, name: str | None = None, nanny: bool | None = None)[源代码]¶
为所有当前和未来的工作器注册一个生命周期工作器插件。
2023.9.2 版后已移除: 请使用
Client.register_plugin()代替。这注册了一个新对象来处理此集群中工作者的设置、任务状态转换和拆卸。插件将在所有当前连接的工作者上实例化自身。它还将在未来连接的任何工作者上运行。
该插件可能包含方法
setup、teardown、transition和release_key。请参阅dask.distributed.WorkerPlugin类或下面的示例以了解接口和文档字符串。它必须可以通过 pickle 或 cloudpickle 模块进行序列化。如果插件有一个
name属性,或者使用了name=关键字,那么这将控制幂等性。如果已经注册了具有该名称的插件,那么它将被移除并替换为新的插件。对于插件的替代方案,您可能还希望了解预加载脚本。
- 参数
- 插件WorkerPlugin 或 NannyPlugin
要注册的 WorkerPlugin 或 NannyPlugin 实例。
- 名称str, 可选
插件的名称。使用相同名称注册插件将不会有任何效果。如果插件没有名称属性,则使用随机名称。
- 保姆bool, 可选
是否将插件注册到工作节点或保姆节点。
参见
distributed.WorkerPluginunregister_worker_plugin
示例
>>> class MyPlugin(WorkerPlugin): ... def __init__(self, *args, **kwargs): ... pass # the constructor is up to you ... def setup(self, worker: dask.distributed.Worker): ... pass ... def teardown(self, worker: dask.distributed.Worker): ... pass ... def transition(self, key: str, start: str, finish: str, ... **kwargs): ... pass ... def release_key(self, key: str, state: str, cause: str | None, reason: None, report: bool): ... pass
>>> plugin = MyPlugin(1, 2, 3) >>> client.register_plugin(plugin)
你可以通过
get_worker函数访问插件>>> client.register_plugin(other_plugin, name='my-plugin') >>> def f(): ... worker = get_worker() ... plugin = worker.plugins['my-plugin'] ... return plugin.my_state
>>> future = client.run(f)
- replicate(futures, n=None, workers=None, branching_factor=2, **kwargs)[源代码]¶
在网络中设置期货的复制
将数据复制到多个工作节点。这有助于广播频繁访问的数据,并可以提高系统的韧性。
这会在网络中的每一份数据上单独执行数据的树形复制。此操作会阻塞直到完成。它不保证数据会复制到未来的工作节点。
备注
此方法与 Active Memory Manager 的 ReduceReplicas 策略不兼容。如果您希望使用它,您必须首先禁用该策略或完全禁用 AMM。
- 参数
- 未来未来列表
我们希望复制的未来
- nint, 可选
在集群上复制数据的进程数。默认为全部。
- 工人worker 地址列表
我们希望限制复制的工作者。默认为所有。
- branching_factorint, 可选
每个生成中可以复制数据的工人数
- **kwargsdict
远程函数的可选关键字参数
示例
>>> x = c.submit(func, *args) >>> c.replicate([x]) # send to all workers >>> c.replicate([x], n=3) # send to three workers >>> c.replicate([x], workers=['alice', 'bob']) # send to specific >>> c.replicate([x], n=1, workers=['alice', 'bob']) # send to one of specific workers >>> c.replicate([x], n=1) # reduce replications
- restart(timeout: typing.Union[str, int, float, typing.Literal[<no_default>]] = _NoDefault.no_default, wait_for_workers: bool = True)[源代码]¶
重启所有工作进程。重置本地状态。可选地等待工作进程返回。
没有保姆的工人会被关闭,希望外部部署系统会重新启动它们。因此,如果不使用保姆,并且您的部署系统不会自动重启工人,
restart将只会关闭所有工人,然后超时!在
restart之后,所有连接的工作者都是新的,无论是否引发了TimeoutError。任何未能及时关闭的工作者都会被移除,并且可能在将来自行关闭,也可能不会。- 参数
- 超时:
如果
wait_for_workers为 True,等待工作线程关闭并返回的时间长度,否则仅等待工作线程关闭的时间长度。如果超过此时间,则引发asyncio.TimeoutError。- wait_for_workers:
是否等待所有工作进程重新连接,还是仅等待它们关闭(默认 True)。使用
restart(wait_for_workers=False)结合Client.wait_for_workers()可以实现对等待多少工作进程的精细控制。
- restart_workers(workers: list[str], timeout: typing.Union[str, int, float, typing.Literal[<no_default>]] = _NoDefault.no_default, raise_for_error: bool = True)[源代码]¶
重启指定的一组工作进程
备注
只有被
distributed.Nanny监控的工人才能被重启。更多详情请参见Nanny.restart。- 参数
- 工人list[str]
需要重启的工作者。这可以是一个工作者地址、名称的列表,或者两者都有。
- 超时int | float | None
等待的秒数
- raise_for_error: bool (默认 True)
如果在
timeout时间内重启工作进程未完成,是否引发TimeoutError,或者由重启工作进程引起的其他异常。
- 返回
- dict[str, “OK” | “已移除” | “已超时”]
工作进程和重启状态的映射,键将与通过
workers传入的原始值匹配。
注释
此方法与
Client.restart()不同之处在于,此方法仅重启指定的工人集合,而Client.restart将重启所有工人并重置集群上的本地状态(例如,所有键都被释放)。此外,这种方法在处理工作进程重启时正在执行的任务时表现不佳。这些任务可能会失败或其可疑计数增加。
示例
你可以使用以下方法获取有关活跃工作者的信息:
>>> workers = client.scheduler_info()['workers']
从该列表中,您可能希望选择一些工作进程进行重启
>>> client.restart_workers(workers=['tcp://address:port', ...])
- retire_workers(workers: list[str] | None = None, close_workers: bool = True, **kwargs)[源代码]¶
在调度器上退役某些工作者
查看
distributed.Scheduler.retire_workers()获取完整的文档字符串。- 参数
- 工人
- 关闭工作者
- **kwargsdict
远程函数的可选关键字参数
参见
dask.distributed.Scheduler.retire_workers
示例
你可以使用以下方法获取有关活跃工作者的信息:
>>> workers = client.scheduler_info()['workers']
从该列表中,您可能希望选择一些工人来关闭
>>> client.retire_workers(workers=['tcp://address:port', ...])
- run(function, *args, workers: list[str] | None = None, wait: bool = True, nanny: bool = False, on_error: Literal['raise', 'return', 'ignore'] = 'raise', **kwargs)[源代码]¶
在任务调度系统之外对所有工作节点运行一个函数
此方法立即在所有当前已知的worker上调用一个函数,阻塞直到这些结果返回,并异步地以worker地址为键的字典形式返回结果。此方法通常用于收集诊断信息或安装库等副作用。
如果你的函数接受一个名为
dask_worker的输入参数,那么该变量将被填充为工作节点本身。- 参数
- 函数可调用
要运行的函数
- *args元组
远程函数的可选参数
- **kwargsdict
远程函数的可选关键字参数
- 工人列表
运行函数的工人。默认为所有已知的工人。
- 等待布尔值(可选)
如果函数是异步的,是否等待该函数完成。
- 保姆bool, 默认 False
是否在保姆进程上运行
function。默认情况下,该函数在工作进程上运行。如果指定,workers中的地址仍应为工作进程地址,而不是保姆进程地址。- on_error: “raise” | “return” | “ignore”
如果在工作节点上函数引发错误:
- raise
(默认) 在客户端重新引发异常。其他工作者的输出将会丢失。
- 返回
返回异常对象而不是工作函数的输出
- 忽略
忽略异常并将工作者从结果字典中移除
示例
>>> c.run(os.getpid) {'192.168.0.100:9000': 1234, '192.168.0.101:9000': 4321, '192.168.0.102:9000': 5555}
使用
workers=关键字参数将计算限制在特定的工人上。>>> c.run(os.getpid, workers=['192.168.0.100:9000', ... '192.168.0.101:9000']) {'192.168.0.100:9000': 1234, '192.168.0.101:9000': 4321}
>>> def get_status(dask_worker): ... return dask_worker.status
>>> c.run(get_status) {'192.168.0.100:9000': 'running', '192.168.0.101:9000': 'running}
在后台运行异步函数:
>>> async def print_state(dask_worker): ... while True: ... print(dask_worker.status) ... await asyncio.sleep(1)
>>> c.run(print_state, wait=False)
- run_on_scheduler(function, *args, **kwargs)[源代码]¶
在调度器进程上运行一个函数
这通常用于实时调试。该函数应接受一个关键字参数
dask_scheduler=,该参数将被赋予调度器对象本身。- 参数
- 函数可调用
在调度器进程上运行的函数
- *args元组
函数的可选参数
- **kwargsdict
函数的可选关键字参数
参见
Client.run在所有工作节点上运行一个函数
示例
>>> def get_number_of_tasks(dask_scheduler=None): ... return len(dask_scheduler.tasks)
>>> client.run_on_scheduler(get_number_of_tasks) 100
在后台运行异步函数:
>>> async def print_state(dask_scheduler): ... while True: ... print(dask_scheduler.status) ... await asyncio.sleep(1)
>>> c.run(print_state, wait=False)
- scatter(data, workers=None, broadcast=False, direct=None, hash=True, timeout=_NoDefault.no_default, asynchronous=None)[源代码]¶
将数据分散到分布式内存中
这将从本地客户端进程将数据移动到分布式调度器的工人中。请注意,通常更好的做法是向工人提交作业,让它们加载数据,而不是在本地加载数据,然后将其分散到它们中。
- 参数
- 数据列表、字典或对象
要分散给工人的数据。输出类型与输入类型匹配。
- 工人元组列表(可选)
可选地限制数据位置。将工作者指定为主机名/端口对,例如
('127.0.0.1', 8787)。- 广播bool (默认为 False)
是否将每个数据元素发送给所有工作节点。默认情况下,我们根据核心数量进行轮询。
备注
将此标志设置为 True 与 Active Memory Manager 的 ReduceReplicas 策略不兼容。如果您希望使用它,您必须首先禁用该策略或完全禁用 AMM。
- 直接bool (默认为自动检查)
是否直接连接到工作节点,或者请求调度器作为中介。这也可以在创建客户端时设置。
- 哈希bool (可选)
是否对数据进行哈希以确定键。如果为 False,则使用随机键。
- 超时数字,可选
在引发
dask.distributed.TimeoutError之前的秒数- 异步: bool
如果为真,客户端处于异步模式
- 返回
- 与输入类型匹配的 futures 列表、字典、迭代器或队列。
参见
Client.gather将数据收集回本地进程
注释
散布字典使用
dict键来创建Future键。当前的任务图解析实现会搜索key的出现,并用相应的Future结果替换它。如果这些字符串作为参数传递给任务,并且这些字符串与集群上已存在的某些key匹配,这可能会导致不需要的字符串替换。为了避免这种情况,如果手动设置key,则需要使用唯一值。请参阅 https://github.com/dask/dask/issues/9969 以跟踪此问题的解决进度。示例
>>> c = Client('127.0.0.1:8787') >>> c.scatter(1) <Future: status: finished, key: c0a8a20f903a4915b94db8de3ea63195>
>>> c.scatter([1, 2, 3]) [<Future: status: finished, key: c0a8a20f903a4915b94db8de3ea63195>, <Future: status: finished, key: 58e78e1b34eb49a68c65b54815d1b158>, <Future: status: finished, key: d3395e15f605bc35ab1bac6341a285e2>]
>>> c.scatter({'x': 1, 'y': 2, 'z': 3}) {'x': <Future: status: finished, key: x>, 'y': <Future: status: finished, key: y>, 'z': <Future: status: finished, key: z>}
将数据位置限制在部分工作节点
>>> c.scatter([1, 2, 3], workers=[('hostname', 8788)])
向所有工作节点广播数据
>>> [future] = c.scatter([element], broadcast=True)
使用客户端期货接口将分散的数据发送到并行化函数
>>> data = c.scatter(data, broadcast=True) >>> res = [c.submit(func, data, i) for i in range(100)]
- scheduler_info(**kwargs)[源代码]¶
集群中工人的基本信息
- 参数
- **kwargsdict
远程函数的可选关键字参数
示例
>>> c.scheduler_info() {'id': '2de2b6da-69ee-11e6-ab6a-e82aea155996', 'services': {}, 'type': 'Scheduler', 'workers': {'127.0.0.1:40575': {'active': 0, 'last-seen': 1472038237.4845693, 'name': '127.0.0.1:40575', 'services': {}, 'stored': 0, 'time-delay': 0.0061032772064208984}}}
- set_metadata(key, value)[源代码]¶
在调度器中设置任意元数据
这允许你在中央调度器进程上存储少量数据,用于管理目的。数据应为 msgpack 可序列化(整数、字符串、列表、字典)。
如果键对应于一个任务,那么当调度器忘记该任务时,该键将被清理。
如果键是一个列表,那么将假定您想要使用这些键索引到嵌套的字典结构中。例如,如果您调用以下内容:
>>> client.set_metadata(['a', 'b', 'c'], 123)
那么这与设置相同
>>> scheduler.task_metadata['a']['b']['c'] = 123
低层级字典将按需创建。
参见
示例
>>> client.set_metadata('x', 123) >>> client.get_metadata('x') 123
>>> client.set_metadata(['x', 'y'], 123) >>> client.get_metadata('x') {'y': 123}
>>> client.set_metadata(['x', 'w', 'z'], 456) >>> client.get_metadata('x') {'y': 123, 'w': {'z': 456}}
>>> client.get_metadata(['x', 'w']) {'z': 456}
- shutdown()[源代码]¶
关闭连接的调度器和工作节点
注意,这可能会干扰其他可能使用相同调度器和工作节点的客户端。
参见
Client.close仅关闭此客户端
- submit(func, *args, key=None, workers=None, resources=None, retries=None, priority=0, fifo_timeout='100 ms', allow_other_workers=False, actor=False, actors=False, pure=True, **kwargs)[源代码]¶
向调度器提交一个函数应用
- 参数
- 函数可调用
可调用对象将被调度为
func(*args **kwargs)。如果func返回一个协程,它将在工作线程的主事件循环中运行。否则func将在工作线程的任务执行器池中运行(更多信息请参见Worker.executors。)- *args元组
可选的位置参数
- 关键str
任务的唯一标识符。默认为函数名和哈希值。
- 工人字符串或字符串的可迭代对象
一组工作地址或主机名,可以在其上执行计算。留空以默认使用所有工作节点(常见情况)
- 资源dict (默认为 {})
定义了此映射任务的每个实例在工作节点上所需的
资源;例如{'GPU': 2}。有关定义资源的详细信息,请参阅 工作节点资源。- 重试int (默认为 0)
任务失败时允许的自动重试次数
- 优先级数字
任务的优先级可选。默认值为零。优先级越高,优先级越高。
- fifo_timeoutstr timedelta (默认 ‘100ms’)
允许的调用之间的时间量,以视为相同优先级
- 允许其他工作者bool (默认为 False)
与
workers一起使用。指示计算是否可以在不属于 workers 集合的 worker 上执行。- 演员bool (默认 False)
此任务是否应在工作节点上作为有状态的actor存在。有关更多详细信息,请参阅 Actors。
- 演员bool (默认 False)
actor 的别名
- 纯bool (默认为 True)
函数是否为纯函数。对于像
np.random.random这样的非纯函数,设置pure=False。请注意,如果actor和pure关键字参数都设置为 True,那么pure的值将被重置为 False,因为 actor 是有状态的。更多详情请参见 纯函数。- **kwargs
- 返回
- 未来
如果在异步模式下运行,返回未来。否则返回具体值。
- Raises
- 类型错误
如果 ‘func’ 不可调用,则会引发 TypeError。
- ValueError
如果 ‘allow_other_workers’ 为 True 且 ‘workers’ 为 None,则会引发 ValueError。
参见
Client.map一次性提交多个参数
注释
当前的任务图解析实现会搜索
key的出现,并将其替换为相应的Future结果。如果这些字符串作为任务的参数传递,并且这些字符串与集群上已存在的某些key匹配,这可能导致不希望的字符串替换。为了避免这种情况,如果手动设置key,则需要使用唯一值。请参阅 https://github.com/dask/dask/issues/9969 以跟踪此问题的解决进度。示例
>>> c = client.submit(add, a, b)
- subscribe_topic(topic, handler)[源代码]¶
订阅一个主题并在每次接收到事件时执行一个处理程序
- 参数
- topic: str
主题名称
- handler: 可调用对象或协程函数
为每个接收到的消息调用的处理程序。处理程序必须接受一个参数 event,这是一个元组 (timestamp, msg),其中时间戳指的是调度器上的时钟。
参见
dask.distributed.Client.unsubscribe_topicdask.distributed.Client.get_eventsdask.distributed.Client.log_event
示例
>>> import logging >>> logger = logging.getLogger("myLogger") # Log config not shown >>> client.subscribe_topic("topic-name", lambda: logger.info)
- unpublish_dataset(name, **kwargs)[源代码]¶
从调度器中移除命名数据集
- 参数
- 名称str
要取消发布的数据集名称
示例
>>> c.list_datasets() ['my_dataset'] >>> c.unpublish_dataset('my_dataset') >>> c.list_datasets() []
- unregister_scheduler_plugin(name)[源代码]¶
取消注册调度器插件
参见 https://distributed.readthedocs.io/en/latest/plugins.html#scheduler-plugins
- 参数
- 名称str
要取消注册的插件名称。更多信息请参阅
Client.register_scheduler_plugin()文档字符串。
示例
>>> class MyPlugin(SchedulerPlugin): ... def __init__(self, *args, **kwargs): ... pass # the constructor is up to you ... async def start(self, scheduler: Scheduler) -> None: ... pass ... async def before_close(self) -> None: ... pass ... async def close(self) -> None: ... pass ... def restart(self, scheduler: Scheduler) -> None: ... pass
>>> plugin = MyPlugin(1, 2, 3) >>> client.register_plugin(plugin, name='foo') >>> client.unregister_scheduler_plugin(name='foo')
- unregister_worker_plugin(name, nanny=None)[源代码]¶
取消注册一个生命周期工作插件
这将注销一个现有的工作插件。作为注销过程的一部分,插件的
teardown方法将被调用。- 参数
- 名称str
要取消注册的插件名称。更多信息请参阅
Client.register_plugin()文档字符串。
示例
>>> class MyPlugin(WorkerPlugin): ... def __init__(self, *args, **kwargs): ... pass # the constructor is up to you ... def setup(self, worker: dask.distributed.Worker): ... pass ... def teardown(self, worker: dask.distributed.Worker): ... pass ... def transition(self, key: str, start: str, finish: str, **kwargs): ... pass ... def release_key(self, key: str, state: str, cause: str | None, reason: None, report: bool): ... pass
>>> plugin = MyPlugin(1, 2, 3) >>> client.register_plugin(plugin, name='foo') >>> client.unregister_worker_plugin(name='foo')
- unsubscribe_topic(topic)[源代码]¶
取消订阅一个主题并移除事件处理程序
参见
dask.distributed.Client.subscribe_topicdask.distributed.Client.get_eventsdask.distributed.Client.log_event
- upload_file(filename, load: bool = True)[源代码]¶
将本地包上传到调度器和工作节点
这将一个本地文件上传到调度器和所有工作节点。该文件被放置在每个节点的工作目录中,请参阅配置选项
temporary-directory(默认为tempfile.gettempdir())。此目录将被添加到Python的系统路径中,因此任何
.py、.egg或.zip文件都将可导入。- 参数
- 文件名字符串
要发送给工作者的
.py、.egg或.zip文件的文件名- 加载bool, 可选
是否在上传过程中导入模块。默认为
True。
示例
>>> client.upload_file('mylibrary.egg') >>> from mylibrary import myfunc >>> L = client.map(myfunc, seq) >>> >>> # Where did that file go? Use `dask_worker.local_directory`. >>> def where_is_mylibrary(dask_worker): >>> path = pathlib.Path(dask_worker.local_directory) / 'mylibrary.egg' >>> assert path.exists() >>> return str(path) >>> >>> client.run(where_is_mylibrary)
- wait_for_workers(n_workers: int, timeout: float | None = None) None[源代码]¶
阻塞调用以等待 n 个工作线程完成后再继续
- 参数
- n_workers整数
工人的数量
- 超时数字,可选
在引发
dask.distributed.TimeoutError之前的秒数
- who_has(futures=None, **kwargs)[源代码]¶
存储每个未来数据的工人
- 参数
- 未来列表(可选)
未来列表,默认为所有数据
- **kwargsdict
远程函数的可选关键字参数
示例
>>> x, y, z = c.map(inc, [1, 2, 3]) >>> wait([x, y, z]) >>> c.who_has() {'inc-1c8dd6be1c21646c71f76c16d09304ea': ['192.168.1.141:46784'], 'inc-1e297fc27658d7b67b3a758f16bcf47a': ['192.168.1.141:46784'], 'inc-fd65c238a7ea60f6a01bf4c8a5fcf44b': ['192.168.1.141:46784']}
>>> c.who_has([x, y]) {'inc-1c8dd6be1c21646c71f76c16d09304ea': ['192.168.1.141:46784'], 'inc-1e297fc27658d7b67b3a758f16bcf47a': ['192.168.1.141:46784']}
- write_scheduler_file(scheduler_file)[源代码]¶
将调度器信息写入一个json文件。
这通过文件系统方便了调度器信息的轻松共享。调度器文件可以用于实例化使用相同调度器的第二个客户端。
- 参数
- scheduler_filestr
写入调度器文件的路径。
示例
>>> client = Client() >>> client.write_scheduler_file('scheduler.json') # connect to previous client's scheduler >>> client2 = Client(scheduler_file='scheduler.json')
- class distributed.Future(key, client=None, state=None, _id=None)[源代码]¶
远程运行的计算
Future 是运行在远程工作节点上的结果的本地代理。用户在本地 Python 进程中管理 future 对象,以决定在大规模集群中发生的事情。
备注
用户不应手动实例化 futures。这可能导致状态损坏和集群死锁。
- 参数
- 键: str, 或 tuple
此未来所引用的远程数据的键
- 客户端: 客户端
应拥有此未来的客户端。默认为 _get_global_client()。
- inform: bool
我们是否通知调度器我们需要更新这个未来
- 状态: 未来状态
未来的状态
参见
Client创建期货
示例
Future 通常从客户端计算中产生
>>> my_future = client.submit(add, 1, 2)
我们可以跟踪未来的进展和结果。
>>> my_future <Future: status: finished, key: add-8f6e709446674bad78ea8aeecfee188e>
我们可以从 future 中获取结果或异常和回溯
>>> my_future.result()
- add_done_callback(fn)[源代码]¶
当未来完成时调用回调
回调
fn应仅以 future 作为其参数。无论 future 是成功完成、出错还是被取消,都会调用此回调。回调函数在单独的线程中执行。
- 参数
- fn可调用
要调用的方法或函数
- exception(timeout=None, **kwargs)[源代码]¶
返回失败任务的异常
- 参数
- 超时数字,可选
在引发
dask.distributed.TimeoutError之前的秒数- **kwargsdict
函数的可选关键字参数
- 返回
- 异常
如果在返回之前经过 timeout 秒,则会引发
dask.distributed.TimeoutError异常。
- property executor¶
返回执行器,即客户端。
- 返回
- 客户端
执行器
- result(timeout=None)[源代码]¶
等待计算完成,将结果收集到本地进程。
- 参数
- 超时数字,可选
在引发
dask.distributed.TimeoutError之前的秒数
- 返回
- 结果
计算的结果。如果客户端是异步的,则可能是一个协程。
- Raises
- dask.distributed.TimeoutError
如果在返回之前经过了 timeout 秒,则会引发
dask.distributed.TimeoutError。
- property status¶
返回状态
- 返回
- str
状态
- traceback(timeout=None, **kwargs)[源代码]¶
返回失败任务的回溯信息
这将返回一个回溯对象。你可以使用
traceback模块检查这个对象。或者,如果你调用future.result(),这个回溯将伴随引发的异常。- 参数
- 超时数字,可选
如果在 timeout 秒后仍未返回,则会引发
dask.distributed.TimeoutError。如果在返回之前经过了 timeout 秒,则会引发dask.distributed.TimeoutError。
- 返回
- traceback
回溯对象。如果客户端是异步的,则返回一个协程。
示例
>>> import traceback >>> tb = future.traceback() >>> traceback.format_tb(tb) [...]
- property type¶
返回类型
- class distributed.Queue(name=None, client=None, maxsize=0)[源代码]¶
分布式队列
这允许多个客户端通过多生产者/多消费者队列相互共享期货或少量数据。所有元数据都通过调度器进行序列化。
队列的元素必须是 Futures 或 msgpack 可编码的数据(整数、字符串、列表、字典)。所有数据都通过调度器发送,因此最好不要发送大型对象。要共享大型对象,请分散数据并共享 future。
警告
此对象为实验性
- 参数
- 名称: 字符串 (可选)
其他客户端和调度器用来标识队列的名称。如果未提供,将生成一个随机名称。
- client: 客户端 (可选)
用于与调度器通信的客户端。如果未指定,将使用默认的全局客户端。
- maxsize: int (可选)
队列中允许的项目数量。如果为0(默认值),则队列大小无限制。
参见
Variable客户端之间的共享变量
示例
>>> from dask.distributed import Client, Queue >>> client = Client() >>> queue = Queue('x') >>> future = client.submit(f, x) >>> queue.put(future)
- get(timeout=None, batch=False, **kwargs)[源代码]¶
从队列中获取数据
- 参数
- 超时数字或字符串或时间差,可选
超时前等待的时间(以秒为单位)。除了秒数,还可以指定字符串格式的timedelta,例如“200ms”。
- 批处理布尔值, 整数 (可选)
如果为 True,则返回队列中当前等待的所有元素。如果为整数,则返回队列中的那么多元素。如果为 False(默认),则一次返回一个项目。
- class distributed.Variable(name=None, client=None)[源代码]¶
分布式全局变量
这允许多个客户端通过一个可变的变量相互共享期货和数据。所有元数据都通过调度器进行序列化。可能会发生竞态条件。
值必须是 Futures 或 msgpack 可编码的数据(整数、列表、字符串等)。所有数据都将被保留并通过调度器发送,因此最好不要发送太多数据。如果你想共享大量数据,那么请使用
scatter并共享 future 对象。- 参数
- 名称: 字符串 (可选)
其他客户端和调度器用来识别变量的名称。如果未提供,将生成一个随机名称。
- client: 客户端 (可选)
用于与调度器通信的客户端。如果未指定,将使用默认的全局客户端。
参见
Queue客户端之间的共享多生产者/多消费者队列
示例
>>> from dask.distributed import Client, Variable >>> client = Client() >>> x = Variable('x') >>> x.set(123) # docttest: +SKIP >>> x.get() # docttest: +SKIP 123 >>> future = client.submit(f, x) >>> x.set(future)
- class distributed.Lock(name=None, client=<object object>, scheduler_rpc=None, loop=None)[源代码]¶
分布式集中锁
警告
这是使用
distributed.Semaphore作为后端,它容易受到租约超额预订的影响。对于锁来说,这意味着如果租约超时,两个或更多的实例可能会同时获取锁。要禁用租约超时,请将distributed.scheduler.locks.lease-timeout设置为 inf,例如。with dask.config.set({"distributed.scheduler.locks.lease-timeout": "inf"}): lock = Lock("x") ...
注意,如果没有租约超时,在集群缩减或工作节点故障的情况下,锁可能会死锁。
- 参数
- 名称: 字符串 (可选)
要获取的锁的名称。选择相同的名称可以使两个不相关的进程协调一个锁。如果没有给出,将生成一个随机名称。
- client: 客户端 (可选)
用于与调度器通信的客户端。如果未指定,将使用默认的全局客户端。
示例
>>> lock = Lock('x') >>> lock.acquire(timeout=1) >>> # do things with protected resource >>> lock.release()
- class distributed.Event(name=None, client=None)[源代码]¶
分布式集中事件等同于 asyncio.Event
一个事件存储一个标志,该标志在开始时设置为假。该标志可以通过 set() 调用设置为真,或者通过 clear() 调用重新设置为假。每次调用 wait() 都会阻塞,直到事件标志被设置为真。
- 参数
- 名称: 字符串 (可选)
事件的名称。选择相同的名称可以使两个不相连的进程协调一个事件。如果没有给出,将生成一个随机名称。
- client: 客户端 (可选)
用于与调度器通信的客户端。如果未指定,将使用默认的全局客户端。
示例
>>> event_1 = Event('a') >>> event_1.wait(timeout=1) >>> # in another process >>> event_2 = Event('a') >>> event_2.set() >>> # now event_1 will stop waiting
- class distributed.Semaphore(max_leases=1, name=None, scheduler_rpc=None, loop=None)[源代码]¶
这个 信号量 将跟踪调度器上的租约,这些租约可以被该类的实例获取和释放。如果已经获取了最大数量的租约,则无法获取更多租约,调用者将等待直到另一个租约被释放。
生命周期或租约通过超时来控制。此超时由该实例的
Client定期刷新,并在工作节点失败时提供防止死锁或资源耗尽的保护。超时可以通过配置选项distributed.scheduler.locks.lease-timeout来控制,调度器验证超时的间隔通过选项distributed.scheduler.locks.lease-validation-interval设置。与Python标准库中的信号量相比,一个显著的区别是这个实现不允许释放的次数多于获取的次数。如果发生这种情况,会发出警告,但内部状态不会被修改。
警告
此实现容易在租约超时的情况下出现租约超订。建议监控日志信息,并根据用户应用程序调整上述配置选项为合适的值。
- 参数
- max_leases: int (可选)
可以同时授予的最大租约数量。这实际上为特定资源的并行访问设置了一个上限。默认为1。
- 名称: 字符串 (可选)
要获取的信号量的名称。选择相同名称可以让两个不相关的进程进行协调。如果未指定,将生成一个随机名称。
- register: bool
如果为 True,将信号量注册到调度器。这需要在获取任何租约之前完成。如果未在初始化期间完成,也可以通过调用此类型的 register 方法来完成。注册时,需要等待完成。
- scheduler_rpc: 连接池
连接到调度器的 ConnectionPool。如果没有提供,则使用工作节点或客户端的池。此参数主要用于测试。
- 循环: IOLoop
此实例正在使用的事件循环。如果提供 None,则重用活动工作线程或客户端的循环。
注释
如果客户端尝试释放信号量但没有获取租约,这将引发异常。
dask 默认情况下假设函数是纯函数,当在这样的函数内部使用信号量获取/释放时,必须注意实际上存在副作用,因此,该函数不能再被视为纯函数。如果不考虑这一点,可能会导致意外行为。
示例
>>> from distributed import Semaphore ... sem = Semaphore(max_leases=2, name='my_database') ... ... def access_resource(s, sem): ... # This automatically acquires a lease from the semaphore (if available) which will be ... # released when leaving the context manager. ... with sem: ... pass ... ... futures = client.map(access_resource, range(10), sem=sem) ... client.gather(futures) ... # Once done, close the semaphore to clean up the state on scheduler side. ... sem.close()