Coverage for pygeodyn/augkf/algo.py: 84%

238 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-12-22 13:43 +0000

1import numpy as np 

2import logging 

3import scipy as sc 

4from .. import common, corestates as cs, pca 

5from ..inout import reads 

6from ..generic.algo import GenericAlgo 

7from pygeodyn.augkf.forecaster import AugkfForecasterAR1, AugkfForecasterAR3 

8from pygeodyn.augkf.analyser import AugkfAnalyserAR1, AugkfAnalyserAR3 

9from fractions import Fraction 

10from ..inout.config import ComputationConfig 

11 

12def create_augkf_algo(config, nb_realisations, seed, attributed_models): 

13 """ 

14 Factory function that returns the AugKF algo 

15 

16 :param config: Configuration of the algo 

17 :type config: ComputationConfig 

18 :param nb_realisations: number of realisations to consider in the algo 

19 :type nb_realisations: int 

20 :return: Algorithm object 

21 :rtype: AugkfAlgo 

22 """ 

23 

24 return AugkfAlgo(config, nb_realisations, seed, attributed_models) 

25 

26 

27class AugkfAlgo(GenericAlgo): 

28 """ 

29 Augmented State Kalman Filter algorithm with DIFF treated as a contribution to ER. 

30 """ 

31 name = 'augkf' 

32 

33 

34 def __eq__(self, other): 

35 """ 

36 Implementation of equality for two different AugkfAlgos 

37 

38 :param: other 

39 """ 

40 are_all_equal = True 

41 def print_equal_message(eq, key1, key2): 

42 if eq: 

43 pass 

44 else: 

45 logging.debug('vals are not equal for key = {}, {}'.format(key1, key2)) 

46 

47 logging.debug('testing equalities of algo') 

48 # loop on all items of the two instantiation of the classes 

49 for (key, val), (key_other, val_other) in zip(self.__dict__.items(), other.__dict__.items()): 

50 # if val is an instance, skip it, these classes have their own implementation 

51 if isinstance(val, AugkfAnalyserAR3) or isinstance(val, AugkfForecasterAR3) or isinstance(val, ComputationConfig): 

52 logging.debug('Skipping {}, {}, an instantiation of a class, val = {}'.format(key, key_other, val)) 

53 continue 

54 if type(val) == type(dict()): 

55 for (key_val, val_avg), (key_val2, val_avg2) in zip(val.items(), val_other.items()): 

56 eq = np.allclose(val_avg, val_avg2) 

57 print_equal_message(eq, key_val, key_val2) 

58 else: 

59 eq = val == val_other 

60 print_equal_message(eq, key, key_other) 

61 

62 return are_all_equal 

63 

64 

65 def __init__(self, cfg, nb_realisations, seed, attributed_models): 

66 super().__init__(cfg, nb_realisations) 

67 self.attributed_models = attributed_models 

68 self.avg_prior, self.cov_prior = self.extract_prior_and_covariances() 

69 self.legendre_polys = common.compute_legendre_polys(cfg.Nth_legendre, cfg.Lb, cfg.Lu, cfg.Lsv) 

70 self.forecaster = self.create_forecaster() 

71 self.seed = seed 

72 self.analyser = self.create_analyser() 

73 

74 

75 def check_PCA(self): 

76 return self.config.pca 

77 

78 

79 def create_forecaster(self): 

80 """ 

81 Factory method to create the forecaster. 

82 

83 :return: AugkfForecaster 

84 """ 

85 if self.config.AR_type == "AR3": 

86 return AugkfForecasterAR3(self) 

87 else: 

88 return AugkfForecasterAR1(self) 

89 

90 

91 def create_analyser(self): 

92 """ 

93 Factory method to create the analyser. 

94 

95 :return: AugkfAnalyser 

96 """ 

97 if self.config.AR_type == "AR3": 

98 return AugkfAnalyserAR3(self) 

99 else: 

100 return AugkfAnalyserAR1(self) 

101 

102 

103 def get_current_misfits(self, measure): 

104 """ Forwards the getting of current_misfits to analyser """ 

105 return self.analyser.current_misfits[measure] 

106 

107 

108 def init_corestates(self, random_state=None): 

109 """ 

110 Sets up the corestates needed for the AugKF algorithm. 

111 Returns CoreStates of the adequate form and initialisation to perform the AugKF. 

112 

113 :param random_state: Random state to use for normal draw (only used when init from noised priors) 

114 :type random_state: np.random.RandomState 

115 :return: computed_states, forecast_states, analysed_states, misfits, Z_AR3 

116 :rtype: CoreState, CoreState, CoreState, CoreState, 3D numpy array (N_real x 3 x Ncoef) or None (if not AR3) 

117 """ 

118 

119 corestate_measures={} 

120 

121 # Define the measures used for the computation and their number of coeffs 

122 corestate_measures.update({'MF': self.config.Nb, 

123 'U': self.config.Nu2, 

124 'SV': self.config.Nsv, 

125 'ER': self.config.Nsv, 

126 'Z': self.config.Nz}) 

127 

128 # Add derivatives needed to AR3 to measures 

129 if self.config.AR_type == "AR3": 

130 corestate_measures.update({'dUdt': self.config.Nu2, 

131 'd2Udt2': self.config.Nu2, 

132 'dERdt': self.config.Nsv, 

133 'd2ERdt2': self.config.Nsv}) 

134 

135 # Add shear measure ('S') if do_shear = 1 

136 if self.config.do_shear == 1: 

137 corestate_measures.update({'S': self.config.Nu2}) 

138 

139 # Build the array of computed states 

140 computed_states = cs.CoreState() 

141 analysed_states = cs.CoreState() 

142 

143 for meas_id, Nmeas in corestate_measures.items(): 

144 computed_states.addMeasure(meas_id, np.zeros((self.attributed_models.shape[0], self.config.nb_forecasts, Nmeas))) 

145 

146 # Initialize the core state at t=0 

147 if self.config.core_state_init == 'from_file': 

148 Z_AR3 = computed_states.initialise_from_file(self) 

149 str_init = self.config.init_file 

150 else: 

151 Z_AR3 = computed_states.initialise_from_noised_priors(self, random_state=random_state) 

152 str_init = 'normal draw around average priors' 

153 

154 logging.debug('Computed states initialised from {}'.format(str_init)) 

155 

156 # Create the array storing the result of only forecasts (also copies the value at t=0) 

157 forecast_states = computed_states.copy() 

158 

159 for meas_id, Nmeas in corestate_measures.items(): 

160 analysed_states.addMeasure(meas_id, np.zeros((self.attributed_models.shape[0], self.config.nb_analyses, Nmeas))) 

161 

162 # Create the CoreState (1 realisation and 1 coef (max_degree forced to 0)) storing the misfits of analyses 

163 misfits = cs.CoreState() 

164 for meas_id in ['MF', 'SV']: 

165 misfits.addMeasure(meas_id, np.zeros((1, self.config.nb_analyses, 1)), meas_max_degree=0) 

166 

167 logging.info("AugKF CoreStates ready !") 

168 return computed_states, forecast_states, analysed_states, misfits, Z_AR3 

169 

170 

171 def extract_prior_and_covariances(self): 

172 """ 

173 Extracts the priors from the files in the config prior directory. Also sets 

174 the covariance matrices by computation from priors or by reading them. 

175 

176 :return: average priors and covariances matrices as dictionaries. 

177 :rtype: dict, dict 

178 """ 

179 AR_type = self.config.AR_type 

180 Nz = self.config.Nz 

181 Nb = self.config.Nb 

182 Nsv = self.config.Nsv 

183 Nuz = self.config.Nuz 

184 dt_f = self.config.dt_f 

185 

186 MF, U, ER, times, dt_samp, tag = reads.extract_realisations(self.config.prior_dir, self.config.prior_type, self.config.dt_smoothing) 

187 

188 if not (len(times) == len(MF) == len(U) == len(ER) == len(dt_samp) == len(tag)): 

189 raise ValueError("times, MF, U, ER, dt_samp, tag lists must be the same length but are : {}, {}, {}, {}, {}, {}".format(len(times),len(MF),len(U),len(ER),len(dt_samp),len(tag))) 

190 

191 # Applying unique function on array to get list of tags 

192 res,ind = np.unique(np.array(tag), return_index=True) 

193 tag_list = res[np.argsort(ind)] 

194 

195 # Store covariance 

196 cov_prior = {} 

197 cov_prior['Z,Z'] = np.zeros((Nz,Nz)) 

198 cov_prior['B,B'] = np.zeros((Nb,Nb)) 

199 cov_prior['U,U'] = np.zeros((Nuz,Nuz)) 

200 cov_prior['ER,ER'] = np.zeros((Nsv,Nsv)) 

201 if AR_type == "AR3": 

202 cov_prior['dZ,dZ'] = np.zeros((Nz,Nz)) 

203 cov_prior['d2Z,d2Z'] = np.zeros((Nz,Nz)) 

204 

205 # Store average 

206 avg_prior = {} 

207 

208 # Init container 

209 container = [] 

210 # If U ER not combined then a second container is needed 

211 if not self.config.combined_U_ER_forecast: 

212 container2 = [] 

213 # Loop over tag list  

214 # 100path MUST NOT BE THE FIRST IN TAG LIST THIS WILL RESULT IN AN ERROR 

215 # BECAUSE PCA FIT (AND AVERAGES) MUST BE DONE ON A GEODYNAMO WITH LONG TIME SERIES (LIKE 71PATH) 

216 for t in tag_list: 

217 # Init matrices to store iteratively if many time series in tag  

218 # (case for 100path which combines 100p and 71p priors)  

219 # Thus we can mix geodynamo priors 

220 Z, dZ, d2Z, d3Z = np.empty((0,Nz)), np.empty((0,Nz)), np.empty((0,Nz)), np.empty((0,Nz)) 

221 

222 for i in range(len(times)): 

223 if t == tag[i]: 

224 

225 # Set prior size  

226 U[i] = self.set_prior_size(U[i], self.config.Lu, 'U') 

227 MF[i] = self.set_prior_size(MF[i], self.config.Lb, 'MF') 

228 ER[i] = self.set_prior_size(ER[i], self.config.Lsv, 'ER') 

229 

230 dt_sampling = dt_samp[i] # time sampling 

231 

232 if AR_type == "AR3": 

233 dt_prior = times[i][1]-times[i][0] # geodynamo sampling 

234 # Compute blackman smoothing window length as the rounded ratio dt_sampling/dt_prior 

235 Nt = round(dt_sampling / dt_prior) 

236 # Compute blackman smoothing window total weight by summing all blackman window coeffs 

237 blackman_w = np.sum(np.blackman(Nt)) 

238 else: 

239 # No smoothing 

240 Nt = 1 

241 blackman_w = 1 

242 # Perform a subsampling of U MF ER times according to self.config.dt_sampling 

243 U[i] = common.sample_timed_quantity(times[i], U[i], self.config.dt_sampling)[1] 

244 MF[i] = common.sample_timed_quantity(times[i], MF[i], self.config.dt_sampling)[1] 

245 times[i], ER[i] = common.sample_timed_quantity(times[i], ER[i], self.config.dt_sampling) 

246 dt_prior = times[i][1]-times[i][0] # geodynamo sampling 

247 

248 if t != "100path": #We consider average of 70path for 100path because longer time series 

249 # Compute U, B, ER averages 

250 avg_prior['U'] = U[i].mean(axis=0) 

251 avg_prior['ER'] = ER[i].mean(axis=0) 

252 avg_prior['B'] = MF[i].mean(axis=0) 

253 

254 # Center data 

255 if self.check_PCA(): 

256 if t != "100path": # PCA fit over 71path because longer time series 

257 self.config.pcaU_operator = pca.NormedPCAOperator(self.config) 

258 self.config.pcaU_operator.fit(U[i]) 

259 # PCA transform of U 

260 U[i] = self.config.pcaU_operator.transform(U[i]) 

261 else: 

262 # Remove mean U 

263 U[i] -= avg_prior['U'] 

264 # Remove mean ER 

265 ER[i] -= avg_prior['ER'] 

266 

267 if not AR_type == "diag": 

268 # compute U ER B and time derivatives and concatenate Z 

269 u, du, d2u, d3u = common.prep_AR_matrix(U[i], dt_prior, Nt) 

270 e, de, d2e, d3e = common.prep_AR_matrix(ER[i], dt_prior, Nt) 

271 b = common.prep_AR_matrix(MF[i], dt_prior, Nt)[0] 

272 

273 Z = np.concatenate((Z,self.U_ER_to_Z(u,e)), axis=0) 

274 dZ = np.concatenate((dZ,self.U_ER_to_Z(du,de)), axis=0) 

275 d2Z = np.concatenate((d2Z,self.U_ER_to_Z(d2u,d2e)), axis=0) 

276 d3Z = np.concatenate((d3Z,self.U_ER_to_Z(d3u,d3e)), axis=0) 

277 else: 

278 u = np.copy(U[i]) 

279 e = np.copy(ER[i]) 

280 b = np.copy(MF[i]) 

281 

282 if t != "100path": #We consider covariance of 70path for 100path because longer time series 

283 # Compute U, B, ER covariance matrices 

284 cov_prior['U,U'] = common.cov(u/blackman_w) 

285 cov_prior['B,B'] = common.cov(b/blackman_w) 

286 cov_prior['ER,ER'] = common.cov(e/blackman_w) 

287 

288 if AR_type == "diag": 

289 # Diag is independant for forecast so diag block U,U and ER,ER 

290 cov_prior['Z,Z'] = sc.linalg.block_diag(cov_prior['U,U'],cov_prior['ER,ER']) 

291 else: 

292 # If U and ER dependant for forecast 

293 if self.config.combined_U_ER_forecast : 

294 cov_prior['Z,Z'] = common.cov(Z/blackman_w) 

295 cov_prior['d2Z,d2Z'] = common.cov(d2Z/blackman_w) 

296 cov_prior['dZ,dZ'] = common.cov(dZ/blackman_w) 

297 else: 

298 # Concatenate 

299 cov_prior['Z,Z'] = sc.linalg.block_diag(cov_prior['U,U'],cov_prior['ER,ER']) 

300 cov_prior['dZ,dZ'] = sc.linalg.block_diag(common.cov(du/blackman_w),common.cov(de/blackman_w)) 

301 cov_prior['d2Z,d2Z'] = sc.linalg.block_diag(common.cov(d2u/blackman_w),common.cov(d2e/blackman_w)) 

302 

303 if not AR_type == "diag": 

304 # If U and ER independant for forecast 

305 if not self.config.combined_U_ER_forecast: 

306 # Split U ER parts of Z 

307 Uz = Z[:,:Nuz] 

308 dUz = dZ[:,:Nuz] 

309 d2Uz = d2Z[:,:Nuz] 

310 d3Uz = d3Z[:,:Nuz] 

311 ERz = Z[:,Nuz:] 

312 dERz = dZ[:,Nuz:] 

313 d2ERz = d2Z[:,Nuz:] 

314 d3ERz = d3Z[:,Nuz:] 

315 

316 if AR_type == "AR1": 

317 # If U and ER dependant for forecast 

318 if self.config.combined_U_ER_forecast: 

319 # Append container for Z 

320 container.append([Z.T, dZ.T * dt_prior, dt_prior, Nt]) 

321 else: 

322 # Append container for U 

323 container.append([Uz.T, dUz.T * dt_prior, dt_prior, Nt]) 

324 # Append container 2 for ER 

325 container2.append([ERz.T, dERz.T * dt_prior, dt_prior, Nt]) 

326 elif AR_type == "AR3": 

327 # If U and ER dependant for forecast 

328 if self.config.combined_U_ER_forecast: 

329 # Append container for Z 

330 container.append([np.concatenate((Z.T,dZ.T,d2Z.T),axis=0), 

331 np.concatenate((dZ.T,d2Z.T,d3Z.T),axis=0) * dt_prior, 

332 dt_prior, Nt]) 

333 else: 

334 # Append container for U 

335 container.append([np.concatenate((Uz.T,dUz.T,d2Uz.T),axis=0), 

336 np.concatenate((dUz.T,d2Uz.T,d3Uz.T),axis=0) * dt_prior, 

337 dt_prior, Nt]) 

338 # Append container 2 for ER 

339 container2.append([np.concatenate((ERz.T,dERz.T,d2ERz.T),axis=0), 

340 np.concatenate((dERz.T,d2ERz.T,d3ERz.T),axis=0) * dt_prior, 

341 dt_prior, Nt]) 

342 

343 if AR_type == "AR1": 

344 # compute A, Chol of U or Z 

345 (A, Chol) = common.compute_AR_coefs_avg(container, AR_type) 

346 if not self.config.combined_U_ER_forecast: 

347 # compute A, Chol of ER 

348 (A2, Chol2) = common.compute_AR_coefs_avg(container2, AR_type) 

349 # diag block  

350 A = sc.linalg.block_diag(A,A2) 

351 Chol = sc.linalg.block_diag(Chol,Chol2) 

352 # compute A, Chol for forecast 

353 (cov_prior['A'], 

354 cov_prior['Chol']) = common.compute_AR1_coefs_forecast(A, Chol, dt_f, Nz) 

355 elif AR_type == "diag": 

356 # compute A, Chol 

357 A, Chol = common.compute_diag_AR1_coefs(cov_prior['U,U'], 

358 cov_prior['ER,ER'], 

359 self.config.TauU, 

360 self.config.TauE) 

361 # compute A, Chol for forecast 

362 (cov_prior['A'], 

363 cov_prior['Chol']) = common.compute_AR1_coefs_forecast(A, Chol, dt_f, Nz) 

364 elif AR_type == "AR3": 

365 # compute A, B, C, Chol of U or Z 

366 (A,B,C,Chol) = common.compute_AR_coefs_avg(container, AR_type) 

367 if not self.config.combined_U_ER_forecast: 

368 # compute A, B, C, Chol of ER 

369 (A2,B2,C2,Chol2) = common.compute_AR_coefs_avg(container2, AR_type) 

370 # diag block  

371 A = sc.linalg.block_diag(A,A2) 

372 B = sc.linalg.block_diag(B,B2) 

373 C = sc.linalg.block_diag(C,C2) 

374 Chol = sc.linalg.block_diag(Chol,Chol2) 

375 # compute A, B, C, Chol forecast 

376 (cov_prior['A'], 

377 cov_prior['B'], 

378 cov_prior['C'], 

379 cov_prior['Chol']) = common.compute_AR3_coefs_forecast(A, B, C, Chol, dt_f, Nz) 

380 

381 logging.debug('Reading and computation of priors/covariances of AugkfAlgo OK !') 

382 

383 return avg_prior, cov_prior 

384 

385 

386 def check_coef_size(self, X, asked_max_degree, asked_coeffs): 

387 # Take only data corresponding to the asked coefs (and check that they are not bigger than the data...) 

388 if X.shape[1] < asked_coeffs: 

389 raise ValueError( 

390 'Asked max degree ({}) is too big to be handled by the prior data in {}. Please retry with a lower max degree.'.format( 

391 asked_max_degree, self.config.prior_dir)) 

392 

393 

394 def set_prior_size(self, X, asked_max_degree, measure): 

395 asked_coeffs = asked_max_degree * (asked_max_degree + 2) 

396 if measure in ['MF', 'SV', 'ER']: 

397 self.check_coef_size(X, asked_max_degree, asked_coeffs) 

398 return X[:, :asked_coeffs] 

399 else: 

400 self.check_coef_size(X, asked_max_degree, asked_coeffs) 

401 read_Nu = X.shape[1] // 2 

402 return np.concatenate((X[:, :asked_coeffs], X[:, read_Nu:read_Nu + asked_coeffs]), axis=1) 

403 

404 

405 def Z_to_U_ER(self, Z, dimension): 

406 """ 

407 Compute U ER from Z  

408 

409 :param Z: Augmented state Z 

410 :type Z: numpy array, dim Ncoef (1D) or Nreal x Ncoef (2D) 

411 :param dimension: dimension of Z 

412 :type dimension: int  

413 :return: U and ER states 

414 :rtype: 1D or 2D U and ER states 

415 """ 

416 

417 if dimension == 1: # Ncoef 

418 if self.check_PCA(): 

419 ER = Z[self.config.N_pca_u:] + self.avg_prior['ER'] 

420 U = self.config.pcaU_operator.inverse_transform(Z[:self.config.N_pca_u]) 

421 else: 

422 U = Z[:self.config.Nu2] + self.avg_prior['U'] 

423 ER = Z[self.config.Nu2:] + self.avg_prior['ER'] 

424 elif dimension == 2: # Nreal x Ncoef 

425 if self.check_PCA(): 

426 ER = Z[:, self.config.N_pca_u:] + self.avg_prior['ER'] 

427 U = self.config.pcaU_operator.inverse_transform(Z[:, :self.config.N_pca_u]) 

428 else: 

429 U = Z[:, :self.config.Nu2] + self.avg_prior['U'] 

430 ER = Z[:, self.config.Nu2:] + self.avg_prior['ER'] 

431 else: 

432 raise ValueError("dimension must be equal to 1 or 2 but is {}".format(dimension)) 

433 return U, ER 

434 

435 

436 def dZ_to_dU_dER(self, dZ): 

437 """ 

438 Compute dU dER from dZ or d2U d2ER from d2Z 

439 

440 :param dZ: derivative of augmented state (dZ) 

441 :type times: numpy array, Nreal x Ncoef (2D) 

442 :return: derivatives of U and ER states 

443 :rtype: 2D dU and dER states 

444 """ 

445 

446 # Nreal x Ncoef 

447 if self.check_PCA(): 

448 dER = dZ[:, self.config.N_pca_u:] 

449 dU = self.config.pcaU_operator.inverse_transform_deriv(dZ[:, :self.config.N_pca_u]) 

450 else: 

451 dU = dZ[:, :self.config.Nu2] 

452 dER = dZ[:, self.config.Nu2:] 

453 return dU, dER 

454 

455 

456 def U_ER_to_Z(self, U, ER): 

457 """ 

458 Compute Z from U ER 

459 

460 :param U: U state 

461 :type U: numpy array, dim Ncoef (1D) or Ntimes x Ncoef (2D) or Nreal x Ntimes x Ncoef (3D) 

462 :return: U and ER states 

463 :rtype: 1D or 2D or 3D Z states 

464 """ 

465 if U.ndim == ER.ndim: 

466 if U.ndim == 1: # Ncoef 

467 return np.concatenate((U, ER),axis=0) 

468 elif U.ndim == 2: # Ntimes x Ncoef 

469 return np.concatenate((U, ER),axis=1) 

470 elif U.ndim == 3: # Nreal x Ntimes x Ncoef 

471 return np.concatenate((U, ER),axis=2) 

472 else: 

473 raise ValueError("U and ER must be 1D, 2D or 3D arrays but are {}, {}".format(U.ndim, ER.ndim)) 

474 else: 

475 raise ValueError("U and ER arrays must be the same number of dimensions but are {}, {}".format(U.ndim, ER.ndim))