pyspark.pandas.merge_asof ¶
-
pyspark.pandas.
merge_asof
( left : Union [ pyspark.pandas.frame.DataFrame , pyspark.pandas.series.Series ] , right : Union [ pyspark.pandas.frame.DataFrame , pyspark.pandas.series.Series ] , on : Union[Any, Tuple[Any, …], None] = None , left_on : Union[Any, Tuple[Any, …], None] = None , right_on : Union[Any, Tuple[Any, …], None] = None , left_index : bool = False , right_index : bool = False , by : Union[Any, Tuple[Any, …], List[Union[Any, Tuple[Any, …]]], None] = None , left_by : Union[Any, Tuple[Any, …], List[Union[Any, Tuple[Any, …]]], None] = None , right_by : Union[Any, Tuple[Any, …], List[Union[Any, Tuple[Any, …]]], None] = None , suffixes : Tuple [ str , str ] = '_x', '_y' , tolerance : Optional [ Any ] = None , allow_exact_matches : bool = True , direction : str = 'backward' ) → pyspark.pandas.frame.DataFrame [source] ¶ -
执行asof合并。
这类似于左连接,只不过我们是基于最近的键进行匹配,而不是基于相等的键。
对于左DataFrame中的每一行:
-
“向后”搜索选择右DataFrame中最后一行的‘on’键小于或等于左键的行。
-
“向前”搜索选择右DataFrame中第一行的‘on’键大于或等于左键的行。
-
“最近”搜索选择右DataFrame中‘on’键在绝对距离上最接近左键的行。
在搜索时,可以先使用‘by’匹配等效键,然后再使用‘on’进行搜索。
新增于版本 3.3.0。
- Parameters
-
- left DataFrame or named Series
- right DataFrame or named Series
- on label
-
要连接的字段名称。必须同时存在于两个DataFrame中。 数据必须是有序的。这必须是一个数值列, 例如日期时间类型、整数或浮点数。必须给出on或left_on/right_on。
- left_on label
-
左DataFrame中用于连接的字段名称。
- right_on label
-
右侧DataFrame中用于连接的字段名称。
- left_index bool
-
使用左侧DataFrame的索引作为连接键。
- right_index bool
-
使用右侧DataFrame的索引作为连接键。
- by column name or list of column names
-
在执行合并操作之前,先在这些列上进行匹配。
- left_by column name
-
左侧DataFrame中要匹配的字段名称。
- right_by column name
-
右侧DataFrame中要匹配的字段名称。
- suffixes 2-length sequence (tuple, list, …)
-
分别应用于左侧和右侧重叠列名的后缀。
- tolerance int or Timedelta, optional, default None
-
在此范围内选择asof容差;必须与合并索引兼容。
- allow_exact_matches bool, default True
-
-
如果为True,允许匹配相同的‘on’值 (即小于等于/大于等于)
-
如果为False,不匹配相同的‘on’值 (即严格小于/严格大于)。
-
- direction ‘backward’ (default), ‘forward’, or ‘nearest’
-
是否搜索先前、后续或最近的匹配项。
- Returns
-
- merged DataFrame
另请参阅
-
merge
-
与数据库风格的连接合并。
-
merge_ordered
-
合并并可选填充/插值。
示例
>>> left = ps.DataFrame({"a": [1, 5, 10], "left_val": ["a", "b", "c"]}) >>> left a left_val 0 1 a 1 5 b 2 10 c
>>> right = ps.DataFrame({"a": [1, 2, 3, 6, 7], "right_val": [1, 2, 3, 6, 7]}) >>> right a right_val 0 1 1 1 2 2 2 3 3 3 6 6 4 7 7
>>> ps.merge_asof(left, right, on="a").sort_values("a").reset_index(drop=True) a left_val right_val 0 1 a 1 1 5 b 3 2 10 c 7
>>> ps.merge_asof( ... left, ... right, ... on="a", ... allow_exact_matches=False ... ).sort_values("a").reset_index(drop=True) a left_val right_val 0 1 a NaN 1 5 b 3.0 2 10 c 7.0
>>> ps.merge_asof( ... left, ... right, ... on="a", ... direction="forward" ... ).sort_values("a").reset_index(drop=True) a left_val right_val 0 1 a 1.0 1 5 b 6.0 2 10 c NaN
>>> ps.merge_asof( ... left, ... right, ... on="a", ... direction="nearest" ... ).sort_values("a").reset_index(drop=True) a left_val right_val 0 1 a 1 1 5 b 6 2 10 c 7
我们也可以使用带索引的DataFrame。
>>> left = ps.DataFrame({"left_val": ["a", "b", "c"]}, index=[1, 5, 10]) >>> left left_val 1 a 5 b 10 c
>>> right = ps.DataFrame({"right_val": [1, 2, 3, 6, 7]}, index=[1, 2, 3, 6, 7]) >>> right right_val 1 1 2 2 3 3 6 6 7 7
>>> ps.merge_asof(left, right, left_index=True, right_index=True).sort_index() left_val right_val 1 a 1 5 b 3 10 c 7
这是一个现实世界的时间序列示例
>>> quotes = ps.DataFrame( ... { ... "time": [ ... pd.Timestamp("2016-05-25 13:30:00.023"), ... pd.Timestamp("2016-05-25 13:30:00.023"), ... pd.Timestamp("2016-05-25 13:30:00.030"), ... pd.Timestamp("2016-05-25 13:30:00.041"), ... pd.Timestamp("2016-05-25 13:30:00.048"), ... pd.Timestamp("2016-05-25 13:30:00.049"), ... pd.Timestamp("2016-05-25 13:30:00.072"), ... pd.Timestamp("2016-05-25 13:30:00.075") ... ], ... "ticker": [ ... "GOOG", ... "MSFT", ... "MSFT", ... "MSFT", ... "GOOG", ... "AAPL", ... "GOOG", ... "MSFT" ... ], ... "bid": [720.50, 51.95, 51.97, 51.99, 720.50, 97.99, 720.50, 52.01], ... "ask": [720.93, 51.96, 51.98, 52.00, 720.93, 98.01, 720.88, 52.03] ... } ... ) >>> quotes time ticker bid ask 0 2016-05-25 13:30:00.023 GOOG 720.50 720.93 1 2016-05-25 13:30:00.023 MSFT 51.95 51.96 2 2016-05-25 13:30:00.030 MSFT 51.97 51.98 3 2016-05-25 13:30:00.041 MSFT 51.99 52.00 4 2016-05-25 13:30:00.048 GOOG 720.50 720.93 5 2016-05-25 13:30:00.049 AAPL 97.99 98.01 6 2016-05-25 13:30:00.072 GOOG 720.50 720.88 7 2016-05-25 13:30:00.075 MSFT 52.01 52.03
>>> trades = ps.DataFrame( ... { ... "time": [ ... pd.Timestamp("2016-05-25 13:30:00.023"), ... pd.Timestamp("2016-05-25 13:30:00.038"), ... pd.Timestamp("2016-05-25 13:30:00.048"), ... pd.Timestamp("2016-05-25 13:30:00.048"), ... pd.Timestamp("2016-05-25 13:30:00.048") ... ], ... "ticker": ["MSFT", "MSFT", "GOOG", "GOOG", "AAPL"], ... "price": [51.95, 51.95, 720.77, 720.92, 98.0], ... "quantity": [75, 155, 100, 100, 100] ... } ... ) >>> trades time ticker price quantity 0 2016-05-25 13:30:00.023 MSFT 51.95 75 1 2016-05-25 13:30:00.038 MSFT 51.95 155 2 2016-05-25 13:30:00.048 GOOG 720.77 100 3 2016-05-25 13:30:00.048 GOOG 720.92 100 4 2016-05-25 13:30:00.048 AAPL 98.00 100
默认情况下,我们采用的是报价的asof
>>> ps.merge_asof( ... trades, quotes, on="time", by="ticker" ... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True) time ticker price quantity bid ask 0 2016-05-25 13:30:00.023 MSFT 51.95 75 51.95 51.96 1 2016-05-25 13:30:00.038 MSFT 51.95 155 51.97 51.98 2 2016-05-25 13:30:00.048 AAPL 98.00 100 NaN NaN 3 2016-05-25 13:30:00.048 GOOG 720.77 100 720.50 720.93 4 2016-05-25 13:30:00.048 GOOG 720.92 100 720.50 720.93
我们仅在报价时间和交易时间之间2毫秒内进行asof操作
>>> ps.merge_asof( ... trades, ... quotes, ... on="time", ... by="ticker", ... tolerance=sf.expr("INTERVAL 2 MILLISECONDS") # pd.Timedelta("2ms") ... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True) time ticker price quantity bid ask 0 2016-05-25 13:30:00.023 MSFT 51.95 75 51.95 51.96 1 2016-05-25 13:30:00.038 MSFT 51.95 155 NaN NaN 2 2016-05-25 13:30:00.048 AAPL 98.00 100 NaN NaN 3 2016-05-25 13:30:00.048 GOOG 720.77 100 720.50 720.93 4 2016-05-25 13:30:00.048 GOOG 720.92 100 720.50 720.93
我们仅在报价时间和交易时间之间相差10毫秒以内进行asof操作,并且我们排除时间完全匹配的情况。然而, 先前 的数据将会向前传播
>>> ps.merge_asof( ... trades, ... quotes, ... on="time", ... by="ticker", ... tolerance=sf.expr("INTERVAL 10 MILLISECONDS"), # pd.Timedelta("10ms") ... allow_exact_matches=False ... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True) time ticker price quantity bid ask 0 2016-05-25 13:30:00.023 MSFT 51.95 75 NaN NaN 1 2016-05-25 13:30:00.038 MSFT 51.95 155 51.97 51.98 2 2016-05-25 13:30:00.048 AAPL 98.00 100 NaN NaN 3 2016-05-25 13:30:00.048 GOOG 720.77 100 NaN NaN 4 2016-05-25 13:30:00.048 GOOG 720.92 100 NaN NaN
-