"""Celery tasks for placement-point analytics.

`raschet` recomputes PVZ counters/distances queued by the UI, then retrains a
CatBoost ensemble on "working" points and writes traffic predictions plus SHAP
explanations back into Postgres. The remaining tasks import Excel/CSV payloads
into the Django models and refresh derived distance features.
"""
import base64
import os
from io import StringIO

import catboost
import geopandas as gpd
import numpy as np
import pandas as pd
import psycopg2
import psycopg2.extras  # execute_batch lives here; bare `import psycopg2` does not guarantee it
import shap
import sqlalchemy
from celery import shared_task
from django.contrib.gis.db.models.functions import Distance
from django.core.cache import cache
from django.db.models import F
from scipy import interpolate
from scipy.spatial import distance
from shapely import wkb
from sklearn import metrics
from sklearn import model_selection as ms
from sqlalchemy import text

from postamates.settings import AGE_DAY_LIMIT
from postamates.settings import DB_URL, STATUS_TASK_NAME, STATUS_TASK_NAME_IMPORT
from service import models
from service.layer_service import LayerService
from service.models import PlacementPoint, LastMLCall
from service.service import PointService
from service.utils import log_to_telegram
from service.utils import run_psql_command, change_status

# Feature columns fed to the CatBoost model. 'id' is carried along for joins
# and dropped right before fit/predict.
FEATS = [
    'id', 'metro_dist', 'target_dist', 'property_price_bargains',
    'property_price_offers', 'property_mean_floor', 'property_era',
    'flats_cnt', 'popul_home', 'popul_job', 'yndxfood_sum', 'yndxfood_cnt',
    'school_cnt', 'kindergar_cnt', 'target_post_cnt', 'public_stop_cnt',
    'sport_center_cnt', 'pharmacy_cnt', 'supermarket_cnt',
    'supermarket_premium_cnt', 'clinic_cnt', 'bank_cnt', 'reca_cnt',
    'lab_cnt', 'culture_cnt', 'attraction_cnt', 'mfc_cnt', 'bc_cnt',
    'tc_cnt', 'rival_pvz_cnt', 'rival_post_cnt', 'business_activity',
    'age_day', 'target_cnt_ao_mean',
]


def _execute_batch_with_retry(cursor, connection, sql, records):
    """Run a batched UPDATE; on failure roll the transaction back and retry once."""
    try:
        psycopg2.extras.execute_batch(cursor, sql, records)
        connection.commit()
    except Exception:
        cursor.execute('ROLLBACK')
        psycopg2.extras.execute_batch(cursor, sql, records)
        connection.commit()


def _id_value_records(df, column):
    """Build ``(value, id)`` int tuples for a single-column batched UPDATE."""
    return [(int(df[column][i]), int(df.id[i])) for i in range(len(df))]


@shared_task()
def raschet(table_name='service_placementpoint', need_time=True, task_name=STATUS_TASK_NAME):
    """Recompute queued counters/distances, retrain the traffic model and
    push predictions + SHAP explanations into ``table_name``.

    :param table_name: target table — placement points (default) or
        pre-placement points imported via :func:`import_task`.
    :param need_time: when True, record this run in ``LastMLCall``.
    :param task_name: ``TaskStatus`` row name used for progress reporting.
    """
    print('start raschet')
    # --- Step 1: PVZ counters for objects queued in RaschetObjects ----------
    raschet_objs = models.RaschetObjects.objects.all()
    if raschet_objs:
        change_status('Начало расчета кол-ва ПВЗ вокруг точек', task_name=task_name)
        total = raschet_objs.count()
        for _i, r_o in enumerate(raschet_objs):
            obj = models.Post_and_pvz.objects.get(id=r_o.obj_id)
            LayerService().count_post_pvz_for_placementpoint(obj)
            change_status(f'Подсчет кол-ва ПВЗ вокруг точек: {str(int((_i + 1) / total * 100))}%',
                          task_name=task_name)
        change_status('Подсчет кол-ва ПВЗ вокруг точек завершен', task_name=task_name)

    # --- Step 2: distances for groups queued in RaschetGroups ---------------
    group_objects = models.RaschetGroups.objects.all()
    group_total = group_objects.count()
    if group_objects:
        change_status('Начало расчета расстояний', task_name=task_name)
        qs = models.PlacementPoint.objects.all()
        for _k, g_o in enumerate(group_objects):
            g = models.Post_and_pvzGroup.objects.get(id=g_o.obj_id)
            for q in qs:
                PointService.calculate_dist_for_group(point=q, group=g)
            change_status(f'Подсчет расстояний: {str(int((_k + 1) / group_total * 100))}%',
                          task_name=task_name)
        change_status('Расчет завершен', task_name=task_name)
    # The queues are one-shot: drain them after processing.
    models.RaschetObjects.objects.all().delete()
    models.RaschetGroups.objects.all().delete()

    # --- Step 3: ML pipeline -------------------------------------------------
    change_status('Шаг 3 из 3 (Прогноз трафика в точках).', task_name=task_name)
    log_to_telegram(f'{table_name} start raschet')
    conn = None
    try:
        log_to_telegram('try connect to db')
        conn = sqlalchemy.create_engine(
            DB_URL,
            connect_args={'options': '-csearch_path=public'},
        )
    except Exception:  # narrowed from a bare except
        log_to_telegram('error connect to db')

    # Pre-initialize so the write-back phase below cannot hit a NameError when
    # the try-block fails before pts_inf is assigned (previously a latent bug).
    pts_inf = pd.DataFrame()
    try:
        query = text('select * from service_placementpoint')
        connection = conn.connect()
        pts = pd.read_sql(query, connection)
        # Geometry is stored as hex WKB; decode and project to metric CRS so
        # buffers/distances below are in meters.
        pts['geometry'] = pts['geometry'].apply(wkb.loads, hex=True)
        pts = gpd.GeoDataFrame(pts, geometry='geometry', crs='epsg:4326')
        pts = pts.to_crs('epsg:32637')
        feats = FEATS

        # --- Training records: points flagged sample_trn ---------------------
        pts_trn = pts.loc[pts.sample_trn == True].reset_index(drop=True)
        pts_trn = pts_trn.sort_values('address').reset_index(drop=True)
        pts_trn = gpd.GeoDataFrame(pts_trn, geometry='geometry', crs='epsg:32637')
        pts_target = pts_trn[['geometry']]
        pts_target['cnt'] = 1
        pts_target = gpd.GeoDataFrame(pts_target, geometry='geometry', crs='epsg:32637')
        target_feature_coords = []
        for i in range(0, len(pts_target)):
            target_feature_coords.append((pts_target.geometry.x[i], pts_target.geometry.y[i]))
        target_feature_coords = np.array(target_feature_coords)
        # Distance to the nearest *other* training point (index [1] skips the
        # zero self-distance), capped at 700 m.
        pts_trn['target_dist'] = pts_trn.apply(
            lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]],
                                              target_feature_coords)[0])[1])),
            axis=1,
        )
        pts_trn.loc[pts_trn.target_dist > 700, 'target_dist'] = 700
        # Count training points within a 500 m buffer; minus 1 removes self.
        pts_trn['buf'] = pts_trn.buffer(500)
        pts_trn = gpd.GeoDataFrame(pts_trn, geometry='buf', crs='epsg:32637')
        target_post = gpd.sjoin(pts_trn, pts_target, op='contains').groupby(
            'id', as_index=False).agg({'cnt': 'count'})
        target_post = target_post.rename(columns={'cnt': 'target_post_cnt'})
        pts_trn = pts_trn.drop(columns=['target_post_cnt'])
        pts_trn = pts_trn.join(target_post.set_index('id'), on='id')
        pts_trn['target_post_cnt'] = pts_trn['target_post_cnt'] - 1
        pts_trn = pts_trn.sort_values(by='id').reset_index(drop=True)
        X_trn = pts_trn[feats].drop(columns=['id'])
        Y_trn = pts_trn[['fact']]

        # --- Inference records ------------------------------------------------
        if table_name == 'service_placementpoint':
            pts_inf = pts.loc[(pts.status == 'Pending')
                              | (pts.status == 'Installation')
                              | (pts.status == 'Cancelled')
                              | ((pts.status == 'Working')
                                 & (pts.sample_trn == False))].reset_index(drop=True)
        elif table_name == 'service_preplacementpoint':
            pts_inf = pd.DataFrame(conn.connect().execute(text(f"select * from {table_name}")))
            pts_inf = pts_inf.loc[pts_inf.matching_status == 'New'].reset_index(drop=True)
            pts_inf['geometry'] = pts_inf['geometry'].apply(wkb.loads, hex=True)
            pts_inf = gpd.GeoDataFrame(pts_inf, geometry='geometry', crs='epsg:4326')

        if len(pts_inf) > 0:
            pts_inf = pts_inf.to_crs('epsg:32637')
            pts_inf['buf'] = pts_inf.buffer(500)
            pts_inf = gpd.GeoDataFrame(pts_inf, geometry='buf', crs='epsg:32637')
            # Reference set for inference: all live points.
            pts_target = pts.loc[(pts.status == 'Working')
                                 | (pts.status == 'Installation')].reset_index(drop=True)
            pts_target = pts_target[['geometry']]
            pts_target['cnt'] = 1
            pts_target = gpd.GeoDataFrame(pts_target, geometry='geometry', crs='epsg:32637')
            target_feature_coords = []
            for i in range(0, len(pts_target)):
                target_feature_coords.append((pts_target.geometry.x[i], pts_target.geometry.y[i]))
            target_feature_coords = np.array(target_feature_coords)
            # Live points are themselves in the reference set, so skip the
            # zero self-distance ([1]); others take the true nearest ([0]).
            pts_inf['target_dist'] = pts_inf.apply(
                lambda x: (
                    (sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]],
                                           target_feature_coords)[0])[1])
                    if ((x.status == 'Working') or (x.status == 'Installation'))
                    else (sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]],
                                                target_feature_coords)[0])[0])),
                axis=1,
            )
            pts_inf.loc[pts_inf.target_dist > 700, 'target_dist'] = 700
            pts_inf = pts_inf.sort_values(by='id').reset_index(drop=True)
            target_post = gpd.sjoin(pts_inf, pts_target, op='contains').groupby(
                'id', as_index=False).agg({'cnt': 'count'})
            target_post = target_post.rename(columns={'cnt': 'target_post_cnt'})
            pts_inf = pts_inf.drop(columns=['target_post_cnt'])
            pts_inf = pts_inf.join(target_post.set_index('id'), on='id')
            pts_inf['target_post_cnt'] = pts_inf['target_post_cnt'].fillna(0)
            pts_inf['target_post_cnt'] = pts_inf.apply(
                lambda x: ((x.target_post_cnt - 1)
                           if ((x.status == 'Working') or (x.status == 'Installation'))
                           else x.target_post_cnt),
                axis=1)
            # Predict the "mature" traffic level: freeze age at 240 days but
            # keep the real age for the ramp-up plan below.
            pts_inf['age_day_init'] = pts_inf['age_day']
            pts_inf['age_day'] = 240
            X_inf = pts_inf[feats]

            # --- Ensemble training / inference --------------------------------
            seeds = [99, 87, 21, 15]
            r2_scores = []
            mapes = []
            y_infers = []
            for i in seeds:
                x_trn, x_test, y_trn, y_test = ms.train_test_split(
                    X_trn, Y_trn, test_size=0.2, random_state=i)
                model = catboost.CatBoostRegressor(cat_features=['property_era'], random_state=i)
                model.fit(x_trn, y_trn, verbose=False)
                r2_score = metrics.r2_score(y_test, model.predict(x_test))
                mape = metrics.mean_absolute_percentage_error(y_test, model.predict(x_test))
                # Quality gate: only models good enough on the hold-out vote.
                if ((r2_score > 0.45) & (mape < 0.25)):
                    r2_scores.append(r2_score)
                    mapes.append(mape)
                    y_infers.append(model.predict(X_inf.drop(columns=['id'])))
            if not y_infers:
                # Previously sum([])/4 silently produced 0 and crashed later;
                # fail loudly so the outer handler logs a clear reason.
                raise RuntimeError('no model passed the quality gate (r2>0.45, mape<0.25)')
            # BUG FIX: mean over the models that actually passed the gate,
            # not the constant seed count of 4.
            current_pred = sum(y_infers) / len(y_infers)

            # --- SHAP explanations (from the last trained model) ---------------
            # NOTE(review): the explainer uses the last model from the loop even
            # if that model failed the quality gate — confirm this is intended.
            explainer = shap.TreeExplainer(model)
            shap_values = explainer(X_inf.drop(columns=['id']))
            shap_fields = pd.DataFrame(shap_values.values)
            shap_fields.columns = X_inf.drop(columns=['id']).columns + '_shap'
            shap_fields = shap_fields.drop(columns=['age_day_shap'])
            # Normalize to percent of total absolute contribution per row.
            shap_fields['sum'] = abs(shap_fields).sum(axis=1)
            shap_fields = round(shap_fields.iloc[:, :32].div(shap_fields['sum'], axis=0) * 100, 2)

            # --- Assemble the fields to write back ------------------------------
            update_fields = pts_inf[
                [
                    'id', 'age_day_init', 'status', 'fact', 'delta_current',
                    'delta_first', 'plan_current', 'plan_first',
                    'prediction_first', 'target_post_cnt', 'target_dist'
                ]
            ]
            update_fields = update_fields.join(
                pd.concat(
                    [
                        X_inf[['id']],
                        pd.DataFrame({'prediction_current': current_pred}),
                    ],
                    axis=1,
                ).set_index('id'),
                on='id',
            )
            update_fields['prediction_current'] = update_fields['prediction_current'].astype(int)
            # Ramp-up curve: fraction of mature traffic expected at a given age.
            days_x = np.array([0, 30, 60, 90, 120, 150, 180, 210, 240, 270])
            perc_y = np.array([0, 0.15, 0.20, 0.30, 0.60, 0.70, 0.70, 0.75, 0.75, 0.80])
            spl = interpolate.splrep(days_x, perc_y)
            update_fields['plan_first'] = update_fields.apply(
                lambda x: (x.prediction_first * interpolate.splev(x.age_day_init, spl)
                           if x.status == 'Working' else 0),
                axis=1,
            )
            update_fields['plan_current'] = update_fields.apply(
                lambda x: (x.prediction_current * interpolate.splev(x.age_day_init, spl)
                           if x.status == 'Working' else 0),
                axis=1,
            )
            # Deltas: percentage deviation of the fact from the plan.
            update_fields['delta_first'] = update_fields.apply(
                lambda x: ((x.fact - x.plan_first) / x.plan_first * 100
                           if x.status == 'Working' else 0),
                axis=1,
            )
            update_fields['delta_current'] = update_fields.apply(
                lambda x: ((x.fact - x.plan_current) / x.plan_current * 100
                           if x.status == 'Working' else 0),
                axis=1,
            )
            update_fields_working = update_fields.loc[
                update_fields.status == 'Working'].reset_index(drop=True)
            update_fields_working = update_fields_working.fillna(0)
        connection.close()
    except Exception as e:
        log_to_telegram(f'Ошибка при обновлении полей в базе данных: {e}')

    # --- Write-back via psycopg2 --------------------------------------------
    log_to_telegram('Начинается обновление полей в базе')
    if len(pts_inf) > 0:
        change_status('Перерасчет ML: 50%', task_name=task_name)
        try:
            log_to_telegram('Подключение к базе данных 2')
            conn2 = psycopg2.connect(
                database=os.getenv('POSTGRES_DB', 'postgres'),
                user=os.getenv('POSTGRES_USER', 'postgres'),
                password=os.getenv('POSTGRES_PASSWORD', 'postgres'),
                host=os.getenv('POSTGRES_HOST', 'postgres'),
                # BUG FIX: the fallback used to be 'postgres', which is not a
                # valid port number; 5432 is the Postgres default.
                port=os.getenv('POSTGRES_PORT', '5432'),
                options='-c search_path=public',
            )
            cursor = conn2.cursor()
        except Exception:
            conn2 = None
            log_to_telegram('Не удалось подключиться к базе данных')
        if conn2 is not None:
            # SHAP columns: one wide UPDATE covering every *_shap column.
            update_fields_shap = pd.concat([shap_fields, update_fields[['id']]], axis=1)
            shap_records = [
                tuple(int(update_fields_shap[col][i]) for col in list(update_fields_shap))
                for i in range(len(update_fields_shap))
            ]
            # "['a_shap', 'b_shap']" -> "a_shap=%s, b_shap"; the trailing
            # " = %s" in the SQL completes the last assignment.
            shap_fields_name = str(list(shap_fields))[1:-1].replace("'", "").replace(',', '=%s,')
            sql_update_query = f"""Update {table_name} set {shap_fields_name} = %s where id = %s"""
            _execute_batch_with_retry(cursor, conn2, sql_update_query, shap_records)
            # Scalar columns follow one shared (value, id) batched-update pattern.
            for column, frame in (
                ('target_post_cnt', update_fields),
                ('target_dist', update_fields),
                ('prediction_current', update_fields),
                ('plan_first', update_fields_working),
                ('plan_current', update_fields_working),
                ('delta_first', update_fields_working),
                ('delta_current', update_fields_working),
            ):
                sql_update_query = f"""Update {table_name} set {column} = %s where id = %s"""
                _execute_batch_with_retry(cursor, conn2, sql_update_query,
                                          _id_value_records(frame, column))
            conn2.close()
            cache.clear()
    else:
        log_to_telegram('len(pts_inf) <= 0')
    run_psql_command()
    log_to_telegram('end raschet')
    change_status('Перерасчет ML завершен', task_name=task_name)
    if need_time:
        # Remember when the ML pipeline last completed.
        LastMLCall.objects.all().delete()
        LastMLCall.objects.create()


@shared_task
def load_post_and_pvz(obj_id: int):
    """Import PVZ/postamat rows from a base64-encoded Excel file stored in
    ``TempFiles`` with id ``obj_id``, then refresh per-point distances.
    """
    file = models.TempFiles.objects.get(id=obj_id)
    status, _ = models.TaskStatus.objects.get_or_create(task_name='Загрузка ПВЗ и Постаматов')
    excel_file = base64.b64decode(file.data)
    df = pd.read_excel(excel_file)
    df = df.replace(np.nan, None)
    df = df.replace('NaT', None)
    df.columns = df.columns.str.lower()
    data_len = df.shape[0]
    for _ind, row in enumerate(df.to_dict('records')):
        status.status = "Загрузка данных: " + str(int(_ind / data_len * 100)) + "%"
        status.save()
        category = row.get('category')
        group = row.get('group')
        # NOTE(review): rows with a group but no category reuse `cat` from the
        # previous iteration (or NameError on the first row) — verify the
        # source files always carry both columns.
        if category:
            cat, _ = models.Post_and_pvzCategory.objects.get_or_create(name=category)
        if group:
            gr, _ = models.Post_and_pvzGroup.objects.get_or_create(name=group, category=cat)
        row['category'] = cat
        row['group'] = gr
        lon = str(row.pop('lon'))
        lat = str(row.pop("lat"))
        row['wkt'] = "POINT(" + lon + " " + lat + ")"
        models.Post_and_pvz.objects.get_or_create(**row)
    status.status = "Загрузка данных завершена"
    status.save()
    # For every imported group, update each placement point's distance to the
    # nearest object of that group (keep the minimum if a record exists).
    groups = df[['group', 'category']].drop_duplicates().to_dict(orient='records')
    points = models.PlacementPoint.objects.all()
    num_points = points.count()
    total = len(groups) * num_points
    for _i, gr in enumerate(groups):
        group = models.Post_and_pvzGroup.objects.get(name=gr['group'], category__name=gr['category'])
        for _j, point in enumerate(points):
            status.status = "Подсчет расстояний: " + str(int((num_points * _i + _j) / total * 100)) + "%"
            status.save()
            post_object = models.Post_and_pvz.objects.filter(group__id=group.id).annotate(
                distance=Distance("wkt", point.geometry)).order_by('distance').first()
            d = models.PlacementPointPVZDistance.objects.filter(
                placement_point=point, pvz_postamates_group=group).first()
            if d:
                if d.dist > post_object.distance.m:
                    d.dist = post_object.distance.m
                    d.save()
            else:
                models.PlacementPointPVZDistance.objects.create(
                    placement_point=point, pvz_postamates_group=group,
                    dist=post_object.distance.m)
    status.status = "Подсчет расстояний завершен"
    status.save()
    # Recount PVZ-related features for every placement point.
    point_qs = models.PlacementPoint.objects.all()
    data_len = models.PlacementPoint.objects.count()
    for _ind, point in enumerate(point_qs):
        status.status = "Пересчет параметров точек: " + str(int(_ind / data_len * 100)) + "%"
        status.save()
        LayerService.count_post_pvz(point)
    status.status = "Завершено"
    cache.clear()
    status.save()
    run_psql_command()


@shared_task()
def add_age_day():
    """Daily tick: age every working point by one day and move points past
    AGE_DAY_LIMIT into the training sample."""
    qs = PlacementPoint.objects.filter(status='Working')
    qs.update(age_day=F('age_day') + 1)
    qs2 = qs.filter(age_day__gt=AGE_DAY_LIMIT)
    qs2.update(sample_trn=True)


@shared_task()
def load_other_objects(obj_id: int):
    """Import "other objects" rows from a base64-encoded Excel file stored in
    ``TempFiles`` with id ``obj_id``."""
    file = models.TempFiles.objects.get(id=obj_id)
    status, _ = models.TaskStatus.objects.get_or_create(task_name='Загрузка Прочих объектов')
    excel_file = base64.b64decode(file.data)
    df = pd.read_excel(excel_file)
    df = df.replace(np.nan, None)
    df = df.replace('NaT', None)
    df.columns = df.columns.str.lower()
    data_len = df.shape[0]
    for _ind, row in enumerate(df.to_dict('records')):
        status.status = "Загрузка данных: " + str(int(_ind / data_len * 100)) + "%"
        status.save()
        category = row.get('category')
        group = row.get('group')
        # NOTE(review): same category/group coupling caveat as load_post_and_pvz.
        if category:
            cat, _ = models.OtherObjectsCategory.objects.get_or_create(name=category)
        if group:
            gr, _ = models.OtherObjectsGroup.objects.get_or_create(name=group, category=cat)
        row['category'] = cat
        row['group'] = gr
        lon = str(row.pop('lon'))
        lat = str(row.pop("lat"))
        row['wkt'] = "POINT(" + lon + " " + lat + ")"
        models.OtherObjects.objects.get_or_create(**row)
    status.status = "Загрузка данных завершена"
    cache.clear()
    status.save()
    run_psql_command()


@shared_task()
def load_data(obj_id: int):
    """Replace all placement points with rows from a base64-encoded
    semicolon-delimited CSV stored in ``TempFiles`` with id ``obj_id``."""
    status, _ = models.TaskStatus.objects.get_or_create(task_name='Загрузка Точек')
    file = models.TempFiles.objects.get(id=obj_id)
    csv_file = base64.b64decode(file.data)
    # Destructive import: wipe existing points before loading the new set.
    models.PlacementPoint.objects.all().delete()
    s = str(csv_file, 'utf-8')
    data = StringIO(s)
    df = pd.read_csv(data, delimiter=';')
    df = df.replace(np.nan, None)
    df = df.replace('NaT', None)
    data_len = df.shape[0]
    for _ind, row in enumerate(df.to_dict('records')):
        status.status = "Загрузка данных: " + str(int(_ind / data_len * 100)) + "%"
        status.save()
        # Drop columns the model computes itself or doesn't store.
        data = {
            k: row[k] for k in row.keys()
            if k not in ['id', 'location_id', 'area', 'district', 'age_month']
        }
        models.PlacementPoint.objects.create(**data)
    status.status = "Загрузка данных завершена"
    status.save()
    models.TempFiles.objects.all().delete()


@shared_task()
def import_task(file_id):
    """Match and enrich an imported file, then run the ML pipeline
    synchronously against the pre-placement table."""
    PointService().start_mathing(file_id)
    PointService().make_enrichment()
    raschet('service_preplacementpoint', need_time=False, task_name=STATUS_TASK_NAME_IMPORT)