Coverage for webgeodyn/inout/covobs_splines.py: 47%

161 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-12-18 09:33 +0000

1import os 

2import glob 

3import numpy as np 

4import re 

5import scipy.signal 

6import scipy.interpolate 

7 

8 

9def build_operator_spl_to_gnm(t_gnm, t_spl, spl_order, deriv=0): 

10 """ 

11 Builds the operator A linking B-splines coefficients to gnm 

12 

13 .. math:: 

14 

15 g_n^m = A \tilde{g}_{spl} 

16 

17 :param t_gnm: Array of times at which the gnm are given 

18 :type t_gnm: np.ndarray (dim: N_gnm) 

19 :param t_spl: Array giving the knot times of the splines 

20 :type t_spl: np.ndarray (dim: N_spl) 

21 :param spl_order: Order of the splines (passed to scipy.bspline) 

22 :type spl_order: int 

23 :param deriv: If equal to 1, the operator will link spline coeffs to the derivative of gnms. Default: 0. 

24 :type deriv: int 

25 :return: Operator linking the B-splines coefficients to the gnms 

26 :rtype: np.ndarray (dim: N_gnm x N_spl) 

27 """ 

28 N_gnm = len(t_gnm) 

29 N_spl = len(t_spl) 

30 

31 if N_spl <= 1: 

32 raise ValueError('Too few spline knots were given !') 

33 

34 knot_spacing = t_spl[1] - t_spl[0] 

35 

36 spl_eval_fc = eval_derivative_spline if deriv == 1 else eval_spline 

37 

38 A = np.zeros((N_gnm, N_spl)) 

39 for i, t_i in enumerate(t_gnm): 

40 for j, t_j in enumerate(t_spl): 

41 A[i, j] = spl_eval_fc(date_to_eval=t_i, center_date=t_j, spl_order=spl_order, knot_spacing=knot_spacing) 

42 

43 return A 

44 

45 

46def build_gnm_from_file(file_name, dt=0.5, with_derivatives=False): 

47 """ 

48 Builds gnm from the spline coeffs stored in a file. The timestep of the resulting dates can be changed. 

49 

50 :param file_name: Name of the file where the spline coeffs are stored 

51 :type file_name: str 

52 :param dt: Timestep of the evaluation dates (default: 0.5) 

53 :type dt: float 

54 :param with_derivatives: If True, returns as well dgnm/dt and d2gnm/dt2 evaluated from derivative of splines 

55 :type with_derivatives: bool 

56 :return: evaluation dates and the gnm at these dates (and its first and second derivative if with_derivatives is True) 

57 :rtype: np.array (dim: n_dates), np.array (dim: n_dates x n_gnm) (, np.array (dim: n_dates), np.array (dim: n_dates x n_gnm)) 

58 """ 

59 center_dates, spl_coeffs, spl_order, knot_sp = read_spline_coeffs_file(file_name) 

60 eval_dates = np.arange(center_dates[spl_order//2+1], center_dates[-spl_order//2-1], dt) 

61 return build_gnm_from_spline_coeffs(eval_dates, center_dates, spl_coeffs, spl_order, with_derivatives) 

62 

63 

64def eval_spline(date_to_eval, center_date, spl_order, knot_spacing): 

65 """ Evaluates a spline at date_to_eval. 

66 The spline is of order spl_order, spacing knot_spacing and centered on center_date.""" 

67 normalised_X = (date_to_eval - center_date) / knot_spacing 

68 return scipy.signal.bspline(normalised_X, spl_order) 

69 

70 

71def eval_derivative_spline(date_to_eval, center_date, spl_order, knot_spacing): 

72 """ Evaluates the derivative of a spline at date_to_eval. 

73 The spline from which the derivative is computed is of order spl_order, spacing knot_spacing and centered on center_date.""" 

74 

75 def t_iplus(n): 

76 return center_date + n*knot_spacing 

77 

78 normalised_X0 = (date_to_eval - t_iplus(-0.5)) / knot_spacing 

79 normalised_X1 = (date_to_eval - t_iplus(0.5)) / knot_spacing 

80 term1 = scipy.signal.bspline(normalised_X0, spl_order-1)/(t_iplus(spl_order) - t_iplus(0)) 

81 term2 = scipy.signal.bspline(normalised_X1, spl_order-1)/(t_iplus(spl_order+1) - t_iplus(1)) 

82 return spl_order*(term1 - term2) 

83 

84 

85def get_date_indexes_for_spl_eval(eval_date, spl_dates, spl_order): 

86 """ Returns the indexes of the dates of spl_dates needed to evaluate the value at eval_date.""" 

87 eval_date_index = np.searchsorted(spl_dates, eval_date) 

88 return np.arange(eval_date_index - spl_order//2 - 1, eval_date_index + spl_order//2 + 1) 

89 

90 

91def get_full_knots(center_knots, spl_order, n): 

92 """ 

93 Builds the full knots array (n+spl_order+1) from a subset of knots assumed to be the center of the full knots array. 

94 

95 :param center_knots: center of the full knots array 

96 :type center_knots: np.array 

97 :param spl_order: order of the splines 

98 :type spl_order: int 

99 :param n: number of spline coeffs 

100 :type n: int 

101 :return: 

102 :rtype: 

103 """ 

104 # Check if len(knots) = nb_dates + spl_order + 1 

105 mismatch = (n + spl_order + 1) - len(center_knots) 

106 

107 if mismatch == 0: 

108 # If no mismatch, the center knots are the full knots 

109 return center_knots 

110 else: 

111 # Else add appropriate number of knots at the beginning and the end 

112 dt_knot = center_knots[1] - center_knots[0] 

113 full_knots = np.arange(center_knots[0] - mismatch//2 * dt_knot, center_knots[-1] + (mismatch//2 + 1) * dt_knot, dt_knot) 

114 return full_knots 

115 

116 

117def build_gnm_from_spline_coeffs_old(dates_for_eval, spl_dates, spl_coeffs, spl_order, with_derivative, all_splines=False): 

118 """ 

119 Old manual method to build the gnm from the spline coeffs and the parameters of splines (center_dates, order, spacing). 

120 This method uses scipy.signal.bspline to evaluate manually the splines. 

121 

122 :param dates_for_eval: Dates where the splines must be computed 

123 :type dates_for_eval: np.array (dim: Nt) 

124 :param spl_dates: Dates of the splines (knots) 

125 :type spl_dates: np.array (dim: Nspl) 

126 :param spl_coeffs: Coefficients of the data in the Bspline basis 

127 :type spl_coeffs: np.array (dim: Nspl x Nb) 

128 :param spl_order: Order of the used splines 

129 :type spl_order: int 

130 :param with_derivative: If True, returns the reconstructed dgnm/dt evaluated from derivative of splines 

131 :type with_derivative: bool 

132 :param all_splines: If True, uses all splines to reconstruct the gnm (For debugging purposes. Default is False). 

133 :type all_splines: bool 

134 :return: Dates, gnm evaluated from splines (and derivative of gnm evaluated from the derivative of splines if with_derivative is True) 

135 :rtype: np.array (dim: Nt), np.array (dim: Nt x Nb) (, np.array (dim: Nt x Nb)) 

136 """ 

137 n_spl_dates, Nb = spl_coeffs.shape 

138 assert n_spl_dates == spl_dates.shape[0] 

139 knot_spacing = spl_dates[1] - spl_dates[0] 

140 

141 if all_splines: 

142 date_index_method = lambda eval_date, spl_dates, spl_order: range(len(spl_dates)) 

143 else: 

144 date_index_method = get_date_indexes_for_spl_eval 

145 

146 gnm = np.zeros((len(dates_for_eval), Nb)) 

147 dgnm = np.zeros((len(dates_for_eval), Nb)) 

148 for i_date, eval_date in enumerate(dates_for_eval): 

149 date_indexes = date_index_method(eval_date, spl_dates, spl_order) 

150 

151 for j in date_indexes: 

152 center_date = spl_dates[j] 

153 spl_value = eval_spline(eval_date, center_date, spl_order, knot_spacing) 

154 deriv_spl_value = eval_derivative_spline(eval_date, center_date, spl_order, knot_spacing) 

155 gnm[i_date] += spl_coeffs[j]*spl_value 

156 dgnm[i_date] += spl_coeffs[j]*deriv_spl_value 

157 

158 if with_derivative: 

159 return dates_for_eval, gnm, dgnm 

160 else: 

161 return dates_for_eval, gnm 

162 

163 

164def build_gnm_from_spline_coeffs(dates_for_eval, spl_dates, spl_coeffs, spl_order, with_derivatives): 

165 """ 

166 Method to build the gnm from the spline coeffs and the parameters of splines (center_dates, order, spacing). 

167 This method uses the class scipy.interpolate.BSpline. 

168 

169 :param dates_for_eval: Dates where the splines must be computed 

170 :type dates_for_eval: np.array (dim: Nt) 

171 :param spl_dates: Dates of the splines (knots) 

172 :type spl_dates: np.array (dim: Nspl) 

173 :param spl_coeffs: Coefficients of the data in the Bspline basis 

174 :type spl_coeffs: np.array (dim: Nspl x Nb) 

175 :param spl_order: Order of the used splines 

176 :type spl_order: int 

177 :param with_derivatives: If True, returns the reconstructed dgnm/dt and d2gnm/dt2 evaluated from derivatives of splines 

178 :type with_derivatives: bool 

179 :return: Dates, gnm evaluated from splines (and derivatives of gnm if with_derivatives is True) 

180 :rtype: np.array (dim: Nt), np.array (dim: Nt x Nb) (, np.array (dim: Nt x Nb), np.array (dim: Nt x Nb)) 

181 """ 

182 import scipy.interpolate 

183 

184 full_knots = get_full_knots(spl_dates, spl_order, len(spl_coeffs)) 

185 bsplines = scipy.interpolate.BSpline(full_knots, spl_coeffs, spl_order, extrapolate=False) 

186 

187 gnm = bsplines(dates_for_eval) 

188 if with_derivatives: 

189 return dates_for_eval, gnm, bsplines(dates_for_eval, nu=1), bsplines(dates_for_eval, nu=2) 

190 else: 

191 return dates_for_eval, gnm 

192 

193 

194def read_spline_coeffs_file(file_name): 

195 """ 

196 Extracts the spline coeffs and the spline parameters from a file 

197 

198 :param file_name: name of the spline coeffs file 

199 :type file_name: str 

200 :return: center dates, spline coeffs, spline order and knot spacing 

201 :rtype: np.array, np.array, int, float 

202 """ 

203 with open(file_name) as f: 

204 f.readline() # Skip header with mod name 

205 header = f.readline().split() 

206 # Read info 

207 Lb = int(header[0]) 

208 if Lb == 1: 

209 Nb = 1 # External field has only one coeff (q10) 

210 else: 

211 Nb = Lb * (Lb + 2) 

212 n_dates = int(header[1]) 

213 jorder = int(header[2]) 

214 center_dates = np.array(header[3:][jorder // 2:-jorder // 2], dtype=float) 

215 knot_sp = center_dates[1] - center_dates[0] 

216 # Create the list of coeffs and append the coeffs to it 

217 read_spl_coeffs = [] 

218 for line in f: 

219 line_coeffs = line.split() 

220 read_spl_coeffs.extend(line_coeffs) 

221 

222 spl_coeffs = np.array(read_spl_coeffs, dtype=np.float64) 

223 spl_coeffs = spl_coeffs.reshape((n_dates, Nb)) 

224 

225 # The Jorder read in the file is in fact spl_order+1 

226 # Starts at 1 for COVOBS whereas it starts at 0 for scipy.signal.bspline 

227 spl_order = jorder - 1 

228 return center_dates, spl_coeffs, spl_order, knot_sp 

229 

230 

231def scaling_EF(EF_data, back_EF=20.): 

232 """ 

233 Scales the external field (EF) data with the formula: 

234 EF = (-1)*EF_data + back_EF 

235 

236 :param EF_data: External field data to scale 

237 :type EF_data: np.ndarray 

238 :param back_EF: Background of external field (default: 20 nT) 

239 :type back_EF: float or np.ndarray 

240 :return: Scaled external field data (-1)*EF_data + back_EF 

241 :rtype: np.ndarray 

242 """ 

243 return (-1)*EF_data + back_EF 

244 

245 

246def load(dataDirectory, dataModel, keepRealisations=False): 

247 """ Loading function for splines files (COV-OBS-like). Also adds the data to the dataModel. 

248 

249 :param dataDirectory: directory where the data is located 

250 :type dataDirectory: str (path) 

251 :param dataModel: Model to which the data should be added 

252 :type dataModel: Model 

253 :param keepRealisations: if True, searches for files matching real*mod* to load mean and RMS. Else, loads the latest mod* file. 

254 :type keepRealisations: bool (default: False) 

255 """ 

256 pattern = 'real*mod*' if keepRealisations else 'mod*' 

257 

258 # MF 

259 gnm_times = None 

260 measureFolder = os.path.join(dataDirectory, 'models') 

261 generic_model_path = os.path.join(measureFolder, pattern) 

262 model_files = glob.glob(str(generic_model_path)) 

263 

264 if len(model_files) == 0: 

265 raise FileNotFoundError('No spline files matching {} were found in {} ! Aborting loading...'.format(pattern, measureFolder)) 

266 

267 if keepRealisations: 

268 # Loads all the real files and computes the mean and rms 

269 real_data = {'MF': [], 'SV': [], 'SA': []} 

270 for real_file in model_files: 

271 # print(os.path.basename(real_file)) 

272 times, gnm, dgnm, d2gnm = build_gnm_from_file(real_file, with_derivatives=True) 

273 real_data['MF'].append(gnm) 

274 real_data['SV'].append(dgnm) 

275 real_data['SA'].append(d2gnm) 

276 if gnm_times is None: 

277 gnm_times = times 

278 else: 

279 assert np.allclose(times, gnm_times) 

280 

281 model_data = {} 

282 for measure, data in real_data.items(): 

283 data_as_array = np.array(data) 

284 # Real model data must have the shape (nb_times, nb_coeffs, nb_reals) 

285 model_data[measure] = np.moveaxis(data_as_array, 0, 2) 

286 else: 

287 # Load the latest model in the directory 

288 model_files.sort(reverse=True) 

289 model_data = {} 

290 # If no realisation, the model data has the shape (nb_times, nb_coeffs) 

291 gnm_times, model_data['MF'], model_data['SV'], model_data['SA'] = build_gnm_from_file(model_files[0], with_derivatives=True) 

292 

293 # EF 

294 gnm_times = None 

295 measureFolder = os.path.join(dataDirectory, 'models_ext') 

296 

297 generic_model_path = os.path.join(measureFolder, pattern) 

298 model_files = glob.glob(str(generic_model_path)) 

299 

300 if len(model_files) == 0: 

301 raise FileNotFoundError('No spline files matching {} were found in {} ! Aborting loading...'.format(pattern, measureFolder)) 

302 

303 # Get the mean dipolar coefficients 

304 if keepRealisations: 

305 mean_dip = model_data['MF'].mean(axis=2)[:, :3] 

306 else: 

307 mean_dip = model_data['MF'][:, :3] 

308 

309 # Internal dipole to project from q10_cd to [q10, q11, s11]_geocentric 

310 internal_dip = mean_dip / np.linalg.norm(mean_dip, axis=1).reshape(len(mean_dip), 1) 

311 

312 if keepRealisations: 

313 # Loads all the real files and computes the mean and rms 

314 q10_data = [] 

315 for real_file in model_files: 

316 # print(os.path.basename(real_file)) 

317 times, gnm = build_gnm_from_file(real_file, with_derivatives=False) 

318 q10_data.append(gnm) 

319 if gnm_times is None: 

320 gnm_times = times 

321 else: 

322 assert np.allclose(times, gnm_times) 

323 

324 q10_data = np.array(q10_data) 

325 q10_data = scaling_EF(q10_data) 

326 

327 nb_reals = q10_data.shape[0] 

328 

329 real_ef_data = np.zeros((nb_reals, len(gnm_times), 3)) 

330 # Transform q10_cd into [q10, q11, s11]_geocentric for every time 

331 for i_t in range(len(gnm_times)): 

332 real_ef_data[:, i_t] = - q10_data[:, i_t]*internal_dip[i_t] 

333 

334 model_data['EF'] = np.moveaxis(real_ef_data, 0, 2) 

335 else: 

336 # Load the latest model in the directory 

337 model_files.sort(reverse=True) 

338 

339 gnm_times, q10_data = build_gnm_from_file(model_files[0], with_derivatives=False) 

340 q10_data = scaling_EF(q10_data) 

341 

342 # Transform q10_cd into [q10, q11, s11]_geocentric for every time 

343 ef_data = - q10_data * internal_dip 

344 

345 model_data['EF'] = ef_data 

346 

347 for measure_name, measure_data in model_data.items(): 

348 

349 if measure_name == "SV": 

350 units = "nT/yr" 

351 elif measure_name == "SA": 

352 units = "nT/yr^2" 

353 elif measure_name.endswith('F'): 

354 units = "nT" 

355 else: 

356 units = "" 

357 

358 Nb = measure_data.shape[1] 

359 Lb = np.sqrt(Nb + 1) - 1 

360 lmax = int(Lb) 

361 # Assert that Nb gives an integer Lb 

362 assert Lb == lmax 

363 dataModel.addMeasure(measure_name, measure_name, lmax, units, data=measure_data, times=gnm_times)