ray.data.grouped_data.GroupedData.map_groups#

GroupedData.map_groups(fn: Callable[[pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]], pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]] | Callable[[pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]], Iterator[pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]]] | _CallableClassProtocol, *, compute: str | ComputeStrategy = None, batch_format: str | None = 'default', fn_args: Iterable[Any] | None = None, fn_kwargs: Dict[str, Any] | None = None, fn_constructor_args: Iterable[Any] | None = None, fn_constructor_kwargs: Dict[str, Any] | None = None, num_cpus: float | None = None, num_gpus: float | None = None, concurrency: int | Tuple[int, int] | None = None, **ray_remote_args) Dataset[源代码]#

将给定的函数应用于该数据集的每个记录组。

虽然 map_groups() 非常灵活,但请注意它也有缺点:
  • 使用诸如 min()、max() 等更具体的方法可能会更快。

  • 它要求每个组都能在一个节点上的内存中完全容纳。

通常情况下,优先使用 aggregate() 而不是 map_groups()。

示例

>>> # Return a single record per group (list of multiple records in,
>>> # list of a single record out).
>>> import ray
>>> import pandas as pd
>>> import numpy as np
>>> # Get first value per group.
>>> ds = ray.data.from_items([ 
...     {"group": 1, "value": 1},
...     {"group": 1, "value": 2},
...     {"group": 2, "value": 3},
...     {"group": 2, "value": 4}])
>>> ds.groupby("group").map_groups( 
...     lambda g: {"result": np.array([g["value"][0]])})
>>> # Return multiple records per group (dataframe in, dataframe out).
>>> df = pd.DataFrame(
...     {"A": ["a", "a", "b"], "B": [1, 1, 3], "C": [4, 6, 5]}
... )
>>> ds = ray.data.from_pandas(df) 
>>> grouped = ds.groupby("A") 
>>> grouped.map_groups( 
...     lambda g: g.apply(
...         lambda c: c / g[c.name].sum() if c.name in ["B", "C"] else c
...     )
... ) 
参数:
  • fn – 应用于每组记录的函数,或可以实例化以创建此类可调用对象的类类型。它以单个组中所有记录的批次作为输入,并返回零个或多个记录的批次,类似于 map_batches()。

  • compute – 计算策略,可以是“tasks”(默认)以使用Ray任务,ray.data.ActorPoolStrategy(size=n) 以使用固定大小的actor池,或 ray.data.ActorPoolStrategy(min_size=m, max_size=n) 以使用自动扩展的actor池。

  • batch_format – 指定 "default" 以使用默认块格式(NumPy),"pandas" 以选择 pandas.DataFrame"pyarrow" 以选择 pyarrow.Table,或 "numpy" 以选择 Dict[str, numpy.ndarray],或 None 以返回底层块,不进行任何额外格式化。

  • fn_argsfn 的参数。

  • fn_kwargsfn 的关键字参数。

  • fn_constructor_args – 传递给 fn 构造函数的定位参数。只有当 fn 是一个可调用类时,你才能提供这个参数。这些参数是底层 Ray 演员构造任务中的顶级参数。

  • fn_constructor_kwargs – 传递给 fn 构造函数的键值参数。只有在 fn 是一个可调用类时才能提供这些参数。这些参数是底层 Ray 角色构造任务中的顶级参数。

  • num_cpus – 为每个并行映射工作器保留的CPU数量。

  • num_gpus – 为每个并行映射工作器预留的GPU数量。例如,指定 num_gpus=1 以请求每个并行映射工作器使用1个GPU。

  • ray_remote_args – 从ray请求的额外资源需求(例如,num_gpus=1 用于为map任务请求GPU)。

返回:

返回类型由 fn 的返回类型决定,返回值由所有组的结果组合而成。