Coverage for webgeodyn/inout/zforecast.py: 0%

72 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-12-18 09:33 +0000

1import os 

2import re 

3import numpy as np 

4from .default import giveMeasureTypeUnits 

5 

6 

7def getDataAndLmax(data, measureType): 

8 """ Returns the data and the lmax for the data and type of measure given. 

9 :param data: Array containing the measure data 

10 :type data: ndarray 

11 :param measureType: Type of the measure 

12 :type measureType: str 

13 :return: Data and the lmax computed from the data shape 

14 :rtype: ndarray, int 

15 """ 

16 sizeData = np.where(data[0] != 0)[0].shape[0] - 1 

17 if measureType == "U": 

18 lmax = (-2+np.sqrt(4+4*sizeData/2))/2 

19 nk = int(lmax*(lmax+2)) 

20 middle = int((data.shape[1] - 1)/2) 

21 data = data[:, np.r_[1:nk+1, middle+1:middle+nk+1]] 

22 else: 

23 lmax = (-2+np.sqrt(4+4*sizeData))/2 

24 nk = int(lmax*(lmax+2)) 

25 data = data[:, 1:nk+1] 

26 # Check if computed lmax is int 

27 if not (int(lmax) == lmax): 

28 raise ValueError('[{}] File size ({}) is not multiple of lmax*(lmax + 2), lmax={}'.format(measureType, sizeData, lmax)) 

29 

30 return data, lmax 

31 

32 

33def load(dataDirectory, dataModel, keepRealisations=False): 

34 """ Loading function for zForecast files. Also adds the data to the dataModel. 

35 

36 :param dataDirectory: Location of the zForecast files 

37 :type dataDirectory: os.path 

38 :param dataModel: Model in which to add the loaded measures 

39 :type dataModel: Model 

40 :param keepRealisations: If True, all realisations are kept in the data. Else, the data is averaged over the realisations 

41 :type keepRealisations: bool 

42 :return: 0 if everything went well, -1 otherwise 

43 :rtype: int 

44 """ 

45 firstpoint = 3 # CUT FIRST 3 points 

46 times = None 

47 

48 filelist = {} 

49 for file in os.listdir(dataDirectory): 

50 # If realisations are kept, only build a filelist that will be treated afterwards 

51 if keepRealisations: 

52 matchfile = re.match('^ZReal_Forecast_Analysis_(.*)_(.*).dat$', file) 

53 if matchfile is not None: 

54 measureName = matchfile.group(1) 

55 irealisation = int(matchfile.group(2))-1 

56 if measureName not in filelist: 

57 filelist[measureName] = [] 

58 filelist[measureName].append([irealisation, file]) 

59 

60 # Else read Mean+RMS for all values and add them to the model 

61 else: 

62 matchfile = re.match('^ZForecast_Analysis_(.*).dat$', file) 

63 if matchfile is not None: 

64 measureName = matchfile.group(1) 

65 datafile = os.path.join(dataDirectory, file) 

66 rms_datafile = os.path.join(dataDirectory, "ZRMS_" + measureName + "_Analysis.dat") 

67 measureType, units = giveMeasureTypeUnits(measureName) 

68 

69 # Reading measure file 

70 sourceData = np.loadtxt(datafile)[firstpoint:] 

71 sourceDataRMS = np.loadtxt(rms_datafile)[firstpoint:] 

72 

73 # Extract times (once for all files) 

74 if times is None: 

75 print("Ignoring first %i points" % firstpoint) 

76 times = sourceData[:, 0] 

77 itimes_analysis = np.arange((302-firstpoint) % 3, times.shape[0], 3) 

78 times_analysis = times[itimes_analysis] 

79 print("Found %i analysis time steps from %f to %f" % (times_analysis.shape[0], 

80 times_analysis[0], times_analysis[-1])) 

81 

82 # Prevent from loading times that are not the same for all files 

83 if not np.allclose(times, sourceData[:, 0]) or not np.allclose(times, sourceDataRMS[:, 0]): 

84 print('[%s] Times are not the same in all files', measureName) 

85 return -1 

86 

87 sourceData, lmax = getDataAndLmax(sourceData, measureType) 

88 sourceDataRMS, lmax = getDataAndLmax(sourceDataRMS, measureType) 

89 

90 rmsdata = np.zeros((sourceDataRMS[itimes_analysis].shape[0], 

91 sourceDataRMS[itimes_analysis].shape[1], 2)) 

92 rmsdata[:, :, 1] = sourceDataRMS[itimes_analysis] 

93 rmsdata[:, :, 0] = sourceDataRMS[itimes_analysis-1] 

94 

95 dataModel.addMeasure(measureName, measureType, lmax, units, 

96 sourceData[itimes_analysis], rmsdata, times_analysis) 

97 

98 # Treat the filelist previously build by extracting the data and adding it to the model 

99 if keepRealisations: 

100 for measureName in filelist: 

101 measureData = None 

102 measureType, units = giveMeasureTypeUnits(measureName) 

103 

104 for ireal, fileName in filelist[measureName]: 

105 print(ireal, " Reading", measureName, fileName) 

106 datafile = os.path.join(dataDirectory, fileName) 

107 

108 # Reading realisation file 

109 sourceData = np.loadtxt(datafile)[firstpoint:] 

110 

111 # Extract times (once for all files) 

112 if times is None: 

113 times = sourceData[:, 0] 

114 # Prevent from loading times that are not the same for all files 

115 if not np.allclose(times, sourceData[:, 0]): 

116 print('[%s] Times are not the same in all files !', measureName) 

117 return -1 

118 

119 # Extract sourceData and the maximum sph. harm. coef lmax 

120 sourceData, lmax = getDataAndLmax(sourceData, measureType) 

121 if measureData is None: 

122 # Initialise data to be of size [ntimes, ncoef, nrealisations] 

123 measureData = np.zeros((sourceData.shape[0], sourceData.shape[1], len(filelist[measureName]))) 

124 measureData[:, :, ireal] = sourceData 

125 

126 # Add the data loaded to the model as measures 

127 dataModel.addMeasure(measureName, measureType, lmax, units, 

128 measureData, times=times) 

129 

130 # Returns 0 if everything went well 

131 return 0