You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

302 lines
12 KiB

import os
from time import gmtime, sleep, strftime, time

import requests
import xarray as xr
from bs4 import BeautifulSoup
from bs4.element import Tag
from fastapi import FastAPI
from numpy import arctan2, degrees, isnan
from requests.compat import urljoin, urlparse
app = FastAPI()
@app.get("/download/")
def start_download():
    """
    Download the most recent available GFS model files from ncep.noaa.gov.

    Scrapes the GFS product page for the GRIB2 root URL, picks today's
    date directory (falling back to yesterday's when today's run is not
    published yet), selects the latest model-run hour, verifies the run is
    complete (its last forecast hour, f384, exists) and downloads the
    gridded wave product into /tmp/grib.

    :raises RuntimeError: when no date directory can be found, or the
        latest run is not yet complete.
    """
    soup = BeautifulSoup(
        requests.get("https://www.nco.ncep.noaa.gov/pmb/products/gfs/").text,
        "html.parser",
    )
    # The GRIB2-via-https root link lives in the table cell next to the
    # "Most commonly used parameters" heading on the product page.
    base_url = (
        soup.find("font", string="Most commonly used parameters")
        .parent.find_next_sibling("td")
        .find("a", string="Available in GRIB2 via https")["href"]
    )
    prodlist = BeautifulSoup(
        requests.get("https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod").text,
        "html.parser",
    )
    # Prefer today's "gfs.YYYYMMDD/" directory; if today's run has not been
    # published yet, fall back to yesterday's (86400 seconds earlier).
    date_link = prodlist.find(
        "a", string="".join(["gfs.", strftime("%Y%m%d", gmtime()), "/"])
    )
    if date_link is None:
        date_link = prodlist.find(
            "a",
            string="".join(["gfs.", strftime("%Y%m%d", gmtime(time() - 86400)), "/"]),
        )
    if date_link is None:
        raise RuntimeError("no GFS date directory found for today or yesterday")
    date_url = urljoin(base=base_url + "/", url=date_link["href"])
    hour_list = BeautifulSoup(
        requests.get(urljoin(base=base_url + "/", url=date_url)).text, "html.parser"
    )
    # Model-run hour directories are numeric ("00/", "06/", ...); pull_text
    # maps non-numeric links to -1, so max() picks the latest run hour.
    latest_hour = max(hour_list.findAll("a"), key=pull_text).text.removesuffix(
        "/"
    )  # TODO: this may be unavailable still, fall back to the previous hour
    hour_url = urljoin(
        base=date_url, url=max(hour_list.findAll("a"), key=pull_text)["href"]
    )
    atmos_url = urljoin(base=hour_url, url="atmos/")
    prod_hour = "004"  # TODO[1]: do it for 000 - 384 forecast hours of product
    gfs_url = urljoin(
        base=atmos_url,
        url="".join(["gfs.t", latest_hour, "z.pgrb2.0p25.f", prod_hour]),
    )
    # A run is considered complete once its last forecast hour (f384) exists;
    # requests.Response truthiness reflects the HTTP status (ok -> True).
    if not requests.head(
        urljoin(
            base=atmos_url, url="".join(["gfs.t", latest_hour, "z.pgrb2.0p25.f", "384"])
        )
    ):
        raise RuntimeError("unavailable still, fallback to previous hour")
    # print(download_file(gfs_url))
    wave_url = urljoin(base=hour_url, url="wave/gridded/")
    gfswave_url = urljoin(
        base=wave_url,
        url="".join(["gfswave.t", latest_hour, "z.global.0p25.f", prod_hour, ".grib2"]),
    )
    if not requests.head(
        urljoin(
            base=gfswave_url,
            url="".join(["gfswave.t", latest_hour, "z.global.0p25.f", "384", ".grib2"]),
        )
    ):
        raise RuntimeError("unavailable still, fallback to previous hour")
    print(download_file(gfswave_url))
def pull_text(tag: Tag):
    """Sort key for hour-directory links.

    Returns the link text as an integer (e.g. "06/" -> 6), or -1 when the
    text is not numeric, so non-directory links sort below every run hour.
    """
    label = tag.text.removesuffix("/")
    try:
        value = int(label)
    except ValueError:
        value = -1
    return value
def download_file(url: str, file_path="/tmp/grib/", attempts=2):
    """Download a URL's content into a file (streams, so large files are fine).

    :param url: URL to download; an ``http://`` scheme is assumed when missing
    :param file_path: Target file, or a directory (existing, or a path ending
        with a separator — it is created if needed) to place the file in;
        empty string means "basename of the URL in the current directory"
    :param attempts: Number of download attempts, 10 s apart
    :return: New file path. Empty string if the download failed
    """
    if not file_path:
        file_path = os.path.realpath(os.path.basename(url))
    # Treat a trailing-separator path as a directory even when it does not
    # exist yet; the original os.path.isdir-only check made the default
    # "/tmp/grib/" fail with open("/tmp/grib/", "wb") on a fresh machine.
    if file_path.endswith(os.sep) or os.path.isdir(file_path):
        os.makedirs(file_path, exist_ok=True)
        file_path = os.path.join(file_path, os.path.basename(url))
    url_sections = urlparse(url)
    if not url_sections.scheme:
        # The given url is missing a scheme; default to plain http.
        url = f"http://{url}"
    for attempt in range(1, attempts + 1):
        try:
            if attempt > 1:
                sleep(10)  # 10 seconds wait time between downloads
            with requests.get(url, stream=True) as response:
                response.raise_for_status()
                with open(file_path, "wb") as out_file:
                    for chunk in response.iter_content(
                        chunk_size=1024 * 1024 * 8
                    ):  # 8MB chunks
                        out_file.write(chunk)
                return file_path
        except Exception as ex:
            # Best-effort retry loop: report the failure and try again.
            print(ex)
    return ""
def _open_grib(path: str, filter_by_keys=None):
    """Open a local GRIB2 file with the cfgrib engine.

    :param path: path of a GRIB2 file under /tmp/grib
    :param filter_by_keys: optional cfgrib key filter — GRIB2 files bundle
        several hypercubes, and cfgrib needs the filter to pick exactly one
    :return: the (lazily loaded) xarray Dataset; use as a context manager
    """
    if filter_by_keys is None:
        return xr.open_dataset(path, engine="cfgrib")
    return xr.open_dataset(path, engine="cfgrib", filter_by_keys=filter_by_keys)


@app.get("/weather_dot/")
def weather_dot(lat, lon, prediction_time="000"):
    """Return weather (and, over water, wave) values at one grid point.

    Reads the previously downloaded GFS/GFS-wave GRIB2 files from /tmp/grib.
    The f000 analysis and the later forecast steps expose different GRIB
    keys, hence the two branches below.

    :param lat: latitude of the point
    :param lon: longitude of the point
    :param prediction_time: forecast hour, zero-padded ("000" .. "384")
    :return: dict of weather values for the point
    """
    # FastAPI passes un-annotated query parameters as strings, but
    # xarray's .sel() needs numeric coordinates.
    lat, lon = float(lat), float(lon)
    response = {}
    latest_hour = "18"  # TODO[3]: global? singleton? make user choose?
    gfs_file = f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}"
    wave_file = (
        f"/tmp/grib/gfswave.t{latest_hour}z.global.0p25.f{prediction_time}.grib2"
    )
    with _open_grib(gfs_file, {"typeOfLevel": "heightAboveGround", "level": 10}) as ds:
        wind_dot = ds.sel(
            latitude=lat, longitude=lon
        )  # TODO[0]: Do we want method="nearest" or exact here
        u_wind = wind_dot.get("u10").item()
        v_wind = wind_dot.get("v10").item()
        response.update(
            u_wind=u_wind,
            v_wind=v_wind,
            # http://colaweb.gmu.edu/dev/clim301/lectures/wind/wind-uv
            wind_speed=(u_wind**2 + v_wind**2) ** 0.5,
            wind_alpha=degrees(arctan2(v_wind, u_wind)),
        )
    with _open_grib(gfs_file, {"typeOfLevel": "heightAboveGround", "level": 2}) as ds:
        two_dot = ds.sel(
            latitude=lat, longitude=lon
        )  # TODO[0]: Do we want method="nearest" or exact here
        response.update(
            temperature=two_dot.get("t2m").item(),
            dew_point=two_dot.get("d2m").item(),
            humidity=two_dot.get("r2").item(),
        )
        if int(prediction_time) > 0:
            # min/max temperature only exist for forecast steps, not f000
            response.update(
                min_temp=two_dot.get("tmin").item(),
                max_temp=two_dot.get("tmax").item(),
            )
    with _open_grib(gfs_file, {"typeOfLevel": "meanSea"}) as ds:
        sea_dot = ds.sel(
            latitude=lat, longitude=lon
        )  # TODO[0]: Do we want method="nearest" or exact here
        response.update(pressure=sea_dot.get("prmsl").item() / 1000)  # Pa -> kPa
    # things differ for the first step and for the rest
    if int(prediction_time) > 0:
        with _open_grib(
            gfs_file, {"typeOfLevel": "surface", "stepType": "instant"}
        ) as ds:
            surface_dot = ds.sel(
                latitude=lat, longitude=lon
            )  # TODO[0]: Do we want method="nearest" or exact here
            response.update(
                visibility=surface_dot.get("vis").item() / 1000,  # m -> km
                wind_gust=surface_dot.get("gust").item(),
                frozen_precip=surface_dot.get("cpofp").item(),
            )
        with _open_grib(
            gfs_file, {"typeOfLevel": "surface", "stepType": "accum"}
        ) as ds:
            surface_dot = ds.sel(
                latitude=lat, longitude=lon
            )  # TODO[0]: Do we want method="nearest" or exact here
            response.update(total_precip=surface_dot.get("tp").item())
        with _open_grib(
            gfs_file, {"typeOfLevel": "lowCloudLayer", "stepType": "instant"}
        ) as ds:
            low_cloud_dot = ds.sel(
                latitude=lat, longitude=lon
            )  # TODO[0]: Do we want method="nearest" or exact here
            response.update(low_clouds=low_cloud_dot.get("lcc").item())
        with _open_grib(
            gfs_file, {"typeOfLevel": "atmosphere", "stepType": "instant"}
        ) as ds:
            total_cloud_dot = ds.sel(
                latitude=lat, longitude=lon
            )  # TODO[0]: Do we want method="nearest" or exact here
            response.update(total_clouds=total_cloud_dot.get("tcc").item())
        with _open_grib(wave_file) as ds:
            wave_dot = ds.sel(
                latitude=lat, longitude=lon
            )  # TODO[0]: Do we want method="nearest" or exact here
            # Wave values are NaN over land, so only report them when present.
            if not isnan(wave_dot.get("swh").item()):  # check one for swell waves
                response.update(
                    wind_and_swell_wave_height=wave_dot.get("swh").item(),
                    swell_wave_direction=wave_dot.get("swdir").values.tolist(),
                    swell_wave_period=wave_dot.get("swper").values.tolist(),
                )
            if not isnan(wave_dot.get("shww").item()):  # and one for wind waves
                response.update(
                    wind_wave_height=wave_dot.get("shww").item(),
                    wind_wave_direction=wave_dot.get("wvdir").item(),
                    wind_wave_period=wave_dot.get("mpww").item(),
                )
    else:
        with _open_grib(gfs_file, {"typeOfLevel": "surface"}) as ds:
            surface_dot = ds.sel(
                latitude=lat, longitude=lon
            )  # TODO[0]: Do we want method="nearest" or exact here
            response.update(
                visibility=surface_dot.get("vis").item() / 1000,  # m -> km
                wind_gust=surface_dot.get("gust").item(),
                frozen_precip=surface_dot.get("cpofp").item(),
            )
        with _open_grib(gfs_file, {"typeOfLevel": "lowCloudLayer"}) as ds:
            low_cloud_dot = ds.sel(
                latitude=lat, longitude=lon
            )  # TODO[0]: Do we want method="nearest" or exact here
            response.update(low_clouds=low_cloud_dot.get("lcc").item())
        with _open_grib(gfs_file, {"typeOfLevel": "atmosphere"}) as ds:
            total_cloud_dot = ds.sel(
                latitude=lat, longitude=lon
            )  # TODO[0]: Do we want method="nearest" or exact here
            response.update(total_clouds=total_cloud_dot.get("tcc").item())
        with _open_grib(wave_file) as ds:
            wave_dot = ds.sel(
                latitude=lat, longitude=lon
            )  # TODO[0]: Do we want method="nearest" or exact here
            if not isnan(
                wave_dot.get("swh").item()
            ):  # just check one, should be enough to tell if it is water
                response.update(
                    wind_and_swell_wave_height=wave_dot.get("swh").item(),
                    wind_wave_height=wave_dot.get("shww").item(),
                    wind_wave_direction=wave_dot.get("wvdir").item(),
                    swell_wave_direction=wave_dot.get("swdir").values.tolist(),
                    swell_wave_period=wave_dot.get("swper").values.tolist(),
                    wind_wave_period=wave_dot.get("mpww").item(),
                )
    return response