# You can not select more than 25 topics Topics must start with a letter or
# number, can include dashes ('-') and can be up to 35 characters long.
# 244 lines
# 8.0 KiB
# NOTE(review): the lines above are repository-page residue captured when this
# file was scraped — not part of the program; commented out so the module parses.

import gc
import os
import requests
import wget
import xarray as xr
from datetime import datetime, timedelta
from requests.utils import urlparse
from zoneinfo import ZoneInfo
from fastapi import FastAPI, BackgroundTasks
from fastapi.responses import JSONResponse
from fastapi.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
from fastapi_utils.tasks import repeat_every
from app.settings import *
# App
# CORS is wide open (any origin/method/header, with credentials); acceptable
# for an internal service but tighten allow_origins before public exposure.
middleware = [
    Middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    ),
]
app = FastAPI(title="noaa", middleware=middleware)
# API
# GET 'test'
@app.get("/test/")
async def get_test():
    """Liveness probe: verify NOAA's GFSWave source is reachable.

    Today's file may not be published yet and very old ones get purged,
    so probe the forecast cycle from one day ago.
    """
    probe_time = datetime.now(tz=ZoneInfo("US/Eastern")) - timedelta(days=1)
    probe_url = form_gfswave_link(target_time=probe_time)
    if not is_reachable(probe_url):  # just one should be fine
        print(probe_url, " is not reachable at this time")
        # TODO: should we actually error out?
    return JSONResponse(content={"status": "success"})
# GET 'download'
@app.get("/download/")
async def download_grib(background_tasks: BackgroundTasks):
    """Download and process GRIB files into csv of requested parameters"""
    # The extraction loops over every forecast hour and downloads large
    # files, so it runs outside the request/response cycle.
    background_tasks.add_task(extract_useful_data)
    return JSONResponse(content={"status": "Background task started"})
# Tasks
# gc
@app.on_event("startup")
@repeat_every(seconds=(1 * 60))
async def task_gc() -> None:
    """Force a garbage-collection pass every minute.

    Presumably a mitigation for memory growth from the large xarray
    datasets handled in extract_useful_data — TODO confirm it is needed.
    """
    gc.collect()
def is_reachable(url: str) -> bool:
    """Check if url is reachable at all with the current setup

    :param url: URL to check
    :return: True if url is reachable, False otherwise
    """
    try:
        # HEAD keeps the probe cheap; a Response is falsy for 4xx/5xx codes.
        # BUG FIX: the previous version had no timeout (could hang forever)
        # and let ConnectionError/Timeout propagate — an unreachable host
        # should yield False, not an exception.
        return bool(requests.head(url, timeout=10))
    except requests.RequestException:
        return False
def form_gfs_link(target_time=None, prod_hour=384):
    """Build the URL of the GFS atmospheric GRIB file that should be
    available by the given time.

    :param target_time: time to check, defaults to current time
    :param prod_hour: forecast hour to link to, defaults to 384
    :returns: URL to gfs file
    """
    if not target_time:
        # noaa is located in washington, hence US/Eastern
        target_time = datetime.now(tz=ZoneInfo("US/Eastern"))
    day = target_time.strftime("%Y%m%d")
    # model cycles run every 6 hours: 00, 06, 12, 18
    cycle = str((target_time.hour // 6) * 6).zfill(2)
    forecast = str(prod_hour).zfill(3)
    base = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod"
    return f"{base}/gfs.{day}/{cycle}/atmos/gfs.t{cycle}z.pgrb2.0p25.f{forecast}"
def form_gfswave_link(target_time=None, prod_hour=384):
    """Return well formed link to GFSWave (gridded wave forecast) data
    which should be available by given time.

    (Docstring previously said "gfs data" — copy-paste from form_gfs_link.)

    :param target_time: time to check, defaults to current time
    :param prod_hour: forecast hour to link to, defaults to 384
    :returns: URL to gfswave file
    """
    if not target_time:
        target_time = datetime.now(
            tz=ZoneInfo("US/Eastern")
        )  # noaa is located in washington
    looking_at = "wave"
    prod_hour = str(prod_hour).zfill(3)
    date_str = target_time.strftime("%Y%m%d")
    # model cycles run every 6 hours: 00, 06, 12, 18
    hour_str = str((target_time.hour // 6) * 6).zfill(2)
    # unlike the atmos product, the wave product lives under .../wave/gridded/
    # and its filename prefix is "gfswave" with a ".grib2" extension
    target_url = f"https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/gfs.{date_str}/{hour_str}/{looking_at}/gridded/gfs{looking_at}.t{hour_str}z.global.0p25.f{prod_hour}.grib2"
    return target_url
def fresh_grib_time(target_time=None):
    """Find most recent available GRIB file
    for GFS atmospheric and GFSWave forecasts

    :param target_time: no more recent than this time
    :returns: most recent time GRIB is available
    :raises RuntimeError: if no forecast cycle in the last 24 hours is reachable
    """
    if not target_time:
        target_time = datetime.now(
            tz=ZoneInfo("US/Eastern")
        )  # noaa is located in washington
    base_time = target_time
    for fallback in range(4):
        # BUG FIX: the offset previously accumulated across iterations
        # (checking 0, -6, -18, -36 hours and skipping cycles), contradicting
        # the "last 24 hours" claim below. Step back from a fixed base so the
        # four most recent 6-hour cycles are checked: 0, -6, -12, -18.
        target_time = base_time - timedelta(hours=fallback * 6)
        if not is_reachable(form_gfs_link(target_time=target_time, prod_hour=384)):
            continue
        if not is_reachable(form_gfswave_link(target_time=target_time, prod_hour=384)):
            continue
        break
    else:
        # RuntimeError is an Exception subclass, so existing handlers still match
        raise RuntimeError("No forecasts in the last 24 hours were reachable")
    # both forecasts are reachable @ target_time
    return target_time
def download_fresh_grib(target_time, prod_hour=0):
    """Download GRIB files for atmos and wave

    :param target_time: download GRIB from this time
    :param prod_hour: download forecast for this hour
    :returns: filenames where GRIB files are downloaded to
    """
    atmos_url = form_gfs_link(target_time=target_time, prod_hour=prod_hour)
    wave_url = form_gfswave_link(target_time=target_time, prod_hour=prod_hour)
    # wget.download returns the local path each file was saved to;
    # atmos is fetched first, matching the (atmos, wave) return order
    atmos_file = wget.download(atmos_url, out=SAVE_DIR)
    wave_file = wget.download(wave_url, out=SAVE_DIR)
    return (atmos_file, wave_file)
def extract_useful_data():
    """Download and process GRIB files into csv of requested parameters

    For each forecast hour: download the atmos and wave GRIBs, filter them
    to the configured variables, merge onto a common grid, and append the
    rows to one csv on disk (header written only for hour 0).

    :returns: filename of the resulting csv (relative to SAVE_DIR)
    """
    target_time = fresh_grib_time()
    save_to = f"{target_time.strftime('%Y%m%d')}-{str((target_time.hour // 6) * 6).zfill(2)}.csv"
    # NOTE(review): range() excludes MAX_FORECAST_HOUR itself — confirm intended
    for forecast_hour in range(MAX_FORECAST_HOUR):
        # download gribs
        ds_atmos_file, ds_wave_file = download_fresh_grib(
            target_time=target_time, prod_hour=forecast_hour
        )
        try:
            # filter atmos to requested variables
            with xr.open_dataset(ds_atmos_file, engine="pynio") as ds_atmos:
                filtered_ds_atmos = ds_atmos.get(ATMOS_PARAM_NAMES) or ds_atmos.get(
                    [p for p in ATMOS_PARAM_NAMES if not p == "APCP_P8_L1_GLL0_acc"]
                )  # skip running total column in the first forecast
                for name, param in HEIGHT_PARAM_NAMES.items():
                    # pick the one vertical level we care about, then drop the
                    # level coordinate so the datasets align when merged
                    if name == "TMP_P0_L103_GLL0":
                        level = TEMPERATURE_HEIGHT
                    else:
                        level = WIND_HEIGHT
                    filtered_ds_atmos[name] = (
                        ds_atmos[name]
                        .sel({param: level})
                        .assign_attrs(level=level)
                        .drop_vars(param)
                    )
                # if hour==0 add running total column from future forecasts
                if forecast_hour == 0:
                    precip = xr.zeros_like(filtered_ds_atmos["GUST_P0_L1_GLL0"])
                    precip.name = "APCP_P8_L1_GLL0_acc"
                    # BUG FIX: coords="mimal" was a typo; xarray only accepts
                    # "minimal", "different", "all" or an explicit list, so the
                    # original raised ValueError on the first forecast hour.
                    filtered_ds_atmos = xr.combine_by_coords(
                        [filtered_ds_atmos, precip], coords="minimal"
                    )
            # filter wave to requested variables
            with xr.open_dataset(ds_wave_file, engine="pynio") as ds_wave:
                filtered_ds_wave = ds_wave.get(WAVE_PARAM_NAMES)
            # concatenate atmos and wave into a single dataset
            combined_product = filtered_ds_atmos.merge(
                filtered_ds_wave.reindex_like(filtered_ds_atmos, method="nearest")
            )
            # transfer to pandas
            df = combined_product.to_dataframe()
            # convert longitude values into the standard range of -180 degrees
            # to +180 degrees
            # TODO: do we want to do it?
            latitudes = df.index.get_level_values("lat_0")
            longitudes = df.index.get_level_values("lon_0")
            df["longitude"] = longitudes.map(
                lambda lon: (lon - 360) if (lon > 180) else lon
            )
            df["latitude"] = latitudes
            # dump dataframe to csv on disk; append without header after hour 0
            if forecast_hour == 0:
                df.to_csv(
                    os.path.join(SAVE_DIR, save_to),
                    index=False,
                )
            else:
                df.to_csv(
                    os.path.join(SAVE_DIR, save_to),
                    index=False,
                    mode="a",
                    header=False,
                )
        finally:
            # clean up grib files even if processing failed, so failed runs
            # don't leave multi-hundred-MB GRIBs on disk
            os.remove(ds_wave_file)
            os.remove(ds_atmos_file)
    return save_to