ref: split extract_useful_data fn

dev
g 4 years ago
parent 3a7448245e
commit d107523f22

@ -122,6 +122,74 @@ class Grib:
wget.download(gfs_wave, out=SAVE_DIR), wget.download(gfs_wave, out=SAVE_DIR),
) )
@staticmethod
def grib_to_dataframe(ds_atmos_file, ds_wave_file, forecast_hour):
"""Work with open GRIB file, extracting data into pandas dataframe"""
with xr.open_dataset(ds_atmos_file, engine="pynio") as ds_atmos:
filtered_ds_atmos = ds_atmos.get(ATMOS_PARAM_NAMES) or ds_atmos.get(
[p for p in ATMOS_PARAM_NAMES if not p == "APCP_P8_L1_GLL0_acc"]
) # skip running total column in the first forecast
for name, param in HEIGHT_PARAM_NAMES.items():
if name == "TMP_P0_L103_GLL0":
level = TEMPERATURE_HEIGHT
else:
level = WIND_HEIGHT
filtered_ds_atmos[name] = (
ds_atmos[name]
.sel({param: level})
.assign_attrs(level=level)
.drop_vars(param)
)
# if hour==0 add running total column from future forecasts
if forecast_hour == 0:
precip = xr.zeros_like(filtered_ds_atmos["GUST_P0_L1_GLL0"])
precip.name = "APCP_P8_L1_GLL0_acc"
filtered_ds_atmos = xr.combine_by_coords(
[filtered_ds_atmos, precip], coords="mimal"
)
# filter wave to requested variables
with xr.open_dataset(ds_wave_file, engine="pynio") as ds_wave:
filtered_ds_wave = ds_wave.get(WAVE_PARAM_NAMES)
# concatinate atmos and wave into a single dataset
combined_product = filtered_ds_atmos.merge(
filtered_ds_wave.reindex_like(filtered_ds_atmos, method="nearest")
)
# transfer to pandas
df = combined_product.to_dataframe()
# convert longitude values into the standard range of -180 degrees to +180 degrees
# TODO: do we want to do it?
latitudes = df.index.get_level_values("lat_0")
longitudes = df.index.get_level_values("lon_0")
map_function = lambda lon: (lon - 360) if (lon > 180) else lon
remapped_longitudes = longitudes.map(map_function)
df["longitude"] = remapped_longitudes
df["latitude"] = latitudes
return df
@staticmethod
def dump_df_to_csv(df, forecast_hour, save_to):
"""Dump pandas dataframe to CSV on disk"""
if forecast_hour == 0:
df.to_csv(
os.path.join(
SAVE_DIR,
save_to,
),
index=False,
)
else:
df.to_csv(
os.path.join(SAVE_DIR, save_to),
index=False,
mode="a",
header=False,
)
@staticmethod @staticmethod
def extract_useful_data(target_time: str = None): def extract_useful_data(target_time: str = None):
"""Download and process GRIB files into csv of requested parameters """Download and process GRIB files into csv of requested parameters
@ -143,68 +211,9 @@ class Grib:
) )
# filter atmos to requested variables # filter atmos to requested variables
with xr.open_dataset(ds_atmos_file, engine="pynio") as ds_atmos: df = Grib.grib_to_dataframe(ds_atmos_file, ds_wave_file, forecast_hour)
filtered_ds_atmos = ds_atmos.get(ATMOS_PARAM_NAMES) or ds_atmos.get(
[p for p in ATMOS_PARAM_NAMES if not p == "APCP_P8_L1_GLL0_acc"]
) # skip running total column in the first forecast
for name, param in HEIGHT_PARAM_NAMES.items():
if name == "TMP_P0_L103_GLL0":
level = TEMPERATURE_HEIGHT
else:
level = WIND_HEIGHT
filtered_ds_atmos[name] = (
ds_atmos[name]
.sel({param: level})
.assign_attrs(level=level)
.drop_vars(param)
)
# if hour==0 add running total column from future forecasts
if forecast_hour == 0:
precip = xr.zeros_like(filtered_ds_atmos["GUST_P0_L1_GLL0"])
precip.name = "APCP_P8_L1_GLL0_acc"
filtered_ds_atmos = xr.combine_by_coords(
[filtered_ds_atmos, precip], coords="mimal"
)
# filter wave to requested variables
with xr.open_dataset(ds_wave_file, engine="pynio") as ds_wave:
filtered_ds_wave = ds_wave.get(WAVE_PARAM_NAMES)
# concatinate atmos and wave into a single dataset
combined_product = filtered_ds_atmos.merge(
filtered_ds_wave.reindex_like(
filtered_ds_atmos, method="nearest"
)
)
# transfer to pandas
df = combined_product.to_dataframe()
# convert longitude values into the standard range of -180 degrees to +180 degrees
# TODO: do we want to do it?
latitudes = df.index.get_level_values("lat_0")
longitudes = df.index.get_level_values("lon_0")
map_function = lambda lon: (lon - 360) if (lon > 180) else lon
remapped_longitudes = longitudes.map(map_function)
df["longitude"] = remapped_longitudes
df["latitude"] = latitudes
# dump datafrate to csv on disk # dump datafrate to csv on disk
if forecast_hour == 0: Grib.dump_df_to_csv(df, forecast_hour, save_to)
df.to_csv(
os.path.join(
SAVE_DIR,
save_to,
),
index=False,
)
else:
df.to_csv(
os.path.join(SAVE_DIR, save_to),
index=False,
mode="a",
header=False,
)
# clean up grib files # clean up grib files
os.remove(ds_wave_file) os.remove(ds_wave_file)

Loading…
Cancel
Save