Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 8 additions & 15 deletions crates/observation-tools-client/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@
"description": "Whether there are more results available",
"type": "boolean"
},
"next_page_token": {
"description": "Token for the next page (cursor-based pagination)",
"nullable": true,
"type": "string"
},
"observations": {
"description": "List of observations",
"items": {
Expand Down Expand Up @@ -519,25 +524,13 @@
}
},
{
"description": "Maximum number of results to return",
"description": "Cursor token for the next page",
"in": "query",
"name": "limit",
"name": "page_token",
"required": false,
"schema": {
"minimum": 0,
"nullable": true,
"type": "integer"
}
},
{
"description": "Number of results to skip (for pagination)",
"in": "query",
"name": "offset",
"required": false,
"schema": {
"minimum": 0,
"nullable": true,
"type": "integer"
"type": "string"
}
}
],
Expand Down
4 changes: 2 additions & 2 deletions crates/observation-tools-client/src/axum/request_observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ where
.metadata("method", parts.method.to_string())
.metadata("uri", parts.uri.to_string())
.execution(&execution)
.named_payload("headers", headers_payload)
.named_raw_payload("headers", headers_payload)
.payload("body", bytes_to_payload(&request_body_bytes, &parts.headers));

let response = inner
Expand All @@ -277,7 +277,7 @@ where
.metadata("status", &parts.status.as_u16().to_string())
.log_level(log_level)
.execution(&execution)
.named_payload("headers", resp_headers_payload);
.named_raw_payload("headers", resp_headers_payload);

// Wrap the response body in a streaming observer that captures data as it flows
// through and adds the body payload when the stream completes
Expand Down
6 changes: 3 additions & 3 deletions crates/observation-tools-client/src/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl GroupBuilder {
let observation_id = ObservationId::new();

let group_handle = GroupHandle {
group_id,
group_id: group_id.clone(),
execution_id: execution.id(),
uploader_tx: execution.uploader_tx.clone(),
base_url: execution.base_url().to_string(),
Expand All @@ -126,8 +126,8 @@ impl GroupBuilder {
.with_id(observation_id)
.observation_type(ObservationType::Group)
.log_level(self.log_level.unwrap_or(LogLevel::Info))
.execution(execution)
.group(&group_handle);
.with_group_id(group_id.clone())
.execution(execution);

if let Some(source) = self.source {
builder = builder.source(source.file, source.line);
Expand Down
17 changes: 12 additions & 5 deletions crates/observation-tools-client/src/observation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ impl ObservationBuilder {
self
}

/// Add a raw group ID to the observation (used for Group observations to
/// record their own ID)
pub(crate) fn with_group_id(mut self, id: GroupId) -> Self {
self.group_ids.push(id);
self
}

/// Add metadata to the observation
pub fn metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.metadata.insert(key.into(), value.into());
Expand Down Expand Up @@ -165,7 +172,7 @@ impl ObservationBuilder {

/// Send the observation with a named serde-serialized payload, returning a
/// handle that allows adding more named payloads later.
pub fn named_serde<T: ?Sized + Serialize + 'static>(
pub fn named_payload<T: ?Sized + Serialize + 'static>(
self,
name: impl Into<String>,
value: &T,
Expand All @@ -187,13 +194,13 @@ impl ObservationBuilder {
self.send_named_observation(name, payload)
}

/// Send the observation with a named payload
pub fn named_payload(
/// Send the observation with a named raw payload
pub fn named_raw_payload(
self,
name: impl Into<String>,
payload: impl Into<Payload>,
payload: Payload,
) -> ObservationPayloadHandle {
self.send_named_observation(name, payload.into())
self.send_named_observation(name, payload)
}

fn send_named_observation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ async fn test_named_payloads() -> anyhow::Result<()> {
// Create an observation with a named payload, then add more payloads via the handle
let handle = ObservationBuilder::new("multi-payload-obs")
.metadata("kind", "multi")
.named_serde("headers", &json!({"content-type": "application/json"}));
.named_payload("headers", &json!({"content-type": "application/json"}));

handle.serde("body", &json!({"message": "hello"}));
handle.payload(
Expand Down
3 changes: 3 additions & 0 deletions crates/observation-tools-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ uuid.workspace = true
pulldown-cmark.workspace = true
ammonia.workspace = true

[dev-dependencies]
tempfile.workspace = true

[build-dependencies]
minijinja-embed.workspace = true
serde_json.workspace = true
Expand Down
20 changes: 10 additions & 10 deletions crates/observation-tools-server/src/api/observations/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ pub struct ListObservationsResponse {

/// Whether there are more results available
pub has_next_page: bool,

/// Token for the next page (cursor-based pagination)
#[serde(skip_serializing_if = "Option::is_none")]
pub next_page_token: Option<String>,
}

/// List observations for an execution
Expand All @@ -45,21 +49,17 @@ pub async fn list_observations(
Query(query): Query<ListObservationsQuery>,
) -> Result<Json<ListObservationsResponse>, AppError> {
let execution_id = ExecutionId::parse(&execution_id)?;
let limit = query.limit.unwrap_or(100);
let mut observations = metadata
// Fetch one extra to see if there's a next page
.list_observations(execution_id, Some(limit + 1), query.offset, None)
let page = metadata
.get_observations(execution_id, query.page_token)
.await?;
let has_next_page = observations.len() > limit;
if has_next_page {
// Remove the extra record fetched
observations.pop();
}
let has_next_page = page.pagination.next_page_token.is_some();
Ok(Json(ListObservationsResponse {
observations: observations
observations: page
.observations
.into_iter()
.map(GetObservation::new)
.collect(),
has_next_page,
next_page_token: page.pagination.next_page_token,
}))
}
10 changes: 3 additions & 7 deletions crates/observation-tools-server/src/api/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,12 @@ pub struct CreateObservationsRequest {
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct CreateObservationsResponse {}

/// Query parameters for listing observations
/// Query parameters for listing observations (cursor-based pagination)
#[derive(Debug, Clone, Serialize, Deserialize, Default, ToSchema, IntoParams)]
pub struct ListObservationsQuery {
/// Maximum number of results to return
#[serde(skip_serializing_if = "Option::is_none")]
pub limit: Option<usize>,

/// Number of results to skip (for pagination)
/// Cursor token for the next page
#[serde(skip_serializing_if = "Option::is_none")]
pub offset: Option<usize>,
pub page_token: Option<String>,
}

// ============================================================================
Expand Down
Loading
Loading