Source code for scmdata.time

"""
Time period handling and interpolation

A large portion of this module was originally from openscm.
Thanks to the original author, Sven Willner
"""

from datetime import datetime

import cftime
import numpy as np
import pandas as pd
from dateutil import parser

try:
    import scipy.interpolate

    has_scipy = True
except ImportError:  # pragma: no cover
    scipy = None
    has_scipy = False


_TARGET_TYPE = np.int64
_TARGET_DTYPE = "datetime64[s]"


[docs]class InsufficientDataError(Exception): """ Insufficient data is available to interpolate/extrapolate """ pass
def _float_year_to_datetime(inp: float) -> np.datetime64: year = int(inp) fractional_part = inp - year return np.datetime64( # pylint: disable=too-many-function-args year - 1970, "Y" ) + np.timedelta64( # pylint: disable=too-many-function-args int( (datetime(year + 1, 1, 1) - datetime(year, 1, 1)).total_seconds() * fractional_part ), "s", ) _ufunc_float_year_to_datetime = np.frompyfunc(_float_year_to_datetime, 1, 1) _ufunc_str_to_datetime = np.frompyfunc(parser.parse, 1, 1) def _parse_datetime(inp: np.ndarray) -> np.ndarray: try: return _ufunc_float_year_to_datetime(inp.astype(float)) except (TypeError, ValueError): return _ufunc_str_to_datetime(inp) def _format_datetime(dts: np.ndarray) -> np.ndarray: """ Convert an array to an array of :class:`numpy.datetime64`. Parameters ---------- dts Input to attempt to convert Returns ------- :class:`numpy.ndarray` of :class:`numpy.datetime64` Converted array Raises ------ ValueError If one of the values in :obj:`dts` cannot be converted to :class:`numpy.datetime64` """ if len(dts) <= 0: # pylint: disable=len-as-condition return np.array([], dtype=_TARGET_DTYPE) dtype = dts.dtype.type if dts.dtype.kind == "O": dtype = np.dtype(type(dts[0])).type if issubclass(dtype, np.datetime64): return np.asarray(dts, dtype=_TARGET_DTYPE) if issubclass(dtype, np.floating): return _ufunc_float_year_to_datetime(dts).astype(_TARGET_DTYPE) if issubclass(dtype, np.integer): return (np.asarray(dts) - 1970).astype("datetime64[Y]").astype(_TARGET_DTYPE) if issubclass(dtype, str): return _parse_datetime(dts).astype(_TARGET_DTYPE) return np.asarray(dts, dtype=_TARGET_DTYPE)
[docs]class TimePoints: """ Handles time points by wrapping :class:`numpy.ndarray` of :class:`numpy.datetime64`.. """ def __init__(self, values): """ Initialize. Parameters ---------- values Time points array to handle """ self._values = _format_datetime(np.asarray(values)) def __len__(self) -> int: """ Get the number of time points. """ return len(self._values) @property def values(self) -> np.ndarray: """ Time points """ return self._values
[docs] def to_index(self) -> pd.Index: """ Get time points as :class:`pandas.Index`. Returns ------- :class:`pandas.Index` :class:`pandas.Index` of :class:`numpy.dtype` :class:`object` with name ``"time"`` made from the time points represented as :class:`datetime.datetime`. """ return pd.Index(self._values.astype(object), dtype=object, name="time")
[docs] def as_cftime(self, date_cls=cftime.DatetimeGregorian) -> list: """ Format time points as :class:`cftime.datetime` Parameters ---------- date_cls : :class:`cftime.datetime` The time points will be returned as instances of ``date_cls`` Returns ------- list of :class:`cftime.datetime` Time points as a list of ``date_cls`` objects """ return [date_cls(*dt.timetuple()[:6]) for dt in self._values.astype(object)]
[docs] def years(self) -> np.ndarray: """ Get year of each time point. Returns ------- :class:`numpy.ndarray` of :obj:`int` Year of each time point """ return np.vectorize(getattr)(self._values.astype(object), "year")
[docs] def months(self) -> np.ndarray: """ Get month of each time point. Returns ------- :class:`numpy.ndarray` of :obj:`int` Month of each time point """ return np.vectorize(getattr)(self._values.astype(object), "month")
[docs] def days(self) -> np.ndarray: """ Get day of each time point. Returns ------- :class:`numpy.ndarray` of :obj:`int` Day of each time point """ return np.vectorize(getattr)(self._values.astype(object), "day")
[docs] def hours(self) -> np.ndarray: """ Get hour of each time point. Returns ------- :class:`numpy.ndarray` of :obj:`int` Hour of each time point """ return np.vectorize(getattr)(self._values.astype(object), "hour")
[docs] def weekdays(self) -> np.ndarray: """ Get weekday of each time point. Returns ------- :class:`numpy.ndarray` of :obj:`int` Day of the week of each time point """ return np.vectorize(datetime.weekday)(self._values.astype(object))
[docs]class TimeseriesConverter: """ Interpolator used to convert data between different time bases This is a modified version originally in :mod:`openscm.time.TimeseriesConverter`. The integral preserving interpolation was removed as it is outside the scope of this package. Parameters ---------- source_time_points: np.ndarray Source timeseries time points target_time_points: np.ndarray Target timeseries time points interpolation_type : {"linear"} Interpolation type. Options are 'linear' extrapolation_type : {"linear", "constant", None} Extrapolation type. Options are None, 'linear' or 'constant' Raises ------ InsufficientDataError Timeseries too short to extrapolate """ def __init__( self, source_time_points: np.ndarray, target_time_points: np.ndarray, interpolation_type="linear", extrapolation_type="linear", ): self.source = ( np.array(source_time_points) .astype(_TARGET_DTYPE) .astype(_TARGET_TYPE, copy=True) ) self.target = ( np.array(target_time_points) .astype(_TARGET_DTYPE) .astype(_TARGET_TYPE, copy=True) ) self.interpolation_type = interpolation_type self.extrapolation_type = extrapolation_type if not self.points_are_compatible(self.source, self.target): error_msg = ( "Target time points are outside the source time points, use an " "extrapolation type other than None" ) raise InsufficientDataError(error_msg)
[docs] def points_are_compatible(self, source: np.ndarray, target: np.ndarray) -> bool: """ Are the two sets of time points compatible i.e. can I convert between the two? Parameters ---------- source Source timeseries time points target Target timeseries time points Returns ------- bool Can I convert between the time points? """ if self.extrapolation_type is None and ( source[0] > target[0] or source[-1] < target[-1] ): return False return True
def _get_scipy_extrapolation_args(self, values: np.ndarray): if self.extrapolation_type == "linear": return {"fill_value": "extrapolate"} if self.extrapolation_type == "constant": return {"fill_value": (values[0], values[-1]), "bounds_error": False} # TODO: add cubic support return {} def _get_scipy_interpolation_arg(self) -> str: if self.interpolation_type == "linear": return "linear" # TODO: add cubic support raise NotImplementedError def _convert( self, values: np.ndarray, source_time_points: np.ndarray, target_time_points: np.ndarray, ) -> np.ndarray: """ Wrap :func:`_convert_unsafe` to provide proper error handling. Any nan values are removed from :obj:`source` before interpolation Parameters ---------- values Array of data to convert source_time_points Source timeseries time points target_time_points Target timeseries time points Raises ------ InsufficientDataError Length of the time series is too short to convert InsufficientDataError Target time points are outside the source time points and :attr:`extrapolation_type` is 'NONE' ImportError Optional dependency scipy has not been installed Returns ------- np.ndarray Converted time period average data for timeseries :obj:`values` """ values = np.asarray(values) # Check for nans nan_mask = np.isnan(values) if nan_mask.sum(): values = values[~nan_mask] source_time_points = source_time_points[~nan_mask] if len(values) < 3: raise InsufficientDataError try: return self._convert_unsafe(values, source_time_points, target_time_points) except Exception: # pragma: no cover # emergency valve print("numpy interpolation failed...") raise def _convert_unsafe( self, values: np.ndarray, source_time_points: np.ndarray, target_time_points: np.ndarray, ) -> np.ndarray: if not has_scipy: raise ImportError("scipy is not installed. Run 'pip install scipy'") res_point = scipy.interpolate.interp1d( source_time_points.astype(_TARGET_TYPE), values, kind=self._get_scipy_interpolation_arg(), **self._get_scipy_extrapolation_args(values), ) return res_point(target_time_points.astype(_TARGET_TYPE))
[docs] def convert_from(self, values: np.ndarray) -> np.ndarray: """ Convert value **from** source timeseries time points to target timeseries time points. Parameters ---------- values: np.ndarray Value Returns ------- np.ndarray Converted data for timeseries :obj:`values` into the target timebase """ return self._convert(values, self.source, self.target)
[docs] def convert_to(self, values: np.ndarray) -> np.ndarray: """ Convert value from target timeseries time points **to** source timeseries time points. Parameters ---------- values: np.ndarray Value Returns ------- np.ndarray Converted data for timeseries :obj:`values` into the source timebase """ return self._convert(values, self.target, self.source)