import numpy as np
import scipy as sc
import scipy.integrate as integrate
[docs]
class Distribution:
"""
Calculation of the distribution
"""
def __init__(self, moments_obj, moment_data, domain=None, force_decay=(True, True),
monitor=False):
"""
:param moments_obj: Function for calculating moments
:param moment_data: Array of moments and their vars; (n_moments, 2)
:param domain: Explicit domain fo reconstruction. None = use domain of moments.
:param force_decay: Flag for each domain side to enforce decay of the PDF approximation.
"""
# Family of moments basis functions.
self.moments_basis = moments_obj
# Moment evaluation function with bounded number of moments and their domain.
self.moments_fn = None
# Domain of the density approximation (and moment functions).
if domain is None:
domain = moments_obj.domain
self.domain = domain
# Indicates whether force decay of PDF at domain endpoints.
self.decay_penalty = force_decay
# Approximation of moment values.
self.moment_means = moment_data[:, 0]
self.moment_errs = np.sqrt(moment_data[:, 1])
# Approximation paramters. Lagrange multipliers for moment equations.
self.multipliers = None
# Number of basis functions to approximate the density.
# In future can be smaller then number of provided approximative moments.
self.approx_size = len(self.moment_means)
assert moments_obj.size >= self.approx_size
self.moments_fn = moments_obj
# Degree of Gauss quad to use on every subinterval determined by adaptive quad.
self._gauss_degree = 21
# Panalty coef for endpoint derivatives
self._penalty_coef = 10
# Flag for monitoring convergence on stdout.
self.monitor = monitor
[docs]
def estimate_density_minimize(self, tol=1e-5, reg_param =0.01):
"""
Optimize density estimation
:param tol: Tolerance for the nonlinear system residual, after division by std errors for
individual moment means, i.e.
res = || (F_i - \mu_i) / \sigma_i ||_2
:return: None
"""
# Initialize domain, multipliers, ...
# Geometrical series for sizes with base 1.2.
# Using just odd numbers.
self._reg_param = reg_param
base = 1.2
if self.approx_size <= 5:
sizes = [self.approx_size]
else:
size = self.approx_size
sizes = [size]
while size > 4:
size /= base
odd_size = 2*round((size-1)/2)+1
if odd_size != sizes[-1]:
sizes.append(odd_size)
sizes.reverse()
#print("sizes: ", sizes)
self.approx_size = sizes[0]
self._initialize_params(self.approx_size, tol)
self.extend_size(self.approx_size)
init_error = np.linalg.norm(self._calculate_gradient(self.multipliers))
if len(sizes) == 1:
tolerances = [ tol ]
else:
t1 = tol
t0 = max(tol, init_error / 10)
t = (np.array(sizes) - sizes[0]) / ( sizes[-1] - sizes[0])
tolerances = np.exp(np.log(t1) * t + np.log(t0) * (1-t))
#print("tolerances: ", tolerances)
total_nit = 0
for approx_size, approx_tol in zip(sizes, tolerances):
self._quad_tolerance = approx_tol / 16
self.extend_size(approx_size)
#if approx_size == self.moments_fn.size or approx_size == sizes[0]:
# max_it = 200
#else:
# max_it = 20
max_it = 200
method = 'trust-exact'
#method ='Newton-CG'
result = sc.optimize.minimize(self._calculate_functional, self.multipliers, method=method,
jac=self._calculate_gradient,
hess=self._calculate_jacobian_matrix,
options={'tol': approx_tol, 'xtol': approx_tol,
'gtol': approx_tol, 'disp': False, 'maxiter': max_it})
self.multipliers = result.x
jac_norm = np.linalg.norm(result.jac)
total_nit += result.nit
if self.monitor:
print("Iteration: size: {} nits: {} tol: {:5.3g} res: {:5.3g} msg: {}".format(
self.approx_size, result.nit, approx_tol, jac_norm, result.message))
# Fix normalization
gradient, _ = self._calculate_exact_moment(self.multipliers, m=0, full_output=0)
self.multipliers /= gradient
if result.success or jac_norm < tol:
result.success = True
result.nit = total_nit
result.fun_norm = jac_norm
return result
[docs]
def estimate_density(self, tol=None):
"""
Run nonlinear iterative solver to estimate density, use previous solution as initial guess.
Faster, but worse stability.
:return: None
"""
# Initialize domain, multipliers, ...
self._initialize_params(tol)
result = sc.optimize.root(
fun=self._calculate_gradient,
x0=self.multipliers,
jac=self._calculate_jacobian_matrix,
tol=tol
)
fun_norm = np.linalg.norm(result.fun)
if result.success or fun_norm < tol:
result.success = True
self.multipliers = result.x
result.fun_norm = fun_norm
return result
[docs]
def density(self, value, moments_fn=None):
"""
:param value: float or np.array
:param moments_fn: counting moments function
:return: density for passed value
"""
size = self.approx_size
if moments_fn is None:
moments_fn = self.moments_fn
moments = moments_fn.eval_all(value, size=size)
return np.exp(-np.sum(moments * self.multipliers / self._moment_errs, axis=1))
[docs]
def cdf(self, values):
values = np.atleast_1d(values)
np.sort(values)
last_x = self.domain[0]
last_y = 0
cdf_y = np.empty(len(values))
for i, val in enumerate(values):
if val <= self.domain[0]:
last_y = 0
elif val >= self.domain[1]:
last_y = 1
else:
dy = integrate.fixed_quad(self.density, last_x, val, n=10)[0]
last_x = val
last_y = last_y + dy
cdf_y[i] = last_y
return cdf_y
def _initialize_params(self, size, tol=None):
"""
Initialize parameters for density estimation
:return: None
"""
assert self.domain is not None
assert tol is not None
self._quad_tolerance = tol / 16
self.moment_errs[0] = np.min(self.moment_errs[1:]) / 8
# Start with uniform distribution
self.multipliers = np.zeros(size)
self.multipliers[0] = -np.log(1/(self.domain[1] - self.domain[0])) * self.moment_errs[0]
# Log to store error messages from quad, report only on conv. problem.
self._quad_log = []
[docs]
def extend_size(self, new_size):
self._last_solved_multipliers = self.multipliers
self._stab_penalty = self._reg_param / np.linalg.norm(self.multipliers)
#self._stab_penalty = 0.0
self.approx_size = new_size
multipliers = self.multipliers
self.multipliers = np.zeros(new_size)
self.multipliers[:len(multipliers)] = multipliers
self._moment_means = self.moment_means[:self.approx_size]
self._moment_errs = self.moment_errs[:self.approx_size]
# Evaluate endpoint derivatives of the moments.
self._end_point_diff = self.end_point_derivatives()
self._update_quadrature(self.multipliers, force=True)
#self._calculate_gradient(self.multipliers)
[docs]
def eval_moments(self, x):
return self.moments_fn.eval_all(x, self.approx_size)
def _calculate_exact_moment(self, multipliers, m=0, full_output=0):
def integrand(x):
moms = self.eval_moments(x)
power = -np.sum(moms * multipliers / self._moment_errs, axis=1)
power = np.minimum(np.maximum(power, -200), 200)
return np.exp(power) * moms[:,m]
result = sc.integrate.quad(integrand, self.domain[0], self.domain[1],
epsabs=self._quad_tolerance, full_output=full_output)
return result[0], result
def _calculate_exact_hessian(self, i, j, multipliers=None):
if multipliers is None:
multipliers = self.multipliers
def integrand(x):
moms = self.eval_moments(x)
power = -np.sum(moms * multipliers / self._moment_errs, axis=1)
power = np.minimum(np.maximum(power, -200), 200)
return np.exp(power) * moms[:,i] * moms[:,j]
result = sc.integrate.quad(integrand, self.domain[0], self.domain[1],
epsabs=self._quad_tolerance, full_output=False)
return result[0], result
def _update_quadrature(self, multipliers, force=False):
"""
Update quadrature points and their moments and weights based on integration of the density.
return: True if update of gradient is necessary
"""
if not force:
mult_norm = np.linalg.norm(multipliers - self._last_multipliers)
grad_norm = np.linalg.norm(self._last_gradient)
if grad_norm * mult_norm < self._quad_tolerance:
#print("OPT")
return
#print(grad_norm * mult_norm, self._quad_tolerance)
# More precise but depends on actual gradient which may not be available
quad_err_estimate = np.abs(np.dot(self._last_gradient, (multipliers - self._last_multipliers)))
if quad_err_estimate < self._quad_tolerance:
return
val, result = self._calculate_exact_moment(multipliers, m=0, full_output=1)
if len(result) > 3:
y, abserr, info, message = result
self._quad_log.append(result)
else:
y, abserr, info = result
message =""
pt, w = np.polynomial.legendre.leggauss(self._gauss_degree)
K = info['last']
#print("Update Quad: {} {} {} {}".format(K, y, abserr, message))
a = info['alist'][:K, None]
b = info['blist'][:K, None]
points = (pt[None, :] + 1) / 2 * (b - a) + a
weights = w[None, :] * (b - a) / 2
self._quad_points = points.flatten()
self._quad_weights = weights.flatten()
self._quad_moments = self.eval_moments(self._quad_points)
power = -np.dot(self._quad_moments, multipliers/self._moment_errs)
power = np.minimum(np.maximum(power, -200), 200)
q_gradient = self._quad_moments.T * np.exp(power)
integral = np.dot(q_gradient, self._quad_weights) / self._moment_errs
self._last_multipliers = multipliers
self._last_gradient = integral
[docs]
def end_point_derivatives(self):
"""
Compute approximation of moment derivatives at endpoints of the domain.
:return: array (2, n_moments)
"""
eps = 1e-10
left_diff = right_diff = np.zeros((1, self.approx_size))
if self.decay_penalty[0]:
left_diff = self.eval_moments(self.domain[0] + eps) - self.eval_moments(self.domain[0])
if self.decay_penalty[1]:
right_diff = -self.eval_moments(self.domain[1]) + self.eval_moments(self.domain[1] - eps)
return np.stack((left_diff[0,:], right_diff[0,:]), axis=0)/eps/ self._moment_errs[None, :]
def _calculate_functional(self, multipliers):
"""
Minimized functional.
:param multipliers: current multipliers
:return: float
"""
update_grad = self._update_quadrature(multipliers)
power = -np.dot(self._quad_moments, multipliers / self._moment_errs)
power = np.minimum(np.maximum(power, -200), 200)
q_density = np.exp(power)
integral = np.dot(q_density, self._quad_weights)
sum = np.sum(self._moment_means * multipliers / self._moment_errs)
end_diff = np.dot(self._end_point_diff, multipliers)
penalty = np.sum(np.maximum(end_diff, 0)**2)
fun = sum + integral
fun = fun + np.abs(fun) * self._penalty_coef * penalty
last_size = len(self._last_solved_multipliers)
fun += 0.5 * self._stab_penalty * np.sum((self._last_solved_multipliers - multipliers[:last_size])**2)
return fun
def _calculate_gradient(self, multipliers):
"""
Gradient of th functional
:return: array, shape (n_moments,)
"""
self._update_quadrature(multipliers)
power = -np.dot(self._quad_moments, multipliers / self._moment_errs)
#power = np.minimum(np.maximum(power, -200), 200)
q_density = np.exp(power)
q_gradient = self._quad_moments.T * q_density
integral = np.dot(q_gradient, self._quad_weights) / self._moment_errs
end_diff = np.dot(self._end_point_diff, multipliers)
penalty = 2 * np.dot( np.maximum(end_diff, 0), self._end_point_diff)
fun = np.sum(self._moment_means * multipliers / self._moment_errs) + integral[0] * self._moment_errs[0]
gradient = self._moment_means / self._moment_errs - integral + np.abs(fun) * self._penalty_coef * penalty
last_size = len(self._last_solved_multipliers)
gradient[:last_size] += self._stab_penalty * (multipliers[:last_size] - self._last_solved_multipliers)
return gradient
def _calculate_jacobian_matrix(self, multipliers):
"""
:return: jacobian matrix, symmetric, (n_moments, n_moments)
"""
self._update_quadrature(multipliers)
power = -np.dot(self._quad_moments, multipliers / self._moment_errs)
power = np.minimum(np.maximum(power, -200), 200)
q_density = np.exp(power)
moment_outer = np.einsum('ki,kj->ijk', self._quad_moments, self._quad_moments)
triu_idx = np.triu_indices(self.approx_size)
triu_outer = moment_outer[triu_idx[0], triu_idx[1], :]
q_jac = triu_outer * q_density
integral = np.dot(q_jac, self._quad_weights)
integral /= self._moment_errs[triu_idx[0]]
integral /= self._moment_errs[triu_idx[1]]
jacobian_matrix = np.empty(shape=(self.approx_size, self.approx_size))
jacobian_matrix[triu_idx[0], triu_idx[1]] = integral
jacobian_matrix[triu_idx[1], triu_idx[0]] = integral
end_diff = np.dot(self._end_point_diff, multipliers)
fun = np.sum(self._moment_means * multipliers / self._moment_errs) \
+ jacobian_matrix[0,0] * self._moment_errs[0]**2
for side in [0,1]:
if end_diff[side] > 0:
penalty = 2 * np.outer(self._end_point_diff[side], self._end_point_diff[side])
jacobian_matrix += np.abs(fun) * self._penalty_coef * penalty
jacobian_matrix[np.diag_indices_from(jacobian_matrix)] += self._stab_penalty
return jacobian_matrix
[docs]
def compute_exact_moments(moments_fn, density, tol=1e-4):
"""
Compute approximation of moments using exact density.
:param moments_fn: Moments function.
:param n_moments: Number of mements to compute.
:param density: Density function (must accept np vectors).
:param a, b: Integral bounds, approximate integration over R.
:param tol: Tolerance of integration.
:return: np.array, moment values
"""
a, b = moments_fn.domain
integral = np.zeros(moments_fn.size)
for i in range(moments_fn.size):
fn = lambda x, m = i: moments_fn.eval(m, x) * density(x)
integral[i] = integrate.quad(fn, a, b, epsabs = tol)[0]
return integral
[docs]
def KL_divergence(prior_density, posterior_density, a, b):
"""
Compute D_KL(P | Q) = \int_R P(x) \log( P(X)/Q(x)) \dx
:param prior_density: P
:param posterior_density: Q
:return: KL divergence value
"""
integrand = lambda x: prior_density(x) * max(np.log(prior_density(x) / posterior_density(x)), -1e300)
value = integrate.quad(integrand, a, b, epsabs=1e-10)[0]
return max(value, 1e-10)
[docs]
def L2_distance(prior_density, posterior_density, a, b):
integrand = lambda x: (posterior_density(x) - prior_density(x)) ** 2
return np.sqrt(integrate.quad(integrand, a, b))[0]