ray.data.grouped_data.GroupedData.map_groups#
- GroupedData.map_groups(fn: Callable[[pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]], pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]] | Callable[[pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]], Iterator[pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]]] | _CallableClassProtocol, *, compute: str | ComputeStrategy = None, batch_format: str | None = 'default', fn_args: Iterable[Any] | None = None, fn_kwargs: Dict[str, Any] | None = None, fn_constructor_args: Iterable[Any] | None = None, fn_constructor_kwargs: Dict[str, Any] | None = None, num_cpus: float | None = None, num_gpus: float | None = None, concurrency: int | Tuple[int, int] | None = None, **ray_remote_args) Dataset[源代码]#
将给定的函数应用于该数据集的每个记录组。
- 虽然 map_groups() 非常灵活,但请注意它也有缺点:
使用诸如 min()、max() 等更具体的方法可能会更快。
它要求每个组都能在一个节点上的内存中完全容纳。
通常情况下,优先使用 aggregate() 而不是 map_groups()。
示例
>>> # Return a single record per group (list of multiple records in, >>> # list of a single record out). >>> import ray >>> import pandas as pd >>> import numpy as np >>> # Get first value per group. >>> ds = ray.data.from_items([ ... {"group": 1, "value": 1}, ... {"group": 1, "value": 2}, ... {"group": 2, "value": 3}, ... {"group": 2, "value": 4}]) >>> ds.groupby("group").map_groups( ... lambda g: {"result": np.array([g["value"][0]])})
>>> # Return multiple records per group (dataframe in, dataframe out). >>> df = pd.DataFrame( ... {"A": ["a", "a", "b"], "B": [1, 1, 3], "C": [4, 6, 5]} ... ) >>> ds = ray.data.from_pandas(df) >>> grouped = ds.groupby("A") >>> grouped.map_groups( ... lambda g: g.apply( ... lambda c: c / g[c.name].sum() if c.name in ["B", "C"] else c ... ) ... )
- 参数:
fn – 应用于每组记录的函数,或可以实例化以创建此类可调用对象的类类型。它以单个组中所有记录的批次作为输入,并返回零个或多个记录的批次,类似于 map_batches()。
compute – 计算策略,可以是“tasks”(默认)以使用Ray任务,
ray.data.ActorPoolStrategy(size=n)以使用固定大小的actor池,或ray.data.ActorPoolStrategy(min_size=m, max_size=n)以使用自动扩展的actor池。batch_format – 指定
"default"以使用默认块格式(NumPy),"pandas"以选择pandas.DataFrame,"pyarrow"以选择pyarrow.Table,或"numpy"以选择Dict[str, numpy.ndarray],或 None 以返回底层块,不进行任何额外格式化。fn_args –
fn的参数。fn_kwargs –
fn的关键字参数。fn_constructor_args – 传递给
fn构造函数的定位参数。只有当fn是一个可调用类时,你才能提供这个参数。这些参数是底层 Ray 演员构造任务中的顶级参数。fn_constructor_kwargs – 传递给
fn构造函数的键值参数。只有在fn是一个可调用类时才能提供这些参数。这些参数是底层 Ray 角色构造任务中的顶级参数。num_cpus – 为每个并行映射工作器保留的CPU数量。
num_gpus – 为每个并行映射工作器预留的GPU数量。例如,指定
num_gpus=1以请求每个并行映射工作器使用1个GPU。ray_remote_args – 从ray请求的额外资源需求(例如,num_gpus=1 用于为map任务请求GPU)。
- 返回:
返回类型由
fn的返回类型决定,返回值由所有组的结果组合而成。