#!/usr/bin/python3

""" Functions that are common to all algo steps. """

import math
import numpy as np
from . import utilities
from scipy import linalg
from .constants import r_earth
import geodyn_fortran as fortran

def compute_direct_obs_operator(positions, max_degree, internal_source=True, eps=1e-7):
    """
    Computes the direct observation operator H of a measure of given max degree for a list of given positions.

    :param positions: List of the positions stored under the form [r1, th1, ph1, r2, th2, ph2 ...]
    :type positions: list or 1D numpy.ndarray
    :param max_degree: Maximal degree of the observed measure
    :type max_degree: int
    :param internal_source: whether the Gauss coefficients represent an internal or an external source (a/r or r/a dependency)
    :type internal_source: bool
    :param eps: tolerance below which abs(sin(theta)) is treated as zero (position at a pole)
    :type eps: float
    :return: the direct observation operator H
    :rtype: 2D numpy.ndarray (nb_observations x nb_coefs)
    """

    No = len(positions)
    Ncoefs = max_degree * (max_degree + 2)

    H = np.zeros([No, Ncoefs])

    for i in range(0, No, 3):
        r = positions[i]
        th = positions[i + 1]
        ph = positions[i + 2]
        H[i:i+3, :] = compute_lines_direct_obs_operator(r, th, ph, internal_source, eps, max_degree)
    return H
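
# Usage sketch (illustrative only; the numeric values below are hypothetical):
#
#   positions = np.array([3485.0e3, np.pi / 3, 0.5,
#                         3485.0e3, np.pi / 4, 1.0])
#   H = compute_direct_obs_operator(positions, max_degree=13)
#   # -> H.shape == (6, 13 * 15): 3 components (r, th, ph) per position,
#   #    max_degree * (max_degree + 2) Gauss coefficients.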

def compute_lines_direct_obs_operator(r, th, ph, internal_source, eps, max_degree):
    """
    Convenience function: as most of the observation positions are similar through time
    (GVOs are unchanged for each satellite era), a cache is used to avoid computing
    the same coefficients at each time step.
    """
    Ncoefs = max_degree * (max_degree + 2)
    r_th_ph_coefs = np.zeros(shape=(3, Ncoefs))
    # Compute prefactors
    if internal_source:
        radius_to_earth = r_earth / r

    sin_th = math.sin(th)
    if abs(sin_th) < eps:  # theta is either 0 or pi
        sin_th_0 = True
        sign_cos_th = np.sign(math.cos(th))
    else:
        sin_th_0 = False

    # Evaluate the Legendre coeffs at a given th using the Fortran function
    plm, dplm, d2plm = eval_Plm_at_theta(th, max_degree)
    # computed_plms[th] = plm
    # computed_dplms[th] = dplm

    for k in range(0, Ncoefs):
        l, m, coef = utilities.get_degree_order(k)
        k_legendre = (l + 1) * l // 2 + m

        Plm_at_z = plm[k_legendre]
        dPlm_at_z = dplm[k_legendre]

        cos_mph = math.cos(m * ph)
        sin_mph = math.sin(m * ph)

        if internal_source:
            if coef == "g":
                r_th_ph_coefs[0, k] = (l + 1) * radius_to_earth ** (l + 2) * Plm_at_z * cos_mph
                r_th_ph_coefs[1, k] = - radius_to_earth ** (l + 2) * dPlm_at_z * cos_mph
                if sin_th_0:
                    # at the poles, m*Plm/sin(th) is evaluated through its limit using dPlm
                    assert sin_mph == 0 or Plm_at_z == 0, 'One of these values should be 0, {}, {}'.format(sin_mph, Plm_at_z)
                    r_th_ph_coefs[2, k] = radius_to_earth ** (l + 2) * m * sign_cos_th * dPlm_at_z * sin_mph
                else:
                    r_th_ph_coefs[2, k] = radius_to_earth ** (l + 2) * m * Plm_at_z * sin_mph / sin_th
            else:
                r_th_ph_coefs[0, k] = (l + 1) * radius_to_earth ** (l + 2) * Plm_at_z * sin_mph
                r_th_ph_coefs[1, k] = -radius_to_earth ** (l + 2) * dPlm_at_z * sin_mph
                if sin_th_0:
                    assert cos_mph == 0 or Plm_at_z == 0, 'One of these values should be 0, {}, {}'.format(cos_mph, Plm_at_z)
                    r_th_ph_coefs[2, k] = -radius_to_earth ** (l + 2) * m * sign_cos_th * dPlm_at_z * cos_mph
                else:
                    r_th_ph_coefs[2, k] = -radius_to_earth ** (l + 2) * m * Plm_at_z * cos_mph / sin_th
    return r_th_ph_coefs

def compute_direct_obs_operator_SOLA(r, th, ph, max_degree, internal_source=True, eps=1e-7):
    """
    Computes the direct observation operator H of a measure of given max degree for given position arrays (SOLA version).

    :param r: radii of the positions
    :type r: 1D numpy.ndarray
    :param th: colatitudes of the positions (in rad)
    :type th: 1D numpy.ndarray
    :param ph: longitudes of the positions (in rad)
    :type ph: 1D numpy.ndarray
    :param max_degree: Maximal degree of the observed measure
    :type max_degree: int
    :param internal_source: whether the Gauss coefficients represent an internal or an external source (a/r or r/a dependency)
    :type internal_source: bool
    :return: the direct observation operator H
    :rtype: 2D numpy.ndarray (nb_observations x nb_coefs)
    """

    No = r.shape[0]
    Ncoefs = max_degree * (max_degree + 2)

    H = np.zeros([No, Ncoefs])

    for i in range(No):
        H[i, :] = compute_lines_direct_obs_operator_SOLA(r[i], th[i], ph[i], internal_source, eps, max_degree)
    return H

def compute_lines_direct_obs_operator_SOLA(r, th, ph, internal_source, eps, max_degree):
    """ Computes one line of the SOLA observation operator (radial field component only). """
    Ncoefs = max_degree * (max_degree + 2)
    r_th_ph_coefs = np.zeros(Ncoefs)
    # Compute prefactors
    if internal_source:
        radius_to_earth = r_earth / r

    # Evaluate the Legendre coeffs at a given th using the Fortran function
    plm, dplm, d2plm = eval_Plm_at_theta(th, max_degree)
    # computed_plms[th] = plm
    # computed_dplms[th] = dplm

    for k in range(0, Ncoefs):
        l, m, coef = utilities.get_degree_order(k)
        k_legendre = (l + 1) * l // 2 + m

        Plm_at_z = plm[k_legendre]

        cos_mph = math.cos(m * ph)
        sin_mph = math.sin(m * ph)

        if internal_source:
            if coef == "g":
                r_th_ph_coefs[k] = (l + 1) * radius_to_earth ** (l + 2) * Plm_at_z * cos_mph
            else:
                r_th_ph_coefs[k] = (l + 1) * radius_to_earth ** (l + 2) * Plm_at_z * sin_mph
    return r_th_ph_coefs


def sample_timed_quantity(times, X_quantity, sampling_dt):
    """
    Samples a timed quantity according to sampling_dt.
    First generates sampling times from the times array, then evaluates the X quantity at these times using interpolation.

    :param times: times of X quantity
    :type times: 1D numpy.array (dim: Ntimes)
    :param X_quantity: array containing values of the quantity X at times
    :type X_quantity: 2D numpy.array (dim: Ntimes x Ncoefs)
    :param sampling_dt: time step to use for the sampling (in years)
    :type sampling_dt: float
    :return: 2-tuple containing the sampling times and the quantity X evaluated at these sampling times
    :rtype: 1D numpy.array (dim: Nsamples), 2D numpy.array (dim: Nsamples x Ncoefs)
    """
    nb_coeffs = X_quantity.shape[1]
    sampling_times = np.arange(times.min(), times.max(), sampling_dt)
    nb_samples = len(sampling_times)
    sampled_X = np.zeros(shape=(nb_samples, nb_coeffs))

    # TODO Replace by scipy method ?
    for i_coef in range(nb_coeffs):
        sampled_X[:, i_coef] = np.interp(sampling_times, times, X_quantity[:, i_coef])

    return sampling_times, sampled_X
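
# A possible vectorized alternative for the TODO above (sketch, not adopted here):
#
#   from scipy.interpolate import interp1d
#   sampled_X = interp1d(times, X_quantity, axis=0)(sampling_times)
#
# interp1d interpolates along axis=0 for all coefficients at once, which avoids
# the explicit loop over columns.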

def compute_legendre_polys(tmax, Lb, Lu, Lsv):
    """
    Computes the coefficients of Legendre polynomials, first derivative and second derivative of B, U and SV using a wrapped Fortran function.

    :param tmax: Number of angles for computation
    :type tmax: int
    :param Lb: Max degree of magnetic field
    :type Lb: int
    :param Lu: Max degree of core flow
    :type Lu: int
    :param Lsv: Max degree of secular variation
    :type Lsv: int
    :return: A dictionary containing the angles used for the computations and the Legendre coefs of B, U and SV.
    :rtype: dict
    """

    # Compute number of coefs
    LLb = ((Lb + 1) * (Lb + 2)) // 2
    LLu = ((Lu + 1) * (Lu + 2)) // 2
    LLsv = ((Lsv + 1) * (Lsv + 2)) // 2

    # Init of arrays storing the Legendre polynomials, their derivative and second derivative
    # For magnetic field
    lp_b = np.zeros((LLb, tmax), order='F')
    d_lp_b = np.zeros((LLb, tmax), order='F')
    d2_lp_b = np.zeros((LLb, tmax), order='F')
    # For core flow
    lp_u = np.zeros((LLu, tmax), order='F')
    d_lp_u = np.zeros((LLu, tmax), order='F')
    d2_lp_u = np.zeros((LLu, tmax), order='F')
    # For secular variation
    lp_sv = np.zeros((LLsv, tmax), order='F')
    d_lp_sv = np.zeros((LLsv, tmax), order='F')
    d2_lp_sv = np.zeros((LLsv, tmax), order='F')

    # Compute Gauss-Legendre quadrature: returns tmax gauss_points in [-1, 1] with their associated weights
    gauss_points, gauss_weights = fortran.legendre.gauleg(np.float64(-1.), np.float64(1.), tmax)

    # For each Gauss point x_i, compute the associated Legendre functions at x_i
    for i, x_i in enumerate(gauss_points):
        # Last arg of plmbar2 is 1 to have Schmidt quasi-normalisation
        fortran.legendre.plmbar2(lp_b[:, i], d_lp_b[:, i], d2_lp_b[:, i], x_i, Lb, 1)
        fortran.legendre.plmbar2(lp_u[:, i], d_lp_u[:, i], d2_lp_u[:, i], x_i, Lu, 1)
        fortran.legendre.plmbar2(lp_sv[:, i], d_lp_sv[:, i], d2_lp_sv[:, i], x_i, Lsv, 1)

    # Convert the abscissas used for the polynomial computation into angles
    gauss_thetas = np.arccos(gauss_points)

    return {'thetas': gauss_thetas,
            'weights': gauss_weights,
            'MF': np.array((lp_b, d_lp_b, d2_lp_b)),
            'U': np.array((lp_u, d_lp_u, d2_lp_u)),
            'SV': np.array((lp_sv, d_lp_sv, d2_lp_sv))}
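
# Usage sketch (illustrative degrees): unpack the returned dictionary.
#
#   legendre = compute_legendre_polys(tmax=64, Lb=13, Lu=18, Lsv=13)
#   plm_b, dplm_b, d2plm_b = legendre['MF']   # each of shape (LLb, tmax)
#   thetas, weights = legendre['thetas'], legendre['weights']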

def compute_Kalman_gain_matrix(P, H, R, use_cholesky=False):
    """
    Computes the Kalman gain matrix from the correlation matrix, observation operator and observation error matrix:
    K = P*trans(H)*inv(H*P*trans(H)+R)
    The inversion can be carried out using Cholesky decomposition.

    :param P: Correlation matrix
    :type P: 2D numpy.ndarray (dim: Nstate x Nstate)
    :param H: Observation operator
    :type H: 2D numpy.ndarray (dim: Nobs x Nstate)
    :param R: Observation error matrix
    :type R: 2D numpy.ndarray (dim: Nobs x Nobs)
    :param use_cholesky: if True, matrix inversion is done using Cholesky decomposition. Uses scipy.linalg.solve otherwise.
    :type use_cholesky: bool
    :return: Kalman gain matrix K = P*trans(H)*inv(H*P*trans(H)+R)
    :rtype: 2D numpy.ndarray (dim: Nstate x Nobs)
    """

    # Compute PHT = P*transpose(H) that is needed twice
    PHT = np.matmul(P, np.transpose(H))

    # Compute the Kalman gain K = P*trans(H)*inv(H*P*trans(H)+R)
    matrix_to_invert = np.matmul(H, PHT) + R
    if use_cholesky:
        invert_matrix = mat_inv_using_Cholesky(matrix_to_invert)
        return np.matmul(PHT, invert_matrix)
    else:
        # invert_matrix = linalg.inv(matrix_to_invert)
        # return np.matmul(PHT, invert_matrix)
        # Solve K = PHT*inv(H*PHT+R) <=> (H*PHT+R)^T*K^T = PHT^T
        return np.transpose(linalg.solve(np.transpose(matrix_to_invert), np.transpose(PHT)))
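
# Sanity-check sketch (toy matrices, illustrative only): the solve-based gain
# should match the explicit inverse formula within round-off.
#
#   rng = np.random.default_rng(0)
#   P = np.eye(4); H = rng.normal(size=(2, 4)); R = np.eye(2)
#   K = compute_Kalman_gain_matrix(P, H, R)
#   assert np.allclose(K, P @ H.T @ np.linalg.inv(H @ P @ H.T + R))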

def compute_Kalman_huber(b_f, y, Pbb_inv, H, Rbb, max_steps=50):
    """
    Iteratively solves the least-squares problem with a Huber norm. The update reads (cf. Walker and Jackson 2000):
    b^{k+1} = b^f + (P^{-1} + H^T R^{-1/2} W R^{-1/2} H)^{-1} (H^T R^{-1/2} W R^{-1/2}) (y - H b^f)

    :param b_f: forecast state vector
    :param y: observation vector
    :param Pbb_inv: inverse of the forecast covariance matrix
    :param H: observation operator
    :param Rbb: observation error covariance matrix (only its diagonal is used)
    :param max_steps: maximum number of iterations
    :type max_steps: int
    """
    sq_R = 1 / np.sqrt(np.diag(Rbb))
    R_H = np.multiply(sq_R[:, None], H)
    H_R = R_H.T
    W = np.ones(Rbb.shape[0])
    ones_min = np.ones(Rbb.shape[0])
    c = 1.5
    cutoff = 1e-8  # just to avoid division by zero
    y_minus_H_b = y - H @ b_f

    for i in range(max_steps):
        b_kplus1 = b_f + np.linalg.solve(H_R @ np.multiply(W[:, None], R_H) + Pbb_inv,
                                         np.multiply(H_R, (W * sq_R)[None, :])) @ y_minus_H_b
        # if the change since the last iteration is below 1e-4 of the maximum deviation, break the loop
        if i and np.amax(b_kplus1 - b_k) < 1e-4 * np.amax(b_k):
            break
        b_k = b_kplus1
        epsilon = abs(y - H @ b_kplus1) * sq_R
        epsilon[epsilon < cutoff] = cutoff

        W = np.min([c / epsilon, ones_min], axis=0)  # residuals below c keep weight one, larger ones are downweighted
        # conditions to break the loop to make it faster
        if np.all(W == ones_min):  # then nothing will change in the loop, LS solution because no outliers
            break
    return b_kplus1
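
# Illustration of the Huber weighting used above (toy residuals, illustrative only):
#
#   eps = np.array([0.5, 1.5, 3.0, 15.0])        # normalised residuals
#   W = np.min([1.5 / eps, np.ones(4)], axis=0)
#   # -> [1.0, 1.0, 0.5, 0.1]: residuals within c = 1.5 keep full weight,
#   #    outliers are progressively downweighted.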

def compute_Kalman_huber_parameter_basis(b_f, y, HPzzHT, PzzHT, H, Rbb, norm='huber', max_steps=50):
    """
    Very similar to compute_Kalman_huber but written in the parameter basis.
    Iteratively solves the least-squares problem with a Huber norm. The update reads (cf. Walker and Jackson 2000):
    b^{k+1} = b^f + (P^{-1} + H^T R^{-1/2} W R^{-1/2} H)^{-1} (H^T R^{-1/2} W R^{-1/2}) (y - H b^f)

    :param max_steps: maximum number of iterations
    :type max_steps: int
    """
    sq_R = np.sqrt(np.diag(Rbb))
    W = np.ones(Rbb.shape[0])
    ones_min = np.ones(Rbb.shape[0])
    c = 1.5
    cutoff = 1e-8  # just to avoid division by zero
    y_minus_H_b = y - H @ b_f
    if norm in ('l2', 'L2'):
        max_steps = 1  # first step just computes least squares, one iteration in the loop is fine

    for i in range(max_steps):
        R_k = np.diag(sq_R ** 2 / W)  # reweighted observation error covariance
        b_kplus1 = b_f + PzzHT @ np.linalg.solve(R_k + HPzzHT, y_minus_H_b)
        epsilon = abs(y - H @ b_kplus1) / sq_R
        epsilon[epsilon < cutoff] = cutoff

        prev_W = W
        W = np.min([c / epsilon, ones_min], axis=0)  # residuals below c keep weight one, larger ones are downweighted
        # conditions to break the loop to make it faster
        if np.all(W == ones_min):  # then nothing will change in the loop, LS solution because no outliers
            break

        # if the change in the weights since the last iteration is below 1e-4 of the maximum weight, break the loop
        if i and np.amax(W - prev_W) < 1e-4 * np.amax(W):
            break

    return b_kplus1

def cov(x):
    return np.cov(x, rowvar=False)


def corr(x):
    return np.corrcoef(x, rowvar=False)

def compute_average(X, Nt, mode="valid"):
    # Perform a convolution of X by a Blackman smoothing window of size Nt (NOT A PROPER AVERAGE)
    X_avg = np.zeros((X.shape[0] - Nt + 1, X.shape[1]))
    for i in range(X.shape[1]):
        X_avg[:, i] = np.convolve(X[:, i], np.blackman(Nt), mode)
    # PROPER AVERAGE = X_avg / np.sum(np.blackman(Nt))
    return X_avg

def compute_derivative(X, dt):

    if X.ndim != 3:
        raise ValueError("X must be 3D")
    if X.shape[1] < 3:
        raise ValueError("X second dimension must be at least 3")

    # compute second-order 1st and 2nd derivatives
    dX = (X[:, 2:, :] - X[:, :-2, :]) / (2 * dt)
    d2X = (X[:, 2:, :] - 2 * X[:, 1:-1, :] + X[:, :-2, :]) / (dt ** 2)

    return dX, d2X
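
# Shape sketch (illustrative): central differences drop the two end points
# of the time axis (axis 1).
#
#   X = np.zeros((5, 100, 20))          # (realisations, times, coefs)
#   dX, d2X = compute_derivative(X, dt=0.1)
#   # -> dX.shape == d2X.shape == (5, 98, 20)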

def compute_misfit(realisations, mean, inverse_weight_matrix):
    """
    Computes the misfit of realisations with respect to the mean.
    The misfit is a norm weighted by the weight matrix (Mahalanobis distance).
    :warning: The inverse weight matrix must be supplied

    :param realisations: Array of realisations (dim: nb_real x nb_coefs)
    :type realisations: np.array
    :param mean: Mean that will be subtracted from the realisations array (dim: nb_coefs)
    :type mean: np.array
    :param inverse_weight_matrix: Inverse of the weight matrix (WM^{-1}, dim: nb_coefs x nb_coefs)
    :type inverse_weight_matrix: np.array
    :return: sqrt(1/(nb_real*(nb_coefs-1)) * sum[(realisations - mean)^T WM^{-1} (realisations - mean)])
    :rtype: float
    """
    nb_realisations = realisations.shape[0]
    nb_coefs = realisations.shape[1]

    # Regular distance D (this step will fail if the dimensions are not matching)
    distance = realisations - mean

    # Compute the sum over realisations (axis=0) of D^T * WM^{-1} * D, i.e. the squared weighted norm.
    non_norm_squared_norm = 0
    for k in range(0, nb_realisations):
        non_norm_squared_norm += distance[k].T @ (inverse_weight_matrix @ distance[k])
    return np.sqrt(1. / (nb_realisations * (nb_coefs - 1)) * non_norm_squared_norm)

def get_BLUE(X, Y, P, H, R, K=None, cholesky=False):
    """
    Returns the Best Linear Unbiased Estimate (BLUE) for X from the observations Y and the covariance matrices.

    :param X: current state vector
    :type X: 1D numpy.ndarray (dim: Nstate)
    :param Y: observation vector
    :type Y: 1D numpy.ndarray (dim: Nobs)
    :param P: covariance of forecast priors
    :type P: 2D numpy.ndarray (dim: Nstate x Nstate)
    :param H: Observation operator
    :type H: 2D numpy.ndarray (dim: Nobs x Nstate)
    :param R: covariance of observation errors
    :type R: 2D numpy.ndarray (dim: Nobs x Nobs)
    :param K: Kalman gain matrix. Optional: if not supplied, it will be computed from the other matrices.
    :type K: 2D numpy.ndarray (dim: Nstate x Nobs)
    :param cholesky: if True, uses Cholesky decomposition to compute matrices' inverses
    :type cholesky: bool
    :return: Best Linear Unbiased Estimate of X
    :rtype: 1D numpy.ndarray (dim: Nstate)
    """

    # Compute the innovation vector first
    D = Y - np.matmul(H, X)

    # Compute Kalman gain matrix if not provided
    if K is None:
        K = compute_Kalman_gain_matrix(P, H, R, use_cholesky=cholesky)

    return X + np.matmul(K, D)
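
# Usage sketch (toy dimensions, illustrative only):
#
#   X = np.zeros(4)                      # prior state
#   Y = np.array([1.0, -0.5])            # observations
#   P = np.eye(4); R = 0.1 * np.eye(2)
#   H = np.eye(2, 4)                     # observe the first two coefficients
#   X_a = get_BLUE(X, Y, P, H, R)
#   # X_a is the analysis state; the gain K is recomputed internally when not supplied.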

def mat_inv_using_Cholesky(M):
    """
    Returns the inverse of a matrix computed by Cholesky decomposition

    :param M: a square positive-definite matrix
    :type M: ndarray
    :return: Inverse of M
    :rtype: ndarray of the dimensions of M
    """
    if linalg.det(M) == 0:
        raise np.linalg.LinAlgError('The matrix inversion method using Cholesky was called on a non-invertible matrix: {}!'.format(M))

    LCMat_tuple = linalg.cho_factor(M, lower=True)
    return linalg.cho_solve(LCMat_tuple, np.identity(M.shape[0]))
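
# Sanity-check sketch (illustrative): the Cholesky-based inverse should satisfy
# M @ inv(M) ~= I for a symmetric positive-definite M.
#
#   M = np.array([[4.0, 1.0], [1.0, 3.0]])
#   M_inv = mat_inv_using_Cholesky(M)
#   assert np.allclose(M @ M_inv, np.identity(2))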

def eval_Plm_at_theta(theta, lmax, norm=1):
    """
    Evaluates the coeffs of the Legendre polynomials (and 1st and 2nd derivatives) at a given theta.

    :param theta: angle where to evaluate the Legendre polys (in rad)
    :type theta: float
    :param lmax: Max degree described by the Legendre polys
    :type lmax: int
    :param norm: Optional arg to set the normalisation of the polys (1=Schmidt semi-norm (default), 2=full norm)
    :type norm: int
    :return: 3 numpy.array with the coefs of the Legendre polys, of its 1st derivative and of its 2nd derivative
    :rtype: np.array, np.array, np.array
    """

    nb_legendre_coefs = (lmax + 1) * (lmax + 2) // 2

    coefs_plm = np.zeros(nb_legendre_coefs)
    coefs_dplm = np.zeros_like(coefs_plm)
    coefs_d2plm = np.zeros_like(coefs_plm)

    fortran.legendre.plmbar2(coefs_plm, coefs_dplm, coefs_d2plm, math.cos(theta), lmax, norm)

    return coefs_plm, coefs_dplm, coefs_d2plm

def compute_diag_AR1_coefs(cov_U, cov_ER, Tau_U, Tau_E):
    """
    Computes the matrices for a diagonal AR-1 process

    :param cov_U: covariance matrix of the core flow
    :type cov_U: np.array (dim: Nu x Nu)
    :param cov_ER: covariance matrix of the errors (ER part of the augmented state)
    :type cov_ER: np.array (dim: Ner x Ner)
    :param Tau_U: AR-1 timescale of the core flow (in years)
    :type Tau_U: float
    :param Tau_E: AR-1 timescale of the errors (in years)
    :type Tau_E: float
    :return: The drift matrix and the Cholesky lower matrix of the covariances
    :rtype: np.array (dim: Nz x Nz), np.array (dim: Nz x Nz)
    """

    if not cov_U.ndim == 2:
        raise ValueError("cov_U must be 2D but is {}D".format(cov_U.ndim))

    Nu = cov_U.shape[0]
    Ner = cov_ER.shape[0]
    Nz = Nu + Ner
    diag = np.ones(Nz)
    diag[:Nu] = 1 / Tau_U
    diag[Nu:] = 1 / Tau_E
    A = np.diag(diag)

    Chol_U = np.sqrt(2. / Tau_U) * linalg.cholesky(cov_U, lower=True)
    Chol_ER = np.sqrt(2. / Tau_E) * linalg.cholesky(cov_ER, lower=True)
    Chol = linalg.block_diag(Chol_U, Chol_ER)

    return A, Chol
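
# Usage sketch (toy covariances, illustrative only):
#
#   cov_U = np.eye(3); cov_ER = 0.5 * np.eye(2)
#   A, Chol = compute_diag_AR1_coefs(cov_U, cov_ER, Tau_U=100., Tau_E=10.)
#   # A is diag([1/100]*3 + [1/10]*2); Chol is block-diagonal, scaled by
#   # sqrt(2/Tau) so that the AR-1 process has the prescribed stationary covariance.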

def prep_AR_matrix(Z, dt_samp, Nt):
    """
    Compute the time derivatives of the state array and smooth them.

    :param Z: state array
    :type Z: ndarray (Ntimes x Ncoeffs)
    :param dt_samp: time step
    :type dt_samp: float
    :param Nt: average window size
    :type Nt: int
    :return: X and its first, second and third time derivatives dX, d2X, d3X
    :rtype: 4 ndarrays (Ntimes x Ncoeffs)
    """

    if not Z.ndim == 2:
        raise ValueError("Z must be 2D but is {}D".format(Z.ndim))
    if Z.shape[0] < 4:
        raise ValueError("First dimension of Z must be at least 4 but is {}".format(Z.shape[0]))

    x0 = Z[:-3, :]
    x1 = Z[1:-2, :]
    x2 = Z[2:-1, :]
    x3 = Z[3:, :]

    # compute dX, d2X, d3X derivatives
    dX = (x1 - x0) / dt_samp
    d2X = (x2 - 2 * x1 + x0) / dt_samp ** 2
    d3X = (x3 - 3 * x2 + 3 * x1 - x0) / dt_samp ** 3

    # compute average (not divided by the smoothing window total weight!)
    # it must be divided by the smoothing window total weight to get a proper average
    x0 = compute_average(x0, Nt)
    dX = compute_average(dX, Nt)
    d2X = compute_average(d2X, Nt)
    d3X = compute_average(d3X, Nt)

    if not x0.shape == dX.shape == d2X.shape == d3X.shape:
        raise ValueError("x0, dX, d2X and d3X must be the same shape")

    return x0, dX, d2X, d3X

def compute_AR_coefs_avg(container, AR_type):
    """
    Compute the dense AR coefs:
        A, Chol if AR1
        A, B, C, Chol if AR3

    :param container: list containing the stored variables [X, dX, dt_sampling, Nt] for every prior.
    :type container: list
    :param AR_type: type of AR process ("AR1" or "AR3")
    :type AR_type: str
    """

    for i in range(len(container)):
        if not container[i][0].shape == container[i][1].shape:
            raise ValueError("X, dX must be the same shape")
        if not container[i][0].ndim == 2:
            raise ValueError("Z must be 2D but is {}D".format(container[i][0].ndim))

    if AR_type == "AR1":
        NZ = container[0][0].shape[0]
        tmp1 = np.zeros((NZ, NZ))
        tmp2 = np.zeros((NZ, NZ))
    elif AR_type == "AR3":
        NZ = container[0][0].shape[0] // 3
        tmp1 = np.zeros((3 * NZ, 3 * NZ))
        tmp2 = np.zeros((3 * NZ, 3 * NZ))
    else:
        raise ValueError("AR_type must be 'AR1' or 'AR3' but is {}".format(AR_type))

    Dt = container[0][2]

    # compute AA
    NT = 0
    for i in range(len(container)):
        X = container[i][0]
        dX = container[i][1]
        Nt = container[i][3]
        Mt = np.sum(np.power(np.blackman(Nt), 2))
        NT += round(X.shape[1] / Nt)
        XXT = X @ X.T / (Mt * Nt)
        dXXT = dX @ X.T / (Mt * Nt)
        tmp1 += dXXT
        tmp2 += XXT

    AA = -1 / Dt * tmp1 @ np.linalg.inv(tmp2)

    # compute WWT
    WWT = []
    for i in range(len(container)):
        X = container[i][0]
        dX = container[i][1]
        dt = container[i][2]
        Nt = container[i][3]
        Mt = np.sum(np.power(np.blackman(Nt), 2))
        W = (dX + dt * AA @ X) / np.sqrt(dt)
        WWT.append(W @ W.T / (Mt * Nt))

    if AR_type == "AR1":
        S = sum(WWT) / NT
        L_S = np.linalg.cholesky(S)  # Warning: returns the LOWER triangle of the Cholesky decomposition!
        return AA.T, L_S
    elif AR_type == "AR3":
        C = AA[2 * NZ:3 * NZ, :NZ]
        B = AA[2 * NZ:3 * NZ, NZ:2 * NZ]
        A = AA[2 * NZ:3 * NZ, 2 * NZ:3 * NZ]
        S = (sum(WWT) / NT)[2 * NZ:3 * NZ, 2 * NZ:3 * NZ]
        L_S = np.linalg.cholesky(S)  # Warning: returns the LOWER triangle of the Cholesky decomposition!
        return A.T, B.T, C.T, L_S

def compute_AR1_coefs_forecast(A, Chol, dt_forecast, Ncoef):
    """
    Compute the forecast coefficients A and Chol for the AR-1 process
    """

    if not (A.shape == Chol.shape == (Ncoef, Ncoef)):
        raise ValueError("A, Chol matrices must be the same shape")
    if not np.allclose(Chol, np.tril(Chol)):
        raise ValueError('Cholesky matrix supplied in AR-1 is not a lower triangular matrix! Did you supply the upper Cholesky matrix instead?')

    # matrices for the AR-1 off-centred (one-sided) scheme
    Id = np.identity(Ncoef)
    A_forecast = dt_forecast * A - Id
    Chol_forecast = dt_forecast ** (1 / 2) * Chol

    return A_forecast, Chol_forecast
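
# Usage sketch (illustrative), chaining with ar1_process defined below:
#
#   A, Chol = compute_diag_AR1_coefs(cov_U, cov_ER, Tau_U, Tau_E)   # as above
#   A_f, Chol_f = compute_AR1_coefs_forecast(A, Chol, dt_forecast=0.1, Ncoef=A.shape[0])
#   X1 = ar1_process(X0, A_f, Chol_f)
#   # With A_f = dt*A - Id, the step -X0 @ A_f + Chol_f @ noise amounts to
#   # X0 - dt * X0 @ A + sqrt(dt) * Chol @ noise (Euler-Maruyama).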

def compute_AR3_coefs_forecast(A, B, C, Chol, dt_forecast, Ncoef):
    """
    Compute the forecast coefficients A, B, C and Chol for the AR-3 process
    """

    if not (A.shape == B.shape == C.shape == Chol.shape == (Ncoef, Ncoef)):
        raise ValueError("A, B, C, Chol matrices must have the same shape")
    if not np.allclose(Chol, np.tril(Chol)):
        raise ValueError('Cholesky matrix supplied in AR-3 is not a lower triangular matrix! Did you supply the upper Cholesky matrix instead?')

    # matrices for the AR-3 off-centred (one-sided) scheme
    Id = np.identity(Ncoef)
    A_forecast = dt_forecast * A - 3 * Id
    B_forecast = (dt_forecast ** 2) * B - 2 * dt_forecast * A + 3 * Id
    C_forecast = (dt_forecast ** 3) * C - (dt_forecast ** 2) * B + dt_forecast * A - Id
    Chol_forecast = dt_forecast ** (5 / 2) * Chol

    return A_forecast, B_forecast, C_forecast, Chol_forecast

def ar1_process(X, A, Chol, random_state=None, check_Cholesky=True):
    """
    Applies an Auto-Regressive process of order 1 to the state X.

    X1 = - X0 @ A + Chol @ normal_noise

    :param X: quantity on which the AR-1 process will be applied (X0 above)
    :type X: 1D numpy.ndarray (dim: Nz)
    :param A: AR-1 operator (can be diagonal or dense)
    :type A: 2D numpy.ndarray (dim: Nz x Nz)
    :param random_state: RandomState to use for the normal distribution draw. If None (default), draws will be done with np.random.normal.
    :type random_state: None or numpy.random.RandomState
    :param check_Cholesky: If True (default), checks that the Cholesky matrix is lower triangular. Setting to False may enhance performance.
    :type check_Cholesky: bool
    :return: vector containing the result of the AR-1 process (X1 above)
    :rtype: 1D numpy.ndarray (dim: Nz)
    """

    Ncoef = X.shape[0]

    if X.ndim != 1:
        raise ValueError('Input quantity in AR-1 process should be 1D! Got {} dims instead'.format(X.ndim))
    if A.shape != (Ncoef, Ncoef):
        raise ValueError('A matrix in AR-1 should have dimensions ({0},{0}). Got {1} instead'.format(Ncoef, A.shape))
    if check_Cholesky:
        if not np.allclose(Chol, np.tril(Chol)):
            raise ValueError('Cholesky matrix supplied in AR-1 is not a lower triangular matrix! Did you supply the upper Cholesky matrix instead?')

    # Draw samples of the same size as X from a normal distribution with zero mean and unit variance N(0,1)
    if random_state is not None:
        normal_noise = random_state.normal(0, 1, size=Ncoef)
    else:
        normal_noise = np.random.normal(0, 1, size=Ncoef)

    # build a scaled noise
    scaled_noise = Chol @ normal_noise

    # Compute the quantity at t+1 from the quantity at t and the scaled noise (Euler-Maruyama)
    return - X @ A + scaled_noise

def ar3_process(X, A, B, C, Chol, random_state=None, check_Cholesky=True):
    """
    Applies an Auto-Regressive process of order 3 to the state X.

    X3 = - X0 @ A - X1 @ B - X2 @ C + Chol @ normal_noise

    :param X: quantity on which the AR-3 process will be applied (X0, X1, X2 above)
    :type X: 2D numpy.ndarray (dim: 3 x Nz)
    :param A: AR-3 operator (must be dense)
    :type A: 2D numpy.ndarray (dim: Nz x Nz)
    :param B: AR-3 operator (must be dense)
    :type B: 2D numpy.ndarray (dim: Nz x Nz)
    :param C: AR-3 operator (must be dense)
    :type C: 2D numpy.ndarray (dim: Nz x Nz)
    :param random_state: RandomState to use for the normal distribution draw. If None (default), draws will be done with np.random.normal.
    :type random_state: None or numpy.random.RandomState
    :param check_Cholesky: If True (default), checks that the Cholesky matrix is lower triangular. Setting to False may enhance performance.
    :type check_Cholesky: bool
    :return: vector containing the result of the AR-3 process (X3 above)
    :rtype: 1D numpy.ndarray (dim: Nz)
    """

    Ncoef = X.shape[-1]

    if X.ndim != 2:
        raise ValueError('Input quantity in AR-3 process should be 2D! Got {} dims instead'.format(X.ndim))
    if X.shape[0] != 3:
        raise ValueError('First dimension of input quantity in AR-3 process should be 3! Got {} instead'.format(X.shape[0]))
    if A.shape != (Ncoef, Ncoef):
        raise ValueError('A matrix in AR-3 should have dimensions ({0},{0}). Got {1} instead'.format(Ncoef, A.shape))
    if B.shape != (Ncoef, Ncoef):
        raise ValueError('B matrix in AR-3 should have dimensions ({0},{0}). Got {1} instead'.format(Ncoef, B.shape))
    if C.shape != (Ncoef, Ncoef):
        raise ValueError('C matrix in AR-3 should have dimensions ({0},{0}). Got {1} instead'.format(Ncoef, C.shape))
    if check_Cholesky:
        if not np.allclose(Chol, np.tril(Chol)):
            raise ValueError('Cholesky matrix supplied in AR-3 is not a lower triangular matrix! Did you supply the upper Cholesky matrix instead?')

    # Draw samples of the same size as X from a normal distribution with zero mean and unit variance N(0,1)
    if random_state is not None:
        normal_noise = random_state.normal(0, 1, size=Ncoef)
    else:
        normal_noise = np.random.normal(0, 1, size=Ncoef)

    # build a scaled noise
    scaled_noise = Chol @ normal_noise

    # Compute X3 from X0, X1, X2 and the scaled noise (Euler-Maruyama)
    return - X[2, :] @ A - X[1, :] @ B - X[0, :] @ C + scaled_noise
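
# Usage sketch (illustrative): advance an AR-3 state with a reproducible draw.
# Ncoef and the forecast matrices are hypothetical names.
#
#   rng = np.random.RandomState(42)
#   X = np.zeros((3, Ncoef))             # the three latest states X0, X1, X2
#   X3 = ar3_process(X, A_f, B_f, C_f, Chol_f, random_state=rng)
#   # A_f, B_f, C_f, Chol_f as returned by compute_AR3_coefs_forecast above.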