Source code for pylag.simulator

"""
This module contains classes that can be used to manage the running of
PyLag simulations in serial mode. All Simulators inherit from a common
Abstract base class `Simulator`. This structure was introduced in order
to make it possible to run different types of simulation (e.g. a trace,
or LCS calculation) through configuration switches.

See Also
--------
pylag.parallel.simulator - Simulators for parallel execution
"""

from __future__ import print_function

import logging
from progressbar import ProgressBar

from pylag.time_manager import TimeManager
from pylag.particle_initialisation import get_initial_particle_state_reader
from pylag.restart import RestartFileCreator
from pylag.netcdf_logger import NetCDFLogger
from pylag.exceptions import PyLagValueError, PyLagRuntimeError

from pylag.model_factory import get_model


[docs]def get_simulator(config): """ Factory method for PyLag simulators Parameters ---------- config : ConfigParser PyLag configuraton object Returns ------- : pylag.simulator.Simulator Object of type Simulator """ if config.get("SIMULATION", "simulation_type") == "trace": return TraceSimulator(config) else: raise PyLagValueError('Unsupported simulation type.')
[docs]class Simulator(object): """ Simulator Abstract base class for PyLag simulators. """
[docs] def run(self): """ Run a PyLag simulation """ raise NotImplementedError
[docs]class TraceSimulator(Simulator): """ Trace simulator Simulator for tracing particle pathlines through time. Trace simulators can perform forward or backward in time integrations. Parameters ---------- config : ConfigParser PyLag configuraton object """ def __init__(self, config): # Configuration object self._config = config # Time manager - for controlling particle release events, time stepping self.time_manager = TimeManager(self._config) # Model object self.model = get_model(self._config, self.time_manager.datetime_start, self.time_manager.datetime_end) # Initial particle state reader self.initial_particle_state_reader = get_initial_particle_state_reader(config) # Restart creator self.restart_creator = None if self._config.getboolean('RESTART', 'create_restarts'): self.restart_creator = RestartFileCreator(config) # Data logger self.data_logger = None
[docs] def run(self): """ Run a simulation Run a single or multiple integrations according to options set out in the run configuration file. Returns ------- : None """ # For logging logger = logging.getLogger(__name__) # Read in particle initial positions from file - these will be used to # create the initial particle set. n_particles, group_ids, x1_positions, x2_positions, x3_positions = \ self.initial_particle_state_reader.get_particle_data() if n_particles == len(group_ids): logger.info(f'Particle seed contains {n_particles} particles.') else: logger.error(f'Error reading particle initial positions from ' f'file. The number of particles specified in the file ' f'is {n_particles}. The actual number found while ' f'parsing the file was {len(group_ids)}.') raise PyLagRuntimeError('Error encountered while reading the ' 'particle initial positions file. See ' 'the log for more details.') # Initialise particle arrays self.model.set_particle_data(group_ids, x1_positions, x2_positions, x3_positions) # Run the ensemble run_simulation = True while run_simulation: # Read data into arrays self.model.read_input_data(self.time_manager.time) # Seed the model self.model.seed(self.time_manager.time) # Create data logger file_name = ''.join([self._config.get('GENERAL', 'output_file'), f'_{self.time_manager.current_release}']) start_datetime = self.time_manager.datetime_start grid_names = self.model.get_grid_names() self.data_logger = NetCDFLogger(self._config, file_name, start_datetime, n_particles, grid_names) # Write particle group ids to file self.data_logger.write_group_ids(group_ids) # Write initial state to file particle_diagnostics = self.model.get_diagnostics(self.time_manager.time) self.data_logger.write(self.time_manager.time, particle_diagnostics) # The main update loop print(f'\nStarting ensemble member ' f'{self.time_manager.current_release} ...') print('Progress:') pbar = ProgressBar(maxval=abs(self.time_manager.time_end), term_width=50).start() while abs(self.time_manager.time) < abs(self.time_manager.time_end): self.model.update(self.time_manager.time) self.time_manager.update_current_time() # Check on status of reading frames and update if necessary # Communicate updated arrays to the data reader if these are # out of date. self.model.read_input_data(self.time_manager.time) if self.time_manager.write_output_to_file() == 1: particle_diagnostics = self.model.get_diagnostics( self.time_manager.time) self.data_logger.write(self.time_manager.time, particle_diagnostics) if self.time_manager.sync_data_to_disk() == 1: self.data_logger.sync() if self.restart_creator: if self.time_manager.create_restart_file() == 1: file_name_stem = f'restart_{self.time_manager.current_release}' datetime_current = self.time_manager.datetime_current particle_data = self.model.get_particle_data() self.restart_creator.create(file_name_stem, n_particles, datetime_current, particle_data) pbar.update(abs(self.time_manager.time)) pbar.finish() # Close the current data logger self.data_logger.close() # Run another simulation? if self.time_manager.new_simulation(): run_simulation = True # Set up data access for the new simulation self.model.setup_input_data_access( self.time_manager.datetime_start, self.time_manager.datetime_end) else: run_simulation = False
__all__ = ['Simulator', 'TraceSimulator', 'get_simulator']