dev
g 4 years ago
parent 0af0476198
commit 2abf210d51

@ -0,0 +1,23 @@
# Base image ships Debian with Miniconda3 preinstalled.
FROM continuumio/miniconda3
WORKDIR /app

# Create the environment:
COPY environment.yml .
RUN conda env create --file environment.yml

# Make RUN commands use the new environment:
# --login shells source ~/.bashrc, so every subsequent RUN (and the
# entrypoint, which also uses --login) executes inside `myenv`.
RUN echo "conda activate myenv" >> ~/.bashrc
SHELL ["/bin/bash", "--login", "-c"]

# Demonstrate the environment is activated:
RUN echo "Make sure fastapi is installed:"
RUN python -c "import fastapi"

# The code to run when container is started:
COPY . /app
EXPOSE 80

# entrypoint.sh activates conda; the CMD below is passed to it as "$@".
# NOTE(review): entrypoint.sh must `exec "$@"` for this CMD to actually
# start gunicorn — confirm the script does so.
ENTRYPOINT ["./entrypoint.sh"]
CMD ["gunicorn", "app.main:app", "-w", "4", "-b", "0.0.0.0:80", "-k", "uvicorn.workers.UvicornWorker", "--timeout", "0", "--graceful-timeout", "0", "--keep-alive", "300"]

@ -0,0 +1,48 @@
##################################################
#                      Init                      #
##################################################
SHELL = /bin/sh
# Absolute directory of this Makefile (resolved once at parse time).
ROOT_DIR:=$(shell dirname $(realpath $(firstword $(MAKEFILE_LIST))))

# All targets here are commands, not files: declare them phony so a stray
# file named e.g. `build` or `clean`-like name cannot mask them, and so
# they always run.
.PHONY: build run up down req black chmod

##################################################
#                     Docker                     #
##################################################
build:  ## Build the application image
	docker build -t noaa_grib:latest .

run:  ## Run the container; API exposed on host port 1000
	docker run -p 1000:80 --name noaa_grib noaa_grib:latest

# Prerequisites replace the old `cmd && cmd` duplication: `run` only
# executes if `build` succeeded, with the same two commands.
up: build run  ## Build and run

down:  ## Stop and remove the container
	docker stop noaa_grib && docker rm noaa_grib

##################################################
#      Requirements (for local development)      #
##################################################
req:  ## Create the conda environment locally
	conda env create --file environment.yml

##################################################
#                     Black                      #
##################################################
black:  ## Reformat sources with Black
	black $(ROOT_DIR)/*

##################################################
#                   File system                  #
##################################################
# WARNING: 777 makes everything world-writable; kept as-is because it is
# a deliberate convenience for Docker-mounted volumes.
chmod:
	sudo chmod 777 -R $(ROOT_DIR)/*

@ -1,31 +1,22 @@
Prototype. # noaa grib downloader
- linux only (uses `/tmp/`)
Download: #### Docker (```Dockerfile```):
- downloads either today or 2022-05-08 if today is unavailable - build:
- crashes if most recent hour does not have files yet (it is checked but fallback is not implemented) ```make build```
- prediction_time is hardcoded to 4 - run:
```make run```
- up (build and run):
```make up```
- down (stop and remove volumes):
```make down```
- base URL: http://localhost:1000
Data:
- returns data PER DOT (and in 20 seconds)
- opens the file like 10 times
- latest hour is hardcoded to 18
# Requirements Install requirements locally:
Requires `eccodes` to be installed ```make req```
https://confluence.ecmwf.int/display/ECC/ecCodes+Home
```
pip install -r requirements.txt
python -m eccodes selfcheck
```
Reformat code (Black):
```make black```
# Usage Change permissions to mounted volumes (if necessary):
``` ```make chmod```
uvicorn main:app --reload
```
```
curl http://127.0.0.1:8000/download/
curl 'http://127.0.0.1:8000/weather_dot/?lat=75&lon=0&prediction_time=004'
```

@ -0,0 +1,111 @@
import gc
import requests
from datetime import datetime, timedelta
from requests.utils import urlparse
from zoneinfo import ZoneInfo
from fastapi import FastAPI, BackgroundTasks
from fastapi.responses import JSONResponse
from fastapi.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
from fastapi_utils.tasks import repeat_every
from app.settings import *
# App

# Permissive CORS: any origin/method/header may call the API (the service
# is meant to be hit from arbitrary browser front-ends).
cors = Middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app = FastAPI(title="noaa", middleware=[cors])
# API

# GET 'test'
@app.get("/test/")
async def get_test():
    """Liveness/reachability probe for the upstream NOAA source."""
    # Probe yesterday's run: today's may not be published yet, while much
    # older runs get rotated out — so yesterday is the safest candidate.
    probe_time = datetime.now(tz=ZoneInfo("US/Eastern")) - timedelta(days=1)
    probe_url = form_gfswave_link(target_time=probe_time)
    if not is_reachable(probe_url):  # checking a single file is enough
        print(probe_url, " is not reachable at this time")
        # TODO: should we actually error out?
    return JSONResponse(content={"status": "success"})
# GET 'test_background'
@app.get("/test_background/{field_1}")
async def get_test_background(field_1: str, background_tasks: BackgroundTasks):
    """Kick off a demo background task and return immediately.

    :param field_1: opaque string forwarded to the task
    :param background_tasks: FastAPI background-task queue
    """
    # background_tasks - for "heavy" processes
    # NOTE(review): `service_example` is neither imported nor defined in
    # this module, so this endpoint raises NameError at request time —
    # confirm the intended service module and import it.
    # TODO
    background_tasks.add_task(service_example.run_test, field_1)
    return JSONResponse(content={"status": "Background task started"})
# Tasks

# gc
@app.on_event("startup")
@repeat_every(seconds=(1 * 60))  # repeat once per minute, from startup
async def task_gc() -> None:
    """Periodically force a garbage-collection pass.

    NOTE(review): presumably added to cap memory growth between requests;
    confirm it is still needed — CPython collects on its own.
    """
    gc.collect()
def is_reachable(url: str):
    """Check if url is reachable at all with the current setup

    :param url: URL to check
    :return: True if url is reachable, False otherwise
    """
    try:
        # HEAD is cheap; a Response is truthy iff its status code is < 400.
        # The timeout keeps a dead host from hanging the probe forever.
        return bool(requests.head(url, timeout=10))
    except requests.RequestException:
        # DNS failure, refused connection, timeout, ... — the original
        # let these propagate, contradicting "False otherwise".
        return False
def form_gfs_link(target_time=None, prod_hour=384):
    """Return well formed link to gfs data which
    should be available by given time

    :param target_time: time to check, defaults to current time
    :param prod_hour: forecast hour to link to (0-384), defaults to 384;
        zero-padded to three digits, so 4 and "004" both yield "f004"
    :returns: URL to gfs file
    """
    if not target_time:
        target_time = datetime.now(
            tz=ZoneInfo("US/Eastern")
        )  # noaa is located in washington
    looking_at = "atmos"
    date_str = target_time.strftime("%Y%m%d")
    # GFS runs every 6 hours; floor to the latest cycle (00/06/12/18).
    hour_str = str((target_time.hour // 6) * 6).zfill(2)
    # NOMADS file names use three-digit forecast hours (f000 ... f384);
    # the original interpolated the value unpadded, yielding e.g. "f4".
    hour_padded = str(prod_hour).zfill(3)
    target_url = f"https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/gfs.{date_str}/{hour_str}/{looking_at}/gfs.t{hour_str}z.pgrb2.0p25.f{hour_padded}"
    return target_url
def form_gfswave_link(target_time=None, prod_hour=384):
    """Return well formed link to gfswave data which
    should be available by given time

    :param target_time: time to check, defaults to current time
    :param prod_hour: forecast hour to link to (0-384), defaults to 384;
        zero-padded to three digits, so 4 and "004" both yield "f004"
    :returns: URL to gfswave file
    """
    if not target_time:
        target_time = datetime.now(
            tz=ZoneInfo("US/Eastern")
        )  # noaa is located in washington
    looking_at = "wave"
    date_str = target_time.strftime("%Y%m%d")
    # GFS runs every 6 hours; floor to the latest cycle (00/06/12/18).
    hour_str = str((target_time.hour // 6) * 6).zfill(2)
    # NOMADS file names use three-digit forecast hours (f000 ... f384);
    # the original interpolated the value unpadded, yielding e.g. "f4".
    hour_padded = str(prod_hour).zfill(3)
    target_url = f"https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/gfs.{date_str}/{hour_str}/{looking_at}/gridded/gfs{looking_at}.t{hour_str}z.global.0p25.f{hour_padded}.grib2"
    return target_url

@ -0,0 +1,12 @@
import os

# Tools
from utils.tools import make_dir

# Paths
# Project root: two levels up from this settings module.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
SAVE_DIR = '/tmp/grib/'
# Create the download directory at import time. The original left a
# "don't forget to create it!" reminder and imported make_dir without
# ever calling it — this closes that gap.
make_dir(SAVE_DIR)

# Forecast hours to fetch: 0-384 available upstream; beware, each file
# is ~500 MB. (Name keeps the original "FORCAST" spelling because other
# modules may already reference it.)
MAX_FORCAST_HOUR = 5

@ -0,0 +1,17 @@
#!/bin/bash --login
# The --login ensures the bash configuration is loaded,
# enabling Conda (~/.bashrc runs `conda activate myenv`, see Dockerfile).

# Enable strict mode.
set -euo pipefail

# Temporarily disable strict mode and activate conda:
# (conda's activation scripts reference unset variables and would
# otherwise trip `set -u`).
set +euo pipefail
conda activate myenv

# Re-enable strict mode:
set -euo pipefail

# exec the container's CMD (gunicorn ... from the Dockerfile) inside the
# activated environment. The original left this commented out, so the
# entrypoint exited immediately and the CMD never ran. `exec` replaces
# this shell so signals (SIGTERM on `docker stop`) reach the server.
exec "$@"

@ -0,0 +1,14 @@
# Conda environment used by the Dockerfile and `make req`.
# One-liner equivalent:
# conda install --channel conda-forge pynio fastapi fastapi_utils gunicorn requests wget`
name: myenv
channels:
  - conda-forge
dependencies:
  - conda
  - python=3.9
  - wget
  - pynio
  - gunicorn
  - requests
  - fastapi
  - fastapi_utils
# NOTE(review): `prefix` pins the env location, but /opt/conda is the
# *base* install in the miniconda3 image while `name: myenv` is also
# set — confirm this combination is intentional.
prefix: /opt/conda

@ -1,301 +0,0 @@
import os
import requests
import xarray as xr
from fastapi import FastAPI
from numpy import arctan2, degrees, isnan
from requests.compat import urljoin, urlparse
from time import strftime, gmtime, sleep
from bs4 import BeautifulSoup
from bs4.element import Tag
# Legacy single-file app (this module was later replaced by app/main.py).
app = FastAPI()


@app.get("/download/")
def start_download():
    """
    Download most recent available models from ncep.noaa.gov

    Scrapes the NOMADS directory listing to discover the newest
    date/hour directory, then downloads one atmos and one wave GRIB
    file for forecast hour 004 into /tmp/grib/.

    :raises Exception: if the f384 file is absent, i.e. the latest run
        is not fully published yet (no fallback implemented).
    """
    # Scrape the product page to find the base HTTPS directory for GRIB2.
    soup = BeautifulSoup(
        requests.get("https://www.nco.ncep.noaa.gov/pmb/products/gfs/").text,
        "html.parser",
    )
    base_url = (
        soup.find("font", string="Most commonly used parameters")
        .parent.find_next_sibling("td")
        .find("a", string="Available in GRIB2 via https")["href"]
    )
    # Listing of gfs.YYYYMMDD/ date directories.
    prodlist = BeautifulSoup(
        requests.get("https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod").text,
        "html.parser",
    )
    # Prefer today's directory (UTC); otherwise fall back to the
    # hard-coded 2022-05-08 directory.
    date_url = urljoin(
        base=base_url + "/",
        url=prodlist.find(
            "a", string="".join(["gfs.", strftime("%Y%m%d", gmtime()), "/"])
        )["href"]
        if prodlist.find(
            "a", string="".join(["gfs.", strftime("%Y%m%d", gmtime()), "/"])
        )
        else prodlist.find("a", string="".join(["gfs.", "20220508", "/"]))[
            "href"
        ],  # TODO[2]: use datetime and fallback to the previous day gracefully
    )
    # Hour subdirectories (00/06/12/18): pick the numerically largest.
    hour_list = BeautifulSoup(
        requests.get(urljoin(base=base_url + "/", url=date_url)).text, "html.parser"
    )
    latest_hour = max(hour_list.findAll("a"), key=pull_text).text.removesuffix(
        "/"
    )  # TODO: this may be unavailable still, fallback to previous hour
    hour_url = urljoin(
        base=date_url, url=max(hour_list.findAll("a"), key=pull_text)["href"]
    )
    # --- atmos file ---
    atmos_url = urljoin(base=hour_url, url="atmos/")
    prod_hour = "004"  # TODO[1]: do it for 000 - 384 forecast hours of product
    gfs_url = urljoin(
        base=atmos_url,
        url="".join(["gfs.t", latest_hour, "z.pgrb2.0p25.f", prod_hour]),
    )
    # The f384 file is published last; if it is missing the run is still
    # being uploaded.
    if not requests.head(
        urljoin(
            base=atmos_url, url="".join(["gfs.t", latest_hour, "z.pgrb2.0p25.f", "384"])
        )
    ):
        raise Exception("unavailable still, fallback to previous hour")
    print(download_file(gfs_url))
    # --- wave file ---
    wave_url = urljoin(base=hour_url, url="wave/gridded/")
    gfswave_url = urljoin(
        base=wave_url,
        url="".join(["gfswave.t", latest_hour, "z.global.0p25.f", prod_hour, ".grib2"]),
    )
    if not requests.head(
        urljoin(
            base=gfswave_url,
            url="".join(["gfswave.t", latest_hour, "z.global.0p25.f", "384", ".grib2"]),
        )
    ):
        raise Exception("unavailable still, fallback to previous hour")
    print(download_file(gfswave_url))
def pull_text(tag: Tag):
    """Sort key for directory links: the numeric hour from text like
    "18/", or -1 for links that are not plain numbers."""
    stem = tag.text.removesuffix("/")
    try:
        value = int(stem)
    except ValueError:
        value = -1
    return value
def download_file(url: str, file_path="/tmp/grib/", attempts=2):
    """Downloads a URL content into a file (with large file support by streaming)

    :param url: URL to download
    :param file_path: Local file name to contain the data downloaded
    :param attempts: Number of attempts
    :return: New file path. Empty string if the download failed
    """
    # Empty file_path -> save next to cwd under the URL's basename.
    if not file_path:
        file_path = os.path.realpath(os.path.basename(url))
    # Directory given (the default) -> append the URL's basename.
    if os.path.isdir(file_path):
        file_path = os.path.join(file_path, os.path.basename(url))
    # logger.info(f'Downloading {url} content to {file_path}')
    url_sections = urlparse(url)
    if not url_sections.scheme:
        # logger.debug('The given url is missing a scheme. Adding http scheme')
        url = f"http://{url}"
        # logger.debug(f'New url: {url}')
    for attempt in range(1, attempts + 1):
        try:
            if attempt > 1:
                sleep(10)  # 10 seconds wait time between downloads
            # stream=True keeps memory bounded for ~500 MB GRIB files.
            with requests.get(url, stream=True) as response:
                response.raise_for_status()
                with open(file_path, "wb") as out_file:
                    for chunk in response.iter_content(
                        chunk_size=1024 * 1024 * 8
                    ):  # 8MB chunks
                        out_file.write(chunk)
                # logger.info('Download finished successfully')
                return file_path
        except Exception as ex:
            # Deliberate best-effort: swallow, report, retry.
            print(ex)
            # logger.error(f'Attempt #{attempt} failed with error: {ex}')
    # All attempts failed.
    return ""
@app.get("/weather_dot/")
def weather_dot(lat, lon, prediction_time="000"):
    """Assemble weather values at one (lat, lon) for a forecast hour.

    Opens the locally downloaded GRIB files several times — once per
    cfgrib level/stepType filter — because cfgrib can only load one
    homogeneous hypercube per open.

    :param lat: latitude of the point
    :param lon: longitude of the point
    :param prediction_time: zero-padded forecast hour, e.g. "000"/"004"
    :return: dict of scalar weather values
    """
    response = {}
    latest_hour = "18"  # TODO[3]: global? singleton? make user choose?
    # 10 m wind components.
    with xr.open_dataset(
        f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}",
        engine="cfgrib",
        filter_by_keys={"typeOfLevel": "heightAboveGround", "level": 10},
    ) as ds:
        wind_dot = ds.sel(
            latitude=lat, longitude=lon
        )  # TODO[0]: Do we want method="nearest" or exact here
        u_wind = wind_dot.get("u10").item()
        v_wind = wind_dot.get("v10").item()
        response.update(
            u_wind=u_wind,
            v_wind=v_wind,
            # http://colaweb.gmu.edu/dev/clim301/lectures/wind/wind-uv
            wind_speed=(u_wind**2 + v_wind**2) ** 0.5,
            wind_alpha=degrees(arctan2(v_wind, u_wind)),
        )
    # 2 m temperature / dew point / humidity.
    with xr.open_dataset(
        f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}",
        engine="cfgrib",
        filter_by_keys={"typeOfLevel": "heightAboveGround", "level": 2},
    ) as ds:
        two_dot = ds.sel(
            latitude=lat, longitude=lon
        )  # TODO[0]: Do we want method="nearest" or exact here
        response.update(
            temperature=two_dot.get("t2m").item(),
            dew_point=two_dot.get("d2m").item(),
            humidity=two_dot.get("r2").item(),
        )
        # min/max are period aggregates, absent at forecast hour 0.
        if int(prediction_time) > 0:
            response.update(
                min_temp=two_dot.get("tmin").item(), max_temp=two_dot.get("tmax").item()
            )
    # Mean-sea-level pressure.
    with xr.open_dataset(
        f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}",
        engine="cfgrib",
        filter_by_keys={"typeOfLevel": "meanSea"},
    ) as ds:
        sea_dot = ds.sel(
            latitude=lat, longitude=lon
        )  # TODO[0]: Do we want method="nearest" or exact here
        # NOTE(review): Pa/1000 is kPa, not GPa as the original comment
        # claimed — confirm the intended unit.
        response.update(pressure=sea_dot.get("prmsl").item() / 1000)
    # things differ for the first and the latter for the rest:
    # hour > 0 files carry explicit stepType keys; hour 0 files do not.
    if int(prediction_time) > 0:
        # Surface instantaneous fields.
        with xr.open_dataset(
            f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}",
            engine="cfgrib",
            filter_by_keys={"typeOfLevel": "surface", "stepType": "instant"},
        ) as ds:
            surface_dot = ds.sel(
                latitude=lat, longitude=lon
            )  # TODO[0]: Do we want method="nearest" or exact here
            response.update(
                visibility=surface_dot.get("vis").item() / 1000,  # in km
                wind_gust=surface_dot.get("gust").item(),
                frozen_precip=surface_dot.get("cpofp").item(),
            )
        # Surface accumulated fields (precipitation).
        with xr.open_dataset(
            f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}",
            engine="cfgrib",
            filter_by_keys={"typeOfLevel": "surface", "stepType": "accum"},
        ) as ds:
            surface_dot = ds.sel(
                latitude=lat, longitude=lon
            )  # TODO[0]: Do we want method="nearest" or exact here
            response.update(total_precip=surface_dot.get("tp").item())
        # Low cloud cover.
        with xr.open_dataset(
            f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}",
            engine="cfgrib",
            filter_by_keys={"typeOfLevel": "lowCloudLayer", "stepType": "instant"},
        ) as ds:
            low_cloud_dot = ds.sel(
                latitude=lat, longitude=lon
            )  # TODO[0]: Do we want method="nearest" or exact here
            response.update(low_clouds=low_cloud_dot.get("lcc").item())
        # Total cloud cover.
        with xr.open_dataset(
            f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}",
            engine="cfgrib",
            filter_by_keys={"typeOfLevel": "atmosphere", "stepType": "instant"},
        ) as ds:
            total_cloud_dot = ds.sel(
                latitude=lat, longitude=lon
            )  # TODO[0]: Do we want method="nearest" or exact here
            response.update(total_clouds=total_cloud_dot.get("tcc").item())
        # Wave fields (NaN over land).
        with xr.open_dataset(
            f"/tmp/grib/gfswave.t{latest_hour}z.global.0p25.f{prediction_time}.grib2",
            engine="cfgrib",
            # filter_by_keys={"stepType": "avg"},
        ) as ds:
            wave_dot = ds.sel(
                latitude=lat, longitude=lon
            )  # TODO[0]: Do we want method="nearest" or exact here
            if not isnan(wave_dot.get("swh").item()):  # check one for swell waves
                response.update(
                    wind_and_swell_wave_height=wave_dot.get("swh").item(),
                    swell_wave_direction=wave_dot.get("swdir").values.tolist(),
                    swell_wave_period=wave_dot.get("swper").values.tolist(),
                )
            if not isnan(wave_dot.get("shww").item()):  # and one for wind waves
                response.update(
                    wind_wave_height=wave_dot.get("shww").item(),
                    wind_wave_direction=wave_dot.get("wvdir").item(),
                    wind_wave_period=wave_dot.get("mpww").item(),
                )
    else:
        # Forecast hour 0: same fields without stepType filters.
        with xr.open_dataset(
            f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}",
            engine="cfgrib",
            filter_by_keys={"typeOfLevel": "surface"},
        ) as ds:
            surface_dot = ds.sel(
                latitude=lat, longitude=lon
            )  # TODO[0]: Do we want method="nearest" or exact here
            response.update(
                visibility=surface_dot.get("vis").item() / 1000,  # in km
                wind_gust=surface_dot.get("gust").item(),
                frozen_precip=surface_dot.get("cpofp").item(),
            )
        with xr.open_dataset(
            f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}",
            engine="cfgrib",
            filter_by_keys={"typeOfLevel": "lowCloudLayer"},
        ) as ds:
            low_cloud_dot = ds.sel(
                latitude=lat, longitude=lon
            )  # TODO[0]: Do we want method="nearest" or exact here
            response.update(low_clouds=low_cloud_dot.get("lcc").item())
        with xr.open_dataset(
            f"/tmp/grib/gfs.t{latest_hour}z.pgrb2.0p25.f{prediction_time}",
            engine="cfgrib",
            filter_by_keys={"typeOfLevel": "atmosphere"},
        ) as ds:
            total_cloud_dot = ds.sel(
                latitude=lat, longitude=lon
            )  # TODO[0]: Do we want method="nearest" or exact here
            response.update(total_clouds=total_cloud_dot.get("tcc").item())
        with xr.open_dataset(
            f"/tmp/grib/gfswave.t{latest_hour}z.global.0p25.f{prediction_time}.grib2",
            engine="cfgrib",
        ) as ds:
            wave_dot = ds.sel(
                latitude=lat, longitude=lon
            )  # TODO[0]: Do we want method="nearest" or exact here
            if not isnan(
                wave_dot.get("swh").item()
            ):  # just check one, should be enough to tell if it is water
                response.update(
                    wind_and_swell_wave_height=wave_dot.get("swh").item(),
                    wind_wave_height=wave_dot.get("shww").item(),
                    wind_wave_direction=wave_dot.get("wvdir").item(),
                    swell_wave_direction=wave_dot.get("swdir").values.tolist(),
                    swell_wave_period=wave_dot.get("swper").values.tolist(),
                    wind_wave_period=wave_dot.get("mpww").item(),
                )
    return response

@ -1,26 +0,0 @@
anyio==3.5.0
attrs==21.4.0
beautifulsoup4==4.11.1
bs4==0.0.1
cffi==1.15.0
cfgrib==0.9.10.1
click==8.1.3
eccodes==1.4.1
fastapi==0.76.0
findlibs==0.0.2
idna==3.3
numpy==1.22.3
packaging==21.3
pandas==1.4.2
pycparser==2.21
pydantic==1.9.0
pyparsing==3.0.8
python-dateutil==2.8.2
pytz==2022.1
requests==2.27.1
six==1.16.0
sniffio==1.2.0
soupsieve==2.3.2.post1
starlette==0.18.0
typing_extensions==4.2.0
xarray==2022.3.0

@ -0,0 +1,13 @@
import os


# Make dir
def make_dir(path_dir):
    """Create directory `path_dir` (including parents) if it does not exist.

    :param path_dir: directory path (anything convertible to str)
    :return: the stringified path, whether or not creation succeeded
    """
    path_dir = str(path_dir)
    try:
        if not os.path.exists(path_dir):
            # exist_ok=True closes the check-then-create race of the
            # original bare makedirs (another process may create the
            # directory between the exists() check and here).
            os.makedirs(path_dir, exist_ok=True)
            print("Ready ('make_dir'). '{path_dir}'".format(path_dir=path_dir))
    except Exception as e:
        # Best effort by design: report the failure but still return the
        # path, as existing callers expect.
        print("Error while executing 'make_dir()'. {e}".format(e=e))
    return path_dir
Loading…
Cancel
Save