Coverage for pygeodyn/pca.py: 89%
56 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-22 13:43 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-22 13:43 +0000
1import math
2import logging
3import numpy as np
4from . import utilities
5from sklearn.decomposition import PCA
7class NormedPCAOperator:
8 """
9 Object handling the transforms with normalisation of the PCA of U.
10 Generates the sklearn operator (used to fit the normed data) and the norm matrix used for normalisation.
11 Implements the fit_transform, transform and inverse_transform methods with normalisation taken into account.
12 """
13 def __init__(self, config):
14 self.sklearn_operator = PCA(n_components=config.N_pca_u, svd_solver='full')
15 self.norm_matrix, self.inverse_norm_matrix = compute_normalisation_matrix(config.Lu, getattr(config, 'pca_norm', None))
17 # Assuming that the sklearn_operator was used to do the PCA on the normed data Û = NM*U (NM = norm_matrix):
18 # we have U_pca = SKC*(Û - Û0) with SKC = sklearn_operator.components_ and Û0 = sklearn_operator.mean_
19 # so Û = SKC^T * U_pca + Û0
20 # => U = NM^(-1) SKC^T * U_pca + NM^(-1) * Û0
21 # => U = S_u * U_pca + U0
22 # => U_pca = inv_S_u * (U - U0)
23 # /!\ S_u is no longer orthogonal due to normalisation so the pseudo inverse need to be stored
25 @property
26 def S_u(self):
27 return self.inverse_norm_matrix @ np.transpose(self.sklearn_operator.components_)
29 @property
30 def inv_S_u(self):
31 return self.sklearn_operator.components_ @ self.norm_matrix
33 @property
34 def U0(self):
35 return self.inverse_norm_matrix @ self.sklearn_operator.mean_
37 @property
38 def n_components(self):
39 return self.sklearn_operator.n_components
41 def fit(self, realisations_U):
42 self.sklearn_operator.fit(realisations_U @ self.norm_matrix)
44 def transform(self, U):
45 return np.transpose(self.inv_S_u @ np.transpose(U - self.U0))
47 def inverse_transform(self, pcaU):
48 return np.transpose(self.S_u @ np.transpose(pcaU)) + self.U0
50 def transform_deriv(self, dU):
51 return np.transpose(self.inv_S_u @ np.transpose(dU))
53 def inverse_transform_deriv(self, pcadU):
54 return np.transpose(self.S_u @ np.transpose(pcadU))
56 def __str__(self):
57 return type(self).__name__
60class NormedNzPCAOperator(NormedPCAOperator):
61 def __init__(self, config):
62 super().__init__(config)
63 # Project into zU,nzU space to recover only the nz part of norm_matrix
64 P_U_Ua = utilities.spectral_to_znz_matrix(config.Lu)
65 self.norm_matrix = (P_U_Ua @ self.norm_matrix @ P_U_Ua.T)[config.Lu:, config.Lu:]
66 self.inverse_norm_matrix = (P_U_Ua @ self.inverse_norm_matrix @ P_U_Ua.T)[config.Lu:, config.Lu:]
69def compute_normalisation_matrix(Lu, norm_type=None):
70 """
71 Computes the normalisation matrix for the PCA according to the asked norm type.
72 Default is None: the normalisation matrix (and its inverse) is then the identity.
74 :param Lu: max_degree of core flow U
75 :type Lu: int
76 :param norm_type: Normalisation type
77 :type norm_type: "energy" or None
78 :return: Normalisation matrix and its inverse
79 :rtype: np.array (Nu2 x Nu2), np.array (Nu2 x Nu2)
80 """
81 if norm_type is not None and norm_type == "energy":
82 logging.debug('PCA performed with energy normalisation')
83 return compute_energy_normalisation_matrix(Lu)
84 else:
85 # No normalisation
86 logging.debug('PCA performed without any normalisation')
87 return np.identity(2*Lu*(Lu+2)), np.identity(2*Lu*(Lu+2))
90def compute_energy_normalisation_matrix(Lu):
91 """
92 Computes the matrix normalising coefficients with respect to energy.
93 That is: norm_tnm = sqrt(n*(n + 1)/(2 * n + 1))*tnm
95 :param Lu: max_degree of core flow U
96 :type Lu: int
97 :return: Energy normalisation matrix and its inverse
98 :rtype: np.array (Nu2 x Nu2), np.array (Nu2 x Nu2)
99 """
101 Nu = Lu*(Lu+2)
102 norm_factors = np.zeros(2*Nu)
103 min_index = 0
105 for n in range(1, Lu + 1):
106 next_index = n*(n + 2)
108 norm_factor = math.sqrt(n * (n + 1) / (2 * n + 1))
109 norm_factors[min_index: next_index] = norm_factor
110 norm_factors[Nu + min_index: Nu + next_index] = norm_factor
112 min_index = next_index
114 return np.diag(norm_factors), np.diag(1./norm_factors)