diff --git a/Cargo.toml b/Cargo.toml index d2063b7..d8c680c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ observation-tools-macros = { path = "crates/observation-tools-macros", version = observation-tools-server = { path = "crates/observation-tools-server", version = "0.0.14" } observation-tools-shared = { path = "crates/observation-tools-shared", version = "0.0.14" } openapiv3 = "2.2" +prost = "0.13" prettyplease = "0.2" progenitor = "0.11.2" progenitor-client = "0.11.2" diff --git a/crates/observation-tools-client/openapi.json b/crates/observation-tools-client/openapi.json index 68fa813..2063253 100644 --- a/crates/observation-tools-client/openapi.json +++ b/crates/observation-tools-client/openapi.json @@ -83,12 +83,15 @@ }, { "properties": { - "payload": { - "$ref": "#/components/schemas/PayloadOrPointerResponse" + "payloads": { + "items": { + "$ref": "#/components/schemas/GetPayload" + }, + "type": "array" } }, "required": [ - "payload" + "payloads" ], "type": "object" } @@ -105,6 +108,39 @@ ], "type": "object" }, + "GetPayload": { + "properties": { + "data": { + "$ref": "#/components/schemas/PayloadOrPointerResponse" + }, + "id": { + "$ref": "#/components/schemas/PayloadId" + }, + "mime_type": { + "type": "string" + }, + "name": { + "type": "string" + }, + "size": { + "minimum": 0, + "type": "integer" + } + }, + "required": [ + "id", + "name", + "mime_type", + "size", + "data" + ], + "type": "object" + }, + "GroupId": { + "description": "Unique identifier for a group\n\nGroup IDs are user-provided strings. By default, a UUID v7 string is generated,\nbut any string value is accepted.", + "example": "018e9a3a2c1b7e3f8d2a4b5c6d7e8f9b", + "type": "string" + }, "ListExecutionsResponse": { "description": "Response for listing executions", "properties": { @@ -170,17 +206,17 @@ "$ref": "#/components/schemas/ExecutionId", "description": "ID of the execution this observation belongs to" }, - "id": { - "$ref": "#/components/schemas/ObservationId", - "description": "Unique identifier for this observation" - }, - "labels": { - "description": "Hierarchical labels for grouping observations\nUses path convention (e.g., \"api/request/headers\")", + "group_ids": { + "description": "IDs of groups this observation belongs to", "items": { - "type": "string" + "$ref": "#/components/schemas/GroupId" }, "type": "array" }, + "id": { + "$ref": "#/components/schemas/ObservationId", + "description": "Unique identifier for this observation" + }, "log_level": { "$ref": "#/components/schemas/LogLevel", "description": "Log level for this observation" @@ -195,10 +231,6 @@ }, "type": "object" }, - "mime_type": { - "description": "MIME type of the payload (e.g., \"text/plain\", \"application/json\")", - "type": "string" - }, "name": { "description": "User-defined name for this observation", "type": "string" @@ -207,16 +239,16 @@ "$ref": "#/components/schemas/ObservationType", "description": "Type of observation" }, + "parent_group_id": { + "$ref": "#/components/schemas/GroupId", + "description": "Parent group ID (used when observation_type == Group)", + "nullable": true + }, "parent_span_id": { "description": "Parent span ID (for tracing integration)", "nullable": true, "type": "string" }, - "payload_size": { - "description": "Size of the payload in bytes", - "minimum": 0, - "type": "integer" - }, "source": { "$ref": "#/components/schemas/SourceInfo", "description": "Source location where this observation was created", @@ -229,9 +261,7 @@ "name", "observation_type", "log_level", - "created_at", - "mime_type", - "payload_size" + "created_at" ], "type": "object" }, @@ -245,10 +275,16 @@ "enum": [ "LogEntry", "Payload", - "Span" + "Span", + "Group" ], "type": "string" }, + "PayloadId": { + "description": "Unique identifier for a payload (UUIDv7)", + "example": "018e9a3a2c1b7e3f8d2a4b5c6d7e8f9c", + "type": "string" + }, "PayloadOrPointerResponse": { "oneOf": [ { @@ -556,7 +592,7 @@ }, "/api/exe/{execution_id}/obs/{observation_id}/content": { "get": { - "operationId": "get_observation_blob", + "operationId": "get_observation_blob_legacy", "parameters": [ { "description": "Execution ID", @@ -600,7 +636,68 @@ "description": "Observation blob not found" } }, - "summary": "Get observation blob content", + "summary": "Get observation blob content (legacy route for backward compat)\nThis is kept to support old URLs like /api/exe/{exec_id}/obs/{obs_id}/content", + "tags": [ + "observations" + ] + } + }, + "/api/exe/{execution_id}/obs/{observation_id}/payload/{payload_id}/content": { + "get": { + "operationId": "get_observation_blob", + "parameters": [ + { + "description": "Execution ID", + "in": "path", + "name": "execution_id", + "required": true, + "schema": { + "type": "string" + } + }, + { + "description": "Observation ID", + "in": "path", + "name": "observation_id", + "required": true, + "schema": { + "type": "string" + } + }, + { + "description": "Payload ID", + "in": "path", + "name": "payload_id", + "required": true, + "schema": { + "type": "string" + } + } + ], + "responses": { + "200": { + "content": { + "application/octet-stream": { + "schema": { + "items": { + "format": "int32", + "minimum": 0, + "type": "integer" + }, + "type": "array" + } + } + }, + "description": "Payload content" + }, + "400": { + "description": "Bad request" + }, + "404": { + "description": "Payload not found" + } + }, + "summary": "Get observation payload content", "tags": [ "observations" ] diff --git a/crates/observation-tools-client/src/axum/request_observer.rs b/crates/observation-tools-client/src/axum/request_observer.rs index a013b7a..434efe1 100644 --- a/crates/observation-tools-client/src/axum/request_observer.rs +++ b/crates/observation-tools-client/src/axum/request_observer.rs @@ -1,6 +1,8 @@ use crate::context; use crate::execution::ExecutionHandle; +use crate::group::GroupBuilder; use crate::observation::ObservationBuilder; +use crate::observation_handle::ObservationPayloadHandle; use axum::body::Body; use axum::extract::Request; use axum::response::Response; @@ -28,30 +30,21 @@ use tower::Layer; use tower::Service; /// State shared between the streaming body and the observation emitter. -/// This is used to collect data as it streams and emit the observation when -/// complete. The observation is emitted in the Drop implementation when the -/// body is finished. +/// This is used to collect data as it streams and add the body payload when +/// complete. The body payload is added in the Drop implementation via the +/// ObservationPayloadHandle. struct StreamingObserverState { buffer: BytesMut, content_type: String, - log_level: LogLevel, - status_code: u16, - execution: ExecutionHandle, + payload_handle: ObservationPayloadHandle, } impl StreamingObserverState { - fn new( - content_type: String, - log_level: LogLevel, - status_code: u16, - execution: ExecutionHandle, - ) -> Self { + fn new(content_type: String, payload_handle: ObservationPayloadHandle) -> Self { Self { buffer: BytesMut::new(), content_type, - log_level, - status_code, - execution, + payload_handle, } } @@ -64,7 +57,7 @@ impl Drop for StreamingObserverState { fn drop(&mut self) { let bytes = self.buffer.clone().freeze(); tracing::debug!( - "StreamingObserverBody: emitting observation with {} bytes on drop", + "StreamingObserverBody: adding body payload with {} bytes on drop", bytes.len() ); let payload = Payload { @@ -73,19 +66,14 @@ impl Drop for StreamingObserverState { size: bytes.len(), }; - ObservationBuilder::new("http/response/body") - .label("http/response") - .label("http/response/body") - .metadata("status", &self.status_code.to_string()) - .log_level(self.log_level) - .execution(&self.execution) - .payload(payload); + self.payload_handle.raw_payload("body", payload); } } pin_project! { /// A body wrapper that streams data through while collecting it for observation. - /// When the stream completes, it emits the observation with the collected body. + /// When the stream completes, it adds the body as a named payload to the + /// existing response observation. pub struct StreamingObserverBody { #[pin] inner: Body, @@ -94,20 +82,12 @@ pin_project! { } impl StreamingObserverBody { - fn new( - inner: Body, - content_type: String, - log_level: LogLevel, - status_code: u16, - execution: ExecutionHandle, - ) -> Self { + fn new(inner: Body, content_type: String, payload_handle: ObservationPayloadHandle) -> Self { Self { inner, state: Arc::new(Mutex::new(StreamingObserverState::new( content_type, - log_level, - status_code, - execution, + payload_handle, ))), } } @@ -247,27 +227,33 @@ where }; let (parts, body) = req.into_parts(); - ObservationBuilder::new("http/request/headers") - .label("http/request") - .label("http/request/headers") + + // Create a single group for the HTTP exchange + let http_group = GroupBuilder::new("http_request") .metadata("method", parts.method.to_string()) .metadata("uri", parts.uri.to_string()) - .serde(&json!(filter_headers( - &parts.headers, - &config.excluded_headers - ))); + .build_with_execution(&execution) + .into_handle(); + // Collect request body let request_body_bytes = body .collect() .await .map(|collected| collected.to_bytes()) .unwrap_or_else(|_| Bytes::new()); - ObservationBuilder::new("http/request/body") - .label("http/request") - .label("http/request/body") + + // Single request observation with named payloads: "headers" + "body" + let headers_json = json!(filter_headers(&parts.headers, &config.excluded_headers)); + let headers_payload = Payload::json( + serde_json::to_string(&headers_json).unwrap_or_default(), + ); + ObservationBuilder::new("http/request") + .group(&http_group) .metadata("method", parts.method.to_string()) .metadata("uri", parts.uri.to_string()) - .payload(bytes_to_payload(&request_body_bytes, &parts.headers)); + .execution(&execution) + .named_raw_payload("headers", headers_payload) + .raw_payload("body", bytes_to_payload(&request_body_bytes, &parts.headers)); let response = inner .call(Request::from_parts(parts, Body::from(request_body_bytes))) @@ -280,31 +266,30 @@ where 500..=599 => LogLevel::Error, _ => LogLevel::Info, }; - ObservationBuilder::new("http/response/headers") - .label("http/response") - .label("http/response/headers") + + // Single response observation with named payload "headers" sent immediately, + // "body" added later when streaming completes via the payload handle + let resp_headers_json = json!(filter_headers(&parts.headers, &config.excluded_headers)); + let resp_headers_payload = Payload::json( + serde_json::to_string(&resp_headers_json).unwrap_or_default(), + ); + let payload_handle = ObservationBuilder::new("http/response") + .group(&http_group) .metadata("status", &parts.status.as_u16().to_string()) .log_level(log_level) - .serde(&json!(filter_headers( - &parts.headers, - &config.excluded_headers - ))); + .execution(&execution) + .named_raw_payload("headers", resp_headers_payload); // Wrap the response body in a streaming observer that captures data as it flows - // through and emits the observation when the stream completes + // through and adds the body payload when the stream completes let content_type = parts .headers .get(CONTENT_TYPE) .and_then(|v| v.to_str().ok()) .unwrap_or("application/octet-stream") .to_string(); - let streaming_body = StreamingObserverBody::new( - body, - content_type, - log_level, - parts.status.as_u16(), - execution, - ); + let streaming_body = + StreamingObserverBody::new(body, content_type, payload_handle); Ok(Response::from_parts(parts, Body::new(streaming_body))) }) diff --git a/crates/observation-tools-client/src/client.rs b/crates/observation-tools-client/src/client.rs index 38ddbb2..55686f6 100644 --- a/crates/observation-tools-client/src/client.rs +++ b/crates/observation-tools-client/src/client.rs @@ -4,13 +4,13 @@ use crate::error::Result; use crate::execution::BeginExecution; use crate::execution::ExecutionHandle; use crate::observation_handle::ObservationHandle; -use crate::ObservationWithPayload; use async_channel; use log::error; use log::info; use log::trace; use napi_derive::napi; use observation_tools_shared::models::Execution; +use observation_tools_shared::Observation; // Re-export constants from shared crate for convenience pub use observation_tools_shared::BATCH_SIZE; pub use observation_tools_shared::BLOB_THRESHOLD_BYTES; @@ -31,11 +31,18 @@ pub(crate) enum UploaderMessage { handle: ExecutionHandle, uploaded_tx: tokio::sync::watch::Sender, }, - Observations { - observations: Vec, + Observation { + observation: Observation, handle: ObservationHandle, uploaded_tx: tokio::sync::watch::Sender, }, + Payload { + observation_id: observation_tools_shared::ObservationId, + execution_id: observation_tools_shared::models::ExecutionId, + payload_id: observation_tools_shared::PayloadId, + name: String, + payload: observation_tools_shared::Payload, + }, Flush, Shutdown, } @@ -48,15 +55,26 @@ impl std::fmt::Debug for UploaderMessage { .debug_struct("Execution") .field("execution", execution) .finish(), - Self::Observations { - observations, + Self::Observation { + observation, handle, .. } => f - .debug_struct("Observations") - .field("observations", observations) + .debug_struct("Observation") + .field("observation", observation) .field("handle", handle) .finish(), + Self::Payload { + observation_id, + payload_id, + name, + .. + } => f + .debug_struct("Payload") + .field("observation_id", observation_id) + .field("payload_id", payload_id) + .field("name", name) + .finish(), Self::Flush => write!(f, "Flush"), Self::Shutdown => write!(f, "Shutdown"), } @@ -279,6 +297,18 @@ impl ClientBuilder { } } +/// Data for a payload ready to be uploaded +#[derive(Debug)] +pub(crate) struct PayloadUploadData { + pub(crate) observation_id: observation_tools_shared::ObservationId, + pub(crate) execution_id: observation_tools_shared::models::ExecutionId, + pub(crate) payload_id: observation_tools_shared::PayloadId, + pub(crate) name: String, + pub(crate) mime_type: String, + pub(crate) size: usize, + pub(crate) data: Vec, +} + async fn uploader_task( api_client: crate::server_client::Client, rx: async_channel::Receiver, @@ -291,12 +321,14 @@ async fn uploader_task( tokio::sync::watch::Sender, ); - let flush_observations = async |buffer: &mut Vec, - senders: &mut Vec| { - if buffer.is_empty() { + let flush = async |observation_buffer: &mut Vec, + senders: &mut Vec, + payload_buffer: &mut Vec| { + if observation_buffer.is_empty() && payload_buffer.is_empty() { return; } - let result = upload_observations(&api_client, buffer.drain(..).collect()).await; + let result = + upload_observations(&api_client, observation_buffer.drain(..).collect(), payload_buffer.drain(..).collect()).await; match result { Ok(()) => { // Signal all senders that observations were uploaded successfully @@ -314,8 +346,9 @@ async fn uploader_task( } } }; - let mut observation_buffer: Vec = Vec::new(); + let mut observation_buffer: Vec = Vec::new(); let mut sender_buffer: Vec = Vec::new(); + let mut payload_buffer: Vec = Vec::new(); loop { let msg = rx.recv().await.ok(); match msg { @@ -327,7 +360,6 @@ async fn uploader_task( let result = upload_execution(&api_client, execution).await; match result { Ok(()) => { - // Signal successful upload with handle let _ = uploaded_tx.send(Some(Ok(handle))); } Err(e) => { @@ -337,22 +369,36 @@ async fn uploader_task( } } } - Some(UploaderMessage::Observations { - observations, + Some(UploaderMessage::Observation { + observation, handle, uploaded_tx, }) => { - observation_buffer.extend(observations); + observation_buffer.push(observation); sender_buffer.push((handle, uploaded_tx)); - if observation_buffer.len() >= BATCH_SIZE { - flush_observations(&mut observation_buffer, &mut sender_buffer).await; - } + } + Some(UploaderMessage::Payload { + observation_id, + execution_id, + payload_id, + name, + payload, + }) => { + payload_buffer.push(PayloadUploadData { + observation_id, + execution_id, + payload_id, + name, + mime_type: payload.mime_type, + size: payload.size, + data: payload.data, + }); } Some(UploaderMessage::Flush) => { - flush_observations(&mut observation_buffer, &mut sender_buffer).await; + flush(&mut observation_buffer, &mut sender_buffer, &mut payload_buffer).await; } Some(UploaderMessage::Shutdown) | None => { - flush_observations(&mut observation_buffer, &mut sender_buffer).await; + flush(&mut observation_buffer, &mut sender_buffer, &mut payload_buffer).await; break; } } @@ -383,31 +429,42 @@ async fn upload_execution( async fn upload_observations( client: &crate::server_client::Client, - observations: Vec, + observations: Vec, + payloads: Vec, ) -> Result<()> { - if observations.is_empty() { + if observations.is_empty() && payloads.is_empty() { return Ok(()); } // Group by execution_id - let mut by_execution: std::collections::HashMap<_, Vec<_>> = std::collections::HashMap::new(); + let mut by_execution: std::collections::HashMap<_, (Vec<_>, Vec<_>)> = + std::collections::HashMap::new(); for obs in observations { by_execution - .entry(obs.observation.execution_id) + .entry(obs.execution_id) .or_default() + .0 .push(obs); } + for p in payloads { + by_execution + .entry(p.execution_id) + .or_default() + .1 + .push(p); + } // Upload each batch via multipart form - for (execution_id, observations) in by_execution { + for (execution_id, (observations, payloads)) in by_execution { trace!( - "Uploading {} observations for execution {}", + "Uploading {} observations + {} payloads for execution {}", observations.len(), + payloads.len(), execution_id ); client - .create_observations_multipart(&execution_id.to_string(), observations) + .create_observations_multipart(&execution_id.to_string(), observations, payloads) .await .map_err(|e| crate::error::Error::Config(e.to_string()))?; } diff --git a/crates/observation-tools-client/src/execution.rs b/crates/observation-tools-client/src/execution.rs index a7022ca..bd339a7 100644 --- a/crates/observation-tools-client/src/execution.rs +++ b/crates/observation-tools-client/src/execution.rs @@ -84,6 +84,15 @@ impl ExecutionHandle { pub fn base_url(&self) -> &str { &self.base_url } + + /// Create a placeholder handle (for stub observations when no execution context exists) + pub(crate) fn placeholder() -> Self { + Self { + execution_id: ExecutionId::nil(), + uploader_tx: async_channel::unbounded().0, + base_url: String::new(), + } + } } #[napi] diff --git a/crates/observation-tools-client/src/group.rs b/crates/observation-tools-client/src/group.rs new file mode 100644 index 0000000..ac00cb9 --- /dev/null +++ b/crates/observation-tools-client/src/group.rs @@ -0,0 +1,200 @@ +//! Group builder and handle types for hierarchical observation grouping + +use crate::client::ObservationUploadResult; +use crate::client::UploaderMessage; +use crate::context; +use crate::execution::ExecutionHandle; +use crate::observation::ObservationBuilder; +use crate::observation_handle::SendObservation; +use crate::Error; +use observation_tools_shared::GroupId; +use observation_tools_shared::ObservationId; +use observation_tools_shared::ObservationType; +use observation_tools_shared::Payload; +use std::collections::HashMap; + +/// Builder for creating groups +/// +/// Groups are first-class hierarchical containers for observations. +/// They are themselves observations with `ObservationType::Group`. +pub struct GroupBuilder { + name: String, + custom_id: Option, + parent_group_id: Option, + metadata: HashMap, +} + +impl GroupBuilder { + /// Create a new group builder with the given name + pub fn new(name: impl Into) -> Self { + Self { + name: name.into(), + custom_id: None, + parent_group_id: None, + metadata: HashMap::new(), + } + } + + /// Set a custom group ID + pub fn id(mut self, id: impl Into) -> Self { + self.custom_id = Some(GroupId::parse(&id.into())); + self + } + + /// Set the parent group ID (for creating child groups) + pub(crate) fn parent(mut self, parent_id: GroupId) -> Self { + self.parent_group_id = Some(parent_id); + self + } + + /// Add metadata to the group + pub fn metadata(mut self, key: impl Into, value: impl Into) -> Self { + self.metadata.insert(key.into(), value.into()); + self + } + + /// Build and send the group using the current execution context + pub fn build(self) -> SendGroup { + match context::get_current_execution() { + Some(execution) => self.build_with_execution(&execution), + None => { + log::trace!( + "No execution context available for group '{}'", + self.name + ); + SendGroup::stub(Error::NoExecutionContext) + } + } + } + + /// Build and send the group with an explicit execution handle + pub fn build_with_execution(self, execution: &ExecutionHandle) -> SendGroup { + let group_id = self.custom_id.unwrap_or_else(GroupId::new); + let observation_id: ObservationId = group_id.clone().into(); + + let group_handle = GroupHandle { + group_id, + execution_id: execution.id(), + uploader_tx: execution.uploader_tx.clone(), + base_url: execution.base_url().to_string(), + }; + + // Serialize metadata as the payload + let payload = if self.metadata.is_empty() { + Payload::json("{}".to_string()) + } else { + Payload::json(serde_json::to_string(&self.metadata).unwrap_or_else(|_| "{}".to_string())) + }; + + let mut builder = ObservationBuilder::new(self.name) + .with_id(observation_id) + .observation_type(ObservationType::Group) + .execution(execution); + + if let Some(parent_id) = self.parent_group_id { + builder = builder.parent_group(parent_id); + } + for (k, v) in self.metadata { + builder = builder.metadata(k, v); + } + + let send = builder.send_observation(payload); + SendGroup::from_send_observation(group_handle, send) + } +} + +/// Handle to a created group +#[derive(Clone, Debug)] +#[allow(dead_code)] +pub struct GroupHandle { + pub(crate) group_id: GroupId, + pub(crate) execution_id: observation_tools_shared::models::ExecutionId, + pub(crate) uploader_tx: async_channel::Sender, + pub(crate) base_url: String, +} + +impl GroupHandle { + /// Get the group ID + pub fn id(&self) -> GroupId { + self.group_id.clone() + } + + /// Create a child group builder with this group as parent + pub fn child(&self, name: impl Into) -> GroupBuilder { + GroupBuilder::new(name).parent(self.group_id.clone()) + } + + /// Construct a GroupHandle from a known ID without creating/sending a group. + /// + /// This is useful for the tracing layer which already knows span IDs + /// and doesn't need to create group observations for every span. + pub fn from_id(group_id: GroupId, execution: &ExecutionHandle) -> Self { + Self { + group_id, + execution_id: execution.id(), + uploader_tx: execution.uploader_tx.clone(), + base_url: execution.base_url().to_string(), + } + } +} + +/// Result of sending a group, allowing waiting for upload +pub struct SendGroup { + handle: GroupHandle, + uploaded_rx: Option>, + creation_error: Option, +} + +impl SendGroup { + fn stub(error: Error) -> Self { + Self { + handle: GroupHandle { + group_id: GroupId::nil(), + execution_id: observation_tools_shared::models::ExecutionId::nil(), + uploader_tx: async_channel::unbounded().0, + base_url: String::new(), + }, + uploaded_rx: None, + creation_error: Some(error), + } + } + + pub(crate) fn from_send_observation(handle: GroupHandle, send: SendObservation) -> Self { + Self { + handle, + uploaded_rx: send.uploaded_rx, + creation_error: send.creation_error, + } + } + + /// Wait for the group to be uploaded + pub async fn wait_for_upload(mut self) -> crate::error::Result { + if self.creation_error.is_some() { + return Err(Error::CreationError); + } + + let rx = self.uploaded_rx.as_mut().ok_or(Error::ChannelClosed)?; + + loop { + { + let value = rx.borrow_and_update(); + match &*value { + Some(Ok(_)) => return Ok(self.handle.clone()), + Some(Err(error_msg)) => return Err(Error::UploadFailed(error_msg.clone())), + None => {} + } + } + rx.changed().await.map_err(|_| Error::ChannelClosed)?; + } + } + + /// Get a reference to the group handle + pub fn handle(&self) -> &GroupHandle { + &self.handle + } + + /// Consume and return the group handle + pub fn into_handle(self) -> GroupHandle { + self.handle + } +} diff --git a/crates/observation-tools-client/src/lib.rs b/crates/observation-tools-client/src/lib.rs index 3538e6c..9c90145 100644 --- a/crates/observation-tools-client/src/lib.rs +++ b/crates/observation-tools-client/src/lib.rs @@ -10,6 +10,7 @@ mod client; pub(crate) mod context; mod error; mod execution; +mod group; mod logger; mod observation; mod observation_handle; @@ -26,15 +27,21 @@ pub use error::Error; pub use error::Result; pub use execution::BeginExecution; pub use execution::ExecutionHandle; +pub use group::GroupBuilder; +pub use group::GroupHandle; +pub use group::SendGroup; pub use logger::ObservationLogger; pub use observation::ObservationBuilder; pub use observation_handle::ObservationHandle; +pub use observation_handle::ObservationPayloadHandle; pub use observation_handle::SendObservation; -// Re-export procedural macro +// Re-export procedural macros +pub use observation_tools_macros::group; pub use observation_tools_macros::observe; // Re-export from shared for convenience -use observation_tools_shared::Observation; +pub use observation_tools_shared::GroupId; pub use observation_tools_shared::Payload; +pub use observation_tools_shared::PayloadBuilder; /// Register a global execution shared across all threads /// @@ -65,8 +72,3 @@ pub fn clear_global_execution() { context::clear_global_execution() } -#[derive(Debug)] -struct ObservationWithPayload { - observation: Observation, - payload: Payload, -} diff --git a/crates/observation-tools-client/src/logger.rs b/crates/observation-tools-client/src/logger.rs index 50dc64a..bb4c09c 100644 --- a/crates/observation-tools-client/src/logger.rs +++ b/crates/observation-tools-client/src/logger.rs @@ -40,7 +40,7 @@ impl Log for ObservationLogger { let builder = ObservationBuilder::new("ObservationLogger") .observation_type(ObservationType::LogEntry) .log_level(record.level().into()) - .label(format!("log/{}", record.target())); + .metadata("target", record.target()); let builder = if let (Some(file), Some(line)) = (record.file(), record.line()) { builder.source(file, line) } else { diff --git a/crates/observation-tools-client/src/observation.rs b/crates/observation-tools-client/src/observation.rs index 8a25a39..f2f5391 100644 --- a/crates/observation-tools-client/src/observation.rs +++ b/crates/observation-tools-client/src/observation.rs @@ -4,17 +4,20 @@ use crate::client::ObservationUploadResult; use crate::client::UploaderMessage; use crate::context; use crate::execution::ExecutionHandle; +use crate::group::GroupHandle; use crate::observation_handle::ObservationHandle; +use crate::observation_handle::ObservationPayloadHandle; use crate::observation_handle::SendObservation; use crate::Error; -use crate::ObservationWithPayload; use napi_derive::napi; +use observation_tools_shared::GroupId; use observation_tools_shared::LogLevel; use observation_tools_shared::Markdown; use observation_tools_shared::Observation; use observation_tools_shared::ObservationId; use observation_tools_shared::ObservationType; use observation_tools_shared::Payload; +use observation_tools_shared::PayloadId; use observation_tools_shared::SourceInfo; use serde::Serialize; use std::any::TypeId; @@ -32,7 +35,7 @@ use std::fmt::Debug; #[napi] pub struct ObservationBuilder { name: String, - labels: Vec, + group_ids: Vec, metadata: HashMap, source: Option, parent_span_id: Option, @@ -42,6 +45,8 @@ pub struct ObservationBuilder { custom_id: Option, /// Explicit execution handle (overrides context) execution: Option, + /// Parent group ID (for group observations) + parent_group_id: Option, } impl ObservationBuilder { @@ -49,7 +54,7 @@ impl ObservationBuilder { pub fn new>(name: T) -> Self { Self { name: name.as_ref().to_string(), - labels: Vec::new(), + group_ids: Vec::new(), metadata: HashMap::new(), source: None, parent_span_id: None, @@ -57,6 +62,7 @@ impl ObservationBuilder { log_level: LogLevel::Info, custom_id: None, execution: None, + parent_group_id: None, } } @@ -78,15 +84,9 @@ impl ObservationBuilder { self } - /// Add a label to the observation - pub fn label(mut self, label: impl Into) -> Self { - self.labels.push(label.into()); - self - } - - /// Add multiple labels to the observation - pub fn labels(mut self, labels: impl IntoIterator>) -> Self { - self.labels.extend(labels.into_iter().map(|l| l.into())); + /// Add a group to the observation + pub fn group(mut self, group: &GroupHandle) -> Self { + self.group_ids.push(group.id()); self } @@ -124,6 +124,12 @@ impl ObservationBuilder { self } + /// Set the parent group ID (for group observations) + pub(crate) fn parent_group(mut self, id: GroupId) -> Self { + self.parent_group_id = Some(id); + self + } + /// Serialize the value as JSON and send the observation /// /// Returns a `SendObservation` which allows you to wait for the upload @@ -154,8 +160,61 @@ impl ObservationBuilder { self.send_observation(payload) } + /// Send the observation with a named serde-serialized payload, returning a handle + /// that allows adding more named payloads later. + pub fn named_payload( + self, + name: impl Into, + value: &T, + ) -> ObservationPayloadHandle { + if TypeId::of::() == TypeId::of::() { + panic!("Use named_raw_payload() method to set Payload directly"); + } + let payload = Payload::json(serde_json::to_string(value).unwrap_or_default()); + self.send_named_observation(name, payload) + } + + /// Send the observation with a named Debug-formatted payload + pub fn named_debug( + self, + name: impl Into, + value: &T, + ) -> ObservationPayloadHandle { + let payload = Payload::debug(format!("{:#?}", value)); + self.send_named_observation(name, payload) + } + + /// Send the observation with a named raw payload + pub fn named_raw_payload(self, name: impl Into, payload: Payload) -> ObservationPayloadHandle { + self.send_named_observation(name, payload) + } + + fn send_named_observation( + mut self, + name: impl Into, + payload: Payload, + ) -> ObservationPayloadHandle { + let execution = match self.execution.take().or_else(context::get_current_execution) { + Some(exec) => exec, + None => { + let send = SendObservation::stub(Error::NoExecutionContext); + return ObservationPayloadHandle::new( + send.into_handle(), + ExecutionHandle::placeholder(), + ); + } + }; + + // Send the observation metadata + named payload (single path, no duplication) + let send = self.send_with_execution_and_name(payload, name, &execution); + let handle = send.into_handle(); + + ObservationPayloadHandle::new(handle, execution.clone()) + } + + /// Internal method to build and send the observation - fn send_observation(mut self, payload: Payload) -> SendObservation { + pub(crate) fn send_observation(mut self, payload: Payload) -> SendObservation { let Some(execution) = self.execution.take().or_else(context::get_current_execution) else { log::error!( "No execution context available for observation '{}'", @@ -167,11 +226,22 @@ impl ObservationBuilder { self.send_with_execution(payload, &execution) } - /// Internal method to build and send the observation with a resolved execution + /// Internal method to build and send the observation with a resolved execution. + /// Sends the default payload with name "default". fn send_with_execution( self, payload: Payload, execution: &ExecutionHandle, + ) -> SendObservation { + self.send_with_execution_and_name(payload, "default", execution) + } + + /// Internal method to build and send the observation with a named payload. + fn send_with_execution_and_name( + self, + payload: Payload, + payload_name: impl Into, + execution: &ExecutionHandle, ) -> SendObservation { let observation_id = self.custom_id.unwrap_or_else(ObservationId::new); @@ -196,13 +266,12 @@ impl ObservationBuilder { name: self.name, observation_type: self.observation_type, log_level: self.log_level, - labels: self.labels, + group_ids: self.group_ids, + parent_group_id: self.parent_group_id, metadata: self.metadata, source: self.source, parent_span_id, created_at: chrono::Utc::now(), - mime_type: payload.mime_type.clone(), - payload_size: payload.size, }; let (uploaded_tx, uploaded_rx) = tokio::sync::watch::channel::(None); @@ -215,13 +284,11 @@ impl ObservationBuilder { observation_id ); + // Send observation metadata if let Err(e) = execution .uploader_tx - .try_send(UploaderMessage::Observations { - observations: vec![ObservationWithPayload { - observation, - payload, - }], + .try_send(UploaderMessage::Observation { + observation, handle: handle.clone(), uploaded_tx, }) @@ -230,6 +297,15 @@ impl ObservationBuilder { return SendObservation::stub(Error::ChannelClosed); } + // Send the payload as a separate message + let _ = execution.uploader_tx.try_send(UploaderMessage::Payload { + observation_id, + execution_id: execution.id(), + payload_id: PayloadId::new(), + name: payload_name.into(), + payload, + }); + SendObservation::new(handle, uploaded_rx) } } @@ -275,10 +351,10 @@ impl ObservationBuilder { Ok(self) } - /// Add a label to the observation - #[napi(js_name = "label")] - pub fn label_napi(&mut self, label: String) -> &Self { - self.labels.push(label); + /// Add a group to the observation by group ID string + #[napi(js_name = "group")] + pub fn group_napi(&mut self, group_id: String) -> &Self { + self.group_ids.push(GroupId::parse(&group_id)); self } diff --git a/crates/observation-tools-client/src/observation_handle.rs b/crates/observation-tools-client/src/observation_handle.rs index 7e463d6..c085a79 100644 --- a/crates/observation-tools-client/src/observation_handle.rs +++ b/crates/observation-tools-client/src/observation_handle.rs @@ -133,3 +133,57 @@ impl From for ObservationHandle { send.into_handle() } } + +/// Handle for multi-part observations that allows adding additional named payloads +pub struct ObservationPayloadHandle { + handle: ObservationHandle, + execution: crate::execution::ExecutionHandle, +} + +impl ObservationPayloadHandle { + pub(crate) fn new(handle: ObservationHandle, execution: crate::execution::ExecutionHandle) -> Self { + Self { handle, execution } + } + + /// Add a named payload serialized via serde + pub fn payload(&self, name: impl Into, value: &T) -> &Self { + let payload = + observation_tools_shared::Payload::json(serde_json::to_string(value).unwrap_or_default()); + self.raw_payload(name, payload) + } + + /// Add a named payload formatted via Debug + pub fn debug_payload( + &self, + name: impl Into, + value: &T, + ) -> &Self { + let payload = observation_tools_shared::Payload::debug(format!("{:#?}", value)); + self.raw_payload(name, payload) + } + + /// Add a named raw payload + pub fn raw_payload(&self, name: impl Into, payload: observation_tools_shared::Payload) -> &Self { + let _ = self + .execution + .uploader_tx + .try_send(crate::client::UploaderMessage::Payload { + observation_id: self.handle.observation_id, + execution_id: self.handle.execution_id, + payload_id: observation_tools_shared::PayloadId::new(), + name: name.into(), + payload, + }); + self + } + + /// Get a reference to the observation handle + pub fn handle(&self) -> &ObservationHandle { + &self.handle + } + + /// Consume and return the observation handle + pub fn into_handle(self) -> ObservationHandle { + self.handle + } +} diff --git a/crates/observation-tools-client/src/server_client.rs b/crates/observation-tools-client/src/server_client.rs index f5b9e9b..efb48d3 100644 --- a/crates/observation-tools-client/src/server_client.rs +++ b/crates/observation-tools-client/src/server_client.rs @@ -1,6 +1,7 @@ use crate::server_client::types::PayloadOrPointerResponse; -use crate::ObservationWithPayload; +use observation_tools_shared::Observation; use reqwest::multipart::Part; +use serde::Serialize; use std::time::Duration; include!(concat!(env!("OUT_DIR"), "/observation_tools_openapi.rs")); @@ -21,6 +22,13 @@ impl PayloadOrPointerResponse { } } +impl types::GetObservation { + /// Get the first (default) payload's data, for backward-compatible tests + pub fn payload(&self) -> &PayloadOrPointerResponse { + &self.payloads[0].data + } +} + pub fn create_client(base_url: &str, api_key: Option) -> anyhow::Result { Ok(Client::new_with_client( &base_url, @@ -51,41 +59,68 @@ pub async fn pre_hook_async( Ok(()) } +/// Default payload name used for the primary observation payload +pub const DEFAULT_PAYLOAD_NAME: &str = "default"; + +/// Entry in the payload manifest sent alongside the multipart form +#[derive(Serialize)] +struct PayloadManifestEntry { + observation_id: String, + payload_id: String, + name: String, + mime_type: String, + size: usize, +} + // Extension methods for Client impl Client { pub(crate) async fn create_observations_multipart( &self, execution_id: &str, - observations: Vec, + observations: Vec, + payloads: Vec, ) -> anyhow::Result<()> { + if observations.is_empty() && payloads.is_empty() { + return Ok(()); + } + let url = format!("{}/api/exe/{}/obs", self.baseurl, execution_id); - let observation_count = observations.len(); log::trace!( - "Creating observations via multipart: url={}, count={}", + "Creating observations via multipart: url={}, observations={}, payloads={}", url, - observation_count + observations.len(), + payloads.len() ); // Build multipart form let mut form = reqwest::multipart::Form::new(); - let (observations, payloads): (Vec<_>, Vec<_>) = observations - .into_iter() - .map(|obs| (obs.observation, obs.payload)) - .unzip(); - let payloads = observations - .iter() - .zip(payloads.into_iter()) - .map(|(obs, payload)| (obs.id.to_string(), payload.data)) - .collect::>(); - + // Part 1: observations JSON let observations_json = serde_json::to_vec(&observations)?; let observations_part = Part::bytes(observations_json).mime_str("application/json")?; form = form.part("observations", observations_part); - for (obs_id, payload_data) in payloads { - let part = Part::bytes(payload_data); - form = form.part(obs_id, part); + + // Part 2: payload manifest JSON + let manifest: Vec = payloads + .iter() + .map(|p| PayloadManifestEntry { + observation_id: p.observation_id.to_string(), + payload_id: p.payload_id.to_string(), + name: p.name.clone(), + mime_type: p.mime_type.clone(), + size: p.size, + }) + .collect(); + let manifest_json = serde_json::to_vec(&manifest)?; + let manifest_part = Part::bytes(manifest_json).mime_str("application/json")?; + form = form.part("payload_manifest", manifest_part); + + // Part 3: payload data parts + for p in payloads { + let part_key = format!("{}:{}:{}", p.observation_id, p.payload_id, p.name); + let part = Part::bytes(p.data); + form = form.part(part_key, part); } let mut request_builder = self.client.post(&url).multipart(form); diff --git a/crates/observation-tools-client/src/tracing/layer.rs b/crates/observation-tools-client/src/tracing/layer.rs index 99bafe9..a405145 100644 --- a/crates/observation-tools-client/src/tracing/layer.rs +++ b/crates/observation-tools-client/src/tracing/layer.rs @@ -1,6 +1,8 @@ use super::span_data::SpanData; use crate::context; +use crate::group::GroupHandle; use crate::observation::ObservationBuilder; +use observation_tools_shared::GroupId; use observation_tools_shared::LogLevel; use observation_tools_shared::ObservationType; use observation_tools_shared::Payload; @@ -70,9 +72,9 @@ where fn on_close(&self, id: Id, ctx: Context<'_, S>) { // Skip if no execution context - if context::get_current_execution().is_none() { + let Some(execution) = context::get_current_execution() else { return; - } + }; let Some(span) = ctx.span(&id) else { return; @@ -89,17 +91,20 @@ where let duration_ms = duration.as_secs_f64() * 1000.0; // Get the span's own ID and parent span ID - let span_id = id.into_u64().to_string(); + let span_id = id.into_u64(); let parent_span_id = span .parent() .map(|parent| parent.id().into_u64().to_string()); + // Create a group handle from the span ID + let group_handle = GroupHandle::from_id(GroupId::from_u64(span_id), &execution); + // Build and send observation let builder = ObservationBuilder::new(&data.name) .observation_type(ObservationType::Span) .log_level(tracing_level_to_log_level(data.level)) - .label(format!("tracing/spans/{}", data.target)) - .metadata("span_id", &span_id) + .group(&group_handle) + .metadata("span_id", &span_id.to_string()) .metadata("duration_ms", format!("{:.3}", duration_ms)) .metadata("target", &data.target); @@ -127,9 +132,9 @@ where } } - if context::get_current_execution().is_none() { + let Some(execution) = context::get_current_execution() else { return; - } + }; let mut visitor = FieldVisitor::new(); event.record(&mut visitor); @@ -153,13 +158,19 @@ where .unwrap_or_default(); // Get current span from tracing's span stack for parent attribution - let parent_span_id = ctx.current_span().id().map(|id| id.into_u64().to_string()); + let current_span_id = ctx.current_span().id().map(|id| id.into_u64()); + let parent_span_id = current_span_id.map(|id| id.to_string()); // Build and send observation - let builder = ObservationBuilder::new(metadata.name()) + let mut builder = ObservationBuilder::new(metadata.name()) .observation_type(ObservationType::LogEntry) - .log_level(tracing_level_to_log_level(*metadata.level())) - .label(format!("tracing/events/{}", metadata.target())); + .log_level(tracing_level_to_log_level(*metadata.level())); + + // Add group reference from current span + if let Some(span_id) = current_span_id { + let group_handle = GroupHandle::from_id(GroupId::from_u64(span_id), &execution); + builder = builder.group(&group_handle); + } let builder = if let (Some(file), Some(line)) = (metadata.file(), metadata.line()) { builder.source(file, line) diff --git a/crates/observation-tools-client/tests/api_integration_test.rs b/crates/observation-tools-client/tests/api_integration_test.rs index 8458344..d3534ba 100644 --- a/crates/observation-tools-client/tests/api_integration_test.rs +++ b/crates/observation-tools-client/tests/api_integration_test.rs @@ -9,7 +9,9 @@ use anyhow::anyhow; use common::TestServer; use observation_tools::observe; use observation_tools::server_client::types::PayloadOrPointerResponse; +use observation_tools::ObservationBuilder; use observation_tools_shared::Payload; +use serde_json::json; use std::collections::HashSet; use tracing::debug; @@ -45,8 +47,6 @@ async fn test_create_observation_with_metadata() -> anyhow::Result<()> { let (execution, _) = server .with_execution("test-execution-with-observation", async { observation_tools::ObservationBuilder::new("test-observation") - .label("test/label1") - .label("test/label2") .metadata("key1", "value1") .metadata("key2", "value2") .payload("test payload data"); @@ -57,15 +57,14 @@ async fn test_create_observation_with_metadata() -> anyhow::Result<()> { assert_eq!(observations.len(), 1); - let obs = &observations[0]; + // Get the full observation with inline payload data + let obs = server.get_observation(&execution.id(), &observations[0].id).await?; assert_eq!(obs.name, "test-observation"); assert_eq!(obs.execution_id.to_string(), execution.id().to_string()); - assert!(obs.labels.contains(&"test/label1".to_string())); - assert!(obs.labels.contains(&"test/label2".to_string())); assert_eq!(obs.metadata.get("key1"), Some(&"value1".to_string())); assert_eq!(obs.metadata.get("key2"), Some(&"value2".to_string())); - assert_eq!(obs.mime_type, "text/plain"); - assert_eq!(obs.payload.as_str(), Some("test payload data")); + assert_eq!(obs.payloads[0].mime_type, "text/plain"); + assert_eq!(obs.payload().as_str(), Some("test payload data")); Ok(()) } @@ -100,20 +99,21 @@ async fn test_create_many_observations() -> anyhow::Result<()> { let observations = server.list_observations(&execution.id()).await?; assert_eq!(observations.len(), expected_names.len()); - // Verify all payloads are stored inline (not as blobs) since they're under the - // threshold + // List returns metadata only (payloads as Pointers) + // Verify payload sizes are correct for obs in &observations { - assert!( - !matches!(obs.payload, PayloadOrPointerResponse::Pointer { .. }), - "Observation {} should have inline payload data (not stored as blob)", - obs.name - ); assert_eq!( - obs.payload_size, + obs.payloads[0].size as u64, observation_tools::BLOB_THRESHOLD_BYTES as u64 - 1, "Observation {} payload size should be exactly 1 byte under threshold", obs.name ); + // When listing, all payloads come back as Pointers + assert!( + matches!(obs.payload(), PayloadOrPointerResponse::Pointer { .. }), + "Observation {} should have Pointer payload in list response", + obs.name + ); } let obs_names: HashSet = observations.iter().map(|o| o.name.clone()).collect(); @@ -168,7 +168,6 @@ async fn test_concurrent_executions() -> anyhow::Result<()> { for _ in 0..NUM_OBSERVATIONS { debug!("Task 1 sending observation"); observation_tools::observe!(TASK_1_NAME) - .label("concurrent/task1") .serde(&"data from task 1") .wait_for_upload() .await?; @@ -186,7 +185,6 @@ async fn test_concurrent_executions() -> anyhow::Result<()> { while let Some(_) = task1_receiver.recv().await { debug!("Task 2 sending observation"); observation_tools::observe!(TASK_2_NAME) - .label("concurrent/task2") .serde(&"data from task 2") .wait_for_upload() .await?; @@ -237,7 +235,6 @@ async fn test_with_observations_spawned_task() -> anyhow::Result<()> { async move { // This observation should be associated with the parent execution observe!(SPAWNED_OBS_NAME) - .label("spawned/task") .serde(&"data from spawned task") .wait_for_upload() .await @@ -296,7 +293,6 @@ async fn test_large_payload_blob_upload() -> anyhow::Result<()> { let (execution, _) = server .with_execution("test-execution-with-large-payload", async { observe!("large-observation") - .label("test/large-payload") .serde(&large_payload); }) .await?; @@ -310,22 +306,24 @@ async fn test_large_payload_blob_upload() -> anyhow::Result<()> { // The payload.data should be empty because it was uploaded as a blob assert!( - matches!(obs.payload, PayloadOrPointerResponse::Pointer { .. }), + matches!(obs.payload(), PayloadOrPointerResponse::Pointer { .. }), "Large payload data should be stored as a blob pointer" ); // But the size should still be recorded assert_eq!( - obs.payload_size, expected_size as u64, + obs.payloads[0].size as u64, expected_size as u64, "Payload size should be recorded correctly" ); // Verify the blob can be retrieved via the OpenAPI client let api_client = server.create_api_client()?; + let payload_id = &obs.payloads[0].id; let blob_response = api_client .get_observation_blob() .execution_id(&execution.id().to_string()) .observation_id(&obs.id.to_string()) + .payload_id(&payload_id.to_string()) .send() .await?; @@ -353,3 +351,63 @@ async fn test_large_payload_blob_upload() -> anyhow::Result<()> { Ok(()) } + +#[test_log::test(tokio::test)] +async fn test_named_payloads() -> anyhow::Result<()> { + let server = TestServer::new().await; + let client = server.create_client()?; + let execution = client + .begin_execution("test-named-payloads")? + .wait_for_upload() + .await?; + + observation_tools::with_execution(execution.clone(), async { + // Create an observation with a named payload, then add more payloads via the handle + let handle = ObservationBuilder::new("multi-payload-obs") + .metadata("kind", "multi") + .named_payload("headers", &json!({"content-type": "application/json"})); + + handle.payload("body", &json!({"message": "hello"})); + handle.raw_payload( + "raw-part", + Payload::text("some raw text"), + ); + }) + .await; + + client.shutdown().await?; + + let observations = server.list_observations(&execution.id()).await?; + + assert_eq!(observations.len(), 1, "Should have exactly one observation"); + + // Get full observation with inline payload data + let obs = server.get_observation(&execution.id(), &observations[0].id).await?; + assert_eq!(obs.name, "multi-payload-obs"); + assert_eq!(obs.metadata.get("kind"), Some(&"multi".to_string())); + + // The observation should have 3 named payloads (headers, body, raw-part) + assert_eq!( + obs.payloads.len(), 3, + "Should have 3 payloads (headers + body + raw-part), got {}", + obs.payloads.len() + ); + + // Find the "headers" payload by name + let headers_payload = obs + .payloads + .iter() + .find(|p| p.name == "headers") + .expect("Should have a 'headers' payload"); + let payload_json: serde_json::Value = match &headers_payload.data { + PayloadOrPointerResponse::Json(v) => v.clone(), + other => anyhow::bail!("Expected JSON payload for 'headers', got {:?}", other), + }; + assert_eq!( + payload_json, + json!({"content-type": "application/json"}), + "Headers payload should contain the headers data" + ); + + Ok(()) +} diff --git a/crates/observation-tools-client/tests/api_key_integration_test.rs b/crates/observation-tools-client/tests/api_key_integration_test.rs index 941e83f..e47c854 100644 --- a/crates/observation-tools-client/tests/api_key_integration_test.rs +++ b/crates/observation-tools-client/tests/api_key_integration_test.rs @@ -45,7 +45,6 @@ async fn test_api_with_valid_key() -> anyhow::Result<()> { observation_tools::with_execution(execution, async { observation_tools::ObservationBuilder::new("test-observation") - .label("test/label") .metadata("key1", "value1") .payload("test payload data") .wait_for_upload() @@ -60,9 +59,11 @@ async fn test_api_with_valid_key() -> anyhow::Result<()> { let observations = server.list_observations(&execution_id).await?; assert_eq!(observations.len(), 1); - let obs = &observations[0]; + + // Get full observation with inline payload data + let obs = server.get_observation(&execution_id, &observations[0].id).await?; assert_eq!(obs.name, "test-observation"); - assert_eq!(obs.payload.as_str(), Some("test payload data")); + assert_eq!(obs.payload().as_str(), Some("test payload data")); Ok(()) } @@ -188,7 +189,6 @@ async fn test_blob_upload_with_auth() -> anyhow::Result<()> { let observation_id = observation_tools::with_execution(execution, async { observation_tools::observe!("large-observation") - .label("test/large-payload") .serde(&large_payload) .wait_for_upload() .await @@ -204,7 +204,7 @@ async fn test_blob_upload_with_auth() -> anyhow::Result<()> { assert_eq!(obs.name, "large-observation"); assert!( - matches!(obs.payload, PayloadOrPointerResponse::Pointer { .. }), + matches!(obs.payload(), PayloadOrPointerResponse::Pointer { .. }), "Large payload data should be empty in metadata (stored as blob)" ); diff --git a/crates/observation-tools-client/tests/axum_layer_test.rs b/crates/observation-tools-client/tests/axum_layer_test.rs index 2fb44aa..f7587c4 100644 --- a/crates/observation-tools-client/tests/axum_layer_test.rs +++ b/crates/observation-tools-client/tests/axum_layer_test.rs @@ -21,6 +21,7 @@ use observation_tools::axum::ExecutionLayer; use observation_tools::axum::RequestObserverConfig; use observation_tools::axum::RequestObserverLayer; use observation_tools::observe; +use observation_tools::server_client::types::ObservationType; use serde_json::json; use std::convert::Infallible; use std::time::Duration; @@ -88,11 +89,26 @@ async fn test_request_observer_captures_request_response() -> anyhow::Result<()> let observations = server .list_observations(&executions.executions[0].id) .await?; - assert_eq!(observations.len(), 4); - assert_eq!(observations[0].name, "http/request/headers"); - assert_eq!(observations[1].name, "http/request/body"); - assert_eq!(observations[2].name, "http/response/headers"); - assert_eq!(observations[3].name, "http/response/body"); + + // Filter to non-group observations + // The request observer creates 2 observations: http/request and http/response + // each with named payloads (headers + body) + let non_group: Vec<_> = observations + .iter() + .filter(|o| o.observation_type != ObservationType::Group) + .collect(); + assert_eq!(non_group.len(), 2); + let obs_names: Vec<&str> = non_group.iter().map(|o| o.name.as_str()).collect(); + assert!(obs_names.contains(&"http/request"), "Expected http/request observation"); + assert!(obs_names.contains(&"http/response"), "Expected http/response observation"); + + // Verify groups were also created + let groups: Vec<_> = observations + .iter() + .filter(|o| o.observation_type == ObservationType::Group) + .collect(); + assert!(groups.len() >= 1, "Expected at least 1 group observation"); + Ok(()) } @@ -146,8 +162,20 @@ async fn test_request_observer_config_excludes_headers() -> anyhow::Result<()> { let observations = server .list_observations(&executions.executions[0].id) .await?; - let payload = observations[0] - .payload + + // Find the http/request observation and look at its "headers" named payload + let request_summary = observations + .iter() + .find(|o| o.name == "http/request") + .expect("Expected http/request observation"); + let request_obs = server.get_observation(&executions.executions[0].id, &request_summary.id).await?; + let headers_payload = request_obs + .payloads + .iter() + .find(|p| p.name == "headers") + .expect("Expected 'headers' named payload"); + let payload = headers_payload + .data .as_json() .ok_or(anyhow!("Not json"))?; let headers = payload.as_object().expect("headers should be object"); @@ -196,9 +224,14 @@ async fn test_error_response_has_error_log_level() -> anyhow::Result<()> { let observations = server .list_observations(&executions.executions[0].id) .await?; - assert_eq!(observations.len(), 5); + + // Find response observation and check its log level + let response_obs = observations + .iter() + .find(|o| o.name == "http/response") + .expect("Expected http/response observation"); assert_eq!( - observations[3].log_level, + response_obs.log_level, observation_tools::server_client::types::LogLevel::Error, "5xx responses should have Error log level" ); @@ -254,18 +287,36 @@ async fn test_request_observer_captures_request_and_response_body() -> anyhow::R .list_observations(&executions.executions[0].id) .await?; - assert_eq!(observations.len(), 4); - - // Check request observation has body with base64 data and content-type - let request_payload = observations[1] - .payload + // Find http/request observation and check its "body" named payload + let request_summary = observations + .iter() + .find(|o| o.name == "http/request") + .expect("Expected http/request observation"); + let request_obs = server.get_observation(&executions.executions[0].id, &request_summary.id).await?; + let request_body_payload = request_obs + .payloads + .iter() + .find(|p| p.name == "body") + .expect("Expected 'body' named payload on http/request"); + let request_payload = request_body_payload + .data .as_json() .ok_or(anyhow!("Not json"))?; assert_eq!(request_payload, &request_body); - // Check response observation has body with base64 data and content-type - let response_payload = observations[3] - .payload + // Find http/response observation and check its "body" named payload + let response_summary = observations + .iter() + .find(|o| o.name == "http/response") + .expect("Expected http/response observation"); + let response_obs = server.get_observation(&executions.executions[0].id, &response_summary.id).await?; + let response_body_payload = response_obs + .payloads + .iter() + .find(|p| p.name == "body") + .expect("Expected 'body' named payload on http/response"); + let response_payload = response_body_payload + .data .as_json() .ok_or(anyhow!("Not json"))?; assert_eq!(response_payload["message"], "echo response"); @@ -302,9 +353,18 @@ async fn test_request_observer_handles_text_body() -> anyhow::Result<()> { .list_observations(&executions.executions[0].id) .await?; - println!("{:#?}", observations); - let response_payload = observations[3] - .payload + let response_summary = observations + .iter() + .find(|o| o.name == "http/response") + .expect("Expected http/response observation"); + let response_obs = server.get_observation(&executions.executions[0].id, &response_summary.id).await?; + let body_payload = response_obs + .payloads + .iter() + .find(|p| p.name == "body") + .expect("Expected 'body' named payload on http/response"); + let response_payload = body_payload + .data .as_str() .ok_or(anyhow!("Not string"))?; assert_eq!(response_payload, "Hello, World!"); @@ -476,40 +536,51 @@ async fn test_streaming_sse_events_are_received_incrementally() -> anyhow::Resul .list_observations(&executions.executions[0].id) .await?; - // Should have 4 observations: request headers, request body, response headers, - // response body - assert_eq!(observations.len(), 4); - assert_eq!(observations[0].name, "http/request/headers"); - assert_eq!(observations[1].name, "http/request/body"); - assert_eq!(observations[2].name, "http/response/headers"); - assert_eq!(observations[3].name, "http/response/body"); - - // Verify the response body was captured - // SSE uses text/event-stream which is stored as InlineBinary + // Filter to non-group observations + let non_group: Vec<_> = observations + .iter() + .filter(|o| o.observation_type != ObservationType::Group) + .collect(); + assert_eq!(non_group.len(), 2); + let obs_names: Vec<&str> = non_group.iter().map(|o| o.name.as_str()).collect(); + assert!(obs_names.contains(&"http/request"), "Expected http/request observation"); + assert!(obs_names.contains(&"http/response"), "Expected http/response observation"); + + // Verify the response observation has at least the headers payload. + // The body payload is added asynchronously via StreamingObserverBody's Drop, + // so it may or may not have arrived by the time we check depending on timing. use observation_tools::server_client::types::PayloadOrPointerResponse; - let response_body = match &observations[3].payload { - PayloadOrPointerResponse::Text(t) => t.clone(), - PayloadOrPointerResponse::InlineBinary(data) => { - let bytes: Vec = data.iter().map(|&x| x as u8).collect(); - String::from_utf8_lossy(&bytes).to_string() - } - other => anyhow::bail!("Unexpected payload type: {:?}", other), - }; - assert!( - response_body.contains("event1"), - "Response body should contain event1, got: {}", - response_body - ); - assert!( - response_body.contains("event2"), - "Response body should contain event2, got: {}", - response_body - ); + let response_summary = observations + .iter() + .find(|o| o.name == "http/response") + .expect("Expected http/response observation"); + let response_obs = server.get_observation(&executions.executions[0].id, &response_summary.id).await?; + let headers_payload = response_obs + .payloads + .iter() + .find(|p| p.name == "headers" || p.name == "default") + .expect("Expected headers payload on http/response"); assert!( - response_body.contains("event3"), - "Response body should contain event3, got: {}", - response_body + matches!(&headers_payload.data, PayloadOrPointerResponse::Json(_)), + "Headers payload should be JSON" ); + // If the body payload arrived, verify its contents + if let Some(body_payload) = response_obs.payloads.iter().find(|p| p.name == "body") { + let response_body = match &body_payload.data { + PayloadOrPointerResponse::Text(t) => t.clone(), + PayloadOrPointerResponse::InlineBinary(data) => { + let bytes: Vec = data.iter().map(|&x| x as u8).collect(); + String::from_utf8_lossy(&bytes).to_string() + } + other => anyhow::bail!("Unexpected payload type: {:?}", other), + }; + assert!( + response_body.contains("event1"), + "Response body should contain event1, got: {}", + response_body + ); + } + Ok(()) } diff --git a/crates/observation-tools-client/tests/common/mod.rs b/crates/observation-tools-client/tests/common/mod.rs index ac6fe01..a10caf6 100644 --- a/crates/observation-tools-client/tests/common/mod.rs +++ b/crates/observation-tools-client/tests/common/mod.rs @@ -112,7 +112,7 @@ impl TestServer { observation_tools::server_client::create_client(&self.base_url, Some(api_key.to_string())) } - /// List all observations for an execution + /// List all observations for an execution (payloads returned as Pointers) #[allow(unused)] pub async fn list_observations( &self, @@ -127,6 +127,23 @@ impl TestServer { Ok(response.observations.clone()) } + /// Get a single observation with inline payload data + #[allow(unused)] + pub async fn get_observation( + &self, + execution_id: &impl ToString, + observation_id: &impl ToString, + ) -> anyhow::Result { + let api_client = self.create_api_client()?; + let response = api_client + .get_observation() + .execution_id(&execution_id.to_string()) + .observation_id(&observation_id.to_string()) + .send() + .await?; + Ok(response.observation.clone()) + } + #[allow(unused)] pub async fn with_execution( &self, diff --git a/crates/observation-tools-client/tests/logger_test.rs b/crates/observation-tools-client/tests/logger_test.rs index f1c27ce..6e088e6 100644 --- a/crates/observation-tools-client/tests/logger_test.rs +++ b/crates/observation-tools-client/tests/logger_test.rs @@ -21,8 +21,8 @@ async fn test_logger() -> anyhow::Result<()> { let observations = server.list_observations(&execution.id()).await?; assert_eq!(observations.len(), 2); - let obs = &observations[0]; - assert_eq!(obs.payload.as_str(), Some("info-log")); + let obs = server.get_observation(&execution.id(), &observations[0].id).await?; + assert_eq!(obs.payload().as_str(), Some("info-log")); assert_eq!(obs.observation_type, ObservationType::LogEntry); assert_eq!(obs.log_level, LogLevel::Info); diff --git a/crates/observation-tools-client/tests/observe_macro_test.rs b/crates/observation-tools-client/tests/observe_macro_test.rs index 4aca7fd..94f7aa2 100644 --- a/crates/observation-tools-client/tests/observe_macro_test.rs +++ b/crates/observation-tools-client/tests/observe_macro_test.rs @@ -17,13 +17,13 @@ async fn test_observe_simple_string_payload() -> anyhow::Result<()> { let observations = server.list_observations(&execution.id()).await?; assert_eq!(observations.len(), 1); - let obs = &observations[0]; + let obs = server.get_observation(&execution.id(), &observations[0].id).await?; assert_eq!(obs.name, "simple_string"); assert_eq!( - obs.payload.as_json(), + obs.payload().as_json(), Some(&serde_json::to_value("hello world")?) ); - assert_eq!(obs.mime_type, "application/json"); + assert_eq!(obs.payloads[0].mime_type, "application/json"); let response = reqwest::get(&observation.handle().url()).await?; assert_eq!(response.status(), 200); @@ -52,15 +52,15 @@ async fn test_observe_serde_struct() -> anyhow::Result<()> { let observations = server.list_observations(&execution.id()).await?; assert_eq!(observations.len(), 1); - let obs = &observations[0]; + let obs = server.get_observation(&execution.id(), &observations[0].id).await?; assert_eq!(obs.name, "serde_struct"); assert_eq!( - obs.payload.as_json(), + obs.payload().as_json(), Some(&serde_json::from_str( r#"{"message":"test message","count":42}"# )?) ); - assert_eq!(obs.mime_type, "application/json"); + assert_eq!(obs.payloads[0].mime_type, "application/json"); let response = reqwest::get(&observation.handle().url()).await?; assert_eq!(response.status(), 200); @@ -92,10 +92,10 @@ async fn test_observe_custom_payload() -> anyhow::Result<()> { let observations = server.list_observations(&execution.id()).await?; assert_eq!(observations.len(), 1); - let obs = &observations[0]; + let obs = server.get_observation(&execution.id(), &observations[0].id).await?; assert_eq!(obs.name, "custom_payload"); - assert_eq!(obs.payload.as_str(), Some("custom message")); - assert_eq!(obs.mime_type, "text/plain"); + assert_eq!(obs.payload().as_str(), Some("custom message")); + assert_eq!(obs.payloads[0].mime_type, "text/plain"); let response = reqwest::get(&observation.handle().url()).await?; assert_eq!(response.status(), 200); @@ -127,10 +127,10 @@ async fn test_observe_custom_with_new_syntax() -> anyhow::Result<()> { let observations = server.list_observations(&execution.id()).await?; assert_eq!(observations.len(), 1); - let obs = &observations[0]; + let obs = server.get_observation(&execution.id(), &observations[0].id).await?; assert_eq!(obs.name, "custom_new_syntax"); - assert_eq!(obs.payload.as_str(), Some("custom: test")); - assert_eq!(obs.mime_type, "text/plain"); + assert_eq!(obs.payload().as_str(), Some("custom: test")); + assert_eq!(obs.payloads[0].mime_type, "text/plain"); let response = reqwest::get(&observation.handle().url()).await?; assert_eq!(response.status(), 200); @@ -151,11 +151,11 @@ async fn test_observe_variable_name_capture() -> anyhow::Result<()> { let observations = server.list_observations(&execution.id()).await?; assert_eq!(observations.len(), 1); - let obs = &observations[0]; + let obs = server.get_observation(&execution.id(), &observations[0].id).await?; // The observation name should match the variable name assert_eq!(obs.name, "my_data"); assert_eq!( - obs.payload.as_json(), + obs.payload().as_json(), Some(&serde_json::to_value("captured variable name")?) ); @@ -166,24 +166,27 @@ async fn test_observe_variable_name_capture() -> anyhow::Result<()> { } #[test_log::test(tokio::test)] -async fn test_observe_with_label() -> anyhow::Result<()> { +async fn test_observe_with_group() -> anyhow::Result<()> { let server = TestServer::new().await; let (execution, observation) = server .with_execution("test-structured", async { + let group = observation_tools::group!("test_category") + .build() + .into_handle(); observe!("structured_observation") - .label("test/category") + .group(&group) .serde(&"test payload") }) .await?; let observations = server.list_observations(&execution.id()).await?; - assert_eq!(observations.len(), 1); - let obs = &observations[0]; - assert_eq!(obs.name, "structured_observation"); - assert_eq!(obs.labels, vec!["test/category"]); + // 2 observations: the group itself + the observation + assert_eq!(observations.len(), 2); + let obs_summary = observations.iter().find(|o| o.name == "structured_observation").unwrap(); + let obs = server.get_observation(&execution.id(), &obs_summary.id).await?; assert_eq!( - obs.payload.as_json(), + obs.payload().as_json(), Some(&serde_json::to_value("test payload")?) ); @@ -268,22 +271,25 @@ async fn test_observe_dynamic_name() -> anyhow::Result<()> { } #[test_log::test(tokio::test)] -async fn test_observe_dynamic_label() -> anyhow::Result<()> { +async fn test_observe_with_dynamic_group() -> anyhow::Result<()> { let server = TestServer::new().await; let (execution, observation) = server - .with_execution("test-dynamic-label", async { + .with_execution("test-dynamic-group", async { let endpoint = "users"; - let label = format!("api/{}/create", endpoint); - observe!("request").label(label).serde(&"data") + let name = format!("api/{}/create", endpoint); + let group = observation_tools::GroupBuilder::new(name) + .build() + .into_handle(); + observe!("request").group(&group).serde(&"data") }) .await?; let observations = server.list_observations(&execution.id()).await?; - assert_eq!(observations.len(), 1); - let obs = &observations[0]; + // 2 observations: group + observation + assert_eq!(observations.len(), 2); + let obs = observations.iter().find(|o| o.name == "request").unwrap(); assert_eq!(obs.name, "request"); - assert_eq!(obs.labels, vec!["api/users/create"]); let response = reqwest::get(&observation.handle().url()).await?; assert_eq!(response.status(), 200); @@ -317,13 +323,13 @@ async fn test_observe_debug_struct() -> anyhow::Result<()> { let observations = server.list_observations(&execution.id()).await?; assert_eq!(observations.len(), 1); - let obs = &observations[0]; + let obs = server.get_observation(&execution.id(), &observations[0].id).await?; assert_eq!(obs.name, "debug_struct"); - assert_eq!(obs.mime_type, "text/x-rust-debug"); + assert_eq!(obs.payloads[0].mime_type, "text/x-rust-debug"); // The payload should be parsed to JSON with _type field let json = obs - .payload + .payload() .as_json() .expect("payload should be parsed as JSON"); assert_eq!( diff --git a/crates/observation-tools-client/tests/tracing_layer_test.rs b/crates/observation-tools-client/tests/tracing_layer_test.rs index d7b82d6..fd46850 100644 --- a/crates/observation-tools-client/tests/tracing_layer_test.rs +++ b/crates/observation-tools-client/tests/tracing_layer_test.rs @@ -52,12 +52,12 @@ async fn test_event_captured() -> anyhow::Result<()> { let observations = server.list_observations(&execution.id()).await?; assert_eq!(observations.len(), 1); - let obs = &observations[0]; + let obs = server.get_observation(&execution.id(), &observations[0].id).await?; assert_eq!(obs.observation_type, ObservationType::LogEntry); assert_eq!(obs.log_level, LogLevel::Info); // Check payload is the message text - let payload = obs.payload.as_str().expect("Expected text payload"); + let payload = obs.payload().as_str().expect("Expected text payload"); assert_eq!(payload, "test event message"); // Check event fields are in metadata @@ -119,10 +119,7 @@ async fn test_parent_span_attribution() -> anyhow::Result<()> { // Find our specific observations let event = observations .iter() - .find(|o| { - o.observation_type == ObservationType::LogEntry - && o.labels.iter().any(|l| l.starts_with("tracing/events")) - }) + .find(|o| o.observation_type == ObservationType::LogEntry) .expect("Expected event observation"); let inner = observations .iter() @@ -245,9 +242,10 @@ async fn test_internal_events_filtered() -> anyhow::Result<()> { observations.iter().map(|o| &o.name).collect::>() ); - let obs = &observations[0]; - assert!( - obs.labels.iter().any(|l| l.contains("user_app")), + let obs = server.get_observation(&execution.id(), &observations[0].id).await?; + assert_eq!( + obs.payload().as_str(), + Some("user event"), "The captured observation should be the user event" ); @@ -275,9 +273,9 @@ async fn test_span_with_multiple_fields() -> anyhow::Result<()> { .await?; let observations = server.list_observations(&execution.id()).await?; - let obs = &observations[0]; + let obs = server.get_observation(&execution.id(), &observations[0].id).await?; assert_eq!(obs.observation_type, ObservationType::Span); - let fields = obs.payload.as_json().expect("Expected JSON payload"); + let fields = obs.payload().as_json().expect("Expected JSON payload"); assert_eq!(fields["request_id"], 42); assert_eq!(fields["user"], "alice"); assert_eq!(fields["enabled"], true); diff --git a/crates/observation-tools-macros/src/lib.rs b/crates/observation-tools-macros/src/lib.rs index 54a8f65..7b12eea 100644 --- a/crates/observation-tools-macros/src/lib.rs +++ b/crates/observation-tools-macros/src/lib.rs @@ -29,6 +29,44 @@ fn is_variable_name(name: &str) -> bool { name.chars().next().is_some_and(|c| c.is_lowercase()) } +/// The group! procedural macro +/// +/// Creates a GroupBuilder with the given name. Unlike `observe!`, groups +/// do not capture source info since they represent logical containers. +/// +/// Examples: +/// ```ignore +/// let group = group!("my_group").build(); +/// let child = group.child("child_group").build(); +/// +/// observe!("data").group(&group).serde(&data); +/// ``` +#[proc_macro] +pub fn group(input: TokenStream) -> TokenStream { + let args = parse_macro_input!(input as ObserveArg); + + let name_expr = match &args.name_expr { + Expr::Path(expr_path) if expr_path.path.segments.len() == 1 => { + let ident_name = expr_path.path.segments[0].ident.to_string(); + if is_variable_name(&ident_name) { + Expr::Lit(syn::ExprLit { + attrs: vec![], + lit: syn::Lit::Str(LitStr::new(&ident_name, args.name_expr.span())), + }) + } else { + args.name_expr + } + } + _ => args.name_expr, + }; + + let expanded = quote! { + ::observation_tools::GroupBuilder::new(#name_expr) + }; + + TokenStream::from(expanded) +} + /// The observe! procedural macro /// /// Creates an ObservationBuilder with the given name and source info diff --git a/crates/observation-tools-server/Cargo.toml b/crates/observation-tools-server/Cargo.toml index f1a7ac9..096f035 100644 --- a/crates/observation-tools-server/Cargo.toml +++ b/crates/observation-tools-server/Cargo.toml @@ -18,6 +18,7 @@ axum.workspace = true axum-extra.workspace = true base64.workspace = true bytes.workspace = true +chrono.workspace = true clap.workspace = true directories.workspace = true hmac.workspace = true @@ -26,6 +27,7 @@ minijinja-embed = {workspace = true} minijinja.workspace = true nom.workspace = true object_store.workspace = true +prost.workspace = true observation-tools-shared.workspace = true rand.workspace = true serde.workspace = true diff --git a/crates/observation-tools-server/src/api/mod.rs b/crates/observation-tools-server/src/api/mod.rs index 9f96bff..e1831dc 100644 --- a/crates/observation-tools-server/src/api/mod.rs +++ b/crates/observation-tools-server/src/api/mod.rs @@ -144,6 +144,7 @@ pub fn build_api() -> (Router, Router, OpenApi) { .routes(routes!(observations::list_observations)) .routes(routes!(observations::get_observation)) .routes(routes!(observations::get_observation_blob)) + .routes(routes!(observations::get_observation_blob_legacy)) .split_for_parts(); let mut openapi = OpenApi::default(); diff --git a/crates/observation-tools-server/src/api/observations/create.rs b/crates/observation-tools-server/src/api/observations/create.rs index ed80342..affee80 100644 --- a/crates/observation-tools-server/src/api/observations/create.rs +++ b/crates/observation-tools-server/src/api/observations/create.rs @@ -4,24 +4,39 @@ use crate::api::types::CreateObservationsResponse; use crate::api::AppError; use crate::storage::BlobStorage; use crate::storage::MetadataStorage; -use crate::storage::ObservationWithPayloadPointer; -use crate::storage::PayloadOrPointer; +use crate::storage::ObservationWithPayloads; +use crate::storage::PayloadData; +use crate::storage::StoredPayload; use axum::extract::Multipart; use axum::extract::Path; use axum::extract::State; use axum::Json; use observation_tools_shared::models::ExecutionId; use observation_tools_shared::Observation; +use observation_tools_shared::PayloadId; use observation_tools_shared::BLOB_THRESHOLD_BYTES; +use serde::Deserialize; use std::collections::HashMap; use std::sync::Arc; +/// Entry in the payload manifest sent by new clients +#[derive(Debug, Deserialize)] +struct PayloadManifestEntry { + observation_id: String, + payload_id: String, + #[allow(dead_code)] + name: String, + mime_type: String, + #[allow(dead_code)] + size: usize, +} + /// Create observations (batch) via multipart form /// /// The multipart form should contain: -/// - "observations": JSON array of observation metadata (with empty -/// payload.data) -/// - "{observation_id}": Binary payload data for each observation +/// - "observations": JSON array of observation metadata +/// - "{obs_id}:{payload_id}:{name}": Binary payload data for each payload +/// - Legacy: "{obs_id}:{name}" or "{obs_id}" formats are also supported #[tracing::instrument(skip(metadata, blobs, multipart))] pub async fn create_observations( State(metadata): State>, @@ -32,6 +47,7 @@ pub async fn create_observations( let _execution_id = ExecutionId::parse(&execution_id)?; let mut observations: Option> = None; + let mut payload_manifest: Option> = None; let mut payloads: HashMap = HashMap::new(); // Parse all multipart fields @@ -51,8 +67,17 @@ pub async fn create_observations( let parsed: Vec = serde_json::from_slice(&data) .map_err(|e| AppError::BadRequest(format!("Failed to parse observations JSON: {}", e)))?; observations = Some(parsed); + } else if name == "payload_manifest" { + // Parse payload manifest (new clients send this for authoritative MIME types) + let data = field + .bytes() + .await + .map_err(|e| AppError::BadRequest(format!("Failed to read payload manifest: {}", e)))?; + let parsed: Vec = serde_json::from_slice(&data) + .map_err(|e| AppError::BadRequest(format!("Failed to parse payload manifest JSON: {}", e)))?; + payload_manifest = Some(parsed); } else { - // This is a payload field, keyed by observation ID + // This is a payload field let data = field.bytes().await.map_err(|e| { AppError::BadRequest(format!("Failed to read payload data for {}: {}", name, e)) })?; @@ -60,7 +85,7 @@ pub async fn create_observations( } } - let mut observations = observations.ok_or_else(|| { + let observations = observations.ok_or_else(|| { AppError::BadRequest("Missing 'observations' field in multipart form".to_string()) })?; @@ -71,32 +96,85 @@ pub async fn create_observations( "Creating observations batch" ); - // Merge payload data into observations and handle blob storage - let mut observations_with_payloads = vec![]; - for obs in &mut observations { + // Build a lookup from (observation_id, payload_id) -> manifest entry for MIME types + let manifest_lookup: HashMap<(String, String), &PayloadManifestEntry> = payload_manifest + .as_ref() + .map(|entries| { + entries + .iter() + .map(|e| ((e.observation_id.clone(), e.payload_id.clone()), e)) + .collect() + }) + .unwrap_or_default(); + + // Build ObservationWithPayloads for each observation by collecting all matching payloads + let mut observations_with_payloads = Vec::with_capacity(observations.len()); + + for obs in &observations { let obs_id_str = obs.id.to_string(); - let Some(payload_data) = payloads.remove(&obs_id_str) else { + let mut obs_payloads: Vec = Vec::new(); + + // Collect all payload keys that belong to this observation + let matching_keys: Vec = payloads + .keys() + .filter(|k| { + let id_part = k.split(':').next().unwrap_or(k); + id_part == obs_id_str + }) + .cloned() + .collect(); + + for key in matching_keys { + let data = payloads.remove(&key).expect("key was just found"); + let (payload_id, name) = parse_payload_key(&key, &obs_id_str)?; + + // Determine MIME type: check manifest first, fall back to heuristic for old clients + let mime_type = if let Some(entry) = manifest_lookup.get(&(obs_id_str.clone(), payload_id.to_string())) { + entry.mime_type.clone() + } else if serde_json::from_slice::(&data).is_ok() { + "application/json".to_string() + } else if std::str::from_utf8(&data).is_ok() { + "text/plain".to_string() + } else { + "application/octet-stream".to_string() + }; + + let size = data.len(); + let payload_data = if size >= BLOB_THRESHOLD_BYTES { + blobs + .store_blob(obs.id, payload_id, data.clone()) + .await?; + PayloadData::Blob + } else { + PayloadData::Inline(data.to_vec()) + }; + + obs_payloads.push(StoredPayload { + id: payload_id, + name, + mime_type, + size, + data: payload_data, + }); + } + + if obs_payloads.is_empty() { return Err(AppError::BadRequest(format!( "Missing payload data for observation ID {}", obs.id ))); - }; - let payload = if payload_data.len() >= BLOB_THRESHOLD_BYTES { - blobs.store_blob(obs.id, payload_data).await?; - None - } else { - Some(payload_data.to_vec()) - }; - observations_with_payloads.push(ObservationWithPayloadPointer { + } + + observations_with_payloads.push(ObservationWithPayloads { observation: obs.clone(), - payload_or_pointer: PayloadOrPointer { payload }, + payloads: obs_payloads, }); } - // Warn about any orphan payloads - for orphan_id in payloads.keys() { + // Warn about any orphaned payloads + for key in payloads.keys() { tracing::warn!( - observation_id = %orphan_id, + payload_key = %key, "Received payload for unknown observation ID" ); } @@ -113,3 +191,50 @@ pub async fn create_observations( Ok(Json(CreateObservationsResponse {})) } + +/// Parse a payload key into (PayloadId, name). +/// Supports formats: +/// - "{obs_id}:{payload_id}:{name}" (new format) +/// - "{obs_id}:{name}" (legacy named format, generates new PayloadId) +/// - "{obs_id}" (legacy bare format, generates new PayloadId, name = "default") +fn parse_payload_key( + key: &str, + obs_id_str: &str, +) -> Result<(PayloadId, String), AppError> { + let rest = &key[obs_id_str.len()..]; + if rest.is_empty() { + // Legacy bare format: "{obs_id}" + return Ok((PayloadId::new(), "default".to_string())); + } + + if !rest.starts_with(':') { + return Err(AppError::BadRequest(format!( + "Invalid payload key format: {}", + key + ))); + } + + let after_obs_id = &rest[1..]; // skip the ':' + let parts: Vec<&str> = after_obs_id.splitn(2, ':').collect(); + + match parts.len() { + 1 => { + // Legacy "{obs_id}:{name}" format + Ok((PayloadId::new(), parts[0].to_string())) + } + 2 => { + // "{obs_id}:{payload_id}:{name}" - try parsing first part as PayloadId + if let Ok(payload_id) = PayloadId::parse(parts[0]) { + Ok((payload_id, parts[1].to_string())) + } else { + // Not a valid PayloadId, treat as legacy format where the whole thing is a name + // This shouldn't normally happen + Ok((PayloadId::new(), after_obs_id.to_string())) + } + } + _ => Err(AppError::BadRequest(format!( + "Invalid payload key format: {}", + key + ))), + } +} diff --git a/crates/observation-tools-server/src/api/observations/get.rs b/crates/observation-tools-server/src/api/observations/get.rs index 5ea6d74..5858689 100644 --- a/crates/observation-tools-server/src/api/observations/get.rs +++ b/crates/observation-tools-server/src/api/observations/get.rs @@ -2,14 +2,16 @@ use crate::api::AppError; use crate::storage::MetadataStorage; -use crate::storage::ObservationWithPayloadPointer; -use crate::storage::PayloadOrPointer; +use crate::storage::ObservationWithPayloads; +use crate::storage::PayloadData; +use crate::storage::StoredPayload; use axum::extract::Path; use axum::extract::State; use axum::Json; use observation_tools_shared::models::ExecutionId; use observation_tools_shared::Observation; use observation_tools_shared::ObservationId; +use observation_tools_shared::PayloadId; use serde::Deserialize; use serde::Serialize; use std::sync::Arc; @@ -24,14 +26,36 @@ pub struct GetObservationResponse { pub struct GetObservation { #[serde(flatten)] pub observation: Observation, - pub payload: PayloadOrPointerResponse, + pub payloads: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +pub struct GetPayload { + pub id: PayloadId, + pub name: String, + pub mime_type: String, + pub size: usize, + pub data: PayloadOrPointerResponse, } impl GetObservation { - pub fn new(obs: ObservationWithPayloadPointer) -> GetObservation { + pub fn new(obs: ObservationWithPayloads) -> GetObservation { + let exec_id = obs.observation.execution_id; + let obs_id = obs.observation.id; + let payloads = obs + .payloads + .into_iter() + .map(|p| GetPayload { + id: p.id, + name: p.name.clone(), + mime_type: p.mime_type.clone(), + size: p.size, + data: PayloadOrPointerResponse::from_stored_payload(p, exec_id, obs_id), + }) + .collect(); GetObservation { - payload: PayloadOrPointerResponse::new(&obs.observation, obs.payload_or_pointer), observation: obs.observation, + payloads, } } } @@ -46,30 +70,40 @@ pub enum PayloadOrPointerResponse { } impl PayloadOrPointerResponse { - pub fn new(obs: &Observation, payload_or_pointer: PayloadOrPointer) -> Self { - let Some(data) = payload_or_pointer.payload else { - return PayloadOrPointerResponse::Pointer { - url: format!("/api/exe/{}/obs/{}/content", obs.execution_id, obs.id), - }; + pub fn from_stored_payload( + payload: StoredPayload, + exec_id: ExecutionId, + obs_id: ObservationId, + ) -> Self { + let data = match payload.data { + PayloadData::Inline(data) => data, + PayloadData::Blob => { + return PayloadOrPointerResponse::Pointer { + url: format!( + "/api/exe/{}/obs/{}/payload/{}/content", + exec_id, obs_id, payload.id + ), + }; + } }; - if obs.mime_type.starts_with("application/json") { + if payload.mime_type.starts_with("application/json") { if let Ok(json_value) = serde_json::from_slice::(&data) { return PayloadOrPointerResponse::Json(json_value); } } - if obs.mime_type.starts_with("text/x-rust-debug") { + if payload.mime_type.starts_with("text/x-rust-debug") { if let Ok(text) = String::from_utf8(data.clone()) { let parsed = crate::debug_parser::parse_debug_to_json(&text); return PayloadOrPointerResponse::Json(parsed); } } - if obs.mime_type.starts_with("text/plain") { + if payload.mime_type.starts_with("text/plain") { if let Ok(text) = String::from_utf8(data.clone()) { return PayloadOrPointerResponse::Text(text); } } - if obs.mime_type.starts_with("text/markdown") { + if payload.mime_type.starts_with("text/markdown") { if let Ok(text) = String::from_utf8(data.clone()) { return PayloadOrPointerResponse::Markdown { raw: text }; } @@ -101,10 +135,7 @@ pub async fn get_observation( ) -> Result, AppError> { let _execution_id = ExecutionId::parse(&execution_id)?; let observation_id = ObservationId::parse(&observation_id)?; - let observations = metadata.get_observations(&[observation_id]).await?; - let observation = observations.into_iter().next().ok_or_else(|| { - crate::storage::StorageError::NotFound(format!("Observation {} not found", observation_id)) - })?; + let observation = metadata.get_observation(observation_id).await?; Ok(Json(GetObservationResponse { observation: GetObservation::new(observation), })) diff --git a/crates/observation-tools-server/src/api/observations/get_blob.rs b/crates/observation-tools-server/src/api/observations/get_blob.rs index 3f317ae..12db9d1 100644 --- a/crates/observation-tools-server/src/api/observations/get_blob.rs +++ b/crates/observation-tools-server/src/api/observations/get_blob.rs @@ -3,6 +3,7 @@ use crate::api::AppError; use crate::storage::BlobStorage; use crate::storage::MetadataStorage; +use crate::storage::PayloadData; use crate::storage::StorageError; use axum::extract::Path; use axum::extract::State; @@ -11,9 +12,70 @@ use axum::http::HeaderValue; use axum::http::StatusCode; use axum::response::IntoResponse; use observation_tools_shared::ObservationId; +use observation_tools_shared::PayloadId; use std::sync::Arc; -/// Get observation blob content +/// Get observation payload content +#[utoipa::path( + get, + path = "/api/exe/{execution_id}/obs/{observation_id}/payload/{payload_id}/content", + params( + ("execution_id" = String, Path, description = "Execution ID"), + ("observation_id" = String, Path, description = "Observation ID"), + ("payload_id" = String, Path, description = "Payload ID") + ), + responses( + (status = 200, description = "Payload content", body = Vec, content_type = "application/octet-stream"), + (status = 404, description = "Payload not found"), + (status = 400, description = "Bad request") + ), + tag = "observations" +)] +#[tracing::instrument(skip(metadata, blobs))] +pub async fn get_observation_blob( + State(metadata): State>, + State(blobs): State>, + Path((_execution_id, observation_id, payload_id)): Path<(String, String, String)>, +) -> Result { + let observation_id = ObservationId::parse(&observation_id)?; + let payload_id = PayloadId::parse(&payload_id)?; + let observation = metadata.get_observation(observation_id).await?; + + // Find the payload in the manifest + let payload = observation + .payloads + .iter() + .find(|p| p.id == payload_id) + .ok_or_else(|| { + StorageError::NotFound(format!( + "Payload {} not found for observation {}", + payload_id, observation_id + )) + })?; + + let content_type = HeaderValue::from_str(&payload.mime_type) + .unwrap_or_else(|_| HeaderValue::from_static("application/octet-stream")); + + // If the payload data is inline (from prefix scan), return it directly + if let PayloadData::Inline(ref data) = payload.data { + return Ok(( + StatusCode::OK, + [(header::CONTENT_TYPE, content_type)], + data.clone(), + )); + } + + // Otherwise fetch from blob storage + let blob = blobs.get_blob(observation_id, payload_id).await?; + Ok(( + StatusCode::OK, + [(header::CONTENT_TYPE, content_type)], + blob.to_vec(), + )) +} + +/// Get observation blob content (legacy route for backward compat) +/// This is kept to support old URLs like /api/exe/{exec_id}/obs/{obs_id}/content #[utoipa::path( get, path = "/api/exe/{execution_id}/obs/{observation_id}/content", @@ -29,27 +91,34 @@ use std::sync::Arc; tag = "observations" )] #[tracing::instrument(skip(metadata, blobs))] -pub async fn get_observation_blob( +pub async fn get_observation_blob_legacy( State(metadata): State>, State(blobs): State>, Path((_execution_id, observation_id)): Path<(String, String)>, ) -> Result { let observation_id = ObservationId::parse(&observation_id)?; - let observations = metadata.get_observations(&[observation_id]).await?; - let observation = observations - .into_iter() - .next() - .ok_or_else(|| StorageError::NotFound(format!("Observation {} not found", observation_id)))?; - let content_type = HeaderValue::from_str(&observation.observation.mime_type) + let observation = metadata.get_observation(observation_id).await?; + + // Use the first payload + let payload = observation.payloads.first().ok_or_else(|| { + StorageError::NotFound(format!( + "No payloads found for observation {}", + observation_id + )) + })?; + + let content_type = HeaderValue::from_str(&payload.mime_type) .unwrap_or_else(|_| HeaderValue::from_static("application/octet-stream")); - if let Some(payload) = observation.payload_or_pointer.payload { + + if let PayloadData::Inline(ref data) = payload.data { return Ok(( StatusCode::OK, [(header::CONTENT_TYPE, content_type)], - payload, + data.clone(), )); } - let blob = blobs.get_blob(observation_id).await?; + + let blob = blobs.get_blob(observation_id, payload.id).await?; Ok(( StatusCode::OK, [(header::CONTENT_TYPE, content_type)], diff --git a/crates/observation-tools-server/src/api/observations/list.rs b/crates/observation-tools-server/src/api/observations/list.rs index 432c8a4..3494376 100644 --- a/crates/observation-tools-server/src/api/observations/list.rs +++ b/crates/observation-tools-server/src/api/observations/list.rs @@ -1,7 +1,6 @@ //! List observations handler use crate::api::observations::get::GetObservation; -use crate::api::observations::get::PayloadOrPointerResponse; use crate::api::types::ListObservationsQuery; use crate::api::AppError; use crate::storage::MetadataStorage; @@ -59,10 +58,7 @@ pub async fn list_observations( Ok(Json(ListObservationsResponse { observations: observations .into_iter() - .map(|o| GetObservation { - payload: PayloadOrPointerResponse::new(&o.observation, o.payload_or_pointer), - observation: o.observation, - }) + .map(GetObservation::new) .collect(), has_next_page, })) diff --git a/crates/observation-tools-server/src/api/observations/mod.rs b/crates/observation-tools-server/src/api/observations/mod.rs index 3e2a03f..e9a8736 100644 --- a/crates/observation-tools-server/src/api/observations/mod.rs +++ b/crates/observation-tools-server/src/api/observations/mod.rs @@ -9,7 +9,10 @@ pub use create::create_observations; pub use get::__path_get_observation; pub use get::get_observation; pub use get::GetObservation; +pub use get::GetPayload; pub use get_blob::__path_get_observation_blob; +pub use get_blob::__path_get_observation_blob_legacy; pub use get_blob::get_observation_blob; +pub use get_blob::get_observation_blob_legacy; pub use list::__path_list_observations; pub use list::list_observations; diff --git a/crates/observation-tools-server/src/storage/blob.rs b/crates/observation-tools-server/src/storage/blob.rs index 16515b3..88ad985 100644 --- a/crates/observation-tools-server/src/storage/blob.rs +++ b/crates/observation-tools-server/src/storage/blob.rs @@ -7,17 +7,23 @@ use object_store::local::LocalFileSystem; use object_store::path::Path as ObjectPath; use object_store::ObjectStore; use observation_tools_shared::ObservationId; +use observation_tools_shared::PayloadId; use std::path::Path; use std::sync::Arc; /// Trait for storing and retrieving blob data #[async_trait::async_trait] pub trait BlobStorage: Send + Sync { - /// Store blob data for an observation - async fn store_blob(&self, id: ObservationId, data: Bytes) -> StorageResult<()>; - - /// Retrieve blob data for an observation - async fn get_blob(&self, id: ObservationId) -> StorageResult; + /// Store blob data for an observation payload + async fn store_blob( + &self, + obs_id: ObservationId, + payload_id: PayloadId, + data: Bytes, + ) -> StorageResult<()>; + + /// Retrieve blob data for an observation payload + async fn get_blob(&self, obs_id: ObservationId, payload_id: PayloadId) -> StorageResult; } /// Object store-based blob storage @@ -41,16 +47,21 @@ impl LocalBlobStorage { }) } - /// Convert observation ID to object path - fn id_to_path(&self, id: ObservationId) -> ObjectPath { - ObjectPath::from(id.to_string()) + /// Convert observation ID + payload ID to object path + fn id_to_path(&self, obs_id: ObservationId, payload_id: PayloadId) -> ObjectPath { + ObjectPath::from(format!("{}/{}", obs_id, payload_id)) } } #[async_trait::async_trait] impl BlobStorage for LocalBlobStorage { - async fn store_blob(&self, id: ObservationId, data: Bytes) -> StorageResult<()> { - let path = self.id_to_path(id); + async fn store_blob( + &self, + obs_id: ObservationId, + payload_id: PayloadId, + data: Bytes, + ) -> StorageResult<()> { + let path = self.id_to_path(obs_id, payload_id); self .store @@ -61,8 +72,8 @@ impl BlobStorage for LocalBlobStorage { Ok(()) } - async fn get_blob(&self, id: ObservationId) -> StorageResult { - let path = self.id_to_path(id); + async fn get_blob(&self, obs_id: ObservationId, payload_id: PayloadId) -> StorageResult { + let path = self.id_to_path(obs_id, payload_id); let result = self .store diff --git a/crates/observation-tools-server/src/storage/metadata.rs b/crates/observation-tools-server/src/storage/metadata.rs index a4612e4..b71571a 100644 --- a/crates/observation-tools-server/src/storage/metadata.rs +++ b/crates/observation-tools-server/src/storage/metadata.rs @@ -1,12 +1,18 @@ //! Metadata storage for executions and observations -use super::ObservationWithPayloadPointer; +use super::proto::StoredInlinePayload; +use super::proto::StoredObservation; +use super::proto::StoredPayloadMeta; +use super::ObservationWithPayloads; +use super::PayloadData; use super::StorageError; use super::StorageResult; +use super::StoredPayload; use observation_tools_shared::Execution; use observation_tools_shared::ExecutionId; use observation_tools_shared::ObservationId; use observation_tools_shared::ObservationType; +use prost::Message; use std::path::Path; use tracing::trace; @@ -29,27 +35,27 @@ pub trait MetadataStorage: Send + Sync { /// Count total number of executions async fn count_executions(&self) -> StorageResult; - /// Store multiple observations in a batch + /// Store multiple observations with their payloads in a batch async fn store_observations( &self, - observations: Vec, + observations: Vec, ) -> StorageResult<()>; - /// Get observations by their IDs - async fn get_observations( + /// Get a single observation with all inline payload data (via prefix scan) + async fn get_observation( &self, - ids: &[ObservationId], - ) -> StorageResult>; + id: ObservationId, + ) -> StorageResult; - /// List observations for an execution (with optional pagination and type - /// filter) + /// List observations for an execution (with optional pagination and type filter). + /// Returns observations with all payloads as PayloadData::Blob (metadata only). async fn list_observations( &self, execution_id: ExecutionId, limit: Option, offset: Option, observation_type: Option, - ) -> StorageResult>; + ) -> StorageResult>; /// Count total number of observations for an execution (with optional type /// filter) @@ -65,6 +71,42 @@ pub struct SledStorage { db: sled::Db, } +/// Separator bytes for sled key structure: +/// {obs_id_16bytes}\x00\x00 -> metadata +/// {obs_id_16bytes}\x00\x01{payload_id_16bytes} -> inline payload +const KEY_SEP: u8 = 0x00; +const KEY_METADATA_SUFFIX: u8 = 0x00; +const KEY_PAYLOAD_SUFFIX: u8 = 0x01; + +fn obs_id_bytes(obs_id: &ObservationId) -> Vec { + // Use the string representation as bytes for the key + obs_id.to_string().into_bytes() +} + +fn metadata_key(obs_id: &ObservationId) -> Vec { + let mut key = obs_id_bytes(obs_id); + key.push(KEY_SEP); + key.push(KEY_METADATA_SUFFIX); + key +} + +fn inline_payload_key( + obs_id: &ObservationId, + payload_id: &observation_tools_shared::PayloadId, +) -> Vec { + let mut key = obs_id_bytes(obs_id); + key.push(KEY_SEP); + key.push(KEY_PAYLOAD_SUFFIX); + key.extend_from_slice(payload_id.to_string().as_bytes()); + key +} + +fn obs_prefix(obs_id: &ObservationId) -> Vec { + let mut prefix = obs_id_bytes(obs_id); + prefix.push(KEY_SEP); + prefix +} + impl SledStorage { /// Create a new Sled storage instance pub fn new(path: impl AsRef) -> StorageResult { @@ -86,6 +128,93 @@ impl SledStorage { fn execution_observations_tree(&self) -> StorageResult { Ok(self.db.open_tree("execution_observations")?) } + + /// Decode a stored observation from a metadata key's value, returning + /// observation with all payloads marked as Blob + fn decode_metadata_only( + &self, + value: &[u8], + ) -> StorageResult { + let stored = StoredObservation::decode(value)?; + let observation = stored.to_observation()?; + let payloads = stored + .payload_manifest + .iter() + .map(|pm| { + Ok(StoredPayload { + id: pm.payload_id()?, + name: pm.name.clone(), + mime_type: pm.mime_type.clone(), + size: pm.size as usize, + data: PayloadData::Blob, + }) + }) + .collect::>>()?; + + Ok(ObservationWithPayloads { + observation, + payloads, + }) + } + + /// Decode a stored observation via prefix scan, returning observation with + /// inline payload data where available + fn decode_with_inline_payloads( + &self, + obs_tree: &sled::Tree, + obs_id: &ObservationId, + ) -> StorageResult { + let prefix = obs_prefix(obs_id); + let mut stored_obs: Option = None; + let mut inline_data: std::collections::HashMap> = + std::collections::HashMap::new(); + + for item in obs_tree.scan_prefix(&prefix) { + let (key, value) = item?; + // Check if this is the metadata key or a payload key + let suffix_start = prefix.len(); + if key.len() == suffix_start + 1 && key[suffix_start] == KEY_METADATA_SUFFIX { + stored_obs = Some(StoredObservation::decode(value.as_ref())?); + } else if key.len() > suffix_start + 1 && key[suffix_start] == KEY_PAYLOAD_SUFFIX { + let payload_id_str = + String::from_utf8(key[suffix_start + 1..].to_vec()) + .map_err(|e| StorageError::Internal(format!("Invalid payload key: {}", e)))?; + let stored_payload = StoredInlinePayload::decode(value.as_ref())?; + inline_data.insert(payload_id_str, stored_payload.data); + } + } + + let stored = stored_obs.ok_or_else(|| { + StorageError::NotFound(format!("Observation {} not found", obs_id)) + })?; + + let observation = stored.to_observation()?; + let payloads = stored + .payload_manifest + .iter() + .map(|pm| { + let data = if pm.is_blob { + PayloadData::Blob + } else if let Some(inline) = inline_data.remove(&pm.payload_id) { + PayloadData::Inline(inline) + } else { + PayloadData::Blob + }; + Ok(StoredPayload { + id: pm.payload_id()?, + name: pm.name.clone(), + mime_type: pm.mime_type.clone(), + size: pm.size as usize, + data, + }) + }) + .collect::>>()?; + + Ok(ObservationWithPayloads { + observation, + payloads, + }) + } } #[async_trait::async_trait] @@ -137,42 +266,54 @@ impl MetadataStorage for SledStorage { async fn store_observations( &self, - observations: Vec, + observations: Vec, ) -> StorageResult<()> { let obs_tree = self.observations_tree()?; let exec_obs_tree = self.execution_observations_tree()?; - for observation in observations.into_iter() { - let key = observation.observation.id.to_string(); - let value = serde_json::to_vec(&observation)?; - obs_tree.insert(key.as_bytes(), value)?; + for obs_with_payloads in observations { + let obs = &obs_with_payloads.observation; + let obs_id = obs.id; + + // Build the stored observation with payload manifest + let mut stored = StoredObservation::from_observation(obs); + for payload in &obs_with_payloads.payloads { + stored.payload_manifest.push(StoredPayloadMeta { + payload_id: payload.id.to_string(), + name: payload.name.clone(), + mime_type: payload.mime_type.clone(), + size: payload.size as u64, + is_blob: matches!(payload.data, PayloadData::Blob), + }); + } + + // Store the metadata key + let key = metadata_key(&obs_id); + let value = stored.encode_to_vec(); + obs_tree.insert(key, value)?; + + // Store inline payloads + for payload in &obs_with_payloads.payloads { + if let PayloadData::Inline(ref data) = payload.data { + let pkey = inline_payload_key(&obs_id, &payload.id); + let stored_payload = StoredInlinePayload { data: data.clone() }; + obs_tree.insert(pkey, stored_payload.encode_to_vec())?; + } + } + // Update the execution->observations index - let exec_key = format!( - "{}:{}", - observation.observation.execution_id, observation.observation.id - ); + let exec_key = format!("{}:{}", obs.execution_id, obs.id); trace!("Storing execution-observation index: {}", exec_key); - exec_obs_tree.insert( - exec_key.as_bytes(), - observation.observation.id.to_string().as_bytes(), - )?; + exec_obs_tree.insert(exec_key.as_bytes(), obs.id.to_string().as_bytes())?; } Ok(()) } - async fn get_observations( + async fn get_observation( &self, - ids: &[ObservationId], - ) -> StorageResult> { - let tree = self.observations_tree()?; - let mut observations = Vec::with_capacity(ids.len()); - for id in ids { - let key = id.to_string(); - if let Some(value) = tree.get(key.as_bytes())? { - let observation = serde_json::from_slice(&value)?; - observations.push(observation); - } - } - Ok(observations) + id: ObservationId, + ) -> StorageResult { + let obs_tree = self.observations_tree()?; + self.decode_with_inline_payloads(&obs_tree, &id) } async fn list_observations( @@ -181,20 +322,23 @@ impl MetadataStorage for SledStorage { limit: Option, offset: Option, observation_type: Option, - ) -> StorageResult> { + ) -> StorageResult> { let obs_tree = self.observations_tree()?; let exec_obs_tree = self.execution_observations_tree()?; let prefix = format!("{}:", execution_id); - let observations: Vec = exec_obs_tree + let observations: Vec = exec_obs_tree .scan_prefix(prefix.as_bytes()) .values() .filter_map(|result| { - result.ok().and_then(|obs_id| { + result.ok().and_then(|obs_id_bytes| { + let obs_id_str = String::from_utf8(obs_id_bytes.to_vec()).ok()?; + let obs_id = ObservationId::parse(&obs_id_str).ok()?; + let key = metadata_key(&obs_id); obs_tree - .get(&obs_id) + .get(&key) .ok() .flatten() - .and_then(|v| serde_json::from_slice::(&v).ok()) + .and_then(|v| self.decode_metadata_only(&v).ok()) }) }) .filter(|obs| observation_type.map_or(true, |t| obs.observation.observation_type == t)) @@ -216,12 +360,15 @@ impl MetadataStorage for SledStorage { .scan_prefix(prefix.as_bytes()) .values() .filter_map(|result| { - result.ok().and_then(|obs_id| { + result.ok().and_then(|obs_id_bytes| { + let obs_id_str = String::from_utf8(obs_id_bytes.to_vec()).ok()?; + let obs_id = ObservationId::parse(&obs_id_str).ok()?; + let key = metadata_key(&obs_id); obs_tree - .get(&obs_id) + .get(&key) .ok() .flatten() - .and_then(|v| serde_json::from_slice::(&v).ok()) + .and_then(|v| self.decode_metadata_only(&v).ok()) }) }) .filter(|obs| observation_type.map_or(true, |t| obs.observation.observation_type == t)) diff --git a/crates/observation-tools-server/src/storage/mod.rs b/crates/observation-tools-server/src/storage/mod.rs index fe16c48..f41ca8e 100644 --- a/crates/observation-tools-server/src/storage/mod.rs +++ b/crates/observation-tools-server/src/storage/mod.rs @@ -2,13 +2,13 @@ pub mod blob; pub mod metadata; +pub mod proto; pub use blob::BlobStorage; pub use blob::LocalBlobStorage; pub use metadata::MetadataStorage; pub use metadata::SledStorage; -use serde::Deserialize; -use serde::Serialize; +use observation_tools_shared::PayloadId; use thiserror::Error; /// Storage errors @@ -29,6 +29,9 @@ pub enum StorageError { #[error("Database error: {0}")] Database(#[from] sled::Error), + #[error("Protobuf decode error: {0}")] + Protobuf(#[from] prost::DecodeError), + #[error("Search error: {0}")] Search(String), @@ -39,13 +42,26 @@ pub enum StorageError { /// Result type for storage operations pub type StorageResult = Result; -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct ObservationWithPayloadPointer { +/// An observation with all its payloads +#[derive(Clone, Debug)] +pub struct ObservationWithPayloads { pub observation: observation_tools_shared::Observation, - pub payload_or_pointer: PayloadOrPointer, + pub payloads: Vec, +} + +/// A single payload attached to an observation +#[derive(Clone, Debug)] +pub struct StoredPayload { + pub id: PayloadId, + pub name: String, + pub mime_type: String, + pub size: usize, + pub data: PayloadData, } -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct PayloadOrPointer { - pub payload: Option>, +/// Whether payload data is inline or stored as a blob +#[derive(Clone, Debug)] +pub enum PayloadData { + Inline(Vec), + Blob, } diff --git a/crates/observation-tools-server/src/storage/proto.rs b/crates/observation-tools-server/src/storage/proto.rs new file mode 100644 index 0000000..0294125 --- /dev/null +++ b/crates/observation-tools-server/src/storage/proto.rs @@ -0,0 +1,214 @@ +use observation_tools_shared::ExecutionId; +use observation_tools_shared::GroupId; +use observation_tools_shared::LogLevel; +use observation_tools_shared::Observation; +use observation_tools_shared::ObservationId; +use observation_tools_shared::ObservationType; +use observation_tools_shared::PayloadId; +use observation_tools_shared::SourceInfo; +use std::collections::HashMap; + +use super::StorageError; + +/// Protobuf-encoded observation metadata + payload manifest +#[derive(Clone, PartialEq, prost::Message)] +pub struct StoredObservation { + /// ObservationId as simple UUID string + #[prost(string, tag = "1")] + pub id: String, + /// ExecutionId as simple UUID string + #[prost(string, tag = "2")] + pub execution_id: String, + #[prost(string, tag = "3")] + pub name: String, + /// ObservationType as i32 + #[prost(int32, tag = "4")] + pub observation_type: i32, + /// LogLevel as i32 + #[prost(int32, tag = "5")] + pub log_level: i32, + #[prost(message, optional, tag = "6")] + pub source: Option, + /// Metadata as repeated key-value pairs + #[prost(message, repeated, tag = "7")] + pub metadata: Vec, + /// Group IDs as strings + #[prost(string, repeated, tag = "8")] + pub group_ids: Vec, + /// Parent group ID (optional) + #[prost(string, optional, tag = "9")] + pub parent_group_id: Option, + /// Parent span ID (optional) + #[prost(string, optional, tag = "10")] + pub parent_span_id: Option, + /// Created at as RFC3339 string + #[prost(string, tag = "11")] + pub created_at: String, + /// MIME type of the primary payload (kept for backward compat in Observation) + #[prost(string, tag = "12")] + pub mime_type: String, + /// Size of the primary payload in bytes + #[prost(uint64, tag = "13")] + pub payload_size: u64, + /// Payload manifest: metadata about all payloads for this observation + #[prost(message, repeated, tag = "14")] + pub payload_manifest: Vec, +} + +#[derive(Clone, PartialEq, prost::Message)] +pub struct StoredSourceInfo { + #[prost(string, tag = "1")] + pub file: String, + #[prost(uint32, tag = "2")] + pub line: u32, + #[prost(uint32, optional, tag = "3")] + pub column: Option, +} + +#[derive(Clone, PartialEq, prost::Message)] +pub struct StoredKeyValue { + #[prost(string, tag = "1")] + pub key: String, + #[prost(string, tag = "2")] + pub value: String, +} + +/// Metadata about a single payload attached to an observation +#[derive(Clone, PartialEq, prost::Message)] +pub struct StoredPayloadMeta { + /// PayloadId as simple UUID string + #[prost(string, tag = "1")] + pub payload_id: String, + /// Name of the payload (e.g., "default", "headers", "body") + #[prost(string, tag = "2")] + pub name: String, + /// MIME type + #[prost(string, tag = "3")] + pub mime_type: String, + /// Size in bytes + #[prost(uint64, tag = "4")] + pub size: u64, + /// Whether this payload is stored as a blob (true) or inline (false) + #[prost(bool, tag = "5")] + pub is_blob: bool, +} + +/// Protobuf-encoded inline payload data +#[derive(Clone, PartialEq, prost::Message)] +pub struct StoredInlinePayload { + #[prost(bytes = "vec", tag = "1")] + pub data: Vec, +} + +// Conversion: Observation -> StoredObservation (without payloads, those are set separately) +impl StoredObservation { + pub fn from_observation(obs: &Observation) -> Self { + StoredObservation { + id: obs.id.to_string(), + execution_id: obs.execution_id.to_string(), + name: obs.name.clone(), + observation_type: observation_type_to_i32(obs.observation_type), + log_level: log_level_to_i32(obs.log_level), + source: obs.source.as_ref().map(|s| StoredSourceInfo { + file: s.file.clone(), + line: s.line, + column: s.column, + }), + metadata: obs + .metadata + .iter() + .map(|(k, v)| StoredKeyValue { + key: k.clone(), + value: v.clone(), + }) + .collect(), + group_ids: obs.group_ids.iter().map(|g| g.to_string()).collect(), + parent_group_id: obs.parent_group_id.as_ref().map(|g| g.to_string()), + parent_span_id: obs.parent_span_id.clone(), + created_at: obs.created_at.to_rfc3339(), + mime_type: String::new(), + payload_size: 0, + payload_manifest: Vec::new(), + } + } + + pub fn to_observation(&self) -> Result { + let id = ObservationId::parse(&self.id) + .map_err(|e| StorageError::Internal(format!("Invalid observation ID: {}", e)))?; + let execution_id = ExecutionId::parse(&self.execution_id) + .map_err(|e| StorageError::Internal(format!("Invalid execution ID: {}", e)))?; + let created_at = chrono::DateTime::parse_from_rfc3339(&self.created_at) + .map_err(|e| StorageError::Internal(format!("Invalid created_at: {}", e)))? + .with_timezone(&chrono::Utc); + + let mut metadata = HashMap::new(); + for kv in &self.metadata { + metadata.insert(kv.key.clone(), kv.value.clone()); + } + + Ok(Observation { + id, + execution_id, + name: self.name.clone(), + observation_type: observation_type_from_i32(self.observation_type), + log_level: log_level_from_i32(self.log_level), + source: self.source.as_ref().map(|s| SourceInfo { + file: s.file.clone(), + line: s.line, + column: s.column, + }), + metadata, + group_ids: self.group_ids.iter().map(|g| GroupId::parse(g)).collect(), + parent_group_id: self.parent_group_id.as_ref().map(|g| GroupId::parse(g)), + parent_span_id: self.parent_span_id.clone(), + created_at, + }) + } +} + +impl StoredPayloadMeta { + pub fn payload_id(&self) -> Result { + PayloadId::parse(&self.payload_id) + .map_err(|e| StorageError::Internal(format!("Invalid payload ID: {}", e))) + } +} + +fn observation_type_to_i32(t: ObservationType) -> i32 { + match t { + ObservationType::LogEntry => 0, + ObservationType::Payload => 1, + ObservationType::Span => 2, + ObservationType::Group => 3, + } +} + +fn observation_type_from_i32(v: i32) -> ObservationType { + match v { + 0 => ObservationType::LogEntry, + 1 => ObservationType::Payload, + 2 => ObservationType::Span, + 3 => ObservationType::Group, + _ => ObservationType::Payload, + } +} + +fn log_level_to_i32(l: LogLevel) -> i32 { + match l { + LogLevel::Trace => 0, + LogLevel::Debug => 1, + LogLevel::Info => 2, + LogLevel::Warning => 3, + LogLevel::Error => 4, + } +} + +fn log_level_from_i32(v: i32) -> LogLevel { + match v { + 0 => LogLevel::Trace, + 1 => LogLevel::Debug, + 2 => LogLevel::Info, + 3 => LogLevel::Warning, + 4 => LogLevel::Error, + _ => LogLevel::Info, + } +} diff --git a/crates/observation-tools-server/src/ui/execution_detail.rs b/crates/observation-tools-server/src/ui/execution_detail.rs index 06045db..938cc39 100644 --- a/crates/observation-tools-server/src/ui/execution_detail.rs +++ b/crates/observation-tools-server/src/ui/execution_detail.rs @@ -116,11 +116,11 @@ async fn execution_detail_view( // If observation ID is provided, load the observation for the side panel let selected_observation = if let Some(obs_id) = &query.obs { let observation_id = ObservationId::parse(obs_id)?; - let obs_list = metadata.get_observations(&[observation_id]).await?; - obs_list - .into_iter() - .next() - .map(|obs| GetObservation::new(obs)) + match metadata.get_observation(observation_id).await { + Ok(obs) => Some(GetObservation::new(obs)), + Err(StorageError::NotFound(_)) => None, + Err(e) => return Err(e.into()), + } } else { None }; diff --git a/crates/observation-tools-server/src/ui/observation_detail.rs b/crates/observation-tools-server/src/ui/observation_detail.rs index f9ec595..2559c97 100644 --- a/crates/observation-tools-server/src/ui/observation_detail.rs +++ b/crates/observation-tools-server/src/ui/observation_detail.rs @@ -25,11 +25,8 @@ pub async fn observation_detail( "Rendering observation detail page" ); let parsed_observation_id = observation_tools_shared::ObservationId::parse(&observation_id)?; - let observation = match metadata.get_observations(&[parsed_observation_id]).await { - Ok(observations) => observations - .into_iter() - .next() - .map(|obs| GetObservation::new(obs)), + let observation = match metadata.get_observation(parsed_observation_id).await { + Ok(obs) => Some(GetObservation::new(obs)), Err(crate::storage::StorageError::NotFound(_)) => { // The user may go to the observation page before it's uploaded. Since the page // auto-refreshes, we do not throw an error so it will show up once it's diff --git a/crates/observation-tools-server/templates/_observation_content.html b/crates/observation-tools-server/templates/_observation_content.html index e638a21..b74effb 100644 --- a/crates/observation-tools-server/templates/_observation_content.html +++ b/crates/observation-tools-server/templates/_observation_content.html @@ -29,10 +29,10 @@

{{ observation.name }}

{{ observation.parent_span_id }}

{% endif %} - {% if observation.labels %} -

- labels: - {{ observation.labels|join(", ") }} + {% if observation.group_ids %} +

+ groups: + {{ observation.group_ids|join(", ") }}

{% endif %} {% if observation.metadata %} @@ -49,52 +49,56 @@

met
-

payload

-

- type: - {{ observation.mime_type }} -

-

- size: - {{ observation.payload_size }} bytes -

-

- Open raw content in new tab -

+

payloads

- {% if observation.payload_size > display_threshold %} -
-

Payload is too large to display inline.

-
- {% elif observation.payload.Markdown is defined %} -
- {{ observation.payload.Markdown.raw|render_markdown|safe }} -
- {% elif observation.payload.Json is defined %} -
- {{ render_json(observation.payload.Json) }} -
- {% elif observation.payload.Text is defined %} -
{{ observation.payload.Text|unescape }}
- {% elif observation.payload.Pointer is defined %} -
-

Payload stored externally.

- Load external content + {% for payload in observation.payloads %} +
+

{{ payload.name }}

+

+ type: + {{ payload.mime_type }} + size: + {{ payload.size }} bytes +

+

+ Open raw content in new tab +

+ + {% if payload.size > display_threshold %} +
+

Payload is too large to display inline.

+
+ {% elif payload.data.Markdown is defined %} +
+ {{ payload.data.Markdown.raw|render_markdown|safe }} +
+ {% elif payload.data.Json is defined %} +
+ {{ render_json(payload.data.Json) }} +
+ {% elif payload.data.Text is defined %} +
{{ payload.data.Text|unescape }}
+ {% elif payload.data.Pointer is defined %} +
+

Payload stored externally.

+ Load external content +
+ {% elif payload.data.InlineBinary is defined %} +
+[Binary content - {{ payload.size }} bytes]
+ {% else %} +
[Unknown payload type]
+ {% endif %}
- {% elif observation.payload.InlineBinary is defined %} -
-[Binary content - {{ observation.payload_size }} bytes]
- {% else %} -
[Unknown payload type]
- {% endif %} + {% endfor %}
diff --git a/crates/observation-tools-server/templates/execution_detail.html b/crates/observation-tools-server/templates/execution_detail.html index acb92d3..8837779 100644 --- a/crates/observation-tools-server/templates/execution_detail.html +++ b/crates/observation-tools-server/templates/execution_detail.html @@ -95,27 +95,27 @@

observations

>{% if obs.source %}{{ obs.source.file }}:{{ obs.source.line }}{% else %}-{% endif %} {% if obs.observation_type == 'LogEntry' %} - {% if obs.payload.Text is defined %} - {{ obs.payload.Text[:200] }}{% if obs.payload.Text|length > 200 %}...{% endif %} - {% elif obs.payload.Json is defined %} + >{% set p = obs.payloads[0].data if obs.payloads else none %}{% if obs.observation_type == 'LogEntry' %} + {% if p.Text is defined %} + {{ p.Text[:200] }}{% if p.Text|length > 200 %}...{% endif %} + {% elif p.Json is defined %} [JSON] - {% elif obs.payload.Markdown is defined %} - {{ obs.payload.Markdown.raw[:200] }}{% if obs.payload.Markdown.raw|length > 200 %}...{% endif %} - {% elif obs.payload.Pointer is defined %} + {% elif p.Markdown is defined %} + {{ p.Markdown.raw[:200] }}{% if p.Markdown.raw|length > 200 %}...{% endif %} + {% elif p.Pointer is defined %} [external content] {% else %} [binary] {% endif %} {% else %} {{ obs.name }}: - {% if obs.payload.Text is defined %} - {{ obs.payload.Text[:100] }}{% if obs.payload.Text|length > 100 %}...{% endif %} - {% elif obs.payload.Json is defined %} + {% if p.Text is defined %} + {{ p.Text[:100] }}{% if p.Text|length > 100 %}...{% endif %} + {% elif p.Json is defined %} [JSON] - {% elif obs.payload.Markdown is defined %} - {{ obs.payload.Markdown.raw[:100] }}{% if obs.payload.Markdown.raw|length > 100 %}...{% endif %} - {% elif obs.payload.Pointer is defined %} + {% elif p.Markdown is defined %} + {{ p.Markdown.raw[:100] }}{% if p.Markdown.raw|length > 100 %}...{% endif %} + {% elif p.Pointer is defined %} [external content] {% else %} [binary] @@ -137,13 +137,13 @@

observations

>{{ obs.name }} - — {{ obs.mime_type }} + {% if obs.payloads %}— {{ obs.payloads[0].mime_type }}{% endif %} {% if obs.source %}— {{ obs.source.file }}:{{ obs.source.line }}{% endif %} — {{ obs.created_at }} - {% if obs.labels %} + {% if obs.group_ids %}
labels: {{ obs.labels|join(", ") }}groups: {{ obs.group_ids|join(", ") }}
{% endif %} diff --git a/crates/observation-tools-shared/src/error.rs b/crates/observation-tools-shared/src/error.rs index 0467655..8187633 100644 --- a/crates/observation-tools-shared/src/error.rs +++ b/crates/observation-tools-shared/src/error.rs @@ -12,6 +12,10 @@ pub enum Error { /// Invalid observation ID format #[error("Invalid observation ID: {0}")] InvalidObservationId(uuid::Error), + + /// Invalid payload ID format + #[error("Invalid payload ID: {0}")] + InvalidPayloadId(uuid::Error), } /// Result type for shared crate operations diff --git a/crates/observation-tools-shared/src/group_id.rs b/crates/observation-tools-shared/src/group_id.rs new file mode 100644 index 0000000..4c170a7 --- /dev/null +++ b/crates/observation-tools-shared/src/group_id.rs @@ -0,0 +1,57 @@ +use serde::Deserialize; +use serde::Serialize; +use utoipa::ToSchema; + +/// Unique identifier for a group +/// +/// Group IDs are user-provided strings. By default, a UUID v7 string is generated, +/// but any string value is accepted. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, ToSchema)] +#[serde(transparent)] +#[schema(value_type = String, example = "018e9a3a2c1b7e3f8d2a4b5c6d7e8f9b")] +pub struct GroupId(String); + +impl GroupId { + /// Generate a new UUIDv7 group ID + pub fn new() -> Self { + Self(uuid::Uuid::now_v7().as_simple().to_string()) + } + + /// Create a nil group ID for placeholder use + pub fn nil() -> Self { + Self(String::new()) + } + + /// Parse from a string (accepts any string value) + pub fn parse(s: &str) -> Self { + Self(s.to_string()) + } + + /// Create a GroupId from a deterministic u64 value (e.g., tracing span IDs) + pub fn from_u64(value: u64) -> Self { + Self(value.to_string()) + } +} + +impl Default for GroupId { + fn default() -> Self { + Self::new() + } +} + +impl std::fmt::Display for GroupId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl From for crate::ObservationId { + fn from(group_id: GroupId) -> Self { + // If the group ID happens to be a valid UUID, use it directly + if let Ok(id) = crate::ObservationId::parse(&group_id.0) { + return id; + } + // Otherwise generate a new ObservationId + crate::ObservationId::new() + } +} diff --git a/crates/observation-tools-shared/src/lib.rs b/crates/observation-tools-shared/src/lib.rs index 2340f02..5cdb16b 100644 --- a/crates/observation-tools-shared/src/lib.rs +++ b/crates/observation-tools-shared/src/lib.rs @@ -2,11 +2,14 @@ pub mod error; pub mod models; +mod group_id; mod observation; mod payload; +mod payload_id; pub use error::Error; pub use error::Result; +pub use group_id::GroupId; pub use models::Execution; pub use models::ExecutionId; pub use observation::LogLevel; @@ -16,7 +19,9 @@ pub use observation::ObservationType; pub use observation::SourceInfo; pub use payload::Markdown; pub use payload::Payload; +pub use payload::PayloadBuilder; pub use payload::MIME_TYPE_RUST_DEBUG; +pub use payload_id::PayloadId; /// Payload size threshold for blob storage (64KB) /// Payloads larger than this will be uploaded as separate blobs @@ -26,10 +31,10 @@ pub const BLOB_THRESHOLD_BYTES: usize = 65536; pub const BATCH_SIZE: usize = 100; /// Estimated maximum observation metadata overhead (in bytes) -/// This includes JSON structure, field names, IDs, timestamps, labels, etc. +/// This includes JSON structure, field names, IDs, timestamps, group_ids, etc. /// Assumes reasonable limits: /// - name: ~256 bytes -/// - labels: ~10 labels * ~100 bytes = 1KB +/// - group_ids: ~10 groups * ~40 bytes = 400 bytes /// - metadata: ~10 entries * ~200 bytes = 2KB /// - source info: ~256 bytes /// - IDs, timestamps, JSON structure: ~512 bytes diff --git a/crates/observation-tools-shared/src/observation.rs b/crates/observation-tools-shared/src/observation.rs index eb9fa9a..53101ce 100644 --- a/crates/observation-tools-shared/src/observation.rs +++ b/crates/observation-tools-shared/src/observation.rs @@ -1,3 +1,4 @@ +use crate::group_id::GroupId; use crate::ExecutionId; use chrono::DateTime; use chrono::Utc; @@ -78,10 +79,13 @@ pub struct Observation { #[serde(default)] pub metadata: HashMap, - /// Hierarchical labels for grouping observations - /// Uses path convention (e.g., "api/request/headers") + /// IDs of groups this observation belongs to #[serde(default)] - pub labels: Vec, + pub group_ids: Vec, + + /// Parent group ID (used when observation_type == Group) + #[serde(skip_serializing_if = "Option::is_none")] + pub parent_group_id: Option, /// Parent span ID (for tracing integration) #[serde(skip_serializing_if = "Option::is_none")] @@ -89,12 +93,6 @@ pub struct Observation { /// When this observation was created pub created_at: DateTime, - - /// MIME type of the payload (e.g., "text/plain", "application/json") - pub mime_type: String, - - /// Size of the payload in bytes - pub payload_size: usize, } /// Type of observation @@ -103,6 +101,7 @@ pub enum ObservationType { LogEntry, Payload, Span, + Group, } /// Log level for observations diff --git a/crates/observation-tools-shared/src/payload.rs b/crates/observation-tools-shared/src/payload.rs index e8b9d68..8306eb0 100644 --- a/crates/observation-tools-shared/src/payload.rs +++ b/crates/observation-tools-shared/src/payload.rs @@ -112,3 +112,54 @@ impl From for Payload { Payload::with_mime_type(md.content, "text/markdown") } } + +/// Builder for creating named payloads to attach to observations. +/// +/// Each `PayloadBuilder` pairs a name with a `Payload`, allowing observations +/// to carry multiple named payloads (e.g., "headers", "body", "response"). +pub struct PayloadBuilder { + pub name: String, + pub payload: Payload, +} + +impl PayloadBuilder { + /// Create a new named payload + pub fn new(name: impl Into, payload: impl Into) -> Self { + Self { + name: name.into(), + payload: payload.into(), + } + } + + /// Create a named payload from a serde-serializable value (JSON) + pub fn json(name: impl Into, value: &T) -> Self { + Self { + name: name.into(), + payload: Payload::json(serde_json::to_string(value).unwrap_or_default()), + } + } + + /// Create a named payload from a Debug-formatted value + pub fn debug(name: impl Into, value: &T) -> Self { + Self { + name: name.into(), + payload: Payload::debug(format!("{:#?}", value)), + } + } + + /// Create a named plain text payload + pub fn text(name: impl Into, data: impl Into) -> Self { + Self { + name: name.into(), + payload: Payload::text(data), + } + } + + /// Create a named markdown payload + pub fn markdown(name: impl Into, content: impl Into) -> Self { + Self { + name: name.into(), + payload: Payload::with_mime_type(content, "text/markdown"), + } + } +} diff --git a/crates/observation-tools-shared/src/payload_id.rs b/crates/observation-tools-shared/src/payload_id.rs new file mode 100644 index 0000000..3e128c6 --- /dev/null +++ b/crates/observation-tools-shared/src/payload_id.rs @@ -0,0 +1,44 @@ +use serde::Deserialize; +use serde::Serialize; +use utoipa::ToSchema; +use uuid::Uuid; + +/// Unique identifier for a payload (UUIDv7) +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Deserialize, ToSchema)] +#[serde(transparent)] +#[schema(value_type = String, example = "018e9a3a2c1b7e3f8d2a4b5c6d7e8f9c")] +pub struct PayloadId(Uuid); + +impl Serialize for PayloadId { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(&self.to_string()) + } +} + +impl PayloadId { + /// Generate a new UUIDv7 payload ID + pub fn new() -> Self { + Self(Uuid::now_v7()) + } + + /// Parse from a string + pub fn parse(s: &str) -> crate::Result { + let uuid = Uuid::parse_str(s).map_err(crate::Error::InvalidPayloadId)?; + Ok(Self(uuid)) + } +} + +impl Default for PayloadId { + fn default() -> Self { + Self::new() + } +} + +impl std::fmt::Display for PayloadId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0.as_simple()) + } +}