# You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
#
# 103 lines
# 3.7 KiB

import os
import catboost
import pandas as pd
import psycopg2
import sqlalchemy
from celery import shared_task
from django.db.models import F
from sklearn import metrics
from sklearn import model_selection as ms
from sqlalchemy import text
from postamates.settings import AGE_DAY_LIMIT
from postamates.settings import DB_URL
from service.models import PlacementPoint
# Start the worker:
#   celery -A postamates worker -l info
# Start the scheduler:
#   celery -A postamates beat -l INFO
# Model input columns, in the order they are fed to CatBoost.
_MODEL_FEATURES = ['popul_home', 'popul_job', 'other_post_cnt', 'yndxfood_sum', 'target_post_cnt']
# Placement-point statuses that receive a refreshed prediction.
_INFERENCE_STATUSES = ('Pending', 'Installation', 'Cancelled')
# Number of accepted models whose predictions are averaged.
_ENSEMBLE_SIZE = 5
# Upper bound on random splits tried before giving up, so that a data set
# on which no model passes the quality gate cannot hang the worker forever.
_MAX_FIT_ATTEMPTS = 100


def _average_accepted_predictions(X_trn, Y_trn, X_inf):
    """Fit models on random splits and average the accepted ones' predictions.

    Each attempt fits a fresh ``CatBoostRegressor`` on a random 80/20
    train/test split of ``X_trn``/``Y_trn``. A model is accepted only when
    its hold-out R^2 is positive and its hold-out MAPE is below 0.5; the
    accepted model's prediction for ``X_inf`` is then recorded.

    Args:
        X_trn: training feature frame.
        Y_trn: training target frame (same rows as ``X_trn``).
        X_inf: feature frame to predict for (same columns as ``X_trn``).

    Returns:
        Element-wise mean of the ``_ENSEMBLE_SIZE`` accepted prediction arrays.

    Raises:
        RuntimeError: if fewer than ``_ENSEMBLE_SIZE`` models pass the
            quality gate within ``_MAX_FIT_ATTEMPTS`` attempts.
    """
    accepted = []
    for _ in range(_MAX_FIT_ATTEMPTS):
        x_trn, x_test, y_trn, y_test = ms.train_test_split(X_trn, Y_trn, test_size=0.2)
        model = catboost.CatBoostRegressor(cat_features=[])
        model.fit(x_trn, y_trn, verbose=False)
        # Predict the hold-out set once and reuse it for both metrics.
        y_pred = model.predict(x_test)
        r2_score = metrics.r2_score(y_test, y_pred)
        mape = metrics.mean_absolute_percentage_error(y_test, y_pred)
        if r2_score > 0 and mape < 0.5:
            accepted.append(model.predict(X_inf))
            if len(accepted) == _ENSEMBLE_SIZE:
                break
    else:
        # The loop ran out of attempts without collecting a full ensemble.
        raise RuntimeError(
            f'only {len(accepted)} of {_ENSEMBLE_SIZE} models passed the quality gate '
            f'(r2 > 0, mape < 0.5) in {_MAX_FIT_ATTEMPTS} attempts',
        )
    return sum(accepted) / len(accepted)


def _store_predictions(records):
    """Write ``(prediction_current, id)`` pairs to ``service_placementpoint``.

    Args:
        records: sequence of ``(prediction, point_id)`` integer tuples.
    """
    conn = psycopg2.connect(
        database=os.getenv('POSTGRES_DB', 'postgres'),
        user=os.getenv('POSTGRES_USER', 'postgres'),
        password=os.getenv('POSTGRES_PASSWORD', 'postgres'),
        host=os.getenv('POSTGRES_HOST', 'postgres'),
        # The fallback has to be a port number; 5432 is PostgreSQL's default.
        port=os.getenv('POSTGRES_PORT', '5432'),
        options='-c search_path=public',
    )
    sql_update_query = """Update service_placementpoint set prediction_current = %s where id = %s"""
    try:
        # `with conn` commits on success and rolls back on an exception, but
        # does not close the connection - hence the explicit close() below.
        with conn, conn.cursor() as cursor:
            cursor.executemany(sql_update_query, records)
    finally:
        conn.close()


@shared_task()
def raschet():
    """Retrain the prediction model and refresh ``prediction_current``.

    Loads every row of ``service_placementpoint``, trains an ensemble of
    CatBoost regressors on the rows flagged ``sample_trn`` (target column
    ``fact``), predicts for the rows whose status is Pending, Installation
    or Cancelled, and stores the averaged prediction, cast to int, in
    ``prediction_current`` of those rows.

    Does nothing when there are no rows to predict for.

    Raises:
        RuntimeError: if not enough models pass the quality gate (see
            ``_average_accepted_predictions``).
    """
    engine = sqlalchemy.create_engine(
        DB_URL,
        connect_args={'options': '-csearch_path=public'},
    )
    try:
        with engine.connect() as connection:
            pts = pd.read_sql(text('select * from service_placementpoint'), connection)
    finally:
        # The engine is created per task run; release its pooled connections.
        engine.dispose()

    # Rows used for training.
    pts_trn = pts.loc[pts['sample_trn'] == True]  # noqa: E712 - element-wise comparison
    X_trn = pts_trn[_MODEL_FEATURES]
    Y_trn = pts_trn[['fact']]

    # Rows used for inference.
    pts_inf = pts.loc[pts['status'].isin(_INFERENCE_STATUSES)]
    if pts_inf.empty:
        # Nothing to update, so skip the (expensive) training altogether.
        return
    X_inf = pts_inf[_MODEL_FEATURES]

    # Training and inference.
    current_pred = _average_accepted_predictions(X_trn, Y_trn, X_inf)

    # Going through a pandas Series keeps the integer cast strict: a NaN
    # prediction raises instead of being silently converted.
    predictions = pd.Series(current_pred).astype(int)
    update_records = [
        (int(prediction), int(point_id))
        for prediction, point_id in zip(predictions, pts_inf['id'])
    ]

    # Upload the refreshed values to the database.
    _store_predictions(update_records)
@shared_task()
def add_age_day():
    """Age every placement point by one day and retrain if the training set changed.

    Increments ``age_day`` on all ``PlacementPoint`` rows, flags the rows
    whose ``age_day`` is greater than ``AGE_DAY_LIMIT`` with
    ``sample_trn=True``, and queues ``raschet`` when the number of rows
    with ``sample_trn=True`` differs from what it was before the update.
    """
    points = PlacementPoint.objects

    # Size of the training sample before anything is modified.
    trained_before = points.filter(sample_trn=True).count()

    # F() makes the increment happen in the database, one statement for all rows.
    points.update(age_day=F('age_day') + 1)
    points.filter(age_day__gt=AGE_DAY_LIMIT).update(sample_trn=True)

    trained_after = points.filter(sample_trn=True).count()
    if trained_after == trained_before:
        return

    # The training sample changed, so schedule a model recalculation.
    raschet.delay()