"""Celery tasks for the placement-point service.

Covers the full ML recalculation pipeline (``raschet``/``do_computations``),
Excel/CSV data loaders, and the daily ``age_day`` increment task.
"""
import base64
import os
from io import StringIO
from typing import cast

import catboost
import geopandas as gpd
import numpy as np
import pandas as pd
import psycopg2
import psycopg2.extras
import shap
import sqlalchemy
from celery import shared_task
from django.contrib.gis.db.models.functions import Distance
from django.core.cache import cache
from django.db.models import F
from scipy import interpolate
from scipy.spatial import distance
from shapely import wkb
from sklearn import metrics
from sklearn import model_selection as ms
from sqlalchemy import text

from postamates.settings import AGE_DAY_LIMIT
from postamates.settings import DB_URL, STATUS_TASK_NAME, STATUS_TASK_NAME_IMPORT
from service import models
from service.layer_service import LayerService
from service.models import LastMLCall, PlacementPoint
from service.service import PointService
from service.utils import change_status, log, run_psql_command


def get_engine():
    """Create a SQLAlchemy engine bound to the ``public`` schema.

    Raises:
        Exception: re-raises whatever ``create_engine`` raised, after logging.
    """
    try:
        log("try connect to db")
        return sqlalchemy.create_engine(
            DB_URL,
            connect_args={"options": "-csearch_path=public"},
        )
    except Exception:
        log("error connect to db")
        raise


def _execute_batch_with_retry(cursor, conn, query, records):
    """Run one batched UPDATE; on failure roll back and retry once.

    Preserves the original module's retry-after-ROLLBACK behaviour that was
    copy-pasted around every ``execute_batch`` call.
    """
    try:
        psycopg2.extras.execute_batch(cursor, query, records)
        conn.commit()
    except Exception:
        cursor.execute("ROLLBACK")
        psycopg2.extras.execute_batch(cursor, query, records)
        conn.commit()


def _int_update_records(df, column):
    """Build ``[(int(value), int(id)), ...]`` pairs for a batched UPDATE."""
    return [(int(df[column][i]), int(df.id[i])) for i in df.id.keys()]


def do_computations(table_name: str, need_time: bool, task_name: str):
    """Recompute traffic predictions for placement points and persist them.

    Pipeline:
      1. Process pending ``RaschetObjects``/``RaschetGroups`` (nearby-PVZ
         counts and group distances), then delete them.
      2. Train an ensemble of CatBoost regressors on the training sample,
         predict traffic for inference points, compute SHAP contributions.
      3. Batch-update the predictions/plans/deltas back into ``table_name``.

    Args:
        table_name: target table — ``service_placementpoint`` or
            ``service_preplacementpoint``.
        need_time: when True, reset the ``LastMLCall`` timestamp at the end.
        task_name: status-channel name for ``change_status`` progress updates.

    Raises:
        ValueError: unexpected ``table_name``.
        Exception: empty inference set, no model passing quality gates, or
            any DB failure (logged and re-raised).
    """

    def count_nearby_pvz():
        # Recount nearby post/PVZ objects for every queued RaschetObjects row.
        raschet_objs = models.RaschetObjects.objects.all()
        if not raschet_objs:
            return
        change_status("Начало расчета кол-ва ПВЗ вокруг точек", task_name=task_name)
        total = raschet_objs.count()
        for _i, r_o in enumerate(raschet_objs):
            obj = models.Post_and_pvz.objects.get(id=r_o.obj_id)
            LayerService().count_post_pvz_for_placementpoint(obj)
            change_status(
                f"Подсчет кол-ва ПВЗ вокруг точек: {str(int((_i + 1) / total * 100))}%",
                task_name=task_name,
            )
        change_status("Подсчет кол-ва ПВЗ вокруг точек завершен", task_name=task_name)

    def calculate_dist_for_groups():
        # Recompute point→group distances for every queued RaschetGroups row.
        group_objects = models.RaschetGroups.objects.all()
        group_total = group_objects.count()
        if group_objects:
            change_status("Начало расчета расстояний", task_name=task_name)
            qs = models.PlacementPoint.objects.all()
            for _k, g_o in enumerate(group_objects):
                g = models.Post_and_pvzGroup.objects.get(id=g_o.obj_id)
                for q in qs:
                    PointService.calculate_dist_for_group(point=q, group=g)
                change_status(
                    f"Подсчет расстояний: {str(int((_k + 1) / group_total * 100))}%",
                    task_name=task_name,
                )
            change_status("Расчет завершен", task_name=task_name)

    def process_raschet_objects():
        # Drain both work queues, then clear them.
        count_nearby_pvz()
        calculate_dist_for_groups()
        models.RaschetObjects.objects.all().delete()
        models.RaschetGroups.objects.all().delete()

    print("start raschet")
    process_raschet_objects()
    change_status("Шаг 3 из 3 (Прогноз трафика в точках).", task_name=task_name)
    log(f"{table_name} start raschet")
    engine = get_engine()
    try:
        query = text("select * from service_placementpoint")
        connection = engine.connect()
        pts_df = pd.read_sql(query, connection)
        # Geometry is stored as hex-encoded WKB; decode, then project to a
        # metric CRS (UTM 37N) so buffers/distances are in metres.
        pts_df.geometry = pts_df.geometry.apply(wkb.loads, hex=True)
        pts = gpd.GeoDataFrame(pts_df, geometry="geometry", crs="epsg:4326")
        pts = pts.to_crs("epsg:32637")
        feats = [
            "id",
            "metro_dist",
            "target_dist",
            "property_price_bargains",
            "property_price_offers",
            "property_mean_floor",
            "property_era",
            "flats_cnt",
            "popul_home",
            "popul_job",
            "yndxfood_sum",
            "yndxfood_cnt",
            "school_cnt",
            "kindergar_cnt",
            "target_post_cnt",
            "public_stop_cnt",
            "sport_center_cnt",
            "pharmacy_cnt",
            "supermarket_cnt",
            "supermarket_premium_cnt",
            "clinic_cnt",
            "bank_cnt",
            "reca_cnt",
            "lab_cnt",
            "culture_cnt",
            "attraction_cnt",
            "mfc_cnt",
            "bc_cnt",
            "tc_cnt",
            "rival_pvz_cnt",
            "rival_post_cnt",
            "business_activity",
            "age_day",
            "target_cnt_ao_mean",
        ]

        # --- Training rows -------------------------------------------------
        pts_trn = pts.loc[pts.name == "постамат Яндекс.Маркет из обучающей выборки"].reset_index(drop=True)
        pts_trn = pts_trn.sort_values("address").reset_index(drop=True)
        pts_trn = gpd.GeoDataFrame(pts_trn, geometry="geometry", crs="epsg:32637")
        pts_target = cast(gpd.GeoDataFrame, pts_trn[["geometry"]])
        pts_target["cnt"] = 1
        pts_target = gpd.GeoDataFrame(pts_target, geometry="geometry", crs="epsg:32637")
        target_feature_coords = np.array([(geom.x, geom.y) for geom in pts_target.geometry])
        # Every training point is in the target set, so its nearest neighbour
        # is itself (distance 0) — take the second-nearest, capped at 700 m.
        pts_trn["target_dist"] = pts_trn.apply(
            lambda x: sorted(distance.cdist([[x["geometry"].x, x["geometry"].y]], target_feature_coords)[0])[1],
            axis=1,
        )
        pts_trn.loc[pts_trn.target_dist > 700, "target_dist"] = 700
        pts_trn["buf"] = pts_trn.buffer(500)
        pts_trn = gpd.GeoDataFrame(pts_trn, geometry="buf", crs="epsg:32637")
        # Count target points inside each 500 m buffer; minus 1 to exclude
        # the point itself.
        target_post = (
            gpd.sjoin(pts_trn, pts_target, op="contains")
            .groupby("id", as_index=False)
            .agg({"cnt": "count"})
        )
        target_post = target_post.rename(columns={"cnt": "target_post_cnt"})
        pts_trn = pts_trn.drop(columns=["target_post_cnt"])
        pts_trn = pts_trn.join(target_post.set_index("id"), on="id")
        pts_trn["target_post_cnt"] = pts_trn.target_post_cnt - 1
        pts_trn = pts_trn.sort_values(by="id").reset_index(drop=True)
        X_trn = pts_trn[feats].drop(columns=["id"])
        Y_trn = pts_trn[["fact"]]

        # --- Inference rows ------------------------------------------------
        if table_name == "service_placementpoint":
            pts_inf = pts.loc[
                (pts.status == "Pending")
                | (pts.status == "Installation")
                | (pts.status == "Cancelled")
                | ((pts.status == "Working") & (pts.sample_trn == False))  # noqa: E712 — elementwise pandas comparison
            ].reset_index(drop=True)
        elif table_name == "service_preplacementpoint":
            pts_inf_df = pd.DataFrame(engine.connect().execute(text(f"select * from {table_name}")))
            pts_inf_df = pts_inf_df.loc[pts_inf_df.matching_status == "New"].reset_index(drop=True)
            pts_inf_df["geometry"] = pts_inf_df["geometry"].apply(wkb.loads, hex=True)
            pts_inf = gpd.GeoDataFrame(pts_inf_df, geometry="geometry", crs="epsg:4326")
        else:
            raise ValueError("Unexpected `table_name` parameter")
        if not len(pts_inf):
            raise Exception("Empty point set")
        pts_inf: gpd.GeoDataFrame = pts_inf.to_crs("epsg:32637")
        pts_inf["buf"] = pts_inf.buffer(500)
        pts_inf = gpd.GeoDataFrame(pts_inf, geometry="buf", crs="epsg:32637")
        pts_target = pts.loc[(pts.status == "Working") | (pts.status == "Installation")].reset_index(drop=True)
        pts_target = pts_target[["geometry"]]
        pts_target["cnt"] = 1
        pts_target = gpd.GeoDataFrame(pts_target, geometry="geometry", crs="epsg:32637")
        target_feature_coords = np.array([(geom.x, geom.y) for geom in pts_target.geometry])

        def dist_working(x):
            # Working/Installation points are themselves in the target set,
            # so skip the zero self-distance and take the second-nearest.
            dists = sorted(distance.cdist([[x["geometry"].x, x["geometry"].y]], target_feature_coords)[0])
            return dists[1] if x.status in {"Working", "Installation"} else dists[0]

        pts_inf["target_dist"] = pts_inf.apply(dist_working, axis=1)
        pts_inf.loc[pts_inf.target_dist > 700, "target_dist"] = 700
        pts_inf = pts_inf.sort_values(by="id").reset_index(drop=True)  # type: ignore
        target_post = (
            gpd.sjoin(pts_inf, pts_target, op="contains")
            .groupby("id", as_index=False)
            .agg({"cnt": "count"})
        )
        target_post = target_post.rename(columns={"cnt": "target_post_cnt"})
        pts_inf = pts_inf.drop(columns=["target_post_cnt"])
        pts_inf = pts_inf.join(target_post.set_index("id"), on="id")
        pts_inf.target_post_cnt = pts_inf.target_post_cnt.fillna(0)
        pts_inf.target_post_cnt = pts_inf.apply(
            lambda x: (
                (x.target_post_cnt - 1)
                if ((x.status == "Working") or (x.status == "Installation"))
                else x.target_post_cnt
            ),
            axis=1,
        )
        # Predict traffic at a fixed maturity of 240 days; keep the real age
        # for the plan interpolation below.
        pts_inf["age_day_init"] = pts_inf["age_day"]
        pts_inf["age_day"] = 240
        X_inf = pts_inf[feats].dropna()

        # --- Train ensemble, run inference ---------------------------------
        seeds = [99, 87, 21, 15]
        r2_scores = []
        mapes = []
        x_inf: gpd.GeoDataFrame = X_inf.drop(columns=["id"])  # type: ignore
        y_infers = []
        model = catboost.CatBoostRegressor()
        for seed in seeds:
            x_trn, x_test, y_trn, y_test = ms.train_test_split(X_trn, Y_trn, test_size=0.2, random_state=seed)
            model = catboost.CatBoostRegressor(cat_features=["property_era"], random_state=seed)
            model.fit(x_trn, y_trn, verbose=False)
            pred_test = model.predict(x_test)
            r2_score = metrics.r2_score(y_test, pred_test)
            mape = metrics.mean_absolute_percentage_error(y_test, pred_test)
            # Only models passing the quality gate contribute to the ensemble.
            if r2_score > 0.45 and mape < 0.25:
                r2_scores.append(r2_score)
                mapes.append(mape)
                y_infers.append(model.predict(x_inf))
        if not y_infers:
            raise Exception("No model passed quality thresholds (r2 > 0.45, MAPE < 0.25)")
        # BUGFIX: the original divided by a hard-coded 4, silently scaling
        # predictions down whenever fewer than 4 seeds passed the gate.
        current_pred = np.mean(np.stack(y_infers), axis=0)

        # --- SHAP contributions (from the last trained model) --------------
        explainer = shap.TreeExplainer(model)
        shap_values = explainer(x_inf)
        shap_fields = pd.DataFrame(shap_values.values)
        shap_fields.columns = x_inf.columns + "_shap"
        shap_fields = shap_fields.drop(columns=["age_day_shap"])
        # Normalize absolute contributions of the first 32 features to
        # percentages, rounded to 2 decimals.
        shap_fields["sum"] = abs(shap_fields).sum(axis=1)
        shap_fields = round(shap_fields.iloc[:, :32].div(shap_fields["sum"], axis=0) * 100, 2)

        # --- Assemble the fields to write back -----------------------------
        update_fields = pts_inf[
            [
                "id",
                "age_day_init",
                "status",
                "fact",
                "delta_current",
                "delta_first",
                "plan_current",
                "plan_first",
                "prediction_first",
                "target_post_cnt",
                "target_dist",
            ]
        ].dropna()
        # BUGFIX: the original concat'ed X_inf[["id"]] (gapped post-dropna
        # index) with a fresh RangeIndex frame, misaligning predictions with
        # ids whenever dropna removed rows. Pair them positionally instead.
        predictions = pd.DataFrame(
            {"id": X_inf["id"].to_numpy(), "prediction_current": current_pred}
        )
        update_fields = update_fields.join(predictions.set_index("id"), on="id").dropna()
        update_fields["prediction_current"] = update_fields["prediction_current"].astype(int)
        # Ramp-up curve: a point is expected to reach this fraction of its
        # mature traffic at the given age (days).
        days_x = np.array([0, 30, 60, 90, 120, 150, 180, 210, 240, 270])
        perc_y = np.array([0, 0.15, 0.20, 0.30, 0.60, 0.70, 0.70, 0.75, 0.75, 0.80])
        spl = interpolate.splrep(days_x, perc_y)
        update_fields["plan_first"] = update_fields.apply(
            lambda x: (x.prediction_first * interpolate.splev(x.age_day_init, spl) if x.status == "Working" else 0),
            axis=1,
        )
        update_fields["plan_current"] = update_fields.apply(
            lambda x: (x.prediction_current * interpolate.splev(x.age_day_init, spl) if x.status == "Working" else 0),
            axis=1,
        )
        update_fields["delta_first"] = update_fields.apply(
            lambda x: ((x.fact - x.plan_first) / x.plan_first * 100 if x.status == "Working" else 0),
            axis=1,
        )
        update_fields["delta_current"] = update_fields.apply(
            lambda x: ((x.fact - x.plan_current) / x.plan_current * 100 if x.status == "Working" else 0),
            axis=1,
        )
        update_fields_working = update_fields.loc[update_fields.status == "Working"].reset_index(drop=True)
        update_fields_working = update_fields_working.fillna(0)
        connection.close()
    except Exception as e:
        log(f"Ошибка при обновлении полей в базе данных: {e}")
        raise

    # --- Write results back via psycopg2 batch updates ---------------------
    log("Начинается обновление полей в базе")
    if len(pts_inf) > 0:
        try:
            log("Подключение к базе данных 2")
            conn2 = psycopg2.connect(
                database=os.getenv("POSTGRES_DB", "postgres"),
                user=os.getenv("POSTGRES_USER", "postgres"),
                password=os.getenv("POSTGRES_PASSWORD", "postgres"),
                host=os.getenv("POSTGRES_HOST", "postgres"),
                # BUGFIX: the original default was "postgres", which is not a
                # valid port number.
                port=os.getenv("POSTGRES_PORT", "5432"),
                options="-c search_path=public",
            )
            cursor = conn2.cursor()
        except Exception:
            conn2 = None
            log("Не удалось подключиться к базе данных")
            raise
        if conn2 is not None:
            # SHAP percentages: one multi-column UPDATE per row.
            # NOTE(review): this concat aligns shap_fields (RangeIndex) with
            # update_fields (post-dropna index) by label, as the original did
            # — verify both frames describe the same row order.
            update_fields_shap = pd.concat([shap_fields, update_fields[["id"]]], axis=1).dropna()
            shap_records = [
                tuple(int(update_fields_shap[col][i]) for col in list(update_fields_shap))
                for i in update_fields_shap.id.keys()
            ]
            shap_fields_name = str(list(shap_fields))[1:-1].replace("'", "").replace(",", "=%s,")
            _execute_batch_with_retry(
                cursor,
                conn2,
                f"""Update {table_name} set {shap_fields_name} = %s where id = %s""",
                shap_records,
            )
            # Scalar columns: same UPDATE shape for each, so drive them from
            # a table instead of eight copy-pasted blocks.
            for column, source_df in (
                ("target_post_cnt", update_fields),
                ("target_dist", update_fields),
                ("prediction_current", update_fields),
                ("plan_first", update_fields_working),
                ("plan_current", update_fields_working),
                ("delta_first", update_fields_working),
                ("delta_current", update_fields_working),
            ):
                _execute_batch_with_retry(
                    cursor,
                    conn2,
                    f"""Update {table_name} set {column} = %s where id = %s""",
                    _int_update_records(source_df, column),
                )
            conn2.close()
            cache.clear()
    else:
        log("len(pts_inf) <= 0")
    run_psql_command()
    log("end raschet")
    change_status("Перерасчет ML завершен", task_name=task_name)
    if need_time:
        # Record the timestamp of this successful ML run.
        LastMLCall.objects.all().delete()
        LastMLCall.objects.create()


@shared_task()
def raschet(table_name="service_placementpoint", need_time=True, task_name=STATUS_TASK_NAME):
    """Celery entry point for the ML recalculation pipeline."""
    try:
        do_computations(table_name, need_time, task_name)
    finally:
        # Always clear the progress indicator, even on failure.
        change_status("Перерасчет ML завершен", task_name=task_name)


@shared_task
def load_post_and_pvz(obj_id: int):
    """Import post/PVZ objects from a base64-encoded Excel TempFiles row,
    then recompute nearest-PVZ distances and per-point counters.

    Args:
        obj_id: primary key of the ``TempFiles`` row holding the upload.
    """
    file = models.TempFiles.objects.get(id=obj_id)
    status, _ = models.TaskStatus.objects.get_or_create(task_name="Загрузка ПВЗ и Постаматов")
    excel_file = base64.b64decode(file.data)
    df = pd.read_excel(excel_file)
    df = df.replace(np.nan, None)
    df = df.replace("NaT", None)
    df.columns = df.columns.str.lower()
    data_len = df.shape[0]
    for _ind, row in enumerate(df.to_dict("records")):
        status.status = "Загрузка данных: " + str(int(_ind / data_len * 100)) + "%"
        status.save()
        category = row.get("category")
        group = row.get("group")
        if category:
            cat, _ = models.Post_and_pvzCategory.objects.get_or_create(name=category)
        if group:
            # NOTE(review): if a row carries a group without a category,
            # `cat` is stale (previous row) or unbound — confirm the input
            # format always pairs them.
            gr, _ = models.Post_and_pvzGroup.objects.get_or_create(name=group, category=cat)
        row["category"] = cat
        row["group"] = gr
        # Assemble WKT from the lon/lat columns; they are removed from the
        # kwargs passed to the model.
        lon = str(row.pop("lon"))
        lat = str(row.pop("lat"))
        row["wkt"] = "POINT(" + lon + " " + lat + ")"
        models.Post_and_pvz.objects.get_or_create(**row)
    status.status = "Загрузка данных завершена"
    status.save()
    # For each imported group, store (or tighten) the distance from every
    # placement point to its nearest object of that group.
    groups = df[["group", "category"]].drop_duplicates().to_dict(orient="records")
    points = models.PlacementPoint.objects.all()
    num_points = points.count()
    total = len(groups) * num_points
    for _i, gr in enumerate(groups):
        group = models.Post_and_pvzGroup.objects.get(name=gr["group"], category__name=gr["category"])
        for _j, point in enumerate(points):
            status.status = "Подсчет расстояний: " + str(int((num_points * _i + _j) / total * 100)) + "%"
            status.save()
            post_object = (
                models.Post_and_pvz.objects.filter(group__id=group.id)
                .annotate(distance=Distance("wkt", point.geometry))
                .order_by("distance")
                .first()
            )
            d = models.PlacementPointPVZDistance.objects.filter(
                placement_point=point, pvz_postamates_group=group
            ).first()
            if d:
                # Keep the minimum known distance.
                if d.dist > post_object.distance.m:
                    d.dist = post_object.distance.m
                    d.save()
            else:
                models.PlacementPointPVZDistance.objects.create(
                    placement_point=point,
                    pvz_postamates_group=group,
                    dist=post_object.distance.m,
                )
    status.status = "Подсчет расстояний завершен"
    status.save()
    # Refresh per-point post/PVZ counters.
    point_qs = models.PlacementPoint.objects.all()
    data_len = models.PlacementPoint.objects.count()
    for _ind, point in enumerate(point_qs):
        status.status = "Пересчет параметров точек: " + str(int(_ind / data_len * 100)) + "%"
        status.save()
        LayerService.count_post_pvz(point)
    status.status = "Завершено"
    cache.clear()
    status.save()
    run_psql_command()


@shared_task()
def add_age_day():
    """Daily tick: age every Working point by one day; points past
    ``AGE_DAY_LIMIT`` graduate into the training sample."""
    qs = PlacementPoint.objects.filter(status="Working")
    qs.update(age_day=F("age_day") + 1)
    qs2 = qs.filter(age_day__gt=AGE_DAY_LIMIT)
    qs2.update(sample_trn=True)


@shared_task()
def load_other_objects(obj_id: int):
    """Import 'other objects' from a base64-encoded Excel TempFiles row.

    Args:
        obj_id: primary key of the ``TempFiles`` row holding the upload.
    """
    file = models.TempFiles.objects.get(id=obj_id)
    status, _ = models.TaskStatus.objects.get_or_create(task_name="Загрузка Прочих объектов")
    excel_file = base64.b64decode(file.data)
    df = pd.read_excel(excel_file)
    df = df.replace(np.nan, None)
    df = df.replace("NaT", None)
    df.columns = df.columns.str.lower()
    data_len = df.shape[0]
    for _ind, row in enumerate(df.to_dict("records")):
        status.status = "Загрузка данных: " + str(int(_ind / data_len * 100)) + "%"
        status.save()
        category = row.get("category")
        group = row.get("group")
        if category:
            cat, _ = models.OtherObjectsCategory.objects.get_or_create(name=category)
        if group:
            # NOTE(review): same `cat` staleness caveat as in load_post_and_pvz.
            gr, _ = models.OtherObjectsGroup.objects.get_or_create(name=group, category=cat)
        row["category"] = cat
        row["group"] = gr
        lon = str(row.pop("lon"))
        lat = str(row.pop("lat"))
        row["wkt"] = "POINT(" + lon + " " + lat + ")"
        models.OtherObjects.objects.get_or_create(**row)
    status.status = "Загрузка данных завершена"
    cache.clear()
    status.save()
    run_psql_command()


@shared_task()
def load_data(obj_id: int):
    """Replace all placement points with rows from a base64-encoded
    semicolon-delimited CSV stored in TempFiles.

    Args:
        obj_id: primary key of the ``TempFiles`` row holding the upload.
    """
    status, _ = models.TaskStatus.objects.get_or_create(task_name="Загрузка Точек")
    file = models.TempFiles.objects.get(id=obj_id)
    csv_file = base64.b64decode(file.data)
    # Full replace: wipe existing points before re-import.
    models.PlacementPoint.objects.all().delete()
    s = str(csv_file, "utf-8")
    buffer = StringIO(s)
    df = pd.read_csv(buffer, delimiter=";")
    df = df.replace(np.nan, None)
    df = df.replace("NaT", None)
    data_len = df.shape[0]
    # Columns computed server-side; never taken from the upload.
    skipped = {"id", "location_id", "area", "district", "age_month"}
    for _ind, row in enumerate(df.to_dict("records")):
        status.status = "Загрузка данных: " + str(int(_ind / data_len * 100)) + "%"
        status.save()
        payload = {k: v for k, v in row.items() if k not in skipped}
        models.PlacementPoint.objects.create(**payload)
    status.status = "Загрузка данных завершена"
    status.save()
    models.TempFiles.objects.all().delete()


@shared_task()
def import_task(file_id):
    """Match and enrich an imported pre-placement file."""
    PointService().start_mathing(file_id)
    PointService().make_enrichment()
    # raschet('service_preplacementpoint', need_time=False, task_name=STATUS_TASK_NAME_IMPORT)