Source code for netcdf_scm.retractions

"""
Utilities for checking for retracted datasets
"""
import concurrent.futures
import glob
import logging
import os.path
from collections import defaultdict
from urllib.parse import urlencode

import pandas as pd
import pymagicc.io
import requests
import tqdm.autonotebook as tqdman

from .iris_cube_wrappers import CMIP6OutputCube

LOGGER = logging.getLogger(__name__)

ESGF_NODE = "esgf.nci.org.au"


def _create_search_url(**kwargs):
    kwargs.setdefault("format", "application/solr+json")
    kwargs.setdefault("offset", 0)
    kwargs.setdefault("limit", 100)
    kwargs.setdefault("type", "Dataset")
    kwargs.setdefault("replica", False)

    return "https://{}/esg-search/search?{}".format(
        ESGF_NODE, urlencode(kwargs, doseq=True)
    )
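
# For illustration (an assumed, not executed, example): with a single
# ``instance_id`` the helper above builds a search URL of roughly the form
# https://esgf.nci.org.au/esg-search/search?instance_id=<instance_id>&format=application%2Fsolr%2Bjson&offset=0&limit=100&type=Dataset&replica=False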


def _query_esgf(instance_ids):
    response = requests.get(_create_search_url(instance_id=instance_ids))
    response.raise_for_status()
    response_json = response.json()

    return response_json, instance_ids


def _perform_esgf_queries(instance_ids, esgf_query_batch_size, nworkers):
    futures = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=nworkers) as executor:
        for i in tqdman.tqdm(
            range(0, len(instance_ids), esgf_query_batch_size),
            desc="Querying ESGF (submitting jobs)",
        ):
            batch = instance_ids[i : i + esgf_query_batch_size]
            futures.append(executor.submit(_query_esgf, batch))

        return [
            f.result()
            for f in tqdman.tqdm(
                concurrent.futures.as_completed(futures),
                total=len(futures),
                desc="Retrieving results from ESGF jobs",
            )
        ]


def check_retractions(instance_ids, esgf_query_batch_size=100, nworkers=8):
    """
    Check a list of ``instance_ids`` for any retracted datasets

    Notes
    -----
    This queries external ESGF servers. Please limit the number of parallel
    requests.

    Parameters
    ----------
    instance_ids : list of str
        Datasets to check. ``instance_id`` is the unique identifier for a
        dataset, for example
        ``CMIP6.CMIP.CSIRO.ACCESS-ESM1-5.esm-hist.r1i1p1f1.Amon.rsut.gn.v20191128``

    esgf_query_batch_size : int
        Maximum number of ids to include in each query.

    nworkers : int
        Number of workers used to query ESGF in parallel.

    Returns
    -------
    list of str
        A list of retracted ``instance_ids``
    """
    response_jsons_search_instance_ids = _perform_esgf_queries(
        instance_ids, esgf_query_batch_size, nworkers
    )

    retracted_files = []
    for response_json, search_ids in response_jsons_search_instance_ids:
        found_ids = []
        for data_doc in response_json["response"]["docs"]:
            id_here = data_doc["instance_id"]
            found_ids.append(id_here)

            if data_doc["retracted"]:
                LOGGER.info("Found retracted dataset: %s", id_here)
                retracted_files.append(id_here)

        if len(found_ids) != len(search_ids):
            missing_ids = set(search_ids) - set(found_ids)
            for mid in missing_ids:
                LOGGER.warning(
                    "Couldn't find %s (search url: %s)",
                    mid,
                    _create_search_url(instance_id=mid),
                )

    return retracted_files
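

# Usage sketch for ``check_retractions``: the instance_id below is the example
# from the docstring; the batch size and worker count are arbitrary choices.
# Calling this performs live ESGF queries, so it is defined here purely as an
# illustration and never invoked on import.
def _example_check_retractions():  # pragma: no cover - illustrative sketch only
    example_ids = [
        "CMIP6.CMIP.CSIRO.ACCESS-ESM1-5.esm-hist.r1i1p1f1.Amon.rsut.gn.v20191128"
    ]

    return check_retractions(example_ids, esgf_query_batch_size=50, nworkers=4)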


def check_retracted_files(filenames_or_dir, filename_filter="*.nc", **kwargs):
    """
    Check if any files are retracted

    Notes
    -----
    This queries external ESGF servers. Please limit the number of parallel
    requests.

    Parameters
    ----------
    filenames_or_dir : list of str or str
        A list of filenames or a directory to check for any retractions. If a
        string is provided, it is assumed to reference a directory and any
        files within that directory matching ``filename_filter`` will be
        checked.

    filename_filter : str
        If a directory is passed, all files matching the filter will be
        checked.

    **kwargs : any
        Passed to :func:`check_retractions`

    Returns
    -------
    list of str
        List of the retracted files
    """
    if isinstance(filenames_or_dir, str):
        if os.path.isdir(filenames_or_dir):
            fnames = glob.glob(
                os.path.join(filenames_or_dir, "**", filename_filter),
                recursive=True,
            )
        else:
            raise NotADirectoryError(filenames_or_dir)
    else:
        if not isinstance(filenames_or_dir, list):
            raise ValueError("filenames_or_dir should be a list of filenames")

        fnames = filenames_or_dir

    filepaths_instance_ids = []
    for fname in tqdman.tqdm(fnames):
        try:
            filepaths_instance_ids.append(
                (fname, CMIP6OutputCube.get_instance_id(fname))
            )
        except ValueError:
            LOGGER.exception("Could not get instance_id from %s", fname)

    instance_ids = list({v[1] for v in filepaths_instance_ids})

    retracted = check_retractions(instance_ids, **kwargs)

    retracted_files = []
    for fp, instance_id in filepaths_instance_ids:
        if instance_id in retracted:
            retracted_files.append(fp)

    return retracted_files
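

# Usage sketch for ``check_retracted_files``: the directory path is
# hypothetical (any directory of crunched ``.nc`` files would do). Calling
# this performs live ESGF queries, so it is only defined, not run on import.
def _example_check_retracted_files():  # pragma: no cover - illustrative sketch only
    return check_retracted_files(
        "/data/netcdf-scm-crunched/CMIP6", filename_filter="*.nc", nworkers=4
    )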


def check_depends_on_retracted(mag_files, raise_on_mismatch=True, **kwargs):
    """
    Check if a ``.MAG`` file was calculated from now retracted data

    Notes
    -----
    This queries external ESGF servers. Please limit the number of parallel
    requests.

    Parameters
    ----------
    mag_files : list of str
        List of ``.MAG`` files to check

    raise_on_mismatch : bool
        If a file cannot be processed, should an error be raised? If
        ``False``, an error message is logged instead.

    **kwargs : any
        Passed to :func:`check_retractions`

    Returns
    -------
    pd.DataFrame
        Dataframe which describes the retracted status of each file in
        ``mag_files``. The columns are:

        - "mag_file": the files in ``mag_files``
        - "dependency_file": file which the file in the "mag_file" column
          depends on (note that the ``.MAG`` files may have more than one
          dependency so they may appear more than once in the "mag_file"
          column)
        - "dependency_instance_id": instance id (i.e. unique ESGF identifier)
          of the dependency file
        - "dependency_retracted": whether the dependency file has been
          retracted or not (``True`` if the file has been retracted)

        The list of retracted ``.MAG`` files can then be accessed with e.g.
        ``res.loc[res["dependency_retracted"], "mag_file"].unique()``

    Raises
    ------
    ValueError
        The ``.MAG`` file is not based on CMIP6 data (retractions cannot be
        checked automatically for CMIP5 data with netCDF-SCM).

    ValueError
        Metadata about a ``.MAG`` file's source is not included in the
        ``.MAG`` file.
    """
    no_source_data_files = []
    non_cmip6_data_files = []
    out_table = defaultdict(list)
    for mf in tqdman.tqdm(mag_files):
        LOGGER.debug("Processing %s", mf)
        metadata = pymagicc.io.read_mag_file_metadata(mf)
        dependency_files = _get_dependencies(metadata)

        if not dependency_files:
            no_source_data_files.append(mf)
            continue

        for df in dependency_files:
            try:
                instance_id = CMIP6OutputCube.get_instance_id(df)
            except ValueError:
                non_cmip6_data_files.append(mf)
                continue

            out_table["mag_file"].append(mf)
            out_table["dependency_file"].append(df)
            out_table["dependency_instance_id"].append(instance_id)

    if no_source_data_files:
        msg = "Could not determine source data for: {}".format(
            set(no_source_data_files)
        )
        if raise_on_mismatch:
            raise ValueError(msg)

        LOGGER.error(msg)

    if non_cmip6_data_files:
        msg = "Source data is not CMIP6 for: {}".format(set(non_cmip6_data_files))
        if raise_on_mismatch:
            raise ValueError(msg)

        LOGGER.error(msg)

    out_table = pd.DataFrame(out_table)

    instance_ids_retracted = check_retractions(
        out_table["dependency_instance_id"].unique().tolist(), **kwargs
    )

    out_table["dependency_retracted"] = out_table["dependency_instance_id"].isin(
        instance_ids_retracted
    )

    return out_table
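

# Usage sketch for ``check_depends_on_retracted``: the ``.MAG`` path is
# hypothetical. The returned dataframe is filtered as suggested in the
# docstring to recover the ``.MAG`` files which depend on retracted data.
# Calling this performs live ESGF queries, so it is only defined, not run.
def _example_check_depends_on_retracted():  # pragma: no cover - illustrative sketch only
    res = check_depends_on_retracted(
        ["/data/mag-files/tas_example.MAG"], raise_on_mismatch=False
    )

    return res.loc[res["dependency_retracted"], "mag_file"].unique()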


def _get_dependencies(metadata):
    dependency_files = []
    for k in metadata:
        if "source_files" in k:
            # keep the text which follows each "[" in the ";"- and ", "-separated
            # groups, stripping the trailing "]" and surrounding quotes so that
            # only the filenames themselves remain
            dependency_files_key = [
                v.strip("]").strip("'")
                for g in metadata[k].split(";")
                for sg in g.split(", ")
                for v in sg.split("[")[1:]
            ]
            if any("," in d for d in dependency_files_key):
                raise ValueError(
                    "Bad processing of {}, got {}".format(
                        metadata, dependency_files_key
                    )
                )

            dependency_files += dependency_files_key

    LOGGER.debug("Dependency files %s", dependency_files)

    return dependency_files
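

# Parsing sketch for ``_get_dependencies`` (the metadata value below is
# hypothetical, shown only to illustrate the splitting logic): a key containing
# "source_files" whose value looks like
#     "tas: ['/data/tas.nc']; rsut: ['/data/rsut.nc']"
# yields ["/data/tas.nc", "/data/rsut.nc"], because each ";"-separated group
# contributes the text after its "[", stripped of the trailing "]" and quotes.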