Coverage for webgeodyn/inout/covobs_splines.py: 47%
161 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-18 09:33 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-18 09:33 +0000
1import os
2import glob
3import numpy as np
4import re
5import scipy.signal
6import scipy.interpolate
9def build_operator_spl_to_gnm(t_gnm, t_spl, spl_order, deriv=0):
10 """
11 Builds the operator A linking B-splines coefficients to gnm
13 .. math::
15 g_n^m = A \tilde{g}_{spl}
17 :param t_gnm: Array of times at which the gnm are given
18 :type t_gnm: np.ndarray (dim: N_gnm)
19 :param t_spl: Array giving the knot times of the splines
20 :type t_spl: np.ndarray (dim: N_spl)
21 :param spl_order: Order of the splines (passed to scipy.bspline)
22 :type spl_order: int
23 :param deriv: If equal to 1, the operator will link spline coeffs to the derivative of gnms. Default: 0.
24 :type deriv: int
25 :return: Operator linking the B-splines coefficients to the gnms
26 :rtype: np.ndarray (dim: N_gnm x N_spl)
27 """
28 N_gnm = len(t_gnm)
29 N_spl = len(t_spl)
31 if N_spl <= 1:
32 raise ValueError('Too few spline knots were given !')
34 knot_spacing = t_spl[1] - t_spl[0]
36 spl_eval_fc = eval_derivative_spline if deriv == 1 else eval_spline
38 A = np.zeros((N_gnm, N_spl))
39 for i, t_i in enumerate(t_gnm):
40 for j, t_j in enumerate(t_spl):
41 A[i, j] = spl_eval_fc(date_to_eval=t_i, center_date=t_j, spl_order=spl_order, knot_spacing=knot_spacing)
43 return A
46def build_gnm_from_file(file_name, dt=0.5, with_derivatives=False):
47 """
48 Builds gnm from the spline coeffs stored in a file. The timestep of the resulting dates can be changed.
50 :param file_name: Name of the file where the spline coeffs are stored
51 :type file_name: str
52 :param dt: Timestep of the evaluation dates (default: 0.5)
53 :type dt: float
54 :param with_derivatives: If True, returns as well dgnm/dt and d2gnm/dt2 evaluated from derivative of splines
55 :type with_derivatives: bool
56 :return: evaluation dates and the gnm at these dates (and its first and second derivative if with_derivatives is True)
57 :rtype: np.array (dim: n_dates), np.array (dim: n_dates x n_gnm) (, np.array (dim: n_dates), np.array (dim: n_dates x n_gnm))
58 """
59 center_dates, spl_coeffs, spl_order, knot_sp = read_spline_coeffs_file(file_name)
60 eval_dates = np.arange(center_dates[spl_order//2+1], center_dates[-spl_order//2-1], dt)
61 return build_gnm_from_spline_coeffs(eval_dates, center_dates, spl_coeffs, spl_order, with_derivatives)
64def eval_spline(date_to_eval, center_date, spl_order, knot_spacing):
65 """ Evaluates a spline at date_to_eval.
66 The spline is of order spl_order, spacing knot_spacing and centered on center_date."""
67 normalised_X = (date_to_eval - center_date) / knot_spacing
68 return scipy.signal.bspline(normalised_X, spl_order)
71def eval_derivative_spline(date_to_eval, center_date, spl_order, knot_spacing):
72 """ Evaluates the derivative of a spline at date_to_eval.
73 The spline from which the derivative is computed is of order spl_order, spacing knot_spacing and centered on center_date."""
75 def t_iplus(n):
76 return center_date + n*knot_spacing
78 normalised_X0 = (date_to_eval - t_iplus(-0.5)) / knot_spacing
79 normalised_X1 = (date_to_eval - t_iplus(0.5)) / knot_spacing
80 term1 = scipy.signal.bspline(normalised_X0, spl_order-1)/(t_iplus(spl_order) - t_iplus(0))
81 term2 = scipy.signal.bspline(normalised_X1, spl_order-1)/(t_iplus(spl_order+1) - t_iplus(1))
82 return spl_order*(term1 - term2)
85def get_date_indexes_for_spl_eval(eval_date, spl_dates, spl_order):
86 """ Returns the indexes of the dates of spl_dates needed to evaluate the value at eval_date."""
87 eval_date_index = np.searchsorted(spl_dates, eval_date)
88 return np.arange(eval_date_index - spl_order//2 - 1, eval_date_index + spl_order//2 + 1)
91def get_full_knots(center_knots, spl_order, n):
92 """
93 Builds the full knots array (n+spl_order+1) from a subset of knots assumed to be the center of the full knots array.
95 :param center_knots: center of the full knots array
96 :type center_knots: np.array
97 :param spl_order: order of the splines
98 :type spl_order: int
99 :param n: number of spline coeffs
100 :type n: int
101 :return:
102 :rtype:
103 """
104 # Check if len(knots) = nb_dates + spl_order + 1
105 mismatch = (n + spl_order + 1) - len(center_knots)
107 if mismatch == 0:
108 # If no mismatch, the center knots are the full knots
109 return center_knots
110 else:
111 # Else add appropriate number of knots at the beginning and the end
112 dt_knot = center_knots[1] - center_knots[0]
113 full_knots = np.arange(center_knots[0] - mismatch//2 * dt_knot, center_knots[-1] + (mismatch//2 + 1) * dt_knot, dt_knot)
114 return full_knots
117def build_gnm_from_spline_coeffs_old(dates_for_eval, spl_dates, spl_coeffs, spl_order, with_derivative, all_splines=False):
118 """
119 Old manual method to build the gnm from the spline coeffs and the parameters of splines (center_dates, order, spacing).
120 This method uses scipy.signal.bspline to evaluate manually the splines.
122 :param dates_for_eval: Dates where the splines must be computed
123 :type dates_for_eval: np.array (dim: Nt)
124 :param spl_dates: Dates of the splines (knots)
125 :type spl_dates: np.array (dim: Nspl)
126 :param spl_coeffs: Coefficients of the data in the Bspline basis
127 :type spl_coeffs: np.array (dim: Nspl x Nb)
128 :param spl_order: Order of the used splines
129 :type spl_order: int
130 :param with_derivative: If True, returns the reconstructed dgnm/dt evaluated from derivative of splines
131 :type with_derivative: bool
132 :param all_splines: If True, uses all splines to reconstruct the gnm (For debugging purposes. Default is False).
133 :type all_splines: bool
134 :return: Dates, gnm evaluated from splines (and derivative of gnm evaluated from the derivative of splines if with_derivative is True)
135 :rtype: np.array (dim: Nt), np.array (dim: Nt x Nb) (, np.array (dim: Nt x Nb))
136 """
137 n_spl_dates, Nb = spl_coeffs.shape
138 assert n_spl_dates == spl_dates.shape[0]
139 knot_spacing = spl_dates[1] - spl_dates[0]
141 if all_splines:
142 date_index_method = lambda eval_date, spl_dates, spl_order: range(len(spl_dates))
143 else:
144 date_index_method = get_date_indexes_for_spl_eval
146 gnm = np.zeros((len(dates_for_eval), Nb))
147 dgnm = np.zeros((len(dates_for_eval), Nb))
148 for i_date, eval_date in enumerate(dates_for_eval):
149 date_indexes = date_index_method(eval_date, spl_dates, spl_order)
151 for j in date_indexes:
152 center_date = spl_dates[j]
153 spl_value = eval_spline(eval_date, center_date, spl_order, knot_spacing)
154 deriv_spl_value = eval_derivative_spline(eval_date, center_date, spl_order, knot_spacing)
155 gnm[i_date] += spl_coeffs[j]*spl_value
156 dgnm[i_date] += spl_coeffs[j]*deriv_spl_value
158 if with_derivative:
159 return dates_for_eval, gnm, dgnm
160 else:
161 return dates_for_eval, gnm
164def build_gnm_from_spline_coeffs(dates_for_eval, spl_dates, spl_coeffs, spl_order, with_derivatives):
165 """
166 Method to build the gnm from the spline coeffs and the parameters of splines (center_dates, order, spacing).
167 This method uses the class scipy.interpolate.BSpline.
169 :param dates_for_eval: Dates where the splines must be computed
170 :type dates_for_eval: np.array (dim: Nt)
171 :param spl_dates: Dates of the splines (knots)
172 :type spl_dates: np.array (dim: Nspl)
173 :param spl_coeffs: Coefficients of the data in the Bspline basis
174 :type spl_coeffs: np.array (dim: Nspl x Nb)
175 :param spl_order: Order of the used splines
176 :type spl_order: int
177 :param with_derivatives: If True, returns the reconstructed dgnm/dt and d2gnm/dt2 evaluated from derivatives of splines
178 :type with_derivatives: bool
179 :return: Dates, gnm evaluated from splines (and derivatives of gnm if with_derivatives is True)
180 :rtype: np.array (dim: Nt), np.array (dim: Nt x Nb) (, np.array (dim: Nt x Nb), np.array (dim: Nt x Nb))
181 """
182 import scipy.interpolate
184 full_knots = get_full_knots(spl_dates, spl_order, len(spl_coeffs))
185 bsplines = scipy.interpolate.BSpline(full_knots, spl_coeffs, spl_order, extrapolate=False)
187 gnm = bsplines(dates_for_eval)
188 if with_derivatives:
189 return dates_for_eval, gnm, bsplines(dates_for_eval, nu=1), bsplines(dates_for_eval, nu=2)
190 else:
191 return dates_for_eval, gnm
194def read_spline_coeffs_file(file_name):
195 """
196 Extracts the spline coeffs and the spline parameters from a file
198 :param file_name: name of the spline coeffs file
199 :type file_name: str
200 :return: center dates, spline coeffs, spline order and knot spacing
201 :rtype: np.array, np.array, int, float
202 """
203 with open(file_name) as f:
204 f.readline() # Skip header with mod name
205 header = f.readline().split()
206 # Read info
207 Lb = int(header[0])
208 if Lb == 1:
209 Nb = 1 # External field has only one coeff (q10)
210 else:
211 Nb = Lb * (Lb + 2)
212 n_dates = int(header[1])
213 jorder = int(header[2])
214 center_dates = np.array(header[3:][jorder // 2:-jorder // 2], dtype=float)
215 knot_sp = center_dates[1] - center_dates[0]
216 # Create the list of coeffs and append the coeffs to it
217 read_spl_coeffs = []
218 for line in f:
219 line_coeffs = line.split()
220 read_spl_coeffs.extend(line_coeffs)
222 spl_coeffs = np.array(read_spl_coeffs, dtype=np.float64)
223 spl_coeffs = spl_coeffs.reshape((n_dates, Nb))
225 # The Jorder read in the file is in fact spl_order+1
226 # Starts at 1 for COVOBS whereas it starts at 0 for scipy.signal.bspline
227 spl_order = jorder - 1
228 return center_dates, spl_coeffs, spl_order, knot_sp
231def scaling_EF(EF_data, back_EF=20.):
232 """
233 Scales the external field (EF) data with the formula:
234 EF = (-1)*EF_data + back_EF
236 :param EF_data: External field data to scale
237 :type EF_data: np.ndarray
238 :param back_EF: Background of external field (default: 20 nT)
239 :type back_EF: float or np.ndarray
240 :return: Scaled external field data (-1)*EF_data + back_EF
241 :rtype: np.ndarray
242 """
243 return (-1)*EF_data + back_EF
246def load(dataDirectory, dataModel, keepRealisations=False):
247 """ Loading function for splines files (COV-OBS-like). Also adds the data to the dataModel.
249 :param dataDirectory: directory where the data is located
250 :type dataDirectory: str (path)
251 :param dataModel: Model to which the data should be added
252 :type dataModel: Model
253 :param keepRealisations: if True, searches for files matching real*mod* to load mean and RMS. Else, loads the latest mod* file.
254 :type keepRealisations: bool (default: False)
255 """
256 pattern = 'real*mod*' if keepRealisations else 'mod*'
258 # MF
259 gnm_times = None
260 measureFolder = os.path.join(dataDirectory, 'models')
261 generic_model_path = os.path.join(measureFolder, pattern)
262 model_files = glob.glob(str(generic_model_path))
264 if len(model_files) == 0:
265 raise FileNotFoundError('No spline files matching {} were found in {} ! Aborting loading...'.format(pattern, measureFolder))
267 if keepRealisations:
268 # Loads all the real files and computes the mean and rms
269 real_data = {'MF': [], 'SV': [], 'SA': []}
270 for real_file in model_files:
271 # print(os.path.basename(real_file))
272 times, gnm, dgnm, d2gnm = build_gnm_from_file(real_file, with_derivatives=True)
273 real_data['MF'].append(gnm)
274 real_data['SV'].append(dgnm)
275 real_data['SA'].append(d2gnm)
276 if gnm_times is None:
277 gnm_times = times
278 else:
279 assert np.allclose(times, gnm_times)
281 model_data = {}
282 for measure, data in real_data.items():
283 data_as_array = np.array(data)
284 # Real model data must have the shape (nb_times, nb_coeffs, nb_reals)
285 model_data[measure] = np.moveaxis(data_as_array, 0, 2)
286 else:
287 # Load the latest model in the directory
288 model_files.sort(reverse=True)
289 model_data = {}
290 # If no realisation, the model data has the shape (nb_times, nb_coeffs)
291 gnm_times, model_data['MF'], model_data['SV'], model_data['SA'] = build_gnm_from_file(model_files[0], with_derivatives=True)
293 # EF
294 gnm_times = None
295 measureFolder = os.path.join(dataDirectory, 'models_ext')
297 generic_model_path = os.path.join(measureFolder, pattern)
298 model_files = glob.glob(str(generic_model_path))
300 if len(model_files) == 0:
301 raise FileNotFoundError('No spline files matching {} were found in {} ! Aborting loading...'.format(pattern, measureFolder))
303 # Get the mean dipolar coefficients
304 if keepRealisations:
305 mean_dip = model_data['MF'].mean(axis=2)[:, :3]
306 else:
307 mean_dip = model_data['MF'][:, :3]
309 # Internal dipole to project from q10_cd to [q10, q11, s11]_geocentric
310 internal_dip = mean_dip / np.linalg.norm(mean_dip, axis=1).reshape(len(mean_dip), 1)
312 if keepRealisations:
313 # Loads all the real files and computes the mean and rms
314 q10_data = []
315 for real_file in model_files:
316 # print(os.path.basename(real_file))
317 times, gnm = build_gnm_from_file(real_file, with_derivatives=False)
318 q10_data.append(gnm)
319 if gnm_times is None:
320 gnm_times = times
321 else:
322 assert np.allclose(times, gnm_times)
324 q10_data = np.array(q10_data)
325 q10_data = scaling_EF(q10_data)
327 nb_reals = q10_data.shape[0]
329 real_ef_data = np.zeros((nb_reals, len(gnm_times), 3))
330 # Transform q10_cd into [q10, q11, s11]_geocentric for every time
331 for i_t in range(len(gnm_times)):
332 real_ef_data[:, i_t] = - q10_data[:, i_t]*internal_dip[i_t]
334 model_data['EF'] = np.moveaxis(real_ef_data, 0, 2)
335 else:
336 # Load the latest model in the directory
337 model_files.sort(reverse=True)
339 gnm_times, q10_data = build_gnm_from_file(model_files[0], with_derivatives=False)
340 q10_data = scaling_EF(q10_data)
342 # Transform q10_cd into [q10, q11, s11]_geocentric for every time
343 ef_data = - q10_data * internal_dip
345 model_data['EF'] = ef_data
347 for measure_name, measure_data in model_data.items():
349 if measure_name == "SV":
350 units = "nT/yr"
351 elif measure_name == "SA":
352 units = "nT/yr^2"
353 elif measure_name.endswith('F'):
354 units = "nT"
355 else:
356 units = ""
358 Nb = measure_data.shape[1]
359 Lb = np.sqrt(Nb + 1) - 1
360 lmax = int(Lb)
361 # Assert that Nb gives an integer Lb
362 assert Lb == lmax
363 dataModel.addMeasure(measure_name, measure_name, lmax, units, data=measure_data, times=gnm_times)