优化说明#
Modin 在这里为许多配置选择了默认值,这些配置在大多数情况下都能提供出色的性能。本页面适用于那些喜欢优化代码的人以及那些对 Modin 中现有优化感到好奇的人。在这里,您可以找到有关 Modin 优化的更多信息,无论是针对操作管道还是特定操作。如果您想自行调整 Modin 的行为,请参阅 Modin 配置设置 页面,了解 Modin 中可用的完整配置集。
Modin中的范围分区#
Modin 利用范围分区方法进行特定操作,显著增强了并行性并在某些场景下减少了内存消耗。范围分区通常用于具有关键列的操作(用于分组、合并等)。
您可以通过指定cfg.RangePartitioning 配置变量:来启用范围分区。
import modin.pandas as pd
import modin.config as cfg
cfg.RangePartitioning.put(True) # past this point methods that support range-partitioning
# will use it
pd.DataFrame(...).groupby(...).mean() # use range-partitioning for groupby.mean()
cfg.Range-partitioning.put(False)
pd.DataFrame(...).groupby(...).mean() # use MapReduce implementation for groupby.mean()
构建范围分区假设数据重新洗牌,这可能会导致打破行的原始顺序,对于某些操作,这将意味着结果将与Pandas不同。
范围分区并不是万能的,意味着启用它并不总是有益的。下面您会找到一个链接,列出了支持范围分区的操作以及何时应该启用它的实用建议:支持范围分区的操作。
Modin中的动态分区#
Ray引擎在同时运行大量小型远程任务时会遇到速度减慢的问题。Ray Core建议避免小型任务。 当Modin DataFrame有大量分区时,某些函数会产生大量远程任务,这可能导致速度减慢。 为了解决这个问题,Modin建议使用动态分区。这种方法通过将多个分区合并为一个虚拟分区并在其上执行一个共同的远程任务来减少远程任务的数量。
动态分区通常用于在所有分区上分别完全或部分执行的操作。
import modin.pandas as pd
from modin.config import context
df = pd.DataFrame(...)
with context(DynamicPartitioning=True):
df.abs()
动态分区并不总是有用,这种方法通常用于具有大量列的中等大小的DataFrames。 如果列的数量较少,分区的数量将接近CPU的数量,Ray将不会有这个问题。 如果DataFrame有太多行,这也不是使用动态分区的好情况,因为每个任务不再是小任务,执行组合任务的开销比单独分配它们更大。
不幸的是,动态分区的使用取决于多种因素,如数据大小、CPU数量、执行的操作等,用户需要自行判断动态分区是否能在其情况下带来性能提升。
理解Modin的分区机制#
Modin的分区对性能至关重要;因此我们建议专家用户理解Modin的分区机制以及如何调整它以获得更好的性能。
Modin如何分区数据框#
Modin 使用一种分区方案,该方案沿着两个轴对数据框进行分区,从而形成一个分区矩阵。行和列的分块大小是根据相应轴的长度和 Modin 的特殊 配置变量(NPartitions、MinRowPartitionSize 和 MinColumnPartitionSize)独立计算的:
NPartitions是沿轴的最大分割数;默认情况下,它等于本地机器或节点集群上的核心数。MinRowPartitionSize是进行分割的最小行数。例如,如果MinRowPartitionSize是 32,那么行轴将不会被分割,除非行数大于 32。如果行数大于 32,例如 34,那么行轴将被分割成两个分区:分别包含 32 行和 2 行。MinColumnPartitionSize是进行分割的最小列数。例如,如果MinColumnPartitionSize是 32,则列轴不会被分割,除非列数大于 32。如果列数大于 32,例如 34,则列轴将被分割成两个分区:分别包含 32 列和 2 列。
请注意,NPartitions 指定了沿单个轴的分区数量的限制,这意味着整个数据帧的实际限制是 NPartitions 的平方。
全轴函数#
一些聚合函数需要了解整个轴的信息,例如在.apply(foo, axis=0)中,传递的函数foo期望一次性接收整个列的数据。
当应用全轴函数时,沿此轴的分区会被收集到处理该函数的单个工作节点。函数执行完毕后,数据的分区将恢复正常。
请注意,远程调用的数量等于分区的数量,这意味着由于全轴函数的分区数量减少,并行性的潜力也会降低。
还需注意,诸如 .sum()、.mean()、.max() 等归约函数不被视为全轴操作,因此它们不会受到并行度降低的影响。
如何调整分区#
配置Modin的默认分区方案#
从上面的例子中可以看出,数据框的形状越接近正方形,分区数量就越接近NPartitions的平方。在NPartitions等于工作节点数量的情况下,这意味着单个工作节点将同时处理多个分区,从而降低了整体性能。
如果你的工作流主要处理宽数据框和非全轴函数,那么减少NPartitions值是有意义的,这样单个工作进程将处理单个分区。
可复制粘贴的示例,展示了如何调整宽框架的NPartitions值可能会提高您机器上的性能:
from multiprocessing import cpu_count
from modin.distributed.dataframe.pandas import unwrap_partitions
import modin.config as cfg
import modin.pandas as pd
import numpy as np
import timeit
# Generating data for a square-like dataframe
data = np.random.randint(0, 100, size=(5000, 5000))
# Explicitly setting `NPartitions` to its default value
cfg.NPartitions.put(cpu_count())
# Each worker processes `cpu_count()` amount of partitions
df = pd.DataFrame(data)
print(f"NPartitions: {cfg.NPartitions.get()}")
# Getting raw partitions to count them
partitions_shape = np.array(unwrap_partitions(df)).shape
print(
f"The frame has {partitions_shape[0]}x{partitions_shape[1]}={np.prod(partitions_shape)} partitions "
f"when the CPU has only {cpu_count()} cores."
)
print(f"10 times of .abs(): {timeit.timeit(lambda: df.abs(), number=10)}s.")
# Possible output:
# NPartitions: 112
# The frame has 112x112=12544 partitions when the CPU has only 112 cores.
# 10 times of .abs(): 23.64s.
# Taking a square root of the the current `cpu_count` to make more even partitioning
cfg.NPartitions.put(int(cpu_count() ** 0.5))
# Each worker processes a single partition
df = pd.DataFrame(data)
print(f"NPartitions: {cfg.NPartitions.get()}")
# Getting raw partitions to count them
partitions_shape = np.array(unwrap_partitions(df)).shape
print(
f"The frame has {partitions_shape[0]}x{partitions_shape[1]}={np.prod(partitions_shape)} "
f"when the CPU has {cpu_count()} cores."
)
print(f"10 times of .abs(): {timeit.timeit(lambda: df.abs(), number=10)}s.")
# Possible output:
# NPartitions: 10
# The frame has 10x10=100 partitions when the CPU has 112 cores.
# 10 times of .abs(): 0.25s.
手动触发重新分区#
如果您遇到意外的性能不佳,尽管您正确配置了MODIN_NPARTITIONS,那么这可能是由于工作流执行期间发生的不平衡分区引起的。
Modin的理念是在内部处理分区,不让用户担心应用大量可能影响DataFrame分区的“不良”操作的潜在后果。我们不断努力寻找并修复可能导致用户头疼的分区情况。
然而,如果您觉得您正在处理不平衡的分区,您可以尝试在您的DataFrame上调用内部的modin.pandas.dataframe.DataFrame._repartition()方法,以手动触发分区重新平衡,并查看它是否改善了您的案例的性能。
- DataFrame._repartition(axis: Optional[int] = None) Self#
重新分区Modin对象以获取理想的分区。
允许在查询编译器尚无法通过隐式重新分区来提高性能的情况下提高性能。
此方法的实际用例可能如下:
import modin.pandas as pd
import timeit
df = pd.DataFrame({"col0": [1, 2, 3, 4]})
# Appending a lot of columns may result into unbalanced partitioning
for i in range(1, 128):
df[f"col{i}"] = pd.Series([1, 2, 3, 4])
print(
"DataFrame with unbalanced partitioning:",
timeit.timeit(lambda: df.sum(), number=10)
) # 1.44s
df = df._repartition()
print(
"DataFrame after '._repartition()':",
timeit.timeit(lambda: df.sum(), number=10)
) # 0.21s.
避免遍历Modin DataFrame#
尽可能使用df.apply()或其他聚合方法,而不是遍历数据框。
For循环无法扩展,并且会强制将分布式数据收集回驱动程序。
可复制粘贴的示例,展示了如何将for循环替换为等效的.apply()可能会提高性能:
import modin.pandas as pd
import numpy as np
from timeit import default_timer as timer
data = np.random.randint(1, 100, (2 ** 10, 2 ** 2))
md_df = pd.DataFrame(data)
result = []
t1 = timer()
# Iterating over a dataframe forces to collect distributed data to the driver and doesn't scale
for idx, row in md_df.iterrows():
result.append((row[1] + row[2]) / row[3])
print(f"Filling a list by iterating a Modin frame: {timer() - t1:.2f}s.")
# Possible output: 36.15s.
t1 = timer()
# Using `.apply()` perfectly scales to all axis-partitions
result = md_df.apply(lambda row: (row[1] + row[2]) / row[3], axis=1).to_numpy().tolist()
print(f"Filling a list by using '.apply()' and converting the result to a list: {timer() - t1:.2f}s.")
# Possible output: 0.22s.
使用Modin的Dataframe代数API实现自定义并行函数#
Modin 提供了一组低级别的并行实现操作符,可用于构建大多数聚合函数。这些操作符存在于 代数模块 中。 Modin DataFrame 允许用户使用他们自己通过此模块构建的聚合。请访问文档中的 DataFrame 的代数 页面以了解如何操作。
避免混合使用pandas和Modin数据框#
尽管Modin被认为是pandas的即插即用替代品,但Modin和pandas并不打算在单一流程中一起使用。将pandas DataFrame作为Modin DataFrame方法的参数传递可能会减慢函数的速度(因为它必须处理非分布式对象)或引发错误。如果你将Modin DataFrame作为输入传递给pandas方法,你也会得到未定义的行为,因为pandas将Modin的对象识别为简单的可迭代对象,因此无法利用其作为分布式数据框架的优势。
可复制粘贴的示例,展示了在单个流程中混合使用pandas和Modin DataFrame可能会成为性能瓶颈:
import modin.pandas as pd
import numpy as np
import timeit
import pandas
data = np.random.randint(0, 100, (2 ** 20, 2 ** 2))
md_df, md_df_copy = pd.DataFrame(data), pd.DataFrame(data)
pd_df, pd_df_copy = pandas.DataFrame(data), pandas.DataFrame(data)
print("concat modin frame + pandas frame:")
# Concatenating modin frame + pandas frame using modin '.concat()'
# This case is bad because Modin have to process non-distributed pandas object
time = timeit.timeit(lambda: pd.concat([md_df, pd_df]), number=10)
print(f"\t{time}s.\n")
# Possible output: 0.44s.
print("concat modin frame + modin frame:")
# Concatenating modin frame + modin frame using modin '.concat()'
# This is an ideal case, Modin is being used as intended
time = timeit.timeit(lambda: pd.concat([md_df, md_df_copy]), number=10)
print(f"\t{time}s.\n")
# Possible output: 0.05s.
print("concat pandas frame + pandas frame:")
# Concatenating pandas frame + pandas frame using pandas '.concat()'
time = timeit.timeit(lambda: pandas.concat([pd_df, pd_df_copy]), number=10)
print(f"\t{time}s.\n")
# Possible output: 0.31s.
print("concat pandas frame + modin frame:")
# Concatenating pandas frame + modin frame using pandas '.concat()'
time = timeit.timeit(lambda: pandas.concat([pd_df, md_df]), number=10)
print(f"\t{time}s.\n")
# Possible output: TypeError
使用NativeQueryCompiler执行DataFrame操作#
默认情况下,Modin 将数据分布在多个分区中,并使用 PandasQueryCompiler 执行操作。然而,在某些情况下,例如处理小型或空的 DataFrame 时,分布它们可能会引入不必要的开销。在这种情况下,在查询编译器层默认使用 pandas 更为高效。这可以通过将 cfg.NativeDataframeMode 配置变量: 设置为 Pandas 来实现。当设置为 Pandas 时,Modin 中的所有操作都默认使用 pandas,并且 DataFrame 不会被分布,从而避免了额外的开销。此配置可以根据是否需要 DataFrame 分布来切换。
在NativeDataframeMode激活时创建的DataFrames将继续使用NativeQueryCompiler,即使在配置被禁用后也是如此。Modin支持分布式Modin DataFrames与使用NativeQueryCompiler的DataFrames之间的互操作性。
import modin.pandas as pd
import modin.config as cfg
# This dataframe will be distributed and use `PandasQueryCompiler` by default
df_distributed = pd.DataFrame(...)
# Set mode to "Pandas" to avoid distribution and use `NativeQueryCompiler`
cfg.NativeDataframeMode.put("Pandas")
df_native_qc = pd.DataFrame(...)
# Revert to default settings for distributed dataframes
cfg.NativeDataframeMode.put("Default")
df_distributed = pd.DataFrame(...)
特定操作的优化#
合并#
merge 操作在 Modin 中使用广播连接算法:将右侧的 Modin DataFrame 合并到一个 pandas DataFrame 中,并将其广播到左侧 Modin DataFrame 的行分区。为了在进行内连接时最小化进程间通信成本,您可能需要交换左右 DataFrame。
import modin.pandas as pd
import numpy as np
left_data = np.random.randint(0, 100, size=(2**8, 2**8))
right_data = np.random.randint(0, 100, size=(2**12, 2**12))
left_df = pd.DataFrame(left_data)
right_df = pd.DataFrame(right_data)
%timeit left_df.merge(right_df, how="inner", on=10)
3.59 s 107 ms per loop (mean std. dev. of 7 runs, 1 loop each)
%timeit right_df.merge(left_df, how="inner", on=10)
1.22 s 40.1 ms per loop (mean std. dev. of 7 runs, 1 loop each)
请注意,第一次和第二次merge的结果列顺序可能不同。