Source code for scmdata.groupby

"""
Functionality for grouping and filtering ScmRun objects
"""
import warnings
from collections.abc import Iterable

import numpy as np
from xarray.core import ops
from xarray.core.common import ImplementsArrayReduce


def _maybe_wrap_array(original, new_array):
    """
    Wrap a transformed array with ``__array_wrap__`` if it can be done safely.

    This lets us treat arbitrary functions that take and return ndarray objects
    like ufuncs, as long as they return an array with the same shape.
    """
    # in case func lost array's metadata
    if isinstance(new_array, np.ndarray) and new_array.shape == original.shape:
        return original.__array_wrap__(new_array)
    else:
        return new_array


class _GroupBy(ImplementsArrayReduce):
    def __init__(self, meta, groups, na_fill_value=-10000):
        m = meta.reset_index(drop=True)
        self.na_fill_value = float(na_fill_value)

        # Work around the bad handling of NaN values in groupbys
        if any([np.issubdtype(m[c].dtype, np.number) for c in m]):
            if (meta == na_fill_value).any(axis=None):
                raise ValueError(
                    "na_fill_value conflicts with data value. Choose a na_fill_value "
                    "not in meta"
                )
            else:
                m = m.fillna(na_fill_value)

        self._grouper = m.groupby(list(groups), group_keys=True)

    def _iter_grouped(self):
        def _try_fill_value(v):
            try:
                if float(v) == float(self.na_fill_value):
                    return np.nan
            except ValueError:
                pass
            return v

        for indices in self._grouper.groups:
            if not isinstance(indices, Iterable) or isinstance(indices, str):
                indices = [indices]

            indices = [_try_fill_value(v) for v in indices]
            res = self.run.filter(**{k: v for k, v in zip(self.group_keys, indices)})
            if not len(res):
                raise ValueError(
                    "Empty group for {}".format(list(zip(self.group_keys, indices)))
                )
            yield res

    def __iter__(self):
        return self._iter_grouped()


[docs]class RunGroupBy(_GroupBy): """ GroupBy object specialized to grouping ScmRun objects """ def __init__(self, run, groups): self.run = run self.group_keys = groups super().__init__(run.meta, groups)
[docs] def map(self, func, *args, **kwargs): """ Apply a function to each group and append the results `func` is called like `func(ar, *args, **kwargs)` for each :class:`ScmRun <scmdata.run.ScmRun>` ``ar`` in this group. If the result of this function call is None, than it is excluded from the results. The results are appended together using :func:`run_append`. The function can change the size of the input :class:`ScmRun <scmdata.run.ScmRun>` as long as :func:`run_append` can be applied to all results. Examples -------- .. code:: python >>> def write_csv(arr): ... variable = arr.get_unique_meta("variable") ... arr.to_csv("out-{}.csv".format(variable) >>> df.groupby("variable").map(write_csv) Parameters ---------- func : function Callable to apply to each timeseries. ``*args`` Positional arguments passed to `func`. ``**kwargs`` Used to call `func(ar, **kwargs)` for each array `ar`. Returns ------- applied : :class:`ScmRun <scmdata.run.ScmRun>` The result of splitting, applying and combining this array. """ grouped = self._iter_grouped() applied = [ _maybe_wrap_array(arr, func(arr, *args, **kwargs)) for arr in grouped ] return self._combine(applied)
def _combine(self, applied): """ Recombine the applied objects like the original. """ from scmdata.run import run_append # Remove all None values applied = [df for df in applied if df is not None] if len(applied) == 0: return None else: return run_append(applied)
[docs] def reduce(self, func, dim=None, axis=None, **kwargs): """ Reduce the items in this group by applying `func` along some dimension(s). Parameters ---------- func : function Function which can be called in the form `func(x, axis=axis, **kwargs)` to return the result of collapsing an np.ndarray over an integer valued axis. dim : `...`, str or sequence of str, optional Not used in this implementation axis : int or sequence of int, optional Axis(es) over which to apply `func`. Only one of the 'dimension' and 'axis' arguments can be supplied. If neither are supplied, then `func` is calculated over all dimension for each group item. **kwargs : dict Additional keyword arguments passed on to `func`. Returns ------- reduced : :class:`ScmRun <scmdata.run.ScmRun>` Array with summarized data and the indicated dimension(s) removed. """ if dim is not None and dim != "time": raise ValueError("Only reduction along the time dimension is supported") def reduce_array(ar): return ar.reduce(func, dim, axis, **kwargs) with warnings.catch_warnings(): warnings.simplefilter("ignore", category=RuntimeWarning) return self.map(reduce_array)
ops.inject_reduce_methods(RunGroupBy)