import gc
import os
import requests
import wget
import xarray as xr
from datetime import datetime, timedelta
from requests.utils import urlparse
from zoneinfo import ZoneInfo
from fastapi import FastAPI, BackgroundTasks
from fastapi.responses import JSONResponse
from fastapi.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
from fastapi_utils.tasks import repeat_every
from app.settings import *
from services.grib import Grib

service_grib = Grib()

# App
middleware = [
    Middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    ),
]
app = FastAPI(title="noaa", middleware=middleware)


# API

# GET 'test'
@app.get("/test/")
async def get_test():
    """Health check: verify the upstream NOAA GRIB source is reachable.

    Probes yesterday's GFSWave link because today's file might not be
    published yet and older files get deleted.

    :returns: JSON ``{"status": "success"}`` (unreachable source is only
        logged for now — see TODO below)
    """
    # check source availability
    # fresh one might be missing and old ones get deleted, so check yesterday's
    yesterday_news = datetime.now(tz=ZoneInfo("US/Eastern")) - timedelta(days=1)
    url = service_grib.form_gfswave_link(target_time=yesterday_news)
    if not service_grib.is_reachable(url):  # just one should be fine
        print(url, " is not reachable at this time")
        # TODO: should we actually error out?
    return JSONResponse(content={"status": "success"})


@app.get("/download/")
async def download_grib(background_tasks: BackgroundTasks):
    """Download and process GRIB files into csv of requested parameters.

    Kicks off :func:`extract_useful_data` as a background task and returns
    immediately — the conversion takes a long time (one GRIB pair per
    forecast hour).
    """
    background_tasks.add_task(extract_useful_data)
    return JSONResponse(content={"status": "Background task started"})


# Tasks

# gc
@app.on_event("startup")
@repeat_every(seconds=(1 * 60))
async def task_gc() -> None:
    """Force a collection every minute to release large xarray datasets."""
    gc.collect()


def fresh_grib_time(target_time=None):
    """Find most recent available GRIB file for GFS atmospheric and GFSWave forecasts.

    Tries the four most recent 6-hour forecast cycles (0, -6, -12, -18 hours
    from *target_time*) and returns the first one for which BOTH the atmos
    and wave products are reachable.

    :param target_time: no more recent than this time (defaults to now in
        US/Eastern — NOAA is located in Washington)
    :returns: most recent time GRIB is available
    :raises Exception: if no cycle in the last 24 hours is reachable
    """
    if not target_time:
        target_time = datetime.now(tz=ZoneInfo("US/Eastern"))
    for fallback in range(4):
        # BUG FIX: the original cumulatively subtracted from target_time,
        # probing offsets 0, -6, -18, -36 hours instead of 0, -6, -12, -18.
        # Derive each candidate from the unmodified base time instead.
        candidate = target_time - timedelta(hours=6 * fallback)
        # prod_hour=384 is the last product of a cycle: if it exists, the
        # whole cycle has finished uploading.
        if not service_grib.is_reachable(
            service_grib.form_gfs_link(target_time=candidate, prod_hour=384)
        ):
            continue
        if not service_grib.is_reachable(
            service_grib.form_gfswave_link(target_time=candidate, prod_hour=384)
        ):
            continue
        # both forecasts are reachable @ candidate
        return candidate
    raise Exception("No forecasts in the last 24 hours were reachable")


def download_fresh_grib(target_time, prod_hour=0):
    """Download GRIB files for atmos and wave.

    :param target_time: download GRIB from this time
    :param prod_hour: download forecast for this hour
    :returns: tuple of (atmos filename, wave filename) under SAVE_DIR
    """
    gfs_atmos = service_grib.form_gfs_link(target_time=target_time, prod_hour=prod_hour)
    gfs_wave = service_grib.form_gfswave_link(
        target_time=target_time, prod_hour=prod_hour
    )
    return (
        wget.download(gfs_atmos, out=SAVE_DIR),
        wget.download(gfs_wave, out=SAVE_DIR),
    )


def _filter_atmos(ds_atmos, forecast_hour):
    """Select requested atmospheric variables and flatten height levels.

    :param ds_atmos: open GFS atmos dataset for one forecast hour
    :param forecast_hour: 0-based forecast hour being processed
    :returns: filtered xarray Dataset
    """
    filtered = ds_atmos.get(ATMOS_PARAM_NAMES)
    if filtered is None:
        # skip running total (accumulated precipitation) column: it is
        # absent from the first forecast file
        filtered = ds_atmos.get(
            [p for p in ATMOS_PARAM_NAMES if not p == "APCP_P8_L1_GLL0_acc"]
        )
    # collapse the vertical dimension to the single requested height
    for name, param in HEIGHT_PARAM_NAMES.items():
        if name == "TMP_P0_L103_GLL0":
            level = TEMPERATURE_HEIGHT
        else:
            level = WIND_HEIGHT
        filtered[name] = (
            ds_atmos[name]
            .sel({param: level})
            .assign_attrs(level=level)
            .drop_vars(param)
        )
    # hour 0 lacks the running total column present in later forecasts:
    # add an all-zero placeholder so every hour has the same schema
    if forecast_hour == 0:
        precip = xr.zeros_like(filtered["GUST_P0_L1_GLL0"])
        precip.name = "APCP_P8_L1_GLL0_acc"
        # BUG FIX: original passed coords="mimal" (typo) which xarray rejects
        filtered = xr.combine_by_coords([filtered, precip], coords="minimal")
    return filtered


def extract_useful_data():
    """Download and process GRIB files into csv of requested parameters.

    For every forecast hour: download the atmos + wave GRIB pair, filter to
    the configured variables, merge into one table and append it to a single
    CSV named after the forecast cycle.

    :returns: filename of the resulting csv (relative to SAVE_DIR)
    """
    target_time = fresh_grib_time()
    # file name encodes date + 6-hourly cycle (00/06/12/18)
    cycle_hour = (target_time.hour // 6) * 6
    save_to = f"{target_time.strftime('%Y%m%d')}-{cycle_hour:02d}.csv"
    # NOTE(review): range() stops one short of MAX_FORECAST_HOUR — confirm
    # whether the final hour should be included
    for forecast_hour in range(MAX_FORECAST_HOUR):
        # download gribs
        ds_atmos_file, ds_wave_file = download_fresh_grib(
            target_time=target_time, prod_hour=forecast_hour
        )
        try:
            # filter atmos to requested variables
            with xr.open_dataset(ds_atmos_file, engine="pynio") as ds_atmos:
                filtered_ds_atmos = _filter_atmos(ds_atmos, forecast_hour)
            # filter wave to requested variables
            with xr.open_dataset(ds_wave_file, engine="pynio") as ds_wave:
                filtered_ds_wave = ds_wave.get(WAVE_PARAM_NAMES)
            # concatenate atmos and wave into a single dataset (wave grid is
            # snapped onto the atmos grid)
            combined_product = filtered_ds_atmos.merge(
                filtered_ds_wave.reindex_like(filtered_ds_atmos, method="nearest")
            )
            # transfer to pandas
            df = combined_product.to_dataframe()
            # convert longitude values into the standard range of -180 degrees
            # to +180 degrees
            # TODO: do we want to do it?
            latitudes = df.index.get_level_values("lat_0")
            longitudes = df.index.get_level_values("lon_0")
            remapped_longitudes = longitudes.map(
                lambda lon: (lon - 360) if (lon > 180) else lon
            )
            df["longitude"] = remapped_longitudes
            df["latitude"] = latitudes
            # dump dataframe to csv on disk: write header for the first hour,
            # append without header afterwards
            if forecast_hour == 0:
                df.to_csv(os.path.join(SAVE_DIR, save_to), index=False)
            else:
                df.to_csv(
                    os.path.join(SAVE_DIR, save_to),
                    index=False,
                    mode="a",
                    header=False,
                )
        finally:
            # clean up grib files even if processing failed, so repeated
            # failures don't fill the disk
            os.remove(ds_wave_file)
            os.remove(ds_atmos_file)
    return save_to