# Source code for netcdf_scm.output

"""
Module for tracking crunched output files

This module handles checking whether a file has already been crunched and if its
source files have been updated since it was last crunched.
"""
import json
import logging
from collections import OrderedDict
from datetime import datetime, timezone
from os.path import exists, join

# Module-level logger, named after this module's import path
logger = logging.getLogger(__name__)


class OutputFileDatabase:
    """
    Holds a list of output files which have been written.

    Also keeps track of the source files used to create each output file.

    The database is persisted as a JSON-lines file (one JSON object per line,
    each carrying a ``"filename"`` key) in ``out_dir``. The file is kept open
    for the lifetime of the instance so that new entries can be appended; call
    :meth:`close` (or use the instance as a context manager) to release it.
    """

    filename = "netcdf-scm_crunched.jsonl"

    def __init__(self, out_dir):
        """
        Initialise.

        Parameters
        ----------
        out_dir : str
            Directory in which to save the database (filename is given by
            ``self.filename``)
        """
        self.out_dir = out_dir
        # Choosing an OrderedDict because its time complexity for checking if an
        # item already exists is constant, while being able to keep the items in
        # time order
        self._data = OrderedDict()
        self._fp = self.load_from_file()

    def __len__(self):
        """Get length of database"""
        return len(self._data)

    def __enter__(self):
        """Enter a ``with`` block, returning the database itself"""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the underlying file on leaving a ``with`` block"""
        self.close()

    def close(self):
        """
        Close the underlying file handle

        Safe to call more than once. After closing, :meth:`register` can no
        longer append to the file.
        """
        self._fp.close()

    def load_from_file(self):
        """
        Load database from ``self.out_dir``

        Blank lines in the file are ignored.

        Returns
        -------
        :obj:`io.TextIOWrapper`
            Handle to the loaded filepath, positioned at the end of the file so
            that subsequent writes append

        Raises
        ------
        ValueError
            The loaded file contains more than one entry for a given filename,
            or a line is not valid JSON (``json.JSONDecodeError`` is a subclass
            of ``ValueError``)

        KeyError
            An entry has no ``"filename"`` key
        """
        fname = join(self.out_dir, self.filename)
        if not exists(fname):
            logger.warning("No output tracking file available. Creating new file")
            return open(fname, "w")

        fp = open(fname, "r+")
        try:
            for line in fp.readlines():
                if not line.strip():
                    # tolerate empty lines, e.g. a trailing newline added by hand
                    continue

                info = json.loads(line)
                k = info["filename"]
                if k in self._data:
                    raise ValueError(
                        "Corrupted output file: duplicate entries for {}".format(k)
                    )
                self._data[k] = info
        except Exception:
            # don't leak the handle if the file turns out to be unreadable
            fp.close()
            raise

        logger.info(
            "Read in %s items from database %s", len(self._data), self.filename
        )
        return fp

    def register(self, out_fname, info):
        """
        Register a filepath with info in the database

        If ``out_fname`` is already registered, its old entry is dropped and the
        file on disk is rewritten before the new entry is appended, so that
        entries stay in order of their last update.

        Parameters
        ----------
        out_fname : str
            Filepath to register

        info : dict
            ``out_fname``'s metadata. Any ``"filename"`` or ``"updated_at"`` keys
            are overwritten in the stored entry; ``info`` itself is not modified.
        """
        if out_fname in self._data:
            # Need to dump the new order of the contents to file
            del self._data[out_fname]
            self.dump()

        # Naive UTC timestamp: same output format as the deprecated
        # ``datetime.utcnow().isoformat()``
        updated_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        r = {**info, "filename": out_fname, "updated_at": updated_at}
        self._data[out_fname] = r
        self._write_line(r)
        self._fp.flush()

    def _write_line(self, line):
        """Write ``line`` to file as a single JSON document followed by a newline"""
        self._fp.write("{}\n".format(json.dumps(line)))

    def dump(self):
        """Rewrite the entire file from the in-memory entries"""
        logger.info("Rewriting output file")
        self._fp.close()

        # Create a new file truncating the old values
        self._fp = open(join(self.out_dir, self.filename), "w")
        for entry in self._data.values():
            self._write_line(entry)
        self._fp.flush()

    def contains_file(self, filepath):
        """
        Return whether a filepath exists in the database

        Parameters
        ----------
        filepath : str
            Filepath to check (use absolute paths to be safe)

        Returns
        -------
        bool
            If the file is in the database, True, otherwise False
        """
        return filepath in self._data