Source code for scmdata.filters

"""
Helpers for filtering data in :class:`scmdata.run.ScmRun`.

Based upon :mod:`pyam.utils`.
"""

import datetime
import re
import time
from typing import Iterable, List, Optional, Union

import numpy as np
import pandas as pd

HIERARCHY_SEPARATOR = "|"


def is_in(vals: List, items: List) -> np.ndarray:
    """
    Find elements of vals which are in items.

    Parameters
    ----------
    vals
        The list of values to check

    items
        The options used to determine whether each element of :obj:`vals` is in the
        desired subset or not

    Returns
    -------
    :class:`numpy.ndarray` of :obj:`bool`
        Array of the same length as :obj:`vals` where the element is ``True`` if the
        corresponding element of :obj:`vals` is in :obj:`items` and ``False``
        otherwise. An empty :obj:`vals` gives an empty boolean array.
    """
    # Pin the dtype: without it numpy turns an empty list into a float64 array,
    # which is not usable as a boolean mask by callers.
    return np.array([v in items for v in vals], dtype=bool)
def find_depth(
    meta_col: pd.Series,
    s: str,
    level: Union[int, str],
    separator: str = HIERARCHY_SEPARATOR,
) -> List:
    """
    Find all values which match given depth from a filter keyword.

    Parameters
    ----------
    meta_col
        Column in which to find values which match the given depth. Only its
        ``categories`` attribute is read.

    s
        Filter keyword, from which level should be applied. Any ``*`` is removed
        and the remainder is stripped from each category before counting
        separators.

    level
        Depth of value to match as defined by the number of separator in the value
        name. If an int, the depth is matched exactly. If a str, then the depth can
        be matched as either "X-", for all levels up to and including level "X", or
        "X+", for level "X" and all deeper levels.

    separator
        The string used to separate levels in s. Defaults to a pipe ("|").

    Returns
    -------
    list
        The categories of :obj:`meta_col` whose depth relative to :obj:`s`
        satisfies :obj:`level`

    Raises
    ------
    ValueError
        If :obj:`level` cannot be understood
    """

    def _parse_bound(text):
        # Give every malformed string level (including "" and "-") the same
        # error rather than a bare int() failure.
        try:
            return int(text)
        except ValueError as exc:
            raise ValueError("Unknown level type: {}".format(level)) from exc

    # determine function for finding depth level
    if not isinstance(level, str):

        def test(x):
            return level == x

    elif level.endswith("-"):
        _level = _parse_bound(level[:-1])

        def test(x):
            return _level >= x

    elif level.endswith("+"):
        _level = _parse_bound(level[:-1])

        def test(x):
            return _level <= x

    else:
        # also reached for an empty string, which previously raised IndexError
        raise ValueError("Unknown level type: {}".format(level))

    # determine depth: count separators left once the keyword is stripped
    keyword = str(s).replace("*", "")

    def apply_test(val):
        # str() so that non-string categories do not break on ``.replace``
        return test(str(val).replace(keyword, "").count(separator))

    return [m for m in meta_col.categories if apply_test(m)]
def pattern_match(  # pylint: disable=too-many-arguments,too-many-locals
    meta_col: pd.Series,
    values: Union[Iterable[str], str],
    level: Optional[Union[str, int]] = None,
    regexp: bool = False,
    separator: str = HIERARCHY_SEPARATOR,
) -> np.ndarray:
    """
    Filter data by matching metadata columns to given patterns.

    Each entry of :obj:`values` is handled independently and the resulting masks
    are OR-ed together.

    Parameters
    ----------
    meta_col
        Column to perform filtering on. The implementation reads its
        ``categories`` and ``codes`` attributes and calls ``isin``/``astype``
        on it (NOTE(review): presumably a categorical column rather than a plain
        :class:`pandas.Series` - confirm against callers).

    values
        Values to match. A single string or non-iterable value is treated as a
        one-element list. An empty string is treated as :class:`numpy.nan` and
        matches entries whose categorical code is ``-1`` (i.e. missing values).

    level
        Passed to :func:`find_depth`. For usage, see docstring of
        :func:`find_depth`.

    regexp
        If ``True``, match using regexp rather than our pseudo regexp syntax.
        In the pseudo syntax ``*`` is a wildcard, the characters
        ``| . + ( ) $ ^`` are matched literally, and the whole value must match.
        A lone ``"*"`` with no :obj:`level` matches every row.

    separator
        String used to separate the hierarchy levels in values. Defaults to '|'

    Returns
    -------
    :class:`numpy.ndarray` of :obj:`bool`
        Array where ``True`` indicates a match

    Notes
    -----
    Earlier documentation described a ``has_nan`` parameter; this function has
    no such parameter.

    NOTE(review): a value that is neither a string nor numeric presumably makes
    :func:`numpy.isnan` raise :class:`TypeError` - confirm.
    """
    matches = np.array([False] * len(meta_col), dtype=bool)
    _values = (
        [values]
        if not isinstance(values, Iterable) or isinstance(values, str)
        else values
    )

    for s in _values:
        # an empty string is shorthand for "missing value"
        if isinstance(s, str) and s == "":
            s = np.nan
        # strings are always compared as strings; non-nan numbers are compared
        # as strings only if the categories themselves are strings
        use_string_comparison = isinstance(s, str) or (
            not np.isnan(s)
            and pd.api.types.is_string_dtype(meta_col.categories.dtype)
        )

        if use_string_comparison:
            if not regexp and s == "*" and level is None:
                # bare wildcard: everything matches, no need to build a pattern
                matches |= True
            else:
                # translate the pseudo regexp into a real, end-anchored regexp
                _regexp = (
                    str(s)
                    .replace("|", "\\|")
                    .replace(".", r"\.")  # `.` has to be replaced before `*`
                    .replace("*", ".*")
                    .replace("+", r"\+")
                    .replace("(", r"\(")
                    .replace(")", r"\)")
                    .replace("$", "\\$")
                    .replace("^", "\\^")
                ) + "$"
                pattern = re.compile(_regexp if not regexp else str(s))
                # match against the unique categories, not every row
                subset = [m for m in meta_col.categories if pattern.match(str(m))]

                if level is not None:
                    # keep only the categories which also satisfy the depth filter
                    depth = find_depth(meta_col, str(s), level, separator=separator)
                    subset = set(subset).intersection(set(depth))

                matches |= meta_col.isin(subset)
        else:
            s = float(s)
            if np.isnan(s):
                matches |= [
                    c == -1 for c in meta_col.codes
                ]  # nan's are missing from categoricals
            else:
                # numeric comparison with floating point tolerance
                matches |= np.isclose(s, meta_col.astype(float))

    return matches
def years_match(data: List, years: Union[List[int], np.ndarray, int]) -> np.ndarray:
    """
    Match years in time columns for data filtering.

    Parameters
    ----------
    data
        Input data to perform filtering on

    years
        Years to match. A single integer (Python :obj:`int` or a numpy integer
        scalar) is treated as a one-element list.

    Returns
    -------
    :class:`numpy.ndarray` of :obj:`bool`
        Array where True indicates a match

    Raises
    ------
    TypeError
        If :obj:`years` is not :obj:`int` or list of :obj:`int`
    """
    int_types = (int, np.integer)

    # Wrap scalars using the same integer types accepted for list elements, so
    # that e.g. ``np.int64(2005)`` behaves like ``[np.int64(2005)]``.
    if isinstance(years, int_types):
        years = [years]

    usable_int = isinstance(years, Iterable) and all(
        isinstance(y, int_types) for y in years
    )
    if not usable_int:
        error_msg = "`year` can only be filtered with ints or lists of ints"
        raise TypeError(error_msg)

    return is_in(data, years)
def month_match(
    data: List, months: Union[List[str], List[int], int, str]
) -> np.ndarray:
    """
    Match months in time columns for data filtering.

    Thin wrapper around :func:`time_match`: string months are parsed with the
    ``%b`` and ``%B`` conversion codes and mapped to integers via ``tm_mon``.

    Parameters
    ----------
    data
        Input data to perform filtering on

    months
        Months to match

    Returns
    -------
    :class:`numpy.ndarray` of :obj:`bool`
        Array where ``True`` indicates a match
    """
    month_codes = ["%b", "%B"]
    return time_match(
        data,
        months,
        conv_codes=month_codes,
        strptime_attr="tm_mon",
        name="month",
    )
def day_match(data: List, days: Union[List[str], List[int], int, str]) -> np.ndarray:
    """
    Match days in time columns for data filtering.

    Thin wrapper around :func:`time_match`: string days are parsed with the
    ``%a`` and ``%A`` conversion codes and mapped to integers via ``tm_wday``.

    Parameters
    ----------
    data
        Input data to perform filtering on

    days
        Days to match

    Returns
    -------
    :class:`numpy.ndarray` of :obj:`bool`
        Array where ``True`` indicates a match
    """
    day_codes = ["%a", "%A"]
    return time_match(
        data,
        days,
        conv_codes=day_codes,
        strptime_attr="tm_wday",
        name="day",
    )
def hour_match(data: List, hours: Union[List[int], int]) -> np.ndarray:
    """
    Match hours in time columns for data filtering.

    Parameters
    ----------
    data
        Input data to perform filtering on

    hours
        Hours to match; a single :obj:`int` is treated as a one-element list

    Returns
    -------
    :class:`numpy.ndarray` of :obj:`bool`
        Array where ``True`` indicates a match
    """
    if isinstance(hours, int):
        return is_in(data, [hours])

    return is_in(data, hours)
def time_match(
    data: List,
    times: Union[List[str], List[int], int, str],
    conv_codes: List[str],
    strptime_attr: str,
    name: str,
) -> np.ndarray:
    """
    Match times by applying conversion codes to filtering list.

    Parameters
    ----------
    data
        Input data to perform filtering on

    times
        Times to match. Strings may be single values (e.g. ``"Jan"``) or ranges
        joined with ``-`` (e.g. ``"Jan-Mar"``), which are expanded to every
        integer in the inclusive range. The input is never modified.

    conv_codes
        If :obj:`times` contains strings, conversion codes to try passing to
        :func:`time.strptime` to convert :obj:`times` to
        :class:`time.struct_time`. The first code which parses all strings of a
        group is used.

    strptime_attr
        If :obj:`times` contains strings, the :class:`time.struct_time` attribute
        to finalize the conversion of strings to integers

    name
        Name of the part of a datetime to extract, used to produce useful error
        messages.

    Returns
    -------
    :class:`numpy.ndarray` of :obj:`bool`
        Array where ``True`` indicates a match

    Raises
    ------
    ValueError
        If input times cannot be converted or understood, or if input strings do
        not lead to increasing integers (i.e. "Nov-Feb" will not work, one must
        use ["Nov-Dec", "Jan-Feb"] instead)
    """

    def conv_strs(strs_to_convert):
        # Try each conversion code in turn; a code is only accepted if it can
        # parse every string in the group.
        for conv_code in conv_codes:
            try:
                return [
                    getattr(time.strptime(t, conv_code), strptime_attr)
                    for t in strs_to_convert
                ]
            except ValueError:
                continue

        error_msg = "Could not convert {} '{}' to integer".format(
            name, strs_to_convert
        )
        raise ValueError(error_msg)

    # Work on a copy so that the caller's list is never mutated
    times_list = [times] if isinstance(times, (int, str)) else list(times)

    # Nothing to match (also avoids indexing into an empty list below)
    if not times_list:
        return is_in(data, times_list)

    if isinstance(times_list[0], str):
        singles = []
        expanded = []  # type: List[int]
        # Split into plain values and ranges while building new lists. Deleting
        # ranges from the list by index shifts the remaining indices, which
        # broke inputs with more than one range.
        for timeset in times_list:
            if "-" in timeset:
                ints = conv_strs(timeset.split("-"))
                if ints[0] > ints[1]:
                    error_msg = (
                        "string ranges must lead to increasing integer ranges,"
                        " {} becomes {}".format(timeset, ints)
                    )
                    raise ValueError(error_msg)

                # + 1 to include last month
                expanded.extend(range(ints[0], ints[1] + 1))
            else:
                singles.append(timeset)

        times_list = conv_strs(singles) + expanded

    return is_in(data, times_list)
def datetime_match(
    data: List, dts: Union[List[datetime.datetime], datetime.datetime]
) -> np.ndarray:
    """
    Match datetimes in time columns for data filtering.

    Parameters
    ----------
    data
        Input data to perform filtering on

    dts
        Datetimes to use for filtering; a single :class:`datetime.datetime` is
        treated as a one-element list

    Returns
    -------
    :class:`numpy.ndarray` of :obj:`bool`
        Array where ``True`` indicates a match

    Raises
    ------
    TypeError
        :obj:`dts` contains :obj:`int`
    """
    if isinstance(dts, datetime.datetime):
        dts = [dts]

    # a bare int is checked first so that we never try to iterate over it
    has_int = isinstance(dts, int) or any(isinstance(d, int) for d in dts)
    if has_int:
        error_msg = "`time` can only be filtered with datetimes or lists of datetimes"
        raise TypeError(error_msg)

    return is_in(data, dts)