from __future__ import annotations import json import logging import re from copy import deepcopy from datetime import date, datetime from pathlib import Path from typing import Any import curl_cffi import stamina from airflow.decorators import dag, task PROJECT_ROOT = Path(__file__).resolve().parents[1] DATA_DIR = PROJECT_ROOT.parent / "data" CLUSTERS_ROOT_DIR = DATA_DIR / "clusters" ESTIMATIONS_ROOT_DIR = DATA_DIR / "estimations" ESTIMATION_URL = "https://api.cian.ru/price-estimator/v1/get-estimation-and-trend-web/" CLUSTERS_URL = "https://api.cian.ru/search-offers-index-map/v1/get-clusters-for-map/" AREA_PATTERN = re.compile(r"_area_(\d+)\.json$") CLUSTERS_GEOJSON_PATTERN = re.compile(r"^clusters_(\d{4}-\d{2}-\d{2})\.geojson$") def _cluster_payload_template() -> dict[str, Any]: return { "zoom": 15, "bbox": [ { "topLeft": {"lat": 57, "lng": 35.0}, "bottomRight": {"lat": 54, "lng": 40.0}, } ], "jsonQuery": { "_type": "flatsale", "engine_version": {"type": "term", "value": 2}, "region": {"type": "terms", "value": [1]}, "total_area": {"type": "range", "value": {"gte": 30, "lte": 31}}, "only_flat": {"type": "term", "value": True}, "demolished_in_moscow_programm": {"type": "term", "value": False}, "flat_share": {"type": "term", "value": 2}, }, "extended": False, "mapContainerArea": 480000, "subdomain": "www", } def _build_cluster_payload(total_area: int) -> str: payload = _cluster_payload_template() payload["jsonQuery"]["total_area"]["value"]["gte"] = total_area payload["jsonQuery"]["total_area"]["value"]["lte"] = total_area + 1 return json.dumps(payload, separators=(",", ":"), ensure_ascii=True) @stamina.retry(on=Exception, attempts=5) def _request_clusters(payload: str) -> dict[str, Any]: response = curl_cffi.post( CLUSTERS_URL, data=payload, impersonate="chrome", timeout=15, ) if response.status_code != 200: raise RuntimeError(f"HTTP {response.status_code}: {response.text[:300]}") return response.json() def _extract_area_from_filename(path: Path) -> int | None: match = AREA_PATTERN.search(path.name) if not match: return None return int(match.group(1)) def _make_feature(item: dict[str, Any], area: int | None) -> dict[str, Any] | None: coordinates = item.get("coordinates") if not isinstance(coordinates, dict): return None lat = coordinates.get("lat") lng = coordinates.get("lng") if lat is None or lng is None: return None properties = {k: v for k, v in item.items() if k != "coordinates"} properties["area"] = area return { "type": "Feature", "geometry": {"type": "Point", "coordinates": [lng, lat]}, "properties": properties, } @stamina.retry( on=(curl_cffi.requests.RequestsError, Exception), attempts=5, wait_initial=2.0, wait_max=20.0, wait_jitter=1.0, ) def _fetch_estimation(offer_id: int) -> dict[str, Any]: response = curl_cffi.get( ESTIMATION_URL, params={"cianOfferId": offer_id}, impersonate="chrome", timeout=15, ) response.raise_for_status() return response.json() def _save_json(path: Path, payload: dict[str, Any] | list[int]) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") def _load_offer_ids_from_geojson(path: Path) -> list[int]: data = json.loads(path.read_text(encoding="utf-8")) features = data.get("features", []) offer_ids: set[int] = set() for feature in features: properties = feature.get("properties", {}) cluster_ids = properties.get("clusterOfferIds", []) if not isinstance(cluster_ids, list): continue for offer_id in cluster_ids: if isinstance(offer_id, int): offer_ids.add(offer_id) elif isinstance(offer_id, str) and offer_id.isdigit(): offer_ids.add(int(offer_id)) return sorted(offer_ids) def _resolve_latest_clusters_geojson() -> Path: candidates: list[tuple[str, Path]] = [] for path in CLUSTERS_ROOT_DIR.glob("clusters_*.geojson"): match = CLUSTERS_GEOJSON_PATTERN.match(path.name) if match: candidates.append((match.group(1), path)) if not candidates: raise FileNotFoundError(f"No dated clusters geojson found in {CLUSTERS_ROOT_DIR}") return sorted(candidates, key=lambda x: x[0])[-1][1] def _load_estimations_index(estimations_dir: Path) -> dict[int, Path]: index: dict[int, Path] = {} for path in estimations_dir.glob("*.json"): if path.stem.isdigit(): index[int(path.stem)] = path return index @dag( dag_id="cian_clusters_pipeline", schedule=None, start_date=datetime(2026, 1, 1), catchup=False, tags=["cian", "clusters"], ) def cian_clusters_pipeline(): @task def load_clusters() -> dict[str, Any]: run_date = date.today().isoformat() run_dir = CLUSTERS_ROOT_DIR / run_date run_dir.mkdir(parents=True, exist_ok=True) total_areas = range(30, 121) failed_jobs = 0 for total_area in total_areas: filename = run_dir / f"clusters_lng_35.0-40.0_area_{total_area}.json" payload = _build_cluster_payload(total_area) try: data = _request_clusters(payload) filename.write_text( json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8", ) except Exception: failed_jobs += 1 logging.exception("Failed area=%s", total_area) return { "run_date": run_date, "run_dir": run_dir.as_posix(), "areas_total": len(total_areas), "failed_jobs": failed_jobs, } @task def transform_clusters(load_result: dict[str, Any]) -> dict[str, Any]: run_date = str(load_result["run_date"]) run_dir = Path(load_result["run_dir"]) features: list[dict[str, Any]] = [] stats = { "files_total": 0, "filtered_items_total": 0, "features_written": 0, "skipped_no_geometry": 0, } for path in sorted(run_dir.glob("clusters_lng_*-*_area_*.json")): stats["files_total"] += 1 area = _extract_area_from_filename(path) data = json.loads(path.read_text(encoding="utf-8")) filtered_items = data.get("filtered", []) stats["filtered_items_total"] += len(filtered_items) for item in filtered_items: feature = _make_feature(item, area) if feature is None: stats["skipped_no_geometry"] += 1 continue features.append(feature) stats["features_written"] += 1 clusters_geojson_path = CLUSTERS_ROOT_DIR / f"clusters_{run_date}.geojson" clusters_geojson_path.parent.mkdir(parents=True, exist_ok=True) clusters_geojson_path.write_text( json.dumps({"type": "FeatureCollection", "features": features}, ensure_ascii=False), encoding="utf-8", ) return { **stats, "run_date": run_date, "run_dir": run_dir.as_posix(), "clusters_geojson_path": clusters_geojson_path.as_posix(), } transform_clusters(load_clusters()) @dag( dag_id="cian_estimations_pipeline", schedule=None, start_date=datetime(2026, 1, 1), catchup=False, tags=["cian", "estimations"], ) def cian_estimations_pipeline(): @task def load_estimations() -> dict[str, Any]: run_date = date.today().isoformat() run_dir = ESTIMATIONS_ROOT_DIR / run_date run_dir.mkdir(parents=True, exist_ok=True) ids_without_estimations_path = run_dir / f"ids_without_estimations_{run_date}.json" ids_failed_requests_path = run_dir / f"ids_failed_requests_{run_date}.json" clusters_geojson_path = _resolve_latest_clusters_geojson() offer_ids = _load_offer_ids_from_geojson(clusters_geojson_path) ids_without_estimations: list[int] = [] ids_failed_requests: list[int] = [] saved_estimations = 0 for offer_id in offer_ids: try: content = _fetch_estimation(offer_id) except Exception: ids_failed_requests.append(offer_id) logging.exception("Request failed for offer_id=%s", offer_id) continue if content.get("graphs"): _save_json(run_dir.joinpath(f"{offer_id}.json"), content) saved_estimations += 1 else: ids_without_estimations.append(offer_id) _save_json(ids_without_estimations_path, ids_without_estimations) _save_json(ids_failed_requests_path, ids_failed_requests) return { "run_date": run_date, "run_dir": run_dir.as_posix(), "clusters_geojson_path": clusters_geojson_path.as_posix(), "ids_without_estimations_path": ids_without_estimations_path.as_posix(), "ids_failed_requests_path": ids_failed_requests_path.as_posix(), "offer_ids_total": len(offer_ids), "saved_estimations": saved_estimations, "without_estimations": len(ids_without_estimations), "failed_requests": len(ids_failed_requests), } @task def transform_estimations(load_result: dict[str, Any]) -> dict[str, int]: run_date = str(load_result["run_date"]) estimations_dir = Path(load_result["run_dir"]) clusters_geojson_path = Path(load_result["clusters_geojson_path"]) estimations_index = _load_estimations_index(estimations_dir) source = json.loads(clusters_geojson_path.read_text(encoding="utf-8")) source_features = source.get("features", []) result_features: list[dict[str, Any]] = [] stats = { "source_features": len(source_features), "expanded_offer_ids": 0, "features_with_estimations": 0, "features_without_estimations": 0, } for feature in source_features: properties = feature.get("properties", {}) cluster_offer_ids = properties.get("clusterOfferIds", []) if not isinstance(cluster_offer_ids, list): continue for raw_offer_id in cluster_offer_ids: offer_id = int(raw_offer_id) stats["expanded_offer_ids"] += 1 estimation_path = estimations_index.get(offer_id) if estimation_path is None: stats["features_without_estimations"] += 1 continue estimation_payload = json.loads(estimation_path.read_text(encoding="utf-8")) cloned_feature = deepcopy(feature) cloned_feature.setdefault("properties", {}) cloned_feature["properties"]["offer_id"] = offer_id cloned_feature["properties"]["estimation"] = estimation_payload result_features.append(cloned_feature) stats["features_with_estimations"] += 1 features_geojson_path = ESTIMATIONS_ROOT_DIR / f"features_with_estimations_{run_date}.geojson" _save_json(features_geojson_path, {"type": "FeatureCollection", "features": result_features}) return stats transform_estimations(load_estimations()) clusters_pipeline = cian_clusters_pipeline() estimations_pipeline = cian_estimations_pipeline()