"""
Utils contains a number of helpful functions for doing common cube operations.
For example, applying masks to cubes, taking latitude-longitude means and getting
timeseries from a cube as datetime values.
"""
import datetime as dt
import logging
import numpy as np
import numpy.ma as ma
from dateutil.relativedelta import relativedelta
try:
import cf_units
import cftime
import dask.array as da
import iris
from iris.analysis import WeightedAggregator, _build_dask_mdtol_function
from iris.exceptions import CoordinateNotFoundError
# monkey patch iris MEAN until https://github.com/SciTools/iris/pull/3299 is merged
iris.analysis.MEAN = WeightedAggregator(
"mean", ma.average, lazy_func=_build_dask_mdtol_function(da.ma.average)
)
except ModuleNotFoundError: # pragma: no cover # emergency valve
from .errors import raise_no_iris_warning
raise_no_iris_warning()
logger = logging.getLogger(__name__)
[docs]def get_cube_timeseries_data(scm_cube, realise_data=False):
"""
Get a timeseries from a cube.
This function only works on cubes which are on a time grid only i.e. have no other dimension coordinates.
Parameters
----------
scm_cube : :obj:`ScmCube`
An ``ScmCube`` instance with only a 'time' dimension.
realise_data : bool
If ``True``, force the data to be realised before returning
Returns
-------
np.ndarray
The cube's timeseries data. If ``realise_data`` is ``False`` then a
``da.Array`` will be returned if the data is lazy.
"""
_assert_only_cube_dim_coord_is_time(scm_cube)
raw_data = scm_cube.cube.core_data()
if isinstance(raw_data, da.Array) and realise_data:
return raw_data.compute()
return raw_data
[docs]def get_scm_cube_time_axis_in_calendar(scm_cube, calendar):
"""
Get a cube's time axis in a given calendar
Parameters
----------
scm_cube : :obj:`ScmCube`
An ``ScmCube`` instance.
calendar : str
The calendar to return the time axis in e.g. '365_day', 'gregorian'.
Returns
-------
np.ndarray
Array of datetimes, containing the cube's calendar.
"""
time = scm_cube.cube.dim_coords[scm_cube.time_dim_number]
return cf_units.num2date(time.points, time.units.name, calendar)
def _assert_only_cube_dim_coord_is_time(scm_cube):
assert_msg = "Should only have time coordinate here"
if len(scm_cube.cube.dim_coords) != 1:
raise AssertionError(assert_msg)
if scm_cube.cube.dim_coords[0].standard_name != "time":
raise AssertionError(assert_msg)
[docs]def assert_all_time_axes_same(time_axes):
"""
Assert all time axes in a set are the same.
Parameters
----------
time_axes : list_like of array_like
List of time axes to compare.
Raises
------
AssertionError
If not all time axes are the same.
"""
for time_axis_to_check in time_axes:
assert_msg = "all the time axes should be the same"
try:
np.testing.assert_array_equal(
time_axis_to_check, time_axes[0], err_msg=assert_msg
)
# handle weird numpy error in case it comes up
except AttributeError: # pragma: no cover
raise AssertionError(assert_msg)
[docs]def take_lat_lon_mean(in_scmcube, in_weights):
"""
Take the latitude longitude mean of a cube with given weights
Parameters
----------
in_scmcube : :obj:`ScmCube`
An ``ScmCube`` instance.
in_weights : np.ndarray
Weights to use when taking the mean.
Returns
-------
:obj:`ScmCube`, float
First output is a copy of the input cube in which the data is now the
latitude-longitude mean of the input cube's data. Second output is
the sum of weights i.e. normalisation used in the weighted mean.
"""
out_cube = type(in_scmcube)()
if in_weights.shape != in_scmcube.cube.shape:
in_weights_broad = broadcast_onto_lat_lon_grid(in_scmcube, in_weights)
else:
in_weights_broad = in_weights
out_cube.cube = in_scmcube.cube.collapsed(
[in_scmcube.lat_name, in_scmcube.lon_name],
iris.analysis.MEAN,
weights=in_weights_broad,
)
normalising_factor = _get_normalising_factor(in_scmcube, in_weights)
return out_cube, normalising_factor
def _get_normalising_factor(in_scmcube, in_weights):
helper = type(in_scmcube)()
helper.cube = in_scmcube._get_lat_lon_slice()
if len(in_weights.shape) > len(helper.cube.shape):
weights_area_slice = [
slice(None)
if d in [in_scmcube.lat_dim_number, in_scmcube.lon_dim_number]
else 0
for d in range(len(in_weights.shape))
]
in_weights = in_weights[tuple(weights_area_slice)]
in_weights_single_timestep = broadcast_onto_lat_lon_grid(helper, in_weights)
helper_dat = helper.cube.data # have to realise data in order to get mask
if hasattr(helper_dat, "mask"):
if isinstance(in_weights_single_timestep, da.Array):
in_weights_single_timestep = in_weights_single_timestep.compute()
normalising_factor = ma.masked_array(
in_weights_single_timestep, helper_dat.mask
).sum()
else:
normalising_factor = in_weights_single_timestep.sum()
return normalising_factor
[docs]def apply_mask(in_scmcube, in_mask):
"""
Apply a mask to an scm cube's data
Parameters
----------
in_scmcube : :obj:`ScmCube`
An ``ScmCube`` instance.
in_mask : np.ndarray
The mask to apply
Returns
-------
:obj:`ScmCube`
A copy of the input cube with the mask applied to its data
"""
out_cube = type(in_scmcube)()
if in_scmcube.cube.has_lazy_data():
new_data = da.ma.masked_array(data=in_scmcube.cube.lazy_data(), mask=in_mask)
else:
new_data = ma.masked_array(in_scmcube.cube.data, mask=in_mask)
out_cube.cube = in_scmcube.cube.copy(data=new_data)
return out_cube
[docs]def unify_lat_lon(cubes, rtol=10 ** -6):
"""
Unify latitude and longitude co-ordinates of cubes in place.
The co-ordinates will only be unified if they already match to within a given
tolerance.
Parameters
----------
cubes : :obj:`iris.cube.CubeList`
List of iris cubes whose latitude and longitude co-ordinates should be unified.
rtol : float
Maximum relative difference which can be accepted between co-ordinate values.
Raises
------
ValueError
If the co-ordinates differ by more than relative tolerance or are not
compatible (e.g. different shape).
"""
ref_lats = cubes[0].coords("latitude")[0].points
ref_lons = cubes[0].coords("longitude")[0].points
for cube in cubes[1:]:
cube_lats = cube.coords("latitude")[0].points
cube_lons = cube.coords("longitude")[0].points
try:
np.testing.assert_allclose(ref_lats, cube_lats, rtol=rtol)
np.testing.assert_allclose(ref_lons, cube_lons, rtol=rtol)
except AssertionError:
error_msg = (
"Cannot unify latitude and longitude, relative difference in "
"co-ordinates is greater than {}".format(rtol)
)
raise ValueError(error_msg)
lat_dim_numbers = cube.coord_dims("latitude")
cube.remove_coord("latitude")
if len(lat_dim_numbers) > 1:
cube.add_aux_coord(cubes[0].coords("latitude")[0], lat_dim_numbers)
else:
cube.add_dim_coord(cubes[0].coords("latitude")[0], lat_dim_numbers[0])
lon_dim_numbers = cube.coord_dims("longitude")
cube.remove_coord("longitude")
if len(lon_dim_numbers) > 1:
cube.add_aux_coord(cubes[0].coords("longitude")[0], lon_dim_numbers)
else:
cube.add_dim_coord(cubes[0].coords("longitude")[0], lon_dim_numbers[0])
[docs]def cube_lat_lon_grid_compatible_with_array(cube, array_in):
"""
Assert that an array can be broadcast onto the cube's lat-lon grid
Parameters
----------
cube : :obj:`ScmCube`
:obj:`ScmCube` instance whose lat-lon grid we want to check agains
array_in : np.ndarray
The array we want to ensure is able to be broadcast
Returns
-------
bool
``True`` if the cube's lat-lon grid is compatible with ``array_in``, otherwise
``False``
Raises
------
AssertionError
The array cannot be broadcast onto the cube's lat-lon grid
"""
base_shape = cube.lat_lon_shape
if array_in.shape != base_shape:
array_in = np.transpose(array_in)
if array_in.shape != base_shape:
return False
return True
[docs]def broadcast_onto_lat_lon_grid(cube, array_in):
"""
Broadcast an array onto the latitude-longitude grid of ``cube``.
Here, broadcasting means taking the array and 'duplicating' it so that it
has the same number of dimensions as the cube's underlying data.
For example, given a cube with a time dimension of length 3, a latitude dimension of length 4
and a longitude dimension of length 2 (shape 3x4x2) and ``array_in`` of shape 4x2, results in
a 3x4x2 array where each slice in the broadcasted array's time dimension is identical to ``array_in``.
Parameters
----------
cube : :obj:`ScmCube`
:obj:`ScmCube` instance whose lat-lon grid we want to check agains
array_in : np.ndarray
The array we want to broadcast
Returns
-------
array_out
The original array, broadcast onto the cube's lat-lon grid (i.e. duplicated
along all dimensions except for latitude and longitude). Note: If the cube has
lazy data, we return a :obj:`da.Array`, otherwise we return an
:obj:`np.ndarray`.
Raises
------
AssertionError
``array_in`` cannot be broadcast onto the cube's lat-lon grid because their
shapes are not compatible
ValueError
``array_in`` cannot be broadcast onto the cube's lat-lon grid by
``iris.util.broadcast_to_shape``
"""
if not cube_lat_lon_grid_compatible_with_array(cube, array_in):
shape_assert_msg = (
"the ``array_in`` must be the same shape as the "
"cube's longitude-latitude grid"
)
raise AssertionError(shape_assert_msg)
broadcaster = np.broadcast_to if not cube.cube.has_lazy_data() else da.broadcast_to
try:
return broadcaster(array_in, cube.cube.shape)
except ValueError as e:
try_transpose = str(e).startswith(
"operands could not be broadcast together with remapped shapes"
) or str(e).startswith("cannot broadcast shape")
if not try_transpose: # pragma: no cover
raise
return broadcaster(array_in.T, cube.cube.shape)
def _cftime_conversion(t):
return dt.datetime(
t.year, t.month, t.day, t.hour, t.minute, t.second, t.microsecond
)
_vector_cftime_conversion = np.vectorize(_cftime_conversion)
def _check_cube_and_adjust_if_needed(cube, time_name="time"):
"""
Check cube and adjust if required
Parameters
----------
cube : :obj:`iris.cube.Cube`
Cube to check
time_name : str
Name of the time dimension of the cube to check
Returns
-------
:obj:`iris.cube.Cube`
Cube, adjusted if needed
"""
try:
time_dim = cube.coord(time_name)
gregorian = time_dim.units.calendar == "gregorian"
year_zero = str(time_dim.units).startswith("days since 0-1-1")
except CoordinateNotFoundError:
gregorian = False
year_zero = False
if gregorian and year_zero:
warn_msg = (
"Your calendar is gregorian yet has units of 'days since 0-1-1'. "
"We rectify this by removing all data before year 1 and changing the "
"units to 'days since 1-1-1'. If you want other behaviour, you will "
"need to use another package."
)
logger.warning(warn_msg)
return _adjust_gregorian_year_zero_units(cube, time_name)
return cube
def _adjust_gregorian_year_zero_units(cube, time_name):
"""
Adjust Gregogrian calendar with year zero.
This function makes the time axis useable with iris again (there is no year zero
in the Gregorian calendar) at the expense of removing the year zero data.
Parameters
----------
cube : :obj:`iris.cube.Cube`
Cube to adjusted
time_name : str
Name of the time dimension of the cube to adjust
Returns
-------
:obj:`iris.cube.Cube`
Adjusted cube
Raises
------
AssertionError
Defensive assertion: the code is being used in an unexpected way
"""
# pylint:disable=too-many-locals
# hack function to work around very specific use case
year_zero_cube = cube.copy()
year_zero_cube_time_dim = cube.coord(time_name)
gregorian_year_zero_cube = (
year_zero_cube_time_dim.units.calendar == "gregorian"
) and str(year_zero_cube_time_dim.units).startswith("days since 0-1-1")
if not gregorian_year_zero_cube: # pragma: no cover # emergency valve
raise AssertionError("This function is not setup for other cases")
new_unit_str = "days since 1-1-1"
# converting with the new units means we're actually converting with the wrong
# units, we use this variable to track how many years to shift back to get the
# right time axis again
new_units_shift = 1
new_time_dim_unit = cf_units.Unit(
new_unit_str, calendar=year_zero_cube_time_dim.units.calendar
)
tmp_time_dim = year_zero_cube_time_dim.copy()
tmp_time_dim.units = new_time_dim_unit
tmp_cube = iris.cube.Cube(year_zero_cube.data)
for i, coord in enumerate(year_zero_cube.dim_coords):
if coord.standard_name == "time":
tmp_cube.add_dim_coord(tmp_time_dim, i)
else:
tmp_cube.add_dim_coord(coord, i)
years_to_bin = 1
first_valid_year = years_to_bin + new_units_shift
def check_usable_data(cell):
return first_valid_year <= cell.point.year
usable_cube = tmp_cube.extract(iris.Constraint(time=check_usable_data))
usable_data = usable_cube.data
tmp_time_dim = usable_cube.coord(time_name)
tmp_time = cftime.num2date(
tmp_time_dim.points, new_unit_str, tmp_time_dim.units.calendar
)
# TODO: move to utils
tmp_time = np.array([dt.datetime(*v.timetuple()[:6]) for v in tmp_time])
# undo the shift to new units
usable_time = cf_units.date2num(
tmp_time - relativedelta(years=new_units_shift),
year_zero_cube_time_dim.units.name,
year_zero_cube_time_dim.units.calendar,
)
usable_time_unit = cf_units.Unit(
year_zero_cube_time_dim.units.name,
calendar=year_zero_cube_time_dim.units.calendar,
)
usable_time_dim = iris.coords.DimCoord(
usable_time,
standard_name=year_zero_cube_time_dim.standard_name,
long_name=year_zero_cube_time_dim.long_name,
var_name=year_zero_cube_time_dim.var_name,
units=usable_time_unit,
)
new_cube = iris.cube.Cube(usable_data)
for i, coord in enumerate(usable_cube.dim_coords):
if coord.standard_name == "time":
new_cube.add_dim_coord(usable_time_dim, i)
else:
new_cube.add_dim_coord(coord, i)
# hard coding as making this list dynamically is super hard as there's so many
# edge cases to cover
attributes_to_copy = [
"attributes",
"cell_methods",
"units",
"var_name",
"standard_name",
"name",
"metadata",
"long_name",
]
for att in attributes_to_copy:
setattr(new_cube, att, getattr(year_zero_cube, att))
return new_cube
def _ensure_is_int(in_val):
error_msg = "Cannot convert to int: {}"
if np.issubdtype(type(in_val), np.integer):
return in_val
if isinstance(in_val, float):
in_float = in_val
else:
try:
if in_val.endswith("D"):
in_float = float(in_val.replace("D", ""))
else:
in_float = float(in_val)
except (AttributeError, ValueError) as exc:
raise ValueError(error_msg.format(in_val)) from exc
out_int = int(in_float)
if out_int != in_float:
raise ValueError(error_msg.format(in_val))
return out_int