"""Classes and functions for defining, finding, and loading data."""
from __future__ import annotations
import fnmatch
import logging
import os
import pprint
import re
import textwrap
import uuid
from copy import deepcopy
from fnmatch import fnmatchcase
from itertools import groupby
from pathlib import Path
from typing import TYPE_CHECKING, Any
from esmvalcore import esgf, local
from esmvalcore._recipe import check
from esmvalcore._recipe.from_datasets import datasets_to_recipe
from esmvalcore.cmor.table import _get_mips, _update_cmor_facets
from esmvalcore.config import CFG, Session
from esmvalcore.config._config import (
get_activity,
get_ignored_warnings,
get_institutes,
load_extra_facets,
)
from esmvalcore.exceptions import InputFilesNotFound, RecipeError
from esmvalcore.local import (
_dates_to_timerange,
_get_output_file,
_get_start_end_date,
)
from esmvalcore.preprocessor import preprocess
from esmvalcore.preprocessor._derive import get_required
if TYPE_CHECKING:
from collections.abc import Iterable, Iterator, Sequence
from iris.cube import Cube
from esmvalcore.typing import Facets, FacetValue
__all__ = [
"Dataset",
"INHERITED_FACETS",
"datasets_to_recipe",
]
logger = logging.getLogger(__name__)
File = esgf.ESGFFile | local.LocalFile
INHERITED_FACETS: list[str] = [
"dataset",
"domain",
"driver",
"grid",
"project",
"timerange",
]
"""Inherited facets.
Supplementary datasets created based on the available files using the
:func:`Dataset.from_files` method will inherit the values of these facets from
the main dataset.
"""
def _augment(base: dict, update: dict) -> None:
"""Update dict `base` with values from dict `update`."""
for key in update:
if key not in base:
base[key] = update[key]
def _isglob(facet_value: FacetValue | None) -> bool:
"""Check if a facet value is a glob pattern."""
return isinstance(facet_value, str) and bool(
re.match(r".*[\*\?]+.*|.*\[.*\].*", facet_value),
)
def _ismatch(facet_value: FacetValue, pattern: FacetValue) -> bool:
"""Check if a facet value matches a glob pattern."""
return (
isinstance(pattern, str)
and isinstance(facet_value, str)
and fnmatchcase(facet_value, pattern)
)
class Dataset:
"""Define datasets, find the related files, and load them.
Parameters
----------
**facets
Facets describing the dataset. See
:obj:`esmvalcore.esgf.facets.FACETS` for the mapping between
the facet names used by ESMValCore and those used on ESGF.
Attributes
----------
supplementaries: list[Dataset]
List of supplementary datasets.
facets: :obj:`esmvalcore.typing.Facets`
Facets describing the dataset.
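    Examples
    --------
    For example, a typical CMIP6 dataset could be defined as follows (the
    facet values are illustrative):

    >>> from esmvalcore.dataset import Dataset
    >>> ds = Dataset(
    ...     short_name="tas",
    ...     mip="Amon",
    ...     project="CMIP6",
    ...     dataset="EC-Earth3",
    ...     exp="historical",
    ...     ensemble="r1i1p1f1",
    ...     grid="gr",
    ... )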
"""
_SUMMARY_FACETS: tuple[str, ...] = (
"short_name",
"mip",
"project",
"dataset",
"rcm_version",
"driver",
"domain",
"activity",
"exp",
"ensemble",
"grid",
"version",
)
"""Facets used to create a summary of a Dataset instance."""
def __init__(self, **facets: FacetValue) -> None:
self.facets: Facets = {}
self.supplementaries: list[Dataset] = []
self._persist: set[str] = set()
self._session: Session | None = None
self._files: Sequence[File] | None = None
self._file_globs: Sequence[Path] | None = None
self._input_datasets: list[Dataset] = []
for key, value in facets.items():
self.set_facet(key, deepcopy(value), persist=True)
@staticmethod
def from_recipe(
recipe: Path | str | dict,
session: Session,
) -> list[Dataset]:
"""Read datasets from a recipe.
Parameters
----------
recipe
:ref:`Recipe <recipe>` to load the datasets from. The value
provided here should be either a path to a file, a recipe file
that has been loaded using e.g. :func:`yaml.safe_load`, or an
:obj:`str` that can be loaded using :func:`yaml.safe_load`.
session
            Session to use.
Returns
-------
list[Dataset]
A list of datasets.
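        Examples
        --------
        A minimal sketch, assuming a recipe file ``recipe_example.yml``
        exists (the file name is hypothetical):

        >>> from esmvalcore.config import CFG
        >>> session = CFG.start_session("example")  # doctest: +SKIP
        >>> datasets = Dataset.from_recipe(
        ...     "recipe_example.yml", session
        ... )  # doctest: +SKIP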
"""
from esmvalcore._recipe.to_datasets import ( # noqa: PLC0415
datasets_from_recipe,
)
return datasets_from_recipe(recipe, session)
def _is_derived(self) -> bool:
"""Return ``True`` for derived variables, ``False`` otherwise."""
return bool(self.facets.get("derive", False))
def _is_force_derived(self) -> bool:
"""Return ``True`` for force-derived variables, ``False`` otherwise."""
return self._is_derived() and bool(
self.facets.get("force_derivation", False),
)
def _derivation_necessary(self) -> bool:
"""Return ``True`` if derivation is necessary, ``False`` otherwise."""
# If variable cannot be derived, derivation is not necessary
if not self._is_derived():
return False
# If forced derivation is requested, derivation is necessary
if self._is_force_derived():
return True
        # Otherwise, derivation is necessary if no files for the dataset
        # itself are found
ds_copy = self.copy()
ds_copy.supplementaries = []
# Avoid potential errors from missing data during timerange glob
# expansion
if _isglob(ds_copy.facets.get("timerange", "")):
ds_copy.facets.pop("timerange", None)
return not ds_copy.files
def _get_input_datasets(self) -> list[Dataset]:
"""Get input datasets."""
input_datasets: list[Dataset] = []
required_vars_facets = get_required(
self.facets["short_name"], # type: ignore
self.facets["project"], # type: ignore
)
for required_facets in required_vars_facets:
input_dataset = self._copy(derive=False, force_derivation=False)
keep = {"alias", "recipe_dataset_index", *self.minimal_facets}
input_dataset.facets = {
k: v for k, v in input_dataset.facets.items() if k in keep
}
input_dataset.facets.update(required_facets)
input_dataset.augment_facets()
input_datasets.append(input_dataset)
return input_datasets
@property
def input_datasets(self) -> list[Dataset]:
"""Get input datasets.
For non-derived variables (i.e., those with facet ``derive=False``),
this will simply return the dataset itself in a list.
For derived variables (i.e., those with facet ``derive=True``), this
will return the datasets required for derivation if derivation is
necessary, and the dataset itself if derivation is not necessary.
Derivation is necessary if the facet ``force_derivation=True`` is set
or no files for the dataset itself are available.
See also :func:`esmvalcore.preprocessor.derive` for an example usage.
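        Examples
        --------
        A sketch for a variable marked as derivable; the actual result
        depends on which files are available (the facet values are
        illustrative):

        >>> ds = Dataset(
        ...     short_name="lwp",
        ...     mip="Amon",
        ...     project="CMIP6",
        ...     dataset="EC-Earth3",
        ...     derive=True,
        ... )
        >>> ds.input_datasets  # doctest: +SKIP
        [Dataset: ...]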
"""
if self._input_datasets:
return self._input_datasets
if not self._derivation_necessary():
input_datasets = [self]
else:
input_datasets = self._get_input_datasets()
self._input_datasets = input_datasets
return input_datasets
@staticmethod
def _file_to_dataset(
dataset: Dataset,
file: esgf.ESGFFile | local.LocalFile,
) -> Dataset:
"""Create a dataset from a file with a `facets` attribute."""
facets = dict(file.facets)
if "version" not in dataset.facets:
# Remove version facet if no specific version requested
facets.pop("version", None)
updated_facets = {
f: v
for f, v in facets.items()
if f in dataset.facets
and _isglob(dataset.facets[f])
and _ismatch(v, dataset.facets[f])
}
new_dataset = dataset.copy()
new_dataset.facets.update(updated_facets)
# If possible, remove unexpanded facets that can be automatically
# populated.
unexpanded = {f for f, v in new_dataset.facets.items() if _isglob(v)}
required_for_augment = {"project", "mip", "short_name", "dataset"}
if unexpanded and not unexpanded & required_for_augment:
copy = new_dataset.copy()
copy.supplementaries = []
for facet in unexpanded:
copy.facets.pop(facet)
copy.augment_facets()
for facet in unexpanded:
if facet in copy.facets:
new_dataset.facets.pop(facet)
return new_dataset
def _get_all_available_datasets(self) -> Iterator[Dataset]: # noqa: C901
"""Yield datasets based on the available files.
This function requires that self.facets['mip'] is not a glob pattern.
Does take variable derivation into account, i.e., datasets available
through variable derivation are returned.
"""
datasets_found = False
# If no forced derivation is requested, search for datasets based on
# files from self
if not self._is_force_derived():
for dataset in self._get_available_datasets(self):
datasets_found = True
yield dataset
# For variables that cannot be derived, we are done here
if not self._is_derived():
return
# If forced derivation is requested or no datasets based on files from
# self have been found, search for datasets based on files from input
# datasets
if self._is_force_derived() or not datasets_found:
all_datasets: list[list[tuple[dict, Dataset]]] = []
for input_dataset in self._get_input_datasets():
all_datasets.append([])
for expanded_ds in self._get_available_datasets(
input_dataset,
):
updated_facets = {}
for key, value in self.facets.items():
if _isglob(value):
if key in expanded_ds.facets and not _isglob(
expanded_ds[key],
):
updated_facets[key] = expanded_ds.facets[key]
new_ds = self.copy()
new_ds.facets.update(updated_facets)
new_ds.supplementaries = self.supplementaries
all_datasets[-1].append((updated_facets, new_ds))
# Only consider those datasets that contain all input variables
# necessary for derivation
for updated_facets, new_ds in all_datasets[0]:
other_facets = [[d[0] for d in ds] for ds in all_datasets[1:]]
if all(updated_facets in facets for facets in other_facets):
yield new_ds
else:
logger.debug(
"Not all necessary input variables to derive '%s' are "
"available for %s with facets %s",
self["short_name"],
new_ds.summary(shorten=True),
updated_facets,
)
def _get_available_datasets(self, dataset: Dataset) -> Iterator[Dataset]:
"""Yield datasets based on the available files.
This function requires that self.facets['mip'] is not a glob pattern.
Does not take variable derivation into account, i.e., datasets
potentially available through variable derivation are ignored. To
consider derived variables properly, use the function
:func:`_get_all_available_datasets`.
"""
dataset_template = dataset.copy()
dataset_template.supplementaries = []
if _isglob(dataset_template.facets.get("timerange")):
# Remove wildcard `timerange` facet, because data finding cannot
# handle it
dataset_template.facets.pop("timerange")
seen = set()
partially_defined = []
expanded = False
for file in dataset_template.files:
new_dataset = self._file_to_dataset(dataset, file)
# Filter out identical datasets
facetset = frozenset(
(f, frozenset(v) if isinstance(v, list) else v)
for f, v in new_dataset.facets.items()
)
if facetset not in seen:
seen.add(facetset)
if any(
_isglob(v)
for f, v in new_dataset.facets.items()
if f != "timerange"
):
partially_defined.append((new_dataset, file))
else:
new_dataset._update_timerange() # noqa: SLF001
expanded = True
yield new_dataset
# Only yield datasets with globs if there is no better alternative
        for new_dataset, file in partially_defined:
            location_hint = (
                "in the path to the file?"
                if isinstance(file, local.LocalFile)
                else "available on ESGF?"
            )
            msg = (
                f"{new_dataset} with unexpanded wildcards, created from "
                f"file {file} with facets {file.facets}. Are the missing "
                f"facets {location_hint}"
            )
            if expanded:
                logger.info("Ignoring %s", msg)
            else:
                logger.debug(
                    "Not updating timerange and supplementaries for %s "
                    "because it still contains wildcards.",
                    msg,
                )
                yield new_dataset
def from_files(self) -> Iterator[Dataset]:
"""Create datasets based on the available files.
The facet values for local files are retrieved from the directory tree
        where the directories represent the facet values.
See :ref:`CMOR-DRS` for more information on this kind of file
organization.
:func:`glob.glob` patterns can be used as facet values to select
multiple datasets.
If for some of the datasets not all glob patterns can be expanded
(e.g. because the required facet values cannot be inferred from the
directory names), these datasets will be ignored, unless this happens
to be all datasets.
If :func:`glob.glob` patterns are used in supplementary variables and
multiple matching datasets are found, only the supplementary dataset
        that has the most facets in common with the main dataset will be
        attached.
        Supplementary datasets will inherit the facet values from the main
dataset for those facets listed in :obj:`INHERITED_FACETS`.
This also works for :ref:`derived variables <Variable derivation>`. The
input datasets that are necessary for derivation can be accessed via
:attr:`Dataset.input_datasets`.
Examples
--------
See :ref:`/notebooks/discovering-data.ipynb` for example use cases.
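        A minimal sketch, assuming matching CMIP6 data is available in the
        configured local directories (the facet values are illustrative):

        >>> ds = Dataset(
        ...     short_name="tas",
        ...     mip="Amon",
        ...     project="CMIP6",
        ...     dataset="*",
        ...     exp="historical",
        ...     ensemble="r1i1p1f1",
        ... )
        >>> datasets = list(ds.from_files())  # doctest: +SKIP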
Yields
------
Dataset
Datasets representing the available files.
"""
expanded = False
if any(_isglob(v) for v in self.facets.values()):
if _isglob(self.facets["mip"]):
available_mips = _get_mips(
self.facets["project"], # type: ignore
self.facets["short_name"], # type: ignore
)
mips = [
mip
for mip in available_mips
if _ismatch(mip, self.facets["mip"])
]
else:
mips = [self.facets["mip"]] # type: ignore
for mip in mips:
dataset_template = self.copy(mip=mip)
for dataset in dataset_template._get_all_available_datasets(): # noqa: SLF001
dataset._supplementaries_from_files() # noqa: SLF001
expanded = True
yield dataset
if not expanded:
# If the definition contains no wildcards, no files were found,
# or the file facets didn't match the specification, yield the
# original, but do expand any supplementary globs.
self._supplementaries_from_files()
yield self
def _supplementaries_from_files(self) -> None:
"""Expand wildcards in supplementary datasets."""
supplementaries: list[Dataset] = []
for supplementary_ds in self.supplementaries:
for facet in INHERITED_FACETS:
# allow use of facets from supplementary variable dict
if (
facet in self.facets
and facet not in supplementary_ds.facets
):
supplementary_ds.facets[facet] = self.facets[facet]
supplementaries.extend(supplementary_ds.from_files())
self.supplementaries = supplementaries
self._remove_unexpanded_supplementaries()
self._remove_duplicate_supplementaries()
self._fix_fx_exp()
def _remove_unexpanded_supplementaries(self) -> None:
"""Remove supplementaries where wildcards could not be expanded."""
supplementaries = []
for supplementary_ds in self.supplementaries:
unexpanded = [
f for f, v in supplementary_ds.facets.items() if _isglob(v)
]
if unexpanded:
logger.info(
"For %s: ignoring supplementary variable '%s', "
"unable to expand wildcards %s.",
self.summary(shorten=True),
supplementary_ds.facets["short_name"],
", ".join(f"'{f}'" for f in unexpanded),
)
else:
supplementaries.append(supplementary_ds)
self.supplementaries = supplementaries
def _match(self, other: Dataset) -> int:
"""Compute the match between two datasets."""
score = 0
for facet, value2 in self.facets.items():
if facet in other.facets:
value1 = other.facets[facet]
if isinstance(value1, (list, tuple)):
if isinstance(value2, (list, tuple)):
score += any(elem in value2 for elem in value1)
else:
score += value2 in value1
elif isinstance(value2, (list, tuple)):
score += value1 in value2
else:
score += value1 == value2
return score
def _remove_duplicate_supplementaries(self) -> None:
"""Remove supplementaries that are duplicates."""
not_used = []
supplementaries = list(self.supplementaries)
self.supplementaries.clear()
for _, duplicates in groupby(
supplementaries,
key=lambda ds: ds["short_name"],
):
group = sorted(duplicates, key=self._match, reverse=True)
self.supplementaries.append(group[0])
not_used.extend(group[1:])
if not_used:
logger.debug(
"List of all supplementary datasets found for %s:\n%s",
self.summary(shorten=True),
"\n".join(
sorted(ds.summary(shorten=True) for ds in supplementaries),
),
)
def _fix_fx_exp(self) -> None:
for supplementary_ds in self.supplementaries:
exps = supplementary_ds.facets.get("exp")
frequency = supplementary_ds.facets.get("frequency")
if isinstance(exps, list) and len(exps) > 1 and frequency == "fx":
for exp in exps:
dataset = supplementary_ds.copy(exp=exp)
if dataset.files:
supplementary_ds.facets["exp"] = exp
logger.info(
"Corrected wrong 'exp' from '%s' to '%s' for "
"supplementary variable '%s' of %s",
exps,
exp,
supplementary_ds.facets["short_name"],
self.summary(shorten=True),
)
break
def _copy(self, **facets: FacetValue) -> Dataset:
"""Create a copy of the parent dataset without supplementaries."""
new = self.__class__()
new._session = self._session # noqa: SLF001
for key, value in self.facets.items():
new.set_facet(key, deepcopy(value), key in self._persist)
for key, value in facets.items():
new.set_facet(key, deepcopy(value))
return new
def copy(self, **facets: FacetValue) -> Dataset:
"""Create a copy.
Parameters
----------
**facets
Update these facets in the copy. Note that for supplementary
datasets attached to the dataset, the ``'short_name'`` and
``'mip'`` facets will not be updated with these values.
Returns
-------
Dataset
A copy of the dataset.
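        Examples
        --------
        Create a copy describing another experiment (the facet values are
        illustrative):

        >>> ds = Dataset(
        ...     short_name="tas",
        ...     mip="Amon",
        ...     project="CMIP6",
        ...     dataset="EC-Earth3",
        ...     exp="historical",
        ... )
        >>> ds_ssp = ds.copy(exp="ssp585")
        >>> ds_ssp["exp"]
        'ssp585'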
"""
new = self._copy(**facets)
for supplementary in self.supplementaries:
# The short_name and mip of the supplementary variable are probably
# different from the main variable, so don't copy those facets.
skip = ("short_name", "mip")
supplementary_facets = {
k: v for k, v in facets.items() if k not in skip
}
new_supplementary = supplementary.copy(**supplementary_facets)
new.supplementaries.append(new_supplementary)
return new
def __eq__(self, other: object) -> bool:
"""Compare with another dataset."""
return (
isinstance(other, self.__class__)
and self._session == other._session
and self.facets == other.facets
and self.supplementaries == other.supplementaries
)
def __repr__(self) -> str:
"""Create a string representation."""
first_keys = (
"diagnostic",
"variable_group",
"dataset",
"project",
"mip",
"short_name",
)
def facets2str(facets: Facets) -> str:
view = {k: facets[k] for k in first_keys if k in facets}
for key, value in sorted(facets.items()):
if key not in first_keys:
view[key] = value
return pprint.pformat(view, sort_dicts=False)
txt = [
f"{self.__class__.__name__}:",
facets2str(self.facets),
]
if self.supplementaries:
txt.append("supplementaries:")
txt.extend(
textwrap.indent(facets2str(s.facets), " ")
for s in self.supplementaries
)
if self._session:
txt.append(f"session: '{self.session.session_name}'")
return "\n".join(txt)
def _get_joined_summary_facets(
self,
separator: str,
join_lists: bool = False,
) -> str:
"""Get string consisting of joined summary facets."""
summary_facets_vals = []
for key in self._SUMMARY_FACETS:
if key not in self.facets:
continue
val = self.facets[key]
if join_lists and isinstance(val, (tuple, list)):
val = "-".join(str(elem) for elem in val)
else:
val = str(val)
summary_facets_vals.append(val)
return separator.join(summary_facets_vals)
def summary(self, shorten: bool = False) -> str:
"""Summarize the content of dataset.
Parameters
----------
shorten
Shorten the summary.
Returns
-------
str
A summary describing the dataset.
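        Examples
        --------
        An illustrative example:

        >>> ds = Dataset(
        ...     short_name="tas",
        ...     mip="Amon",
        ...     project="CMIP6",
        ...     dataset="EC-Earth3",
        ... )
        >>> ds.summary(shorten=True)
        'Dataset: tas, Amon, CMIP6, EC-Earth3'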
"""
if not shorten:
return repr(self)
title = self.__class__.__name__
txt = f"{title}: " + self._get_joined_summary_facets(", ")
def supplementary_summary(dataset: Dataset) -> str:
return ", ".join(
str(dataset.facets[k])
for k in self._SUMMARY_FACETS
if k in dataset.facets and dataset[k] != self.facets.get(k)
)
if self.supplementaries:
            txt += ", supplementaries: " + "; ".join(
                supplementary_summary(s) for s in self.supplementaries
            )
return txt
def __getitem__(self, key: str) -> FacetValue:
"""Get a facet value."""
return self.facets[key]
def __setitem__(self, key: str, value: FacetValue) -> None:
"""Set a facet value."""
self.facets[key] = value
def set_facet(
self,
key: str,
value: FacetValue,
persist: bool = True,
) -> None:
"""Set facet.
Parameters
----------
key
The name of the facet.
value
The value of the facet.
persist
When writing a dataset to a recipe, only persistent facets
will get written.
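        Examples
        --------
        An illustrative example; facets set with ``persist=False`` are
        excluded from :obj:`Dataset.minimal_facets`:

        >>> ds = Dataset(short_name="tas")
        >>> ds.set_facet("comment", "not written to recipes", persist=False)
        >>> ds.minimal_facets
        {'short_name': 'tas'}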
"""
self.facets[key] = value
if persist:
self._persist.add(key)
@property
def minimal_facets(self) -> Facets:
"""Return a dictionary with the persistent facets."""
return {k: v for k, v in self.facets.items() if k in self._persist}
@staticmethod
def _get_version(dataset: Dataset) -> str | list[str]:
"""Get available version(s) of dataset."""
versions: set[str] = set()
for file in dataset.files:
if "version" in file.facets:
versions.add(str(file.facets["version"]))
return versions.pop() if len(versions) == 1 else sorted(versions)
def set_version(self) -> None:
"""Set the ``'version'`` facet based on the available data."""
versions: set[str] = set()
for input_dataset in self.input_datasets:
version = self._get_version(input_dataset)
if version:
if isinstance(version, list):
versions.update(version)
else:
versions.add(version)
version = versions.pop() if len(versions) == 1 else sorted(versions)
if version:
self.set_facet("version", version)
for supplementary_ds in self.supplementaries:
supplementary_ds.set_version()
@property
def session(self) -> Session:
"""A :obj:`esmvalcore.config.Session` associated with the dataset."""
if self._session is None:
session_name = f"session-{uuid.uuid4()}"
self._session = CFG.start_session(session_name)
return self._session
@session.setter
def session(self, session: Session | None) -> None:
self._session = session
for supplementary in self.supplementaries:
supplementary._session = session # noqa: SLF001
def add_supplementary(self, **facets: FacetValue) -> None:
"""Add an supplementary dataset.
This is a convenience function that will create a copy of the current
dataset, update its facets with the values specified in ``**facets``,
and append it to :obj:`Dataset.supplementaries`. For more control
over the creation of the supplementary dataset, first create a new
:class:`Dataset` describing the supplementary dataset and then append
it to :obj:`Dataset.supplementaries`.
Parameters
----------
**facets
Facets describing the supplementary variable.
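        Examples
        --------
        Attach the cell-area variable ``areacella`` as a supplementary
        dataset (the facet values are illustrative):

        >>> ds = Dataset(
        ...     short_name="tas",
        ...     mip="Amon",
        ...     project="CMIP6",
        ...     dataset="EC-Earth3",
        ... )
        >>> ds.add_supplementary(short_name="areacella", mip="fx")
        >>> ds.supplementaries[0]["short_name"]
        'areacella'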
"""
if self._is_derived():
facets.setdefault("derive", False)
if self._is_force_derived():
facets.setdefault("force_derivation", False)
supplementary = self.copy(**facets)
supplementary.supplementaries = []
self.supplementaries.append(supplementary)
def augment_facets(self) -> None:
"""Add additional facets.
This function will update the dataset with additional facets from
various sources.
"""
self._augment_facets()
for supplementary in self.supplementaries:
supplementary._augment_facets() # noqa: SLF001
@staticmethod
    def _pattern_filter(patterns: Iterable[str], name: str) -> list[str]:
"""Get the subset of the list `patterns` that `name` matches."""
return [pat for pat in patterns if fnmatch.fnmatchcase(name, pat)]
def _get_extra_facets(self) -> dict[str, Any]:
"""Get extra facets of dataset."""
extra_facets: dict[str, Any] = {}
raw_extra_facets = (
self.session["projects"]
.get(self["project"], {})
.get("extra_facets", {})
)
dataset_names = self._pattern_filter(raw_extra_facets, self["dataset"])
for dataset_name in dataset_names:
mips = self._pattern_filter(
raw_extra_facets[dataset_name],
self["mip"],
)
for mip in mips:
variables = self._pattern_filter(
raw_extra_facets[dataset_name][mip],
self["short_name"],
)
for var in variables:
facets = raw_extra_facets[dataset_name][mip][var]
extra_facets.update(facets)
# Add deprecated user-defined extra facets
# TODO: remove in v2.15.0
if os.environ.get("ESMVALTOOL_USE_NEW_EXTRA_FACETS_CONFIG"):
return extra_facets
project_details = load_extra_facets(
self.facets["project"],
tuple(self.session["extra_facets_dir"]),
)
dataset_names = self._pattern_filter(project_details, self["dataset"])
for dataset_name in dataset_names:
mips = self._pattern_filter(
project_details[dataset_name],
self["mip"],
)
for mip in mips:
variables = self._pattern_filter(
project_details[dataset_name][mip],
self["short_name"],
)
for var in variables:
facets = project_details[dataset_name][mip][var]
extra_facets.update(facets)
return extra_facets
def _augment_facets(self) -> None:
extra_facets = self._get_extra_facets()
_augment(self.facets, extra_facets)
if "institute" not in self.facets:
institute = get_institutes(self.facets)
if institute:
self.facets["institute"] = institute
if "activity" not in self.facets:
activity = get_activity(self.facets)
if activity:
self.facets["activity"] = activity
_update_cmor_facets(self.facets)
if self.facets.get("frequency") == "fx":
self.facets.pop("timerange", None)
def find_files(self) -> None:
"""Find files.
Look for files and populate the :obj:`Dataset.files` property of
the dataset and its supplementary datasets.
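        Examples
        --------
        A minimal sketch, assuming matching data is available locally or on
        ESGF (the facet values are illustrative):

        >>> ds = Dataset(
        ...     short_name="tas",
        ...     mip="Amon",
        ...     project="CMIP6",
        ...     dataset="EC-Earth3",
        ...     exp="historical",
        ...     ensemble="r1i1p1f1",
        ... )
        >>> ds.find_files()  # doctest: +SKIP
        >>> ds.files  # doctest: +SKIP
        [...]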
"""
self.augment_facets()
if _isglob(self.facets.get("timerange")):
self._update_timerange()
self._find_files()
for supplementary in self.supplementaries:
supplementary.find_files()
def _find_files(self) -> None:
self.files, self._file_globs = local.find_files(
debug=True,
**self.facets,
)
# If project does not support automatic downloads from ESGF, stop here
if self.facets["project"] not in esgf.facets.FACETS:
return
# 'never' mode: never download files from ESGF and stop here
if self.session["search_esgf"] == "never":
return
# 'when_missing' mode: if files are available locally, do not check
# ESGF
if self.session["search_esgf"] == "when_missing":
try:
check.data_availability(self, log=False)
except InputFilesNotFound:
pass # search ESGF for files
else:
return # use local files
# Local files are not available in 'when_missing' mode or 'always' mode
# is used: check ESGF
local_files = {f.name: f for f in self.files}
search_result = esgf.find_files(**self.facets)
for file in search_result:
if file.name not in local_files:
# Use ESGF files that are not available locally.
self.files.append(file)
else:
# Use ESGF files that are newer than the locally available
# files.
local_file = local_files[file.name]
if "version" in local_file.facets:
if file.facets["version"] > local_file.facets["version"]:
idx = self.files.index(local_file)
self.files[idx] = file
@property
def files(self) -> list[File]:
"""The files associated with this dataset."""
if self._files is None:
self.find_files()
return self._files # type: ignore
@files.setter
def files(self, value: Sequence[File]) -> None:
self._files = value
def load(self) -> Cube:
"""Load dataset.
Raises
------
InputFilesNotFound
When no files were found.
Returns
-------
iris.cube.Cube
            An :mod:`iris` cube with the data corresponding to the dataset.
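        Examples
        --------
        A minimal sketch, assuming ``ds`` is a fully defined
        :class:`Dataset` whose files are available locally or on ESGF:

        >>> cube = ds.load()  # doctest: +SKIP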
"""
input_files = list(self.files)
for supplementary_dataset in self.supplementaries:
input_files.extend(supplementary_dataset.files)
esgf.download(input_files, self.session["download_dir"])
cube = self._load()
supplementary_cubes = []
for supplementary_dataset in self.supplementaries:
supplementary_cube = supplementary_dataset._load() # noqa: SLF001
supplementary_cubes.append(supplementary_cube)
output_file = _get_output_file(self.facets, self.session.preproc_dir)
cubes = preprocess(
[cube],
"add_supplementary_variables",
input_files=input_files,
output_file=output_file,
debug=self.session["save_intermediary_cubes"],
supplementary_cubes=supplementary_cubes,
)
return cubes[0]
def _load(self) -> Cube:
"""Load self.files into an iris cube and return it."""
if not self.files:
lines = [
f"No files were found for {self}",
"locally using glob patterns:",
"\n".join(str(f) for f in self._file_globs or []),
]
if self.session["search_esgf"] != "never":
lines.append("or on ESGF.")
msg = "\n".join(lines)
raise InputFilesNotFound(msg)
output_file = _get_output_file(self.facets, self.session.preproc_dir)
fix_dir_prefix = Path(
self.session._fixed_file_dir, # noqa: SLF001
self._get_joined_summary_facets("_", join_lists=True) + "_",
)
settings: dict[str, dict[str, Any]] = {}
settings["fix_file"] = {
"output_dir": fix_dir_prefix,
"add_unique_suffix": True,
"session": self.session,
**self.facets,
}
settings["load"] = {
"ignore_warnings": get_ignored_warnings(
self.facets["project"],
"load",
),
}
settings["fix_metadata"] = {
"session": self.session,
**self.facets,
}
settings["concatenate"] = {"check_level": self.session["check_level"]}
settings["cmor_check_metadata"] = {
"check_level": self.session["check_level"],
"cmor_table": self.facets["project"],
"mip": self.facets["mip"],
"frequency": self.facets["frequency"],
"short_name": self.facets["short_name"],
}
if "timerange" in self.facets:
settings["clip_timerange"] = {
"timerange": self.facets["timerange"],
}
settings["fix_data"] = {
"session": self.session,
**self.facets,
}
settings["cmor_check_data"] = {
"check_level": self.session["check_level"],
"cmor_table": self.facets["project"],
"mip": self.facets["mip"],
"frequency": self.facets["frequency"],
"short_name": self.facets["short_name"],
}
result = [
file.local_file(self.session["download_dir"])
if isinstance(file, esgf.ESGFFile)
else file
for file in self.files
]
for step, kwargs in settings.items():
result = preprocess(
result,
step,
input_files=self.files,
output_file=output_file,
debug=self.session["save_intermediary_cubes"],
**kwargs,
)
return result[0]
def from_ranges(self) -> list[Dataset]:
"""Create a list of datasets from short notations.
This expands the ``'ensemble'`` and ``'sub_experiment'`` facets in the
dataset definition if they are ranges.
For example ``'ensemble'='r(1:3)i1p1f1'`` will be expanded to
three datasets, with ``'ensemble'`` values ``'r1i1p1f1'``,
``'r2i1p1f1'``, ``'r3i1p1f1'``.
Returns
-------
list[Dataset]
The datasets.
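        Examples
        --------
        An illustrative example:

        >>> ds = Dataset(short_name="tas", ensemble="r(1:3)i1p1f1")
        >>> [d["ensemble"] for d in ds.from_ranges()]
        ['r1i1p1f1', 'r2i1p1f1', 'r3i1p1f1']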
"""
datasets = [self]
for key in "ensemble", "sub_experiment":
if key in self.facets:
datasets = [
ds.copy(**{key: value})
for ds in datasets
for value in ds._expand_range(key) # noqa: SLF001
]
return datasets
def _expand_range(self, input_tag: str) -> list[FacetValue]:
"""Expand ranges such as ensemble members or start dates.
Expansion only supports ensembles defined as strings, not lists.
"""
expanded: list[FacetValue] = []
regex = re.compile(r"\(\d+:\d+\)")
        def expand_range(input_range: str) -> None:
match = regex.search(input_range)
if match:
start, end = match.group(0)[1:-1].split(":")
for i in range(int(start), int(end) + 1):
range_ = regex.sub(str(i), input_range, 1)
expand_range(range_)
else:
expanded.append(input_range)
tag = self.facets.get(input_tag, "")
if isinstance(tag, (list, tuple)):
for elem in tag:
if regex.search(elem):
msg = (
f"In {self}: {input_tag} expansion "
f"cannot be combined with {input_tag} lists"
)
raise RecipeError(msg)
expanded.append(tag)
else:
expand_range(tag)
return expanded
def _update_timerange(self) -> None:
"""Update wildcards in timerange with found datetime values.
If the timerange is given as a year, it ensures it's formatted
as a 4-digit value (YYYY).
"""
dataset = self.copy()
dataset.supplementaries = []
dataset.augment_facets()
if "timerange" not in dataset.facets:
self.facets.pop("timerange", None)
return
timerange = self.facets["timerange"]
if not isinstance(timerange, str):
msg = f"timerange should be a string, got '{timerange!r}'"
raise TypeError(msg)
check.valid_time_selection(timerange)
if "*" in timerange:
dataset = self.copy()
dataset.facets.pop("timerange")
dataset.supplementaries = []
check.data_availability(dataset)
intervals = [_get_start_end_date(f) for f in dataset.files]
min_date = min(interval[0] for interval in intervals)
max_date = max(interval[1] for interval in intervals)
if timerange == "*":
timerange = f"{min_date}/{max_date}"
if "*" in timerange.split("/")[0]:
timerange = timerange.replace("*", min_date)
if "*" in timerange.split("/")[1]:
timerange = timerange.replace("*", max_date)
# Make sure that years are in format YYYY
start_date, end_date = timerange.split("/")
timerange = _dates_to_timerange(start_date, end_date)
check.valid_time_selection(timerange)
self.set_facet("timerange", timerange)