Modin中支持范围分区的操作#
一旦cfg.RangePartitioning变量设置为True,以下操作将改变其行为。浏览列表,找出何时对某个方法启用范围分区可能是有益的。
分组#
注意
在使用范围分区实现进行多列分组时,即使传递了groupby(sort=True, ...),结果也可能不会排序:modin-project/modin#6875。
对于groupby.apply()、groupby.transform()、groupby.rolling(),范围分组的groupby实现会自动启用。对于来自此列表的groupby聚合,默认使用MapReduce实现。MapReduce在低基数的groupby中往往表现出更好的性能。如果您要分组的列基数预计较高,建议启用范围分组实现。
合并#
注意
范围分区方法仅在“左”和“内”合并时实现,并且仅在使用on参数在单列上合并时实现。
范围分区合并取代了广播合并。如果合并中的右侧数据框与左侧数据框一样大,建议使用范围分区实现。在这种情况下,范围分区实现工作更快且消耗更少的内存。
在下面的内容中,您可以找到在不同场景下范围分区和广播合并的性能比较:
Performance measurements for merge
性能是在h2o join queries上使用Intel(R) Xeon(R) Gold 6238R CPU @ 2.20GHz(56核)进行测量的,其中为Modin分配的核数限制为44(MODIN_CPUS=44)。
小500mb数据的测量:
中等5GB数据的测量结果:
.unique() 和 .drop_duplicates()#
注意
当启用范围分区时,.unique() 和 .drop_duplicates() 将生成按行排序的结果。如果禁用范围分区,将保持原始顺序。
当输入数据量较大(超过5,000,000行)且预期输出数据量也较大(不超过80%的值为重复值)时,.unique() / .drop_duplicates()的范围分区实现效果最佳。
在下面的隐藏内容中,您可以找到不同场景下的性能比较:
Performance measurements for ``.unique()``
性能测试是在使用Intel(R) Xeon(R) Gold 6238R CPU @ 2.20GHz(56核)的随机生成数据上进行的。重复率显示了数据集中重复行的百分比。您可以通过阅读其源代码了解更多关于此微基准测试的信息:
Micro-benchmark's source code
import modin.pandas as pd
import numpy as np
import modin.config as cfg
from modin.utils import execute
from timeit import default_timer as timer
import pandas
cfg.CpuCount.put(16)
def get_data(nrows, dtype):
if dtype == int:
return np.arange(nrows)
elif dtype == float:
return np.arange(nrows).astype(float)
elif dtype == str:
return np.array([f"value{i}" for i in range(nrows)])
else:
raise NotImplementedError(dtype)
pd.DataFrame(np.arange(cfg.NPartitions.get() * cfg.MinRowPartitionSize.get())).to_numpy()
nrows = [1_000_000, 5_000_000, 10_000_000, 25_000_000, 50_000_000, 100_000_000]
duplicate_rate = [0, 0.1, 0.5, 0.95]
dtypes = [int, str]
use_range_part = [True, False]
columns = pandas.MultiIndex.from_product([dtypes, duplicate_rate, use_range_part], names=["dtype", "duplicate rate", "use range-part"])
result = pandas.DataFrame(index=nrows, columns=columns)
i = 0
total_its = len(nrows) * len(duplicate_rate) * len(dtypes) * len(use_range_part)
for dt in dtypes:
for nrow in nrows:
data = get_data(nrow, dt)
np.random.shuffle(data)
for dpr in duplicate_rate:
data_c = data.copy()
dupl_val = data_c[0]
num_duplicates = int(dpr * nrow)
dupl_indices = np.random.choice(np.arange(nrow), num_duplicates, replace=False)
data_c[dupl_indices] = dupl_val
for impl in use_range_part:
print(f"{round((i / total_its) * 100, 2)}%")
i += 1
cfg.RangePartitioning.put(impl)
sr = pd.Series(data_c)
execute(sr)
t1 = timer()
# returns a list, so no need for materialization
sr.unique()
tm = timer() - t1
print(nrow, dpr, dt, impl, tm)
result.loc[nrow, (dt, dpr, impl)] = tm
result.to_excel("unique.xlsx")
为Modin分配16个核心的测量结果(MODIN_CPUS=16):
为Modin分配44个核心的测量结果(MODIN_CPUS=44):
Performance measurements for ``.drop_duplicates()``
性能测试是在使用Intel(R) Xeon(R) Gold 6238R CPU @ 2.20GHz(56核)随机生成的数据上进行的。重复率显示了数据集中重复行的百分比。子集大小显示了作为subset参数为df.drop_duplicates()指定的列数。您可以通过阅读其源代码了解更多关于此微基准测试的信息:
Micro-benchmark's source code
import modin.pandas as pd
import numpy as np
import modin.config as cfg
from modin.utils import execute
from timeit import default_timer as timer
import pandas
cfg.CpuCount.put(16)
pd.DataFrame(np.arange(cfg.NPartitions.get() * cfg.MinRowPartitionSize.get())).to_numpy()
nrows = [1_000_000, 5_000_000, 10_000_000, 25_000_000]
duplicate_rate = [0, 0.1, 0.5, 0.95]
subset = [["col0"], ["col1", "col2", "col3", "col4"], None]
ncols = 15
use_range_part = [True, False]
columns = pandas.MultiIndex.from_product(
[
[len(sbs) if sbs is not None else ncols for sbs in subset],
duplicate_rate,
use_range_part
],
names=["subset size", "duplicate rate", "use range-part"]
)
result = pandas.DataFrame(index=nrows, columns=columns)
i = 0
total_its = len(nrows) * len(duplicate_rate) * len(subset) * len(use_range_part)
for sbs in subset:
for nrow in nrows:
data = {f"col{i}": np.arange(nrow) for i in range(ncols)}
pandas_df = pandas.DataFrame(data)
for dpr in duplicate_rate:
pandas_df_c = pandas_df.copy()
dupl_val = pandas_df_c.iloc[0]
num_duplicates = int(dpr * nrow)
dupl_indices = np.random.choice(np.arange(nrow), num_duplicates, replace=False)
pandas_df_c.iloc[dupl_indices] = dupl_val
for impl in use_range_part:
print(f"{round((i / total_its) * 100, 2)}%")
i += 1
cfg.RangePartitioning.put(impl)
md_df = pd.DataFrame(pandas_df_c)
execute(md_df)
t1 = timer()
res = md_df.drop_duplicates(subset=sbs)
execute(res)
tm = timer() - t1
sbs_s = len(sbs) if sbs is not None else ncols
print("len()", res.shape, nrow, dpr, sbs_s, impl, tm)
result.loc[nrow, (sbs_s, dpr, impl)] = tm
result.to_excel("drop_dupl.xlsx")
为Modin分配16个核心的测量结果(MODIN_CPUS=16):
为Modin分配44个核心的测量结果(MODIN_CPUS=44):
‘.nunique()’#
注意
范围分区方法仅针对pd.Series.nunique()和单列数据框实现。
对于多列数据框,.nunique()只能使用全轴减少实现。
当输入数据量较大(超过5,000,000行)且输出数据量也预计较大(重复值不超过80%)时,'.nunique()'的范围分区实现效果最佳。
在下面的隐藏内容中,您可以找到不同场景下的性能比较:
Performance measurements for ``.nunique()``
性能测试是在使用Intel(R) Xeon(R) Gold 6238R CPU @ 2.20GHz(56核)随机生成的数据上进行的。重复率显示了数据集中重复行的百分比。您可以通过阅读其源代码了解更多关于此微基准测试的信息:
Micro-benchmark's source code
import modin.pandas as pd
import numpy as np
import modin.config as cfg
from modin.utils import execute
from timeit import default_timer as timer
import pandas
cfg.CpuCount.put(16)
def get_data(nrows, dtype):
if dtype == int:
return np.arange(nrows)
elif dtype == float:
return np.arange(nrows).astype(float)
elif dtype == str:
return np.array([f"value{i}" for i in range(nrows)])
else:
raise NotImplementedError(dtype)
pd.DataFrame(np.arange(cfg.NPartitions.get() * cfg.MinRowPartitionSize.get())).to_numpy()
nrows = [1_000_000, 5_000_000, 10_000_000, 25_000_000, 50_000_000, 100_000_000]
duplicate_rate = [0, 0.1, 0.5, 0.95]
dtypes = [int, str]
use_range_part = [True, False]
columns = pandas.MultiIndex.from_product([dtypes, duplicate_rate, use_range_part], names=["dtype", "duplicate rate", "use range-part"])
result = pandas.DataFrame(index=nrows, columns=columns)
i = 0
total_its = len(nrows) * len(duplicate_rate) * len(dtypes) * len(use_range_part)
for dt in dtypes:
for nrow in nrows:
data = get_data(nrow, dt)
np.random.shuffle(data)
for dpr in duplicate_rate:
data_c = data.copy()
dupl_val = data_c[0]
num_duplicates = int(dpr * nrow)
dupl_indices = np.random.choice(np.arange(nrow), num_duplicates, replace=False)
data_c[dupl_indices] = dupl_val
for impl in use_range_part:
print(f"{round((i / total_its) * 100, 2)}%")
i += 1
cfg.RangePartitioning.put(impl)
sr = pd.Series(data_c)
execute(sr)
t1 = timer()
# returns a scalar, so no need for materialization
res = sr.nunique()
tm = timer() - t1
print(nrow, dpr, dt, impl, tm)
result.loc[nrow, (dt, dpr, impl)] = tm
result.to_excel("nunique.xlsx")
为Modin分配16个核心的测量结果(MODIN_CPUS=16):
重采样#
注意
范围分区方法不支持类似转换的函数(如 .interpolate(), .ffill(), .bfill(), …)
建议在处理具有超过5_000_000行的数据框时使用范围分区进行重采样,如果预期输出也很大(超过500_000行)。
在下面的隐藏内容中,您可以找到不同场景下的性能比较:
Performance measurements for ``.resample()``
下面的脚本使用 Intel(R) Xeon(R) Gold 6238R CPU @ 2.20GHz (56 核心) 测量 df.resample(rule).sum() 的性能。
您可以通过阅读其源代码了解更多关于此微基准测试的信息:
Micro-benchmark's source code
import pandas
import numpy as np
import modin.pandas as pd
import modin.config as cfg
from timeit import default_timer as timer
from modin.utils import execute
cfg.CpuCount.put(16)
nrows = [1_000_000, 5_000_000, 10_000_000]
ncols = [5, 33]
rules = [
"500ms", # doubles nrows
"30s", # decreases nrows in 30 times
"5min", # decreases nrows in 300
]
use_rparts = [True, False]
cols = pandas.MultiIndex.from_product([rules, ncols, use_rparts], names=["rule", "ncols", "USE RANGE PART"])
rres = pandas.DataFrame(index=nrows, columns=cols)
total_nits = len(nrows) * len(ncols) * len(rules) * len(use_rparts)
i = 0
for nrow in nrows:
for ncol in ncols:
index = pandas.date_range("31/12/2000", periods=nrow, freq="s")
data = {f"col{i}": np.arange(nrow) for i in range(ncol)}
pd_df = pandas.DataFrame(data, index=index)
for rule in rules:
for rparts in use_rparts:
print(f"{round((i / total_nits) * 100, 2)}%")
i += 1
cfg.RangePartitioning.put(rparts)
df = pd.DataFrame(data, index=index)
execute(df)
t1 = timer()
res = df.resample(rule).sum()
execute(res)
ts = timer() - t1
print(nrow, ncol, rule, rparts, ts)
rres.loc[nrow, (rule, ncol, rparts)] = ts
rres.to_excel("resample.xlsx")
为Modin分配16个核心的测量结果(MODIN_CPUS=16):
数据透视表#
范围分区实现会自动应用于df.pivot_table,用户无法控制这一点。
排序值#
范围分区实现会自动应用于df.sort_values,用户无法控制这一点。