# NOTE(review): the six lines that were here ("You can not select more than 25
# topics...", "302 lines", "12 KiB") were web-page residue from the code-hosting
# site's file view, not Python source; commented out so the module parses.
import os
from time import strftime, gmtime, sleep

import requests
import xarray as xr
from bs4 import BeautifulSoup
from bs4.element import Tag
from fastapi import FastAPI
from numpy import arctan2, degrees, isnan
from requests.compat import urljoin, urlparse

# Single FastAPI application exposing the /download/ and /weather_dot/ endpoints.
app = FastAPI()
@app.get("/download/")
|
|
def start_download():
|
|
"""
|
|
Download most recent available models from ncep.noaa.gov
|
|
"""
|
|
soup = BeautifulSoup(
|
|
requests.get("https://www.nco.ncep.noaa.gov/pmb/products/gfs/").text,
|
|
"html.parser",
|
|
)
|
|
base_url = (
|
|
soup.find("font", string="Most commonly used parameters")
|
|
.parent.find_next_sibling("td")
|
|
.find("a", string="Available in GRIB2 via https")["href"]
|
|
)
|
|
prodlist = BeautifulSoup(
|
|
requests.get("https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod").text,
|
|
"html.parser",
|
|
)
|
|
date_url = urljoin(
|
|
base=base_url + "/",
|
|
url=prodlist.find(
|
|
"a", string="".join(["gfs.", strftime("%Y%m%d", gmtime()), "/"])
|
|
)["href"]
|
|
if prodlist.find(
|
|
"a", string="".join(["gfs.", strftime("%Y%m%d", gmtime()), "/"])
|
|
)
|
|
else prodlist.find("a", string="".join(["gfs.", "20220508", "/"]))[
|
|
"href"
|
|
], # TODO[2]: use datetime and fallback to the previous day gracefully
|
|
)
|
|
hour_list = BeautifulSoup(
|
|
requests.get(urljoin(base=base_url + "/", url=date_url)).text, "html.parser"
|
|
)
|
|
latest_hour = max(hour_list.findAll("a"), key=pull_text).text.removesuffix(
|
|
"/"
|
|
) # TODO:this may be unavailable still, fallback to previous hour
|
|
|
|
hour_url = urljoin(
|
|
base=date_url, url=max(hour_list.findAll("a"), key=pull_text)["href"]
|
|
)
|
|
|
|
atmos_url = urljoin(base=hour_url, url="atmos/")
|
|
|
|
prod_hour = "004" # TODO[1]: do it for 000 - 384 forecast hours of product
|
|
gfs_url = urljoin(
|
|
base=atmos_url,
|
|
url="".join(["gfs.t", latest_hour, "z.pgrb2.0p25.f", prod_hour]),
|
|
)
|
|
|
|
if not requests.head(
|
|
urljoin(
|
|
base=atmos_url, url="".join(["gfs.t", latest_hour, "z.pgrb2.0p25.f", "384"])
|
|
)
|
|
):
|
|
raise Exception("unavailable still, fallback to previous hour")
|
|
|
|
print(download_file(gfs_url))
|
|
wave_url = urljoin(base=hour_url, url="wave/gridded/")
|
|
gfswave_url = urljoin(
|
|
base=wave_url,
|
|
url="".join(["gfswave.t", latest_hour, "z.global.0p25.f", prod_hour, ".grib2"]),
|
|
)
|
|
|
|
if not requests.head(
|
|
urljoin(
|
|
base=gfswave_url,
|
|
url="".join(["gfswave.t", latest_hour, "z.global.0p25.f", "384", ".grib2"]),
|
|
)
|
|
):
|
|
raise Exception("unavailable still, fallback to previous hour")
|
|
|
|
print(download_file(gfswave_url))
|
|
|
|
|
|
def pull_text(tag: Tag):
    """Sort key for NOMADS directory links.

    Strips the trailing slash from the link text and parses it as an int
    (run-hour directories look like "06/"); non-numeric links yield -1 so
    they always lose to real run directories under max().
    """
    label = tag.text.removesuffix("/")
    try:
        return int(label)
    except ValueError:
        return -1
def download_file(url: str, file_path="/tmp/grib/", attempts=2):
    """Downloads a URL content into a file (with large file support by streaming)

    :param url: URL to download
    :param file_path: Local file name to contain the data downloaded; a
        directory path (existing dir, or a path ending in a separator) gets
        the URL's basename appended
    :param attempts: Number of attempts (10 s wait between retries)
    :return: New file path. Empty string if the download failed
    """
    if not file_path:
        file_path = os.path.realpath(os.path.basename(url))
    if file_path.endswith(("/", os.sep)) or os.path.isdir(file_path):
        # Fix: the default "/tmp/grib/" may not exist yet; previously a missing
        # directory failed os.path.isdir() and every open() attempt then died.
        os.makedirs(file_path, exist_ok=True)
        file_path = os.path.join(file_path, os.path.basename(url))
    # logger.info(f'Downloading {url} content to {file_path}')
    url_sections = urlparse(url)
    if not url_sections.scheme:
        # logger.debug('The given url is missing a scheme. Adding http scheme')
        url = f"http://{url}"
        # logger.debug(f'New url: {url}')
    for attempt in range(1, attempts + 1):
        try:
            if attempt > 1:
                sleep(10)  # 10 seconds wait time between downloads
            with requests.get(url, stream=True) as response:
                response.raise_for_status()
                with open(file_path, "wb") as out_file:
                    for chunk in response.iter_content(
                        chunk_size=1024 * 1024 * 8
                    ):  # 8MB chunks
                        out_file.write(chunk)
                # logger.info('Download finished successfully')
                return file_path
        # Narrowed from bare Exception: only network/HTTP and filesystem
        # errors are retryable here; anything else is a real bug.
        except (requests.RequestException, OSError) as ex:
            print(ex)
            # logger.error(f'Attempt #{attempt} failed with error: {ex}')
    return ""
@app.get("/weather_dot/")
|
|
def weather_dot(lat, lon, prediction_time="000"):
|
|
response = {}
|
|
latest_hour = "18" # TODO[3]: global? singleton? make user choose?
|
|
with xr.open_dataset(
|
|
f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}",
|
|
engine="cfgrib",
|
|
filter_by_keys={"typeOfLevel": "heightAboveGround", "level": 10},
|
|
) as ds:
|
|
wind_dot = ds.sel(
|
|
latitude=lat, longitude=lon
|
|
) # TODO[0]: Do we want method="nearest" or exact here
|
|
u_wind = wind_dot.get("u10").item()
|
|
v_wind = wind_dot.get("v10").item()
|
|
response.update(
|
|
u_wind=u_wind,
|
|
v_wind=v_wind,
|
|
# http://colaweb.gmu.edu/dev/clim301/lectures/wind/wind-uv
|
|
wind_speed=(u_wind**2 + v_wind**2) ** 0.5,
|
|
wind_alpha=degrees(arctan2(v_wind, u_wind)),
|
|
)
|
|
|
|
with xr.open_dataset(
|
|
f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}",
|
|
engine="cfgrib",
|
|
filter_by_keys={"typeOfLevel": "heightAboveGround", "level": 2},
|
|
) as ds:
|
|
two_dot = ds.sel(
|
|
latitude=lat, longitude=lon
|
|
) # TODO[0]: Do we want method="nearest" or exact here
|
|
response.update(
|
|
temperature=two_dot.get("t2m").item(),
|
|
dew_point=two_dot.get("d2m").item(),
|
|
humidity=two_dot.get("r2").item(),
|
|
)
|
|
if int(prediction_time) > 0:
|
|
response.update(
|
|
min_temp=two_dot.get("tmin").item(), max_temp=two_dot.get("tmax").item()
|
|
)
|
|
with xr.open_dataset(
|
|
f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}",
|
|
engine="cfgrib",
|
|
filter_by_keys={"typeOfLevel": "meanSea"},
|
|
) as ds:
|
|
sea_dot = ds.sel(
|
|
latitude=lat, longitude=lon
|
|
) # TODO[0]: Do we want method="nearest" or exact here
|
|
response.update(pressure=sea_dot.get("prmsl").item() / 1000) # in GPa
|
|
|
|
# things differ for the first and the latter for the rest
|
|
if int(prediction_time) > 0:
|
|
with xr.open_dataset(
|
|
f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}",
|
|
engine="cfgrib",
|
|
filter_by_keys={"typeOfLevel": "surface", "stepType": "instant"},
|
|
) as ds:
|
|
surface_dot = ds.sel(
|
|
latitude=lat, longitude=lon
|
|
) # TODO[0]: Do we want method="nearest" or exact here
|
|
response.update(
|
|
visibility=surface_dot.get("vis").item() / 1000, # in km
|
|
wind_gust=surface_dot.get("gust").item(),
|
|
frozen_precip=surface_dot.get("cpofp").item(),
|
|
)
|
|
|
|
with xr.open_dataset(
|
|
f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}",
|
|
engine="cfgrib",
|
|
filter_by_keys={"typeOfLevel": "surface", "stepType": "accum"},
|
|
) as ds:
|
|
surface_dot = ds.sel(
|
|
latitude=lat, longitude=lon
|
|
) # TODO[0]: Do we want method="nearest" or exact here
|
|
response.update(total_precip=surface_dot.get("tp").item())
|
|
|
|
with xr.open_dataset(
|
|
f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}",
|
|
engine="cfgrib",
|
|
filter_by_keys={"typeOfLevel": "lowCloudLayer", "stepType": "instant"},
|
|
) as ds:
|
|
low_cloud_dot = ds.sel(
|
|
latitude=lat, longitude=lon
|
|
) # TODO[0]: Do we want method="nearest" or exact here
|
|
response.update(low_clouds=low_cloud_dot.get("lcc").item())
|
|
with xr.open_dataset(
|
|
f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}",
|
|
engine="cfgrib",
|
|
filter_by_keys={"typeOfLevel": "atmosphere", "stepType": "instant"},
|
|
) as ds:
|
|
total_cloud_dot = ds.sel(
|
|
latitude=lat, longitude=lon
|
|
) # TODO[0]: Do we want method="nearest" or exact here
|
|
response.update(total_clouds=total_cloud_dot.get("tcc").item())
|
|
|
|
with xr.open_dataset(
|
|
f"/tmp/grib/gfswave.t{latest_hour}z.global.0p25.f{prediction_time}.grib2",
|
|
engine="cfgrib",
|
|
# filter_by_keys={"stepType": "avg"},
|
|
) as ds:
|
|
wave_dot = ds.sel(
|
|
latitude=lat, longitude=lon
|
|
) # TODO[0]: Do we want method="nearest" or exact here
|
|
if not isnan(wave_dot.get("swh").item()): # check one for swell waves
|
|
response.update(
|
|
wind_and_swell_wave_height=wave_dot.get("swh").item(),
|
|
swell_wave_direction=wave_dot.get("swdir").values.tolist(),
|
|
swell_wave_period=wave_dot.get("swper").values.tolist(),
|
|
)
|
|
if not isnan(wave_dot.get("shww").item()): # and one for wind waves
|
|
response.update(
|
|
wind_wave_height=wave_dot.get("shww").item(),
|
|
wind_wave_direction=wave_dot.get("wvdir").item(),
|
|
wind_wave_period=wave_dot.get("mpww").item(),
|
|
)
|
|
|
|
else:
|
|
with xr.open_dataset(
|
|
f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}",
|
|
engine="cfgrib",
|
|
filter_by_keys={"typeOfLevel": "surface"},
|
|
) as ds:
|
|
surface_dot = ds.sel(
|
|
latitude=lat, longitude=lon
|
|
) # TODO[0]: Do we want method="nearest" or exact here
|
|
response.update(
|
|
visibility=surface_dot.get("vis").item() / 1000, # in km
|
|
wind_gust=surface_dot.get("gust").item(),
|
|
frozen_precip=surface_dot.get("cpofp").item(),
|
|
)
|
|
with xr.open_dataset(
|
|
f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}",
|
|
engine="cfgrib",
|
|
filter_by_keys={"typeOfLevel": "lowCloudLayer"},
|
|
) as ds:
|
|
low_cloud_dot = ds.sel(
|
|
latitude=lat, longitude=lon
|
|
) # TODO[0]: Do we want method="nearest" or exact here
|
|
response.update(low_clouds=low_cloud_dot.get("lcc").item())
|
|
|
|
with xr.open_dataset(
|
|
f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}",
|
|
engine="cfgrib",
|
|
filter_by_keys={"typeOfLevel": "atmosphere"},
|
|
) as ds:
|
|
total_cloud_dot = ds.sel(
|
|
latitude=lat, longitude=lon
|
|
) # TODO[0]: Do we want method="nearest" or exact here
|
|
response.update(total_clouds=total_cloud_dot.get("tcc").item())
|
|
|
|
with xr.open_dataset(
|
|
f"/tmp/grib/gfswave.t{latest_hour}z.global.0p25.f{prediction_time}.grib2",
|
|
engine="cfgrib",
|
|
) as ds:
|
|
wave_dot = ds.sel(
|
|
latitude=lat, longitude=lon
|
|
) # TODO[0]: Do we want method="nearest" or exact here
|
|
if not isnan(
|
|
wave_dot.get("swh").item()
|
|
): # just check one, should be enoughto tell if it is water
|
|
response.update(
|
|
wind_and_swell_wave_height=wave_dot.get("swh").item(),
|
|
wind_wave_height=wave_dot.get("shww").item(),
|
|
wind_wave_direction=wave_dot.get("wvdir").item(),
|
|
swell_wave_direction=wave_dot.get("swdir").values.tolist(),
|
|
swell_wave_period=wave_dot.get("swper").values.tolist(),
|
|
wind_wave_period=wave_dot.get("mpww").item(),
|
|
)
|
|
|
|
return response
|