Coverage for pygeodyn/utilities.py: 81%
103 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-22 13:43 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-22 13:43 +0000
1#-*- coding: utf-8 -*-
2"""
3Utility functions
4"""
5import h5py
6import math
7import cdflib
8import numpy as np
9import scipy.interpolate
11def decimal_to_date(decimal_date, month_shift=0):
12 """
13 Converts a decimal date in a datetime64 object.
14 Assumes decimal_date = YEAR + (MONTH + SHIFT)/12.
16 :param decimal_date: date to convert
17 :type decimal_date: float
18 :param month_shift: shift of the month. Integer dates have Dec as month with 0, whereas they have Jan with 1.
19 :type month_shift: 0 or 1
20 :return: date converted
21 :rtype: np.datetime64
22 """
23 year = int(decimal_date)
24 month = int(round((decimal_date - year) * 12, 0)) + month_shift
25 # For Dec (year + 12/12), the previous formula returns 0 (if shift is 0) so need to treat the case apart
26 if month == 0:
27 year = year - 1
28 month = 12
29 return np.datetime64('{}-{:02}'.format(year, month))
32def date_to_decimal(date, nb_decimals=None):
33 """
34 Converts a datetime64 object in a decimal date.
35 Returns YEAR + MONTH/12. rounded to the number of decimals given
37 :param date: date to convert
38 :type date: np.datetime64
39 :param nb_decimals: number of decimals for round-off (default: None)
40 :type nb_decimals: int or None
41 :return: date in decimal format
42 :rtype: float
43 """
44 if date != date: # special case for np.nan
45 return np.float('nan')
47 year, month = str(date).split('-')
48 if nb_decimals is None:
49 return int(year) + (int(month)) / 12.
50 else:
51 return round(int(year) + (int(month)) / 12., nb_decimals)
54def cdf_times_to_np_date(times):
55 """
56 Transform the times in cdf epoch into numpy dates, rounding month to nearest.
57 """
58 dates = []
59 for t in times:
60 if t != t:
61 # if time is a nan, date should be a nan as well
62 dates.append(np.float('nan'))
63 continue
64 year, month, day = cdflib.cdfepoch.breakdown_epoch(t)[:3]
65 if day > 15:
66 if month == 12:
67 year += 1
68 month = 1
69 else:
70 month += 1
71 dates.append(np.datetime64('{}-{:02}'.format(year, month)))
72 return dates
74def date_array_to_decimal(date_array, nb_decimals=None):
75 """
76 Uses date_to_decimal to convert a 1D array of datetime64 in a 1D array of floating numbers.
78 :param date_array: array to convert
79 :type date_array: 1D np.array of np.datetime64
80 :param nb_decimals: Number of decimals for round-off (default: None)
81 :type nb_decimals: int or None
82 :return: Same array with dates as floating numbers
83 :rtype: 1D np.array
84 """
85 return np.array([date_to_decimal(date, nb_decimals) for date in date_array])
88def eval_Plm(z, thetas, legendre_poly_values_at_thetas, parity):
89 """
90 Evaluate the Legendre polynomial at l,m at an arbitrary z using interpolation.
92 :param z: Value where the Legendre polynomial must be evaluated
93 :type z: float
94 :param thetas: List of the angles used for the computation of Legendre polynomials
95 :type thetas: np.array
96 :param legendre_poly_values_at_thetas: List of the Legendre polynomials value for all cos(thetas) (at fixed l,m) (same dimension as thetas)
97 :type legendre_poly_values_at_thetas: np.array
98 :return: Interpolated value of the Legendre polynomial at z
99 :rtype: float
100 """
101 # Treat obvious cases first
102 if z == 1:
103 return 1
104 elif z == -1:
105 return (-1) ** parity
106 else:
107 f = interpolate_Plm(thetas, legendre_poly_values_at_thetas, kind='linear')
108 return f(z)
111def get_degree_order(k):
112 """
113 Gets the degree, order and coefficient name ("g" or "h") of a coefficient.
115 :param k: index of the coefficient
116 :type k: int
117 :return: degree, order and coef name of the coefficient
118 :rtype: int, int, str
119 """
121 assert (int(k) == k and k >= 0)
123 # For m = 0, k = l**2 -1.
124 floating_sqrt_result = math.sqrt(k+1)
125 degree = int(floating_sqrt_result)
126 if degree == floating_sqrt_result:
127 return degree, 0, "g"
129 # We need now to find m verifying:
130 # for g : k = l**2 + 2m - 2 => 2m = k - l**2 + 2
131 # OR for h : k = l**2 + 2m - 1 => 2m = k - l**2 + 1
132 twice_order = k - degree*degree + 2
133 if twice_order % 2 == 0:
134 coef = "g"
135 else:
136 coef = "h"
137 # If twice_order is not divisible by 2, it means that the 2m is in fact (twice_order - 1)
138 twice_order = twice_order - 1
140 order = twice_order // 2
141 return degree, order, coef
143def north_geomagnetic_pole_angle(g_10, g_11, h_11):
144 """
145 Given the first three elements of the spherical harmonic decomposition,
146 return the angle theta and phi of the geomagnetic pole.
147 Details in:
148 Hulot, G., et al. "The present and future geomagnetic field." (2015): 33-78.
149 """
150 g_vec = np.array([g_10, g_11, h_11])
151 m_0 = np.sqrt(g_vec @ g_vec)
152 return np.pi - np.arccos(g_10 / m_0), -np.pi + np.arctan2(h_11, g_11)
154def geomagnetic_latitude(theta, phi, theta_0, phi_0):
155 """
156 Computes the geomagnetic (or dipole) latitude.
157 Angle must be given in radians.
158 theta_0: latitude of the north geomagnetic pole
159 phi_0: longitude of the north geomagnetic pole
160 """
161 cos_term = np.cos(theta) * np.cos(theta_0)
162 sin_term = np.sin(theta) * np.sin(theta_0) * np.cos(phi - phi_0)
163 return np.arccos(cos_term + sin_term)
165def extract_dipole_chaos(chaos_file):
166 """
167 Extract the coefficients of the dipole (g_10, g_11, h_11) from chaos_file,
168 at all available times.
169 """
170 with h5py.File(chaos_file, 'r') as hf:
171 gnms = np.array(hf['gnm'])
172 times = np.array(hf['times'])
173 return times, gnms[:, :3]
175def interpolate_Plm(thetas, legendre_poly_values_at_thetas, l, m, **kwargs):
176 # Set fill values for extremal values (-1, 1)
177 if m == 0:
178 fill_values = ((-1) ** l, 1)
179 else:
180 fill_values = 0
182 return scipy.interpolate.interp1d(np.cos(thetas), legendre_poly_values_at_thetas, bounds_error=False,
183 fill_value=fill_values, **kwargs)
186def zonal_indexes(max_degree):
187 """
188 Generates the indexes of zonal coefficients t_n^0.
190 :param max_degree: Maximum value of n
191 :type max_degree: int
192 """
193 for n in range(max_degree):
194 yield n*(n+2)
197def zonal_mask(max_degree):
198 """
199 Generates a mask to select indexes of zonal coefficients t_n^(m=0).
201 :param max_degree: Maximum value of n
202 :type max_degree: int
203 """
204 Nu2 = 2 * max_degree * (max_degree + 2)
205 zonal_mask = np.zeros(Nu2, bool) # False everywhere...
206 zonal_mask[[i_z for i_z in zonal_indexes(max_degree)]] = True # ...except for zonal indexes
207 return zonal_mask
210def non_zonal_mask(max_degree):
211 """
212 Generates a mask to select indexes of non zonal coefficients t_n^(m!=0).
214 :param max_degree: Maximum value of n
215 :type max_degree: int
216 """
217 return np.logical_not(zonal_mask(max_degree))
220def spectral_to_znz_matrix(max_degree):
221 """
222 Generates the permutation matrix P linking U stored in spectral order to U stored in zonal-non_zonal order
224 znz_U = P @ spectral_U
226 :param max_degree: Maximum value of n
227 :type max_degree: int
228 :return: Matrix linking spectral U to zonal-non_zonal U
229 :rtype: np.array (dim: Nu2 x Nu2)
230 """
231 zonals = zonal_mask(max_degree)
232 non_zonals = non_zonal_mask(max_degree)
233 Nu2 = len(zonals)
235 P = np.zeros((Nu2, Nu2))
236 # Link the first members to zonal coefs
237 P[[i for i in range(max_degree)], zonals] = 1
238 # Link the rest to non zonal coefs
239 P[[i for i in range(max_degree, Nu2)], non_zonals] = 1
241 return P
243def get_seeds_for_obs(seed, obs_type_to_use):
244 """
245 Get a seed for each observatory to noise the MF or SV data
246 """
247 # maximum number of seeds is equal to 2 times the number of keys in the dict
248 # even are for MF, odds for SV
249 np.random.seed(seed)
250 seeds = np.random.randint(0, 50000, size=len(obs_type_to_use.keys())*2)
251 seeds_dict = {}
252 for i, (obs, measure) in enumerate(obs_type_to_use.items()):
253 if type(measure) == str: # if keys of obs_select are written in the simple form 'MF'
254 seeds_dict[obs, measure] = seeds[2*i]
255 else: # else the keys of obs_select are written as ('MF',) or ('MF', 'SV')
256 seeds_dict[obs, measure[0]] = seeds[2*i]
257 if len(measure) > 1:
258 seeds_dict[obs, measure[1]] = seeds[2*i+1]
259 return seeds_dict