Skip to content

Commit 01e4fdc

Browse files
committed
fix: merge and DatedStats
1 parent fba51f1 commit 01e4fdc

File tree

7 files changed

+142
-134
lines changed

7 files changed

+142
-134
lines changed

src/analytics.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ use crate::{
4141
},
4242
option::Mode,
4343
parseable::PARSEABLE,
44-
stats::{self, Stats},
44+
stats::{FullStats, Stats},
4545
storage, HTTP_CLIENT,
4646
};
4747

@@ -170,7 +170,7 @@ fn total_event_stats() -> (Stats, Stats, Stats) {
170170
let mut deleted_json_bytes: u64 = 0;
171171

172172
for stream in PARSEABLE.streams.list() {
173-
let Some(stats) = stats::get_current_stats(&stream, "json") else {
173+
let Some(stats) = FullStats::get_current(&stream, "json") else {
174174
continue;
175175
};
176176
total_events += stats.lifetime_stats.events;

src/catalog/mod.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use crate::{
3333
option::Mode,
3434
parseable::PARSEABLE,
3535
query::PartialTimeFilter,
36-
stats::{event_labels_date, get_current_stats, storage_size_labels_date, update_deleted_stats},
36+
stats::{event_labels_date, storage_size_labels_date, update_deleted_stats, FullStats},
3737
storage::{
3838
object_storage::manifest_path, ObjectStorage, ObjectStorageError, ObjectStoreFormat,
3939
},
@@ -181,7 +181,7 @@ pub async fn update_snapshot(
181181
if let Some(mut manifest) = storage.get_manifest(&path).await? {
182182
manifest.apply_change(change);
183183
storage.put_manifest(&path, manifest).await?;
184-
let stats = get_current_stats(stream_name, "json");
184+
let stats = FullStats::get_current(stream_name, "json");
185185
if let Some(stats) = stats {
186186
meta.stats = stats;
187187
}
@@ -307,7 +307,7 @@ async fn create_manifest(
307307
};
308308
manifests.push(new_snapshot_entry);
309309
meta.snapshot.manifest_list = manifests;
310-
let stats = get_current_stats(stream_name, "json");
310+
let stats = FullStats::get_current(stream_name, "json");
311311
if let Some(stats) = stats {
312312
meta.stats = stats;
313313
}

src/handlers/http/cluster/mod.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use actix_web::http::header::{self, HeaderMap};
2525
use actix_web::web::Path;
2626
use actix_web::Responder;
2727
use bytes::Bytes;
28-
use chrono::Utc;
28+
use chrono::{NaiveDate, Utc};
2929
use clokwerk::{AsyncScheduler, Interval};
3030
use http::{header as http_header, StatusCode};
3131
use itertools::Itertools;
@@ -368,7 +368,7 @@ pub async fn sync_role_update_with_ingestors(
368368
}
369369

370370
pub fn fetch_daily_stats_from_ingestors(
371-
date: &str,
371+
date: NaiveDate,
372372
stream_meta_list: &[ObjectStoreFormat],
373373
) -> Result<Stats, StreamError> {
374374
// for the given date, get the stats from the ingestors
@@ -378,7 +378,7 @@ pub fn fetch_daily_stats_from_ingestors(
378378

379379
for meta in stream_meta_list.iter() {
380380
for manifest in meta.snapshot.manifest_list.iter() {
381-
if manifest.time_lower_bound.date_naive().to_string() == date {
381+
if manifest.time_lower_bound.date_naive() == date {
382382
events_ingested += manifest.events_ingested;
383383
ingestion_size += manifest.ingestion_size;
384384
storage_size += manifest.storage_size;

src/handlers/http/logstream.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use crate::metadata::SchemaVersion;
3535
use crate::parseable::{StreamNotFound, PARSEABLE};
3636
use crate::rbac::role::Action;
3737
use crate::rbac::Users;
38-
use crate::stats::StatsParams;
38+
use crate::stats::{FullStats, Stats, StatsParams};
3939
use crate::storage::retention::Retention;
4040
use crate::storage::{StreamInfo, StreamType};
4141
use crate::utils::actix::extract_session_key_from_req;
@@ -211,11 +211,12 @@ pub async fn get_stats(
211211
return Err(StreamNotFound(stream_name.clone()).into());
212212
}
213213

214-
if let Some(stats) = params.get_stats(&stream_name) {
214+
if let Some(date) = params.date {
215+
let stats = Stats::for_stream_on_date(date, &stream_name);
215216
return Ok(HttpResponse::build(StatusCode::OK).json(stats));
216217
}
217218

218-
let stats = stats::get_current_stats(&stream_name, "json")
219+
let stats = FullStats::get_current(&stream_name, "json")
219220
.ok_or_else(|| StreamNotFound(stream_name.clone()))?;
220221
let time = Utc::now();
221222
let stats = {

src/prism/home/mod.rs

+8-37
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,10 @@ use tracing::error;
2929
use crate::{
3030
alerts::{get_alerts_info, AlertError, AlertsInfo, ALERTS},
3131
correlation::{CorrelationError, CORRELATIONS},
32-
handlers::http::{
33-
cluster::fetch_daily_stats_from_ingestors,
34-
logstream::{error::StreamError, get_stats_date},
35-
},
32+
handlers::http::logstream::error::StreamError,
3633
parseable::PARSEABLE,
3734
rbac::{map::SessionKey, role::Action, Users},
38-
stats::Stats,
35+
stats::{DatedStats, Stats},
3936
storage::{ObjectStorageError, ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
4037
users::{dashboards::DASHBOARDS, filters::FILTERS},
4138
};
@@ -47,14 +44,6 @@ struct StreamInfo {
4744
stats_summary: Stats,
4845
}
4946

50-
#[derive(Debug, Serialize, Default)]
51-
struct DatedStats {
52-
date: String,
53-
events: u64,
54-
ingestion_size: u64,
55-
storage_size: u64,
56-
}
57-
5847
#[derive(Debug, Serialize)]
5948
struct TitleAndId {
6049
title: String,
@@ -155,8 +144,8 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, Pr
155144
.checked_sub_signed(chrono::Duration::days(i))
156145
.ok_or_else(|| anyhow::Error::msg("Date conversion faield"))
157146
.unwrap()
147+
.date_naive()
158148
})
159-
.map(|date| date.format("%Y-%m-%d").to_string())
160149
.collect_vec();
161150

162151
let mut stream_details = Vec::new();
@@ -193,7 +182,11 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, Pr
193182
}
194183

195184
for date in dates.into_iter() {
196-
let dated_stats = stats_for_date(date, stream_wise_ingestor_stream_json.clone()).await?;
185+
let Some(dated_stats) =
186+
DatedStats::for_all_streams(date, &stream_wise_ingestor_stream_json).await?
187+
else {
188+
continue;
189+
};
197190
summary.stats_summary.events += dated_stats.events;
198191
summary.stats_summary.ingestion += dated_stats.ingestion_size;
199192
summary.stats_summary.storage += dated_stats.storage_size;
@@ -213,28 +206,6 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, Pr
213206
})
214207
}
215208

216-
async fn stats_for_date(
217-
date: String,
218-
stream_wise_meta: HashMap<String, Vec<ObjectStoreFormat>>,
219-
) -> Result<DatedStats, PrismHomeError> {
220-
// collect stats for all the streams for the given date
221-
let mut details = DatedStats {
222-
date: date.clone(),
223-
..Default::default()
224-
};
225-
226-
for (stream, meta) in stream_wise_meta {
227-
let querier_stats = get_stats_date(&stream, &date).await?;
228-
let ingestor_stats = fetch_daily_stats_from_ingestors(&date, &meta)?;
229-
// collect date-wise stats for all streams
230-
details.events += querier_stats.events + ingestor_stats.events;
231-
details.ingestion_size += querier_stats.ingestion + ingestor_stats.ingestion;
232-
details.storage_size += querier_stats.storage + ingestor_stats.storage;
233-
}
234-
235-
Ok(details)
236-
}
237-
238209
#[derive(Debug, thiserror::Error)]
239210
pub enum PrismHomeError {
240211
#[error("Error: {0}")]

src/prism/logstream/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use crate::{
3434
query::update_schema_when_distributed,
3535
},
3636
parseable::{StreamNotFound, PARSEABLE},
37-
stats,
37+
stats::FullStats,
3838
storage::{retention::Retention, StreamInfo, StreamType},
3939
LOCK_EXPECT,
4040
};
@@ -93,7 +93,7 @@ async fn get_stream_schema_helper(stream_name: &str) -> Result<Arc<Schema>, Stre
9393
}
9494

9595
async fn get_stats(stream_name: &str) -> Result<QueriedStats, PrismLogstreamError> {
96-
let stats = stats::get_current_stats(stream_name, "json")
96+
let stats = FullStats::get_current(stream_name, "json")
9797
.ok_or_else(|| StreamNotFound(stream_name.to_owned()))?;
9898

9999
let ingestor_stats = if PARSEABLE

0 commit comments

Comments
 (0)