From 583163d9ecfb5745cc12bdf99f0099694a521390 Mon Sep 17 00:00:00 2001 From: Alexander Shibaev Date: Mon, 24 Feb 2025 08:06:06 +0000 Subject: [PATCH] New hereapi key; fix ml task, #SSTDEV-1458, SSTDEV-1430, SSTDEV-1397 --- .gitignore | 4 +- deploy/beat.yml | 2 +- deploy/dockerfiles/Dockerfile | 2 +- docker-compose.yml | 51 +- postamates/celery.py | 2 + postamates/settings.py | 24 +- pyproject.toml | 4 + .../management/commands/create_procedures.py | 10 +- service/management/commands/create_views.py | 10 +- service/management/commands/delete_views.py | 10 +- service/management/commands/kill_update.py | 4 +- service/permissions.py | 21 +- service/tasks.py | 637 ++++++++++-------- service/utils.py | 5 +- service/views.py | 9 +- 15 files changed, 476 insertions(+), 319 deletions(-) create mode 100644 pyproject.toml diff --git a/.gitignore b/.gitignore index 474d4cb..26025bd 100644 --- a/.gitignore +++ b/.gitignore @@ -134,4 +134,6 @@ nginx/nginx.conf.prod docker-compose.dev.yml pg_dumps/ django_static/ -dit_frontend/ \ No newline at end of file +dit_frontend/ + +catboost_info diff --git a/deploy/beat.yml b/deploy/beat.yml index f28fbce..f14d364 100644 --- a/deploy/beat.yml +++ b/deploy/beat.yml @@ -19,7 +19,7 @@ spec: containers: - name: beat image: DEPLOY_IMAGE_TAG - command: ["sh", "-c", "celery -A postamates beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler"] + command: ["sh", "-c", "celery -A postamates beat -l info"] envFrom: - configMapRef: name: postamates-configmap diff --git a/deploy/dockerfiles/Dockerfile b/deploy/dockerfiles/Dockerfile index 01aa35a..cc76767 100644 --- a/deploy/dockerfiles/Dockerfile +++ b/deploy/dockerfiles/Dockerfile @@ -3,7 +3,7 @@ FROM ${YC_CONTAINER_REGISTRY}/public/python:3.8 RUN apt-get update && \ apt-get install -y binutils libproj-dev gdal-bin && \ - apt-get install -y postgresql-client + apt-get install -y postgresql-client procps gdb ENV PYTHONUNBUFFERED 1 diff --git a/docker-compose.yml b/docker-compose.yml index 397a3f4..3cc8d98 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,3 @@ -version: '3.5' - x-postgres-variables: &postgres-variables POSTGRES_DB: "${POSTGRES_DB}" POSTGRES_HOST: "${POSTGRES_HOST}" @@ -17,15 +15,20 @@ x-frontend-variables: &frontend-variables DOMAIN: "${DOMAIN}" REACT_APP_DOMAIN_URL: "https://${DOMAIN}/" - x-martin-variables: &martin-variables MARTIN_PORT: "${MARTIN_PORT}" services: django: + security_opt: + - seccomp:unconfined + cap_add: + - SYS_PTRACE container_name: ${CONTAINERS_NAME}_django - build: . + build: + context: . + dockerfile: ./deploy/dockerfiles/Dockerfile command: > sh -c "python manage.py migrate && python manage.py collectstatic --noinput && @@ -36,16 +39,17 @@ services: python manage.py loaddata fixtures/otherobjectsgroups.json && python manage.py runserver 0.0.0.0:${DJANGO_PORT}" environment: - <<: *postgres-variables - <<: *django-variables - <<: *frontend-variables + <<: [*postgres-variables, *django-variables, *frontend-variables] + CELERY_BROKER_URL: amqp://loyalty-rabbit:5672 ports: - "${DJANGO_PORT}:${DJANGO_PORT}" + - 5678:5678 expose: - "${DJANGO_PORT}" restart: always depends_on: - db + loyalty-rabbit: image: rabbitmq:3.9-management container_name: loyalty-rabbit @@ -59,30 +63,46 @@ services: - 5672:5672 beat: + security_opt: + - seccomp:unconfined + cap_add: + - SYS_PTRACE restart: always environment: - <<: *postgres-variables - <<: *django-variables + <<: [*postgres-variables, *django-variables] + CELERY_BROKER_URL: amqp://loyalty-rabbit:5672 build: context: . + dockerfile: ./deploy/dockerfiles/Dockerfile entrypoint: [ "celery", "-A", "postamates", "beat","-l", "info", "--scheduler", "django_celery_beat.schedulers:DatabaseScheduler" ] depends_on: - db - loyalty-rabbit - django - worker + ports: + - 5679:5678 + worker: + security_opt: + - seccomp:unconfined + cap_add: + - SYS_PTRACE restart: always environment: - <<: *postgres-variables - <<: *django-variables + <<: [*postgres-variables, *django-variables] + CELERY_BROKER_URL: amqp://loyalty-rabbit:5672 build: context: . - entrypoint: [ "celery", "-A", "postamates.celery:app", "worker" ] + dockerfile: ./deploy/dockerfiles/Dockerfile + entrypoint: [ "celery", "-A", "postamates.celery:app", "worker", "--concurrency", "1" ] depends_on: - db - loyalty-rabbit - django + ports: + - 5680:5678 + db: container_name: ${CONTAINERS_NAME}_db image: mdillon/postgis @@ -93,7 +113,7 @@ services: expose: - "${POSTGRES_PORT}" volumes: - - ${POSTGRES_VOLUME_PATH}:/var/lib/postgresql/data + - pg_data:/var/lib/postgresql/data command: -p ${POSTGRES_PORT} martin: @@ -109,15 +129,16 @@ services: - db - django restart: always + swagger-ui: image: swaggerapi/swagger-ui ports: - "8099:8099" environment: PORT: 8099 - SWAGGER_JSON_URL: "https://postnet-dev.selftech.ru/media/swagger.json" - + SWAGGER_JSON_URL: "https://localhost/media/swagger.json" volumes: rabbitmq_data: rabbitmq_log: + pg_data: diff --git a/postamates/celery.py b/postamates/celery.py index b052e0f..96bd385 100644 --- a/postamates/celery.py +++ b/postamates/celery.py @@ -13,6 +13,8 @@ app = Celery( include=['service.tasks'], ) app.config_from_object('django.conf:settings', namespace=CELERY_NAMESPACE) + + app.conf.beat_schedule = { 'age_day_every_24h': { 'task': 'service.tasks.add_age_day', diff --git a/postamates/settings.py b/postamates/settings.py index 6e8d813..ddec77d 100644 --- a/postamates/settings.py +++ b/postamates/settings.py @@ -44,7 +44,7 @@ INSTALLED_APPS = [ 'django_json_widget', 'django.contrib.gis', 'django_celery_beat', - 'drf_keycloak_auth', + # 'drf_keycloak_auth', ] MIDDLEWARE = [ @@ -178,7 +178,7 @@ AGE_DAY_BORDER = 30 EXCEL_EXPORT_FILENAME = 'placement_points.xlsx' JSON_EXPORT_FILENAME = 'placement_points.json' DATA_UPLOAD_MAX_NUMBER_FIELDS = None -GEOCODER_API_KEY = os.getenv('GEOCODER_API_KEY','TzgdKWgyI2nfaz1WHRD-aYJK4e400MiOJQP7Enf1e1M') +GEOCODER_API_KEY = os.getenv('GEOCODER_API_KEY','P9af-1_eLtmraO5A_B5Kbvo4z4MAlzGoGOCH5g1_uiA') STATUS_TASK_NAME='status_task' STATUS_TASK_NAME_IMPORT='import_status_task' @@ -186,20 +186,20 @@ STATUS_TASK_NAME_IMPORT='import_status_task' REST_FRAMEWORK = { 'DEFAULT_AUTHENTICATION_CLASSES': [ 'rest_framework.authentication.SessionAuthentication', - 'drf_keycloak_auth.authentication.KeycloakAuthentication', + # 'drf_keycloak_auth.authentication.KeycloakAuthentication', ] } -DRF_KEYCLOAK_AUTH = { - # 'KEYCLOAK_SERVER_URL': 'http://keycloak.dev.selfservicetech.ru/auth', - 'KEYCLOAK_SERVER_URL': os.getenv('KEYCLOAK_SERVER_URL', 'https://kk.dev.selftech.ru/auth'), - 'KEYCLOAK_REALM': os.getenv('KEYCLOAK_REALM', 'SST'), - 'KEYCLOAK_CLIENT_ID': os.getenv('KEYCLOAK_CLIENT_ID','postnet'), - 'KEYCLOAK_CLIENT_SECRET_KEY': os.getenv('KEYCLOAK_CLIENT_SECRET_KEY','K2yHweEUispkVeWn03VMk843sW2Moic5'), - 'KEYCLOAK_MANAGE_LOCAL_USER': False, - 'KEYCLOAK_ROLE_SET_PREFIX': 'realm_access', -} +# DRF_KEYCLOAK_AUTH = { +# # 'KEYCLOAK_SERVER_URL': 'http://keycloak.dev.selfservicetech.ru/auth', +# 'KEYCLOAK_SERVER_URL': os.getenv('KEYCLOAK_SERVER_URL', 'https://kk.dev.selftech.ru/auth'), +# 'KEYCLOAK_REALM': os.getenv('KEYCLOAK_REALM', 'SST'), +# 'KEYCLOAK_CLIENT_ID': os.getenv('KEYCLOAK_CLIENT_ID','postnet'), +# 'KEYCLOAK_CLIENT_SECRET_KEY': os.getenv('KEYCLOAK_CLIENT_SECRET_KEY','K2yHweEUispkVeWn03VMk843sW2Moic5'), +# 'KEYCLOAK_MANAGE_LOCAL_USER': False, +# 'KEYCLOAK_ROLE_SET_PREFIX': 'realm_access', +# } KK_EDITOR_ROLE = os.getenv('KK_EDITOR_ROLE', 'postnet_editor') KK_VIEWER_ROLE = os.getenv('KK_VIEWER_ROLE', 'postnet_viewer') diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..bea1812 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,4 @@ +[tool.ruff] +target-version = 'py38' +line-length = 120 +indent-width = 4 diff --git a/service/management/commands/create_procedures.py b/service/management/commands/create_procedures.py index dc22c84..6b1ba73 100644 --- a/service/management/commands/create_procedures.py +++ b/service/management/commands/create_procedures.py @@ -1,5 +1,5 @@ from django.core.management.base import BaseCommand -from service.utils import run_sql_command, log_to_telegram +from service.utils import run_sql_command, log CMD_PIVOT_DIST = """CREATE OR REPLACE VIEW compact_placementpoint AS SELECT id, status, category, age_day, fact, area_id, district_id, prediction_first, prediction_current, doors, flat_cnt, rival_post_cnt, rival_pvz_cnt, target_post_cnt, flats_cnt, tc_cnt, culture_cnt, mfc_cnt, public_stop_cnt, supermarket_cnt, target_dist, metro_dist, geometry, delta_first, delta_current FROM service_placementpoint; @@ -63,10 +63,10 @@ class Command(BaseCommand): def handle(self, *args, **kwargs): try: - log_to_telegram('Creating procedures') + log('Creating procedures') run_sql_command(CMD_PIVOT_DIST) - log_to_telegram('pivot_dist created') + log('pivot_dist created') run_sql_command(CMD_PIVOT_DIST_PRE) - log_to_telegram('prepivot_dist created') + log('prepivot_dist created') except Exception as e: - log_to_telegram('Error creating views: ' + str(e)) \ No newline at end of file + log('Error creating views: ' + str(e)) diff --git a/service/management/commands/create_views.py b/service/management/commands/create_views.py index bc6c1e8..fc19029 100644 --- a/service/management/commands/create_views.py +++ b/service/management/commands/create_views.py @@ -1,5 +1,5 @@ from django.core.management.base import BaseCommand -from service.utils import run_sql_command, log_to_telegram +from service.utils import run_sql_command, log CMD_PIVOT_DIST = """CALL public.pivot_dist();""" @@ -9,10 +9,10 @@ class Command(BaseCommand): def handle(self, *args, **kwargs): try: - log_to_telegram('Creating views') + log('Creating views') run_sql_command(CMD_PIVOT_DIST) - log_to_telegram('pivot_dist created') + log('pivot_dist created') run_sql_command(CMD_PIVOT_DIST_PRE) - log_to_telegram('prepivot_dist created') + log('prepivot_dist created') except Exception as e: - log_to_telegram('Error creating views: ' + str(e)) \ No newline at end of file + log('Error creating views: ' + str(e)) diff --git a/service/management/commands/delete_views.py b/service/management/commands/delete_views.py index eb7ff35..76f4293 100644 --- a/service/management/commands/delete_views.py +++ b/service/management/commands/delete_views.py @@ -1,5 +1,5 @@ from django.core.management.base import BaseCommand -from service.utils import run_sql_command, log_to_telegram +from service.utils import run_sql_command, log CMD_PIVOT_DIST = """DROP MATERIALIZED VIEW IF EXISTS public.points_with_dist;""" @@ -9,10 +9,10 @@ class Command(BaseCommand): def handle(self, *args, **kwargs): try: - log_to_telegram('Deleting views') + log('Deleting views') run_sql_command(CMD_PIVOT_DIST) - log_to_telegram('pivot_dist deleted') + log('pivot_dist deleted') run_sql_command(CMD_PIVOT_DIST_PRE) - log_to_telegram('prepivot_dist deleted') + log('prepivot_dist deleted') except Exception as e: - log_to_telegram('Error deleting views: ' + str(e)) + log('Error deleting views: ' + str(e)) diff --git a/service/management/commands/kill_update.py b/service/management/commands/kill_update.py index b811c46..d99da33 100644 --- a/service/management/commands/kill_update.py +++ b/service/management/commands/kill_update.py @@ -1,5 +1,5 @@ from django.core.management.base import BaseCommand -from service.utils import run_sql_command, log_to_telegram +from service.utils import run_sql_command, log from service.models import TaskStatus from postamates.settings import STATUS_TASK_NAME, STATUS_TASK_NAME_IMPORT @@ -9,4 +9,4 @@ class Command(BaseCommand): def handle(self, *args, **kwargs): TaskStatus.objects.filter(task_name=STATUS_TASK_NAME).update(status='Перерасчет ML завершен') - TaskStatus.objects.filter(task_name=STATUS_TASK_NAME_IMPORT).update(status='Перерасчет ML завершен') \ No newline at end of file + TaskStatus.objects.filter(task_name=STATUS_TASK_NAME_IMPORT).update(status='Перерасчет ML завершен') diff --git a/service/permissions.py b/service/permissions.py index ce1866c..e44c0d8 100644 --- a/service/permissions.py +++ b/service/permissions.py @@ -1,14 +1,31 @@ from rest_framework.permissions import BasePermission # from drf_keycloak_auth.authentication import KeycloakAuthentication from django.conf import settings -from logging import getLogger +from django.core.handlers.wsgi import WSGIRequest +from logging import getLogger, basicConfig, DEBUG + +basicConfig(level=DEBUG) logger = getLogger(__name__) + +def serialize(obj): + attributes = sorted(list(dir(obj))) + + for attr in attributes: + try: + value = getattr(obj, attr) + yield f"{attr}: {value}\n" + except: + pass + + class UserPermission(BasePermission): - def has_permission(self, request, view): + def has_permission(self, request: WSGIRequest, view): + return True # logger.error(f'KK_CLIENT_ID: {settings.DRF_KEYCLOAK_AUTH["KEYCLOAK_CLIENT_ID"]}') # logger.error(f'KK_CLIENT_SECRET_KEY: {settings.DRF_KEYCLOAK_AUTH["KEYCLOAK_CLIENT_SECRET_KEY"]}') + kk_profile = request.auth kk_roles = kk_profile.get('resource_access',{}).get('postnet',{}).get('roles',[]) if request.method not in ['GET']: diff --git a/service/tasks.py b/service/tasks.py index f8703e5..48dbe1e 100644 --- a/service/tasks.py +++ b/service/tasks.py @@ -1,4 +1,5 @@ import os +from typing import cast import catboost import geopandas as gpd @@ -6,6 +7,7 @@ import numpy as np import pandas as pd import shap import psycopg2 +import psycopg2.extras import sqlalchemy from celery import shared_task from django.db.models import F @@ -20,7 +22,7 @@ from postamates.settings import AGE_DAY_LIMIT from postamates.settings import DB_URL, STATUS_TASK_NAME, STATUS_TASK_NAME_IMPORT from service.models import PlacementPoint, LastMLCall from service import models -from service.utils import log_to_telegram +from service.utils import log import base64 from io import StringIO from django.core.cache import cache @@ -29,260 +31,336 @@ from service.service import PointService from service.utils import run_psql_command, change_status -@shared_task() -def raschet(table_name='service_placementpoint', need_time=True, task_name=STATUS_TASK_NAME): - print('start raschet') - # status, _ = models.TaskStatus.objects.get_or_create(task_name=STATUS_TASK_NAME) - raschet_objs = models.RaschetObjects.objects.all() - if raschet_objs: - change_status('Начало расчета кол-ва ПВЗ вокруг точек', task_name=task_name) +def get_engine(): + try: + log("try connect to db") + return sqlalchemy.create_engine( + DB_URL, + connect_args={"options": "-csearch_path=public"}, + ) + except: + log("error connect to db") + raise + + +def do_computations(table_name: str, need_time: bool, task_name: str): + def count_nearby_pvz(): + raschet_objs = models.RaschetObjects.objects.all() + if not raschet_objs: + return + change_status("Начало расчета кол-ва ПВЗ вокруг точек", task_name=task_name) total = raschet_objs.count() for _i, r_o in enumerate(raschet_objs): obj = models.Post_and_pvz.objects.get(id=r_o.obj_id) LayerService().count_post_pvz_for_placementpoint(obj) - change_status(f'Подсчет кол-ва ПВЗ вокруг точек: {str(int((_i + 1) / total * 100))}%', task_name=task_name) - change_status('Подсчет кол-ва ПВЗ вокруг точек завершен', task_name=task_name) - group_objects = models.RaschetGroups.objects.all() - group_total = group_objects.count() - if group_objects: - change_status('Начало расчета расстояний', task_name=task_name) - qs = models.PlacementPoint.objects.all() - for _k, g_o in enumerate(group_objects): - g = models.Post_and_pvzGroup.objects.get(id=g_o.obj_id) - for q in qs: - PointService.calculate_dist_for_group(point=q, group=g) - change_status(f'Подсчет расстояний: {str(int((_k + 1) / group_total * 100))}%', task_name=task_name) - change_status('Расчет завершен', task_name=task_name) - models.RaschetObjects.objects.all().delete() - models.RaschetGroups.objects.all().delete() + change_status( + f"Подсчет кол-ва ПВЗ вокруг точек: {str(int((_i + 1) / total * 100))}%", + task_name=task_name, + ) + change_status("Подсчет кол-ва ПВЗ вокруг точек завершен", task_name=task_name) + + def calculate_dist_for_groups(): + group_objects = models.RaschetGroups.objects.all() + group_total = group_objects.count() + if group_objects: + change_status("Начало расчета расстояний", task_name=task_name) + qs = models.PlacementPoint.objects.all() + for _k, g_o in enumerate(group_objects): + g = models.Post_and_pvzGroup.objects.get(id=g_o.obj_id) + for q in qs: + PointService.calculate_dist_for_group(point=q, group=g) + change_status( + f"Подсчет расстояний: {str(int((_k + 1) / group_total * 100))}%", + task_name=task_name, + ) + change_status("Расчет завершен", task_name=task_name) + + def process_raschet_objects(): + count_nearby_pvz() + calculate_dist_for_groups() + models.RaschetObjects.objects.all().delete() + models.RaschetGroups.objects.all().delete() + + print("start raschet") + # status, _ = models.TaskStatus.objects.get_or_create(task_name=STATUS_TASK_NAME) # Запуск ML - change_status('Шаг 3 из 3 (Прогноз трафика в точках).', task_name=task_name) - log_to_telegram(f'{table_name} start raschet') - try: - log_to_telegram('try connect to db') - conn = sqlalchemy.create_engine( - DB_URL, - connect_args={'options': '-csearch_path=public'}, - ) - except: - log_to_telegram('error connect to db') + process_raschet_objects() + + change_status("Шаг 3 из 3 (Прогноз трафика в точках).", task_name=task_name) + log(f"{table_name} start raschet") + + engine = get_engine() try: - query = text('select * from service_placementpoint') - connection = conn.connect() - pts = pd.read_sql(query, connection) - pts['geometry'] = pts['geometry'].apply(wkb.loads, hex=True) - pts = gpd.GeoDataFrame(pts, geometry='geometry', crs='epsg:4326') - pts = pts.to_crs('epsg:32637') + query = text("select * from service_placementpoint") + connection = engine.connect() + pts_df = pd.read_sql(query, connection) + pts_df.geometry = pts_df.geometry.apply(wkb.loads, hex=True) + pts = gpd.GeoDataFrame(pts_df, geometry="geometry", crs="epsg:4326") + pts = pts.to_crs("epsg:32637") feats = [ - 'id', 'metro_dist', 'target_dist', 'property_price_bargains', 'property_price_offers', - 'property_mean_floor', - 'property_era', 'flats_cnt', 'popul_home', 'popul_job', 'yndxfood_sum', - 'yndxfood_cnt', 'school_cnt', 'kindergar_cnt', 'target_post_cnt', 'public_stop_cnt', 'sport_center_cnt', - 'pharmacy_cnt', 'supermarket_cnt', 'supermarket_premium_cnt', 'clinic_cnt', 'bank_cnt', 'reca_cnt', - 'lab_cnt', 'culture_cnt', 'attraction_cnt', 'mfc_cnt', 'bc_cnt', 'tc_cnt', 'rival_pvz_cnt', - 'rival_post_cnt', - 'business_activity', 'age_day', 'target_cnt_ao_mean' + "id", + "metro_dist", + "target_dist", + "property_price_bargains", + "property_price_offers", + "property_mean_floor", + "property_era", + "flats_cnt", + "popul_home", + "popul_job", + "yndxfood_sum", + "yndxfood_cnt", + "school_cnt", + "kindergar_cnt", + "target_post_cnt", + "public_stop_cnt", + "sport_center_cnt", + "pharmacy_cnt", + "supermarket_cnt", + "supermarket_premium_cnt", + "clinic_cnt", + "bank_cnt", + "reca_cnt", + "lab_cnt", + "culture_cnt", + "attraction_cnt", + "mfc_cnt", + "bc_cnt", + "tc_cnt", + "rival_pvz_cnt", + "rival_post_cnt", + "business_activity", + "age_day", + "target_cnt_ao_mean", ] # Записи для обучения - pts_trn = pts.loc[pts.name == 'постамат Яндекс.Маркет из обучающей выборки'].reset_index(drop=True) - pts_trn = pts_trn.sort_values('address').reset_index(drop=True) - pts_trn = gpd.GeoDataFrame(pts_trn, geometry='geometry', crs='epsg:32637') - pts_target = pts_trn[['geometry']] - pts_target['cnt'] = 1 - pts_target = gpd.GeoDataFrame(pts_target, geometry='geometry', crs='epsg:32637') + pts_trn = pts.loc[pts.name == "постамат Яндекс.Маркет из обучающей выборки"].reset_index(drop=True) + pts_trn = pts_trn.sort_values("address").reset_index(drop=True) + pts_trn = gpd.GeoDataFrame(pts_trn, geometry="geometry", crs="epsg:32637") + pts_target = cast(gpd.GeoDataFrame, pts_trn[["geometry"]]) + pts_target["cnt"] = 1 + pts_target = gpd.GeoDataFrame(pts_target, geometry="geometry", crs="epsg:32637") target_feature_coords = [] for i in range(0, len(pts_target)): target_feature_coords.append((pts_target.geometry.x[i], pts_target.geometry.y[i])) target_feature_coords = np.array(target_feature_coords) - pts_trn['target_dist'] = pts_trn.apply( - lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[1])), + pts_trn["target_dist"] = pts_trn.apply( + lambda x: (sorted(distance.cdist([[x["geometry"].x, x["geometry"].y]], target_feature_coords)[0])[1]), axis=1, ) - pts_trn.loc[pts_trn.target_dist > 700, 'target_dist'] = 700 - - pts_trn['buf'] = pts_trn.buffer(500) - pts_trn = gpd.GeoDataFrame(pts_trn, geometry='buf', crs='epsg:32637') - target_post = gpd.sjoin(pts_trn, pts_target, op='contains').groupby('id', as_index=False).agg({'cnt': 'count'}) - target_post = target_post.rename(columns={'cnt': 'target_post_cnt'}) - pts_trn = pts_trn.drop(columns=['target_post_cnt']) - pts_trn = pts_trn.join(target_post.set_index('id'), on='id') - pts_trn['target_post_cnt'] = pts_trn['target_post_cnt'] - 1 - pts_trn = pts_trn.sort_values(by='id').reset_index(drop=True) - X_trn = pts_trn[feats].drop(columns=['id']) - Y_trn = pts_trn[['fact']] + pts_trn.loc[pts_trn.target_dist > 700, "target_dist"] = 700 + + pts_trn["buf"] = pts_trn.buffer(500) + pts_trn = gpd.GeoDataFrame(pts_trn, geometry="buf", crs="epsg:32637") + target_post = gpd.sjoin(pts_trn, pts_target, op="contains").groupby("id", as_index=False).agg({"cnt": "count"}) + target_post = target_post.rename(columns={"cnt": "target_post_cnt"}) + pts_trn = pts_trn.drop(columns=["target_post_cnt"]) + pts_trn = pts_trn.join(target_post.set_index("id"), on="id") + pts_trn["target_post_cnt"] = pts_trn.target_post_cnt - 1 + pts_trn = pts_trn.sort_values(by="id").reset_index(drop=True) + X_trn = pts_trn[feats].drop(columns=["id"]) + Y_trn = pts_trn[["fact"]] # status.status = 'Записи для инференса' # status.save() - #change_status('Записи для инференса', task_name=task_name) + # change_status('Записи для инференса', task_name=task_name) # Записи для инференса - if table_name == 'service_placementpoint': - pts_inf = pts.loc[(pts.status == 'Pending') | - (pts.status == 'Installation') | - (pts.status == 'Cancelled') | - ((pts.status == 'Working') & (pts.sample_trn == False))].reset_index(drop=True) - elif table_name == 'service_preplacementpoint': - pts_inf = pd.DataFrame(conn.connect().execute(text(f"select * from {table_name}"))) - pts_inf = pts_inf.loc[pts_inf.matching_status == 'New'].reset_index(drop=True) - pts_inf['geometry'] = pts_inf['geometry'].apply(wkb.loads, hex=True) - pts_inf = gpd.GeoDataFrame(pts_inf, geometry='geometry', crs='epsg:4326') - if len(pts_inf) > 0: - pts_inf = pts_inf.to_crs('epsg:32637') - - pts_inf['buf'] = pts_inf.buffer(500) - pts_inf = gpd.GeoDataFrame(pts_inf, geometry='buf', crs='epsg:32637') - pts_target = pts.loc[(pts.status == 'Working') | - (pts.status == 'Installation')].reset_index(drop=True) - pts_target = pts_target[['geometry']] - pts_target['cnt'] = 1 - pts_target = gpd.GeoDataFrame(pts_target, geometry='geometry', crs='epsg:32637') - - target_feature_coords = [] - for i in range(0, len(pts_target)): - target_feature_coords.append((pts_target.geometry.x[i], pts_target.geometry.y[i])) - target_feature_coords = np.array(target_feature_coords) - - pts_inf['target_dist'] = pts_inf.apply( - lambda x: ( - (sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[1]) if ( - (x.status == 'Working') or (x.status == 'Installation')) else - (sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[0])), - axis=1, + if table_name == "service_placementpoint": + pts_inf = pts.loc[ + (pts.status == "Pending") + | (pts.status == "Installation") + | (pts.status == "Cancelled") + | ((pts.status == "Working") & (pts.sample_trn == False)) + ].reset_index(drop=True) + elif table_name == "service_preplacementpoint": + pts_inf_df = pd.DataFrame(engine.connect().execute(text(f"select * from {table_name}"))) + pts_inf_df = pts_inf_df.loc[pts_inf_df.matching_status == "New"].reset_index(drop=True) + pts_inf_df["geometry"] = pts_inf_df["geometry"].apply(wkb.loads, hex=True) + pts_inf = gpd.GeoDataFrame(pts_inf_df, geometry="geometry", crs="epsg:4326") + else: + raise ValueError("Unexpected `table_name` parameter") + + if not len(pts_inf): + raise Exception("Empty point set") + + pts_inf: gpd.GeoDataFrame = pts_inf.to_crs("epsg:32637") + + pts_inf["buf"] = pts_inf.buffer(500) + pts_inf = gpd.GeoDataFrame(pts_inf, geometry="buf", crs="epsg:32637") + pts_target = pts.loc[(pts.status == "Working") | (pts.status == "Installation")].reset_index(drop=True) + pts_target = pts_target[["geometry"]] + pts_target["cnt"] = 1 + pts_target = gpd.GeoDataFrame(pts_target, geometry="geometry", crs="epsg:32637") + + target_feature_coords = [] + for i in range(0, len(pts_target)): + target_feature_coords.append((pts_target.geometry.x[i], pts_target.geometry.y[i])) + target_feature_coords = np.array(target_feature_coords) + + def dist_working(x): + def cdist_(x): + return distance.cdist([[x["geometry"].x, x["geometry"].y]], target_feature_coords)[0] + + return ( + sorted(cdist_(x))[1] + if x.status in {"Working", "Installation"} + else sorted(cdist_(x))[0] ) - pts_inf.loc[pts_inf.target_dist > 700, 'target_dist'] = 700 - - pts_inf = pts_inf.sort_values(by='id').reset_index(drop=True) - target_post = gpd.sjoin(pts_inf, pts_target, op='contains').groupby('id', as_index=False).agg( - {'cnt': 'count'}) - target_post = target_post.rename(columns={'cnt': 'target_post_cnt'}) - pts_inf = pts_inf.drop(columns=['target_post_cnt']) - pts_inf = pts_inf.join(target_post.set_index('id'), on='id') - pts_inf['target_post_cnt'] = pts_inf['target_post_cnt'].fillna(0) - pts_inf['target_post_cnt'] = pts_inf.apply(lambda x: ((x.target_post_cnt - 1) if ( - (x.status == 'Working') or (x.status == 'Installation')) else x.target_post_cnt), axis=1) - pts_inf['age_day_init'] = pts_inf['age_day'] - pts_inf['age_day'] = 240 - X_inf = pts_inf[feats] - - seeds = [99, 87, 21, 15] - - # Обучение, инференс - r2_scores = [] - mapes = [] - y_infers = [] - # status.status = 'Обучение inference 0%' + + pts_inf["target_dist"] = pts_inf.apply(dist_working, axis=1) + pts_inf.loc[pts_inf.target_dist > 700, "target_dist"] = 700 + + pts_inf = pts_inf.sort_values(by="id").reset_index(drop=True) # type: ignore + target_post = gpd.sjoin(pts_inf, pts_target, op="contains").groupby("id", as_index=False).agg({"cnt": "count"}) + target_post = target_post.rename(columns={"cnt": "target_post_cnt"}) + pts_inf = pts_inf.drop(columns=["target_post_cnt"]) + pts_inf = pts_inf.join(target_post.set_index("id"), on="id") + pts_inf.target_post_cnt = pts_inf.target_post_cnt.fillna(0) + pts_inf.target_post_cnt = pts_inf.apply( + lambda x: ( + (x.target_post_cnt - 1) + if ((x.status == "Working") or (x.status == "Installation")) + else x.target_post_cnt + ), + axis=1, + ) + pts_inf["age_day_init"] = pts_inf["age_day"] + pts_inf["age_day"] = 240 + X_inf = pts_inf[feats].dropna() + + seeds = [99, 87, 21, 15] + + # Обучение, инференс + r2_scores = [] + mapes = [] + x_inf: gpd.GeoDataFrame = X_inf.drop(columns=["id"]) # type: ignore + y_infers = [] + # status.status = 'Обучение inference 0%' + # status.save() + # change_status('Обучение inference 0%', task_name=task_name) + model = catboost.CatBoostRegressor() + for i in seeds: + # status.status = 'Обучение inference: ' + str(int((seeds.index(i) + 1) / len(seeds) * 100)) + '%' # status.save() - #change_status('Обучение inference 0%', task_name=task_name) - for i in seeds: - # status.status = 'Обучение inference: ' + str(int((seeds.index(i) + 1) / len(seeds) * 100)) + '%' - # status.save() - #change_status(f'Обучение inference: {str(int((seeds.index(i) + 1) / len(seeds) * 100))}%', - #task_name=task_name) - x_trn, x_test, y_trn, y_test = ms.train_test_split(X_trn, Y_trn, test_size=0.2, random_state=i) - model = catboost.CatBoostRegressor(cat_features=['property_era'], random_state=i) - model.fit(x_trn, y_trn, verbose=False) - r2_score = metrics.r2_score(y_test, model.predict(x_test)) - mape = metrics.mean_absolute_percentage_error(y_test, model.predict(x_test)) - if ((r2_score > 0.45) & (mape < 0.25)): - r2_scores.append(r2_score) - mapes.append(mape) - y_infers.append(model.predict(X_inf.drop(columns=['id']))) - #change_status('Обучение inference 100%', task_name=task_name) - # status.status = 'Обучение inference 100%' - current_pred = sum(y_infers) / 4 - - # расчет шапов - explainer = shap.TreeExplainer(model) - shap_values = explainer(X_inf.drop(columns=['id'])) - shap_fields = pd.DataFrame(shap_values.values) - shap_fields.columns = X_inf.drop(columns=['id']).columns + '_shap' - shap_fields = shap_fields.drop(columns=['age_day_shap']) - shap_fields['sum'] = abs(shap_fields).sum(axis=1) - shap_fields = round(shap_fields.iloc[:, :32].div(shap_fields['sum'], axis=0) * 100, 2) - - # Обновление полей по результатам работы модели - update_fields = pts_inf[ - [ - 'id', 'age_day_init', 'status', 'fact', 'delta_current', 'delta_first', 'plan_current', - 'plan_first', - 'prediction_first', 'target_post_cnt', 'target_dist' - ] + # change_status(f'Обучение inference: {str(int((seeds.index(i) + 1) / len(seeds) * 100))}%', + # task_name=task_name) + x_trn, x_test, y_trn, y_test = ms.train_test_split(X_trn, Y_trn, test_size=0.2, random_state=i) + model = catboost.CatBoostRegressor(cat_features=["property_era"], random_state=i) + model.fit(x_trn, y_trn, verbose=False) + + r2_score = metrics.r2_score(y_test, model.predict(x_test)) + mape = metrics.mean_absolute_percentage_error(y_test, model.predict(x_test)) + if (r2_score > 0.45) & (mape < 0.25): + r2_scores.append(r2_score) + mapes.append(mape) + y_infers.append(model.predict(x_inf)) + # change_status('Обучение inference 100%', task_name=task_name) + # status.status = 'Обучение inference 100%' + current_pred = sum(y_infers) / 4 + + # расчет шапов + explainer = shap.TreeExplainer(model) + shap_values = explainer(x_inf) + shap_fields = pd.DataFrame(shap_values.values) + shap_fields.columns = x_inf.columns + "_shap" + shap_fields = shap_fields.drop(columns=["age_day_shap"]) + shap_fields["sum"] = abs(shap_fields).sum(axis=1) + shap_fields = round(shap_fields.iloc[:, :32].div(shap_fields["sum"], axis=0) * 100, 2) + + # Обновление полей по результатам работы модели + update_fields = pts_inf[ + [ + "id", + "age_day_init", + "status", + "fact", + "delta_current", + "delta_first", + "plan_current", + "plan_first", + "prediction_first", + "target_post_cnt", + "target_dist", ] - update_fields = update_fields.join( - pd.concat( - [ - X_inf[['id']], - pd.DataFrame({'prediction_current': current_pred}), - ], - axis=1, - ).set_index('id'), - on='id', - ) - update_fields['prediction_current'] = update_fields['prediction_current'].astype(int) + ].dropna() + update_fields = update_fields.join( + pd.concat( + [ + X_inf[["id"]], + pd.DataFrame({"prediction_current": current_pred}), + ], + axis=1, + ).set_index("id"), + on="id", + ).dropna() + update_fields["prediction_current"] = update_fields["prediction_current"].astype(int) - days_x = np.array([0, 30, 60, 90, 120, 150, 180, 210, 240, 270]) - perc_y = np.array([0, 0.15, 0.20, 0.30, 0.60, 0.70, 0.70, 0.75, 0.75, 0.80]) - spl = interpolate.splrep(days_x, perc_y) + days_x = np.array([0, 30, 60, 90, 120, 150, 180, 210, 240, 270]) + perc_y = np.array([0, 0.15, 0.20, 0.30, 0.60, 0.70, 0.70, 0.75, 0.75, 0.80]) + spl = interpolate.splrep(days_x, perc_y) - update_fields['plan_first'] = update_fields.apply( - lambda x: (x.prediction_first * interpolate.splev(x.age_day_init, spl) if x.status == 'Working' else 0), - axis=1, - ) - update_fields['plan_current'] = update_fields.apply( - lambda x: ( - x.prediction_current * interpolate.splev(x.age_day_init, spl) if x.status == 'Working' else 0), - axis=1, - ) - update_fields['delta_first'] = update_fields.apply( - lambda x: ((x.fact - x.plan_first) / x.plan_first * 100 if x.status == 'Working' else 0), - axis=1, - ) - update_fields['delta_current'] = update_fields.apply( - lambda x: ((x.fact - x.plan_current) / x.plan_current * 100 if x.status == 'Working' else 0), - axis=1, - ) + update_fields["plan_first"] = update_fields.apply( + lambda x: (x.prediction_first * interpolate.splev(x.age_day_init, spl) if x.status == "Working" else 0), + axis=1, + ) + update_fields["plan_current"] = update_fields.apply( + lambda x: (x.prediction_current * interpolate.splev(x.age_day_init, spl) if x.status == "Working" else 0), + axis=1, + ) + update_fields["delta_first"] = update_fields.apply( + lambda x: ((x.fact - x.plan_first) / x.plan_first * 100 if x.status == "Working" else 0), + axis=1, + ) + update_fields["delta_current"] = update_fields.apply( + lambda x: ((x.fact - x.plan_current) / x.plan_current * 100 if x.status == "Working" else 0), + axis=1, + ) - update_fields_working = update_fields.loc[update_fields.status == 'Working'].reset_index(drop=True) - update_fields_working = update_fields_working.fillna(0) - connection.close() + update_fields_working = update_fields.loc[update_fields.status == "Working"].reset_index(drop=True) + update_fields_working = update_fields_working.fillna(0) + connection.close() except Exception as e: - log_to_telegram(f'Ошибка при обновлении полей в базе данных: {e}') - log_to_telegram('Начинается обновление полей в базе') + log(f"Ошибка при обновлении полей в базе данных: {e}") + raise + log("Начинается обновление полей в базе") if len(pts_inf) > 0: - # status.status = 'Перерасчет ML: 50%' # status.save() - #change_status('Перерасчет ML: 50%', task_name=task_name) + # change_status('Перерасчет ML: 50%', task_name=task_name) # Загрузка в базу обновленных значений try: - log_to_telegram('Подключение к базе данных 2') + log("Подключение к базе данных 2") conn2 = psycopg2.connect( - database=os.getenv('POSTGRES_DB', 'postgres'), user=os.getenv('POSTGRES_USER', 'postgres'), - password=os.getenv('POSTGRES_PASSWORD', 'postgres'), - host=os.getenv('POSTGRES_HOST', 'postgres'), port=os.getenv('POSTGRES_PORT', 'postgres'), - options='-c search_path=public', + database=os.getenv("POSTGRES_DB", "postgres"), + user=os.getenv("POSTGRES_USER", "postgres"), + password=os.getenv("POSTGRES_PASSWORD", "postgres"), + host=os.getenv("POSTGRES_HOST", "postgres"), + port=os.getenv("POSTGRES_PORT", "postgres"), + options="-c search_path=public", ) cursor = conn2.cursor() except: conn2 = None - log_to_telegram('Не удалось подключиться к базе данных') + log("Не удалось подключиться к базе данных") + raise if conn2 is not None: # апдейт шапов - update_fields_shap = pd.concat([shap_fields, update_fields[['id']]], axis=1) + update_fields_shap = pd.concat([shap_fields, update_fields[["id"]]], axis=1).dropna() update_records0 = [] - for i in range(0, len(update_fields_shap)): + for i in update_fields_shap.id.keys(): update_records1 = [] for n in list(update_fields_shap): update_records1.append(int(update_fields_shap[n][i])) update_records0.append(tuple(update_records1)) - shap_fields_name = str(list(shap_fields))[1:-1].replace("'", "").replace(',', '=%s,') + shap_fields_name = str(list(shap_fields))[1:-1].replace("'", "").replace(",", "=%s,") sql_update_query = f"""Update {table_name} set {shap_fields_name} = %s where id = %s""" try: - psycopg2.extras.execute_batch(cursor, sql_update_query, update_records0) + psycopg2.extras.execute_batch(cursor, sql_update_query, update_records0) # это тоже долго conn2.commit() except: cursor.execute("ROLLBACK") @@ -291,7 +369,7 @@ def raschet(table_name='service_placementpoint', need_time=True, task_name=STATU # target_post_cnt update_records1 = [] - for i in range(0, len(update_fields)): + for i in update_fields.id.keys(): update_records1.append((int(update_fields.target_post_cnt[i]), int(update_fields.id[i]))) sql_update_query = f"""Update {table_name} set target_post_cnt = %s where id = %s""" try: @@ -304,7 +382,7 @@ def raschet(table_name='service_placementpoint', need_time=True, task_name=STATU # target_dist update_records1 = [] - for i in range(0, len(update_fields)): + for i in update_fields.id.keys(): update_records1.append((int(update_fields.target_dist[i]), int(update_fields.id[i]))) sql_update_query = f"""Update {table_name} set target_dist = %s where id = %s""" try: @@ -317,129 +395,165 @@ def raschet(table_name='service_placementpoint', need_time=True, task_name=STATU # prediction_current update_records1 = [] - for i in range(0, len(update_fields)): + for i in update_fields.id.keys(): update_records1.append((int(update_fields.prediction_current[i]), int(update_fields.id[i]))) sql_update_query = f"""Update {table_name} set prediction_current = %s where id = %s""" try: psycopg2.extras.execute_batch(cursor, sql_update_query, update_records1) conn2.commit() except Exception: - cursor.execute('ROLLBACK') + cursor.execute("ROLLBACK") psycopg2.extras.execute_batch(cursor, sql_update_query, update_records1) conn2.commit() - # plan_first + # plan_first вот это очень долго update_records2 = [] - for i in range(0, len(update_fields_working)): - update_records2.append((int(update_fields_working.plan_first[i]), int(update_fields_working.id[i]))) + for i in update_fields_working.id.keys(): + update_records2.append( + ( + int(update_fields_working.plan_first[i]), + int(update_fields_working.id[i]), + ) + ) sql_update_query = f"""Update {table_name} set plan_first = %s where id = %s""" try: psycopg2.extras.execute_batch(cursor, sql_update_query, update_records2) conn2.commit() except Exception: - cursor.execute('ROLLBACK') + cursor.execute("ROLLBACK") psycopg2.extras.execute_batch(cursor, sql_update_query, update_records2) conn2.commit() # plan_current update_records3 = [] - for i in range(0, len(update_fields_working)): - update_records3.append((int(update_fields_working.plan_current[i]), int(update_fields_working.id[i]))) + for i in update_fields_working.id.keys(): + update_records3.append( + ( + int(update_fields_working.plan_current[i]), + int(update_fields_working.id[i]), + ) + ) sql_update_query = f"""Update {table_name} set plan_current = %s where id = %s""" try: psycopg2.extras.execute_batch(cursor, sql_update_query, update_records3) conn2.commit() except Exception: - cursor.execute('ROLLBACK') + cursor.execute("ROLLBACK") psycopg2.extras.execute_batch(cursor, sql_update_query, update_records3) conn2.commit() # delta_first update_records4 = [] - for i in range(0, len(update_fields_working)): - update_records4.append((int(update_fields_working.delta_first[i]), int(update_fields_working.id[i]))) + for i in update_fields_working.id.keys(): + update_records4.append( + ( + int(update_fields_working.delta_first[i]), + int(update_fields_working.id[i]), + ) + ) sql_update_query = f"""Update {table_name} set delta_first = %s where id = %s""" try: psycopg2.extras.execute_batch(cursor, sql_update_query, update_records4) conn2.commit() except Exception: - cursor.execute('ROLLBACK') + cursor.execute("ROLLBACK") psycopg2.extras.execute_batch(cursor, sql_update_query, update_records4) conn2.commit() # delta_current update_records5 = [] - for i in range(0, len(update_fields_working)): - update_records5.append((int(update_fields_working.delta_current[i]), int(update_fields_working.id[i]))) + for i in update_fields_working.id.keys(): + update_records5.append( + ( + int(update_fields_working.delta_current[i]), + int(update_fields_working.id[i]), + ) + ) sql_update_query = f"""Update {table_name} set delta_current = %s where id = %s""" try: psycopg2.extras.execute_batch(cursor, sql_update_query, update_records5) conn2.commit() except Exception: - cursor.execute('ROLLBACK') + cursor.execute("ROLLBACK") psycopg2.extras.execute_batch(cursor, sql_update_query, update_records5) conn2.commit() conn2.close() cache.clear() else: - log_to_telegram('len(pts_inf) <= 0') + log("len(pts_inf) <= 0") run_psql_command() - log_to_telegram('end raschet') + log("end raschet") # status.status = 'Перерасчет ML завершен' # status.save() - change_status('Перерасчет ML завершен', task_name=task_name) + change_status("Перерасчет ML завершен", task_name=task_name) if need_time: LastMLCall.objects.all().delete() LastMLCall.objects.create() +@shared_task() +def raschet(table_name="service_placementpoint", need_time=True, task_name=STATUS_TASK_NAME): + try: + do_computations(table_name, need_time, task_name) + finally: + change_status("Перерасчет ML завершен", task_name=task_name) + + @shared_task def load_post_and_pvz(obj_id: int): file = models.TempFiles.objects.get(id=obj_id) - status, _ = models.TaskStatus.objects.get_or_create(task_name='Загрузка ПВЗ и Постаматов') + status, _ = models.TaskStatus.objects.get_or_create(task_name="Загрузка ПВЗ и Постаматов") excel_file = base64.b64decode(file.data) df = pd.read_excel(excel_file) df = df.replace(np.nan, None) - df = df.replace('NaT', None) + df = df.replace("NaT", None) df.columns = df.columns.str.lower() data_len = df.shape[0] - for _ind, row in enumerate(df.to_dict('records')): + for _ind, row in enumerate(df.to_dict("records")): status.status = "Загрузка данных: " + str(int(_ind / data_len * 100)) + "%" status.save() - category = row.get('category') - group = row.get('group') + category = row.get("category") + group = row.get("group") if category: cat, _ = models.Post_and_pvzCategory.objects.get_or_create(name=category) if group: gr, _ = models.Post_and_pvzGroup.objects.get_or_create(name=group, category=cat) - row['category'] = cat - row['group'] = gr - lon = str(row.pop('lon')) + row["category"] = cat + row["group"] = gr + lon = str(row.pop("lon")) lat = str(row.pop("lat")) - row['wkt'] = "POINT(" + lon + " " + lat + ")" + row["wkt"] = "POINT(" + lon + " " + lat + ")" models.Post_and_pvz.objects.get_or_create(**row) status.status = "Загрузка данных завершена" status.save() - groups = df[['group', 'category']].drop_duplicates().to_dict(orient='records') + groups = df[["group", "category"]].drop_duplicates().to_dict(orient="records") points = models.PlacementPoint.objects.all() num_points = points.count() total = len(groups) * num_points for _i, gr in enumerate(groups): - group = models.Post_and_pvzGroup.objects.get(name=gr['group'], category__name=gr['category']) + group = models.Post_and_pvzGroup.objects.get(name=gr["group"], category__name=gr["category"]) for _j, point in enumerate(points): status.status = "Подсчет расстояний: " + str(int((num_points * _i + _j) / total * 100)) + "%" status.save() - post_object = models.Post_and_pvz.objects.filter(group__id=group.id).annotate( - distance=Distance("wkt", point.geometry)).order_by('distance').first() - d = models.PlacementPointPVZDistance.objects.filter(placement_point=point, - pvz_postamates_group=group).first() + post_object = ( + models.Post_and_pvz.objects.filter(group__id=group.id) + .annotate(distance=Distance("wkt", point.geometry)) + .order_by("distance") + .first() + ) + d = models.PlacementPointPVZDistance.objects.filter( + placement_point=point, pvz_postamates_group=group + ).first() if d: if d.dist > post_object.distance.m: d.dist = post_object.distance.m d.save() else: - models.PlacementPointPVZDistance.objects.create(placement_point=point, pvz_postamates_group=group, - dist=post_object.distance.m) + models.PlacementPointPVZDistance.objects.create( + placement_point=point, + pvz_postamates_group=group, + dist=post_object.distance.m, + ) status.status = "Подсчет расстояний завершен" status.save() point_qs = models.PlacementPoint.objects.all() @@ -456,9 +570,9 @@ def load_post_and_pvz(obj_id: int): @shared_task() def add_age_day(): - qs = PlacementPoint.objects.filter(status='Working') + qs = PlacementPoint.objects.filter(status="Working") # c1 = qs.filter(sample_trn=True).count() - qs.update(age_day=F('age_day') + 1) + qs.update(age_day=F("age_day") + 1) qs2 = qs.filter(age_day__gt=AGE_DAY_LIMIT) qs2.update(sample_trn=True) # c2 = PlacementPoint.objects.filter(sample_trn=True).count() @@ -469,27 +583,27 @@ def add_age_day(): @shared_task() def load_other_objects(obj_id: int): file = models.TempFiles.objects.get(id=obj_id) - status, _ = models.TaskStatus.objects.get_or_create(task_name='Загрузка Прочих объектов') + status, _ = models.TaskStatus.objects.get_or_create(task_name="Загрузка Прочих объектов") excel_file = base64.b64decode(file.data) df = pd.read_excel(excel_file) df = df.replace(np.nan, None) - df = df.replace('NaT', None) + df = df.replace("NaT", None) df.columns = df.columns.str.lower() data_len = df.shape[0] - for _ind, row in enumerate(df.to_dict('records')): + for _ind, row in enumerate(df.to_dict("records")): status.status = "Загрузка данных: " + str(int(_ind / data_len * 100)) + "%" status.save() - category = row.get('category') - group = row.get('group') + category = row.get("category") + group = row.get("group") if category: cat, _ = models.OtherObjectsCategory.objects.get_or_create(name=category) if group: gr, _ = models.OtherObjectsGroup.objects.get_or_create(name=group, category=cat) - row['category'] = cat - row['group'] = gr - lon = str(row.pop('lon')) + row["category"] = cat + row["group"] = gr + lon = str(row.pop("lon")) lat = str(row.pop("lat")) - row['wkt'] = "POINT(" + lon + " " + lat + ")" + row["wkt"] = "POINT(" + lon + " " + lat + ")" models.OtherObjects.objects.get_or_create(**row) status.status = "Загрузка данных завершена" cache.clear() @@ -499,23 +613,20 @@ def load_other_objects(obj_id: int): @shared_task() def load_data(obj_id: int): - status, _ = models.TaskStatus.objects.get_or_create(task_name='Загрузка Точек') + status, _ = models.TaskStatus.objects.get_or_create(task_name="Загрузка Точек") file = models.TempFiles.objects.get(id=obj_id) csv_file = base64.b64decode(file.data) models.PlacementPoint.objects.all().delete() - s = str(csv_file, 'utf-8') + s = str(csv_file, "utf-8") data = StringIO(s) - df = pd.read_csv(data, delimiter=';') + df = pd.read_csv(data, delimiter=";") df = df.replace(np.nan, None) - df = df.replace('NaT', None) + df = df.replace("NaT", None) data_len = df.shape[0] - for _ind, row in enumerate(df.to_dict('records')): + for _ind, row in enumerate(df.to_dict("records")): status.status = "Загрузка данных: " + str(int(_ind / data_len * 100)) + "%" status.save() - data = { - k: row[k] for k in row.keys() if - k not in ['id', 'location_id', 'area', 'district', 'age_month'] - } + data = {k: row[k] for k in row.keys() if k not in ["id", "location_id", "area", "district", "age_month"]} models.PlacementPoint.objects.create(**data) status.status = "Загрузка данных завершена" status.save() @@ -526,4 +637,4 @@ def load_data(obj_id: int): def import_task(file_id): PointService().start_mathing(file_id) PointService().make_enrichment() - raschet('service_preplacementpoint', need_time=False, task_name=STATUS_TASK_NAME_IMPORT) + # raschet('service_preplacementpoint', need_time=False, task_name=STATUS_TASK_NAME_IMPORT) diff --git a/service/utils.py b/service/utils.py index 186bdbb..fe73a79 100644 --- a/service/utils.py +++ b/service/utils.py @@ -88,9 +88,8 @@ def load_dist(filepath: str): models.PointDist.objects.create(**row) -def log_to_telegram(msg): - requests.post('https://api.telegram.org/bot6275517704:AAHVp_qv9d9NU740JJdOM2fJdgS4r1AgJrw/sendMessage', - json={"chat_id": "-555238820", "text": str(settings.DOMAIN) + '\n' + msg}) +def log(msg): + print(msg) def cached_func(key, func, timeout=settings.CACHE_TIMEOUT, *args, **kwargs): diff --git a/service/views.py b/service/views.py index 41a5d46..a3cc2d2 100644 --- a/service/views.py +++ b/service/views.py @@ -343,7 +343,7 @@ class PlacementPointViewSet(ReadOnlyModelViewSet): @action(detail=False, methods=['get']) def start(self, request): - raschet.delay() + # raschet.delay() return Response('Sucess', status=HTTPStatus.OK) @action(detail=False, methods=['get']) @@ -526,10 +526,11 @@ def upload_houses(request): @api_view(['GET']) @permission_classes([UserPermission]) def get_current_user(request): - kk_profile = request.auth - kk_roles = kk_profile.get('realm_access', {}).get('roles', []) + # kk_profile = request.auth + # kk_roles = kk_profile.get('realm_access', {}).get('roles', []) return JsonResponse( - {'groups': kk_roles, 'username': kk_profile.get('preferred_username')}, + # {'groups': kk_roles, 'username': kk_profile.get('preferred_username')}, + {'groups': [], 'username': 'AnonymousUser'}, )