From 83d3d7724ac3d07d7f6088f6fdf1579c09179986 Mon Sep 17 00:00:00 2001
From: gman
Date: Tue, 28 Apr 2026 21:49:05 +0300
Subject: [PATCH] cian

---
 README.md         |   1 +
 cian_pipelines.py | 429 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 430 insertions(+)
 create mode 100644 cian_pipelines.py

diff --git a/README.md b/README.md
index 862ebf8..a9723ce 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,3 @@
 Python 3.13.11
+
 Airflow 3.1.7
\ No newline at end of file
diff --git a/cian_pipelines.py b/cian_pipelines.py
new file mode 100644
index 0000000..48d6a83
--- /dev/null
+++ b/cian_pipelines.py
@@ -0,0 +1,429 @@
+"""Airflow DAGs that collect Cian map clusters and per-offer price estimations.
+
+``cian_clusters_pipeline`` downloads cluster data per total-area bucket and
+merges it into one dated GeoJSON file. ``cian_estimations_pipeline`` reads the
+latest such file, fetches an estimation for every offer id found in it and
+writes a GeoJSON of cluster features expanded per offer id with its estimation.
+"""
+
+from __future__ import annotations
+
+import json
+import logging
+import re
+from copy import deepcopy
+from datetime import date, datetime
+from pathlib import Path
+from typing import Any
+
+import curl_cffi
+import stamina
+from airflow.decorators import dag, task
+
+
+logger = logging.getLogger(__name__)
+
+PROJECT_ROOT = Path(__file__).resolve().parents[1]
+DATA_DIR = PROJECT_ROOT.parent / "data"
+CLUSTERS_ROOT_DIR = DATA_DIR / "clusters"
+ESTIMATIONS_ROOT_DIR = DATA_DIR / "estimations"
+ESTIMATION_URL = "https://api.cian.ru/price-estimator/v1/get-estimation-and-trend-web/"
+CLUSTERS_URL = "https://api.cian.ru/search-offers-index-map/v1/get-clusters-for-map/"
+# Captures the area bucket from per-area dump names such as ``..._area_42.json``.
+AREA_PATTERN = re.compile(r"_area_(\d+)\.json$")
+# Matches merged outputs named ``clusters_YYYY-MM-DD.geojson`` and captures the date.
+CLUSTERS_GEOJSON_PATTERN = re.compile(r"^clusters_(\d{4}-\d{2}-\d{2})\.geojson$")
+
+
+def _cluster_payload_template() -> dict[str, Any]:
+    """Return a fresh clusters-for-map request body.
+
+    A new dict is built on every call, so callers may mutate the result. The
+    ``total_area`` range is a placeholder that ``_build_cluster_payload``
+    overwrites.
+    """
+    return {
+        "zoom": 15,
+        "bbox": [
+            {
+                "topLeft": {"lat": 57, "lng": 35.0},
+                "bottomRight": {"lat": 54, "lng": 40.0},
+            }
+        ],
+        "jsonQuery": {
+            "_type": "flatsale",
+            "engine_version": {"type": "term", "value": 2},
+            "region": {"type": "terms", "value": [1]},
+            "total_area": {"type": "range", "value": {"gte": 30, "lte": 31}},
+            "only_flat": {"type": "term", "value": True},
+            "demolished_in_moscow_programm": {"type": "term", "value": False},
+            "flat_share": {"type": "term", "value": 2},
+        },
+        "extended": False,
+        "mapContainerArea": 480000,
+        "subdomain": "www",
+    }
+
+
+def _build_cluster_payload(total_area: int) -> str:
+    """Return the compact JSON request body for one total-area bucket.
+
+    The bucket covers ``total_area`` (gte) through ``total_area + 1`` (lte).
+    """
+    payload = _cluster_payload_template()
+    area_range = payload["jsonQuery"]["total_area"]["value"]
+    area_range["gte"] = total_area
+    area_range["lte"] = total_area + 1
+    return json.dumps(payload, separators=(",", ":"), ensure_ascii=True)
+
+
+@stamina.retry(on=Exception, attempts=5)
+def _request_clusters(payload: str) -> dict[str, Any]:
+    """POST ``payload`` to the clusters endpoint and return the decoded JSON.
+
+    Raises:
+        RuntimeError: if the response status is not 200. Any exception,
+            including this one, is retried by ``stamina`` (5 attempts).
+    """
+    response = curl_cffi.post(
+        CLUSTERS_URL,
+        data=payload,
+        impersonate="chrome",
+        timeout=15,
+    )
+    if response.status_code != 200:
+        raise RuntimeError(f"HTTP {response.status_code}: {response.text[:300]}")
+    return response.json()
+
+
+def _extract_area_from_filename(path: Path) -> int | None:
+    """Return the area bucket encoded in ``path``'s name, or None if absent."""
+    match = AREA_PATTERN.search(path.name)
+    return int(match.group(1)) if match else None
+
+
+def _make_feature(item: dict[str, Any], area: int | None) -> dict[str, Any] | None:
+    """Convert one cluster item into a GeoJSON point feature.
+
+    Every key except ``coordinates`` is copied into ``properties`` and an
+    ``area`` property is added. Returns None when ``item`` is not a dict or
+    has no usable ``coordinates.lat`` / ``coordinates.lng``.
+    """
+    # Guard against malformed entries so one bad item cannot abort a whole run.
+    if not isinstance(item, dict):
+        return None
+    coordinates = item.get("coordinates")
+    if not isinstance(coordinates, dict):
+        return None
+
+    lat = coordinates.get("lat")
+    lng = coordinates.get("lng")
+    if lat is None or lng is None:
+        return None
+
+    properties = {k: v for k, v in item.items() if k != "coordinates"}
+    properties["area"] = area
+    return {
+        "type": "Feature",
+        # GeoJSON positions are ordered longitude first, then latitude.
+        "geometry": {"type": "Point", "coordinates": [lng, lat]},
+        "properties": properties,
+    }
+
+
+# ``Exception`` already covers ``curl_cffi.requests.RequestsError``, so listing
+# both was redundant; every failure (transport, HTTP status, bad JSON) is retried.
+@stamina.retry(
+    on=Exception,
+    attempts=5,
+    wait_initial=2.0,
+    wait_max=20.0,
+    wait_jitter=1.0,
+)
+def _fetch_estimation(offer_id: int) -> dict[str, Any]:
+    """GET the estimation payload for ``offer_id`` and return the decoded JSON.
+
+    ``raise_for_status`` turns HTTP error statuses into exceptions, which
+    ``stamina`` retries before the last one propagates to the caller.
+    """
+    response = curl_cffi.get(
+        ESTIMATION_URL,
+        params={"cianOfferId": offer_id},
+        impersonate="chrome",
+        timeout=15,
+    )
+    response.raise_for_status()
+    return response.json()
+
+
+def _save_json(path: Path, payload: dict[str, Any] | list[int]) -> None:
+    """Write ``payload`` to ``path`` as indented UTF-8 JSON, creating parent dirs."""
+    path.parent.mkdir(parents=True, exist_ok=True)
+    path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
+
+
+def _parse_offer_id(raw: Any) -> int | None:
+    """Coerce one raw ``clusterOfferIds`` entry to an int, or None if unusable."""
+    # bool is a subclass of int; a JSON true/false is never a real offer id.
+    if isinstance(raw, bool):
+        return None
+    if isinstance(raw, int):
+        return raw
+    # isdecimal() (unlike isdigit()) rejects characters such as superscript
+    # digits that int() cannot parse.
+    if isinstance(raw, str) and raw.isdecimal():
+        return int(raw)
+    return None
+
+
+def _feature_offer_ids(feature: dict[str, Any]) -> list[int]:
+    """Return the valid offer ids from a feature's ``clusterOfferIds`` property.
+
+    A non-dict feature, missing/null/non-dict ``properties`` or a non-list
+    ``clusterOfferIds`` yields an empty list; unparsable entries are dropped.
+    """
+    properties = feature.get("properties") if isinstance(feature, dict) else None
+    if not isinstance(properties, dict):
+        return []
+    raw_ids = properties.get("clusterOfferIds", [])
+    if not isinstance(raw_ids, list):
+        return []
+    return [oid for oid in map(_parse_offer_id, raw_ids) if oid is not None]
+
+
+def _load_offer_ids_from_geojson(path: Path) -> list[int]:
+    """Return the sorted, de-duplicated offer ids found in a clusters GeoJSON file."""
+    data = json.loads(path.read_text(encoding="utf-8"))
+    offer_ids: set[int] = set()
+    # ``or []`` also covers an explicit ``"features": null``.
+    for feature in data.get("features") or []:
+        offer_ids.update(_feature_offer_ids(feature))
+    return sorted(offer_ids)
+
+
+def _resolve_latest_clusters_geojson() -> Path:
+    """Return the most recently dated ``clusters_YYYY-MM-DD.geojson`` file.
+
+    Raises:
+        FileNotFoundError: if ``CLUSTERS_ROOT_DIR`` holds no such file.
+    """
+    candidates = [
+        path
+        for path in CLUSTERS_ROOT_DIR.glob("clusters_*.geojson")
+        if CLUSTERS_GEOJSON_PATTERN.match(path.name)
+    ]
+    if not candidates:
+        raise FileNotFoundError(f"No dated clusters geojson found in {CLUSTERS_ROOT_DIR}")
+    # Matching names differ only in the ISO date, which sorts chronologically.
+    return max(candidates, key=lambda path: path.name)
+
+
+def _load_estimations_index(estimations_dir: Path) -> dict[int, Path]:
+    """Map offer id to its ``{offer_id}.json`` file inside ``estimations_dir``.
+
+    Files whose stem is not purely numeric (e.g. the ``ids_*`` reports) are skipped.
+    """
+    return {
+        int(path.stem): path
+        for path in estimations_dir.glob("*.json")
+        if path.stem.isdecimal()
+    }
+
+
+@dag(
+    dag_id="cian_clusters_pipeline",
+    schedule=None,
+    start_date=datetime(2026, 1, 1),
+    catchup=False,
+    tags=["cian", "clusters"],
+)
+def cian_clusters_pipeline():
+    """Download clusters per area bucket, then merge them into one GeoJSON file."""
+
+    @task
+    def load_clusters() -> dict[str, Any]:
+        """Fetch clusters for area buckets 30..120 into today's run directory.
+
+        A bucket that still fails after retries is logged and counted in
+        ``failed_jobs``; it does not fail the task.
+        """
+        run_date = date.today().isoformat()
+        run_dir = CLUSTERS_ROOT_DIR / run_date
+        run_dir.mkdir(parents=True, exist_ok=True)
+        total_areas = range(30, 121)
+        failed_jobs = 0
+        for total_area in total_areas:
+            filename = run_dir / f"clusters_lng_35.0-40.0_area_{total_area}.json"
+            payload = _build_cluster_payload(total_area)
+            try:
+                data = _request_clusters(payload)
+                filename.write_text(
+                    json.dumps(data, ensure_ascii=False, indent=2),
+                    encoding="utf-8",
+                )
+            except Exception:
+                # Best effort: keep going so one bad bucket does not lose the rest.
+                failed_jobs += 1
+                logger.exception("Failed area=%s", total_area)
+        return {
+            "run_date": run_date,
+            "run_dir": run_dir.as_posix(),
+            "areas_total": len(total_areas),
+            "failed_jobs": failed_jobs,
+        }
+
+    @task
+    def transform_clusters(load_result: dict[str, Any]) -> dict[str, Any]:
+        """Merge the per-area dumps of one run into ``clusters_{run_date}.geojson``.
+
+        Returns counters plus the run date, run directory and output path.
+        """
+        run_date = str(load_result["run_date"])
+        run_dir = Path(load_result["run_dir"])
+        features: list[dict[str, Any]] = []
+        stats = {
+            "files_total": 0,
+            "filtered_items_total": 0,
+            "features_written": 0,
+            "skipped_no_geometry": 0,
+        }
+        for path in sorted(run_dir.glob("clusters_lng_*-*_area_*.json")):
+            stats["files_total"] += 1
+            area = _extract_area_from_filename(path)
+            data = json.loads(path.read_text(encoding="utf-8"))
+            # ``or []`` also covers an explicit ``"filtered": null``.
+            filtered_items = data.get("filtered") or []
+            stats["filtered_items_total"] += len(filtered_items)
+            for item in filtered_items:
+                feature = _make_feature(item, area)
+                if feature is None:
+                    stats["skipped_no_geometry"] += 1
+                    continue
+                features.append(feature)
+                stats["features_written"] += 1
+
+        clusters_geojson_path = CLUSTERS_ROOT_DIR / f"clusters_{run_date}.geojson"
+        clusters_geojson_path.parent.mkdir(parents=True, exist_ok=True)
+        clusters_geojson_path.write_text(
+            json.dumps({"type": "FeatureCollection", "features": features}, ensure_ascii=False),
+            encoding="utf-8",
+        )
+        return {
+            **stats,
+            "run_date": run_date,
+            "run_dir": run_dir.as_posix(),
+            "clusters_geojson_path": clusters_geojson_path.as_posix(),
+        }
+
+    transform_clusters(load_clusters())
+
+
+@dag(
+    dag_id="cian_estimations_pipeline",
+    schedule=None,
+    start_date=datetime(2026, 1, 1),
+    catchup=False,
+    tags=["cian", "estimations"],
+)
+def cian_estimations_pipeline():
+    """Fetch estimations for the latest clusters file and join them onto its features."""
+
+    @task
+    def load_estimations() -> dict[str, Any]:
+        """Fetch and save an estimation for every offer id in the latest clusters file.
+
+        Payloads with a truthy ``graphs`` value are saved as ``{offer_id}.json``.
+        Ids without one, and ids whose request failed, go to two report files.
+        """
+        run_date = date.today().isoformat()
+        run_dir = ESTIMATIONS_ROOT_DIR / run_date
+        run_dir.mkdir(parents=True, exist_ok=True)
+        ids_without_estimations_path = run_dir / f"ids_without_estimations_{run_date}.json"
+        ids_failed_requests_path = run_dir / f"ids_failed_requests_{run_date}.json"
+
+        clusters_geojson_path = _resolve_latest_clusters_geojson()
+        offer_ids = _load_offer_ids_from_geojson(clusters_geojson_path)
+        ids_without_estimations: list[int] = []
+        ids_failed_requests: list[int] = []
+        saved_estimations = 0
+
+        for offer_id in offer_ids:
+            try:
+                content = _fetch_estimation(offer_id)
+            except Exception:
+                ids_failed_requests.append(offer_id)
+                logger.exception("Request failed for offer_id=%s", offer_id)
+                continue
+
+            # A non-dict body (e.g. JSON null) is treated as "no estimation".
+            if isinstance(content, dict) and content.get("graphs"):
+                _save_json(run_dir.joinpath(f"{offer_id}.json"), content)
+                saved_estimations += 1
+            else:
+                ids_without_estimations.append(offer_id)
+
+        _save_json(ids_without_estimations_path, ids_without_estimations)
+        _save_json(ids_failed_requests_path, ids_failed_requests)
+
+        return {
+            "run_date": run_date,
+            "run_dir": run_dir.as_posix(),
+            "clusters_geojson_path": clusters_geojson_path.as_posix(),
+            "ids_without_estimations_path": ids_without_estimations_path.as_posix(),
+            "ids_failed_requests_path": ids_failed_requests_path.as_posix(),
+            "offer_ids_total": len(offer_ids),
+            "saved_estimations": saved_estimations,
+            "without_estimations": len(ids_without_estimations),
+            "failed_requests": len(ids_failed_requests),
+        }
+
+    @task
+    def transform_estimations(load_result: dict[str, Any]) -> dict[str, int]:
+        """Write one GeoJSON feature per offer id that has a saved estimation.
+
+        Each cluster feature is cloned once per valid offer id; the clone gets
+        ``offer_id`` and ``estimation`` properties. Returns the counters.
+        """
+        run_date = str(load_result["run_date"])
+        estimations_dir = Path(load_result["run_dir"])
+        clusters_geojson_path = Path(load_result["clusters_geojson_path"])
+        estimations_index = _load_estimations_index(estimations_dir)
+        source = json.loads(clusters_geojson_path.read_text(encoding="utf-8"))
+        source_features = source.get("features") or []
+
+        result_features: list[dict[str, Any]] = []
+        stats = {
+            "source_features": len(source_features),
+            "expanded_offer_ids": 0,
+            "features_with_estimations": 0,
+            "features_without_estimations": 0,
+        }
+
+        for feature in source_features:
+            # Same id parsing as load_estimations, so a malformed id is skipped
+            # here instead of raising from int() and failing the whole task.
+            for offer_id in _feature_offer_ids(feature):
+                stats["expanded_offer_ids"] += 1
+
+                estimation_path = estimations_index.get(offer_id)
+                if estimation_path is None:
+                    stats["features_without_estimations"] += 1
+                    continue
+
+                estimation_payload = json.loads(estimation_path.read_text(encoding="utf-8"))
+                # A feature that yielded ids always has a dict ``properties``.
+                cloned_feature = deepcopy(feature)
+                cloned_feature["properties"]["offer_id"] = offer_id
+                cloned_feature["properties"]["estimation"] = estimation_payload
+                result_features.append(cloned_feature)
+                stats["features_with_estimations"] += 1
+
+        features_geojson_path = ESTIMATIONS_ROOT_DIR / f"features_with_estimations_{run_date}.geojson"
+        _save_json(features_geojson_path, {"type": "FeatureCollection", "features": result_features})
+        return stats
+
+    transform_estimations(load_estimations())
+
+
+clusters_pipeline = cian_clusters_pipeline()
+estimations_pipeline = cian_estimations_pipeline()