diff --git a/crates/observation-tools-client/src/client.rs b/crates/observation-tools-client/src/client.rs index 38ddbb2..55686f6 100644 --- a/crates/observation-tools-client/src/client.rs +++ b/crates/observation-tools-client/src/client.rs @@ -4,13 +4,13 @@ use crate::error::Result; use crate::execution::BeginExecution; use crate::execution::ExecutionHandle; use crate::observation_handle::ObservationHandle; -use crate::ObservationWithPayload; use async_channel; use log::error; use log::info; use log::trace; use napi_derive::napi; use observation_tools_shared::models::Execution; +use observation_tools_shared::Observation; // Re-export constants from shared crate for convenience pub use observation_tools_shared::BATCH_SIZE; pub use observation_tools_shared::BLOB_THRESHOLD_BYTES; @@ -31,11 +31,18 @@ pub(crate) enum UploaderMessage { handle: ExecutionHandle, uploaded_tx: tokio::sync::watch::Sender, }, - Observations { - observations: Vec, + Observation { + observation: Observation, handle: ObservationHandle, uploaded_tx: tokio::sync::watch::Sender, }, + Payload { + observation_id: observation_tools_shared::ObservationId, + execution_id: observation_tools_shared::models::ExecutionId, + payload_id: observation_tools_shared::PayloadId, + name: String, + payload: observation_tools_shared::Payload, + }, Flush, Shutdown, } @@ -48,15 +55,26 @@ impl std::fmt::Debug for UploaderMessage { .debug_struct("Execution") .field("execution", execution) .finish(), - Self::Observations { - observations, + Self::Observation { + observation, handle, .. } => f - .debug_struct("Observations") - .field("observations", observations) + .debug_struct("Observation") + .field("observation", observation) .field("handle", handle) .finish(), + Self::Payload { + observation_id, + payload_id, + name, + .. + } => f + .debug_struct("Payload") + .field("observation_id", observation_id) + .field("payload_id", payload_id) + .field("name", name) + .finish(), Self::Flush => write!(f, "Flush"), Self::Shutdown => write!(f, "Shutdown"), } @@ -279,6 +297,18 @@ impl ClientBuilder { } } +/// Data for a payload ready to be uploaded +#[derive(Debug)] +pub(crate) struct PayloadUploadData { + pub(crate) observation_id: observation_tools_shared::ObservationId, + pub(crate) execution_id: observation_tools_shared::models::ExecutionId, + pub(crate) payload_id: observation_tools_shared::PayloadId, + pub(crate) name: String, + pub(crate) mime_type: String, + pub(crate) size: usize, + pub(crate) data: Vec, +} + async fn uploader_task( api_client: crate::server_client::Client, rx: async_channel::Receiver, @@ -291,12 +321,14 @@ async fn uploader_task( tokio::sync::watch::Sender, ); - let flush_observations = async |buffer: &mut Vec, - senders: &mut Vec| { - if buffer.is_empty() { + let flush = async |observation_buffer: &mut Vec, + senders: &mut Vec, + payload_buffer: &mut Vec| { + if observation_buffer.is_empty() && payload_buffer.is_empty() { return; } - let result = upload_observations(&api_client, buffer.drain(..).collect()).await; + let result = + upload_observations(&api_client, observation_buffer.drain(..).collect(), payload_buffer.drain(..).collect()).await; match result { Ok(()) => { // Signal all senders that observations were uploaded successfully @@ -314,8 +346,9 @@ async fn uploader_task( } } }; - let mut observation_buffer: Vec = Vec::new(); + let mut observation_buffer: Vec = Vec::new(); let mut sender_buffer: Vec = Vec::new(); + let mut payload_buffer: Vec = Vec::new(); loop { let msg = rx.recv().await.ok(); match msg { @@ -327,7 +360,6 @@ async fn uploader_task( let result = upload_execution(&api_client, execution).await; match result { Ok(()) => { - // Signal successful upload with handle let _ = uploaded_tx.send(Some(Ok(handle))); } Err(e) => { @@ -337,22 +369,36 @@ async fn uploader_task( } } } - Some(UploaderMessage::Observations { - observations, + Some(UploaderMessage::Observation { + observation, handle, uploaded_tx, }) => { - observation_buffer.extend(observations); + observation_buffer.push(observation); sender_buffer.push((handle, uploaded_tx)); - if observation_buffer.len() >= BATCH_SIZE { - flush_observations(&mut observation_buffer, &mut sender_buffer).await; - } + } + Some(UploaderMessage::Payload { + observation_id, + execution_id, + payload_id, + name, + payload, + }) => { + payload_buffer.push(PayloadUploadData { + observation_id, + execution_id, + payload_id, + name, + mime_type: payload.mime_type, + size: payload.size, + data: payload.data, + }); } Some(UploaderMessage::Flush) => { - flush_observations(&mut observation_buffer, &mut sender_buffer).await; + flush(&mut observation_buffer, &mut sender_buffer, &mut payload_buffer).await; } Some(UploaderMessage::Shutdown) | None => { - flush_observations(&mut observation_buffer, &mut sender_buffer).await; + flush(&mut observation_buffer, &mut sender_buffer, &mut payload_buffer).await; break; } } @@ -383,31 +429,42 @@ async fn upload_execution( async fn upload_observations( client: &crate::server_client::Client, - observations: Vec, + observations: Vec, + payloads: Vec, ) -> Result<()> { - if observations.is_empty() { + if observations.is_empty() && payloads.is_empty() { return Ok(()); } // Group by execution_id - let mut by_execution: std::collections::HashMap<_, Vec<_>> = std::collections::HashMap::new(); + let mut by_execution: std::collections::HashMap<_, (Vec<_>, Vec<_>)> = + std::collections::HashMap::new(); for obs in observations { by_execution - .entry(obs.observation.execution_id) + .entry(obs.execution_id) .or_default() + .0 .push(obs); } + for p in payloads { + by_execution + .entry(p.execution_id) + .or_default() + .1 + .push(p); + } // Upload each batch via multipart form - for (execution_id, observations) in by_execution { + for (execution_id, (observations, payloads)) in by_execution { trace!( - "Uploading {} observations for execution {}", + "Uploading {} observations + {} payloads for execution {}", observations.len(), + payloads.len(), execution_id ); client - .create_observations_multipart(&execution_id.to_string(), observations) + .create_observations_multipart(&execution_id.to_string(), observations, payloads) .await .map_err(|e| crate::error::Error::Config(e.to_string()))?; } diff --git a/crates/observation-tools-client/src/lib.rs b/crates/observation-tools-client/src/lib.rs index 3538e6c..5c358c5 100644 --- a/crates/observation-tools-client/src/lib.rs +++ b/crates/observation-tools-client/src/lib.rs @@ -33,8 +33,8 @@ pub use observation_handle::SendObservation; // Re-export procedural macro pub use observation_tools_macros::observe; // Re-export from shared for convenience -use observation_tools_shared::Observation; pub use observation_tools_shared::Payload; +pub use observation_tools_shared::PayloadBuilder; /// Register a global execution shared across all threads /// @@ -64,9 +64,3 @@ pub fn current_execution() -> Option { pub fn clear_global_execution() { context::clear_global_execution() } - -#[derive(Debug)] -struct ObservationWithPayload { - observation: Observation, - payload: Payload, -} diff --git a/crates/observation-tools-client/src/logger.rs b/crates/observation-tools-client/src/logger.rs index 50dc64a..bb4c09c 100644 --- a/crates/observation-tools-client/src/logger.rs +++ b/crates/observation-tools-client/src/logger.rs @@ -40,7 +40,7 @@ impl Log for ObservationLogger { let builder = ObservationBuilder::new("ObservationLogger") .observation_type(ObservationType::LogEntry) .log_level(record.level().into()) - .label(format!("log/{}", record.target())); + .metadata("target", record.target()); let builder = if let (Some(file), Some(line)) = (record.file(), record.line()) { builder.source(file, line) } else { diff --git a/crates/observation-tools-client/src/observation.rs b/crates/observation-tools-client/src/observation.rs index 8a25a39..7c58ed7 100644 --- a/crates/observation-tools-client/src/observation.rs +++ b/crates/observation-tools-client/src/observation.rs @@ -7,14 +7,15 @@ use crate::execution::ExecutionHandle; use crate::observation_handle::ObservationHandle; use crate::observation_handle::SendObservation; use crate::Error; -use crate::ObservationWithPayload; use napi_derive::napi; +use observation_tools_shared::GroupId; use observation_tools_shared::LogLevel; use observation_tools_shared::Markdown; use observation_tools_shared::Observation; use observation_tools_shared::ObservationId; use observation_tools_shared::ObservationType; use observation_tools_shared::Payload; +use observation_tools_shared::PayloadId; use observation_tools_shared::SourceInfo; use serde::Serialize; use std::any::TypeId; @@ -32,7 +33,7 @@ use std::fmt::Debug; #[napi] pub struct ObservationBuilder { name: String, - labels: Vec, + group_ids: Vec, metadata: HashMap, source: Option, parent_span_id: Option, @@ -49,7 +50,7 @@ impl ObservationBuilder { pub fn new>(name: T) -> Self { Self { name: name.as_ref().to_string(), - labels: Vec::new(), + group_ids: Vec::new(), metadata: HashMap::new(), source: None, parent_span_id: None, @@ -78,18 +79,6 @@ impl ObservationBuilder { self } - /// Add a label to the observation - pub fn label(mut self, label: impl Into) -> Self { - self.labels.push(label.into()); - self - } - - /// Add multiple labels to the observation - pub fn labels(mut self, labels: impl IntoIterator>) -> Self { - self.labels.extend(labels.into_iter().map(|l| l.into())); - self - } - /// Add metadata to the observation pub fn metadata(mut self, key: impl Into, value: impl Into) -> Self { self.metadata.insert(key.into(), value.into()); @@ -155,7 +144,7 @@ impl ObservationBuilder { } /// Internal method to build and send the observation - fn send_observation(mut self, payload: Payload) -> SendObservation { + pub(crate) fn send_observation(mut self, payload: Payload) -> SendObservation { let Some(execution) = self.execution.take().or_else(context::get_current_execution) else { log::error!( "No execution context available for observation '{}'", @@ -167,11 +156,22 @@ impl ObservationBuilder { self.send_with_execution(payload, &execution) } - /// Internal method to build and send the observation with a resolved execution + /// Internal method to build and send the observation with a resolved execution. + /// Sends the default payload with name "default". fn send_with_execution( self, payload: Payload, execution: &ExecutionHandle, + ) -> SendObservation { + self.send_with_execution_and_name(payload, "default", execution) + } + + /// Internal method to build and send the observation with a named payload. + fn send_with_execution_and_name( + self, + payload: Payload, + payload_name: impl Into, + execution: &ExecutionHandle, ) -> SendObservation { let observation_id = self.custom_id.unwrap_or_else(ObservationId::new); @@ -196,13 +196,12 @@ impl ObservationBuilder { name: self.name, observation_type: self.observation_type, log_level: self.log_level, - labels: self.labels, + group_ids: self.group_ids, + parent_group_id: None, metadata: self.metadata, source: self.source, parent_span_id, created_at: chrono::Utc::now(), - mime_type: payload.mime_type.clone(), - payload_size: payload.size, }; let (uploaded_tx, uploaded_rx) = tokio::sync::watch::channel::(None); @@ -215,13 +214,11 @@ impl ObservationBuilder { observation_id ); + // Send observation metadata if let Err(e) = execution .uploader_tx - .try_send(UploaderMessage::Observations { - observations: vec![ObservationWithPayload { - observation, - payload, - }], + .try_send(UploaderMessage::Observation { + observation, handle: handle.clone(), uploaded_tx, }) @@ -230,6 +227,15 @@ impl ObservationBuilder { return SendObservation::stub(Error::ChannelClosed); } + // Send the payload as a separate message + let _ = execution.uploader_tx.try_send(UploaderMessage::Payload { + observation_id, + execution_id: execution.id(), + payload_id: PayloadId::new(), + name: payload_name.into(), + payload, + }); + SendObservation::new(handle, uploaded_rx) } } @@ -275,10 +281,10 @@ impl ObservationBuilder { Ok(self) } - /// Add a label to the observation - #[napi(js_name = "label")] - pub fn label_napi(&mut self, label: String) -> &Self { - self.labels.push(label); + /// Add a group to the observation by group ID string + #[napi(js_name = "group")] + pub fn group_napi(&mut self, group_id: String) -> &Self { + self.group_ids.push(GroupId::from(group_id)); self } diff --git a/crates/observation-tools-client/src/server_client.rs b/crates/observation-tools-client/src/server_client.rs index f5b9e9b..0549727 100644 --- a/crates/observation-tools-client/src/server_client.rs +++ b/crates/observation-tools-client/src/server_client.rs @@ -1,6 +1,7 @@ use crate::server_client::types::PayloadOrPointerResponse; -use crate::ObservationWithPayload; +use observation_tools_shared::Observation; use reqwest::multipart::Part; +use serde::Serialize; use std::time::Duration; include!(concat!(env!("OUT_DIR"), "/observation_tools_openapi.rs")); @@ -21,6 +22,13 @@ impl PayloadOrPointerResponse { } } +impl types::GetObservation { + /// Get the first (default) payload's data, for backward-compatible tests + pub fn payload(&self) -> &PayloadOrPointerResponse { + &self.payloads[0].data + } +} + pub fn create_client(base_url: &str, api_key: Option) -> anyhow::Result { Ok(Client::new_with_client( &base_url, @@ -51,41 +59,68 @@ pub async fn pre_hook_async( Ok(()) } +/// Default payload name used for the primary observation payload +pub const DEFAULT_PAYLOAD_NAME: &str = "default"; + +/// Entry in the payload manifest sent alongside the multipart form +#[derive(Serialize)] +struct PayloadManifestEntry { + observation_id: String, + payload_id: String, + name: String, + mime_type: String, + size: usize, +} + // Extension methods for Client impl Client { pub(crate) async fn create_observations_multipart( &self, execution_id: &str, - observations: Vec, + observations: Vec, + payloads: Vec, ) -> anyhow::Result<()> { + if observations.is_empty() && payloads.is_empty() { + return Ok(()); + } + let url = format!("{}/api/exe/{}/obs", self.baseurl, execution_id); - let observation_count = observations.len(); log::trace!( - "Creating observations via multipart: url={}, count={}", + "Creating observations via multipart: url={}, observations={}, payloads={}", url, - observation_count + observations.len(), + payloads.len() ); // Build multipart form let mut form = reqwest::multipart::Form::new(); - let (observations, payloads): (Vec<_>, Vec<_>) = observations - .into_iter() - .map(|obs| (obs.observation, obs.payload)) - .unzip(); - let payloads = observations - .iter() - .zip(payloads.into_iter()) - .map(|(obs, payload)| (obs.id.to_string(), payload.data)) - .collect::>(); - + // Part 1: observations JSON let observations_json = serde_json::to_vec(&observations)?; let observations_part = Part::bytes(observations_json).mime_str("application/json")?; form = form.part("observations", observations_part); - for (obs_id, payload_data) in payloads { - let part = Part::bytes(payload_data); - form = form.part(obs_id, part); + + // Part 2: payload manifest JSON + let manifest: Vec = payloads + .iter() + .map(|p| PayloadManifestEntry { + observation_id: p.observation_id.to_string(), + payload_id: p.payload_id.as_str().to_string(), + name: p.name.clone(), + mime_type: p.mime_type.clone(), + size: p.size, + }) + .collect(); + let manifest_json = serde_json::to_vec(&manifest)?; + let manifest_part = Part::bytes(manifest_json).mime_str("application/json")?; + form = form.part("payload_manifest", manifest_part); + + // Part 3: payload data parts + for p in payloads { + let part_key = format!("{}:{}:{}", p.observation_id, p.payload_id.as_str(), p.name); + let part = Part::bytes(p.data); + form = form.part(part_key, part); } let mut request_builder = self.client.post(&url).multipart(form);