import functools
from collections import namedtuple
import numpy as np
from dask.dataframe.dispatch import meta_nonempty
from dask.dataframe.tseries.resample import _resample_bin_and_out_divs, _resample_series
from dask.utils import derived_from
from pandas.core.resample import Resampler as pd_Resampler
from dask_expr._collection import new_collection
from dask_expr._expr import (
    Blockwise,
    Expr,
    Projection,
    make_meta,
    plain_column_projection,
)
from dask_expr._repartition import Repartition
BlockwiseDep = namedtuple(typename="BlockwiseDep", field_names=["iterable"])
class ResampleReduction(Expr):
    _parameters = [
        "frame",
        "rule",
        "kwargs",
        "how_args",
        "how_kwargs",
    ]
    _defaults = {
        "closed": None,
        "label": None,
        "kwargs": None,
        "how_args": (),
        "how_kwargs": None,
    }
    how = None
    fill_value = np.nan
    @functools.cached_property
    def npartitions(self):
        return self.frame.npartitions
    def _divisions(self):
        return self._resample_divisions[1]
    @functools.cached_property
    def _meta(self):
        resample = meta_nonempty(self.frame._meta).resample(self.rule, **self.kwargs)
        meta = getattr(resample, self.how)(*self.how_args, **self.how_kwargs or {})
        return make_meta(meta)
    @functools.cached_property
    def kwargs(self):
        return {} if self.operand("kwargs") is None else self.operand("kwargs")
    @functools.cached_property
    def how_kwargs(self):
        return {} if self.operand("how_kwargs") is None else self.operand("how_kwargs")
    @functools.cached_property
    def _resample_divisions(self):
        return _resample_bin_and_out_divs(
            self.frame.divisions, self.rule, **self.kwargs or {}
        )
    def _simplify_up(self, parent, dependents):
        if isinstance(parent, Projection):
            return plain_column_projection(self, parent, dependents)
    def _lower(self):
        partitioned = Repartition(
            self.frame, new_divisions=self._resample_divisions[0], force=True
        )
        output_divisions = self._resample_divisions[1]
        return ResampleAggregation(
            partitioned,
            BlockwiseDep(output_divisions[:-1]),
            BlockwiseDep(output_divisions[1:]),
            BlockwiseDep(["left"] * (len(output_divisions[1:]) - 1) + [None]),
            self.rule,
            self.kwargs,
            self.how,
            self.fill_value,
            list(self.how_args),
            self.how_kwargs,
        )
class ResampleAggregation(Blockwise):
    _parameters = [
        "frame",
        "divisions_left",
        "divisions_right",
        "closed",
        "rule",
        "kwargs",
        "how",
        "fill_value",
        "how_args",
        "how_kwargs",
    ]
    operation = staticmethod(_resample_series)
    @functools.cached_property
    def _meta(self):
        return self.frame._meta
    def _divisions(self):
        return list(self.divisions_left.iterable) + [self.divisions_right.iterable[-1]]
    def _blockwise_arg(self, arg, i):
        if isinstance(arg, BlockwiseDep):
            return arg.iterable[i]
        return super()._blockwise_arg(arg, i)
class ResampleCount(ResampleReduction):
    how = "count"
    fill_value = 0
class ResampleSum(ResampleReduction):
    how = "sum"
    fill_value = 0
class ResampleProd(ResampleReduction):
    how = "prod"
class ResampleMean(ResampleReduction):
    how = "mean"
class ResampleMin(ResampleReduction):
    how = "min"
class ResampleMax(ResampleReduction):
    how = "max"
class ResampleFirst(ResampleReduction):
    how = "first"
class ResampleLast(ResampleReduction):
    how = "last"
class ResampleVar(ResampleReduction):
    how = "var"
class ResampleStd(ResampleReduction):
    how = "std"
class ResampleSize(ResampleReduction):
    how = "size"
    fill_value = 0
class ResampleNUnique(ResampleReduction):
    how = "nunique"
    fill_value = 0
class ResampleMedian(ResampleReduction):
    how = "median"
class ResampleQuantile(ResampleReduction):
    how = "quantile"
class ResampleOhlc(ResampleReduction):
    how = "ohlc"
class ResampleSem(ResampleReduction):
    how = "sem"
class ResampleAgg(ResampleReduction):
    how = "agg"
    def _simplify_up(self, parent, dependents):
        # Disable optimization in `agg`; function may access other columns
        return
[docs]class Resampler:
    """Aggregate using one or more operations
    The purpose of this class is to expose an API similar
    to Pandas' `Resampler` for dask-expr
    """
[docs]    def __init__(self, obj, rule, **kwargs):
        if obj.divisions[0] is None:
            msg = (
                "Can only resample dataframes with known divisions\n"
                "See http://www.aidoczh.com/dask/dataframe-design.html#partitions\n"
                "for more information."
            )
            raise ValueError(msg)
        self.obj = obj
        self.rule = rule
        self.kwargs = kwargs 
    def _single_agg(self, expr_cls, how_args=(), how_kwargs=None):
        return new_collection(
            expr_cls(
                self.obj,
                self.rule,
                self.kwargs,
                how_args=how_args,
                how_kwargs=how_kwargs,
            )
        )
[docs]    @derived_from(pd_Resampler)
    def count(self):
        return self._single_agg(ResampleCount) 
[docs]    @derived_from(pd_Resampler)
    def sum(self):
        return self._single_agg(ResampleSum) 
[docs]    @derived_from(pd_Resampler)
    def prod(self):
        return self._single_agg(ResampleProd) 
[docs]    @derived_from(pd_Resampler)
    def mean(self):
        return self._single_agg(ResampleMean) 
[docs]    @derived_from(pd_Resampler)
    def min(self):
        return self._single_agg(ResampleMin) 
[docs]    @derived_from(pd_Resampler)
    def max(self):
        return self._single_agg(ResampleMax) 
[docs]    @derived_from(pd_Resampler)
    def first(self):
        return self._single_agg(ResampleFirst) 
[docs]    @derived_from(pd_Resampler)
    def last(self):
        return self._single_agg(ResampleLast) 
[docs]    @derived_from(pd_Resampler)
    def var(self):
        return self._single_agg(ResampleVar) 
[docs]    @derived_from(pd_Resampler)
    def std(self):
        return self._single_agg(ResampleStd) 
[docs]    @derived_from(pd_Resampler)
    def size(self):
        return self._single_agg(ResampleSize) 
[docs]    @derived_from(pd_Resampler)
    def nunique(self):
        return self._single_agg(ResampleNUnique) 
[docs]    @derived_from(pd_Resampler)
    def quantile(self):
        return self._single_agg(ResampleQuantile) 
[docs]    @derived_from(pd_Resampler)
    def ohlc(self):
        return self._single_agg(ResampleOhlc) 
[docs]    @derived_from(pd_Resampler)
    def sem(self):
        return self._single_agg(ResampleSem) 
[docs]    @derived_from(pd_Resampler)
    def agg(self, func, *args, **kwargs):
        return self._single_agg(ResampleAgg, how_args=(func, *args), how_kwargs=kwargs)