Skip to content

Commit

Permalink
editoast: clean up and fix deadlock tests
Browse files Browse the repository at this point in the history
  • Loading branch information
hamz2a committed Jul 10, 2024
1 parent 0082c52 commit f49e67c
Show file tree
Hide file tree
Showing 13 changed files with 193 additions and 175 deletions.
8 changes: 8 additions & 0 deletions editoast/editoast_models/src/db_connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ impl DbConnectionPoolV2 {
/// # }
/// ```
///
/// ### Deadlocks
///
/// We encountered a deadlock error in our tests,
/// especially those using `empty_infra` and `small_infra`.
/// Adding `#[serial_test::serial]` solved the issue.
/// We tried increasing the deadlock timeout, but that didn't work.
/// Using random `infra_id` with rand didn't help either.
///
/// ## Guidelines
///
/// To prevent these issues, prefer the following patterns:
Expand Down
2 changes: 1 addition & 1 deletion editoast/src/fixtures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ pub mod tests {
Infra::changeset()
.name("small_infra".to_owned())
.last_railjson_version()
.persist(railjson, db_pool)
.persist(railjson, db_pool.get().await.unwrap().deref_mut())
.await
.unwrap()
}
Expand Down
40 changes: 22 additions & 18 deletions editoast/src/generated_data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,38 +86,39 @@ pub trait GeneratedData {
}

/// Refresh all the generated data of a given infra
#[tracing::instrument(level = "debug", skip_all, fields(infra_id))]
pub async fn refresh_all(
db_pool: Arc<DbConnectionPoolV2>,
infra: i64,
infra_id: i64,
infra_cache: &InfraCache,
) -> Result<()> {
// The other layers depend on track section layer.
// We must wait until its completion before running the other requests in parallel
TrackSectionLayer::refresh_pool(db_pool.clone(), infra, infra_cache).await?;
debug!("⚙️ Infra {infra}: track section layer is generated");
TrackSectionLayer::refresh_pool(db_pool.clone(), infra_id, infra_cache).await?;
debug!("⚙️ Infra {infra_id}: track section layer is generated");
// The analyze step significantly improves the performance when importing and generating together
// It doesn’t seem to make a difference when the generation step is run separately
// It isn’t clear why without analyze the Postgres server seems to run at 100% without halting
sql_query("analyze")
.execute(db_pool.get().await?.deref_mut())
.await?;
debug!("⚙️ Infra {infra}: database analyzed");
debug!("⚙️ Infra {infra_id}: database analyzed");
futures::try_join!(
SpeedSectionLayer::refresh_pool(db_pool.clone(), infra, infra_cache),
SignalLayer::refresh_pool(db_pool.clone(), infra, infra_cache),
SwitchLayer::refresh_pool(db_pool.clone(), infra, infra_cache),
BufferStopLayer::refresh_pool(db_pool.clone(), infra, infra_cache),
ElectrificationLayer::refresh_pool(db_pool.clone(), infra, infra_cache),
DetectorLayer::refresh_pool(db_pool.clone(), infra, infra_cache),
OperationalPointLayer::refresh_pool(db_pool.clone(), infra, infra_cache),
PSLSignLayer::refresh_pool(db_pool.clone(), infra, infra_cache),
NeutralSectionLayer::refresh_pool(db_pool.clone(), infra, infra_cache),
NeutralSignLayer::refresh_pool(db_pool.clone(), infra, infra_cache),
SpeedSectionLayer::refresh_pool(db_pool.clone(), infra_id, infra_cache),
SignalLayer::refresh_pool(db_pool.clone(), infra_id, infra_cache),
SwitchLayer::refresh_pool(db_pool.clone(), infra_id, infra_cache),
BufferStopLayer::refresh_pool(db_pool.clone(), infra_id, infra_cache),
ElectrificationLayer::refresh_pool(db_pool.clone(), infra_id, infra_cache),
DetectorLayer::refresh_pool(db_pool.clone(), infra_id, infra_cache),
OperationalPointLayer::refresh_pool(db_pool.clone(), infra_id, infra_cache),
PSLSignLayer::refresh_pool(db_pool.clone(), infra_id, infra_cache),
NeutralSectionLayer::refresh_pool(db_pool.clone(), infra_id, infra_cache),
NeutralSignLayer::refresh_pool(db_pool.clone(), infra_id, infra_cache),
)?;
debug!("⚙️ Infra {infra}: object layers is generated");
debug!("⚙️ Infra {infra_id}: object layers is generated");
// The error layer depends on the other layers and must be executed at the end.
ErrorLayer::refresh_pool(db_pool.clone(), infra, infra_cache).await?;
debug!("⚙️ Infra {infra}: errors layer is generated");
ErrorLayer::refresh_pool(db_pool.clone(), infra_id, infra_cache).await?;
debug!("⚙️ Infra {infra_id}: errors layer is generated");
Ok(())
}

Expand Down Expand Up @@ -171,7 +172,10 @@ pub mod tests {
use crate::modelsv2::fixtures::create_empty_infra;
use editoast_models::DbConnectionPoolV2;

#[rstest] // Slow test
#[rstest]
// Slow test
// PostgreSQL deadlock can happen in this test, see section `Deadlocks` of [DbConnectionPoolV2::get] for more information
#[serial_test::serial]
async fn refresh_all_test() {
let db_pool = DbConnectionPoolV2::for_tests();
let infra = create_empty_infra(db_pool.get_ok().deref_mut()).await;
Expand Down
4 changes: 3 additions & 1 deletion editoast/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,9 @@ async fn import_railjson(
let railjson: RailJson = serde_json::from_reader(BufReader::new(railjson_file))?;

println!("🍞 Importing infra {infra_name}");
let mut infra = infra.persist_v2(railjson, db_pool.clone()).await?;
let mut infra = infra
.persist(railjson, db_pool.get().await?.deref_mut())
.await?;

infra
.bump_version(db_pool.get().await?.deref_mut())
Expand Down
5 changes: 2 additions & 3 deletions editoast/src/modelsv2/fixtures.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::io::Cursor;
use std::sync::Arc;

use chrono::Utc;
use editoast_schemas::infra::InfraObject;
Expand Down Expand Up @@ -287,15 +286,15 @@ where
railjson_object
}

pub async fn create_small_infra(db_pool: Arc<DbConnectionPoolV2>) -> Infra {
pub async fn create_small_infra(conn: &mut DbConnection) -> Infra {
let railjson: RailJson = serde_json::from_str(include_str!(
"../../../tests/data/infras/small_infra/infra.json"
))
.unwrap();
Infra::changeset()
.name("small_infra".to_owned())
.last_railjson_version()
.persist_v2(railjson, db_pool)
.persist(railjson, conn)
.await
.unwrap()
}
110 changes: 64 additions & 46 deletions editoast/src/modelsv2/infra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,9 @@ use crate::modelsv2::get_geometry_layer_table;
use crate::modelsv2::get_table;
use crate::modelsv2::prelude::*;
use crate::modelsv2::railjson::persist_railjson;
use crate::modelsv2::railjson::persist_railjson_v2;
use crate::modelsv2::Create;
use crate::tables::infra::dsl;
use editoast_models::DbConnection;
use editoast_models::DbConnectionPool;
use editoast_models::DbConnectionPoolV2;
use editoast_schemas::infra::RailJson;
use editoast_schemas::infra::RAILJSON_VERSION;
Expand Down Expand Up @@ -77,16 +75,11 @@ pub struct Infra {
}

impl InfraChangeset {
pub async fn persist(
self,
railjson: RailJson,
db_pool: Arc<DbConnectionPool>,
) -> Result<Infra> {
let conn = &mut db_pool.get().await?;
pub async fn persist(self, railjson: RailJson, conn: &mut DbConnection) -> Result<Infra> {
let infra = self.create(conn).await?;
// TODO: lock infra for update
debug!("🛤 Begin importing all railjson objects");
if let Err(e) = persist_railjson(db_pool, infra.id, railjson).await {
if let Err(e) = persist_railjson(conn, infra.id, railjson).await {
error!("Could not import infrastructure {}. Rolling back", infra.id);
infra.delete(conn).await?;
return Err(e);
Expand All @@ -95,23 +88,6 @@ impl InfraChangeset {
Ok(infra)
}

pub async fn persist_v2(
self,
railjson: RailJson,
db_pool: Arc<DbConnectionPoolV2>,
) -> Result<Infra> {
let infra = self.create(db_pool.get().await?.deref_mut()).await?;
// TODO: lock infra for update
debug!("🛤 Begin importing all railjson objects");
if let Err(e) = persist_railjson_v2(db_pool.clone(), infra.id, railjson).await {
error!("Could not import infrastructure {}. Rolling back", infra.id);
infra.delete(db_pool.get().await?.deref_mut()).await?;
return Err(e);
};
debug!("🛤 Import finished successfully");
Ok(infra)
}

#[must_use = "builder methods are intended to be chained"]
pub fn last_railjson_version(self) -> Self {
self.railjson_version(RAILJSON_VERSION.to_owned())
Expand Down Expand Up @@ -337,8 +313,6 @@ pub mod tests {

use super::Infra;
use crate::error::EditoastError;
use crate::fixtures::tests::db_pool;
use crate::fixtures::tests::IntoFixture;
use crate::modelsv2::fixtures::create_empty_infra;
use crate::modelsv2::infra::DEFAULT_INFRA_VERSION;
use crate::modelsv2::prelude::*;
Expand All @@ -359,6 +333,8 @@ pub mod tests {
}

#[rstest]
// PostgreSQL deadlock can happen in this test, see section `Deadlocks` of [DbConnectionPoolV2::get] for more information
#[serial_test::serial]
async fn clone_infra_with_new_name_returns_new_cloned_infra() {
// GIVEN
let db_pool = DbConnectionPoolV2::for_tests();
Expand All @@ -378,15 +354,15 @@ pub mod tests {
#[rstest]
#[serial_test::serial]
async fn persists_railjson_ko_version() {
let pool = db_pool();
let db_pool = DbConnectionPoolV2::for_tests();
let railjson_with_invalid_version = RailJson {
version: "0".to_string(),
..Default::default()
};
let res = Infra::changeset()
.name("test".to_owned())
.last_railjson_version()
.persist(railjson_with_invalid_version, pool)
.persist(railjson_with_invalid_version, db_pool.get_ok().deref_mut())
.await;
assert!(res.is_err());
let expected_error = RailJsonError::UnsupportedVersion {
Expand Down Expand Up @@ -418,14 +394,13 @@ pub mod tests {
version: RAILJSON_VERSION.to_string(),
};

let pool = db_pool();
let db_pool = DbConnectionPoolV2::for_tests();
let infra = Infra::changeset()
.name("persist_railjson_ok_infra".to_owned())
.last_railjson_version()
.persist(railjson.clone(), pool.clone())
.persist(railjson.clone(), db_pool.get_ok().deref_mut())
.await
.expect("could not persist infra")
.into_fixture(pool.clone());
.expect("could not persist infra");

// THEN
assert_eq!(infra.railjson_version, railjson.version);
Expand All @@ -435,51 +410,94 @@ pub mod tests {
objects
}

let conn = &mut pool.get().await.unwrap();
let id = infra.id;

assert_eq!(
sort::<BufferStop>(find_all_schemas(conn, id).await.unwrap()),
sort::<BufferStop>(
find_all_schemas(db_pool.get_ok().deref_mut(), id)
.await
.unwrap()
),
sort(railjson.buffer_stops)
);
assert_eq!(
sort::<Route>(find_all_schemas(conn, id).await.unwrap()),
sort::<Route>(
find_all_schemas(db_pool.get_ok().deref_mut(), id)
.await
.unwrap()
),
sort(railjson.routes)
);
assert_eq!(
sort::<SwitchType>(find_all_schemas(conn, id).await.unwrap()),
sort::<SwitchType>(
find_all_schemas(db_pool.get_ok().deref_mut(), id)
.await
.unwrap()
),
sort(railjson.extended_switch_types)
);
assert_eq!(
sort::<Switch>(find_all_schemas(conn, id).await.unwrap()),
sort::<Switch>(
find_all_schemas(db_pool.get_ok().deref_mut(), id)
.await
.unwrap()
),
sort(railjson.switches)
);
assert_eq!(
sort::<TrackSection>(find_all_schemas(conn, id).await.unwrap()),
sort::<TrackSection>(
find_all_schemas(db_pool.get_ok().deref_mut(), id)
.await
.unwrap()
),
sort(railjson.track_sections)
);
assert_eq!(
sort::<SpeedSection>(find_all_schemas(conn, id).await.unwrap()),
sort::<SpeedSection>(
find_all_schemas(db_pool.get_ok().deref_mut(), id)
.await
.unwrap()
),
sort(railjson.speed_sections)
);
assert_eq!(
sort::<NeutralSection>(find_all_schemas(conn, id).await.unwrap()),
sort::<NeutralSection>(
find_all_schemas(db_pool.get_ok().deref_mut(), id)
.await
.unwrap()
),
sort(railjson.neutral_sections)
);
assert_eq!(
sort::<Electrification>(find_all_schemas(conn, id).await.unwrap()),
sort::<Electrification>(
find_all_schemas(db_pool.get_ok().deref_mut(), id)
.await
.unwrap()
),
sort(railjson.electrifications)
);
assert_eq!(
sort::<Signal>(find_all_schemas(conn, id).await.unwrap()),
sort::<Signal>(
find_all_schemas(db_pool.get_ok().deref_mut(), id)
.await
.unwrap()
),
sort(railjson.signals)
);
assert_eq!(
sort::<Detector>(find_all_schemas(conn, id).await.unwrap()),
sort::<Detector>(
find_all_schemas(db_pool.get_ok().deref_mut(), id)
.await
.unwrap()
),
sort(railjson.detectors)
);
assert_eq!(
sort::<OperationalPoint>(find_all_schemas(conn, id).await.unwrap()),
sort::<OperationalPoint>(
find_all_schemas(db_pool.get_ok().deref_mut(), id)
.await
.unwrap()
),
sort(railjson.operational_points)
);
}
Expand Down
Loading

0 comments on commit f49e67c

Please sign in to comment.