From d107523f229e09fb45ef2b56fd6e51067d0b063d Mon Sep 17 00:00:00 2001 From: g Date: Tue, 7 Jun 2022 10:17:00 +0300 Subject: [PATCH] ref: split extract_useful_data fn --- services/grib/service.py | 131 +++++++++++++++++++++------------------ 1 file changed, 70 insertions(+), 61 deletions(-) diff --git a/services/grib/service.py b/services/grib/service.py index d075302..8462885 100644 --- a/services/grib/service.py +++ b/services/grib/service.py @@ -122,6 +122,74 @@ class Grib: wget.download(gfs_wave, out=SAVE_DIR), ) + @staticmethod + def grib_to_dataframe(ds_atmos_file, ds_wave_file, forecast_hour): + """Work with open GRIB file, extracting data into pandas dataframe""" + with xr.open_dataset(ds_atmos_file, engine="pynio") as ds_atmos: + filtered_ds_atmos = ds_atmos.get(ATMOS_PARAM_NAMES) or ds_atmos.get( + [p for p in ATMOS_PARAM_NAMES if not p == "APCP_P8_L1_GLL0_acc"] + ) # skip running total column in the first forecast + + for name, param in HEIGHT_PARAM_NAMES.items(): + if name == "TMP_P0_L103_GLL0": + level = TEMPERATURE_HEIGHT + else: + level = WIND_HEIGHT + filtered_ds_atmos[name] = ( + ds_atmos[name] + .sel({param: level}) + .assign_attrs(level=level) + .drop_vars(param) + ) + # if hour==0 add running total column from future forecasts + if forecast_hour == 0: + precip = xr.zeros_like(filtered_ds_atmos["GUST_P0_L1_GLL0"]) + precip.name = "APCP_P8_L1_GLL0_acc" + filtered_ds_atmos = xr.combine_by_coords( + [filtered_ds_atmos, precip], coords="mimal" + ) + # filter wave to requested variables + with xr.open_dataset(ds_wave_file, engine="pynio") as ds_wave: + filtered_ds_wave = ds_wave.get(WAVE_PARAM_NAMES) + + # concatinate atmos and wave into a single dataset + combined_product = filtered_ds_atmos.merge( + filtered_ds_wave.reindex_like(filtered_ds_atmos, method="nearest") + ) + + # transfer to pandas + df = combined_product.to_dataframe() + + # convert longitude values into the standard range of -180 degrees to +180 degrees + # TODO: do we want to do it? + latitudes = df.index.get_level_values("lat_0") + longitudes = df.index.get_level_values("lon_0") + + map_function = lambda lon: (lon - 360) if (lon > 180) else lon + remapped_longitudes = longitudes.map(map_function) + df["longitude"] = remapped_longitudes + df["latitude"] = latitudes + return df + + @staticmethod + def dump_df_to_csv(df, forecast_hour, save_to): + """Dump pandas dataframe to CSV on disk""" + if forecast_hour == 0: + df.to_csv( + os.path.join( + SAVE_DIR, + save_to, + ), + index=False, + ) + else: + df.to_csv( + os.path.join(SAVE_DIR, save_to), + index=False, + mode="a", + header=False, + ) + @staticmethod def extract_useful_data(target_time: str = None): """Download and process GRIB files into csv of requested parameters @@ -143,68 +211,9 @@ class Grib: ) # filter atmos to requested variables - with xr.open_dataset(ds_atmos_file, engine="pynio") as ds_atmos: - filtered_ds_atmos = ds_atmos.get(ATMOS_PARAM_NAMES) or ds_atmos.get( - [p for p in ATMOS_PARAM_NAMES if not p == "APCP_P8_L1_GLL0_acc"] - ) # skip running total column in the first forecast - - for name, param in HEIGHT_PARAM_NAMES.items(): - if name == "TMP_P0_L103_GLL0": - level = TEMPERATURE_HEIGHT - else: - level = WIND_HEIGHT - filtered_ds_atmos[name] = ( - ds_atmos[name] - .sel({param: level}) - .assign_attrs(level=level) - .drop_vars(param) - ) - # if hour==0 add running total column from future forecasts - if forecast_hour == 0: - precip = xr.zeros_like(filtered_ds_atmos["GUST_P0_L1_GLL0"]) - precip.name = "APCP_P8_L1_GLL0_acc" - filtered_ds_atmos = xr.combine_by_coords( - [filtered_ds_atmos, precip], coords="mimal" - ) - # filter wave to requested variables - with xr.open_dataset(ds_wave_file, engine="pynio") as ds_wave: - filtered_ds_wave = ds_wave.get(WAVE_PARAM_NAMES) - - # concatinate atmos and wave into a single dataset - combined_product = filtered_ds_atmos.merge( - filtered_ds_wave.reindex_like( - filtered_ds_atmos, method="nearest" - ) - ) - - # transfer to pandas - df = combined_product.to_dataframe() - - # convert longitude values into the standard range of -180 degrees to +180 degrees - # TODO: do we want to do it? - latitudes = df.index.get_level_values("lat_0") - longitudes = df.index.get_level_values("lon_0") - - map_function = lambda lon: (lon - 360) if (lon > 180) else lon - remapped_longitudes = longitudes.map(map_function) - df["longitude"] = remapped_longitudes - df["latitude"] = latitudes + df = Grib.grib_to_dataframe(ds_atmos_file, ds_wave_file, forecast_hour) # dump datafrate to csv on disk - if forecast_hour == 0: - df.to_csv( - os.path.join( - SAVE_DIR, - save_to, - ), - index=False, - ) - else: - df.to_csv( - os.path.join(SAVE_DIR, save_to), - index=False, - mode="a", - header=False, - ) + Grib.dump_df_to_csv(df, forecast_hour, save_to) # clean up grib files os.remove(ds_wave_file)