Coverage for webgeodyn/data/TSData.py: 0%
170 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-18 09:33 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-18 09:33 +0000
1import numpy as np
2from .measuredata import MeasureData
3from .normPnm import semiNormalisedSchmidt
4from webgeodyn.constants import rE, rC
7# Data for U and S (tc,ts,sc,ss toroidal/poloidal harmonics coefficients)
8class TSData(MeasureData):
9 """ Class handling U and S (toroidal/poloidal harmonics coefficients)."""
10 def __init__(self, data, lmax, units, measureType="U", PnmNorm=semiNormalisedSchmidt, gridth=None, gridph=None):
11 """ Constructor of TSData class
12 :param data: measure data
13 :type data: np.array
14 :param lmax: maximum degree
15 :type lmax: int
16 :param units: units of the measure
17 :type units: str
18 :param measureType: type of the measure (default = 'U')
19 :type measureType: str ('U', 'S' or 'DIVHU')
20 :param PnmNorm: normalisation to use (default: semiNormalisedSchmidt)
21 :type PnmNorm: function
22 :param gridth: grid on which the theta are evaluated (default: None)
23 :type gridth: np.array
24 :param gridph: grid on which the theta are evaluated (default: None)
25 :type gridph: np.array
26 """
27 super().__init__(data.shape, lmax, PnmNorm, units, gridth, gridph)
28 self.measureType = measureType
29 self.initMeasure()
30 self.setData(data)
32 def initMeasure(self):
33 """ Initiates the measure parameters : components, coefs and grids if needed. Called in __init__. """
34 self.nk = 2 * self.lmax * (self.lmax + 2)
35 self.components = ["th", "ph"]
36 self.coefs_list = ["tc", "sc", "ts", "ss"]
37 self.initMatrices()
38 if self.th is not None and self.ph is not None:
39 self.computeGridPnm()
41 def setData(self, data):
42 """ Sets the input data and extracts the time array and temporal mean of the coefs. Called in __init__.
43 :param data: new measure data
44 :type data: np.array
45 """
46 self.clearCache()
47 middle = int(data.shape[1]/2)
48 midnk = int(self.nk/2)
49 # Do the truncation on the coefficients t_l^m and s_l^m to keep only midnk on each
50 self.data = data[:, np.r_[0:midnk, middle:middle+midnk]]
51 self.ntimes = data.shape[0]
52 self.coefs_mean = np.mean(self.data, axis=0)
53 self.has_realisations = False
54 if self.data.ndim >= 3:
55 self.has_realisations = True
57 def lm2k(self, l, m, coef):
58 """ Computes the index k of a coef given its name, its degree l and its order m. . Returns None if no coef exists.
59 :param l: spherical harmonic degree
60 :type l: int
61 :param m: spherical harmonic order
62 :type m: int
63 :param coef: type of coef
64 :type coef: str ('g', 'h' or 'LOD')
65 :return: index of the coef in data
66 :rtype: int or None
67 """
68 if m > l:
69 return None
70 addk = 0
71 if coef.startswith("s"):
72 addk = self.lmax*(self.lmax+2)
74 if m == 0:
75 if coef == "ts" or coef == "ss":
76 return None
77 return int(l**2 - 1 + addk)
79 if coef.endswith("c"):
80 return int(l**2 + 2*m - 2 + addk)
81 if coef.endswith("s"):
82 return int(l**2 + 2*m - 1 + addk)
84 def k2lm(self, k):
85 """ Returns the degree, the order and the name of a coef of given k.
86 :param k: index of the coef
87 :type k: int
88 :return: degree, order and name of the coef of index k
89 :rtype: int, int, str
90 """
91 return self.l[k], self.m[k], self.coefname[k]
93 def initMatrices(self):
94 """ Creates the matrices that allow to identify and index the spherical
95 harmonic coefficients ts,tc,ss and sc. Called in __init__.
96 """
97 k = np.arange(2*self.lmax*(self.lmax+2))
98 kmod = k % (self.lmax*(self.lmax+2))
99 self.l = np.floor(np.sqrt(kmod+1)).astype(int)
100 kl2 = (kmod+1-self.l**2)
101 self.is_sin = np.logical_and(np.mod(kl2, 2) == 0, kl2 != 0)
102 self.is_cos = np.logical_not(self.is_sin)
103 self.is_t = k<(self.lmax*(self.lmax+2))
104 self.is_s = np.logical_not(self.is_t)
105 self.is_tc = np.logical_and(self.is_cos, self.is_t)
106 self.is_ts = np.logical_and(self.is_sin, self.is_t)
107 self.is_sc = np.logical_and(self.is_cos, self.is_s)
108 self.is_ss = np.logical_and(self.is_sin, self.is_s)
109 self.coefname = np.where(self.is_tc, "tc",
110 np.where(self.is_ts, "ts",
111 np.where(self.is_sc, "sc",
112 np.where(self.is_ss, "ss",None))))
113 self.m = np.ceil(kl2/2).astype(int)
115 def set_expand_dims(self, useRealisations):
116 if useRealisations:
117 # for broadcast dimension (coef is the second dimension on 3)
118 self.is_tc_ = self.expand_dims(self.is_tc)
119 self.is_ts_ = self.expand_dims(self.is_ts)
120 self.is_sc_ = self.expand_dims(self.is_sc)
121 self.is_ss_ = self.expand_dims(self.is_ss)
122 self.m_ = self.expand_dims(self.m)
123 self.l_ = self.expand_dims(self.l)
124 self.m_tc_ = self.m_ * self.is_tc_
125 self.m_ts_ = self.m_ * self.is_ts_
126 self.m_sc_ = self.m_ * self.is_sc_
127 self.m_ss_ = self.m_ * self.is_ss_
128 self.lplus1 = self.l_+1
129 else:
130 self.is_tc_ = self.is_tc
131 self.is_ts_ = self.is_ts
132 self.is_sc_ = self.is_sc
133 self.is_ss_ = self.is_ss
134 self.m_ = self.m
135 self.l_ = self.l
136 self.m_tc_ = self.m_ * self.is_tc_
137 self.m_ts_ = self.m_ * self.is_ts_
138 self.m_sc_ = self.m_ * self.is_sc_
139 self.m_ss_ = self.m_ * self.is_ss_
140 self.lplus1 = self.l_+1
142 def computeRThetaPhiData(self, r, thlist, phlist, components=None, usegridPnm=False, computeallrealisation=False, irealisation=-1):
143 """ Computes the components of the quantity described by the spherical harmonic potential
144 given radius, a colatitude list and a longitude list.
146 :param r: radius at which the computation is done
147 :type r: float
148 :param thlist: list containing the colatitudes at which the computation is done
149 :type thlist: list
150 :param phlist: list containing the longitudes at which the computation is done
151 :type phlist: list
152 :param components: list of components to compute (default to ["r", "th", "ph", "norm"])
153 :type components: list
154 :param usegridPnm: boolean indicating if the Pnm computed previously should be used (default=False)
155 :type usegridPnm: bool
156 :param computeallrealisation: boolean indicating if the computation should be on all realisations or on the mean (default=False)
157 :type computeallrealisation: bool
158 :param irealisation: index of the realisation to use (default=-1 and takes the mean instead)
159 :type irealisation: int
160 :return: result of the computation
161 :rtype: dict of np.array of dim (date x n_ph x n_th)
162 """
163 if r != rC:
164 raise ValueError('Error: for U or S type measure, r(%f) should be equal to rC(%f)' % (r, rC))
166 if self.has_realisations and not computeallrealisation:
167 if irealisation < 0:
168 realisation_data = np.mean(self.data, axis=2)
169 else:
170 realisation_data = self.data[:, :, irealisation]
171 else:
172 realisation_data = self.data
174 # Set default components if not given
175 if components is None:
176 components = ["th", "ph", "norm"]#, "divh"]
178 # The computation of norm requires th and ph component
179 if ("norm" in components) and not usegridPnm:
180 for additionalComponent in self.components:
181 if additionalComponent not in components:
182 components.append(additionalComponent)
184 print("WILL COMPUTE ", components)
186 if self.has_realisations and computeallrealisation:
187 result = {component: np.zeros((self.ntimes, len(thlist), len(phlist), realisation_data.shape[2])) for component in components}
188 # Use expanded arrays to fit dimension of realisations
189 self.set_expand_dims(True)
190 else:
191 result = {component: np.zeros((self.ntimes, len(thlist), len(phlist))) for component in components}
192 self.set_expand_dims(False)
194 for ith, th in enumerate(thlist):
195 if usegridPnm:
196 if not self.has_grid:
197 raise ValueError("The data has no real space (th,ph) grid")
198 self.computingState = (ith+1)/self.nth*100
199 sinth = np.sin(th)
201 if usegridPnm:
202 Pnm = self.Pnm[ith]
203 dPnm = self.dPnm[ith]
204 else:
205 Pnm, dPnm = self.computePnm(th)
207 if self.has_realisations and computeallrealisation:
208 Pnm = self.expand_dims(Pnm)
209 dPnm = self.expand_dims(dPnm)
211 for iph, ph in enumerate(phlist):
212 cosmph = np.cos(self.m_*ph)
213 sinmph = np.sin(self.m_*ph)
215 if "divhu" in components:
216 # Computation of the Gauss coefficents was already done when the measure DIVHU was created
217 # So we only need to sum them all
218 result["divhu"][:, ith, iph] = np.sum((cosmph*self.is_sc_+sinmph*self.is_ss_) * Pnm * realisation_data, axis=1)
220 if "th" in components:
221 fac_tcnm_th = -self.m_tc_ * sinmph/sinth * Pnm
222 fac_tsnm_th = self.m_ts_ * cosmph/sinth * Pnm
223 fac_scnm_th = cosmph * self.is_sc_ * dPnm
224 fac_ssnm_th = sinmph * self.is_ss_ * dPnm
225 result["th"][:, ith, iph] = np.sum(
226 (fac_tcnm_th + fac_tsnm_th + fac_scnm_th + fac_ssnm_th)
227 * realisation_data, axis=1)
229 if "ph" in components:
230 fac_tcnm_ph = -cosmph * self.is_tc_ * dPnm
231 fac_tsnm_ph = -sinmph * self.is_ts_ * dPnm
232 fac_scnm_ph = -self.m_sc_ * sinmph/sinth * Pnm
233 fac_ssnm_ph = self.m_ss_ * cosmph/sinth * Pnm
234 result["ph"][:, ith, iph] = np.sum(
235 (fac_tcnm_ph + fac_tsnm_ph + fac_scnm_ph
236 + fac_ssnm_ph) * realisation_data, axis=1)
238 if "norm" in components:
239 if "ph" in result:
240 resultph = result["ph"][:, ith, iph]
241 else:
242 resultph = self.cachedRThetaPhiData[r]["ph"][:, ith, iph]
243 if "th" in result:
244 resultth = result["th"][:, ith, iph]
245 else:
246 resultth = self.cachedRThetaPhiData[r]["th"][:, ith, iph]
247 result["norm"][:, ith, iph] = np.sqrt(resultth**2
248 + resultph**2)
249 # Calculation of divhu as a component
250 # NOT USED ANYMORE : See "DIVHU" measure instead
251 if "divh" in components:
252 fac_scnm_divh = cosmph/r * self.is_sc_
253 fac_ssnm_divh = sinmph/r * self.is_ss_
254 result["divh"][:, ith, iph] = np.sum(
255 (fac_scnm_divh + fac_ssnm_divh)
256 * self.l_ * (self.l_+1) * Pnm
257 * realisation_data, axis=1)
258 # End of the phi loop
259 # End of the theta loop
261 return result
263 def computeNorm(self, r, itime):
264 """
265 Computes norm at a certain time and place using cached data.
267 :param r: radius at which to evaluate the norm
268 :type r: float
269 :param itime: index of time
270 :type itime: int
271 :return: Numpy array
272 :rtype: np.array
273 """
274 return np.sqrt((self.cachedRThetaPhiData[r]["th"][itime] - self.cachedRThetaPhiData_mean[r]["th"]) ** 2
275 + (self.cachedRThetaPhiData[r]["ph"][itime] - self.cachedRThetaPhiData_mean[r]["ph"]) ** 2)
277 def _LowesPrefactor(self, l):
278 return l * (l + 1) / (2 * l + 1)
280 def computeUGeostrophic(self, r, thlist, computeallrealisation=False, irealisation=-1):
281 if r != rC:
282 raise ValueError('Error: for U or S type measure, r(%f) should be equal to rC(%f)' % (r, rC))
284 if self.has_realisations and not computeallrealisation:
285 if irealisation < 0:
286 spectral_data = np.mean(self.data, axis=2)
287 else:
288 spectral_data = self.data[:, :, irealisation]
289 else:
290 spectral_data = self.data
292 if self.has_realisations and computeallrealisation:
293 result = np.zeros((self.ntimes, len(thlist), spectral_data.shape[2]))
294 # Use expanded arrays to fit dimension of realisations
295 self.set_expand_dims(True)
296 else:
297 result = np.zeros((self.ntimes, len(thlist)))
298 self.set_expand_dims(False)
300 for ith, th in enumerate(thlist):
301 Pnm, dPnm = self.computePnm(th)
303 if self.has_realisations and computeallrealisation:
304 dPnm = self.expand_dims(dPnm)
306 for l in range(0, self.lmax):
307 k = self.lm2k(l, 0, "tc")
308 result[:, ith] += - spectral_data[:, k] * dPnm[k]
310 return result