Source code for mlmc.estimator

import numpy as np
import scipy.stats as st
import scipy.integrate as integrate
import mlmc.quantity.quantity_estimate as qe
import mlmc.tool.simple_distribution
from mlmc.quantity.quantity_estimate import mask_nan_samples
from mlmc.quantity.quantity_types import ScalarType
from mlmc.plot import plots
from mlmc.quantity.quantity_spec import ChunkSpec


[docs] class Estimate: """ A wrapper class for moment estimation, PDF approximation, and related MLMC post-processing. Provides utility methods to: - Estimate statistical moments, variances, and covariances - Perform regression-based variance estimation - Conduct bootstrap resampling - Construct approximate probability density functions - Visualize and analyze MLMC variance and sample distributions """
[docs] def __init__(self, quantity, sample_storage, moments_fn=None): """ Initialize the Estimate instance. :param quantity: mlmc.quantity.Quantity Quantity object representing the stochastic quantity of interest. :param sample_storage: mlmc.sample_storage.SampleStorage Storage containing MLMC samples for each level. :param moments_fn: callable, optional Function defining the statistical moments to be estimated. """ self._quantity = quantity self._sample_storage = sample_storage self._moments_fn = moments_fn self._moments_mean = None
@property def quantity(self): """Return the current Quantity object.""" return self._quantity @quantity.setter def quantity(self, quantity): """Set a new Quantity object.""" self._quantity = quantity @property def n_moments(self): """Return the number of moment functions defined.""" return self._moments_fn.size @property def moments_mean_obj(self): """Return the most recently computed mean of the moments.""" return self._moments_mean @moments_mean_obj.setter def moments_mean_obj(self, moments_mean): """ Set the estimated mean of the moments. :param moments_mean: mlmc.quantity.quantity.QuantityMean Object containing mean and variance of the estimated moments. :raises TypeError: If the object is not an instance of QuantityMean. """ if not isinstance(moments_mean, mlmc.quantity.quantity.QuantityMean): raise TypeError self._moments_mean = moments_mean def estimate_moments(self, moments_fn=None): """ Estimate the mean and variance of the defined moment functions. :param moments_fn: callable, optional Function to compute statistical moments. If None, uses the stored function. :return: tuple (moment_means, moment_variances) Arrays of length n_moments representing estimated means and variances. """ if moments_fn is None: moments_fn = self._moments_fn moments_mean = qe.estimate_mean(qe.moments(self._quantity, moments_fn)) self.moments_mean_obj = moments_mean return moments_mean.mean, moments_mean.var def estimate_covariance(self, moments_fn=None): """ Estimate the covariance matrix and its variance from MLMC samples. :param moments_fn: callable, optional Function defining moment evaluations. If None, uses the stored one. :return: tuple (covariance_matrix, covariance_variance) """ if moments_fn is None: moments_fn = self._moments_fn cov_mean = qe.estimate_mean(qe.covariance(self._quantity, moments_fn)) return cov_mean.mean, cov_mean.var def estimate_diff_vars_regression(self, n_created_samples, moments_fn=None, raw_vars=None): """ Estimate variances using a linear regression model. Assumes that variance increases with moment order. Typically, only two moments with the highest average variance are used. :param n_created_samples: array-like Number of created samples on each MLMC level. :param moments_fn: callable, optional Moment evaluation function. :param raw_vars: ndarray, optional Precomputed raw variance estimates. :return: tuple (variance_array, n_ops_estimate) """ self._n_created_samples = n_created_samples if raw_vars is None: if moments_fn is None: moments_fn = self._moments_fn raw_vars, n_samples = self.estimate_diff_vars(moments_fn) sim_steps = np.squeeze(self._sample_storage.get_level_parameters()) vars = self._all_moments_variance_regression(raw_vars, sim_steps) return vars, self._sample_storage.get_n_ops() def estimate_diff_vars(self, moments_fn=None): """ Estimate the variance of moment differences between consecutive MLMC levels. :param moments_fn: callable, optional Moment evaluation functions. :return: tuple (diff_variance, n_samples) diff_variance - shape (L, R): variances of differences of moments. n_samples - shape (L,): number of samples per level. """ moments_mean = qe.estimate_mean(qe.moments(self._quantity, moments_fn)) return moments_mean.l_vars, moments_mean.n_samples def _all_moments_variance_regression(self, raw_vars, sim_steps): """Apply variance regression across all moment functions.""" reg_vars = raw_vars.copy() n_moments = raw_vars.shape[1] for m in range(1, n_moments): reg_vars[:, m] = self._moment_variance_regression(raw_vars[:, m], sim_steps) assert np.allclose(reg_vars[:, 0], 0.0) return reg_vars def _moment_variance_regression(self, raw_vars, sim_steps): """ Perform regression-based smoothing of level variance for a single moment. Model: log(var_l) = A + B * log(h_l) + C * log^2(h_l) :param raw_vars: ndarray, shape (L,) Raw variance estimates of a single moment. :param sim_steps: ndarray, shape (L,) Simulation step sizes or level parameters. :return: ndarray, shape (L,) Smoothed variance estimates. """ L, = raw_vars.shape L1 = L - 1 if L < 3 or np.allclose(raw_vars, 0): return raw_vars W = 1.0 / np.sqrt(self._variance_of_variance()) W = W[1:] W = np.ones((L - 1,)) K = 3 X = np.zeros((L1, K)) log_step = np.log(sim_steps[1:]) X[:, 0] = np.ones(L1) X[:, 1] = np.full(L1, log_step) X[:, 2] = np.full(L1, log_step ** 2) WX = X * W[:, None] log_vars = np.log(raw_vars[1:]) log_vars = W * log_vars params, res, rank, sing_vals = np.linalg.lstsq(WX, log_vars) new_vars = raw_vars.copy() new_vars[1:] = np.exp(np.dot(X, params)) return new_vars def _variance_of_variance(self, n_samples=None): """ Approximate the variance of log(X), where X follows a chi-squared distribution. Used to approximate the uncertainty of variance estimates. :param n_samples: array-like, optional Number of samples per level. :return: ndarray Variance of variance estimates per level. """ if n_samples is None: n_samples = self._n_created_samples if hasattr(self, "_saved_var_var"): ns, var_var = self._saved_var_var if np.sum(np.abs(np.array(ns) - np.array(n_samples))) == 0: return var_var vars = [] for ns in n_samples: df = ns - 1 def log_chi_pdf(x): return np.exp(x) * df * st.chi2.pdf(np.exp(x) * df, df=df) def compute_moment(moment): std_est = np.sqrt(2 / df) fn = lambda x, m=moment: x ** m * log_chi_pdf(x) return integrate.quad(fn, -100 * std_est, 100 * std_est)[0] mean = compute_moment(1) second = compute_moment(2) vars.append(second - mean ** 2) self._saved_var_var = (n_samples, np.array(vars)) return np.array(vars) def est_bootstrap(self, n_subsamples=100, sample_vector=None, moments_fn=None): """ Perform bootstrap resampling to estimate uncertainty in MLMC estimators. :param n_subsamples: int, default=100 Number of bootstrap subsamples. :param sample_vector: ndarray, optional Sampling vector for selecting subsamples. :param moments_fn: callable, optional Moment evaluation function. """ if moments_fn is not None: self._moments_fn = moments_fn else: moments_fn = self._moments_fn sample_vector = determine_sample_vec( n_collected_samples=self._sample_storage.get_n_collected(), n_levels=self._sample_storage.get_n_levels(), sample_vector=sample_vector ) bs_mean, bs_var, bs_l_means, bs_l_vars = [], [], [], [] for i in range(n_subsamples): quantity_subsample = self.quantity.subsample(sample_vec=sample_vector) moments_quantity = qe.moments(quantity_subsample, moments_fn=moments_fn, mom_at_bottom=False) q_mean = qe.estimate_mean(moments_quantity) bs_mean.append(q_mean.mean) bs_var.append(q_mean.var) bs_l_means.append(q_mean.l_means) bs_l_vars.append(q_mean.l_vars) self.bs_mean = bs_mean self.bs_var = bs_var self.mean_bs_mean = np.mean(bs_mean, axis=0) self.mean_bs_var = np.mean(bs_var, axis=0) self.mean_bs_l_means = np.mean(bs_l_means, axis=0) self.mean_bs_l_vars = np.mean(bs_l_vars, axis=0) self.var_bs_mean = np.var(bs_mean, axis=0, ddof=1) self.var_bs_var = np.var(bs_var, axis=0, ddof=1) self.var_bs_l_means = np.var(bs_l_means, axis=0, ddof=1) self.var_bs_l_vars = np.var(bs_l_vars, axis=0, ddof=1) self._bs_level_mean_variance = ( self.var_bs_l_means * np.array(self._sample_storage.get_n_collected())[:, None] ) def bs_target_var_n_estimated(self, target_var, sample_vec=None, n_subsamples=100): """ Estimate the number of samples required to achieve a target variance. :param target_var: float Desired target variance for MLMC estimation. :param sample_vec: ndarray, optional Sampling vector specifying subsamples per level. :param n_subsamples: int, default=100 Number of bootstrap resamplings to perform. :return: ndarray Estimated number of samples required at each level. """ sample_vec = determine_sample_vec( n_collected_samples=self._sample_storage.get_n_collected(), n_levels=self._sample_storage.get_n_levels(), sample_vector=sample_vec ) self.est_bootstrap(n_subsamples=n_subsamples, sample_vector=sample_vec) variances, n_ops = self.estimate_diff_vars_regression( sample_vec, raw_vars=self.mean_bs_l_vars ) n_estimated = estimate_n_samples_for_target_variance( target_var, variances, n_ops, n_levels=self._sample_storage.get_n_levels() ) return n_estimated def plot_variances(self, sample_vec=None): """ Plot variance breakdown from bootstrap and regression data. :param sample_vec: ndarray, optional Sampling vector specifying subsamples per level. """ var_plot = plots.VarianceBreakdown(10) sample_vec = determine_sample_vec( n_collected_samples=self._sample_storage.get_n_collected(), n_levels=self._sample_storage.get_n_levels(), sample_vector=sample_vec ) self.est_bootstrap(n_subsamples=100, sample_vector=sample_vec) var_plot.add_variances( self.mean_bs_l_vars, sample_vec, ref_level_vars=self._bs_level_mean_variance ) var_plot.show(None) def plot_bs_var_log(self, sample_vec=None): """ Generate log-scale bootstrap variance plots and variance regression fits. :param sample_vec: ndarray, optional Sampling vector specifying subsamples per level. """ sample_vec = determine_sample_vec( n_collected_samples=self._sample_storage.get_n_collected(), n_levels=self._sample_storage.get_n_levels(), sample_vector=sample_vec ) moments_quantity = qe.moments( self._quantity, moments_fn=self._moments_fn, mom_at_bottom=False ) q_mean = qe.estimate_mean(moments_quantity) bs_plot = plots.BSplots( bs_n_samples=sample_vec, n_samples=self._sample_storage.get_n_collected(), n_moments=self._moments_fn.size, ref_level_var=q_mean.l_vars ) bs_plot.plot_means_and_vars( self.mean_bs_mean[1:], self.mean_bs_var[1:], n_levels=self._sample_storage.get_n_levels() ) bs_plot.plot_bs_variances(self.mean_bs_l_vars) # bs_plot.plot_bs_var_log_var() bs_plot.plot_var_regression(self, self._sample_storage.get_n_levels(), self._moments_fn) def fine_coarse_violinplot(self): """ Create violin plots comparing fine and coarse samples across levels. Uses pandas for data organization and mlmc.plot.violinplot for visualization. """ import pandas as pd from mlmc.plot import violinplot label_n_spaces = 5 n_levels = self._sample_storage.get_n_levels() if n_levels > 1: for level_id in range(n_levels): chunk_spec = next( self._sample_storage.chunks( level_id=level_id, n_samples=self._sample_storage.get_n_collected()[level_id] ) ) samples = np.squeeze(self._quantity.samples(chunk_spec, axis=0)) if level_id == 0: label = "{} F{} {} C".format(level_id, ' ' * label_n_spaces, level_id + 1) data = {'samples': samples[:, 0], 'type': 'fine', 'level': label} dframe = pd.DataFrame(data) else: data = {'samples': samples[:, 1], 'type': 'coarse', 'level': label} dframe = pd.concat([dframe, pd.DataFrame(data)], axis=0) if level_id + 1 < n_levels: label = "{} F{} {} C".format(level_id, ' ' * label_n_spaces, level_id + 1) data = {'samples': samples[:, 0], 'type': 'fine', 'level': label} dframe = pd.concat([dframe, pd.DataFrame(data)], axis=0) violinplot.fine_coarse_violinplot(dframe) @staticmethod def estimate_domain(quantity, sample_storage, quantile=None): """ Estimate lower and upper bounds of the domain from MLMC samples. :param quantity: mlmc.quantity.Quantity Quantity object representing the stochastic quantity. :param sample_storage: mlmc.sample_storage.SampleStorage Storage object containing all level samples. :param quantile: float, optional Quantile value in (0, 1). None defaults to 0.01. :return: tuple (lower_bound, upper_bound) """ ranges = [] if quantile is None: quantile = 0.01 for level_id in range(sample_storage.get_n_levels()): try: sample_storage.get_n_collected()[level_id] except AttributeError: print(f"No collected values for level {level_id}") break print("sample_storage.get_n_collected() ", type(sample_storage.get_n_collected()[0])) if isinstance(sample_storage.get_n_collected()[level_id], AttributeError): print("continue") continue chunk_spec = next( sample_storage.chunks( level_id=level_id, n_samples=sample_storage.get_n_collected()[level_id] ) ) fine_samples = quantity.samples(chunk_spec)[..., 0] fine_samples = np.squeeze(fine_samples) print("fine samples ", fine_samples) fine_samples = fine_samples[~np.isnan(fine_samples)] ranges.append(np.percentile(fine_samples, [100 * quantile, 100 * (1 - quantile)])) ranges = np.array(ranges) return np.min(ranges[:, 0]), np.max(ranges[:, 1]) def construct_density(self, tol=1e-8, reg_param=0.0, orth_moments_tol=1e-4, exact_pdf=None): """ Construct an approximate probability density function using orthogonal moments. :param tol: float, default=1e-8 Optimization tolerance for density estimation. :param reg_param: float, default=0.0 Regularization parameter to stabilize estimation. :param orth_moments_tol: float, default=1e-4 Tolerance for orthogonalization of moments. :param exact_pdf: callable, optional Reference exact PDF for validation or comparison. :return: tuple (distribution, info, result, moments_object) """ if not isinstance(self._quantity.qtype, ScalarType): raise NotImplementedError("Currently, only ScalarType quantities are supported.") cov_mean = qe.estimate_mean(qe.covariance(self._quantity, self._moments_fn)) cov_mat = cov_mean.mean moments_obj, info = mlmc.tool.simple_distribution.construct_ortogonal_moments( self._moments_fn, cov_mat, tol=orth_moments_tol ) moments_mean = qe.estimate_mean(qe.moments(self._quantity, moments_obj)) est_moments = moments_mean.mean est_vars = moments_mean.var est_vars = np.ones(moments_obj.size) min_var, max_var = np.min(est_vars[1:]), np.max(est_vars[1:]) moments_data = np.stack((est_moments, est_vars), axis=1) distr_obj = mlmc.tool.simple_distribution.SimpleDistribution( moments_obj, moments_data, domain=moments_obj.domain ) result = distr_obj.estimate_density_minimize(tol, reg_param) return distr_obj, info, result, moments_obj def get_level_samples(self, level_id, n_samples=None): """ Retrieve MLMC samples for a given level. :param level_id: int Level index to access. :param n_samples: int, optional Number of samples to retrieve. If None, retrieves all available samples. :return: ndarray Samples for the specified level. """ chunk_spec = next(self._sample_storage.chunks(level_id=level_id, n_samples=n_samples)) return self._quantity.samples(chunk_spec=chunk_spec) def kurtosis_check(self, quantity=None): """ Compute and return the kurtosis of the given or stored quantity. :param quantity: mlmc.quantity.Quantity, optional Quantity for which to compute kurtosis. Defaults to the stored quantity. :return: float or ndarray Computed kurtosis per level. """ if quantity is None: quantity = self._quantity moments_mean_quantity = qe.estimate_mean(quantity) kurtosis = qe.level_kurtosis(quantity, moments_mean_quantity) return kurtosis
def consistency_check(quantity, sample_storage=None): """ Check consistency between fine and coarse level samples in MLMC. :param quantity: mlmc.quantity.Quantity instance :param sample_storage: mlmc.sample_storage.SampleStorage instance :return: dict mapping level_id -> consistency metric """ fine_samples = {} coarse_samples = {} for chunk_spec in quantity.get_quantity_storage().chunks(): samples = quantity.samples(chunk_spec) chunk, _ = mask_nan_samples(samples) # Skip empty chunks if chunk.shape[1] == 0: continue fine_samples.setdefault(chunk_spec.level_id, []).extend(chunk[:, :, 0]) if chunk_spec.level_id > 0: coarse_samples.setdefault(chunk_spec.level_id, []).extend(chunk[:, :, 1]) cons_check_val = {} for level_id in range(sample_storage.get_n_levels()): if level_id > 0: fine_mean = np.mean(fine_samples[level_id]) coarse_mean = np.mean(coarse_samples[level_id]) diff_mean = np.mean(np.array(fine_samples[level_id]) - np.array(coarse_samples[level_id])) fine_var = np.var(fine_samples[level_id]) coarse_var = np.var(fine_samples[level_id]) diff_var = np.var(np.array(fine_samples[level_id]) - np.array(coarse_samples[level_id])) val = np.abs(coarse_mean - fine_mean + diff_mean) / ( 3 * (np.sqrt(coarse_var) + np.sqrt(fine_var) + np.sqrt(diff_var)) ) assert np.isclose(coarse_mean - fine_mean + diff_mean, 0) assert val < 0.9 cons_check_val[level_id] = val return cons_check_val def coping_with_high_kurtosis(vars, costs, kurtosis, kurtosis_threshold=100): """ Adjust variance estimates if kurtosis is unusually high to avoid underestimation. :param vars: ndarray of shape (L, M) with level variances for moments :param costs: cost of computing samples per level :param kurtosis: kurtosis of each level :param kurtosis_threshold: threshold above which kurtosis is considered "high" :return: adjusted vars ndarray """ for l_id in range(2, vars.shape[0]): if kurtosis[l_id] > kurtosis_threshold: vars[l_id] = np.maximum(vars[l_id], 0.5 * vars[l_id - 1] * costs[l_id - 1] / costs[l_id]) return vars def estimate_n_samples_for_target_variance(target_variance, prescribe_vars, n_ops, n_levels, theta=0, kurtosis=None): """ Estimate optimal number of samples per level to reach a target variance. :param target_variance: desired variance for MLMC estimator :param prescribe_vars: ndarray of level variances (L x M) :param n_ops: cost/operations per level :param n_levels: number of levels :param theta: safety factor (0 ≤ theta < 1) :param kurtosis: optional ndarray of kurtosis per level :return: ndarray of optimal number of samples per moment function """ vars = prescribe_vars if kurtosis is not None and len(vars) == len(kurtosis): vars = coping_with_high_kurtosis(vars, n_ops, kurtosis) sqrt_var_n = np.sqrt(vars.T * n_ops) # moments_fn in rows, levels in cols total = np.sum(sqrt_var_n, axis=1) n_samples_estimate = np.round((sqrt_var_n / n_ops).T * total / target_variance).astype(int) n_samples_estimate = 1 / (1 - theta) * n_samples_estimate # Limit maximal number of samples per level n_samples_estimate_safe = np.maximum( np.minimum(n_samples_estimate, vars * n_levels / target_variance), 2 ) return np.max(n_samples_estimate_safe, axis=1).astype(int) def calc_level_params(step_range, n_levels): """ Compute level-dependent step sizes for MLMC. :param step_range: tuple (h_fine, h_coarse) :param n_levels: number of levels :return: list of step sizes per level """ assert step_range[0] > step_range[1] level_parameters = [] for i_level in range(n_levels): level_param = 1 if n_levels == 1 else i_level / (n_levels - 1) level_parameters.append([step_range[0] ** (1 - level_param) * step_range[1] ** level_param]) return level_parameters def determine_sample_vec(n_collected_samples, n_levels, sample_vector=None): """ Determine the sample vector for bootstrapping or MLMC calculations. """ if sample_vector is None: sample_vector = n_collected_samples if len(sample_vector) > n_levels: sample_vector = sample_vector[:n_levels] return np.array(sample_vector) def determine_level_parameters(n_levels, step_range): """ Wrapper to calculate level parameters (simulation step sizes). """ assert step_range[0] > step_range[1] level_parameters = [] for i_level in range(n_levels): level_param = 1 if n_levels == 1 else i_level / (n_levels - 1) level_parameters.append([step_range[0] ** (1 - level_param) * step_range[1] ** level_param]) return level_parameters def determine_n_samples(n_levels, n_samples=None): """ Generate an array of target sample sizes for each level. :param n_levels: number of MLMC levels :param n_samples: int or list of 2 ints to define start/end for exponential spacing :return: ndarray of sample sizes for each level """ if n_samples is None: n_samples = [100, 3] n_samples = np.atleast_1d(n_samples) if len(n_samples) == 1: n_samples = np.array([n_samples[0], 3]) if len(n_samples) == 2: n0, nL = n_samples n_samples = np.round(np.exp2(np.linspace(np.log2(n0), np.log2(nL), n_levels))).astype(int) return n_samples