From 2b09cae4cf69477fd3313a0c500c8d1e349bf865 Mon Sep 17 00:00:00 2001 From: timofejmalinin Date: Tue, 17 Oct 2023 16:40:11 +0700 Subject: [PATCH] Big fixes --- deploy/django.yml | 2 +- postamates/settings.py | 1 + .../management/commands/create_procedures.py | 11 +++- service/management/commands/kill_update.py | 12 ++++ service/migrations/0040_taskstatus_data.py | 18 +++++ service/models.py | 2 + service/service.py | 20 ++++-- service/tasks.py | 65 ++++++++++--------- service/utils.py | 6 ++ service/views.py | 21 ++++-- 10 files changed, 116 insertions(+), 42 deletions(-) create mode 100644 service/management/commands/kill_update.py create mode 100644 service/migrations/0040_taskstatus_data.py diff --git a/deploy/django.yml b/deploy/django.yml index 1b60a89..e6b0a6a 100644 --- a/deploy/django.yml +++ b/deploy/django.yml @@ -19,7 +19,7 @@ spec: containers: - name: django image: DEPLOY_IMAGE_TAG - command: ["sh", "-c", "python manage.py delete_views && python manage.py create_procedures && python manage.py migrate && python manage.py create_views && python manage.py runserver 0.0.0.0:${DJANGO_PORT}"] + command: ["sh", "-c", "python manage.py delete_views && python manage.py create_procedures && python manage.py migrate && python manage.py create_views && python manage.py kill_update && python manage.py runserver 0.0.0.0:${DJANGO_PORT}"] ports: - containerPort: 8000 name: django-port diff --git a/postamates/settings.py b/postamates/settings.py index 1a878c2..fcf0e4f 100644 --- a/postamates/settings.py +++ b/postamates/settings.py @@ -197,3 +197,4 @@ JSON_EXPORT_FILENAME = 'placement_points.json' DATA_UPLOAD_MAX_NUMBER_FIELDS = None GEOCODER_API_KEY = os.getenv('GEOCODER_API_KEY','TzgdKWgyI2nfaz1WHRD-aYJK4e400MiOJQP7Enf1e1M') STATUS_TASK_NAME='status_task' +STATUS_TASK_NAME_IMPORT='import_status_task' diff --git a/service/management/commands/create_procedures.py b/service/management/commands/create_procedures.py index 30fcff4..f80b587 100644 --- a/service/management/commands/create_procedures.py +++ b/service/management/commands/create_procedures.py @@ -4,13 +4,14 @@ from service.utils import run_sql_command, log_to_telegram CMD_PIVOT_DIST = """CREATE OR REPLACE VIEW compact_placementpoint AS SELECT id, status, category, age_day, fact, area_id, district_id, prediction_first, prediction_current, doors, flat_cnt, rival_post_cnt, rival_pvz_cnt, target_post_cnt, flats_cnt, tc_cnt, culture_cnt, mfc_cnt, public_stop_cnt, supermarket_cnt, target_dist, metro_dist, geometry FROM service_placementpoint; CREATE OR REPLACE procedure pivot_dist() ---RETURNS SET OF record AS $BODY$ DECLARE columnNames TEXT; BEGIN DROP MATERIALIZED VIEW IF EXISTS points_with_dist; SELECT 'placement_point_id bigint, ' || string_agg(c, ', ') FROM (SELECT distinct pvz_postamates_group_id, 'd' || pvz_postamates_group_id || ' double precision' as c from service_placementpointpvzdistance order by 1) as asd INTO columnNames; +IF columnNames IS NOT NULL +THEN EXECUTE format('CREATE MATERIALIZED VIEW points_with_dist AS SELECT * FROM CROSSTAB( $$ @@ -21,6 +22,9 @@ FROM CROSSTAB( ) AS ct(%s) LEFT JOIN compact_placementpoint ON placement_point_id=id' ,columnNames); +ELSE +CREATE MATERIALIZED VIEW points_with_dist AS SELECT placement_point_id, compact_placementpoint.id, status, category, age_day, fact, area_id, district_id, prediction_first, prediction_current, doors, flat_cnt, rival_post_cnt, rival_pvz_cnt, target_post_cnt, flats_cnt, tc_cnt, culture_cnt, mfc_cnt, public_stop_cnt, supermarket_cnt, target_dist, metro_dist, geometry FROM service_placementpointpvzdistance LEFT JOIN compact_placementpoint ON placement_point_id=compact_placementpoint.id; +END IF; END; $BODY$ LANGUAGE plpgsql; @@ -35,6 +39,8 @@ BEGIN DROP MATERIALIZED VIEW IF EXISTS prepoints_with_dist; SELECT 'preplacement_point_id bigint, ' || string_agg(c, ', ') FROM (SELECT distinct pvz_postamates_group_id, 'd' || pvz_postamates_group_id || ' double precision' as c from service_preplacementpointpvzdistance order by 1) as asd INTO columnNames; +IF columnNames IS NOT NULL +THEN EXECUTE format('CREATE MATERIALIZED VIEW prepoints_with_dist AS SELECT * FROM CROSSTAB( $$ @@ -45,6 +51,9 @@ FROM CROSSTAB( ) AS ct(%s) LEFT JOIN compact_preplacementpoint ON preplacement_point_id=id' ,columnNames); +ELSE +CREATE MATERIALIZED VIEW prepoints_with_dist AS SELECT placement_point_id, compact_preplacementpoint.id, status, category, age_day, fact, area_id, district_id, prediction_first, prediction_current, doors, flat_cnt, rival_post_cnt, rival_pvz_cnt, target_post_cnt, flats_cnt, tc_cnt, culture_cnt, mfc_cnt, public_stop_cnt, supermarket_cnt, target_dist, metro_dist, geometry FROM service_preplacementpointpvzdistance LEFT JOIN compact_preplacementpoint ON placement_point_id=compact_preplacementpoint.id; +END IF; END; $BODY$ LANGUAGE plpgsql; diff --git a/service/management/commands/kill_update.py b/service/management/commands/kill_update.py new file mode 100644 index 0000000..b811c46 --- /dev/null +++ b/service/management/commands/kill_update.py @@ -0,0 +1,12 @@ +from django.core.management.base import BaseCommand +from service.utils import run_sql_command, log_to_telegram +from service.models import TaskStatus +from postamates.settings import STATUS_TASK_NAME, STATUS_TASK_NAME_IMPORT + + +class Command(BaseCommand): + help = 'Kill update processes' + + def handle(self, *args, **kwargs): + TaskStatus.objects.filter(task_name=STATUS_TASK_NAME).update(status='Перерасчет ML завершен') + TaskStatus.objects.filter(task_name=STATUS_TASK_NAME_IMPORT).update(status='Перерасчет ML завершен') \ No newline at end of file diff --git a/service/migrations/0040_taskstatus_data.py b/service/migrations/0040_taskstatus_data.py new file mode 100644 index 0000000..cb93cc9 --- /dev/null +++ b/service/migrations/0040_taskstatus_data.py @@ -0,0 +1,18 @@ +# Generated by Django 3.2 on 2023-10-17 09:36 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('service', '0039_auto_20231011_2120'), + ] + + operations = [ + migrations.AddField( + model_name='taskstatus', + name='data', + field=models.JSONField(blank=True, default=dict, null=True, verbose_name='Данные'), + ), + ] diff --git a/service/models.py b/service/models.py index 578d0f4..e570d2b 100644 --- a/service/models.py +++ b/service/models.py @@ -262,6 +262,8 @@ class TaskStatus(models.Model): task_name = models.TextField(blank=False, unique=True, verbose_name='Название задачи') status = models.TextField(blank=True, null=True, verbose_name='Статус выполнения') + data = models.JSONField(blank=True, null=True, verbose_name='Данные', default=dict) + class LastMLCall(models.Model): diff --git a/service/service.py b/service/service.py index b7b5ecb..627aa5d 100644 --- a/service/service.py +++ b/service/service.py @@ -5,10 +5,10 @@ import pandas as pd from django.contrib.gis.measure import Distance from django.db.models import F -from postamates.settings import DEFAULT_PLACEMENT_POINT_UPDATE_RADIUS, AGE_DAY_LIMIT +from postamates.settings import DEFAULT_PLACEMENT_POINT_UPDATE_RADIUS, AGE_DAY_LIMIT, STATUS_TASK_NAME, STATUS_TASK_NAME_IMPORT from service import models from service.enums import PointStatus -from service.utils import create_columns_dist, run_psql_command +from service.utils import create_columns_dist, run_psql_command, change_status import base64 import requests from postamates.settings import GEOCODER_API_KEY @@ -27,7 +27,8 @@ class PointService: qs = self.get_point_by_id(point_id) qs.update(**{'postamat_id': postamat_id}) - def start_mathing(self, obj_id: int): + def start_mathing(self, obj_id: int, task_name=STATUS_TASK_NAME_IMPORT): + change_status('Шаг 1. Начинается мэтчинг точек', task_name) file = models.TempFiles.objects.get(id=obj_id) excel_file = base64.b64decode(file.data) df = pd.read_excel(excel_file) @@ -127,11 +128,18 @@ class PointService: matching_status=MatchingStatus.New.name, status=PointStatus.Pending.name, area=rayon, district=rayon.AO) - return total, matched, problem + change_status(f'Шаг 1. Обработано {matched + problem} из {total}', task_name) - def make_enrichment(self): + ts = models.TaskStatus.objects.get(task_name=task_name) + ts.data = {'total': total, 'matched': matched, 'error': problem, 'unmatched': total - matched - problem} + ts.save() + # return total, matched, problem + + def make_enrichment(self, task_name=STATUS_TASK_NAME_IMPORT): + change_status('Шаг 2. Начинается обогащение точек', task_name) points = models.PrePlacementPoint.objects.filter(matching_status=MatchingStatus.New.name).all() groups = models.Post_and_pvzGroup.objects.all() + for point in points: origin = point.geometry qs = models.PlacementPoint.objects.filter(status=PointStatus.Working.name).annotate( @@ -240,6 +248,8 @@ class PointService: point.save() for group in groups: self.calculate_dist_for_group(point, group, instance_type=models.PrePlacementPointPVZDistance) + change_status(f'Шаг 2. Обогащено {points.filter(matching_status=MatchingStatus.Matched.name).count()} из {points.count()}', task_name) + run_psql_command() @staticmethod diff --git a/service/tasks.py b/service/tasks.py index 5f80fca..77e0a8e 100644 --- a/service/tasks.py +++ b/service/tasks.py @@ -17,7 +17,7 @@ from sklearn import model_selection as ms from sqlalchemy import text from django.contrib.gis.db.models.functions import Distance from postamates.settings import AGE_DAY_LIMIT -from postamates.settings import DB_URL, STATUS_TASK_NAME +from postamates.settings import DB_URL, STATUS_TASK_NAME, STATUS_TASK_NAME_IMPORT from service.models import PlacementPoint, LastMLCall from service import models from service.utils import log_to_telegram @@ -26,44 +26,37 @@ from io import StringIO from django.core.cache import cache from service.layer_service import LayerService from service.service import PointService -from service.utils import run_psql_command +from service.utils import run_psql_command, change_status @shared_task() -def raschet(table_name='service_placementpoint', need_time=True): +def raschet(table_name='service_placementpoint', need_time=True, task_name=STATUS_TASK_NAME): print('start raschet') - status, _ = models.TaskStatus.objects.get_or_create(task_name=STATUS_TASK_NAME) + # status, _ = models.TaskStatus.objects.get_or_create(task_name=STATUS_TASK_NAME) raschet_objs = models.RaschetObjects.objects.all() if raschet_objs: - status.status = 'Начало расчета кол-ва ПВЗ вокруг точек' - status.save() + change_status('Начало расчета кол-ва ПВЗ вокруг точек', task_name=task_name) total = raschet_objs.count() for _i, r_o in enumerate(raschet_objs): obj = models.Post_and_pvz.objects.get(id=r_o.obj_id) LayerService().count_post_pvz_for_placementpoint(obj) - status.status = "Подсчет кол-ва ПВЗ вокруг точек: " + str(int((_i + 1) / total * 100)) + "%" - status.save() - status.status = 'Расчет кол-ва ПВЗ вокруг точек завершен' - status.save() + change_status(f'Подсчет кол-ва ПВЗ вокруг точек: {str(int((_i + 1) / total * 100))}%', task_name=task_name) + change_status('Подсчет кол-ва ПВЗ вокруг точек завершен', task_name=task_name) group_objects = models.RaschetGroups.objects.all() group_total = group_objects.count() if group_objects: - status.status = 'Начало расчета расстояний' - status.save() + change_status('Начало расчета расстояний', task_name=task_name) qs = models.PlacementPoint.objects.all() for _k, g_o in enumerate(group_objects): g = models.Post_and_pvzGroup.objects.get(id=g_o.obj_id) for q in qs: PointService.calculate_dist_for_group(point=q, group=g) - status.status = "Подсчет расстояний: " + str(int(_k / group_total * 100)) + "%" - status.save() - status.status = "Подсчет расстояний завершен" - status.save() + change_status(f'Подсчет расстояний: {str(int((_k + 1) / group_total * 100))}%', task_name=task_name) + change_status('Расчет завершен', task_name=task_name) models.RaschetObjects.objects.all().delete() models.RaschetGroups.objects.all().delete() # Запуск ML - status.status = 'Запуск ML' - status.save() + change_status('Запуск ML', task_name=task_name) log_to_telegram(f'{table_name} start raschet') try: log_to_telegram('try connect to db') @@ -122,8 +115,9 @@ def raschet(table_name='service_placementpoint', need_time=True): Y_trn = pts_trn[['fact']] - status.status = 'Записи для инференса' - status.save() + # status.status = 'Записи для инференса' + # status.save() + change_status('Записи для инференса', task_name=task_name) # Записи для инференса if table_name == 'service_placementpoint': pts_inf = pts.loc[(pts.status == 'Pending') | @@ -173,11 +167,13 @@ def raschet(table_name='service_placementpoint', need_time=True): r2_scores = [] mapes = [] y_infers = [] - status.status = 'Обучение inference 0%' - status.save() + # status.status = 'Обучение inference 0%' + # status.save() + change_status('Обучение inference 0%', task_name=task_name) for i in seeds: - status.status = 'Обучение inference: ' + str(int((seeds.index(i) + 1) / len(seeds) * 100)) + '%' - status.save() + # status.status = 'Обучение inference: ' + str(int((seeds.index(i) + 1) / len(seeds) * 100)) + '%' + # status.save() + change_status(f'Обучение inference: {str(int((seeds.index(i) + 1) / len(seeds) * 100))}%', task_name=task_name) x_trn, x_test, y_trn, y_test = ms.train_test_split(X_trn, Y_trn, test_size=0.2, random_state=i) model = catboost.CatBoostRegressor(cat_features=['property_era'], random_state=i) model.fit(x_trn, y_trn, verbose=False) @@ -187,7 +183,8 @@ def raschet(table_name='service_placementpoint', need_time=True): r2_scores.append(r2_score) mapes.append(mape) y_infers.append(model.predict(X_inf.drop(columns=['id']))) - status.status = 'Обучение inference 100%' + change_status('Обучение inference 100%', task_name=task_name) + # status.status = 'Обучение inference 100%' current_pred = sum(y_infers) / 5 # расчет шапов @@ -246,8 +243,10 @@ def raschet(table_name='service_placementpoint', need_time=True): log_to_telegram(f'Ошибка при обновлении полей в базе данных: {e}') log_to_telegram('Начинается обновление полей в базе') if len(pts_inf) > 0: - status.status = 'Перерасчет ML: 50%' - status.save() + + # status.status = 'Перерасчет ML: 50%' + # status.save() + change_status('Перерасчет ML: 50%', task_name=task_name) # Загрузка в базу обновленных значений try: log_to_telegram('Подключение к базе данных 2') @@ -377,8 +376,9 @@ def raschet(table_name='service_placementpoint', need_time=True): log_to_telegram('len(pts_inf) <= 0') run_psql_command() log_to_telegram('end raschet') - status.status = 'Перерасчет ML завершен' - status.save() + # status.status = 'Перерасчет ML завершен' + # status.save() + change_status('Перерасчет ML завершен', task_name=task_name) if need_time: LastMLCall.objects.all().delete() LastMLCall.objects.create() @@ -511,3 +511,10 @@ def load_data(obj_id: int): status.status = "Загрузка данных завершена" status.save() models.TempFiles.objects.all().delete() + + +@shared_task() +def import_task(file_id): + PointService().start_mathing(file_id) + PointService().make_enrichment() + raschet('service_preplacementpoint', need_time=False, task_name=STATUS_TASK_NAME_IMPORT) \ No newline at end of file diff --git a/service/utils.py b/service/utils.py index 3549510..946f4ab 100644 --- a/service/utils.py +++ b/service/utils.py @@ -15,10 +15,16 @@ from rest_framework.response import Response from rest_framework.viewsets import ReadOnlyModelViewSet from django.db.models import Avg, Min, Max +from postamates.settings import STATUS_TASK_NAME, STATUS_TASK_NAME_IMPORT import psycopg2 from postamates.settings import DB_URL +def change_status(status, task_name=STATUS_TASK_NAME): + ts, _ = models.TaskStatus.objects.get_or_create(task_name=task_name) + ts.status = status + ts.save() + def run_sql_command(command): connection = psycopg2.connect( diff --git a/service/views.py b/service/views.py index 60b97ef..417e5c8 100644 --- a/service/views.py +++ b/service/views.py @@ -14,7 +14,7 @@ from rest_framework.viewsets import ReadOnlyModelViewSet from postamates.settings import AGE_DAY_BORDER from postamates.settings import EXCEL_EXPORT_FILENAME -from postamates.settings import JSON_EXPORT_FILENAME, STATUS_TASK_NAME +from postamates.settings import JSON_EXPORT_FILENAME, STATUS_TASK_NAME, STATUS_TASK_NAME_IMPORT from service import models from service import pagination from service import serializers @@ -22,7 +22,7 @@ from service import utils from service.enums import PointStatus, MatchingStatus from service.permissions import UserPermission from service.service import PointService -from service.tasks import raschet, load_post_and_pvz, load_other_objects, load_data +from service.tasks import raschet, load_post_and_pvz, load_other_objects, load_data, import_task from rest_framework.permissions import AllowAny from django.shortcuts import redirect from django.contrib import messages @@ -379,11 +379,10 @@ class PrePlacementPointViewSet(PlacementPointViewSet): @action(detail=False, methods=['post']) def start_matching(self, request): file_id = request.POST['id'] - total, matched, problem = PointService().start_mathing(file_id) - PointService().make_enrichment() - raschet('service_preplacementpoint', need_time=False) + import_task.delay(file_id) return Response( - {'message': {'total': total, 'matched': matched, 'error': problem, 'unmatched': total - matched - problem}}, + {'message': 'ok'}, + # {'message': {'total': total, 'matched': matched, 'error': problem, 'unmatched': total - matched - problem}}, status=HTTPStatus.OK, ) @@ -426,6 +425,16 @@ class PrePlacementPointViewSet(PlacementPointViewSet): response['Content-Disposition'] = 'attachment; filename="%s"' % os.path.basename('preplacementpoints.xlsx') return response + @action(detail=False, methods=['get']) + def import_status(self, request): + status = models.TaskStatus.objects.filter(task_name=STATUS_TASK_NAME_IMPORT).first() + st = None + data = {} + if status: + st = status.status + data = status.data + return Response({'task_status': st, 'data': data}, status=HTTPStatus.OK) + class refresh_placement_points(APIView): @staticmethod