pyspark.pandas.merge_asof

pyspark.pandas. merge_asof ( left : Union [ pyspark.pandas.frame.DataFrame , pyspark.pandas.series.Series ] , right : Union [ pyspark.pandas.frame.DataFrame , pyspark.pandas.series.Series ] , on : Union[Any, Tuple[Any, …], None] = None , left_on : Union[Any, Tuple[Any, …], None] = None , right_on : Union[Any, Tuple[Any, …], None] = None , left_index : bool = False , right_index : bool = False , by : Union[Any, Tuple[Any, …], List[Union[Any, Tuple[Any, …]]], None] = None , left_by : Union[Any, Tuple[Any, …], List[Union[Any, Tuple[Any, …]]], None] = None , right_by : Union[Any, Tuple[Any, …], List[Union[Any, Tuple[Any, …]]], None] = None , suffixes : Tuple [ str , str ] = '_x', '_y' , tolerance : Optional [ Any ] = None , allow_exact_matches : bool = True , direction : str = 'backward' ) → pyspark.pandas.frame.DataFrame [source]

执行asof合并。

这类似于左连接,只不过我们是基于最近的键进行匹配,而不是基于相等的键。

对于左DataFrame中的每一行:

  • “向后”搜索选择右DataFrame中最后一行的‘on’键小于或等于左键的行。

  • “向前”搜索选择右DataFrame中第一行的‘on’键大于或等于左键的行。

  • “最近”搜索选择右DataFrame中‘on’键在绝对距离上最接近左键的行。

在搜索时,可以先使用‘by’匹配等效键,然后再使用‘on’进行搜索。

新增于版本 3.3.0。

Parameters
left DataFrame or named Series
right DataFrame or named Series
on label

要连接的字段名称。必须同时存在于两个DataFrame中。 数据必须是有序的。这必须是一个数值列, 例如日期时间类型、整数或浮点数。必须给出on或left_on/right_on。

left_on label

左DataFrame中用于连接的字段名称。

right_on label

右侧DataFrame中用于连接的字段名称。

left_index bool

使用左侧DataFrame的索引作为连接键。

right_index bool

使用右侧DataFrame的索引作为连接键。

by column name or list of column names

在执行合并操作之前,先在这些列上进行匹配。

left_by column name

左侧DataFrame中要匹配的字段名称。

right_by column name

右侧DataFrame中要匹配的字段名称。

suffixes 2-length sequence (tuple, list, …)

分别应用于左侧和右侧重叠列名的后缀。

tolerance int or Timedelta, optional, default None

在此范围内选择asof容差;必须与合并索引兼容。

allow_exact_matches bool, default True
  • 如果为True,允许匹配相同的‘on’值 (即小于等于/大于等于)

  • 如果为False,不匹配相同的‘on’值 (即严格小于/严格大于)。

direction ‘backward’ (default), ‘forward’, or ‘nearest’

是否搜索先前、后续或最近的匹配项。

Returns
merged DataFrame

另请参阅

merge

与数据库风格的连接合并。

merge_ordered

合并并可选填充/插值。

示例

>>> left = ps.DataFrame({"a": [1, 5, 10], "left_val": ["a", "b", "c"]})
>>> left
    a left_val
0   1        a
1   5        b
2  10        c
>>> right = ps.DataFrame({"a": [1, 2, 3, 6, 7], "right_val": [1, 2, 3, 6, 7]})
>>> right
   a  right_val
0  1          1
1  2          2
2  3          3
3  6          6
4  7          7
>>> ps.merge_asof(left, right, on="a").sort_values("a").reset_index(drop=True)
    a left_val  right_val
0   1        a          1
1   5        b          3
2  10        c          7
>>> ps.merge_asof(
...     left,
...     right,
...     on="a",
...     allow_exact_matches=False
... ).sort_values("a").reset_index(drop=True)
    a left_val  right_val
0   1        a        NaN
1   5        b        3.0
2  10        c        7.0
>>> ps.merge_asof(
...     left,
...     right,
...     on="a",
...     direction="forward"
... ).sort_values("a").reset_index(drop=True)
    a left_val  right_val
0   1        a        1.0
1   5        b        6.0
2  10        c        NaN
>>> ps.merge_asof(
...     left,
...     right,
...     on="a",
...     direction="nearest"
... ).sort_values("a").reset_index(drop=True)
    a left_val  right_val
0   1        a          1
1   5        b          6
2  10        c          7

我们也可以使用带索引的DataFrame。

>>> left = ps.DataFrame({"left_val": ["a", "b", "c"]}, index=[1, 5, 10])
>>> left
   left_val
1         a
5         b
10        c
>>> right = ps.DataFrame({"right_val": [1, 2, 3, 6, 7]}, index=[1, 2, 3, 6, 7])
>>> right
   right_val
1          1
2          2
3          3
6          6
7          7
>>> ps.merge_asof(left, right, left_index=True, right_index=True).sort_index()
   left_val  right_val
1         a          1
5         b          3
10        c          7

这是一个现实世界的时间序列示例

>>> quotes = ps.DataFrame(
...     {
...         "time": [
...             pd.Timestamp("2016-05-25 13:30:00.023"),
...             pd.Timestamp("2016-05-25 13:30:00.023"),
...             pd.Timestamp("2016-05-25 13:30:00.030"),
...             pd.Timestamp("2016-05-25 13:30:00.041"),
...             pd.Timestamp("2016-05-25 13:30:00.048"),
...             pd.Timestamp("2016-05-25 13:30:00.049"),
...             pd.Timestamp("2016-05-25 13:30:00.072"),
...             pd.Timestamp("2016-05-25 13:30:00.075")
...         ],
...         "ticker": [
...                "GOOG",
...                "MSFT",
...                "MSFT",
...                "MSFT",
...                "GOOG",
...                "AAPL",
...                "GOOG",
...                "MSFT"
...            ],
...            "bid": [720.50, 51.95, 51.97, 51.99, 720.50, 97.99, 720.50, 52.01],
...            "ask": [720.93, 51.96, 51.98, 52.00, 720.93, 98.01, 720.88, 52.03]
...     }
... )
>>> quotes
                     time ticker     bid     ask
0 2016-05-25 13:30:00.023   GOOG  720.50  720.93
1 2016-05-25 13:30:00.023   MSFT   51.95   51.96
2 2016-05-25 13:30:00.030   MSFT   51.97   51.98
3 2016-05-25 13:30:00.041   MSFT   51.99   52.00
4 2016-05-25 13:30:00.048   GOOG  720.50  720.93
5 2016-05-25 13:30:00.049   AAPL   97.99   98.01
6 2016-05-25 13:30:00.072   GOOG  720.50  720.88
7 2016-05-25 13:30:00.075   MSFT   52.01   52.03
>>> trades = ps.DataFrame(
...        {
...            "time": [
...                pd.Timestamp("2016-05-25 13:30:00.023"),
...                pd.Timestamp("2016-05-25 13:30:00.038"),
...                pd.Timestamp("2016-05-25 13:30:00.048"),
...                pd.Timestamp("2016-05-25 13:30:00.048"),
...                pd.Timestamp("2016-05-25 13:30:00.048")
...            ],
...            "ticker": ["MSFT", "MSFT", "GOOG", "GOOG", "AAPL"],
...            "price": [51.95, 51.95, 720.77, 720.92, 98.0],
...            "quantity": [75, 155, 100, 100, 100]
...        }
...    )
>>> trades
                     time ticker   price  quantity
0 2016-05-25 13:30:00.023   MSFT   51.95        75
1 2016-05-25 13:30:00.038   MSFT   51.95       155
2 2016-05-25 13:30:00.048   GOOG  720.77       100
3 2016-05-25 13:30:00.048   GOOG  720.92       100
4 2016-05-25 13:30:00.048   AAPL   98.00       100

默认情况下,我们采用的是报价的asof

>>> ps.merge_asof(
...    trades, quotes, on="time", by="ticker"
... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
                     time ticker   price  quantity     bid     ask
0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
1 2016-05-25 13:30:00.038   MSFT   51.95       155   51.97   51.98
2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93

我们仅在报价时间和交易时间之间2毫秒内进行asof操作

>>> ps.merge_asof(
...     trades,
...     quotes,
...     on="time",
...     by="ticker",
...     tolerance=sf.expr("INTERVAL 2 MILLISECONDS")  # pd.Timedelta("2ms")
... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
                     time ticker   price  quantity     bid     ask
0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
1 2016-05-25 13:30:00.038   MSFT   51.95       155     NaN     NaN
2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93

我们仅在报价时间和交易时间之间相差10毫秒以内进行asof操作,并且我们排除时间完全匹配的情况。然而, 先前 的数据将会向前传播

>>> ps.merge_asof(
...     trades,
...     quotes,
...     on="time",
...     by="ticker",
...     tolerance=sf.expr("INTERVAL 10 MILLISECONDS"),  # pd.Timedelta("10ms")
...     allow_exact_matches=False
... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
                     time ticker   price  quantity     bid     ask
0 2016-05-25 13:30:00.023   MSFT   51.95        75     NaN     NaN
1 2016-05-25 13:30:00.038   MSFT   51.95       155   51.97   51.98
2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
3 2016-05-25 13:30:00.048   GOOG  720.77       100     NaN     NaN
4 2016-05-25 13:30:00.048   GOOG  720.92       100     NaN     NaN