import os

import catboost
import pandas as pd
import psycopg2
import sqlalchemy
from celery import shared_task
from django.db.models import F
from sklearn import metrics
from sklearn import model_selection as ms
from sqlalchemy import text

from postamates.settings import AGE_DAY_LIMIT
from postamates.settings import DB_URL
from service.models import PlacementPoint

# Start the worker:
#   celery -A postamates worker -l info
# Start the scheduler:
#   celery -A postamates beat -l INFO

# Model input columns read from service_placementpoint.
_FEATURE_COLUMNS = [
    'popul_home',
    'popul_job',
    'other_post_cnt',
    'yndxfood_sum',
    'target_post_cnt',
]
# Rows in any of these statuses get a fresh prediction.
_INFERENCE_STATUSES = ['Pending', 'Installation', 'Cancelled']
# Number of accepted models whose predictions are averaged.
_ENSEMBLE_SIZE = 5
# Upper bound on train/validate rounds so the task cannot spin forever
# when the data never yields a model that passes the quality gate.
_MAX_FIT_ATTEMPTS = 100


@shared_task()
def raschet():
    """Retrain the regressor and refresh ``prediction_current`` in the DB.

    Steps:
      1. Load every row of ``service_placementpoint``.
      2. Train CatBoost regressors on rows with ``sample_trn`` set, using
         random 80/20 splits, and keep only models with R2 > 0 and
         MAPE < 0.5 on the held-out part.
      3. Average the predictions of ``_ENSEMBLE_SIZE`` accepted models for
         rows whose status is in ``_INFERENCE_STATUSES``.
      4. Write the averaged value, truncated to int, to
         ``prediction_current`` for each of those rows.

    Returns:
        None. Returns early without training when no row needs a prediction.

    Raises:
        RuntimeError: if fewer than ``_ENSEMBLE_SIZE`` models pass the
            quality gate within ``_MAX_FIT_ATTEMPTS`` attempts.
    """
    engine = sqlalchemy.create_engine(
        DB_URL,
        connect_args={'options': '-csearch_path=public'},
    )
    try:
        # Context manager returns the connection to the pool after reading.
        with engine.connect() as connection:
            pts = pd.read_sql(
                text('select * from service_placementpoint'),
                connection,
            )
    finally:
        engine.dispose()

    # Training records.
    pts_trn = pts.loc[pts.sample_trn.eq(True)].reset_index(drop=True)
    X_trn = pts_trn[_FEATURE_COLUMNS]
    Y_trn = pts_trn[['fact']]

    # Inference records.
    pts_inf = pts.loc[pts.status.isin(_INFERENCE_STATUSES)].reset_index(drop=True)
    if pts_inf.empty:
        # Nothing to score, so skip the expensive training entirely.
        return
    X_inf = pts_inf[_FEATURE_COLUMNS]

    # Training and inference.
    predictions = []
    for _ in range(_MAX_FIT_ATTEMPTS):
        x_trn, x_test, y_trn, y_test = ms.train_test_split(
            X_trn, Y_trn, test_size=0.2,
        )
        model = catboost.CatBoostRegressor(cat_features=[])
        model.fit(x_trn, y_trn, verbose=False)

        y_pred = model.predict(x_test)
        r2_score = metrics.r2_score(y_test, y_pred)
        mape = metrics.mean_absolute_percentage_error(y_test, y_pred)
        if r2_score > 0 and mape < 0.5:
            predictions.append(model.predict(X_inf))
            if len(predictions) == _ENSEMBLE_SIZE:
                break
    else:
        raise RuntimeError(
            f'only {len(predictions)} of {_ENSEMBLE_SIZE} models passed the '
            f'quality gate after {_MAX_FIT_ATTEMPTS} attempts',
        )
    current_pred = sum(predictions) / len(predictions)

    # Pair each inference row id with its averaged prediction. Both come
    # from pts_inf in the same row order, so a positional zip is exact.
    update_records = [
        (int(prediction), int(point_id))
        for point_id, prediction in zip(pts_inf['id'], current_pred)
    ]

    # Load the updated values into the database.
    conn2 = psycopg2.connect(
        database=os.getenv('POSTGRES_DB', 'postgres'),
        user=os.getenv('POSTGRES_USER', 'postgres'),
        password=os.getenv('POSTGRES_PASSWORD', 'postgres'),
        host=os.getenv('POSTGRES_HOST', 'postgres'),
        # Default must be a valid port number; 5432 is PostgreSQL's default.
        port=os.getenv('POSTGRES_PORT', '5432'),
        options='-c search_path=public',
    )
    try:
        with conn2.cursor() as cursor:
            cursor.executemany(
                """Update service_placementpoint set prediction_current = %s where id = %s""",
                update_records,
            )
        conn2.commit()
    finally:
        # Closing without a commit discards the transaction, so a failed
        # batch leaves the table untouched.
        conn2.close()


@shared_task()
def add_age_day():
    """Increment ``age_day`` on every placement point and flag aged rows.

    Every ``PlacementPoint`` gets ``age_day`` increased by one. Rows whose
    ``age_day`` then exceeds ``AGE_DAY_LIMIT`` and that are not yet flagged
    get ``sample_trn=True``. If at least one row was newly flagged, the
    ``raschet`` task is queued.
    """
    points = PlacementPoint.objects
    points.update(age_day=F('age_day') + 1)

    # update() returns the number of matched rows; excluding rows that are
    # already flagged makes that number the count of newly flagged rows.
    newly_flagged = (
        points.filter(age_day__gt=AGE_DAY_LIMIT)
        .exclude(sample_trn=True)
        .update(sample_trn=True)
    )
    if newly_flagged:
        raschet.delay()