Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions app-server/src/db/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ impl Event {
// 8 bytes for timestamp,
return 16 + 16 + 16 + 8 + 8 + self.name.len() + estimate_json_size(&self.attributes);
}

/// Reports whether this event looks like a recorded exception.
///
/// Two conditions must both hold:
/// - the event name, with surrounding whitespace trimmed, equals
///   `"exception"` ignoring ASCII case;
/// - `attributes` yields a value for at least one of the
///   `exception.message` / `exception.type` keys (only presence is checked).
pub fn is_exception(&self) -> bool {
    // Guard first: anything not named "exception" is never an exception event.
    if !self.name.trim().eq_ignore_ascii_case("exception") {
        return false;
    }

    // Checked in the same order as listed; stops at the first key found.
    ["exception.message", "exception.type"]
        .iter()
        .any(|key| self.attributes.get(*key).is_some())
}
}

impl Event {
Expand Down
22 changes: 14 additions & 8 deletions app-server/src/quickwit/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use std::{env, sync::Arc};
use anyhow::anyhow;
use tokio::sync::Mutex;
use tonic::transport::{Channel, Endpoint};
use tracing::instrument;

use super::{
QuickwitIndexedSpan,
doc_batch::build_json_doc_batch,
proto::ingest_service::{
CommitType, DocBatch, IngestRequest, ingest_service_client::IngestServiceClient,
Expand Down Expand Up @@ -65,12 +65,13 @@ impl QuickwitClient {
&self.inner.ingest_endpoint
}

pub async fn ingest(
#[instrument(skip(self, docs))]
pub async fn ingest<T: serde::Serialize>(
&self,
index_id: &str,
spans: &[QuickwitIndexedSpan],
docs: &[T],
) -> anyhow::Result<()> {
let doc_batch = build_doc_batch(index_id, spans)?;
let doc_batch = build_doc_batch(index_id, docs)?;
let request = IngestRequest {
doc_batches: vec![doc_batch],
commit: CommitType::Auto as i32,
Expand All @@ -84,6 +85,7 @@ impl QuickwitClient {
.map_err(|status| anyhow!("Quickwit ingest request failed: {status}"))
}

#[instrument(skip(self))]
pub async fn search_spans(
&self,
query_body: serde_json::Value,
Expand Down Expand Up @@ -115,11 +117,15 @@ impl QuickwitClient {
}
}

pub fn build_doc_batch(index_id: &str, spans: &[QuickwitIndexedSpan]) -> anyhow::Result<DocBatch> {
build_json_doc_batch(index_id, spans).map_err(|err| {
#[instrument(skip(docs))]
pub fn build_doc_batch<T: serde::Serialize>(
index_id: &str,
docs: &[T],
) -> anyhow::Result<DocBatch> {
build_json_doc_batch(index_id, docs).map_err(|err| {
anyhow!(
"Failed to encode spans for Quickwit ingestion ({} docs): {}",
spans.len(),
"Failed to encode documents for Quickwit ingestion ({} docs): {}",
docs.len(),
err
)
})
Expand Down
74 changes: 59 additions & 15 deletions app-server/src/quickwit/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ use tokio::time::Duration;
use crate::{
mq::{MessageQueue, MessageQueueDeliveryTrait, MessageQueueReceiverTrait, MessageQueueTrait},
quickwit::{
QuickwitIndexedSpan, SPANS_INDEXER_EXCHANGE, SPANS_INDEXER_QUEUE,
IndexerQueuePayload, SPANS_INDEXER_EXCHANGE, SPANS_INDEXER_QUEUE,
SPANS_INDEXER_ROUTING_KEY, client::QuickwitClient,
},
};

const QUICKWIT_SPANS_DEFAULT_INDEX_ID: &str = "spans";
const QUICKWIT_EVENTS_DEFAULT_INDEX_ID: &str = "events";

/// Extract text content from a JSON value for searchability.
/// Recursively extracts all string values and keys, avoiding double-encoding.
Expand Down Expand Up @@ -118,8 +119,13 @@ async fn inner_process_spans_indexer_queue(
let acker = delivery.acker();
let payload = delivery.data();

let mut indexed_spans: Vec<QuickwitIndexedSpan> = match serde_json::from_slice(&payload) {
Ok(spans) => spans,
let (mut indexed_spans, mut indexed_events) = match serde_json::from_slice(&payload) {
Ok(payload) => match payload {
IndexerQueuePayload::IndexerQueueMessage(message) => {
(message.spans, message.events)
}
IndexerQueuePayload::SpansOnly(spans) => (spans, vec![]),
},
Err(e) => {
log::error!(
"Failed to deserialize Quickwit span payload ({} bytes): {:?}",
Expand All @@ -136,7 +142,7 @@ async fn inner_process_spans_indexer_queue(
}
};

if indexed_spans.is_empty() {
if indexed_spans.is_empty() && indexed_events.is_empty() {
if let Err(e) = acker.ack().await {
log::error!(
"Failed to ack empty Quickwit indexing batch delivery: {:?}",
Expand All @@ -146,32 +152,70 @@ async fn inner_process_spans_indexer_queue(
continue;
}

for span in &mut indexed_spans {
indexed_spans.iter_mut().for_each(|span| {
// Extract text content from JSON value for searchability
// This avoids double-encoding: we want plain text, not a JSON-encoded string
let attributes_text = extract_text_from_json_value(&span.attributes);
let attributes_text = attributes_text.replace('{', " { ").replace('}', " } ");
span.attributes = serde_json::Value::String(attributes_text);
}
});
indexed_events.iter_mut().for_each(|event| {
// Extract text content from JSON value for searchability
// This avoids double-encoding: we want plain text, not a JSON-encoded string
let attributes_text = extract_text_from_json_value(&event.attributes);
let attributes_text = attributes_text.replace('{', " { ").replace('}', " } ");
event.attributes = serde_json::Value::String(attributes_text);
});

let index_id = std::env::var("QUICKWIT_SPANS_INDEX_ID")
let spans_index_id = std::env::var("QUICKWIT_SPANS_INDEX_ID")
.unwrap_or(QUICKWIT_SPANS_DEFAULT_INDEX_ID.to_string());
let events_index_id = std::env::var("QUICKWIT_EVENTS_INDEX_ID")
.unwrap_or(QUICKWIT_EVENTS_DEFAULT_INDEX_ID.to_string());

// Ingest spans if present
let spans_result = if !indexed_spans.is_empty() {
quickwit_client
.ingest(&spans_index_id, &indexed_spans)
.await
} else {
Ok(())
};

match quickwit_client.ingest(&index_id, &indexed_spans).await {
Ok(_) => {
// Ingest events if present
let events_result = if !indexed_events.is_empty() {
quickwit_client
.ingest(&events_index_id, &indexed_events)
.await
} else {
Ok(())
};

// Only ack if both ingests succeeded
match (spans_result, events_result) {
(Ok(_), Ok(_)) => {
if let Err(e) = acker.ack().await {
log::error!(
"Failed to ack Quickwit indexing delivery after ingest success: {:?}",
e
);
}
}
Err(e) => {
log::error!(
"Failed to ingest {} spans into Quickwit: {:?}",
indexed_spans.len(),
e
);
(spans_res, events_res) => {
// Log specific failures
if let Err(e) = spans_res {
log::error!(
"Failed to ingest {} spans into Quickwit: {:?}",
indexed_spans.len(),
e
);
}
if let Err(e) = events_res {
log::error!(
"Failed to ingest {} events into Quickwit: {:?}",
indexed_events.len(),
e
);
}

let _ = acker.reject(true).await.map_err(|reject_err| {
log::error!(
Expand Down
53 changes: 48 additions & 5 deletions app-server/src/quickwit/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
pub mod client;
pub mod consumer;
mod doc_batch;
pub mod producer;
mod proto;

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use uuid::Uuid;

use crate::db::events::Event;
use crate::db::spans::Span;
use crate::utils::json_value_to_string;

Expand Down Expand Up @@ -35,8 +42,44 @@ impl From<&Span> for QuickwitIndexedSpan {
}
}

pub mod client;
pub mod consumer;
pub mod doc_batch;
pub mod producer;
pub mod proto;
/// Event document in the shape that is serialized for Quickwit indexing.
///
/// Instances are built from a database [`Event`] through the
/// `From<&Event>` implementation in this module.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuickwitIndexedEvent {
/// Identifier copied from the source event.
pub id: Uuid,
/// Result of `Event::is_exception()` evaluated when the document is built.
pub is_exception: bool,
/// `span_id` copied from the source event.
pub span_id: Uuid,
/// `project_id` copied from the source event.
pub project_id: Uuid,
/// `trace_id` copied from the source event.
pub trace_id: Uuid,
/// Timestamp copied from the source event.
pub timestamp: DateTime<Utc>,
/// Event name, cloned from the source event.
pub name: String,
/// Event attributes, cloned from the source event. The indexer consumer
/// overwrites this with a `Value::String` of extracted text before ingest.
pub attributes: Value,
}

impl From<&Event> for QuickwitIndexedEvent {
fn from(event: &Event) -> Self {
Self {
id: event.id,
is_exception: event.is_exception(),
span_id: event.span_id,
project_id: event.project_id,
trace_id: event.trace_id,
timestamp: event.timestamp,
name: event.name.clone(),
attributes: event.attributes.clone(),
}
}
}

/// Combined indexer-queue message carrying both spans and events.
///
/// The producer wraps this in `IndexerQueuePayload::IndexerQueueMessage`
/// before serializing it; the consumer unpacks `spans` and `events` and
/// ingests each non-empty list into its own Quickwit index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexerQueueMessage {
/// Span documents to index.
pub spans: Vec<QuickwitIndexedSpan>,
/// Event documents to index.
pub events: Vec<QuickwitIndexedEvent>,
}

/// Payload formats accepted on the indexer queue.
///
/// `#[serde(untagged)]` means no variant tag is written or expected: the
/// variant is chosen by which shape deserializes successfully, trying the
/// variants in declaration order. The consumer handles both variants; the
/// producer only serializes `IndexerQueueMessage`.
// TODO: remove this once the change is merged and all items are removed
// from the queue, and send the inner struct from producer directly.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum IndexerQueuePayload {
/// Payload that is just a list of spans, with no events.
SpansOnly(Vec<QuickwitIndexedSpan>),
/// Payload carrying both `spans` and `events`.
IndexerQueueMessage(IndexerQueueMessage),
}
17 changes: 13 additions & 4 deletions app-server/src/quickwit/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,28 @@ use serde_json;

use crate::{
mq::{MessageQueue, MessageQueueTrait, utils::mq_max_payload},
quickwit::{QuickwitIndexedSpan, SPANS_INDEXER_EXCHANGE, SPANS_INDEXER_ROUTING_KEY},
quickwit::{
IndexerQueueMessage, IndexerQueuePayload, QuickwitIndexedEvent, QuickwitIndexedSpan,
SPANS_INDEXER_EXCHANGE, SPANS_INDEXER_ROUTING_KEY,
},
};

pub async fn publish_spans_for_indexing(
spans: &[QuickwitIndexedSpan],
events: &[QuickwitIndexedEvent],
queue: Arc<MessageQueue>,
) -> anyhow::Result<()> {
if spans.is_empty() {
if spans.is_empty() && events.is_empty() {
return Ok(());
}

let payload =
serde_json::to_vec(spans).context("Failed to serialize spans for Quickwit indexing")?;
let payload = IndexerQueueMessage {
spans: spans.to_vec(),
events: events.to_vec(),
};

let payload = serde_json::to_vec(&IndexerQueuePayload::IndexerQueueMessage(payload))
.context("Failed to serialize spans for Quickwit indexing")?;
let payload_size = payload.len();

let max_payload = mq_max_payload();
Expand Down
8 changes: 6 additions & 2 deletions app-server/src/traces/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::{
MessageQueueTrait,
},
pubsub::PubSub,
quickwit::{QuickwitIndexedSpan, producer::publish_spans_for_indexing},
quickwit::{QuickwitIndexedEvent, QuickwitIndexedSpan, producer::publish_spans_for_indexing},
storage::Storage,
traces::{
IngestedBytes,
Expand Down Expand Up @@ -380,7 +380,11 @@ async fn process_batch(

// Index spans in Quickwit
let quickwit_spans: Vec<QuickwitIndexedSpan> = spans.iter().map(|span| span.into()).collect();
if let Err(e) = publish_spans_for_indexing(&quickwit_spans, queue.clone()).await {
let quickwit_events: Vec<QuickwitIndexedEvent> =
all_events.iter().map(|event| event.into()).collect();
if let Err(e) =
publish_spans_for_indexing(&quickwit_spans, &quickwit_events, queue.clone()).await
{
log::error!(
"Failed to publish {} spans for Quickwit indexing: {:?}",
quickwit_spans.len(),
Expand Down
44 changes: 44 additions & 0 deletions frontend/lib/quickwit/indexes/events.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
version: "0.8"
index_id: events
doc_mapping:
field_mappings:
- name: is_exception
type: bool
fast: true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this field be `stored: false`, or do we need the stored value when querying?

- name: project_id
type: text
tokenizer: raw
fast: true
- name: trace_id
type: text
tokenizer: raw
fast: true
- name: span_id
type: text
tokenizer: raw
fast: true
- name: id
type: text
tokenizer: raw
fast: true
- name: timestamp
type: datetime
fast: true
stored: false
- name: name
type: text
tokenizer: default
stored: false
- name: attributes
type: text
tokenizer: default
fast: false
stored: false
record: position
tag_fields: []
timestamp_field: timestamp
search_settings:
default_search_fields: [name, attributes]
retention:
period: 90 days
schedule: daily
2 changes: 1 addition & 1 deletion frontend/lib/quickwit/indexes/spans.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version: "0.7"
version: "0.8"
index_id: spans
doc_mapping:
field_mappings:
Expand Down
3 changes: 2 additions & 1 deletion frontend/lib/quickwit/migrations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ export async function initializeQuickwitIndexes(): Promise<void> {
const exists = await indexExists(indexId);

if (exists) {
// It's not possible to change most of the index configuration after
// it's created.
console.log(`✓ Index "${indexId}" already exists, skipping`);
// Optionally: check if update is needed (compare configs)
} else {
console.log(`Creating index "${indexId}"...`);
await createIndex(indexConfig);
Expand Down