You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

303 lines
12 KiB

import os
import catboost
import geopandas as gpd
import numpy as np
import pandas as pd
import psycopg2
import sqlalchemy
from celery import shared_task
from django.db.models import F
from scipy import interpolate
from scipy.spatial import distance
from shapely import wkb
from sklearn import metrics
from sklearn import model_selection as ms
from sqlalchemy import text
from postamates.settings import AGE_DAY_LIMIT
from postamates.settings import DB_URL
from service.models import PlacementPoint
# Projected CRS every metric computation (distances, buffers) is done in.
_METRIC_CRS = 'epsg:32637'
# Neighbour distances are clipped to this value (CRS units).
_DIST_CAP = 700
# Radius of the buffer used to count neighbouring target points (CRS units).
_BUFFER_RADIUS = 500
# `age_day` value substituted for every point at inference time.
_INFERENCE_AGE_DAY = 240
# One model is trained per seed; the seed drives both the split and CatBoost.
_SEEDS = (3, 99, 87, 21, 15)
# Hold-out quality gate a model must pass to take part in the ensemble.
_MIN_R2 = 0.45
_MAX_MAPE = 0.25

_DIST_COLUMNS = ('target_dist', 'target_dist1', 'target_dist2', 'target_dist3')

_FEATURES = [
    'id', 'metro_dist', 'target_dist', 'property_price_bargains', 'property_price_offers',
    'property_mean_floor',
    'property_era', 'flats_cnt_2', 'flats_cnt', 'popul_home', 'popul_job', 'other_post_cnt', 'yndxfood_sum',
    'yndxfood_cnt', 'school_cnt', 'kindergar_cnt', 'target_post_cnt', 'public_stop_cnt', 'sport_center_cnt',
    'pharmacy_cnt', 'supermarket_cnt', 'supermarket_premium_cnt', 'clinic_cnt', 'bank_cnt', 'reca_cnt',
    'lab_cnt', 'culture_cnt', 'attraction_cnt', 'mfc_cnt', 'bc_cnt', 'tc_cnt', 'rival_pvz_cnt',
    'rival_post_cnt',
    'business_activity', 'age_day', 'target_cnt_ao_mean', 'target_dist1', 'target_dist2', 'target_dist3',
]

_UPDATE_QUERIES = {
    'prediction_current': 'Update service_placementpoint set prediction_current = %s where id = %s',
    'plan_first': 'Update service_placementpoint set plan_first = %s where id = %s',
    'plan_current': 'Update service_placementpoint set plan_current = %s where id = %s',
    'delta_first': 'Update service_placementpoint set delta_first = %s where id = %s',
    'delta_current': 'Update service_placementpoint set delta_current = %s where id = %s',
}


def _load_points():
    """Read all rows of service_placementpoint into a GeoDataFrame in the metric CRS."""
    engine = sqlalchemy.create_engine(
        DB_URL,
        connect_args={'options': '-csearch_path=public'},
    )
    try:
        with engine.connect() as connection:
            pts = pd.read_sql(text('select * from service_placementpoint'), connection)
    finally:
        # The engine is created per task run, so release its pool explicitly.
        engine.dispose()
    pts['geometry'] = pts['geometry'].apply(wkb.loads, hex=True)
    pts = gpd.GeoDataFrame(pts, geometry='geometry', crs='epsg:4326')
    pts = pts.to_crs(_METRIC_CRS)
    # These three columns are overwritten with freshly computed neighbour
    # distances further down; the rename only provides the column names.
    return pts.rename(
        columns={
            'target_cnt_nearby_mean': 'target_dist1',
            'target_age_nearby_mean': 'target_dist2',
            'yndxfood_cnt_cst': 'target_dist3',
        },
    )


def _target_layer(frame):
    """Return (point layer with a constant `cnt` column, array of its x/y coordinates)."""
    layer = frame[['geometry']].reset_index(drop=True).copy()
    layer['cnt'] = 1
    layer = gpd.GeoDataFrame(layer, geometry='geometry', crs=_METRIC_CRS)
    coords = np.array([(point.x, point.y) for point in layer.geometry])
    return layer, coords


def _add_target_distances(frame, target_coords, first_rank):
    """Fill the four `target_dist*` columns of `frame` in place.

    For every point the distances to all targets are sorted ascending and the
    four values starting at position `first_rank` are stored, clipped to
    `_DIST_CAP`. Raises IndexError when there are fewer than
    `first_rank + 4` targets.
    """
    ranks = [first_rank + offset for offset in range(len(_DIST_COLUMNS))]
    rows = [
        np.sort(distance.cdist([[point.x, point.y]], target_coords)[0])[ranks]
        for point in frame['geometry']
    ]
    nearest = np.array(rows, dtype=float).reshape(-1, len(_DIST_COLUMNS))
    for position, column in enumerate(_DIST_COLUMNS):
        frame[column] = np.minimum(nearest[:, position], _DIST_CAP)


def _with_target_post_cnt(frame, targets):
    """Replace `target_post_cnt` with the number of targets inside each row's buffer."""
    # NOTE(review): newer geopandas releases renamed `op=` to `predicate=`;
    # the original keyword is kept - confirm against the installed version.
    counts = (
        gpd.sjoin(frame, targets, op='contains')
        .groupby('id', as_index=False)
        .agg({'cnt': 'count'})
        .rename(columns={'cnt': 'target_post_cnt'})
    )
    return frame.drop(columns=['target_post_cnt']).join(counts.set_index('id'), on='id')


def _training_set(pts):
    """Build (features, target) from the rows flagged `sample_trn`."""
    pts_trn = pts.loc[pts['sample_trn'] == True].reset_index(drop=True)  # noqa: E712
    pts_trn = gpd.GeoDataFrame(pts_trn, geometry='geometry', crs=_METRIC_CRS)
    targets, target_coords = _target_layer(pts_trn)
    # Targets are the training points themselves, so rank 0 is the point's own
    # zero distance and is skipped.
    _add_target_distances(pts_trn, target_coords, first_rank=1)
    pts_trn['buf'] = pts_trn.buffer(_BUFFER_RADIUS)
    pts_trn = gpd.GeoDataFrame(pts_trn, geometry='buf', crs=_METRIC_CRS)
    pts_trn = _with_target_post_cnt(pts_trn, targets)
    # Every buffer contains its own centre point; do not count it.
    pts_trn['target_post_cnt'] = pts_trn['target_post_cnt'] - 1
    pts_trn = pts_trn.sort_values(by='id').reset_index(drop=True)
    return pts_trn[_FEATURES].drop(columns=['id']), pts_trn[['fact']]


def _inference_set(pts):
    """Build the frame of points to forecast, with features recomputed."""
    needs_forecast = (
        pts['status'].isin(['Pending', 'Installation', 'Cancelled'])
        | ((pts['status'] == 'Working') & (pts['sample_trn'] == False))  # noqa: E712
    )
    pts_inf = pts.loc[needs_forecast].reset_index(drop=True)
    pts_inf = gpd.GeoDataFrame(pts_inf, geometry='geometry', crs=_METRIC_CRS)
    pts_inf['buf'] = pts_inf.buffer(_BUFFER_RADIUS)
    pts_inf = gpd.GeoDataFrame(pts_inf, geometry='buf', crs=_METRIC_CRS)

    is_target = (
        pts['status'].isin(['Working', 'Installation'])
        | (pts['sample_trn'] == True)  # noqa: E712
    )
    targets, target_coords = _target_layer(pts.loc[is_target])
    # NOTE(review): unlike training, ranks start at 0 and the buffer count is
    # not reduced by one, although some inference points are also targets -
    # kept as in the original, confirm this asymmetry is intended.
    _add_target_distances(pts_inf, target_coords, first_rank=0)
    pts_inf = pts_inf.sort_values(by='id').reset_index(drop=True)
    pts_inf = _with_target_post_cnt(pts_inf, targets)
    pts_inf['age_day_init'] = pts_inf['age_day']
    pts_inf['age_day'] = _INFERENCE_AGE_DAY
    return pts_inf


def _ensemble_predict(X_trn, Y_trn, X_inf):
    """Average the predictions of the per-seed models that pass the quality gate.

    Raises ValueError when no model reaches the required hold-out quality.
    """
    accepted = []
    for seed in _SEEDS:
        x_trn, x_test, y_trn, y_test = ms.train_test_split(X_trn, Y_trn, test_size=0.2, random_state=seed)
        model = catboost.CatBoostRegressor(cat_features=['property_era'], random_state=seed)
        model.fit(x_trn, y_trn, verbose=False)
        test_pred = model.predict(x_test)
        r2_score = metrics.r2_score(y_test, test_pred)
        mape = metrics.mean_absolute_percentage_error(y_test, test_pred)
        if r2_score > _MIN_R2 and mape < _MAX_MAPE:
            accepted.append(model.predict(X_inf))
    if not accepted:
        raise ValueError(
            'no model passed the quality gate (r2 > %s, mape < %s); predictions not updated'
            % (_MIN_R2, _MAX_MAPE),
        )
    # Divide by the number of accepted models, not by the number of seeds:
    # otherwise every rejected model silently scales the forecast down.
    return sum(accepted) / len(accepted)


def _build_update_fields(pts_inf, current_pred):
    """Derive prediction_current, plan_* and delta_* for the inference points."""
    update_fields = pts_inf[
        [
            'id', 'age_day_init', 'status', 'fact', 'delta_current', 'delta_first', 'plan_current', 'plan_first',
            'prediction_first',
        ]
    ].copy()
    # `pts_inf` has a fresh 0..n-1 index and `current_pred` follows its row
    # order, so positional assignment matches predictions to ids.
    update_fields['prediction_current'] = np.asarray(current_pred).astype(int)

    days_x = np.array([0, 30, 60, 90, 120, 150, 180, 210, 240, 270])
    perc_y = np.array([0, 0.15, 0.20, 0.30, 0.60, 0.70, 0.70, 0.75, 0.75, 0.80])
    spl = interpolate.splrep(days_x, perc_y)
    # NOTE(review): ages above 270 are extrapolated by the spline - confirm
    # that such ages cannot reach this point.
    share = interpolate.splev(update_fields['age_day_init'].to_numpy(dtype=float), spl)

    is_working = (update_fields['status'] == 'Working').to_numpy()
    fact = update_fields['fact'].to_numpy(dtype=float)
    for suffix in ('first', 'current'):
        prediction = update_fields['prediction_' + suffix].to_numpy(dtype=float)
        plan = np.where(is_working, prediction * share, 0.0)
        with np.errstate(divide='ignore', invalid='ignore'):
            delta = (fact - plan) / plan * 100
        # A zero or missing plan makes the relative deviation undefined; store
        # 0, the same value missing data is replaced with before saving.
        delta = np.where(is_working & np.isfinite(delta), delta, 0.0)
        update_fields['plan_' + suffix] = plan
        update_fields['delta_' + suffix] = delta
    return update_fields


def _store_updates(update_fields):
    """Write prediction_current for all rows and plan_*/delta_* for 'Working' rows."""
    # `import psycopg2` alone does not load the `extras` submodule.
    from psycopg2.extras import execute_batch

    working = update_fields.loc[update_fields['status'] == 'Working'].reset_index(drop=True)
    working = working.fillna(0)
    batches = [
        ('prediction_current', update_fields),
        ('plan_first', working),
        ('plan_current', working),
        ('delta_first', working),
        ('delta_current', working),
    ]

    conn = psycopg2.connect(
        database=os.getenv('POSTGRES_DB', 'postgres'), user=os.getenv('POSTGRES_USER', 'postgres'),
        password=os.getenv('POSTGRES_PASSWORD', 'postgres'),
        host=os.getenv('POSTGRES_HOST', 'postgres'), port=os.getenv('POSTGRES_PORT', '5432'),
        options='-c search_path=public',
    )
    try:
        with conn.cursor() as cursor:
            for column, frame in batches:
                records = [(int(value), int(pk)) for value, pk in zip(frame[column], frame['id'])]
                query = _UPDATE_QUERIES[column]
                try:
                    execute_batch(cursor, query, records)
                    conn.commit()
                except psycopg2.Error:
                    # Same policy as before: roll back and retry the batch
                    # once; a second failure propagates to the caller.
                    conn.rollback()
                    execute_batch(cursor, query, records)
                    conn.commit()
    finally:
        conn.close()


@shared_task()
def raschet():
    """Retrain the forecast models and refresh forecasts of placement points.

    Steps:
      1. Load every placement point and reproject it to the metric CRS.
      2. Build the training set from rows with `sample_trn` set.
      3. Build the inference set: 'Pending', 'Installation' and 'Cancelled'
         points plus 'Working' points that are not in the training sample.
      4. Train one CatBoost regressor per seed and average the predictions of
         those that pass the hold-out quality gate.
      5. Compute `prediction_current`, `plan_*` and `delta_*` and write them
         back to `service_placementpoint`.

    Raises:
        ValueError: if no trained model passes the quality gate.
    """
    pts = _load_points()
    X_trn, Y_trn = _training_set(pts)
    pts_inf = _inference_set(pts)
    X_inf = pts_inf[_FEATURES]
    current_pred = _ensemble_predict(X_trn, Y_trn, X_inf.drop(columns=['id']))
    update_fields = _build_update_fields(pts_inf, current_pred)
    _store_updates(update_fields)
@shared_task()
def add_age_day():
    """Advance the age of every placement point by one day.

    After the increment, points whose `age_day` exceeds `AGE_DAY_LIMIT` are
    flagged with `sample_trn=True`. If the number of flagged points differs
    from the number before the run, the `raschet` task is queued.
    """
    points = PlacementPoint.objects
    flagged_before = points.filter(sample_trn=True).count()

    points.update(age_day=F('age_day') + 1)
    points.filter(age_day__gt=AGE_DAY_LIMIT).update(sample_trn=True)

    flagged_after = PlacementPoint.objects.filter(sample_trn=True).count()
    if flagged_after == flagged_before:
        return
    raschet.delay()