Coverage for webgeodyn/obsdata/obsdata.py: 0%

78 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-12-18 09:33 +0000

1import numpy as np 

2import os 

3from .observatory import ObservatoryGroup 

4import cdflib 

5 

6class ObsData: 

7 """ 

8 Class handling the list of ObservatoryGroups and interfaces it with the webpage. 

9 """ 

10 def __init__(self, obs_directory=None): 

11 """ 

12 Creates the obsGroups dict and loads the data from the current directory. 

13 """ 

14 self.obsGroups = {} 

15 if obs_directory is None: 

16 obs_directory = os.path.dirname(__file__) 

17 self.loadFromDirectory(obs_directory) 

18 

19 def getDataInfo(self): 

20 """ 

21 Returns a JSON with group names as keys. 

22 

23 The Values of the JSON are also dictionaries with the following keys,values : 

24 

25 - 'coordinates': coordinates of the observatories of the group 

26 - 'search_radius': search radius of the group 

27 - 'display_r': display radius of the group 

28 

29 """ 

30 jsondata = {} 

31 for obsGroupName in self.obsGroups: 

32 jsondata[obsGroupName] = { 

33 "coordinates": self.obsGroups[obsGroupName].coordinates, 

34 "search_radius": self.obsGroups[obsGroupName].search_radius, 

35 "display_r": self.obsGroups[obsGroupName].display_r 

36 } 

37 return jsondata 

38 

39 def addObsGroup(self, groupName, display_r, search_radius=None): 

40 """ 

41 Creates a new ObservatoryGroup. 

42 

43 :param groupName: name of the group to create. Raises an Error if it already exists. 

44 :type groupName: str 

45 :param display_r: radius where the group data should be displayed 

46 :type display_r: float or None 

47 :param search_radius: ? 

48 :type search_radius: 

49 """ 

50 if groupName in self.obsGroups: 

51 raise ValueError("An observatories group named %s already exists." % groupName) 

52 self.obsGroups[groupName] = ObservatoryGroup(groupName, display_r, search_radius) 

53 

54 def getObsR(self, th, ph, groupName): 

55 """ 

56 :param th: colatitude at which the r should be fetched. 

57 :type th: float 

58 :param ph: azimuth at which the r should be fetched. 

59 :type ph: float 

60 :param groupName: name of the group in which the data should be fetched. 

61 :type groupName: 

62 :return: radius of the group 

63 :rtype: float 

64 """ 

65 return self.obsGroups[groupName].getObsR(th, ph) 

66 

67 def getData(self, measureName, th, ph, groupName): 

68 """ 

69 Gets the observed data for the given parameters. 

70 

71 :param measureName: name of the measure to fetch 

72 :type measureName: str 

73 :param th: colatitude at which the data should be fetched. 

74 :type th: float 

75 :param ph: azimuth at which the data should be fetched. 

76 :type ph: float 

77 :param groupName: name of the group for which the data should be fetched 

78 :type groupName: str 

79 :return: Temporal array of the observed data at th,ph. 

80 :rtype: np.array 

81 """ 

82 return self.obsGroups[groupName].getObservatoryData(th, ph, measureName) 

83 

84 def find_cdf_files(self, path): 

85 """ 

86 find all files with .cdf extension in path directory and 

87 returns the list the 12-month data. 

88 """ 

89 brut_files_12M = [] 

90 for dirpath, dirnames, filenames in os.walk(path): 

91 for filename in [f for f in filenames if f.endswith(".cdf")]: 

92 name = os.path.join(dirpath, filename) 

93 if '12M' in filename: 

94 brut_files_12M.append(name) 

95 else: 

96 print('file {} do not match requirements'.format(filename)) 

97 return brut_files_12M 

98 

99 def loadFromDirectory(self, dataDirectory, cdf_type='12M'): 

100 """ 

101 Reads VO files into CHAMP, SWARM, GO and other satellites files in Magnetic ObservatoryGroups. 

102 

103 :param dataDirectory: directory where the files are located 

104 :type dataDirectory: str 

105 """ 

106 # Read CHAMP, SWARM, Oersted, Cryosat and Composite cdf data files 

107 possible_ids = np.array(['GROUND', 'CHAMP', 'SWARM', 'OERSTED', 'CRYOSAT', 'COMPOSITE']) 

108 cdf_name_shortcuts = ['GO', 'CH', 'SW', 'OR', 'CR', 'CO'] 

109 

110 # find all cdf filenames in data directory 

111 cdf_files_12M = self.find_cdf_files(dataDirectory) 

112 

113 for obsGroupName, cdf_shortcut in zip(possible_ids, cdf_name_shortcuts): 

114 GO = True if obsGroupName == 'GROUND' else False 

115 

116 if cdf_type == '12M': 

117 cdf_files = cdf_files_12M 

118 else: 

119 raise ValueError('cdf_type {} not recognized'.format(cdf_type)) 

120 # extract file that starts with the cdf name shortcut, i.e. GO, CH, SW.... 

121 try: 

122 idx = [name.split('/')[-1][:2] for name in cdf_files].index(cdf_shortcut) 

123 except ValueError: 

124 print('{} not found'.format(possible_ids[cdf_name_shortcuts.index(cdf_shortcut)])) 

125 cdf_filename = cdf_files[idx] 

126 

127 cdf_file = cdflib.CDF(cdf_filename) 

128 

129 print("Reading {} data from {}".format(obsGroupName, cdf_filename)) 

130 

131 thetas = 90 - cdf_file.varget("Latitude") # convert in colatitude 

132 if np.any(thetas > 180) or np.any(thetas < 0): 

133 raise ValueError('angle th {} represent the colatitude, did you use latitudes instead?'.format(th)) 

134 phis = cdf_file.varget("Longitude") 

135 radii = cdf_file.varget('Radius') / 1000 # to convert in km 

136 if GO: 

137 locs = [''.join([l[0] for l in loc]) for loc in cdf_file.varget('Obs')] 

138 

139 # because VOs circles should be bigger than GOs 

140 if GO: 

141 self.addObsGroup(obsGroupName, display_r=6371.2, search_radius=None) 

142 else: 

143 # assume unique radius for all VOs through time 

144 self.addObsGroup(obsGroupName, display_r=radii[0], search_radius=850) 

145 

146 times_stamp = cdf_file.varget('Timestamp') 

147 dec_times = [] 

148 for t in times_stamp: 

149 year, month = cdflib.cdfepoch.breakdown_epoch(t)[:2] 

150 dec_times.append(year + month / 12) 

151 

152 Bs, SVs = cdf_file.varget('B_CF'), cdf_file.varget('B_SV') 

153 if GO: 

154 Bs -= cdf_file.varget('bias_crust') 

155 

156 for i, (B, SV, r, th, ph, time) in enumerate(zip(Bs, SVs, radii, thetas, phis, dec_times)): 

157 if self.obsGroups[obsGroupName].display_r is None and not GO: 

158 self.obsGroups[obsGroupName].display_r = r 

159 

160 if GO: 

161 self.obsGroups[obsGroupName].addObservatory(locs[i], r, th, ph) 

162 else: 

163 self.obsGroups[obsGroupName].addObservatory(obsGroupName, r, th, ph) # Add if not exists 

164 

165 if np.any(np.isnan(B)): 

166 continue 

167 else: 

168 self.obsGroups[obsGroupName].addObservatoryData(th, ph, 'MF', time, B[0], B[1], B[2]) 

169 

170 if np.any(np.isnan(SV)): 

171 continue 

172 else: 

173 self.obsGroups[obsGroupName].addObservatoryData(th, ph, 'SV', time, SV[0], SV[1], SV[2])