Source code for cosipy.modules.surfaceTemperature

import numpy as np
from numba import njit
from scipy.optimize import minimize, newton

from cosipy.config import Config
from cosipy.constants import Constants
from cosipy.modules.secant import secant

zlt1 = Constants.zlt1
zlt2 = Constants.zlt2
saturation_water_vapour_method = Constants.saturation_water_vapour_method
zero_temperature = Constants.zero_temperature
lat_heat_vaporize = Constants.lat_heat_vaporize
lat_heat_sublimation = Constants.lat_heat_sublimation
sigma = Constants.sigma
stability_correction = Constants.stability_correction
spec_heat_air = Constants.spec_heat_air
spec_heat_water = Constants.spec_heat_water
sfc_temperature_method = Constants.sfc_temperature_method
surface_emission_coeff = Constants.surface_emission_coeff
water_density = Constants.water_density
WRF_X_CSPY = Config.WRF_X_CSPY


[docs] def update_surface_temperature(GRID, dt, z, z0, T2, rH2, p, SWnet, u2, RAIN, SLOPE, LWin=None, N=None): """Solve the surface temperature and get the surface fluxes. Implemented minimisation methods: - **Newton**: Recommended, and fastest. - **Secant**: Potentially faster for coupled WRF simulations. - **L-BFGS-B**: Slower, different results since 02/2024. - **SLSQP**: Slower, different results since 02/2024. Args: GRID (Grid): Glacier data structure. dt (float): Integration time [s] - can vary in WRF_X_CSPY. z (float): Measurement height [m] - varies in WRF_X_CSPY. z0 (float): Roughness length [m]. T2 (float): Air temperature [K]. rH2 (float): Relative humidity [%]. p (float): Air pressure [hPa]. SWnet (float): Incoming shortwave radiation [|W m^-2|]. u2 (float): Wind velocity [|m s^-1|]. RAIN (float): Rain [mm]. SLOPE (float): Slope of the surface [degree]. LWin (float): Incoming longwave radiation [|W m^-2|]. N (float): Fractional cloud cover [-]. Returns: tuple: :res.fun: Minimisation function. :res.x: Surface temperature [K]. :Li: Incoming longwave radiation [|W m^-2|]. :Lo: Outgoing longwave radiation [|W m^-2|]. :H: Sensible heat flux [|W m^-2|]. :L: Latent heat flux [|W m^-2|]. :B: Ground heat flux [|W m^-2|]. :Qrr: Rain heat flux [|W m^-2|]. :rho: Air density [|kg m^-3|]. :Lv: Latent heat of vaporization [|J kg^-1|]. :MOL: Monin-Obukhov length [m]. :Cs_t: Stanton number [-]. :Cs_q: Dalton number [-]. :q0: Mixing ratio at the surface [|kg kg^-1|]. :q2: Mixing ratio at measurement height [|kg kg^-1|]. Raises: NotImplementedError: Invalid method for minimizing the residual. """ # Interpolate subsurface temperatures to selected subsurface depths for GHF computation B_Ts = interp_subT(GRID) # Lower bound for surface temperature lower_bnd_ts = 220. upper_bnd_ts = 330. initial_guess = min(GRID.get_node_temperature(0), 270) if sfc_temperature_method in ['L-BFGS-B', 'SLSQP']: # Get surface temperature by minimizing the energy balance function (SWnet+Li+Lo+H+L=0) res = minimize(eb_optim, initial_guess, method=sfc_temperature_method, bounds=((lower_bnd_ts, upper_bnd_ts),),tol=1e-2, args=(GRID, dt, z, z0, T2, rH2, p, SWnet, u2, RAIN, SLOPE, B_Ts, LWin, N)) elif sfc_temperature_method == 'Newton': try: res = newton(eb_optim, np.array([initial_guess]), tol=1e-2, maxiter=50, args=(GRID, dt, z, z0, T2, rH2, p, SWnet, u2, RAIN, SLOPE, B_Ts, LWin, N)) check_residual_clip(residual=res, lower_bound=lower_bnd_ts) except (RuntimeError,ValueError): # Workaround for non-convergence and unboundedness res = minimize(eb_optim, initial_guess, method='SLSQP', bounds=((lower_bnd_ts, upper_bnd_ts),),tol=1e-2, args=(GRID, dt, z, z0, T2, rH2, p, SWnet, u2, RAIN, SLOPE, B_Ts, LWin, N)) elif sfc_temperature_method == 'Secant': try: res = call_secant_jitted( GRID, dt, z, z0, T2, rH2, p, SWnet, u2, RAIN, SLOPE, B_Ts, lower_bnd_ts, LWin, N, ) check_residual_clip(residual=res, lower_bound=lower_bnd_ts) except (RuntimeError, ValueError): # Workaround for non-convergence and unboundedness res = minimize(eb_optim, initial_guess, method='SLSQP', bounds=((lower_bnd_ts, zero_temperature),), tol=1e-2, args=(GRID, dt, z, z0, T2, rH2, p, SWnet, u2, RAIN, SLOPE, B_Ts, LWin, N)) else: raise NotImplementedError('Invalid method for minimizing the residual.') # Set surface temperature if isinstance(res, np.ndarray): surface_temperature = min(np.array([zero_temperature]), res) minimisation_function = None else: surface_temperature = min(np.array([zero_temperature]), res.x) minimisation_function = res.fun GRID.set_node_temperature(0, surface_temperature) (Li, Lo, H, L, B, Qrr, rho, Lv, MOL, Cs_t, Cs_q, q0, q2) = eb_fluxes(GRID, surface_temperature, dt, z, z0, T2, rH2, p, u2, RAIN, SLOPE, B_Ts, LWin, N) # Consistency check if (surface_temperature > zero_temperature) or (surface_temperature < lower_bnd_ts): print('Surface temperature is outside bounds:', GRID.get_node_temperature(0)) # Return fluxes return minimisation_function, surface_temperature, Li, Lo, H, L, B, Qrr, rho, Lv, MOL, Cs_t, Cs_q, q0, q2
[docs] @njit def check_residual_clip(residual, lower_bound: float) -> bool: """Raise error if the surface temperature is out of bounds. Args: residual (float or np.ndarray): Residual from minimisation. lower_bound: Minimum allowable surface temperature. Returns: True if surface temperature solution is within bounds. Raises: ValueError: Surface temperature solution is out of bounds. """ if residual < lower_bound: raise ValueError("TS solution is out of bounds.") return True
[docs] @njit def call_secant_jitted(GRID, dt, z, z0, T2, rH2, p, SWnet, u2, RAIN, SLOPE, B_Ts, lower_bnd_ts, LWin=None, N=None, initial_guess=None): """Jitted call to secant.py. Returns: np.ndarray: Updated surface temperature. Raises: ValueError: Surface temperature solution is out of bounds. """ if initial_guess is None: initial_guess = np.array([min(GRID.get_node_temperature(0), 270)]) res = secant(eb_optim, initial_guess, tol=1e-2, maxiter=50, args=(GRID, dt, z, z0, T2, rH2, p, SWnet, u2, RAIN, SLOPE, B_Ts, LWin, N)) if res.item() < lower_bnd_ts: raise ValueError("TS Solution is out of bounds") return res
[docs] @njit def get_subsurface_temperature(GRID, cumulative_depth: np.ndarray, zlt: float): """Get subsurface temperature. Args: GRID (Grid): Glacier data structure. cumulative_depth: Cumulative glacier layer heights [m]. zlt: Interpolation depth [m]. Returns: float: Subsurface temperature at the interpolation depth. """ # Find indexes of two depths for temperature interpolation idx1_depth = np.abs(cumulative_depth - zlt).argmin() depth = cumulative_depth.flat[idx1_depth] if depth > zlt: idx2_depth = idx1_depth - 1 else: idx2_depth = idx1_depth + 1 temperature_idx1 = GRID.get_node_temperature(idx1_depth) t_z = temperature_idx1 + ( (temperature_idx1 - GRID.get_node_temperature(idx2_depth)) / (cumulative_depth[idx1_depth] - cumulative_depth[idx2_depth]) ) * (zlt - cumulative_depth[idx1_depth]) return t_z
[docs] @njit def interp_subT(GRID) -> np.ndarray: """Interpolate subsurface temperature to depths used for ground heat flux. Returns: Interpolated subsurface temperatures at requested depths. """ # Cumulative layer depths layer_heights_cum = np.cumsum(np.array(GRID.get_height())) t_z1 = get_subsurface_temperature(GRID, layer_heights_cum, zlt1) t_z2 = get_subsurface_temperature(GRID, layer_heights_cum, zlt2) return np.array([t_z1, t_z2])
[docs] @njit def get_saturation_vapor_pressure(T_0: float, T_2: float) -> tuple: """Get saturation vapour pressure. Args: T_0: Surface temperature [K]. T_2: 2m air temperature [K]. Returns: tuple[float, float]: Surface and 2m saturation vapour pressure. """ if saturation_water_vapour_method == "Sonntag90": Ew = method_EW_Sonntag(T_2) Ew0 = method_EW_Sonntag(T_0) else: msg = ( f"Method for saturation water vapour", f"{saturation_water_vapour_method}", "not available, using default", ) print(" ".join(msg)) Ew = method_EW_Sonntag(T_2) Ew0 = method_EW_Sonntag(T_0) return Ew, Ew0
[docs] @njit def eb_fluxes(GRID, T0, dt, z, z0, T2, rH2, p, u2, RAIN, SLOPE, B_Ts, LWin=None, N=None): """Get the surface fluxes and apply the Monin-Obukhov stability correction. Args: GRID (Grid): Glacier data structure. T0 (float): Surface temperature [K]. dt (float): Integration time [s]. z (float): Measurement height [m]. z0 (float): Roughness length [m]. T2 (float): Air temperature [K]. rH2 (float): Relative humidity [%]. p (float): Air pressure [hPa]. u2 (float): Wind velocity [|m s^-1|]. RAIN (float): Rain [mm]. SLOPE (float): Slope of the surface [degree]. B_Ts (np.ndarray): Subsurface temperatures at interpolation depths [K]. LWin (float): Incoming longwave radiation [|W m^-2|]. N (float): Fractional cloud cover [-]. Returns: tuple: :Li: Incoming longwave radiation [|W m^-2|]. :Lo: Outgoing longwave radiation [|W m^-2|]. :H: Sensible heat flux [|W m^-2|]. :LE: Latent heat flux [|W m^-2|]. :B: Ground heat flux [|W m^-2|]. :QRR: Rain heat flux [|W m^-2|]. :rho: Air density [|kg m^-3|]. :Lv: Latent heat of vaporization [|J kg^-1|]. :L: Monin-Obukhov length [m]. :Cs_t: Stanton number [-]. :Cs_q: Dalton number [-]. :q0: Mixing ratio at the surface [|kg kg^-1|]. :q2: Mixing ratio at measurement height [|kg kg^-1|]. """ # Saturation vapour pressure (hPa) Ew, Ew0 = get_saturation_vapor_pressure(T_0=T0, T_2=T2) # latent heat of vaporization if T0 >= zero_temperature: Lv = lat_heat_vaporize else: Lv = lat_heat_sublimation # Water vapour at height z in m (hPa) Ea = (rH2 * Ew) / 100.0 # Calc incoming longwave radiation, if not available Ea has to be in Pa (Konzelmann 1994) # numba has no implementation for power(none, int) if (LWin is None) and (N is not None): eps_cs = 0.23 + 0.433 * np.power(100*Ea/T2,1.0/8.0) eps_tot = eps_cs * (1 - np.power(N,2)) + 0.984 * np.power(N,2) Li = eps_tot * sigma * np.power(T2,4.0) else: # otherwise use LW data from file Li = LWin # turbulent Prandtl number Pr = 0.8 # Mixing Ratio at surface and at measurement height or calculate with other formula? 0.622*e/p = q q2 = (rH2 * 0.622 * (Ew / (p - Ew))) / 100.0 q0 = (100.0 * 0.622 * (Ew0 / (p - Ew0))) / 100.0 # Air density rho = (p*100.0) / (287.058 * (T2 * (1 + 0.608 * q2))) # Bulk transfer coefficient z0t = z0/100 # Roughness length for sensible heat z0q = z0/10 # Roughness length for moisture L = None # Avoid recalculating slope_radians = np.radians(SLOPE) cos_slope_radians = np.cos(slope_radians) # Monin-Obukhov stability correction if stability_correction == 'MO': L = 0.0 H0 = T0*0. + np.inf # numba: consistent typing of H0 diff = np.inf optim = True niter = 0 # Optimize Obukhov length while optim: # ustar with initial condition of L == x ust = ustar(u2,z,z0,L) # Sensible heat flux for neutral conditions delta_phi_tq = phi_tq(z, L) - phi_tq(z0, L) Cd = np.power(0.41,2.0) / np.power(np.log(z/z0) - phi_m(z,L) - phi_m(z0,L),2.0) Cs_t = 0.41*np.sqrt(Cd) / (np.log(z/z0t) - delta_phi_tq) Cs_q = 0.41*np.sqrt(Cd) / (np.log(z/z0q) - delta_phi_tq) # Surface heat flux H = get_sensible_heat_flux(rho, Cs_t, u2, (T2 - T0), cos_slope_radians) # Latent heat flux LE = get_latent_heat_flux(rho, Lv, Cs_q, u2, (q2 - q0), cos_slope_radians) # Monin-Obukhov length L = MO(rho, ust, T2, H) # Heat flux differences between iterations diff = np.abs(H0-H) # Termination criterion if (diff<1e-1) | (niter>5): optim = False niter = niter+1 # Store last heat flux in H0 H0 = H # Richardson-Number stability correction elif stability_correction == 'Ri': # Bulk transfer coefficient Cs_t = np.power(0.41,2.0) / ( np.log(z/z0) * np.log(z/z0t) ) # Stanton-Number Cs_q = np.power(0.41,2.0) / ( np.log(z/z0) * np.log(z/z0q) ) # Dalton-Number # Bulk Richardson number Ri = 0 if u2 != 0: Ri = ( (9.81 * (T2 - T0) * z) / (T2 * np.power(u2, 2)) ).item() #numba can't compare literal & array below # Stability correction phi = 1 if 0.01 < Ri <= 0.2: phi = np.power(1-5*Ri,2) elif Ri > 0.2: phi = 0.0 # Sensible heat flux H = phi * get_sensible_heat_flux(rho, Cs_t, u2, (T2 - T0), cos_slope_radians) # Latent heat flux LE = phi * get_latent_heat_flux(rho, Lv, Cs_q, u2, (q2 - q0), cos_slope_radians) else: msg = f"Stability correction {stability_correction} is not supported." raise ValueError(msg) # Outgoing longwave radiation Lo = -surface_emission_coeff * sigma * np.power(T0, 4.0) # Get thermal conductivity lam = GRID.get_node_thermal_conductivity(0) # Ground heat flux hminus = zlt1 hplus = zlt2 - hminus # avoid namespace collision with zlt1 Tz1, Tz2 = B_Ts B = lam * ((hminus/(hplus+hminus)) * ((Tz2-Tz1)/hplus) + (hplus/(hplus+hminus)) * ((Tz1-T0)/hminus)) # Rain heat flux QRR = water_density * spec_heat_water * (RAIN/1000/dt) * (T2 - T0) # Return surface fluxes # Numba: No implementation of function Function(<class 'float'>) found for signature: >>> float(array(float64, 1d, C)) # fmt: off return (Li.item(), Lo.item(), H.item(), LE.item(), B.item(), QRR.item(), rho, Lv, L, Cs_t, Cs_q, q0, q2)
# fmt: on
[docs] @njit def get_sensible_heat_flux( rho: float, Cs_t: float, u2: float, dT: float, cos_slope: float ) -> float: """Get sensible heat flux. Args: rho: Air density, [|kg m^-3|]. Cs_t: Stanton number, [-]. u2: Wind velocity, [|m s^-1|]. dT: Difference in temperature between the surface and measurement height, [K]. cos_slope: Cosine of slope angle, [|rad|]. Returns: Sensible heat flux, [|W m^-2|]. """ return rho * spec_heat_air * Cs_t * u2 * dT * cos_slope
[docs] @njit def get_latent_heat_flux( rho: float, Lv: float, Cs_q: float, u2: float, dq: float, cos_slope: float ) -> float: """Get latent heat flux. Args: rho: Air density, [|kg m^-3|]. Lv: Latent heat of vapourisation, [|J kg^-1|]. Cs_q: Dalton number, [-]. u2: Wind velocity, [|m s^-1|]. dq: Difference in mixing ratio between the surface and measurement height. cos_slope: Cosine of slope angle, [|rad|]. Returns: Latent heat flux, [|W m^-2|]. """ return rho * Lv * Cs_q * u2 * dq * cos_slope
[docs] @njit def phi_m_stable(z: float, L: float) -> float: """Get integrated stability function for momentum, stable conditions. Args: z: Height, [m]. L: Obukhov length, [m]. Returns: Integrated stability function for momentum under stable conditions. """ zeta = z / L if (zeta > 0.0) & (zeta <= 1.0): # weak stability return -5 * zeta elif zeta > 1.0: # strong stability if WRF_X_CSPY: return -5.0 # limit stability parameter (z/L <= 1.) following Noah-MP LSM approach else: return (1 - 5) * (1 + np.log(zeta)) - zeta else: return 0.0
[docs] @njit def phi_m(z: float, L: float) -> float: """Get the integrated stability function for momentum. Args: z: Height, [m]. L: Obukhov length, [m]. Returns: Integrated stability function for momentum. """ if L > 0: return phi_m_stable(z, L) elif L < 0: x = np.power((1 - 16 * z / L), 0.25) return ( 2 * np.log((1 + x) / 2.0) + np.log((1 + np.power(x, 2.0)) / 2.0) - 2 * np.arctan(x) + np.pi / 2.0 ) else: return 0.0
[docs] @njit def phi_tq(z: float, L: float) -> float: """Stability function for the heat and moisture flux.""" if L > 0: return phi_m_stable(z, L) elif L < 0: x = np.power((1 - 19.3 * z / L), 0.25) return 2 * np.log((1 + np.power(x, 2.0)) / 2.0) else: return 0.0
[docs] @njit def ustar(u2: float, z: float, z0: float, L: float) -> float: """Get the friction velocity.""" return (0.41 * u2) / (np.log(z / z0) - phi_m(z, L))
[docs] @njit def MO(rho, ust, T2, H) -> float: """Get the Monin-Obukhov length.""" if H!=0: return ( (rho * spec_heat_air * np.power(ust, 3) * T2) / (0.41 * 9.81 * H) ).item() # numba: expects a float else: return 0.0
# fmt: off
[docs] @njit def eb_optim( T0: float, GRID, dt: int, z: float, z0: float, T2: float, rH2: float, p: float, SWnet: float, u2: float, RAIN: float, SLOPE: float, B_Ts: np.ndarray, LWin: float = None, N: float = None, ) -> float: """Optimization function to solve for surface temperature T0. Returns: Minimised residual. """ # Get surface fluxes for surface temperature T0 (Li, Lo, H, L, B, Qrr, _, _, _, _, _, _, _) = eb_fluxes( GRID, T0, dt, z, z0, T2, rH2, p, u2, RAIN, SLOPE, B_Ts, LWin, N ) # Return the residual (minimized by the optimization function) if sfc_temperature_method in ['Newton', 'Secant']: return SWnet + Li + Lo + H + L + B + Qrr else: return np.abs(SWnet + Li + Lo + H + L + B + Qrr)
# fmt: on
[docs] @njit def method_EW_Sonntag(T: float) -> float: """Get the saturation vapor pressure. Args: T: Temperature [K]. Returns: Saturation vapor pressure. """ if T >= 273.16: # over water Ew = 6.112 * np.exp((17.67 * (T - 273.16)) / (T - 29.66)) else: # over ice Ew = 6.112 * np.exp((22.46 * (T - 273.16)) / (T - 0.55)) return Ew