1 change: 1 addition & 0 deletions .gitignore
@@ -1,3 +1,4 @@
/target

.env
AGENTS.md
4 changes: 2 additions & 2 deletions README.md
@@ -123,13 +123,13 @@ All endpoints return Loki-compatible JSON responses and reuse the same error sha
| Endpoint | Description |
| --------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `GET /loki/api/v1/query` | Instant query. Supports the same LogQL used by Grafana's Explore panel. An optional `time` parameter (nanoseconds) defaults to "now", and the adapter automatically looks back 5 minutes when computing SQL bounds. |
| `GET /loki/api/v1/query_range` | Range query. Requires `start`/`end` nanoseconds and accepts `limit`/`step`. Log queries stream raw lines; metric queries return Loki matrix results and require a `step` value (the adapter may clamp it to keep bucket counts bounded, default cap 240 buckets). |
| `GET /loki/api/v1/query_range` | Range query. Accepts `start`/`end` (default past hour), `since` (relative duration), `limit`, `interval`, `step`, and `direction`. Log queries stream raw lines (`interval` down-samples entries, `direction` controls scan order); metric queries return Loki matrix results and require a `step` value (the adapter may clamp it to keep bucket counts bounded, default cap 240 buckets). |
| `GET /loki/api/v1/labels` | Lists known label keys for the selected schema. Optional `start`/`end` parameters (nanoseconds) fence the search window; unspecified values default to the last 5 minutes, matching Grafana's Explore defaults. |
| `GET /loki/api/v1/label/{label}/values` | Lists distinct values for a specific label key using the same optional `start`/`end` bounds as `/labels`. Works for both `loki` and `flat` schemas and automatically filters out empty strings. |
| `GET /loki/api/v1/index/stats` | Returns approximate `streams`, `chunks`, `entries`, and `bytes` counters for a selector over a `[start, end]` window. `chunks` are estimated via unique stream keys because Databend does not store Loki chunks. |
| `GET /loki/api/v1/tail` | WebSocket tail endpoint that streams live logs for a LogQL query; compatible with Grafana Explore and `logcli --tail`. |

`/query` and `/query_range` share the same LogQL parser and SQL builder. Instant queries fall back to `DEFAULT_LOOKBACK_NS` (5 minutes) when no explicit window is supplied, while range queries honor the caller's `start`/`end` bounds. `/labels` and `/label/{label}/values` delegate to schema-aware metadata lookups: the loki schema uses `map_keys`/`labels['key']` expressions, whereas the flat schema issues `SELECT DISTINCT` on the physical column and returns values in sorted order.
`/query` and `/query_range` share the same LogQL parser and SQL builder. Instant queries fall back to `DEFAULT_LOOKBACK_NS` (5 minutes) when no explicit window is supplied, while range queries default to `[now - 1h, now]` and also honor Loki's `since` helper to derive `start`. `/loki/api/v1/query_range` log queries fully implement Loki's `direction` (`forward`/`backward`) and `interval` parameters: the adapter scans in the requested direction, emits entries in that order, and down-samples each stream so successive log lines are at least `interval` apart starting from `start`. `/labels` and `/label/{label}/values` delegate to schema-aware metadata lookups: the loki schema uses `map_keys`/`labels['key']` expressions, whereas the flat schema issues `SELECT DISTINCT` on the physical column and returns values in sorted order.
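
The down-sampling rule is easier to see on bare timestamps. Below is a minimal, runnable sketch that condenses the filter from `StreamBucket::into_stream` in `src/app/responses.rs` (shown later in this diff); the function name and sample values are illustrative only:

```rust
/// Keep an entry only if it is at least `interval_ns` after the previously
/// kept entry, with the first window anchored at `start_ns`.
fn downsample(timestamps: &[i64], start_ns: i64, interval_ns: i64) -> Vec<i64> {
    let mut kept = Vec::new();
    let mut next_allowed = start_ns;
    for &ts in timestamps {
        if ts < next_allowed {
            continue; // within `interval` of the last kept entry
        }
        kept.push(ts);
        next_allowed = ts.saturating_add(interval_ns);
    }
    kept
}

fn main() {
    // Entries every 1s with interval=2s and start=0: every other entry survives.
    let ts: Vec<i64> = (0..6i64).map(|s| s * 1_000_000_000).collect();
    assert_eq!(
        downsample(&ts, 0, 2_000_000_000),
        vec![0, 2_000_000_000, 4_000_000_000]
    );
}
```

With `direction=backward`, the same kept entries are simply emitted newest-first.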

### Tail streaming

164 changes: 139 additions & 25 deletions src/app/handlers.rs
@@ -41,13 +41,15 @@ use crate::{

use super::{
responses::{
LabelsResponse, LokiResponse, ProcessedEntry, collect_processed_entries,
entries_to_streams, metric_matrix, metric_vector, rows_to_streams, tail_chunk,
LabelsResponse, LokiResponse, ProcessedEntry, StreamDirection, StreamOptions,
collect_processed_entries, entries_to_streams, metric_matrix, metric_vector,
rows_to_streams, tail_chunk,
},
state::{AppState, DEFAULT_LOOKBACK_NS},
};

const DEFAULT_TAIL_LIMIT: u64 = 100;
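// One hour in nanoseconds: the default lookback when /query_range or /tail gets no explicit window.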
const DEFAULT_RANGE_LOOKBACK_NS: i64 = 60 * 60 * 1_000_000_000;
const DEFAULT_TAIL_LOOKBACK_NS: i64 = 60 * 60 * 1_000_000_000;
const MAX_TAIL_DELAY_SECONDS: u64 = 5;
const TAIL_IDLE_SLEEP_MS: u64 = 200;
@@ -77,7 +79,10 @@ struct RangeQueryParams {
limit: Option<u64>,
start: Option<i64>,
end: Option<i64>,
since: Option<String>,
step: Option<String>,
interval: Option<String>,
direction: Option<String>,
}

#[derive(Debug, Deserialize)]
@@ -162,7 +167,12 @@ async fn instant_query(
sql
);
let rows = execute_query(state.client(), &sql).await?;
let streams = rows_to_streams(state.schema(), rows, &expr.pipeline)?;
let streams = rows_to_streams(
state.schema(),
rows,
&expr.pipeline,
StreamOptions::default(),
)?;
Ok(Json(LokiResponse::success(streams)))
}

@@ -171,27 +181,23 @@ async fn range_query(
Query(params): Query<RangeQueryParams>,
) -> Result<Json<LokiResponse>, AppError> {
log::debug!(
"range query received: query=`{}` limit={:?} start={:?} end={:?} step={:?}",
"range query received: query=`{}` limit={:?} start={:?} end={:?} step={:?} direction={:?}",
params.query,
params.limit,
params.start,
params.end,
params.step
params.step,
params.direction
);
let start = params
.start
.ok_or_else(|| AppError::BadRequest("start is required".into()))?;
let end = params
.end
.ok_or_else(|| AppError::BadRequest("end is required".into()))?;

if start >= end {
return Err(AppError::BadRequest(
"start must be smaller than end".into(),
));
}
let (start, end) = resolve_range_bounds(params.start, params.end, params.since.as_deref())?;
let interval_ns = parse_optional_duration_ns(params.interval.as_deref(), "interval")?;
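// `interval` applies only to log queries; metric queries reject it just below.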

if let Some(metric) = state.parse_metric(&params.query)? {
if interval_ns.is_some() {
return Err(AppError::BadRequest(
"metric queries do not support the `interval` parameter".into(),
));
}
let step_raw = params
.step
.as_deref()
@@ -226,6 +232,7 @@ async fn range_query(
return Ok(Json(metric_matrix(samples)));
}

let order = parse_range_direction(params.direction.as_deref())?;
let expr = state.parse(&params.query)?;

let limit = state.clamp_limit(params.limit);
@@ -236,7 +243,7 @@
start_ns: Some(start),
end_ns: Some(end),
limit,
order: SqlOrder::Asc,
order,
},
)?;

@@ -248,7 +255,12 @@
sql
);
let rows = execute_query(state.client(), &sql).await?;
let streams = rows_to_streams(state.schema(), rows, &expr.pipeline)?;
let stream_options = StreamOptions {
direction: StreamDirection::from(order),
interval_ns,
interval_start_ns: start.into(),
};
let streams = rows_to_streams(state.schema(), rows, &expr.pipeline, stream_options)?;
Ok(Json(LokiResponse::success(streams)))
}

@@ -309,12 +321,16 @@ async fn tail_logs(
}

fn parse_step_duration(step_raw: &str) -> Result<DurationValue, AppError> {
match DurationValue::parse_literal(step_raw) {
parse_duration_field(step_raw, "step")
}

fn parse_duration_field(raw: &str, field: &str) -> Result<DurationValue, AppError> {
match DurationValue::parse_literal(raw) {
Ok(value) => Ok(value),
Err(literal_err) => match parse_numeric_step_seconds(step_raw) {
Err(literal_err) => match parse_numeric_step_seconds(raw) {
Ok(value) => Ok(value),
Err(numeric_err) => Err(AppError::BadRequest(format!(
"invalid step duration `{step_raw}`: {literal_err}; {numeric_err}"
"invalid {field} `{raw}`: {literal_err}; {numeric_err}"
))),
},
}
@@ -346,6 +362,55 @@ fn parse_numeric_step_seconds(step_raw: &str) -> Result<DurationValue, String> {
.map_err(|err| format!("failed to convert numeric seconds to duration: {err}"))
}

fn parse_optional_duration_ns(raw: Option<&str>, field: &str) -> Result<Option<i64>, AppError> {
raw.map(|text| parse_duration_field(text, field).map(|value| value.as_nanoseconds()))
.transpose()
}

fn parse_range_direction(raw: Option<&str>) -> Result<SqlOrder, AppError> {
match raw {
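// Loki's default direction is `backward` (newest entries first).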
None => Ok(SqlOrder::Desc),
Some(text) => {
let normalized = text.trim().to_ascii_lowercase();
match normalized.as_str() {
"forward" => Ok(SqlOrder::Asc),
"backward" => Ok(SqlOrder::Desc),
"" => Err(AppError::BadRequest(
"direction must be `forward` or `backward`".into(),
)),
_ => Err(AppError::BadRequest(format!(
"direction must be `forward` or `backward`, got `{}`",
text
))),
}
}
}
}

fn resolve_range_bounds(
start_param: Option<i64>,
end_param: Option<i64>,
since_param: Option<&str>,
) -> Result<(i64, i64), AppError> {
let now = current_time_ns();
let end_ns = end_param.unwrap_or(now);
let since_ns = parse_optional_duration_ns(since_param, "since")?;
let start_ns = if let Some(start) = start_param {
start
} else if let Some(since_ns) = since_ns {
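// `since` is measured back from min(end, now), so a future-dated end cannot stretch the window.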
let anchor = if end_ns > now { now } else { end_ns };
anchor.saturating_sub(since_ns)
} else {
end_ns.saturating_sub(DEFAULT_RANGE_LOOKBACK_NS)
};
if start_ns >= end_ns {
return Err(AppError::BadRequest(
"start must be smaller than end".into(),
));
}
Ok((start_ns, end_ns))
}

fn clamp_metric_step_ns(range_ns: i64, requested_step_ns: i64, max_buckets: i64) -> i64 {
if range_ns <= 0 || requested_step_ns <= 0 {
return requested_step_ns;
@@ -538,7 +603,7 @@ async fn tail_loop(
sleep(Duration::from_millis(TAIL_IDLE_SLEEP_MS)).await;
continue;
}
let streams = entries_to_streams(filtered)?;
let streams = entries_to_streams(filtered, StreamOptions::default())?;
let payload = serde_json::to_string(&tail_chunk(streams))
.map_err(|err| AppError::Internal(format!("failed to encode tail payload: {err}")))?;
socket
@@ -650,9 +715,11 @@ impl TailRequest {
#[cfg(test)]
mod tests {
use super::{
ProcessedEntry, TailCursor, clamp_metric_step_ns, filter_tail_entries,
parse_constant_vector_expr,
DEFAULT_RANGE_LOOKBACK_NS, ProcessedEntry, TailCursor, clamp_metric_step_ns,
current_time_ns, filter_tail_entries, parse_constant_vector_expr,
parse_optional_duration_ns, parse_range_direction, resolve_range_bounds,
};
use crate::databend::SqlOrder;
use std::collections::BTreeMap;

#[test]
@@ -740,4 +807,51 @@ mod tests {
assert_eq!(clamp_metric_step_ns(-10, 1_000_000, 600), 1_000_000);
assert_eq!(clamp_metric_step_ns(1_000, 0, 600), 0);
}

#[test]
fn range_direction_defaults_to_backward() {
assert_eq!(parse_range_direction(None).unwrap(), SqlOrder::Desc);
}

#[test]
fn range_direction_accepts_forward() {
assert_eq!(
parse_range_direction(Some("FORWARD")).unwrap(),
SqlOrder::Asc
);
assert_eq!(
parse_range_direction(Some(" backward ")).unwrap(),
SqlOrder::Desc
);
}

#[test]
fn range_direction_rejects_invalid() {
assert!(parse_range_direction(Some("")).is_err());
assert!(parse_range_direction(Some("sideways")).is_err());
}

#[test]
fn resolves_range_bounds_with_defaults() {
let (start, end) = resolve_range_bounds(None, None, None).unwrap();
let now = current_time_ns();
assert!(end <= now);
assert_eq!(end - start, DEFAULT_RANGE_LOOKBACK_NS);
}

#[test]
fn resolves_range_bounds_with_since() {
let (start, end) = resolve_range_bounds(None, Some(2_000_000_000), Some("1s")).unwrap();
assert_eq!(end, 2_000_000_000);
assert_eq!(start, 1_000_000_000);
}

#[test]
fn parse_optional_duration_handles_invalid_input() {
assert!(parse_optional_duration_ns(Some("foo"), "interval").is_err());
assert_eq!(
parse_optional_duration_ns(Some("2s"), "interval").unwrap(),
Some(2_000_000_000)
);
}
}
67 changes: 58 additions & 9 deletions src/app/responses.rs
@@ -18,18 +18,51 @@ use databend_driver::Row;
use serde::Serialize;

use crate::{
databend::{MetricMatrixSample, MetricSample, SchemaAdapter},
databend::{MetricMatrixSample, MetricSample, SchemaAdapter, SqlOrder},
error::AppError,
logql::Pipeline,
};

#[derive(Clone, Copy)]
pub(crate) enum StreamDirection {
Forward,
Backward,
}

impl From<SqlOrder> for StreamDirection {
fn from(value: SqlOrder) -> Self {
match value {
SqlOrder::Asc => StreamDirection::Forward,
SqlOrder::Desc => StreamDirection::Backward,
}
}
}

#[derive(Clone, Copy)]
pub(crate) struct StreamOptions {
pub direction: StreamDirection,
pub interval_ns: Option<i64>,
pub interval_start_ns: i128,
}

impl Default for StreamOptions {
fn default() -> Self {
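// Defaults preserve the existing behavior of instant queries and tail: forward order, no down-sampling.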
Self {
direction: StreamDirection::Forward,
interval_ns: None,
interval_start_ns: 0,
}
}
}

pub(crate) fn rows_to_streams(
schema: &SchemaAdapter,
rows: Vec<Row>,
pipeline: &Pipeline,
options: StreamOptions,
) -> Result<Vec<LokiStream>, AppError> {
let entries = collect_processed_entries(schema, rows, pipeline)?;
entries_to_streams(entries)
entries_to_streams(entries, options)
}

pub(crate) fn collect_processed_entries(
@@ -52,6 +85,7 @@

pub(crate) fn entries_to_streams(
entries: Vec<ProcessedEntry>,
options: StreamOptions,
) -> Result<Vec<LokiStream>, AppError> {
let mut buckets: BTreeMap<String, StreamBucket> = BTreeMap::new();
for entry in entries {
@@ -64,7 +98,7 @@
}
let mut result = Vec::with_capacity(buckets.len());
for bucket in buckets.into_values() {
result.push(bucket.into_stream());
result.push(bucket.into_stream(&options));
}
Ok(result)
}
@@ -137,13 +171,28 @@ impl StreamBucket {
}
}

fn into_stream(mut self) -> LokiStream {
fn into_stream(mut self, options: &StreamOptions) -> LokiStream {
self.values.sort_by_key(|(ts, _)| *ts);
let values = self
.values
.into_iter()
.map(|(ts, line)| [ts.to_string(), line])
.collect();
let values = if let Some(interval_ns) = options.interval_ns {
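// Keep an entry only when it is at least `interval` past the previously kept one,
// with the first window anchored at the request's `start`.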
let mut filtered = Vec::with_capacity(self.values.len());
let step = i128::from(interval_ns.max(1));
let mut next_allowed = options.interval_start_ns;
for (ts, line) in self.values.into_iter() {
if ts < next_allowed {
continue;
}
filtered.push((ts, line));
next_allowed = ts.saturating_add(step);
}
filtered
} else {
self.values
};
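// `values` is sorted ascending; reverse the iterator for backward queries so newest entries come first.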
let iter: Box<dyn Iterator<Item = (i128, String)>> = match options.direction {
StreamDirection::Forward => Box::new(values.into_iter()),
StreamDirection::Backward => Box::new(values.into_iter().rev()),
};
let values = iter.map(|(ts, line)| [ts.to_string(), line]).collect();
LokiStream {
stream: self.labels,
values,
2 changes: 1 addition & 1 deletion src/databend/core.rs
@@ -208,7 +208,7 @@ pub struct LabelQueryBounds {
pub end_ns: Option<i64>,
}

#[derive(Clone, Copy)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SqlOrder {
Asc,
Desc,