Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 85 additions & 28 deletions crates/observation-tools-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ use crate::error::Result;
use crate::execution::BeginExecution;
use crate::execution::ExecutionHandle;
use crate::observation_handle::ObservationHandle;
use crate::ObservationWithPayload;
use async_channel;
use log::error;
use log::info;
use log::trace;
use napi_derive::napi;
use observation_tools_shared::models::Execution;
use observation_tools_shared::Observation;
// Re-export constants from shared crate for convenience
pub use observation_tools_shared::BATCH_SIZE;
pub use observation_tools_shared::BLOB_THRESHOLD_BYTES;
Expand All @@ -31,11 +31,18 @@ pub(crate) enum UploaderMessage {
handle: ExecutionHandle,
uploaded_tx: tokio::sync::watch::Sender<ExecutionUploadResult>,
},
Observations {
observations: Vec<ObservationWithPayload>,
Observation {
observation: Observation,
handle: ObservationHandle,
uploaded_tx: tokio::sync::watch::Sender<ObservationUploadResult>,
},
Payload {
observation_id: observation_tools_shared::ObservationId,
execution_id: observation_tools_shared::models::ExecutionId,
payload_id: observation_tools_shared::PayloadId,
name: String,
payload: observation_tools_shared::Payload,
},
Flush,
Shutdown,
}
Expand All @@ -48,15 +55,26 @@ impl std::fmt::Debug for UploaderMessage {
.debug_struct("Execution")
.field("execution", execution)
.finish(),
Self::Observations {
observations,
Self::Observation {
observation,
handle,
..
} => f
.debug_struct("Observations")
.field("observations", observations)
.debug_struct("Observation")
.field("observation", observation)
.field("handle", handle)
.finish(),
Self::Payload {
observation_id,
payload_id,
name,
..
} => f
.debug_struct("Payload")
.field("observation_id", observation_id)
.field("payload_id", payload_id)
.field("name", name)
.finish(),
Self::Flush => write!(f, "Flush"),
Self::Shutdown => write!(f, "Shutdown"),
}
Expand Down Expand Up @@ -279,6 +297,18 @@ impl ClientBuilder {
}
}

/// Data for a payload ready to be uploaded
#[derive(Debug)]
pub(crate) struct PayloadUploadData {
pub(crate) observation_id: observation_tools_shared::ObservationId,
pub(crate) execution_id: observation_tools_shared::models::ExecutionId,
pub(crate) payload_id: observation_tools_shared::PayloadId,
pub(crate) name: String,
pub(crate) mime_type: String,
pub(crate) size: usize,
pub(crate) data: Vec<u8>,
}

async fn uploader_task(
api_client: crate::server_client::Client,
rx: async_channel::Receiver<UploaderMessage>,
Expand All @@ -291,12 +321,14 @@ async fn uploader_task(
tokio::sync::watch::Sender<ObservationUploadResult>,
);

let flush_observations = async |buffer: &mut Vec<ObservationWithPayload>,
senders: &mut Vec<ObservationSender>| {
if buffer.is_empty() {
let flush = async |observation_buffer: &mut Vec<Observation>,
senders: &mut Vec<ObservationSender>,
payload_buffer: &mut Vec<PayloadUploadData>| {
if observation_buffer.is_empty() && payload_buffer.is_empty() {
return;
}
let result = upload_observations(&api_client, buffer.drain(..).collect()).await;
let result =
upload_observations(&api_client, observation_buffer.drain(..).collect(), payload_buffer.drain(..).collect()).await;
match result {
Ok(()) => {
// Signal all senders that observations were uploaded successfully
Expand All @@ -314,8 +346,9 @@ async fn uploader_task(
}
}
};
let mut observation_buffer: Vec<ObservationWithPayload> = Vec::new();
let mut observation_buffer: Vec<Observation> = Vec::new();
let mut sender_buffer: Vec<ObservationSender> = Vec::new();
let mut payload_buffer: Vec<PayloadUploadData> = Vec::new();
loop {
let msg = rx.recv().await.ok();
match msg {
Expand All @@ -327,7 +360,6 @@ async fn uploader_task(
let result = upload_execution(&api_client, execution).await;
match result {
Ok(()) => {
// Signal successful upload with handle
let _ = uploaded_tx.send(Some(Ok(handle)));
}
Err(e) => {
Expand All @@ -337,22 +369,36 @@ async fn uploader_task(
}
}
}
Some(UploaderMessage::Observations {
observations,
Some(UploaderMessage::Observation {
observation,
handle,
uploaded_tx,
}) => {
observation_buffer.extend(observations);
observation_buffer.push(observation);
sender_buffer.push((handle, uploaded_tx));
if observation_buffer.len() >= BATCH_SIZE {
flush_observations(&mut observation_buffer, &mut sender_buffer).await;
}
}
Some(UploaderMessage::Payload {
observation_id,
execution_id,
payload_id,
name,
payload,
}) => {
payload_buffer.push(PayloadUploadData {
observation_id,
execution_id,
payload_id,
name,
mime_type: payload.mime_type,
size: payload.size,
data: payload.data,
});
}
Some(UploaderMessage::Flush) => {
flush_observations(&mut observation_buffer, &mut sender_buffer).await;
flush(&mut observation_buffer, &mut sender_buffer, &mut payload_buffer).await;
}
Some(UploaderMessage::Shutdown) | None => {
flush_observations(&mut observation_buffer, &mut sender_buffer).await;
flush(&mut observation_buffer, &mut sender_buffer, &mut payload_buffer).await;
break;
}
}
Expand Down Expand Up @@ -383,31 +429,42 @@ async fn upload_execution(

async fn upload_observations(
client: &crate::server_client::Client,
observations: Vec<ObservationWithPayload>,
observations: Vec<Observation>,
payloads: Vec<PayloadUploadData>,
) -> Result<()> {
if observations.is_empty() {
if observations.is_empty() && payloads.is_empty() {
return Ok(());
}

// Group by execution_id
let mut by_execution: std::collections::HashMap<_, Vec<_>> = std::collections::HashMap::new();
let mut by_execution: std::collections::HashMap<_, (Vec<_>, Vec<_>)> =
std::collections::HashMap::new();
for obs in observations {
by_execution
.entry(obs.observation.execution_id)
.entry(obs.execution_id)
.or_default()
.0
.push(obs);
}
for p in payloads {
by_execution
.entry(p.execution_id)
.or_default()
.1
.push(p);
}

// Upload each batch via multipart form
for (execution_id, observations) in by_execution {
for (execution_id, (observations, payloads)) in by_execution {
trace!(
"Uploading {} observations for execution {}",
"Uploading {} observations + {} payloads for execution {}",
observations.len(),
payloads.len(),
execution_id
);

client
.create_observations_multipart(&execution_id.to_string(), observations)
.create_observations_multipart(&execution_id.to_string(), observations, payloads)
.await
.map_err(|e| crate::error::Error::Config(e.to_string()))?;
}
Expand Down
8 changes: 1 addition & 7 deletions crates/observation-tools-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ pub use observation_handle::SendObservation;
// Re-export procedural macro
pub use observation_tools_macros::observe;
// Re-export from shared for convenience
use observation_tools_shared::Observation;
pub use observation_tools_shared::Payload;
pub use observation_tools_shared::PayloadBuilder;

/// Register a global execution shared across all threads
///
Expand Down Expand Up @@ -64,9 +64,3 @@ pub fn current_execution() -> Option<ExecutionHandle> {
pub fn clear_global_execution() {
context::clear_global_execution()
}

#[derive(Debug)]
struct ObservationWithPayload {
observation: Observation,
payload: Payload,
}
2 changes: 1 addition & 1 deletion crates/observation-tools-client/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl Log for ObservationLogger {
let builder = ObservationBuilder::new("ObservationLogger")
.observation_type(ObservationType::LogEntry)
.log_level(record.level().into())
.label(format!("log/{}", record.target()));
.metadata("target", record.target());
let builder = if let (Some(file), Some(line)) = (record.file(), record.line()) {
builder.source(file, line)
} else {
Expand Down
64 changes: 35 additions & 29 deletions crates/observation-tools-client/src/observation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ use crate::execution::ExecutionHandle;
use crate::observation_handle::ObservationHandle;
use crate::observation_handle::SendObservation;
use crate::Error;
use crate::ObservationWithPayload;
use napi_derive::napi;
use observation_tools_shared::GroupId;
use observation_tools_shared::LogLevel;
use observation_tools_shared::Markdown;
use observation_tools_shared::Observation;
use observation_tools_shared::ObservationId;
use observation_tools_shared::ObservationType;
use observation_tools_shared::Payload;
use observation_tools_shared::PayloadId;
use observation_tools_shared::SourceInfo;
use serde::Serialize;
use std::any::TypeId;
Expand All @@ -32,7 +33,7 @@ use std::fmt::Debug;
#[napi]
pub struct ObservationBuilder {
name: String,
labels: Vec<String>,
group_ids: Vec<GroupId>,
metadata: HashMap<String, String>,
source: Option<SourceInfo>,
parent_span_id: Option<String>,
Expand All @@ -49,7 +50,7 @@ impl ObservationBuilder {
pub fn new<T: AsRef<str>>(name: T) -> Self {
Self {
name: name.as_ref().to_string(),
labels: Vec::new(),
group_ids: Vec::new(),
metadata: HashMap::new(),
source: None,
parent_span_id: None,
Expand Down Expand Up @@ -78,18 +79,6 @@ impl ObservationBuilder {
self
}

/// Add a label to the observation
pub fn label(mut self, label: impl Into<String>) -> Self {
self.labels.push(label.into());
self
}

/// Add multiple labels to the observation
pub fn labels(mut self, labels: impl IntoIterator<Item = impl Into<String>>) -> Self {
self.labels.extend(labels.into_iter().map(|l| l.into()));
self
}

/// Add metadata to the observation
pub fn metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.metadata.insert(key.into(), value.into());
Expand Down Expand Up @@ -155,7 +144,7 @@ impl ObservationBuilder {
}

/// Internal method to build and send the observation
fn send_observation(mut self, payload: Payload) -> SendObservation {
pub(crate) fn send_observation(mut self, payload: Payload) -> SendObservation {
let Some(execution) = self.execution.take().or_else(context::get_current_execution) else {
log::error!(
"No execution context available for observation '{}'",
Expand All @@ -167,11 +156,22 @@ impl ObservationBuilder {
self.send_with_execution(payload, &execution)
}

/// Internal method to build and send the observation with a resolved execution
/// Internal method to build and send the observation with a resolved execution.
/// Sends the default payload with name "default".
fn send_with_execution(
self,
payload: Payload,
execution: &ExecutionHandle,
) -> SendObservation {
self.send_with_execution_and_name(payload, "default", execution)
}

/// Internal method to build and send the observation with a named payload.
fn send_with_execution_and_name(
self,
payload: Payload,
payload_name: impl Into<String>,
execution: &ExecutionHandle,
) -> SendObservation {
let observation_id = self.custom_id.unwrap_or_else(ObservationId::new);

Expand All @@ -196,13 +196,12 @@ impl ObservationBuilder {
name: self.name,
observation_type: self.observation_type,
log_level: self.log_level,
labels: self.labels,
group_ids: self.group_ids,
parent_group_id: None,
metadata: self.metadata,
source: self.source,
parent_span_id,
created_at: chrono::Utc::now(),
mime_type: payload.mime_type.clone(),
payload_size: payload.size,
};

let (uploaded_tx, uploaded_rx) = tokio::sync::watch::channel::<ObservationUploadResult>(None);
Expand All @@ -215,13 +214,11 @@ impl ObservationBuilder {
observation_id
);

// Send observation metadata
if let Err(e) = execution
.uploader_tx
.try_send(UploaderMessage::Observations {
observations: vec![ObservationWithPayload {
observation,
payload,
}],
.try_send(UploaderMessage::Observation {
observation,
handle: handle.clone(),
uploaded_tx,
})
Expand All @@ -230,6 +227,15 @@ impl ObservationBuilder {
return SendObservation::stub(Error::ChannelClosed);
}

// Send the payload as a separate message
let _ = execution.uploader_tx.try_send(UploaderMessage::Payload {
observation_id,
execution_id: execution.id(),
payload_id: PayloadId::new(),
name: payload_name.into(),
payload,
});

SendObservation::new(handle, uploaded_rx)
}
}
Expand Down Expand Up @@ -275,10 +281,10 @@ impl ObservationBuilder {
Ok(self)
}

/// Add a label to the observation
#[napi(js_name = "label")]
pub fn label_napi(&mut self, label: String) -> &Self {
self.labels.push(label);
/// Add a group to the observation by group ID string
#[napi(js_name = "group")]
pub fn group_napi(&mut self, group_id: String) -> &Self {
self.group_ids.push(GroupId::from(group_id));
self
}

Expand Down
Loading
Loading