Source code for netcdf_scm.wranglers

"""Functions used to 'wrangle' netCDF-SCM netCDF files into other formats"""
import logging
from os.path import isfile, join

import numpy as np
from scmdata import ScmRun

from . import mat4py

logger = logging.getLogger(__name__)


[docs]def convert_tuningstruc_to_scmdf( # pylint:disable=too-many-arguments,too-many-locals filepath, variable=None, region=None, unit=None, scenario=None, model=None ): """ Convert a matlab tuningstruc to an :obj:`scmdata.ScmRun` Parameters ---------- filepath : str Filepath from which to load the data variable : str Name of the variable contained in the tuningstruc. If None, `convert_tuningstruc_to_scmdf` will attempt to determine it from the input file. region : str Region to which the data in the tuningstruc applies. If None, `convert_tuningstruc_to_scmdf` will attempt to determine it from the input file. unit : str Units of the data in the tuningstruc. If None, `convert_tuningstruc_to_scmdf` will attempt to determine it from the input file. scenario : str Scenario to which the data in the tuningstruc applies. If None, `convert_tuningstruc_to_scmdf` will attempt to determine it from the input file. model : str The (integrated assessment) model which generated the emissions scenario associated with the data in the tuningstruc. If None, `convert_tuningstruc_to_scmdf` will attempt to determine it from the input file and if it cannot, it will be set to "unspecified". Raises ------ KeyError If a metadata variable is not supplied and it cannot be determined from the tuningstruc. Returns ------- :obj:`scmdata.ScmRun` :obj:`scmdata.ScmRun` with the tuningstruc data """ dataset = mat4py.loadmat(filepath) for m, climate_model in enumerate(dataset["tuningdata"]["modelcodes"]): metadata = { "variable": [variable], "region": [region], "unit": [unit], "climate_model": [climate_model], "scenario": [scenario], "model": [model], } for k, v in metadata.items(): if v == [None]: try: metadata[k] = [dataset["tuningdata"]["model"][m][k]] except KeyError: if k == "model": metadata[k] = ["unspecified"] continue error_msg = "Cannot determine {} from file: " "{}".format( k, filepath ) raise KeyError(error_msg) data = np.asarray(dataset["tuningdata"]["model"][m]["data"]) if len(data) != 2: data = data.T scmdf = ScmRun(data=data[1], index=data[0], columns=metadata) try: ref_df.append(scmdf, inplace=True) except NameError: ref_df = scmdf return ref_df
[docs]def convert_scmdf_to_tuningstruc(scmdf, outdir, prefix=None, force=False): """ Convert an :obj:`scmdata.ScmRun` to a matlab tuningstruc One tuningstruc file will be created for each unique ["model", "scenario", "variable", "region", "unit"] combination in the input :obj:`scmdata.ScmRun`. Parameters ---------- scmdf : :obj:`scmdata.ScmRun` :obj:`scmdata.ScmRun` to convert to a tuningstruc outdir : str Directory in which to save the tuningstruc prefix : str Prefix for the filename. The rest of the filename is generated from the metadata. `.mat` is also appended automatically. If ``None``, no prefix is used. force : bool If True, overwrite any existing files Returns ------- list List of files which were not re-written as they already exist Raises ------ AssertionError If timeseries are not unique for a given ["climate_model", "model", "scenario", "variable", "region", "unit"] combination. """ already_written = [] group_cols = ["model", "scenario", "variable", "region", "unit"] for label, df in scmdf.timeseries().groupby(group_cols): ids = dict(zip(group_cols, label)) dataset = {} dataset["tuningdata"] = {} dataset["tuningdata"]["modelcodes"] = [] dataset["tuningdata"]["model"] = [] for m, (climate_model, cmdf) in enumerate(df.groupby("climate_model")): # impossible to make dataframe with duplicate rows, this is just in # case if cmdf.shape[0] != 1: # pragma: no cover # emergency valve raise AssertionError( "Should only have a single unique timeseries for a given " '["climate_model", "model", "scenario", "variable", ' '"region", "unit"] combination' ) dataset["tuningdata"]["modelcodes"].append(climate_model) dataset["tuningdata"]["model"].append({}) dataset["tuningdata"]["model"][m]["model"] = ids["model"] dataset["tuningdata"]["model"][m]["scenario"] = ids["scenario"] dataset["tuningdata"]["model"][m]["variable"] = ids["variable"] dataset["tuningdata"]["model"][m]["region"] = ids["region"] dataset["tuningdata"]["model"][m]["unit"] = ids["unit"] dataset["tuningdata"]["model"][m]["notes"] = ( "{} {} {} {} ({}) tuningstruc (written with scmcallib)" "".format( ids["scenario"], ids["model"], ids["region"], ids["variable"], ids["unit"], ) ) dataset["tuningdata"]["model"][m]["data"] = [ list(e) for e in zip( [float(t.year) for t in cmdf.columns], list(cmdf.values.squeeze()), ) ] dataset["tuningdata"]["model"][m]["col_code"] = ["YEARS", ids["variable"]] outfile = get_tuningstruc_name_from_df(df, outdir, prefix) if isfile(outfile) and not force: logger.info("Skipped (already exists, not overwriting) %s", outfile) already_written.append(outfile) else: logger.info("Writing %s", outfile) mat4py.savemat(outfile, dataset) return already_written
[docs]def get_tuningstruc_name_from_df(df, outdir, prefix): """ Get the name of a tuningstruc from a ``pd.DataFrame`` Parameters ---------- df : :obj:`pd.DataFrame` *pandas* DataFrame to convert to a tuningstruc outdir : str Base path on which to append the metadata and `.mat`. prefix : str Prefix to prepend to the name. If ``None``, no prefix is prepended. Returns ------- str tuningstruc name Raises ------ ValueError A name cannot be determined because e.g. more than one scenario is contained in the dataframe """ prefix = "{}_".format(prefix) if prefix is not None else "" def _get_col(col): try: vals = df[col].unique() except KeyError: vals = df.index.get_level_values(col).unique() if len(vals) != 1: raise ValueError("More than one {} in df".format(col)) return vals[0] scenario = _get_col("scenario") try: member_id = _get_col("member_id") except KeyError: member_id = "member-id" variable = _get_col("variable") region = _get_col("region") raw_name = ( ( "{}_{}_{}_{}".format(variable, scenario, member_id, region) .replace(" ", "_") .replace("|", "_") ) ).upper() + ".mat" return join(outdir, "{}{}".format(prefix, raw_name))