From 7e016078f0f8f9dd85ab464ab409048415d4b1d7 Mon Sep 17 00:00:00 2001 From: AlexP077 Date: Sun, 16 Apr 2023 18:26:59 +0600 Subject: [PATCH] ml_func_3+add_filter --- .flake8 | 2 +- postamates/settings.py | 13 +++ service/tasks.py | 210 +++++++++++++++++++++++++++++++++++++---- service/urls.py | 18 ++-- service/views.py | 4 + 5 files changed, 220 insertions(+), 27 deletions(-) diff --git a/.flake8 b/.flake8 index 8216ee5..88a8bda 100644 --- a/.flake8 +++ b/.flake8 @@ -7,4 +7,4 @@ per-file-ignores = service/migrations/*:E501 service/views.py:C901 service/models.py:F403,F401 - service/tasks.py:E712 + service/tasks.py:E712,C901 diff --git a/postamates/settings.py b/postamates/settings.py index 97f6f9a..01b917b 100644 --- a/postamates/settings.py +++ b/postamates/settings.py @@ -162,6 +162,19 @@ REST_REGISTRATION = { 'VERIFICATION_FROM_EMAIL': 'noreply@spatiality.website', 'USER_LOGIN_FIELDS': ['email'], } + +SWAGGER_SETTINGS = { + 'DEFAULT_INFO': 'service.urls.info', + 'USE_SESSION_AUTH': False, + 'SECURITY_DEFINITIONS': { + 'basic': { + 'type': 'basic', + }, + }, + 'SWAGGER_PATH': 'django_static/swagger/swagger.yaml', +} + + SRID = 4326 # celery config diff --git a/service/tasks.py b/service/tasks.py index fd316cd..37ef992 100644 --- a/service/tasks.py +++ b/service/tasks.py @@ -1,11 +1,16 @@ import os import catboost +import geopandas as gpd +import numpy as np import pandas as pd import psycopg2 import sqlalchemy from celery import shared_task from django.db.models import F +from scipy import interpolate +from scipy.spatial import distance +from shapely import wkb from sklearn import metrics from sklearn import model_selection as ms from sqlalchemy import text @@ -15,12 +20,6 @@ from postamates.settings import DB_URL from service.models import PlacementPoint -# Запустить worker -# celery -A postamates worker -l info -# Запустить scheduler -# celery -A postamates beat -l INFO. - - @shared_task() def raschet(): conn = sqlalchemy.create_engine( @@ -30,8 +29,16 @@ def raschet(): query = text('select * from service_placementpoint') connection = conn.connect() pts = pd.read_sql(query, connection) - pts.loc[pts.target_dist > 700, 'target_dist'] = 700 - pts = pts.sort_values(by='id').reset_index(drop=True) + pts['geometry'] = pts['geometry'].apply(wkb.loads, hex=True) + pts = gpd.GeoDataFrame(pts, geometry='geometry', crs='epsg:4326') + pts = pts.to_crs('epsg:32637') + pts = pts.rename( + columns={ + 'target_cnt_nearby_mean': 'target_dist1', + 'target_age_nearby_mean': 'target_dist2', + 'yndxfood_cnt_cst': 'target_dist3', + }, + ) feats = [ 'id', 'metro_dist', 'target_dist', 'property_price_bargains', 'property_price_offers', @@ -41,13 +48,53 @@ def raschet(): 'pharmacy_cnt', 'supermarket_cnt', 'supermarket_premium_cnt', 'clinic_cnt', 'bank_cnt', 'reca_cnt', 'lab_cnt', 'culture_cnt', 'attraction_cnt', 'mfc_cnt', 'bc_cnt', 'tc_cnt', 'rival_pvz_cnt', 'rival_post_cnt', - 'business_activity', 'age_day', 'target_age_nearby_mean', 'target_cnt_ao_mean', - # 'target_cnt_nearby_mean' + 'business_activity', 'age_day', 'target_cnt_ao_mean', 'target_dist1', 'target_dist2', 'target_dist3', ] # Записи для обучения pts_trn = pts.loc[pts.sample_trn == True].reset_index(drop=True) - # pts_trn = pts_trn.loc[pts_trn.fact < 450].reset_index(drop=True) + pts_trn = gpd.GeoDataFrame(pts_trn, geometry='geometry', crs='epsg:32637') + pts_target = pts_trn[['geometry']] + pts_target['cnt'] = 1 + pts_target = gpd.GeoDataFrame(pts_target, geometry='geometry', crs='epsg:32637') + + target_feature_coords = [] + for i in range(0, len(pts_target)): + target_feature_coords.append((pts_target.geometry.x[i], pts_target.geometry.y[i])) + target_feature_coords = np.array(target_feature_coords) + + pts_trn['target_dist'] = pts_trn.apply( + lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[1])), + axis=1, + ) + pts_trn.loc[pts_trn.target_dist > 700, 'target_dist'] = 700 + + pts_trn['target_dist1'] = pts_trn.apply( + lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[2])), + axis=1, + ) + pts_trn.loc[pts_trn.target_dist1 > 700, 'target_dist1'] = 700 + + pts_trn['target_dist2'] = pts_trn.apply( + lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[3])), + axis=1, + ) + pts_trn.loc[pts_trn.target_dist2 > 700, 'target_dist2'] = 700 + + pts_trn['target_dist3'] = pts_trn.apply( + lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[4])), + axis=1, + ) + pts_trn.loc[pts_trn.target_dist3 > 700, 'target_dist3'] = 700 + + pts_trn['buf'] = pts_trn.buffer(500) + pts_trn = gpd.GeoDataFrame(pts_trn, geometry='buf', crs='epsg:32637') + target_post = gpd.sjoin(pts_trn, pts_target, op='contains').groupby('id', as_index=False).agg({'cnt': 'count'}) + target_post = target_post.rename(columns={'cnt': 'target_post_cnt'}) + pts_trn = pts_trn.drop(columns=['target_post_cnt']) + pts_trn = pts_trn.join(target_post.set_index('id'), on='id') + pts_trn['target_post_cnt'] = pts_trn['target_post_cnt'] - 1 + pts_trn = pts_trn.sort_values(by='id').reset_index(drop=True) X_trn = pts_trn[feats].drop(columns=['id']) Y_trn = pts_trn[['fact']] @@ -56,9 +103,56 @@ def raschet(): (pts.status == 'Installation') | (pts.status == 'Cancelled') | ((pts.status == 'Working') & (pts.sample_trn == False))].reset_index(drop=True) + pts_inf = gpd.GeoDataFrame(pts_inf, geometry='geometry', crs='epsg:32637') + + pts_inf['buf'] = pts_inf.buffer(500) + pts_inf = gpd.GeoDataFrame(pts_inf, geometry='buf', crs='epsg:32637') + pts_target = pts.loc[(pts.status == 'Working') | + (pts.status == 'Installation') | + (pts.sample_trn == True)].reset_index(drop=True) + pts_target = pts_target[['geometry']] + pts_target['cnt'] = 1 + pts_target = gpd.GeoDataFrame(pts_target, geometry='geometry', crs='epsg:32637') + + target_feature_coords = [] + for i in range(0, len(pts_target)): + target_feature_coords.append((pts_target.geometry.x[i], pts_target.geometry.y[i])) + target_feature_coords = np.array(target_feature_coords) + + pts_inf['target_dist'] = pts_inf.apply( + lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[0])), + axis=1, + ) + pts_inf.loc[pts_inf.target_dist > 700, 'target_dist'] = 700 + + pts_inf['target_dist1'] = pts_inf.apply( + lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[1])), + axis=1, + ) + pts_inf.loc[pts_inf.target_dist1 > 700, 'target_dist1'] = 700 + + pts_inf['target_dist2'] = pts_inf.apply( + lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[2])), + axis=1, + ) + pts_inf.loc[pts_inf.target_dist2 > 700, 'target_dist2'] = 700 + + pts_inf['target_dist3'] = pts_inf.apply( + lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[3])), + axis=1, + ) + pts_inf.loc[pts_inf.target_dist3 > 700, 'target_dist3'] = 700 + + pts_inf = pts_inf.sort_values(by='id').reset_index(drop=True) + target_post = gpd.sjoin(pts_inf, pts_target, op='contains').groupby('id', as_index=False).agg({'cnt': 'count'}) + target_post = target_post.rename(columns={'cnt': 'target_post_cnt'}) + pts_inf = pts_inf.drop(columns=['target_post_cnt']) + pts_inf = pts_inf.join(target_post.set_index('id'), on='id') + pts_inf['age_day_init'] = pts_inf['age_day'] pts_inf['age_day'] = 240 X_inf = pts_inf[feats] - seeds = [39, 85, 15, 1, 59] + + seeds = [3, 99, 87, 21, 15] # Обучение, инференс r2_scores = [] @@ -79,12 +173,17 @@ def raschet(): current_pred = sum(y_infers) / 5 # Обновление полей по результатам работы модели - update_fields = pts_inf[['id', 'delta_current', 'delta_first', 'plan_current', 'plan_first', 'prediction_first']] + update_fields = pts_inf[ + [ + 'id', 'age_day_init', 'status', 'fact', 'delta_current', 'delta_first', 'plan_current', 'plan_first', + 'prediction_first', + ] + ] update_fields = update_fields.join( pd.concat( [ X_inf[['id']], - pd.DataFrame([{'prediction_current': current_pred}]), + pd.DataFrame({'prediction_current': current_pred}), ], axis=1, ).set_index('id'), @@ -92,6 +191,30 @@ def raschet(): ) update_fields['prediction_current'] = update_fields['prediction_current'].astype(int) + days_x = np.array([0, 30, 60, 90, 120, 150, 180, 210, 240, 270]) + perc_y = np.array([0, 0.15, 0.20, 0.30, 0.60, 0.70, 0.70, 0.75, 0.75, 0.80]) + spl = interpolate.splrep(days_x, perc_y) + + update_fields['plan_first'] = update_fields.apply( + lambda x: (x.prediction_first * interpolate.splev(x.age_day_init, spl) if x.status == 'Working' else 0), + axis=1, + ) + update_fields['plan_current'] = update_fields.apply( + lambda x: (x.prediction_current * interpolate.splev(x.age_day_init, spl) if x.status == 'Working' else 0), + axis=1, + ) + update_fields['delta_first'] = update_fields.apply( + lambda x: ((x.fact - x.plan_first) / x.plan_first * 100 if x.status == 'Working' else 0), + axis=1, + ) + update_fields['delta_current'] = update_fields.apply( + lambda x: ((x.fact - x.plan_current) / x.plan_current * 100 if x.status == 'Working' else 0), + axis=1, + ) + + update_fields_working = update_fields.loc[update_fields.status == 'Working'].reset_index(drop=True) + update_fields_working = update_fields_working.fillna(0) + # Загрузка в базу обновленных значений conn2 = psycopg2.connect( database=os.getenv('POSTGRES_DB', 'postgres'), user=os.getenv('POSTGRES_USER', 'postgres'), @@ -100,17 +223,70 @@ def raschet(): options='-c search_path=public', ) cursor = conn2.cursor() + + # prediction_current update_records1 = [] for i in range(0, len(update_fields)): update_records1.append((int(update_fields.prediction_current[i]), int(update_fields.id[i]))) - sql_update_query = """Update service_placementpoint set prediction_current = %s where id = %s""" try: - cursor.executemany(sql_update_query, update_records1) + psycopg2.extras.execute_batch(cursor, sql_update_query, update_records1) + conn2.commit() + except Exception: + cursor.execute('ROLLBACK') + psycopg2.extras.execute_batch(cursor, sql_update_query, update_records1) + conn2.commit() + + # plan_first + update_records2 = [] + for i in range(0, len(update_fields_working)): + update_records2.append((int(update_fields_working.plan_first[i]), int(update_fields_working.id[i]))) + sql_update_query = """Update service_placementpoint set plan_first = %s where id = %s""" + try: + psycopg2.extras.execute_batch(cursor, sql_update_query, update_records2) + conn2.commit() + except Exception: + cursor.execute('ROLLBACK') + psycopg2.extras.execute_batch(cursor, sql_update_query, update_records2) + conn2.commit() + + # plan_current + update_records3 = [] + for i in range(0, len(update_fields_working)): + update_records3.append((int(update_fields_working.plan_current[i]), int(update_fields_working.id[i]))) + sql_update_query = """Update service_placementpoint set plan_current = %s where id = %s""" + try: + psycopg2.extras.execute_batch(cursor, sql_update_query, update_records3) + conn2.commit() + except Exception: + cursor.execute('ROLLBACK') + psycopg2.extras.execute_batch(cursor, sql_update_query, update_records3) + conn2.commit() + + # delta_first + update_records4 = [] + for i in range(0, len(update_fields_working)): + update_records4.append((int(update_fields_working.delta_first[i]), int(update_fields_working.id[i]))) + sql_update_query = """Update service_placementpoint set delta_first = %s where id = %s""" + try: + psycopg2.extras.execute_batch(cursor, sql_update_query, update_records4) + conn2.commit() + except Exception: + cursor.execute('ROLLBACK') + psycopg2.extras.execute_batch(cursor, sql_update_query, update_records4) + conn2.commit() + + # delta_current + update_records5 = [] + for i in range(0, len(update_fields_working)): + update_records5.append((int(update_fields_working.delta_current[i]), int(update_fields_working.id[i]))) + sql_update_query = """Update service_placementpoint set delta_current = %s where id = %s""" + try: + psycopg2.extras.execute_batch(cursor, sql_update_query, update_records5) conn2.commit() except Exception: cursor.execute('ROLLBACK') - cursor.executemany(sql_update_query, update_records1) + psycopg2.extras.execute_batch(cursor, sql_update_query, update_records5) conn2.commit() diff --git a/service/urls.py b/service/urls.py index 668a334..4736770 100644 --- a/service/urls.py +++ b/service/urls.py @@ -12,16 +12,16 @@ from service import views router = routers.DefaultRouter() router.register('', views.PlacementPointViewSet) - +info = openapi.Info( + title='Snippets API', + default_version='v1', + description='Test description', + terms_of_service='https://www.google.com/policies/terms/', + contact=openapi.Contact(email='contact@snippets.local'), + license=openapi.License(name='BSD License'), +) schema_view = get_schema_view( - openapi.Info( - title='Snippets API', - default_version='v1', - description='Test description', - terms_of_service='https://www.google.com/policies/terms/', - contact=openapi.Contact(email='contact@snippets.local'), - license=openapi.License(name='BSD License'), - ), + info, url='https://postamates.spatiality.website/', public=True, permission_classes=[permissions.AllowAny], diff --git a/service/views.py b/service/views.py index 257b87c..fdb6d49 100644 --- a/service/views.py +++ b/service/views.py @@ -52,6 +52,7 @@ class PlacementPointViewSet(ReadOnlyModelViewSet): excluded = self.request.GET.get('excluded[]') plan_first = self.request.GET.get('plan_first[]') plan_current = self.request.GET.get('plan_current[]') + delta_first = self.request.GET.get('delta_first[]') delta_current = self.request.GET.get('delta_current[]') rayons = self.request.GET.get('area[]') aos = self.request.GET.get('district[]') @@ -88,6 +89,9 @@ class PlacementPointViewSet(ReadOnlyModelViewSet): if delta_current: delta_current = list(delta_current.split(',')) qs = qs.filter(delta_current__range=delta_current) + if delta_first: + delta_first = list(delta_first.split(',')) + qs = qs.filter(delta_first__range=delta_first) if rayons: rayons = list(rayons.split(',')) qs = qs.filter(area_id__in=rayons)