import gc
import os
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

import requests
import wget
import xarray as xr

# NOTE(review): SAVE_DIR, MAX_FORECAST_HOUR, ATMOS_PARAM_NAMES,
# HEIGHT_PARAM_NAMES, TEMPERATURE_HEIGHT, WIND_HEIGHT and WAVE_PARAM_NAMES are
# referenced below but not defined in this file — presumably supplied by a
# config module at import time; confirm.


class Grib:
    """Helpers for locating, downloading and post-processing NOAA GFS GRIB files."""

    @staticmethod
    def is_reachable(url: str) -> bool:
        """Check if url is reachable at all with the current setup.

        :param url: URL to check
        :return: True if url answered HEAD with a non-error status, False
            otherwise (including on connection errors and timeouts)
        """
        try:
            # requests.Response truthiness is status_code < 400.
            # Bug fix: the original had no timeout (could hang indefinitely)
            # and let connection errors propagate instead of reporting the URL
            # as unreachable, which broke the fallback loop in fresh_grib_time.
            return bool(requests.head(url, timeout=30))
        except requests.RequestException:
            return False

    @staticmethod
    def _cycle_parts(target_time=None):
        """Return (date_str, cycle_hour_str) for the 6-hourly GFS cycle
        containing target_time.

        :param target_time: time to resolve, defaults to current time
            (US/Eastern — NOAA is located in Washington)
        :returns: tuple of YYYYMMDD string and zero-padded cycle hour string
        """
        if not target_time:
            target_time = datetime.now(tz=ZoneInfo("US/Eastern"))
        date_str = target_time.strftime("%Y%m%d")
        # GFS cycles run at 00, 06, 12 and 18 UTC — floor to the nearest one.
        hour_str = str((target_time.hour // 6) * 6).zfill(2)
        return date_str, hour_str

    @staticmethod
    def form_gfs_link(target_time=None, prod_hour=384):
        """Return well formed link to gfs atmospheric data which should be
        available by given time.

        :param target_time: time to check, defaults to current time
        :param prod_hour: forecast hour to link to, defaults to 384
        :returns: URL to gfs file
        """
        date_str, hour_str = Grib._cycle_parts(target_time)
        fhr = str(prod_hour).zfill(3)
        return (
            "https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/"
            f"gfs.{date_str}/{hour_str}/atmos/"
            f"gfs.t{hour_str}z.pgrb2.0p25.f{fhr}"
        )

    @staticmethod
    def form_gfswave_link(target_time=None, prod_hour=384):
        """Return well formed link to gfswave data which should be available
        by given time.

        :param target_time: time to check, defaults to current time
        :param prod_hour: forecast hour to link to, defaults to 384
        :returns: URL to gfswave file
        """
        date_str, hour_str = Grib._cycle_parts(target_time)
        fhr = str(prod_hour).zfill(3)
        return (
            "https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/"
            f"gfs.{date_str}/{hour_str}/wave/gridded/"
            f"gfswave.t{hour_str}z.global.0p25.f{fhr}.grib2"
        )

    @staticmethod
    def fresh_grib_time(target_time=None):
        """Find most recent cycle for which both the GFS atmospheric and the
        GFSWave 384-hour forecast files are available.

        :param target_time: no more recent than this time, defaults to now
        :returns: most recent time GRIB is available
        :raises RuntimeError: if no cycle in the last 24 hours is reachable
        """
        if not target_time:
            target_time = datetime.now(
                tz=ZoneInfo("US/Eastern")
            )  # noaa is located in washington
        for fallback in range(4):
            # Bug fix: step back from the ORIGINAL target_time each iteration.
            # The original rebound target_time to the already-shifted value,
            # so it probed 0, 6, 18 and 36 hours back instead of the intended
            # 0, 6, 12 and 18.
            candidate = target_time - timedelta(hours=fallback * 6)
            if Grib.is_reachable(
                Grib.form_gfs_link(target_time=candidate, prod_hour=384)
            ) and Grib.is_reachable(
                Grib.form_gfswave_link(target_time=candidate, prod_hour=384)
            ):
                # both forecasts are reachable @ candidate
                return candidate
        raise RuntimeError("No forecasts in the last 24 hours were reachable")

    @staticmethod
    def download_fresh_grib(target_time, prod_hour=0):
        """Download GRIB files for atmos and wave.

        :param target_time: download GRIB from this time
        :param prod_hour: download forecast for this hour
        :returns: filenames where GRIB files are downloaded to
        """
        gfs_atmos = Grib.form_gfs_link(target_time=target_time, prod_hour=prod_hour)
        gfs_wave = Grib.form_gfswave_link(target_time=target_time, prod_hour=prod_hour)
        return (
            wget.download(gfs_atmos, out=SAVE_DIR),
            wget.download(gfs_wave, out=SAVE_DIR),
        )

    @staticmethod
    def extract_useful_data():
        """Download and process GRIB files into csv of requested parameters.

        Downloads every forecast hour of the freshest available cycle, filters
        both products to the requested variables, merges them onto the atmos
        grid and appends each hour to a single csv under SAVE_DIR.

        :returns: filename of the resulting csv
        """
        target_time = Grib.fresh_grib_time()
        cycle_hour = str((target_time.hour // 6) * 6).zfill(2)
        save_to = f"{target_time.strftime('%Y%m%d')}-{cycle_hour}.csv"
        # NOTE(review): range() stops at MAX_FORECAST_HOUR - 1; confirm the
        # final forecast hour is intentionally excluded.
        for forecast_hour in range(MAX_FORECAST_HOUR):
            # download gribs
            ds_atmos_file, ds_wave_file = Grib.download_fresh_grib(
                target_time=target_time, prod_hour=forecast_hour
            )
            # filter atmos to requested variables; the accumulated-precip
            # column is absent from the first forecast, so fall back to the
            # parameter list without it
            with xr.open_dataset(ds_atmos_file, engine="pynio") as ds_atmos:
                filtered_ds_atmos = ds_atmos.get(ATMOS_PARAM_NAMES) or ds_atmos.get(
                    [p for p in ATMOS_PARAM_NAMES if p != "APCP_P8_L1_GLL0_acc"]
                )
                # select a single vertical level for each height-dependent field
                for name, param in HEIGHT_PARAM_NAMES.items():
                    if name == "TMP_P0_L103_GLL0":
                        level = TEMPERATURE_HEIGHT
                    else:
                        level = WIND_HEIGHT
                    filtered_ds_atmos[name] = (
                        ds_atmos[name]
                        .sel({param: level})
                        .assign_attrs(level=level)
                        .drop_vars(param)
                    )
                # if hour==0 add a zero-filled running-total precip column so
                # the csv schema matches the later forecast hours
                if forecast_hour == 0:
                    precip = xr.zeros_like(filtered_ds_atmos["GUST_P0_L1_GLL0"])
                    precip.name = "APCP_P8_L1_GLL0_acc"
                    # Bug fix: the original passed coords="mimal" (a typo);
                    # xarray only accepts "minimal"/"different"/"all" here and
                    # raises on anything else.
                    filtered_ds_atmos = xr.combine_by_coords(
                        [filtered_ds_atmos, precip], coords="minimal"
                    )
            # filter wave to requested variables
            with xr.open_dataset(ds_wave_file, engine="pynio") as ds_wave:
                filtered_ds_wave = ds_wave.get(WAVE_PARAM_NAMES)
            # concatenate atmos and wave into a single dataset, snapping the
            # wave grid onto the atmos grid with nearest-neighbour reindexing
            combined_product = filtered_ds_atmos.merge(
                filtered_ds_wave.reindex_like(filtered_ds_atmos, method="nearest")
            )
            # transfer to pandas
            df = combined_product.to_dataframe()
            # convert longitude values into the standard range of -180 degrees
            # to +180 degrees
            # TODO: do we want to do it?
            latitudes = df.index.get_level_values("lat_0")
            longitudes = df.index.get_level_values("lon_0")
            df["longitude"] = longitudes.map(
                lambda lon: (lon - 360) if (lon > 180) else lon
            )
            df["latitude"] = latitudes
            # dump dataframe to csv on disk: fresh file with header for hour 0,
            # append without header afterwards
            out_path = os.path.join(SAVE_DIR, save_to)
            if forecast_hour == 0:
                df.to_csv(out_path, index=False)
            else:
                df.to_csv(out_path, index=False, mode="a", header=False)
            # clean up grib files
            os.remove(ds_wave_file)
            os.remove(ds_atmos_file)
        return save_to