diff --git a/.gitignore b/.gitignore
index 83a00ee..a8aaf11 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
 /target
 .env
+AGENTS.md
diff --git a/README.md b/README.md
index 91cec7e..420f946 100644
--- a/README.md
+++ b/README.md
@@ -123,13 +123,13 @@ All endpoints return Loki-compatible JSON responses and reuse the same error sha

 | Endpoint                                 | Description |
 | ---------------------------------------- | ----------- |
 | `GET /loki/api/v1/query`                 | Instant query. Supports the same LogQL used by Grafana's Explore panel. An optional `time` parameter (nanoseconds) defaults to "now", and the adapter automatically looks back 5 minutes when computing SQL bounds. |
-| `GET /loki/api/v1/query_range`           | Range query. Requires `start`/`end` nanoseconds and accepts `limit`/`step`. Log queries stream raw lines; metric queries return Loki matrix results and require a `step` value (the adapter may clamp it to keep bucket counts bounded, default cap 240 buckets). |
+| `GET /loki/api/v1/query_range`           | Range query. Accepts `start`/`end` (default past hour), `since` (relative duration), `limit`, `interval`, `step`, and `direction`. Log queries stream raw lines (`interval` down-samples entries, `direction` controls scan order); metric queries return Loki matrix results and require a `step` value (the adapter may clamp it to keep bucket counts bounded, default cap 240 buckets). |
 | `GET /loki/api/v1/labels`                | Lists known label keys for the selected schema. Optional `start`/`end` parameters (nanoseconds) fence the search window; unspecified values default to the last 5 minutes, matching Grafana's Explore defaults. |
 | `GET /loki/api/v1/label/{label}/values`  | Lists distinct values for a specific label key using the same optional `start`/`end` bounds as `/labels`. Works for both `loki` and `flat` schemas and automatically filters out empty strings. |
 | `GET /loki/api/v1/index/stats`           | Returns approximate `streams`, `chunks`, `entries`, and `bytes` counters for a selector over a `[start, end]` window. `chunks` are estimated via unique stream keys because Databend does not store Loki chunks. |
 | `GET /loki/api/v1/tail`                  | WebSocket tail endpoint that streams live logs for a LogQL query; compatible with Grafana Explore and `logcli --tail`. |

-`/query` and `/query_range` share the same LogQL parser and SQL builder. Instant queries fall back to `DEFAULT_LOOKBACK_NS` (5 minutes) when no explicit window is supplied, while range queries honor the caller's `start`/`end` bounds. `/labels` and `/label/{label}/values` delegate to schema-aware metadata lookups: the loki schema uses `map_keys`/`labels['key']` expressions, whereas the flat schema issues `SELECT DISTINCT` on the physical column and returns values in sorted order.
+`/query` and `/query_range` share the same LogQL parser and SQL builder. Instant queries fall back to `DEFAULT_LOOKBACK_NS` (5 minutes) when no explicit window is supplied, while range queries default to `[now - 1h, now]` and also honor Loki's `since` helper to derive `start`. `/loki/api/v1/query_range` log queries fully implement Loki's `direction` (`forward`/`backward`) and `interval` parameters: the adapter scans in the requested direction, emits entries in that order, and down-samples each stream so successive log lines are at least `interval` apart starting from `start`. `/labels` and `/label/{label}/values` delegate to schema-aware metadata lookups: the loki schema uses `map_keys`/`labels['key']` expressions, whereas the flat schema issues `SELECT DISTINCT` on the physical column and returns values in sorted order.

 ### Tail streaming
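The defaulting rules described in the README paragraph above are easiest to see with concrete numbers. Here is a minimal self-contained sketch of the window resolution (not the adapter's code; the real logic is `resolve_range_bounds` in `src/app/handlers.rs` below, which additionally rejects `start >= end`):

```rust
// Illustration of the window-resolution rules for /loki/api/v1/query_range.
// All values are nanosecond timestamps.
const HOUR_NS: i64 = 60 * 60 * 1_000_000_000;

fn resolve(start: Option<i64>, end: Option<i64>, since_ns: Option<i64>, now: i64) -> (i64, i64) {
    let end_ns = end.unwrap_or(now);
    let start_ns = match (start, since_ns) {
        // An explicit start always wins over `since`.
        (Some(s), _) => s,
        // `since` is anchored at min(end, now), so it never reaches into the future.
        (None, Some(since)) => end_ns.min(now).saturating_sub(since),
        // No start and no since: default to the past hour.
        (None, None) => end_ns.saturating_sub(HOUR_NS),
    };
    (start_ns, end_ns)
}

fn main() {
    let now = 10 * HOUR_NS;
    // No parameters at all: [now - 1h, now].
    assert_eq!(resolve(None, None, None, now), (9 * HOUR_NS, now));
    // since=30m with an explicit end: start is derived from end, not from now.
    assert_eq!(
        resolve(None, Some(8 * HOUR_NS), Some(HOUR_NS / 2), now),
        (8 * HOUR_NS - HOUR_NS / 2, 8 * HOUR_NS)
    );
}
```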
diff --git a/src/app/handlers.rs b/src/app/handlers.rs
index e508450..6cb9dbd 100644
--- a/src/app/handlers.rs
+++ b/src/app/handlers.rs
@@ -41,13 +41,15 @@ use crate::{
 use super::{
     responses::{
-        LabelsResponse, LokiResponse, ProcessedEntry, collect_processed_entries,
-        entries_to_streams, metric_matrix, metric_vector, rows_to_streams, tail_chunk,
+        LabelsResponse, LokiResponse, ProcessedEntry, StreamDirection, StreamOptions,
+        collect_processed_entries, entries_to_streams, metric_matrix, metric_vector,
+        rows_to_streams, tail_chunk,
     },
     state::{AppState, DEFAULT_LOOKBACK_NS},
 };

 const DEFAULT_TAIL_LIMIT: u64 = 100;
+const DEFAULT_RANGE_LOOKBACK_NS: i64 = 60 * 60 * 1_000_000_000;
 const DEFAULT_TAIL_LOOKBACK_NS: i64 = 60 * 60 * 1_000_000_000;
 const MAX_TAIL_DELAY_SECONDS: u64 = 5;
 const TAIL_IDLE_SLEEP_MS: u64 = 200;
@@ -77,7 +79,10 @@ struct RangeQueryParams {
     limit: Option<u64>,
     start: Option<i64>,
     end: Option<i64>,
+    since: Option<String>,
     step: Option<String>,
+    interval: Option<String>,
+    direction: Option<String>,
 }

 #[derive(Debug, Deserialize)]
@@ -162,7 +167,12 @@ async fn instant_query(
         sql
     );
     let rows = execute_query(state.client(), &sql).await?;
-    let streams = rows_to_streams(state.schema(), rows, &expr.pipeline)?;
+    let streams = rows_to_streams(
+        state.schema(),
+        rows,
+        &expr.pipeline,
+        StreamOptions::default(),
+    )?;
     Ok(Json(LokiResponse::success(streams)))
 }

@@ -171,27 +181,23 @@ async fn range_query(
     Query(params): Query<RangeQueryParams>,
 ) -> Result<Json<LokiResponse>, AppError> {
     log::debug!(
-        "range query received: query=`{}` limit={:?} start={:?} end={:?} step={:?}",
+        "range query received: query=`{}` limit={:?} start={:?} end={:?} step={:?} direction={:?}",
         params.query,
         params.limit,
         params.start,
         params.end,
-        params.step
+        params.step,
+        params.direction
     );

-    let start = params
-        .start
-        .ok_or_else(|| AppError::BadRequest("start is required".into()))?;
-    let end = params
-        .end
-        .ok_or_else(|| AppError::BadRequest("end is required".into()))?;
-
-    if start >= end {
-        return Err(AppError::BadRequest(
-            "start must be smaller than end".into(),
-        ));
-    }
+    let (start, end) = resolve_range_bounds(params.start, params.end, params.since.as_deref())?;
+    let interval_ns = parse_optional_duration_ns(params.interval.as_deref(), "interval")?;

     if let Some(metric) = state.parse_metric(&params.query)? {
+        if interval_ns.is_some() {
+            return Err(AppError::BadRequest(
+                "metric queries do not support the `interval` parameter".into(),
+            ));
+        }
         let step_raw = params
             .step
             .as_deref()
@@ -226,6 +232,7 @@ async fn range_query(
         return Ok(Json(metric_matrix(samples)));
     }

+    let order = parse_range_direction(params.direction.as_deref())?;
     let expr = state.parse(&params.query)?;
     let limit = state.clamp_limit(params.limit);
@@ -236,7 +243,7 @@ async fn range_query(
             start_ns: Some(start),
             end_ns: Some(end),
             limit,
-            order: SqlOrder::Asc,
+            order,
         },
     )?;
@@ -248,7 +255,12 @@ async fn range_query(
         sql
     );
     let rows = execute_query(state.client(), &sql).await?;
-    let streams = rows_to_streams(state.schema(), rows, &expr.pipeline)?;
+    let stream_options = StreamOptions {
+        direction: StreamDirection::from(order),
+        interval_ns,
+        interval_start_ns: start.into(),
+    };
+    let streams = rows_to_streams(state.schema(), rows, &expr.pipeline, stream_options)?;
     Ok(Json(LokiResponse::success(streams)))
 }
@@ -309,12 +321,16 @@ async fn tail_logs(
 }

 fn parse_step_duration(step_raw: &str) -> Result<DurationValue, AppError> {
-    match DurationValue::parse_literal(step_raw) {
+    parse_duration_field(step_raw, "step")
+}
+
+fn parse_duration_field(raw: &str, field: &str) -> Result<DurationValue, AppError> {
+    match DurationValue::parse_literal(raw) {
         Ok(value) => Ok(value),
-        Err(literal_err) => match parse_numeric_step_seconds(step_raw) {
+        Err(literal_err) => match parse_numeric_step_seconds(raw) {
             Ok(value) => Ok(value),
             Err(numeric_err) => Err(AppError::BadRequest(format!(
-                "invalid step duration `{step_raw}`: {literal_err}; {numeric_err}"
+                "invalid {field} `{raw}`: {literal_err}; {numeric_err}"
             ))),
         },
     }
 }
@@ -346,6 +362,55 @@ fn parse_numeric_step_seconds(step_raw: &str) -> Result<DurationValue, String> {
         .map_err(|err| format!("failed to convert numeric seconds to duration: {err}"))
 }

+fn parse_optional_duration_ns(raw: Option<&str>, field: &str) -> Result<Option<i64>, AppError> {
+    raw.map(|text| parse_duration_field(text, field).map(|value| value.as_nanoseconds()))
+        .transpose()
+}
+
+fn parse_range_direction(raw: Option<&str>) -> Result<SqlOrder, AppError> {
+    match raw {
+        None => Ok(SqlOrder::Desc),
+        Some(text) => {
+            let normalized = text.trim().to_ascii_lowercase();
+            match normalized.as_str() {
+                "forward" => Ok(SqlOrder::Asc),
+                "backward" => Ok(SqlOrder::Desc),
+                "" => Err(AppError::BadRequest(
+                    "direction must be `forward` or `backward`".into(),
+                )),
+                _ => Err(AppError::BadRequest(format!(
+                    "direction must be `forward` or `backward`, got `{}`",
+                    text
+                ))),
+            }
+        }
+    }
+}
+
+fn resolve_range_bounds(
+    start_param: Option<i64>,
+    end_param: Option<i64>,
+    since_param: Option<&str>,
+) -> Result<(i64, i64), AppError> {
+    let now = current_time_ns();
+    let end_ns = end_param.unwrap_or(now);
+    let since_ns = parse_optional_duration_ns(since_param, "since")?;
+    let start_ns = if let Some(start) = start_param {
+        start
+    } else if let Some(since_ns) = since_ns {
+        let anchor = if end_ns > now { now } else { end_ns };
+        anchor.saturating_sub(since_ns)
+    } else {
+        end_ns.saturating_sub(DEFAULT_RANGE_LOOKBACK_NS)
+    };
+    if start_ns >= end_ns {
+        return Err(AppError::BadRequest(
+            "start must be smaller than end".into(),
+        ));
+    }
+    Ok((start_ns, end_ns))
+}
+
 fn clamp_metric_step_ns(range_ns: i64, requested_step_ns: i64, max_buckets: i64) -> i64 {
     if range_ns <= 0 || requested_step_ns <= 0 {
         return requested_step_ns;
     }
@@ -538,7 +603,7 @@ async fn tail_loop(
             sleep(Duration::from_millis(TAIL_IDLE_SLEEP_MS)).await;
             continue;
         }
-        let streams = entries_to_streams(filtered)?;
+        let streams = entries_to_streams(filtered, StreamOptions::default())?;
         let payload = serde_json::to_string(&tail_chunk(streams))
             .map_err(|err| AppError::Internal(format!("failed to encode tail payload: {err}")))?;
         socket
@@ -650,9 +715,11 @@ impl TailRequest {
 #[cfg(test)]
 mod tests {
     use super::{
-        ProcessedEntry, TailCursor, clamp_metric_step_ns, filter_tail_entries,
-        parse_constant_vector_expr,
+        DEFAULT_RANGE_LOOKBACK_NS, ProcessedEntry, TailCursor, clamp_metric_step_ns,
+        current_time_ns, filter_tail_entries, parse_constant_vector_expr,
+        parse_optional_duration_ns, parse_range_direction, resolve_range_bounds,
     };
+    use crate::databend::SqlOrder;
     use std::collections::BTreeMap;

     #[test]
@@ -740,4 +807,51 @@ mod tests {
         assert_eq!(clamp_metric_step_ns(-10, 1_000_000, 600), 1_000_000);
         assert_eq!(clamp_metric_step_ns(1_000, 0, 600), 0);
     }
+
+    #[test]
+    fn range_direction_defaults_to_backward() {
+        assert_eq!(parse_range_direction(None).unwrap(), SqlOrder::Desc);
+    }
+
+    #[test]
+    fn range_direction_accepts_forward() {
+        assert_eq!(
+            parse_range_direction(Some("FORWARD")).unwrap(),
+            SqlOrder::Asc
+        );
+        assert_eq!(
+            parse_range_direction(Some(" backward ")).unwrap(),
+            SqlOrder::Desc
+        );
+    }
+
+    #[test]
+    fn range_direction_rejects_invalid() {
+        assert!(parse_range_direction(Some("")).is_err());
+        assert!(parse_range_direction(Some("sideways")).is_err());
+    }
+
+    #[test]
+    fn resolves_range_bounds_with_defaults() {
+        let (start, end) = resolve_range_bounds(None, None, None).unwrap();
+        let now = current_time_ns();
+        assert!(end <= now);
+        assert_eq!(end - start, DEFAULT_RANGE_LOOKBACK_NS);
+    }
+
+    #[test]
+    fn resolves_range_bounds_with_since() {
+        let (start, end) = resolve_range_bounds(None, Some(2_000_000_000), Some("1s")).unwrap();
+        assert_eq!(end, 2_000_000_000);
+        assert_eq!(start, 1_000_000_000);
+    }
+
+    #[test]
+    fn parse_optional_duration_handles_invalid_input() {
+        assert!(parse_optional_duration_ns(Some("foo"), "interval").is_err());
+        assert_eq!(
+            parse_optional_duration_ns(Some("2s"), "interval").unwrap(),
+            Some(2_000_000_000)
+        );
+    }
 }
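The `step`/`since`/`interval` values accepted above come in two spellings: a Loki-style duration literal or bare numeric seconds, and `parse_duration_field` tries the literal first before falling back to the numeric form. A self-contained sketch of that fallback order, assuming only `s`/`m`/`h` literal suffixes (the real `DurationValue::parse_literal` is not shown in this diff and likely accepts more units):

```rust
// Illustrative only: mirrors the literal-then-numeric fallback in
// `parse_duration_field`, returning nanoseconds instead of `DurationValue`.
fn duration_ns_from_user_input(raw: &str) -> Result<i64, String> {
    // First try a Loki-style literal such as "30s", "5m", or "1h".
    let literal = |s: &str| -> Option<i64> {
        let (num, unit) = s.split_at(s.len().checked_sub(1)?);
        let scale = match unit {
            "s" => 1_000_000_000i64,
            "m" => 60 * 1_000_000_000i64,
            "h" => 3_600 * 1_000_000_000i64,
            _ => return None,
        };
        num.parse::<i64>().ok().map(|n| n * scale)
    };
    if let Some(ns) = literal(raw) {
        return Ok(ns);
    }
    // Fall back to bare numeric seconds, e.g. step=30 or step=0.5.
    raw.parse::<f64>()
        .map(|secs| (secs * 1e9) as i64)
        .map_err(|_| format!("invalid duration `{raw}`"))
}

fn main() {
    assert_eq!(duration_ns_from_user_input("5m"), Ok(300_000_000_000));
    assert_eq!(duration_ns_from_user_input("30"), Ok(30_000_000_000));
    assert!(duration_ns_from_user_input("foo").is_err());
}
```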
diff --git a/src/app/responses.rs b/src/app/responses.rs
index 5f14911..e420c6b 100644
--- a/src/app/responses.rs
+++ b/src/app/responses.rs
@@ -18,18 +18,51 @@ use databend_driver::Row;
 use serde::Serialize;

 use crate::{
-    databend::{MetricMatrixSample, MetricSample, SchemaAdapter},
+    databend::{MetricMatrixSample, MetricSample, SchemaAdapter, SqlOrder},
     error::AppError,
     logql::Pipeline,
 };

+#[derive(Clone, Copy)]
+pub(crate) enum StreamDirection {
+    Forward,
+    Backward,
+}
+
+impl From<SqlOrder> for StreamDirection {
+    fn from(value: SqlOrder) -> Self {
+        match value {
+            SqlOrder::Asc => StreamDirection::Forward,
+            SqlOrder::Desc => StreamDirection::Backward,
+        }
+    }
+}
+
+#[derive(Clone, Copy)]
+pub(crate) struct StreamOptions {
+    pub direction: StreamDirection,
+    pub interval_ns: Option<i64>,
+    pub interval_start_ns: i128,
+}
+
+impl Default for StreamOptions {
+    fn default() -> Self {
+        Self {
+            direction: StreamDirection::Forward,
+            interval_ns: None,
+            interval_start_ns: 0,
+        }
+    }
+}
+
 pub(crate) fn rows_to_streams(
     schema: &SchemaAdapter,
     rows: Vec<Row>,
     pipeline: &Pipeline,
+    options: StreamOptions,
 ) -> Result<Vec<LokiStream>, AppError> {
     let entries = collect_processed_entries(schema, rows, pipeline)?;
-    entries_to_streams(entries)
+    entries_to_streams(entries, options)
 }

 pub(crate) fn collect_processed_entries(
@@ -52,6 +85,7 @@ pub(crate) fn entries_to_streams(
     entries: Vec<ProcessedEntry>,
+    options: StreamOptions,
 ) -> Result<Vec<LokiStream>, AppError> {
     let mut buckets: BTreeMap<String, StreamBucket> = BTreeMap::new();
     for entry in entries {
@@ -64,7 +98,7 @@
     }
     let mut result = Vec::with_capacity(buckets.len());
     for bucket in buckets.into_values() {
-        result.push(bucket.into_stream());
+        result.push(bucket.into_stream(&options));
     }
     Ok(result)
 }
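The `interval` down-sampling that `StreamOptions` feeds into `into_stream` (next hunk) keeps an entry only once its timestamp reaches `next_allowed`, then advances `next_allowed` to that timestamp plus the interval. A worked example of the rule in isolation, with assumed timestamps:

```rust
// Worked example of the `interval` down-sampling rule: keep an entry only if
// its timestamp is >= `next_allowed`, then move `next_allowed` to ts + interval.
fn downsample(timestamps: &[i128], interval_ns: i128, start_ns: i128) -> Vec<i128> {
    let mut kept = Vec::new();
    let mut next_allowed = start_ns;
    for &ts in timestamps {
        if ts < next_allowed {
            continue;
        }
        kept.push(ts);
        next_allowed = ts + interval_ns;
    }
    kept
}

fn main() {
    // With a 10s interval starting at t=0, entries at 0s, 4s, 11s, 19s, 21s
    // collapse to 0s, 11s, 21s: each kept line is >= 10s after the previous one.
    let secs = |s: i128| s * 1_000_000_000;
    let input = [secs(0), secs(4), secs(11), secs(19), secs(21)];
    assert_eq!(
        downsample(&input, secs(10), 0),
        vec![secs(0), secs(11), secs(21)]
    );
}
```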
@@ -137,13 +171,28 @@ impl StreamBucket {
     }

-    fn into_stream(mut self) -> LokiStream {
+    fn into_stream(mut self, options: &StreamOptions) -> LokiStream {
         self.values.sort_by_key(|(ts, _)| *ts);
-        let values = self
-            .values
-            .into_iter()
-            .map(|(ts, line)| [ts.to_string(), line])
-            .collect();
+        let values = if let Some(interval_ns) = options.interval_ns {
+            let mut filtered = Vec::with_capacity(self.values.len());
+            let step = i128::from(interval_ns.max(1));
+            let mut next_allowed = options.interval_start_ns;
+            for (ts, line) in self.values.into_iter() {
+                if ts < next_allowed {
+                    continue;
+                }
+                filtered.push((ts, line));
+                next_allowed = ts.saturating_add(step);
+            }
+            filtered
+        } else {
+            self.values
+        };
+        let iter: Box<dyn Iterator<Item = (i128, String)>> = match options.direction {
+            StreamDirection::Forward => Box::new(values.into_iter()),
+            StreamDirection::Backward => Box::new(values.into_iter().rev()),
+        };
+        let values = iter.map(|(ts, line)| [ts.to_string(), line]).collect();
         LokiStream {
             stream: self.labels,
             values,
diff --git a/src/databend/core.rs b/src/databend/core.rs
index a225b34..1abd4a3 100644
--- a/src/databend/core.rs
+++ b/src/databend/core.rs
@@ -208,7 +208,7 @@ pub struct LabelQueryBounds {
     pub end_ns: Option<i64>,
 }

-#[derive(Clone, Copy)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
 pub enum SqlOrder {
     Asc,
     Desc,
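Taken together, the new parameters can be exercised with a plain HTTP call. A hypothetical client sketch using `reqwest` (assuming a dependency with the `blocking` feature); the address and label selector are illustrative, while the query-string keys match `RangeQueryParams` above:

```rust
// Hypothetical client call exercising the new query_range parameters.
// Assumes the adapter listens on localhost:3100 (address is illustrative).
use std::error::Error;

fn main() -> Result<(), Box<dyn Error>> {
    let resp = reqwest::blocking::Client::new()
        .get("http://localhost:3100/loki/api/v1/query_range")
        .query(&[
            ("query", r#"{app="api"}"#),
            ("since", "30m"),         // derive start from end (end defaults to now)
            ("direction", "forward"), // oldest entries first
            ("interval", "10s"),      // at most one line per 10s per stream
            ("limit", "500"),
        ])
        .send()?;
    println!("{}", resp.text()?);
    Ok(())
}
```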