Source code for pylag.file_reader

"""
A set of classes for managing access to input data, including the reading in
of data from file.
"""

import numpy as np
from netCDF4 import Dataset
from datetime import timedelta
import glob
import natsort
import logging
try:
    import configparser
except ImportError:
    import ConfigParser as configparser

from pylag.exceptions import PyLagValueError, PyLagRuntimeError
from pylag.data_types_python import DTYPE_FLOAT
from pylag.numerics import get_global_time_step, get_time_direction
from pylag.datetime_reader import get_datetime_reader
from pylag.utils import round_time
from pylag import version


class FileReader:
    """Read in and manage access to input grid and field data

    Objects of type `FileReader` manage all access to input data stored in
    files on disk. Support for data stored in multiple files covering
    non-overlapping time intervals is included.

    On initialisation, the object will scan the specified list of input data
    files in order to find the file or files that span the specified
    simulation start date/time. Two datasets are opened - one for each of the
    two input time points that straddle the current simulation time point.
    These are referred to as the `first` and `second` data files or time
    points respectively, with the `first` always corresponding to the time
    point that is earlier in time than the current simulation time point.
    Time indices for the two bounding time points are also stored. Through
    calls to `update_reading_frames` both the indices corresponding to the
    bounding time points and the input datasets can be updated as the
    simulation progresses. Support for running simulations either forward or
    backward in time is included.

    Parameters
    ----------
    config : ConfigParser
        Configuration object.

    data_source : str
        String indicating what type of data the datetime objects will be
        associated with. Options are: 'ocean', 'atmosphere', and 'wave'.

    file_name_reader : FileNameReader
        Object to assist with reading in file names.

    dataset_reader : DatasetReader
        Object to assist with reading in datasets.

    datetime_start : Datetime
        Simulation start date/time.

    datetime_end : Datetime
        Simulation end date/time.

    Attributes
    ----------
    config : ConfigParser
        Run configuration object.

    config_section_name : str
        String identifying the section of the config where parameters
        describing the data are listed (e.g. WAVE_DATA).

    file_name_reader : FileNameReader
        Object to assist with reading in file names from disk.

    dataset_reader : DatasetReader
        Object to assist with reading in NetCDF4 datasets.

    datetime_reader : DateTimeReader
        Object to assist with reading dates/times in input data.

    data_dir : str
        Path to the directory containing input data.

    data_file_name_stem : str
        File name stem, used for building path names.

    grid_metrics_file_name : str
        File name or path to the grid metrics file.

    grid_file : Dataset
        NetCDF4 grid metrics dataset.

    data_file_names : list[str]
        A list of input data files that were found in `data_dir`.

    first_data_file_name : str
        Name of the data file containing the `first` time point bounding the
        current point in time.

    second_data_file_name : str
        Name of the data file containing the `second` time point bounding the
        current point in time.

    first_data_file : Dataset
        Dataset containing the `first` time point bounding the current point
        in time.

    second_data_file : Dataset
        Dataset containing the `second` time point bounding the current point
        in time.

    time_direction : int
        Flag indicating the direction of integration. 1 forward, -1 backward.

    first_time : array_like[float]
        Time array containing the `first` time point bounding the current
        point in time.

    second_time : array_like[float]
        Time array containing the `second` time point bounding the current
        point in time.

    tidx_first : int
        Array index corresponding to the `first` time point bounding the
        current point in time.

    tidx_second : int
        Array index corresponding to the `second` time point bounding the
        current point in time.

    sim_start_datetime : Datetime
        The current simulation start date/time. This is not necessarily fixed
        for the lifetime of the object - it can be updated through calls to
        `setup_data_access`. This helps support the running of ensemble
        simulations.

    sim_end_datetime : Datetime
        The current simulation end date/time. This is not necessarily fixed
        for the lifetime of the object - it can be updated through calls to
        `setup_data_access`. This helps support the running of ensemble
        simulations.
    """
    def __init__(self, config, data_source, file_name_reader, dataset_reader,
                 datetime_start, datetime_end):
        self.config = config

        # Determine the appropriate config section name from the data source
        if data_source == 'ocean':
            self.config_section_name = 'OCEAN_DATA'
        elif data_source == 'atmosphere':
            self.config_section_name = 'ATMOSPHERE_DATA'
        elif data_source == 'wave':
            self.config_section_name = 'WAVE_DATA'
        else:
            raise PyLagValueError(f"Unsupported data source `{data_source}`. "
                                  f"Valid options are `ocean`, `atmosphere` "
                                  f"and `wave`.")

        self.file_name_reader = file_name_reader

        self.dataset_reader = dataset_reader

        self.data_dir = self.config.get(self.config_section_name, "data_dir")

        self.data_file_name_stem = self.config.get(self.config_section_name,
                                                   "data_file_stem")

        try:
            self.grid_metrics_file_name = self.config.get(
                self.config_section_name, "grid_metrics_file")
        except configparser.NoOptionError:
            logger = logging.getLogger(__name__)
            logger.error(f"A grid metrics file was not given. Please provide "
                         f"one and try again. If one needs to be generated, "
                         f"please take a look at PyLag's online documentation.")
            raise PyLagRuntimeError(f"A grid metrics file was not listed in "
                                    f"the run configuration file. See the log "
                                    f"file for more details.")

        # Time dimension name
        try:
            self._time_dim_name = self.config.get(self.config_section_name,
                                                  "time_dim_name").strip()
        except configparser.NoOptionError:
            # Adopt the default name `time`
            self._time_dim_name = "time"

        # Time direction
        self.time_direction = int(get_time_direction(config))

        # Initialise the datetime reader
        self.datetime_reader = get_datetime_reader(config,
                                                   self.config_section_name)

        # Read in grid info. and search for input data files
        self._setup_file_access()

        # Set up data access using the given simulation start and end datetimes
        self.setup_data_access(datetime_start, datetime_end)

    def _setup_file_access(self):
        """ Set up access to input data files

        This method is called from __init__() during class initialisation.

        The following instance variables are defined here:

            data_file_names - A list holding paths to input data files.

            grid_file - NetCDF4 dataset for the model's grid metrics file.
        """
        logger = logging.getLogger(__name__)

        # First save input file names into a list
        logger.info('Searching for input data files.')
        self.data_file_names = self.file_name_reader.get_file_names(
            self.data_dir, self.data_file_name_stem)

        # Ensure files were found in the specified directory
        if not self.data_file_names:
            raise PyLagRuntimeError(f"No input files found in "
                                    f"location {self.data_dir}.")
        else:
            self.n_data_files = len(self.data_file_names)

        # Log file names
        logger.info(f"Found {self.n_data_files} input data files in directory "
                    f"`{self.data_dir}'.")
        logger.info(f"Input data file names are: "
                    + ", ".join(self.data_file_names))

        # Open the grid metrics file for reading
        logger.info("Opening grid metrics file for reading.")

        # Try to read grid data from the grid metrics file
        try:
            self.grid_file = self.dataset_reader.read_dataset(
                self.grid_metrics_file_name)

            logger.info(f"Opened grid metrics file "
                        f"{self.grid_metrics_file_name}.")

            try:
                if self.grid_file.getncattr('pylag-version-id') != \
                        version.git_revision:
                    logger.warning(f"The grid metrics file was created with a "
                                   f"different version of PyLag to that being "
                                   f"run. To avoid consistency issues, please "
                                   f"update the grid metrics file.")
            except AttributeError:
                pass

        except RuntimeError:
            logger.error(f"Failed to read grid metrics file "
                         f"`{self.grid_metrics_file_name}`.")
            raise PyLagValueError("Failed to read the grid metrics file.")

        # Initialise data file names to None
        self.first_data_file_name = None
        self.second_data_file_name = None

        # Initialise data files to None
        self.first_data_file = None
        self.second_data_file = None
    def setup_data_access(self, start_datetime, end_datetime):
        """Open data files for reading and initialise all time variables

        Use the supplied start and end times to establish which input data
        file(s) contain data spanning the specified start time.

        Parameters
        ----------
        start_datetime : Datetime
            Simulation start date/time.

        end_datetime : Datetime
            Simulation end date/time.
        """
        logger = logging.getLogger(__name__)
        logger.info('Setting up input data access.')

        if not self._check_date_time_is_valid(start_datetime):
            raise PyLagValueError(f"The start date/time {start_datetime} lies "
                                  f"outside of the time period for which input "
                                  f"data is available.")

        if not self._check_date_time_is_valid(end_datetime):
            raise PyLagValueError(f"The end date/time {end_datetime} lies "
                                  f"outside of the time period for which input "
                                  f"data is available.")

        # Save a reference to the simulation start time for time rebasing
        self.sim_start_datetime = start_datetime
        self.sim_end_datetime = end_datetime

        # Determine which data file holds data covering the simulation start
        logger.info(f"Beginning search for the input data file spanning the "
                    f"specified simulation start point.")

        # Check for unusable input data
        ds_first = self.dataset_reader.read_dataset(self.data_file_names[0])
        datetimes_first = self.datetime_reader.get_datetime(ds_first)
        ds_first.close()
        if self.n_data_files == 1 and len(datetimes_first) == 1:
            logger.info(f"The single input data file found contains just a "
                        f"single time point, which is insufficient to perform "
                        f"a simulation.")
            raise PyLagRuntimeError(f"Only one time point value found in "
                                    f"input dataset.")

        self.first_data_file_name = None
        self.second_data_file_name = None
        for idx, data_file_name in enumerate(self.data_file_names):
            logger.info(f"Trying file `{data_file_name}'")
            ds = self.dataset_reader.read_dataset(data_file_name)
            data_start_datetime = self.datetime_reader.get_datetime(
                ds, time_index=0)
            data_end_datetime = self.datetime_reader.get_datetime(
                ds, time_index=-1)

            # Compute time delta
            time_delta = self.compute_time_delta_between_datasets(
                data_file_name, forward=True)

            ds.close()

            if (data_start_datetime <= self.sim_start_datetime <
                    data_end_datetime + timedelta(seconds=time_delta)):
                # Set file names depending on time direction
                if self.time_direction == 1:
                    self.first_data_file_name = data_file_name
                    if self.sim_start_datetime < data_end_datetime:
                        self.second_data_file_name = data_file_name
                    else:
                        self.second_data_file_name = \
                            self.data_file_names[idx + 1]
                else:
                    if self.sim_start_datetime == data_start_datetime:
                        self.first_data_file_name = \
                            self.data_file_names[idx - 1]
                        self.second_data_file_name = data_file_name
                    else:
                        if self.sim_start_datetime <= data_end_datetime:
                            self.first_data_file_name = data_file_name
                            self.second_data_file_name = data_file_name
                        else:
                            self.first_data_file_name = data_file_name
                            self.second_data_file_name = \
                                self.data_file_names[idx + 1]

                logger.info(f"Found first initial data file "
                            f"{self.first_data_file_name}.")
                logger.info(f"Found second initial data file "
                            f"{self.second_data_file_name}.")
                break
            else:
                logger.info(f"Start point not found in file covering the "
                            f"period {data_start_datetime} to "
                            f"{data_end_datetime}")

        # Ensure the search was a success
        if (self.first_data_file_name is None) or \
                (self.second_data_file_name is None):
            raise PyLagRuntimeError(f'Could not find an input data file '
                                    f'spanning the specified start time: '
                                    f'{self.sim_start_datetime}.')

        # Open the data files for reading and initialise the time array
        self._open_data_files_for_reading()

        # Set time arrays
        self._set_time_arrays()

        # Set time indices for reading frames
        self._set_time_indices(0.0)  # 0 s, as simulation start

        # Check the choice of start time and time step yields an even number
        # of time steps between the start time and the times at which data are
        # defined. We check against both the first and second times when input
        # data are defined to ensure the check is robust.
        time_step = get_global_time_step(self.config)
        n_steps_before = self.first_time[self.tidx_first] / time_step
        n_steps_after = self.second_time[self.tidx_second] / time_step
        if not (n_steps_before.is_integer() and n_steps_after.is_integer()):
            raise PyLagValueError(f'PyLag requires there to be an integer '
                                  f'number of time steps (measured in seconds) '
                                  f'between the simulation start time and the '
                                  f'times when input data are defined. '
                                  f'Please modify your start time or time '
                                  f'step to ensure this is the case.')
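    # Worked example of the integer-step check above (illustrative numbers,
    # not taken from any particular dataset): with hourly input data and a
    # start time coinciding with a data point, the rebased bounding times are
    # first_time[tidx_first] = 0.0 s and second_time[tidx_second] = 3600.0 s.
    # A global time step of 600 s gives n_steps_before = 0.0 and
    # n_steps_after = 6.0, both integers, so the check passes. A time step of
    # 700 s gives n_steps_after = 3600 / 700 = 5.142..., and the run is
    # rejected.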
    def _check_date_time_is_valid(self, date_time):
        """ Check the given date lies within the range covered by the input data

        Parameters
        ----------
        date_time : Datetime
            Datetime object to check

        Returns
        -------
        : bool
            Flag confirming whether the given date time is valid or not
        """
        ds0 = self.dataset_reader.read_dataset(self.data_file_names[0])
        data_datetime_0 = self.datetime_reader.get_datetime(ds0, time_index=0)
        ds0.close()

        ds1 = self.dataset_reader.read_dataset(self.data_file_names[-1])
        data_datetime_1 = self.datetime_reader.get_datetime(ds1, time_index=-1)
        ds1.close()

        if data_datetime_0 <= date_time < data_datetime_1:
            return True

        return False
    def compute_time_delta_between_datasets(self, data_file_name, forward):
        """ Compute time delta between datasets

        If there is only one dataset, or `data_file_name` is the last (first)
        file in the set when computing the delta in the forward (backward)
        direction, a value of zero is returned. Otherwise, time delta is the
        time difference in seconds between the last (first) time point in the
        data file and the first (last) time point in the next (previous) data
        file, as stored in `self.data_file_names`. The `forward` argument is
        used to determine whether time delta is computed against the next or
        the previous file.

        Parameters
        ----------
        data_file_name : str
            Dataset file name.

        forward : bool
            If True, compute time delta between the last time point in the
            current file and the first time point in the next file. If False,
            compute time delta between the first time point in the current
            file and the last time point in the previous file.

        Returns
        -------
        time_delta : float
            The absolute time difference in seconds.
        """
        if self.n_data_files == 1:
            # There is only one file in the set, so set time_delta to zero
            return 0.0

        # Array index of the given data file
        file_idx_a = self.data_file_names.index(data_file_name)

        # Set other indices depending on the value of `forward`
        if forward:
            if file_idx_a == self.n_data_files - 1:
                # Last file in the list - return zero
                return 0.0
            file_idx_b = file_idx_a + 1
            time_index_a = -1
            time_index_b = 0
        else:
            if file_idx_a == 0:
                # First file in the list - return zero
                return 0.0
            file_idx_b = file_idx_a - 1
            time_index_a = 0
            time_index_b = -1

        ds_a = self.dataset_reader.read_dataset(data_file_name)
        datetime_a = self.datetime_reader.get_datetime(
            ds_a, time_index=time_index_a)
        ds_a.close()

        ds_b = self.dataset_reader.read_dataset(
            self.data_file_names[file_idx_b])
        datetime_b = self.datetime_reader.get_datetime(
            ds_b, time_index=time_index_b)
        ds_b.close()

        return abs((datetime_b - datetime_a).total_seconds())
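    # Worked example (hypothetical file set): with daily files holding hourly
    # data, file `data_0001.nc` ends at 2010-01-01 23:00 and file
    # `data_0002.nc` starts at 2010-01-02 00:00. Then
    # compute_time_delta_between_datasets('data_0001.nc', forward=True)
    # returns 3600.0 - the gap in seconds that must be bridged when
    # interpolating across the file boundary.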
    def update_reading_frames(self, time):
        """ Update input datasets and reading frames

        Update input datasets and reading frames using the given `time`,
        which is the current simulation time in seconds.

        Parameters
        ----------
        time : float
            The current simulation time in seconds.
        """
        # Compute time delta
        time_delta = self.compute_time_delta_between_datasets(
            self.first_data_file_name, forward=True)

        # Load data file covering the first time point, if necessary
        first_file_idx = None
        if self.time_direction == 1:
            if time < self.first_time[0]:
                first_file_idx = self.data_file_names.index(
                    self.first_data_file_name) - 1
            elif time >= self.first_time[-1] + time_delta:
                first_file_idx = self.data_file_names.index(
                    self.first_data_file_name) + 1
        else:
            if time <= self.first_time[0]:
                first_file_idx = self.data_file_names.index(
                    self.first_data_file_name) - 1
            elif time > self.first_time[-1] + time_delta:
                first_file_idx = self.data_file_names.index(
                    self.first_data_file_name) + 1

        if first_file_idx is not None:
            try:
                self.first_data_file_name = \
                    self.data_file_names[first_file_idx]
            except IndexError:
                logger = logging.getLogger(__name__)
                logger.error(f'Failed to find the next input data file.')
                raise PyLagRuntimeError(f'Failed to find the next input '
                                        f'data file.')

            self._open_first_data_file_for_reading()
            self._set_first_time_array()

        # Compute time delta
        time_delta = self.compute_time_delta_between_datasets(
            self.second_data_file_name, forward=False)

        # Load data file covering the second time point, if necessary
        second_file_idx = None
        if self.time_direction == 1:
            if time < self.second_time[0] - time_delta:
                second_file_idx = self.data_file_names.index(
                    self.second_data_file_name) - 1
            elif time >= self.second_time[-1]:
                second_file_idx = self.data_file_names.index(
                    self.second_data_file_name) + 1
        else:
            if time <= self.second_time[0] - time_delta:
                second_file_idx = self.data_file_names.index(
                    self.second_data_file_name) - 1
            elif time > self.second_time[-1]:
                second_file_idx = self.data_file_names.index(
                    self.second_data_file_name) + 1

        if second_file_idx is not None:
            try:
                self.second_data_file_name = \
                    self.data_file_names[second_file_idx]
            except IndexError:
                logger = logging.getLogger(__name__)
                logger.error(f'Failed to find the next input data file.')
                raise PyLagRuntimeError(f'Failed to find the next input '
                                        f'data file.')

            self._open_second_data_file_for_reading()
            self._set_second_time_array()

        # Update time indices
        self._set_time_indices(time)
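    # Illustrative timeline for update_reading_frames (forward integration,
    # hypothetical numbers): with hourly data in a single file whose rebased
    # times span [0, 3600, 7200, ...] s, a call with time = 3700.0 leaves the
    # open datasets unchanged and moves tidx_first to the 3600 s entry and
    # tidx_second to the 7200 s entry. A new file is only opened once `time`
    # passes beyond the time points covered by the currently open file.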
    def get_dimension_variable(self, var_name):
        """ Get the size of the NetCDF4 dimension variable

        Parameters
        ----------
        var_name : str
            The name of the dimension variable.

        Returns
        -------
        : int
            The size of the dimension variable.
        """
        return len(self.grid_file.dimensions[var_name])
    def get_grid_variable(self, var_name):
        """ Get the NetCDF4 grid variable

        Parameters
        ----------
        var_name : str
            The name of the variable.

        Returns
        -------
        : NDArray
            The grid variable.
        """
        return np.ascontiguousarray(
            self.grid_file.variables[var_name][:].squeeze())
    def get_time_at_last_time_index(self):
        """ Get the time at the last time index

        Returns
        -------
        : float
            The time at the last time index.
        """
        return self.first_time[self.tidx_first]
    def get_time_at_next_time_index(self):
        """ Get the time at the next time index

        Returns
        -------
        : float
            The time at the next time index.
        """
        return self.second_time[self.tidx_second]
    def get_grid_variable_dimensions(self, var_name):
        """ Get the variable dimensions

        Parameters
        ----------
        var_name : str
            The name of the variable.

        Returns
        -------
        : tuple(str)
            The variable's dimensions
        """
        return self.grid_file.variables[var_name].dimensions
    def get_variable_dimensions(self, var_name, include_time=True):
        """ Get the variable dimensions

        Parameters
        ----------
        var_name : str
            The name of the variable.

        include_time : bool
            If False, the time dimension is not included in the dimensions.
            Optional, default: True.

        Returns
        -------
        : tuple(str)
            The variable's dimensions
        """
        if include_time:
            return self.first_data_file.variables[var_name].dimensions
        else:
            dimensions = self.first_data_file.variables[var_name].dimensions
            dimensions = list(dimensions)
            dimensions.remove(self._time_dim_name)
            return tuple(dimensions)
    def get_variable_shape(self, var_name, include_time=True):
        """ Get the variable shape

        Parameters
        ----------
        var_name : str
            The name of the variable.

        include_time : bool
            If False, the time dimension is not included in the shape.
            Optional, default: True.

        Returns
        -------
        : tuple(int)
            The variable's shape
        """
        if include_time:
            return self.first_data_file.variables[var_name].shape
        else:
            dimensions = self.get_variable_dimensions(var_name)
            time_dim_idx = dimensions.index(self._time_dim_name)
            shape = list(self.first_data_file.variables[var_name].shape)
            shape.pop(time_dim_idx)
            return tuple(shape)
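    # For example (hypothetical variable): for a field with dimensions
    # ('time', 'depth', 'lat', 'lon') and shape (24, 40, 100, 200),
    # get_variable_shape(var_name) returns (24, 40, 100, 200), while
    # get_variable_shape(var_name, include_time=False) returns (40, 100, 200).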
    def get_time_dependent_variable_at_last_time_index(self, var_name):
        """ Get the variable at the last time index

        Parameters
        ----------
        var_name : str
            The name of the variable.

        Returns
        -------
        : NDArray
            The variable array
        """
        # Get time dimension index
        var_dims = self.get_variable_dimensions(var_name)
        time_dim_idx = var_dims.index(self._time_dim_name)

        # Get variable
        nc_var = self.first_data_file.variables[var_name]
        var = self._get_time_slice(nc_var, time_dim_idx, self.tidx_first)

        if np.ma.isMaskedArray(var):
            var = var.filled(0.0)

        return np.ascontiguousarray(var)
    def get_time_dependent_variable_at_next_time_index(self, var_name):
        """ Get the variable at the next time index

        Parameters
        ----------
        var_name : str
            The name of the variable.

        Returns
        -------
        : NDArray
            The variable array
        """
        # Get time dimension index
        var_dims = self.get_variable_dimensions(var_name)
        time_dim_idx = var_dims.index(self._time_dim_name)

        # Get variable
        nc_var = self.second_data_file.variables[var_name]
        var = self._get_time_slice(nc_var, time_dim_idx, self.tidx_second)

        if np.ma.isMaskedArray(var):
            var = var.filled(0.0)

        return np.ascontiguousarray(var)
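    # Example (hypothetical variable name): for a masked surface elevation
    # field, masked (e.g. land) points are filled with 0.0 before the array
    # is returned:
    #
    #     zeta = file_reader.get_time_dependent_variable_at_next_time_index(
    #         'zos')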
    def get_mask_at_last_time_index(self, var_name):
        """ Get the mask at the last time index

        Parameters
        ----------
        var_name : str
            The name of the variable.

        Returns
        -------
        : NDArray
            The variable mask
        """
        # Get time dimension index
        var_dims = self.get_variable_dimensions(var_name)
        time_dim_idx = var_dims.index(self._time_dim_name)

        # Get variable
        nc_var = self.first_data_file.variables[var_name]
        var = self._get_time_slice(nc_var, time_dim_idx, self.tidx_first)

        if np.ma.isMaskedArray(var):
            return np.ascontiguousarray(var.mask)

        raise PyLagRuntimeError(f'Variable {var_name} is not a masked array.')
    def get_mask_at_next_time_index(self, var_name):
        """ Get the mask at the next time index

        Parameters
        ----------
        var_name : str
            The name of the variable.

        Returns
        -------
        : NDArray
            The variable mask
        """
        # Get time dimension index
        var_dims = self.get_variable_dimensions(var_name)
        time_dim_idx = var_dims.index(self._time_dim_name)

        # Get variable
        nc_var = self.second_data_file.variables[var_name]
        var = self._get_time_slice(nc_var, time_dim_idx, self.tidx_second)

        if np.ma.isMaskedArray(var):
            return np.ascontiguousarray(var.mask)

        raise PyLagRuntimeError(f'Variable {var_name} is not a masked array.')
    def _get_time_slice(self, nc_var, time_dim_idx: int, time_idx: int):
        """ Get the variable at the specified time index

        Parameters
        ----------
        nc_var : NetCDF4.Variable
            The NetCDF4 variable

        time_dim_idx : int
            The time dimension index

        time_idx : int
            The time index

        Returns
        -------
        : NDArray
            The variable array
        """
        n_dims = len(nc_var.shape)
        if n_dims == 1:
            return nc_var[time_idx]
        elif n_dims == 2:
            if time_dim_idx == 0:
                return nc_var[time_idx, :]
            elif time_dim_idx == 1:
                return nc_var[:, time_idx]
        elif n_dims == 3:
            if time_dim_idx == 0:
                return nc_var[time_idx, :, :]
            elif time_dim_idx == 1:
                return nc_var[:, time_idx, :]
            elif time_dim_idx == 2:
                return nc_var[:, :, time_idx]
        elif n_dims == 4:
            if time_dim_idx == 0:
                return nc_var[time_idx, :, :, :]
            elif time_dim_idx == 1:
                return nc_var[:, time_idx, :, :]
            elif time_dim_idx == 2:
                return nc_var[:, :, time_idx, :]
            elif time_dim_idx == 3:
                return nc_var[:, :, :, time_idx]
        else:
            raise PyLagRuntimeError('Variable has more than 4 dimensions - '
                                    'such variables are not supported.')

    def _open_data_files_for_reading(self):
        """ Open the first and second data files for reading """
        self._open_first_data_file_for_reading()
        self._open_second_data_file_for_reading()

    def _open_first_data_file_for_reading(self):
        logger = logging.getLogger(__name__)

        # Close the first data file if one has been opened previously
        if self.first_data_file:
            self.first_data_file.close()

        # Open the first data file
        try:
            self.first_data_file = self.dataset_reader.read_dataset(
                self.first_data_file_name)
            logger.info(f'Opened first data file {self.first_data_file_name} '
                        f'for reading.')
        except RuntimeError:
            logger.error(f'Could not open data file '
                         f'{self.first_data_file_name}.')
            raise PyLagRuntimeError('Could not open data file for reading.')

    def _open_second_data_file_for_reading(self):
        logger = logging.getLogger(__name__)

        # Close the second data file if one has been opened previously
        if self.second_data_file:
            self.second_data_file.close()

        # Open the second data file
        try:
            self.second_data_file = self.dataset_reader.read_dataset(
                self.second_data_file_name)
            logger.info(f'Opened second data file '
                        f'{self.second_data_file_name} for reading.')
        except RuntimeError:
            logger.error(f'Could not open data file '
                         f'{self.second_data_file_name}.')
            raise PyLagRuntimeError('Could not open data file for reading.')

    def _set_time_arrays(self):
        self._set_first_time_array()
        self._set_second_time_array()

    def _set_first_time_array(self):
        # First time array
        # ----------------
        first_datetime = self.datetime_reader.get_datetime(
            self.first_data_file)

        # Convert to seconds using sim_start_datetime as a reference point
        first_time_seconds = []
        for time in first_datetime:
            first_time_seconds.append(
                (time - self.sim_start_datetime).total_seconds())
        self.first_time = np.array(first_time_seconds, dtype=DTYPE_FLOAT)

    def _set_second_time_array(self):
        # Second time array
        # -----------------
        second_datetime = self.datetime_reader.get_datetime(
            self.second_data_file)

        # Convert to seconds using sim_start_datetime as a reference point
        second_time_seconds = []
        for time in second_datetime:
            second_time_seconds.append(
                (time - self.sim_start_datetime).total_seconds())
        self.second_time = np.array(second_time_seconds, dtype=DTYPE_FLOAT)

    def _compute_first_dataset_time_delta(self, idx):
        # Time delta between two time points in the first time array
        # -----------------------------------------------------------
        if idx < len(self.first_time) - 1:
            return self.first_time[idx + 1] - self.first_time[idx]
        else:
            return self.compute_time_delta_between_datasets(
                self.first_data_file_name, forward=True)

    def _compute_second_dataset_time_delta(self, idx):
        # Time delta between two time points in the second time array
        # ------------------------------------------------------------
        if idx > 0:
            return self.second_time[idx] - self.second_time[idx - 1]
        else:
            return self.compute_time_delta_between_datasets(
                self.second_data_file_name, forward=False)

    def _set_time_indices(self, time):
        # Set the first time index
        # ------------------------
        n_times = len(self.first_time)

        tidx_first = -1
        if self.time_direction == 1:
            for i in range(0, n_times):
                t_delta = time - self.first_time[i]
                t_delta_dataset = self._compute_first_dataset_time_delta(i)
                if 0.0 <= t_delta < t_delta_dataset:
                    tidx_first = i
                    break
        else:
            for i in range(0, n_times):
                t_delta = time - self.first_time[i]
                t_delta_dataset = self._compute_first_dataset_time_delta(i)
                if 0.0 < t_delta <= t_delta_dataset:
                    tidx_first = i
                    break

        if tidx_first == -1:
            logger = logging.getLogger(__name__)
            logger.info(f'The provided time {time}s lies outside of the '
                        f'range for which there exists input data: '
                        f'{self.first_time[0]} to {self.first_time[-1]}s')
            raise PyLagValueError('Time out of range.')

        # Set the second time index
        # -------------------------
        n_times = len(self.second_time)

        tidx_second = -1
        if self.time_direction == 1:
            for i in range(0, n_times):
                t_delta = self.second_time[i] - time
                t_delta_dataset = self._compute_second_dataset_time_delta(i)
                if 0.0 < t_delta <= t_delta_dataset:
                    tidx_second = i
                    break
        else:
            for i in range(0, n_times):
                t_delta = self.second_time[i] - time
                t_delta_dataset = self._compute_second_dataset_time_delta(i)
                if 0.0 <= t_delta < t_delta_dataset:
                    tidx_second = i
                    break

        if tidx_second == -1:
            logger = logging.getLogger(__name__)
            logger.info(f'The provided time {time}s lies outside of the range '
                        f'for which there exists input data: '
                        f'{self.second_time[0]} to {self.second_time[-1]}s')
            raise PyLagValueError('Time out of range.')

        # Save the time indices
        self.tidx_first = tidx_first
        self.tidx_second = tidx_second
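# A minimal usage sketch for FileReader (hypothetical paths and dates; it is
# assumed the run configuration file defines the OCEAN_DATA options read
# above, together with the options read by pylag.numerics):
#
#     from datetime import datetime
#
#     config = configparser.ConfigParser()
#     config.read('pylag.cfg')
#
#     file_reader = FileReader(config, 'ocean', DiskFileNameReader(),
#                              NetCDFDatasetReader(),
#                              datetime(2010, 1, 1), datetime(2010, 2, 1))
#
#     # Advance the reading frames as the simulation progresses (time in
#     # seconds relative to the simulation start)
#     file_reader.update_reading_frames(3600.0)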
# Helper classes to assist in reading file names
################################################
class FileNameReader:
    """ Abstract base class for FileNameReaders

    File name readers are responsible for reading in and sorting file names,
    which will usually be stored on disk. An abstract base class is used to
    assist with testing FileReader's behaviour in circumstances where all
    dependencies on reading data from disk have been removed.
    """
    def get_file_names(self, file_dir, file_name_stem):
        """ Get file names

        Return a list of file names.

        Parameters
        ----------
        file_dir : str
            Path to the input files.

        file_name_stem : str
            Unique string identifying valid input files.

        Returns
        -------
        : list[str]
            A list of file names.
        """
        raise NotImplementedError
class DiskFileNameReader(FileNameReader):
    """ Disk file name reader which reads in NetCDF file names from disk

    Derived class for reading in file names from disk.
    """
    def get_file_names(self, file_dir, file_name_stem):
        """ Get file names

        Read file names from disk. A natural sorting algorithm is applied.

        Parameters
        ----------
        file_dir : str
            Path to the input files.

        file_name_stem : str
            Unique string identifying valid input files.

        Returns
        -------
        : list[str]
            A list of file names.
        """
        return natsort.natsorted(
            glob.glob(f'{file_dir}/{file_name_stem}*.nc'))
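# For example (hypothetical directory contents): given `./inputs` containing
# `data_1.nc`, `data_2.nc`, ..., `data_10.nc`, natural sorting returns the
# files in numeric order, whereas a plain lexicographic sort would place
# `data_10.nc` immediately after `data_1.nc`:
#
#     DiskFileNameReader().get_file_names('./inputs', 'data_')
#     # ['./inputs/data_1.nc', './inputs/data_2.nc', ...,
#     #  './inputs/data_10.nc']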
# Helper classes to assist in reading datasets
##############################################
class DatasetReader:
    """ Abstract base class for DatasetReaders

    DatasetReaders are responsible for opening and reading single Datasets.
    An abstract base class, introduced to assist with testing objects of
    type FileReader.
    """
    def read_dataset(self, file_name, set_auto_maskandscale=True):
        """ Open a dataset for reading

        Parameters
        ----------
        file_name : str
            The name or path of the file to open

        set_auto_maskandscale : bool
            If True, apply automatic masking and scaling to variables when
            they are read.

        Returns
        -------
        : Dataset
            A dataset.
        """
        raise NotImplementedError
class NetCDFDatasetReader(DatasetReader):
    """ NetCDF dataset reader

    Return a NetCDF4 dataset object.
    """
    def read_dataset(self, file_name, set_auto_maskandscale=True):
        """ Open a dataset for reading

        Parameters
        ----------
        file_name : str
            The name or path of the file to open

        set_auto_maskandscale : bool
            If True, apply automatic masking and scaling to variables when
            they are read.

        Returns
        -------
        : NetCDF4 Dataset
            A NetCDF4 dataset.
        """
        ds = Dataset(file_name, 'r')
        ds.set_auto_maskandscale(set_auto_maskandscale)
        return ds
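# A short usage sketch (the file name is hypothetical):
#
#     reader = NetCDFDatasetReader()
#     ds = reader.read_dataset('ocean_data_0001.nc')
#     print(ds.variables.keys())
#     ds.close()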
__all__ = ["FileReader", "FileNameReader", "DiskFileNameReader", "DatasetReader", "NetCDFDatasetReader"]