Merged
160 changes: 43 additions & 117 deletions app-server/src/browser_events/mod.rs
@@ -1,19 +1,18 @@
 use std::sync::Arc;
 
+use async_trait::async_trait;
 use backoff::ExponentialBackoffBuilder;
 use serde::{Deserialize, Serialize};
 use uuid::Uuid;
 
 use crate::{
-    api::v1::browser_sessions::{
-        BROWSER_SESSIONS_EXCHANGE, BROWSER_SESSIONS_QUEUE, BROWSER_SESSIONS_ROUTING_KEY, EventBatch,
-    },
+    api::v1::browser_sessions::EventBatch,
     cache::Cache,
     ch::browser_events::insert_browser_events,
     db::DB,
     features::{Feature, is_feature_enabled},
-    mq::{MessageQueue, MessageQueueDeliveryTrait, MessageQueueReceiverTrait, MessageQueueTrait},
     traces::limits::update_workspace_limit_exceeded_by_project_id,
+    worker::MessageHandler,
 };
 
 #[derive(Serialize, Deserialize, Clone)]
@@ -22,141 +21,68 @@ pub struct QueueBrowserEventMessage {
     pub project_id: Uuid,
 }
 
-pub async fn process_browser_events(
-    db: Arc<DB>,
-    clickhouse: clickhouse::Client,
-    cache: Arc<Cache>,
-    browser_events_message_queue: Arc<MessageQueue>,
-) {
-    loop {
-        inner_process_browser_events(
-            db.clone(),
-            clickhouse.clone(),
-            cache.clone(),
-            browser_events_message_queue.clone(),
-        )
-        .await;
-    }
-}
-
-async fn inner_process_browser_events(
-    db: Arc<DB>,
-    clickhouse: clickhouse::Client,
-    cache: Arc<Cache>,
-    queue: Arc<MessageQueue>,
-) {
-    // Add retry logic with exponential backoff for connection failures
-    let get_receiver = || async {
-        queue
-            .get_receiver(
-                BROWSER_SESSIONS_QUEUE,
-                BROWSER_SESSIONS_EXCHANGE,
-                BROWSER_SESSIONS_ROUTING_KEY,
-            )
-            .await
-            .map_err(|e| {
-                log::error!("Failed to get receiver from browser events queue: {:?}", e);
-                backoff::Error::transient(e)
-            })
-    };
-
-    let backoff = ExponentialBackoffBuilder::new()
-        .with_initial_interval(std::time::Duration::from_secs(1))
-        .with_max_interval(std::time::Duration::from_secs(60))
-        .with_max_elapsed_time(Some(std::time::Duration::from_secs(300))) // 5 minutes max
-        .build();
-
-    let mut receiver = match backoff::future::retry(backoff, get_receiver).await {
-        Ok(receiver) => {
-            log::info!("Successfully connected to browser events queue");
-            receiver
-        }
-        Err(e) => {
-            log::error!(
-                "Failed to connect to browser events queue after retries: {:?}",
-                e
-            );
-            return;
-        }
-    };
-
-    while let Some(delivery) = receiver.receive().await {
-        if let Err(e) = delivery {
-            log::error!("Failed to receive message from queue: {:?}", e);
-            continue;
-        }
-        let delivery = delivery.unwrap();
-        let acker = delivery.acker();
-        let message = match serde_json::from_slice::<QueueBrowserEventMessage>(&delivery.data()) {
-            Ok(message) => message,
-            Err(e) => {
-                log::error!("Failed to deserialize message from queue: {:?}", e);
-                let _ = acker.reject(false).await;
-                continue;
-            }
-        };
+/// Handler for browser events
+pub struct BrowserEventHandler {
+    pub db: Arc<DB>,
+    pub clickhouse: clickhouse::Client,
+    pub cache: Arc<Cache>,
+}
+
+#[async_trait]
+impl MessageHandler for BrowserEventHandler {
+    type Message = QueueBrowserEventMessage;
+
+    async fn handle(&self, message: Self::Message) -> Result<(), crate::worker::HandlerError> {
         let project_id = message.project_id;
         let batch = message.batch;
 
         if batch.events.is_empty() {
-            continue;
+            return Ok(());
        }
 
         let insert_browser_events_fn = || async {
-            let bytes_written = insert_browser_events(&clickhouse, project_id, &batch).await.map_err(|e| {
-                log::error!("Failed attempt to insert browser events. Will retry according to backoff policy. Error: {:?}", e);
-                backoff::Error::transient(e)
-            })?;
-
-            Ok::<usize, backoff::Error<clickhouse::error::Error>>(bytes_written)
+            insert_browser_events(&self.clickhouse, project_id, &batch)
+                .await
+                .map_err(|e| {
+                    log::error!(
+                        "Failed attempt to insert browser events. Will retry: {:?}",
+                        e
+                    );
+                    backoff::Error::transient(e)
+                })
         };
-        // Starting with 1 second delay, delay multiplies by random factor between 1 and 2
-        // up to 1 minute and until the total elapsed time is 1 minute
-        // https://docs.rs/backoff/latest/backoff/default/index.html
+
         let exponential_backoff = ExponentialBackoffBuilder::new()
             .with_initial_interval(std::time::Duration::from_millis(1000))
             .with_multiplier(1.5)
             .with_randomization_factor(0.5)
-            .with_max_interval(std::time::Duration::from_secs(1 * 60))
-            .with_max_elapsed_time(Some(std::time::Duration::from_secs(1 * 60)))
+            .with_max_interval(std::time::Duration::from_secs(60))
+            .with_max_elapsed_time(Some(std::time::Duration::from_secs(60)))
             .build();
 
-        match backoff::future::retry(exponential_backoff, insert_browser_events_fn).await {
-            Ok(bytes_written) => {
-                if let Err(e) = acker.ack().await {
-                    log::error!("Failed to ack MQ delivery (browser events): {:?}", e);
-                }
-
-                // Update workspace limits cache
-                if is_feature_enabled(Feature::UsageLimit) {
-                    if let Err(e) = update_workspace_limit_exceeded_by_project_id(
-                        db.clone(),
-                        clickhouse.clone(),
-                        cache.clone(),
-                        project_id,
-                        bytes_written,
-                    )
-                    .await
-                    {
-                        log::error!(
-                            "Failed to update workspace limit exceeded for project [{}]: {:?}",
-                            project_id,
-                            e
-                        );
-                    }
-                }
-            }
-            Err(e) => {
-                log::error!(
-                    "Exhausted backoff retries. Failed to insert browser events: {:?}",
-                    e
-                );
-                // TODO: Implement proper nacks and DLX
-                if let Err(e) = acker.reject(false).await {
-                    log::error!("Failed to reject MQ delivery (browser events): {:?}", e);
-                }
-            }
-        }
+        let bytes_written = backoff::future::retry(exponential_backoff, insert_browser_events_fn)
+            .await
+            .map_err(|e| anyhow::anyhow!("Failed to insert browser events: {:?}", e))?;
+
+        // Update workspace limits cache
+        if is_feature_enabled(Feature::UsageLimit) {
+            if let Err(e) = update_workspace_limit_exceeded_by_project_id(
+                self.db.clone(),
+                self.clickhouse.clone(),
+                self.cache.clone(),
+                project_id,
+                bytes_written,
+            )
+            .await
+            {
+                log::error!(
+                    "Failed to update workspace limit exceeded for project [{}]: {:?}",
+                    project_id,
+                    e
+                );
+            }
+        }
+
+        Ok(())
     }
 }
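
For reviewers unfamiliar with the backoff crate this module leans on, here is a minimal, self-contained sketch of the retry idiom the new handler uses. The flaky_insert operation and its error type are invented stand-ins for insert_browser_events; it assumes backoff 0.4 with its tokio feature enabled and a tokio runtime.

use std::time::Duration;

use backoff::ExponentialBackoffBuilder;

// Invented stand-in for `insert_browser_events`: always fails with a
// transient error so the retry loop is exercised.
async fn flaky_insert() -> Result<usize, backoff::Error<std::io::Error>> {
    // `transient` tells `backoff` to retry; `permanent` would abort the
    // retry loop immediately and surface the error.
    Err(backoff::Error::transient(std::io::Error::new(
        std::io::ErrorKind::ConnectionReset,
        "simulated ClickHouse hiccup",
    )))
}

#[tokio::main]
async fn main() {
    // Mirrors the handler's policy: start at 1s, grow by 1.5x with +/-50%
    // jitter, cap each delay at 60s, and stop retrying after 60s total.
    let policy = ExponentialBackoffBuilder::new()
        .with_initial_interval(Duration::from_millis(1000))
        .with_multiplier(1.5)
        .with_randomization_factor(0.5)
        .with_max_interval(Duration::from_secs(60))
        .with_max_elapsed_time(Some(Duration::from_secs(60)))
        .build();

    match backoff::future::retry(policy, flaky_insert).await {
        Ok(bytes) => println!("inserted {bytes} bytes"),
        Err(e) => eprintln!("gave up after retries: {e:?}"),
    }
}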
2 changes: 1 addition & 1 deletion app-server/src/cache/keys.rs
@@ -6,7 +6,7 @@ pub const PROJECT_API_KEY_CACHE_KEY: &str = "project_api_key";
 pub const PROJECT_CACHE_KEY: &str = "project";
 pub const WORKSPACE_LIMITS_CACHE_KEY: &str = "workspace_limits";
 pub const PROJECT_EVALUATORS_BY_PATH_CACHE_KEY: &str = "project_evaluators_by_path";
-pub const SUMMARY_TRIGGER_SPANS_CACHE_KEY: &str = "summary_trigger_spans";
+pub const SEMANTIC_EVENT_TRIGGER_SPANS_CACHE_KEY: &str = "semantic_event_trigger_spans";
 pub const PROJECT_EVENT_NAMES_CACHE_KEY: &str = "project_event_names";
 pub const WORKSPACE_BYTES_USAGE_CACHE_KEY: &str = "workspace_bytes_usage";
 pub const CLUSTERING_LOCK_CACHE_KEY: &str = "clustering_lock";
2 changes: 2 additions & 0 deletions app-server/src/ch/events.rs
@@ -25,6 +25,7 @@ pub struct CHEvent {
     pub user_id: String,
     pub session_id: String,
     pub size_bytes: u64,
+    pub source: String,
 }
 
 impl CHEvent {
@@ -43,6 +44,7 @@ impl CHEvent {
                 "lmnr.event.session_id",
             ),
             size_bytes: event.estimate_size_bytes() as u64,
+            source: event.source.to_string(),
         }
     }
33 changes: 33 additions & 0 deletions app-server/src/db/event_cluster_configs.rs
@@ -0,0 +1,33 @@
+use anyhow::Result;
+use sqlx::PgPool;
+use uuid::Uuid;
+
+use crate::db::events::EventSource;
+
+#[derive(Debug, Clone, sqlx::FromRow)]
+pub struct EventClusterConfig {
+    pub value_template: String,
+}
+
+/// Get event cluster config for a specific event name and project
+pub async fn get_event_cluster_config(
+    pool: &PgPool,
+    project_id: Uuid,
+    event_name: &str,
+    source: EventSource,
+) -> Result<Option<EventClusterConfig>> {
+    let config = sqlx::query_as::<_, EventClusterConfig>(
+        r#"
+        SELECT value_template
+        FROM event_cluster_configs
+        WHERE project_id = $1 AND event_name = $2 AND event_source = $3
+        "#,
+    )
+    .bind(project_id)
+    .bind(event_name)
+    .bind(source.to_string())
+    .fetch_optional(pool)
+    .await?;
+
+    Ok(config)
+}
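
A hypothetical call site for the new helper, to show the intended lookup flow; the fallback behavior here is an assumption for illustration, not something this PR defines.

use sqlx::PgPool;
use uuid::Uuid;

use crate::db::{event_cluster_configs::get_event_cluster_config, events::EventSource};

// Hypothetical: fetch the clustering template for a semantic event,
// falling back to the raw event name when no config row exists.
async fn value_template_or_default(
    pool: &PgPool,
    project_id: Uuid,
    event_name: &str,
) -> anyhow::Result<String> {
    let config =
        get_event_cluster_config(pool, project_id, event_name, EventSource::Semantic).await?;
    // The query binds `source.to_string()`, i.e. "CODE" or "SEMANTIC",
    // matching the Display impl added to EventSource in this PR.
    Ok(config
        .map(|c| c.value_template)
        .unwrap_or_else(|| event_name.to_string()))
}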
24 changes: 20 additions & 4 deletions app-server/src/db/events.rs
@@ -10,27 +10,43 @@ use crate::{
 
 use crate::traces::utils::convert_any_value_to_json_value;
 
+#[derive(Serialize, Deserialize, Clone, Debug)]
+pub enum EventSource {
+    #[serde(rename = "CODE")]
+    Code,
+    #[serde(rename = "SEMANTIC")]
+    Semantic,
+}
+
+impl std::fmt::Display for EventSource {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            EventSource::Code => write!(f, "CODE"),
+            EventSource::Semantic => write!(f, "SEMANTIC"),
+        }
+    }
+}
+
 #[derive(Deserialize, Serialize, Clone, Debug)]
 #[serde(rename_all = "camelCase")]
 pub struct Event {
     pub id: Uuid,
     pub span_id: Uuid,
     pub project_id: Uuid,
-    pub created_at: DateTime<Utc>,
     pub timestamp: DateTime<Utc>,
     pub name: String,
     pub attributes: Value,
     pub trace_id: Uuid,
+    pub source: EventSource,
 }
 
 impl Event {
     pub fn estimate_size_bytes(&self) -> usize {
         // 16 bytes for id,
         // 16 bytes for span_id,
         // 16 bytes for project_id,
-        // 8 bytes for created_at,
         // 8 bytes for timestamp,
-        return 16 + 16 + 16 + 8 + 8 + self.name.len() + estimate_json_size(&self.attributes);
+        return 16 + 16 + 16 + 8 + self.name.len() + estimate_json_size(&self.attributes);
     }
Review comment on estimate_size_bytes:

Bug: Event size estimate missing new source field

The estimate_size_bytes function calculates event size for usage tracking but doesn't include the newly added source field. The old created_at field (8 bytes) was removed from the calculation, but the new source field (which stores "CODE" or "SEMANTIC") wasn't added. This causes size estimates to be slightly smaller than actual, affecting workspace usage limit calculations.
 }
 
@@ -46,11 +62,11 @@ impl Event {
             id: Uuid::new_v4(),
             span_id,
             project_id,
-            created_at: Utc::now(),
             timestamp: Utc.timestamp_nanos(event.time_unix_nano as i64),
             name: event.name,
             attributes: Value::Object(attributes),
             trace_id,
+            source: EventSource::Code,
         }
     }
 }
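
A possible fix for the size-estimate bug flagged in the review comment above — not part of this PR — would count the serialized source value alongside the other fields:

impl Event {
    // Sketch of the suggested fix: include the length of the serialized
    // `source` value ("CODE" or "SEMANTIC") so usage accounting reflects
    // what is actually stored with the event.
    pub fn estimate_size_bytes(&self) -> usize {
        // 16 bytes each for id, span_id, project_id; 8 bytes for timestamp.
        16 + 16 + 16 + 8
            + self.name.len()
            + self.source.to_string().len()
            + estimate_json_size(&self.attributes)
    }
}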
4 changes: 3 additions & 1 deletion app-server/src/db/mod.rs
@@ -6,16 +6,18 @@ use sqlx::PgPool;
 pub mod datasets;
 pub mod evaluations;
 pub mod evaluators;
+pub mod event_cluster_configs;
 pub mod event_definitions;
 pub mod events;
 pub mod prices;
 pub mod project_api_keys;
 pub mod projects;
+pub mod semantic_event_definitions;
+pub mod semantic_event_trigger_spans;
 pub mod slack_channel_to_events;
 pub mod slack_integrations;
 pub mod spans;
 pub mod stats;
-pub mod summary_trigger_spans;
 pub mod tags;
 pub mod trace;
 pub mod utils;
10 changes: 10 additions & 0 deletions app-server/src/db/semantic_event_definitions.rs
@@ -0,0 +1,10 @@
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+
+/// Semantic event definition with prompt and schema (from definition or template)
+#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
+pub struct SemanticEventDefinition {
+    pub name: String,
+    pub prompt: String,
+    pub structured_output_schema: Value,
+}
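
Since this file only adds the row type, a hypothetical fetch helper might look like the following. The semantic_event_definitions table and its columns are assumptions inferred from the struct fields, and decoding the Value column assumes sqlx with its json feature enabled.

use sqlx::PgPool;
use uuid::Uuid;

use crate::db::semantic_event_definitions::SemanticEventDefinition;

// Hypothetical: load all semantic event definitions for a project.
// Table and column names are guesses; this PR does not define the query.
pub async fn get_semantic_event_definitions(
    pool: &PgPool,
    project_id: Uuid,
) -> anyhow::Result<Vec<SemanticEventDefinition>> {
    let defs = sqlx::query_as::<_, SemanticEventDefinition>(
        "SELECT name, prompt, structured_output_schema
         FROM semantic_event_definitions
         WHERE project_id = $1",
    )
    .bind(project_id)
    .fetch_all(pool)
    .await?;

    Ok(defs)
}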