from io import BytesIO

import pandas as pd
from django.contrib.gis.measure import Distance
from django.db.models import F

from postamates.settings import DEFAULT_PLACEMENT_POINT_UPDATE_RADIUS
from service import models
from service.enums import PointStatus
from service.tasks import raschet


class PointService:
    """Query and update helpers for ``models.PlacementPoint`` rows."""

    def update_fact(self, postamat_id: str, fact: int):
        """Set ``fact`` on every point whose ``postamat_id`` matches."""
        self.get_point_by_postamat_id(postamat_id).update(fact=fact)

    def update_postamat_id(self, point_id: int, postamat_id: str):
        """Set ``postamat_id`` on the point with primary key ``point_id``."""
        self.get_point_by_id(point_id).update(postamat_id=postamat_id)

    @staticmethod
    def _points_in_radius(point):
        """Return the points closer to ``point`` than the configured radius.

        The radius is ``DEFAULT_PLACEMENT_POINT_UPDATE_RADIUS`` metres. The
        filter is purely geometric, so ``point`` itself is matched as well
        (its distance to itself is zero).
        """
        radius = Distance(m=DEFAULT_PLACEMENT_POINT_UPDATE_RADIUS)
        return models.PlacementPoint.objects.filter(
            geometry__distance_lt=(point.geometry, radius),
        )

    @staticmethod
    def update_points_in_radius(qs: models.PlacementPoint, new_status: str):
        """Adjust ``target_post_cnt`` around points that are changing status.

        Call this *before* the new status is written: each point's current
        ``status`` is compared with ``new_status`` to detect the transition.

        * Pending -> Installation: neighbours' ``target_post_cnt`` is
          incremented by one.
        * Installation -> Cancelled / Pending: neighbours' ``target_post_cnt``
          is decremented by one, never dropping below zero.

        After each adjustment the ``raschet`` task is queued.

        Args:
            qs: Iterable of placement points (a queryset, despite the
                annotation) whose status is about to change.
            new_status: Name of the ``PointStatus`` member being applied.
        """
        installation = PointStatus.Installation.name
        pending = PointStatus.Pending.name
        cancelled = PointStatus.Cancelled.name

        for point in qs:
            if new_status == installation:
                if point.status == pending:
                    PointService._points_in_radius(point).update(
                        target_post_cnt=F('target_post_cnt') + 1,
                    )
                    raschet.delay()
            elif new_status in (cancelled, pending):
                if point.status == installation:
                    # The zero guard has to run in SQL. A Python-level
                    # ``... if F(...) != 0 else 0`` is evaluated once, before
                    # the query is built, and is always true, so it would
                    # decrement every row and let the counter go negative.
                    PointService._points_in_radius(point).filter(
                        target_post_cnt__gt=0,
                    ).update(
                        target_post_cnt=F('target_post_cnt') - 1,
                    )
                    raschet.delay()

    @staticmethod
    def update_status(qs: models.PlacementPoint, new_status: str) -> None:
        """Write ``new_status`` to the ``status`` column of every row in ``qs``."""
        qs.update(status=new_status)

    @staticmethod
    def get_point_by_id(point_id: int):
        """Return a queryset filtered to the point with primary key ``point_id``."""
        return models.PlacementPoint.objects.filter(pk=point_id)

    @staticmethod
    def get_point_by_postamat_id(postamat_id: str):
        """Return a queryset of points whose ``postamat_id`` equals the argument."""
        return models.PlacementPoint.objects.filter(postamat_id=postamat_id)

    @staticmethod
    def to_excel(qs: models.PlacementPoint):
        """Serialise ``qs`` to an Excel workbook and return its raw bytes.

        All model fields (``qs.values()``) are exported to a single sheet
        named ``'Placement Points'`` without the DataFrame index. An empty
        queryset produces a workbook with an empty sheet.

        Args:
            qs: Queryset of placement points (despite the annotation).

        Returns:
            bytes: Contents of the generated workbook.
        """
        data = pd.DataFrame(list(qs.values()))

        # An empty queryset yields a frame with no columns at all; skip the
        # per-column normalisation instead of failing with KeyError.
        if not data.empty:
            # Excel cannot store timezone-aware datetimes, so drop the tz info.
            data['start_date'] = data['start_date'].dt.tz_localize(None)
            # NOTE(review): astype(int) raises if sample_trn contains nulls.
            data['sample_trn'] = data['sample_trn'].astype(int)

        with BytesIO() as buffer:
            with pd.ExcelWriter(buffer) as writer:
                data.to_excel(
                    writer,
                    sheet_name='Placement Points',
                    index=False,
                )
            # Read the buffer only after the writer has been closed (which
            # flushes the workbook) and before the buffer itself is closed.
            return buffer.getvalue()

    @staticmethod
    def get_first_10_k():
        """Return a ``prediction_current`` cut-off value.

        With more than 10000 points, the value at zero-based index 10000 of
        the descending ``prediction_current`` ordering is returned. Otherwise
        the smallest ``prediction_current`` in the table is returned.

        NOTE(review): on an empty table ``first()`` returns ``None`` and the
        attribute access raises ``AttributeError``; this is unchanged.
        """
        points = models.PlacementPoint.objects
        if points.count() > 10000:
            return points.order_by('-prediction_current')[10000].prediction_current
        return points.order_by('prediction_current').first().prediction_current