From 862398d2cff21cb0ab878997e2ecdda609d09eba Mon Sep 17 00:00:00 2001 From: timofejmalinin Date: Mon, 15 May 2023 12:40:10 +0400 Subject: [PATCH] Logs to periodic task --- service/tasks.py | 372 ++++++++++++++++++++++++----------------------- 1 file changed, 187 insertions(+), 185 deletions(-) diff --git a/service/tasks.py b/service/tasks.py index f3d0f4c..4ee9b28 100644 --- a/service/tasks.py +++ b/service/tasks.py @@ -37,195 +37,197 @@ def raschet(): ) except: log_to_telegram('error connect to db') + try: + query = text('select * from service_placementpoint') + connection = conn.connect() + pts = pd.read_sql(query, connection) + pts['geometry'] = pts['geometry'].apply(wkb.loads, hex=True) + pts = gpd.GeoDataFrame(pts, geometry='geometry', crs='epsg:4326') + pts = pts.to_crs('epsg:32637') + pts = pts.rename( + columns={ + 'target_cnt_nearby_mean': 'target_dist1', + 'target_age_nearby_mean': 'target_dist2', + 'yndxfood_cnt_cst': 'target_dist3', + }, + ) - query = text('select * from service_placementpoint') - connection = conn.connect() - pts = pd.read_sql(query, connection) - pts['geometry'] = pts['geometry'].apply(wkb.loads, hex=True) - pts = gpd.GeoDataFrame(pts, geometry='geometry', crs='epsg:4326') - pts = pts.to_crs('epsg:32637') - pts = pts.rename( - columns={ - 'target_cnt_nearby_mean': 'target_dist1', - 'target_age_nearby_mean': 'target_dist2', - 'yndxfood_cnt_cst': 'target_dist3', - }, - ) - - feats = [ - 'id', 'metro_dist', 'target_dist', 'property_price_bargains', 'property_price_offers', - 'property_mean_floor', - 'property_era', 'flats_cnt_2', 'flats_cnt', 'popul_home', 'popul_job', 'other_post_cnt', 'yndxfood_sum', - 'yndxfood_cnt', 'school_cnt', 'kindergar_cnt', 'target_post_cnt', 'public_stop_cnt', 'sport_center_cnt', - 'pharmacy_cnt', 'supermarket_cnt', 'supermarket_premium_cnt', 'clinic_cnt', 'bank_cnt', 'reca_cnt', - 'lab_cnt', 'culture_cnt', 'attraction_cnt', 'mfc_cnt', 'bc_cnt', 'tc_cnt', 'rival_pvz_cnt', - 'rival_post_cnt', - 'business_activity', 'age_day', 'target_cnt_ao_mean', 'target_dist1', 'target_dist2', 'target_dist3', - ] - - # Записи для обучения - pts_trn = pts.loc[pts.sample_trn == True].reset_index(drop=True) - pts_trn = gpd.GeoDataFrame(pts_trn, geometry='geometry', crs='epsg:32637') - pts_target = pts_trn[['geometry']] - pts_target['cnt'] = 1 - pts_target = gpd.GeoDataFrame(pts_target, geometry='geometry', crs='epsg:32637') - - target_feature_coords = [] - for i in range(0, len(pts_target)): - target_feature_coords.append((pts_target.geometry.x[i], pts_target.geometry.y[i])) - target_feature_coords = np.array(target_feature_coords) - - pts_trn['target_dist'] = pts_trn.apply( - lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[1])), - axis=1, - ) - pts_trn.loc[pts_trn.target_dist > 700, 'target_dist'] = 700 - - pts_trn['target_dist1'] = pts_trn.apply( - lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[2])), - axis=1, - ) - pts_trn.loc[pts_trn.target_dist1 > 700, 'target_dist1'] = 700 - - pts_trn['target_dist2'] = pts_trn.apply( - lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[3])), - axis=1, - ) - pts_trn.loc[pts_trn.target_dist2 > 700, 'target_dist2'] = 700 - - pts_trn['target_dist3'] = pts_trn.apply( - lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[4])), - axis=1, - ) - pts_trn.loc[pts_trn.target_dist3 > 700, 'target_dist3'] = 700 - - pts_trn['buf'] = pts_trn.buffer(500) - pts_trn = gpd.GeoDataFrame(pts_trn, geometry='buf', crs='epsg:32637') - target_post = gpd.sjoin(pts_trn, pts_target, op='contains').groupby('id', as_index=False).agg({'cnt': 'count'}) - target_post = target_post.rename(columns={'cnt': 'target_post_cnt'}) - pts_trn = pts_trn.drop(columns=['target_post_cnt']) - pts_trn = pts_trn.join(target_post.set_index('id'), on='id') - pts_trn['target_post_cnt'] = pts_trn['target_post_cnt'] - 1 - pts_trn = pts_trn.sort_values(by='id').reset_index(drop=True) - X_trn = pts_trn[feats].drop(columns=['id']) - Y_trn = pts_trn[['fact']] - - # Записи для инференса - pts_inf = pts.loc[(pts.status == 'Pending') | - (pts.status == 'Installation') | - (pts.status == 'Cancelled') | - ((pts.status == 'Working') & (pts.sample_trn == False))].reset_index(drop=True) - pts_inf = gpd.GeoDataFrame(pts_inf, geometry='geometry', crs='epsg:32637') - - pts_inf['buf'] = pts_inf.buffer(500) - pts_inf = gpd.GeoDataFrame(pts_inf, geometry='buf', crs='epsg:32637') - pts_target = pts.loc[(pts.status == 'Working') | - (pts.status == 'Installation') | - (pts.sample_trn == True)].reset_index(drop=True) - pts_target = pts_target[['geometry']] - pts_target['cnt'] = 1 - pts_target = gpd.GeoDataFrame(pts_target, geometry='geometry', crs='epsg:32637') - - target_feature_coords = [] - for i in range(0, len(pts_target)): - target_feature_coords.append((pts_target.geometry.x[i], pts_target.geometry.y[i])) - target_feature_coords = np.array(target_feature_coords) - - pts_inf['target_dist'] = pts_inf.apply( - lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[0])), - axis=1, - ) - pts_inf.loc[pts_inf.target_dist > 700, 'target_dist'] = 700 - - pts_inf['target_dist1'] = pts_inf.apply( - lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[1])), - axis=1, - ) - pts_inf.loc[pts_inf.target_dist1 > 700, 'target_dist1'] = 700 - - pts_inf['target_dist2'] = pts_inf.apply( - lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[2])), - axis=1, - ) - pts_inf.loc[pts_inf.target_dist2 > 700, 'target_dist2'] = 700 - - pts_inf['target_dist3'] = pts_inf.apply( - lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[3])), - axis=1, - ) - pts_inf.loc[pts_inf.target_dist3 > 700, 'target_dist3'] = 700 - - pts_inf = pts_inf.sort_values(by='id').reset_index(drop=True) - target_post = gpd.sjoin(pts_inf, pts_target, op='contains').groupby('id', as_index=False).agg({'cnt': 'count'}) - target_post = target_post.rename(columns={'cnt': 'target_post_cnt'}) - pts_inf = pts_inf.drop(columns=['target_post_cnt']) - pts_inf = pts_inf.join(target_post.set_index('id'), on='id') - pts_inf['age_day_init'] = pts_inf['age_day'] - pts_inf['age_day'] = 240 - X_inf = pts_inf[feats] - - seeds = [3, 99, 87, 21, 15] - - # Обучение, инференс - r2_scores = [] - mapes = [] - y_infers = [] - - for i in seeds: - x_trn, x_test, y_trn, y_test = ms.train_test_split(X_trn, Y_trn, test_size=0.2, random_state=i) - model = catboost.CatBoostRegressor(cat_features=['property_era'], random_state=i) - model.fit(x_trn, y_trn, verbose=False) - r2_score = metrics.r2_score(y_test, model.predict(x_test)) - mape = metrics.mean_absolute_percentage_error(y_test, model.predict(x_test)) - if ((r2_score > 0.45) & (mape < 0.25)): - r2_scores.append(r2_score) - mapes.append(mape) - y_infers.append(model.predict(X_inf.drop(columns=['id']))) - - current_pred = sum(y_infers) / 5 - - # Обновление полей по результатам работы модели - update_fields = pts_inf[ - [ - 'id', 'age_day_init', 'status', 'fact', 'delta_current', 'delta_first', 'plan_current', 'plan_first', - 'prediction_first', + feats = [ + 'id', 'metro_dist', 'target_dist', 'property_price_bargains', 'property_price_offers', + 'property_mean_floor', + 'property_era', 'flats_cnt_2', 'flats_cnt', 'popul_home', 'popul_job', 'other_post_cnt', 'yndxfood_sum', + 'yndxfood_cnt', 'school_cnt', 'kindergar_cnt', 'target_post_cnt', 'public_stop_cnt', 'sport_center_cnt', + 'pharmacy_cnt', 'supermarket_cnt', 'supermarket_premium_cnt', 'clinic_cnt', 'bank_cnt', 'reca_cnt', + 'lab_cnt', 'culture_cnt', 'attraction_cnt', 'mfc_cnt', 'bc_cnt', 'tc_cnt', 'rival_pvz_cnt', + 'rival_post_cnt', + 'business_activity', 'age_day', 'target_cnt_ao_mean', 'target_dist1', 'target_dist2', 'target_dist3', ] - ] - update_fields = update_fields.join( - pd.concat( + + # Записи для обучения + pts_trn = pts.loc[pts.sample_trn == True].reset_index(drop=True) + pts_trn = gpd.GeoDataFrame(pts_trn, geometry='geometry', crs='epsg:32637') + pts_target = pts_trn[['geometry']] + pts_target['cnt'] = 1 + pts_target = gpd.GeoDataFrame(pts_target, geometry='geometry', crs='epsg:32637') + + target_feature_coords = [] + for i in range(0, len(pts_target)): + target_feature_coords.append((pts_target.geometry.x[i], pts_target.geometry.y[i])) + target_feature_coords = np.array(target_feature_coords) + + pts_trn['target_dist'] = pts_trn.apply( + lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[1])), + axis=1, + ) + pts_trn.loc[pts_trn.target_dist > 700, 'target_dist'] = 700 + + pts_trn['target_dist1'] = pts_trn.apply( + lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[2])), + axis=1, + ) + pts_trn.loc[pts_trn.target_dist1 > 700, 'target_dist1'] = 700 + + pts_trn['target_dist2'] = pts_trn.apply( + lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[3])), + axis=1, + ) + pts_trn.loc[pts_trn.target_dist2 > 700, 'target_dist2'] = 700 + + pts_trn['target_dist3'] = pts_trn.apply( + lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[4])), + axis=1, + ) + pts_trn.loc[pts_trn.target_dist3 > 700, 'target_dist3'] = 700 + + pts_trn['buf'] = pts_trn.buffer(500) + pts_trn = gpd.GeoDataFrame(pts_trn, geometry='buf', crs='epsg:32637') + target_post = gpd.sjoin(pts_trn, pts_target, op='contains').groupby('id', as_index=False).agg({'cnt': 'count'}) + target_post = target_post.rename(columns={'cnt': 'target_post_cnt'}) + pts_trn = pts_trn.drop(columns=['target_post_cnt']) + pts_trn = pts_trn.join(target_post.set_index('id'), on='id') + pts_trn['target_post_cnt'] = pts_trn['target_post_cnt'] - 1 + pts_trn = pts_trn.sort_values(by='id').reset_index(drop=True) + X_trn = pts_trn[feats].drop(columns=['id']) + Y_trn = pts_trn[['fact']] + + # Записи для инференса + pts_inf = pts.loc[(pts.status == 'Pending') | + (pts.status == 'Installation') | + (pts.status == 'Cancelled') | + ((pts.status == 'Working') & (pts.sample_trn == False))].reset_index(drop=True) + pts_inf = gpd.GeoDataFrame(pts_inf, geometry='geometry', crs='epsg:32637') + + pts_inf['buf'] = pts_inf.buffer(500) + pts_inf = gpd.GeoDataFrame(pts_inf, geometry='buf', crs='epsg:32637') + pts_target = pts.loc[(pts.status == 'Working') | + (pts.status == 'Installation') | + (pts.sample_trn == True)].reset_index(drop=True) + pts_target = pts_target[['geometry']] + pts_target['cnt'] = 1 + pts_target = gpd.GeoDataFrame(pts_target, geometry='geometry', crs='epsg:32637') + + target_feature_coords = [] + for i in range(0, len(pts_target)): + target_feature_coords.append((pts_target.geometry.x[i], pts_target.geometry.y[i])) + target_feature_coords = np.array(target_feature_coords) + + pts_inf['target_dist'] = pts_inf.apply( + lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[0])), + axis=1, + ) + pts_inf.loc[pts_inf.target_dist > 700, 'target_dist'] = 700 + + pts_inf['target_dist1'] = pts_inf.apply( + lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[1])), + axis=1, + ) + pts_inf.loc[pts_inf.target_dist1 > 700, 'target_dist1'] = 700 + + pts_inf['target_dist2'] = pts_inf.apply( + lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[2])), + axis=1, + ) + pts_inf.loc[pts_inf.target_dist2 > 700, 'target_dist2'] = 700 + + pts_inf['target_dist3'] = pts_inf.apply( + lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[3])), + axis=1, + ) + pts_inf.loc[pts_inf.target_dist3 > 700, 'target_dist3'] = 700 + + pts_inf = pts_inf.sort_values(by='id').reset_index(drop=True) + target_post = gpd.sjoin(pts_inf, pts_target, op='contains').groupby('id', as_index=False).agg({'cnt': 'count'}) + target_post = target_post.rename(columns={'cnt': 'target_post_cnt'}) + pts_inf = pts_inf.drop(columns=['target_post_cnt']) + pts_inf = pts_inf.join(target_post.set_index('id'), on='id') + pts_inf['age_day_init'] = pts_inf['age_day'] + pts_inf['age_day'] = 240 + X_inf = pts_inf[feats] + + seeds = [3, 99, 87, 21, 15] + + # Обучение, инференс + r2_scores = [] + mapes = [] + y_infers = [] + + for i in seeds: + x_trn, x_test, y_trn, y_test = ms.train_test_split(X_trn, Y_trn, test_size=0.2, random_state=i) + model = catboost.CatBoostRegressor(cat_features=['property_era'], random_state=i) + model.fit(x_trn, y_trn, verbose=False) + r2_score = metrics.r2_score(y_test, model.predict(x_test)) + mape = metrics.mean_absolute_percentage_error(y_test, model.predict(x_test)) + if ((r2_score > 0.45) & (mape < 0.25)): + r2_scores.append(r2_score) + mapes.append(mape) + y_infers.append(model.predict(X_inf.drop(columns=['id']))) + + current_pred = sum(y_infers) / 5 + + # Обновление полей по результатам работы модели + update_fields = pts_inf[ [ - X_inf[['id']], - pd.DataFrame({'prediction_current': current_pred}), - ], + 'id', 'age_day_init', 'status', 'fact', 'delta_current', 'delta_first', 'plan_current', 'plan_first', + 'prediction_first', + ] + ] + update_fields = update_fields.join( + pd.concat( + [ + X_inf[['id']], + pd.DataFrame({'prediction_current': current_pred}), + ], + axis=1, + ).set_index('id'), + on='id', + ) + update_fields['prediction_current'] = update_fields['prediction_current'].astype(int) + + days_x = np.array([0, 30, 60, 90, 120, 150, 180, 210, 240, 270]) + perc_y = np.array([0, 0.15, 0.20, 0.30, 0.60, 0.70, 0.70, 0.75, 0.75, 0.80]) + spl = interpolate.splrep(days_x, perc_y) + + update_fields['plan_first'] = update_fields.apply( + lambda x: (x.prediction_first * interpolate.splev(x.age_day_init, spl) if x.status == 'Working' else 0), + axis=1, + ) + update_fields['plan_current'] = update_fields.apply( + lambda x: (x.prediction_current * interpolate.splev(x.age_day_init, spl) if x.status == 'Working' else 0), + axis=1, + ) + update_fields['delta_first'] = update_fields.apply( + lambda x: ((x.fact - x.plan_first) / x.plan_first * 100 if x.status == 'Working' else 0), axis=1, - ).set_index('id'), - on='id', - ) - update_fields['prediction_current'] = update_fields['prediction_current'].astype(int) - - days_x = np.array([0, 30, 60, 90, 120, 150, 180, 210, 240, 270]) - perc_y = np.array([0, 0.15, 0.20, 0.30, 0.60, 0.70, 0.70, 0.75, 0.75, 0.80]) - spl = interpolate.splrep(days_x, perc_y) - - update_fields['plan_first'] = update_fields.apply( - lambda x: (x.prediction_first * interpolate.splev(x.age_day_init, spl) if x.status == 'Working' else 0), - axis=1, - ) - update_fields['plan_current'] = update_fields.apply( - lambda x: (x.prediction_current * interpolate.splev(x.age_day_init, spl) if x.status == 'Working' else 0), - axis=1, - ) - update_fields['delta_first'] = update_fields.apply( - lambda x: ((x.fact - x.plan_first) / x.plan_first * 100 if x.status == 'Working' else 0), - axis=1, - ) - update_fields['delta_current'] = update_fields.apply( - lambda x: ((x.fact - x.plan_current) / x.plan_current * 100 if x.status == 'Working' else 0), - axis=1, - ) - - update_fields_working = update_fields.loc[update_fields.status == 'Working'].reset_index(drop=True) - update_fields_working = update_fields_working.fillna(0) + ) + update_fields['delta_current'] = update_fields.apply( + lambda x: ((x.fact - x.plan_current) / x.plan_current * 100 if x.status == 'Working' else 0), + axis=1, + ) + + update_fields_working = update_fields.loc[update_fields.status == 'Working'].reset_index(drop=True) + update_fields_working = update_fields_working.fillna(0) + except Exception as e: + log_to_telegram(f'Ошибка при обновлении полей в базе данных: {e}') log_to_telegram('Начинается обновление полей в базе') # Загрузка в базу обновленных значений try: