import gc
import os
import requests
import wget
import xarray as xr
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from app.settings import *
class Grib:
    """Helpers for locating, downloading, and converting NOAA GFS GRIB files.

    Works with the 6-hourly GFS atmospheric and GFSWave production cycles:
    builds cycle URLs, probes for the freshest available cycle, downloads
    the GRIB2 files, and flattens them into a CSV of selected parameters.
    """

    @staticmethod
    def is_reachable(url: str) -> bool:
        """Check if url is reachable at all with the current setup

        :param url: URL to check
        :return: True if url is reachable, False otherwise
        """
        try:
            # Response truthiness is False for 4xx/5xx statuses, True otherwise
            return bool(requests.head(url))
        except requests.RequestException:
            # connection errors / timeouts / DNS failures count as unreachable
            return False

    @staticmethod
    def _cycle_strings(target_time):
        """Return (YYYYMMDD, HH) strings for the 6-hourly GFS cycle
        containing target_time (cycle hours are 00, 06, 12, 18).
        """
        date_str = target_time.strftime("%Y%m%d")
        hour_str = str((target_time.hour // 6) * 6).zfill(2)
        return date_str, hour_str

    @staticmethod
    def form_gfs_link(target_time=None, prod_hour=384):
        """Return well formed link to GFS atmospheric data which
        should be available by given time

        :param target_time: time to check, defaults to current time
        :param prod_hour: forecast hour to link to, defaults to 384
        :returns: URL to gfs file
        """
        if not target_time:
            target_time = datetime.now(
                tz=ZoneInfo("US/Eastern")
            )  # noaa is located in washington
        prod_hour = str(prod_hour).zfill(3)
        date_str, hour_str = Grib._cycle_strings(target_time)
        return (
            GFS_BASE_URL
            + f".{date_str}/{hour_str}/atmos/gfs.t{hour_str}z.pgrb2.0p25.f{prod_hour}"
        )

    @staticmethod
    def form_gfswave_link(target_time=None, prod_hour=384):
        """Return well formed link to GFSWave data which
        should be available by given time

        :param target_time: time to check, defaults to current time
        :param prod_hour: forecast hour to link to, defaults to 384
        :returns: URL to gfswave file
        """
        if not target_time:
            target_time = datetime.now(
                tz=ZoneInfo("US/Eastern")
            )  # noaa is located in washington
        looking_at = "wave"
        prod_hour = str(prod_hour).zfill(3)
        date_str, hour_str = Grib._cycle_strings(target_time)
        return (
            GFS_BASE_URL
            + f".{date_str}/{hour_str}/{looking_at}/gridded/gfs{looking_at}.t{hour_str}z.global.0p25.f{prod_hour}.grib2"
        )

    @staticmethod
    def fresh_grib_time(target_time=None):
        """Find most recent available GRIB file
        for GFS atmospheric and GFSWave forecasts

        :param target_time: no more recent than this time
        :returns: most recent time GRIB is available
        :raises Exception: if no cycle in the last 24 hours is reachable
        """
        if not target_time:
            target_time = datetime.now(
                tz=ZoneInfo("US/Eastern")
            )  # noaa is located in washington
        for fallback in range(4):
            # BUGFIX: the original subtracted the offset from a mutated
            # target_time each iteration, probing 0/6/18/36 hours back
            # instead of the intended 0/6/12/18 (one 6-hour cycle per step).
            check_time = target_time - timedelta(hours=fallback * 6)
            if not Grib.is_reachable(
                Grib.form_gfs_link(target_time=check_time, prod_hour=384)
            ):
                continue
            if not Grib.is_reachable(
                Grib.form_gfswave_link(target_time=check_time, prod_hour=384)
            ):
                continue
            # both forecasts are reachable @ check_time
            return check_time
        raise Exception("No forecasts in the last 24 hours were reachable")

    @staticmethod
    def download_fresh_grib(target_time, prod_hour=0):
        """Download GRIB files for atmos and wave

        :param target_time: download GRIB from this time
        :param prod_hour: download forecast for this hour
        :returns: filenames where GRIB files are downloaded to
        """
        gfs_atmos = Grib.form_gfs_link(target_time=target_time, prod_hour=prod_hour)
        gfs_wave = Grib.form_gfswave_link(target_time=target_time, prod_hour=prod_hour)
        return (
            wget.download(gfs_atmos, out=SAVE_DIR),
            wget.download(gfs_wave, out=SAVE_DIR),
        )

    @staticmethod
    def grib_to_dataframe(ds_atmos_file, ds_wave_file, forecast_hour):
        """Work with open GRIB files, extracting data into a pandas dataframe

        :param ds_atmos_file: path to the atmospheric GRIB2 file
        :param ds_wave_file: path to the wave GRIB2 file
        :param forecast_hour: forecast hour these files correspond to
        :returns: dataframe of requested parameters with remapped coordinates
        """
        with xr.open_dataset(ds_atmos_file, engine="pynio") as ds_atmos:
            filtered_ds_atmos = ds_atmos.get(ATMOS_PARAM_NAMES) or ds_atmos.get(
                [p for p in ATMOS_PARAM_NAMES if not p == "APCP_P8_L1_GLL0_acc"]
            )  # skip running total column in the first forecast
            for name, param in HEIGHT_PARAM_NAMES.items():
                # select a single vertical level per parameter, then drop
                # the level coordinate so datasets can be merged flat
                if name == "TMP_P0_L103_GLL0":
                    level = TEMPERATURE_HEIGHT
                else:
                    level = WIND_HEIGHT
                filtered_ds_atmos[name] = (
                    ds_atmos[name]
                    .sel({param: level})
                    .assign_attrs(level=level)
                    .drop_vars(param)
                )
            # if hour==0 add running total column from future forecasts
            if forecast_hour == 0:
                precip = xr.zeros_like(filtered_ds_atmos["GUST_P0_L1_GLL0"])
                precip.name = "APCP_P8_L1_GLL0_acc"
                # BUGFIX: coords="mimal" was a typo for "minimal" and is not
                # a valid option for xarray.combine_by_coords
                filtered_ds_atmos = xr.combine_by_coords(
                    [filtered_ds_atmos, precip], coords="minimal"
                )
        # filter wave to requested variables
        with xr.open_dataset(ds_wave_file, engine="pynio") as ds_wave:
            filtered_ds_wave = ds_wave.get(WAVE_PARAM_NAMES)
            # concatenate atmos and wave into a single dataset; wave grid is
            # snapped onto the atmos grid via nearest-neighbour reindexing
            combined_product = filtered_ds_atmos.merge(
                filtered_ds_wave.reindex_like(filtered_ds_atmos, method="nearest")
            )
            # transfer to pandas
            df = combined_product.to_dataframe()
            # convert longitude values into the standard range of -180 degrees to +180 degrees
            # TODO: do we want to do it?
            latitudes = df.index.get_level_values("lat_0")
            longitudes = df.index.get_level_values("lon_0")
            remapped_longitudes = longitudes.map(
                lambda lon: (lon - 360) if (lon > 180) else lon
            )
            df["longitude"] = remapped_longitudes
            df["latitude"] = latitudes
            return df

    @staticmethod
    def dump_df_to_csv(df, forecast_hour, save_to):
        """Dump pandas dataframe to CSV on disk

        The first forecast hour creates the file (with header); later hours
        append rows without repeating the header.

        :param df: dataframe to write
        :param forecast_hour: forecast hour of this dataframe
        :param save_to: filename (relative to SAVE_DIR) to write to
        """
        first_write = forecast_hour == 0
        df.to_csv(
            os.path.join(SAVE_DIR, save_to),
            index=False,
            mode="w" if first_write else "a",
            header=first_write,
        )

    @staticmethod
    def extract_useful_data(target_time: str = None):
        """Download and process GRIB files into csv of requested parameters

        :param target_time: well formed ISO 8601 time string, we will try to download GRIB available just before it
        :returns: filename of the resulting csv
        """
        if not target_time:
            target_time = Grib.fresh_grib_time()
        else:
            target_time = Grib.fresh_grib_time(datetime.fromisoformat(target_time))
        date_str, hour_str = Grib._cycle_strings(target_time)
        save_to = f"{date_str}-{hour_str}.csv"
        # NOTE(review): range() stops before MAX_FORECAST_HOUR itself —
        # confirm whether the final forecast hour should be included
        for forecast_hour in range(MAX_FORECAST_HOUR):
            # download gribs
            ds_atmos_file, ds_wave_file = Grib.download_fresh_grib(
                target_time=target_time, prod_hour=forecast_hour
            )
            # filter atmos and wave to requested variables
            df = Grib.grib_to_dataframe(ds_atmos_file, ds_wave_file, forecast_hour)
            # dump dataframe to csv on disk
            Grib.dump_df_to_csv(df, forecast_hour, save_to)
            # clean up grib files
            os.remove(ds_wave_file)
            os.remove(ds_atmos_file)
        return save_to