Coverage for pygeodyn/shear/compute_error.py: 0%

52 statements  

import numpy as np
import os
import time
import logging
from pygeodyn.shear.shear_algo import ShearError
from mpi4py import MPI

# directory of the script (named script_dir to avoid shadowing the builtin dir)
script_dir = os.path.dirname(os.path.abspath(__file__))

# Computation parameters
Ly = 18
Lu = 18
Lb = 13
prior_dir_shear = script_dir + "/../data/priors/CE_augkf"  # coupled_earth
prior_type_shear = "coupled_earth"
# prior_dir_shear = script_dir + "/../data/priors/midpath"  # midpath
# prior_type_shear = "midpath"
# prior_dir_shear = script_dir + "/../data/priors/71path"  # 71path
# prior_type_shear = "71path"
TauG = 0

def compute_error(Ly, Lu, Lb, TauG, prior_dir_shear, prior_type_shear):
    """
    Compute the error of representativeness for all prior times and save it in prior_dir_shear.

    :param Ly: Maximal spherical harmonic degree of the constraint on the shear
    :type Ly: int
    :param Lu: Maximal spherical harmonic degree of the core flow
    :type Lu: int
    :param Lb: Maximal spherical harmonic degree of the magnetic field
    :type Lb: int
    :param TauG: Time constant for the shear (MUST REMAIN 0 SO FAR)
    :type TauG: int
    :param prior_dir_shear: Directory containing the prior data for the shear computation
    :type prior_dir_shear: str
    :param prior_type_shear: Prior type for the shear computation
    :type prior_type_shear: str
    """

    # Limit each library to a single thread to improve performance in parallel
    os.environ["MKL_NUM_THREADS"] = "1"
    os.environ["NUMEXPR_NUM_THREADS"] = "1"
    os.environ["OMP_NUM_THREADS"] = "1"
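    # Caveat (an assumption about library behaviour, not stated in this
    # module): MKL and OpenMP may read these variables only when their
    # threading runtime first initialises, which can already have happened
    # at `import numpy`. If the limit does not take effect, exporting the
    # variables in the shell before launching the script is more reliable.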

    # Init MPI
    if not MPI.Is_initialized():
        MPI.Init()
    comm = MPI.COMM_WORLD
    nb_proc = comm.Get_size()
    rank = comm.Get_rank()

    def first_process():
        return rank == 0

    # Log
    logging_level = logging.INFO
    log_format = '%(asctime)s - Process {} - %(levelname)s - %(message)s'.format(rank)
    logging.basicConfig(format=log_format, level=logging_level)
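    # Example of a resulting log line (timestamp illustrative):
    #   2023-12-22 13:43:05,123 - Process 0 - INFO - Sync done!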

    if first_process():
        begin_time = time.time()

    # Init ShearError instance
    error = ShearError(Ly, Lu, Lb, TauG, prior_dir_shear, prior_type_shear)

    # Precompute operators
    error.precompute_operators()

    # Load the prior series (U, DU, MF, SV), one row per prior time
    U, DU, MF, SV = error.load_prior_for_err_rep()

    N_times = U.shape[0]

    # Build the attribution of times if several processes run in parallel:
    # rank r handles time indices r, r + nb_proc, r + 2 * nb_proc, ...
    attributed_times = range(rank, N_times, nb_proc)
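    # Worked example (illustrative numbers): with nb_proc = 4 and
    # N_times = 10, the round-robin attribution gives
    #   rank 0 -> 0, 4, 8      rank 1 -> 1, 5, 9
    #   rank 2 -> 2, 6         rank 3 -> 3, 7
    # e.g. list(range(1, 10, 4)) == [1, 5, 9] on rank 1.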

    store = np.zeros((N_times, error.Ny2))

    for i in attributed_times:
        logging.info("Computing error at time index %s out of %s", i, N_times)
        u = U[i, :]
        du = DU[i, :]
        mf = MF[i, :]
        sv = SV[i, :]

        store[i, :] = error.compute_error(u, du, mf, sv)

    # Recover all errors by summing the arrays of every process. Allreduce
    # updates the array of each process with the result, so there is no
    # need to broadcast afterwards.
    logging.info("Waiting for all processes to finish computing the error of representativeness")
    comm.Allreduce(MPI.IN_PLACE, store, op=MPI.SUM)
    logging.info("Sync done!")
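    # Sketch of the reduction (illustrative values): each rank filled only
    # its attributed rows of `store` and left the others at zero, so the
    # element-wise sum reassembles the complete array on every rank:
    #   rank 0: [e0, 0, e2, 0]  +  rank 1: [0, e1, 0, e3]
    #   -> both ranks end up with [e0, e1, e2, e3]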

    if first_process():
        # Save error in prior_dir_shear
        np.save(prior_dir_shear + "/y12_err_repr_Ly_{}_Lu_{}_Lb_{}.npy".format(Ly, Lu, Lb), store)

        end_time = time.time()
        elapsed_time = end_time - begin_time
        logging.info("Elapsed time: {:.2f} s".format(elapsed_time))

if __name__ == '__main__':
    compute_error(Ly, Lu, Lb, TauG, prior_dir_shear, prior_type_shear)
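
# Usage sketch (the process count is illustrative; any MPI launcher works):
#   mpiexec -n 4 python compute_error.py
#
# The saved result can then be inspected with, e.g.:
#   err = np.load(prior_dir_shear + "/y12_err_repr_Ly_18_Lu_18_Lb_13.npy")
#   err.shape  # (N_times, Ny2)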