Merge branch 'fix_ml-2' into 'dev'

fix ml 2

See merge request spatial/postamates!125
dev
Timofey Malinin 2 years ago
commit a305972d45

@ -133,192 +133,196 @@ def raschet(table_name='service_placementpoint'):
pts_inf = pts_inf.loc[pts_inf.matching_status == 'New'].reset_index(drop=True)
pts_inf['geometry'] = pts_inf['geometry'].apply(wkb.loads, hex=True)
pts_inf = gpd.GeoDataFrame(pts_inf, geometry='geometry', crs='epsg:4326')
pts_inf = pts_inf.to_crs('epsg:32637')
pts_inf['buf'] = pts_inf.buffer(500)
pts_inf = gpd.GeoDataFrame(pts_inf, geometry='buf', crs='epsg:32637')
pts_target = pts.loc[(pts.status == 'Working') |
(pts.status == 'Installation') |
(pts.sample_trn == True)].reset_index(drop=True)
pts_target = pts_target[['geometry']]
pts_target['cnt'] = 1
pts_target = gpd.GeoDataFrame(pts_target, geometry='geometry', crs='epsg:32637')
target_feature_coords = []
for i in range(0, len(pts_target)):
target_feature_coords.append((pts_target.geometry.x[i], pts_target.geometry.y[i]))
target_feature_coords = np.array(target_feature_coords)
pts_inf['target_dist'] = pts_inf.apply(
lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[0])),
axis=1,
)
pts_inf.loc[pts_inf.target_dist > 700, 'target_dist'] = 700
pts_inf = pts_inf.sort_values(by='id').reset_index(drop=True)
target_post = gpd.sjoin(pts_inf, pts_target, op='contains').groupby('id', as_index=False).agg({'cnt': 'count'})
target_post = target_post.rename(columns={'cnt': 'target_post_cnt'})
pts_inf = pts_inf.drop(columns=['target_post_cnt'])
pts_inf = pts_inf.join(target_post.set_index('id'), on='id')
pts_inf['age_day_init'] = pts_inf['age_day']
pts_inf['age_day'] = 240
X_inf = pts_inf[feats]
seeds = [3, 99, 87, 21, 15]
# Обучение, инференс
r2_scores = []
mapes = []
y_infers = []
status.status = 'Обучение inference 0%'
status.save()
for i in seeds:
status.status = 'Обучение inference: ' + str(int((seeds.index(i) + 1) / len(seeds) * 100)) + '%'
if len(pts_inf) > 0:
pts_inf = pts_inf.to_crs('epsg:32637')
pts_inf['buf'] = pts_inf.buffer(500)
pts_inf = gpd.GeoDataFrame(pts_inf, geometry='buf', crs='epsg:32637')
pts_target = pts.loc[(pts.status == 'Working') |
(pts.status == 'Installation') |
(pts.sample_trn == True)].reset_index(drop=True)
pts_target = pts_target[['geometry']]
pts_target['cnt'] = 1
pts_target = gpd.GeoDataFrame(pts_target, geometry='geometry', crs='epsg:32637')
target_feature_coords = []
for i in range(0, len(pts_target)):
target_feature_coords.append((pts_target.geometry.x[i], pts_target.geometry.y[i]))
target_feature_coords = np.array(target_feature_coords)
pts_inf['target_dist'] = pts_inf.apply(
lambda x: ((sorted(distance.cdist([[x['geometry'].x, x['geometry'].y]], target_feature_coords)[0])[0])),
axis=1,
)
pts_inf.loc[pts_inf.target_dist > 700, 'target_dist'] = 700
pts_inf = pts_inf.sort_values(by='id').reset_index(drop=True)
target_post = gpd.sjoin(pts_inf, pts_target, op='contains').groupby('id', as_index=False).agg({'cnt': 'count'})
target_post = target_post.rename(columns={'cnt': 'target_post_cnt'})
pts_inf = pts_inf.drop(columns=['target_post_cnt'])
pts_inf = pts_inf.join(target_post.set_index('id'), on='id')
pts_inf['age_day_init'] = pts_inf['age_day']
pts_inf['age_day'] = 240
X_inf = pts_inf[feats]
seeds = [3, 99, 87, 21, 15]
# Обучение, инференс
r2_scores = []
mapes = []
y_infers = []
status.status = 'Обучение inference 0%'
status.save()
x_trn, x_test, y_trn, y_test = ms.train_test_split(X_trn, Y_trn, test_size=0.2, random_state=i)
model = catboost.CatBoostRegressor(cat_features=['property_era'], random_state=i)
model.fit(x_trn, y_trn, verbose=False)
r2_score = metrics.r2_score(y_test, model.predict(x_test))
mape = metrics.mean_absolute_percentage_error(y_test, model.predict(x_test))
if ((r2_score > 0.45) & (mape < 0.25)):
r2_scores.append(r2_score)
mapes.append(mape)
y_infers.append(model.predict(X_inf.drop(columns=['id'])))
status.status = 'Обучение inference 100%'
current_pred = sum(y_infers) / 5
# Обновление полей по результатам работы модели
update_fields = pts_inf[
[
'id', 'age_day_init', 'status', 'fact', 'delta_current', 'delta_first', 'plan_current', 'plan_first',
'prediction_first',
]
]
update_fields = update_fields.join(
pd.concat(
for i in seeds:
status.status = 'Обучение inference: ' + str(int((seeds.index(i) + 1) / len(seeds) * 100)) + '%'
status.save()
x_trn, x_test, y_trn, y_test = ms.train_test_split(X_trn, Y_trn, test_size=0.2, random_state=i)
model = catboost.CatBoostRegressor(cat_features=['property_era'], random_state=i)
model.fit(x_trn, y_trn, verbose=False)
r2_score = metrics.r2_score(y_test, model.predict(x_test))
mape = metrics.mean_absolute_percentage_error(y_test, model.predict(x_test))
if ((r2_score > 0.45) & (mape < 0.25)):
r2_scores.append(r2_score)
mapes.append(mape)
y_infers.append(model.predict(X_inf.drop(columns=['id'])))
status.status = 'Обучение inference 100%'
current_pred = sum(y_infers) / 5
# Обновление полей по результатам работы модели
update_fields = pts_inf[
[
X_inf[['id']],
pd.DataFrame({'prediction_current': current_pred}),
],
'id', 'age_day_init', 'status', 'fact', 'delta_current', 'delta_first', 'plan_current', 'plan_first',
'prediction_first',
]
]
update_fields = update_fields.join(
pd.concat(
[
X_inf[['id']],
pd.DataFrame({'prediction_current': current_pred}),
],
axis=1,
).set_index('id'),
on='id',
)
update_fields['prediction_current'] = update_fields['prediction_current'].astype(int)
days_x = np.array([0, 30, 60, 90, 120, 150, 180, 210, 240, 270])
perc_y = np.array([0, 0.15, 0.20, 0.30, 0.60, 0.70, 0.70, 0.75, 0.75, 0.80])
spl = interpolate.splrep(days_x, perc_y)
update_fields['plan_first'] = update_fields.apply(
lambda x: (x.prediction_first * interpolate.splev(x.age_day_init, spl) if x.status == 'Working' else 0),
axis=1,
).set_index('id'),
on='id',
)
update_fields['prediction_current'] = update_fields['prediction_current'].astype(int)
days_x = np.array([0, 30, 60, 90, 120, 150, 180, 210, 240, 270])
perc_y = np.array([0, 0.15, 0.20, 0.30, 0.60, 0.70, 0.70, 0.75, 0.75, 0.80])
spl = interpolate.splrep(days_x, perc_y)
update_fields['plan_first'] = update_fields.apply(
lambda x: (x.prediction_first * interpolate.splev(x.age_day_init, spl) if x.status == 'Working' else 0),
axis=1,
)
update_fields['plan_current'] = update_fields.apply(
lambda x: (x.prediction_current * interpolate.splev(x.age_day_init, spl) if x.status == 'Working' else 0),
axis=1,
)
update_fields['delta_first'] = update_fields.apply(
lambda x: ((x.fact - x.plan_first) / x.plan_first * 100 if x.status == 'Working' else 0),
axis=1,
)
update_fields['delta_current'] = update_fields.apply(
lambda x: ((x.fact - x.plan_current) / x.plan_current * 100 if x.status == 'Working' else 0),
axis=1,
)
)
update_fields['plan_current'] = update_fields.apply(
lambda x: (x.prediction_current * interpolate.splev(x.age_day_init, spl) if x.status == 'Working' else 0),
axis=1,
)
update_fields['delta_first'] = update_fields.apply(
lambda x: ((x.fact - x.plan_first) / x.plan_first * 100 if x.status == 'Working' else 0),
axis=1,
)
update_fields['delta_current'] = update_fields.apply(
lambda x: ((x.fact - x.plan_current) / x.plan_current * 100 if x.status == 'Working' else 0),
axis=1,
)
update_fields_working = update_fields.loc[update_fields.status == 'Working'].reset_index(drop=True)
update_fields_working = update_fields_working.fillna(0)
connection.close()
update_fields_working = update_fields.loc[update_fields.status == 'Working'].reset_index(drop=True)
update_fields_working = update_fields_working.fillna(0)
connection.close()
except Exception as e:
log_to_telegram(f'Ошибка при обновлении полей в базе данных: {e}')
log_to_telegram('Начинается обновление полей в базе')
status.status = 'Перерасчет ML: 50%'
status.save()
# Загрузка в базу обновленных значений
try:
log_to_telegram('Подключение к базе данных 2')
conn2 = psycopg2.connect(
database=os.getenv('POSTGRES_DB', 'postgres'), user=os.getenv('POSTGRES_USER', 'postgres'),
password=os.getenv('POSTGRES_PASSWORD', 'postgres'),
host=os.getenv('POSTGRES_HOST', 'postgres'), port=os.getenv('POSTGRES_PORT', 'postgres'),
options='-c search_path=public',
)
cursor = conn2.cursor()
except:
conn2 = None
log_to_telegram('Не удалось подключиться к базе данных')
# prediction_current
if conn2 is not None:
update_records1 = []
for i in range(0, len(update_fields)):
update_records1.append((int(update_fields.prediction_current[i]), int(update_fields.id[i])))
sql_update_query = f"""Update {table_name} set prediction_current = %s where id = %s"""
try:
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records1)
conn2.commit()
except Exception:
cursor.execute('ROLLBACK')
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records1)
conn2.commit()
# plan_first
update_records2 = []
for i in range(0, len(update_fields_working)):
update_records2.append((int(update_fields_working.plan_first[i]), int(update_fields_working.id[i])))
sql_update_query = f"""Update {table_name} set plan_first = %s where id = %s"""
try:
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records2)
conn2.commit()
except Exception:
cursor.execute('ROLLBACK')
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records2)
conn2.commit()
# plan_current
update_records3 = []
for i in range(0, len(update_fields_working)):
update_records3.append((int(update_fields_working.plan_current[i]), int(update_fields_working.id[i])))
sql_update_query = f"""Update {table_name} set plan_current = %s where id = %s"""
try:
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records3)
conn2.commit()
except Exception:
cursor.execute('ROLLBACK')
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records3)
conn2.commit()
# delta_first
update_records4 = []
for i in range(0, len(update_fields_working)):
update_records4.append((int(update_fields_working.delta_first[i]), int(update_fields_working.id[i])))
sql_update_query = f"""Update {table_name} set delta_first = %s where id = %s"""
try:
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records4)
conn2.commit()
except Exception:
cursor.execute('ROLLBACK')
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records4)
conn2.commit()
# delta_current
update_records5 = []
for i in range(0, len(update_fields_working)):
update_records5.append((int(update_fields_working.delta_current[i]), int(update_fields_working.id[i])))
sql_update_query = f"""Update {table_name} set delta_current = %s where id = %s"""
try:
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records5)
conn2.commit()
except Exception:
cursor.execute('ROLLBACK')
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records5)
conn2.commit()
conn2.close()
cache.clear()
log_to_telegram('end raschet')
status.status = 'Перерасчет ML завершен'
if len(pts_inf) > 0:
status.status = 'Перерасчет ML: 50%'
status.save()
# Загрузка в базу обновленных значений
try:
log_to_telegram('Подключение к базе данных 2')
conn2 = psycopg2.connect(
database=os.getenv('POSTGRES_DB', 'postgres'), user=os.getenv('POSTGRES_USER', 'postgres'),
password=os.getenv('POSTGRES_PASSWORD', 'postgres'),
host=os.getenv('POSTGRES_HOST', 'postgres'), port=os.getenv('POSTGRES_PORT', 'postgres'),
options='-c search_path=public',
)
cursor = conn2.cursor()
except:
conn2 = None
log_to_telegram('Не удалось подключиться к базе данных')
# prediction_current
if conn2 is not None:
update_records1 = []
for i in range(0, len(update_fields)):
update_records1.append((int(update_fields.prediction_current[i]), int(update_fields.id[i])))
sql_update_query = f"""Update {table_name} set prediction_current = %s where id = %s"""
try:
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records1)
conn2.commit()
except Exception:
cursor.execute('ROLLBACK')
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records1)
conn2.commit()
# plan_first
update_records2 = []
for i in range(0, len(update_fields_working)):
update_records2.append((int(update_fields_working.plan_first[i]), int(update_fields_working.id[i])))
sql_update_query = f"""Update {table_name} set plan_first = %s where id = %s"""
try:
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records2)
conn2.commit()
except Exception:
cursor.execute('ROLLBACK')
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records2)
conn2.commit()
# plan_current
update_records3 = []
for i in range(0, len(update_fields_working)):
update_records3.append((int(update_fields_working.plan_current[i]), int(update_fields_working.id[i])))
sql_update_query = f"""Update {table_name} set plan_current = %s where id = %s"""
try:
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records3)
conn2.commit()
except Exception:
cursor.execute('ROLLBACK')
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records3)
conn2.commit()
# delta_first
update_records4 = []
for i in range(0, len(update_fields_working)):
update_records4.append((int(update_fields_working.delta_first[i]), int(update_fields_working.id[i])))
sql_update_query = f"""Update {table_name} set delta_first = %s where id = %s"""
try:
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records4)
conn2.commit()
except Exception:
cursor.execute('ROLLBACK')
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records4)
conn2.commit()
# delta_current
update_records5 = []
for i in range(0, len(update_fields_working)):
update_records5.append((int(update_fields_working.delta_current[i]), int(update_fields_working.id[i])))
sql_update_query = f"""Update {table_name} set delta_current = %s where id = %s"""
try:
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records5)
conn2.commit()
except Exception:
cursor.execute('ROLLBACK')
psycopg2.extras.execute_batch(cursor, sql_update_query, update_records5)
conn2.commit()
conn2.close()
cache.clear()
log_to_telegram('end raschet')
status.status = 'Перерасчет ML завершен'
status.save()
LastMLCall.objects.all().delete()
LastMLCall.objects.create()
@shared_task
@ -379,8 +383,7 @@ def load_post_and_pvz(obj_id: int):
status.status = "Завершено"
cache.clear()
status.save()
LastMLCall.objects.all().delete()
LastMLCall.objects.create()
@shared_task()

Loading…
Cancel
Save