diff --git a/Cargo.lock b/Cargo.lock index 6eedfb1c3..c2d6485d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4955,7 +4955,7 @@ dependencies = [ [[package]] name = "py-scouter" -version = "0.14.0" +version = "0.15.0" dependencies = [ "ndarray-stats", "num-traits", @@ -5889,7 +5889,7 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "scouter-auth" -version = "0.14.0" +version = "0.15.0" dependencies = [ "jsonwebtoken 9.3.1", "password-auth", @@ -5898,11 +5898,12 @@ dependencies = [ "scouter-sql", "serde", "thiserror 2.0.18", + "tracing", ] [[package]] name = "scouter-client" -version = "0.14.0" +version = "0.15.0" dependencies = [ "chrono", "ndarray", @@ -5933,7 +5934,7 @@ dependencies = [ [[package]] name = "scouter-dataframe" -version = "0.14.0" +version = "0.15.0" dependencies = [ "arrow", "arrow-array", @@ -5958,7 +5959,7 @@ dependencies = [ [[package]] name = "scouter-dispatch" -version = "0.14.0" +version = "0.15.0" dependencies = [ "futures", "mockito", @@ -5974,7 +5975,7 @@ dependencies = [ [[package]] name = "scouter-drift" -version = "0.14.0" +version = "0.15.0" dependencies = [ "approx", "chrono", @@ -6005,7 +6006,7 @@ dependencies = [ [[package]] name = "scouter-evaluate" -version = "0.14.0" +version = "0.15.0" dependencies = [ "chrono", "itertools 0.14.0", @@ -6031,7 +6032,7 @@ dependencies = [ [[package]] name = "scouter-events" -version = "0.14.0" +version = "0.15.0" dependencies = [ "async-trait", "chrono", @@ -6070,7 +6071,7 @@ dependencies = [ [[package]] name = "scouter-http" -version = "0.14.0" +version = "0.15.0" dependencies = [ "pyo3", "reqwest 0.12.28", @@ -6084,14 +6085,14 @@ dependencies = [ [[package]] name = "scouter-macro" -version = "0.14.0" +version = "0.15.0" dependencies = [ "thiserror 2.0.18", ] [[package]] name = "scouter-mocks" -version = "0.14.0" +version = "0.15.0" dependencies = [ "mockito", "potato-head", @@ -6108,7 +6109,7 @@ dependencies = [ [[package]] name = "scouter-observability" -version = "0.14.0" +version = "0.15.0" dependencies = [ "itertools 0.14.0", "ndarray", @@ -6125,7 +6126,7 @@ dependencies = [ [[package]] name = "scouter-profile" -version = "0.14.0" +version = "0.15.0" dependencies = [ "approx", "chrono", @@ -6149,7 +6150,7 @@ dependencies = [ [[package]] name = "scouter-semver" -version = "0.14.0" +version = "0.15.0" dependencies = [ "pyo3", "semver", @@ -6159,7 +6160,7 @@ dependencies = [ [[package]] name = "scouter-server" -version = "0.14.0" +version = "0.15.0" dependencies = [ "anyhow", "approx", @@ -6169,6 +6170,7 @@ dependencies = [ "flume", "futures", "http-body-util", + "jsonwebtoken 9.3.1", "metrics", "metrics-exporter-prometheus", "mimalloc", @@ -6211,7 +6213,7 @@ dependencies = [ [[package]] name = "scouter-settings" -version = "0.14.0" +version = "0.15.0" dependencies = [ "base64", "potato-head", @@ -6223,7 +6225,7 @@ dependencies = [ [[package]] name = "scouter-sql" -version = "0.14.0" +version = "0.15.0" dependencies = [ "anyhow", "async-trait", @@ -6250,7 +6252,7 @@ dependencies = [ [[package]] name = "scouter-state" -version = "0.14.0" +version = "0.15.0" dependencies = [ "thiserror 2.0.18", "tokio", @@ -6259,7 +6261,7 @@ dependencies = [ [[package]] name = "scouter-tonic" -version = "0.14.0" +version = "0.15.0" dependencies = [ "prost", "prost-types", @@ -6277,7 +6279,7 @@ dependencies = [ [[package]] name = "scouter-tracing" -version = "0.14.0" +version = "0.15.0" dependencies = [ "chrono", "opentelemetry", @@ -6301,7 +6303,7 @@ dependencies = [ [[package]] name = "scouter-types" -version = "0.14.0" +version = "0.15.0" dependencies = [ "approx", "base64", @@ -8560,9 +8562,9 @@ checksum = "40990edd51aae2c2b6907af74ffb635029d5788228222c4bb811e9351c0caad3" [[package]] name = "zmij" -version = "1.0.15" +version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94f63c051f4fe3c1509da62131a678643c5b6fbdc9273b2b79d4378ebda003d2" +checksum = "dfcd145825aace48cff44a8844de64bf75feec3080e0aa5cdbde72961ae51a65" [[package]] name = "zstd" diff --git a/Cargo.toml b/Cargo.toml index 9aedec907..34f15a6f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ default-members = [ ] [workspace.package] -version = "0.14.0" +version = "0.15.0" authors = [ "Thorrester ", "russellkemmit ", @@ -20,25 +20,25 @@ repository = "https://github.com/demml/scouter" [workspace.dependencies] -scouter-auth = { path = "crates/scouter_auth", version = "0.14.0" } -scouter-client = { path = "crates/scouter_client", version = "0.14.0" } -scouter-dispatch = { path = "crates/scouter_dispatch", version = "0.14.0" } -scouter-drift = { path = "crates/scouter_drift", version = "0.14.0", default-features = false } -scouter-evaluate = { path = "crates/scouter_evaluate", version = "0.14.0" } -scouter-events = { path = "crates/scouter_events", version = "0.14.0", default-features = false } -scouter-http = { path = "crates/scouter_http", version = "0.14.0" } -scouter-observability = { path = "crates/scouter_observability", version = "0.14.0" } -scouter-profile = { path = "crates/scouter_profile", version = "0.14.0" } -scouter-server = { path = "crates/scouter_server", version = "0.14.0" } -scouter-semver = { path = "crates/scouter_semver", version = "0.14.0" } -scouter-settings = { path = "crates/scouter_settings", version = "0.14.0" } -scouter-dataframe = { path = "crates/scouter_dataframe", version = "0.14.0" } -scouter-macro = { path = "crates/scouter_macro", version = "0.14.0" } -scouter-state = { path = "crates/scouter_state", version = "0.14.0" } -scouter-sql = { path = "crates/scouter_sql", version = "0.14.0" } -scouter-tonic = { path = "crates/scouter_tonic", version = "0.14.0" } -scouter-tracing = { path = "crates/scouter_tracing", version = "0.14.0" } -scouter-types = { path = "crates/scouter_types", version = "0.14.0" } +scouter-auth = { path = "crates/scouter_auth", version = "0.15.0" } +scouter-client = { path = "crates/scouter_client", version = "0.15.0" } +scouter-dispatch = { path = "crates/scouter_dispatch", version = "0.15.0" } +scouter-drift = { path = "crates/scouter_drift", version = "0.15.0", default-features = false } +scouter-evaluate = { path = "crates/scouter_evaluate", version = "0.15.0" } +scouter-events = { path = "crates/scouter_events", version = "0.15.0", default-features = false } +scouter-http = { path = "crates/scouter_http", version = "0.15.0" } +scouter-observability = { path = "crates/scouter_observability", version = "0.15.0" } +scouter-profile = { path = "crates/scouter_profile", version = "0.15.0" } +scouter-server = { path = "crates/scouter_server", version = "0.15.0" } +scouter-semver = { path = "crates/scouter_semver", version = "0.15.0" } +scouter-settings = { path = "crates/scouter_settings", version = "0.15.0" } +scouter-dataframe = { path = "crates/scouter_dataframe", version = "0.15.0" } +scouter-macro = { path = "crates/scouter_macro", version = "0.15.0" } +scouter-state = { path = "crates/scouter_state", version = "0.15.0" } +scouter-sql = { path = "crates/scouter_sql", version = "0.15.0" } +scouter-tonic = { path = "crates/scouter_tonic", version = "0.15.0" } +scouter-tracing = { path = "crates/scouter_tracing", version = "0.15.0" } +scouter-types = { path = "crates/scouter_types", version = "0.15.0" } scouter-mocks = { path = "crates/scouter_mocks" } test-utils = { path = "crates/test_utils" } diff --git a/crates/scouter_auth/Cargo.toml b/crates/scouter_auth/Cargo.toml index e720d2163..108a79953 100644 --- a/crates/scouter_auth/Cargo.toml +++ b/crates/scouter_auth/Cargo.toml @@ -18,6 +18,7 @@ rand = { workspace = true } rayon = { workspace = true } serde = { workspace = true } thiserror = { workspace = true } +tracing = { workspace = true } scouter-sql = { workspace = true } \ No newline at end of file diff --git a/crates/scouter_auth/src/auth.rs b/crates/scouter_auth/src/auth.rs index 7eaa36960..f76a92c8f 100644 --- a/crates/scouter_auth/src/auth.rs +++ b/crates/scouter_auth/src/auth.rs @@ -6,6 +6,7 @@ use rand::Rng; use scouter_sql::sql::schema::User; use serde::{Deserialize, Serialize}; use std::time::{SystemTime, UNIX_EPOCH}; +use tracing::error; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Claims { @@ -87,7 +88,8 @@ impl AuthManager { token, &DecodingKey::from_secret(self.jwt_secret.as_ref()), &Validation::default(), - )?; + ) + .inspect_err(|e| error!("failed to validate JWT: {:?}", e))?; Ok(token_data.claims) } @@ -96,12 +98,14 @@ impl AuthManager { token: &str, ) -> Result { let mut validation = Validation::default(); - validation.insecure_disable_signature_validation(); + // Disable expiration validation ( we just want to decode the claims ) + validation.validate_exp = false; let token_data = decode::( token, &DecodingKey::from_secret(self.jwt_secret.as_ref()), &validation, - )?; + ) + .inspect_err(|e| error!("failed to decode JWT without validation: {:?}", e))?; Ok(token_data.claims) } diff --git a/crates/scouter_events/src/producer/grpc/producer.rs b/crates/scouter_events/src/producer/grpc/producer.rs index d71d37d99..8e8026f65 100644 --- a/crates/scouter_events/src/producer/grpc/producer.rs +++ b/crates/scouter_events/src/producer/grpc/producer.rs @@ -19,9 +19,13 @@ impl GrpcProducer { pub async fn publish(&self, message: MessageRecord) -> Result<(), EventError> { let msg_bytes = serde_json::to_vec(&message)?; + let msg_type = message.record_type(); let response = self.client.insert_message(msg_bytes).await?; - debug!("Published message to drift with response: {:?}", response); + debug!( + "Published message to drift with response: {:?}, message type: {:?}", + response, msg_type + ); Ok(()) } diff --git a/crates/scouter_events/src/queue/bus.rs b/crates/scouter_events/src/queue/bus.rs index 71a633d06..600a1d331 100644 --- a/crates/scouter_events/src/queue/bus.rs +++ b/crates/scouter_events/src/queue/bus.rs @@ -208,7 +208,7 @@ pub struct QueueBus { impl QueueBus { #[instrument(skip_all)] pub fn new(task_state: TaskState, identifier: String) -> Self { - debug!("Creating unbounded QueueBus"); + debug!("Creating unbounded QueueBus for identifier: {}", identifier); Self { task_state, @@ -218,6 +218,10 @@ impl QueueBus { #[instrument(skip_all)] pub fn publish(&self, event: Event) -> Result<(), EventError> { + debug!( + "Publishing event to QueueBus for identifier: {}", + self.identifier + ); Ok(self.task_state.event_tx.send(event)?) } } @@ -231,7 +235,10 @@ impl QueueBus { pub fn insert(&self, item: &Bound<'_, PyAny>) -> Result<(), PyEventError> { let item = QueueItem::from_py_entity(item) .inspect_err(|e| error!("Failed to convert entity to QueueItem: {}", e))?; - debug!("Inserting event into QueueBus: {:?}", item); + debug!( + "Inserting event into QueueBus for identifier: {}: {:?}", + self.identifier, item + ); let event = Event::Task(item); self.publish(event)?; Ok(()) diff --git a/crates/scouter_events/src/queue/genai/queue.rs b/crates/scouter_events/src/queue/genai/queue.rs index 15e628a9d..34c96cefc 100644 --- a/crates/scouter_events/src/queue/genai/queue.rs +++ b/crates/scouter_events/src/queue/genai/queue.rs @@ -1,5 +1,6 @@ use crate::error::EventError; use crate::producer::RustScouterProducer; +use crate::queue::bus::TaskState; use crate::queue::genai::record_queue::GenAIEvalRecordQueue; use crate::queue::traits::BackgroundTask; use crate::queue::traits::QueueMethods; @@ -12,6 +13,7 @@ use scouter_types::genai::GenAIEvalProfile; use scouter_types::GenAIEvalRecord; use std::sync::Arc; use std::sync::RwLock; +use tokio_util::sync::CancellationToken; use tracing::debug; const GENAI_MAX_QUEUE_SIZE: usize = 25; @@ -43,6 +45,8 @@ impl GenAIQueue { drift_profile: GenAIEvalProfile, config: TransportConfig, settings: Arc>, + task_state: &mut TaskState, + identifier: String, ) -> Result { debug!("Creating GenAI Drift Queue"); // ArrayQueue size is based on sample rate @@ -51,16 +55,31 @@ impl GenAIQueue { let last_publish = Arc::new(RwLock::new(Utc::now())); let producer = RustScouterProducer::new(config).await?; + let cancellation_token = CancellationToken::new(); let genai_queue = GenAIQueue { - queue, - record_queue, + queue: queue.clone(), + record_queue: record_queue.clone(), producer, last_publish, capacity: GENAI_MAX_QUEUE_SIZE, settings, }; + let handle = genai_queue.start_background_task( + queue, + record_queue, + genai_queue.producer.clone(), + genai_queue.last_publish.clone(), + genai_queue.capacity, + identifier, + task_state.clone(), + cancellation_token.clone(), + )?; + + task_state.add_background_abort_handle(handle); + task_state.add_background_cancellation_token(cancellation_token); + Ok(genai_queue) } diff --git a/crates/scouter_events/src/queue/py_queue.rs b/crates/scouter_events/src/queue/py_queue.rs index 182d164ca..a8ff0bde2 100644 --- a/crates/scouter_events/src/queue/py_queue.rs +++ b/crates/scouter_events/src/queue/py_queue.rs @@ -80,8 +80,14 @@ impl QueueNum { } }; - let queue = - GenAIQueue::new(genai_profile, transport_config, queue_settings).await?; + let queue = GenAIQueue::new( + genai_profile, + transport_config, + queue_settings, + task_state, + identifier, + ) + .await?; Ok(QueueNum::GenAI(queue)) } } @@ -165,6 +171,7 @@ impl QueueNum { } #[allow(clippy::too_many_arguments)] +#[instrument(skip_all)] async fn spawn_queue_event_handler( mut event_rx: UnboundedReceiver, transport_config: TransportConfig, @@ -401,6 +408,7 @@ impl ScouterQueue { /// /// # Returns /// * `ScouterQueue` - A new ScouterQueue + #[instrument(skip_all)] pub fn from_path_rs( py: Python, path: HashMap, diff --git a/crates/scouter_events/src/queue/traits/queue.rs b/crates/scouter_events/src/queue/traits/queue.rs index d8a76d55c..4d2cfd726 100644 --- a/crates/scouter_events/src/queue/traits/queue.rs +++ b/crates/scouter_events/src/queue/traits/queue.rs @@ -43,7 +43,7 @@ pub trait BackgroundTask: Send + Sync + 'static { let span = info_span!("background_task", task = %identifier); let future = async move { - debug!("Starting background task"); + debug!("Starting background task for {}", identifier); // Set running state immediately task_state.set_background_running(true); @@ -236,10 +236,10 @@ pub fn wait_for_background_task(task_state: &TaskState) -> Result<(), EventError max_retries -= 1; std::thread::sleep(Duration::from_millis(200)); } - error!("Background task failed to start"); + error!("Background task failed to start for {}", task_state.id); Err(EventError::BackgroundTaskFailedToStartError) } else { - debug!("No background handle to wait for"); + debug!("No background handle to wait for {}", task_state.id); Ok(()) } } diff --git a/crates/scouter_http/src/lib.rs b/crates/scouter_http/src/lib.rs index 6bdffc326..46da75dbf 100644 --- a/crates/scouter_http/src/lib.rs +++ b/crates/scouter_http/src/lib.rs @@ -17,7 +17,6 @@ pub fn build_http_client(settings: &HttpConfig) -> Result { let mut headers = HeaderMap::new(); headers.insert("Username", HeaderValue::from_str(&settings.username)?); - headers.insert("Password", HeaderValue::from_str(&settings.password)?); let client_builder = Client::builder().timeout(std::time::Duration::from_secs(TIMEOUT_SECS)); diff --git a/crates/scouter_server/Cargo.toml b/crates/scouter_server/Cargo.toml index 7b27b0d3e..ae0a92eb9 100644 --- a/crates/scouter_server/Cargo.toml +++ b/crates/scouter_server/Cargo.toml @@ -34,6 +34,7 @@ metrics-exporter-prometheus = { workspace = true } mimalloc = { workspace = true } password-auth = { workspace = true } potato-head = { workspace = true } +jsonwebtoken = { workspace = true } rand = { workspace = true } rusty-logging= { workspace = true } scouter-settings = { workspace = true } diff --git a/crates/scouter_server/src/api/grpc/interceptor.rs b/crates/scouter_server/src/api/grpc/interceptor.rs index 38a5a4673..06e4c92a9 100644 --- a/crates/scouter_server/src/api/grpc/interceptor.rs +++ b/crates/scouter_server/src/api/grpc/interceptor.rs @@ -1,14 +1,16 @@ use crate::api::routes::user::utils::get_user; use crate::api::state::AppState; +use jsonwebtoken::errors::ErrorKind; use scouter_auth::permission::UserPermissions; use scouter_sql::sql::traits::UserSqlLogic; use scouter_sql::PostgresClient; use std::sync::Arc; use tonic::body::Body; use tonic::codegen::http::{HeaderValue, Request}; +use tonic::metadata::MetadataMap; use tonic::{async_trait, Status}; use tonic_middleware::RequestInterceptor; -use tracing::info; +use tracing::{error, info, instrument}; const AUTHORIZATION: &str = "authorization"; const X_REFRESHED_TOKEN: &str = "x-refreshed-token"; @@ -20,15 +22,91 @@ pub struct AuthInterceptor { } impl AuthInterceptor { - pub fn new(state: Arc) -> Self { + pub(crate) fn new(state: Arc) -> Self { Self { state } } + async fn handle_token_refresh( + &self, + token: &str, + ) -> Result<(UserPermissions, Option), Status> { + info!("Access token expired, attempting refresh"); + + let expired_claims = self + .state + .auth_manager + .decode_jwt_without_validation(token) + .map_err(|_| Status::unauthenticated("Invalid token format"))?; + + let mut user = get_user(&self.state, &expired_claims.sub) + .await + .map_err(|_| { + error!("Failed to get user for token refresh"); + Status::unauthenticated("User not found") + })?; + + let stored_refresh = user.refresh_token.as_ref().ok_or_else(|| { + error!("No refresh token found for user: {}", user.username); + Status::unauthenticated("Refresh token not found") + })?; + + self.state + .auth_manager + .validate_refresh_token(stored_refresh) + .map_err(|_| { + error!("Invalid refresh token for user: {}", user.username); + Status::unauthenticated("Invalid refresh token") + })?; + + let new_access_token = self.state.auth_manager.generate_jwt(&user); + let new_refresh_token = self.state.auth_manager.generate_refresh_token(&user); + + user.refresh_token = Some(new_refresh_token); + + PostgresClient::update_user(&self.state.db_pool, &user) + .await + .map_err(|e| { + error!("Failed to update refresh token in database: {}", e); + Status::internal("Failed to update refresh token") + })?; + + let auth_middleware = UserPermissions { + username: user.username.clone(), + permissions: user.permissions, + group_permissions: user.group_permissions, + }; + + info!("Token successfully refreshed for user: {}", user.username); + + Ok((auth_middleware, Some(new_access_token))) + } + + fn set_response_headers( + &self, + req: &mut Request, + username: &str, + refreshed_token: Option, + ) -> Result<(), Status> { + let username_header = HeaderValue::from_str(username) + .map_err(|_| Status::internal("Failed to set username header"))?; + req.headers_mut().insert(X_USERNAME, username_header); + + if let Some(token) = refreshed_token { + let mut metadata = MetadataMap::new(); + let token_value = token + .parse() + .map_err(|_| Status::internal("Failed to parse token value"))?; + metadata.insert(X_REFRESHED_TOKEN, token_value); + req.extensions_mut().insert(metadata); + } + + Ok(()) + } } #[async_trait] impl RequestInterceptor for AuthInterceptor { + #[instrument(skip_all)] async fn intercept(&self, mut req: Request) -> Result, Status> { - // Extract bearer token from HTTP headers (not metadata) let token = req .headers() .get(AUTHORIZATION) @@ -36,88 +114,31 @@ impl RequestInterceptor for AuthInterceptor { .and_then(|s| s.strip_prefix("Bearer ")) .ok_or_else(|| Status::unauthenticated("Missing authorization token"))?; - // Validate token match self.state.auth_manager.validate_jwt(token) { Ok(claims) => { - // Token is valid - set username in header for downstream services - let username_header = HeaderValue::from_str(&claims.sub) - .map_err(|_| Status::internal("Failed to set username header"))?; - - req.headers_mut().insert(X_USERNAME, username_header); - - // Store permissions in request extensions let auth_middleware = UserPermissions { - username: claims.sub, + username: claims.sub.clone(), permissions: claims.permissions, group_permissions: claims.group_permissions, }; req.extensions_mut().insert(auth_middleware); - + self.set_response_headers(&mut req, &claims.sub, None)?; Ok(req) } - Err(_) => { - info!("Access token expired, attempting refresh"); - - // Decode without validation to get claims - let expired_claims = self - .state - .auth_manager - .decode_jwt_without_validation(token) - .map_err(|_| Status::unauthenticated("Invalid token format"))?; - - // Get user from database - let mut user = get_user(&self.state, &expired_claims.sub) - .await - .map_err(|_| Status::unauthenticated("User not found"))?; - - // Validate stored refresh token - if let Some(stored_refresh) = user.refresh_token.as_ref() { - if self - .state - .auth_manager - .validate_refresh_token(stored_refresh) - .is_ok() - { - // Generate new tokens - let new_access_token = self.state.auth_manager.generate_jwt(&user); - let new_refresh_token = - self.state.auth_manager.generate_refresh_token(&user); - - // Update refresh token in database - user.refresh_token = Some(new_refresh_token); - - PostgresClient::update_user(&self.state.db_pool, &user) - .await - .map_err(|_| Status::internal("Failed to update refresh token"))?; - - // Store permissions in request extensions - let auth_middleware = UserPermissions { - username: user.username.clone(), - permissions: user.permissions, - group_permissions: user.group_permissions, - }; - req.extensions_mut().insert(auth_middleware); - - // Set username header for downstream services - let username_header = HeaderValue::from_str(&user.username) - .map_err(|_| Status::internal("Failed to set username header"))?; - req.headers_mut().insert(X_USERNAME, username_header); - - // Add new token to headers so it can be returned to client - let new_token_header = - HeaderValue::from_str(&format!("Bearer {}", new_access_token)) - .map_err(|_| { - Status::internal("Failed to set refreshed token header") - })?; - req.headers_mut() - .insert(X_REFRESHED_TOKEN, new_token_header); - - return Ok(req); - } + Err(e) => match e.kind() { + ErrorKind::ExpiredSignature => { + let (auth_middleware, refreshed_token) = + self.handle_token_refresh(token).await?; + let username = auth_middleware.username.clone(); + req.extensions_mut().insert(auth_middleware); + self.set_response_headers(&mut req, &username, refreshed_token)?; + Ok(req) } - - Err(Status::unauthenticated("Token refresh failed")) - } + _ => { + error!("Token validation failed: {:?}", e.kind()); + Err(Status::unauthenticated("Invalid token")) + } + }, } } } diff --git a/crates/scouter_server/src/api/grpc/message.rs b/crates/scouter_server/src/api/grpc/message.rs index f0b5e63d6..123e0edd7 100644 --- a/crates/scouter_server/src/api/grpc/message.rs +++ b/crates/scouter_server/src/api/grpc/message.rs @@ -1,13 +1,13 @@ use crate::api::state::AppState; use anyhow::Result; -use scouter_types::MessageRecord; -use std::sync::Arc; -use tonic::{Request, Response, Status}; -use tracing::{debug, error, instrument}; - use scouter_tonic::{ InsertMessageRequest, InsertMessageResponse, MessageService, MessageServiceServer, }; +use scouter_types::MessageRecord; +use std::sync::Arc; +use tonic::metadata::MetadataMap; +use tonic::{Request, Response, Status}; +use tracing::{debug, error, info, instrument}; #[derive(Clone)] pub struct MessageGrpcService { @@ -34,7 +34,7 @@ impl MessageService for MessageGrpcService { let message_bytes = &request.get_ref().message_record; // Check if token was refreshed and add to response - let refreshed_token = request.metadata().get("x-refreshed-token").cloned(); + let refreshed_metadata = request.extensions().get::(); let message_record: MessageRecord = serde_json::from_slice(message_bytes).map_err(|e| { error!(error = %e, "Failed to deserialize MessageRecord"); @@ -58,8 +58,13 @@ impl MessageService for MessageGrpcService { }); // If token was refreshed, add it to response metadata - if let Some(token) = refreshed_token { - response.metadata_mut().insert("x-refreshed-token", token); + if let Some(metadata) = refreshed_metadata { + if let Some(token) = metadata.get("x-refreshed-token") { + info!("Adding refreshed token to response metadata"); + response + .metadata_mut() + .insert("x-refreshed-token", token.clone()); + } } Ok(response) diff --git a/crates/scouter_server/src/api/routes/auth/middleware.rs b/crates/scouter_server/src/api/routes/auth/middleware.rs index 03384534d..fc09594e6 100644 --- a/crates/scouter_server/src/api/routes/auth/middleware.rs +++ b/crates/scouter_server/src/api/routes/auth/middleware.rs @@ -161,6 +161,7 @@ pub async fn auth_api_middleware( header::AUTHORIZATION, HeaderValue::from_str(&format!("Bearer {new_access_token}")).unwrap(), ); + info!("Successfully refreshed access token"); return Ok(response); } diff --git a/crates/scouter_tonic/src/client.rs b/crates/scouter_tonic/src/client.rs index 2d2d110a2..ef388e086 100644 --- a/crates/scouter_tonic/src/client.rs +++ b/crates/scouter_tonic/src/client.rs @@ -10,7 +10,7 @@ use tonic::transport::Channel; use tonic::Request; use tonic_health::pb::health_client::HealthClient; use tonic_health::pb::HealthCheckRequest; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, instrument}; pub const X_REFRESHED_TOKEN: &str = "x-refreshed-token"; pub const AUTHORIZATION: &str = "authorization"; @@ -56,6 +56,7 @@ impl GrpcClient { } /// Login via gRPC and store the JWT token + #[instrument(skip_all)] pub async fn login(&mut self) -> Result<(), ClientError> { debug!("Attempting gRPC login for user: {}", self.config.username); @@ -171,6 +172,7 @@ impl GrpcClient { } /// Insert message with automatic token refresh and retry + #[instrument(skip_all)] pub async fn insert_message( &self, message_record: Vec, @@ -191,12 +193,10 @@ impl GrpcClient { )) })?; - // Check if server refreshed the token if let Some(new_token) = response .metadata() .get(X_REFRESHED_TOKEN) .and_then(|v| v.to_str().ok()) - .and_then(|s| s.strip_prefix("Bearer ")) { info!("Server refreshed token, updating local copy"); self.update_token(new_token.to_string()); diff --git a/py-scouter/pyproject.toml b/py-scouter/pyproject.toml index d07c39b49..2cf1b8a4c 100644 --- a/py-scouter/pyproject.toml +++ b/py-scouter/pyproject.toml @@ -7,7 +7,7 @@ classifiers = [ "Programming Language :: Python :: Implementation :: PyPy", ] license = "MIT" -version = "0.12.0" +version = "0.15.0" description = "" authors = [ {name = "Thorrester", email = ""}, diff --git a/py-scouter/uv.lock b/py-scouter/uv.lock index 06aa08e3d..cffc79a02 100644 --- a/py-scouter/uv.lock +++ b/py-scouter/uv.lock @@ -1234,7 +1234,7 @@ wheels = [ [[package]] name = "scouter-ml" -version = "0.12.0" +version = "0.15.0" source = { editable = "." } [package.dev-dependencies] diff --git a/release-plz.toml b/release-plz.toml index df798962c..2530f3d48 100644 --- a/release-plz.toml +++ b/release-plz.toml @@ -25,6 +25,14 @@ publish = true name = "scouter-events" publish = true +[[package]] +name = "scouter-http" +publish = true + +[[package]] +name = "scouter-macro" +publish = true + [[package]] name = "scouter-observability" publish = true @@ -41,6 +49,10 @@ publish = true name = "scouter-semver" publish = true +[[package]] +name = "scouter-tracing" +publish = true + [[package]] name = "scouter-types" publish = true