Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions app-server/src/db/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ impl Event {
// 8 bytes for timestamp,
return 16 + 16 + 16 + 8 + 8 + self.name.len() + estimate_json_size(&self.attributes);
}

/// Reports whether this event should be treated as an exception event.
///
/// Returns `true` only when both conditions hold:
/// - the name, after trimming surrounding whitespace, equals `"exception"`
///   (ASCII case-insensitive), and
/// - `attributes.get(..)` yields a value for `exception.message` or
///   `exception.type`.
pub fn is_exception(&self) -> bool {
    // Guard: anything not named "exception" is never an exception event.
    if !self.name.trim().eq_ignore_ascii_case("exception") {
        return false;
    }

    // At least one of the exception attribute keys must be present.
    ["exception.message", "exception.type"]
        .iter()
        .any(|key| self.attributes.get(*key).is_some())
}
}

impl Event {
Expand Down
22 changes: 14 additions & 8 deletions app-server/src/quickwit/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use std::{env, sync::Arc};
use anyhow::anyhow;
use tokio::sync::Mutex;
use tonic::transport::{Channel, Endpoint};
use tracing::instrument;

use super::{
QuickwitIndexedSpan,
doc_batch::build_json_doc_batch,
proto::ingest_service::{
CommitType, DocBatch, IngestRequest, ingest_service_client::IngestServiceClient,
Expand Down Expand Up @@ -65,12 +65,13 @@ impl QuickwitClient {
&self.inner.ingest_endpoint
}

pub async fn ingest(
#[instrument(skip(self, docs))]
pub async fn ingest<T: serde::Serialize>(
&self,
index_id: &str,
spans: &[QuickwitIndexedSpan],
docs: &[T],
) -> anyhow::Result<()> {
let doc_batch = build_doc_batch(index_id, spans)?;
let doc_batch = build_doc_batch(index_id, docs)?;
let request = IngestRequest {
doc_batches: vec![doc_batch],
commit: CommitType::Auto as i32,
Expand All @@ -84,6 +85,7 @@ impl QuickwitClient {
.map_err(|status| anyhow!("Quickwit ingest request failed: {status}"))
}

#[instrument(skip(self))]
pub async fn search_spans(
&self,
query_body: serde_json::Value,
Expand Down Expand Up @@ -115,11 +117,15 @@ impl QuickwitClient {
}
}

pub fn build_doc_batch(index_id: &str, spans: &[QuickwitIndexedSpan]) -> anyhow::Result<DocBatch> {
build_json_doc_batch(index_id, spans).map_err(|err| {
#[instrument(skip(docs))]
pub fn build_doc_batch<T: serde::Serialize>(
index_id: &str,
docs: &[T],
) -> anyhow::Result<DocBatch> {
build_json_doc_batch(index_id, docs).map_err(|err| {
anyhow!(
"Failed to encode spans for Quickwit ingestion ({} docs): {}",
spans.len(),
"Failed to encode documents for Quickwit ingestion ({} docs): {}",
docs.len(),
err
)
})
Expand Down
74 changes: 59 additions & 15 deletions app-server/src/quickwit/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ use tokio::time::Duration;
use crate::{
mq::{MessageQueue, MessageQueueDeliveryTrait, MessageQueueReceiverTrait, MessageQueueTrait},
quickwit::{
QuickwitIndexedSpan, SPANS_INDEXER_EXCHANGE, SPANS_INDEXER_QUEUE,
IndexerQueuePayload, SPANS_INDEXER_EXCHANGE, SPANS_INDEXER_QUEUE,
SPANS_INDEXER_ROUTING_KEY, client::QuickwitClient,
},
};

const QUICKWIT_SPANS_DEFAULT_INDEX_ID: &str = "spans";
const QUICKWIT_EVENTS_DEFAULT_INDEX_ID: &str = "events";

/// Extract text content from a JSON value for searchability.
/// Recursively extracts all string values and keys, avoiding double-encoding.
Expand Down Expand Up @@ -118,8 +119,13 @@ async fn inner_process_spans_indexer_queue(
let acker = delivery.acker();
let payload = delivery.data();

let mut indexed_spans: Vec<QuickwitIndexedSpan> = match serde_json::from_slice(&payload) {
Ok(spans) => spans,
let (mut indexed_spans, mut indexed_events) = match serde_json::from_slice(&payload) {
Ok(payload) => match payload {
IndexerQueuePayload::IndexerQueueMessage(message) => {
(message.spans, message.events)
}
IndexerQueuePayload::SpansOnly(spans) => (spans, vec![]),
},
Err(e) => {
log::error!(
"Failed to deserialize Quickwit span payload ({} bytes): {:?}",
Expand All @@ -136,7 +142,7 @@ async fn inner_process_spans_indexer_queue(
}
};

if indexed_spans.is_empty() {
if indexed_spans.is_empty() && indexed_events.is_empty() {
if let Err(e) = acker.ack().await {
log::error!(
"Failed to ack empty Quickwit indexing batch delivery: {:?}",
Expand All @@ -146,32 +152,70 @@ async fn inner_process_spans_indexer_queue(
continue;
}

for span in &mut indexed_spans {
indexed_spans.iter_mut().for_each(|span| {
// Extract text content from JSON value for searchability
// This avoids double-encoding: we want plain text, not a JSON-encoded string
let attributes_text = extract_text_from_json_value(&span.attributes);
let attributes_text = attributes_text.replace('{', " { ").replace('}', " } ");
span.attributes = serde_json::Value::String(attributes_text);
}
});
indexed_events.iter_mut().for_each(|event| {
// Extract text content from JSON value for searchability
// This avoids double-encoding: we want plain text, not a JSON-encoded string
let attributes_text = extract_text_from_json_value(&event.attributes);
let attributes_text = attributes_text.replace('{', " { ").replace('}', " } ");
event.attributes = serde_json::Value::String(attributes_text);
});

let index_id = std::env::var("QUICKWIT_SPANS_INDEX_ID")
let spans_index_id = std::env::var("QUICKWIT_SPANS_INDEX_ID")
.unwrap_or(QUICKWIT_SPANS_DEFAULT_INDEX_ID.to_string());
let events_index_id = std::env::var("QUICKWIT_EVENTS_INDEX_ID")
.unwrap_or(QUICKWIT_EVENTS_DEFAULT_INDEX_ID.to_string());

// Ingest spans if present
let spans_result = if !indexed_spans.is_empty() {
quickwit_client
.ingest(&spans_index_id, &indexed_spans)
.await
} else {
Ok(())
};

match quickwit_client.ingest(&index_id, &indexed_spans).await {
Ok(_) => {
// Ingest events if present
let events_result = if !indexed_events.is_empty() {
quickwit_client
.ingest(&events_index_id, &indexed_events)
.await
} else {
Ok(())
};

// Only ack if both ingests succeeded
match (spans_result, events_result) {
(Ok(_), Ok(_)) => {
if let Err(e) = acker.ack().await {
log::error!(
"Failed to ack Quickwit indexing delivery after ingest success: {:?}",
e
);
}
}
Err(e) => {
log::error!(
"Failed to ingest {} spans into Quickwit: {:?}",
indexed_spans.len(),
e
);
(spans_res, events_res) => {
// Log specific failures
if let Err(e) = spans_res {
log::error!(
"Failed to ingest {} spans into Quickwit: {:?}",
indexed_spans.len(),
e
);
}
if let Err(e) = events_res {
log::error!(
"Failed to ingest {} events into Quickwit: {:?}",
indexed_events.len(),
e
);
}

let _ = acker.reject(true).await.map_err(|reject_err| {
log::error!(
Expand Down
53 changes: 48 additions & 5 deletions app-server/src/quickwit/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
pub mod client;
pub mod consumer;
mod doc_batch;
pub mod producer;
mod proto;

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use uuid::Uuid;

use crate::db::events::Event;
use crate::db::spans::Span;
use crate::utils::json_value_to_string;

Expand Down Expand Up @@ -35,8 +42,44 @@ impl From<&Span> for QuickwitIndexedSpan {
}
}

pub mod client;
pub mod consumer;
pub mod doc_batch;
pub mod producer;
pub mod proto;
/// Event document in the shape that is serialized for Quickwit indexing.
///
/// Built from a database `Event` through the `From<&Event>` conversion.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuickwitIndexedEvent {
/// Identifier copied from the source event.
pub id: Uuid,
/// Result of `Event::is_exception()` captured when the document is built.
pub is_exception: bool,
/// Identifier of the span the source event belongs to.
pub span_id: Uuid,
/// Identifier of the owning project, copied from the source event.
pub project_id: Uuid,
/// Identifier of the trace, copied from the source event.
pub trace_id: Uuid,
/// Event timestamp in UTC, copied from the source event.
pub timestamp: DateTime<Utc>,
/// Event name, cloned from the source event.
pub name: String,
/// Event attributes, cloned from the source event as a JSON value.
/// NOTE(review): the Quickwit consumer appears to replace this with a
/// flattened text string before ingesting — confirm against the consumer.
pub attributes: Value,
}

impl From<&Event> for QuickwitIndexedEvent {
fn from(event: &Event) -> Self {
Self {
id: event.id,
is_exception: event.is_exception(),
span_id: event.span_id,
project_id: event.project_id,
trace_id: event.trace_id,
timestamp: event.timestamp,
name: event.name.clone(),
attributes: event.attributes.clone(),
}
}
}

/// Combined indexing payload: a batch of spans together with a batch of
/// events, serialized as a single message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexerQueueMessage {
/// Span documents to index; may be empty.
pub spans: Vec<QuickwitIndexedSpan>,
/// Event documents to index; may be empty.
pub events: Vec<QuickwitIndexedEvent>,
}

/// Wire format of a message on the indexer queue.
///
/// The enum is `#[serde(untagged)]`, so no variant tag is written or read:
/// the payload is either a bare array of spans or an object with `spans`
/// and `events` fields. With untagged enums serde tries the variants in
/// declaration order and keeps the first one that deserializes, so the
/// order of the variants below matters.
// TODO: remove this enum once this change is deployed and every spans-only
// message has been drained from the queue; the producer can then send
// `IndexerQueueMessage` directly.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum IndexerQueuePayload {
/// Payload that is just a list of spans, with no events.
SpansOnly(Vec<QuickwitIndexedSpan>),
/// Payload carrying both spans and events.
IndexerQueueMessage(IndexerQueueMessage),
}
17 changes: 13 additions & 4 deletions app-server/src/quickwit/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,28 @@ use serde_json;

use crate::{
mq::{MessageQueue, MessageQueueTrait, utils::mq_max_payload},
quickwit::{QuickwitIndexedSpan, SPANS_INDEXER_EXCHANGE, SPANS_INDEXER_ROUTING_KEY},
quickwit::{
IndexerQueueMessage, IndexerQueuePayload, QuickwitIndexedEvent, QuickwitIndexedSpan,
SPANS_INDEXER_EXCHANGE, SPANS_INDEXER_ROUTING_KEY,
},
};

pub async fn publish_spans_for_indexing(
spans: &[QuickwitIndexedSpan],
events: &[QuickwitIndexedEvent],
queue: Arc<MessageQueue>,
) -> anyhow::Result<()> {
if spans.is_empty() {
if spans.is_empty() && events.is_empty() {
return Ok(());
}

let payload =
serde_json::to_vec(spans).context("Failed to serialize spans for Quickwit indexing")?;
let payload = IndexerQueueMessage {
spans: spans.to_vec(),
events: events.to_vec(),
};

let payload = serde_json::to_vec(&IndexerQueuePayload::IndexerQueueMessage(payload))
.context("Failed to serialize spans for Quickwit indexing")?;
let payload_size = payload.len();

let max_payload = mq_max_payload();
Expand Down
8 changes: 6 additions & 2 deletions app-server/src/traces/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::{
MessageQueueTrait,
},
pubsub::PubSub,
quickwit::{QuickwitIndexedSpan, producer::publish_spans_for_indexing},
quickwit::{QuickwitIndexedEvent, QuickwitIndexedSpan, producer::publish_spans_for_indexing},
storage::Storage,
traces::{
IngestedBytes,
Expand Down Expand Up @@ -380,7 +380,11 @@ async fn process_batch(

// Index spans in Quickwit
let quickwit_spans: Vec<QuickwitIndexedSpan> = spans.iter().map(|span| span.into()).collect();
if let Err(e) = publish_spans_for_indexing(&quickwit_spans, queue.clone()).await {
let quickwit_events: Vec<QuickwitIndexedEvent> =
all_events.iter().map(|event| event.into()).collect();
if let Err(e) =
publish_spans_for_indexing(&quickwit_spans, &quickwit_events, queue.clone()).await
{
log::error!(
"Failed to publish {} spans for Quickwit indexing: {:?}",
quickwit_spans.len(),
Expand Down
42 changes: 42 additions & 0 deletions frontend/lib/quickwit/indexes/events.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
version: "0.8"
index_id: events
doc_mapping:
field_mappings:
- name: is_exception
type: bool
fast: true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be `stored: false`, or do we need the stored value when querying?

stored: false
- name: project_id
type: text
tokenizer: raw
fast: true
stored: false
- name: trace_id
type: text
tokenizer: raw
fast: true
- name: span_id
type: text
tokenizer: raw
fast: false
- name: timestamp
type: datetime
fast: true
stored: false
- name: name
type: text
tokenizer: default
stored: false
- name: attributes
type: text
tokenizer: default
fast: false
stored: false
record: position
tag_fields: []
timestamp_field: timestamp
search_settings:
default_search_fields: [name, attributes]
retention:
period: 90 days
schedule: daily
2 changes: 1 addition & 1 deletion frontend/lib/quickwit/indexes/spans.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version: "0.7"
version: "0.8"
index_id: spans
doc_mapping:
field_mappings:
Expand Down
3 changes: 2 additions & 1 deletion frontend/lib/quickwit/migrations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ export async function initializeQuickwitIndexes(): Promise<void> {
const exists = await indexExists(indexId);

if (exists) {
// It's not possible to change most of the index configuration after
// it's created.
console.log(`✓ Index "${indexId}" already exists, skipping`);
// Optionally: check if update is needed (compare configs)
} else {
console.log(`Creating index "${indexId}"...`);
await createIndex(indexConfig);
Expand Down