You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

514 lines
24 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import base64
import os
from io import StringIO

import catboost
import geopandas as gpd
import numpy as np
import pandas as pd
import psycopg2
import psycopg2.extras  # explicit import: psycopg2.extras is NOT loaded by `import psycopg2`
import shap
import sqlalchemy
from celery import shared_task
from django.contrib.gis.db.models.functions import Distance
from django.core.cache import cache
from django.db.models import F
from scipy import interpolate
from scipy.spatial import distance
from shapely import wkb
from sklearn import metrics
from sklearn import model_selection as ms
from sqlalchemy import text

from postamates.settings import AGE_DAY_LIMIT
from postamates.settings import DB_URL, STATUS_TASK_NAME
from service import models
from service.layer_service import LayerService
from service.models import PlacementPoint, LastMLCall
from service.service import PointService
from service.utils import log_to_telegram
from service.utils import run_psql_command
@shared_task()
def raschet(table_name: str = 'service_placementpoint', need_time: bool = True):
    """Full recalculation pipeline for placement points.

    Stages:
      1. Recompute PVZ/postamat counts around points for queued RaschetObjects.
      2. Recompute group distances for queued RaschetGroups.
      3. Train a CatBoost ensemble on 'sample_trn' points, predict demand for
         pending/new points, compute plan/delta fields and SHAP explanations,
         and write everything back to ``table_name`` via raw SQL batches.

    Progress is persisted in a TaskStatus row (Russian UI strings) so the
    frontend can poll it; milestones are also logged to Telegram.

    NOTE(review): the indentation below was reconstructed from an unindented
    dump of this file; the nesting is the most plausible reading of the
    statement order — confirm against the repository.
    """
    print('start raschet')
    status, _ = models.TaskStatus.objects.get_or_create(task_name=STATUS_TASK_NAME)
    raschet_objs = models.RaschetObjects.objects.all()
    if raschet_objs:
        # Stage 1: per-object recount of PVZ/postamats around placement points.
        status.status = 'Начало расчета кол-ва ПВЗ вокруг точек'
        status.save()
        total = raschet_objs.count()
        for _i, r_o in enumerate(raschet_objs):
            obj = models.Post_and_pvz.objects.get(id=r_o.obj_id)
            LayerService().count_post_pvz_for_placementpoint(obj)
            # Persisted percentage so the UI can show progress.
            status.status = "Подсчет кол-ва ПВЗ вокруг точек: " + str(int((_i + 1) / total * 100)) + "%"
            status.save()
        status.status = 'Расчет кол-ва ПВЗ вокруг точек завершен'
        status.save()
    group_objects = models.RaschetGroups.objects.all()
    group_total = group_objects.count()
    if group_objects:
        # Stage 2: distance of every placement point to each queued group.
        status.status = 'Начало расчета расстояний'
        status.save()
        qs = models.PlacementPoint.objects.all()
        for _k, g_o in enumerate(group_objects):
            g = models.Post_and_pvzGroup.objects.get(id=g_o.obj_id)
            for q in qs:
                PointService.calculate_dist_for_group(point=q, group=g)
            status.status = "Подсчет расстояний: " + str(int(_k / group_total * 100)) + "%"
            status.save()
        status.status = "Подсчет расстояний завершен"
        status.save()
    # Both recalculation queues are consumed — clear them.
    models.RaschetObjects.objects.all().delete()
    models.RaschetGroups.objects.all().delete()
    # Start the ML stage
    status.status = 'Запуск ML'
    status.save()
    log_to_telegram(f'{table_name} start raschet')
    try:
        log_to_telegram('try connect to db')
        conn = sqlalchemy.create_engine(
            DB_URL,
            connect_args={'options': '-csearch_path=public'},
        )
    # NOTE(review): bare except; if engine creation fails, `conn` stays
    # unbound and the next block raises NameError (caught below, but the
    # run silently produces nothing).
    except:
        log_to_telegram('error connect to db')
    try:
        query = text('select * from service_placementpoint')
        connection = conn.connect()
        pts = pd.read_sql(query, connection)
        # Geometry is stored as hex-encoded WKB; decode to shapely objects and
        # reproject WGS84 -> UTM zone 37N so buffers/distances are in metres.
        pts['geometry'] = pts['geometry'].apply(wkb.loads, hex=True)
        pts = gpd.GeoDataFrame(pts, geometry='geometry', crs='epsg:4326')
        pts = pts.to_crs('epsg:32637')
        # Model feature columns ('id' is kept for joins, dropped before fit).
        feats = [
            'id', 'metro_dist', 'target_dist', 'property_price_bargains', 'property_price_offers',
            'property_mean_floor',
            'property_era', 'flats_cnt', 'popul_home', 'popul_job', 'yndxfood_sum',
            'yndxfood_cnt', 'school_cnt', 'kindergar_cnt', 'target_post_cnt', 'public_stop_cnt', 'sport_center_cnt',
            'pharmacy_cnt', 'supermarket_cnt', 'supermarket_premium_cnt', 'clinic_cnt', 'bank_cnt', 'reca_cnt',
            'lab_cnt', 'culture_cnt', 'attraction_cnt', 'mfc_cnt', 'bc_cnt', 'tc_cnt', 'rival_pvz_cnt',
            'rival_post_cnt',
            'business_activity', 'age_day', 'target_cnt_ao_mean'
        ]
        # Training records
        pts_trn = pts.loc[pts.sample_trn == True].reset_index(drop=True)
        pts_trn = gpd.GeoDataFrame(pts_trn, geometry='geometry', crs='epsg:32637')
        pts_target = pts_trn[['geometry']]
        pts_target['cnt'] = 1
        pts_target = gpd.GeoDataFrame(pts_target, geometry='geometry', crs='epsg:32637')
        target_feature_coords = []
        for i in range(0, len(pts_target)):
            target_feature_coords.append((pts_target.geometry.x[i], pts_target.geometry.y[i]))
        target_feature_coords = np.array(target_feature_coords)
        # Distance to the nearest OTHER training point: index [1] skips the
        # zero distance to the point itself.
        pts_trn['target_dist'] = pts_trn.apply(
            lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[1])),
            axis=1,
        )
        # Cap the distance feature at 700 m.
        pts_trn.loc[pts_trn.target_dist > 700, 'target_dist'] = 700
        # Count training points inside a 500 m buffer; subtract 1 to exclude
        # the point itself from its own count.
        pts_trn['buf'] = pts_trn.buffer(500)
        pts_trn = gpd.GeoDataFrame(pts_trn, geometry='buf', crs='epsg:32637')
        target_post = gpd.sjoin(pts_trn, pts_target, op='contains').groupby('id', as_index=False).agg({'cnt': 'count'})
        target_post = target_post.rename(columns={'cnt': 'target_post_cnt'})
        pts_trn = pts_trn.drop(columns=['target_post_cnt'])
        pts_trn = pts_trn.join(target_post.set_index('id'), on='id')
        pts_trn['target_post_cnt'] = pts_trn['target_post_cnt'] - 1
        pts_trn = pts_trn.sort_values(by='id').reset_index(drop=True)
        X_trn = pts_trn[feats].drop(columns=['id'])
        Y_trn = pts_trn[['fact']]
        status.status = 'Записи для инференса'
        status.save()
        # Inference records
        # NOTE(review): for any other table_name, `pts_inf` stays unbound and
        # the later `len(pts_inf)` check raises NameError outside this try.
        if table_name == 'service_placementpoint':
            pts_inf = pts.loc[(pts.status == 'Pending') |
                              (pts.status == 'Installation') |
                              (pts.status == 'Cancelled') |
                              ((pts.status == 'Working') & (pts.sample_trn == False))].reset_index(drop=True)
        elif table_name == 'service_preplacementpoint':
            pts_inf = pd.DataFrame(conn.connect().execute(text(f"select * from {table_name}")))
            pts_inf = pts_inf.loc[pts_inf.matching_status == 'New'].reset_index(drop=True)
            pts_inf['geometry'] = pts_inf['geometry'].apply(wkb.loads, hex=True)
            pts_inf = gpd.GeoDataFrame(pts_inf, geometry='geometry', crs='epsg:4326')
        if len(pts_inf) > 0:
            pts_inf = pts_inf.to_crs('epsg:32637')
            pts_inf['buf'] = pts_inf.buffer(500)
            pts_inf = gpd.GeoDataFrame(pts_inf, geometry='buf', crs='epsg:32637')
            # Reference set for inference distances/counts: live points only.
            pts_target = pts.loc[(pts.status == 'Working') |
                                 (pts.status == 'Installation')].reset_index(drop=True)
            pts_target = pts_target[['geometry']]
            pts_target['cnt'] = 1
            pts_target = gpd.GeoDataFrame(pts_target, geometry='geometry', crs='epsg:32637')
            target_feature_coords = []
            for i in range(0, len(pts_target)):
                target_feature_coords.append((pts_target.geometry.x[i], pts_target.geometry.y[i]))
            target_feature_coords = np.array(target_feature_coords)
            # Index [0] here (not [1]): inference points are not in the
            # reference set, so the minimum distance is already to another point.
            pts_inf['target_dist'] = pts_inf.apply(
                lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[0])),
                axis=1,
            )
            pts_inf.loc[pts_inf.target_dist > 700, 'target_dist'] = 700
            pts_inf = pts_inf.sort_values(by='id').reset_index(drop=True)
            target_post = gpd.sjoin(pts_inf, pts_target, op='contains').groupby('id', as_index=False).agg({'cnt': 'count'})
            target_post = target_post.rename(columns={'cnt': 'target_post_cnt'})
            pts_inf = pts_inf.drop(columns=['target_post_cnt'])
            pts_inf = pts_inf.join(target_post.set_index('id'), on='id')
            pts_inf['target_post_cnt'] = pts_inf['target_post_cnt'].fillna(0)
            # Predict at a fixed "mature" age (240 days); keep the real age
            # for the plan/delta spline below.
            pts_inf['age_day_init'] = pts_inf['age_day']
            pts_inf['age_day'] = 240
            X_inf = pts_inf[feats]
            seeds = [3, 99, 87, 21, 15]
            # Training and inference
            r2_scores = []
            mapes = []
            y_infers = []
            status.status = 'Обучение inference 0%'
            status.save()
            for i in seeds:
                status.status = 'Обучение inference: ' + str(int((seeds.index(i) + 1) / len(seeds) * 100)) + '%'
                status.save()
                x_trn, x_test, y_trn, y_test = ms.train_test_split(X_trn, Y_trn, test_size=0.2, random_state=i)
                model = catboost.CatBoostRegressor(cat_features=['property_era'], random_state=i)
                model.fit(x_trn, y_trn, verbose=False)
                r2_score = metrics.r2_score(y_test, model.predict(x_test))
                mape = metrics.mean_absolute_percentage_error(y_test, model.predict(x_test))
                # Quality gate: only ensemble members passing both thresholds
                # contribute predictions.
                if ((r2_score > 0.45) & (mape < 0.25)):
                    r2_scores.append(r2_score)
                    mapes.append(mape)
                    y_infers.append(model.predict(X_inf.drop(columns=['id'])))
            status.status = 'Обучение inference 100%'
            # NOTE(review): divides by 5 even when fewer than 5 models passed
            # the gate (and yields scalar 0 if none did) — probably should be
            # len(y_infers); confirm intent.
            current_pred = sum(y_infers) / 5
            # SHAP computation (explainer uses the LAST trained model only)
            explainer = shap.TreeExplainer(model)
            shap_values = explainer(X_inf.drop(columns=['id']))
            shap_fields = pd.DataFrame(shap_values.values)
            shap_fields.columns = X_inf.drop(columns=['id']).columns + '_shap'
            shap_fields = shap_fields.drop(columns = ['age_day_shap'])
            # Normalise absolute SHAP values to percentages per row.
            shap_fields['sum'] = abs(shap_fields).sum(axis=1)
            shap_fields = round(shap_fields.iloc[:,:32].div(shap_fields['sum'], axis=0)*100, 2)
            # Assemble the fields to write back from the model results
            update_fields = pts_inf[
                [
                    'id', 'age_day_init', 'status', 'fact', 'delta_current', 'delta_first', 'plan_current', 'plan_first',
                    'prediction_first', 'target_post_cnt', 'target_dist'
                ]
            ]
            update_fields = update_fields.join(
                pd.concat(
                    [
                        X_inf[['id']],
                        pd.DataFrame({'prediction_current': current_pred}),
                    ],
                    axis=1,
                ).set_index('id'),
                on='id',
            )
            update_fields['prediction_current'] = update_fields['prediction_current'].astype(int)
            # Ramp-up curve: fraction of predicted demand expected at a given
            # age (days), interpolated with a cubic spline.
            days_x = np.array([0, 30, 60, 90, 120, 150, 180, 210, 240, 270])
            perc_y = np.array([0, 0.15, 0.20, 0.30, 0.60, 0.70, 0.70, 0.75, 0.75, 0.80])
            spl = interpolate.splrep(days_x, perc_y)
            # Plan/delta fields only make sense for 'Working' points; others get 0.
            update_fields['plan_first'] = update_fields.apply(
                lambda x: (x.prediction_first * interpolate.splev(x.age_day_init, spl) if x.status == 'Working' else 0),
                axis=1,
            )
            update_fields['plan_current'] = update_fields.apply(
                lambda x: (x.prediction_current * interpolate.splev(x.age_day_init, spl) if x.status == 'Working' else 0),
                axis=1,
            )
            # Delta = relative deviation of fact from plan, in percent.
            update_fields['delta_first'] = update_fields.apply(
                lambda x: ((x.fact - x.plan_first) / x.plan_first * 100 if x.status == 'Working' else 0),
                axis=1,
            )
            update_fields['delta_current'] = update_fields.apply(
                lambda x: ((x.fact - x.plan_current) / x.plan_current * 100 if x.status == 'Working' else 0),
                axis=1,
            )
            update_fields_working = update_fields.loc[update_fields.status == 'Working'].reset_index(drop=True)
            update_fields_working = update_fields_working.fillna(0)
        connection.close()
    except Exception as e:
        log_to_telegram(f'Ошибка при обновлении полей в базе данных: {e}')
    log_to_telegram('Начинается обновление полей в базе')
    if len(pts_inf) > 0:
        status.status = 'Перерасчет ML: 50%'
        status.save()
        # Write the updated values back to the database
        try:
            log_to_telegram('Подключение к базе данных 2')
            # NOTE(review): POSTGRES_PORT falls back to 'postgres', which is
            # not a valid port — confirm the env var is always set.
            conn2 = psycopg2.connect(
                database=os.getenv('POSTGRES_DB', 'postgres'), user=os.getenv('POSTGRES_USER', 'postgres'),
                password=os.getenv('POSTGRES_PASSWORD', 'postgres'),
                host=os.getenv('POSTGRES_HOST', 'postgres'), port=os.getenv('POSTGRES_PORT', 'postgres'),
                options='-c search_path=public',
            )
            cursor = conn2.cursor()
        except:
            conn2 = None
            log_to_telegram('Не удалось подключиться к базе данных')
        if conn2 is not None:
            # SHAP columns update.
            # Each section below follows the same pattern: build a list of
            # value tuples, run a batched UPDATE, and on failure roll back and
            # retry once (NOTE(review): a second failure propagates uncaught).
            # NOTE(review): psycopg2.extras must be imported explicitly
            # (`import psycopg2` alone does not expose it) — verify.
            update_fields_shap = pd.concat([shap_fields, update_fields[['id']]], axis=1)
            update_records0 = []
            for i in range(0, len(update_fields_shap)):
                update_records1 = []
                for n in list(update_fields_shap):
                    update_records1.append(int(update_fields_shap[n][i]))
                update_records0.append(tuple(update_records1))
            # Turn the column list into "col1=%s, col2=%s, ..." for the SET clause.
            shap_fields_name = str(list(shap_fields))[1:-1].replace("'", "").replace(',', '=%s,')
            sql_update_query = f"""Update {table_name} set {shap_fields_name} = %s where id = %s"""
            try:
                psycopg2.extras.execute_batch(cursor, sql_update_query, update_records0)
                conn2.commit()
            except:
                cursor.execute("ROLLBACK")
                psycopg2.extras.execute_batch(cursor, sql_update_query, update_records0)
                conn2.commit()
            # target_post_cnt
            update_records1 = []
            for i in range(0, len(update_fields)):
                update_records1.append((int(update_fields.target_post_cnt[i]), int(update_fields.id[i])))
            sql_update_query = f"""Update {table_name} set target_post_cnt = %s where id = %s"""
            try:
                psycopg2.extras.execute_batch(cursor, sql_update_query, update_records1)
                conn2.commit()
            except:
                cursor.execute("ROLLBACK")
                psycopg2.extras.execute_batch(cursor, sql_update_query, update_records1)
                conn2.commit()
            # target_dist
            update_records1 = []
            for i in range(0, len(update_fields)):
                update_records1.append((int(update_fields.target_dist[i]), int(update_fields.id[i])))
            sql_update_query = f"""Update {table_name} set target_dist = %s where id = %s"""
            try:
                psycopg2.extras.execute_batch(cursor, sql_update_query, update_records1)
                conn2.commit()
            except:
                cursor.execute("ROLLBACK")
                psycopg2.extras.execute_batch(cursor, sql_update_query, update_records1)
                conn2.commit()
            # prediction_current
            update_records1 = []
            for i in range(0, len(update_fields)):
                update_records1.append((int(update_fields.prediction_current[i]), int(update_fields.id[i])))
            sql_update_query = f"""Update {table_name} set prediction_current = %s where id = %s"""
            try:
                psycopg2.extras.execute_batch(cursor, sql_update_query, update_records1)
                conn2.commit()
            except Exception:
                cursor.execute('ROLLBACK')
                psycopg2.extras.execute_batch(cursor, sql_update_query, update_records1)
                conn2.commit()
            # plan_first (Working points only)
            update_records2 = []
            for i in range(0, len(update_fields_working)):
                update_records2.append((int(update_fields_working.plan_first[i]), int(update_fields_working.id[i])))
            sql_update_query = f"""Update {table_name} set plan_first = %s where id = %s"""
            try:
                psycopg2.extras.execute_batch(cursor, sql_update_query, update_records2)
                conn2.commit()
            except Exception:
                cursor.execute('ROLLBACK')
                psycopg2.extras.execute_batch(cursor, sql_update_query, update_records2)
                conn2.commit()
            # plan_current (Working points only)
            update_records3 = []
            for i in range(0, len(update_fields_working)):
                update_records3.append((int(update_fields_working.plan_current[i]), int(update_fields_working.id[i])))
            sql_update_query = f"""Update {table_name} set plan_current = %s where id = %s"""
            try:
                psycopg2.extras.execute_batch(cursor, sql_update_query, update_records3)
                conn2.commit()
            except Exception:
                cursor.execute('ROLLBACK')
                psycopg2.extras.execute_batch(cursor, sql_update_query, update_records3)
                conn2.commit()
            # delta_first (Working points only)
            update_records4 = []
            for i in range(0, len(update_fields_working)):
                update_records4.append((int(update_fields_working.delta_first[i]), int(update_fields_working.id[i])))
            sql_update_query = f"""Update {table_name} set delta_first = %s where id = %s"""
            try:
                psycopg2.extras.execute_batch(cursor, sql_update_query, update_records4)
                conn2.commit()
            except Exception:
                cursor.execute('ROLLBACK')
                psycopg2.extras.execute_batch(cursor, sql_update_query, update_records4)
                conn2.commit()
            # delta_current (Working points only)
            update_records5 = []
            for i in range(0, len(update_fields_working)):
                update_records5.append((int(update_fields_working.delta_current[i]), int(update_fields_working.id[i])))
            sql_update_query = f"""Update {table_name} set delta_current = %s where id = %s"""
            try:
                psycopg2.extras.execute_batch(cursor, sql_update_query, update_records5)
                conn2.commit()
            except Exception:
                cursor.execute('ROLLBACK')
                psycopg2.extras.execute_batch(cursor, sql_update_query, update_records5)
                conn2.commit()
            conn2.close()
            # Invalidate cached views so the UI picks up the new values.
            cache.clear()
    else:
        log_to_telegram('len(pts_inf) <= 0')
    run_psql_command()
    log_to_telegram('end raschet')
    status.status = 'Перерасчет ML завершен'
    status.save()
    # Record the time of this ML run (used to decide when to rerun).
    if need_time:
        LastMLCall.objects.all().delete()
        LastMLCall.objects.create()
@shared_task
def load_post_and_pvz(obj_id: int):
    """Import PVZ/postamat objects from an uploaded Excel file.

    The file is stored base64-encoded in a TempFiles row. After loading the
    rows, the task refreshes the stored minimum distance from every placement
    point to each (group, category) pair found in the file, then recomputes
    per-point PVZ/postamat counters. Progress is persisted in a TaskStatus row.

    NOTE(review): indentation was reconstructed from an unindented dump of
    this file — confirm block nesting against the repository.
    """
    file = models.TempFiles.objects.get(id=obj_id)
    status, _ = models.TaskStatus.objects.get_or_create(task_name='Загрузка ПВЗ и Постаматов')
    excel_file = base64.b64decode(file.data)
    df = pd.read_excel(excel_file)
    # Normalise Excel artifacts: NaN/'NaT' -> None, lower-case column names.
    df = df.replace(np.nan, None)
    df = df.replace('NaT', None)
    df.columns = df.columns.str.lower()
    data_len = df.shape[0]
    for _ind, row in enumerate(df.to_dict('records')):
        status.status = "Загрузка данных: " + str(int(_ind / data_len * 100)) + "%"
        status.save()
        category = row.get('category')
        group = row.get('group')
        if category:
            cat, _ = models.Post_and_pvzCategory.objects.get_or_create(name=category)
        if group:
            gr, _ = models.Post_and_pvzGroup.objects.get_or_create(name=group, category=cat)
        # NOTE(review): if 'category'/'group' is empty, `cat`/`gr` is unbound
        # (or stale from a previous row) here — the input file presumably
        # always provides both columns; verify.
        row['category'] = cat
        row['group'] = gr
        # Build a WKT point from lon/lat for the model's geometry field.
        lon = str(row.pop('lon'))
        lat = str(row.pop("lat"))
        row['wkt'] = "POINT(" + lon + " " + lat + ")"
        models.Post_and_pvz.objects.get_or_create(**row)
    status.status = "Загрузка данных завершена"
    status.save()
    # For each distinct (group, category) in the file, keep the minimum
    # distance from every placement point to the nearest object of that group.
    groups = df[['group', 'category']].drop_duplicates().to_dict(orient='records')
    points = models.PlacementPoint.objects.all()
    num_points = points.count()
    total = len(groups) * num_points
    for _i, gr in enumerate(groups):
        group = models.Post_and_pvzGroup.objects.get(name=gr['group'], category__name=gr['category'])
        for _j, point in enumerate(points):
            status.status = "Подсчет расстояний: " + str(int((num_points * _i + _j) / total * 100)) + "%"
            status.save()
            # Nearest object of this group to the point (GeoDjango Distance
            # annotation, metres via `.m`).
            post_object = models.Post_and_pvz.objects.filter(group__name=group.name).annotate(
                distance=Distance("wkt", point.geometry)).order_by('distance').first()
            d = models.PlacementPointPVZDistance.objects.filter(placement_point=point,
                                                                pvz_postamates_group=group).first()
            if d:
                # Only shrink the stored distance, never grow it.
                if d.dist > post_object.distance.m:
                    d.dist = post_object.distance.m
                    d.save()
            else:
                models.PlacementPointPVZDistance.objects.create(placement_point=point, pvz_postamates_group=group,
                                                                dist=post_object.distance.m)
    status.status = "Подсчет расстояний завершен"
    status.save()
    # Recompute cached PVZ/postamat counters on every placement point.
    point_qs = models.PlacementPoint.objects.all()
    data_len = models.PlacementPoint.objects.count()
    for _ind, point in enumerate(point_qs):
        status.status = "Пересчет параметров точек: " + str(int(_ind / data_len * 100)) + "%"
        status.save()
        LayerService.count_post_pvz(point)
    status.status = "Завершено"
    cache.clear()
    status.save()
@shared_task()
def add_age_day():
    """Daily tick: increment ``age_day`` for every 'Working' placement point.

    Points whose age passes AGE_DAY_LIMIT are flagged with
    ``sample_trn=True`` so the next ML run includes them as training data.
    (An earlier revision also triggered ``raschet`` when new points matured.)
    """
    working_points = PlacementPoint.objects.filter(status='Working')
    working_points.update(age_day=F('age_day') + 1)
    matured = working_points.filter(age_day__gt=AGE_DAY_LIMIT)
    matured.update(sample_trn=True)
@shared_task()
def load_other_objects(obj_id: int):
    """Import 'other objects' (non-PVZ infrastructure) from an uploaded Excel file.

    The file is stored base64-encoded in a TempFiles row; rows are created via
    get_or_create with category/group resolved to model instances and a WKT
    point built from lon/lat. Progress is persisted in a TaskStatus row.

    NOTE(review): indentation was reconstructed from an unindented dump of
    this file — confirm block nesting against the repository.
    """
    file = models.TempFiles.objects.get(id=obj_id)
    status, _ = models.TaskStatus.objects.get_or_create(task_name='Загрузка Прочих объектов')
    excel_file = base64.b64decode(file.data)
    df = pd.read_excel(excel_file)
    # Normalise Excel artifacts: NaN/'NaT' -> None, lower-case column names.
    df = df.replace(np.nan, None)
    df = df.replace('NaT', None)
    df.columns = df.columns.str.lower()
    data_len = df.shape[0]
    for _ind, row in enumerate(df.to_dict('records')):
        status.status = "Загрузка данных: " + str(int(_ind / data_len * 100)) + "%"
        status.save()
        category = row.get('category')
        group = row.get('group')
        if category:
            cat, _ = models.OtherObjectsCategory.objects.get_or_create(name=category)
        if group:
            gr, _ = models.OtherObjectsGroup.objects.get_or_create(name=group, category=cat)
        # NOTE(review): if 'category'/'group' is empty, `cat`/`gr` is unbound
        # (or stale from a previous row) here — verify the input file always
        # provides both columns.
        row['category'] = cat
        row['group'] = gr
        # Build a WKT point from lon/lat for the model's geometry field.
        lon = str(row.pop('lon'))
        lat = str(row.pop("lat"))
        row['wkt'] = "POINT(" + lon + " " + lat + ")"
        models.OtherObjects.objects.get_or_create(**row)
    status.status = "Загрузка данных завершена"
    cache.clear()
    status.save()
@shared_task()
def load_data(obj_id: int):
    """Replace all PlacementPoint rows with the contents of an uploaded CSV.

    The CSV is stored base64-encoded in a TempFiles row. It is decoded as
    UTF-8, parsed with ';' as delimiter, and each record is inserted while a
    TaskStatus row reports percentage progress. Server-derived columns are
    excluded from insertion; all temp files are deleted at the end.
    """
    status, _ = models.TaskStatus.objects.get_or_create(task_name='Загрузка Точек')
    temp_file = models.TempFiles.objects.get(id=obj_id)
    raw_csv = base64.b64decode(temp_file.data)
    # Full reload: wipe every existing point before importing.
    models.PlacementPoint.objects.all().delete()
    buffer = StringIO(str(raw_csv, 'utf-8'))
    frame = pd.read_csv(buffer, delimiter=';')
    # Normalise missing values coming from the CSV parser.
    frame = frame.replace(np.nan, None)
    frame = frame.replace('NaT', None)
    total_rows = frame.shape[0]
    # Columns that are recomputed server-side and must not be inserted.
    excluded = ['id', 'location_id', 'area', 'district', 'age_month']
    for row_no, record in enumerate(frame.to_dict('records')):
        status.status = "Загрузка данных: " + str(int(row_no / total_rows * 100)) + "%"
        status.save()
        payload = {key: value for key, value in record.items() if key not in excluded}
        models.PlacementPoint.objects.create(**payload)
    status.status = "Загрузка данных завершена"
    status.save()
    models.TempFiles.objects.all().delete()