diff --git a/service/tasks.py b/service/tasks.py index bb0906d..aaed630 100644 --- a/service/tasks.py +++ b/service/tasks.py @@ -133,192 +133,196 @@ def raschet(table_name='service_placementpoint'): pts_inf = pts_inf.loc[pts_inf.matching_status == 'New'].reset_index(drop=True) pts_inf['geometry'] = pts_inf['geometry'].apply(wkb.loads, hex=True) pts_inf = gpd.GeoDataFrame(pts_inf, geometry='geometry', crs='epsg:4326') - pts_inf = pts_inf.to_crs('epsg:32637') - - pts_inf['buf'] = pts_inf.buffer(500) - pts_inf = gpd.GeoDataFrame(pts_inf, geometry='buf', crs='epsg:32637') - pts_target = pts.loc[(pts.status == 'Working') | - (pts.status == 'Installation') | - (pts.sample_trn == True)].reset_index(drop=True) - pts_target = pts_target[['geometry']] - pts_target['cnt'] = 1 - pts_target = gpd.GeoDataFrame(pts_target, geometry='geometry', crs='epsg:32637') - - target_feature_coords = [] - for i in range(0, len(pts_target)): - target_feature_coords.append((pts_target.geometry.x[i], pts_target.geometry.y[i])) - target_feature_coords = np.array(target_feature_coords) - - pts_inf['target_dist'] = pts_inf.apply( - lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[0])), - axis=1, - ) - pts_inf.loc[pts_inf.target_dist > 700, 'target_dist'] = 700 - - pts_inf = pts_inf.sort_values(by='id').reset_index(drop=True) - target_post = gpd.sjoin(pts_inf, pts_target, op='contains').groupby('id', as_index=False).agg({'cnt': 'count'}) - target_post = target_post.rename(columns={'cnt': 'target_post_cnt'}) - pts_inf = pts_inf.drop(columns=['target_post_cnt']) - pts_inf = pts_inf.join(target_post.set_index('id'), on='id') - pts_inf['age_day_init'] = pts_inf['age_day'] - pts_inf['age_day'] = 240 - X_inf = pts_inf[feats] - - seeds = [3, 99, 87, 21, 15] - - # Обучение, инференс - r2_scores = [] - mapes = [] - y_infers = [] - status.status = 'Обучение inference 0%' - status.save() - for i in seeds: - status.status = 'Обучение inference: ' + str(int((seeds.index(i) + 1) / len(seeds) * 100)) + '%' + if len(pts_inf) > 0: + pts_inf = pts_inf.to_crs('epsg:32637') + + pts_inf['buf'] = pts_inf.buffer(500) + pts_inf = gpd.GeoDataFrame(pts_inf, geometry='buf', crs='epsg:32637') + pts_target = pts.loc[(pts.status == 'Working') | + (pts.status == 'Installation') | + (pts.sample_trn == True)].reset_index(drop=True) + pts_target = pts_target[['geometry']] + pts_target['cnt'] = 1 + pts_target = gpd.GeoDataFrame(pts_target, geometry='geometry', crs='epsg:32637') + + target_feature_coords = [] + for i in range(0, len(pts_target)): + target_feature_coords.append((pts_target.geometry.x[i], pts_target.geometry.y[i])) + target_feature_coords = np.array(target_feature_coords) + + pts_inf['target_dist'] = pts_inf.apply( + lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[0])), + axis=1, + ) + pts_inf.loc[pts_inf.target_dist > 700, 'target_dist'] = 700 + + pts_inf = pts_inf.sort_values(by='id').reset_index(drop=True) + target_post = gpd.sjoin(pts_inf, pts_target, op='contains').groupby('id', as_index=False).agg({'cnt': 'count'}) + target_post = target_post.rename(columns={'cnt': 'target_post_cnt'}) + pts_inf = pts_inf.drop(columns=['target_post_cnt']) + pts_inf = pts_inf.join(target_post.set_index('id'), on='id') + pts_inf['age_day_init'] = pts_inf['age_day'] + pts_inf['age_day'] = 240 + X_inf = pts_inf[feats] + + seeds = [3, 99, 87, 21, 15] + + # Обучение, инференс + r2_scores = [] + mapes = [] + y_infers = [] + status.status = 'Обучение inference 0%' status.save() - x_trn, x_test, y_trn, y_test = ms.train_test_split(X_trn, Y_trn, test_size=0.2, random_state=i) - model = catboost.CatBoostRegressor(cat_features=['property_era'], random_state=i) - model.fit(x_trn, y_trn, verbose=False) - r2_score = metrics.r2_score(y_test, model.predict(x_test)) - mape = metrics.mean_absolute_percentage_error(y_test, model.predict(x_test)) - if ((r2_score > 0.45) & (mape < 0.25)): - r2_scores.append(r2_score) - mapes.append(mape) - y_infers.append(model.predict(X_inf.drop(columns=['id']))) - status.status = 'Обучение inference 100%' - current_pred = sum(y_infers) / 5 - - # Обновление полей по результатам работы модели - update_fields = pts_inf[ - [ - 'id', 'age_day_init', 'status', 'fact', 'delta_current', 'delta_first', 'plan_current', 'plan_first', - 'prediction_first', - ] - ] - update_fields = update_fields.join( - pd.concat( + for i in seeds: + status.status = 'Обучение inference: ' + str(int((seeds.index(i) + 1) / len(seeds) * 100)) + '%' + status.save() + x_trn, x_test, y_trn, y_test = ms.train_test_split(X_trn, Y_trn, test_size=0.2, random_state=i) + model = catboost.CatBoostRegressor(cat_features=['property_era'], random_state=i) + model.fit(x_trn, y_trn, verbose=False) + r2_score = metrics.r2_score(y_test, model.predict(x_test)) + mape = metrics.mean_absolute_percentage_error(y_test, model.predict(x_test)) + if ((r2_score > 0.45) & (mape < 0.25)): + r2_scores.append(r2_score) + mapes.append(mape) + y_infers.append(model.predict(X_inf.drop(columns=['id']))) + status.status = 'Обучение inference 100%' + current_pred = sum(y_infers) / 5 + + # Обновление полей по результатам работы модели + update_fields = pts_inf[ [ - X_inf[['id']], - pd.DataFrame({'prediction_current': current_pred}), - ], + 'id', 'age_day_init', 'status', 'fact', 'delta_current', 'delta_first', 'plan_current', 'plan_first', + 'prediction_first', + ] + ] + update_fields = update_fields.join( + pd.concat( + [ + X_inf[['id']], + pd.DataFrame({'prediction_current': current_pred}), + ], + axis=1, + ).set_index('id'), + on='id', + ) + update_fields['prediction_current'] = update_fields['prediction_current'].astype(int) + + days_x = np.array([0, 30, 60, 90, 120, 150, 180, 210, 240, 270]) + perc_y = np.array([0, 0.15, 0.20, 0.30, 0.60, 0.70, 0.70, 0.75, 0.75, 0.80]) + spl = interpolate.splrep(days_x, perc_y) + + update_fields['plan_first'] = update_fields.apply( + lambda x: (x.prediction_first * interpolate.splev(x.age_day_init, spl) if x.status == 'Working' else 0), axis=1, - ).set_index('id'), - on='id', - ) - update_fields['prediction_current'] = update_fields['prediction_current'].astype(int) - - days_x = np.array([0, 30, 60, 90, 120, 150, 180, 210, 240, 270]) - perc_y = np.array([0, 0.15, 0.20, 0.30, 0.60, 0.70, 0.70, 0.75, 0.75, 0.80]) - spl = interpolate.splrep(days_x, perc_y) - - update_fields['plan_first'] = update_fields.apply( - lambda x: (x.prediction_first * interpolate.splev(x.age_day_init, spl) if x.status == 'Working' else 0), - axis=1, - ) - update_fields['plan_current'] = update_fields.apply( - lambda x: (x.prediction_current * interpolate.splev(x.age_day_init, spl) if x.status == 'Working' else 0), - axis=1, - ) - update_fields['delta_first'] = update_fields.apply( - lambda x: ((x.fact - x.plan_first) / x.plan_first * 100 if x.status == 'Working' else 0), - axis=1, - ) - update_fields['delta_current'] = update_fields.apply( - lambda x: ((x.fact - x.plan_current) / x.plan_current * 100 if x.status == 'Working' else 0), - axis=1, - ) + ) + update_fields['plan_current'] = update_fields.apply( + lambda x: (x.prediction_current * interpolate.splev(x.age_day_init, spl) if x.status == 'Working' else 0), + axis=1, + ) + update_fields['delta_first'] = update_fields.apply( + lambda x: ((x.fact - x.plan_first) / x.plan_first * 100 if x.status == 'Working' else 0), + axis=1, + ) + update_fields['delta_current'] = update_fields.apply( + lambda x: ((x.fact - x.plan_current) / x.plan_current * 100 if x.status == 'Working' else 0), + axis=1, + ) - update_fields_working = update_fields.loc[update_fields.status == 'Working'].reset_index(drop=True) - update_fields_working = update_fields_working.fillna(0) - connection.close() + update_fields_working = update_fields.loc[update_fields.status == 'Working'].reset_index(drop=True) + update_fields_working = update_fields_working.fillna(0) + connection.close() except Exception as e: log_to_telegram(f'Ошибка при обновлении полей в базе данных: {e}') log_to_telegram('Начинается обновление полей в базе') - status.status = 'Перерасчет ML: 50%' - status.save() - # Загрузка в базу обновленных значений - try: - log_to_telegram('Подключение к базе данных 2') - conn2 = psycopg2.connect( - database=os.getenv('POSTGRES_DB', 'postgres'), user=os.getenv('POSTGRES_USER', 'postgres'), - password=os.getenv('POSTGRES_PASSWORD', 'postgres'), - host=os.getenv('POSTGRES_HOST', 'postgres'), port=os.getenv('POSTGRES_PORT', 'postgres'), - options='-c search_path=public', - ) - cursor = conn2.cursor() - except: - conn2 = None - log_to_telegram('Не удалось подключиться к базе данных') - - # prediction_current - if conn2 is not None: - update_records1 = [] - for i in range(0, len(update_fields)): - update_records1.append((int(update_fields.prediction_current[i]), int(update_fields.id[i]))) - sql_update_query = f"""Update {table_name} set prediction_current = %s where id = %s""" - try: - psycopg2.extras.execute_batch(cursor, sql_update_query, update_records1) - conn2.commit() - except Exception: - cursor.execute('ROLLBACK') - psycopg2.extras.execute_batch(cursor, sql_update_query, update_records1) - conn2.commit() - - # plan_first - update_records2 = [] - for i in range(0, len(update_fields_working)): - update_records2.append((int(update_fields_working.plan_first[i]), int(update_fields_working.id[i]))) - sql_update_query = f"""Update {table_name} set plan_first = %s where id = %s""" - try: - psycopg2.extras.execute_batch(cursor, sql_update_query, update_records2) - conn2.commit() - except Exception: - cursor.execute('ROLLBACK') - psycopg2.extras.execute_batch(cursor, sql_update_query, update_records2) - conn2.commit() - - # plan_current - update_records3 = [] - for i in range(0, len(update_fields_working)): - update_records3.append((int(update_fields_working.plan_current[i]), int(update_fields_working.id[i]))) - sql_update_query = f"""Update {table_name} set plan_current = %s where id = %s""" - try: - psycopg2.extras.execute_batch(cursor, sql_update_query, update_records3) - conn2.commit() - except Exception: - cursor.execute('ROLLBACK') - psycopg2.extras.execute_batch(cursor, sql_update_query, update_records3) - conn2.commit() - - # delta_first - update_records4 = [] - for i in range(0, len(update_fields_working)): - update_records4.append((int(update_fields_working.delta_first[i]), int(update_fields_working.id[i]))) - sql_update_query = f"""Update {table_name} set delta_first = %s where id = %s""" - try: - psycopg2.extras.execute_batch(cursor, sql_update_query, update_records4) - conn2.commit() - except Exception: - cursor.execute('ROLLBACK') - psycopg2.extras.execute_batch(cursor, sql_update_query, update_records4) - conn2.commit() - - # delta_current - update_records5 = [] - for i in range(0, len(update_fields_working)): - update_records5.append((int(update_fields_working.delta_current[i]), int(update_fields_working.id[i]))) - sql_update_query = f"""Update {table_name} set delta_current = %s where id = %s""" - try: - psycopg2.extras.execute_batch(cursor, sql_update_query, update_records5) - conn2.commit() - except Exception: - cursor.execute('ROLLBACK') - psycopg2.extras.execute_batch(cursor, sql_update_query, update_records5) - conn2.commit() - conn2.close() - cache.clear() - log_to_telegram('end raschet') - status.status = 'Перерасчет ML завершен' + if len(pts_inf) > 0: + status.status = 'Перерасчет ML: 50%' status.save() + # Загрузка в базу обновленных значений + try: + log_to_telegram('Подключение к базе данных 2') + conn2 = psycopg2.connect( + database=os.getenv('POSTGRES_DB', 'postgres'), user=os.getenv('POSTGRES_USER', 'postgres'), + password=os.getenv('POSTGRES_PASSWORD', 'postgres'), + host=os.getenv('POSTGRES_HOST', 'postgres'), port=os.getenv('POSTGRES_PORT', 'postgres'), + options='-c search_path=public', + ) + cursor = conn2.cursor() + except: + conn2 = None + log_to_telegram('Не удалось подключиться к базе данных') + + # prediction_current + if conn2 is not None: + update_records1 = [] + for i in range(0, len(update_fields)): + update_records1.append((int(update_fields.prediction_current[i]), int(update_fields.id[i]))) + sql_update_query = f"""Update {table_name} set prediction_current = %s where id = %s""" + try: + psycopg2.extras.execute_batch(cursor, sql_update_query, update_records1) + conn2.commit() + except Exception: + cursor.execute('ROLLBACK') + psycopg2.extras.execute_batch(cursor, sql_update_query, update_records1) + conn2.commit() + + # plan_first + update_records2 = [] + for i in range(0, len(update_fields_working)): + update_records2.append((int(update_fields_working.plan_first[i]), int(update_fields_working.id[i]))) + sql_update_query = f"""Update {table_name} set plan_first = %s where id = %s""" + try: + psycopg2.extras.execute_batch(cursor, sql_update_query, update_records2) + conn2.commit() + except Exception: + cursor.execute('ROLLBACK') + psycopg2.extras.execute_batch(cursor, sql_update_query, update_records2) + conn2.commit() + + # plan_current + update_records3 = [] + for i in range(0, len(update_fields_working)): + update_records3.append((int(update_fields_working.plan_current[i]), int(update_fields_working.id[i]))) + sql_update_query = f"""Update {table_name} set plan_current = %s where id = %s""" + try: + psycopg2.extras.execute_batch(cursor, sql_update_query, update_records3) + conn2.commit() + except Exception: + cursor.execute('ROLLBACK') + psycopg2.extras.execute_batch(cursor, sql_update_query, update_records3) + conn2.commit() + + # delta_first + update_records4 = [] + for i in range(0, len(update_fields_working)): + update_records4.append((int(update_fields_working.delta_first[i]), int(update_fields_working.id[i]))) + sql_update_query = f"""Update {table_name} set delta_first = %s where id = %s""" + try: + psycopg2.extras.execute_batch(cursor, sql_update_query, update_records4) + conn2.commit() + except Exception: + cursor.execute('ROLLBACK') + psycopg2.extras.execute_batch(cursor, sql_update_query, update_records4) + conn2.commit() + + # delta_current + update_records5 = [] + for i in range(0, len(update_fields_working)): + update_records5.append((int(update_fields_working.delta_current[i]), int(update_fields_working.id[i]))) + sql_update_query = f"""Update {table_name} set delta_current = %s where id = %s""" + try: + psycopg2.extras.execute_batch(cursor, sql_update_query, update_records5) + conn2.commit() + except Exception: + cursor.execute('ROLLBACK') + psycopg2.extras.execute_batch(cursor, sql_update_query, update_records5) + conn2.commit() + conn2.close() + cache.clear() + log_to_telegram('end raschet') + status.status = 'Перерасчет ML завершен' + status.save() + LastMLCall.objects.all().delete() + LastMLCall.objects.create() @shared_task @@ -379,8 +383,7 @@ def load_post_and_pvz(obj_id: int): status.status = "Завершено" cache.clear() status.save() - LastMLCall.objects.all().delete() - LastMLCall.objects.create() + @shared_task()