Source code for scmdata.database.backends.netcdf

Database backend for handling local files stored as NetCDF

import glob
import itertools
import os
import os.path

from scmdata.database._utils import _check_is_subdir, ensure_dir_exists
from scmdata.database.backends import BaseDatabaseBackend
from import ScmRun, run_append

def _get_safe_filename(inp, include_glob=False):
    def safe_char(c):
        accepted_chars = "-_."
        if include_glob:
            accepted_chars = accepted_chars + "*"

        if c.isalnum() or c in accepted_chars or c == os.sep:
            return c

        return "-"

    return "".join(safe_char(c) for c in inp)

[docs]class NetCDFDatabaseBackend(BaseDatabaseBackend): """ Database backend for handling local files stored as NetCDF """
[docs] def get_key(self, sr): """ Get key where the data will be stored The key is the root directory joined with the other information provided. The filepath is also cleaned to remove spaces and special characters. Parameters ---------- sr : :class:`scmdata.ScmRun` Data to save Raises ------ ValueError If non-unique metadata is found for each of :attr:`self.kwargs["levels"]` If any metadata end with '.' KeyError If missing metadata is found for each of :attr:`self.kwargs["levels"]` Returns ------- str Path in which to save the data without spaces or special characters """ levels = { database_level: sr.get_unique_meta( database_level, no_duplicates=True ).replace(os.sep, "_") for database_level in self.kwargs["levels"] } # Windows does not support directories or filenames which end in a '.' if any([level.endswith(".") for level in levels.values()]): raise ValueError("Metadata cannot end in a '.'") return self._get_out_filepath(**levels)
def _get_out_filepath(self, **data_levels): out_levels = [] for database_level in self.kwargs["levels"]: if database_level not in data_levels: # pragma: no cover # emergency valve raise KeyError(f"expected level: {database_level}") out_levels.append(str(data_levels[database_level])) out_path = os.path.join(*out_levels) out_fname = "__".join(out_levels) + ".nc" out_fname = _get_safe_filename(os.path.join(out_path, out_fname)) out_fname = os.path.join(self.kwargs["root_dir"], out_fname) _check_is_subdir(self.kwargs["root_dir"], out_fname) return out_fname
[docs] def save(self, sr): """ Save a ScmRun to the database The dataset should not contain any duplicate metadata for the database levels Parameters ---------- sr : :class:`scmdata.ScmRun` Data to save Raises ------ ValueError If duplicate metadata are present for the requested database levels KeyError If metadata for the requested database levels are not found Returns ------- str Key where the data is saved """ key = self.get_key(sr) ensure_dir_exists(key) if os.path.exists(key): existing_run = ScmRun.from_nc(key) sr = run_append([existing_run, sr]) # Check for required extra dimensions dimensions = self.kwargs.get("dimensions", None) if not dimensions: nunique_meta_vals = sr.meta.nunique() dimensions = nunique_meta_vals[nunique_meta_vals > 1].index.tolist() sr.to_nc(key, dimensions=dimensions) return key
[docs] def load(self, key): """ Load a run using a key Parameters ---------- key: str Returns ------- :class:`scmdata.ScmRun` """ return ScmRun.from_nc(key)
[docs] def delete(self, key): """ Delete a key Parameters ---------- key: str """ os.remove(key)
[docs] def get(self, filters): """ Get all matching objects for a given filter Parameters ---------- filters: dict of str String filters If a level is missing then all values are fetched Returns ------- list of str """ level_options = [] for level in self.kwargs["levels"]: level_values = filters.get(level, ["*"]) if isinstance(level_values, str): level_values = [level_values] level_options.append(level_values) # AND logic across levels, OR logic within levels level_options_product = itertools.product(*level_options) globs_to_check = [ _get_safe_filename(os.path.join(*levels, "*.nc"), include_glob=True) for levels in level_options_product ] load_files = [ v for vlist in [ glob.glob(os.path.join(self.kwargs["root_dir"], g), recursive=True) for g in globs_to_check ] for v in vlist ] return load_files