import copy
import numpy as np
import numpy.linalg as la
import numpy.random as rand
import scipy as sp
from sklearn.utils.extmath import randomized_svd
import warnings
warnings.simplefilter('always', DeprecationWarning)
import gstools
[docs]
def kozeny_carman(porosity, m, factor, viscosity):
"""
Kozeny-Carman law. Empirical relationship between porosity and conductivity.
:param porosity: Porosity value.
:param m: Power. Suitable values are 1 < m < 4
:param factor: Factor [m^2]. Examples:
1e-7 , m = 3.48; juta fibers
2.2e-8 , m = 1.46; glass fibers
1.8e-13, m = 2.89; erruptive material
1e-12 , m = 2.76; erruptive material
1.8e-12, m = 1.99; basalt
:param viscosity: Fluid viscosity [Pa.s], e.g., water: 8.90e-4
:return: Conductivity
"""
assert np.all(viscosity > 1e-10)
porosity = np.minimum(porosity, 1-1e-10)
porosity = np.maximum(porosity, 1e-10)
cond = factor * porosity ** (2 + m) / (1 - porosity) ** 2 / viscosity
cond = np.maximum(cond, 1e-15)
return cond
[docs]
def positive_to_range(exp, a, b):
"""
Map a positive parameter 'exp' from <0, ∞) to <a, b>.
:param exp: Positive parameter (e.g., lognormal variable)
:param a: Lower bound of target interval
:param b: Upper bound of target interval
:return: Mapped value in [a, b)
"""
return b * (1 - (b - a) / (b + (b - a) * exp))
[docs]
class Field:
def __init__(self, name, field=None, param_fields=[], regions=[]):
"""
Initialize a Field object.
:param name: Name of the field
:param field: Scalar (const), RandomFieldBase, or callable function
:param param_fields: List of dependent parameter fields
:param regions: List of region names where the field is defined
"""
self.correlated_field = None
self.const = None
self._func = field
self.is_outer = True
if type(regions) is str:
regions = [regions]
self.name = name
if type(field) in [float, int]:
self.const = field
assert len(param_fields) == 0
elif isinstance(field, RandomFieldBase):
self.correlated_field = field
assert len(param_fields) == 0
else:
assert len(param_fields) > 0, field
try:
params = [np.ones(2) for i in range(len(param_fields))]
field(*params)
except:
raise Exception("Invalid field function for field: {}".format(name))
self._func = field
self.regions = regions
self.param_fields = param_fields
[docs]
def set_points(self, points):
"""
Set points for field evaluation.
:param points: Array of points where the field will be evaluated
"""
if self.const is not None:
self._sample = self.const * np.ones(len(points))
elif self.correlated_field is not None:
self.correlated_field.set_points(points)
if type(self.correlated_field) is SpatialCorrelatedField:
self.correlated_field.svd_dcmp(n_terms_range=(10, 100))
else:
pass
[docs]
def sample(self):
"""
Generate or compute a new sample of the field.
:return: Sample values of the field
"""
if self.const is not None:
return self._sample
elif self.correlated_field is not None:
self._sample = self.correlated_field.sample()
else:
params = [pf._sample for pf in self.param_fields]
self._sample = self._func(*params)
return self._sample
[docs]
class Fields:
def __init__(self, fields):
"""
Create a set of cross-dependent random fields.
:param fields: List of Field objects
"""
self.fields_orig = fields
self.fields_dict = {}
self.fields = []
for field in self.fields_orig:
new_field = copy.copy(field)
if new_field.param_fields:
new_field.param_fields = [self._get_field_obj(f, new_field.regions)
for f in new_field.param_fields]
self.fields_dict[new_field.name] = new_field
self.fields.append(new_field)
def _get_field_obj(self, field_name, regions):
"""
Get Field object by name or create constant field.
:param field_name: Field name or constant
:param regions: Regions of the field
:return: Field object
"""
if type(field_name) in [float, int]:
const_field = Field("const_{}".format(field_name), field_name, regions=regions)
self.fields.insert(0, const_field)
self.fields_dict[const_field.name] = const_field
return const_field
else:
assert field_name in self.fields_dict
return self.fields_dict[field_name]
[docs]
def set_outer_fields(self, outer):
"""
Set fields to be included in the sampled dictionary.
:param outer: List of outer field names
"""
outer_set = set(outer)
for f in self.fields:
f.is_outer = f.name in outer_set
[docs]
def set_points(self, points, region_ids=[], region_map={}):
"""
Assign evaluation points to each field.
:param points: Array of points for field evaluation
:param region_ids: Optional array of region ids for each point
:param region_map: Mapping from region name to region id
"""
self.n_elements = len(points)
reg_points = {}
for i, reg_id in enumerate(region_ids):
reg_points.setdefault(reg_id, []).append(i)
for field in self.fields:
point_ids = []
if field.regions:
for reg in field.regions:
reg_id = region_map[reg]
point_ids.extend(reg_points.get(reg_id, []))
field.set_points(points[point_ids])
field.full_sample_ids = point_ids
else:
field.set_points(points)
field.full_sample_ids = np.arange(self.n_elements)
[docs]
def sample(self):
"""
Sample all outer fields.
:return: Dictionary with field names as keys and sampled arrays as values
"""
result = {}
for field in self.fields:
sample = field.sample()
if field.is_outer:
shape = (self.n_elements, 3) if field.name == "cond_tn" else self.n_elements
result[field.name] = np.zeros(shape)
result[field.name][field.full_sample_ids] = sample
return result
[docs]
class RandomFieldBase:
"""
Base class for generating spatially correlated random fields.
Random field F(x) with mean E[F(x)] = mu(x) and covariance Cov[x_i,x_j].
Stationary covariance: Cov_ij = sigma^2 * exp(-|X^T K X|^(alpha/2)),
X = x_i - x_j.
Supports optional non-stationary variance sigma(X).
"""
def __init__(self, corr_exp='gauss', dim=2, corr_length=1.0,
aniso_correlation=None, mu=0.0, sigma=1.0, log=False, **kwargs):
"""
Initialize a random field.
:param corr_exp: 'gauss', 'exp', or float >=1 (correlation exponent)
:param dim: Dimension of the domain
:param corr_length: Scalar correlation length
:param aniso_correlation: Optional anisotropic 3x3 correlation tensor
:param mu: Mean (scalar or array)
:param sigma: Standard deviation (scalar or array)
:param log: If True, output field is exponentiated
"""
self.dim = dim
self.log = log
if corr_exp == 'gauss':
self.correlation_exponent = 2.0
elif corr_exp == 'exp':
self.correlation_exponent = 1.0
else:
self.correlation_exponent = float(corr_exp)
self._corr_length = corr_length
if aniso_correlation is None:
assert corr_length > np.finfo(float).eps
self.correlation_tensor = np.eye(dim, dim) * (1 / (corr_length ** 2))
self._max_corr_length = corr_length
else:
self.correlation_tensor = aniso_correlation
self._max_corr_length = la.norm(aniso_correlation, ord=2)
self.points = None
self.mu = mu
self.sigma = sigma
self._initialize(**kwargs)
def _initialize(self, **kwargs):
"""Implementation-specific initialization. To be overridden in subclasses."""
raise NotImplementedError()
[docs]
def set_points(self, points, mu=None, sigma=None):
"""
Set points for field evaluation.
:param points: Array of points (N x dim)
:param mu: Optional mean at points
:param sigma: Optional standard deviation at points
"""
points = np.array(points, dtype=float)
assert points.shape[1] == self.dim
self.n_points, self.dimension = points.shape
self.points = points
if mu is not None:
self.mu = mu
self.mu = np.array(self.mu, dtype=float)
assert self.mu.shape == () or self.mu.shape == (len(points),)
if sigma is not None:
self.sigma = sigma
self.sigma = np.array(self.sigma, dtype=float)
assert self.sigma.shape == () or self.sigma.shape == (len(points),)
def _set_points(self):
"""Optional internal method to update points. Can be overridden."""
pass
[docs]
def sample(self):
"""
Generate a realization of the random field.
:return: Array of field values at set points
"""
field = self._sample()
field = self.sigma * field + self.mu
return np.exp(field) if self.log else field
def _sample(self, uncorrelated):
"""
Implementation-specific sample generation. To be overridden.
:param uncorrelated: Array of uncorrelated standard normal samples
:return: Field sample
"""
raise NotImplementedError()