Coverage for pygeodyn/pca.py: 89%

56 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-12-22 13:43 +0000

1import math 

2import logging 

3import numpy as np 

4from . import utilities 

5from sklearn.decomposition import PCA 

6 

7class NormedPCAOperator: 

8 """ 

9 Object handling the transforms with normalisation of the PCA of U. 

10 Generates the sklearn operator (used to fit the normed data) and the norm matrix used for normalisation. 

11 Implements the fit_transform, transform and inverse_transform methods with normalisation taken into account. 

12 """ 

13 def __init__(self, config): 

14 self.sklearn_operator = PCA(n_components=config.N_pca_u, svd_solver='full') 

15 self.norm_matrix, self.inverse_norm_matrix = compute_normalisation_matrix(config.Lu, getattr(config, 'pca_norm', None)) 

16 

17 # Assuming that the sklearn_operator was used to do the PCA on the normed data Û = NM*U (NM = norm_matrix): 

18 # we have U_pca = SKC*(Û - Û0) with SKC = sklearn_operator.components_ and Û0 = sklearn_operator.mean_ 

19 # so Û = SKC^T * U_pca + Û0 

20 # => U = NM^(-1) SKC^T * U_pca + NM^(-1) * Û0 

21 # => U = S_u * U_pca + U0 

22 # => U_pca = inv_S_u * (U - U0) 

23 # /!\ S_u is no longer orthogonal due to normalisation so the pseudo inverse need to be stored 

24 

25 @property 

26 def S_u(self): 

27 return self.inverse_norm_matrix @ np.transpose(self.sklearn_operator.components_) 

28 

29 @property 

30 def inv_S_u(self): 

31 return self.sklearn_operator.components_ @ self.norm_matrix 

32 

33 @property 

34 def U0(self): 

35 return self.inverse_norm_matrix @ self.sklearn_operator.mean_ 

36 

37 @property 

38 def n_components(self): 

39 return self.sklearn_operator.n_components 

40 

41 def fit(self, realisations_U): 

42 self.sklearn_operator.fit(realisations_U @ self.norm_matrix) 

43 

44 def transform(self, U): 

45 return np.transpose(self.inv_S_u @ np.transpose(U - self.U0)) 

46 

47 def inverse_transform(self, pcaU): 

48 return np.transpose(self.S_u @ np.transpose(pcaU)) + self.U0 

49 

50 def transform_deriv(self, dU): 

51 return np.transpose(self.inv_S_u @ np.transpose(dU)) 

52 

53 def inverse_transform_deriv(self, pcadU): 

54 return np.transpose(self.S_u @ np.transpose(pcadU)) 

55 

56 def __str__(self): 

57 return type(self).__name__ 

58 

59 

60class NormedNzPCAOperator(NormedPCAOperator): 

61 def __init__(self, config): 

62 super().__init__(config) 

63 # Project into zU,nzU space to recover only the nz part of norm_matrix 

64 P_U_Ua = utilities.spectral_to_znz_matrix(config.Lu) 

65 self.norm_matrix = (P_U_Ua @ self.norm_matrix @ P_U_Ua.T)[config.Lu:, config.Lu:] 

66 self.inverse_norm_matrix = (P_U_Ua @ self.inverse_norm_matrix @ P_U_Ua.T)[config.Lu:, config.Lu:] 

67 

68 

69def compute_normalisation_matrix(Lu, norm_type=None): 

70 """ 

71 Computes the normalisation matrix for the PCA according to the asked norm type. 

72 Default is None: the normalisation matrix (and its inverse) is then the identity. 

73 

74 :param Lu: max_degree of core flow U 

75 :type Lu: int 

76 :param norm_type: Normalisation type 

77 :type norm_type: "energy" or None 

78 :return: Normalisation matrix and its inverse 

79 :rtype: np.array (Nu2 x Nu2), np.array (Nu2 x Nu2) 

80 """ 

81 if norm_type is not None and norm_type == "energy": 

82 logging.debug('PCA performed with energy normalisation') 

83 return compute_energy_normalisation_matrix(Lu) 

84 else: 

85 # No normalisation 

86 logging.debug('PCA performed without any normalisation') 

87 return np.identity(2*Lu*(Lu+2)), np.identity(2*Lu*(Lu+2)) 

88 

89 

90def compute_energy_normalisation_matrix(Lu): 

91 """ 

92 Computes the matrix normalising coefficients with respect to energy. 

93 That is: norm_tnm = sqrt(n*(n + 1)/(2 * n + 1))*tnm 

94 

95 :param Lu: max_degree of core flow U 

96 :type Lu: int 

97 :return: Energy normalisation matrix and its inverse 

98 :rtype: np.array (Nu2 x Nu2), np.array (Nu2 x Nu2) 

99 """ 

100 

101 Nu = Lu*(Lu+2) 

102 norm_factors = np.zeros(2*Nu) 

103 min_index = 0 

104 

105 for n in range(1, Lu + 1): 

106 next_index = n*(n + 2) 

107 

108 norm_factor = math.sqrt(n * (n + 1) / (2 * n + 1)) 

109 norm_factors[min_index: next_index] = norm_factor 

110 norm_factors[Nu + min_index: Nu + next_index] = norm_factor 

111 

112 min_index = next_index 

113 

114 return np.diag(norm_factors), np.diag(1./norm_factors)