import warnings from http import HTTPStatus from django.db.models import Q from django.http import HttpResponse from django.http import JsonResponse from rest_framework.decorators import action from rest_framework.decorators import api_view from rest_framework.decorators import permission_classes from rest_framework.permissions import IsAuthenticated from rest_framework.response import Response from rest_framework.views import APIView from rest_framework.viewsets import ReadOnlyModelViewSet from postamates.settings import AGE_DAY_BORDER from postamates.settings import EXCEL_EXPORT_FILENAME from postamates.settings import JSON_EXPORT_FILENAME from service import models from service import pagination from service import serializers from service import utils from service.enums import PointStatus from service.permissions import UserPermission from service.service import PointService from service.tasks import raschet from service.utils import load_data from rest_framework.permissions import AllowAny class AOViewSet(ReadOnlyModelViewSet): serializer_class = serializers.AOSerializer queryset = models.AO.objects permission_classes = [AllowAny] class PlacementPointViewSet(ReadOnlyModelViewSet): serializer_class = serializers.PlacementPointSerializer queryset = models.PlacementPoint.objects pagination_class = pagination.MyPagination permission_classes = [UserPermission] def get_queryset(self): qs = self.queryset.all().order_by('id') location_ids = self.request.GET.get('location_ids[]') prediction_first = self.request.GET.get('prediction_first[]') prediction_current = self.request.GET.get('prediction_current[]') age_day = self.request.GET.get('age_day[]') categories = self.request.GET.get('categories[]') status = self.request.GET.get('status[]') delta = self.request.GET.get('delta[]') fact = self.request.GET.get('fact[]') included = self.request.GET.get('included[]') excluded = self.request.GET.get('excluded[]') plan_first = self.request.GET.get('plan_first[]') plan_current = self.request.GET.get('plan_current[]') delta_first = self.request.GET.get('delta_first[]') delta_current = self.request.GET.get('delta_current[]') rayons = self.request.GET.get('area[]') aos = self.request.GET.get('district[]') if location_ids: location_ids = list(location_ids.split(',')) qs = qs.filter(pk__in=location_ids) if prediction_first: prediction_first = list(prediction_first.split(',')) qs = qs.filter(prediction_first__range=prediction_first) if prediction_current: prediction_current = list(prediction_current.split(',')) qs = qs.filter(prediction_current__range=prediction_current) if categories: categories = list(categories.split(',')) qs = qs.filter(category__in=categories) if status: status = list(status.split(',')) qs = qs.filter(status__in=status) if delta: delta = list(delta.split(',')) qs = qs.filter(delta__range=delta) if fact: fact = list(fact.split(',')) qs = qs.filter(fact__range=fact) if plan_first: plan_first = list(plan_first.split(',')) qs = qs.filter(plan_first__range=plan_first) if plan_current: plan_current = list(plan_current.split(',')) qs = qs.filter(plan_current__range=plan_current) if age_day: age_day = list(age_day.split(',')) qs = qs.filter(age_day__range=age_day) if delta_current: delta_current = list(delta_current.split(',')) qs = qs.filter(delta_current__range=delta_current) if delta_first: delta_first = list(delta_first.split(',')) qs = qs.filter(delta_first__range=delta_first) if rayons: rayons = list(rayons.split(',')) qs = qs.filter(area_id__in=rayons) if aos: aos = list(aos.split(',')) qs = qs.filter(district_id__in=aos) if excluded: excluded = list(excluded.split(',')) qs = qs.filter(~Q(pk__in=excluded)) if included: inclded = list(included.split(',')) qs2 = models.PlacementPoint.objects.filter(pk__in=inclded).all() qs = (qs | qs2).distinct() return qs @action(detail=False, methods=['get']) def filters(self, request): qs = self.get_queryset() keys = ( 'age_day', 'prediction_first', 'prediction_current', 'plan_first', 'plan_current', 'fact', 'delta_first', 'delta_current', 'flat_cnt', 'year_bld', 'levels', 'doors', 'flats_cnt', 'popul_home', 'popul_job', 'other_post_cnt', 'target_post_cnt', 'yndxfood_cnt', 'yndxfood_sum', 'yndxfood_cnt_cst', ) temp_data = { key: [ x for x in list(set(qs.values_list(key, flat=True))) if x is not None ] for key in keys } data = { key: [ min(temp_data[key]), max(temp_data[key]), ] if temp_data[key] else [0, 100] for key in keys } return Response(data, status=HTTPStatus.OK) @action(detail=False, methods=['get']) def search_address(self, request): address = self.request.GET.get('address') qs = self.get_queryset() if address: qs = qs.filter(address__icontains=address) qs = qs.distinct() page = self.paginate_queryset(qs) if page is not None: serializer = self.get_serializer(page, many=True) return self.get_paginated_response(serializer.data) serializer = self.get_serializer(qs, many=True) return Response(serializer.data) @action(detail=False, methods=['put']) def update_status(self, request): qs = self.get_queryset() new_status = self.request.GET.get('status') if not new_status or new_status not in [tag.name for tag in PointStatus]: return Response({'message': 'No status'}, HTTPStatus.BAD_REQUEST) PointService.update_points_in_radius(qs, new_status) PointService.update_status(qs, new_status) return Response( {'message': 'status updated'}, status=HTTPStatus.OK, ) @action(detail=False, methods=['put']) def update_fact(self, request): point_id = request.GET.get('postamat_id') fact = request.GET.get('fact') if not point_id or not fact or not fact.isdigit(): return Response(status=HTTPStatus.BAD_REQUEST) qs = models.PlacementPoint.objects.filter(postamat_id=point_id) if not qs: return Response(status=HTTPStatus.NOT_FOUND) for q in qs: if q.age_day < AGE_DAY_BORDER: qs.update(**{'fact': fact, 'fact_raw': fact}) else: new_fact = fact // (q.age_day / AGE_DAY_BORDER) qs.update(fact=new_fact, fact_raw=fact) qs.update(**{'fact_raw': fact}) return Response({'message': 'fact updated'}, status=HTTPStatus.OK) @action(detail=False, methods=['put']) def update_postamat_id(self, request): postamat_id = request.GET.get('postamat_id') point_id = request.GET.get('id') if not point_id or not postamat_id: return Response(status=HTTPStatus.BAD_REQUEST) qs = PointService.get_point_by_id(point_id) if not qs: return Response(status=HTTPStatus.NOT_FOUND) PointService().update_postamat_id(point_id, postamat_id) return Response({'message': 'Postamat id updated'}, status=HTTPStatus.OK) @action(detail=False, methods=['get']) def to_excel(self, request): qs = self.get_queryset() filename = EXCEL_EXPORT_FILENAME res = HttpResponse( PointService.to_excel(qs), content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', ) res['Content-Disposition'] = f'attachment; filename={filename}' return res @action(detail=False, methods=['get']) def to_json(self, request): qs = self.get_queryset() filename = JSON_EXPORT_FILENAME response = HttpResponse(PointService.to_json(qs), content_type='application/json') response['Content-Disposition'] = f'attachment; filename={filename}' return response @action(detail=False, methods=['get']) def get_10k(self, request): pred = PointService.get_first_10_k() return Response({'prediction_current': pred}, status=HTTPStatus.OK) @action(detail=False, methods=['get']) def start(self, request): raschet.delay() return Response('Sucess', status=HTTPStatus.OK) class refresh_placement_points(APIView): @staticmethod def post(request): warnings.filterwarnings('ignore') file = request.FILES['file'] load_data(file) return Response(status=HTTPStatus.OK) class load_ao_and_rayons(APIView): @staticmethod def post(request): warnings.filterwarnings('ignore') file_ao = request.FILES['file_ao'] file_rayon = request.FILES['file_rayon'] utils.load_ao_and_rayons(file_ao, file_rayon) return Response(status=HTTPStatus.OK) @api_view(['POST']) def upload_rivals(request): warnings.filterwarnings('ignore') file_rivals = request.FILES['file_rivals'] utils.load_rivals(file_rivals) return JsonResponse({'message': 'OK'}) @api_view(['POST']) def upload_dist(request): warnings.filterwarnings('ignore') file_dist = request.FILES['file_dist'] utils.load_dist(file_dist) return JsonResponse({'message': 'OK'}) @api_view(['GET']) @permission_classes([IsAuthenticated]) def get_current_user(request): return JsonResponse( {'groups': [gr.name for gr in request.user.groups.all()]}, )