Coverage for pygeodyn/shear/compute_error.py: 0%
52 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-22 13:43 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-12-22 13:43 +0000
1import numpy as np
2import os
3import time
4import logging
5from pygeodyn.shear.shear_algo import ShearError
6from mpi4py import MPI
8# directory of the script
9dir = os.path.dirname(os.path.abspath(__file__))
11# Computation parameters
12Ly = 18
13Lu = 18
14Lb = 13
15prior_dir_shear = dir + "/../data/priors/CE_augkf" #coupled_earth
16prior_type_shear = "coupled_earth" #coupled_earth
17#prior_dir_shear = dir + "/../data/priors/midpath" #midpath
18#prior_type_shear = "midpath" #midpath
19#prior_dir_shear = dir + "/../data/priors/71path" #71path
20#prior_type_shear = "71path" #71path
21TauG = 0
23def compute_error(Ly, Lu, Lb, TauG, prior_dir_shear, prior_type_shear):
24 """
25 Compute the error of representativeness for all prior times and save it in prior_dir_shear
27 :param Ly: Maximal spherical harmonic degree of the constraint on the shear
28 :type Ly: int
29 :param Lu: Maximal spherical harmonic degree of the core flow
30 :type Lu: int
31 :param Lb: Maximal spherical harmonic degree of the magnetic field
32 :type Lb: int
33 :param TauG: Time constant for the shear (MUST REMAIN 0 SO FAR)
34 :type TauG: int
35 :param prior_dir_shear: Directory containing the prior data for the shear computation
36 :type prior_dir_shear: str
37 :param prior_type_shear: Prior type for the shear computation
38 :type prior_type_shear: str
39 :param test: Test directory that contains reference data for tests
40 :type test: str
41 """
43 # to improve performance in parrallel
44 os.environ["MKL_NUM_THREADS"] = "1"
45 os.environ["NUMEXPR_NUM_THREADS"] = "1"
46 os.environ["OMP_NUM_THREADS"] = "1"
48 # Init MPI
49 if not MPI.Is_initialized():
50 MPI.Init()
51 comm = MPI.COMM_WORLD
52 nb_proc = comm.Get_size()
53 rank = comm.Get_rank()
55 def first_process():
56 return rank == 0
58 # Log
59 logging_level = logging.INFO
60 log_format = '%(asctime)s - Process {} - %(levelname)s - %(message)s'.format(rank)
61 logging.basicConfig(format=log_format, level=logging_level)
63 if first_process:
64 begin_time = time.time()
66 # Init ShearError instance
67 error = ShearError(Ly, Lu, Lb, TauG, prior_dir_shear, prior_type_shear)
69 # Precompute operators
70 error.precompute_operators()
72 # Compute
73 U,DU,MF,SV = error.load_prior_for_err_rep()
75 N_times = U.shape[0]
77 # Build the attribution of times if several processes in parallel
78 attributed_times = range(rank, N_times, nb_proc)
80 store = np.zeros((N_times, error.Ny2))
82 for i in attributed_times:
83 logging.info("Computing error at time index " + str(i) + " out of " + str(N_times))
84 u = U[i,:]
85 du = DU[i,:]
86 mf = MF[i,:]
87 sv = SV[i,:]
89 store[i,:] = error.compute_error(u, du, mf, sv)
91 # Recover all errors by summing all arrays
92 # Note that Allreduce updates the array of each process with the result so no need to broadcast afterwards.
93 logging.info("Waiting all process to finish computing the error of representativeness")
94 comm.Allreduce(MPI.IN_PLACE, store, op=MPI.SUM)
95 logging.info("Sync done !".format(rank))
97 if first_process:
98 # Save error in prior_dir_shear
99 np.save(prior_dir_shear + "/y12_err_repr_Ly_{}_Lu_{}_Lb_{}.npy".format(Ly,Lu,Lb), store)
101 end_time = time.time()
102 elapsed_time = end_time - begin_time
103 logging.info("Elapsed time : {:.2f}".format(elapsed_time))
105if __name__ == '__main__':
106 compute_error(Ly, Lu, Lb, TauG, prior_dir_shear, prior_type_shear)