You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

352 lines
16 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
import catboost
import geopandas as gpd
import numpy as np
import pandas as pd
import psycopg2
import sqlalchemy
from celery import shared_task
from django.db.models import F
from scipy import interpolate
from scipy.spatial import distance
from shapely import wkb
from sklearn import metrics
from sklearn import model_selection as ms
from sqlalchemy import text
from django.contrib.gis.db.models.functions import Distance
import requests
from io import StringIO
from postamates.settings import AGE_DAY_LIMIT
from postamates.settings import DB_URL
from service.models import PlacementPoint, LastMLCall
from service import models
def log_to_telegram(msg):
    """Post *msg* to the project's Telegram chat on a best-effort basis.

    The bot token and chat id default to the values previously hard-coded here.
    They can be overridden with the ``TELEGRAM_BOT_TOKEN`` and ``TELEGRAM_CHAT_ID``
    environment variables.

    Network failures are logged locally and swallowed, so a logging problem can
    never abort (or mask the real error of) the calling task.
    """
    import logging

    # NOTE(review): this bot token is committed to source control; rotate it and
    # supply it only through the environment.
    token = os.getenv('TELEGRAM_BOT_TOKEN', '6275517704:AAHVp_qv9d9NU740JJdOM2fJdgS4r1AgJrw')
    chat_id = os.getenv('TELEGRAM_CHAT_ID', '-555238820')
    try:
        requests.post(
            f'https://api.telegram.org/bot{token}/sendMessage',
            json={"chat_id": chat_id, "text": msg},
            # Without a timeout a stalled connection would block the worker forever.
            timeout=10,
        )
    except requests.RequestException:
        logging.getLogger(__name__).warning('Telegram notification failed: %s', msg, exc_info=True)
# Model feature columns. 'id' is carried along for joining and is dropped
# before fitting/predicting.
_FEATS = [
    'id', 'metro_dist', 'target_dist', 'property_price_bargains', 'property_price_offers',
    'property_mean_floor',
    'property_era', 'flats_cnt_2', 'flats_cnt', 'popul_home', 'popul_job', 'other_post_cnt', 'yndxfood_sum',
    'yndxfood_cnt', 'school_cnt', 'kindergar_cnt', 'target_post_cnt', 'public_stop_cnt', 'sport_center_cnt',
    'pharmacy_cnt', 'supermarket_cnt', 'supermarket_premium_cnt', 'clinic_cnt', 'bank_cnt', 'reca_cnt',
    'lab_cnt', 'culture_cnt', 'attraction_cnt', 'mfc_cnt', 'bc_cnt', 'tc_cnt', 'rival_pvz_cnt',
    'rival_post_cnt',
    'business_activity', 'age_day', 'target_cnt_ao_mean', 'target_dist1', 'target_dist2', 'target_dist3',
]
# Columns receiving the 1st..4th nearest-target distance, in rank order.
_DIST_COLUMNS = ('target_dist', 'target_dist1', 'target_dist2', 'target_dist3')
_CRS = 'epsg:32637'  # projected CRS (UTM, metres) used for all distance/buffer work
_DIST_CAP = 700  # nearest-target distances above this are clipped to it
_BUFFER_RADIUS = 500  # radius of the buffer used to count nearby targets
_SEEDS = (3, 99, 87, 21, 15)  # one train/test split + model per seed
_MIN_R2 = 0.45  # a model's predictions are used only if its hold-out R2 exceeds this...
_MAX_MAPE = 0.25  # ...and its hold-out MAPE is below this
_INFERENCE_AGE_DAY = 240  # age_day value every point is predicted at
# Planned share of the prediction reached at a given age in days; interpolated by a spline.
_PLAN_DAYS = np.array([0, 30, 60, 90, 120, 150, 180, 210, 240, 270])
_PLAN_SHARE = np.array([0, 0.15, 0.20, 0.30, 0.60, 0.70, 0.70, 0.75, 0.75, 0.80])
# Columns written only for 'Working' points, in the order they are updated.
_WORKING_ONLY_COLUMNS = ('plan_first', 'plan_current', 'delta_first', 'delta_current')


def _load_placement_points():
    """Read service_placementpoint into a GeoDataFrame projected to ``_CRS``."""
    engine = sqlalchemy.create_engine(
        DB_URL,
        connect_args={'options': '-csearch_path=public'},
    )
    try:
        with engine.connect() as connection:
            pts = pd.read_sql(text('select * from service_placementpoint'), connection)
    finally:
        # Release pooled connections; the original never closed its connection.
        engine.dispose()
    pts['geometry'] = pts['geometry'].apply(wkb.loads, hex=True)
    pts = gpd.GeoDataFrame(pts, geometry='geometry', crs='epsg:4326').to_crs(_CRS)
    # target_dist1..3 are recomputed later for both training and inference rows.
    return pts.rename(
        columns={
            'target_cnt_nearby_mean': 'target_dist1',
            'target_age_nearby_mean': 'target_dist2',
            'yndxfood_cnt_cst': 'target_dist3',
        },
    )


def _point_coords(gdf):
    """Return an (n, 2) array with the x/y of the active point geometry of *gdf*."""
    return np.column_stack([gdf.geometry.x.to_numpy(), gdf.geometry.y.to_numpy()])


def _ranked_target_distances(points, target_coords, ranks):
    """Return distances from each point to the targets at the given sorted *ranks*.

    For every geometry in ``points['geometry']``, the Euclidean distances to all
    rows of *target_coords* are sorted in ascending order. The values at
    positions *ranks* are kept and clipped to ``_DIST_CAP``.

    The result is one array per rank, each of length ``len(points)``.
    Raises IndexError if there are fewer targets than ``max(ranks) + 1``.
    """
    rank_idx = list(ranks)
    result = np.empty((len(points), len(rank_idx)))
    for row, geom in enumerate(points['geometry']):
        # One cdist + sort per point; previously this was repeated for every rank.
        result[row] = np.sort(distance.cdist([[geom.x, geom.y]], target_coords)[0])[rank_idx]
    result = np.minimum(result, _DIST_CAP)
    return [result[:, j] for j in range(len(rank_idx))]


def _count_targets_in_buffer(buffered, targets):
    """Return, indexed by ``id``, how many *targets* fall inside each row's active geometry."""
    # NOTE(review): `op=` is deprecated in geopandas >= 0.10 (`predicate=`) and
    # removed in 1.0; kept as in the original for the installed version.
    counts = gpd.sjoin(buffered, targets, op='contains').groupby('id', as_index=False).agg({'cnt': 'count'})
    return counts.rename(columns={'cnt': 'target_post_cnt'}).set_index('id')


def _build_training_set(pts):
    """Return ``(X, y)`` for rows with ``sample_trn == True``, or None if there are none."""
    train_mask = pts['sample_trn'].eq(True)
    if not train_mask.any():
        return None
    pts_trn = gpd.GeoDataFrame(pts.loc[train_mask].reset_index(drop=True), geometry='geometry', crs=_CRS)
    pts_target = pts_trn[['geometry']].copy()
    pts_target['cnt'] = 1
    pts_target = gpd.GeoDataFrame(pts_target, geometry='geometry', crs=_CRS)
    # The targets are the training points themselves. Rank 0 is the point
    # itself (distance 0), so the nearest *other* targets are ranks 1..4.
    distances = _ranked_target_distances(pts_trn, _point_coords(pts_target), ranks=(1, 2, 3, 4))
    for column, values in zip(_DIST_COLUMNS, distances):
        pts_trn[column] = values
    pts_trn['buf'] = pts_trn.buffer(_BUFFER_RADIUS)
    pts_trn = gpd.GeoDataFrame(pts_trn, geometry='buf', crs=_CRS)
    counts = _count_targets_in_buffer(pts_trn, pts_target)
    pts_trn = pts_trn.drop(columns=['target_post_cnt']).join(counts, on='id')
    # Each training point lies inside its own buffer; do not count it.
    pts_trn['target_post_cnt'] = pts_trn['target_post_cnt'] - 1
    pts_trn = pts_trn.sort_values(by='id').reset_index(drop=True)
    return pts_trn[_FEATS].drop(columns=['id']), pts_trn[['fact']]


def _build_inference_frame(pts):
    """Return the rows to predict, with features recomputed, or None if there are none.

    The original ``age_day`` is preserved as ``age_day_init``, and ``age_day``
    is replaced by ``_INFERENCE_AGE_DAY``.
    """
    status = pts['status']
    inf_mask = status.isin(['Pending', 'Installation', 'Cancelled']) | (
        status.eq('Working') & pts['sample_trn'].eq(False)
    )
    if not inf_mask.any():
        return None
    pts_inf = gpd.GeoDataFrame(pts.loc[inf_mask].reset_index(drop=True), geometry='geometry', crs=_CRS)
    pts_inf['buf'] = pts_inf.buffer(_BUFFER_RADIUS)
    pts_inf = gpd.GeoDataFrame(pts_inf, geometry='buf', crs=_CRS)

    target_mask = status.isin(['Working', 'Installation']) | pts['sample_trn'].eq(True)
    pts_target = pts.loc[target_mask].reset_index(drop=True)[['geometry']].copy()
    pts_target['cnt'] = 1
    pts_target = gpd.GeoDataFrame(pts_target, geometry='geometry', crs=_CRS)

    # NOTE(review): 'Installation' and non-training 'Working' points are also in
    # the target set. For them, rank 0 is the point itself (distance 0), and the
    # buffer count includes it. Training skips rank 0 and subtracts 1.
    # Confirm this train/inference asymmetry is intended.
    distances = _ranked_target_distances(pts_inf, _point_coords(pts_target), ranks=(0, 1, 2, 3))
    for column, values in zip(_DIST_COLUMNS, distances):
        pts_inf[column] = values
    pts_inf = pts_inf.sort_values(by='id').reset_index(drop=True)
    counts = _count_targets_in_buffer(pts_inf, pts_target)
    pts_inf = pts_inf.drop(columns=['target_post_cnt']).join(counts, on='id')
    pts_inf['age_day_init'] = pts_inf['age_day']
    pts_inf['age_day'] = _INFERENCE_AGE_DAY
    return pts_inf


def _ensemble_predictions(x_train, y_train, x_infer, seeds=_SEEDS):
    """Fit one CatBoost model per seed and return predictions from models that pass the filter."""
    predictions = []
    for seed in seeds:
        x_fit, x_val, y_fit, y_val = ms.train_test_split(x_train, y_train, test_size=0.2, random_state=seed)
        model = catboost.CatBoostRegressor(cat_features=['property_era'], random_state=seed)
        model.fit(x_fit, y_fit, verbose=False)
        y_val_pred = model.predict(x_val)
        r2 = metrics.r2_score(y_val, y_val_pred)
        mape = metrics.mean_absolute_percentage_error(y_val, y_val_pred)
        if r2 > _MIN_R2 and mape < _MAX_MAPE:
            predictions.append(model.predict(x_infer))
    return predictions


def _plan_and_deltas(pts_inf, current_pred):
    """Build the update tables ``(update_fields, update_fields_working)``.

    For 'Working' rows:
    - ``plan_* = prediction_* * spline(age_day_init)``
    - ``delta_* = (fact - plan_*) / plan_* * 100``

    For all other rows, plan_* and delta_* are 0. In the working table,
    non-finite values (a zero plan) and missing values are replaced by 0.
    """
    update_fields = pts_inf[
        [
            'id', 'age_day_init', 'status', 'fact', 'delta_current', 'delta_first', 'plan_current', 'plan_first',
            'prediction_first',
        ]
    ].copy()
    # Rows are in pts_inf order, the same order the predictions were made in.
    update_fields['prediction_current'] = np.asarray(current_pred).astype(int)

    working = update_fields['status'].eq('Working').to_numpy()
    share = np.zeros(len(update_fields))
    if working.any():
        spline = interpolate.splrep(_PLAN_DAYS, _PLAN_SHARE)
        ages = pd.to_numeric(update_fields['age_day_init'], errors='coerce').to_numpy(dtype=float)
        share[working] = interpolate.splev(ages[working], spline)
    first = pd.to_numeric(update_fields['prediction_first'], errors='coerce').to_numpy(dtype=float)
    current = update_fields['prediction_current'].to_numpy(dtype=float)
    fact = pd.to_numeric(update_fields['fact'], errors='coerce').to_numpy(dtype=float)

    plan_first = np.where(working, first * share, 0)
    plan_current = np.where(working, current * share, 0)
    # A zero plan previously raised ZeroDivisionError; now it yields inf/nan, zeroed below.
    with np.errstate(divide='ignore', invalid='ignore'):
        delta_first = np.where(working, (fact - plan_first) / plan_first * 100, 0)
        delta_current = np.where(working, (fact - plan_current) / plan_current * 100, 0)
    update_fields['plan_first'] = plan_first
    update_fields['plan_current'] = plan_current
    update_fields['delta_first'] = delta_first
    update_fields['delta_current'] = delta_current

    update_fields_working = (
        update_fields.loc[working].reset_index(drop=True).replace([np.inf, -np.inf], np.nan).fillna(0)
    )
    return update_fields, update_fields_working


def _compute_updates(pts):
    """Train the ensemble and return the update tables, or None if there is nothing to write."""
    training = _build_training_set(pts)
    if training is None:
        log_to_telegram('raschet: no training rows (sample_trn), nothing to update')
        return None
    x_trn, y_trn = training
    pts_inf = _build_inference_frame(pts)
    if pts_inf is None:
        log_to_telegram('raschet: no rows to predict, nothing to update')
        return None
    x_inf = pts_inf[_FEATS].drop(columns=['id'])
    predictions = _ensemble_predictions(x_trn, y_trn, x_inf)
    if not predictions:
        log_to_telegram('raschet: no model passed the quality threshold, nothing to update')
        return None
    # Average only the models that passed the filter. The old `sum(...) / 5`
    # shrank the result whenever fewer than 5 passed.
    current_pred = np.mean(predictions, axis=0)
    return _plan_and_deltas(pts_inf, current_pred)


def _write_updates(connection, update_fields, update_fields_working):
    """Write prediction_current (all rows) and plan/delta columns (working rows) back to the DB."""
    # `import psycopg2` alone does not load the extras submodule.
    import psycopg2.extras

    batches = [('prediction_current', update_fields)]
    batches += [(column, update_fields_working) for column in _WORKING_ONLY_COLUMNS]
    with connection.cursor() as cursor:
        for column, frame in batches:
            records = [(int(value), int(point_id)) for value, point_id in zip(frame[column], frame['id'])]
            # `column` comes only from the fixed internal list above.
            query = f'Update service_placementpoint set {column} = %s where id = %s'
            try:
                psycopg2.extras.execute_batch(cursor, query, records)
                connection.commit()
            except Exception:
                # Same single retry as before: clear the aborted transaction, then try once more.
                connection.rollback()
                psycopg2.extras.execute_batch(cursor, query, records)
                connection.commit()


@shared_task()
def raschet():
    """Retrain the CatBoost ensemble and refresh prediction/plan/delta columns.

    Steps:
    1. Reset the LastMLCall marker.
    2. Load all placement points.
    3. Train on ``sample_trn`` rows and predict for pending/installation/
       cancelled/non-training working rows.
    4. Write the results back to ``service_placementpoint``.

    Progress and failures are reported via ``log_to_telegram``. Failures are
    re-raised after logging; previously execution continued and crashed later
    with a NameError that hid the real cause.
    """
    LastMLCall.objects.all().delete()
    LastMLCall.objects.create()
    log_to_telegram('start raschet')
    try:
        log_to_telegram('try connect to db')
        pts = _load_placement_points()
    except Exception:
        log_to_telegram('error connect to db')
        raise
    try:
        updates = _compute_updates(pts)
    except Exception as e:
        log_to_telegram(f'Ошибка при обновлении полей в базе данных: {e}')
        raise
    if updates is None:
        return
    update_fields, update_fields_working = updates
    log_to_telegram('Начинается обновление полей в базе')
    try:
        log_to_telegram('Подключение к базе данных 2')
        connection = psycopg2.connect(
            database=os.getenv('POSTGRES_DB', 'postgres'),
            user=os.getenv('POSTGRES_USER', 'postgres'),
            password=os.getenv('POSTGRES_PASSWORD', 'postgres'),
            host=os.getenv('POSTGRES_HOST', 'postgres'),
            # Default was the nonsensical 'postgres'; 5432 is PostgreSQL's standard port.
            port=os.getenv('POSTGRES_PORT', '5432'),
            options='-c search_path=public',
        )
    except Exception:
        log_to_telegram('Не удалось подключиться к базе данных')
        raise
    try:
        _write_updates(connection, update_fields, update_fields_working)
    finally:
        connection.close()
    log_to_telegram('end raschet')
@shared_task
def calculate_group_distance(groups: list):
    """Store, per placement point, the distance to the nearest object of each group.

    Args:
        groups: mappings with ``'group'`` and ``'category'`` keys. Each one
            identifies a ``Post_and_pvzGroup`` by its name and its category's name.

    For every (point, group) pair the nearest ``Post_and_pvz`` of that group is
    found. A ``PlacementPointPVZDistance`` row is created, or updated when the
    new distance is smaller. Progress is written to the ``TaskStatus`` row named
    'Расчет ближайшего расстояния'. Groups with no objects are skipped; before,
    they crashed with AttributeError.
    """
    status, _ = models.TaskStatus.objects.get_or_create(task_name='Расчет ближайшего расстояния')
    points = models.PlacementPoint.objects.all()
    num_points = points.count()
    total = len(groups) * num_points
    last_progress = None
    for group_idx, gr in enumerate(groups):
        group = models.Post_and_pvzGroup.objects.get(name=gr['group'], category__name=gr['category'])
        # Filter by the group object itself. The lookup above needs name *and*
        # category, so names presumably repeat across categories, and filtering
        # by name alone would mix in other categories' objects.
        group_objects = models.Post_and_pvz.objects.filter(group=group)
        if not group_objects.exists():
            continue
        for point_idx, point in enumerate(points):
            progress = "Подсчет расстояний: " + str(int((num_points * group_idx + point_idx) / total * 100)) + "%"
            if progress != last_progress:
                # Hit the DB only when the displayed percentage changes.
                status.status = progress
                status.save()
                last_progress = progress
            nearest = group_objects.annotate(distance=Distance("wkt", point.geometry)).order_by('distance').first()
            if nearest is None:
                continue
            nearest_m = nearest.distance.m
            existing = models.PlacementPointPVZDistance.objects.filter(
                placement_point=point, pvz_postamates_group=group,
            ).first()
            if existing is None:
                models.PlacementPointPVZDistance.objects.create(
                    placement_point=point, pvz_postamates_group=group, dist=nearest_m,
                )
            elif existing.dist > nearest_m:
                # NOTE(review): a stored distance is only ever lowered, so a removed or
                # moved nearest object never increases it. Confirm this is intended.
                existing.dist = nearest_m
                existing.save()
    status.status = "Подсчет расстояний завершен"
    status.save()
@shared_task()
def add_age_day():
    """Increment every placement point's ``age_day`` and retrain if new training rows appear.

    Points whose ``age_day`` now exceeds ``AGE_DAY_LIMIT`` and are not yet
    flagged get ``sample_trn=True``. If at least one point was newly flagged,
    ``raschet`` is queued.

    The number of newly flagged rows comes from ``update()`` itself. This
    replaces the before/after ``count()`` difference, which cost two extra
    queries and could be skewed by concurrent writes.
    """
    PlacementPoint.objects.update(age_day=F('age_day') + 1)
    newly_flagged = (
        PlacementPoint.objects.filter(age_day__gt=AGE_DAY_LIMIT)
        .exclude(sample_trn=True)
        .update(sample_trn=True)
    )
    if newly_flagged:
        raschet.delay()