Batch Pipeline API Usage Guide

Modin provides an experimental batching feature that pipelines row-parallel queries. This feature is currently only supported for the PandasOnRay engine. Please note that this feature is experimental, and its behavior or interface may change.

Usage Examples

The following examples create and run some pipelines. It is important to note that the queries passed to the pipeline operate on Modin DataFrame partitions, which are backed by pandas. When using pandas module-level functions, please make sure to import and use pandas rather than modin.pandas.
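To illustrate the note above with plain pandas (no Modin required to run this sketch; the function name double_and_tag is illustrative, not part of the API), here is the shape of a query that would be safe to pass to a pipeline: it receives a plain pandas.DataFrame partition and uses the pandas module, not modin.pandas, for module-level calls:

```python
import pandas  # NOT modin.pandas: queries run on plain pandas partitions
import numpy as np

def double_and_tag(df):
    # `df` here is a pandas.DataFrame partition, not a Modin DataFrame.
    tag = pandas.Series(np.ones(len(df)), name="tag", index=df.index)
    # pandas.concat is a module-level call, so it must come from `pandas`.
    return pandas.concat([df * 2, tag], axis=1)

part = pandas.DataFrame({"a": [1, 2, 3]})
out = double_and_tag(part)
print(out["a"].tolist())  # -> [2, 4, 6]
```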

Simple Batch Pipelining

This example walks through a simple batch pipeline to familiarize users with the API.

from modin.experimental.batch import PandasQueryPipeline
import modin.pandas as pd
import numpy as np

df = pd.DataFrame(
    np.random.randint(0, 100, (100, 100)),
    columns=[f"col {i}" for i in range(1, 101)],
) # Build the dataframe we will pipeline.
pipeline = PandasQueryPipeline(df) # Build the pipeline.
pipeline.add_query(lambda df: df + 1, is_output=True) # Add the first query and specify that
                                                      # it is an output query.
pipeline.add_query(
    lambda df: df.rename(columns={f"col {i}":f"col {i-1}" for i in range(1, 101)})
) # Add a second query.
pipeline.add_query(
    lambda df: df.drop(columns=['col 99']),
    is_output=True,
) # Add a third query and specify that it is an output query.
new_df = pd.DataFrame(
    np.ones((100, 100)),
    columns=[f"col {i}" for i in range(1, 101)],
) # Build a second dataframe that we will pipeline now instead.
pipeline.update_df(new_df) # Update the dataframe that we will pipeline to be `new_df`
                           # instead of `df`.
result_dfs = pipeline.compute_batch() # Begin batch processing.

# Print pipeline results
print(f"Result of Query 1:\n{result_dfs[0]}")
print(f"Result of Query 2:\n{result_dfs[1]}")
# Output IDs can also be specified
pipeline = PandasQueryPipeline(df) # Build the pipeline.
pipeline.add_query(
    lambda df: df + 1,
    is_output=True,
    output_id=1,
) # Add the first query, specify that it is an output query, as well as specify an output id.
pipeline.add_query(
    lambda df: df.rename(columns={f"col {i}":f"col {i-1}" for i in range(1, 101)})
) # Add a second query.
pipeline.add_query(
    lambda df: df.drop(columns=['col 99']),
    is_output=True,
    output_id=2,
) # Add a third query, specify that it is an output query, and specify an output_id.
result_dfs = pipeline.compute_batch() # Begin batch processing.

# Print pipeline results - should be a dictionary mapping Output IDs to resulting dataframes:
print(f"Mapping of Output ID to dataframe:\n{result_dfs}")
# Print results
for query_id, res_df in result_dfs.items():
    print(f"Query {query_id} resulted in\n{res_df}")

Batch Pipelining with Postprocessing

A postprocessing function can also be provided when calling pipeline.compute_batch. The example below runs a pipeline similar to the one above, but the postprocessing function writes the output dataframes to parquet files.

from modin.experimental.batch import PandasQueryPipeline
import modin.pandas as pd
import numpy as np
import os
import shutil

df = pd.DataFrame(
    np.random.randint(0, 100, (100, 100)),
    columns=[f"col {i}" for i in range(1, 101)],
) # Build the dataframe we will pipeline.
pipeline = PandasQueryPipeline(df) # Build the pipeline.
pipeline.add_query(
    lambda df: df + 1,
    is_output=True,
    output_id=1,
) # Add the first query, specify that it is an output query, as well as specify an output id.
pipeline.add_query(
    lambda df: df.rename(columns={f"col {i}":f"col {i-1}" for i in range(1, 101)})
) # Add a second query.
pipeline.add_query(
    lambda df: df.drop(columns=['col 99']),
    is_output=True,
    output_id=2,
) # Add a third query, specify that it is an output query, and specify an output_id.
def postprocessing_func(df, output_id, partition_id):
    filepath = f"query_{output_id}/"
    os.makedirs(filepath, exist_ok=True)
    filepath += f"part-{partition_id:04d}.parquet"
    df.to_parquet(filepath)
    return df
result_dfs = pipeline.compute_batch(
    postprocessor=postprocessing_func,
    pass_partition_id=True,
    pass_output_id=True,
) # Begin computation, pass in a postprocessing function, and specify that partition ID and
  # output ID should be passed to that postprocessing function.

print(os.system("ls query_1/")) # Should show `NPartitions.get()` parquet files - which
                                # correspond to partitions of the output of query 1.
print(os.system("ls query_2/")) # Should show `NPartitions.get()` parquet files - which
                                # correspond to partitions of the output of query 2.

for query_id, res_df in result_dfs.items():
    written_df = pd.read_parquet(f"query_{query_id}/")
    shutil.rmtree(f"query_{query_id}/") # Clean up
    print(f"Written and Computed DF are " +
          f"{'equal' if res_df.equals(written_df) else 'not equal'} for query {query_id}")

Batch Pipelining with Fan Out

If the input dataframe to a query is small (consisting of only one partition), it is possible to induce additional parallelism using the fan_out argument. The fan_out argument replicates the input partition, applies the query to each replica, and then coalesces all of the replicas back into one partition using reduce_fn, which must be specified whenever fan_out is True.
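Conceptually, the replicate/apply/coalesce semantics of fan_out can be sketched with plain pandas (this is an illustration of the pattern, not Modin code; the names query and reduce_fn mirror the roles they play in add_query):

```python
import pandas

def query(df, partition_id):
    # Each replica works on its own slice of the problem; here we simply
    # record the partition id as this replica's contribution.
    out = df.copy()
    out["count"] = partition_id
    return out

def reduce_fn(dfs):
    # Coalesce the replicas back into a single partition by summing
    # the per-replica contributions.
    out = dfs[0].copy()
    out["count"] = sum(d["count"].iloc[0] for d in dfs)
    return out

num_partitions = 4
part = pandas.DataFrame([[1]], columns=["x"])               # one-partition input
replicas = [query(part, i) for i in range(num_partitions)]  # "fan out"
result = reduce_fn(replicas)                                # back to one partition
print(result["count"].iloc[0])  # 0 + 1 + 2 + 3 = 6
```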

The parallelism achieved can be controlled via the num_partitions parameter passed to the PandasQueryPipeline constructor. This parameter designates the desired number of partitions, and defaults to NPartitions.get() when it is not specified. During fan out, the input partition is replicated num_partitions times. In the previous examples, num_partitions was not specified, and so defaulted to NPartitions.get().

The example below demonstrates the usage of fan_out and num_partitions. We first show an example of a function that would benefit from this computation pattern:

import glob
from PIL import Image
import torchvision.transforms as T
import torchvision

transforms = T.Compose([T.ToTensor()])
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
model.eval()
COCO_INSTANCE_CATEGORY_NAMES = [
    '__background__', 'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus',
    'train', 'truck', 'boat', 'traffic light', 'fire hydrant', 'N/A', 'stop sign',
    'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow',
    'elephant', 'bear', 'zebra', 'giraffe', 'N/A', 'backpack', 'umbrella', 'N/A', 'N/A',
    'handbag', 'tie', 'suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball',
    'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard', 'tennis racket',
    'bottle', 'N/A', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl',
    'banana', 'apple', 'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza',
    'donut', 'cake', 'chair', 'couch', 'potted plant', 'bed', 'N/A', 'dining table',
    'N/A', 'N/A', 'toilet', 'N/A', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone',
    'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'N/A', 'book',
    'clock', 'vase', 'scissors', 'teddy bear', 'hair drier', 'toothbrush'
]

def contains_cat(image, model):
    image = transforms(image)
    labels = [COCO_INSTANCE_CATEGORY_NAMES[i] for i in model([image])[0]['labels']]
    return 'cat' in labels

def serial_query(df):
    """
    This function takes as input a dataframe with a single row corresponding to a folder
    containing images to parse. Each image in the folder is passed through a neural network
    that detects whether it contains a cat, in serial, and a new column is computed for the
    dataframe that counts the number of images containing cats.

    Parameters
    ----------
    df : a dataframe
        The dataframe to process

    Returns
    -------
    The same dataframe as before, with an additional column containing the count of images
    containing cats.
    """
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    model.eval()
    img_folder = df['images'][0]
    images = sorted(glob.glob(f"{img_folder}/*.jpg"))
    cats = 0
    for img in images:
        cats = cats + 1 if contains_cat(Image.open(img), model) else cats
    df['cat_count'] = cats
    return df

To download the image files to test this code, run the following bash script, which downloads the images from the fast-ai-coco S3 bucket to a folder called images in your current working directory:

aws s3 cp s3://fast-ai-coco/coco_tiny.tgz . --no-sign-request; tar -xf coco_tiny.tgz; mkdir \
    images; mv coco_tiny/train/* images/; rm -rf coco_tiny; rm -rf coco_tiny.tgz

We can pipeline that code like so:

import modin.pandas as pd
from modin.experimental.batch import PandasQueryPipeline
from time import time
df = pd.DataFrame([['images']], columns=['images'])
pipeline = PandasQueryPipeline(df)
pipeline.add_query(serial_query, is_output=True)
serial_start = time()
df_with_cat_count = pipeline.compute_batch()[0]
serial_end = time()
print(f"Result of pipeline:\n{df_with_cat_count}")

We can introduce 8x parallelism into the pipeline above by combining the fan_out and num_partitions parameters, like so:

import modin.pandas as pd
from modin.experimental.batch import PandasQueryPipeline
import shutil
from time import time
df = pd.DataFrame([['images']], columns=['images'])
desired_num_partitions = 8
def parallel_query(df, partition_id):
    """
    This function takes as input a dataframe with a single row corresponding to a folder
    containing images to parse. It parses `total_images/desired_num_partitions` images every
    time it is called. A new column is computed for the dataframe that counts the number of
    images containing cats.

    Parameters
    ----------
    df : a dataframe
        The dataframe to process
    partition_id : int
        The partition id of the dataframe that this function runs on.

    Returns
    -------
    The same dataframe as before, with an additional column containing the count of images
    containing cats.
    """
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    model.eval()
    img_folder = df['images'][0]
    images = sorted(glob.glob(f"{img_folder}/*.jpg"))
    total_images = len(images)
    cats = 0
    start_index = partition_id * (total_images // desired_num_partitions)
    if partition_id == desired_num_partitions - 1: # Last partition must parse to end of list
        images = images[start_index:]
    else:
        end_index = (partition_id + 1) * (total_images // desired_num_partitions)
        images = images[start_index:end_index]
    for img in images:
        cats = cats + 1 if contains_cat(Image.open(img), model) else cats
    df['cat_count'] = cats
    return df

def reduce_fn(dfs):
    """
    Coalesce the results of fanning out the `parallel_query` query.

    Parameters
    ----------
    dfs : a list of dataframes
        The resulting dataframes from fanning out `parallel_query`

    Returns
    -------
    A new dataframe whose `cat_count` column is the sum of the `cat_count` column of all
    dataframes in `dfs`
    """
    df = dfs[0]
    cat_count = df['cat_count'][0]
    for dataframe in dfs[1:]:
        cat_count += dataframe['cat_count'][0]
    df['cat_count'] = cat_count
    return df
pipeline = PandasQueryPipeline(df, desired_num_partitions)
pipeline.add_query(
    parallel_query,
    fan_out=True,
    reduce_fn=reduce_fn,
    is_output=True,
    pass_partition_id=True
)
parallel_start = time()
df_with_cat_count = pipeline.compute_batch()[0]
parallel_end = time()
print(f"Result of pipeline:\n{df_with_cat_count}")
print(f"Total Time in Serial: {serial_end - serial_start}")
print(f"Total time with induced parallelism: {parallel_end - parallel_start}")
shutil.rmtree("images/") # Clean up

Batch Pipelining with Dynamic Repartitioning

Similarly, it is possible to hint to the Pipeline API to repartition after a node completes its computation. This is currently only supported when the input dataframe consists of only one partition. The number of partitions after repartitioning is controlled by the num_partitions parameter passed to the PandasQueryPipeline constructor.

The following example demonstrates the usage of the repartition_after parameter.

import modin.pandas as pd
from modin.experimental.batch import PandasQueryPipeline
import numpy as np

small_df = pd.DataFrame([[1, 2, 3]]) # Create a small dataframe

def increase_dataframe_size(df):
    import pandas
    new_df = pandas.concat([df] * 1000)
    new_df = new_df.reset_index(drop=True) # Get a new range index that isn't duplicated
    return new_df

desired_num_partitions = 24 # We will repartition to 24 partitions

def add_partition_id_to_df(df, partition_id):
    import pandas
    new_col = pandas.Series([partition_id]*len(df), name="partition_id", index=df.index)
    return pandas.concat([df, new_col], axis=1)

pipeline = PandasQueryPipeline(small_df, desired_num_partitions)
pipeline.add_query(increase_dataframe_size, repartition_after=True)
pipeline.add_query(add_partition_id_to_df, pass_partition_id=True, is_output=True)
result_df = pipeline.compute_batch()[0]
print(f"Number of partitions passed to second query: " +
      f"{len(np.unique(result_df['partition_id'].values))}")
print(f"Result of pipeline:\n{result_df}")