Source code for scmdata.ops

"""
Operations for :class:`ScmRun <scmdata.run.ScmRun>` objects

These largely rely on
`Pint's Pandas interface <https://pint.readthedocs.io/en/0.13/pint-pandas.html>`_
to handle unit conversions automatically
"""
import warnings

import numpy as np
import pandas as pd
import pint_pandas
from openscm_units import unit_registry

from .time import TimePoints

try:
    import scipy.integrate

    has_scipy = True
except ImportError:  # pragma: no cover
    scipy = None
    has_scipy = False


[docs]def prep_for_op(inp, op_cols, meta, ur=unit_registry): """ Prepare dataframe for operation Parameters ---------- inp : :class:`ScmRun <scmdata.run.ScmRun>` :class:`ScmRun <scmdata.run.ScmRun>` containing data to prepare op_cols : dict of str: str Dictionary containing the columns to drop in order to prepare for the operation as the keys (the values are not used). For example, if we have ``op_cols={"variable": "Emissions|CO2 - Emissions|CO2|Fossil"}`` then we will drop the "variable" column from the index. ur : :class:`pint.UnitRegistry` Pint unit registry to use for the operation Returns ------- :class:`pandas.DataFrame` Timeseries to use for the operation. They are the transpose of the normal :meth:`ScmRun.timeseries` output with the columns being Pint arrays (unless "unit" is in op_cols in which case no units are available to be used so the columns are standard numpy arrays). We do this so that we can use `Pint's Pandas interface <https://pint.readthedocs.io/en/0.13/pint-pandas.html>`_ to handle unit conversions automatically. """ pint_pandas.PintType.ureg = ur key_cols = list(op_cols.keys()) out = inp.timeseries(meta=meta).reset_index(key_cols, drop=True) out = out.T if "unit" not in op_cols: unit_level = out.columns.names.index("unit",) out = out.pint.quantify(level=unit_level) return out
[docs]def set_op_values(output, op_cols): """ Set operation values in output Parameters ---------- output : :class:`pandas.Dataframe` Dataframe of which to update the values op_cols : dict of str: str Dictionary containing the columns to update as the keys and the value those columns should hold in the output as the values. For example, if we have ``op_cols={"variable": "Emissions|CO2 - Emissions|CO2|Fossil"}`` then the output will have a "variable" column with the value "Emissions|CO2 - Emissions|CO2|Fossil". Returns ------- :class:`pandas.Dataframe` ``output`` with the relevant columns being set according to ``op_cols``. """ for k, v in op_cols.items(): output[k] = v return output
def _perform_op(base, other, op, use_pint_units=True): # pint handling means we have to do series by series out = [] col_names = base.columns.names for col in base: if op in ["add", "subtract"]: try: if op == "add": out.append(base[col] + other[col]) elif op == "subtract": out.append(base[col] - other[col]) except KeyError: raise KeyError( "No equivalent in `other` for {}".format(list(zip(col_names, col))) ) elif op == "multiply": out.append(base[col] * other[col]) elif op == "divide": out.append(base[col] / other[col]) else: # pragma: no cover raise NotImplementedError(op) if len(out) == 1: out = out[0].to_frame() else: out = pd.concat(out, axis="columns") out.columns.names = base.columns.names if use_pint_units: out = out.pint.dequantify() out = out.T return out
[docs]def subtract(self, other, op_cols, **kwargs): """ Subtract values Parameters ---------- other : :class:`ScmRun <scmdata.run.ScmRun>` :class:`ScmRun <scmdata.run.ScmRun>` containing data to subtract op_cols : dict of str: str Dictionary containing the columns to drop before subtracting as the keys and the value those columns should hold in the output as the values. For example, if we have ``op_cols={"variable": "Emissions|CO2 - Emissions|CO2|Fossil"}`` then the subtraction will be performed with an index that uses all columns except the "variable" column and the output will have a "variable" column with the value "Emissions|CO2 - Emissions|CO2|Fossil". **kwargs : any Passed to :func:`prep_for_op` Returns ------- :class:`ScmRun <scmdata.run.ScmRun>` Difference between ``self`` and ``other``, using ``op_cols`` to define the columns which should be dropped before the data is aligned and to define the value of these columns in the output. Examples -------- >>> import numpy as np >>> from scmdata import ScmRun >>> >>> IDX = [2010, 2020, 2030] >>> >>> >>> start = ScmRun( ... data=np.arange(18).reshape(3, 6), ... index=IDX, ... columns={ ... "variable": [ ... "Emissions|CO2|Fossil", ... "Emissions|CO2|AFOLU", ... "Emissions|CO2|Fossil", ... "Emissions|CO2|AFOLU", ... "Cumulative Emissions|CO2", ... "Surface Air Temperature Change", ... ], ... "unit": ["GtC / yr", "GtC / yr", "GtC / yr", "GtC / yr", "GtC", "K"], ... "region": ["World|NH", "World|NH", "World|SH", "World|SH", "World", "World"], ... "model": "idealised", ... "scenario": "idealised", ... }, ... ) >>> >>> start.head() time 2010-01-01 00:00:00 2020-01-01 00:00:00 2030-01-01 00:00:00 variable unit region model scenario Emissions|CO2|Fossil GtC / yr World|NH idealised idealised 0.0 6.0 12.0 Emissions|CO2|AFOLU GtC / yr World|NH idealised idealised 1.0 7.0 13.0 Emissions|CO2|Fossil GtC / yr World|SH idealised idealised 2.0 8.0 14.0 Emissions|CO2|AFOLU GtC / yr World|SH idealised idealised 3.0 9.0 15.0 Cumulative Emissions|CO2 GtC World idealised idealised 4.0 10.0 16.0 >>> fos = start.filter(variable="*Fossil") >>> fos.head() time 2010-01-01 00:00:00 2020-01-01 00:00:00 2030-01-01 00:00:00 variable unit region model scenario Emissions|CO2|Fossil GtC / yr World|NH idealised idealised 0.0 6.0 12.0 World|SH idealised idealised 2.0 8.0 14.0 >>> >>> afolu = start.filter(variable="*AFOLU") >>> afolu.head() time 2010-01-01 00:00:00 2020-01-01 00:00:00 2030-01-01 00:00:00 variable unit region model scenario Emissions|CO2|AFOLU GtC / yr World|NH idealised idealised 1.0 7.0 13.0 World|SH idealised idealised 3.0 9.0 15.0 >>> >>> fos_minus_afolu = fos.subtract( ... afolu, op_cols={"variable": "Emissions|CO2|Fossil - AFOLU"} ... ) >>> fos_minus_afolu.head() time 2010-01-01 00:00:00 2020-01-01 00:00:00 2030-01-01 00:00:00 model scenario region variable unit idealised idealised World|NH Emissions|CO2|Fossil - AFOLU gigatC / a -1.0 -1.0 -1.0 World|SH Emissions|CO2|Fossil - AFOLU gigatC / a -1.0 -1.0 -1.0 >>> >>> nh_minus_sh = nh.subtract(sh, op_cols={"region": "World|NH - SH"}) >>> nh_minus_sh.head() time 2010-01-01 00:00:00 2020-01-01 00:00:00 2030-01-01 00:00:00 model scenario region variable unit idealised idealised World|NH - SH Emissions|CO2|Fossil gigatC / a -2.0 -2.0 -2.0 Emissions|CO2|AFOLU gigatC / a -2.0 -2.0 -2.0 """ out = _perform_op( prep_for_op(self, op_cols, self.meta.columns, **kwargs), prep_for_op(other, op_cols, self.meta.columns, **kwargs), "subtract", use_pint_units="unit" not in op_cols, ) out = set_op_values(out, op_cols) return type(self)(out)
[docs]def add(self, other, op_cols, **kwargs): """ Add values Parameters ---------- other : :class:`ScmRun <scmdata.run.ScmRun>` :class:`ScmRun <scmdata.run.ScmRun>` containing data to add op_cols : dict of str: str Dictionary containing the columns to drop before adding as the keys and the value those columns should hold in the output as the values. For example, if we have ``op_cols={"variable": "Emissions|CO2 - Emissions|CO2|Fossil"}`` then the addition will be performed with an index that uses all columns except the "variable" column and the output will have a "variable" column with the value "Emissions|CO2 - Emissions|CO2|Fossil". **kwargs : any Passed to :func:`prep_for_op` Returns ------- :class:`ScmRun <scmdata.run.ScmRun>` Sum of ``self`` and ``other``, using ``op_cols`` to define the columns which should be dropped before the data is aligned and to define the value of these columns in the output. Examples -------- >>> import numpy as np >>> from scmdata import ScmRun >>> >>> IDX = [2010, 2020, 2030] >>> >>> >>> start = ScmRun( ... data=np.arange(18).reshape(3, 6), ... index=IDX, ... columns={ ... "variable": [ ... "Emissions|CO2|Fossil", ... "Emissions|CO2|AFOLU", ... "Emissions|CO2|Fossil", ... "Emissions|CO2|AFOLU", ... "Cumulative Emissions|CO2", ... "Surface Air Temperature Change", ... ], ... "unit": ["GtC / yr", "GtC / yr", "GtC / yr", "GtC / yr", "GtC", "K"], ... "region": ["World|NH", "World|NH", "World|SH", "World|SH", "World", "World"], ... "model": "idealised", ... "scenario": "idealised", ... }, ... ) >>> >>> start.head() time 2010-01-01 00:00:00 2020-01-01 00:00:00 2030-01-01 00:00:00 variable unit region model scenario Emissions|CO2|Fossil GtC / yr World|NH idealised idealised 0.0 6.0 12.0 Emissions|CO2|AFOLU GtC / yr World|NH idealised idealised 1.0 7.0 13.0 Emissions|CO2|Fossil GtC / yr World|SH idealised idealised 2.0 8.0 14.0 Emissions|CO2|AFOLU GtC / yr World|SH idealised idealised 3.0 9.0 15.0 Cumulative Emissions|CO2 GtC World idealised idealised 4.0 10.0 16.0 >>> fos = start.filter(variable="*Fossil") >>> fos.head() time 2010-01-01 00:00:00 2020-01-01 00:00:00 2030-01-01 00:00:00 variable unit region model scenario Emissions|CO2|Fossil GtC / yr World|NH idealised idealised 0.0 6.0 12.0 World|SH idealised idealised 2.0 8.0 14.0 >>> >>> afolu = start.filter(variable="*AFOLU") >>> afolu.head() time 2010-01-01 00:00:00 2020-01-01 00:00:00 2030-01-01 00:00:00 variable unit region model scenario Emissions|CO2|AFOLU GtC / yr World|NH idealised idealised 1.0 7.0 13.0 World|SH idealised idealised 3.0 9.0 15.0 >>> >>> total = fos.add(afolu, op_cols={"variable": "Emissions|CO2"}) >>> total.head() time 2010-01-01 00:00:00 2020-01-01 00:00:00 2030-01-01 00:00:00 model scenario region variable unit idealised idealised World|NH Emissions|CO2 gigatC / a 1.0 13.0 25.0 World|SH Emissions|CO2 gigatC / a 5.0 17.0 29.0 >>> >>> nh = start.filter(region="*NH") >>> nh.head() time 2010-01-01 00:00:00 2020-01-01 00:00:00 2030-01-01 00:00:00 variable unit region model scenario Emissions|CO2|Fossil GtC / yr World|NH idealised idealised 0.0 6.0 12.0 Emissions|CO2|AFOLU GtC / yr World|NH idealised idealised 1.0 7.0 13.0 >>> >>> sh = start.filter(region="*SH") >>> sh.head() time 2010-01-01 00:00:00 2020-01-01 00:00:00 2030-01-01 00:00:00 variable unit region model scenario Emissions|CO2|Fossil GtC / yr World|SH idealised idealised 2.0 8.0 14.0 Emissions|CO2|AFOLU GtC / yr World|SH idealised idealised 3.0 9.0 15.0 >>> >>> world = nh.add(sh, op_cols={"region": "World"}) >>> world.head() time 2010-01-01 00:00:00 2020-01-01 00:00:00 2030-01-01 00:00:00 model scenario region variable unit idealised idealised World Emissions|CO2|Fossil gigatC / a 2.0 14.0 26.0 Emissions|CO2|AFOLU gigatC / a 4.0 16.0 28.0 """ out = _perform_op( prep_for_op(self, op_cols, self.meta.columns, **kwargs), prep_for_op(other, op_cols, self.meta.columns, **kwargs), "add", use_pint_units="unit" not in op_cols, ) out = set_op_values(out, op_cols) return type(self)(out)
[docs]def multiply(self, other, op_cols, **kwargs): """ Multiply values Parameters ---------- other : :class:`ScmRun <scmdata.run.ScmRun>` :class:`ScmRun <scmdata.run.ScmRun>` containing data to multiply op_cols : dict of str: str Dictionary containing the columns to drop before multiplying as the keys and the value those columns should hold in the output as the values. For example, if we have ``op_cols={"variable": "Emissions|CO2 - Emissions|CO2|Fossil"}`` then the multiplication will be performed with an index that uses all columns except the "variable" column and the output will have a "variable" column with the value "Emissions|CO2 - Emissions|CO2|Fossil". **kwargs : any Passed to :func:`prep_for_op` Returns ------- :class:`ScmRun <scmdata.run.ScmRun>` Product of ``self`` and ``other``, using ``op_cols`` to define the columns which should be dropped before the data is aligned and to define the value of these columns in the output. Examples -------- >>> import numpy as np >>> from scmdata import ScmRun >>> >>> IDX = [2010, 2020, 2030] >>> >>> >>> start = ScmRun( ... data=np.arange(18).reshape(3, 6), ... index=IDX, ... columns={ ... "variable": [ ... "Emissions|CO2|Fossil", ... "Emissions|CO2|AFOLU", ... "Emissions|CO2|Fossil", ... "Emissions|CO2|AFOLU", ... "Cumulative Emissions|CO2", ... "Surface Air Temperature Change", ... ], ... "unit": ["GtC / yr", "GtC / yr", "GtC / yr", "GtC / yr", "GtC", "K"], ... "region": ["World|NH", "World|NH", "World|SH", "World|SH", "World", "World"], ... "model": "idealised", ... "scenario": "idealised", ... }, ... ) >>> >>> start.head() time 2010-01-01 00:00:00 2020-01-01 00:00:00 2030-01-01 00:00:00 variable unit region model scenario Emissions|CO2|Fossil GtC / yr World|NH idealised idealised 0.0 6.0 12.0 Emissions|CO2|AFOLU GtC / yr World|NH idealised idealised 1.0 7.0 13.0 Emissions|CO2|Fossil GtC / yr World|SH idealised idealised 2.0 8.0 14.0 Emissions|CO2|AFOLU GtC / yr World|SH idealised idealised 3.0 9.0 15.0 Cumulative Emissions|CO2 GtC World idealised idealised 4.0 10.0 16.0 >>> fos = start.filter(variable="*Fossil") >>> fos.head() time 2010-01-01 00:00:00 2020-01-01 00:00:00 2030-01-01 00:00:00 variable unit region model scenario Emissions|CO2|Fossil GtC / yr World|NH idealised idealised 0.0 6.0 12.0 World|SH idealised idealised 2.0 8.0 14.0 >>> >>> afolu = start.filter(variable="*AFOLU") >>> afolu.head() time 2010-01-01 00:00:00 2020-01-01 00:00:00 2030-01-01 00:00:00 variable unit region model scenario Emissions|CO2|AFOLU GtC / yr World|NH idealised idealised 1.0 7.0 13.0 World|SH idealised idealised 3.0 9.0 15.0 >>> >>> fos_times_afolu = fos.multiply( ... afolu, op_cols={"variable": "Emissions|CO2|Fossil : AFOLU"} ... ) >>> fos_times_afolu.head() time 2010-01-01 00:00:00 2020-01-01 00:00:00 2030-01-01 00:00:00 model scenario region variable unit idealised idealised World|NH Emissions|CO2|Fossil : AFOLU gigatC ** 2 / a ** 2 0.0 42.0 156.0 World|SH Emissions|CO2|Fossil : AFOLU gigatC ** 2 / a ** 2 6.0 72.0 210.0 """ out = _perform_op( prep_for_op(self, op_cols, self.meta.columns, **kwargs), prep_for_op(other, op_cols, self.meta.columns, **kwargs), "multiply", use_pint_units="unit" not in op_cols, ) out = set_op_values(out, op_cols) return type(self)(out)
[docs]def divide(self, other, op_cols, **kwargs): """ Divide values (self / other) Parameters ---------- other : :class:`ScmRun <scmdata.run.ScmRun>` :class:`ScmRun <scmdata.run.ScmRun>` containing data to divide op_cols : dict of str: str Dictionary containing the columns to drop before dividing as the keys and the value those columns should hold in the output as the values. For example, if we have ``op_cols={"variable": "Emissions|CO2 - Emissions|CO2|Fossil"}`` then the division will be performed with an index that uses all columns except the "variable" column and the output will have a "variable" column with the value "Emissions|CO2 - Emissions|CO2|Fossil". **kwargs : any Passed to :func:`prep_for_op` Returns ------- :class:`ScmRun <scmdata.run.ScmRun>` Quotient of ``self`` and ``other``, using ``op_cols`` to define the columns which should be dropped before the data is aligned and to define the value of these columns in the output. Examples -------- >>> import numpy as np >>> from scmdata import ScmRun >>> >>> IDX = [2010, 2020, 2030] >>> >>> >>> start = ScmRun( ... data=np.arange(18).reshape(3, 6), ... index=IDX, ... columns={ ... "variable": [ ... "Emissions|CO2|Fossil", ... "Emissions|CO2|AFOLU", ... "Emissions|CO2|Fossil", ... "Emissions|CO2|AFOLU", ... "Cumulative Emissions|CO2", ... "Surface Air Temperature Change", ... ], ... "unit": ["GtC / yr", "GtC / yr", "GtC / yr", "GtC / yr", "GtC", "K"], ... "region": ["World|NH", "World|NH", "World|SH", "World|SH", "World", "World"], ... "model": "idealised", ... "scenario": "idealised", ... }, ... ) >>> >>> start.head() time 2010-01-01 00:00:00 2020-01-01 00:00:00 2030-01-01 00:00:00 variable unit region model scenario Emissions|CO2|Fossil GtC / yr World|NH idealised idealised 0.0 6.0 12.0 Emissions|CO2|AFOLU GtC / yr World|NH idealised idealised 1.0 7.0 13.0 Emissions|CO2|Fossil GtC / yr World|SH idealised idealised 2.0 8.0 14.0 Emissions|CO2|AFOLU GtC / yr World|SH idealised idealised 3.0 9.0 15.0 Cumulative Emissions|CO2 GtC World idealised idealised 4.0 10.0 16.0 >>> fos = start.filter(variable="*Fossil") >>> fos.head() time 2010-01-01 00:00:00 2020-01-01 00:00:00 2030-01-01 00:00:00 variable unit region model scenario Emissions|CO2|Fossil GtC / yr World|NH idealised idealised 0.0 6.0 12.0 World|SH idealised idealised 2.0 8.0 14.0 >>> >>> afolu = start.filter(variable="*AFOLU") >>> afolu.head() time 2010-01-01 00:00:00 2020-01-01 00:00:00 2030-01-01 00:00:00 variable unit region model scenario Emissions|CO2|AFOLU GtC / yr World|NH idealised idealised 1.0 7.0 13.0 World|SH idealised idealised 3.0 9.0 15.0 >>> >>> fos_afolu_ratio = fos.divide( ... afolu, op_cols={"variable": "Emissions|CO2|Fossil : AFOLU"} ... ) >>> fos_afolu_ratio.head() time 2010-01-01 00:00:00 2020-01-01 00:00:00 2030-01-01 00:00:00 model scenario region variable unit idealised idealised World|NH Emissions|CO2|Fossil : AFOLU dimensionless 0.000000 0.857143 0.923077 World|SH Emissions|CO2|Fossil : AFOLU dimensionless 0.666667 0.888889 0.933333 """ out = _perform_op( prep_for_op(self, op_cols, self.meta.columns, **kwargs), prep_for_op(other, op_cols, self.meta.columns, **kwargs), "divide", use_pint_units="unit" not in op_cols, ) out = set_op_values(out, op_cols) return type(self)(out)
[docs]def integrate(self, out_var=None): """ Integrate with respect to time Parameters ---------- out_var : str If provided, the variable column of the output is set equal to ``out_var``. Otherwise, the output variables are equal to the input variables, prefixed with "Cumulative " . Returns ------- :class:`scmdata.ScmRun <scmdata.run.ScmRun>` :class:`scmdata.ScmRun <scmdata.run.ScmRun>` containing the integral of ``self`` with respect to time Warns ----- UserWarning The data being integrated contains nans. If this happens, the output data will also contain nans. """ if not has_scipy: raise ImportError("scipy is not installed. Run 'pip install scipy'") time_unit = "s" times_in_s = self.time_points.values.astype( "datetime64[{}]".format(time_unit) ).astype("int") ts = self.timeseries() if ts.isnull().sum().sum() > 0: warnings.warn( "You are integrating data which contains nans so your result will " "also contain nans. Perhaps you want to remove the nans before " "performing the integration using a combination of :meth:`filter` " "and :meth:`interpolate`?" ) # If required, we can remove the hard-coding of initial, it just requires # some thinking about unit handling _initial = 0.0 out = pd.DataFrame( scipy.integrate.cumtrapz(y=ts, x=times_in_s, axis=1, initial=_initial) ) out.index = ts.index out.columns = ts.columns out = type(self)(out) out *= unit_registry(time_unit) try: out_unit = out.get_unique_meta("unit", no_duplicates=True).replace(" ", "") out_unit = str(unit_registry(out_unit).to_reduced_units().units) out = out.convert_unit(out_unit) except ValueError: # more than one unit, don't try to clean up pass if out_var is None: out["variable"] = "Cumulative " + out["variable"] else: out["variable"] = out_var return out
[docs]def delta_per_delta_time(self, out_var=None): """ Calculate change in timeseries values for each timestep, divided by the size of the timestep The output is placed on the middle of each timestep and is one timestep shorter than the input. Parameters ---------- out_var : str If provided, the variable column of the output is set equal to ``out_var``. Otherwise, the output variables are equal to the input variables, prefixed with "Delta " . Returns ------- :class:`scmdata.ScmRun <scmdata.run.ScmRun>` :class:`scmdata.ScmRun <scmdata.run.ScmRun>` containing the changes in values of ``self``, normalised by the change in time Warns ----- UserWarning The data contains nans. If this happens, the output data will also contain nans. """ time_unit = "s" times_numpy = self.time_points.values.astype("datetime64[{}]".format(time_unit)) times_deltas_numpy = times_numpy[1:] - times_numpy[:-1] times_in_s = times_numpy.astype("int") time_deltas_in_s = times_in_s[1:] - times_in_s[:-1] ts = self.timeseries() if ts.isnull().sum().sum() > 0: warnings.warn( "You are calculating deltas of data which contains nans so your " "result will also contain nans. Perhaps you want to remove the " "nans before calculating the deltas using a combination of " ":meth:`filter` and :meth:`interpolate`?" ) out = ts.diff(periods=1, axis="columns") if not out.iloc[:, 0].isnull().all(): # pragma: no cover raise AssertionError( "Did pandas change their API? The first timestep is not all nan." ) out = out.iloc[:, 1:] / time_deltas_in_s new_times = times_numpy[:-1] + times_deltas_numpy / 2 out.columns = TimePoints(new_times).to_index() out = type(self)(out) out /= unit_registry(time_unit) try: out_unit = out.get_unique_meta("unit", no_duplicates=True).replace(" ", "") out_unit = str(unit_registry(out_unit).to_reduced_units().units) out = out.convert_unit(out_unit) except ValueError: # more than one unit, don't try to clean up pass if out_var is None: out["variable"] = "Delta " + out["variable"] else: out["variable"] = out_var return out
[docs]def linear_regression(self): """ Calculate linear regression of each timeseries Note ---- Times in seconds since 1970-01-01 are used as the x-axis for the regressions. Such values can be accessed with ``self.time_points.values.astype("datetime64[s]").astype("int")``. This decision does not matter for the gradients, but is important for the intercept values. Returns ------- list of dict[str : Any] List of dictionaries. Each dictionary contains the metadata for the timeseries plus the gradient (with key ``"gradient"``) and intercept ( with key ``"intercept"``). The gradient and intercept are stored as :class:`pint.Quantity`. """ _, _, time_unit, gradients, intercepts, meta = _calculate_linear_regression(self) out = [] for row_meta, gradient, intercept in zip( meta.to_dict("records"), gradients, intercepts, ): unit = row_meta.pop("unit") row_meta["gradient"] = gradient * unit_registry( "{} / {}".format(unit, time_unit) ) row_meta["intercept"] = intercept * unit_registry(unit) out.append(row_meta) return out
def _convert_linear_regression_raw_to_pdf(raw, key_to_keep, unit): pdf_dicts = [] for r in raw: transformed = { k: v for k, v in r.items() if k not in ["gradient", "intercept", "unit"] } if unit is None: transformed[key_to_keep] = r[key_to_keep].magnitude transformed["unit"] = str(r["gradient"].units) else: transformed[key_to_keep] = r[key_to_keep].to(unit).magnitude transformed["unit"] = unit pdf_dicts.append(transformed) return pd.DataFrame(pdf_dicts)
[docs]def linear_regression_gradient(self, unit=None): """ Calculate gradients of a linear regression of each timeseries Parameters ---------- unit : str Output unit for gradients. If not supplied, the gradients' units will not be converted to a common unit. Returns ------- :class:`pandas.DataFrame` ``self.meta`` plus a column with the value of the gradient for each timeseries. The ``"unit"`` column is updated to show the unit of the gradient. """ raw = self.linear_regression() return _convert_linear_regression_raw_to_pdf(raw, "gradient", unit)
[docs]def linear_regression_intercept(self, unit=None): """ Calculate intercepts of a linear regression of each timeseries Note ---- Times in seconds since 1970-01-01 are used as the x-axis for the regressions. Such values can be accessed with ``self.time_points.values.astype("datetime64[s]").astype("int")``. This decision does not matter for the gradients, but is important for the intercept values. Parameters ---------- unit : str Output unit for gradients. If not supplied, the gradients' units will not be converted to a common unit. Returns ------- :class:`pandas.DataFrame` ``self.meta`` plus a column with the value of the gradient for each timeseries. The ``"unit"`` column is updated to show the unit of the gradient. """ raw = self.linear_regression() return _convert_linear_regression_raw_to_pdf(raw, "intercept", unit)
[docs]def linear_regression_scmrun(self): """ Re-calculate the timeseries based on a linear regression Returns ------- :class:`scmdata.ScmRun <scmdata.run.ScmRun>` The timeseries, re-calculated based on a linear regression """ ( times_numpy, times_in_s, time_unit, gradients, intercepts, meta, ) = _calculate_linear_regression(self) out_shape = (meta.shape[0], len(times_in_s)) regression_timeseries = ( np.broadcast_to(gradients, out_shape[::-1]).T * times_in_s + intercepts[:, np.newaxis] ) out = type(self)( data=regression_timeseries.T, index=times_numpy, columns=meta.to_dict(orient="list"), ) return out
def _calculate_linear_regression(in_scmrun): time_unit = "s" times_numpy = in_scmrun.time_points.values.astype( "datetime64[{}]".format(time_unit) ) times_in_s = times_numpy.astype("int") ts = in_scmrun.timeseries() if ts.isnull().sum().sum() > 0: warnings.warn( "You are calculating a linear regression of data which contains " "nans so your result will also contain nans. Perhaps you want to " "remove the nans before calculating the regression using a " "combination of :meth:`filter` and :meth:`interpolate`?" ) res = np.polyfit(times_in_s, ts.T, 1) gradients = res[0, :] intercepts = res[1, :] meta = ts.index.to_frame().reset_index(drop=True) return times_numpy, times_in_s, time_unit, gradients, intercepts, meta
[docs]def inject_ops_methods(cls): """ Inject the operation methods Parameters ---------- cls Target class """ methods = [ ("subtract", subtract), ("add", add), ("multiply", multiply), ("divide", divide), ("integrate", integrate), ("delta_per_delta_time", delta_per_delta_time), ("linear_regression", linear_regression), ("linear_regression_gradient", linear_regression_gradient), ("linear_regression_intercept", linear_regression_intercept), ("linear_regression_scmrun", linear_regression_scmrun), ] for name, f in methods: setattr(cls, name, f)