import numpy as np
import mlmc.random.correlated_field as cf
import gstools
from typing import List
from mlmc.sim.simulation import Simulation
from mlmc.quantity.quantity_spec import QuantitySpec
from mlmc.level_simulation import LevelSimulation
[docs]
def create_corr_field(model='gauss', corr_length=0.1, dim=1, log=True, sigma=1, mode_no=1000):
"""
Create a correlated random field wrapper (GSTools model -> mlmc.random.correlated_field.Field).
:param model: str, covariance model name. Supported strings: 'gauss' (default), 'exp',
'TPLgauss', 'TPLexp', 'TPLStable'.
:param corr_length: float, correlation length (len_scale) supplied to the GSTools model.
:param dim: int, spatial dimension of the covariance model.
:param log: bool, if True the field will be considered in log-space when sampling.
:param sigma: float, multiplicative sigma parameter for the SRF generator.
:param mode_no: int, number of Fourier modes used by GSTools SRF.
:return: cf.Field — an mlmc.random.correlated_field.Field wrapping a GSTools SRF.
"""
if model == 'exp':
model = gstools.Exponential(dim=dim, len_scale=corr_length)
elif model == 'TPLgauss':
model = gstools.TPLGaussian(dim=dim, len_scale=corr_length)
elif model == 'TPLexp':
model = gstools.TPLExponential(dim=dim, len_scale=corr_length)
elif model == 'TPLStable':
model = gstools.TPLStable(dim=dim, len_scale=corr_length)
else:
model = gstools.Gaussian(dim=dim, len_scale=corr_length)
return cf.Field('conductivity', cf.GSToolsSpatialCorrelatedField(model, log=log, sigma=sigma, mode_no=mode_no))
[docs]
class ShootingSimulation2D(Simulation):
"""
Example 2D shooting simulation used with MLMC sampler.
Implements:
- level_instance(...) to create per-level LevelSimulation config,
- calculate(...) static method used to produce a (fine, coarse) sample pair,
- result_format() describing output QuantitySpec.
"""
def __init__(self, config):
"""
Initialize the simulation factory.
:param config: dict containing base simulation configuration. Expected keys:
- 'start_position': np.ndarray shape (2,)
- 'start_velocity': np.ndarray shape (2,)
- 'area_borders': np.ndarray [xmin, xmax, ymin, ymax]
- 'max_time': float
- 'complexity': float used by n_ops_estimate
- 'fields_params': dict passed to create_corr_field()
"""
super().__init__()
self.config = config
# If True, a sample workspace will be created (not required here)
self.need_workspace: bool = False
[docs]
def level_instance(self, fine_level_params: List[float], coarse_level_params: List[float]) -> LevelSimulation:
"""
Construct LevelSimulation for given fine/coarse parameters.
This method mutates a copy of self.config into a level-specific config and
returns a LevelSimulation that will call ShootingSimulation2D.calculate to
generate samples for this level.
:param fine_level_params: list-like, typically [fine_step]
:param coarse_level_params: list-like, typically [coarse_step]
:return: LevelSimulation configured for this level
"""
# Update level-specific fields in self.config
self.config["fine"] = {}
self.config["coarse"] = {}
self.config["fine"]["step"] = fine_level_params[0]
self.config["coarse"]["step"] = coarse_level_params[0]
self.config["res_format"] = self.result_format()
# compute number of elements per level from complexity and step sizes
self.config["fine"]["n_elements"] = int(self.config["complexity"] / self.config["fine"]["step"])
if self.config["coarse"]["step"] > 0:
self.config["coarse"]["n_elements"] = int(self.config["complexity"] / self.config["coarse"]["step"])
else:
self.config["coarse"]["n_elements"] = 0
return LevelSimulation(config_dict=self.config, calculate=ShootingSimulation2D.calculate,
task_size=self.n_ops_estimate(fine_level_params[0]))
[docs]
@staticmethod
def calculate(config, seed):
"""
Build correlated fields and produce paired fine/coarse simulation results.
This static function is intended to be executed by the LevelSimulation worker.
It:
- creates two independent correlated fields (x and y components),
- sets evaluation points,
- samples the fields and splits the realization into fine and coarse parts,
- simulates the projectile dynamics for fine/coarse inputs.
:param config: dict, level-specific configuration produced by level_instance().
:param seed: int, RNG seed (currently not used directly; GSTools RNG is internal).
:return: tuple (fine_result, coarse_result)
- fine_result: np.ndarray or scalar describing the fine-level output
- coarse_result: np.ndarray or scalar describing the coarse-level output
"""
# Create independent correlated random fields for X and Y forcing
field_x = create_corr_field(**config['fields_params'])
field_y = create_corr_field(**config['fields_params'])
# create concatenated points and get number of fine points
points, n_fine_points = ShootingSimulation2D.create_points(config)
field_x.set_points(points)
field_y.set_points(points)
# sample and split into fine and coarse inputs
fine_input_sample, coarse_input_sample = ShootingSimulation2D.generate_random_sample(
field_x, field_y, coarse_step=config["coarse"]["step"], n_fine_elements=n_fine_points
)
# run dynamics for fine and coarse inputs
fine_res = ShootingSimulation2D._run_sample(config, fine_input_sample)
coarse_res = ShootingSimulation2D._run_sample(config, coarse_input_sample)
return fine_res, coarse_res
@staticmethod
def _run_sample(config, rnd_input_samples):
"""
Run the explicit Euler integrator for the 2D shooting model.
Dynamics:
X_{k+1} = X_k + dt * V_k
V_{k+1} = V_k + dt * F_k
The method returns the final position vector X (or [nan, nan] if the projectile exits area_borders).
:param config: dict, level-specific simulation configuration.
:param rnd_input_samples: array-like with shape (n_elements, 2) or (n_elements,) depending on caller.
:return: np.ndarray shape (2,) representing final [x, y] or [nan, nan] if out-of-bounds.
"""
n_elements = len(rnd_input_samples)
# Use copies so we don't mutate config objects
X = np.array(config["start_position"], dtype=float).copy()
V = np.array(config['start_velocity'], dtype=float).copy()
# compute time step if there are elements
if n_elements != 0:
dt = config['max_time'] / n_elements
# integrate step-by-step
for i in range(n_elements):
# update position
X = X + dt * V
# update velocity: rnd_input_samples[i] must be 2-vector (Fx, Fy)
V = V + dt * rnd_input_samples[i]
x = X[0]
y = X[1]
# if projectile leaves the bounding box, return NaNs
if x > config['area_borders'][1] or x < config['area_borders'][0] or \
y > config['area_borders'][3] or y < config['area_borders'][2]:
X = np.array([np.nan, np.nan])
break
time = dt * (i + 1)
if time >= config['max_time']:
break
return X
[docs]
@staticmethod
def create_points(config):
"""
Build concatenated evaluation points for the correlated fields (fine first, then coarse).
The points are 1D coordinates used for the GSTools SRF evaluation; the first
n_fine rows correspond to the fine mesh, the remainder to the coarse mesh.
:param config: dict with 'fine' and 'coarse' sub-dicts containing 'n_elements'.
:return: tuple (points, n_fine_elements)
- points: np.ndarray shape (n_fine + n_coarse, 1)
- n_fine_elements: int number of fine-level points
"""
n_fine_elements = config["fine"]["n_elements"]
n_coarse_elements = config["coarse"]["n_elements"]
assert n_fine_elements > n_coarse_elements
points = np.empty((n_fine_elements + n_coarse_elements, 1))
points[:, 0] = np.concatenate((
np.linspace(0, config["start_velocity"][0] * config["max_time"], n_fine_elements),
np.linspace(0, config["start_velocity"][0] * config["max_time"], n_coarse_elements)
))
return points, n_fine_elements
[docs]
@staticmethod
def generate_random_sample(field_x, field_y, coarse_step, n_fine_elements):
"""
Sample two correlated fields (x and y components) and split the realization into fine and coarse parts.
:param field_x: cf.Field for x-component (already had set_points called).
:param field_y: cf.Field for y-component (already had set_points called).
:param coarse_step: float, coarse-level step size (if 0 there is no coarse part).
:param n_fine_elements: int, how many samples belong to the fine-level portion.
:return: tuple (fine_input_sample, coarse_input_sample)
- fine_input_sample: np.ndarray shape (n_fine_elements, 2) with x and y forcing
- coarse_input_sample: np.ndarray shape (n_coarse_elements, 2) or empty array if coarse_step==0
"""
field_sample_x = field_x.sample()
field_sample_y = field_y.sample()
fine_input_sample = np.empty((n_fine_elements, 2))
fine_input_sample[:, 0] = field_sample_x[:n_fine_elements]
fine_input_sample[:, 1] = field_sample_y[:n_fine_elements]
coarse_input_sample = np.empty((len(field_sample_x) - n_fine_elements, 2))
if coarse_step != 0:
coarse_input_sample[:, 0] = field_sample_x[n_fine_elements:]
coarse_input_sample[:, 1] = field_sample_y[n_fine_elements:]
return fine_input_sample, coarse_input_sample
[docs]
def n_ops_estimate(self, step):
"""
Heuristic estimate of computational expense for a single fine-level sample.
:param step: float, fine-level step size.
:return: float, cost estimate used by MLMC scheduler.
"""
return (1 / step) ** self.config['complexity'] * np.log(max(1 / step, 2.0))