Source code for fluiddyn.calcul.sphericalharmo

Spherical harmonics operators (:mod:`fluiddyn.calcul.sphericalharmo`)

This module requires the C library shtns. To install it, run something like::

  ./configure --enable-openmp --enable-python
  make && make install

import sys
from time import time

import numpy as np

    import shtns
except ImportError:
    sht_orthonormal = shtns.sht_orthonormal
    sht_fourpi = shtns.sht_fourpi
    sht_schmidt = shtns.sht_schmidt

    # print('sht_norms', sht_orthonormal, sht_fourpi, sht_schmidt)

    sht_gauss = shtns.sht_gauss
    sht_auto = shtns.sht_auto
    sht_reg_fast = shtns.sht_reg_fast
    sht_reg_dct = shtns.sht_reg_dct
    sht_quick_init = shtns.sht_quick_init
    sht_reg_poles = shtns.sht_reg_poles
    sht_gauss_fly = shtns.sht_gauss_fly




flags for normalization (norm = ...)
 sht_orthonormal = 0  # orthonormalized spherical harmonics (default).
 sht_fourpi      = 1  # Geodesy and spectral analysis : 4.pi normalization.
 sht_schmidt     = 2  # Schmidt semi-normalized : 4.pi/(2l+1)

flags for spatial data layouts and grids used by SHTns
use the bitwise operator OR "|" to separate the different flags
for example:
flags = (shtns.sht_quick_init | shtns.SHT_PHI_CONTIGUOUS |

radius_earth = 6367470.0  # earth radius (m)

def compute_lmax(nlat, nl_order=2):
    return int(2 * nlat / (nl_order + 1) - 1)

def compute_nlatnlon(lmax, nl_order=2):
    if lmax % 2 == 0:
        lmax += 1
    nlat = int((lmax + 1) * (nl_order + 1) / 2)
    if nlat % 2 == 1:
        nlat += 1
    return nlat, 2 * nlat

def _for_test(i):
    if i % 2 == 1:
        assert i == compute_lmax(compute_nlatnlon(i)[0])

def bin_int(n, width):
    return "".join(str((n >> i) & 1) for i in range(width - 1, -1, -1))

[docs] class EasySHT: """Simple possibilities of shtns. Less possibilities but very simple to use... It has been written specially for atmospheric applications. Creation of a default instance:: esh = EasySHT(lmax=15) Parameters ---------- lmax : number {15} Truncation degree. mmax : {None, int} If is None, triangular truncation. mres : 1 Azimutal symmetry coefficient (see shtns documentation). norm=shtns.sht_fourpi For SH with quadratic mean equal to unity. nlat : {None, int} If None, computed by shtns to avoid aliasing. nlon : {None, int} If None, computed by shtns to avoid aliasing. flags : {sht_quick_init|SHT_PHI_CONTIGUOUS|SHT_SOUTH_POLE_FIRST, int} Option flag for shtns. polar_opt : {1.0e-8, float} Polar optimization threshold. nl_order : {2, int} Nonlinear order of the equations (used to compute nlat and nlon). radius : {radius_earth, number} Radius of the sphere (in meters) Notes ----- In contrast as with shtns, with easypysht the meridional unit vector points towards the North if shtns.SHT_SOUTH_POLE_FIRST is used (this is the default) and it points towards the South if shtns.SHT_SOUTH_POLE_FIRST is not given in flags (thus there is a change of sign in the meridional velocity). easypysht has been written for atmospheric applications thus some usual notations are used. Here are some definitions useful to understand the code: - l denotes the degree of the spherical harmonic functions - m denotes the order of the spherical harmonic functions - SH denotes spherical harmonic (spectral array) - spat denotes spatial array - lat denotes latitude - lon denotes longitude - kh denotes horizontal wavenumber - u denotes longitudinal velocity - v denotes meridional velocity (the sign depends on the used base) - hdiv denotes horizontal divergence - hrot denotes vertical vorticity (curl on the sphere) - grad denotes the horizontal gradient Variables and functions about spectral space array: lmax, mmax, mres, nlm, idx_lm(), l_idx, m_idx, l2_idx Variables about grid and real space array: nlat, nlon, lats, lons, sin_lats, LATS, LONS Variables for spectra: l2_l, kh_l is the instance of the class sht defined in All functions and variables of this class can be used directly from this instance, for example::, cost, phi) where t is the colatitude and phi is the longitude. """ def __init__( self, lmax=15, mmax=None, mres=1, norm=None, nlat=None, nlon=None, flags=None, polar_opt=1.0e-8, nl_order=2, radius=radius_earth, ): # to get a clear ImportError try: import shtns except ImportError: print( "ImportError shtns.\n\n" "To install shtns, you can run the following:\n" " git clone\n" " cd shtns\n" " ./configure --prefix=$VIRTUAL_ENV --enable-python\n" " make\n" " make install\n" ) raise if norm is None: norm = sht_fourpi if flags is None: flags = ( shtns.sht_quick_init | shtns.SHT_PHI_CONTIGUOUS | shtns.SHT_SOUTH_POLE_FIRST ) # print(lmax,mmax,mres,nlat,nlon,flags,polar_opt,nl_order,radius) # print('flags', flags, # shtns.sht_quick_init, shtns.SHT_PHI_CONTIGUOUS, # shtns.SHT_SOUTH_POLE_FIRST) if lmax is None and nlat is None: raise ValueError("lmax or nlat should be given.") elif lmax is None: lmax = compute_lmax(nlat) # print('lmax', lmax) self.lmax = int(lmax) self.radius = float(radius) # print(self.radius, radius_earth) if mmax is None and mres == 1: # triangular truncation self.mmax = self.lmax self.mres = 1 else: self.mmax = int(float(lmax) / mres) self.mres = mres print( "lmax={}, mmax={}, mres={}, norm={}".format( self.lmax, self.mmax, self.mres, norm ) ) = shtns.sht(self.lmax, self.mmax, self.mres, norm=norm) bin_flags = bin_int(flags, 14) # print(bin_flags, bin_flags[-14]) if bin_flags[-14] == "1": self.order_lat = "south_to_north" else: self.order_lat = "north_to_south" # in shtns, 0 means None if nlat is None: nlat = 0 if nlon is None: nlon = 0 self.nlat, self.nlon = nlat=nlat, nphi=nlon, flags=flags, polar_opt=polar_opt, nl_order=nl_order, ) # print('flags', flags) self.sin_lats = self.lons = np.arange(self.nlon) * 360.0 / (self.nlon * self.mres) self.lats = np.arcsin(self.sin_lats) / np.pi * 180.0 # for fluidsim plotting self.x_seq = self.lons self.y_seq = self.lats # degree self.l_idx = self.l2_idx = self.l_idx * (self.l_idx + 1) # laplacian:=l(l+1)/r^2 and laplacian^2 self.K2 = self.l2_idx / self.radius**2 self.K4 = self.K2**2 self.K8 = self.K4**2 self.K2_not0 = self.K2[:] COND = self.l2_idx == 0 self.K2_not0[COND] = 1e-15 # print('l2_idx=', self.l2_idx) self.m_idx = self.nlm = self.deltax = 360.0 / (self.nlon * self.mres) # wrong but useful for fluidsim self.deltay = self.deltax # create arrays 2D lats et lons self.LONS, self.LATS = np.meshgrid(self.lons, self.lats) self.cosLATS = np.cos(self.LATS / 180 * np.pi) self.lrange = np.arange(self.lmax + 1) self.l2_l = self.lrange * (self.lrange + 1) self.kh_l = np.sqrt(self.l2_l) / self.radius self._complex64_save_netCFD = np.dtype( [("real", np.float32), ("imag", np.float32)] ) # print(lmax,mmax,mres,nlat,nlon,flags,polar_opt,nl_order,radius) # no MPI decomposition self.shapeX = self.shapeX_loc = self.shapeX_seq = (self.nlat, self.nlon) self.shapeK = self.shapeK_loc = self.shapeK_seq = (self.nlm,) self.sht_as_arg = self.isht_as_arg = self._zeros_sh = self.create_array_sh(0.0) def vec_from_rotsh(self, rot_sh): return self.uv_from_hdivrotsh(self._zeros_sh, rot_sh) def vec_from_divsh(self, div_sh): return self.uv_from_hdivrotsh(div_sh, self._zeros_sh)
[docs] def produce_str_describing_oper(self): """Produce a string describing the operator.""" return f"lmax{self.lmax}_nlat{self.nlat}_nlon{self.nlon}"
def produce_long_str_describing_oper(self): return "\n".join( ( "Spherical harmonic transforms " "nlat = {} ; nlon = {}".format(self.nlat, self.nlon), "(1 point every {:6.2g} m for the current sphere of radius {})".format( 2 * np.pi * self.radius / self.nlon, self.radius ), "(1 point every {:6.2g} km for the earth)\n".format( 2 * np.pi * radius_earth / self.nlon / 1000 ), ) )
[docs] def idx_lm(self, l, m): """idx_lm(self, l,m)""" if l >= 0 and 0 <= m <= l: return, int(m)) else: raise ValueError("not (l>=0 and m>=0 and m<=l)")
# functions for initialisation of field
[docs] def create_array_sh(self, value=None, dtype=complex): """Create an array representing a field in spectral space.""" if value is None: field_lm = np.empty(self.nlm, dtype) elif value == "rand": field_lm = np.random.randn(self.nlm) + 1.0j * np.random.randn( self.nlm ) elif value == 0: field_lm = np.zeros(self.nlm, dtype) else: field_lm = value * np.ones(self.nlm, dtype) return field_lm
[docs] def create_array_spat(self, value=None): """Create an array representing a field in spatial space.""" if value is None: field = np.empty(self.shapeX) elif value == "rand": field = np.random.randn(self.nlat, self.nlon) elif value == 0: field = np.zeros(self.shapeX) else: field = value * np.ones(self.shapeX) # a spatial array matching a grid build with SHT_PHI_CONTIGUOUS return field
def convert2complex64_save_netCFD(self, f_lm): result = np.empty(f_lm.shape, self._complex64_save_netCFD) result["real"] = f_lm.real result["imag"] = f_lm.imag return result # functions for scalar spherical harmonic transforms (forward and backward) def sht(self, field): field_lm = self.create_array_sh(), field_lm) return field_lm def isht(self, field_lm): field = self.create_array_spat(), field) return field
[docs] def sh_from_spat(self, field, field_lm=None): """Spherical harmonic transform. examples: f_lm = sh_from_spat(f) or if f_lm already exists: sh_from_spat(f, f_lm) """ if field.dtype != np.dtype("float"): field = np.array(field, float) if field_lm is None: field_lm = self.create_array_sh(), field_lm) return field_lm
[docs] def spat_from_sh(self, field_lm, field=None): """Inverse spherical harmonic transform.""" if field_lm.dtype != np.dtype("complex"): field_lm = np.array(field_lm, complex) if field is None: field = self.create_array_spat(), field) return field
[docs] def chrono_sht(self, nb_sht=10): """Microbenchmark forward and inverse SHT, and vorticity, divergence <-> u, v transformations. """ f = np.random.rand(self.nlat, self.nlon) t1 = time() for i in range(nb_sht): f_lm = self.sh_from_spat(f) t2 = time() print( " mean time for 1 forward SHT: {:3.6f} s".format( (t2 - t1) / nb_sht ) ) t1 = time() for i in range(nb_sht): f = self.spat_from_sh(f_lm) t2 = time() print( " mean time for 1 inverse SHT: {:3.6f} s".format( (t2 - t1) / nb_sht ) ) uu = np.random.rand(self.nlat, self.nlon) vv = np.random.rand(self.nlat, self.nlon) t1 = time() for i in range(nb_sht): hdiv, hrot = self.hdivrotsh_from_uv(uu, vv) t2 = time() print( " mean time for hdivrotsh_from_uuvv vectorial SHT: " "{:3.6f} s".format((t2 - t1) / nb_sht) ) t1 = time() for i in range(nb_sht): uu, vv = self.uv_from_hdivrotsh(hdiv, hrot) t2 = time() print( " mean time for uv_from_hdivrotsh vectorial SHT: " "{:3.6f} s".format((t2 - t1) / nb_sht) )
# functions for 2D vectorial spherical harmonic transforms
[docs] def uv_from_hdivrotsh(self, hdiv_lm, hrot_lm, uu=None, vv=None): """ u, v from div, rot (u and v are overwritten) """ if uu is None: uu = self.create_array_spat() vv = self.create_array_spat() uD_lm = self.create_array_sh(0.0) uR_lm = self.create_array_sh(0.0) COND = self.l2_idx > 0 uD_lm[COND] = -hdiv_lm[COND] / self.l2_idx[COND] * self.radius uR_lm[COND] = -hrot_lm[COND] / self.l2_idx[COND] * self.radius, uR_lm, vv, uu) # if self.order_lat == 'south_to_north': # vv[:] = -vv+0 # because SHTns uses colatitude basis return uu, vv
[docs] def hdivrotsh_from_uv(self, uu, vv, hdiv_lm=None, hrot_lm=None): """Compute hdivrotsh from uuvv. (div_lm and rot_lm are overwritten) """ if hdiv_lm is None: hdiv_lm = self.create_array_sh() hrot_lm = self.create_array_sh() # if self.order_lat == 'south_to_north': # vv = -vv # print('order_lat',self.order_lat), uu, hdiv_lm, hrot_lm) # in fact there is uD_lm in hdiv_lm and # uR_lm in hrot_lm # we compute div_lm and rot_lm hdiv_lm[:] = -self.l2_idx * hdiv_lm[:] / self.radius # removed minus hrot_lm[:] = -self.l2_idx * hrot_lm[:] / self.radius return hdiv_lm, hrot_lm
[docs] def uv_from_uDuRsh(self, uD_lm, uR_lm, uu=None, vv=None): """Compute velocities uu, vv from uD, uR (uu and vv are overwritten). """ if uu is None: uu = self.create_array_spat() if vv is None: vv = self.create_array_spat(), uR_lm, vv, uu) # if self.order_lat == 'south_to_north': # vv[:] = -vv+0 # because SHTns uses colatitude basis return uu, vv
[docs] def uDuRsh_from_uv(self, uu, vv, uD_lm=None, uR_lm=None): """Compute helmholtz decomposition of the velocities from uu, vv. (uD_lm and uR_lm are overwritten). """ if uD_lm is None: uD_lm = self.create_array_sh() if uR_lm is None: uR_lm = self.create_array_sh() # if self.order_lat == 'south_to_north': # vv = -vv # print('order_lat', self.order_lat), uu, uD_lm, uR_lm) # removed minus uD_lm[:] = -uD_lm[:] uR_lm[:] = -uR_lm[:] # print(self.radius) return uD_lm, uR_lm
[docs] def hdivrotsh_from_uDuRsh(self, uD_lm, uR_lm, hdiv_lm=None, hrot_lm=None): """Compute horizontal divergence and vertical vorticity spherical harmonics from velocity vector spherical harmonics (hdiv_lm and hrot_lm are overwritten). """ if hdiv_lm is None: hdiv_lm = self.create_array_sh(0.0) # TODO: Maybe can be empty? if hrot_lm is None: hrot_lm = self.create_array_sh(0.0) hdiv_lm[:] = -self.l2_idx * uD_lm / self.radius hrot_lm[:] = self.l2_idx * uR_lm / self.radius return hdiv_lm, hrot_lm
[docs] def uDuRsh_from_hdivrotsh(self, hdiv_lm, hrot_lm, uD_lm=None, uR_lm=None): """Compute velocity vector spherical harmonics from horizontal divergence and vertical vorticity spherical harmonics (uD_lm and uR_lm are overwritten). """ if uD_lm is None: uD_lm = self.create_array_sh(0.0) if uR_lm is None: uR_lm = self.create_array_sh(0.0) COND = self.l2_idx > 0 uD_lm[COND] = -hdiv_lm[COND] / self.l2_idx[COND] * self.radius uR_lm[COND] = +hrot_lm[COND] / self.l2_idx[COND] * self.radius # print(self.radius) return uD_lm, uR_lm
[docs] def gradf_from_fsh(self, f_lm, gradf_lon=None, gradf_lat=None): """gradf from fsh. Compute the gradient of a function f from its spherical harmonic coeff f_lm (gradf_lon and gradf_lat are overwritten) """ if gradf_lon is None: gradf_lon = self.create_array_spat(0) gradf_lat = self.create_array_spat(0) # becareful bug if not 0!!! # We do not use SHsph_to_spat() because it seems that there is a # problem # FIXME: av: What exactly is the problem and does it still exist? Issue link? ####, gradf_lat, gradf_lon) # instead we use SHsphtor_to_spat(...) with tor_lm= zeros_lm # zeros_lm = self.create_array_sh(0.), self._zeros_sh, gradf_lat, gradf_lon) # if self.order_lat == 'south_to_north': # sign_inv_vv = -1 # else: # sign_inv_vv = 1 # sign_inv_vv = 1 gradf_lat[:] = +gradf_lat / self.radius # *sign_inv_vv gradf_lon[:] = +gradf_lon / self.radius # print('radius', self.radius) return gradf_lon, gradf_lat
def spat3d_from_sh3d(self, f_lm_3d, f3d=None, dtype=np.float64): nvert = f_lm_3d.shape[0] if f3d is None: f3d = np.empty([nvert, self.nlat, self.nlon], dtype) for iz in range(nvert): f2D = self.spat_from_sh(f_lm_3d[iz]) f3d[iz] = f2D return f3d def sh3d_from_spat3d(self, f3d, f_lm3d=None, dtype=np.complex128): nvert = f3d.shape[0] if f_lm3d is None: f_lm3d = np.empty([nvert, self.nlm], dtype) for iz in range(nvert): f_lm3d[iz] = self.sh_from_spat(f3d[iz]) return f_lm3d # print functions... def print_info(self): print(self.produce_str_describing_oper()) print("") def print_array_sh(self, field_lm, name_field_lm, lmax_print=10): lmax_print = np.min([lmax_print, self.lmax]) print(name_field_lm, "= ") for n in range(lmax_print + 1): print(f"n={n:2d} ", end="") for m in range(n + 1): temp_idx = m * self.lmax - (m + 1) * m / 2 + m + n sys.stdout.write("{:8.3g} ".format(np.abs(field_lm[temp_idx]))) print("") # functions for the computation of spectra and co-spectra
[docs] def sum_wavenumbers(self, field_lm): """Convenient function to look more like a pseudo-spectral Operators""" return field_lm.sum()
[docs] def _spectrum_from_array_desh(self, array_desh): """Compute spectrum(l) from array_desh(ilm)""" spectrum = np.zeros(self.lmax + 1) for ilm in range(0, self.nlm): spectrum[self.l_idx[ilm]] += array_desh[ilm] return spectrum
[docs] def _array_desh_from_sh(self, field_lm, key_field): """Compute the array_desh (density of energy) from an field_lm""" if key_field[:1] == "u" or key_field[:1] == "v": array_desh = abs(field_lm) ** 2 / 2.0 elif ( key_field[:1] == "T" or key_field[:2] == "ps" or key_field[:1] == "o" ): array_desh = abs(field_lm) ** 2 / 2.0 elif key_field[:4] == "beta": array_desh = self.create_array_sh(0.0, float) array_desh = abs(field_lm) ** 2 elif key_field[:2] == "uD" or key_field[:2] == "uR": array_desh = self.l2_idx * (abs(field_lm) ** 2) / 2 elif key_field[:4] == "hdiv" or key_field[:4] == "hrot": array_desh = self.create_array_sh(0.0, float) COND = self.l2_idx > 0 array_desh[COND] = ( self.radius**2 / self.l2_idx[COND] * abs(field_lm[COND]) ** 2 / 2 ) else: raise ValueError("key_field is not correct") array_desh[self.m_idx > 0] = 2 * array_desh[self.m_idx > 0] return array_desh
[docs] def spectrum_from_sh(self, field_lm, key_field): """compute spectrum from field_lm""" array_desh = self._array_desh_from_sh(field_lm, key_field) spectrum = self._spectrum_from_array_desh(array_desh) return spectrum
[docs] def cospectrum_from_2fieldssh(self, f_lm, g_lm): """compute cospectrum(l) from f_lm(ilm) and g_lm(ilm)""" cospectrum = np.zeros(self.lmax + 1) array_desh = f_lm.conjugate() * g_lm + f_lm * g_lm.conjugate() array_desh = array_desh.real array_desh[self.m_idx == 0] = array_desh[self.m_idx == 0] / 2 for ilm in range(self.nlm): cospectrum[self.l_idx[ilm]] += array_desh[ilm] return cospectrum
[docs] def cospectrum_from_2vectorssh(self, f_lon_lm, f_lat_lm, g_lon_lm, g_lat_lm): """compute cospectrum(l)...""" cospectrum = np.zeros(self.lmax + 1) array_desh = ( f_lon_lm.conjugate() * g_lon_lm + f_lon_lm * g_lon_lm.conjugate() + f_lat_lm.conjugate() * g_lat_lm + f_lat_lm * g_lat_lm.conjugate() ) array_desh = array_desh.real array_desh[self.m_idx == 0] = array_desh[self.m_idx == 0] / 2 for ilm in range(self.nlm): cospectrum[self.l_idx[ilm]] += array_desh[ilm] return cospectrum
[docs] def cospectrum_from_2fieldssh2(self, f_lm, g_lm): """compute cospectrum(l)...""" cospectrum = np.zeros(self.lmax + 1) array_desh = f_lm.conjugate() * g_lm + f_lm * g_lm.conjugate() array_desh = array_desh.real array_desh[self.m_idx == 0] = array_desh[self.m_idx == 0] / 2 array_desh2 = self.create_array_sh(0.0, float) COND = self.l2_idx > 0 array_desh2[COND] = self.radius**2 / self.l2_idx[COND] * array_desh[COND] for ilm in range(self.nlm): cospectrum[self.l_idx[ilm]] += array_desh2[ilm] return cospectrum
[docs] def cospectrum_from_2divrotsh(self, hdiva_lm, hrota_lm, hdivb_lm, hrotb_lm): """compute cospectrum(l)...""" cospectrum = np.zeros(self.lmax + 1) array_desh = ( hrota_lm.conjugate() * hrotb_lm + hdiva_lm.conjugate() * hdivb_lm ) array_desh = array_desh.real array_desh[self.m_idx == 0] = array_desh[self.m_idx == 0] / 2 array_desh2 = self.create_array_sh(0.0, float) COND = self.l2_idx > 0 array_desh2[COND] = self.radius**2 / self.l2_idx[COND] * array_desh[COND] for ilm in range(self.nlm): cospectrum[self.l_idx[ilm]] += array_desh2[ilm] # print('cospe',cospectrum.shape) return cospectrum
[docs] def dealiasing(self, field_lm): """Convenient function for fluidsim""" return field_lm
if __name__ == "__main__": obj = EasySHT()