From 2abf210d51009ee743beffb344b5629bd1e816e6 Mon Sep 17 00:00:00 2001 From: g Date: Tue, 17 May 2022 06:57:39 +0300 Subject: [PATCH] ref: total --- Dockerfile | 23 +++ Makefile | 48 ++++++ README.md | 43 ++--- __init__.py => app/__init__.py | 0 app/main.py | 111 ++++++++++++ app/settings.py | 12 ++ entrypoint.sh | 17 ++ environment.yml | 14 ++ main.py | 301 --------------------------------- requirements.txt | 26 --- utils/tools.py | 13 ++ 11 files changed, 255 insertions(+), 353 deletions(-) create mode 100755 Dockerfile create mode 100755 Makefile rename __init__.py => app/__init__.py (100%) mode change 100644 => 100755 create mode 100755 app/main.py create mode 100755 app/settings.py create mode 100644 entrypoint.sh create mode 100644 environment.yml delete mode 100644 main.py delete mode 100644 requirements.txt create mode 100755 utils/tools.py diff --git a/Dockerfile b/Dockerfile new file mode 100755 index 0000000..94ef343 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,23 @@ +FROM continuumio/miniconda3 + +WORKDIR /app + +# Create the environment: +COPY environment.yml . +RUN conda env create --file environment.yml + +# Make RUN commands use the new environment: +RUN echo "conda activate myenv" >> ~/.bashrc +SHELL ["/bin/bash", "--login", "-c"] + +# Demonstrate the environment is activated: +RUN echo "Make sure fastapi is installed:" +RUN python -c "import fastapi" + +# The code to run when container is started: +COPY . 
/app + +EXPOSE 80 +ENTRYPOINT ["./entrypoint.sh"] + +CMD ["gunicorn", "app.main:app", "-w", "4", "-b", "0.0.0.0:80", "-k", "uvicorn.workers.UvicornWorker", "--timeout", "0", "--graceful-timeout", "0", "--keep-alive", "300"] \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100755 index 0000000..2228893 --- /dev/null +++ b/Makefile @@ -0,0 +1,48 @@ +################################################## +# Init # +################################################## + +SHELL = /bin/sh + +ROOT_DIR:=$(shell dirname $(realpath $(firstword $(MAKEFILE_LIST)))) + + +################################################## +# Docker # +################################################## + +build: + docker build -t noaa_grib:latest . + +run: + docker run -p 1000:80 --name noaa_grib noaa_grib:latest + +up: + docker build -t noaa_grib:latest . && docker run -p 1000:80 --name noaa_grib noaa_grib:latest + +down: + docker stop noaa_grib && docker rm noaa_grib + + +################################################## +# Requirements (for local development) # +################################################## + +req: + conda env create --file environment.yml + + +################################################## +# Black # +################################################## + +black: + black $(ROOT_DIR)/* + + +################################################## +# File system # +################################################## + +chmod: + sudo chmod 777 -R $(ROOT_DIR)/* diff --git a/README.md b/README.md index ec2a45b..5488685 100644 --- a/README.md +++ b/README.md @@ -1,31 +1,22 @@ -Prototype. 
-- linux only (uses `/tmp/`) +# noaa grib downloader -Download: -- downloads either today or 2022-05-08 if today is unavailable -- crashes if most recent hour does not have files yet (it is chehcked but fallback is not implemented) -- prediction_time is hardcoded to 4 +#### Docker (```Dockerfile```): +- build: + ```make build``` +- run: + ```make run``` +- up (build and run): + ```make up``` +- down (stop and remove the container): + ```make down``` +- base URL: http://localhost:1000 -Data: -- returns data PER DOT (and in 20 seconds) -- opens the file like 10 times -- latest hour is hardcoded to 18 -# Requirements -Requires `eccodes` to be installed -https://confluence.ecmwf.int/display/ECC/ecCodes+Home -``` -pip install -r requirements.txt -python -m eccodes selfcheck -``` +Install requirements locally: +```make req``` +Reformat code (Black): +```make black``` -# Usage -``` -uvicorn main:app --reload -``` - -``` -curl http://127.0.0.1:8000/download/ -curl 'http://127.0.0.1:8000/weather_dot/?lat=75&lon=0&prediction_time=004' -``` +Change permissions to mounted volumes (if necessary): +```make chmod``` diff --git a/__init__.py b/app/__init__.py old mode 100644 new mode 100755 similarity index 100% rename from __init__.py rename to app/__init__.py diff --git a/app/main.py b/app/main.py new file mode 100755 index 0000000..d3b2060 --- /dev/null +++ b/app/main.py @@ -0,0 +1,111 @@ +import gc +import requests + + +from datetime import datetime, timedelta +from requests.utils import urlparse +from zoneinfo import ZoneInfo + +from fastapi import FastAPI, BackgroundTasks +from fastapi.responses import JSONResponse +from fastapi.middleware import Middleware +from starlette.middleware.cors import CORSMiddleware +from fastapi_utils.tasks import repeat_every + +from app.settings import * + +# App +middleware = [ + Middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + ), +] +app = FastAPI(title="noaa", 
middleware=middleware) + + +# API +# GET 'test' +@app.get("/test/") +async def get_test(): + # check source availability + # fresh one might be missing and old ones get deleted, so check yesterday's + yesterday_news = datetime.now(tz=ZoneInfo("US/Eastern")) - timedelta(days=1) + url = form_gfswave_link(target_time=yesterday_news) + if not is_reachable(url): # just one should be fine + print(url, " is not reachable at this time") + # TODO: should we actually error out? + return JSONResponse(content={"status": "success"}) + + +# GET 'test_background' +@app.get("/test_background/{field_1}") +async def get_test_background(field_1: str, background_tasks: BackgroundTasks): + # background_tasks - for "heavy" processes + # TODO + background_tasks.add_task(service_example.run_test, field_1) + return JSONResponse(content={"status": "Background task started"}) + + +# Tasks +# gc +@app.on_event("startup") +@repeat_every(seconds=(1 * 60)) +async def task_gc() -> None: + gc.collect() + + +def is_reachable(url: str): + """Check if url is reachable at all with the current setup + + :param url: URL to check + :return: True if url is reachable, False otherwise + """ + if requests.head(url): + return True + return False + + +def form_gfs_link(target_time=None, prod_hour=384): + """Return well formed link to gfs data which + should be available by given time + + :param target_time: time to check, defaults to current time + :param prod_hour: forecast hour to link to, defaults to 384 + :returns: URL to gfs file + """ + if not target_time: + target_time = datetime.now( + tz=ZoneInfo("US/Eastern") + ) # noaa is located in washington + + looking_at = "atmos" + + date_str = target_time.strftime("%Y%m%d") + hour_str = str((target_time.hour // 6) * 6).zfill(2) + target_url = f"https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/gfs.{date_str}/{hour_str}/{looking_at}/gfs.t{hour_str}z.pgrb2.0p25.f{prod_hour}" + return target_url + + +def form_gfswave_link(target_time=None, prod_hour=384): + 
"""Return well formed link to gfswave data which + should be available by given time + + :param target_time: time to check, defaults to current time + :param prod_hour: forecast hour to link to, defaults to 384 + :returns: URL to gfswave file + """ + if not target_time: + target_time = datetime.now( + tz=ZoneInfo("US/Eastern") + ) # noaa is located in washington + + looking_at = "wave" + + date_str = target_time.strftime("%Y%m%d") + hour_str = str((target_time.hour // 6) * 6).zfill(2) + target_url = f"https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/gfs.{date_str}/{hour_str}/{looking_at}/gridded/gfs{looking_at}.t{hour_str}z.global.0p25.f{prod_hour}.grib2" + return target_url diff --git a/app/settings.py b/app/settings.py new file mode 100755 index 0000000..38333a9 --- /dev/null +++ b/app/settings.py @@ -0,0 +1,12 @@ +import os + +# Tools +from utils.tools import make_dir + + +# Paths +BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + +SAVE_DIR = '/tmp/grib/' # don't forget to create it! + +MAX_FORCAST_HOUR = 5 # 0-384 beware, each file is 500mb diff --git a/entrypoint.sh b/entrypoint.sh new file mode 100644 index 0000000..45ae467 --- /dev/null +++ b/entrypoint.sh @@ -0,0 +1,17 @@ +#!/bin/bash --login +# The --login ensures the bash configuration is loaded +# enabling Conda. + +# Enable strict mode. +set -euo pipefail +# ... Run whatever commands ... 
+ +# Temporarily disable strict mode and activate conda: +set +euo pipefail +conda activate myenv + +# Re-enable strict mode: +set -euo pipefail + +# exec the final command: +#exec gunicorn app.main:app -w 4 -b 0.0.0.0:80 -k uvicorn.workers.UvicornWorker --timeout 0 --graceful-timeout 0 --keep-alive 300 diff --git a/environment.yml b/environment.yml new file mode 100644 index 0000000..6df680c --- /dev/null +++ b/environment.yml @@ -0,0 +1,14 @@ +# conda install --channel conda-forge pynio fastapi fastapi_utils gunicorn requests wget` +name: myenv +channels: + - conda-forge +dependencies: + - conda + - python=3.9 + - wget + - pynio + - gunicorn + - requests + - fastapi + - fastapi_utils +prefix: /opt/conda \ No newline at end of file diff --git a/main.py b/main.py deleted file mode 100644 index 0246779..0000000 --- a/main.py +++ /dev/null @@ -1,301 +0,0 @@ -import os -import requests -import xarray as xr - -from fastapi import FastAPI -from numpy import arctan2, degrees, isnan -from requests.compat import urljoin, urlparse -from time import strftime, gmtime, sleep -from bs4 import BeautifulSoup -from bs4.element import Tag - -app = FastAPI() - - -@app.get("/download/") -def start_download(): - """ - Download most recent available models from ncep.noaa.gov - """ - soup = BeautifulSoup( - requests.get("https://www.nco.ncep.noaa.gov/pmb/products/gfs/").text, - "html.parser", - ) - base_url = ( - soup.find("font", string="Most commonly used parameters") - .parent.find_next_sibling("td") - .find("a", string="Available in GRIB2 via https")["href"] - ) - prodlist = BeautifulSoup( - requests.get("https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod").text, - "html.parser", - ) - date_url = urljoin( - base=base_url + "/", - url=prodlist.find( - "a", string="".join(["gfs.", strftime("%Y%m%d", gmtime()), "/"]) - )["href"] - if prodlist.find( - "a", string="".join(["gfs.", strftime("%Y%m%d", gmtime()), "/"]) - ) - else prodlist.find("a", string="".join(["gfs.", "20220508", 
"/"]))[ - "href" - ], # TODO[2]: use datetime and fallback to the previous day gracefully - ) - hour_list = BeautifulSoup( - requests.get(urljoin(base=base_url + "/", url=date_url)).text, "html.parser" - ) - latest_hour = max(hour_list.findAll("a"), key=pull_text).text.removesuffix( - "/" - ) # TODO:this may be unavailable still, fallback to previous hour - - hour_url = urljoin( - base=date_url, url=max(hour_list.findAll("a"), key=pull_text)["href"] - ) - - atmos_url = urljoin(base=hour_url, url="atmos/") - - prod_hour = "004" # TODO[1]: do it for 000 - 384 forecast hours of product - gfs_url = urljoin( - base=atmos_url, - url="".join(["gfs.t", latest_hour, "z.pgrb2.0p25.f", prod_hour]), - ) - - if not requests.head( - urljoin( - base=atmos_url, url="".join(["gfs.t", latest_hour, "z.pgrb2.0p25.f", "384"]) - ) - ): - raise Exception("unavailable still, fallback to previous hour") - - print(download_file(gfs_url)) - wave_url = urljoin(base=hour_url, url="wave/gridded/") - gfswave_url = urljoin( - base=wave_url, - url="".join(["gfswave.t", latest_hour, "z.global.0p25.f", prod_hour, ".grib2"]), - ) - - if not requests.head( - urljoin( - base=gfswave_url, - url="".join(["gfswave.t", latest_hour, "z.global.0p25.f", "384", ".grib2"]), - ) - ): - raise Exception("unavailable still, fallback to previous hour") - - print(download_file(gfswave_url)) - - -def pull_text(tag: Tag): - try: - return int(tag.text.removesuffix("/")) - except ValueError: - return -1 - - -def download_file(url: str, file_path="/tmp/grib/", attempts=2): - """Downloads a URL content into a file (with large file support by streaming) - - :param url: URL to download - :param file_path: Local file name to contain the data downloaded - :param attempts: Number of attempts - :return: New file path. 
Empty string if the download failed - """ - if not file_path: - file_path = os.path.realpath(os.path.basename(url)) - if os.path.isdir(file_path): - file_path = os.path.join(file_path, os.path.basename(url)) - # logger.info(f'Downloading {url} content to {file_path}') - url_sections = urlparse(url) - if not url_sections.scheme: - # logger.debug('The given url is missing a scheme. Adding http scheme') - url = f"http://{url}" - # logger.debug(f'New url: {url}') - for attempt in range(1, attempts + 1): - try: - if attempt > 1: - sleep(10) # 10 seconds wait time between downloads - with requests.get(url, stream=True) as response: - response.raise_for_status() - with open(file_path, "wb") as out_file: - for chunk in response.iter_content( - chunk_size=1024 * 1024 * 8 - ): # 8MB chunks - out_file.write(chunk) - # logger.info('Download finished successfully') - return file_path - except Exception as ex: - print(ex) - # logger.error(f'Attempt #{attempt} failed with error: {ex}') - return "" - - -@app.get("/weather_dot/") -def weather_dot(lat, lon, prediction_time="000"): - response = {} - latest_hour = "18" # TODO[3]: global? singleton? make user choose? 
- with xr.open_dataset( - f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}", - engine="cfgrib", - filter_by_keys={"typeOfLevel": "heightAboveGround", "level": 10}, - ) as ds: - wind_dot = ds.sel( - latitude=lat, longitude=lon - ) # TODO[0]: Do we want method="nearest" or exact here - u_wind = wind_dot.get("u10").item() - v_wind = wind_dot.get("v10").item() - response.update( - u_wind=u_wind, - v_wind=v_wind, - # http://colaweb.gmu.edu/dev/clim301/lectures/wind/wind-uv - wind_speed=(u_wind**2 + v_wind**2) ** 0.5, - wind_alpha=degrees(arctan2(v_wind, u_wind)), - ) - - with xr.open_dataset( - f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}", - engine="cfgrib", - filter_by_keys={"typeOfLevel": "heightAboveGround", "level": 2}, - ) as ds: - two_dot = ds.sel( - latitude=lat, longitude=lon - ) # TODO[0]: Do we want method="nearest" or exact here - response.update( - temperature=two_dot.get("t2m").item(), - dew_point=two_dot.get("d2m").item(), - humidity=two_dot.get("r2").item(), - ) - if int(prediction_time) > 0: - response.update( - min_temp=two_dot.get("tmin").item(), max_temp=two_dot.get("tmax").item() - ) - with xr.open_dataset( - f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}", - engine="cfgrib", - filter_by_keys={"typeOfLevel": "meanSea"}, - ) as ds: - sea_dot = ds.sel( - latitude=lat, longitude=lon - ) # TODO[0]: Do we want method="nearest" or exact here - response.update(pressure=sea_dot.get("prmsl").item() / 1000) # in GPa - - # things differ for the first and the latter for the rest - if int(prediction_time) > 0: - with xr.open_dataset( - f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}", - engine="cfgrib", - filter_by_keys={"typeOfLevel": "surface", "stepType": "instant"}, - ) as ds: - surface_dot = ds.sel( - latitude=lat, longitude=lon - ) # TODO[0]: Do we want method="nearest" or exact here - response.update( - visibility=surface_dot.get("vis").item() / 1000, # in km - 
wind_gust=surface_dot.get("gust").item(), - frozen_precip=surface_dot.get("cpofp").item(), - ) - - with xr.open_dataset( - f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}", - engine="cfgrib", - filter_by_keys={"typeOfLevel": "surface", "stepType": "accum"}, - ) as ds: - surface_dot = ds.sel( - latitude=lat, longitude=lon - ) # TODO[0]: Do we want method="nearest" or exact here - response.update(total_precip=surface_dot.get("tp").item()) - - with xr.open_dataset( - f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}", - engine="cfgrib", - filter_by_keys={"typeOfLevel": "lowCloudLayer", "stepType": "instant"}, - ) as ds: - low_cloud_dot = ds.sel( - latitude=lat, longitude=lon - ) # TODO[0]: Do we want method="nearest" or exact here - response.update(low_clouds=low_cloud_dot.get("lcc").item()) - with xr.open_dataset( - f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}", - engine="cfgrib", - filter_by_keys={"typeOfLevel": "atmosphere", "stepType": "instant"}, - ) as ds: - total_cloud_dot = ds.sel( - latitude=lat, longitude=lon - ) # TODO[0]: Do we want method="nearest" or exact here - response.update(total_clouds=total_cloud_dot.get("tcc").item()) - - with xr.open_dataset( - f"/tmp/grib/gfswave.t{latest_hour}z.global.0p25.f{prediction_time}.grib2", - engine="cfgrib", - # filter_by_keys={"stepType": "avg"}, - ) as ds: - wave_dot = ds.sel( - latitude=lat, longitude=lon - ) # TODO[0]: Do we want method="nearest" or exact here - if not isnan(wave_dot.get("swh").item()): # check one for swell waves - response.update( - wind_and_swell_wave_height=wave_dot.get("swh").item(), - swell_wave_direction=wave_dot.get("swdir").values.tolist(), - swell_wave_period=wave_dot.get("swper").values.tolist(), - ) - if not isnan(wave_dot.get("shww").item()): # and one for wind waves - response.update( - wind_wave_height=wave_dot.get("shww").item(), - wind_wave_direction=wave_dot.get("wvdir").item(), - wind_wave_period=wave_dot.get("mpww").item(), - ) - - 
else: - with xr.open_dataset( - f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}", - engine="cfgrib", - filter_by_keys={"typeOfLevel": "surface"}, - ) as ds: - surface_dot = ds.sel( - latitude=lat, longitude=lon - ) # TODO[0]: Do we want method="nearest" or exact here - response.update( - visibility=surface_dot.get("vis").item() / 1000, # in km - wind_gust=surface_dot.get("gust").item(), - frozen_precip=surface_dot.get("cpofp").item(), - ) - with xr.open_dataset( - f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}", - engine="cfgrib", - filter_by_keys={"typeOfLevel": "lowCloudLayer"}, - ) as ds: - low_cloud_dot = ds.sel( - latitude=lat, longitude=lon - ) # TODO[0]: Do we want method="nearest" or exact here - response.update(low_clouds=low_cloud_dot.get("lcc").item()) - - with xr.open_dataset( - f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}", - engine="cfgrib", - filter_by_keys={"typeOfLevel": "atmosphere"}, - ) as ds: - total_cloud_dot = ds.sel( - latitude=lat, longitude=lon - ) # TODO[0]: Do we want method="nearest" or exact here - response.update(total_clouds=total_cloud_dot.get("tcc").item()) - - with xr.open_dataset( - f"/tmp/grib/gfswave.t{latest_hour}z.global.0p25.f{prediction_time}.grib2", - engine="cfgrib", - ) as ds: - wave_dot = ds.sel( - latitude=lat, longitude=lon - ) # TODO[0]: Do we want method="nearest" or exact here - if not isnan( - wave_dot.get("swh").item() - ): # just check one, should be enoughto tell if it is water - response.update( - wind_and_swell_wave_height=wave_dot.get("swh").item(), - wind_wave_height=wave_dot.get("shww").item(), - wind_wave_direction=wave_dot.get("wvdir").item(), - swell_wave_direction=wave_dot.get("swdir").values.tolist(), - swell_wave_period=wave_dot.get("swper").values.tolist(), - wind_wave_period=wave_dot.get("mpww").item(), - ) - - return response diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 55a4682..0000000 --- a/requirements.txt +++ 
/dev/null @@ -1,26 +0,0 @@ -anyio==3.5.0 -attrs==21.4.0 -beautifulsoup4==4.11.1 -bs4==0.0.1 -cffi==1.15.0 -cfgrib==0.9.10.1 -click==8.1.3 -eccodes==1.4.1 -fastapi==0.76.0 -findlibs==0.0.2 -idna==3.3 -numpy==1.22.3 -packaging==21.3 -pandas==1.4.2 -pycparser==2.21 -pydantic==1.9.0 -pyparsing==3.0.8 -python-dateutil==2.8.2 -pytz==2022.1 -requests==2.27.1 -six==1.16.0 -sniffio==1.2.0 -soupsieve==2.3.2.post1 -starlette==0.18.0 -typing_extensions==4.2.0 -xarray==2022.3.0 diff --git a/utils/tools.py b/utils/tools.py new file mode 100755 index 0000000..6437fc1 --- /dev/null +++ b/utils/tools.py @@ -0,0 +1,13 @@ +import os + + +# Make dir +def make_dir(path_dir): + path_dir = str(path_dir) + try: + if not os.path.exists(path_dir): + os.makedirs(path_dir) + print("Ready ('make_dir'). '{path_dir}'".format(path_dir=path_dir)) + except Exception as e: + print("Error while executing 'make_dir()'. {e}".format(e=e)) + return path_dir