使用Vaex进行异步编程#

使用Rich based progress bar我们可以看到,如果我们在一个数据帧上调用两个方法,我们会得到两次数据遍历(如[1][2]所示)。

[1]:
import vaex

df = vaex.datasets.taxi()

with vaex.progress.tree('rich', title="Two passes"):
    print(df.tip_amount.sum())
    print(df.passenger_count.sum())
  Two passes                                    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.15s   
├──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s   
│   └──   vaex.agg.sum('tip_amount')            ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]
└──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.06s   
    └──   vaex.agg.sum('passenger_count')       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.06s[2]

使用 delay=True#

如果我们传递 delay=True,Vaex 将不会开始执行它内部创建的任务,而是会返回一个 promise。在调用 df.execute() 后,所有任务将会执行,并且 promise 将会被解决,这意味着你可以使用 .get() 方法来获取最终值,或者使用 .then() 方法来表示结果。

[2]:
with vaex.progress.tree('rich', title="Single pass using delay"):
    tip_sum_promise = df.tip_amount.sum(delay=True)
    passengers_promise = df.passenger_count.sum(delay=True)
    df.execute()
    tip_per_passenger = tip_sum_promise.get() / passengers_promise.get()
    print(f"tip_per_passenger = {tip_per_passenger}")
  Single pass using delay                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s   
├──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s   
│   └──   vaex.agg.sum('tip_amount')            ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]
└──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s   
    └──   vaex.agg.sum('passenger_count')       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]
tip_per_passenger = 0.5774000691888607

使用@delayed装饰器#

为了让生活更轻松,Vaex 实现了 vaex.delayed 装饰器。一旦所有参数都解析完毕,装饰的函数将自动执行。

[3]:
with vaex.progress.tree('rich', title="Single pass using delay + using delayed"):
    @vaex.delayed
    def compute(tip_sum, passengers):
        return tip_sum/passengers

    tip_per_passenger_promise = compute(df.tip_amount.sum(delay=True),
                                        df.passenger_count.sum(delay=True))
    df.execute()
    print(f"tip_per_passenger = {tip_per_passenger_promise.get()}")
  Single pass using delay + using delayed       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s   
├──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s   
│   └──   vaex.agg.sum('tip_amount')            ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]
└──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s   
    └──   vaex.agg.sum('passenger_count')       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]

异步 await#

在上述所有情况下,我们调用了df.execute(),它将使用线程同步执行所有任务。然而,如果你在Python中使用异步IO,这意味着你正在阻止所有其他异步协程运行。

为了让其他协程继续运行(例如在FastAPI上下文中),我们可以改为等待df.execute_async()。除此之外,我们还可以await承诺以获取结果,而不是调用.get(),使你的代码看起来更像AsyncIO。

[4]:
with vaex.progress.tree('rich', title="Single pass using delay + using delayed and await"):
    @vaex.delayed
    def compute(tip_sum, passengers):
        return tip_sum/passengers

    tip_per_passenger_promise = compute(df.tip_amount.sum(delay=True),
                                        df.passenger_count.sum(delay=True))
    await df.execute_async()
    tip_per_passenger = await tip_per_passenger_promise
    print(f"tip_per_passenger = {tip_per_passenger}")
  Single pass using delay + using delayed and await ━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.14s   
├──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.09s   
│   └──   vaex.agg.sum('tip_amount')            ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]
└──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s   
    └──   vaex.agg.sum('passenger_count')       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]
tip_per_passenger = 0.5774000691888603

注意: 在 Jupyter notebook 中,已经有一个 asyncio 事件循环在运行。在脚本中,你可能需要使用 asyncio.run(my_top_level_coroutine()) 才能使用 await

异步自动执行#

在前面的例子中,我们手动调用了 df.execute_async()。这使得 Vaex 能够以尽可能少的次数遍历数据来执行所有任务。

为了让生活更轻松,让你的代码更像AsyncIO,我们可以使用df.executor.auto_execute()异步上下文管理器,它会在等待一个承诺时自动为你调用df.execute_async()

[5]:
with vaex.progress.tree('rich', title="Single pass using auto_execute"):
    async with df.executor.auto_execute():
        @vaex.delayed
        def compute(tip_sum, passengers):
            return tip_sum/passengers

        tip_per_passenger = await compute(df.tip_amount.sum(delay=True),
                                          df.passenger_count.sum(delay=True))
        print(f"tip_per_passenger = {tip_per_passenger}")
  Single pass using auto_execute                ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s   
├──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s   
│   └──   vaex.agg.sum('tip_amount')            ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]
└──   sum                                       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s   
    └──   vaex.agg.sum('passenger_count')       ━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 00.08s[1]
tip_per_passenger = 0.5774000691888609