使用Vaex进行异步编程#
使用Rich based progress bar我们可以看到,如果我们在一个数据帧上调用两个方法,我们会得到两次数据遍历(如[1]和[2]所示)。
[1]:
import vaex
df = vaex.datasets.taxi()
with vaex.progress.tree('rich', title="Two passes"):
print(df.tip_amount.sum())
print(df.passenger_count.sum())
Two passes ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.15s ├── sum ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s │ └── vaex.agg.sum('tip_amount') ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1] └── sum ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.06s └── vaex.agg.sum('passenger_count') ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.06s[2]
使用 delay=True#
如果我们传递 delay=True,Vaex 将不会开始执行它内部创建的任务,而是会返回一个 promise。在调用 df.execute() 后,所有任务将会执行,并且 promise 将会被解决,这意味着你可以使用 .get() 方法来获取最终值,或者使用 .then() 方法来表示结果。
[2]:
with vaex.progress.tree('rich', title="Single pass using delay"):
tip_sum_promise = df.tip_amount.sum(delay=True)
passengers_promise = df.passenger_count.sum(delay=True)
df.execute()
tip_per_passenger = tip_sum_promise.get() / passengers_promise.get()
print(f"tip_per_passenger = {tip_per_passenger}")
Single pass using delay ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s ├── sum ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s │ └── vaex.agg.sum('tip_amount') ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1] └── sum ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s └── vaex.agg.sum('passenger_count') ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]
tip_per_passenger = 0.5774000691888607
使用@delayed装饰器#
为了让生活更轻松,Vaex 实现了 vaex.delayed 装饰器。一旦所有参数都解析完毕,装饰的函数将自动执行。
[3]:
with vaex.progress.tree('rich', title="Single pass using delay + using delayed"):
@vaex.delayed
def compute(tip_sum, passengers):
return tip_sum/passengers
tip_per_passenger_promise = compute(df.tip_amount.sum(delay=True),
df.passenger_count.sum(delay=True))
df.execute()
print(f"tip_per_passenger = {tip_per_passenger_promise.get()}")
Single pass using delay + using delayed ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s ├── sum ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s │ └── vaex.agg.sum('tip_amount') ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1] └── sum ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s └── vaex.agg.sum('passenger_count') ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]
异步 await#
在上述所有情况下,我们调用了df.execute(),它将使用线程同步执行所有任务。然而,如果你在Python中使用异步IO,这意味着你正在阻止所有其他异步协程运行。
为了让其他协程继续运行(例如在FastAPI上下文中),我们可以改为等待df.execute_async()。除此之外,我们还可以await承诺以获取结果,而不是调用.get(),使你的代码看起来更像AsyncIO。
[4]:
with vaex.progress.tree('rich', title="Single pass using delay + using delayed and await"):
@vaex.delayed
def compute(tip_sum, passengers):
return tip_sum/passengers
tip_per_passenger_promise = compute(df.tip_amount.sum(delay=True),
df.passenger_count.sum(delay=True))
await df.execute_async()
tip_per_passenger = await tip_per_passenger_promise
print(f"tip_per_passenger = {tip_per_passenger}")
Single pass using delay + using delayed and await ━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.14s ├── sum ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.09s │ └── vaex.agg.sum('tip_amount') ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1] └── sum ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s └── vaex.agg.sum('passenger_count') ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]
tip_per_passenger = 0.5774000691888603
注意: 在 Jupyter notebook 中,已经有一个 asyncio 事件循环在运行。在脚本中,你可能需要使用 asyncio.run(my_top_level_coroutine()) 才能使用 await。
异步自动执行#
在前面的例子中,我们手动调用了 df.execute_async()。这使得 Vaex 能够以尽可能少的次数遍历数据来执行所有任务。
为了让生活更轻松,让你的代码更像AsyncIO,我们可以使用df.executor.auto_execute()异步上下文管理器,它会在等待一个承诺时自动为你调用df.execute_async()。
[5]:
with vaex.progress.tree('rich', title="Single pass using auto_execute"):
async with df.executor.auto_execute():
@vaex.delayed
def compute(tip_sum, passengers):
return tip_sum/passengers
tip_per_passenger = await compute(df.tip_amount.sum(delay=True),
df.passenger_count.sum(delay=True))
print(f"tip_per_passenger = {tip_per_passenger}")
Single pass using auto_execute ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s ├── sum ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s │ └── vaex.agg.sum('tip_amount') ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1] └── sum ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s └── vaex.agg.sum('passenger_count') ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]
tip_per_passenger = 0.5774000691888609