Merge branch 'new_hereapikey' into 'dev'

New hereapi key; fix ml task, #SSTDEV-1458, SSTDEV-1430, SSTDEV-1397

See merge request spatial/postamates!227
dev
Alexander Shibaev 1 year ago
commit 8434f4b962

2
.gitignore vendored

@ -135,3 +135,5 @@ docker-compose.dev.yml
pg_dumps/
django_static/
dit_frontend/
catboost_info

@ -19,7 +19,7 @@ spec:
containers:
- name: beat
image: DEPLOY_IMAGE_TAG
command: ["sh", "-c", "celery -A postamates beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler"]
command: ["sh", "-c", "celery -A postamates beat -l info"]
envFrom:
- configMapRef:
name: postamates-configmap

@ -3,7 +3,7 @@ FROM ${YC_CONTAINER_REGISTRY}/public/python:3.8
RUN apt-get update && \
apt-get install -y binutils libproj-dev gdal-bin && \
apt-get install -y postgresql-client
apt-get install -y postgresql-client procps gdb
ENV PYTHONUNBUFFERED 1

@ -1,5 +1,3 @@
version: '3.5'
x-postgres-variables: &postgres-variables
POSTGRES_DB: "${POSTGRES_DB}"
POSTGRES_HOST: "${POSTGRES_HOST}"
@ -17,15 +15,20 @@ x-frontend-variables: &frontend-variables
DOMAIN: "${DOMAIN}"
REACT_APP_DOMAIN_URL: "https://${DOMAIN}/"
x-martin-variables: &martin-variables
MARTIN_PORT: "${MARTIN_PORT}"
services:
django:
security_opt:
- seccomp:unconfined
cap_add:
- SYS_PTRACE
container_name: ${CONTAINERS_NAME}_django
build: .
build:
context: .
dockerfile: ./deploy/dockerfiles/Dockerfile
command: >
sh -c "python manage.py migrate &&
python manage.py collectstatic --noinput &&
@ -36,16 +39,17 @@ services:
python manage.py loaddata fixtures/otherobjectsgroups.json &&
python manage.py runserver 0.0.0.0:${DJANGO_PORT}"
environment:
<<: *postgres-variables
<<: *django-variables
<<: *frontend-variables
<<: [*postgres-variables, *django-variables, *frontend-variables]
CELERY_BROKER_URL: amqp://loyalty-rabbit:5672
ports:
- "${DJANGO_PORT}:${DJANGO_PORT}"
- 5678:5678
expose:
- "${DJANGO_PORT}"
restart: always
depends_on:
- db
loyalty-rabbit:
image: rabbitmq:3.9-management
container_name: loyalty-rabbit
@ -59,30 +63,46 @@ services:
- 5672:5672
beat:
security_opt:
- seccomp:unconfined
cap_add:
- SYS_PTRACE
restart: always
environment:
<<: *postgres-variables
<<: *django-variables
<<: [*postgres-variables, *django-variables]
CELERY_BROKER_URL: amqp://loyalty-rabbit:5672
build:
context: .
dockerfile: ./deploy/dockerfiles/Dockerfile
entrypoint: [ "celery", "-A", "postamates", "beat","-l", "info", "--scheduler", "django_celery_beat.schedulers:DatabaseScheduler" ]
depends_on:
- db
- loyalty-rabbit
- django
- worker
ports:
- 5679:5678
worker:
security_opt:
- seccomp:unconfined
cap_add:
- SYS_PTRACE
restart: always
environment:
<<: *postgres-variables
<<: *django-variables
<<: [*postgres-variables, *django-variables]
CELERY_BROKER_URL: amqp://loyalty-rabbit:5672
build:
context: .
entrypoint: [ "celery", "-A", "postamates.celery:app", "worker" ]
dockerfile: ./deploy/dockerfiles/Dockerfile
entrypoint: [ "celery", "-A", "postamates.celery:app", "worker", "--concurrency", "1" ]
depends_on:
- db
- loyalty-rabbit
- django
ports:
- 5680:5678
db:
container_name: ${CONTAINERS_NAME}_db
image: mdillon/postgis
@ -93,7 +113,7 @@ services:
expose:
- "${POSTGRES_PORT}"
volumes:
- ${POSTGRES_VOLUME_PATH}:/var/lib/postgresql/data
- pg_data:/var/lib/postgresql/data
command: -p ${POSTGRES_PORT}
martin:
@ -109,15 +129,16 @@ services:
- db
- django
restart: always
swagger-ui:
image: swaggerapi/swagger-ui
ports:
- "8099:8099"
environment:
PORT: 8099
SWAGGER_JSON_URL: "https://postnet-dev.selftech.ru/media/swagger.json"
SWAGGER_JSON_URL: "https://localhost/media/swagger.json"
volumes:
rabbitmq_data:
rabbitmq_log:
pg_data:

@ -13,6 +13,8 @@ app = Celery(
include=['service.tasks'], )
app.config_from_object('django.conf:settings', namespace=CELERY_NAMESPACE)
app.conf.beat_schedule = {
'age_day_every_24h': {
'task': 'service.tasks.add_age_day',

@ -44,7 +44,7 @@ INSTALLED_APPS = [
'django_json_widget',
'django.contrib.gis',
'django_celery_beat',
'drf_keycloak_auth',
# 'drf_keycloak_auth',
]
MIDDLEWARE = [
@ -178,7 +178,7 @@ AGE_DAY_BORDER = 30
EXCEL_EXPORT_FILENAME = 'placement_points.xlsx'
JSON_EXPORT_FILENAME = 'placement_points.json'
DATA_UPLOAD_MAX_NUMBER_FIELDS = None
GEOCODER_API_KEY = os.getenv('GEOCODER_API_KEY','TzgdKWgyI2nfaz1WHRD-aYJK4e400MiOJQP7Enf1e1M')
GEOCODER_API_KEY = os.getenv('GEOCODER_API_KEY','P9af-1_eLtmraO5A_B5Kbvo4z4MAlzGoGOCH5g1_uiA')
STATUS_TASK_NAME='status_task'
STATUS_TASK_NAME_IMPORT='import_status_task'
@ -186,20 +186,20 @@ STATUS_TASK_NAME_IMPORT='import_status_task'
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': [
'rest_framework.authentication.SessionAuthentication',
'drf_keycloak_auth.authentication.KeycloakAuthentication',
# 'drf_keycloak_auth.authentication.KeycloakAuthentication',
]
}
DRF_KEYCLOAK_AUTH = {
# 'KEYCLOAK_SERVER_URL': 'http://keycloak.dev.selfservicetech.ru/auth',
'KEYCLOAK_SERVER_URL': os.getenv('KEYCLOAK_SERVER_URL', 'https://kk.dev.selftech.ru/auth'),
'KEYCLOAK_REALM': os.getenv('KEYCLOAK_REALM', 'SST'),
'KEYCLOAK_CLIENT_ID': os.getenv('KEYCLOAK_CLIENT_ID','postnet'),
'KEYCLOAK_CLIENT_SECRET_KEY': os.getenv('KEYCLOAK_CLIENT_SECRET_KEY','K2yHweEUispkVeWn03VMk843sW2Moic5'),
'KEYCLOAK_MANAGE_LOCAL_USER': False,
'KEYCLOAK_ROLE_SET_PREFIX': 'realm_access',
}
# DRF_KEYCLOAK_AUTH = {
# # 'KEYCLOAK_SERVER_URL': 'http://keycloak.dev.selfservicetech.ru/auth',
# 'KEYCLOAK_SERVER_URL': os.getenv('KEYCLOAK_SERVER_URL', 'https://kk.dev.selftech.ru/auth'),
# 'KEYCLOAK_REALM': os.getenv('KEYCLOAK_REALM', 'SST'),
# 'KEYCLOAK_CLIENT_ID': os.getenv('KEYCLOAK_CLIENT_ID','postnet'),
# 'KEYCLOAK_CLIENT_SECRET_KEY': os.getenv('KEYCLOAK_CLIENT_SECRET_KEY','K2yHweEUispkVeWn03VMk843sW2Moic5'),
# 'KEYCLOAK_MANAGE_LOCAL_USER': False,
# 'KEYCLOAK_ROLE_SET_PREFIX': 'realm_access',
# }
KK_EDITOR_ROLE = os.getenv('KK_EDITOR_ROLE', 'postnet_editor')
KK_VIEWER_ROLE = os.getenv('KK_VIEWER_ROLE', 'postnet_viewer')

@ -0,0 +1,4 @@
[tool.ruff]
target-version = 'py38'
line-length = 120
indent-width = 4

@ -1,5 +1,5 @@
from django.core.management.base import BaseCommand
from service.utils import run_sql_command, log_to_telegram
from service.utils import run_sql_command, log
CMD_PIVOT_DIST = """CREATE OR REPLACE VIEW compact_placementpoint AS
SELECT id, status, category, age_day, fact, area_id, district_id, prediction_first, prediction_current, doors, flat_cnt, rival_post_cnt, rival_pvz_cnt, target_post_cnt, flats_cnt, tc_cnt, culture_cnt, mfc_cnt, public_stop_cnt, supermarket_cnt, target_dist, metro_dist, geometry, delta_first, delta_current FROM service_placementpoint;
@ -63,10 +63,10 @@ class Command(BaseCommand):
def handle(self, *args, **kwargs):
try:
log_to_telegram('Creating procedures')
log('Creating procedures')
run_sql_command(CMD_PIVOT_DIST)
log_to_telegram('pivot_dist created')
log('pivot_dist created')
run_sql_command(CMD_PIVOT_DIST_PRE)
log_to_telegram('prepivot_dist created')
log('prepivot_dist created')
except Exception as e:
log_to_telegram('Error creating views: ' + str(e))
log('Error creating views: ' + str(e))

@ -1,5 +1,5 @@
from django.core.management.base import BaseCommand
from service.utils import run_sql_command, log_to_telegram
from service.utils import run_sql_command, log
CMD_PIVOT_DIST = """CALL public.pivot_dist();"""
@ -9,10 +9,10 @@ class Command(BaseCommand):
def handle(self, *args, **kwargs):
try:
log_to_telegram('Creating views')
log('Creating views')
run_sql_command(CMD_PIVOT_DIST)
log_to_telegram('pivot_dist created')
log('pivot_dist created')
run_sql_command(CMD_PIVOT_DIST_PRE)
log_to_telegram('prepivot_dist created')
log('prepivot_dist created')
except Exception as e:
log_to_telegram('Error creating views: ' + str(e))
log('Error creating views: ' + str(e))

@ -1,5 +1,5 @@
from django.core.management.base import BaseCommand
from service.utils import run_sql_command, log_to_telegram
from service.utils import run_sql_command, log
CMD_PIVOT_DIST = """DROP MATERIALIZED VIEW IF EXISTS public.points_with_dist;"""
@ -9,10 +9,10 @@ class Command(BaseCommand):
def handle(self, *args, **kwargs):
try:
log_to_telegram('Deleting views')
log('Deleting views')
run_sql_command(CMD_PIVOT_DIST)
log_to_telegram('pivot_dist deleted')
log('pivot_dist deleted')
run_sql_command(CMD_PIVOT_DIST_PRE)
log_to_telegram('prepivot_dist deleted')
log('prepivot_dist deleted')
except Exception as e:
log_to_telegram('Error deleting views: ' + str(e))
log('Error deleting views: ' + str(e))

@ -1,5 +1,5 @@
from django.core.management.base import BaseCommand
from service.utils import run_sql_command, log_to_telegram
from service.utils import run_sql_command, log
from service.models import TaskStatus
from postamates.settings import STATUS_TASK_NAME, STATUS_TASK_NAME_IMPORT

@ -1,14 +1,31 @@
from rest_framework.permissions import BasePermission
# from drf_keycloak_auth.authentication import KeycloakAuthentication
from django.conf import settings
from logging import getLogger
from django.core.handlers.wsgi import WSGIRequest
from logging import getLogger, basicConfig, DEBUG
basicConfig(level=DEBUG)
logger = getLogger(__name__)
def serialize(obj):
attributes = sorted(list(dir(obj)))
for attr in attributes:
try:
value = getattr(obj, attr)
yield f"{attr}: {value}\n"
except:
pass
class UserPermission(BasePermission):
def has_permission(self, request, view):
def has_permission(self, request: WSGIRequest, view):
return True
# logger.error(f'KK_CLIENT_ID: {settings.DRF_KEYCLOAK_AUTH["KEYCLOAK_CLIENT_ID"]}')
# logger.error(f'KK_CLIENT_SECRET_KEY: {settings.DRF_KEYCLOAK_AUTH["KEYCLOAK_CLIENT_SECRET_KEY"]}')
kk_profile = request.auth
kk_roles = kk_profile.get('resource_access',{}).get('postnet',{}).get('roles',[])
if request.method not in ['GET']:

@ -1,4 +1,5 @@
import os
from typing import cast
import catboost
import geopandas as gpd
@ -6,6 +7,7 @@ import numpy as np
import pandas as pd
import shap
import psycopg2
import psycopg2.extras
import sqlalchemy
from celery import shared_task
from django.db.models import F
@ -20,7 +22,7 @@ from postamates.settings import AGE_DAY_LIMIT
from postamates.settings import DB_URL, STATUS_TASK_NAME, STATUS_TASK_NAME_IMPORT
from service.models import PlacementPoint, LastMLCall
from service import models
from service.utils import log_to_telegram
from service.utils import log
import base64
from io import StringIO
from django.core.cache import cache
@ -29,260 +31,336 @@ from service.service import PointService
from service.utils import run_psql_command, change_status
@shared_task()
def raschet(table_name='service_placementpoint', need_time=True, task_name=STATUS_TASK_NAME):
print('start raschet')
# status, _ = models.TaskStatus.objects.get_or_create(task_name=STATUS_TASK_NAME)
def get_engine():
try:
log("try connect to db")
return sqlalchemy.create_engine(
DB_URL,
connect_args={"options": "-csearch_path=public"},
)
except:
log("error connect to db")
raise
def do_computations(table_name: str, need_time: bool, task_name: str):
def count_nearby_pvz():
raschet_objs = models.RaschetObjects.objects.all()
if raschet_objs:
change_status('Начало расчета кол-ва ПВЗ вокруг точек', task_name=task_name)
if not raschet_objs:
return
change_status("Начало расчета кол-ва ПВЗ вокруг точек", task_name=task_name)
total = raschet_objs.count()
for _i, r_o in enumerate(raschet_objs):
obj = models.Post_and_pvz.objects.get(id=r_o.obj_id)
LayerService().count_post_pvz_for_placementpoint(obj)
change_status(f'Подсчет кол-ва ПВЗ вокруг точек: {str(int((_i + 1) / total * 100))}%', task_name=task_name)
change_status('Подсчет кол-ва ПВЗ вокруг точек завершен', task_name=task_name)
change_status(
f"Подсчет кол-ва ПВЗ вокруг точек: {str(int((_i + 1) / total * 100))}%",
task_name=task_name,
)
change_status("Подсчет кол-ва ПВЗ вокруг точек завершен", task_name=task_name)
def calculate_dist_for_groups():
group_objects = models.RaschetGroups.objects.all()
group_total = group_objects.count()
if group_objects:
change_status('Начало расчета расстояний', task_name=task_name)
change_status("Начало расчета расстояний", task_name=task_name)
qs = models.PlacementPoint.objects.all()
for _k, g_o in enumerate(group_objects):
g = models.Post_and_pvzGroup.objects.get(id=g_o.obj_id)
for q in qs:
PointService.calculate_dist_for_group(point=q, group=g)
change_status(f'Подсчет расстояний: {str(int((_k + 1) / group_total * 100))}%', task_name=task_name)
change_status('Расчет завершен', task_name=task_name)
change_status(
f"Подсчет расстояний: {str(int((_k + 1) / group_total * 100))}%",
task_name=task_name,
)
change_status("Расчет завершен", task_name=task_name)
def process_raschet_objects():
count_nearby_pvz()
calculate_dist_for_groups()
models.RaschetObjects.objects.all().delete()
models.RaschetGroups.objects.all().delete()
print("start raschet")
# status, _ = models.TaskStatus.objects.get_or_create(task_name=STATUS_TASK_NAME)
# Запуск ML
change_status('Шаг 3 из 3 (Прогноз трафика в точках).', task_name=task_name)
log_to_telegram(f'{table_name} start raschet')
try:
log_to_telegram('try connect to db')
conn = sqlalchemy.create_engine(
DB_URL,
connect_args={'options': '-csearch_path=public'},
)
except:
log_to_telegram('error connect to db')
process_raschet_objects()
change_status("Шаг 3 из 3 (Прогноз трафика в точках).", task_name=task_name)
log(f"{table_name} start raschet")
engine = get_engine()
try:
query = text('select * from service_placementpoint')
connection = conn.connect()
pts = pd.read_sql(query, connection)
pts['geometry'] = pts['geometry'].apply(wkb.loads, hex=True)
pts = gpd.GeoDataFrame(pts, geometry='geometry', crs='epsg:4326')
pts = pts.to_crs('epsg:32637')
query = text("select * from service_placementpoint")
connection = engine.connect()
pts_df = pd.read_sql(query, connection)
pts_df.geometry = pts_df.geometry.apply(wkb.loads, hex=True)
pts = gpd.GeoDataFrame(pts_df, geometry="geometry", crs="epsg:4326")
pts = pts.to_crs("epsg:32637")
feats = [
'id', 'metro_dist', 'target_dist', 'property_price_bargains', 'property_price_offers',
'property_mean_floor',
'property_era', 'flats_cnt', 'popul_home', 'popul_job', 'yndxfood_sum',
'yndxfood_cnt', 'school_cnt', 'kindergar_cnt', 'target_post_cnt', 'public_stop_cnt', 'sport_center_cnt',
'pharmacy_cnt', 'supermarket_cnt', 'supermarket_premium_cnt', 'clinic_cnt', 'bank_cnt', 'reca_cnt',
'lab_cnt', 'culture_cnt', 'attraction_cnt', 'mfc_cnt', 'bc_cnt', 'tc_cnt', 'rival_pvz_cnt',
'rival_post_cnt',
'business_activity', 'age_day', 'target_cnt_ao_mean'
"id",
"metro_dist",
"target_dist",
"property_price_bargains",
"property_price_offers",
"property_mean_floor",
"property_era",
"flats_cnt",
"popul_home",
"popul_job",
"yndxfood_sum",
"yndxfood_cnt",
"school_cnt",
"kindergar_cnt",
"target_post_cnt",
"public_stop_cnt",
"sport_center_cnt",
"pharmacy_cnt",
"supermarket_cnt",
"supermarket_premium_cnt",
"clinic_cnt",
"bank_cnt",
"reca_cnt",
"lab_cnt",
"culture_cnt",
"attraction_cnt",
"mfc_cnt",
"bc_cnt",
"tc_cnt",
"rival_pvz_cnt",
"rival_post_cnt",
"business_activity",
"age_day",
"target_cnt_ao_mean",
]
# Записи для обучения
pts_trn = pts.loc[pts.name == 'постамат Яндекс.Маркет из обучающей выборки'].reset_index(drop=True)
pts_trn = pts_trn.sort_values('address').reset_index(drop=True)
pts_trn = gpd.GeoDataFrame(pts_trn, geometry='geometry', crs='epsg:32637')
pts_target = pts_trn[['geometry']]
pts_target['cnt'] = 1
pts_target = gpd.GeoDataFrame(pts_target, geometry='geometry', crs='epsg:32637')
pts_trn = pts.loc[pts.name == "постамат Яндекс.Маркет из обучающей выборки"].reset_index(drop=True)
pts_trn = pts_trn.sort_values("address").reset_index(drop=True)
pts_trn = gpd.GeoDataFrame(pts_trn, geometry="geometry", crs="epsg:32637")
pts_target = cast(gpd.GeoDataFrame, pts_trn[["geometry"]])
pts_target["cnt"] = 1
pts_target = gpd.GeoDataFrame(pts_target, geometry="geometry", crs="epsg:32637")
target_feature_coords = []
for i in range(0, len(pts_target)):
target_feature_coords.append((pts_target.geometry.x[i], pts_target.geometry.y[i]))
target_feature_coords = np.array(target_feature_coords)
pts_trn['target_dist'] = pts_trn.apply(
lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[1])),
pts_trn["target_dist"] = pts_trn.apply(
lambda x: (sorted(distance.cdist([[x["geometry"].x, x["geometry"].y]], target_feature_coords)[0])[1]),
axis=1,
)
pts_trn.loc[pts_trn.target_dist > 700, 'target_dist'] = 700
pts_trn['buf'] = pts_trn.buffer(500)
pts_trn = gpd.GeoDataFrame(pts_trn, geometry='buf', crs='epsg:32637')
target_post = gpd.sjoin(pts_trn, pts_target, op='contains').groupby('id', as_index=False).agg({'cnt': 'count'})
target_post = target_post.rename(columns={'cnt': 'target_post_cnt'})
pts_trn = pts_trn.drop(columns=['target_post_cnt'])
pts_trn = pts_trn.join(target_post.set_index('id'), on='id')
pts_trn['target_post_cnt'] = pts_trn['target_post_cnt'] - 1
pts_trn = pts_trn.sort_values(by='id').reset_index(drop=True)
X_trn = pts_trn[feats].drop(columns=['id'])
Y_trn = pts_trn[['fact']]
pts_trn.loc[pts_trn.target_dist > 700, "target_dist"] = 700
pts_trn["buf"] = pts_trn.buffer(500)
pts_trn = gpd.GeoDataFrame(pts_trn, geometry="buf", crs="epsg:32637")
target_post = gpd.sjoin(pts_trn, pts_target, op="contains").groupby("id", as_index=False).agg({"cnt": "count"})
target_post = target_post.rename(columns={"cnt": "target_post_cnt"})
pts_trn = pts_trn.drop(columns=["target_post_cnt"])
pts_trn = pts_trn.join(target_post.set_index("id"), on="id")
pts_trn["target_post_cnt"] = pts_trn.target_post_cnt - 1
pts_trn = pts_trn.sort_values(by="id").reset_index(drop=True)
X_trn = pts_trn[feats].drop(columns=["id"])
Y_trn = pts_trn[["fact"]]
# status.status = 'Записи для инференса'
# status.save()
#change_status('Записи для инференса', task_name=task_name)
# change_status('Записи для инференса', task_name=task_name)
# Записи для инференса
if table_name == 'service_placementpoint':
pts_inf = pts.loc[(pts.status == 'Pending') |
(pts.status == 'Installation') |
(pts.status == 'Cancelled') |
((pts.status == 'Working') & (pts.sample_trn == False))].reset_index(drop=True)
elif table_name == 'service_preplacementpoint':
pts_inf = pd.DataFrame(conn.connect().execute(text(f"select * from {table_name}")))
pts_inf = pts_inf.loc[pts_inf.matching_status == 'New'].reset_index(drop=True)
pts_inf['geometry'] = pts_inf['geometry'].apply(wkb.loads, hex=True)
pts_inf = gpd.GeoDataFrame(pts_inf, geometry='geometry', crs='epsg:4326')
if len(pts_inf) > 0:
pts_inf = pts_inf.to_crs('epsg:32637')
if table_name == "service_placementpoint":
pts_inf = pts.loc[
(pts.status == "Pending")
| (pts.status == "Installation")
| (pts.status == "Cancelled")
| ((pts.status == "Working") & (pts.sample_trn == False))
].reset_index(drop=True)
elif table_name == "service_preplacementpoint":
pts_inf_df = pd.DataFrame(engine.connect().execute(text(f"select * from {table_name}")))
pts_inf_df = pts_inf_df.loc[pts_inf_df.matching_status == "New"].reset_index(drop=True)
pts_inf_df["geometry"] = pts_inf_df["geometry"].apply(wkb.loads, hex=True)
pts_inf = gpd.GeoDataFrame(pts_inf_df, geometry="geometry", crs="epsg:4326")
else:
raise ValueError("Unexpected `table_name` parameter")
if not len(pts_inf):
raise Exception("Empty point set")
pts_inf: gpd.GeoDataFrame = pts_inf.to_crs("epsg:32637")
pts_inf['buf'] = pts_inf.buffer(500)
pts_inf = gpd.GeoDataFrame(pts_inf, geometry='buf', crs='epsg:32637')
pts_target = pts.loc[(pts.status == 'Working') |
(pts.status == 'Installation')].reset_index(drop=True)
pts_target = pts_target[['geometry']]
pts_target['cnt'] = 1
pts_target = gpd.GeoDataFrame(pts_target, geometry='geometry', crs='epsg:32637')
pts_inf["buf"] = pts_inf.buffer(500)
pts_inf = gpd.GeoDataFrame(pts_inf, geometry="buf", crs="epsg:32637")
pts_target = pts.loc[(pts.status == "Working") | (pts.status == "Installation")].reset_index(drop=True)
pts_target = pts_target[["geometry"]]
pts_target["cnt"] = 1
pts_target = gpd.GeoDataFrame(pts_target, geometry="geometry", crs="epsg:32637")
target_feature_coords = []
for i in range(0, len(pts_target)):
target_feature_coords.append((pts_target.geometry.x[i], pts_target.geometry.y[i]))
target_feature_coords = np.array(target_feature_coords)
pts_inf['target_dist'] = pts_inf.apply(
def dist_working(x):
def cdist_(x):
return distance.cdist([[x["geometry"].x, x["geometry"].y]], target_feature_coords)[0]
return (
sorted(cdist_(x))[1]
if x.status in {"Working", "Installation"}
else sorted(cdist_(x))[0]
)
pts_inf["target_dist"] = pts_inf.apply(dist_working, axis=1)
pts_inf.loc[pts_inf.target_dist > 700, "target_dist"] = 700
pts_inf = pts_inf.sort_values(by="id").reset_index(drop=True) # type: ignore
target_post = gpd.sjoin(pts_inf, pts_target, op="contains").groupby("id", as_index=False).agg({"cnt": "count"})
target_post = target_post.rename(columns={"cnt": "target_post_cnt"})
pts_inf = pts_inf.drop(columns=["target_post_cnt"])
pts_inf = pts_inf.join(target_post.set_index("id"), on="id")
pts_inf.target_post_cnt = pts_inf.target_post_cnt.fillna(0)
pts_inf.target_post_cnt = pts_inf.apply(
lambda x: (
(sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[1]) if (
(x.status == 'Working') or (x.status == 'Installation')) else
(sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[0])),
(x.target_post_cnt - 1)
if ((x.status == "Working") or (x.status == "Installation"))
else x.target_post_cnt
),
axis=1,
)
pts_inf.loc[pts_inf.target_dist > 700, 'target_dist'] = 700
pts_inf = pts_inf.sort_values(by='id').reset_index(drop=True)
target_post = gpd.sjoin(pts_inf, pts_target, op='contains').groupby('id', as_index=False).agg(
{'cnt': 'count'})
target_post = target_post.rename(columns={'cnt': 'target_post_cnt'})
pts_inf = pts_inf.drop(columns=['target_post_cnt'])
pts_inf = pts_inf.join(target_post.set_index('id'), on='id')
pts_inf['target_post_cnt'] = pts_inf['target_post_cnt'].fillna(0)
pts_inf['target_post_cnt'] = pts_inf.apply(lambda x: ((x.target_post_cnt - 1) if (
(x.status == 'Working') or (x.status == 'Installation')) else x.target_post_cnt), axis=1)
pts_inf['age_day_init'] = pts_inf['age_day']
pts_inf['age_day'] = 240
X_inf = pts_inf[feats]
pts_inf["age_day_init"] = pts_inf["age_day"]
pts_inf["age_day"] = 240
X_inf = pts_inf[feats].dropna()
seeds = [99, 87, 21, 15]
# Обучение, инференс
r2_scores = []
mapes = []
x_inf: gpd.GeoDataFrame = X_inf.drop(columns=["id"]) # type: ignore
y_infers = []
# status.status = 'Обучение inference 0%'
# status.save()
#change_status('Обучение inference 0%', task_name=task_name)
# change_status('Обучение inference 0%', task_name=task_name)
model = catboost.CatBoostRegressor()
for i in seeds:
# status.status = 'Обучение inference: ' + str(int((seeds.index(i) + 1) / len(seeds) * 100)) + '%'
# status.save()
#change_status(f'Обучение inference: {str(int((seeds.index(i) + 1) / len(seeds) * 100))}%',
#task_name=task_name)
# change_status(f'Обучение inference: {str(int((seeds.index(i) + 1) / len(seeds) * 100))}%',
# task_name=task_name)
x_trn, x_test, y_trn, y_test = ms.train_test_split(X_trn, Y_trn, test_size=0.2, random_state=i)
model = catboost.CatBoostRegressor(cat_features=['property_era'], random_state=i)
model = catboost.CatBoostRegressor(cat_features=["property_era"], random_state=i)
model.fit(x_trn, y_trn, verbose=False)
r2_score = metrics.r2_score(y_test, model.predict(x_test))
mape = metrics.mean_absolute_percentage_error(y_test, model.predict(x_test))
if ((r2_score > 0.45) & (mape < 0.25)):
if (r2_score > 0.45) & (mape < 0.25):
r2_scores.append(r2_score)
mapes.append(mape)
y_infers.append(model.predict(X_inf.drop(columns=['id'])))
#change_status('Обучение inference 100%', task_name=task_name)
y_infers.append(model.predict(x_inf))
# change_status('Обучение inference 100%', task_name=task_name)
# status.status = 'Обучение inference 100%'
current_pred = sum(y_infers) / 4
# расчет шапов
explainer = shap.TreeExplainer(model)
shap_values = explainer(X_inf.drop(columns=['id']))
shap_values = explainer(x_inf)
shap_fields = pd.DataFrame(shap_values.values)
shap_fields.columns = X_inf.drop(columns=['id']).columns + '_shap'
shap_fields = shap_fields.drop(columns=['age_day_shap'])
shap_fields['sum'] = abs(shap_fields).sum(axis=1)
shap_fields = round(shap_fields.iloc[:, :32].div(shap_fields['sum'], axis=0) * 100, 2)
shap_fields.columns = x_inf.columns + "_shap"
shap_fields = shap_fields.drop(columns=["age_day_shap"])
shap_fields["sum"] = abs(shap_fields).sum(axis=1)
shap_fields = round(shap_fields.iloc[:, :32].div(shap_fields["sum"], axis=0) * 100, 2)
# Обновление полей по результатам работы модели
update_fields = pts_inf[
[
'id', 'age_day_init', 'status', 'fact', 'delta_current', 'delta_first', 'plan_current',
'plan_first',
'prediction_first', 'target_post_cnt', 'target_dist'
]
"id",
"age_day_init",
"status",
"fact",
"delta_current",
"delta_first",
"plan_current",
"plan_first",
"prediction_first",
"target_post_cnt",
"target_dist",
]
].dropna()
update_fields = update_fields.join(
pd.concat(
[
X_inf[['id']],
pd.DataFrame({'prediction_current': current_pred}),
X_inf[["id"]],
pd.DataFrame({"prediction_current": current_pred}),
],
axis=1,
).set_index('id'),
on='id',
)
update_fields['prediction_current'] = update_fields['prediction_current'].astype(int)
).set_index("id"),
on="id",
).dropna()
update_fields["prediction_current"] = update_fields["prediction_current"].astype(int)
days_x = np.array([0, 30, 60, 90, 120, 150, 180, 210, 240, 270])
perc_y = np.array([0, 0.15, 0.20, 0.30, 0.60, 0.70, 0.70, 0.75, 0.75, 0.80])
spl = interpolate.splrep(days_x, perc_y)
update_fields['plan_first'] = update_fields.apply(
lambda x: (x.prediction_first * interpolate.splev(x.age_day_init, spl) if x.status == 'Working' else 0),
update_fields["plan_first"] = update_fields.apply(
lambda x: (x.prediction_first * interpolate.splev(x.age_day_init, spl) if x.status == "Working" else 0),
axis=1,
)
update_fields['plan_current'] = update_fields.apply(
lambda x: (
x.prediction_current * interpolate.splev(x.age_day_init, spl) if x.status == 'Working' else 0),
update_fields["plan_current"] = update_fields.apply(
lambda x: (x.prediction_current * interpolate.splev(x.age_day_init, spl) if x.status == "Working" else 0),
axis=1,
)
update_fields['delta_first'] = update_fields.apply(
lambda x: ((x.fact - x.plan_first) / x.plan_first * 100 if x.status == 'Working' else 0),
update_fields["delta_first"] = update_fields.apply(
lambda x: ((x.fact - x.plan_first) / x.plan_first * 100 if x.status == "Working" else 0),
axis=1,
)
update_fields['delta_current'] = update_fields.apply(
lambda x: ((x.fact - x.plan_current) / x.plan_current * 100 if x.status == 'Working' else 0),
update_fields["delta_current"] = update_fields.apply(
lambda x: ((x.fact - x.plan_current) / x.plan_current * 100 if x.status == "Working" else 0),
axis=1,
)
update_fields_working = update_fields.loc[update_fields.status == 'Working'].reset_index(drop=True)
update_fields_working = update_fields.loc[update_fields.status == "Working"].reset_index(drop=True)
update_fields_working = update_fields_working.fillna(0)
connection.close()
except Exception as e:
log_to_telegram(f'Ошибка при обновлении полей в базе данных: {e}')
log_to_telegram('Начинается обновление полей в базе')
log(f"Ошибка при обновлении полей в базе данных: {e}")
raise
log("Начинается обновление полей в базе")
if len(pts_inf) > 0:
# status.status = 'Перерасчет ML: 50%'
# status.save()
#change_status('Перерасчет ML: 50%', task_name=task_name)
# change_status('Перерасчет ML: 50%', task_name=task_name)
# Загрузка в базу обновленных значений
try:
log_to_telegram('Подключение к базе данных 2')
log("Подключение к базе данных 2")
conn2 = psycopg2.connect(
database=os.getenv('POSTGRES_DB', 'postgres'), user=os.getenv('POSTGRES_USER', 'postgres'),
password=os.getenv('POSTGRES_PASSWORD', 'postgres'),
host=os.getenv('POSTGRES_HOST', 'postgres'), port=os.getenv('POSTGRES_PORT', 'postgres'),
options='-c search_path=public',
database=os.getenv("POSTGRES_DB", "postgres"),
user=os.getenv("POSTGRES_USER", "postgres"),
password=os.getenv("POSTGRES_PASSWORD", "postgres"),
host=os.getenv("POSTGRES_HOST", "postgres"),
port=os.getenv("POSTGRES_PORT", "postgres"),
options="-c search_path=public",
)
cursor = conn2.cursor()
except:
conn2 = None
log_to_telegram('Не удалось подключиться к базе данных')
log("Не удалось подключиться к базе данных")
raise
if conn2 is not None:
# апдейт шапов
update_fields_shap = pd.concat([shap_fields, update_fields[['id']]], axis=1)
update_fields_shap = pd.concat([shap_fields, update_fields[["id"]]], axis=1).dropna()
update_records0 = []
for i in range(0, len(update_fields_shap)):
for i in update_fields_shap.id.keys():
update_records1 = []
for n in list(update_fields_shap):
update_records1.append(int(update_fields_shap[n][i]))
update_records0.append(tuple(update_records1))
shap_fields_name = str(list(shap_fields))[1:-1].replace("'", "").replace(',', '=%s,')
shap_fields_name = str(list(shap_fields))[1:-1].replace("'", "").replace(",", "=%s,")
sql_update_query = f"""Update {table_name} set {shap_fields_name} = %s where id = %s"""
try:
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records0)
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records0) # это тоже долго
conn2.commit()
except:
cursor.execute("ROLLBACK")
@ -291,7 +369,7 @@ def raschet(table_name='service_placementpoint', need_time=True, task_name=STATU
# target_post_cnt
update_records1 = []
for i in range(0, len(update_fields)):
for i in update_fields.id.keys():
update_records1.append((int(update_fields.target_post_cnt[i]), int(update_fields.id[i])))
sql_update_query = f"""Update {table_name} set target_post_cnt = %s where id = %s"""
try:
@ -304,7 +382,7 @@ def raschet(table_name='service_placementpoint', need_time=True, task_name=STATU
# target_dist
update_records1 = []
for i in range(0, len(update_fields)):
for i in update_fields.id.keys():
update_records1.append((int(update_fields.target_dist[i]), int(update_fields.id[i])))
sql_update_query = f"""Update {table_name} set target_dist = %s where id = %s"""
try:
@ -317,129 +395,165 @@ def raschet(table_name='service_placementpoint', need_time=True, task_name=STATU
# prediction_current
update_records1 = []
for i in range(0, len(update_fields)):
for i in update_fields.id.keys():
update_records1.append((int(update_fields.prediction_current[i]), int(update_fields.id[i])))
sql_update_query = f"""Update {table_name} set prediction_current = %s where id = %s"""
try:
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records1)
conn2.commit()
except Exception:
cursor.execute('ROLLBACK')
cursor.execute("ROLLBACK")
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records1)
conn2.commit()
# plan_first
# plan_first вот это очень долго
update_records2 = []
for i in range(0, len(update_fields_working)):
update_records2.append((int(update_fields_working.plan_first[i]), int(update_fields_working.id[i])))
for i in update_fields_working.id.keys():
update_records2.append(
(
int(update_fields_working.plan_first[i]),
int(update_fields_working.id[i]),
)
)
sql_update_query = f"""Update {table_name} set plan_first = %s where id = %s"""
try:
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records2)
conn2.commit()
except Exception:
cursor.execute('ROLLBACK')
cursor.execute("ROLLBACK")
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records2)
conn2.commit()
# plan_current
update_records3 = []
for i in range(0, len(update_fields_working)):
update_records3.append((int(update_fields_working.plan_current[i]), int(update_fields_working.id[i])))
for i in update_fields_working.id.keys():
update_records3.append(
(
int(update_fields_working.plan_current[i]),
int(update_fields_working.id[i]),
)
)
sql_update_query = f"""Update {table_name} set plan_current = %s where id = %s"""
try:
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records3)
conn2.commit()
except Exception:
cursor.execute('ROLLBACK')
cursor.execute("ROLLBACK")
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records3)
conn2.commit()
# delta_first
update_records4 = []
for i in range(0, len(update_fields_working)):
update_records4.append((int(update_fields_working.delta_first[i]), int(update_fields_working.id[i])))
for i in update_fields_working.id.keys():
update_records4.append(
(
int(update_fields_working.delta_first[i]),
int(update_fields_working.id[i]),
)
)
sql_update_query = f"""Update {table_name} set delta_first = %s where id = %s"""
try:
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records4)
conn2.commit()
except Exception:
cursor.execute('ROLLBACK')
cursor.execute("ROLLBACK")
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records4)
conn2.commit()
# delta_current
update_records5 = []
for i in range(0, len(update_fields_working)):
update_records5.append((int(update_fields_working.delta_current[i]), int(update_fields_working.id[i])))
for i in update_fields_working.id.keys():
update_records5.append(
(
int(update_fields_working.delta_current[i]),
int(update_fields_working.id[i]),
)
)
sql_update_query = f"""Update {table_name} set delta_current = %s where id = %s"""
try:
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records5)
conn2.commit()
except Exception:
cursor.execute('ROLLBACK')
cursor.execute("ROLLBACK")
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records5)
conn2.commit()
conn2.close()
cache.clear()
else:
log_to_telegram('len(pts_inf) <= 0')
log("len(pts_inf) <= 0")
run_psql_command()
log_to_telegram('end raschet')
log("end raschet")
# status.status = 'Перерасчет ML завершен'
# status.save()
change_status('Перерасчет ML завершен', task_name=task_name)
change_status("Перерасчет ML завершен", task_name=task_name)
if need_time:
LastMLCall.objects.all().delete()
LastMLCall.objects.create()
@shared_task()
def raschet(table_name="service_placementpoint", need_time=True, task_name=STATUS_TASK_NAME):
try:
do_computations(table_name, need_time, task_name)
finally:
change_status("Перерасчет ML завершен", task_name=task_name)
@shared_task
def load_post_and_pvz(obj_id: int):
file = models.TempFiles.objects.get(id=obj_id)
status, _ = models.TaskStatus.objects.get_or_create(task_name='Загрузка ПВЗ и Постаматов')
status, _ = models.TaskStatus.objects.get_or_create(task_name="Загрузка ПВЗ и Постаматов")
excel_file = base64.b64decode(file.data)
df = pd.read_excel(excel_file)
df = df.replace(np.nan, None)
df = df.replace('NaT', None)
df = df.replace("NaT", None)
df.columns = df.columns.str.lower()
data_len = df.shape[0]
for _ind, row in enumerate(df.to_dict('records')):
for _ind, row in enumerate(df.to_dict("records")):
status.status = "Загрузка данных: " + str(int(_ind / data_len * 100)) + "%"
status.save()
category = row.get('category')
group = row.get('group')
category = row.get("category")
group = row.get("group")
if category:
cat, _ = models.Post_and_pvzCategory.objects.get_or_create(name=category)
if group:
gr, _ = models.Post_and_pvzGroup.objects.get_or_create(name=group, category=cat)
row['category'] = cat
row['group'] = gr
lon = str(row.pop('lon'))
row["category"] = cat
row["group"] = gr
lon = str(row.pop("lon"))
lat = str(row.pop("lat"))
row['wkt'] = "POINT(" + lon + " " + lat + ")"
row["wkt"] = "POINT(" + lon + " " + lat + ")"
models.Post_and_pvz.objects.get_or_create(**row)
status.status = "Загрузка данных завершена"
status.save()
groups = df[['group', 'category']].drop_duplicates().to_dict(orient='records')
groups = df[["group", "category"]].drop_duplicates().to_dict(orient="records")
points = models.PlacementPoint.objects.all()
num_points = points.count()
total = len(groups) * num_points
for _i, gr in enumerate(groups):
group = models.Post_and_pvzGroup.objects.get(name=gr['group'], category__name=gr['category'])
group = models.Post_and_pvzGroup.objects.get(name=gr["group"], category__name=gr["category"])
for _j, point in enumerate(points):
status.status = "Подсчет расстояний: " + str(int((num_points * _i + _j) / total * 100)) + "%"
status.save()
post_object = models.Post_and_pvz.objects.filter(group__id=group.id).annotate(
distance=Distance("wkt", point.geometry)).order_by('distance').first()
d = models.PlacementPointPVZDistance.objects.filter(placement_point=point,
pvz_postamates_group=group).first()
post_object = (
models.Post_and_pvz.objects.filter(group__id=group.id)
.annotate(distance=Distance("wkt", point.geometry))
.order_by("distance")
.first()
)
d = models.PlacementPointPVZDistance.objects.filter(
placement_point=point, pvz_postamates_group=group
).first()
if d:
if d.dist > post_object.distance.m:
d.dist = post_object.distance.m
d.save()
else:
models.PlacementPointPVZDistance.objects.create(placement_point=point, pvz_postamates_group=group,
dist=post_object.distance.m)
models.PlacementPointPVZDistance.objects.create(
placement_point=point,
pvz_postamates_group=group,
dist=post_object.distance.m,
)
status.status = "Подсчет расстояний завершен"
status.save()
point_qs = models.PlacementPoint.objects.all()
@ -456,9 +570,9 @@ def load_post_and_pvz(obj_id: int):
@shared_task()
def add_age_day():
qs = PlacementPoint.objects.filter(status='Working')
qs = PlacementPoint.objects.filter(status="Working")
# c1 = qs.filter(sample_trn=True).count()
qs.update(age_day=F('age_day') + 1)
qs.update(age_day=F("age_day") + 1)
qs2 = qs.filter(age_day__gt=AGE_DAY_LIMIT)
qs2.update(sample_trn=True)
# c2 = PlacementPoint.objects.filter(sample_trn=True).count()
@ -469,27 +583,27 @@ def add_age_day():
@shared_task()
def load_other_objects(obj_id: int):
file = models.TempFiles.objects.get(id=obj_id)
status, _ = models.TaskStatus.objects.get_or_create(task_name='Загрузка Прочих объектов')
status, _ = models.TaskStatus.objects.get_or_create(task_name="Загрузка Прочих объектов")
excel_file = base64.b64decode(file.data)
df = pd.read_excel(excel_file)
df = df.replace(np.nan, None)
df = df.replace('NaT', None)
df = df.replace("NaT", None)
df.columns = df.columns.str.lower()
data_len = df.shape[0]
for _ind, row in enumerate(df.to_dict('records')):
for _ind, row in enumerate(df.to_dict("records")):
status.status = "Загрузка данных: " + str(int(_ind / data_len * 100)) + "%"
status.save()
category = row.get('category')
group = row.get('group')
category = row.get("category")
group = row.get("group")
if category:
cat, _ = models.OtherObjectsCategory.objects.get_or_create(name=category)
if group:
gr, _ = models.OtherObjectsGroup.objects.get_or_create(name=group, category=cat)
row['category'] = cat
row['group'] = gr
lon = str(row.pop('lon'))
row["category"] = cat
row["group"] = gr
lon = str(row.pop("lon"))
lat = str(row.pop("lat"))
row['wkt'] = "POINT(" + lon + " " + lat + ")"
row["wkt"] = "POINT(" + lon + " " + lat + ")"
models.OtherObjects.objects.get_or_create(**row)
status.status = "Загрузка данных завершена"
cache.clear()
@ -499,23 +613,20 @@ def load_other_objects(obj_id: int):
@shared_task()
def load_data(obj_id: int):
status, _ = models.TaskStatus.objects.get_or_create(task_name='Загрузка Точек')
status, _ = models.TaskStatus.objects.get_or_create(task_name="Загрузка Точек")
file = models.TempFiles.objects.get(id=obj_id)
csv_file = base64.b64decode(file.data)
models.PlacementPoint.objects.all().delete()
s = str(csv_file, 'utf-8')
s = str(csv_file, "utf-8")
data = StringIO(s)
df = pd.read_csv(data, delimiter=';')
df = pd.read_csv(data, delimiter=";")
df = df.replace(np.nan, None)
df = df.replace('NaT', None)
df = df.replace("NaT", None)
data_len = df.shape[0]
for _ind, row in enumerate(df.to_dict('records')):
for _ind, row in enumerate(df.to_dict("records")):
status.status = "Загрузка данных: " + str(int(_ind / data_len * 100)) + "%"
status.save()
data = {
k: row[k] for k in row.keys() if
k not in ['id', 'location_id', 'area', 'district', 'age_month']
}
data = {k: row[k] for k in row.keys() if k not in ["id", "location_id", "area", "district", "age_month"]}
models.PlacementPoint.objects.create(**data)
status.status = "Загрузка данных завершена"
status.save()
@ -526,4 +637,4 @@ def load_data(obj_id: int):
def import_task(file_id):
PointService().start_mathing(file_id)
PointService().make_enrichment()
raschet('service_preplacementpoint', need_time=False, task_name=STATUS_TASK_NAME_IMPORT)
# raschet('service_preplacementpoint', need_time=False, task_name=STATUS_TASK_NAME_IMPORT)

@ -88,9 +88,8 @@ def load_dist(filepath: str):
models.PointDist.objects.create(**row)
def log_to_telegram(msg):
requests.post('https://api.telegram.org/bot6275517704:AAHVp_qv9d9NU740JJdOM2fJdgS4r1AgJrw/sendMessage',
json={"chat_id": "-555238820", "text": str(settings.DOMAIN) + '\n' + msg})
def log(msg):
print(msg)
def cached_func(key, func, timeout=settings.CACHE_TIMEOUT, *args, **kwargs):

@ -343,7 +343,7 @@ class PlacementPointViewSet(ReadOnlyModelViewSet):
@action(detail=False, methods=['get'])
def start(self, request):
raschet.delay()
# raschet.delay()
return Response('Sucess', status=HTTPStatus.OK)
@action(detail=False, methods=['get'])
@ -526,10 +526,11 @@ def upload_houses(request):
@api_view(['GET'])
@permission_classes([UserPermission])
def get_current_user(request):
kk_profile = request.auth
kk_roles = kk_profile.get('realm_access', {}).get('roles', [])
# kk_profile = request.auth
# kk_roles = kk_profile.get('realm_access', {}).get('roles', [])
return JsonResponse(
{'groups': kk_roles, 'username': kk_profile.get('preferred_username')},
# {'groups': kk_roles, 'username': kk_profile.get('preferred_username')},
{'groups': [], 'username': 'AnonymousUser'},
)

Loading…
Cancel
Save