Source code for scmdata.database

"""
Database for handling large datasets in a performant, but flexible way

Data is chunked using unique combinations of metadata. This allows for the
database to expand as new data is added without having to change any of the
existing data.

Subsets of data can also be read without having to load all of the data and
then filter. For example, one could save model results from a number of
different climate models and then load just the ``Surface Temperature`` data
for all models.
"""
import glob
import itertools
import os
import os.path
import pathlib
from abc import ABC, abstractmethod

import pandas as pd
import six
import tqdm.autonotebook as tqdman

from scmdata import ScmRun, run_append


def ensure_dir_exists(fp):
    """
    Ensure directory exists

    Parameters
    ----------
    fp : str
        Filepath of which to ensure the directory exists
    """
    dir_to_check = os.path.dirname(fp)
    if not os.path.isdir(dir_to_check):
        try:
            os.makedirs(dir_to_check)
        except OSError:  # pragma: no cover
            # Prevent race conditions if multiple threads attempt to create dir at same time
            if not os.path.exists(dir_to_check):
                raise

def _check_is_subdir(root, d):
    root_path = pathlib.Path(root).resolve()
    out_path = pathlib.Path(d).resolve()

    is_subdir = root_path in out_path.parents
    # Sanity check that we never mangle anything outside of the root dir
    if not is_subdir:  # pragma: no cover
        raise AssertionError("{} not in {}".format(d, root))


def _get_safe_filename(inp):
    def safe_char(c):
        if c.isalnum() or c in "-/*_.":
            return c
        return "-"

    return "".join(safe_char(c) for c in inp)

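# For illustration: anything outside the safe character set is replaced with
# "-", while path separators, globs, underscores and dots are preserved, e.g.
# _get_safe_filename("/tmp/db/My Model/Surface Temperature.nc") returns
# "/tmp/db/My-Model/Surface-Temperature.nc".
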
class DatabaseBackend(ABC):
    """
    Abstract backend for serialising/deserialising data

    Data is stored as objects represented by keys. These keys can be used
    later to load data.
    """

    def __init__(self, **kwargs):
        self.kwargs = kwargs

    @abstractmethod
    def save(self, sr):
        """
        Save data

        Parameters
        ----------
        sr : scmdata.ScmRun
            Data to save

        Returns
        -------
        str
            Key where the data is stored
        """

    @abstractmethod
    def load(self, key):
        """
        Load data at a given key

        Parameters
        ----------
        key : str
            Key to load

        Returns
        -------
        scmdata.ScmRun
            Loaded data
        """

    def delete(self, key):
        """
        Delete a given key

        Parameters
        ----------
        key : str
            Key to delete
        """

    @abstractmethod
    def get(self, filters):
        """
        Get all matching keys for a given filter

        Parameters
        ----------
        filters : dict of str
            String filters

            If a level is missing then all values are fetched

        Returns
        -------
        list of str
            Each item is a key which may contain data which is of interest
        """

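# A minimal sketch of a custom backend, assuming an in-memory dict is an
# acceptable store. ``InMemoryBackend`` is hypothetical and not part of
# scmdata; it only illustrates the interface defined by
# :class:`DatabaseBackend`. Note that :meth:`ScmDatabase.delete` assumes keys
# are filepaths under ``root_dir``, so this sketch pairs with save/load/get
# but not with deletion via :class:`ScmDatabase`.
class InMemoryBackend(DatabaseBackend):
    """Hypothetical backend which keeps each :class:`scmdata.ScmRun` in memory"""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._store = {}

    def save(self, sr):
        # Build a string key from the unique metadata value for each level
        parts = [
            str(sr.get_unique_meta(level, no_duplicates=True))
            for level in self.kwargs["levels"]
        ]
        key = "__".join(parts)
        self._store[key] = sr
        return key

    def load(self, key):
        return self._store[key]

    def delete(self, key):
        del self._store[key]

    def get(self, filters):
        # AND logic across levels, OR logic within a level
        matches = []
        for key in self._store:
            parts = key.split("__")
            for i, level in enumerate(self.kwargs["levels"]):
                wanted = filters.get(level)
                if wanted is None:
                    continue
                if isinstance(wanted, str):
                    wanted = [wanted]
                if parts[i] not in wanted:
                    break
            else:
                matches.append(key)
        return matches
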
class NetCDFBackend(DatabaseBackend):
    """
    On-disk database handler for outputs from SCMs

    Data is split into groups as specified by :attr:`levels`. This allows for
    fast reading and writing of new subsets of data when a single output file
    is no longer performant or data cannot all fit in memory.
    """

    def get_key(self, sr):
        """
        Get key where the data will be stored

        The key is the root directory joined with the other information
        provided. The filepath is also cleaned to remove spaces and special
        characters.

        Parameters
        ----------
        sr : :class:`scmdata.ScmRun`
            Data to save

        Raises
        ------
        ValueError
            If non-unique metadata is found for any of :attr:`self.kwargs["levels"]`
        KeyError
            If metadata is missing for any of :attr:`self.kwargs["levels"]`

        Returns
        -------
        str
            Path in which to save the data without spaces or special characters
        """
        levels = {
            database_level: sr.get_unique_meta(
                database_level, no_duplicates=True
            ).replace(os.sep, "_")
            for database_level in self.kwargs["levels"]
        }
        return self._get_out_filepath(**levels)

    def _get_out_filepath(self, **data_levels):
        out_levels = []
        for database_level in self.kwargs["levels"]:
            if database_level not in data_levels:  # pragma: no cover
                # emergency valve
                raise KeyError("expected level: {}".format(database_level))
            out_levels.append(str(data_levels[database_level]))

        out_path = os.path.join(self.kwargs["root_dir"], *out_levels)
        out_fname = "__".join(out_levels) + ".nc"
        out_fname = os.path.join(out_path, out_fname)

        _check_is_subdir(self.kwargs["root_dir"], out_fname)

        return _get_safe_filename(out_fname)

    def save(self, sr):
        """
        Save a ScmRun to the database

        The dataset should not contain any duplicate metadata for the
        database levels.

        Parameters
        ----------
        sr : :class:`scmdata.ScmRun`
            Data to save

        Raises
        ------
        ValueError
            If duplicate metadata are present for the requested database levels
        KeyError
            If metadata for the requested database levels are not found

        Returns
        -------
        str
            Key where the data is saved
        """
        key = self.get_key(sr)

        ensure_dir_exists(key)
        if os.path.exists(key):
            existing_run = ScmRun.from_nc(key)

            sr = run_append([existing_run, sr])

        # Check for required extra dimensions
        dimensions = self.kwargs.get("dimensions", None)
        if not dimensions:
            nunique_meta_vals = sr.meta.nunique()
            dimensions = nunique_meta_vals[nunique_meta_vals > 1].index.tolist()
        sr.to_nc(key, dimensions=dimensions)

        return key

    def load(self, key):
        """
        Load data at a given key

        Parameters
        ----------
        key : str
            Key to load

        Returns
        -------
        :class:`scmdata.ScmRun`
            Loaded data
        """
        return ScmRun.from_nc(key)

    def delete(self, key):
        """
        Delete a key

        Parameters
        ----------
        key : str
            Key (filepath) to delete
        """
        os.remove(key)

    def get(self, filters):
        """
        Get all matching objects for a given filter

        Parameters
        ----------
        filters : dict of str
            String filters

            If a level is missing then all values are fetched

        Returns
        -------
        list of str
            Matching keys
        """
        level_options = []
        for level in self.kwargs["levels"]:
            level_values = filters.get(level, ["*"])
            if isinstance(level_values, str):
                level_values = [level_values]
            level_options.append(level_values)

        # AND logic across levels, OR logic within levels
        level_options_product = itertools.product(*level_options)
        globs_to_check = [
            _get_safe_filename(os.path.join(self.kwargs["root_dir"], *levels, "*.nc"))
            for levels in level_options_product
        ]

        load_files = [
            v
            for vlist in [glob.glob(g, recursive=True) for g in globs_to_check]
            for v in vlist
        ]

        return load_files

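    # For illustration (paths hypothetical): with the default levels
    # ("climate_model", "variable", "region", "scenario") and
    # filters = {"variable": "Surface Temperature",
    #            "scenario": ["ssp119", "ssp126"]},
    # two globs are checked, i.e. OR within the scenario level and AND across
    # the other levels:
    #
    #     <root_dir>/*/Surface-Temperature/*/ssp119/*.nc
    #     <root_dir>/*/Surface-Temperature/*/ssp126/*.nc
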
""" Loaded backends for ScmDatabase Additional backends should be based upon :class:`DatabaseBackend` """ backend_classes = {"netcdf": NetCDFBackend}
class ScmDatabase:
    """
    On-disk database handler for outputs from SCMs

    Data is split into groups as specified by :attr:`levels`. This allows for
    fast reading and writing of new subsets of data when a single output file
    is no longer performant or data cannot all fit in memory.
    """

    def __init__(
        self,
        root_dir,
        levels=("climate_model", "variable", "region", "scenario"),
        backend="netcdf",
        backend_config=None,
    ):
        """
        Initialise the database

        Parameters
        ----------
        root_dir : str
            The root directory of the database

        levels : tuple of str
            Specifies how the runs should be stored on disk.

            The data will be grouped by ``levels``. These levels should be
            adapted to best match the input data and desired access pattern.
            If there are any additional varying dimensions, they will be
            stored as dimensions.

        backend : str or :class:`DatabaseBackend`
            Determine the backend to serialize and deserialize data

            Defaults to using :class:`NetCDFBackend` which reads and writes
            data as netCDF files. Note that this requires the optional
            dependency netCDF4 to be installed.

            If a custom backend class is being used, it must extend the
            :class:`DatabaseBackend` class.

        backend_config : dict
            Additional configuration to pass to the backend

            See the documentation for the target backend to determine which
            configuration options are available.

        .. note::

            Creating a new :class:`ScmDatabase` does not modify any existing
            data on disk. To load an existing database ensure that the
            :attr:`root_dir` and :attr:`levels` are the same as the previous
            instance.
        """
        self._root_dir = root_dir
        self.levels = tuple(levels)

        backend_config = backend_config if backend_config else {}
        for key in ["levels", "root_dir"]:
            if key in backend_config:
                raise ValueError(
                    "backend_config cannot contain key of `{}`".format(key)
                )
        backend_config["levels"] = self.levels
        backend_config["root_dir"] = root_dir

        self._backend = self._get_backend(backend, backend_config)

    def _get_backend(self, backend, backend_config):
        if isinstance(backend, six.string_types):
            try:
                cls = backend_classes[backend.lower()]
                return cls(**backend_config)
            except KeyError:
                raise ValueError("Unknown database backend: {}".format(backend))
        else:
            if not isinstance(backend, DatabaseBackend):
                raise ValueError(
                    "Backend should be an instance of scmdata.database.DatabaseBackend"
                )
            return backend

    def __repr__(self):
        return "<scmdata.database.ScmDatabase (root_dir: {}, levels: {})>".format(
            self._root_dir, self.levels
        )

    @property
    def root_dir(self):
        """
        Root directory of the database.

        Returns
        -------
        str
        """
        return self._root_dir

    def _clean_filters(self, filters):
        for level in filters:
            if level not in self.levels:
                raise ValueError("Unknown level: {}".format(level))
            if os.sep in filters[level]:
                filters[level] = filters[level].replace(os.sep, "_")
        return filters

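    # A minimal usage sketch (the paths are hypothetical):
    #
    #     from scmdata.database import ScmDatabase
    #
    #     db = ScmDatabase("/data/scm-database")  # default levels
    #     db = ScmDatabase(
    #         "/data/scm-database", levels=("climate_model", "variable")
    #     )  # or group by climate model and variable only
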
    def save(self, scmrun, disable_tqdm=False):
        """
        Save data to the database

        The results are saved with one file for each unique combination of
        :attr:`levels` in a directory structure underneath ``root_dir``.

        Use :meth:`available_data` to see what data is available. Subsets of
        data can then be loaded as an :class:`scmdata.ScmRun <scmdata.run.ScmRun>`
        using :meth:`load`.

        Parameters
        ----------
        scmrun : :class:`scmdata.ScmRun <scmdata.run.ScmRun>`
            Data to save.

            The timeseries in this run should have valid metadata for each
            of the columns specified in ``levels``.

        disable_tqdm : bool
            If True, do not show the progress bar

        Raises
        ------
        KeyError
            If the timeseries in ``scmrun`` are missing metadata for any of
            the columns specified in ``levels``
        """
        for r in tqdman.tqdm(
            scmrun.groupby(self.levels),
            leave=False,
            desc="Saving to database",
            disable=disable_tqdm,
        ):
            self._backend.save(r)

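    # Saving a run, as a sketch (the metadata values are hypothetical and the
    # :class:`scmdata.ScmRun` constructor call assumes its
    # ``(data, index, columns)`` signature):
    #
    #     import numpy as np
    #     from scmdata import ScmRun
    #
    #     run = ScmRun(
    #         data=np.array([[1.1, 1.2, 1.3]]).T,
    #         index=[2010, 2020, 2030],
    #         columns={
    #             "model": ["an_iam"],
    #             "climate_model": ["a_scm"],
    #             "scenario": ["ssp126"],
    #             "region": ["World"],
    #             "variable": ["Surface Temperature"],
    #             "unit": ["K"],
    #         },
    #     )
    #     db.save(run)
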
    def load(self, disable_tqdm=False, **filters):
        """
        Load data from the database

        Parameters
        ----------
        disable_tqdm : bool
            If True, do not show the progress bar

        filters : dict of str : [str, list[str]]
            Filters for the data to load.

            Defaults to loading all values for a level if it isn't specified.

            If a filter is a list then OR logic is applied within the level.
            For example, if we have ``scenario=["ssp119", "ssp126"]`` then
            both the ssp119 and ssp126 scenarios will be loaded.

        Returns
        -------
        :class:`scmdata.ScmRun`
            Loaded data

        Raises
        ------
        ValueError
            If a filter for a level not in :attr:`levels` is specified

            If no data matching ``filters`` is found
        """
        filters = self._clean_filters(filters)

        load_files = self._backend.get(filters)

        return run_append(
            [
                self._backend.load(f)
                for f in tqdman.tqdm(
                    load_files,
                    desc="Loading files",
                    leave=False,
                    disable=disable_tqdm,
                )
            ]
        )

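    # For example, loading the ``Surface Temperature`` data for two scenarios
    # across all climate models:
    #
    #     run = db.load(variable="Surface Temperature", scenario=["ssp119", "ssp126"])
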
    def delete(self, **filters):
        """
        Delete data from the database

        Parameters
        ----------
        filters : dict of str
            Filters for the data to delete.

            Defaults to deleting all data if nothing is specified.

        Raises
        ------
        ValueError
            If a filter for a level not in :attr:`levels` is specified
        """
        filters = self._clean_filters(filters)

        targets = self._backend.get(filters)
        for t in targets:
            _check_is_subdir(self._root_dir, t)
            self._backend.delete(t)

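    # For example, removing everything stored for a single (hypothetical)
    # scenario:
    #
    #     db.delete(scenario="ssp119")
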
    def available_data(self):
        """
        Get all the data which is available to be loaded

        If metadata includes non-alphanumeric characters then it might appear
        modified in the returned table. The original metadata values can
        still be used to filter data.

        Returns
        -------
        :class:`pd.DataFrame`
        """
        all_files = self._backend.get({})

        file_meta = []
        for f in all_files:
            dirnames = f.split(os.sep)[:-1]
            file_meta.append(dirnames[-len(self.levels) :])

        data = pd.DataFrame(file_meta, columns=self.levels)

        return data.sort_values(by=data.columns.to_list()).reset_index(drop=True)
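
    # The returned frame has one row per stored file, e.g. (values
    # hypothetical):
    #
    #     >>> db.available_data()
    #       climate_model             variable region scenario
    #     0         a_scm  Surface-Temperature  World   ssp126
    #
    # Note the "-" in "Surface-Temperature": values are read back from the
    # on-disk paths, which were cleaned by ``_get_safe_filename``.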