From 1fead8bdbc0a2c04afa79fb39682b940a65d52c4 Mon Sep 17 00:00:00 2001 From: g Date: Tue, 17 May 2022 13:25:53 +0300 Subject: [PATCH] add: mvp --- app/main.py | 144 ++++++++++++++++++++++++++++++++++++++++++++++-- app/settings.py | 96 +++++++++++++++++++++++++++++++- 2 files changed, 232 insertions(+), 8 deletions(-) diff --git a/app/main.py b/app/main.py index d3b2060..380e5f8 100755 --- a/app/main.py +++ b/app/main.py @@ -1,5 +1,8 @@ import gc +import os import requests +import wget +import xarray as xr from datetime import datetime, timedelta @@ -41,12 +44,10 @@ async def get_test(): return JSONResponse(content={"status": "success"}) -# GET 'test_background' -@app.get("/test_background/{field_1}") -async def get_test_background(field_1: str, background_tasks: BackgroundTasks): - # background_tasks - for "heavy" processes - # TODO - background_tasks.add_task(service_example.run_test, field_1) +@app.get("/download/") +async def download_grib(background_tasks: BackgroundTasks): + """Download and process GRIB files into csv of requested parameters""" + background_tasks.add_task(extract_useful_data) return JSONResponse(content={"status": "Background task started"}) @@ -83,6 +84,7 @@ def form_gfs_link(target_time=None, prod_hour=384): ) # noaa is located in washington looking_at = "atmos" + prod_hour = str(prod_hour).zfill(3) date_str = target_time.strftime("%Y%m%d") hour_str = str((target_time.hour // 6) * 6).zfill(2) @@ -104,8 +106,138 @@ def form_gfswave_link(target_time=None, prod_hour=384): ) # noaa is located in washington looking_at = "wave" + prod_hour = str(prod_hour).zfill(3) date_str = target_time.strftime("%Y%m%d") hour_str = str((target_time.hour // 6) * 6).zfill(2) target_url = f"https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/gfs.{date_str}/{hour_str}/{looking_at}/gridded/gfs{looking_at}.t{hour_str}z.global.0p25.f{prod_hour}.grib2" return target_url + + +def fresh_grib_time(target_time=None): + """Find most recent available GRIB file + for GFS atmospheric and GFSWave forecasts + + :param target_time: no more recent than this time + :returns: most recent time GRIB is available + """ + if not target_time: + target_time = datetime.now( + tz=ZoneInfo("US/Eastern") + ) # noaa is located in washington + + for fallback in range(4): + # wasting a bit of time here on useless first cycle (mult by 0, subtract 0) + # allows us to kepp code a bit neater later + fallback_hours = fallback * 6 + target_time = target_time - timedelta(hours=fallback_hours) + + if not is_reachable(form_gfs_link(target_time=target_time, prod_hour=384)): + continue + if not is_reachable(form_gfswave_link(target_time=target_time, prod_hour=384)): + continue + break + else: + raise Exception("No forecasts in the last 24 hours were reachable") + + # both forecasts are reachable @ target_time + return target_time + + +def download_fresh_grib(target_time, prod_hour=0): + """Download GRIB files for atmos and wave + + :param target_time: download GRIB from this time + :param prod_hour: download forecast for this hour + :returns: filenames where GRIB files are downloaded to + """ + gfs_atmos = form_gfs_link(target_time=target_time, prod_hour=prod_hour) + gfs_wave = form_gfswave_link(target_time=target_time, prod_hour=prod_hour) + + return ( + wget.download(gfs_atmos, out=SAVE_DIR), + wget.download(gfs_wave, out=SAVE_DIR), + ) + + +def extract_useful_data(): + """Download and process GRIB files into csv of requested parameters + + :returns: filenames of the resulting csv + """ + target_time = fresh_grib_time() + save_to = f"{target_time.strftime('%Y%m%d')}-{str((target_time.hour // 6) * 6).zfill(2)}.csv" + + for forecast_hour in range(MAX_FORECAST_HOUR): + # download gribs + ds_atmos_file, ds_wave_file = download_fresh_grib( + target_time=target_time, prod_hour=forecast_hour + ) + + # filter atmos to requested variables + with xr.open_dataset(ds_atmos_file, engine="pynio") as ds_atmos: + filtered_ds_atmos = ds_atmos.get(ATMOS_PARAM_NAMES) or ds_atmos.get( + [p for p in ATMOS_PARAM_NAMES if not p == "APCP_P8_L1_GLL0_acc"] + ) # skip running total column in the first forecast + + for name, param in HEIGHT_PARAM_NAMES.items(): + if name == "TMP_P0_L103_GLL0": + level = TEMPERATURE_HEIGHT + else: + level = WIND_HEIGHT + filtered_ds_atmos[name] = ( + ds_atmos[name] + .sel({param: level}) + .assign_attrs(level=level) + .drop_vars(param) + ) + # if hour==0 add running total column from future forecasts + if forecast_hour == 0: + precip = xr.zeros_like(filtered_ds_atmos["GUST_P0_L1_GLL0"]) + precip.name = "APCP_P8_L1_GLL0_acc" + filtered_ds_atmos = xr.combine_by_coords( + [filtered_ds_atmos, precip], coords="mimal" + ) + # filter wave to requested variables + with xr.open_dataset(ds_wave_file, engine="pynio") as ds_wave: + filtered_ds_wave = ds_wave.get(WAVE_PARAM_NAMES) + + # concatinate atmos and wave into a single dataset + combined_product = filtered_ds_atmos.merge( + filtered_ds_wave.reindex_like(filtered_ds_atmos, method="nearest") + ) + + # transfer to pandas + df = combined_product.to_dataframe() + + # convert longitude values into the standard range of -180 degrees to +180 degrees + # TODO: do we want to do it? + latitudes = df.index.get_level_values("lat_0") + longitudes = df.index.get_level_values("lon_0") + + map_function = lambda lon: (lon - 360) if (lon > 180) else lon + remapped_longitudes = longitudes.map(map_function) + df["longitude"] = remapped_longitudes + df["latitude"] = latitudes + # dump datafrate to csv on disk + if forecast_hour == 0: + df.to_csv( + os.path.join( + SAVE_DIR, + save_to, + ), + index=False, + ) + else: + df.to_csv( + os.path.join(SAVE_DIR, save_to), + index=False, + mode="a", + header=False, + ) + + # clean up grib files + os.remove(ds_wave_file) + os.remove(ds_atmos_file) + + return save_to diff --git a/app/settings.py b/app/settings.py index 38333a9..2b726a3 100755 --- a/app/settings.py +++ b/app/settings.py @@ -7,6 +7,98 @@ from utils.tools import make_dir # Paths BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) -SAVE_DIR = '/tmp/grib/' # don't forget to create it! +SAVE_DIR = "/tmp/grib/" # don't forget to create it! -MAX_FORCAST_HOUR = 5 # 0-384 beware, each file is 500mb +MAX_FORECAST_HOUR = 5 # 0-384 beware, each file is 500mb + +# Globals with variable names and comments how to get them +""" +has_height_attr_names = { + "V-component of wind", #1 + "U-component of wind", #1 + "Temperature", #4 + "Relative humidity", #5 + "Pressure reduced to MSL", #7 +} + +surface_attr_names = { + "Wind speed (gust)", #2 + "Total precipitation", #3 + "Visibility", # 6 +} +level_type_names = { + "Specified height level above ground (m)", + "Mean sea level (Pa)" +} +grib_names = [] +for v in ds_atmos: + if ds_atmos[v].attrs["long_name"] in surface_attr_names: + print("{}, {}, {}".format(v, + ds_atmos[v].attrs["long_name"], + ds_atmos[v].attrs["units"])) + grib_names.append(v) + if "level_type" not in ds_atmos[v].attrs: + continue + if ds_atmos[v].attrs["level_type"] not in level_type_names: + continue + if ds_atmos[v].attrs["long_name"] in has_height_attr_names: + print("{}, {}, {}".format(v, + ds_atmos[v].attrs["long_name"], + ds_atmos[v].attrs["units"])) + grib_names.append(v) + + +height_param_names = {} +for name in grib_names: + non_coord_params = [a for a in ds_atmos[name].dims if a != "lat_0" and a != "lon_0"] + if len(non_coord_params) > 0: + height_param, = non_coord_params + print(name, height_param) + height_param_names[name] = height_param + +atmos_param_names = [name for name in grib_names if name not in height_param_names] + + +surface_attr_names = { + "Significant height of combined wind waves and swell", #8 + "Significant height of wind waves", #8 + "Significant height of swell waves", #8 + "Direction of wind waves", #9 + "Direction of Swell Waves", #9 + "Mean period of swell waves", #10 + "Mean period of wind waves", #10 +} +grib_names = [] +for v in ds_wave: + if ds_wave[v].attrs["long_name"] in surface_attr_names: + print("{}, {}, {}".format(v, + ds_wave[v].attrs["long_name"], + ds_wave[v].attrs["units"])) + grib_names.append(v) +""" + +HEIGHT_PARAM_NAMES = { + "UGRD_P0_L103_GLL0": "lv_HTGL7", # 1 + "VGRD_P0_L103_GLL0": "lv_HTGL7", # 1 + "TMP_P0_L103_GLL0": "lv_HTGL2", # 4 +} + +ATMOS_PARAM_NAMES = [ + "GUST_P0_L1_GLL0", # 2 + "APCP_P8_L1_GLL0_acc", # 3 # only appears after 000 h + "RH_P0_L103_GLL0", # 5 + "VIS_P0_L1_GLL0", # 6 + "PRMSL_P0_L101_GLL0", # 7 +] + +WIND_HEIGHT = 10.0 +TEMPERATURE_HEIGHT = 2.0 + +WAVE_PARAM_NAMES = [ + "HTSGW_P0_L1_GLL0", # 8 + "WVHGT_P0_L1_GLL0", # 8 + "SWDIR_P0_L241_GLL0", # 9 + "WVDIR_P0_L1_GLL0", # 9 + "WVPER_P0_L1_GLL0", # 10 + "SWPER_P0_L241_GLL0", # 10 +]