多进程处理
TLDR: 如果你发现使用Python内置的multiprocessing模块与Polars一起使用时,出现了关于多进程方法的Polars错误,你应该确保你使用的是spawn,而不是fork,作为启动方法:
from multiprocessing import get_context
def my_fun(s):
print(s)
with get_context("spawn").Pool() as pool:
pool.map(my_fun, ["input1", "input2", ...])
何时不使用多进程
在我们深入细节之前,重要的是要强调Polars从一开始就被构建为使用所有CPU核心。它通过在执行可以并行完成的计算时在单独的线程中执行来实现这一点。例如,在select语句中请求两个表达式可以并行完成,结果只在最后合并。另一个例子是使用group_by().agg(在组内聚合值,每个组可以单独评估。在这些情况下,multiprocessing模块不太可能提高代码性能。如果你在使用Polars的GPU引擎,你也应该避免手动多进程处理。当同时使用时,它们可能会争夺系统内存和处理能力,导致性能下降。
请参阅优化部分以获取更多优化信息。
何时使用多进程
尽管Polars是多线程的,但其他库可能是单线程的。当其他库成为瓶颈,并且手头的问题可以并行化时,使用多进程来加速是有意义的。
默认多进程配置的问题
摘要
Python 多进程文档列出了创建进程池的三种方法:
- 生成
- fork
- forkserver
fork的描述是(截至2022-10-15):
父进程使用os.fork()来分叉Python解释器。子进程在开始时实际上与父进程完全相同。父进程的所有资源都被子进程继承。请注意,安全地分叉一个多线程进程是有问题的。
仅在Unix上可用。Unix上的默认设置。
简短的总结是:Polars 是多线程的,以提供开箱即用的强大性能。因此,它不能与 fork 结合使用。如果你在 Unix(Linux、BSD 等)上,你正在使用 fork,除非你明确地覆盖它。
你可能之前没有遇到过这个问题的原因是,纯Python代码和大多数Python库(大部分)是单线程的。或者,你使用的是Windows或MacOS,在这些系统上fork甚至不可用(对于MacOS,直到Python 3.7都是如此)。
因此,应该使用spawn,或者forkserver来代替。spawn在所有平台上都可用,是最安全的选择,因此是推荐的方法。
示例
使用fork的问题在于复制父进程。考虑下面的例子,这是一个稍微修改过的例子,发布在
Polars 问题跟踪器上:
import multiprocessing
import polars as pl
def test_sub_process(df: pl.DataFrame, job_id):
df_filtered = df.filter(pl.col("a") > 0)
print(f"Filtered (job_id: {job_id})", df_filtered, sep="\n")
def create_dataset():
return pl.DataFrame({"a": [0, 2, 3, 4, 5], "b": [0, 4, 5, 56, 4]})
def setup():
# some setup work
df = create_dataset()
df.write_parquet("/tmp/test.parquet")
def main():
test_df = pl.read_parquet("/tmp/test.parquet")
for i in range(0, 5):
proc = multiprocessing.get_context("spawn").Process(
target=test_sub_process, args=(test_df, i)
)
proc.start()
proc.join()
print(f"Executed sub process {i}")
if __name__ == "__main__":
setup()
main()
使用fork作为方法,而不是spawn,会导致死锁。
fork 方法等同于调用 os.fork(),这是一个在
POSIX 标准中定义的系统调用:
一个进程应被创建为单线程。如果一个多线程进程调用fork(),新进程应包含调用线程的副本及其整个地址空间,可能包括互斥锁和其他资源的状态。因此,为了避免错误,子进程在调用exec函数之前只能执行异步信号安全的操作。
相比之下,spawn 将创建一个全新的 Python 解释器,并且不会继承互斥锁的状态。
那么在代码示例中发生了什么?对于使用pl.read_parquet读取文件时,文件必须被锁定。然后调用os.fork(),复制父进程的状态,包括互斥锁。因此,所有子进程都会复制处于获取状态的文件锁,使它们无限期地挂起等待文件锁被释放,而这永远不会发生。
使调试这些问题变得棘手的是,fork 可以工作。将示例更改为不调用 pl.read_parquet:
import multiprocessing
import polars as pl
def test_sub_process(df: pl.DataFrame, job_id):
df_filtered = df.filter(pl.col("a") > 0)
print(f"Filtered (job_id: {job_id})", df_filtered, sep="\n")
def create_dataset():
return pl.DataFrame({"a": [0, 2, 3, 4, 5], "b": [0, 4, 5, 56, 4]})
def main():
test_df = create_dataset()
for i in range(0, 5):
proc = multiprocessing.get_context("fork").Process(
target=test_sub_process, args=(test_df, i)
)
proc.start()
proc.join()
print(f"Executed sub process {i}")
if __name__ == "__main__":
main()
这工作得很好。因此,在较大的代码库中调试这些问题,即不是这里的小型示例,可能会非常痛苦,因为一个看似不相关的更改可能会破坏你的多进程代码。一般来说,除非有非常特定的需求无法通过其他方式满足,否则不应该在多线程库中使用fork启动方法。
fork的优缺点
基于这个例子,你可能会想,为什么Python一开始就有fork可用?
首先,可能是由于历史原因:spawn是在Python 3.4版本中添加的,而fork从Python 2.x系列开始就已经是Python的一部分了。
其次,spawn 和 forkserver 有一些限制不适用于 fork,特别是所有参数都应该是可序列化的。更多信息请参阅
Python 多进程文档。
第三,因为创建新进程比spawn更快,因为spawn实际上是fork + 通过调用execv创建一个全新的Python进程,没有锁。因此,Python文档中的警告说它较慢:spawn有更多的开销。然而,在几乎所有情况下,人们都希望使用多个进程来加速需要几分钟甚至几小时的计算,这意味着在整体方案中开销可以忽略不计。更重要的是,它实际上可以与多线程库结合使用。
第四,spawn 启动一个新进程,因此它要求代码是可导入的,这与 fork 不同。特别是,这意味着在使用 spawn 时,相关代码不应在全局范围内,例如在 Jupyter 笔记本或普通脚本中。因此,在上面的示例中,我们在其中定义了生成函数,并从 __main__ 子句中运行这些函数。这对于典型的项目来说不是问题,但在笔记本中进行快速实验时可能会失败。
参考文献
-
https://docs.python.org/3/library/multiprocessing.html
-
https://pythonspeed.com/articles/python-multiprocessing/
-
https://pubs.opengroup.org/onlinepubs/9699919799/functions/fork.html
-
https://bnikolic.co.uk/blog/python/parallelism/2019/11/13/python-forkserver-preload.html