系统架构#
在本节中,我们将概述Modin的整体系统架构,并详细介绍组件设计、实现以及其他重要细节。本文档还包含了对那些有兴趣贡献新功能、错误修复和增强功能的人的重要参考信息。
高级架构视图#
下图概述了Modin组件的通用分层视图,并附有文档中每个主要部分的简要描述。
Modin在逻辑上被分为不同的层次,这些层次代表了一个典型的数据库管理系统的层次结构。抽象出每个组件使我们能够单独优化和替换组件,而不影响系统的其余部分。例如,我们可以实现针对特定类型数据优化的新计算内核,并通过实现一个小接口将其简单地插入到现有基础设施中。它仍然可以通过我们选择的计算引擎在内部逻辑上进行分布式处理。
系统视图#
Modin架构的俯视图如下所示:
用户 - 数据科学家通过API发送交互式或批量命令与Modin系统进行交互,Modin使用各种执行引擎执行这些命令:目前支持Ray、Dask和MPI。
子系统/容器视图#
如果我们点击进入下一层细节,我们会看到在Modin内部,分层架构是通过几个相互作用的组件实现的:
为了简化,其他执行系统——Dask和MPI被省略,仅展示了Ray执行。
Dataframe子系统是数据框持有和查询编译的支柱。它负责将输入/输出分派到适当的模块,获取pandas API并调用查询编译器将调用转换为内部中间数据框代数。
数据输入/输出模块与Dataframe和分区子系统协同工作,以读取分成多个分区的数据,并将数据发送到适当的节点进行存储。
查询规划器是一个子系统,它将pandas API转换为中间数据框代数表示DAG,并执行一组初始优化。
查询执行器负责获取数据框代数DAG,根据选定的存储格式进行进一步优化,并将数据框代数DAG映射或编译为实际的执行序列。
存储格式模块负责将抽象操作映射到实际的执行器调用,例如 pandas、自定义格式。
编排子系统负责为选定的执行生成和控制实际的执行环境。它生成实际的节点,启动执行环境,例如Ray,监控执行器的状态并提供遥测数据。
组件视图#
用户执行数据转换、数据输入或数据输出的查询会通过下面详细介绍的Modin组件。查询的路径在执行系统中大致相似。
数据转换#
查询编译器#
Query Compiler 从 pandas API 层接收查询。API 层负责确保向 Query Compiler 提供干净的输入。Query Compiler 必须了解计算内核和数据的内存格式,以便高效地编译查询。
查询编译器负责将编译后的查询发送到核心Modin数据框架。 在这个设计中,查询编译器没有关于查询将在何处或何时执行的信息,并将分区布局的控制权交给Modin数据框架。
为了减少pandas API的复杂性,查询编译器层紧密遵循pandas API,但去除了大部分的重复内容。
核心Modin数据框架#
在这一层,操作可以延迟执行。目前,Modin 急切地执行大多数操作,以尝试像 pandas 那样运行。一些操作,例如 transpose 是昂贵的,并且会在内存中创建数据的完整副本。在这些情况下,我们可以等待直到另一个操作触发计算。未来,我们计划在 Modin 中添加额外的查询规划和延迟执行,以确保查询能够高效执行。
Core Modin 数据框架的结构是可扩展的,因此任何可以为给定执行更好地优化的操作都可以以这种方式被覆盖和优化。
这一层的API相较于QueryCompiler和面向用户的API有显著减少。每个API代表执行给定操作或行为的单一方式。
核心Modin数据框架API#
更多文档可以在代码内部找到。此API尚未完成,但代表了绝大多数操作和行为。
此API可以由其他分布式/并行DataFrame库实现,并且也可以插入到Modin中。创建问题或在我们的Slack上讨论以获取更多信息!
Core Modin Dataframe 负责数据布局和洗牌、分区以及序列化发送到每个分区的任务。Modin Dataframe 接口的其他实现也需要处理这些任务。
分区管理器#
分区管理器可以根据操作的类型改变分区的大小和形状。例如,某些操作很复杂,需要访问整个列或行。分区管理器可以将块分区转换为行分区或列分区。这使Modin能够灵活地执行在仅行或仅列分区方案中难以执行的操作。
分区管理器的另一个重要组件是将编译后的查询序列化并发送到分区。它维护每个分区的长度和宽度的元数据,因此当操作只需要操作或提取数据的子集时,它可以直接将这些查询发送到正确的分区。这对于pandas中的一些操作尤其重要,这些操作可以接受不同列的不同参数和操作,例如带有字典的fillna。
这种抽象将实际的数据移动和函数应用从Dataframe层分离出来,以保持核心Dataframe API的简洁,并分别优化数据移动和元数据管理。
分区#
分区负责管理数据框的一个子集。如下所述,数据框在行和列方向上都进行了分区。这为Modin提供了双向的可扩展性和数据布局的灵活性。Modin中有许多优化是在分区中实现的。分区特定于执行框架和数据的内存格式,使Modin能够利用两者的潜在优化。这些优化在特定于执行框架的页面上有进一步的解释。
执行引擎#
该层对数据的分区执行计算。Modin 数据框架设计用于与任务并行框架一起工作,但通过一些努力,也可以与数据并行框架集成。
存储格式#
存储格式 描述了内存中的分区类型。
Modin 中的基础存储格式是 pandas。在默认情况下,Modin 数据框操作的分区包含 pandas.DataFrame 对象。
数据入口#
注意
数据输入操作(例如 read_csv)在 Modin 中将数据从源加载到分区中,反之亦然,数据输出操作(例如 to_csv)也是如此。通过并行读取/写入分区来提高性能。
数据输入从pandas API层的一个函数开始(例如read_csv)。然后用户的查询被传递给Factory Dispatcher,它定义了特定于执行的工厂。执行工厂包含一个IO类(例如PandasOnRayIO),其职责是执行文件的并行读取/写入。这个IO类包含与pandas IO函数接口和名称相似的类方法(例如PandasOnRayIO.read_csv)。IO类声明了特定于执行引擎和存储格式的Modin Dataframe和Query Compiler类,以确保构建正确的对象。它还声明了IO方法,这些方法是混合类,包含用于部署远程任务的引擎特定类、用于解析给定文件格式的类以及处理头节点上特定格式文件分块的类(参见调度器类实现详细信息)。IO类数据输入函数的输出是一个Modin Dataframe。
数据出口#
数据导出操作(例如 to_csv)与数据导入操作类似,直到执行特定的IO类函数构造。IO类的数据导出函数与数据导入函数的定义略有不同,并且仅为引擎创建,因为分区已经具有有关其存储格式的信息。使用IO类,数据从分区导出到目标文件。
支持的执行引擎和存储格式#
这是Modin支持的执行引擎和内存格式列表。如果您想贡献一个新的执行引擎或内存格式,请参阅贡献文档页面。
- pandas on Ray
使用 Ray 执行框架。
存储格式为 pandas,内存分区类型为 pandas DataFrame。
有关执行路径的更多信息,请参阅 pandas on Ray 页面。
- pandas on Dask
使用 Dask Futures 执行框架。
存储格式为 pandas,内存分区类型为 pandas DataFrame。
有关执行路径的更多信息,请参阅 pandas on Dask 页面。
- pandas on MPI
存储格式为pandas,内存分区类型为pandas DataFrame。
有关执行路径的更多信息,请参阅pandas on Unidist页面。
- pandas on Python
使用原生Python执行 - 主要用于调试。
存储格式为pandas,内存分区类型为pandas DataFrame。
有关执行路径的更多信息,请参阅pandas on Python页面。
数据帧分区#
Modin DataFrame 架构遵循现代数据库和高性能矩阵系统的架构。我们选择了一种沿列和行进行分区的方案,因为它使 Modin 在列数和行数方面都具有灵活性和可扩展性。下图展示了这一概念。
目前,每个分区的主要内存格式是 pandas DataFrame (pandas 存储格式)。
索引#
我们目前使用pandas.Index对象来索引列和行。未来,我们将实现一个分布式的、与pandas兼容的Index对象,以消除系统中的这种扩展限制。大多数工作负载不会受到这种可扩展性限制的影响,因为只有在操作超过数百亿列或行时才会出现这种限制。重要提示:如果您使用的是默认索引(pandas.RangeIndex),则有一个固定的内存开销(约200字节),并且索引不会有可扩展性问题。
API#
API 是面向用户的最外层。以下类包含 Modin 对 pandas API 的实现:
模块/类视图#
Modin的模块布局如下所示。点击链接深入了解Modin的内部实现细节。文档涵盖了大多数模块,并且每天都会添加更多文档!
├───.github ├───asv_bench ├───ci ├───docker ├───docs ├───examples ├───modin │ ├─── config | ├─── utils │ ├───core │ │ ├─── dataframe │ │ │ ├─── 代数 │ │ │ ├─── base │ │ │ └─── pandas │ │ ├───execution │ │ │ ├───dask │ │ │ │ ├───common │ │ │ │ └───implementations │ │ │ │ └─── pandas_on_dask │ │ │ ├─── 调度 │ │ │ ├───python │ │ │ │ └───implementations │ │ │ │ └─── pandas_on_python │ │ │ ├───ray │ │ │ │ ├───common │ │ │ │ ├─── generic │ │ │ │ └───implementations │ │ │ │ └─── pandas_on_ray │ │ │ └───unidist │ │ │ ├───common │ │ │ ├─── generic │ │ │ └───implementations │ │ │ └─── pandas_on_unidist │ │ ├─── io │ │ └─── storage_formats │ │ ├─── base │ │ └─── pandas │ ├───distributed │ │ ├───dataframe │ │ │ └─── pandas │ ├─── 实验性 │ │ ├───core | | | └─── io │ │ ├─── pandas │ │ ├─── sklearn │ │ ├───spreadsheet │ │ ├─── xgboost │ │ └─── batch │ └───pandas │ ├─── dataframe │ └─── series ├───requirements ├───scripts └───stress_tests