管理计算
内容
管理计算¶
Dask.distributed 中的数据和计算总是处于三种状态之一
本地内存中的具体值。例如包括整数
1或本地进程中的 numpy 数组。dask 图中的惰性计算,可能存储在
dask.delayed或dask.dataframe对象中。运行计算或远程数据,由指向当前正在进行的计算的
Future对象表示。
这三种形式都很重要,并且有一些函数可以在所有三种状态之间进行转换。
Dask 集合到具体值¶
你可以通过调用 .compute() 方法或 dask.compute(...) 函数将任何 dask 集合转换为具体值。此函数将阻塞直到计算完成,直接从惰性 dask 集合转换为本地内存中的具体值。
这种方法是最熟悉和直接的,特别是对于那些从标准的单机Dask体验或从普通的编程中过来的人来说。当你已经有数据在内存中,并且想要快速获得小结果直接到你的本地进程时,这种方法非常棒。
>>> df = dd.read_csv('s3://...')
>>> df.value.sum().compute()
100000000
然而,如果你尝试将整个数据集带回到本地RAM,这种方法通常会失效。
>>> df.compute()
MemoryError(...)
它还强制您在计算完成之前等待,然后再将解释器的控制权交还。
Dask 集合到未来¶
你可以使用 client.compute 和 client.persist 方法异步提交延迟的 dask 图表以在集群上运行。这些函数会立即返回 Future 对象。然后可以查询这些 futures 以确定计算的状态。
client.compute¶
.compute 方法接受一个集合并返回一个单一的未来。
>>> df = dd.read_csv('s3://...')
>>> total = client.compute(df.sum()) # Return a single future
>>> total
Future(..., status='pending')
>>> total.result() # Block until finished
100000000
因为这是一个单一的未来,结果必须适合单个工作机器。与上面的 dask.compute 类似,client.compute 方法仅适用于结果较小且应适合内存的情况。以下操作可能会失败:
>>> future = client.compute(df) # Blows up memory
相反,你应该使用 client.persist
client.persist¶
.persist 方法将 Dask 集合背后的任务图提交给调度器,获取所有最顶层任务的 Futures(例如,Dask DataFrame 中的每个 Pandas DataFrame 对应一个 Future)。然后,它返回一个指向这些 futures 而不是之前图的集合副本。这个新集合在语义上是等价的,但现在指向的是正在运行的数据,而不是惰性图。如果你查看集合中的 dask 图,你会直接看到 Future 对象:
>>> df = dd.read_csv('s3://...')
>>> df.dask # Recipe to compute df in chunks
{('read', 0): (load_s3_bytes, ...),
('parse', 0): (pd.read_csv, ('read', 0)),
('read', 1): (load_s3_bytes, ...),
('parse', 1): (pd.read_csv, ('read', 1)),
...
}
>>> df = client.persist(df) # Start computation
>>> df.dask # Now points to running futures
{('parse', 0): Future(..., status='finished'),
('parse', 1): Future(..., status='pending'),
...
}
集合会立即返回,计算将在集群后台进行。最终,该集合的所有未来对象都将完成,届时对该集合的进一步查询可能会非常快。
通常的工作流程是使用 dask.dataframe 或 dask.delayed 等工具定义计算,直到你有一个良好的数据集可以处理,然后将该集合持久化到集群中,然后对结果集合执行许多快速查询。
具体值到未来对象¶
我们通过几种不同的方式获得未来。一种是通过上述机制,在Dask集合中包装Futures。另一种是通过使用``client.scatter``、``client.submit``或``client.map``直接向集群提交数据或任务。
futures = client.scatter(args) # Send data
future = client.submit(function, *args, **kwargs) # Send single task
futures = client.map(function, sequence, **kwargs) # Send many tasks
在这种情况下,*args 或 **kwargs 可以是普通的 Python 对象,例如 1 或 'hello',或者如果你想要将任务与依赖关系链接在一起,它们也可以是其他 Future 对象。
与 dask.delayed 这样的 Dask 集合不同,这些任务提交会立即发生。concurrent.futures 接口与 dask.delayed 非常相似,只是执行是立即的而不是惰性的。
未来值到具体值¶
你可以通过调用 Future.result() 方法将单个 Future 转换为本地进程中的具体值。你可以通过调用 client.gather 方法将一组 futures 转换为具体值。
>>> future.result()
1
>>> client.gather(futures)
[1, 2, 3, 4, ...]
Dask 集合的未来¶
如在“集合到未来”部分中所见,在Dask图表中通常会有当前正在计算的``Future``对象。这使我们能够在当前正在运行的计算基础上构建进一步的计算。这通常通过dask.delayed工作流在自定义计算中完成:
>>> x = delayed(sum)(futures)
>>> y = delayed(product)(futures)
>>> future = client.compute(x + y)
混合这两种形式允许你像 sum(...) + product(...) 这样分阶段构建和提交计算。如果你希望在确定如何继续之前等待查看计算的某些部分的值,这通常很有价值。一次性提交许多计算允许调度器在确定运行什么时稍微更智能。