Coverage for webgeodyn/data/TSData.py: 0%

170 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-12-18 09:33 +0000

1import numpy as np 

2from .measuredata import MeasureData 

3from .normPnm import semiNormalisedSchmidt 

4from webgeodyn.constants import rE, rC 

5 

6 

7# Data for U and S (tc,ts,sc,ss toroidal/poloidal harmonics coefficients) 

8class TSData(MeasureData): 

9 """ Class handling U and S (toroidal/poloidal harmonics coefficients).""" 

10 def __init__(self, data, lmax, units, measureType="U", PnmNorm=semiNormalisedSchmidt, gridth=None, gridph=None): 

11 """ Constructor of TSData class 

12 :param data: measure data 

13 :type data: np.array 

14 :param lmax: maximum degree 

15 :type lmax: int 

16 :param units: units of the measure 

17 :type units: str 

18 :param measureType: type of the measure (default = 'U') 

19 :type measureType: str ('U', 'S' or 'DIVHU') 

20 :param PnmNorm: normalisation to use (default: semiNormalisedSchmidt) 

21 :type PnmNorm: function 

22 :param gridth: grid on which the theta are evaluated (default: None) 

23 :type gridth: np.array 

24 :param gridph: grid on which the theta are evaluated (default: None) 

25 :type gridph: np.array 

26 """ 

27 super().__init__(data.shape, lmax, PnmNorm, units, gridth, gridph) 

28 self.measureType = measureType 

29 self.initMeasure() 

30 self.setData(data) 

31 

32 def initMeasure(self): 

33 """ Initiates the measure parameters : components, coefs and grids if needed. Called in __init__. """ 

34 self.nk = 2 * self.lmax * (self.lmax + 2) 

35 self.components = ["th", "ph"] 

36 self.coefs_list = ["tc", "sc", "ts", "ss"] 

37 self.initMatrices() 

38 if self.th is not None and self.ph is not None: 

39 self.computeGridPnm() 

40 

41 def setData(self, data): 

42 """ Sets the input data and extracts the time array and temporal mean of the coefs. Called in __init__. 

43 :param data: new measure data 

44 :type data: np.array 

45 """ 

46 self.clearCache() 

47 middle = int(data.shape[1]/2) 

48 midnk = int(self.nk/2) 

49 # Do the truncation on the coefficients t_l^m and s_l^m to keep only midnk on each 

50 self.data = data[:, np.r_[0:midnk, middle:middle+midnk]] 

51 self.ntimes = data.shape[0] 

52 self.coefs_mean = np.mean(self.data, axis=0) 

53 self.has_realisations = False 

54 if self.data.ndim >= 3: 

55 self.has_realisations = True 

56 

57 def lm2k(self, l, m, coef): 

58 """ Computes the index k of a coef given its name, its degree l and its order m. . Returns None if no coef exists. 

59 :param l: spherical harmonic degree 

60 :type l: int 

61 :param m: spherical harmonic order 

62 :type m: int 

63 :param coef: type of coef 

64 :type coef: str ('g', 'h' or 'LOD') 

65 :return: index of the coef in data 

66 :rtype: int or None 

67 """ 

68 if m > l: 

69 return None 

70 addk = 0 

71 if coef.startswith("s"): 

72 addk = self.lmax*(self.lmax+2) 

73 

74 if m == 0: 

75 if coef == "ts" or coef == "ss": 

76 return None 

77 return int(l**2 - 1 + addk) 

78 

79 if coef.endswith("c"): 

80 return int(l**2 + 2*m - 2 + addk) 

81 if coef.endswith("s"): 

82 return int(l**2 + 2*m - 1 + addk) 

83 

84 def k2lm(self, k): 

85 """ Returns the degree, the order and the name of a coef of given k. 

86 :param k: index of the coef 

87 :type k: int 

88 :return: degree, order and name of the coef of index k 

89 :rtype: int, int, str 

90 """ 

91 return self.l[k], self.m[k], self.coefname[k] 

92 

93 def initMatrices(self): 

94 """ Creates the matrices that allow to identify and index the spherical 

95 harmonic coefficients ts,tc,ss and sc. Called in __init__. 

96 """ 

97 k = np.arange(2*self.lmax*(self.lmax+2)) 

98 kmod = k % (self.lmax*(self.lmax+2)) 

99 self.l = np.floor(np.sqrt(kmod+1)).astype(int) 

100 kl2 = (kmod+1-self.l**2) 

101 self.is_sin = np.logical_and(np.mod(kl2, 2) == 0, kl2 != 0) 

102 self.is_cos = np.logical_not(self.is_sin) 

103 self.is_t = k<(self.lmax*(self.lmax+2)) 

104 self.is_s = np.logical_not(self.is_t) 

105 self.is_tc = np.logical_and(self.is_cos, self.is_t) 

106 self.is_ts = np.logical_and(self.is_sin, self.is_t) 

107 self.is_sc = np.logical_and(self.is_cos, self.is_s) 

108 self.is_ss = np.logical_and(self.is_sin, self.is_s) 

109 self.coefname = np.where(self.is_tc, "tc", 

110 np.where(self.is_ts, "ts", 

111 np.where(self.is_sc, "sc", 

112 np.where(self.is_ss, "ss",None)))) 

113 self.m = np.ceil(kl2/2).astype(int) 

114 

115 def set_expand_dims(self, useRealisations): 

116 if useRealisations: 

117 # for broadcast dimension (coef is the second dimension on 3) 

118 self.is_tc_ = self.expand_dims(self.is_tc) 

119 self.is_ts_ = self.expand_dims(self.is_ts) 

120 self.is_sc_ = self.expand_dims(self.is_sc) 

121 self.is_ss_ = self.expand_dims(self.is_ss) 

122 self.m_ = self.expand_dims(self.m) 

123 self.l_ = self.expand_dims(self.l) 

124 self.m_tc_ = self.m_ * self.is_tc_ 

125 self.m_ts_ = self.m_ * self.is_ts_ 

126 self.m_sc_ = self.m_ * self.is_sc_ 

127 self.m_ss_ = self.m_ * self.is_ss_ 

128 self.lplus1 = self.l_+1 

129 else: 

130 self.is_tc_ = self.is_tc 

131 self.is_ts_ = self.is_ts 

132 self.is_sc_ = self.is_sc 

133 self.is_ss_ = self.is_ss 

134 self.m_ = self.m 

135 self.l_ = self.l 

136 self.m_tc_ = self.m_ * self.is_tc_ 

137 self.m_ts_ = self.m_ * self.is_ts_ 

138 self.m_sc_ = self.m_ * self.is_sc_ 

139 self.m_ss_ = self.m_ * self.is_ss_ 

140 self.lplus1 = self.l_+1 

141 

142 def computeRThetaPhiData(self, r, thlist, phlist, components=None, usegridPnm=False, computeallrealisation=False, irealisation=-1): 

143 """ Computes the components of the quantity described by the spherical harmonic potential 

144 given radius, a colatitude list and a longitude list. 

145 

146 :param r: radius at which the computation is done 

147 :type r: float 

148 :param thlist: list containing the colatitudes at which the computation is done 

149 :type thlist: list 

150 :param phlist: list containing the longitudes at which the computation is done 

151 :type phlist: list 

152 :param components: list of components to compute (default to ["r", "th", "ph", "norm"]) 

153 :type components: list 

154 :param usegridPnm: boolean indicating if the Pnm computed previously should be used (default=False) 

155 :type usegridPnm: bool 

156 :param computeallrealisation: boolean indicating if the computation should be on all realisations or on the mean (default=False) 

157 :type computeallrealisation: bool 

158 :param irealisation: index of the realisation to use (default=-1 and takes the mean instead) 

159 :type irealisation: int 

160 :return: result of the computation 

161 :rtype: dict of np.array of dim (date x n_ph x n_th) 

162 """ 

163 if r != rC: 

164 raise ValueError('Error: for U or S type measure, r(%f) should be equal to rC(%f)' % (r, rC)) 

165 

166 if self.has_realisations and not computeallrealisation: 

167 if irealisation < 0: 

168 realisation_data = np.mean(self.data, axis=2) 

169 else: 

170 realisation_data = self.data[:, :, irealisation] 

171 else: 

172 realisation_data = self.data 

173 

174 # Set default components if not given 

175 if components is None: 

176 components = ["th", "ph", "norm"]#, "divh"] 

177 

178 # The computation of norm requires th and ph component 

179 if ("norm" in components) and not usegridPnm: 

180 for additionalComponent in self.components: 

181 if additionalComponent not in components: 

182 components.append(additionalComponent) 

183 

184 print("WILL COMPUTE ", components) 

185 

186 if self.has_realisations and computeallrealisation: 

187 result = {component: np.zeros((self.ntimes, len(thlist), len(phlist), realisation_data.shape[2])) for component in components} 

188 # Use expanded arrays to fit dimension of realisations 

189 self.set_expand_dims(True) 

190 else: 

191 result = {component: np.zeros((self.ntimes, len(thlist), len(phlist))) for component in components} 

192 self.set_expand_dims(False) 

193 

194 for ith, th in enumerate(thlist): 

195 if usegridPnm: 

196 if not self.has_grid: 

197 raise ValueError("The data has no real space (th,ph) grid") 

198 self.computingState = (ith+1)/self.nth*100 

199 sinth = np.sin(th) 

200 

201 if usegridPnm: 

202 Pnm = self.Pnm[ith] 

203 dPnm = self.dPnm[ith] 

204 else: 

205 Pnm, dPnm = self.computePnm(th) 

206 

207 if self.has_realisations and computeallrealisation: 

208 Pnm = self.expand_dims(Pnm) 

209 dPnm = self.expand_dims(dPnm) 

210 

211 for iph, ph in enumerate(phlist): 

212 cosmph = np.cos(self.m_*ph) 

213 sinmph = np.sin(self.m_*ph) 

214 

215 if "divhu" in components: 

216 # Computation of the Gauss coefficents was already done when the measure DIVHU was created 

217 # So we only need to sum them all 

218 result["divhu"][:, ith, iph] = np.sum((cosmph*self.is_sc_+sinmph*self.is_ss_) * Pnm * realisation_data, axis=1) 

219 

220 if "th" in components: 

221 fac_tcnm_th = -self.m_tc_ * sinmph/sinth * Pnm 

222 fac_tsnm_th = self.m_ts_ * cosmph/sinth * Pnm 

223 fac_scnm_th = cosmph * self.is_sc_ * dPnm 

224 fac_ssnm_th = sinmph * self.is_ss_ * dPnm 

225 result["th"][:, ith, iph] = np.sum( 

226 (fac_tcnm_th + fac_tsnm_th + fac_scnm_th + fac_ssnm_th) 

227 * realisation_data, axis=1) 

228 

229 if "ph" in components: 

230 fac_tcnm_ph = -cosmph * self.is_tc_ * dPnm 

231 fac_tsnm_ph = -sinmph * self.is_ts_ * dPnm 

232 fac_scnm_ph = -self.m_sc_ * sinmph/sinth * Pnm 

233 fac_ssnm_ph = self.m_ss_ * cosmph/sinth * Pnm 

234 result["ph"][:, ith, iph] = np.sum( 

235 (fac_tcnm_ph + fac_tsnm_ph + fac_scnm_ph 

236 + fac_ssnm_ph) * realisation_data, axis=1) 

237 

238 if "norm" in components: 

239 if "ph" in result: 

240 resultph = result["ph"][:, ith, iph] 

241 else: 

242 resultph = self.cachedRThetaPhiData[r]["ph"][:, ith, iph] 

243 if "th" in result: 

244 resultth = result["th"][:, ith, iph] 

245 else: 

246 resultth = self.cachedRThetaPhiData[r]["th"][:, ith, iph] 

247 result["norm"][:, ith, iph] = np.sqrt(resultth**2 

248 + resultph**2) 

249 # Calculation of divhu as a component 

250 # NOT USED ANYMORE : See "DIVHU" measure instead 

251 if "divh" in components: 

252 fac_scnm_divh = cosmph/r * self.is_sc_ 

253 fac_ssnm_divh = sinmph/r * self.is_ss_ 

254 result["divh"][:, ith, iph] = np.sum( 

255 (fac_scnm_divh + fac_ssnm_divh) 

256 * self.l_ * (self.l_+1) * Pnm 

257 * realisation_data, axis=1) 

258 # End of the phi loop 

259 # End of the theta loop 

260 

261 return result 

262 

263 def computeNorm(self, r, itime): 

264 """ 

265 Computes norm at a certain time and place using cached data. 

266 

267 :param r: radius at which to evaluate the norm 

268 :type r: float 

269 :param itime: index of time 

270 :type itime: int 

271 :return: Numpy array 

272 :rtype: np.array 

273 """ 

274 return np.sqrt((self.cachedRThetaPhiData[r]["th"][itime] - self.cachedRThetaPhiData_mean[r]["th"]) ** 2 

275 + (self.cachedRThetaPhiData[r]["ph"][itime] - self.cachedRThetaPhiData_mean[r]["ph"]) ** 2) 

276 

277 def _LowesPrefactor(self, l): 

278 return l * (l + 1) / (2 * l + 1) 

279 

280 def computeUGeostrophic(self, r, thlist, computeallrealisation=False, irealisation=-1): 

281 if r != rC: 

282 raise ValueError('Error: for U or S type measure, r(%f) should be equal to rC(%f)' % (r, rC)) 

283 

284 if self.has_realisations and not computeallrealisation: 

285 if irealisation < 0: 

286 spectral_data = np.mean(self.data, axis=2) 

287 else: 

288 spectral_data = self.data[:, :, irealisation] 

289 else: 

290 spectral_data = self.data 

291 

292 if self.has_realisations and computeallrealisation: 

293 result = np.zeros((self.ntimes, len(thlist), spectral_data.shape[2])) 

294 # Use expanded arrays to fit dimension of realisations 

295 self.set_expand_dims(True) 

296 else: 

297 result = np.zeros((self.ntimes, len(thlist))) 

298 self.set_expand_dims(False) 

299 

300 for ith, th in enumerate(thlist): 

301 Pnm, dPnm = self.computePnm(th) 

302 

303 if self.has_realisations and computeallrealisation: 

304 dPnm = self.expand_dims(dPnm) 

305 

306 for l in range(0, self.lmax): 

307 k = self.lm2k(l, 0, "tc") 

308 result[:, ith] += - spectral_data[:, k] * dPnm[k] 

309 

310 return result