Coverage for pygeodyn/inout/reads.py: 72% (200 statements)
1""" Contains generic reading functions """
2import numpy as np
3from collections import defaultdict
4import os.path
5import logging
6import h5py
7from . import priors as priors_module
8from .. import utilities
9import cdflib
10import h5py
11from operator import add
def extract_realisations(prior_directory, prior_type, dt_sampling, measures=None):
    """
    Return prior data arrays (nb_realisations x nb_coefs) by dynamically getting the reading function from the prior type.

    :param prior_directory: directory where the prior data is stored
    :type prior_directory: str
    :param prior_type: type of the prior data
    :type prior_type: str
    :param dt_sampling: smoothing window duration in seconds
    :type dt_sampling: float
    :param measures: sequence of measures to extract; allowed values are a selection of 'times', 'B', 'U' and 'ER'
    :return: MF, U, ER, times, dt_samp, tag
    :rtype: lists
    """
    # map the prior type to the name of its reading function
    if prior_type == '0path':
        func_name = 'read_0_path'
    elif prior_type == '50path' or prior_type == '71path':
        func_name = 'read_50_and_71_path'
    elif prior_type == '100path':
        func_name = 'read_100_path'
    else:
        func_name = None
    # getattr requires a string attribute name, so guard against unknown prior types
    reading_prior_function = getattr(priors_module, func_name, None) if func_name is not None else None

    if reading_prior_function is None:
        raise NotImplementedError('No loading function was defined for {} prior'.format(prior_type))
    logging.info('Reading {} data for the priors {}...'.format(prior_type, measures))

    # Read and return prior data arrays (nb_realisations x nb_coefs)
    logging.info('function name {}'.format(reading_prior_function.__name__))
    return reading_prior_function(prior_directory, prior_type, dt_sampling, measures=measures)
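# Example (sketch, hypothetical paths and values): extracting all measures of a
# 71-path prior with a 10-year smoothing window given in seconds. The unpacked
# names follow the docstring above.
# >>> MF, U, ER, times, dt_samp, tag = extract_realisations(
# ...     '/path/to/priors/71path', '71path', 10 * 365.25 * 86400,
# ...     measures=('times', 'B', 'U', 'ER'))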
def read_analysed_states_hdf5(filepath):
    """ Reads the 'analysed' group of an HDF5 file and returns it as a dict of numpy arrays. """
    if not os.path.isfile(filepath):
        raise IOError('{} is not a valid file! Please check the given path'.format(filepath))

    output_dict = {}
    with h5py.File(filepath, 'r') as hdf_file:
        if 'analysed' in hdf_file.keys():
            init_data = hdf_file['analysed']
        else:
            raise ValueError('{} does not contain "analysed" data! Initialisation from file impossible'.format(filepath))

        for measureName, measureData in init_data.items():
            output_dict[measureName] = np.array(measureData)

    return output_dict
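# Example (sketch): read_analysed_states_hdf5 expects a single 'analysed' group
# holding one dataset per measure. The measure name and shape below are hypothetical.
# >>> with h5py.File('analysed_states.hdf5', 'w') as f:
# ...     f.create_dataset('analysed/B', data=np.zeros((10, 224)))
# >>> states = read_analysed_states_hdf5('analysed_states.hdf5')
# >>> states['B'].shape
# (10, 224)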
def parse_config_file(file_name=None):
    """ Reads the config file and returns a dictionary of the config.
    Supported types for reading: int, float, str, bool, csv, date, months, years, dict.

    :param file_name: file name storing the config
    :type file_name: str
    :return: the configuration read: dict["parameter"] = value
    :rtype: dict
    """
    if file_name is None:
        raise IOError('Cannot read configuration: no file path was given!')
    if not os.path.isfile(file_name):
        raise IOError('{} is not a valid file! Please check the given path'.format(file_name))

    config = {}
    with open(file_name, "r") as handle:
        for line in handle:
            if line.strip().startswith("#") or line.strip() == '':
                continue
            parameter, p_type = line.split()[:2]
            vals = line.split()[2:]
            p_val = vals[0] if len(vals) == 1 else vals
            if p_type == "int":
                config[parameter] = int(p_val)
            elif p_type == "float":
                config[parameter] = float(p_val)
            elif p_type == "str":
                config[parameter] = p_val
            elif p_type == "bool":
                if p_val == '0' or p_val.lower() == 'false' or p_val.lower() == 'no':
                    config[parameter] = False
                else:
                    config[parameter] = bool(p_val)
            elif p_type == "csv":
                config[parameter] = np.array(p_val.split(','))
            # Reading of date objects
            elif p_type == "date":
                # If only the year is given, assume January
                if len(p_val.split('-')) <= 1:
                    p_val += "-01"
                config[parameter] = np.datetime64(p_val, 'M')
            elif p_type == "months":
                config[parameter] = np.timedelta64(int(p_val), 'M')
            elif p_type == "years":
                # Convert into months
                config[parameter] = np.timedelta64(int(p_val) * 12, 'M')
            elif p_type == "dict":
                # join all value tokens so that dict literals containing spaces are preserved
                config[parameter] = eval(' '.join(vals))
            else:
                raise ValueError("type not (yet) supported: {}".format(p_type))

    return config
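# Example (sketch): the parser above expects whitespace-separated lines of the
# form "name type value [value ...]"; comment and blank lines are skipped. The
# parameter names below are hypothetical.
#
#     Lb        int     140
#     dt_f      float   10.0
#     t_start   date    1980
#     dt_a      months  12
#     obs       csv     GO,SW
#
# 'date' values with only a year are read as np.datetime64('1980-01', 'M'),
# 'months' as np.timedelta64(12, 'M') and 'csv' as np.array(['GO', 'SW']).
# >>> config = parse_config_file('example.config')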
def read_prior(filename, max_cols=None, **kwargs):
    r"""
    Uses genfromtxt to read the prior data. Returns the average :math:`<X>`, the RMS
    :math:`\sqrt{<(X-<X>)^2>}` and the deviation :math:`X-<X>` of the prior.

    :param filename: path of the prior file to read
    :type filename: str
    :param max_cols: number of columns to read
    :type max_cols: int or None
    :param kwargs: keyword args passed to genfromtxt
    :type kwargs: kwargs
    :return: tuple containing the average, the root-mean-square and the deviation with
        respect to the average of the prior. The average and RMS have shape (nb_cols,);
        the deviation has shape (nb_rows, nb_cols).
    :rtype: (ndarray, ndarray, ndarray)
    """
    # Setting usecols in genfromtxt is faster than reading the full prior and slicing it.
    # If max_cols is ill-defined, read all columns
    if type(max_cols) is not int:
        raw_prior = np.genfromtxt(filename, **kwargs)
    else:
        raw_prior = np.genfromtxt(filename, usecols=range(0, max_cols), **kwargs)

    # Compute the average
    avg_prior = np.average(raw_prior, axis=0)
    # Compute the RMS
    rms_prior = np.std(raw_prior, axis=0)
    # Compute the deviation
    dev_prior = raw_prior - avg_prior

    return avg_prior, rms_prior, dev_prior
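# Example (sketch, hypothetical file): for a prior file whose rows are
# realisations and whose columns are coefficients, read_prior returns
# column-wise statistics.
# >>> avg, rms, dev = read_prior('prior_coefs.dat', max_cols=224)
# avg and rms have shape (224,); dev has shape (nb_rows, 224).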
def read_single_computed_realisation(filename):
    """ Reads a single realisation file: first column is the date, remaining columns are the data. """
    dates = []
    data = []
    with open(filename, 'r') as f:
        for line in f:
            values = line.split()
            dates.append(values[0])
            data.append(values[1:])

    return np.array(dates, dtype=np.float64), np.array(data, dtype=np.float64)
def read_all_computed_realisations(basename, nb_realisations):
    """ Reads all realisation files, replacing '*' in basename by the realisation index. """
    all_measures = []
    dates = []
    for i in range(0, nb_realisations):
        real_name = basename.replace('*', str(i))
        logging.info('Reading {}...'.format(real_name))
        # all realisations are assumed to share the same dates: only the last read is kept
        dates, measure_data = read_single_computed_realisation(real_name)
        all_measures.append(measure_data)

    return dates, np.array(all_measures, dtype=np.float64)
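# Example (sketch, hypothetical pattern): the '*' in basename is replaced by the
# realisation index, so 'real_*.dat' reads real_0.dat through real_49.dat.
# >>> dates, measures = read_all_computed_realisations('real_*.dat', 50)
# measures has shape (50, nb_dates, nb_coefs); dates are those of the last file read.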
def read_observatories_cdf_data(cdf_filename, measure_type, obs, remove_bias_crust=True,
                                huge_sigma=99999, discard=False):
    """
    Reads the data of an observatory file in CDF format.

    :param cdf_filename: path of the CDF observation file to read
    :type cdf_filename: str
    :param measure_type: either 'MF' or 'SV' for main field or secular variation
    :type measure_type: str
    :param obs: abbreviation of the data source, possible values are 'GO' (ground observations),
        'CH' (Champ), 'SW' (Swarm), 'OR' (Oersted), 'CR' (Cryosat), 'GR' (Grace) and 'CO' (composite)
    :type obs: str
    :param remove_bias_crust: whether to remove the crustal bias from ground observations
    :type remove_bias_crust: bool
    :param huge_sigma: replace NaNs in sigma by this big value
    :type huge_sigma: float
    :param discard: if nonzero, discard data at geomagnetic latitudes higher than this value (in degrees)
    :type discard: float or bool
    :return: dictionaries keyed by date: positions, data, variances, masks and satellite names
    """
    possible_obs = np.array(['GO', 'CH', 'SW', 'OR', 'CR', 'GR', 'CO'])
    if not np.any(obs == possible_obs):
        raise ValueError('Got {} as obs value, possible values are {}'.format(obs, possible_obs))
    if not np.any(measure_type == np.array(['MF', 'SV'])):
        raise ValueError('Got {} as measure_type, possible values are {}'.format(measure_type, ('MF', 'SV')))

    cdf_file = cdflib.CDF(cdf_filename)

    thetas = 90 - cdf_file.varget("Latitude")  # convert to colatitude
    if np.any(thetas > 180) or np.any(thetas < 0):
        raise ValueError('angles th {} represent colatitudes, did you use latitudes instead?'.format(thetas))

    thetas = thetas * np.pi / 180  # convert to radians
    phis = cdf_file.varget("Longitude") * np.pi / 180
    radii = cdf_file.varget('Radius') / 1000  # convert from metres to kilometres
    assert np.amax(radii) < 10000 and np.amin(radii) > 6000, 'Radii should lie between 6000 and 10000 km: are you sure they are stored in metres?'

    if measure_type == 'MF':
        times = cdf_file.varget('Timestamp')
        Bs, sigma2 = cdf_file.varget('B_CF'), cdf_file.varget('sigma_CF')**2
        if remove_bias_crust and obs == 'GO':
            Bs -= cdf_file.varget('bias_crust')
    elif measure_type == 'SV':
        times = cdf_file.varget('Timestamp_SV')
        Bs, sigma2 = cdf_file.varget('B_SV'), cdf_file.varget('sigma_SV')**2
    # treat exact zeros as invalid values
    Bs[np.where(Bs == 0)] = np.nan
    sigma2[np.where(sigma2 == 0)] = np.nan

    err = 'At least some invalid values of sigma correspond to valid values in B, file, measure and obs {}, {}, {}'
    err = err.format(cdf_filename.split('/')[-1], measure_type, obs)

    if obs == 'GO':
        locs = [''.join([l[0] for l in loc]) for loc in cdf_file.varget('Obs')]
        N_VO = len(list(dict.fromkeys(zip(thetas, phis, locs))))  # small trick that counts the number of unique theta/phi/locs triplets
    else:
        N_VO = len(dict.fromkeys(zip(thetas, phis)).keys())  # small trick that counts the number of unique theta/phi pairs
    dates = utilities.cdf_times_to_np_date(times)
    # dates = [np.datetime64('{}-{:02}'.format(*cdflib.cdfepoch.breakdown_epoch(t)[:2])) for t in times]
    # Log the first available date, as CDF files are not human readable and the date is not easy to find manually
    # logging.info('First data available at {}'.format(dates[0]))
    obs_data = defaultdict(list)  # list returns []
    obs_positions = defaultdict(list)
    var_data = defaultdict(list)

    if discard:
        to_deg = lambda alpha: 180 * alpha / np.pi
        to_rad = lambda alpha: np.pi * alpha / 180
        chaos_file = '../data/observations/CHAOS-7/CHAOS-7.hdf5'  # takes the CHAOS file from the usual data location
        chaos_file = os.path.join(os.path.dirname(__file__), chaos_file)  # relative path to absolute
        times_chaos, dipoles = utilities.extract_dipole_chaos(chaos_file)

    for i, date in enumerate(dates):
        if discard:  # if discard is set to 0, no data is discarded
            g_10, g_11, h_11 = dipoles[np.argmin(abs(times_chaos - times[i])), :3]
            th_0, ph_0 = utilities.north_geomagnetic_pole_angle(g_10, g_11, h_11)
            theta_d = to_deg(utilities.geomagnetic_latitude(thetas[i], phis[i], th_0, ph_0))  # all angles should be in rad
            # discard data at geomagnetic latitudes higher than `discard` degrees
            if theta_d > 90 + discard or theta_d < 90 - discard:
                radii[i] = np.nan
                thetas[i] = np.nan
                phis[i] = np.nan
                Bs[i] = np.nan
                sigma2[i] = np.nan

        # Add positions (angles in rad)
        obs_positions[date].extend((radii[i], thetas[i], phis[i]))
        # Add measurement data
        obs_data[date].extend(tuple(Bs[i]))
        # Add errors
        var_data[date].extend(tuple(sigma2[i, :]))
    # for MF measures, compute a simple time average to sync MF and SV dates
    if measure_type == "MF":
        obs_data_sync = defaultdict(list)  # list returns []
        obs_positions_sync = defaultdict(list)
        var_data_sync = defaultdict(list)
        mask_data_sync = defaultdict(list)
        sat_name_sync = defaultdict(list)
        unique = np.unique(np.array(dates))
        for i, date in enumerate(unique):
            if i < unique.shape[0] - 1:
                dt = (unique[i+1] - unique[i]) / 2
                date_sync = unique[i] + dt

                # time i
                pos1 = np.array(obs_positions[unique[i]])
                data1 = np.array(obs_data[unique[i]])
                var1 = np.array(var_data[unique[i]])

                # time i+1
                pos2 = np.array(obs_positions[unique[i+1]])
                data2 = np.array(obs_data[unique[i+1]])
                var2 = np.array(var_data[unique[i+1]])

                # compute the average; the variance of the mean of two independent
                # measurements is the sum of their variances divided by 4
                obs_positions_sync[date_sync] = (pos1 + pos2) / 2
                obs_data_sync[date_sync] = (data1 + data2) / 2
                var_data_sync[date_sync] = (var1 + var2) / 4

                # update sync dicts, keeping only entries that are valid at both times
                mask = np.logical_not(np.isnan(obs_positions_sync[date_sync]) | np.isnan(obs_data_sync[date_sync]) | np.isnan(var_data_sync[date_sync]))
                obs_positions_sync[date_sync] = list(obs_positions_sync[date_sync][mask])
                obs_data_sync[date_sync] = list(obs_data_sync[date_sync][mask])
                var_data_sync[date_sync] = list(var_data_sync[date_sync][mask])
                if obs == "SW":
                    mask_data_sync[date_sync] = mask
                else:
                    mask_data_sync[date_sync] = np.array([])
                sat_name_sync[date_sync] = list(obs) * len(obs_data_sync[date_sync])
    else:
        obs_data_sync = defaultdict(list)  # list returns []
        obs_positions_sync = defaultdict(list)
        var_data_sync = defaultdict(list)
        mask_data_sync = defaultdict(list)
        sat_name_sync = defaultdict(list)
        unique = np.unique(np.array(dates))
        for date in unique:
            obs_positions_sync[date] = np.array(obs_positions[date])
            obs_data_sync[date] = np.array(obs_data[date])
            var_data_sync[date] = np.array(var_data[date])

            # update sync dicts, keeping only valid (non-NaN) entries
            mask = np.logical_not(np.isnan(obs_positions_sync[date]) | np.isnan(obs_data_sync[date]) | np.isnan(var_data_sync[date]))
            obs_positions_sync[date] = list(obs_positions_sync[date][mask])
            obs_data_sync[date] = list(obs_data_sync[date][mask])
            var_data_sync[date] = list(var_data_sync[date][mask])
            if obs == "SW":
                mask_data_sync[date] = mask
            else:
                mask_data_sync[date] = np.array([])
            sat_name_sync[date] = list(obs) * len(obs_data_sync[date])

    return obs_positions_sync, obs_data_sync, var_data_sync, mask_data_sync, sat_name_sync
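# Example (sketch, hypothetical file name): reading Swarm secular-variation data,
# discarding data above 60 degrees of geomagnetic latitude. Each returned dict is
# keyed by np.datetime64 date; values are flat lists with three entries
# (r, theta, phi positions, or field components) per virtual observatory.
# >>> positions, data, variances, masks, sat_names = read_observatories_cdf_data(
# ...     'VO_SW_MF_SV.cdf', 'SV', 'SW', discard=60)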