Coverage for pygeodyn/utilities.py: 81%

103 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-12-22 13:43 +0000

1#-*- coding: utf-8 -*- 

2""" 

3Utility functions 

4""" 

5import h5py 

6import math 

7import cdflib 

8import numpy as np 

9import scipy.interpolate 

10 

11def decimal_to_date(decimal_date, month_shift=0): 

12 """ 

13 Converts a decimal date in a datetime64 object. 

14 Assumes decimal_date = YEAR + (MONTH + SHIFT)/12. 

15 

16 :param decimal_date: date to convert 

17 :type decimal_date: float 

18 :param month_shift: shift of the month. Integer dates have Dec as month with 0, whereas they have Jan with 1. 

19 :type month_shift: 0 or 1 

20 :return: date converted 

21 :rtype: np.datetime64 

22 """ 

23 year = int(decimal_date) 

24 month = int(round((decimal_date - year) * 12, 0)) + month_shift 

25 # For Dec (year + 12/12), the previous formula returns 0 (if shift is 0) so need to treat the case apart 

26 if month == 0: 

27 year = year - 1 

28 month = 12 

29 return np.datetime64('{}-{:02}'.format(year, month)) 

30 

31 

32def date_to_decimal(date, nb_decimals=None): 

33 """ 

34 Converts a datetime64 object in a decimal date. 

35 Returns YEAR + MONTH/12. rounded to the number of decimals given 

36 

37 :param date: date to convert 

38 :type date: np.datetime64 

39 :param nb_decimals: number of decimals for round-off (default: None) 

40 :type nb_decimals: int or None 

41 :return: date in decimal format 

42 :rtype: float 

43 """ 

44 if date != date: # special case for np.nan 

45 return np.float('nan') 

46 

47 year, month = str(date).split('-') 

48 if nb_decimals is None: 

49 return int(year) + (int(month)) / 12. 

50 else: 

51 return round(int(year) + (int(month)) / 12., nb_decimals) 

52 

53 

54def cdf_times_to_np_date(times): 

55 """ 

56 Transform the times in cdf epoch into numpy dates, rounding month to nearest. 

57 """ 

58 dates = [] 

59 for t in times: 

60 if t != t: 

61 # if time is a nan, date should be a nan as well 

62 dates.append(np.float('nan')) 

63 continue 

64 year, month, day = cdflib.cdfepoch.breakdown_epoch(t)[:3] 

65 if day > 15: 

66 if month == 12: 

67 year += 1 

68 month = 1 

69 else: 

70 month += 1 

71 dates.append(np.datetime64('{}-{:02}'.format(year, month))) 

72 return dates 

73 

74def date_array_to_decimal(date_array, nb_decimals=None): 

75 """ 

76 Uses date_to_decimal to convert a 1D array of datetime64 in a 1D array of floating numbers. 

77 

78 :param date_array: array to convert 

79 :type date_array: 1D np.array of np.datetime64 

80 :param nb_decimals: Number of decimals for round-off (default: None) 

81 :type nb_decimals: int or None 

82 :return: Same array with dates as floating numbers 

83 :rtype: 1D np.array 

84 """ 

85 return np.array([date_to_decimal(date, nb_decimals) for date in date_array]) 

86 

87 

88def eval_Plm(z, thetas, legendre_poly_values_at_thetas, parity): 

89 """ 

90 Evaluate the Legendre polynomial at l,m at an arbitrary z using interpolation. 

91 

92 :param z: Value where the Legendre polynomial must be evaluated 

93 :type z: float 

94 :param thetas: List of the angles used for the computation of Legendre polynomials 

95 :type thetas: np.array 

96 :param legendre_poly_values_at_thetas: List of the Legendre polynomials value for all cos(thetas) (at fixed l,m) (same dimension as thetas) 

97 :type legendre_poly_values_at_thetas: np.array 

98 :return: Interpolated value of the Legendre polynomial at z 

99 :rtype: float 

100 """ 

101 # Treat obvious cases first 

102 if z == 1: 

103 return 1 

104 elif z == -1: 

105 return (-1) ** parity 

106 else: 

107 f = interpolate_Plm(thetas, legendre_poly_values_at_thetas, kind='linear') 

108 return f(z) 

109 

110 

111def get_degree_order(k): 

112 """ 

113 Gets the degree, order and coefficient name ("g" or "h") of a coefficient. 

114 

115 :param k: index of the coefficient 

116 :type k: int 

117 :return: degree, order and coef name of the coefficient 

118 :rtype: int, int, str 

119 """ 

120 

121 assert (int(k) == k and k >= 0) 

122 

123 # For m = 0, k = l**2 -1. 

124 floating_sqrt_result = math.sqrt(k+1) 

125 degree = int(floating_sqrt_result) 

126 if degree == floating_sqrt_result: 

127 return degree, 0, "g" 

128 

129 # We need now to find m verifying: 

130 # for g : k = l**2 + 2m - 2 => 2m = k - l**2 + 2 

131 # OR for h : k = l**2 + 2m - 1 => 2m = k - l**2 + 1 

132 twice_order = k - degree*degree + 2 

133 if twice_order % 2 == 0: 

134 coef = "g" 

135 else: 

136 coef = "h" 

137 # If twice_order is not divisible by 2, it means that the 2m is in fact (twice_order - 1) 

138 twice_order = twice_order - 1 

139 

140 order = twice_order // 2 

141 return degree, order, coef 

142 

143def north_geomagnetic_pole_angle(g_10, g_11, h_11): 

144 """ 

145 Given the first three elements of the spherical harmonic decomposition, 

146 return the angle theta and phi of the geomagnetic pole. 

147 Details in: 

148 Hulot, G., et al. "The present and future geomagnetic field." (2015): 33-78. 

149 """ 

150 g_vec = np.array([g_10, g_11, h_11]) 

151 m_0 = np.sqrt(g_vec @ g_vec) 

152 return np.pi - np.arccos(g_10 / m_0), -np.pi + np.arctan2(h_11, g_11) 

153 

154def geomagnetic_latitude(theta, phi, theta_0, phi_0): 

155 """ 

156 Computes the geomagnetic (or dipole) latitude. 

157 Angle must be given in radians. 

158 theta_0: latitude of the north geomagnetic pole 

159 phi_0: longitude of the north geomagnetic pole 

160 """ 

161 cos_term = np.cos(theta) * np.cos(theta_0) 

162 sin_term = np.sin(theta) * np.sin(theta_0) * np.cos(phi - phi_0) 

163 return np.arccos(cos_term + sin_term) 

164 

165def extract_dipole_chaos(chaos_file): 

166 """ 

167 Extract the coefficients of the dipole (g_10, g_11, h_11) from chaos_file, 

168 at all available times. 

169 """ 

170 with h5py.File(chaos_file, 'r') as hf: 

171 gnms = np.array(hf['gnm']) 

172 times = np.array(hf['times']) 

173 return times, gnms[:, :3] 

174 

175def interpolate_Plm(thetas, legendre_poly_values_at_thetas, l, m, **kwargs): 

176 # Set fill values for extremal values (-1, 1) 

177 if m == 0: 

178 fill_values = ((-1) ** l, 1) 

179 else: 

180 fill_values = 0 

181 

182 return scipy.interpolate.interp1d(np.cos(thetas), legendre_poly_values_at_thetas, bounds_error=False, 

183 fill_value=fill_values, **kwargs) 

184 

185 

186def zonal_indexes(max_degree): 

187 """ 

188 Generates the indexes of zonal coefficients t_n^0. 

189 

190 :param max_degree: Maximum value of n 

191 :type max_degree: int 

192 """ 

193 for n in range(max_degree): 

194 yield n*(n+2) 

195 

196 

197def zonal_mask(max_degree): 

198 """ 

199 Generates a mask to select indexes of zonal coefficients t_n^(m=0). 

200 

201 :param max_degree: Maximum value of n 

202 :type max_degree: int 

203 """ 

204 Nu2 = 2 * max_degree * (max_degree + 2) 

205 zonal_mask = np.zeros(Nu2, bool) # False everywhere... 

206 zonal_mask[[i_z for i_z in zonal_indexes(max_degree)]] = True # ...except for zonal indexes 

207 return zonal_mask 

208 

209 

210def non_zonal_mask(max_degree): 

211 """ 

212 Generates a mask to select indexes of non zonal coefficients t_n^(m!=0). 

213 

214 :param max_degree: Maximum value of n 

215 :type max_degree: int 

216 """ 

217 return np.logical_not(zonal_mask(max_degree)) 

218 

219 

220def spectral_to_znz_matrix(max_degree): 

221 """ 

222 Generates the permutation matrix P linking U stored in spectral order to U stored in zonal-non_zonal order 

223 

224 znz_U = P @ spectral_U 

225 

226 :param max_degree: Maximum value of n 

227 :type max_degree: int 

228 :return: Matrix linking spectral U to zonal-non_zonal U 

229 :rtype: np.array (dim: Nu2 x Nu2) 

230 """ 

231 zonals = zonal_mask(max_degree) 

232 non_zonals = non_zonal_mask(max_degree) 

233 Nu2 = len(zonals) 

234 

235 P = np.zeros((Nu2, Nu2)) 

236 # Link the first members to zonal coefs 

237 P[[i for i in range(max_degree)], zonals] = 1 

238 # Link the rest to non zonal coefs 

239 P[[i for i in range(max_degree, Nu2)], non_zonals] = 1 

240 

241 return P 

242 

243def get_seeds_for_obs(seed, obs_type_to_use): 

244 """ 

245 Get a seed for each observatory to noise the MF or SV data 

246 """ 

247 # maximum number of seeds is equal to 2 times the number of keys in the dict 

248 # even are for MF, odds for SV 

249 np.random.seed(seed) 

250 seeds = np.random.randint(0, 50000, size=len(obs_type_to_use.keys())*2) 

251 seeds_dict = {} 

252 for i, (obs, measure) in enumerate(obs_type_to_use.items()): 

253 if type(measure) == str: # if keys of obs_select are written in the simple form 'MF' 

254 seeds_dict[obs, measure] = seeds[2*i] 

255 else: # else the keys of obs_select are written as ('MF',) or ('MF', 'SV') 

256 seeds_dict[obs, measure[0]] = seeds[2*i] 

257 if len(measure) > 1: 

258 seeds_dict[obs, measure[1]] = seeds[2*i+1] 

259 return seeds_dict