diff --git a/Cargo.toml b/Cargo.toml index f89da3da2..4ae971e2b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ features = [ "proxy", "rbac", "schema-sync", + "tracing-spans", "runtime-tokio-native-tls", "postgres-array", "postgres-vector", @@ -105,6 +106,7 @@ uuid = { version = "1", features = ["v4"] } [features] debug-print = [] +tracing-spans = [] default = [ "macros", "with-json", diff --git a/src/database/db_connection.rs b/src/database/db_connection.rs index 1ead10a7e..4aff50514 100644 --- a/src/database/db_connection.rs +++ b/src/database/db_connection.rs @@ -144,95 +144,153 @@ impl ConnectionTrait for DatabaseConnection { #[instrument(level = "trace")] #[allow(unused_variables)] async fn execute_raw(&self, stmt: Statement) -> Result { - match &self.inner { - #[cfg(feature = "sqlx-mysql")] - DatabaseConnectionType::SqlxMySqlPoolConnection(conn) => conn.execute(stmt).await, - #[cfg(feature = "sqlx-postgres")] - DatabaseConnectionType::SqlxPostgresPoolConnection(conn) => conn.execute(stmt).await, - #[cfg(feature = "sqlx-sqlite")] - DatabaseConnectionType::SqlxSqlitePoolConnection(conn) => conn.execute(stmt).await, - #[cfg(feature = "rusqlite")] - DatabaseConnectionType::RusqliteSharedConnection(conn) => conn.execute(stmt), - #[cfg(feature = "mock")] - DatabaseConnectionType::MockDatabaseConnection(conn) => conn.execute(stmt), - #[cfg(feature = "proxy")] - DatabaseConnectionType::ProxyDatabaseConnection(conn) => conn.execute(stmt).await, - DatabaseConnectionType::Disconnected => Err(conn_err("Disconnected")), - } + crate::with_db_span!( + "sea_orm.execute", + self.get_database_backend(), + stmt.sql.as_str(), + record_stmt = true, + async { + match &self.inner { + #[cfg(feature = "sqlx-mysql")] + DatabaseConnectionType::SqlxMySqlPoolConnection(conn) => { + conn.execute(stmt).await + } + #[cfg(feature = "sqlx-postgres")] + DatabaseConnectionType::SqlxPostgresPoolConnection(conn) => { + conn.execute(stmt).await + } + #[cfg(feature = "sqlx-sqlite")] + DatabaseConnectionType::SqlxSqlitePoolConnection(conn) => { + conn.execute(stmt).await + } + #[cfg(feature = "rusqlite")] + DatabaseConnectionType::RusqliteSharedConnection(conn) => conn.execute(stmt), + #[cfg(feature = "mock")] + DatabaseConnectionType::MockDatabaseConnection(conn) => conn.execute(stmt), + #[cfg(feature = "proxy")] + DatabaseConnectionType::ProxyDatabaseConnection(conn) => { + conn.execute(stmt).await + } + DatabaseConnectionType::Disconnected => Err(conn_err("Disconnected")), + } + } + ) } #[instrument(level = "trace")] #[allow(unused_variables)] async fn execute_unprepared(&self, sql: &str) -> Result { - match &self.inner { - #[cfg(feature = "sqlx-mysql")] - DatabaseConnectionType::SqlxMySqlPoolConnection(conn) => { - conn.execute_unprepared(sql).await - } - #[cfg(feature = "sqlx-postgres")] - DatabaseConnectionType::SqlxPostgresPoolConnection(conn) => { - conn.execute_unprepared(sql).await - } - #[cfg(feature = "sqlx-sqlite")] - DatabaseConnectionType::SqlxSqlitePoolConnection(conn) => { - conn.execute_unprepared(sql).await - } - #[cfg(feature = "rusqlite")] - DatabaseConnectionType::RusqliteSharedConnection(conn) => conn.execute_unprepared(sql), - #[cfg(feature = "mock")] - DatabaseConnectionType::MockDatabaseConnection(conn) => { - let db_backend = conn.get_database_backend(); - let stmt = Statement::from_string(db_backend, sql); - conn.execute(stmt) - } - #[cfg(feature = "proxy")] - DatabaseConnectionType::ProxyDatabaseConnection(conn) => { - let db_backend = conn.get_database_backend(); - let stmt = Statement::from_string(db_backend, sql); - conn.execute(stmt).await + crate::with_db_span!( + "sea_orm.execute_unprepared", + self.get_database_backend(), + sql, + record_stmt = false, + async { + match &self.inner { + #[cfg(feature = "sqlx-mysql")] + DatabaseConnectionType::SqlxMySqlPoolConnection(conn) => { + conn.execute_unprepared(sql).await + } + #[cfg(feature = "sqlx-postgres")] + DatabaseConnectionType::SqlxPostgresPoolConnection(conn) => { + conn.execute_unprepared(sql).await + } + #[cfg(feature = "sqlx-sqlite")] + DatabaseConnectionType::SqlxSqlitePoolConnection(conn) => { + conn.execute_unprepared(sql).await + } + #[cfg(feature = "rusqlite")] + DatabaseConnectionType::RusqliteSharedConnection(conn) => { + conn.execute_unprepared(sql) + } + #[cfg(feature = "mock")] + DatabaseConnectionType::MockDatabaseConnection(conn) => { + let db_backend = conn.get_database_backend(); + let stmt = Statement::from_string(db_backend, sql); + conn.execute(stmt) + } + #[cfg(feature = "proxy")] + DatabaseConnectionType::ProxyDatabaseConnection(conn) => { + let db_backend = conn.get_database_backend(); + let stmt = Statement::from_string(db_backend, sql); + conn.execute(stmt).await + } + DatabaseConnectionType::Disconnected => Err(conn_err("Disconnected")), + } } - DatabaseConnectionType::Disconnected => Err(conn_err("Disconnected")), - } + ) } #[instrument(level = "trace")] #[allow(unused_variables)] async fn query_one_raw(&self, stmt: Statement) -> Result, DbErr> { - match &self.inner { - #[cfg(feature = "sqlx-mysql")] - DatabaseConnectionType::SqlxMySqlPoolConnection(conn) => conn.query_one(stmt).await, - #[cfg(feature = "sqlx-postgres")] - DatabaseConnectionType::SqlxPostgresPoolConnection(conn) => conn.query_one(stmt).await, - #[cfg(feature = "sqlx-sqlite")] - DatabaseConnectionType::SqlxSqlitePoolConnection(conn) => conn.query_one(stmt).await, - #[cfg(feature = "rusqlite")] - DatabaseConnectionType::RusqliteSharedConnection(conn) => conn.query_one(stmt), - #[cfg(feature = "mock")] - DatabaseConnectionType::MockDatabaseConnection(conn) => conn.query_one(stmt), - #[cfg(feature = "proxy")] - DatabaseConnectionType::ProxyDatabaseConnection(conn) => conn.query_one(stmt).await, - DatabaseConnectionType::Disconnected => Err(conn_err("Disconnected")), - } + crate::with_db_span!( + "sea_orm.query_one", + self.get_database_backend(), + stmt.sql.as_str(), + record_stmt = true, + async { + match &self.inner { + #[cfg(feature = "sqlx-mysql")] + DatabaseConnectionType::SqlxMySqlPoolConnection(conn) => { + conn.query_one(stmt).await + } + #[cfg(feature = "sqlx-postgres")] + DatabaseConnectionType::SqlxPostgresPoolConnection(conn) => { + conn.query_one(stmt).await + } + #[cfg(feature = "sqlx-sqlite")] + DatabaseConnectionType::SqlxSqlitePoolConnection(conn) => { + conn.query_one(stmt).await + } + #[cfg(feature = "rusqlite")] + DatabaseConnectionType::RusqliteSharedConnection(conn) => conn.query_one(stmt), + #[cfg(feature = "mock")] + DatabaseConnectionType::MockDatabaseConnection(conn) => conn.query_one(stmt), + #[cfg(feature = "proxy")] + DatabaseConnectionType::ProxyDatabaseConnection(conn) => { + conn.query_one(stmt).await + } + DatabaseConnectionType::Disconnected => Err(conn_err("Disconnected")), + } + } + ) } #[instrument(level = "trace")] #[allow(unused_variables)] async fn query_all_raw(&self, stmt: Statement) -> Result, DbErr> { - match &self.inner { - #[cfg(feature = "sqlx-mysql")] - DatabaseConnectionType::SqlxMySqlPoolConnection(conn) => conn.query_all(stmt).await, - #[cfg(feature = "sqlx-postgres")] - DatabaseConnectionType::SqlxPostgresPoolConnection(conn) => conn.query_all(stmt).await, - #[cfg(feature = "sqlx-sqlite")] - DatabaseConnectionType::SqlxSqlitePoolConnection(conn) => conn.query_all(stmt).await, - #[cfg(feature = "rusqlite")] - DatabaseConnectionType::RusqliteSharedConnection(conn) => conn.query_all(stmt), - #[cfg(feature = "mock")] - DatabaseConnectionType::MockDatabaseConnection(conn) => conn.query_all(stmt), - #[cfg(feature = "proxy")] - DatabaseConnectionType::ProxyDatabaseConnection(conn) => conn.query_all(stmt).await, - DatabaseConnectionType::Disconnected => Err(conn_err("Disconnected")), - } + crate::with_db_span!( + "sea_orm.query_all", + self.get_database_backend(), + stmt.sql.as_str(), + record_stmt = true, + async { + match &self.inner { + #[cfg(feature = "sqlx-mysql")] + DatabaseConnectionType::SqlxMySqlPoolConnection(conn) => { + conn.query_all(stmt).await + } + #[cfg(feature = "sqlx-postgres")] + DatabaseConnectionType::SqlxPostgresPoolConnection(conn) => { + conn.query_all(stmt).await + } + #[cfg(feature = "sqlx-sqlite")] + DatabaseConnectionType::SqlxSqlitePoolConnection(conn) => { + conn.query_all(stmt).await + } + #[cfg(feature = "rusqlite")] + DatabaseConnectionType::RusqliteSharedConnection(conn) => conn.query_all(stmt), + #[cfg(feature = "mock")] + DatabaseConnectionType::MockDatabaseConnection(conn) => conn.query_all(stmt), + #[cfg(feature = "proxy")] + DatabaseConnectionType::ProxyDatabaseConnection(conn) => { + conn.query_all(stmt).await + } + DatabaseConnectionType::Disconnected => Err(conn_err("Disconnected")), + } + } + ) } #[cfg(feature = "mock")] diff --git a/src/database/transaction.rs b/src/database/transaction.rs index 8b449fdfb..d20fde672 100644 --- a/src/database/transaction.rs +++ b/src/database/transaction.rs @@ -44,69 +44,77 @@ impl DatabaseTransaction { open: true, metric_callback, }; - { - #[cfg(not(feature = "sync"))] - let conn = &mut *res.conn.lock().await; - #[cfg(feature = "sync")] - let conn = &mut *res.conn.lock().map_err(|_| DbErr::MutexPoisonError)?; - - match conn { - #[cfg(feature = "sqlx-mysql")] - InnerConnection::MySql(c) => { - // in MySQL SET TRANSACTION operations must be executed before transaction start - crate::driver::sqlx_mysql::set_transaction_config( - c, - isolation_level, - access_mode, - ) - .await?; - ::TransactionManager::begin(c, None) - .await - .map_err(sqlx_error_to_query_err) - } - #[cfg(feature = "sqlx-postgres")] - InnerConnection::Postgres(c) => { - ::TransactionManager::begin(c, None) - .await - .map_err(sqlx_error_to_query_err)?; - // in PostgreSQL SET TRANSACTION operations must be executed inside transaction - crate::driver::sqlx_postgres::set_transaction_config( - c, - isolation_level, - access_mode, - ) - .await - } - #[cfg(feature = "sqlx-sqlite")] - InnerConnection::Sqlite(c) => { - // in SQLite isolation level and access mode are global settings - crate::driver::sqlx_sqlite::set_transaction_config( - c, - isolation_level, - access_mode, - ) - .await?; - ::TransactionManager::begin(c, None) + + let begin_result: Result<(), DbErr> = crate::with_db_span!( + "sea_orm.begin", + backend, + "BEGIN", + record_stmt = false, + async { + #[cfg(not(feature = "sync"))] + let conn = &mut *res.conn.lock().await; + #[cfg(feature = "sync")] + let conn = &mut *res.conn.lock().map_err(|_| DbErr::MutexPoisonError)?; + + match conn { + #[cfg(feature = "sqlx-mysql")] + InnerConnection::MySql(c) => { + // in MySQL SET TRANSACTION operations must be executed before transaction start + crate::driver::sqlx_mysql::set_transaction_config( + c, + isolation_level, + access_mode, + ) + .await?; + ::TransactionManager::begin(c, None) + .await + .map_err(sqlx_error_to_query_err) + } + #[cfg(feature = "sqlx-postgres")] + InnerConnection::Postgres(c) => { + ::TransactionManager::begin(c, None) + .await + .map_err(sqlx_error_to_query_err)?; + // in PostgreSQL SET TRANSACTION operations must be executed inside transaction + crate::driver::sqlx_postgres::set_transaction_config( + c, + isolation_level, + access_mode, + ) .await - .map_err(sqlx_error_to_query_err) - } - #[cfg(feature = "rusqlite")] - InnerConnection::Rusqlite(c) => c.begin(), - #[cfg(feature = "mock")] - InnerConnection::Mock(c) => { - c.begin(); - Ok(()) - } - #[cfg(feature = "proxy")] - InnerConnection::Proxy(c) => { - c.begin().await; - Ok(()) + } + #[cfg(feature = "sqlx-sqlite")] + InnerConnection::Sqlite(c) => { + // in SQLite isolation level and access mode are global settings + crate::driver::sqlx_sqlite::set_transaction_config( + c, + isolation_level, + access_mode, + ) + .await?; + ::TransactionManager::begin(c, None) + .await + .map_err(sqlx_error_to_query_err) + } + #[cfg(feature = "rusqlite")] + InnerConnection::Rusqlite(c) => c.begin(), + #[cfg(feature = "mock")] + InnerConnection::Mock(c) => { + c.begin(); + Ok(()) + } + #[cfg(feature = "proxy")] + InnerConnection::Proxy(c) => { + c.begin().await; + Ok(()) + } + #[allow(unreachable_patterns)] + _ => Err(conn_err("Disconnected")), } - #[allow(unreachable_patterns)] - _ => Err(conn_err("Disconnected")), - }? - }; + } + ); + begin_result?; Ok(res) } @@ -137,45 +145,55 @@ impl DatabaseTransaction { #[instrument(level = "trace")] #[allow(unreachable_code, unused_mut)] pub async fn commit(mut self) -> Result<(), DbErr> { - #[cfg(not(feature = "sync"))] - let conn = &mut *self.conn.lock().await; - #[cfg(feature = "sync")] - let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?; - - match conn { - #[cfg(feature = "sqlx-mysql")] - InnerConnection::MySql(c) => { - ::TransactionManager::commit(c) - .await - .map_err(sqlx_error_to_query_err) - } - #[cfg(feature = "sqlx-postgres")] - InnerConnection::Postgres(c) => { - ::TransactionManager::commit(c) - .await - .map_err(sqlx_error_to_query_err) - } - #[cfg(feature = "sqlx-sqlite")] - InnerConnection::Sqlite(c) => { - ::TransactionManager::commit(c) - .await - .map_err(sqlx_error_to_query_err) - } - #[cfg(feature = "rusqlite")] - InnerConnection::Rusqlite(c) => c.commit(), - #[cfg(feature = "mock")] - InnerConnection::Mock(c) => { - c.commit(); - Ok(()) - } - #[cfg(feature = "proxy")] - InnerConnection::Proxy(c) => { - c.commit().await; - Ok(()) + let result: Result<(), DbErr> = crate::with_db_span!( + "sea_orm.commit", + self.backend, + "COMMIT", + record_stmt = false, + async { + #[cfg(not(feature = "sync"))] + let conn = &mut *self.conn.lock().await; + #[cfg(feature = "sync")] + let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?; + + match conn { + #[cfg(feature = "sqlx-mysql")] + InnerConnection::MySql(c) => { + ::TransactionManager::commit(c) + .await + .map_err(sqlx_error_to_query_err) + } + #[cfg(feature = "sqlx-postgres")] + InnerConnection::Postgres(c) => { + ::TransactionManager::commit(c) + .await + .map_err(sqlx_error_to_query_err) + } + #[cfg(feature = "sqlx-sqlite")] + InnerConnection::Sqlite(c) => { + ::TransactionManager::commit(c) + .await + .map_err(sqlx_error_to_query_err) + } + #[cfg(feature = "rusqlite")] + InnerConnection::Rusqlite(c) => c.commit(), + #[cfg(feature = "mock")] + InnerConnection::Mock(c) => { + c.commit(); + Ok(()) + } + #[cfg(feature = "proxy")] + InnerConnection::Proxy(c) => { + c.commit().await; + Ok(()) + } + #[allow(unreachable_patterns)] + _ => Err(conn_err("Disconnected")), + } } - #[allow(unreachable_patterns)] - _ => Err(conn_err("Disconnected")), - }?; + ); + + result?; self.open = false; // read by start_rollback Ok(()) } @@ -184,45 +202,55 @@ impl DatabaseTransaction { #[instrument(level = "trace")] #[allow(unreachable_code, unused_mut)] pub async fn rollback(mut self) -> Result<(), DbErr> { - #[cfg(not(feature = "sync"))] - let conn = &mut *self.conn.lock().await; - #[cfg(feature = "sync")] - let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?; - - match conn { - #[cfg(feature = "sqlx-mysql")] - InnerConnection::MySql(c) => { - ::TransactionManager::rollback(c) - .await - .map_err(sqlx_error_to_query_err) - } - #[cfg(feature = "sqlx-postgres")] - InnerConnection::Postgres(c) => { - ::TransactionManager::rollback(c) - .await - .map_err(sqlx_error_to_query_err) - } - #[cfg(feature = "sqlx-sqlite")] - InnerConnection::Sqlite(c) => { - ::TransactionManager::rollback(c) - .await - .map_err(sqlx_error_to_query_err) - } - #[cfg(feature = "rusqlite")] - InnerConnection::Rusqlite(c) => c.rollback(), - #[cfg(feature = "mock")] - InnerConnection::Mock(c) => { - c.rollback(); - Ok(()) - } - #[cfg(feature = "proxy")] - InnerConnection::Proxy(c) => { - c.rollback().await; - Ok(()) + let result: Result<(), DbErr> = crate::with_db_span!( + "sea_orm.rollback", + self.backend, + "ROLLBACK", + record_stmt = false, + async { + #[cfg(not(feature = "sync"))] + let conn = &mut *self.conn.lock().await; + #[cfg(feature = "sync")] + let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?; + + match conn { + #[cfg(feature = "sqlx-mysql")] + InnerConnection::MySql(c) => { + ::TransactionManager::rollback(c) + .await + .map_err(sqlx_error_to_query_err) + } + #[cfg(feature = "sqlx-postgres")] + InnerConnection::Postgres(c) => { + ::TransactionManager::rollback(c) + .await + .map_err(sqlx_error_to_query_err) + } + #[cfg(feature = "sqlx-sqlite")] + InnerConnection::Sqlite(c) => { + ::TransactionManager::rollback(c) + .await + .map_err(sqlx_error_to_query_err) + } + #[cfg(feature = "rusqlite")] + InnerConnection::Rusqlite(c) => c.rollback(), + #[cfg(feature = "mock")] + InnerConnection::Mock(c) => { + c.rollback(); + Ok(()) + } + #[cfg(feature = "proxy")] + InnerConnection::Proxy(c) => { + c.rollback().await; + Ok(()) + } + #[allow(unreachable_patterns)] + _ => Err(conn_err("Disconnected")), + } } - #[allow(unreachable_patterns)] - _ => Err(conn_err("Disconnected")), - }?; + ); + + result?; self.open = false; // read by start_rollback Ok(()) } @@ -298,48 +326,56 @@ impl ConnectionTrait for DatabaseTransaction { async fn execute_raw(&self, stmt: Statement) -> Result { debug_print!("{}", stmt); - #[cfg(not(feature = "sync"))] - let conn = &mut *self.conn.lock().await; - #[cfg(feature = "sync")] - let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?; - - match conn { - #[cfg(feature = "sqlx-mysql")] - InnerConnection::MySql(conn) => { - let query = crate::driver::sqlx_mysql::sqlx_query(&stmt); - let conn: &mut sqlx::MySqlConnection = &mut *conn; - crate::metric::metric!(self.metric_callback, &stmt, { - query.execute(conn).await.map(Into::into) - }) - .map_err(sqlx_error_to_exec_err) - } - #[cfg(feature = "sqlx-postgres")] - InnerConnection::Postgres(conn) => { - let query = crate::driver::sqlx_postgres::sqlx_query(&stmt); - let conn: &mut sqlx::PgConnection = &mut *conn; - crate::metric::metric!(self.metric_callback, &stmt, { - query.execute(conn).await.map(Into::into) - }) - .map_err(sqlx_error_to_exec_err) - } - #[cfg(feature = "sqlx-sqlite")] - InnerConnection::Sqlite(conn) => { - let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt); - let conn: &mut sqlx::SqliteConnection = &mut *conn; - crate::metric::metric!(self.metric_callback, &stmt, { - query.execute(conn).await.map(Into::into) - }) - .map_err(sqlx_error_to_exec_err) + crate::with_db_span!( + "sea_orm.execute", + self.backend, + stmt.sql.as_str(), + record_stmt = true, + async { + #[cfg(not(feature = "sync"))] + let conn = &mut *self.conn.lock().await; + #[cfg(feature = "sync")] + let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?; + + match conn { + #[cfg(feature = "sqlx-mysql")] + InnerConnection::MySql(conn) => { + let query = crate::driver::sqlx_mysql::sqlx_query(&stmt); + let conn: &mut sqlx::MySqlConnection = &mut *conn; + crate::metric::metric!(self.metric_callback, &stmt, { + query.execute(conn).await.map(Into::into) + }) + .map_err(sqlx_error_to_exec_err) + } + #[cfg(feature = "sqlx-postgres")] + InnerConnection::Postgres(conn) => { + let query = crate::driver::sqlx_postgres::sqlx_query(&stmt); + let conn: &mut sqlx::PgConnection = &mut *conn; + crate::metric::metric!(self.metric_callback, &stmt, { + query.execute(conn).await.map(Into::into) + }) + .map_err(sqlx_error_to_exec_err) + } + #[cfg(feature = "sqlx-sqlite")] + InnerConnection::Sqlite(conn) => { + let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt); + let conn: &mut sqlx::SqliteConnection = &mut *conn; + crate::metric::metric!(self.metric_callback, &stmt, { + query.execute(conn).await.map(Into::into) + }) + .map_err(sqlx_error_to_exec_err) + } + #[cfg(feature = "rusqlite")] + InnerConnection::Rusqlite(conn) => conn.execute(stmt, &self.metric_callback), + #[cfg(feature = "mock")] + InnerConnection::Mock(conn) => conn.execute(stmt), + #[cfg(feature = "proxy")] + InnerConnection::Proxy(conn) => conn.execute(stmt).await, + #[allow(unreachable_patterns)] + _ => Err(conn_err("Disconnected")), + } } - #[cfg(feature = "rusqlite")] - InnerConnection::Rusqlite(conn) => conn.execute(stmt, &self.metric_callback), - #[cfg(feature = "mock")] - InnerConnection::Mock(conn) => return conn.execute(stmt), - #[cfg(feature = "proxy")] - InnerConnection::Proxy(conn) => return conn.execute(stmt).await, - #[allow(unreachable_patterns)] - _ => Err(conn_err("Disconnected")), - } + ) } #[instrument(level = "trace")] @@ -347,53 +383,61 @@ impl ConnectionTrait for DatabaseTransaction { async fn execute_unprepared(&self, sql: &str) -> Result { debug_print!("{}", sql); - #[cfg(not(feature = "sync"))] - let conn = &mut *self.conn.lock().await; - #[cfg(feature = "sync")] - let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?; - - match conn { - #[cfg(feature = "sqlx-mysql")] - InnerConnection::MySql(conn) => { - let conn: &mut sqlx::MySqlConnection = &mut *conn; - sqlx::Executor::execute(conn, sql) - .await - .map(Into::into) - .map_err(sqlx_error_to_exec_err) - } - #[cfg(feature = "sqlx-postgres")] - InnerConnection::Postgres(conn) => { - let conn: &mut sqlx::PgConnection = &mut *conn; - sqlx::Executor::execute(conn, sql) - .await - .map(Into::into) - .map_err(sqlx_error_to_exec_err) - } - #[cfg(feature = "sqlx-sqlite")] - InnerConnection::Sqlite(conn) => { - let conn: &mut sqlx::SqliteConnection = &mut *conn; - sqlx::Executor::execute(conn, sql) - .await - .map(Into::into) - .map_err(sqlx_error_to_exec_err) - } - #[cfg(feature = "rusqlite")] - InnerConnection::Rusqlite(conn) => conn.execute_unprepared(sql), - #[cfg(feature = "mock")] - InnerConnection::Mock(conn) => { - let db_backend = conn.get_database_backend(); - let stmt = Statement::from_string(db_backend, sql); - conn.execute(stmt) - } - #[cfg(feature = "proxy")] - InnerConnection::Proxy(conn) => { - let db_backend = conn.get_database_backend(); - let stmt = Statement::from_string(db_backend, sql); - conn.execute(stmt).await + crate::with_db_span!( + "sea_orm.execute_unprepared", + self.backend, + sql, + record_stmt = false, + async { + #[cfg(not(feature = "sync"))] + let conn = &mut *self.conn.lock().await; + #[cfg(feature = "sync")] + let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?; + + match conn { + #[cfg(feature = "sqlx-mysql")] + InnerConnection::MySql(conn) => { + let conn: &mut sqlx::MySqlConnection = &mut *conn; + sqlx::Executor::execute(conn, sql) + .await + .map(Into::into) + .map_err(sqlx_error_to_exec_err) + } + #[cfg(feature = "sqlx-postgres")] + InnerConnection::Postgres(conn) => { + let conn: &mut sqlx::PgConnection = &mut *conn; + sqlx::Executor::execute(conn, sql) + .await + .map(Into::into) + .map_err(sqlx_error_to_exec_err) + } + #[cfg(feature = "sqlx-sqlite")] + InnerConnection::Sqlite(conn) => { + let conn: &mut sqlx::SqliteConnection = &mut *conn; + sqlx::Executor::execute(conn, sql) + .await + .map(Into::into) + .map_err(sqlx_error_to_exec_err) + } + #[cfg(feature = "rusqlite")] + InnerConnection::Rusqlite(conn) => conn.execute_unprepared(sql), + #[cfg(feature = "mock")] + InnerConnection::Mock(conn) => { + let db_backend = conn.get_database_backend(); + let stmt = Statement::from_string(db_backend, sql); + conn.execute(stmt) + } + #[cfg(feature = "proxy")] + InnerConnection::Proxy(conn) => { + let db_backend = conn.get_database_backend(); + let stmt = Statement::from_string(db_backend, sql); + conn.execute(stmt).await + } + #[allow(unreachable_patterns)] + _ => Err(conn_err("Disconnected")), + } } - #[allow(unreachable_patterns)] - _ => Err(conn_err("Disconnected")), - } + ) } #[instrument(level = "trace")] @@ -401,51 +445,59 @@ impl ConnectionTrait for DatabaseTransaction { async fn query_one_raw(&self, stmt: Statement) -> Result, DbErr> { debug_print!("{}", stmt); - #[cfg(not(feature = "sync"))] - let conn = &mut *self.conn.lock().await; - #[cfg(feature = "sync")] - let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?; - - match conn { - #[cfg(feature = "sqlx-mysql")] - InnerConnection::MySql(conn) => { - let query = crate::driver::sqlx_mysql::sqlx_query(&stmt); - let conn: &mut sqlx::MySqlConnection = &mut *conn; - crate::metric::metric!(self.metric_callback, &stmt, { - crate::sqlx_map_err_ignore_not_found( - query.fetch_one(conn).await.map(|row| Some(row.into())), - ) - }) - } - #[cfg(feature = "sqlx-postgres")] - InnerConnection::Postgres(conn) => { - let query = crate::driver::sqlx_postgres::sqlx_query(&stmt); - let conn: &mut sqlx::PgConnection = &mut *conn; - crate::metric::metric!(self.metric_callback, &stmt, { - crate::sqlx_map_err_ignore_not_found( - query.fetch_one(conn).await.map(|row| Some(row.into())), - ) - }) - } - #[cfg(feature = "sqlx-sqlite")] - InnerConnection::Sqlite(conn) => { - let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt); - let conn: &mut sqlx::SqliteConnection = &mut *conn; - crate::metric::metric!(self.metric_callback, &stmt, { - crate::sqlx_map_err_ignore_not_found( - query.fetch_one(conn).await.map(|row| Some(row.into())), - ) - }) + crate::with_db_span!( + "sea_orm.query_one", + self.backend, + stmt.sql.as_str(), + record_stmt = true, + async { + #[cfg(not(feature = "sync"))] + let conn = &mut *self.conn.lock().await; + #[cfg(feature = "sync")] + let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?; + + match conn { + #[cfg(feature = "sqlx-mysql")] + InnerConnection::MySql(conn) => { + let query = crate::driver::sqlx_mysql::sqlx_query(&stmt); + let conn: &mut sqlx::MySqlConnection = &mut *conn; + crate::metric::metric!(self.metric_callback, &stmt, { + crate::sqlx_map_err_ignore_not_found( + query.fetch_one(conn).await.map(|row| Some(row.into())), + ) + }) + } + #[cfg(feature = "sqlx-postgres")] + InnerConnection::Postgres(conn) => { + let query = crate::driver::sqlx_postgres::sqlx_query(&stmt); + let conn: &mut sqlx::PgConnection = &mut *conn; + crate::metric::metric!(self.metric_callback, &stmt, { + crate::sqlx_map_err_ignore_not_found( + query.fetch_one(conn).await.map(|row| Some(row.into())), + ) + }) + } + #[cfg(feature = "sqlx-sqlite")] + InnerConnection::Sqlite(conn) => { + let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt); + let conn: &mut sqlx::SqliteConnection = &mut *conn; + crate::metric::metric!(self.metric_callback, &stmt, { + crate::sqlx_map_err_ignore_not_found( + query.fetch_one(conn).await.map(|row| Some(row.into())), + ) + }) + } + #[cfg(feature = "rusqlite")] + InnerConnection::Rusqlite(conn) => conn.query_one(stmt, &self.metric_callback), + #[cfg(feature = "mock")] + InnerConnection::Mock(conn) => conn.query_one(stmt), + #[cfg(feature = "proxy")] + InnerConnection::Proxy(conn) => conn.query_one(stmt).await, + #[allow(unreachable_patterns)] + _ => Err(conn_err("Disconnected")), + } } - #[cfg(feature = "rusqlite")] - InnerConnection::Rusqlite(conn) => conn.query_one(stmt, &self.metric_callback), - #[cfg(feature = "mock")] - InnerConnection::Mock(conn) => return conn.query_one(stmt), - #[cfg(feature = "proxy")] - InnerConnection::Proxy(conn) => return conn.query_one(stmt).await, - #[allow(unreachable_patterns)] - _ => Err(conn_err("Disconnected")), - } + ) } #[instrument(level = "trace")] @@ -453,57 +505,65 @@ impl ConnectionTrait for DatabaseTransaction { async fn query_all_raw(&self, stmt: Statement) -> Result, DbErr> { debug_print!("{}", stmt); - #[cfg(not(feature = "sync"))] - let conn = &mut *self.conn.lock().await; - #[cfg(feature = "sync")] - let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?; - - match conn { - #[cfg(feature = "sqlx-mysql")] - InnerConnection::MySql(conn) => { - let query = crate::driver::sqlx_mysql::sqlx_query(&stmt); - let conn: &mut sqlx::MySqlConnection = &mut *conn; - crate::metric::metric!(self.metric_callback, &stmt, { - query - .fetch_all(conn) - .await - .map(|rows| rows.into_iter().map(|r| r.into()).collect()) - .map_err(sqlx_error_to_query_err) - }) - } - #[cfg(feature = "sqlx-postgres")] - InnerConnection::Postgres(conn) => { - let query = crate::driver::sqlx_postgres::sqlx_query(&stmt); - let conn: &mut sqlx::PgConnection = &mut *conn; - crate::metric::metric!(self.metric_callback, &stmt, { - query - .fetch_all(conn) - .await - .map(|rows| rows.into_iter().map(|r| r.into()).collect()) - .map_err(sqlx_error_to_query_err) - }) - } - #[cfg(feature = "sqlx-sqlite")] - InnerConnection::Sqlite(conn) => { - let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt); - let conn: &mut sqlx::SqliteConnection = &mut *conn; - crate::metric::metric!(self.metric_callback, &stmt, { - query - .fetch_all(conn) - .await - .map(|rows| rows.into_iter().map(|r| r.into()).collect()) - .map_err(sqlx_error_to_query_err) - }) + crate::with_db_span!( + "sea_orm.query_all", + self.backend, + stmt.sql.as_str(), + record_stmt = true, + async { + #[cfg(not(feature = "sync"))] + let conn = &mut *self.conn.lock().await; + #[cfg(feature = "sync")] + let conn = &mut *self.conn.lock().map_err(|_| DbErr::MutexPoisonError)?; + + match conn { + #[cfg(feature = "sqlx-mysql")] + InnerConnection::MySql(conn) => { + let query = crate::driver::sqlx_mysql::sqlx_query(&stmt); + let conn: &mut sqlx::MySqlConnection = &mut *conn; + crate::metric::metric!(self.metric_callback, &stmt, { + query + .fetch_all(conn) + .await + .map(|rows| rows.into_iter().map(|r| r.into()).collect()) + .map_err(sqlx_error_to_query_err) + }) + } + #[cfg(feature = "sqlx-postgres")] + InnerConnection::Postgres(conn) => { + let query = crate::driver::sqlx_postgres::sqlx_query(&stmt); + let conn: &mut sqlx::PgConnection = &mut *conn; + crate::metric::metric!(self.metric_callback, &stmt, { + query + .fetch_all(conn) + .await + .map(|rows| rows.into_iter().map(|r| r.into()).collect()) + .map_err(sqlx_error_to_query_err) + }) + } + #[cfg(feature = "sqlx-sqlite")] + InnerConnection::Sqlite(conn) => { + let query = crate::driver::sqlx_sqlite::sqlx_query(&stmt); + let conn: &mut sqlx::SqliteConnection = &mut *conn; + crate::metric::metric!(self.metric_callback, &stmt, { + query + .fetch_all(conn) + .await + .map(|rows| rows.into_iter().map(|r| r.into()).collect()) + .map_err(sqlx_error_to_query_err) + }) + } + #[cfg(feature = "rusqlite")] + InnerConnection::Rusqlite(conn) => conn.query_all(stmt, &self.metric_callback), + #[cfg(feature = "mock")] + InnerConnection::Mock(conn) => conn.query_all(stmt), + #[cfg(feature = "proxy")] + InnerConnection::Proxy(conn) => conn.query_all(stmt).await, + #[allow(unreachable_patterns)] + _ => Err(conn_err("Disconnected")), + } } - #[cfg(feature = "rusqlite")] - InnerConnection::Rusqlite(conn) => conn.query_all(stmt, &self.metric_callback), - #[cfg(feature = "mock")] - InnerConnection::Mock(conn) => return conn.query_all(stmt), - #[cfg(feature = "proxy")] - InnerConnection::Proxy(conn) => return conn.query_all(stmt).await, - #[allow(unreachable_patterns)] - _ => Err(conn_err("Disconnected")), - } + ) } } diff --git a/src/lib.rs b/src/lib.rs index 08e206eb5..f4b6a2b4d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -715,6 +715,41 @@ pub mod query; pub mod rbac; /// Types that defines the schemas of an Entity pub mod schema; +// Internal module for tracing spans +#[cfg(feature = "tracing-spans")] +pub(crate) mod tracing_spans; + +/// Execute a future and wrap it in a tracing span when `tracing-spans` is enabled. +/// +/// When the feature is disabled, this macro simply awaits the future with zero overhead. +/// +/// # Arguments +/// - `$name`: span name (e.g., "sea_orm.execute") +/// - `$backend`: DbBackend +/// - `$sql`: &str used for db.operation parsing +/// - `record_stmt`: whether to record `db.statement` +/// - `$fut`: the future to execute +#[doc(hidden)] +#[macro_export] +macro_rules! with_db_span { + ($name:expr, $backend:expr, $sql:expr, record_stmt = $record_stmt:expr, $fut:expr) => {{ + #[cfg(feature = "tracing-spans")] + { + let span = $crate::db_span!($name, $backend, $sql); + if $record_stmt { + span.record("db.statement", $sql); + } + let result = ::tracing::Instrument::instrument($fut, span.clone()).await; + $crate::tracing_spans::record_query_result(&span, &result); + result + } + #[cfg(not(feature = "tracing-spans"))] + { + $fut.await + } + }}; +} + /// Helpers for working with Value pub mod value; diff --git a/src/tracing_spans.rs b/src/tracing_spans.rs new file mode 100644 index 000000000..c08c9a930 --- /dev/null +++ b/src/tracing_spans.rs @@ -0,0 +1,182 @@ +//! Tracing support for database operations. +//! +//! This module provides utilities for instrumenting database operations +//! with tracing spans. Enable the `tracing-spans` feature to automatically +//! generate spans for all database operations. +//! +//! # Example +//! +//! ```toml +//! [dependencies] +//! sea-orm = { version = "2.0", features = ["tracing-spans"] } +//! ``` +//! +//! ```ignore +//! use sea_orm::Database; +//! +//! // Set up a tracing subscriber +//! tracing_subscriber::fmt() +//! .with_max_level(tracing::Level::INFO) +//! .init(); +//! +//! // All database operations will now generate tracing spans +//! let db = Database::connect("sqlite::memory:").await?; +//! let cakes = Cake::find().all(&db).await?; // Generates a span +//! ``` + +use crate::DbBackend; + +/// Database operation type, following OpenTelemetry conventions. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum DbOperation { + /// SELECT query + Select, + /// INSERT statement + Insert, + /// UPDATE statement + Update, + /// DELETE statement + Delete, + /// Other/unknown SQL execution + Execute, +} + +impl DbOperation { + /// Parse the operation type from an SQL query string. + /// + /// This function is allocation-free and uses case-insensitive comparison. + pub fn from_sql(sql: &str) -> Self { + let first_word = sql.trim_start().split_whitespace().next().unwrap_or(""); + if first_word.eq_ignore_ascii_case("SELECT") { + DbOperation::Select + } else if first_word.eq_ignore_ascii_case("INSERT") { + DbOperation::Insert + } else if first_word.eq_ignore_ascii_case("UPDATE") { + DbOperation::Update + } else if first_word.eq_ignore_ascii_case("DELETE") { + DbOperation::Delete + } else { + DbOperation::Execute + } + } +} + +impl std::fmt::Display for DbOperation { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + DbOperation::Select => write!(f, "SELECT"), + DbOperation::Insert => write!(f, "INSERT"), + DbOperation::Update => write!(f, "UPDATE"), + DbOperation::Delete => write!(f, "DELETE"), + DbOperation::Execute => write!(f, "EXECUTE"), + } + } +} + +/// Get the OpenTelemetry system name from DbBackend. +pub(crate) fn db_system_name(backend: DbBackend) -> &'static str { + match backend { + DbBackend::Postgres => "postgresql", + DbBackend::MySql => "mysql", + DbBackend::Sqlite => "sqlite", + } +} + +/// Record query result on a span (success/failure status and error message). +pub(crate) fn record_query_result( + span: &tracing::Span, + result: &Result, +) { + match result { + Ok(_) => { + span.record("otel.status_code", "OK"); + } + Err(e) => { + span.record("otel.status_code", "ERROR"); + span.record("exception.message", tracing::field::display(e)); + } + } +} + +/// Create a tracing span for database operations. +/// +/// Arguments: +/// - `$name`: Span name (e.g., "sea_orm.execute", "sea_orm.query_one") +/// - `$backend`: DbBackend value +/// - `$sql`: SQL statement string (used for operation parsing) +/// +/// Note: `db.statement` is set to Empty. Call `span.record("db.statement", sql)` +/// separately if the query is parameterized (safe to log). +#[doc(hidden)] +#[macro_export] +macro_rules! db_span { + ($name:expr, $backend:expr, $sql:expr) => {{ + let sql: &str = $sql; + let op = $crate::tracing_spans::DbOperation::from_sql(sql); + ::tracing::info_span!( + $name, + db.system = $crate::tracing_spans::db_system_name($backend), + db.operation = %op, + db.statement = ::tracing::field::Empty, + otel.status_code = ::tracing::field::Empty, + exception.message = ::tracing::field::Empty, + ) + }}; +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_db_operation_from_sql() { + assert_eq!( + DbOperation::from_sql("SELECT * FROM users"), + DbOperation::Select + ); + assert_eq!( + DbOperation::from_sql(" SELECT * FROM users"), + DbOperation::Select + ); + assert_eq!( + DbOperation::from_sql("select * from users"), + DbOperation::Select + ); + assert_eq!( + DbOperation::from_sql("INSERT INTO users"), + DbOperation::Insert + ); + assert_eq!( + DbOperation::from_sql("UPDATE users SET"), + DbOperation::Update + ); + assert_eq!( + DbOperation::from_sql("DELETE FROM users"), + DbOperation::Delete + ); + assert_eq!( + DbOperation::from_sql("CREATE TABLE users"), + DbOperation::Execute + ); + assert_eq!( + DbOperation::from_sql("DROP TABLE users"), + DbOperation::Execute + ); + } + + #[test] + fn test_db_system_name() { + assert_eq!(db_system_name(DbBackend::Postgres), "postgresql"); + assert_eq!(db_system_name(DbBackend::MySql), "mysql"); + assert_eq!(db_system_name(DbBackend::Sqlite), "sqlite"); + } + + #[test] + fn test_db_operation_display() { + assert_eq!(DbOperation::Select.to_string(), "SELECT"); + assert_eq!(DbOperation::Insert.to_string(), "INSERT"); + assert_eq!(DbOperation::Update.to_string(), "UPDATE"); + assert_eq!(DbOperation::Delete.to_string(), "DELETE"); + assert_eq!(DbOperation::Execute.to_string(), "EXECUTE"); + } +}