Coverage for pygeodyn/common.py: 89%
325 statements

#!/usr/bin/python3
""" Functions that are common to all algorithm steps. """

import math
import numpy as np
from . import utilities
from scipy import linalg
from .constants import r_earth
import geodyn_fortran as fortran

def compute_direct_obs_operator(positions, max_degree, internal_source=True, eps=1e-7):
    """
    Computes the direct observation operator H of a measure of given max degree for a list of given positions.

    :param positions: list of the positions, flattened in the form [r1, th1, ph1, r2, th2, ph2, ...]
    :type positions: list or 1D numpy.ndarray
    :param max_degree: maximal degree of the observed measure
    :type max_degree: int
    :param internal_source: whether the Gauss coefficients represent an internal or an external source (a/r or r/a dependency)
    :type internal_source: bool
    :param eps: threshold below which sin(theta) is treated as zero (positions at the poles)
    :type eps: float
    :return: the direct observation operator H
    :rtype: 2D numpy.ndarray (nb_observations x nb_coefs)
    """
    No = len(positions)
    Ncoefs = max_degree * (max_degree + 2)

    H = np.zeros([No, Ncoefs])

    for i in range(0, No, 3):
        r = positions[i]
        th = positions[i + 1]
        ph = positions[i + 2]
        H[i:i+3, :] = compute_lines_direct_obs_operator(r, th, ph, internal_source, eps, max_degree)
    return H
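
# Illustrative usage sketch (added example, not part of the original module;
# runs only if the compiled geodyn_fortran extension is importable): builds H
# for two observation positions at the Earth's surface.
def _demo_compute_direct_obs_operator():
    # positions are flattened as [r1, th1, ph1, r2, th2, ph2]
    positions = np.array([r_earth, np.pi / 4, 0.0,
                          r_earth, np.pi / 2, np.pi / 3])
    H = compute_direct_obs_operator(positions, max_degree=3)
    # 6 observation components (r, th, ph per position) and 3*(3+2) = 15 coefs
    assert H.shape == (6, 15)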

def compute_lines_direct_obs_operator(r, th, ph, internal_source, eps, max_degree):
    """
    Convenience function: as most of the observation positions are similar through time (GVOs are unchanged for each satellite era),
    a cache can be used to avoid recomputing the same coefficients at each time step.
    """
    Ncoefs = max_degree * (max_degree + 2)
    r_th_ph_coefs = np.zeros(shape=(3, Ncoefs))

    # Compute prefactors
    if internal_source:
        radius_to_earth = r_earth / r

    sin_th = math.sin(th)
    if abs(sin_th) < eps:  # theta is either 0 or pi
        sin_th_0 = True
        sign_cos_th = np.sign(math.cos(th))
    else:
        sin_th_0 = False

    # Evaluate the Legendre coefficients at the given th using the Fortran function
    plm, dplm, d2plm = eval_Plm_at_theta(th, max_degree)
    # computed_plms[th] = plm
    # computed_dplms[th] = dplm

    for k in range(0, Ncoefs):
        l, m, coef = utilities.get_degree_order(k)
        k_legendre = (l + 1) * l // 2 + m

        Plm_at_z = plm[k_legendre]
        dPlm_at_z = dplm[k_legendre]

        cos_mph = math.cos(m * ph)
        sin_mph = math.sin(m * ph)

        if internal_source:
            if coef == "g":
                r_th_ph_coefs[0, k] = (l + 1) * radius_to_earth ** (l + 2) * Plm_at_z * cos_mph
                r_th_ph_coefs[1, k] = - radius_to_earth ** (l + 2) * dPlm_at_z * cos_mph
                if sin_th_0:
                    assert sin_mph == 0 or Plm_at_z == 0, 'One of these values should be 0, {}, {}'.format(sin_mph, Plm_at_z)
                    r_th_ph_coefs[2, k] = radius_to_earth ** (l + 2) * m * sign_cos_th * dPlm_at_z * sin_mph
                else:
                    r_th_ph_coefs[2, k] = radius_to_earth ** (l + 2) * m * Plm_at_z * sin_mph / sin_th
            else:  # coef == "h"
                r_th_ph_coefs[0, k] = (l + 1) * radius_to_earth ** (l + 2) * Plm_at_z * sin_mph
                r_th_ph_coefs[1, k] = -radius_to_earth ** (l + 2) * dPlm_at_z * sin_mph
                if sin_th_0:
                    assert cos_mph == 0 or Plm_at_z == 0, 'One of these values should be 0, {}, {}'.format(cos_mph, Plm_at_z)
                    r_th_ph_coefs[2, k] = -radius_to_earth ** (l + 2) * m * sign_cos_th * dPlm_at_z * cos_mph
                else:
                    r_th_ph_coefs[2, k] = -radius_to_earth ** (l + 2) * m * Plm_at_z * cos_mph / sin_th
    return r_th_ph_coefs

def compute_direct_obs_operator_SOLA(r, th, ph, max_degree, internal_source=True, eps=1e-7):
    """
    Computes the direct observation operator H (radial component only) of a measure of given max degree for arrays of given positions.

    :param r: radii of the positions
    :type r: 1D numpy.ndarray
    :param th: colatitudes of the positions (in rad)
    :type th: 1D numpy.ndarray
    :param ph: longitudes of the positions (in rad)
    :type ph: 1D numpy.ndarray
    :param max_degree: maximal degree of the observed measure
    :type max_degree: int
    :param internal_source: whether the Gauss coefficients represent an internal or an external source (a/r or r/a dependency)
    :type internal_source: bool
    :return: the direct observation operator H
    :rtype: 2D numpy.ndarray (nb_observations x nb_coefs)
    """
    No = r.shape[0]
    Ncoefs = max_degree * (max_degree + 2)

    H = np.zeros([No, Ncoefs])

    for i in range(No):
        H[i, :] = compute_lines_direct_obs_operator_SOLA(r[i], th[i], ph[i], internal_source, eps, max_degree)
    return H

def compute_lines_direct_obs_operator_SOLA(r, th, ph, internal_source, eps, max_degree):
    """
    Computes one line of the SOLA observation operator (radial component) at a single position.
    """
    Ncoefs = max_degree * (max_degree + 2)
    r_th_ph_coefs = np.zeros(Ncoefs)

    # Compute prefactors
    if internal_source:
        radius_to_earth = r_earth / r

    # Evaluate the Legendre coefficients at the given th using the Fortran function
    plm, dplm, d2plm = eval_Plm_at_theta(th, max_degree)
    # computed_plms[th] = plm
    # computed_dplms[th] = dplm

    for k in range(0, Ncoefs):
        l, m, coef = utilities.get_degree_order(k)
        k_legendre = (l + 1) * l // 2 + m

        Plm_at_z = plm[k_legendre]

        cos_mph = math.cos(m * ph)
        sin_mph = math.sin(m * ph)

        if internal_source:
            if coef == "g":
                r_th_ph_coefs[k] = (l + 1) * radius_to_earth ** (l + 2) * Plm_at_z * cos_mph
            else:  # coef == "h"
                r_th_ph_coefs[k] = (l + 1) * radius_to_earth ** (l + 2) * Plm_at_z * sin_mph
    return r_th_ph_coefs
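
# Illustrative usage sketch (added example, not part of the original module;
# requires the compiled geodyn_fortran extension): the SOLA operator carries
# the radial component only, one row per observation position.
def _demo_compute_direct_obs_operator_SOLA():
    r = np.full(2, r_earth)
    th = np.array([np.pi / 4, np.pi / 2])
    ph = np.array([0.0, np.pi / 3])
    H = compute_direct_obs_operator_SOLA(r, th, ph, max_degree=3)
    assert H.shape == (2, 15)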

def sample_timed_quantity(times, X_quantity, sampling_dt):
    """
    Samples a timed quantity according to sampling_dt.
    First generates sampling times from the times array, then evaluates the quantity X at these times by linear interpolation.

    :param times: times of the quantity X
    :type times: 1D numpy.array (dim: Ntimes)
    :param X_quantity: array containing the values of the quantity X at times
    :type X_quantity: 2D numpy.array (dim: Ntimes x Ncoefs)
    :param sampling_dt: time step to use for the sampling (in years)
    :type sampling_dt: float
    :return: 2-tuple containing the sampling times and the quantity X evaluated at these sampling times
    :rtype: 1D numpy.array (dim: Nsamples), 2D numpy.array (dim: Nsamples x Ncoefs)
    """
    nb_coeffs = X_quantity.shape[1]
    sampling_times = np.arange(times.min(), times.max(), sampling_dt)
    nb_samples = len(sampling_times)
    sampled_X = np.zeros(shape=(nb_samples, nb_coeffs))

    # TODO Replace by scipy method ?
    for i_coef in range(nb_coeffs):
        sampled_X[:, i_coef] = np.interp(sampling_times, times, X_quantity[:, i_coef])

    return sampling_times, sampled_X
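
# Minimal usage sketch (added example with synthetic data): resamples a
# two-coefficient series to 0.5-year steps; shapes follow the docstring above.
def _demo_sample_timed_quantity():
    times = np.linspace(2000.0, 2010.0, 121)
    X = np.column_stack((np.sin(times), np.cos(times)))
    sampling_times, sampled_X = sample_timed_quantity(times, X, sampling_dt=0.5)
    # np.arange excludes times.max(), hence 20 samples between 2000.0 and 2009.5
    assert sampling_times.shape == (20,)
    assert sampled_X.shape == (20, 2)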

def compute_legendre_polys(tmax, Lb, Lu, Lsv):
    """
    Computes the coefficients of the Legendre polynomials, and of their first and second derivatives, for B, U and SV, using a wrapped Fortran function.

    :param tmax: number of angles for the computation
    :type tmax: int
    :param Lb: max degree of the magnetic field
    :type Lb: int
    :param Lu: max degree of the core flow
    :type Lu: int
    :param Lsv: max degree of the secular variation
    :type Lsv: int
    :return: a dictionary containing the angles used for the computations and the Legendre coefs of B, U and SV
    :rtype: dict
    """
    # Compute the number of coefs
    LLb = ((Lb + 1) * (Lb + 2)) // 2
    LLu = ((Lu + 1) * (Lu + 2)) // 2
    LLsv = ((Lsv + 1) * (Lsv + 2)) // 2

    # Init the arrays storing the Legendre polynomials and their first and second derivatives
    # For the magnetic field
    lp_b = np.zeros((LLb, tmax), order='F')
    d_lp_b = np.zeros((LLb, tmax), order='F')
    d2_lp_b = np.zeros((LLb, tmax), order='F')
    # For the core flow
    lp_u = np.zeros((LLu, tmax), order='F')
    d_lp_u = np.zeros((LLu, tmax), order='F')
    d2_lp_u = np.zeros((LLu, tmax), order='F')
    # For the secular variation
    lp_sv = np.zeros((LLsv, tmax), order='F')
    d_lp_sv = np.zeros((LLsv, tmax), order='F')
    d2_lp_sv = np.zeros((LLsv, tmax), order='F')

    # Compute the Gauss-Legendre quadrature: returns tmax gauss_points in [-1, 1] with their associated weights
    gauss_points, gauss_weights = fortran.legendre.gauleg(np.float64(-1.), np.float64(1.), tmax)

    # For each Gauss point x_i, compute the associated Legendre functions at x_i
    for i, x_i in enumerate(gauss_points):
        # Last arg of plmbar2 is 1 to get the Schmidt quasi-normalisation
        fortran.legendre.plmbar2(lp_b[:, i], d_lp_b[:, i], d2_lp_b[:, i], x_i, Lb, 1)
        fortran.legendre.plmbar2(lp_u[:, i], d_lp_u[:, i], d2_lp_u[:, i], x_i, Lu, 1)
        fortran.legendre.plmbar2(lp_sv[:, i], d_lp_sv[:, i], d2_lp_sv[:, i], x_i, Lsv, 1)

    # Convert the abscissas used for the polynomial computation into angles
    gauss_thetas = np.arccos(gauss_points)

    return {'thetas': gauss_thetas,
            'weights': gauss_weights,
            'MF': np.array((lp_b, d_lp_b, d2_lp_b)),
            'U': np.array((lp_u, d_lp_u, d2_lp_u)),
            'SV': np.array((lp_sv, d_lp_sv, d2_lp_sv))}
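
# Sanity sketch (added example; requires the compiled geodyn_fortran
# extension): Gauss-Legendre weights on [-1, 1] sum to 2, and each field entry
# stacks (P, dP, d2P) over (n_coefs, tmax).
def _demo_compute_legendre_polys():
    legendre = compute_legendre_polys(tmax=16, Lb=4, Lu=4, Lsv=4)
    assert np.isclose(legendre['weights'].sum(), 2.0)
    assert legendre['MF'].shape == (3, 15, 16)  # 15 = (4+1)*(4+2)/2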

def compute_Kalman_gain_matrix(P, H, R, use_cholesky=False):
    """
    Computes the Kalman gain matrix from the correlation matrix, the observation operator and the observation error matrix:
    K = P*trans(H)*inv(H*P*trans(H)+R)
    The inversion can be carried out using a Cholesky decomposition.

    :param P: correlation matrix
    :type P: 2D numpy.ndarray (dim: Nstate x Nstate)
    :param H: observation operator
    :type H: 2D numpy.ndarray (dim: Nobs x Nstate)
    :param R: observation error matrix
    :type R: 2D numpy.ndarray (dim: Nobs x Nobs)
    :param use_cholesky: if True, the matrix inversion is done using a Cholesky decomposition. Uses scipy.linalg.solve otherwise.
    :type use_cholesky: bool
    :return: Kalman gain matrix K = P*trans(H)*inv(H*P*trans(H)+R)
    :rtype: 2D numpy.ndarray (dim: Nstate x Nobs)
    """
    # Compute PHT = P*transpose(H) that is needed twice
    PHT = np.matmul(P, np.transpose(H))

    # Compute the Kalman gain K = P*trans(H)*inv(H*P*trans(H)+R)
    matrix_to_invert = np.matmul(H, PHT) + R
    if use_cholesky:
        invert_matrix = mat_inv_using_Cholesky(matrix_to_invert)
        return np.matmul(PHT, invert_matrix)
    else:
        # invert_matrix = linalg.inv(matrix_to_invert)
        # return np.matmul(PHT, invert_matrix)
        # Solve K = PHT*inv(H*PHT+R) <=> (H*PHT+R)^T K^T = PHT^T
        return np.transpose(linalg.solve(np.transpose(matrix_to_invert), np.transpose(PHT)))
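
# Consistency sketch (added example with toy matrices): on a small SPD system,
# the solve-based gain matches the explicit formula K = P H^T (H P H^T + R)^-1.
def _demo_compute_Kalman_gain_matrix():
    rng = np.random.default_rng(0)
    G = rng.standard_normal((4, 4))
    P = G @ G.T + 4 * np.identity(4)   # SPD state covariance
    H = rng.standard_normal((2, 4))    # observation operator
    R = np.identity(2)                 # observation errors
    K = compute_Kalman_gain_matrix(P, H, R)
    K_ref = P @ H.T @ linalg.inv(H @ P @ H.T + R)
    assert np.allclose(K, K_ref)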

def compute_Kalman_huber(b_f, y, Pbb_inv, H, Rbb, max_steps=50):
    """
    Iteratively solves the least-squares problem with a Huber norm; the update equation is written (cf. Walker and Jackson, 2000):
    b^{k+1} = b^f + (P^{-1} + H^T R^{-1/2} W R^{-1/2} H)^{-1} (H^T R^{-1/2} W R^{-1/2}) (y - H b^f)

    :param b_f: forecast state vector
    :param y: observation vector
    :param Pbb_inv: inverse of the forecast covariance matrix (P^{-1} above)
    :param H: observation operator
    :param Rbb: observation error covariance matrix (only its diagonal is used)
    :param max_steps: maximum number of iterations
    :type max_steps: int
    """
    sq_R = 1 / np.sqrt(np.diag(Rbb))
    R_H = np.multiply(sq_R[:, None], H)
    H_R = R_H.T
    W = np.ones(Rbb.shape[0])
    ones_min = np.ones(Rbb.shape[0])
    c = 1.5
    cutoff = 1e-8  # just to avoid division by zero
    y_minus_H_b = y - H @ b_f

    for i in range(max_steps):
        b_kplus1 = b_f + np.linalg.solve(H_R @ np.multiply(W[:, None], R_H) + Pbb_inv,
                                         np.multiply(H_R, (W * sq_R)[None, :])) @ y_minus_H_b
        # if the change since the last iteration is below 1e-4 of the max amplitude, break the loop
        if i and np.amax(b_kplus1 - b_k) < 1e-4 * np.amax(b_k):
            break
        b_k = b_kplus1
        epsilon = abs(y - H @ b_kplus1) * sq_R
        epsilon[epsilon < cutoff] = cutoff

        W = np.min([c / epsilon, ones_min], axis=0)  # residuals below c keep unit weight, larger ones are down-weighted
        # condition to break the loop early
        if np.all(W == ones_min):  # then nothing will change in the loop: LS solution because no outliers
            break
    return b_kplus1
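
# Weighting sketch (added example, standalone numpy): reproduces the Huber
# weight update used above; residuals below c keep unit weight, larger ones
# are down-weighted by c / |residual|.
def _demo_huber_weights():
    c = 1.5
    epsilon = np.array([0.1, 1.0, 3.0, 15.0])  # normalised residuals
    W = np.min([c / epsilon, np.ones_like(epsilon)], axis=0)
    assert np.allclose(W, [1.0, 1.0, 0.5, 0.1])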

def compute_Kalman_huber_parameter_basis(b_f, y, HPzzHT, PzzHT, H, Rbb, norm='huber', max_steps=50):
    """
    Very similar to compute_Kalman_huber, but written in the parameter basis.
    Iteratively solves the least-squares problem with a Huber norm (cf. Walker and Jackson, 2000); each iteration applies
    b^{k+1} = b^f + P_zz H^T (H P_zz H^T + R_k)^{-1} (y - H b^f), with R_k = diag(R) W^{-1} the re-weighted observation error matrix.

    :param max_steps: maximum number of iterations
    :type max_steps: int
    """
    sq_R = np.sqrt(np.diag(Rbb))
    W = np.ones(Rbb.shape[0])
    ones_min = np.ones(Rbb.shape[0])
    c = 1.5
    cutoff = 1e-8  # just to avoid division by zero
    y_minus_H_b = y - H @ b_f
    if norm in ('l2', 'L2'):
        max_steps = 1  # the first step just computes the least squares, one iteration in the loop is fine

    for i in range(max_steps):
        R_k = np.diag(sq_R / W) * sq_R
        b_kplus1 = b_f + PzzHT @ np.linalg.solve(R_k + HPzzHT, y_minus_H_b)
        epsilon = abs(y - H @ b_kplus1) / sq_R
        epsilon[epsilon < cutoff] = cutoff

        prev_W = W
        W = np.min([c / epsilon, ones_min], axis=0)  # residuals below c keep unit weight, larger ones are down-weighted
        # conditions to break the loop early
        if np.all(W == ones_min):  # then nothing will change in the loop: LS solution because no outliers
            break

        # if the weights changed by less than 1e-4 of their max since the last iteration, stop
        if i and np.amax(W - prev_W) < 1e-4 * np.amax(W):
            break

    return b_kplus1

def cov(x):
    """ Covariance matrix of x, with variables as columns. """
    return np.cov(x, rowvar=False)


def corr(x):
    """ Correlation matrix of x, with variables as columns. """
    return np.corrcoef(x, rowvar=False)

def compute_average(X, Nt, mode="valid"):
    # Perform a convolution of X by a Blackman smoothing window of size Nt (NOT A PROPER AVERAGE)
    X_avg = np.zeros((X.shape[0] - Nt + 1, X.shape[1]))
    for i in range(X.shape[1]):
        X_avg[:, i] = np.convolve(X[:, i], np.blackman(Nt), mode)
    # PROPER AVERAGE = X_avg / np.sum(np.blackman(Nt))
    return X_avg

def compute_derivative(X, dt):
    if X.ndim != 3:
        raise ValueError("X must be 3D")
    if X.shape[1] < 3:
        raise ValueError("X second dimension (time) must be at least 3")

    # compute second-order 1st and 2nd derivatives
    dX = (X[:, 2:, :] - X[:, :-2, :]) / (2 * dt)
    d2X = (X[:, 2:, :] - 2 * X[:, 1:-1, :] + X[:, :-2, :]) / (dt ** 2)

    return dX, d2X

def compute_misfit(realisations, mean, inverse_weight_matrix):
    """
    Computes the misfit of realisations with respect to the mean.
    The misfit is a norm weighted by the weight matrix (Mahalanobis distance).
    :warning: the inverse of the weight matrix must be supplied

    :param realisations: array of realisations (dim: nb_real x nb_coefs)
    :type realisations: np.array
    :param mean: mean that will be subtracted from the realisations array (dim: nb_coefs)
    :type mean: np.array
    :param inverse_weight_matrix: inverse of the weight matrix (WM^{-1}, dim: nb_coefs x nb_coefs)
    :type inverse_weight_matrix: np.array
    :return: sqrt(1/(nb_real*(nb_coefs-1)) * sum[(realisations - mean)^T WM^{-1} (realisations - mean)])
    :rtype: float
    """
    nb_realisations = realisations.shape[0]
    nb_coefs = realisations.shape[1]

    # Regular distance D (this step will fail if the dimensions do not match)
    distance = realisations - mean

    # Compute the sum over realisations (assumed axis=0) of D^T * WM^{-1} * D, which is the squared weighted norm
    non_norm_squared_norm = 0
    for k in range(0, nb_realisations):
        non_norm_squared_norm += distance[k].T @ (inverse_weight_matrix @ distance[k])
    return np.sqrt(1. / (nb_realisations * (nb_coefs - 1)) * non_norm_squared_norm)
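
# Sanity sketch (added example with toy data): with an identity weight matrix
# the misfit reduces to sqrt(sum ||realisation - mean||^2 / (nb_real * (nb_coefs - 1))).
def _demo_compute_misfit():
    realisations = np.array([[1.0, 2.0, 3.0],
                             [3.0, 2.0, 1.0]])
    mean = realisations.mean(axis=0)   # [2., 2., 2.]
    misfit = compute_misfit(realisations, mean, np.identity(3))
    assert np.isclose(misfit, np.sqrt(4.0 / (2 * 2)))  # = 1.0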

def get_BLUE(X, Y, P, H, R, K=None, cholesky=False):
    """
    Returns the Best Linear Unbiased Estimate (BLUE) of X from the observations Y and the covariance matrices.

    :param X: current state vector
    :type X: 1D numpy.ndarray (dim: Nstate)
    :param Y: observation vector
    :type Y: 1D numpy.ndarray (dim: Nobs)
    :param P: covariance of the forecast priors
    :type P: 2D numpy.ndarray (dim: Nstate x Nstate)
    :param H: observation operator
    :type H: 2D numpy.ndarray (dim: Nobs x Nstate)
    :param R: covariance of the observation errors
    :type R: 2D numpy.ndarray (dim: Nobs x Nobs)
    :param K: Kalman gain matrix. Optional: if not supplied, it will be computed from the other matrices.
    :type K: 2D numpy.ndarray (dim: Nstate x Nobs)
    :param cholesky: if True, uses a Cholesky decomposition to compute the matrix inverses
    :type cholesky: bool
    :return: Best Linear Unbiased Estimate of X
    :rtype: 1D numpy.ndarray (dim: Nstate)
    """
    # Compute the innovation vector first
    D = Y - np.matmul(H, X)

    # Compute the Kalman gain matrix if not provided
    if K is None:
        K = compute_Kalman_gain_matrix(P, H, R, use_cholesky=cholesky)

    return X + np.matmul(K, D)

def mat_inv_using_Cholesky(M):
    """
    Returns the inverse of a matrix computed by Cholesky decomposition.

    :param M: a square positive-definite matrix
    :type M: ndarray
    :return: inverse of M
    :rtype: ndarray of the dimensions of M
    """
    if linalg.det(M) == 0:
        raise np.linalg.LinAlgError('The matrix inversion method using Cholesky was called on a non-invertible matrix: {}!'.format(M))

    LCMat_tuple = linalg.cho_factor(M, lower=True)
    return linalg.cho_solve(LCMat_tuple, np.identity(M.shape[0]))
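
# Usage sketch (added example with a toy SPD matrix): the Cholesky-based
# inverse satisfies M @ inv(M) = I.
def _demo_mat_inv_using_Cholesky():
    M = np.array([[4.0, 1.0],
                  [1.0, 3.0]])  # symmetric positive-definite
    M_inv = mat_inv_using_Cholesky(M)
    assert np.allclose(M @ M_inv, np.identity(2))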

def eval_Plm_at_theta(theta, lmax, norm=1):
    """
    Evaluates the coefficients of the Legendre polynomials (and of their 1st and 2nd derivatives) at a given theta.

    :param theta: angle at which to evaluate the Legendre polys (in rad)
    :type theta: float
    :param lmax: max degree described by the Legendre polys
    :type lmax: int
    :param norm: optional arg to set the normalisation of the polys (1 = Schmidt semi-norm (default), 2 = full norm)
    :type norm: int
    :return: 3 numpy.arrays with the coefs of the Legendre polys, of their 1st derivative and of their 2nd derivative
    :rtype: np.array, np.array, np.array
    """
    nb_legendre_coefs = int((lmax + 1) * (lmax + 2) / 2)

    coefs_plm = np.zeros(nb_legendre_coefs)
    coefs_dplm = np.zeros_like(coefs_plm)
    coefs_d2plm = np.zeros_like(coefs_plm)

    fortran.legendre.plmbar2(coefs_plm, coefs_dplm, coefs_d2plm, math.cos(theta), lmax, norm)

    return coefs_plm, coefs_dplm, coefs_d2plm
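
# Shape sketch (added example; requires the compiled geodyn_fortran
# extension): for lmax = 2 there are (2+1)*(2+2)/2 = 6 Legendre coefficients.
def _demo_eval_Plm_at_theta():
    plm, dplm, d2plm = eval_Plm_at_theta(0.3, lmax=2)
    assert plm.shape == dplm.shape == d2plm.shape == (6,)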

def compute_diag_AR1_coefs(cov_U, cov_ER, Tau_U, Tau_E):
    """
    Computes the matrices for a diagonal AR-1 process.

    :param cov_U: covariance matrix of the core flow U (dim: Nu x Nu)
    :type cov_U: np.array
    :param cov_ER: covariance matrix of the error term ER (dim: Ner x Ner)
    :type cov_ER: np.array
    :param Tau_U: AR-1 timestep of U in years
    :type Tau_U: float
    :param Tau_E: AR-1 timestep of ER in years
    :type Tau_E: float
    :return: the drift matrix and the block-diagonal lower Cholesky factor of the covariances, scaled by sqrt(2/Tau)
    :rtype: np.array (dim: Nz x Nz), np.array (dim: Nz x Nz)
    """
    if not cov_U.ndim == 2:
        raise ValueError("cov_U must be 2D but is {}D".format(cov_U.ndim))

    Nu = cov_U.shape[0]
    Ner = cov_ER.shape[0]
    Nz = Nu + Ner
    diag = np.ones(Nz)
    diag[:Nu] = 1 / Tau_U
    diag[Nu:] = 1 / Tau_E
    A = np.diag(diag)

    Chol_U = np.sqrt(2. / Tau_U) * linalg.cholesky(cov_U, lower=True)
    Chol_ER = np.sqrt(2. / Tau_E) * linalg.cholesky(cov_ER, lower=True)
    Chol = linalg.block_diag(Chol_U, Chol_ER)

    return A, Chol
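
# Shape sketch (added example with toy covariances): the drift matrix stacks
# 1/Tau_U and 1/Tau_E on its diagonal and Chol is block-diagonal.
def _demo_compute_diag_AR1_coefs():
    cov_U = np.identity(2)
    cov_ER = 4.0 * np.identity(3)
    A, Chol = compute_diag_AR1_coefs(cov_U, cov_ER, Tau_U=10.0, Tau_E=5.0)
    assert A.shape == Chol.shape == (5, 5)
    assert np.allclose(np.diag(A), [0.1, 0.1, 0.2, 0.2, 0.2])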

def prep_AR_matrix(Z, dt_samp, Nt):
    """
    Computes the time derivatives of the state array.

    :param Z: state array
    :type Z: ndarray (Ntimes x Ncoeffs)
    :param dt_samp: time step
    :type dt_samp: float
    :param Nt: averaging window size
    :type Nt: int
    :return: X and its first, second and third time derivatives dX, d2X, d3X, all smoothed over the window
    :rtype: 4 ndarrays of identical shape (Nsamples x Ncoeffs)
    """
    if not Z.ndim == 2:
        raise ValueError("Z must be 2D but is {}D".format(Z.ndim))
    if Z.shape[0] < 4:
        raise ValueError("First dimension of Z must be at least 4 but is {}".format(Z.shape[0]))

    x0 = Z[:-3, :]
    x1 = Z[1:-2, :]
    x2 = Z[2:-1, :]
    x3 = Z[3:, :]

    # compute the dX, d2X, d3X derivatives (one-sided finite differences)
    dX = (x1 - x0) / dt_samp
    d2X = (x2 - 2 * x1 + x0) / dt_samp ** 2
    d3X = (x3 - 3 * x2 + 3 * x1 - x0) / dt_samp ** 3

    # compute the average (not divided by the smoothing window total weight!)
    # it must be divided by the smoothing window total weight to get a proper average
    x0 = compute_average(x0, Nt)
    dX = compute_average(dX, Nt)
    d2X = compute_average(d2X, Nt)
    d3X = compute_average(d3X, Nt)

    if not x0.shape == dX.shape == d2X.shape == d3X.shape:
        raise ValueError("x0, dX, d2X and d3X must be the same shape")

    return x0, dX, d2X, d3X
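
# Shape sketch (added example with synthetic data): the finite differences
# drop 3 rows and the Blackman smoothing drops Nt - 1 more.
def _demo_prep_AR_matrix():
    rng = np.random.default_rng(1)
    Z = np.cumsum(rng.standard_normal((50, 4)), axis=0)  # random walk
    x0, dX, d2X, d3X = prep_AR_matrix(Z, dt_samp=0.1, Nt=10)
    assert x0.shape == dX.shape == d2X.shape == d3X.shape == (38, 4)  # 50 - 3 - 10 + 1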

def compute_AR_coefs_avg(container, AR_type):
    """
    Computes the dense AR coefs:
    A, Chol if AR1
    A, B, C, Chol if AR3

    :param container: list containing the stored variables [X, dX, dt_sampling, Nt] for every prior
    :type container: list
    :param AR_type: type of AR process ("AR1" or "AR3")
    :type AR_type: str
    """
    for i in range(len(container)):
        if not container[i][0].shape == container[i][1].shape:
            raise ValueError("X, dX must be the same shape")
        if not container[i][0].ndim == 2:
            raise ValueError("Z must be 2D but is {}D".format(container[i][0].ndim))

    if AR_type == "AR1":
        NZ = container[0][0].shape[0]
        tmp1 = np.zeros((NZ, NZ))
        tmp2 = np.zeros((NZ, NZ))
    elif AR_type == "AR3":
        NZ = container[0][0].shape[0] // 3
        tmp1 = np.zeros((3 * NZ, 3 * NZ))
        tmp2 = np.zeros((3 * NZ, 3 * NZ))
    else:
        raise ValueError('AR_type must be "AR1" or "AR3" but is {}'.format(AR_type))

    Dt = container[0][2]

    # compute AA
    NT = 0
    for i in range(len(container)):
        X = container[i][0]
        dX = container[i][1]
        Nt = container[i][3]
        Mt = np.sum(np.power(np.blackman(Nt), 2))
        NT += round(X.shape[1] / Nt)
        XXT = X @ X.T / (Mt * Nt)
        dXXT = dX @ X.T / (Mt * Nt)
        tmp1 += dXXT
        tmp2 += XXT

    AA = -1 / Dt * tmp1 @ np.linalg.inv(tmp2)

    # compute WWT
    WWT = []
    for i in range(len(container)):
        X = container[i][0]
        dX = container[i][1]
        dt = container[i][2]
        Nt = container[i][3]
        Mt = np.sum(np.power(np.blackman(Nt), 2))
        W = (dX + dt * AA @ X) / np.sqrt(dt)
        WWT.append(W @ W.T / (Mt * Nt))

    if AR_type == "AR1":
        S = sum(WWT) / NT
        L_S = np.linalg.cholesky(S)  # Warning: returns the LOWER triangle of the Cholesky decomposition!
        return AA.T, L_S
    elif AR_type == "AR3":
        C = AA[2 * NZ:3 * NZ, :NZ]
        B = AA[2 * NZ:3 * NZ, NZ:2 * NZ]
        A = AA[2 * NZ:3 * NZ, 2 * NZ:3 * NZ]
        S = (sum(WWT) / NT)[2 * NZ:3 * NZ, 2 * NZ:3 * NZ]
        L_S = np.linalg.cholesky(S)  # Warning: returns the LOWER triangle of the Cholesky decomposition!
        return A.T, B.T, C.T, L_S
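
# Usage sketch (added example with synthetic data; AR-1 branch only):
# container entries are [X, dX, dt_sampling, Nt] with X of shape (Ncoefs x Ntimes).
def _demo_compute_AR_coefs_avg():
    rng = np.random.default_rng(2)
    X = rng.standard_normal((4, 200))
    dX = rng.standard_normal((4, 200))
    A, L_S = compute_AR_coefs_avg([[X, dX, 0.1, 10]], "AR1")
    assert A.shape == (4, 4)
    assert np.allclose(L_S, np.tril(L_S))  # lower Cholesky factor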

def compute_AR1_coefs_forecast(A, Chol, dt_forecast, Ncoef):
    """
    Computes the forecast coefficients A and Chol for the AR-1 process.
    """
    if not (A.shape == Chol.shape == (Ncoef, Ncoef)):
        raise ValueError("A, Chol matrices must be the same shape")
    if not np.allclose(Chol, np.tril(Chol)):
        raise ValueError('Cholesky matrix supplied in AR-1 is not a lower triangular matrix! Did you supply the upper Cholesky matrix instead?')

    # matrices for the AR-1 one-sided (decentred) scheme
    Id = np.identity(Ncoef)
    A_forecast = dt_forecast * A - Id
    Chol_forecast = dt_forecast ** (1 / 2) * Chol

    return A_forecast, Chol_forecast

def compute_AR3_coefs_forecast(A, B, C, Chol, dt_forecast, Ncoef):
    """
    Computes the forecast coefficients A, B, C and Chol for the AR-3 process.
    """
    if not (A.shape == B.shape == C.shape == Chol.shape == (Ncoef, Ncoef)):
        raise ValueError("A, B, C, Chol matrices must have the same shape")
    if not np.allclose(Chol, np.tril(Chol)):
        raise ValueError('Cholesky matrix supplied in AR-3 is not a lower triangular matrix! Did you supply the upper Cholesky matrix instead?')

    # matrices for the AR-3 one-sided (decentred) scheme
    Id = np.identity(Ncoef)
    A_forecast = dt_forecast * A - 3 * Id
    B_forecast = (dt_forecast ** 2) * B - 2 * dt_forecast * A + 3 * Id
    C_forecast = (dt_forecast ** 3) * C - (dt_forecast ** 2) * B + dt_forecast * A - Id
    Chol_forecast = dt_forecast ** (5 / 2) * Chol

    return A_forecast, B_forecast, C_forecast, Chol_forecast

def ar1_process(X, A, Chol, random_state=None, check_Cholesky=True):
    """
    Applies an Auto-Regressive process of order 1 to the state X (typically the augmented state Z):

    X1 = - X0 @ A + Chol @ normal_noise

    :param X: quantity on which the AR-1 process will be applied (X0 above)
    :type X: 1D numpy.ndarray (dim: Nz)
    :param A: AR-1 operator (can be diagonal or dense)
    :type A: 2D numpy.ndarray (dim: Nz x Nz)
    :param Chol: lower Cholesky factor used to scale the noise
    :type Chol: 2D numpy.ndarray (dim: Nz x Nz)
    :param random_state: RandomState to use for the normal distribution draw. If None (default), draws will be done with np.random.normal.
    :type random_state: None or numpy.random.RandomState
    :param check_Cholesky: if True (default), checks that the Cholesky factor is lower triangular. Setting it to False may improve performance.
    :type check_Cholesky: bool
    :return: vector containing the result of the AR-1 process (X1 above)
    :rtype: 1D numpy.ndarray (dim: Nz)
    """
    Ncoef = X.shape[0]

    if X.ndim != 1:
        raise ValueError('Input quantity in AR-1 process should be 1D! Got {} dims instead'.format(X.ndim))
    if A.shape != (Ncoef, Ncoef):
        raise ValueError('A matrix in AR-1 should have dimensions ({0},{0}). Got {1} instead'.format(Ncoef, A.shape))
    if check_Cholesky:
        if not np.allclose(Chol, np.tril(Chol)):
            raise ValueError('Cholesky matrix supplied in AR-1 is not a lower triangular matrix! Did you supply the upper Cholesky matrix instead?')

    # Draw samples of the same size as the quantity from a normal distribution with zero mean and unit variance N(0,1)
    if random_state is not None:
        normal_noise = random_state.normal(0, 1, size=Ncoef)
    else:
        normal_noise = np.random.normal(0, 1, size=Ncoef)

    # build a scaled noise
    scaled_noise = Chol @ normal_noise

    # Compute the quantity at t+1 from the quantity at t and the scaled noise (Euler-Maruyama)
    return - X @ A + scaled_noise
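
# Forecast sketch (added example with toy matrices): chains
# compute_AR1_coefs_forecast with ar1_process for one step of the one-sided
# scheme, X1 = -X0 @ A_forecast + Chol_forecast @ noise.
def _demo_ar1_forecast_step():
    Ncoef = 3
    A = 0.5 * np.identity(Ncoef)     # toy drift matrix
    Chol = 0.1 * np.identity(Ncoef)  # toy Cholesky factor
    A_f, Chol_f = compute_AR1_coefs_forecast(A, Chol, dt_forecast=0.25, Ncoef=Ncoef)
    X1 = ar1_process(np.ones(Ncoef), A_f, Chol_f, random_state=np.random.RandomState(0))
    assert X1.shape == (Ncoef,)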

def ar3_process(X, A, B, C, Chol, random_state=None, check_Cholesky=True):
    """
    Applies an Auto-Regressive process of order 3 to the state X (typically the augmented state Z):

    X3 = - X2 @ A - X1 @ B - X0 @ C + Chol @ normal_noise

    :param X: quantity on which the AR-3 process will be applied (stacked X0, X1, X2 above)
    :type X: 2D numpy.ndarray (dim: 3 x Nz)
    :param A: AR-3 operator (must be dense)
    :type A: 2D numpy.ndarray (dim: Nz x Nz)
    :param B: AR-3 operator (must be dense)
    :type B: 2D numpy.ndarray (dim: Nz x Nz)
    :param C: AR-3 operator (must be dense)
    :type C: 2D numpy.ndarray (dim: Nz x Nz)
    :param Chol: lower Cholesky factor used to scale the noise
    :type Chol: 2D numpy.ndarray (dim: Nz x Nz)
    :param random_state: RandomState to use for the normal distribution draw. If None (default), draws will be done with np.random.normal.
    :type random_state: None or numpy.random.RandomState
    :param check_Cholesky: if True (default), checks that the Cholesky factor is lower triangular. Setting it to False may improve performance.
    :type check_Cholesky: bool
    :return: vector containing the result of the AR-3 process (X3 above)
    :rtype: 1D numpy.ndarray (dim: Nz)
    """
    Ncoef = X.shape[-1]

    if X.ndim != 2:
        raise ValueError('Input quantity in AR-3 process should be 2D! Got {} dims instead'.format(X.ndim))
    if X.shape[0] != 3:
        raise ValueError('First dimension of input quantity in AR-3 process should be 3! Got {} instead'.format(X.shape[0]))
    if A.shape != (Ncoef, Ncoef):
        raise ValueError('A matrix in AR-3 should have dimensions ({0},{0}). Got {1} instead'.format(Ncoef, A.shape))
    if B.shape != (Ncoef, Ncoef):
        raise ValueError('B matrix in AR-3 should have dimensions ({0},{0}). Got {1} instead'.format(Ncoef, B.shape))
    if C.shape != (Ncoef, Ncoef):
        raise ValueError('C matrix in AR-3 should have dimensions ({0},{0}). Got {1} instead'.format(Ncoef, C.shape))
    if check_Cholesky:
        if not np.allclose(Chol, np.tril(Chol)):
            raise ValueError('Cholesky matrix supplied in AR-3 is not a lower triangular matrix! Did you supply the upper Cholesky matrix instead?')

    # Draw samples of the same size as the quantity from a normal distribution with zero mean and unit variance N(0,1)
    if random_state is not None:
        normal_noise = random_state.normal(0, 1, size=Ncoef)
    else:
        normal_noise = np.random.normal(0, 1, size=Ncoef)

    # build a scaled noise
    scaled_noise = Chol @ normal_noise

    # Compute X3 from X0, X1, X2 and the scaled noise (Euler-Maruyama)
    return - X[2, :] @ A - X[1, :] @ B - X[0, :] @ C + scaled_noise