import os import requests import xarray as xr from fastapi import FastAPI from numpy import arctan2, degrees, isnan from requests.compat import urljoin, urlparse from time import strftime, gmtime, sleep from bs4 import BeautifulSoup from bs4.element import Tag app = FastAPI() @app.get("/download/") def start_download(): """ Download most recent available models from ncep.noaa.gov """ soup = BeautifulSoup( requests.get("https://www.nco.ncep.noaa.gov/pmb/products/gfs/").text, "html.parser", ) base_url = ( soup.find("font", string="Most commonly used parameters") .parent.find_next_sibling("td") .find("a", string="Available in GRIB2 via https")["href"] ) prodlist = BeautifulSoup( requests.get("https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod").text, "html.parser", ) date_url = urljoin( base=base_url + "/", url=prodlist.find( "a", string="".join(["gfs.", strftime("%Y%m%d", gmtime()), "/"]) )["href"] if prodlist.find( "a", string="".join(["gfs.", strftime("%Y%m%d", gmtime()), "/"]) ) else prodlist.find("a", string="".join(["gfs.", "20220508", "/"]))[ "href" ], # TODO[2]: use datetime and fallback to the previous day gracefully ) hour_list = BeautifulSoup( requests.get(urljoin(base=base_url + "/", url=date_url)).text, "html.parser" ) latest_hour = max(hour_list.findAll("a"), key=pull_text).text.removesuffix( "/" ) # TODO:this may be unavailable still, fallback to previous hour hour_url = urljoin( base=date_url, url=max(hour_list.findAll("a"), key=pull_text)["href"] ) atmos_url = urljoin(base=hour_url, url="atmos/") prod_hour = "004" # TODO[1]: do it for 000 - 384 forecast hours of product gfs_url = urljoin( base=atmos_url, url="".join(["gfs.t", latest_hour, "z.pgrb2.0p25.f", prod_hour]), ) if not requests.head( urljoin( base=atmos_url, url="".join(["gfs.t", latest_hour, "z.pgrb2.0p25.f", "384"]) ) ): raise Exception("unavailable still, fallback to previous hour") print(download_file(gfs_url)) wave_url = urljoin(base=hour_url, url="wave/gridded/") gfswave_url = urljoin( base=wave_url, url="".join(["gfswave.t", latest_hour, "z.global.0p25.f", prod_hour, ".grib2"]), ) if not requests.head( urljoin( base=gfswave_url, url="".join(["gfswave.t", latest_hour, "z.global.0p25.f", "384", ".grib2"]), ) ): raise Exception("unavailable still, fallback to previous hour") print(download_file(gfswave_url)) def pull_text(tag: Tag): try: return int(tag.text.removesuffix("/")) except ValueError: return -1 def download_file(url: str, file_path="/tmp/grib/", attempts=2): """Downloads a URL content into a file (with large file support by streaming) :param url: URL to download :param file_path: Local file name to contain the data downloaded :param attempts: Number of attempts :return: New file path. Empty string if the download failed """ if not file_path: file_path = os.path.realpath(os.path.basename(url)) if os.path.isdir(file_path): file_path = os.path.join(file_path, os.path.basename(url)) # logger.info(f'Downloading {url} content to {file_path}') url_sections = urlparse(url) if not url_sections.scheme: # logger.debug('The given url is missing a scheme. Adding http scheme') url = f"http://{url}" # logger.debug(f'New url: {url}') for attempt in range(1, attempts + 1): try: if attempt > 1: sleep(10) # 10 seconds wait time between downloads with requests.get(url, stream=True) as response: response.raise_for_status() with open(file_path, "wb") as out_file: for chunk in response.iter_content( chunk_size=1024 * 1024 * 8 ): # 8MB chunks out_file.write(chunk) # logger.info('Download finished successfully') return file_path except Exception as ex: print(ex) # logger.error(f'Attempt #{attempt} failed with error: {ex}') return "" @app.get("/weather_dot/") def weather_dot(lat, lon, prediction_time="000"): response = {} latest_hour = "18" # TODO[3]: global? singleton? make user choose? with xr.open_dataset( f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}", engine="cfgrib", filter_by_keys={"typeOfLevel": "heightAboveGround", "level": 10}, ) as ds: wind_dot = ds.sel( latitude=lat, longitude=lon ) # TODO[0]: Do we want method="nearest" or exact here u_wind = wind_dot.get("u10").item() v_wind = wind_dot.get("v10").item() response.update( u_wind=u_wind, v_wind=v_wind, # http://colaweb.gmu.edu/dev/clim301/lectures/wind/wind-uv wind_speed=(u_wind**2 + v_wind**2) ** 0.5, wind_alpha=degrees(arctan2(v_wind, u_wind)), ) with xr.open_dataset( f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}", engine="cfgrib", filter_by_keys={"typeOfLevel": "heightAboveGround", "level": 2}, ) as ds: two_dot = ds.sel( latitude=lat, longitude=lon ) # TODO[0]: Do we want method="nearest" or exact here response.update( temperature=two_dot.get("t2m").item(), dew_point=two_dot.get("d2m").item(), humidity=two_dot.get("r2").item(), ) if int(prediction_time) > 0: response.update( min_temp=two_dot.get("tmin").item(), max_temp=two_dot.get("tmax").item() ) with xr.open_dataset( f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}", engine="cfgrib", filter_by_keys={"typeOfLevel": "meanSea"}, ) as ds: sea_dot = ds.sel( latitude=lat, longitude=lon ) # TODO[0]: Do we want method="nearest" or exact here response.update(pressure=sea_dot.get("prmsl").item() / 1000) # in GPa # things differ for the first and the latter for the rest if int(prediction_time) > 0: with xr.open_dataset( f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}", engine="cfgrib", filter_by_keys={"typeOfLevel": "surface", "stepType": "instant"}, ) as ds: surface_dot = ds.sel( latitude=lat, longitude=lon ) # TODO[0]: Do we want method="nearest" or exact here response.update( visibility=surface_dot.get("vis").item() / 1000, # in km wind_gust=surface_dot.get("gust").item(), frozen_precip=surface_dot.get("cpofp").item(), ) with xr.open_dataset( f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}", engine="cfgrib", filter_by_keys={"typeOfLevel": "surface", "stepType": "accum"}, ) as ds: surface_dot = ds.sel( latitude=lat, longitude=lon ) # TODO[0]: Do we want method="nearest" or exact here response.update(total_precip=surface_dot.get("tp").item()) with xr.open_dataset( f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}", engine="cfgrib", filter_by_keys={"typeOfLevel": "lowCloudLayer", "stepType": "instant"}, ) as ds: low_cloud_dot = ds.sel( latitude=lat, longitude=lon ) # TODO[0]: Do we want method="nearest" or exact here response.update(low_clouds=low_cloud_dot.get("lcc").item()) with xr.open_dataset( f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}", engine="cfgrib", filter_by_keys={"typeOfLevel": "atmosphere", "stepType": "instant"}, ) as ds: total_cloud_dot = ds.sel( latitude=lat, longitude=lon ) # TODO[0]: Do we want method="nearest" or exact here response.update(total_clouds=total_cloud_dot.get("tcc").item()) with xr.open_dataset( f"/tmp/grib/gfswave.t{latest_hour}z.global.0p25.f{prediction_time}.grib2", engine="cfgrib", # filter_by_keys={"stepType": "avg"}, ) as ds: wave_dot = ds.sel( latitude=lat, longitude=lon ) # TODO[0]: Do we want method="nearest" or exact here if not isnan(wave_dot.get("swh").item()): # check one for swell waves response.update( wind_and_swell_wave_height=wave_dot.get("swh").item(), swell_wave_direction=wave_dot.get("swdir").values.tolist(), swell_wave_period=wave_dot.get("swper").values.tolist(), ) if not isnan(wave_dot.get("shww").item()): # and one for wind waves response.update( wind_wave_height=wave_dot.get("shww").item(), wind_wave_direction=wave_dot.get("wvdir").item(), wind_wave_period=wave_dot.get("mpww").item(), ) else: with xr.open_dataset( f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}", engine="cfgrib", filter_by_keys={"typeOfLevel": "surface"}, ) as ds: surface_dot = ds.sel( latitude=lat, longitude=lon ) # TODO[0]: Do we want method="nearest" or exact here response.update( visibility=surface_dot.get("vis").item() / 1000, # in km wind_gust=surface_dot.get("gust").item(), frozen_precip=surface_dot.get("cpofp").item(), ) with xr.open_dataset( f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}", engine="cfgrib", filter_by_keys={"typeOfLevel": "lowCloudLayer"}, ) as ds: low_cloud_dot = ds.sel( latitude=lat, longitude=lon ) # TODO[0]: Do we want method="nearest" or exact here response.update(low_clouds=low_cloud_dot.get("lcc").item()) with xr.open_dataset( f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}", engine="cfgrib", filter_by_keys={"typeOfLevel": "atmosphere"}, ) as ds: total_cloud_dot = ds.sel( latitude=lat, longitude=lon ) # TODO[0]: Do we want method="nearest" or exact here response.update(total_clouds=total_cloud_dot.get("tcc").item()) with xr.open_dataset( f"/tmp/grib/gfswave.t{latest_hour}z.global.0p25.f{prediction_time}.grib2", engine="cfgrib", ) as ds: wave_dot = ds.sel( latitude=lat, longitude=lon ) # TODO[0]: Do we want method="nearest" or exact here if not isnan( wave_dot.get("swh").item() ): # just check one, should be enoughto tell if it is water response.update( wind_and_swell_wave_height=wave_dot.get("swh").item(), wind_wave_height=wave_dot.get("shww").item(), wind_wave_direction=wave_dot.get("wvdir").item(), swell_wave_direction=wave_dot.get("swdir").values.tolist(), swell_wave_period=wave_dot.get("swper").values.tolist(), wind_wave_period=wave_dot.get("mpww").item(), ) return response