Source code for cosipy.utilities.wrf2cosipy.wrf2cosipy

"""
Create 2D input from WRF data.

Reads the input data (model forcing) and writes the output to
a netcdf file. Edit the configuration by supplying a valid .toml file.
See the sample ``utilities_config.toml`` for more information.

Usage:

From source:
``python -m cosipy.utilities.wrf2cosipy.wrf2cosipy -i <path> -o <path> -u [<path] [-b <date>] [-e <date>]``

Entry point:
``cosipy-wrf2cosipy -i <path> -o <path> -u [<path] [-b <date>] [-e <date>]``

Options and arguments:

Required arguments:
    -i, --input <path>      Path to WRF file.
    -o, --output <path>     Path to the resulting COSIPY file.

Optional arguments:
    -u, --u <path>                  Relative path to utilities'
                                        configuration file.
    -b, --start_date <yyyymmdd>     Start date.
    -e, --end_date <yyyymmdd>       End date. 
"""

import argparse

import numpy as np
import pandas as pd
import xarray as xr
import xrspatial as xrs

from cosipy.constants import Constants
from cosipy.utilities.config_utils import UtilitiesConfig

_args = None
_cfg = None


[docs] def create_input(wrf_file, cosipy_file, start_date, end_date): """Create an input dataset from WRF data.""" print('-------------------------------------------') print(f"Create input\nRead input file {wrf_file}") # Read WRF file ds = xr.open_dataset(wrf_file) # Rename the time coordinate ds = ds.rename({'XTIME':'Time'}) # Re-format timestamp (only hour and minutes, no seconds) ds = ds.assign_coords(Time=pd.to_datetime(ds['Time'].values).strftime('%Y-%m-%dT%H-%M')) # Select the specified period if ((start_date!=None) & (end_date!=None)): ds = ds.sel(Time=slice(start_date,end_date)) # Create COSIPY input file dso = xr.Dataset() dso.coords['time'] = (('time'), ds.Time.values) dso.coords['lat'] = (('south_north', 'west_east'), ds.XLAT[0].values) dso.coords['lon'] = (('south_north', 'west_east'), ds.XLONG[0].values) # Add variables to file dso = add_variable_along_latlon(dso, ds.HGT[0].values, 'HGT', 'm', 'Elevation') dso = add_variable_along_timelatlon(dso, ds.T2.values, 'T2', 'm', 'Temperature at 2 m') dso = add_variable_along_timelatlon(dso, wrf_rh(ds.T2.values, ds.Q2.values, ds.PSFC.values), 'RH2', '%', 'Relative humidity at 2 m') dso = add_variable_along_timelatlon(dso, ds.SWDOWN.values, 'G', 'W m^-2', 'Incoming shortwave radiation') dso = add_variable_along_timelatlon(dso, ds.GLW.values, 'LWin', 'W m^-2', 'Incoming longwave radiation') dso = add_variable_along_timelatlon(dso, ds.PSFC.values/100.0, 'PRES', 'hPa', 'Atmospheric Pressure') dso = add_variable_along_latlon(dso, ds.SNOWH[0], 'SNOWHEIGHT', 'm', 'Initial snowheight') dso = add_variable_along_latlon(dso, ds.SNOW[0], 'SWE', 'kg m^-2', 'Snow Water Equivalent') dso = add_variable_along_latlon(dso, ds.SNOWC[0], 'SNOWC', '-', 'Flag indicating snow coverage (1 for snow cover)') dso = add_variable_along_latlon(dso, ds.LU_INDEX[0], 'LU_INDEX', '-', 'Land use category') dso = add_variable_along_latlon(dso, ds.TSK[0], 'TSK', 'K', 'Skin temperature') # Wind velocity at 2 m (assuming neutral stratification) z = _cfg.constants["hu"] # Height of measurement z0 = 0.0040 # Roughness length for momentum umag = np.sqrt(ds.V10.values**2+ds.U10.values**2) # Mean wind velocity U2 = umag * (np.log(2 / z0) / np.log(10 / z0)) dso = add_variable_along_timelatlon(dso, U2, 'U2', 'm s^-1', 'Wind velocity at 2 m') # Add glacier mask to file (derived from the land use category) mask = ds.LU_INDEX[0].values mask[mask!=_cfg.constants["lu_class"]] = 0 mask[mask==_cfg.constants["lu_class"]] = 1 dso = add_variable_along_latlon(dso, mask, 'MASK', '-', 'Glacier mask') # Derive precipitation from accumulated values rrr = np.full_like(ds.T2, 0.) for t in np.arange(1,len(dso.time)): rrr[t,:,:] = (ds.RAINNC[t,:,:] + ds.RAINC[t,:,:]) - (ds.RAINNC[t-1,:,:] + ds.RAINC[t-1,:,:]) dso = add_variable_along_timelatlon(dso, rrr, 'RRR', 'mm', 'Total precipitation') # Calc fresh snow density if (Constants.densification_method!='constant'): density_fresh_snow = np.maximum(109.0+6.0*(ds.T2.values-273.16)+26.0*np.sqrt(U2), 50.0) else: density_fresh_snow = Constants.constant_density # Derive snowfall from accumulated values snowf = np.full_like(ds.T2, 0.) for t in np.arange(1,len(dso.time)): snowf[t,:,:] = ds.SNOWNC[t,:,:]-ds.SNOWNC[t-1,:,:] snowf = (snowf/1000.0)*(Constants.water_density/density_fresh_snow) dso = add_variable_along_timelatlon(dso, snowf, 'SNOWFALL', 'm', 'Snowfall') # Compute slope & aspect from HGT DX = ds.attrs.get('DX').astype("float") DY = ds.attrs.get('DY').astype("float") HGT = ds.HGT[0] HGT.attrs['res'] = (DX,DY) slope = xrs.slope(HGT) aspect = xrs.aspect(HGT) dso = add_variable_along_latlon(dso, slope, 'SLOPE', 'degrees', 'Slope') dso = add_variable_along_latlon(dso, aspect, 'ASPECT', 'degrees', 'Aspect') # Write file dso.to_netcdf(cosipy_file) print(dso.dims['south_north']) # Do some checks check(dso.T2,316.16,223.16) check(dso.RH2,100.0,0.0) check(dso.SNOWFALL,1.0,0.0) check(dso.U2,50.0,0.0) check(dso.G,1600.0,0.0) check(dso.PRES,1080.0,200.0) check(dso.LWin,500.0,100.0)
[docs] def add_variable_along_latlon(ds, var, name, units, long_name): """Add spatial data to a dataset.""" ds[name] = (('south_north','west_east'), var) ds[name].attrs['units'] = units ds[name].attrs['long_name'] = long_name ds[name].encoding['_FillValue'] = -9999 return ds
[docs] def add_variable_along_timelatlon(ds, var, name, units, long_name): """Add spatiotemporal data to a dataset.""" ds[name] = (('time','south_north','west_east'), var) ds[name].attrs['units'] = units ds[name].attrs['long_name'] = long_name return ds
[docs] def check(field, max_bound, min_bound): """Check the validity of the input data.""" if np.nanmax(field) > max_bound or np.nanmin(field) < min_bound: msg = f"{str.capitalize(field.name)} MAX: {np.nanmax(field):.2f} MIN: {np.nanmin(field):.2f}" print( f"\n\nWARNING! Please check the data, it seems they are out of a reasonable range {msg}" )
[docs] def wrf_rh(T2, Q2, PSFC): """Get the relative humidity.""" pq0 = 379.90516 a2 = 17.2693882 a3 = 273.16 a4 = 35.86 rh = Q2 * 100 / ( (pq0 / PSFC) * np.exp(a2 * (T2 - a3) / (T2 - a4)) ) rh[rh>100.0] = 100.0 rh[rh<0.0] = 0.0 return rh
[docs] def get_user_arguments(parser: argparse.ArgumentParser) -> argparse.Namespace: """Get user arguments for converting AWS data. Args: parser: An initialised argument parser. Returns: User arguments for conversion. """ parser.description = "Create 2D input file from WRF file." parser.prog = __package__ # Required arguments parser.add_argument( "-i", "--input", dest="wrf_file", type=str, metavar="<path>", required=True, help="Path to WRF file", ) parser.add_argument( "-o", "--output", dest="cosipy_file", type=str, metavar="<path>", required=True, help="Path to the resulting COSIPY file", ) # Optional arguments parser.add_argument( "-b", "--start_date", dest="start_date", const=None, help="Start date" ) parser.add_argument( "-e", "--end_date", dest="end_date", const=None, help="End date" ) arguments = parser.parse_args() return arguments
[docs] def load_config(module_name: str) -> tuple: """Load configuration for module. Args: module_name: name of this module. Returns: User arguments and configuration parameters. """ params = UtilitiesConfig() arguments = get_user_arguments(params.parser) params.load(arguments.utilities_path) params = params.get_config_expansion(name=module_name) return arguments, params
[docs] def main(): global _args global _cfg _args, _cfg = load_config(module_name="wrf2cosipy") create_input(_args.wrf_file, _args.cosipy_file, _args.start_date, _args.end_date)
if __name__ == "__main__": main()