Coverage for webgeodyn/obsdata/obsdata.py: 0%
78 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-18 09:33 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-18 09:33 +0000
1import numpy as np
2import os
3from .observatory import ObservatoryGroup
4import cdflib
6class ObsData:
7 """
8 Class handling the list of ObservatoryGroups and interfaces it with the webpage.
9 """
10 def __init__(self, obs_directory=None):
11 """
12 Creates the obsGroups dict and loads the data from the current directory.
13 """
14 self.obsGroups = {}
15 if obs_directory is None:
16 obs_directory = os.path.dirname(__file__)
17 self.loadFromDirectory(obs_directory)
19 def getDataInfo(self):
20 """
21 Returns a JSON with group names as keys.
23 The Values of the JSON are also dictionaries with the following keys,values :
25 - 'coordinates': coordinates of the observatories of the group
26 - 'search_radius': search radius of the group
27 - 'display_r': display radius of the group
29 """
30 jsondata = {}
31 for obsGroupName in self.obsGroups:
32 jsondata[obsGroupName] = {
33 "coordinates": self.obsGroups[obsGroupName].coordinates,
34 "search_radius": self.obsGroups[obsGroupName].search_radius,
35 "display_r": self.obsGroups[obsGroupName].display_r
36 }
37 return jsondata
39 def addObsGroup(self, groupName, display_r, search_radius=None):
40 """
41 Creates a new ObservatoryGroup.
43 :param groupName: name of the group to create. Raises an Error if it already exists.
44 :type groupName: str
45 :param display_r: radius where the group data should be displayed
46 :type display_r: float or None
47 :param search_radius: ?
48 :type search_radius:
49 """
50 if groupName in self.obsGroups:
51 raise ValueError("An observatories group named %s already exists." % groupName)
52 self.obsGroups[groupName] = ObservatoryGroup(groupName, display_r, search_radius)
54 def getObsR(self, th, ph, groupName):
55 """
56 :param th: colatitude at which the r should be fetched.
57 :type th: float
58 :param ph: azimuth at which the r should be fetched.
59 :type ph: float
60 :param groupName: name of the group in which the data should be fetched.
61 :type groupName:
62 :return: radius of the group
63 :rtype: float
64 """
65 return self.obsGroups[groupName].getObsR(th, ph)
67 def getData(self, measureName, th, ph, groupName):
68 """
69 Gets the observed data for the given parameters.
71 :param measureName: name of the measure to fetch
72 :type measureName: str
73 :param th: colatitude at which the data should be fetched.
74 :type th: float
75 :param ph: azimuth at which the data should be fetched.
76 :type ph: float
77 :param groupName: name of the group for which the data should be fetched
78 :type groupName: str
79 :return: Temporal array of the observed data at th,ph.
80 :rtype: np.array
81 """
82 return self.obsGroups[groupName].getObservatoryData(th, ph, measureName)
84 def find_cdf_files(self, path):
85 """
86 find all files with .cdf extension in path directory and
87 returns the list the 12-month data.
88 """
89 brut_files_12M = []
90 for dirpath, dirnames, filenames in os.walk(path):
91 for filename in [f for f in filenames if f.endswith(".cdf")]:
92 name = os.path.join(dirpath, filename)
93 if '12M' in filename:
94 brut_files_12M.append(name)
95 else:
96 print('file {} do not match requirements'.format(filename))
97 return brut_files_12M
99 def loadFromDirectory(self, dataDirectory, cdf_type='12M'):
100 """
101 Reads VO files into CHAMP, SWARM, GO and other satellites files in Magnetic ObservatoryGroups.
103 :param dataDirectory: directory where the files are located
104 :type dataDirectory: str
105 """
106 # Read CHAMP, SWARM, Oersted, Cryosat and Composite cdf data files
107 possible_ids = np.array(['GROUND', 'CHAMP', 'SWARM', 'OERSTED', 'CRYOSAT', 'COMPOSITE'])
108 cdf_name_shortcuts = ['GO', 'CH', 'SW', 'OR', 'CR', 'CO']
110 # find all cdf filenames in data directory
111 cdf_files_12M = self.find_cdf_files(dataDirectory)
113 for obsGroupName, cdf_shortcut in zip(possible_ids, cdf_name_shortcuts):
114 GO = True if obsGroupName == 'GROUND' else False
116 if cdf_type == '12M':
117 cdf_files = cdf_files_12M
118 else:
119 raise ValueError('cdf_type {} not recognized'.format(cdf_type))
120 # extract file that starts with the cdf name shortcut, i.e. GO, CH, SW....
121 try:
122 idx = [name.split('/')[-1][:2] for name in cdf_files].index(cdf_shortcut)
123 except ValueError:
124 print('{} not found'.format(possible_ids[cdf_name_shortcuts.index(cdf_shortcut)]))
125 cdf_filename = cdf_files[idx]
127 cdf_file = cdflib.CDF(cdf_filename)
129 print("Reading {} data from {}".format(obsGroupName, cdf_filename))
131 thetas = 90 - cdf_file.varget("Latitude") # convert in colatitude
132 if np.any(thetas > 180) or np.any(thetas < 0):
133 raise ValueError('angle th {} represent the colatitude, did you use latitudes instead?'.format(th))
134 phis = cdf_file.varget("Longitude")
135 radii = cdf_file.varget('Radius') / 1000 # to convert in km
136 if GO:
137 locs = [''.join([l[0] for l in loc]) for loc in cdf_file.varget('Obs')]
139 # because VOs circles should be bigger than GOs
140 if GO:
141 self.addObsGroup(obsGroupName, display_r=6371.2, search_radius=None)
142 else:
143 # assume unique radius for all VOs through time
144 self.addObsGroup(obsGroupName, display_r=radii[0], search_radius=850)
146 times_stamp = cdf_file.varget('Timestamp')
147 dec_times = []
148 for t in times_stamp:
149 year, month = cdflib.cdfepoch.breakdown_epoch(t)[:2]
150 dec_times.append(year + month / 12)
152 Bs, SVs = cdf_file.varget('B_CF'), cdf_file.varget('B_SV')
153 if GO:
154 Bs -= cdf_file.varget('bias_crust')
156 for i, (B, SV, r, th, ph, time) in enumerate(zip(Bs, SVs, radii, thetas, phis, dec_times)):
157 if self.obsGroups[obsGroupName].display_r is None and not GO:
158 self.obsGroups[obsGroupName].display_r = r
160 if GO:
161 self.obsGroups[obsGroupName].addObservatory(locs[i], r, th, ph)
162 else:
163 self.obsGroups[obsGroupName].addObservatory(obsGroupName, r, th, ph) # Add if not exists
165 if np.any(np.isnan(B)):
166 continue
167 else:
168 self.obsGroups[obsGroupName].addObservatoryData(th, ph, 'MF', time, B[0], B[1], B[2])
170 if np.any(np.isnan(SV)):
171 continue
172 else:
173 self.obsGroups[obsGroupName].addObservatoryData(th, ph, 'SV', time, SV[0], SV[1], SV[2])