Source code for cosipy.utilities.aws2cosipy.aws2cosipy
"""
Reads the input data (model forcing) and writes the output to a netCDF
file. It supports point models with ``create_1D_input`` and distributed
simulations with ``create_2D_input``.
The 1D input function works without a static file, in which the static
variables are created.
Edit the configuration by supplying a valid .toml file - this includes
lapse rates for both cases. See the sample ``utilities_config.toml`` for
more information.
Usage:
From source:
``python -m cosipy.utilities.aws2cosipy.aws2cosipy -i <input> -o <output> -s <static> [-u <path>] [-b <date>] [-e <date>]``
Entry point:
``cosipy-aws2cosipy -i <input> -o <output> -s <static> [-u <path>] [-b <date>] [-e <date>]``
Options and arguments:
Required arguments:
-i, --input <path> Path to .csv file with meteorological data.
-o, --output <path> Path to the resulting COSIPY netCDF file.
-s, --static_file <path> Path to static file with DEM, slope etc.
Optional arguments:
-u, --u <path> Relative path to utilities' configuration
file.
-b, --start_date <int> Start date.
-e, --end_date <int> End date.
--xl <float> Left longitude value of the subset.
--xr <float> Right longitude value of the subset.
--yl <float> Lower latitude value of the subset.
--yu <float> Upper latitude value of the subset.
"""
import argparse
from itertools import product
import dateutil
import netCDF4 as nc
import numpy as np
import pandas as pd
import xarray as xr
import cosipy.modules.radCor as mod_radCor
from cosipy.utilities.config_utils import UtilitiesConfig
_args = None
_cfg = None
[docs]
def read_input_file(input_path: str) -> tuple:
"""Read input data, parse dates, and convert to a dataframe.
Args:
input_path: Path to input .csv file.
Returns:
tuple: Dataframe of input data and date parser.
"""
print(f"{'-' * 43}")
print(f"Create input\nRead input file {input_path}")
date_parser = lambda x: dateutil.parser.parse(x, ignoretz=True)
dataframe = pd.read_csv(
input_path,
delimiter=_cfg.coords["delimiter"],
index_col=["TIMESTAMP"],
parse_dates=["TIMESTAMP"],
na_values="NAN",
date_parser=date_parser,
)
return dataframe
[docs]
def convert_to_numeric(series: pd.Series) -> pd.Series:
"""Convert series to numeric type."""
series = series.apply(pd.to_numeric, errors="coerce")
return series
[docs]
def set_order_and_type(
dataframe: pd.DataFrame, replace_pressure: bool = False
) -> pd.DataFrame:
"""Set dataframe order and convert to numeric type.
Args:
dataframe: Contains input data.
replace_pressure: Set pressure data to 660 hPa if no pressure
data is available.
Returns:
Ordered dataframe.
"""
for name in ["T2_var", "RH2_var", "U2_var", "G_var", "PRES_var"]:
dataframe[_cfg.names[name]] = convert_to_numeric(
series=dataframe[_cfg.names[name]]
)
if _cfg.names["RRR_var"] in dataframe:
dataframe[_cfg.names["RRR_var"]] = convert_to_numeric(
dataframe[_cfg.names["RRR_var"]]
)
if (replace_pressure) and (_cfg.names["PRES_var"] not in dataframe):
dataframe[_cfg.names["PRES_var"]] = 660.00
if (_cfg.names["LWin_var"] not in dataframe) and (
_cfg.names["N_var"] not in dataframe
):
raise ValueError(
"No data for either incoming longwave radiation or cloud cover."
)
elif _cfg.names["LWin_var"] in dataframe:
dataframe[_cfg.names["LWin_var"]] = convert_to_numeric(
dataframe[_cfg.names["LWin_var"]]
)
elif _cfg.names["N_var"] in dataframe:
dataframe[_cfg.names["N_var"]] = convert_to_numeric(
dataframe[_cfg.names["N_var"]]
)
if _cfg.names["SNOWFALL_var"] in dataframe:
dataframe[_cfg.names["SNOWFALL_var"]] = convert_to_numeric(
dataframe[_cfg.names["SNOWFALL_var"]]
)
return dataframe
[docs]
def check_data(dataset: xr.Dataset, dataframe: pd.DataFrame):
"""Check data is within physically reasonable bounds."""
check(dataset.T2, 316.16, 223.16)
check(dataset.RH2, 100.0, 0.0)
check(dataset.U2, 50.0, 0.0)
check(dataset.G, 1600.0, 0.0)
check(dataset.PRES, 1080.0, 200.0)
if _cfg.names["RRR_var"] in dataframe:
check(dataset.RRR, 20.0, 0.0)
if _cfg.names["SNOWFALL_var"] in dataframe:
check(dataset.SNOWFALL, 0.05, 0.0)
if _cfg.names["LWin_var"] in dataframe:
check(dataset.LWin, 400, 0.0)
if _cfg.names["N_var"] in dataframe:
check(dataset.N, 1.0, 0.0)
[docs]
def get_time_slice(dataframe, start_date, end_date):
if (start_date is not None) & (end_date is not None):
dataframe = dataframe.loc[start_date:end_date]
return dataframe
[docs]
def set_bias(
data: np.ndarray,
lapse_type: str,
altitude: float = 0.0,
limit: bool = True,
):
"""Apply lapse rate to data.
Args:
data: Numerical data.
lapse_type: ID of lapse rate in config file.
altitude: The height difference between a location and sensor.
limit: If True, set negative values to zero. Default True.
"""
biased_data = data + altitude * _cfg.lapse[lapse_type]
if limit:
biased_data = np.maximum(biased_data, 0.0)
return biased_data
[docs]
def set_variable_metadata() -> dict:
"""Initialise variable names and units."""
metadata = {
"HGT": ("m", "Elevation"),
"ASPECT": ("degrees", "Aspect of slope"),
"SLOPE": ("degrees", "Terrain slope"),
"MASK": ("boolean", "Glacier mask"),
"T2": ("K", "Temperature at 2 m"),
"RH2": ("%", "Relative humidity at 2 m"),
"U2": ("m s\u207b\xb9", "Wind velocity at 2 m"),
"G": ("W m\u207b\xb2", "Incoming shortwave radiation"),
"PRES": ("hPa", "Atmospheric Pressure"),
"RRR": ("mm", "Total precipitation (liquid+solid)"),
"SNOWFALL": ("m", "Snowfall"),
"LWin": ("W m\u207b\xb2", "Incoming longwave radiation"),
"N": ("%", "Cloud cover fraction"),
}
return metadata
[docs]
def get_variable_metadata(name: str) -> dict:
"""Get metadata associated with a variable name."""
metadata = set_variable_metadata()
metadata_bundle = {
"name": name,
"units": metadata[name][0],
"long_name": metadata[name][1],
}
return metadata_bundle
[docs]
def set_dataset_coordinates(dataset, latitude, longitude):
x, y = np.meshgrid(longitude, latitude)
dataset.coords["lat"] = (("south_north", "west_east"), x)
dataset.coords["lon"] = (("south_north", "west_east"), y)
return dataset
[docs]
def set_zero_field(
time_index: int, lat_index: int, lon_index: int
) -> np.ndarray:
"""Initialise array and fill with zeros."""
field = np.zeros([time_index, lat_index, lon_index])
return field
[docs]
def get_pressure_bias(data, height):
slp = data / np.power(
(1 - (0.0065 * _cfg.station["stationAlt"]) / (288.15)), 5.255
)
pressure = slp * np.power((1 - (0.0065 * height) / (288.15)), 5.22)
return pressure
[docs]
def write_netcdf(dataset, output_path):
dataset.to_netcdf(output_path)
print(f"{'-' * 43}\nInput file created: {output_path}\n{'-' * 43}")
[docs]
def create_1D_input(cs_file, cosipy_file, static_file, start_date, end_date):
"""Create an input dataset from a csv file with input point data.
Here you need to define how to interpolate the data.
.. warning::
There should be only one header line in the file.
Previous updates:
Tobias Sauter 07.07.2019
Anselm 04.07.2020
"""
df = read_input_file(input_path=cs_file)
print(df[pd.isnull(df).any(axis=1)])
df = df.fillna(method="ffill")
print(df[pd.isnull(df).any(axis=1)])
if _cfg.names["LWin_var"] not in df:
df[_cfg.names["LWin_var"]] = np.nan
if _cfg.names["N_var"] not in df:
df[_cfg.names["N_var"]] = np.nan
if _cfg.names["SNOWFALL_var"] not in df:
df[_cfg.names["SNOWFALL_var"]] = np.nan
col_list = [
_cfg.names["T2_var"],
_cfg.names["RH2_var"],
_cfg.names["U2_var"],
_cfg.names["G_var"],
_cfg.names["RRR_var"],
_cfg.names["PRES_var"],
_cfg.names["LWin_var"],
_cfg.names["N_var"],
_cfg.names["SNOWFALL_var"],
]
df = df[col_list]
df = df.resample("1H").agg(
{
_cfg.names["T2_var"]: "mean",
_cfg.names["RH2_var"]: "mean",
_cfg.names["U2_var"]: "mean",
_cfg.names["G_var"]: "mean",
_cfg.names["PRES_var"]: "mean",
_cfg.names["RRR_var"]: nansumwrapper,
_cfg.names["LWin_var"]: "mean",
_cfg.names["N_var"]: "mean",
_cfg.names["SNOWFALL_var"]: nansumwrapper,
}
)
df = df.dropna(axis=1, how="all")
print(df.head())
# Select time slice
df = get_time_slice(dataframe=df, start_date=start_date, end_date=end_date)
# Load static data
if _cfg.coords["WRF"]:
ds = xr.Dataset()
lon, lat = np.meshgrid(_cfg.points["plon"], _cfg.points["plat"])
ds.coords["lat"] = (("south_north", "west_east"), lon)
ds.coords["lon"] = (("south_north", "west_east"), lat)
else:
if static_file:
print(f"Read static file {static_file}\n")
ds = xr.open_dataset(static_file)
ds = ds.sel(
lat=_cfg.points["plat"],
lon=_cfg.points["plon"],
method="nearest",
)
ds.coords["lon"] = np.array([ds.lon.values])
ds.coords["lat"] = np.array([ds.lat.values])
else:
ds = xr.Dataset()
ds.coords["lon"] = np.array([_cfg.points["plon"]])
ds.coords["lat"] = np.array([_cfg.points["plat"]])
ds.lon.attrs["standard_name"] = "lon"
ds.lon.attrs["long_name"] = "longitude"
ds.lon.attrs["units"] = "degrees_east"
ds.coords["lat"] = np.array([_cfg.points["plat"]])
ds.lat.attrs["standard_name"] = "lat"
ds.lat.attrs["long_name"] = "latitude"
ds.lat.attrs["units"] = "degrees_north"
ds.coords["time"] = (("time"), df.index.values)
# Order variables
df = set_order_and_type(df, replace_pressure=True)
# Get values from file
sensor_height = _cfg.points["hgt"] - _cfg.station["stationAlt"]
if _cfg.names["in_K"]: # Temperature
T2 = set_bias(
data=df[_cfg.names["T2_var"]].values,
lapse_type="lapse_T",
altitude=sensor_height,
limit=False,
)
else:
T2 = set_bias(
data=df[_cfg.names["T2_var"]].values + 273.16,
lapse_type="lapse_T",
altitude=sensor_height,
limit=False,
)
check_temperature_bounds(temperature=T2)
RH2 = set_bias( # Relative humidity
data=df[_cfg.names["RH2_var"]].values,
lapse_type="lapse_RH",
altitude=sensor_height,
limit=False,
)
U2 = df[_cfg.names["U2_var"]].values # Wind velocity
G = df[_cfg.names["G_var"]].values # Incoming shortwave radiation
PRES = get_pressure_bias(
data=df[_cfg.names["PRES_var"]].values, height=_cfg.points["hgt"]
) # Pressure
if _cfg.names["RRR_var"] in df: # Precipitation
RRR = set_bias(
data=df[_cfg.names["RRR_var"]].values,
lapse_type="lapse_RRR",
altitude=sensor_height,
limit=True,
)
if _cfg.names["SNOWFALL_var"] in df:
SNOWFALL = set_bias(
data=df[_cfg.names["SNOWFALL_var"]].values,
lapse_type="lapse_SNOWFALL",
altitude=sensor_height,
limit=True,
)
if _cfg.names["LWin_var"] in df:
LW = df[_cfg.names["LWin_var"]].values # Incoming longwave radiation
if _cfg.names["N_var"] in df:
N = df[_cfg.names["N_var"]].values # Cloud cover fraction
# Change aspect to south==0, east==negative, west==positive
if static_file:
aspect = ds["ASPECT"].values - 180.0
ds["ASPECT"] = aspect
# Auxiliary variables
mask = ds.MASK.values
slope = ds.SLOPE.values
aspect = ds.ASPECT.values
else:
# Auxiliary variables
mask = 1
slope = 0
aspect = 0
# Limit bounds for relative humidity
RH2 = set_relative_humidity_bounds(RH2)
# Add variables to file
add_variable_along_point(ds=ds, var=_cfg.points["hgt"], **get_variable_metadata("HGT"))
add_variable_along_point(ds=ds, var=aspect, **get_variable_metadata("ASPECT"))
add_variable_along_point(ds=ds, var=slope, **get_variable_metadata("SLOPE"))
add_variable_along_point(ds=ds, var=mask, **get_variable_metadata("MASK"))
add_variable_along_timelatlon_point(ds=ds, var=T2, **get_variable_metadata("T2"))
add_variable_along_timelatlon_point(ds=ds, var=RH2, **get_variable_metadata("RH2"))
add_variable_along_timelatlon_point(ds=ds, var=U2, **get_variable_metadata("U2"))
add_variable_along_timelatlon_point(ds=ds, var=G, **get_variable_metadata("G"))
add_variable_along_timelatlon_point(ds=ds, var=PRES, **get_variable_metadata("PRES"))
if _cfg.names["RRR_var"] in df:
add_variable_along_timelatlon_point(ds=ds, var=RRR, **get_variable_metadata("RRR"))
if _cfg.names["SNOWFALL_var"] in df:
add_variable_along_timelatlon_point(ds, var=SNOWFALL, **get_variable_metadata("SNOWFALL"))
if _cfg.names["LWin_var"] in df:
add_variable_along_timelatlon_point(ds=ds, var=LW, **get_variable_metadata("LWin"))
if _cfg.names["N_var"] in df:
add_variable_along_timelatlon_point(ds=ds, var=N, **get_variable_metadata("N"))
# Write file to disk
check_for_nan_point(ds)
write_netcdf(dataset=ds, output_path=cosipy_file)
# Check data
check_data(dataset=ds, dataframe=df)
[docs]
def create_2D_input(
cs_file,
cosipy_file,
static_file,
start_date,
end_date,
x0=None,
x1=None,
y0=None,
y1=None,
):
"""Create a 2D input dataset from a .csv file.
Here you need to define how to interpolate the data.
.. warning::
There should be only one header line in the file.
Previous updates:
Tobias Sauter 07.07.2019
Anselm 01.07.2020
Franziska Temme 03.08.2021
"""
df = read_input_file(input_path=cs_file)
# Select time slice
df = get_time_slice(dataframe=df, start_date=start_date, end_date=end_date)
# Aggregate data to selected value
if _cfg.coords["aggregate"]:
extra_vars = []
for name in ["N_var", "RRR_var", "LWin_var", "SNOWFALL_var"]:
if _cfg.names[name] in df:
extra_vars.append(_cfg.names[name])
aggregates = {
_cfg.names["PRES_var"]: "mean",
_cfg.names["T2_var"]: "mean",
_cfg.names["RH2_var"]: "mean",
_cfg.names["G_var"]: "mean",
_cfg.names["U2_var"]: "mean",
}
for name in extra_vars:
if name in [_cfg.names["N_var"], _cfg.names["LWin_var"]]:
aggregates[name] = "mean"
else:
aggregates[name] = "sum"
df = df.resample(_cfg.coords["aggregation_step"]).agg(aggregates)
# Load static data
print(f"Read static file {static_file}\n")
ds = xr.open_dataset(static_file)
# Create subset
ds = ds.sel(lat=slice(y0, y1), lon=slice(x0, x1))
if _cfg.coords["WRF"]:
dso = xr.Dataset()
x, y = np.meshgrid(ds.lon, ds.lat)
dso.coords["time"] = (("time"), df.index.values)
dso.coords["lat"] = (("south_north", "west_east"), y)
dso.coords["lon"] = (("south_north", "west_east"), x)
else:
dso = ds
dso.coords["time"] = df.index.values
# Order variables
df = set_order_and_type(df, replace_pressure=False)
# Get values from file
RH2 = df[_cfg.names["RH2_var"]] # Relative humidity
U2 = df[_cfg.names["U2_var"]] # Wind velocity
G = df[_cfg.names["G_var"]] # Incoming shortwave radiation
PRES = df[_cfg.names["PRES_var"]] # Pressure
if _cfg.names["in_K"]:
T2 = df[_cfg.names["T2_var"]].values # Temperature
else:
T2 = df[_cfg.names["T2_var"]].values + 273.16
check_temperature_bounds(temperature=T2)
# Create numpy arrays for the 2D fields
time_index = len(dso.time)
lat_index = len(ds.lat)
lon_index = len(ds.lon)
T_interp = set_zero_field(time_index, lat_index, lon_index)
RH_interp = set_zero_field(time_index, lat_index, lon_index)
U_interp = set_zero_field(time_index, lat_index, lon_index)
G_interp = np.full([time_index, lat_index, lon_index], np.nan)
P_interp = set_zero_field(time_index, lat_index, lon_index)
if _cfg.names["RRR_var"] in df:
RRR = df[_cfg.names["RRR_var"]] # Precipitation
RRR_interp = set_zero_field(time_index, lat_index, lon_index)
if _cfg.names["SNOWFALL_var"] in df:
SNOWFALL = df[_cfg.names["SNOWFALL_var"]] # Snowfall
SNOWFALL_interp = set_zero_field(time_index, lat_index, lon_index)
if _cfg.names["LWin_var"] in df:
LW = df[_cfg.names["LWin_var"]] # Incoming longwave radiation
LW_interp = set_zero_field(time_index, lat_index, lon_index)
if _cfg.names["N_var"] in df:
N = df[_cfg.names["N_var"]] # Cloud cover fraction
N_interp = set_zero_field(time_index, lat_index, lon_index)
# Interpolate point data to grid
print("Interpolate CR file to grid")
# Interpolate data (T, RH, RRR, U) to grid using lapse rates
altitude = ds.HGT.values - _cfg.station["stationAlt"]
for t in range(time_index):
T_interp[t, :, :] = set_bias(
data=(T2[t]), lapse_type="lapse_T", altitude=altitude, limit=False
)
RH_interp[t, :, :] = set_bias(
data=RH2[t], lapse_type="lapse_RH", altitude=altitude, limit=False
)
U_interp[t, :, :] = U2[t]
"""
Interpolate pressure using the barometric equation.
Do not replace with get_pressure_bias() as the arrays won't
interpolate correctly.
"""
slp = PRES[t] / np.power(
(1 - (0.0065 * _cfg.station["stationAlt"]) / (288.15)), 5.255
)
P_interp[t, :, :] = slp * np.power(
(1 - (0.0065 * ds.HGT.values) / (288.15)), 5.255
)
if _cfg.names["RRR_var"] in df:
RRR_interp[t, :, :] = set_bias(
data=RRR[t],
lapse_type="lapse_RRR",
altitude=altitude,
limit=True,
)
if _cfg.names["SNOWFALL_var"] in df:
SNOWFALL_interp[t, :, :] = set_bias(
data=SNOWFALL[t],
lapse_type="lapse_SNOWFALL",
altitude=altitude,
limit=True,
)
if _cfg.names["LWin_var"] in df:
LW_interp[t, :, :] = LW[t]
if _cfg.names["N_var"] in df:
N_interp[t, :, :] = N[t]
print(
f"Number of glacier cells: {int(np.count_nonzero(~np.isnan(ds['MASK'].values))):d}"
)
print(f"Number of glacier cells: {int(np.nansum(ds['MASK'].values)):d}")
# Auxiliary variables
mask = ds.MASK.values
heights = ds.HGT.values
slope = ds.SLOPE.values
aspect = ds.ASPECT.values
lats = ds.lat.values
lons = ds.lon.values
sw = G.values
# Run radiation module
if (
_cfg.radiation["radiationModule"] in ["Wohlfahrt2016", "none"]
or _cfg.radiation["radiationModule"] is None
):
print("Run the radiation module Wohlfahrt2016 or no Radiation Module.")
# Change aspect to south==0, east==negative, west==positive
aspect = ds["ASPECT"].values - 180.0
ds["ASPECT"] = (("lat", "lon"), aspect)
for t in range(time_index):
doy = df.index[t].dayofyear
hour = df.index[t].hour
for i in range(lat_index):
for j in range(lon_index):
if mask[i, j] == 1:
if _cfg.radiation["radiationModule"] == "Wohlfahrt2016":
G_interp[t, i, j] = np.maximum(
0.0,
mod_radCor.correctRadiation(
lats[i],
lons[j],
_cfg.radiation["timezone_lon"],
doy,
hour,
slope[i, j],
aspect[i, j],
sw[t],
_cfg.radiation["zeni_thld"],
),
)
else:
G_interp[t, i, j] = sw[t]
elif _cfg.radiation["radiationModule"] == "Moelg2009":
print("Run the radiation module Moelg2009")
# Calculate solar Parameters
solPars, timeCorr = mod_radCor.solpars(_cfg.station["stationLat"])
if _cfg.radiation["LUT"]:
print("Read in look-up-tables")
ds_LUT = xr.open_dataset(_cfg.radiation["LUT_path"])
shad1yr = ds_LUT.SHADING.values
svf = ds_LUT.SVF.values
else:
print("Build look-up-tables")
# Sky view factor
svf = mod_radCor.LUTsvf(
np.flipud(heights),
np.flipud(mask),
np.flipud(slope),
np.flipud(aspect),
lats[::-1],
lons,
)
print("Look-up-table for sky view factor done")
# Topographic shading
shad1yr = mod_radCor.LUTshad(
solPars,
timeCorr,
_cfg.station["stationLat"],
np.flipud(heights),
np.flipud(mask),
lats[::-1],
lons,
_cfg.radiation["dtstep"],
_cfg.radiation["tcart"],
)
print("Look-up-table for topographic shading done")
# Save look-up tables
Nt = int(
366 * (3600 / _cfg.radiation["dtstep"]) * 24
) # number of time steps
Ny = len(lats) # number of latitudes
Nx = len(lons) # number of longitudes
f = nc.Dataset(_cfg.radiation["LUT_path"], "w")
f.createDimension("time", Nt)
f.createDimension("lat", Ny)
f.createDimension("lon", Nx)
LATS = f.createVariable("lat", "f4", ("lat",))
LATS.units = "degree"
LONS = f.createVariable("lon", "f4", ("lon",))
LONS.units = "degree"
LATS[:] = lats
LONS[:] = lons
shad = f.createVariable("SHADING", float, ("time", "lat", "lon"))
shad.long_name = "Topographic shading"
shad[:] = shad1yr
SVF = f.createVariable("SVF", float, ("lat", "lon"))
SVF.long_name = "sky view factor"
SVF[:] = svf
f.close()
# Run the radiation model in both cases
for t in range(time_index):
doy = df.index[t].dayofyear
hour = df.index[t].hour
G_interp[t, :, :] = mod_radCor.calcRad(
solPars,
timeCorr,
doy,
hour,
_cfg.station["stationLat"],
T_interp[t, ::-1, :],
P_interp[t, ::-1, :],
RH_interp[t, ::-1, :],
N_interp[t, ::-1, :],
np.flipud(heights),
np.flipud(mask),
np.flipud(slope),
np.flipud(aspect),
shad1yr,
svf,
_cfg.radiation["dtstep"],
_cfg.radiation["tcart"],
)
# Change aspect to south == 0, east == negative, west == positive
aspect2 = ds["ASPECT"].values - 180.0
ds["ASPECT"] = (("lat", "lon"), aspect2)
else:
raise ValueError(
f'Radiation module {_cfg.radiation["radiationModule"]} not available.\nAvailable options are: Wohlfahrt2016, Moelg2009, none.'
)
# Limit bounds for relative humidity
RH_interp = set_relative_humidity_bounds(RH_interp)
# Add variables to file
add_variable_along_latlon(ds=dso, var=ds.HGT.values, **get_variable_metadata("HGT"))
add_variable_along_latlon(ds=dso, var=ds.ASPECT.values, **get_variable_metadata("ASPECT"))
add_variable_along_latlon(ds=dso, var=ds.SLOPE.values, **get_variable_metadata("SLOPE"))
add_variable_along_latlon(ds=dso, var=ds.MASK.values, **get_variable_metadata("MASK"))
add_variable_along_timelatlon(ds=dso, var=T_interp, **get_variable_metadata("T2"))
add_variable_along_timelatlon(ds=dso, var=RH_interp, **get_variable_metadata("RH2"))
add_variable_along_timelatlon(ds=dso, var=U_interp, **get_variable_metadata("U2"))
add_variable_along_timelatlon(ds=dso, var=G_interp, **get_variable_metadata("G"))
add_variable_along_timelatlon(ds=dso, var=P_interp, **get_variable_metadata("PRES"))
if _cfg.names["RRR_var"] in df:
add_variable_along_timelatlon(ds=dso, var=RRR_interp, **get_variable_metadata("RRR"))
if _cfg.names["SNOWFALL_var"] in df:
add_variable_along_timelatlon(ds=dso, var=SNOWFALL_interp, **get_variable_metadata("SNOWFALL"))
if _cfg.names["LWin_var"] in df:
add_variable_along_timelatlon(ds=dso, var=LW_interp, **get_variable_metadata("LWin"))
if _cfg.names["N_var"] in df:
add_variable_along_timelatlon(ds=dso, var=N_interp, **get_variable_metadata("N"))
# encoding = dict()
# for var in IO.get_result().data_vars:
# dataMin = IO.get_result()[var].min(skipna=True).values
# dataMax = IO.get_result()[var].max(skipna=True).values
#
# dtype = 'int16'
# FillValue = -9999
# scale_factor, add_offset = compute_scale_and_offset(dataMin, dataMax, 16)
# encoding[var] = dict(zlib=True, complevel=2, dtype=dtype, scale_factor=scale_factor, add_offset=add_offset, _FillValue=FillValue)
# Write file to disk
# dso.to_netcdf(cosipy_file, encoding=encoding)
check_for_nan(dso)
write_netcdf(dataset=dso, output_path=cosipy_file)
# Check data
check_data(dataset=dso, dataframe=df)
[docs]
def add_variable_along_timelatlon(ds, var, name, units, long_name):
"""Add spatiotemporal data to a dataset."""
if _cfg.coords["WRF"]:
ds[name] = (("time", "south_north", "west_east"), var)
else:
ds[name] = (("time", "lat", "lon"), var)
ds[name].attrs["units"] = units
ds[name].attrs["long_name"] = long_name
return ds
[docs]
def add_variable_along_latlon(ds, var, name, units, long_name):
"""Add spatial data to a dataset."""
if _cfg.coords["WRF"]:
ds[name] = (("south_north", "west_east"), var)
else:
ds[name] = (("lat", "lon"), var)
ds[name].attrs["units"] = units
ds[name].attrs["long_name"] = long_name
ds[name].encoding["_FillValue"] = -9999
return ds
[docs]
def add_variable_along_timelatlon_point(ds, var, name, units, long_name):
"""Add spatiotemporal point data to a dataset."""
ds[name] = (("time", "lat", "lon"), np.reshape(var, (len(var), 1, 1)))
ds[name].attrs["units"] = units
ds[name].attrs["long_name"] = long_name
return ds
[docs]
def add_variable_along_point(ds, var, name, units, long_name):
"""Add point data to a dataset."""
ds[name] = (("lat", "lon"), np.reshape(var, (1, 1)))
ds[name].attrs["units"] = units
ds[name].attrs["long_name"] = long_name
return ds
[docs]
def check(field, max_bound, min_bound):
"""Check the validity of the input data."""
if np.nanmax(field) > max_bound or np.nanmin(field) < min_bound:
msg = f"{str.capitalize(field.name)} MAX: {np.nanmax(field):.2f} MIN: {np.nanmin(field):.2f}"
print(
f"\n\nWARNING! Please check the data, it seems they are out of a reasonable range {msg}"
)
[docs]
def check_for_nan(ds):
if _cfg.coords["WRF"] is True:
for y, x in product(
range(ds.dims["south_north"]), range(ds.dims["west_east"])
):
mask = ds.MASK.sel(south_north=y, west_east=x)
if mask == 1:
if np.isnan(
ds.sel(south_north=y, west_east=x).to_array()
).any():
raise_nan_error()
else:
for y, x in product(range(ds.dims["lat"]), range(ds.dims["lon"])):
mask = ds.MASK.isel(lat=y, lon=x)
if mask == 1:
if np.isnan(ds.isel(lat=y, lon=x).to_array()).any():
raise_nan_error()
[docs]
def raise_nan_error():
"""Raise error if NaNs are in the dataset.
Raises:
ValueError: There are NaNs in the dataset.
"""
raise ValueError("ERROR! There are NaNs in the dataset.")
[docs]
def check_temperature_bounds(temperature: np.ndarray):
max_temperature = np.nanmax(temperature)
min_temperature = np.nanmin(temperature)
check_msg = "Please check the input temperature"
if max_temperature > 373.16:
error_message = (
f"Maximum temperature is: {max_temperature} K. {check_msg}"
)
raise ValueError(error_message)
elif min_temperature < 173.16:
error_message = (
f"Minimum temperature is: {min_temperature} K. {check_msg}"
)
raise ValueError(error_message)
[docs]
def set_relative_humidity_bounds(humidity):
"""Limit bounds for relative humidity."""
humidity[humidity > 100.0] = 100.0
humidity[humidity < 0.0] = 0.1
return humidity
[docs]
def compute_scale_and_offset(min, max, n):
# stretch/compress data to the available packed range
scale_factor = (max - min) / (2**n - 1)
# translate the range to be symmetric about zero
add_offset = min + 2 ** (n - 1) * scale_factor
return (scale_factor, add_offset)
[docs]
def nansumwrapper(a, **kw_args):
"""Sum dataframe columns which contain NaNs."""
if np.isnan(a).all():
return np.nan
else:
return np.nansum(a, **kw_args)
[docs]
def get_user_arguments(parser: argparse.ArgumentParser) -> argparse.Namespace:
"""Get user arguments for converting AWS data.
Args:
parser: An initialised argument parser.
Returns:
User arguments for conversion.
"""
parser.description = "Create netCDF input file from a .csv file."
parser.prog = __package__
# Required arguments
parser.add_argument(
"-i",
"--input",
dest="csv_file",
type=str,
metavar="<path>",
required=True,
help="Path to .csv file with meteorological data",
)
parser.add_argument(
"-o",
"--output",
dest="cosipy_file",
type=str,
metavar="<path>",
required=True,
help="Path to the resulting COSIPY netCDF file",
)
parser.add_argument(
"-s",
"--static_file",
type=str,
dest="static_file",
help="Path to static file with DEM, slope etc.",
)
# Optional arguments
parser.add_argument(
"-b",
"--start_date",
type=str,
metavar="<yyyy-mm-dd>",
dest="start_date",
help="Start date",
)
parser.add_argument(
"-e",
"--end_date",
type=str,
metavar="<yyyy-mm-dd>",
dest="end_date",
help="End date",
)
parser.add_argument(
"--xl",
dest="xl",
type=float,
metavar="<float>",
const=None,
help="Left longitude value of the subset",
)
parser.add_argument(
"--xr",
dest="xr",
type=float,
metavar="<float>",
const=None,
help="Right longitude value of the subset",
)
parser.add_argument(
"--yl",
dest="yl",
type=float,
metavar="<float>",
const=None,
help="Lower latitude value of the subset",
)
parser.add_argument(
"--yu",
dest="yu",
type=float,
metavar="<float>",
const=None,
help="Upper latitude value of the subset",
)
arguments = parser.parse_args()
return arguments
[docs]
def load_config(module_name: str) -> tuple:
"""Load configuration for module.
Args:
module_name: Name of this module.
Returns:
User arguments and configuration parameters.
"""
params = UtilitiesConfig()
arguments = get_user_arguments(params.parser)
params.load(arguments.utilities_path)
params = params.get_config_expansion(name=module_name)
return arguments, params
[docs]
def main():
global _args # Yes, it's bad practice
global _cfg
_args, _cfg = load_config(module_name="aws2cosipy")
if _cfg.points["point_model"]:
create_1D_input(
_args.csv_file,
_args.cosipy_file,
_args.static_file,
_args.start_date,
_args.end_date,
)
else:
create_2D_input(
_args.csv_file,
_args.cosipy_file,
_args.static_file,
_args.start_date,
_args.end_date,
_args.xl,
_args.xr,
_args.yl,
_args.yu,
)
if __name__ == "__main__":
main()