diff --git a/app/main.py b/app/main.py
index 585df7a..c32e125 100755
--- a/app/main.py
+++ b/app/main.py
@@ -51,7 +51,7 @@ async def get_test():
 @app.get("/download/")
 async def download_grib(background_tasks: BackgroundTasks):
     """Download and process GRIB files into csv of requested parameters"""
-    background_tasks.add_task(extract_useful_data)
+    background_tasks.add_task(service_grib.extract_useful_data)
     return JSONResponse(content={"status": "Background task started"})
 
 
@@ -61,138 +61,3 @@ async def download_grib(background_tasks: BackgroundTasks):
 @repeat_every(seconds=(1 * 60))
 async def task_gc() -> None:
     gc.collect()
-
-
-def fresh_grib_time(target_time=None):
-    """Find most recent available GRIB file
-    for GFS atmospheric and GFSWave forecasts
-
-    :param target_time: no more recent than this time
-    :returns: most recent time GRIB is available
-    """
-    if not target_time:
-        target_time = datetime.now(
-            tz=ZoneInfo("US/Eastern")
-        )  # noaa is located in washington
-
-    for fallback in range(4):
-        # wasting a bit of time here on useless first cycle (mult by 0, subtract 0)
-        # allows us to kepp code a bit neater later
-        fallback_hours = fallback * 6
-        target_time = target_time - timedelta(hours=fallback_hours)
-
-        if not service_grib.is_reachable(
-            service_grib.form_gfs_link(target_time=target_time, prod_hour=384)
-        ):
-            continue
-        if not service_grib.is_reachable(
-            service_grib.form_gfswave_link(target_time=target_time, prod_hour=384)
-        ):
-            continue
-        break
-    else:
-        raise Exception("No forecasts in the last 24 hours were reachable")
-
-    # both forecasts are reachable @ target_time
-    return target_time
-
-
-def download_fresh_grib(target_time, prod_hour=0):
-    """Download GRIB files for atmos and wave
-
-    :param target_time: download GRIB from this time
-    :param prod_hour: download forecast for this hour
-    :returns: filenames where GRIB files are downloaded to
-    """
-    gfs_atmos = service_grib.form_gfs_link(target_time=target_time, prod_hour=prod_hour)
-    gfs_wave = service_grib.form_gfswave_link(
-        target_time=target_time, prod_hour=prod_hour
-    )
-
-    return (
-        wget.download(gfs_atmos, out=SAVE_DIR),
-        wget.download(gfs_wave, out=SAVE_DIR),
-    )
-
-
-def extract_useful_data():
-    """Download and process GRIB files into csv of requested parameters
-
-    :returns: filenames of the resulting csv
-    """
-    target_time = fresh_grib_time()
-    save_to = f"{target_time.strftime('%Y%m%d')}-{str((target_time.hour // 6) * 6).zfill(2)}.csv"
-
-    for forecast_hour in range(MAX_FORECAST_HOUR):
-        # download gribs
-        ds_atmos_file, ds_wave_file = download_fresh_grib(
-            target_time=target_time, prod_hour=forecast_hour
-        )
-
-        # filter atmos to requested variables
-        with xr.open_dataset(ds_atmos_file, engine="pynio") as ds_atmos:
-            filtered_ds_atmos = ds_atmos.get(ATMOS_PARAM_NAMES) or ds_atmos.get(
-                [p for p in ATMOS_PARAM_NAMES if not p == "APCP_P8_L1_GLL0_acc"]
-            )  # skip running total column in the first forecast
-
-            for name, param in HEIGHT_PARAM_NAMES.items():
-                if name == "TMP_P0_L103_GLL0":
-                    level = TEMPERATURE_HEIGHT
-                else:
-                    level = WIND_HEIGHT
-                filtered_ds_atmos[name] = (
-                    ds_atmos[name]
-                    .sel({param: level})
-                    .assign_attrs(level=level)
-                    .drop_vars(param)
-                )
-            # if hour==0 add running total column from future forecasts
-            if forecast_hour == 0:
-                precip = xr.zeros_like(filtered_ds_atmos["GUST_P0_L1_GLL0"])
-                precip.name = "APCP_P8_L1_GLL0_acc"
-                filtered_ds_atmos = xr.combine_by_coords(
-                    [filtered_ds_atmos, precip], coords="mimal"
-                )
-        # filter wave to requested variables
-        with xr.open_dataset(ds_wave_file, engine="pynio") as ds_wave:
-            filtered_ds_wave = ds_wave.get(WAVE_PARAM_NAMES)
-
-        # concatinate atmos and wave into a single dataset
-        combined_product = filtered_ds_atmos.merge(
-            filtered_ds_wave.reindex_like(filtered_ds_atmos, method="nearest")
-        )
-
-        # transfer to pandas
-        df = combined_product.to_dataframe()
-
-        # convert longitude values into the standard range of -180 degrees to +180 degrees
-        # TODO: do we want to do it?
-        latitudes = df.index.get_level_values("lat_0")
-        longitudes = df.index.get_level_values("lon_0")
-
-        map_function = lambda lon: (lon - 360) if (lon > 180) else lon
-        remapped_longitudes = longitudes.map(map_function)
-        df["longitude"] = remapped_longitudes
-        df["latitude"] = latitudes
-        # dump datafrate to csv on disk
-        if forecast_hour == 0:
-            df.to_csv(
-                os.path.join(
-                    SAVE_DIR,
-                    save_to,
-                ),
-                index=False,
-            )
-        else:
-            df.to_csv(
-                os.path.join(SAVE_DIR, save_to),
-                index=False,
-                mode="a",
-                header=False,
-            )
-
-        # clean up grib files
-        os.remove(ds_wave_file)
-        os.remove(ds_atmos_file)
-
-    return save_to
diff --git a/services/grib/service.py b/services/grib/service.py
index 1fe8fc9..5831e9c 100644
--- a/services/grib/service.py
+++ b/services/grib/service.py
@@ -56,3 +56,138 @@ class Grib:
         hour_str = str((target_time.hour // 6) * 6).zfill(2)
         target_url = f"https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod/gfs.{date_str}/{hour_str}/{looking_at}/gridded/gfs{looking_at}.t{hour_str}z.global.0p25.f{prod_hour}.grib2"
         return target_url
+
+    @staticmethod
+    def fresh_grib_time(target_time=None):
+        """Find most recent available GRIB file
+        for GFS atmospheric and GFSWave forecasts
+
+        :param target_time: no more recent than this time
+        :returns: most recent time GRIB is available
+        """
+        if not target_time:
+            target_time = datetime.now(
+                tz=ZoneInfo("US/Eastern")
+            )  # noaa is located in washington
+
+        for fallback in range(4):
+            # first cycle subtracts 0 hours so the freshest forecast is tried as-is;
+            # each later cycle steps back exactly one 6-hour forecast period
+            fallback_hours = 6 if fallback else 0
+            target_time = target_time - timedelta(hours=fallback_hours)
+
+            if not Grib.is_reachable(
+                Grib.form_gfs_link(target_time=target_time, prod_hour=384)
+            ):
+                continue
+            if not Grib.is_reachable(
+                Grib.form_gfswave_link(target_time=target_time, prod_hour=384)
+            ):
+                continue
+            break
+        else:
+            raise Exception("No forecasts in the last 24 hours were reachable")
+
+        # both forecasts are reachable @ target_time
+        return target_time
+
+    @staticmethod
+    def download_fresh_grib(target_time, prod_hour=0):
+        """Download GRIB files for atmos and wave
+
+        :param target_time: download GRIB from this time
+        :param prod_hour: download forecast for this hour
+        :returns: filenames where GRIB files are downloaded to
+        """
+        gfs_atmos = Grib.form_gfs_link(target_time=target_time, prod_hour=prod_hour)
+        gfs_wave = Grib.form_gfswave_link(target_time=target_time, prod_hour=prod_hour)
+
+        return (
+            wget.download(gfs_atmos, out=SAVE_DIR),
+            wget.download(gfs_wave, out=SAVE_DIR),
+        )
+
+    @staticmethod
+    def extract_useful_data():
+        """Download and process GRIB files into csv of requested parameters
+
+        :returns: filenames of the resulting csv
+        """
+        target_time = Grib.fresh_grib_time()
+        save_to = f"{target_time.strftime('%Y%m%d')}-{str((target_time.hour // 6) * 6).zfill(2)}.csv"
+
+        for forecast_hour in range(MAX_FORECAST_HOUR):
+            # download gribs
+            ds_atmos_file, ds_wave_file = Grib.download_fresh_grib(
+                target_time=target_time, prod_hour=forecast_hour
+            )
+
+            # filter atmos to requested variables
+            with xr.open_dataset(ds_atmos_file, engine="pynio") as ds_atmos:
+                filtered_ds_atmos = ds_atmos.get(ATMOS_PARAM_NAMES) or ds_atmos.get(
+                    [p for p in ATMOS_PARAM_NAMES if not p == "APCP_P8_L1_GLL0_acc"]
+                )  # skip running total column in the first forecast
+
+                for name, param in HEIGHT_PARAM_NAMES.items():
+                    if name == "TMP_P0_L103_GLL0":
+                        level = TEMPERATURE_HEIGHT
+                    else:
+                        level = WIND_HEIGHT
+                    filtered_ds_atmos[name] = (
+                        ds_atmos[name]
+                        .sel({param: level})
+                        .assign_attrs(level=level)
+                        .drop_vars(param)
+                    )
+                # if hour==0 add running total column from future forecasts
+                if forecast_hour == 0:
+                    precip = xr.zeros_like(filtered_ds_atmos["GUST_P0_L1_GLL0"])
+                    precip.name = "APCP_P8_L1_GLL0_acc"
+                    filtered_ds_atmos = xr.combine_by_coords(
+                        [filtered_ds_atmos, precip], coords="minimal"
+                    )
+            # filter wave to requested variables
+            with xr.open_dataset(ds_wave_file, engine="pynio") as ds_wave:
+                filtered_ds_wave = ds_wave.get(WAVE_PARAM_NAMES)
+
+            # concatenate atmos and wave into a single dataset
+            combined_product = filtered_ds_atmos.merge(
+                filtered_ds_wave.reindex_like(
+                    filtered_ds_atmos, method="nearest"
+                )
+            )
+
+            # transfer to pandas
+            df = combined_product.to_dataframe()
+
+            # convert longitude values into the standard range of -180 degrees to +180 degrees
+            # TODO: do we want to do it?
+            latitudes = df.index.get_level_values("lat_0")
+            longitudes = df.index.get_level_values("lon_0")
+
+            map_function = lambda lon: (lon - 360) if (lon > 180) else lon
+            remapped_longitudes = longitudes.map(map_function)
+            df["longitude"] = remapped_longitudes
+            df["latitude"] = latitudes
+            # dump dataframe to csv on disk
+            if forecast_hour == 0:
+                df.to_csv(
+                    os.path.join(
+                        SAVE_DIR,
+                        save_to,
+                    ),
+                    index=False,
+                )
+            else:
+                df.to_csv(
+                    os.path.join(SAVE_DIR, save_to),
+                    index=False,
+                    mode="a",
+                    header=False,
+                )
+
+            # clean up grib files
+            os.remove(ds_wave_file)
+            os.remove(ds_atmos_file)
+
+        return save_to