Source code for mlmc.quantity.quantity_estimate

import numpy as np
import mlmc.quantity.quantity
import mlmc.quantity.quantity_types as qt


[docs] def mask_nan_samples(chunk): """ Remove (mask out) samples containing NaN values in either the fine or coarse part of the result. :param chunk: np.ndarray of shape [M, chunk_size, 2] M - quantity size (number of scalar components), chunk_size - number of samples in the chunk, 2 - fine and coarse parts of the result. :return: (filtered_chunk, n_masked) filtered_chunk: np.ndarray with invalid samples removed, n_masked: int, number of masked (removed) samples. """ # Identify any sample with NaNs in its fine or coarse component mask = np.any(np.isnan(chunk), axis=0).any(axis=1) return chunk[..., ~mask, :], np.count_nonzero(mask)
[docs] def cache_clear(): """ Clear cached Quantity sample evaluations. Used before running MLMC estimations to ensure fresh data is fetched from storage. """ mlmc.quantity.quantity.Quantity.samples.cache_clear() mlmc.quantity.quantity.QuantityConst.samples.cache_clear()
[docs] def estimate_mean(quantity, form="diff", operation_func=None, **kwargs): """ Estimate the MLMC mean (and variance) of a Quantity using multilevel sampling. The function computes per-level means and variances from simulation results. Supports large datasets via chunked processing and handles NaN-masked samples. :param quantity: Quantity instance to estimate. :param form: str, type of estimation: - "diff": estimate based on differences (fine - coarse) → standard MLMC approach. - "fine": estimate using fine-level data only. - "coarse": estimate using coarse-level data only. :param operation_func: Optional transformation applied to chunk data before accumulation (e.g., for moment or kurtosis computation). :param kwargs: Additional keyword arguments passed to operation_func. :return: QuantityMean object containing mean, variance, and sample statistics per level. """ # Reset cached quantity evaluations cache_clear() quantity_vec_size = quantity.size() sums = None sums_of_squares = None # Initialize level-specific storage quantity_storage = quantity.get_quantity_storage() level_ids = quantity_storage.level_ids() n_levels = np.max(level_ids) + 1 n_samples = [0] * n_levels n_rm_samples = [0] * n_levels # Iterate through data chunks for chunk_spec in quantity_storage.chunks(): samples = quantity.samples(chunk_spec) chunk, n_mask_samples = mask_nan_samples(samples) n_samples[chunk_spec.level_id] += chunk.shape[1] n_rm_samples[chunk_spec.level_id] += n_mask_samples # Skip empty chunks if chunk.shape[1] == 0: continue assert chunk.shape[0] == quantity_vec_size # Allocate accumulators at first valid chunk if sums is None: sums = [np.zeros(chunk.shape[0]) for _ in range(n_levels)] sums_of_squares = [np.zeros(chunk.shape[0]) for _ in range(n_levels)] # Select appropriate data form for the estimator if form == "fine": chunk_diff = chunk[:, :, 0] elif form == "coarse": chunk_diff = np.zeros_like(chunk[:, :, 0]) if chunk_spec.level_id == 0 else chunk[:, :, 1] else: # Default MLMC difference (fine - coarse) chunk_diff = chunk[:, :, 0] if chunk_spec.level_id == 0 else chunk[:, :, 0] - chunk[:, :, 1] # Optional user-defined transformation of data if operation_func is not None: chunk_diff = operation_func(chunk_diff, chunk_spec, **kwargs) # Accumulate sums and squared sums for this level sums[chunk_spec.level_id] += np.sum(chunk_diff, axis=1) sums_of_squares[chunk_spec.level_id] += np.sum(chunk_diff ** 2, axis=1) if sums is None: raise Exception("All samples were masked (no valid data found).") # Compute means and variances for each level l_means = [] l_vars = [] for s, sp, n in zip(sums, sums_of_squares, n_samples): l_means.append(s / n) if n > 1: l_vars.append((sp - (s ** 2 / n)) / (n - 1)) else: l_vars.append(np.full(len(s), np.inf)) # Construct QuantityMean object with level statistics return mlmc.quantity.quantity.QuantityMean( quantity.qtype, l_means=l_means, l_vars=l_vars, n_samples=n_samples, n_rm_samples=n_rm_samples )
[docs] def moment(quantity, moments_fn, i=0): """ Construct a Quantity that represents a single statistical moment. :param quantity: Base Quantity instance. :param moments_fn: Instance of mlmc.moments.Moments defining the moment computation. :param i: Index of the moment to compute. :return: New Quantity that computes the i-th moment. """ def eval_moment(x): return moments_fn.eval_single_moment(i, value=x) return mlmc.quantity.quantity.Quantity( quantity_type=quantity.qtype, input_quantities=[quantity], operation=eval_moment )
[docs] def moments(quantity, moments_fn, mom_at_bottom=True): """ Construct a Quantity representing all moments defined by a given Moments object. :param quantity: Base Quantity. :param moments_fn: mlmc.moments.Moments child defining moment evaluations. :param mom_at_bottom: bool, if True, moments are added at the lowest (scalar) level of the Quantity type. :return: Quantity that computes all defined moments. """ def eval_moments(x): if mom_at_bottom: mom = moments_fn.eval_all(x).transpose((0, 3, 1, 2)) # [M, R, N, 2] else: mom = moments_fn.eval_all(x).transpose((3, 0, 1, 2)) # [R, M, N, 2] return mom.reshape((np.prod(mom.shape[:-2]), mom.shape[-2], mom.shape[-1])) # [M, N, 2] # Define new Quantity type according to desired hierarchy if mom_at_bottom: moments_array_type = qt.ArrayType(shape=(moments_fn.size,), qtype=qt.ScalarType()) moments_qtype = quantity.qtype.replace_scalar(moments_array_type) else: moments_qtype = qt.ArrayType(shape=(moments_fn.size,), qtype=quantity.qtype) return mlmc.quantity.quantity.Quantity( quantity_type=moments_qtype, input_quantities=[quantity], operation=eval_moments )
[docs] def covariance(quantity, moments_fn, cov_at_bottom=True): """ Construct a Quantity representing covariance matrices of the given moments. :param quantity: Base Quantity. :param moments_fn: mlmc.moments.Moments child defining moment evaluations. :param cov_at_bottom: bool, if True covariance matrices are attached at the scalar level of the Quantity type. :return: Quantity that computes covariance matrices. """ def eval_cov(x): # Compute all moments (fine and coarse) moments = moments_fn.eval_all(x) mom_fine = moments[..., 0, :] cov_fine = np.einsum('...i,...j', mom_fine, mom_fine) if moments.shape[-2] == 1: # Single level (no coarse) cov = np.array([cov_fine]) else: mom_coarse = moments[..., 1, :] cov_coarse = np.einsum('...i,...j', mom_coarse, mom_coarse) cov = np.array([cov_fine, cov_coarse]) # Reshape covariance according to desired data layout if cov_at_bottom: cov = cov.transpose((1, 3, 4, 2, 0)) # [M, R, R, N, 2] else: cov = cov.transpose((3, 4, 1, 2, 0)) # [R, R, M, N, 2] return cov.reshape((np.prod(cov.shape[:-2]), cov.shape[-2], cov.shape[-1])) # Adjust Quantity type for covariance structure if cov_at_bottom: moments_array_type = qt.ArrayType(shape=(moments_fn.size, moments_fn.size), qtype=qt.ScalarType()) moments_qtype = quantity.qtype.replace_scalar(moments_array_type) else: moments_qtype = qt.ArrayType(shape=(moments_fn.size, moments_fn.size), qtype=quantity.qtype) return mlmc.quantity.quantity.Quantity( quantity_type=moments_qtype, input_quantities=[quantity], operation=eval_cov )
[docs] def kurtosis_numerator(chunk_diff, chunk_spec, l_means): """ Compute the numerator for the sample kurtosis: E[(Y_l - E[Y_l])^4] :param chunk_diff: np.ndarray [quantity shape, number of samples] :param chunk_spec: quantity_spec.ChunkSpec describing current level and chunk. :param l_means: List of per-level means used for centering. :return: np.ndarray of the same shape as input. """ return (chunk_diff - l_means[chunk_spec.level_id]) ** 4
[docs] def level_kurtosis(quantity, means_obj): """ Estimate the sample kurtosis for each level: E[(Y_l - E[Y_l])^4] / (Var[Y_l])^2, where Y_l = fine_l - coarse_l :param quantity: Quantity instance. :param means_obj: QuantityMean object containing level means and variances. :return: np.ndarray of kurtosis values per level. """ numerator_means_obj = estimate_mean(quantity, operation_func=kurtosis_numerator, l_means=means_obj.l_means) kurtosis = numerator_means_obj.l_means / (means_obj.l_vars) ** 2 return kurtosis