Coverage for pygeodyn/augkf/algo.py: 84%
238 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-22 13:43 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-22 13:43 +0000
1import numpy as np
2import logging
3import scipy as sc
4from .. import common, corestates as cs, pca
5from ..inout import reads
6from ..generic.algo import GenericAlgo
7from pygeodyn.augkf.forecaster import AugkfForecasterAR1, AugkfForecasterAR3
8from pygeodyn.augkf.analyser import AugkfAnalyserAR1, AugkfAnalyserAR3
9from fractions import Fraction
10from ..inout.config import ComputationConfig
12def create_augkf_algo(config, nb_realisations, seed, attributed_models):
13 """
14 Factory function that returns the AugKF algo
16 :param config: Configuration of the algo
17 :type config: ComputationConfig
18 :param nb_realisations: number of realisations to consider in the algo
19 :type nb_realisations: int
20 :return: Algorithm object
21 :rtype: AugkfAlgo
22 """
24 return AugkfAlgo(config, nb_realisations, seed, attributed_models)
27class AugkfAlgo(GenericAlgo):
28 """
29 Augmented State Kalman Filter algorithm with DIFF treated as a contribution to ER.
30 """
31 name = 'augkf'
34 def __eq__(self, other):
35 """
36 Implementation of equality for two different AugkfAlgos
38 :param: other
39 """
40 are_all_equal = True
41 def print_equal_message(eq, key1, key2):
42 if eq:
43 pass
44 else:
45 logging.debug('vals are not equal for key = {}, {}'.format(key1, key2))
47 logging.debug('testing equalities of algo')
48 # loop on all items of the two instantiation of the classes
49 for (key, val), (key_other, val_other) in zip(self.__dict__.items(), other.__dict__.items()):
50 # if val is an instance, skip it, these classes have their own implementation
51 if isinstance(val, AugkfAnalyserAR3) or isinstance(val, AugkfForecasterAR3) or isinstance(val, ComputationConfig):
52 logging.debug('Skipping {}, {}, an instantiation of a class, val = {}'.format(key, key_other, val))
53 continue
54 if type(val) == type(dict()):
55 for (key_val, val_avg), (key_val2, val_avg2) in zip(val.items(), val_other.items()):
56 eq = np.allclose(val_avg, val_avg2)
57 print_equal_message(eq, key_val, key_val2)
58 else:
59 eq = val == val_other
60 print_equal_message(eq, key, key_other)
62 return are_all_equal
65 def __init__(self, cfg, nb_realisations, seed, attributed_models):
66 super().__init__(cfg, nb_realisations)
67 self.attributed_models = attributed_models
68 self.avg_prior, self.cov_prior = self.extract_prior_and_covariances()
69 self.legendre_polys = common.compute_legendre_polys(cfg.Nth_legendre, cfg.Lb, cfg.Lu, cfg.Lsv)
70 self.forecaster = self.create_forecaster()
71 self.seed = seed
72 self.analyser = self.create_analyser()
75 def check_PCA(self):
76 return self.config.pca
79 def create_forecaster(self):
80 """
81 Factory method to create the forecaster.
83 :return: AugkfForecaster
84 """
85 if self.config.AR_type == "AR3":
86 return AugkfForecasterAR3(self)
87 else:
88 return AugkfForecasterAR1(self)
91 def create_analyser(self):
92 """
93 Factory method to create the analyser.
95 :return: AugkfAnalyser
96 """
97 if self.config.AR_type == "AR3":
98 return AugkfAnalyserAR3(self)
99 else:
100 return AugkfAnalyserAR1(self)
103 def get_current_misfits(self, measure):
104 """ Forwards the getting of current_misfits to analyser """
105 return self.analyser.current_misfits[measure]
108 def init_corestates(self, random_state=None):
109 """
110 Sets up the corestates needed for the AugKF algorithm.
111 Returns CoreStates of the adequate form and initialisation to perform the AugKF.
113 :param random_state: Random state to use for normal draw (only used when init from noised priors)
114 :type random_state: np.random.RandomState
115 :return: computed_states, forecast_states, analysed_states, misfits, Z_AR3
116 :rtype: CoreState, CoreState, CoreState, CoreState, 3D numpy array (N_real x 3 x Ncoef) or None (if not AR3)
117 """
119 corestate_measures={}
121 # Define the measures used for the computation and their number of coeffs
122 corestate_measures.update({'MF': self.config.Nb,
123 'U': self.config.Nu2,
124 'SV': self.config.Nsv,
125 'ER': self.config.Nsv,
126 'Z': self.config.Nz})
128 # Add derivatives needed to AR3 to measures
129 if self.config.AR_type == "AR3":
130 corestate_measures.update({'dUdt': self.config.Nu2,
131 'd2Udt2': self.config.Nu2,
132 'dERdt': self.config.Nsv,
133 'd2ERdt2': self.config.Nsv})
135 # Add shear measure ('S') if do_shear = 1
136 if self.config.do_shear == 1:
137 corestate_measures.update({'S': self.config.Nu2})
139 # Build the array of computed states
140 computed_states = cs.CoreState()
141 analysed_states = cs.CoreState()
143 for meas_id, Nmeas in corestate_measures.items():
144 computed_states.addMeasure(meas_id, np.zeros((self.attributed_models.shape[0], self.config.nb_forecasts, Nmeas)))
146 # Initialize the core state at t=0
147 if self.config.core_state_init == 'from_file':
148 Z_AR3 = computed_states.initialise_from_file(self)
149 str_init = self.config.init_file
150 else:
151 Z_AR3 = computed_states.initialise_from_noised_priors(self, random_state=random_state)
152 str_init = 'normal draw around average priors'
154 logging.debug('Computed states initialised from {}'.format(str_init))
156 # Create the array storing the result of only forecasts (also copies the value at t=0)
157 forecast_states = computed_states.copy()
159 for meas_id, Nmeas in corestate_measures.items():
160 analysed_states.addMeasure(meas_id, np.zeros((self.attributed_models.shape[0], self.config.nb_analyses, Nmeas)))
162 # Create the CoreState (1 realisation and 1 coef (max_degree forced to 0)) storing the misfits of analyses
163 misfits = cs.CoreState()
164 for meas_id in ['MF', 'SV']:
165 misfits.addMeasure(meas_id, np.zeros((1, self.config.nb_analyses, 1)), meas_max_degree=0)
167 logging.info("AugKF CoreStates ready !")
168 return computed_states, forecast_states, analysed_states, misfits, Z_AR3
171 def extract_prior_and_covariances(self):
172 """
173 Extracts the priors from the files in the config prior directory. Also sets
174 the covariance matrices by computation from priors or by reading them.
176 :return: average priors and covariances matrices as dictionaries.
177 :rtype: dict, dict
178 """
179 AR_type = self.config.AR_type
180 Nz = self.config.Nz
181 Nb = self.config.Nb
182 Nsv = self.config.Nsv
183 Nuz = self.config.Nuz
184 dt_f = self.config.dt_f
186 MF, U, ER, times, dt_samp, tag = reads.extract_realisations(self.config.prior_dir, self.config.prior_type, self.config.dt_smoothing)
188 if not (len(times) == len(MF) == len(U) == len(ER) == len(dt_samp) == len(tag)):
189 raise ValueError("times, MF, U, ER, dt_samp, tag lists must be the same length but are : {}, {}, {}, {}, {}, {}".format(len(times),len(MF),len(U),len(ER),len(dt_samp),len(tag)))
191 # Applying unique function on array to get list of tags
192 res,ind = np.unique(np.array(tag), return_index=True)
193 tag_list = res[np.argsort(ind)]
195 # Store covariance
196 cov_prior = {}
197 cov_prior['Z,Z'] = np.zeros((Nz,Nz))
198 cov_prior['B,B'] = np.zeros((Nb,Nb))
199 cov_prior['U,U'] = np.zeros((Nuz,Nuz))
200 cov_prior['ER,ER'] = np.zeros((Nsv,Nsv))
201 if AR_type == "AR3":
202 cov_prior['dZ,dZ'] = np.zeros((Nz,Nz))
203 cov_prior['d2Z,d2Z'] = np.zeros((Nz,Nz))
205 # Store average
206 avg_prior = {}
208 # Init container
209 container = []
210 # If U ER not combined then a second container is needed
211 if not self.config.combined_U_ER_forecast:
212 container2 = []
213 # Loop over tag list
214 # 100path MUST NOT BE THE FIRST IN TAG LIST THIS WILL RESULT IN AN ERROR
215 # BECAUSE PCA FIT (AND AVERAGES) MUST BE DONE ON A GEODYNAMO WITH LONG TIME SERIES (LIKE 71PATH)
216 for t in tag_list:
217 # Init matrices to store iteratively if many time series in tag
218 # (case for 100path which combines 100p and 71p priors)
219 # Thus we can mix geodynamo priors
220 Z, dZ, d2Z, d3Z = np.empty((0,Nz)), np.empty((0,Nz)), np.empty((0,Nz)), np.empty((0,Nz))
222 for i in range(len(times)):
223 if t == tag[i]:
225 # Set prior size
226 U[i] = self.set_prior_size(U[i], self.config.Lu, 'U')
227 MF[i] = self.set_prior_size(MF[i], self.config.Lb, 'MF')
228 ER[i] = self.set_prior_size(ER[i], self.config.Lsv, 'ER')
230 dt_sampling = dt_samp[i] # time sampling
232 if AR_type == "AR3":
233 dt_prior = times[i][1]-times[i][0] # geodynamo sampling
234 # Compute blackman smoothing window length as the rounded ratio dt_sampling/dt_prior
235 Nt = round(dt_sampling / dt_prior)
236 # Compute blackman smoothing window total weight by summing all blackman window coeffs
237 blackman_w = np.sum(np.blackman(Nt))
238 else:
239 # No smoothing
240 Nt = 1
241 blackman_w = 1
242 # Perform a subsampling of U MF ER times according to self.config.dt_sampling
243 U[i] = common.sample_timed_quantity(times[i], U[i], self.config.dt_sampling)[1]
244 MF[i] = common.sample_timed_quantity(times[i], MF[i], self.config.dt_sampling)[1]
245 times[i], ER[i] = common.sample_timed_quantity(times[i], ER[i], self.config.dt_sampling)
246 dt_prior = times[i][1]-times[i][0] # geodynamo sampling
248 if t != "100path": #We consider average of 70path for 100path because longer time series
249 # Compute U, B, ER averages
250 avg_prior['U'] = U[i].mean(axis=0)
251 avg_prior['ER'] = ER[i].mean(axis=0)
252 avg_prior['B'] = MF[i].mean(axis=0)
254 # Center data
255 if self.check_PCA():
256 if t != "100path": # PCA fit over 71path because longer time series
257 self.config.pcaU_operator = pca.NormedPCAOperator(self.config)
258 self.config.pcaU_operator.fit(U[i])
259 # PCA transform of U
260 U[i] = self.config.pcaU_operator.transform(U[i])
261 else:
262 # Remove mean U
263 U[i] -= avg_prior['U']
264 # Remove mean ER
265 ER[i] -= avg_prior['ER']
267 if not AR_type == "diag":
268 # compute U ER B and time derivatives and concatenate Z
269 u, du, d2u, d3u = common.prep_AR_matrix(U[i], dt_prior, Nt)
270 e, de, d2e, d3e = common.prep_AR_matrix(ER[i], dt_prior, Nt)
271 b = common.prep_AR_matrix(MF[i], dt_prior, Nt)[0]
273 Z = np.concatenate((Z,self.U_ER_to_Z(u,e)), axis=0)
274 dZ = np.concatenate((dZ,self.U_ER_to_Z(du,de)), axis=0)
275 d2Z = np.concatenate((d2Z,self.U_ER_to_Z(d2u,d2e)), axis=0)
276 d3Z = np.concatenate((d3Z,self.U_ER_to_Z(d3u,d3e)), axis=0)
277 else:
278 u = np.copy(U[i])
279 e = np.copy(ER[i])
280 b = np.copy(MF[i])
282 if t != "100path": #We consider covariance of 70path for 100path because longer time series
283 # Compute U, B, ER covariance matrices
284 cov_prior['U,U'] = common.cov(u/blackman_w)
285 cov_prior['B,B'] = common.cov(b/blackman_w)
286 cov_prior['ER,ER'] = common.cov(e/blackman_w)
288 if AR_type == "diag":
289 # Diag is independant for forecast so diag block U,U and ER,ER
290 cov_prior['Z,Z'] = sc.linalg.block_diag(cov_prior['U,U'],cov_prior['ER,ER'])
291 else:
292 # If U and ER dependant for forecast
293 if self.config.combined_U_ER_forecast :
294 cov_prior['Z,Z'] = common.cov(Z/blackman_w)
295 cov_prior['d2Z,d2Z'] = common.cov(d2Z/blackman_w)
296 cov_prior['dZ,dZ'] = common.cov(dZ/blackman_w)
297 else:
298 # Concatenate
299 cov_prior['Z,Z'] = sc.linalg.block_diag(cov_prior['U,U'],cov_prior['ER,ER'])
300 cov_prior['dZ,dZ'] = sc.linalg.block_diag(common.cov(du/blackman_w),common.cov(de/blackman_w))
301 cov_prior['d2Z,d2Z'] = sc.linalg.block_diag(common.cov(d2u/blackman_w),common.cov(d2e/blackman_w))
303 if not AR_type == "diag":
304 # If U and ER independant for forecast
305 if not self.config.combined_U_ER_forecast:
306 # Split U ER parts of Z
307 Uz = Z[:,:Nuz]
308 dUz = dZ[:,:Nuz]
309 d2Uz = d2Z[:,:Nuz]
310 d3Uz = d3Z[:,:Nuz]
311 ERz = Z[:,Nuz:]
312 dERz = dZ[:,Nuz:]
313 d2ERz = d2Z[:,Nuz:]
314 d3ERz = d3Z[:,Nuz:]
316 if AR_type == "AR1":
317 # If U and ER dependant for forecast
318 if self.config.combined_U_ER_forecast:
319 # Append container for Z
320 container.append([Z.T, dZ.T * dt_prior, dt_prior, Nt])
321 else:
322 # Append container for U
323 container.append([Uz.T, dUz.T * dt_prior, dt_prior, Nt])
324 # Append container 2 for ER
325 container2.append([ERz.T, dERz.T * dt_prior, dt_prior, Nt])
326 elif AR_type == "AR3":
327 # If U and ER dependant for forecast
328 if self.config.combined_U_ER_forecast:
329 # Append container for Z
330 container.append([np.concatenate((Z.T,dZ.T,d2Z.T),axis=0),
331 np.concatenate((dZ.T,d2Z.T,d3Z.T),axis=0) * dt_prior,
332 dt_prior, Nt])
333 else:
334 # Append container for U
335 container.append([np.concatenate((Uz.T,dUz.T,d2Uz.T),axis=0),
336 np.concatenate((dUz.T,d2Uz.T,d3Uz.T),axis=0) * dt_prior,
337 dt_prior, Nt])
338 # Append container 2 for ER
339 container2.append([np.concatenate((ERz.T,dERz.T,d2ERz.T),axis=0),
340 np.concatenate((dERz.T,d2ERz.T,d3ERz.T),axis=0) * dt_prior,
341 dt_prior, Nt])
343 if AR_type == "AR1":
344 # compute A, Chol of U or Z
345 (A, Chol) = common.compute_AR_coefs_avg(container, AR_type)
346 if not self.config.combined_U_ER_forecast:
347 # compute A, Chol of ER
348 (A2, Chol2) = common.compute_AR_coefs_avg(container2, AR_type)
349 # diag block
350 A = sc.linalg.block_diag(A,A2)
351 Chol = sc.linalg.block_diag(Chol,Chol2)
352 # compute A, Chol for forecast
353 (cov_prior['A'],
354 cov_prior['Chol']) = common.compute_AR1_coefs_forecast(A, Chol, dt_f, Nz)
355 elif AR_type == "diag":
356 # compute A, Chol
357 A, Chol = common.compute_diag_AR1_coefs(cov_prior['U,U'],
358 cov_prior['ER,ER'],
359 self.config.TauU,
360 self.config.TauE)
361 # compute A, Chol for forecast
362 (cov_prior['A'],
363 cov_prior['Chol']) = common.compute_AR1_coefs_forecast(A, Chol, dt_f, Nz)
364 elif AR_type == "AR3":
365 # compute A, B, C, Chol of U or Z
366 (A,B,C,Chol) = common.compute_AR_coefs_avg(container, AR_type)
367 if not self.config.combined_U_ER_forecast:
368 # compute A, B, C, Chol of ER
369 (A2,B2,C2,Chol2) = common.compute_AR_coefs_avg(container2, AR_type)
370 # diag block
371 A = sc.linalg.block_diag(A,A2)
372 B = sc.linalg.block_diag(B,B2)
373 C = sc.linalg.block_diag(C,C2)
374 Chol = sc.linalg.block_diag(Chol,Chol2)
375 # compute A, B, C, Chol forecast
376 (cov_prior['A'],
377 cov_prior['B'],
378 cov_prior['C'],
379 cov_prior['Chol']) = common.compute_AR3_coefs_forecast(A, B, C, Chol, dt_f, Nz)
381 logging.debug('Reading and computation of priors/covariances of AugkfAlgo OK !')
383 return avg_prior, cov_prior
386 def check_coef_size(self, X, asked_max_degree, asked_coeffs):
387 # Take only data corresponding to the asked coefs (and check that they are not bigger than the data...)
388 if X.shape[1] < asked_coeffs:
389 raise ValueError(
390 'Asked max degree ({}) is too big to be handled by the prior data in {}. Please retry with a lower max degree.'.format(
391 asked_max_degree, self.config.prior_dir))
394 def set_prior_size(self, X, asked_max_degree, measure):
395 asked_coeffs = asked_max_degree * (asked_max_degree + 2)
396 if measure in ['MF', 'SV', 'ER']:
397 self.check_coef_size(X, asked_max_degree, asked_coeffs)
398 return X[:, :asked_coeffs]
399 else:
400 self.check_coef_size(X, asked_max_degree, asked_coeffs)
401 read_Nu = X.shape[1] // 2
402 return np.concatenate((X[:, :asked_coeffs], X[:, read_Nu:read_Nu + asked_coeffs]), axis=1)
405 def Z_to_U_ER(self, Z, dimension):
406 """
407 Compute U ER from Z
409 :param Z: Augmented state Z
410 :type Z: numpy array, dim Ncoef (1D) or Nreal x Ncoef (2D)
411 :param dimension: dimension of Z
412 :type dimension: int
413 :return: U and ER states
414 :rtype: 1D or 2D U and ER states
415 """
417 if dimension == 1: # Ncoef
418 if self.check_PCA():
419 ER = Z[self.config.N_pca_u:] + self.avg_prior['ER']
420 U = self.config.pcaU_operator.inverse_transform(Z[:self.config.N_pca_u])
421 else:
422 U = Z[:self.config.Nu2] + self.avg_prior['U']
423 ER = Z[self.config.Nu2:] + self.avg_prior['ER']
424 elif dimension == 2: # Nreal x Ncoef
425 if self.check_PCA():
426 ER = Z[:, self.config.N_pca_u:] + self.avg_prior['ER']
427 U = self.config.pcaU_operator.inverse_transform(Z[:, :self.config.N_pca_u])
428 else:
429 U = Z[:, :self.config.Nu2] + self.avg_prior['U']
430 ER = Z[:, self.config.Nu2:] + self.avg_prior['ER']
431 else:
432 raise ValueError("dimension must be equal to 1 or 2 but is {}".format(dimension))
433 return U, ER
436 def dZ_to_dU_dER(self, dZ):
437 """
438 Compute dU dER from dZ or d2U d2ER from d2Z
440 :param dZ: derivative of augmented state (dZ)
441 :type times: numpy array, Nreal x Ncoef (2D)
442 :return: derivatives of U and ER states
443 :rtype: 2D dU and dER states
444 """
446 # Nreal x Ncoef
447 if self.check_PCA():
448 dER = dZ[:, self.config.N_pca_u:]
449 dU = self.config.pcaU_operator.inverse_transform_deriv(dZ[:, :self.config.N_pca_u])
450 else:
451 dU = dZ[:, :self.config.Nu2]
452 dER = dZ[:, self.config.Nu2:]
453 return dU, dER
456 def U_ER_to_Z(self, U, ER):
457 """
458 Compute Z from U ER
460 :param U: U state
461 :type U: numpy array, dim Ncoef (1D) or Ntimes x Ncoef (2D) or Nreal x Ntimes x Ncoef (3D)
462 :return: U and ER states
463 :rtype: 1D or 2D or 3D Z states
464 """
465 if U.ndim == ER.ndim:
466 if U.ndim == 1: # Ncoef
467 return np.concatenate((U, ER),axis=0)
468 elif U.ndim == 2: # Ntimes x Ncoef
469 return np.concatenate((U, ER),axis=1)
470 elif U.ndim == 3: # Nreal x Ntimes x Ncoef
471 return np.concatenate((U, ER),axis=2)
472 else:
473 raise ValueError("U and ER must be 1D, 2D or 3D arrays but are {}, {}".format(U.ndim, ER.ndim))
474 else:
475 raise ValueError("U and ER arrays must be the same number of dimensions but are {}, {}".format(U.ndim, ER.ndim))