Coverage for pygeodyn/inout/reads.py: 72%

200 statements  


1""" Contains generic reading functions """ 

2import numpy as np 

3from collections import defaultdict 

4import os.path 

5import logging 

6import h5py 

7from . import priors as priors_module 

8from .. import utilities 

9import cdflib 

10import h5py 

11from operator import add 

12 

def extract_realisations(prior_directory, prior_type, dt_sampling, measures=None):
    """
    Return prior data arrays (nb_realisations x nb_coefs) by dynamically getting the reading function from the prior type.

    :param prior_directory: directory where the prior data is stored
    :type prior_directory: str
    :param prior_type: type of the prior data
    :type prior_type: str
    :param dt_sampling: smoothing window duration in seconds
    :type dt_sampling: float
    :param measures: sequence of measures to extract; allowed values are a selection of 'times', 'B', 'U' and 'ER'
    :return: MF, U, ER, times, dt_samp, tag
    :rtype: lists
    """
    # Map the prior type to the name of its reading function, then fetch it dynamically
    if prior_type == '0path':
        func_name = 'read_0_path'
    elif prior_type in ('50path', '71path'):
        func_name = 'read_50_and_71_path'
    elif prior_type == '100path':
        func_name = 'read_100_path'
    else:
        func_name = None
    # getattr would raise a TypeError on a None name, so guard the unknown-type case
    reading_prior_function = getattr(priors_module, func_name, None) if func_name else None

    if reading_prior_function is None:
        raise NotImplementedError('No loading function was defined for {} prior'.format(prior_type))
    logging.info('Reading {} data for the priors {}...'.format(prior_type, measures))

    # Read and return prior data arrays (nb_realisations x nb_coefs)
    logging.info('Reading function: {}'.format(reading_prior_function.__name__))
    return reading_prior_function(prior_directory, prior_type, dt_sampling, measures=measures)
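
# Hypothetical usage sketch: the prior directory and sampling window below are
# illustrative assumptions, not values taken from the repository.
#
#     MF, U, ER, times, dt_samp, tag = extract_realisations(
#         'data/priors/100path', '100path', dt_sampling=3.15e7,
#         measures=('times', 'B', 'U', 'ER'))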

def read_analysed_states_hdf5(filepath):
    """ Load the 'analysed' group of a pygeodyn HDF5 output file into a dict of numpy arrays """
    if not os.path.isfile(filepath):
        raise IOError('{} is not a valid file! Please check the given path'.format(filepath))

    output_dict = {}
    with h5py.File(filepath, 'r') as hdf_file:
        if 'analysed' in hdf_file.keys():
            init_data = hdf_file['analysed']
        else:
            raise ValueError('{} does not contain "analysed" data! Initialisation from file impossible'.format(filepath))

        for measureName, measureData in init_data.items():
            output_dict[measureName] = np.array(measureData)

    return output_dict
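
# Hypothetical usage sketch ('run.hdf5' and the 'MF' key are assumptions about
# the file contents, for illustration only):
#
#     states = read_analysed_states_hdf5('run.hdf5')
#     print(states.keys())       # measures stored under the 'analysed' group
#     print(states['MF'].shape)  # each value is a numpy array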

def parse_config_file(file_name=None):
    """ Reads the config file and returns a dictionary of the config.
    Supported types for reading: int, float, str, bool, csv, date, months, years, dict.

    :param file_name: file name storing the config
    :type file_name: str
    :return: the configuration read: dict["parameter"] = value
    :rtype: dict
    """
    if file_name is None:
        raise IOError('Cannot read configuration: no file path was given!')
    if not os.path.isfile(file_name):
        raise IOError('{} is not a valid file! Please check the given path'.format(file_name))

    config = {}
    with open(file_name, "r") as handle:
        for line in handle:
            # Skip comments and empty lines
            if line.strip().startswith("#") or line.strip() == '':
                continue
            # Each line reads: parameter_name type value(s)
            parameter, p_type = line.split()[:2]
            vals = line.split()[2:]
            p_val = vals[0] if len(vals) == 1 else vals
            if p_type == "int":
                config[parameter] = int(p_val)
            elif p_type == "float":
                config[parameter] = float(p_val)
            elif p_type == "str":
                config[parameter] = p_val
            elif p_type == "bool":
                if p_val == '0' or p_val.lower() == 'false' or p_val.lower() == 'no':
                    config[parameter] = False
                else:
                    # any other non-empty string is treated as True
                    config[parameter] = bool(p_val)
            elif p_type == "csv":
                config[parameter] = np.array(p_val.split(','))
            # Reading of date objects
            elif p_type == "date":
                # If only the year is given, assume January
                if len(p_val.split('-')) <= 1:
                    p_val += "-01"
                config[parameter] = np.datetime64(p_val, 'M')
            elif p_type == "months":
                # np.timedelta64 needs an integer, not the raw string
                config[parameter] = np.timedelta64(int(p_val), 'M')
            elif p_type == "years":
                # Convert into months
                config[parameter] = np.timedelta64(int(p_val) * 12, 'M')
            elif p_type == "dict":
                p_val = ' '.join(p_val)  # taking care of spaces
                # evaluate the dict literal (config files are trusted input)
                config[parameter] = eval(p_val)
            else:
                raise ValueError("type not (yet) supported: {}".format(p_type))

    return config
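
# Hypothetical sketch of a config file this parser accepts: one
# "parameter type value(s)" triple per line. All parameter names and values
# below are illustrative assumptions, not actual pygeodyn parameters.
#
#     # pygeodyn.conf
#     nb_realisations int 20
#     prior_type str 100path
#     core_radius float 3485.0
#     t_start_analysis date 1980
#     dt_forecast months 6
#     out_computed csv times,B,U
#
#     config = parse_config_file('pygeodyn.conf')
#     assert config['nb_realisations'] == 20
#     assert config['t_start_analysis'] == np.datetime64('1980-01', 'M')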

def read_prior(filename, max_cols=None, **kwargs):
    r"""
    Uses genfromtxt to read the prior data. Returns the average :math:`<X>`, the RMS
    :math:`\sqrt{<(X-<X>)^2>}` and the deviation :math:`X-<X>` of the prior.

    :param filename: path of the prior file to read
    :type filename: str
    :param max_cols: number of columns to read
    :type max_cols: int or None
    :param kwargs: keyword args passed to genfromtxt
    :type kwargs: kwargs
    :return: tuple containing the average, the root-mean-square and the deviation with respect to the average of the prior. The average and RMS are computed over rows (shape: nb_cols); the deviation keeps the shape of the raw data (nb_rows x nb_cols).
    :rtype: (ndarray, ndarray, ndarray)
    """

    # Setting usecols in genfromtxt is faster than reading the full prior and slicing it.
    # If max_cols is ill-defined, read all cols
    if not isinstance(max_cols, int):
        raw_prior = np.genfromtxt(filename, **kwargs)
    else:
        raw_prior = np.genfromtxt(filename, usecols=range(0, max_cols), **kwargs)

    # Compute average over rows
    avg_prior = np.average(raw_prior, axis=0)
    # Compute RMS
    rms_prior = np.std(raw_prior, axis=0)
    # Compute deviation
    dev_prior = raw_prior - avg_prior

    return avg_prior, rms_prior, dev_prior
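
# Hypothetical usage sketch (the file name and column count are assumptions):
#
#     avg, rms, dev = read_prior('prior_realisations.dat', max_cols=80)
#     # avg and rms have one value per column, dev one row per realisation
#     assert dev.shape[1] == avg.shape[0]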

def read_single_computed_realisation(filename):
    """ Read one realisation file: the first column holds the dates, the remaining columns the data """
    dates = []
    data = []
    with open(filename, 'r') as f:
        for line in f:
            values = line.split()
            dates.append(values[0])
            data.append(values[1:])

    return np.array(dates, dtype=np.float64), np.array(data, dtype=np.float64)

def read_all_computed_realisations(basename, nb_realisations):
    """ Read nb_realisations files whose names are obtained by replacing '*' in basename by the realisation index """
    all_measures = []
    dates = []
    for i in range(nb_realisations):
        real_name = basename.replace('*', str(i))
        logging.info('Reading {}...'.format(real_name))
        # dates are assumed identical across realisations, so only the last read is kept
        dates, measure_data = read_single_computed_realisation(real_name)
        all_measures.append(measure_data)

    return dates, np.array(all_measures, dtype=np.float64)
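
# Hypothetical usage sketch (the basename pattern is an assumption; '*' is
# replaced by the realisation index as in the function above):
#
#     dates, measures = read_all_computed_realisations('real_*.dat', nb_realisations=10)
#     # measures has shape (nb_realisations, nb_dates, nb_values)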

def read_observatories_cdf_data(cdf_filename, measure_type, obs, remove_bias_crust=True,
                                huge_sigma=99999, discard=False):
    """
    Reads the data of an observatory file in CDF format.

    :param cdf_filename: path of the CDF observation file to read
    :type cdf_filename: str
    :param measure_type: either 'MF' or 'SV' for main field or secular variation
    :type measure_type: str
    :param obs: abbreviation of the observation source; possible values are 'GO' (ground observations), 'CH' (CHAMP), 'SW' (Swarm), 'OR' (Oersted), 'CR' (CryoSat), 'GR' and 'CO' (composite)
    :type obs: str
    :param remove_bias_crust: if True, remove the crustal bias from ground observatory main field data
    :type remove_bias_crust: bool
    :param huge_sigma: big value meant to replace NaNs in sigma
    :type huge_sigma: float
    :param discard: if nonzero, discard data at absolute geomagnetic latitudes greater than this value (in degrees)
    :type discard: float or bool
    :return: dicts mapping dates to positions, data, variances, masks and satellite names
    """

    possible_obs = np.array(['GO', 'CH', 'SW', 'OR', 'CR', 'GR', 'CO'])
    if not np.any(obs == possible_obs):
        raise ValueError('Got {} as obs value, possible values are {}'.format(obs, possible_obs))
    if not np.any(measure_type == np.array(['MF', 'SV'])):
        raise ValueError('Got {} as measure_type, possible values are {}'.format(measure_type, ('MF', 'SV')))

    cdf_file = cdflib.CDF(cdf_filename)

    thetas = 90 - cdf_file.varget("Latitude")  # convert latitude to colatitude (degrees)
    if np.any(thetas > 180) or np.any(thetas < 0):
        raise ValueError('Colatitudes {} must lie in [0, 180] degrees; did you use latitudes instead?'.format(thetas))

    thetas = thetas * np.pi / 180  # convert to rad
    phis = cdf_file.varget("Longitude") * np.pi / 180
    radii = cdf_file.varget('Radius') / 1000  # convert from m to km
    assert np.amax(radii) < 10000 and np.amin(radii) > 6000, 'Are you sure radii are stored in meters?'


    if measure_type == 'MF':
        times = cdf_file.varget('Timestamp')
        Bs, sigma2 = cdf_file.varget('B_CF'), cdf_file.varget('sigma_CF')**2
        if remove_bias_crust and obs == 'GO':
            Bs -= cdf_file.varget('bias_crust')
    elif measure_type == 'SV':
        times = cdf_file.varget('Timestamp_SV')
        Bs, sigma2 = cdf_file.varget('B_SV'), cdf_file.varget('sigma_SV')**2
        # treat exact zeros as missing values
        Bs[Bs == 0] = np.nan
        sigma2[sigma2 == 0] = np.nan

    # message for a consistency check between NaNs in B and sigma
    err = 'At least some invalid values of sigma correspond to valid values in B, file, measure and obs {}, {}, {}'
    err = err.format(cdf_filename.split('/')[-1], measure_type, obs)


    if obs == 'GO':
        # decode observatory names stored as arrays of characters
        locs = [''.join([l[0] for l in loc]) for loc in cdf_file.varget('Obs')]
        N_VO = len(dict.fromkeys(zip(thetas, phis, locs)))  # counts the number of unique theta/phi/loc triples
    else:
        N_VO = len(dict.fromkeys(zip(thetas, phis)))  # counts the number of unique theta/phi pairs
    dates = utilities.cdf_times_to_np_date(times)
    # The first available date can be logged here, as CDF files are not human-readable
    # and it is not easy to find the date manually:
    # logging.info('First data available at {}'.format(dates[0]))

    obs_data = defaultdict(list)
    obs_positions = defaultdict(list)
    var_data = defaultdict(list)


    if discard:
        # CHAOS dipole coefficients are needed to locate the geomagnetic pole
        chaos_file = '../data/observations/CHAOS-7/CHAOS-7.hdf5'  # takes the CHAOS file from the usual data location
        chaos_file = os.path.join(os.path.dirname(__file__), chaos_file)  # relative path to absolute
        times_chaos, dipoles = utilities.extract_dipole_chaos(chaos_file)


    for i, date in enumerate(dates):
        if discard:  # skipped when discard is 0 or False
            # dipole coefficients closest in time to this measurement
            g_10, g_11, h_11 = dipoles[np.argmin(abs(times_chaos - times[i])), :3]
            th_0, ph_0 = utilities.north_geomagnetic_pole_angle(g_10, g_11, h_11)
            theta_d = np.degrees(utilities.geomagnetic_latitude(thetas[i], phis[i], th_0, ph_0))  # all angles passed in rad
            # mask data at geomagnetic latitudes higher (in absolute value) than discard degrees
            if theta_d > 90 + discard or theta_d < 90 - discard:
                radii[i] = np.nan
                thetas[i] = np.nan
                phis[i] = np.nan
                Bs[i] = np.nan
                sigma2[i] = np.nan

        # Add positions (angles are already in rad)
        obs_positions[date].extend((radii[i], thetas[i], phis[i]))
        # Add measurement data
        obs_data[date].extend(tuple(Bs[i]))
        # Add errors
        var_data[date].extend(tuple(sigma2[i, :]))
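
    # At this point each dict maps a date to flat lists: obs_positions[date]
    # holds (r, theta, phi) triples concatenated record by record, while
    # obs_data[date] and var_data[date] hold the measurement values and
    # variances in the same record order.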


    # For MF measures, compute a simple time average of consecutive epochs so that MF and SV dates coincide
    if measure_type == "MF":
        obs_data_sync = defaultdict(list)
        obs_positions_sync = defaultdict(list)
        var_data_sync = defaultdict(list)
        mask_data_sync = defaultdict(list)
        sat_name_sync = defaultdict(list)
        unique = np.unique(np.array(dates))
        for i, date in enumerate(unique):
            if i < unique.shape[0] - 1:
                # midpoint between epoch i and epoch i+1
                dt = (unique[i+1] - unique[i]) / 2
                date_sync = unique[i] + dt

                # time i
                pos1 = np.array(obs_positions[unique[i]])
                data1 = np.array(obs_data[unique[i]])
                var1 = np.array(var_data[unique[i]])

                # time i+1
                pos2 = np.array(obs_positions[unique[i+1]])
                data2 = np.array(obs_data[unique[i+1]])
                var2 = np.array(var_data[unique[i+1]])

                # average the two epochs; for independent errors,
                # Var((X1 + X2) / 2) = (Var(X1) + Var(X2)) / 4
                obs_positions_sync[date_sync] = (pos1 + pos2) / 2
                obs_data_sync[date_sync] = (data1 + data2) / 2
                var_data_sync[date_sync] = (var1 + var2) / 4

                # update sync dicts, dropping entries where any field is NaN
                mask = np.logical_not(np.isnan(obs_positions_sync[date_sync]) | np.isnan(obs_data_sync[date_sync]) | np.isnan(var_data_sync[date_sync]))
                obs_positions_sync[date_sync] = list(obs_positions_sync[date_sync][mask])
                obs_data_sync[date_sync] = list(obs_data_sync[date_sync][mask])
                var_data_sync[date_sync] = list(var_data_sync[date_sync][mask])
                if obs == "SW":
                    mask_data_sync[date_sync] = mask
                else:
                    mask_data_sync[date_sync] = np.array([])
                sat_name_sync[date_sync] = list(obs) * len(obs_data_sync[date_sync])

    else:
        obs_data_sync = defaultdict(list)
        obs_positions_sync = defaultdict(list)
        var_data_sync = defaultdict(list)
        mask_data_sync = defaultdict(list)
        sat_name_sync = defaultdict(list)
        unique = np.unique(np.array(dates))
        for date in unique:
            obs_positions_sync[date] = np.array(obs_positions[date])
            obs_data_sync[date] = np.array(obs_data[date])
            var_data_sync[date] = np.array(var_data[date])

            # update sync dicts, dropping entries where any field is NaN
            mask = np.logical_not(np.isnan(obs_positions_sync[date]) | np.isnan(obs_data_sync[date]) | np.isnan(var_data_sync[date]))
            obs_positions_sync[date] = list(obs_positions_sync[date][mask])
            obs_data_sync[date] = list(obs_data_sync[date][mask])
            var_data_sync[date] = list(var_data_sync[date][mask])
            if obs == "SW":
                mask_data_sync[date] = mask
            else:
                mask_data_sync[date] = np.array([])
            sat_name_sync[date] = list(obs) * len(obs_data_sync[date])

    return obs_positions_sync, obs_data_sync, var_data_sync, mask_data_sync, sat_name_sync

327 return obs_positions_sync, obs_data_sync, var_data_sync, mask_data_sync, sat_name_sync