
refactor: get_stats merge and simplify #1150

Open · wants to merge 27 commits into main

Conversation

de-sh
Contributor

@de-sh de-sh commented Jan 30, 2025

Fixes #XXXX.

Description

  • Use serde and Query to abstract away param extraction
  • Merge get_stats from logstream.rs (used in both ingestor and standalone) and query_logstream.rs (used in querier)
  • Organize imports in affected files
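The extractor-based approach the first bullet describes can be sketched without the actix-web dependency. This is a hypothetical stand-in: in the real PR, actix-web's `Query<StatsParams>` plus serde does this parsing automatically; here the splitting is done by hand purely to show what the typed struct replaces.

```rust
// Hypothetical sketch: a typed params struct instead of poking at the raw
// HTTP request. The name StatsParams follows the PR; the manual parsing
// below is only a stand-in for what serde + actix-web's Query<T> derive.
#[derive(Debug, Default, PartialEq)]
struct StatsParams {
    date: Option<String>,
}

// Manual stand-in for `Query<StatsParams>`: split "k=v" pairs and pick out
// the one field we care about.
fn parse_query(query: &str) -> StatsParams {
    let mut params = StatsParams::default();
    for pair in query.split('&') {
        if let Some((key, value)) = pair.split_once('=') {
            if key == "date" {
                params.date = Some(value.to_string());
            }
        }
    }
    params
}

fn main() {
    let params = parse_query("date=2025-01-30");
    // The handler receives a typed struct, not a bag of strings.
    assert_eq!(params.date.as_deref(), Some("2025-01-30"));
    println!("{params:?}");
}
```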

This PR has:

  • been tested to ensure log ingestion and log query works.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • New Features
    • Enabled date-specific statistics retrieval, offering more detailed insights across streams.
    • Introduced new methods for enhanced statistics retrieval, including metrics for current, lifetime, and deleted events.
  • Refactor
    • Streamlined the aggregation and merging of statistics to ensure consistent and efficient JSON responses.
    • Simplified query parameter handling for a clearer, more robust statistics endpoint experience.
  • Chores
    • Removed deprecated functions and redundant code to improve overall maintainability.

@de-sh de-sh changed the title refactor: simplify query param extraction refactor: simplify query param extraction and merge get_stats Jan 31, 2025
@de-sh de-sh changed the title refactor: simplify query param extraction and merge get_stats refactor: get_stats merge and simplify Feb 4, 2025

coderabbitai bot commented Feb 13, 2025

Walkthrough

This pull request refactors the statistics handling and query processing across several modules. It introduces a new merge method on QueriedStats to aggregate statistics and removes the legacy merge_quried_stats function. The get_stats implementation in the logstream handler is updated to use typed query parameters instead of the raw HTTP request, and outdated methods for fetching date-specific stats have been removed or consolidated. Additionally, new structs and methods are added to the stats module for date-based queries, and routing changes have been made to reflect the updated aggregation logic.
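The shape of the new `merge` method can be sketched in isolation. This is an illustrative reduction, not the PR's actual struct: field names and types are simplified to plain counters, and the real `QueriedStats` also carries stream name and timing metadata.

```rust
// Illustrative sketch of the aggregation behind `QueriedStats::merge`:
// fold many per-node stats into one total, replacing the free-standing
// `merge_quried_stats` helper with an associated function.
#[derive(Debug, Default, Clone, Copy, PartialEq)]
struct QueriedStats {
    events: u64,
    ingestion: u64,
    storage: u64,
}

impl QueriedStats {
    fn merge(all: &[QueriedStats]) -> QueriedStats {
        all.iter().fold(QueriedStats::default(), |acc, s| QueriedStats {
            events: acc.events + s.events,
            ingestion: acc.ingestion + s.ingestion,
            storage: acc.storage + s.storage,
        })
    }
}

fn main() {
    let from_ingestors = [
        QueriedStats { events: 10, ingestion: 100, storage: 40 },
        QueriedStats { events: 5, ingestion: 50, storage: 20 },
    ];
    let total = QueriedStats::merge(&from_ingestors);
    assert_eq!(total, QueriedStats { events: 15, ingestion: 150, storage: 60 });
}
```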

Changes

File(s) Change Summary
src/handlers/http/cluster/utils.rs Added QueriedStats::merge method; removed merge_quried_stats function.
src/prism/logstream/mod.rs Replaced call to merge_quried_stats with QueriedStats::merge and removed its import.
src/handlers/http/logstream.rs Removed get_stats_date; updated get_stats to use Query<StatsParams> and streamlined error handling.
src/handlers/http/modal/query/querier_logstream.rs Removed get_stats function; consolidated imports and removed unused ingestor fetching functions.
src/handlers/http/modal/query_server.rs Updated route to invoke logstream::get_stats instead of querier_logstream::get_stats.
src/stats.rs Added DatedStats, StatsParams, and new methods (for_all_streams, for_stream_on_date, fetch_from_ingestors); updated FullStats::get_current.
src/analytics.rs Updated import and replaced call to stats::get_current_stats with FullStats::get_current.
src/catalog/mod.rs Replaced calls to get_current_stats with FullStats::get_current.
src/handlers/http/cluster/mod.rs Removed fetch_daily_stats_from_ingestors function.
src/prism/home/mod.rs Removed local DatedStats and stats_for_date function; now uses DatedStats::for_all_streams in home response generation.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant HTTP_Server
    participant LogstreamHandler
    participant StatsModule
    participant HttpResponse

    Client->>HTTP_Server: GET /stats?date=YYYY-MM-DD
    HTTP_Server->>LogstreamHandler: Invoke get_stats(Query<StatsParams>, stream_name)
    LogstreamHandler->>StatsModule: If date exists, call for_stream_on_date(date, stream)
    StatsModule-->>LogstreamHandler: Return computed stats
    LogstreamHandler->>HTTP_Server: Build JSON response via HttpResponse::build(OK)
    HTTP_Server-->>Client: Return HTTP 200 with stats JSON

Suggested labels

for next release

Suggested reviewers

  • nikhilsinhaparseable

Poem

I'm a rabbit with a heart so light,
Hopping through code, day and night.
Merging stats with keen insight,
Refactored paths shining bright.
Cheers to changes in every byte!
🐇💻 Hop on, let's code it right!


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0a07fbc and f820090.

📒 Files selected for processing (1)
  • src/stats.rs (3 hunks)
🔇 Additional comments (7)
src/stats.rs (7)

19-37: Well-organized imports with clear structure.

The imports have been nicely reorganized with proper grouping, making the codebase more maintainable.


47-109: Good implementation of Stats utility methods.

The implementation of get_current, get_lifetime, and get_deleted methods follows a consistent pattern with proper error handling using ok()? instead of unwrap(). This prevents possible panics if metrics retrieval fails.
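The `ok()?` pattern the review praises can be shown with a mocked metric lookup (the real code queries prometheus gauges; the lookup function here is a hypothetical stand-in):

```rust
// The `.ok()?` pattern: convert a Result into an Option and bail out of an
// Option-returning function instead of panicking with `.unwrap()`.
// `lookup_metric` is a mock standing in for a prometheus gauge lookup.
fn lookup_metric(name: &str) -> Result<u64, String> {
    match name {
        "events_ingested" => Ok(42),
        other => Err(format!("no such metric: {other}")),
    }
}

fn get_current_events() -> Option<u64> {
    // `.unwrap()` here would panic on a missing metric;
    // `.ok()?` returns None to the caller instead.
    let events = lookup_metric("events_ingested").ok()?;
    Some(events)
}

fn get_missing_events() -> Option<u64> {
    lookup_metric("does_not_exist").ok()
}

fn main() {
    assert_eq!(get_current_events(), Some(42));
    assert_eq!(get_missing_events(), None); // graceful, no panic
}
```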


135-160: Well-implemented fetching logic from ingestors.

The fetch_from_ingestors method effectively retrieves stats by iterating through manifests and filtering by date. The error handling is appropriately implemented with a Result return type.


163-193: Good implementation of DatedStats aggregation.

The DatedStats struct and its for_all_streams method provide a clean way to aggregate statistics across multiple streams for a specific date. The method properly combines stats from both queriers and ingestors.
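The aggregation pattern can be sketched with a mocked per-stream lookup. Everything below is illustrative: the real `for_all_streams` queries metrics and ingestors rather than a hardcoded table, and its field names differ.

```rust
// Illustrative shape of `DatedStats::for_all_streams`: given a date, sum
// per-stream stats into one dated total, skipping streams with no data.
#[derive(Debug, Default, PartialEq)]
struct DatedStats {
    date: String,
    events: u64,
    ingestion_size: u64,
    storage_size: u64,
}

// Mock per-stream lookup standing in for querier + ingestor metric reads.
fn stats_for_stream(stream: &str, _date: &str) -> Option<(u64, u64, u64)> {
    match stream {
        "app-logs" => Some((100, 1_000, 400)),
        "audit" => Some((20, 200, 80)),
        _ => None, // unknown stream contributes nothing
    }
}

fn for_all_streams(date: &str, streams: &[&str]) -> DatedStats {
    let mut total = DatedStats { date: date.to_string(), ..Default::default() };
    for stream in streams {
        if let Some((events, ingestion, storage)) = stats_for_stream(stream, date) {
            total.events += events;
            total.ingestion_size += ingestion;
            total.storage_size += storage;
        }
    }
    total
}

fn main() {
    let total = for_all_streams("2025-01-30", &["app-logs", "audit", "unknown"]);
    assert_eq!(total.events, 120);
    assert_eq!(total.ingestion_size, 1_200);
}
```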


202-213: Good refactoring of FullStats::get_current.

The implementation now leverages the new Stats methods, which simplifies the code and improves maintainability.


266-266: Update to use the renamed method.

The call to FullStats::get_current correctly uses the renamed method.


330-347: Proper date format validation implemented.

The StatsParams struct with the custom deserialize_date function provides robust date format validation by explicitly parsing with the format pattern %Y-%m-%d. This addresses a previous review comment about adding date format validation.
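The validation idea can be shown without the serde and chrono dependencies. This is a dependency-free sketch, not the PR's code: the real `deserialize_date` delegates to chrono's `NaiveDate::parse_from_str` with `%Y-%m-%d`, which additionally validates days-per-month and strict zero-padding.

```rust
// Dependency-free sketch of YYYY-MM-DD validation. chrono's
// `%Y-%m-%d` parser in the actual code is stricter (padding,
// days-per-month); this shows the rejection behavior the review refers to.
fn parse_ymd(s: &str) -> Option<(i32, u32, u32)> {
    let mut parts = s.split('-');
    let year: i32 = parts.next()?.parse().ok()?;
    let month: u32 = parts.next()?.parse().ok()?;
    let day: u32 = parts.next()?.parse().ok()?;
    if parts.next().is_some() {
        return None; // trailing components, e.g. "2025-01-30-extra"
    }
    // Coarse range checks; a real calendar check would be per-month.
    if !(1..=12).contains(&month) || !(1..=31).contains(&day) {
        return None;
    }
    Some((year, month, day))
}

fn main() {
    assert_eq!(parse_ymd("2025-01-30"), Some((2025, 1, 30)));
    assert_eq!(parse_ymd("30-01-2025"), None); // wrong field order rejected
    assert_eq!(parse_ymd("not-a-date"), None);
}
```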


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

🧹 Nitpick comments (4)
src/handlers/http/logstream.rs (3)

224-227: Refined function signature for typed query parameters.

Accepting Query<StatsParams> instead of an HttpRequest is a clean improvement that reduces manual parsing and fosters clarity. As a future enhancement, consider validating the stream_name and ensuring the StatsParams fields are in valid ranges, if necessary.


278-283: Consistent string formatting for ingestion stats.

Appending "Bytes" is straightforward. Watch out for very large values (potential numeric overflow), although typical usage should be fine.


296-308: Repeated block for stats instantiation.

The logic looks correct and consistent with the earlier block. Consider refactoring these two blocks into a shared function to avoid duplication (DRY principle).

src/stats.rs (1)

211-214: Add documentation for the new struct.

The StatsParams struct lacks documentation explaining its purpose and the expected format of the date field.

Add documentation comments:

+/// Parameters for retrieving stream statistics.
+/// Used to filter statistics by date.
 #[derive(Debug, Deserialize)]
 pub struct StatsParams {
+    /// Optional date in YYYY-MM-DD format to filter statistics.
     pub date: Option<String>,
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 20e66a4 and 82f04d6.

📒 Files selected for processing (5)
  • src/handlers/http/cluster/utils.rs (1 hunks)
  • src/handlers/http/logstream.rs (8 hunks)
  • src/handlers/http/modal/query/querier_logstream.rs (2 hunks)
  • src/handlers/http/modal/query_server.rs (1 hunks)
  • src/stats.rs (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
🔇 Additional comments (15)
src/handlers/http/logstream.rs (11)

19-20: Imports look good.

The addition of StatusCode and Query is consistent with the refactor to handle HTTP responses and structured query parameters, respectively.


39-39: Usage of StatsParams.

Good inclusion of StatsParams to facilitate typed, date-based or other parameterized stats retrieval.


45-46: Imports for cluster utilities.

Importing fetch_stats_from_ingestors and related types ensures a clearer modular approach to gather stats, especially when running in Query mode.


230-230: Test-mode condition check.

Using cfg!(not(test)) here prevents potential test failures due to missing in-memory data. This approach is sensible; just be sure any integration or end-to-end tests still handle stream setup as needed.


247-248: Short-circuit stats retrieval.

Returning early if params.get_stats(&stream_name) yields a valid result is a neat step to accommodate date-based or custom stats. This approach looks good.
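The short-circuit described above follows a common Rust early-return shape. The sketch below is hypothetical (simplified return types, mocked lookup), shown only to illustrate the control flow:

```rust
// The short-circuit pattern: if the date-filtered lookup yields stats,
// return immediately and skip the full aggregation path.
// `date_filtered_stats` is a mock; the real code calls params.get_stats().
fn date_filtered_stats(date: Option<&str>) -> Option<u64> {
    date.map(|_| 7) // pretend the dated lookup succeeded
}

fn get_stats(date: Option<&str>) -> String {
    if let Some(stats) = date_filtered_stats(date) {
        return format!("dated: {stats}"); // early return, no full scan
    }
    "full aggregation".to_string()
}

fn main() {
    assert_eq!(get_stats(Some("2025-01-30")), "dated: 7");
    assert_eq!(get_stats(None), "full aggregation");
}
```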


254-262: Conditional ingestor stats retrieval.

Fetching ingestor stats only if we're in Query mode and the stream is internal. This seems intentional but might skip potential external streams. Double-check whether external streams are meant to be included or not.


286-288: Size formatting for storage stats.

Matches the ingestion stats approach and is consistent. No issues detected.


315-317: Merging multiple ingestor stats.

QueriedStats::merge is a clear, reusable method to unify multiple stats sources. This is a clean approach.


322-322: Returning aggregated stats.

Building the final HTTP response with StatusCode::OK and the merged stats is straightforward and clear.


633-638: Expanded test imports.

The usage of anyhow::bail and StatsParams suggests robust test coverage. Consider adding more tests for different StatsParams variants.

Also applies to: 640-640


650-661: New test for nonexistent stream.

Good test coverage ensuring get_stats returns StreamNotFound for an unknown stream name. Adding more tests (e.g., handling date-based scenarios) would further strengthen confidence.

src/handlers/http/modal/query/querier_logstream.rs (3)

20-20: Import consolidation.

The updated imports from actix_web properly reduce clutter. No issues found.


31-32: Shifted cluster and error imports.

Removing references to old stats-collection methods and using StreamError from logstream::error aligns with the consolidated approach to stats retrieval.


36-36: Added stats module import.

Including stats here suggests potential usage in stream deletion or synchronization logic. Looks consistent with the updated design.

src/handlers/http/modal/query_server.rs (1)

286-286: LGTM!

The route handler change aligns with the PR objectives to merge and simplify the get_stats functionality.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/handlers/http/logstream.rs (1)

254-262: Improve error handling for ingestor stats.

The error handling for get_stream could be more explicit. Consider separating the stream type check to provide clearer error messages.

-    let ingestor_stats = if PARSEABLE.options.mode == Mode::Query
-        && PARSEABLE
-            .get_stream(&stream_name)
-            .is_ok_and(|s| s.get_stream_type() == StreamType::Internal)
-    {
-        Some(fetch_stats_from_ingestors(&stream_name).await?)
-    } else {
-        None
-    };
+    let ingestor_stats = if PARSEABLE.options.mode == Mode::Query {
+        match PARSEABLE.get_stream(&stream_name) {
+            Ok(stream) if stream.get_stream_type() == StreamType::Internal => {
+                Some(fetch_stats_from_ingestors(&stream_name).await?)
+            }
+            Ok(_) => None,
+            Err(e) => return Err(e.into()),
+        }
+    } else {
+        None
+    };
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 82f04d6 and 3902bd2.

📒 Files selected for processing (1)
  • src/handlers/http/logstream.rs (8 hunks)
🔇 Additional comments (3)
src/handlers/http/logstream.rs (3)

19-48: LGTM!

The import changes are well-organized and properly support the new functionality.


224-227: LGTM! Function signature improvements.

The change from HttpRequest to Query<StatsParams> provides better type safety and structured parameter handling.


635-636: LGTM! Test updates.

The test updates correctly reflect the new function signature and import changes.

Also applies to: 647-650

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (1)
src/handlers/http/logstream.rs (1)

275-315: Reduce code duplication in stats creation.

The stats creation code is identical in both match arms of stream_meta.first_event_at. This duplication can be eliminated.

Consider moving the common code outside the match:

let ingestion_stats = IngestionStats::new(
    stats.current_stats.events,
    format!("{} Bytes", stats.current_stats.ingestion),
    stats.lifetime_stats.events,
    format!("{} Bytes", stats.lifetime_stats.ingestion),
    stats.deleted_stats.events,
    format!("{} Bytes", stats.deleted_stats.ingestion),
    "json",
);
let storage_stats = StorageStats::new(
    format!("{} Bytes", stats.current_stats.storage),
    format!("{} Bytes", stats.lifetime_stats.storage),
    format!("{} Bytes", stats.deleted_stats.storage),
    "parquet",
);

let stats = QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats);
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3902bd2 and 92fba76.

📒 Files selected for processing (1)
  • src/handlers/http/logstream.rs (11 hunks)
🧰 Additional context used
🪛 GitHub Actions: Lint, Test and Coverage Report
src/handlers/http/logstream.rs

[error] 328-328: This if statement can be collapsed. For further information visit https://rust-lang.github.io/rust-clippy/master/index.html#collapsible_if.

⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
🔇 Additional comments (2)
src/handlers/http/logstream.rs (2)

19-48: LGTM! Well-organized imports.

The imports are logically grouped and support the refactored stats functionality.


226-229: LGTM! Improved parameter handling.

The function signature change to use Query<StatsParams> provides better structure and type safety for query parameters.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/handlers/http/logstream.rs (1)

278-283: Extract repeated size formatting into a helper function.

The " Bytes" suffix formatting is duplicated across multiple places. Consider extracting this into a helper function to improve maintainability and reduce code duplication.

+ fn format_size(size: u64) -> String {
+     format!("{} Bytes", size)
+ }

  let ingestion_stats = IngestionStats::new(
      stats.current_stats.events,
-     format!("{} Bytes", stats.current_stats.ingestion),
+     format_size(stats.current_stats.ingestion),
      stats.lifetime_stats.events,
-     format!("{} Bytes", stats.lifetime_stats.ingestion),
+     format_size(stats.lifetime_stats.ingestion),
      stats.deleted_stats.events,
-     format!("{} Bytes", stats.deleted_stats.ingestion),
+     format_size(stats.deleted_stats.ingestion),
      "json",
  );
  let storage_stats = StorageStats::new(
-     format!("{} Bytes", stats.current_stats.storage),
-     format!("{} Bytes", stats.lifetime_stats.storage),
-     format!("{} Bytes", stats.deleted_stats.storage),
+     format_size(stats.current_stats.storage),
+     format_size(stats.lifetime_stats.storage),
+     format_size(stats.deleted_stats.storage),
      "parquet",
  );

Also applies to: 286-289, 298-303, 306-309

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 92fba76 and 9f54e9e.

📒 Files selected for processing (1)
  • src/handlers/http/logstream.rs (11 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (3)
src/handlers/http/logstream.rs (3)

19-47: LGTM! Import changes look good.

The imports have been well-organized and the new imports (Query, StatsParams) align with the PR's objective of using structured query parameters.


168-177: Extract repeated stream existence check into a helper function.

The same error handling pattern for checking stream existence is duplicated across multiple functions. This can be refactored into a reusable helper function to improve maintainability and reduce code duplication.

Also applies to: 195-204, 235-244, 327-337, 386-395, 440-449, 468-477


226-249: LGTM! Improved parameter handling and error checking.

The changes effectively:

  • Use structured query parameters via StatsParams
  • Add support for date-specific queries
  • Improve error handling with better stream existence checks

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/stats.rs (1)

244-255: Add date range validation.

While the date parsing is handled correctly, consider adding validation for reasonable date ranges to prevent processing of far future or past dates.

 pub fn deserialize_date<'de, D>(deserializer: D) -> Result<Option<NaiveDate>, D::Error>
 where
     D: Deserializer<'de>,
 {
     let Some(s) = Option::<String>::deserialize(deserializer)? else {
         return Ok(None);
     };

-    NaiveDate::parse_from_str(&s, "%Y-%m-%d")
+    let date = NaiveDate::parse_from_str(&s, "%Y-%m-%d")
+        .map_err(serde::de::Error::custom)?;
+    
+    // Validate date range (e.g., not too far in the past or future)
+    let today = chrono::Local::now().date_naive();
+    if date > today || date < today - chrono::Duration::days(5) { // 5 days in the past
+        return Err(serde::de::Error::custom("Date out of valid range"));
+    }
+    
+    Ok(Some(date))
-        .map(Some)
-        .map_err(serde::de::Error::custom)
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9f54e9e and 5e7d8d1.

📒 Files selected for processing (1)
  • src/stats.rs (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (2)
src/stats.rs (2)

21-33: LGTM! Well-organized imports and clean struct updates.

The imports are properly organized, and the structs are correctly updated with serde traits while maintaining backward compatibility.

Also applies to: 36-48


212-242: Add error handling for metric retrieval.

The current implementation uses unwrap() which can cause panics if metric retrieval fails. Consider using proper error handling to gracefully handle failures.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
src/stats.rs (1)

225-255: ⚠️ Potential issue

Add error handling for metric retrieval.

The current implementation uses unwrap() which can cause panics if metric retrieval fails. Consider using proper error handling to gracefully handle failures.

Apply this diff to add error handling:

 impl StatsParams {
     pub fn get_stats(self, stream_name: &str) -> Option<Stats> {
         let date = self.date?.to_string();
         let event_labels = event_labels_date(stream_name, "json", &date);
         let storage_size_labels = storage_size_labels_date(stream_name, &date);
         let events_ingested = EVENTS_INGESTED_DATE
             .get_metric_with_label_values(&event_labels)
-            .unwrap()
+            .ok()?
             .get() as u64;
         let ingestion_size = EVENTS_INGESTED_SIZE_DATE
             .get_metric_with_label_values(&event_labels)
-            .unwrap()
+            .ok()?
             .get() as u64;
         let storage_size = EVENTS_STORAGE_SIZE_DATE
             .get_metric_with_label_values(&storage_size_labels)
-            .unwrap()
+            .ok()?
             .get() as u64;

         Some(Stats {
             events: events_ingested,
             ingestion: ingestion_size,
             storage: storage_size,
         })
     }
 }
🧹 Nitpick comments (1)
src/handlers/http/logstream.rs (1)

203-292: Consider extracting byte size formatting into a helper function.

The code repeats string concatenation for byte sizes in multiple places. Consider extracting this into a helper function for better maintainability and consistency.

Create a helper function:

fn format_bytes(bytes: u64) -> String {
    format!("{} Bytes", bytes)
}

Then use it throughout the code:

-format!("{} Bytes", stats.current_stats.ingestion)
+format_bytes(stats.current_stats.ingestion)
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5e7d8d1 and 2a3825a.

📒 Files selected for processing (3)
  • src/handlers/http/logstream.rs (7 hunks)
  • src/handlers/http/modal/query_server.rs (1 hunks)
  • src/stats.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/handlers/http/modal/query_server.rs
🔇 Additional comments (5)
src/stats.rs (3)

21-33: LGTM!

The imports are well-organized and necessary for the new functionality.


35-48: LGTM!

The addition of Serialize and Deserialize traits to the structs is appropriate for the new functionality.


257-268: LGTM!

The date deserialization function is well-implemented with proper error handling and format validation.

src/handlers/http/logstream.rs (2)

19-47: LGTM!

The imports are well-organized and necessary for the new functionality.


576-576: LGTM!

The import path update is correct.

coderabbitai[bot]
coderabbitai bot previously approved these changes Feb 16, 2025
coderabbitai[bot]
coderabbitai bot previously approved these changes Mar 6, 2025
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (5)
src/handlers/http/logstream.rs (2)

242-242: Use convenience builder for clarity.
Instead of HttpResponse::build(StatusCode::OK).json(...), consider using HttpResponse::Ok().json(stats) for readability and consistency across the codebase.

- return Ok(HttpResponse::build(StatusCode::OK).json(stats));
+ return Ok(HttpResponse::Ok().json(stats));

527-527: Add coverage for date-based stats retrieval in tests.
You introduced usage of PutStreamHeaders here, but there's no test verifying get_stats with date queries. Consider adding a test to ensure all date-based paths work as intended.

Would you like assistance creating that test?

src/stats.rs (3)

74-104: Consider concurrency or partial aggregation handling.
While iterating over each stream for the date-based statistics, you might benefit from parallel processing if performance is a concern. Also, partial errors might deserve special handling if some streams are unavailable.


113-172: Consider fallback or logging for missing gauges in FullStats::get_current.
Currently, the code returns None if any label is missing, which can mask partially available stats. Logging or partial fallback might improve visibility into potential data gaps.


290-307: Optional date parsing logic is well-structured.
The custom deserializer for %Y-%m-%d is straightforward. Consider adding minimal documentation indicating that only the YYYY-MM-DD format is supported.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fba51f1 and 01e4fdc.

📒 Files selected for processing (7)
  • src/analytics.rs (2 hunks)
  • src/catalog/mod.rs (3 hunks)
  • src/handlers/http/cluster/mod.rs (3 hunks)
  • src/handlers/http/logstream.rs (5 hunks)
  • src/prism/home/mod.rs (3 hunks)
  • src/prism/logstream/mod.rs (3 hunks)
  • src/stats.rs (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/prism/logstream/mod.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: coverage
🔇 Additional comments (19)
src/catalog/mod.rs (3)

36-36: Good update on import to include the new FullStats struct

The import now correctly includes the FullStats struct which is used in the updated function calls below.


184-187: Well-executed refactor to use the encapsulated method

The change from using the standalone get_current_stats function to the encapsulated FullStats::get_current method aligns with good object-oriented design principles by grouping related functionality together.


310-313: Consistent update to match earlier refactoring

This change maintains consistency with the earlier refactoring, using the same FullStats::get_current method here as well.

src/handlers/http/cluster/mod.rs (2)

28-28: Good addition of NaiveDate import

Importing NaiveDate from chrono is necessary for the type signature change below.


381-381: More robust date comparison using NaiveDate

The direct comparison of NaiveDate objects is cleaner and less error-prone than string-based date comparisons.

src/analytics.rs (2)

44-44: Updated import to use FullStats directly

This change is consistent with the refactoring approach used in other files.


173-175: Consistent migration to use FullStats::get_current method

This change follows the same pattern as in other files, replacing the standalone function with the encapsulated method.

src/prism/home/mod.rs (4)

35-36: Good restructuring of imports

Importing DatedStats from the stats module instead of defining it locally improves code organization.


147-148: Simplified date handling without string formatting

Using date_naive() directly instead of formatting dates as strings is cleaner and avoids potential formatting issues.


185-189: Improved error handling with pattern matching

The use of pattern matching with Some provides better error handling for the case when stats retrieval returns None, avoiding panics from unchecked Option access.


1-240:

✅ Verification successful

Verify that the removed DatedStats struct has an equivalent in the stats module

The local DatedStats struct and stats_for_date function have been removed. Please confirm that equivalent functionality exists in the stats module to support the DatedStats::for_all_streams method call.


🏁 Script executed:

#!/bin/bash
# Check if the DatedStats struct and related methods exist in the stats module
grep -A 20 "struct DatedStats" src/stats.rs
grep -A 10 "for_all_streams" src/stats.rs

Length of output: 1180


Confirmation: Equivalent DatedStats functionality exists in the stats module.

The src/stats.rs file now includes the DatedStats struct along with the asynchronous for_all_streams method, which provides the equivalent functionality to what was previously implemented locally. No further action is needed.

src/handlers/http/logstream.rs (4)

19-20: All newly added imports look good.
No issues found with these additions, as each import is actively used in the updated code.

Also applies to: 38-38, 44-45


201-204: Verify all references to get_stats after signature change.
The function signature now takes Query<StatsParams> instead of extracting parameters directly through HttpRequest. Ensure that all routing configurations and upstream calls have been updated accordingly to avoid runtime errors.


214-216: Beware potential panic from unwrap() in Stats::for_stream_on_date.
This call relies on underlying Prometheus gauge values that may not exist for certain dates, leading to .unwrap() panics. A safer approach would be to handle the missing metric and return default values or an error. This was previously suggested.


219-219: Good error handling pattern.
Using .ok_or_else(...) to return StreamNotFound if no stats are present is a clean approach to handle missing metrics.
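The pattern praised here can be sketched in isolation (the lookup function, stream names, and error variant below are hypothetical stand-ins for the handler's types):

```rust
#[derive(Debug, PartialEq)]
enum StreamError {
    StreamNotFound(String),
}

// Hypothetical lookup: returns Some(stats) only for known streams.
fn find_stats(stream: &str) -> Option<u64> {
    if stream == "app_logs" { Some(42) } else { None }
}

// Converting the Option into a Result with a named error via
// `.ok_or_else(...)`, mirroring the handler's approach.
fn get_stats(stream: &str) -> Result<u64, StreamError> {
    find_stats(stream).ok_or_else(|| StreamError::StreamNotFound(stream.to_owned()))
}

fn main() {
    assert_eq!(get_stats("app_logs"), Ok(42));
    assert!(matches!(get_stats("missing"), Err(StreamError::StreamNotFound(_))));
    println!("ok");
}
```

The closure passed to `ok_or_else` only runs on the `None` path, so the error value is constructed lazily.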

src/stats.rs (4)

19-20: Imports appear appropriate for the new date-based functionality.
All added imports are justified by the new stats retrieval and date parsing logic.

Also applies to: 21-21, 25-25, 28-29


41-41: Derive attributes look good.
Implementing Default, Serialize, Deserialize, and other traits aligns well with usage in metrics and serialization contexts.


48-72: Revisit .unwrap() usage for metric lookups.
This could panic if the metric labels are not found. The need for safer error handling was raised previously.


226-226: Good workflow updating object store after local metric changes.
Storing updated stats into the object store if present helps maintain consistent state.

Comment on lines 370 to 373
 pub fn fetch_daily_stats_from_ingestors(
-    date: &str,
+    date: NaiveDate,
     stream_meta_list: &[ObjectStoreFormat],
 ) -> Result<Stats, StreamError> {
🛠️ Refactor suggestion

Improved type safety with NaiveDate parameter

Changing the parameter type from a string to NaiveDate improves type safety and eliminates potential string formatting issues when handling dates.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
src/stats.rs (1)

47-97: 🛠️ Refactor suggestion

Add error handling for metric retrieval in Stats::for_stream_on_date.

The current implementation uses unwrap() which can cause panics if metric retrieval fails. Consider using proper error handling to gracefully handle failures.

Apply this diff to add error handling:

 pub fn for_stream_on_date(date: NaiveDate, stream_name: &str) -> Stats {
     let date = date.to_string();
     let event_labels = event_labels_date(stream_name, "json", &date);
     let storage_size_labels = storage_size_labels_date(stream_name, &date);
     let events_ingested = EVENTS_INGESTED_DATE
         .get_metric_with_label_values(&event_labels)
-        .unwrap()
+        .unwrap_or_else(|_| EVENTS_INGESTED_DATE.with_label_values(&event_labels))
         .get() as u64;
     let ingestion_size = EVENTS_INGESTED_SIZE_DATE
         .get_metric_with_label_values(&event_labels)
-        .unwrap()
+        .unwrap_or_else(|_| EVENTS_INGESTED_SIZE_DATE.with_label_values(&event_labels))
         .get() as u64;
     let storage_size = EVENTS_STORAGE_SIZE_DATE
         .get_metric_with_label_values(&storage_size_labels)
-        .unwrap()
+        .unwrap_or_else(|_| EVENTS_STORAGE_SIZE_DATE.with_label_values(&storage_size_labels))
         .get() as u64;
🧹 Nitpick comments (1)
src/handlers/http/logstream.rs (1)

219-240: Consider extracting the QueriedStats creation logic to a method.

The code for creating a QueriedStats instance from FullStats is somewhat complex and could benefit from being moved into a dedicated method, possibly on the QueriedStats struct itself.

 QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats)

Consider adding a method to QueriedStats:

impl QueriedStats {
    /// Builds a QueriedStats from a FullStats snapshot.
    pub fn from_full_stats(stream_name: &str, stats: &FullStats, time: DateTime<Utc>) -> Self {
        let ingestion_stats = IngestionStats::new(
            stats.current_stats.events,
            stats.current_stats.ingestion,
            stats.lifetime_stats.events,
            stats.lifetime_stats.ingestion,
            stats.deleted_stats.events,
            stats.deleted_stats.ingestion,
            "json",
        );
        let storage_stats = StorageStats::new(
            stats.current_stats.storage,
            stats.lifetime_stats.storage,
            stats.deleted_stats.storage,
            "parquet",
        );

        Self::new(stream_name, time, ingestion_stats, storage_stats)
    }
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 01e4fdc and 43db1b6.

📒 Files selected for processing (7)
  • src/analytics.rs (2 hunks)
  • src/catalog/mod.rs (3 hunks)
  • src/handlers/http/cluster/mod.rs (0 hunks)
  • src/handlers/http/logstream.rs (5 hunks)
  • src/prism/home/mod.rs (3 hunks)
  • src/prism/logstream/mod.rs (3 hunks)
  • src/stats.rs (3 hunks)
💤 Files with no reviewable changes (1)
  • src/handlers/http/cluster/mod.rs
🚧 Files skipped from review as they are similar to previous changes (3)
  • src/catalog/mod.rs
  • src/prism/logstream/mod.rs
  • src/analytics.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: coverage
🔇 Additional comments (10)
src/prism/home/mod.rs (3)

35-35: Update to DatedStats import.

The import has been changed to use the new DatedStats struct from the stats module, which aligns with the refactoring of the stats functionality.


147-147: Simplification of date formatting.

The date formatting has been simplified by using the .date_naive() method directly, making the code more concise.


184-189: Clean usage of the new DatedStats::for_all_streams method.

The code has been refactored to use the newly created DatedStats::for_all_streams method, utilizing Rust's let-else pattern for cleaner error handling. This is a good improvement, moving functionality to the appropriate struct.

src/handlers/http/logstream.rs (4)

19-45: Improved organization of imports.

The imports have been reorganized and grouped more logically. The changes reflect the shift to using structured query parameters via the Query extractor instead of manual request parsing.


201-204: Enhanced function signature for better parameter handling.

The function signature has been improved to use Query<StatsParams> for structured parameter extraction instead of manually parsing the HttpRequest. This follows Actix Web best practices and makes the code more maintainable.


214-217: Streamlined date-based stats retrieval.

The implementation now elegantly handles date parameters using the Stats::for_stream_on_date method. This simplifies the code significantly compared to manual parameter extraction and validation.


242-242: Improved response construction.

Using HttpResponse::build().json() is a cleaner approach for constructing JSON responses compared to previous implementation.

src/stats.rs (3)

100-130: Well-structured DatedStats implementation.

The new DatedStats struct and its for_all_streams method provide a clean way to aggregate statistics across multiple streams for a specific date. The method correctly combines stats from both querier and ingestor, which supports the PR's goal of merging stats functionality.


139-198: Improved FullStats::get_current implementation with proper error handling.

The new method replaces the previous standalone function and uses ok()? for error handling instead of unwrap(), which makes the code more robust. This is a good refactoring that moves functionality to the appropriate struct.


316-333: Add date format validation to StatsParams.

The deserialize_date function correctly parses valid dates, but malformed input only fails deep inside chrono's parser. Consider adding a cheap shape check up front to reject it with a clearer error message.

Add basic validation before parsing:

pub fn deserialize_date<'de, D>(deserializer: D) -> Result<Option<NaiveDate>, D::Error>
where
    D: Deserializer<'de>,
{
    let Some(s) = Option::<String>::deserialize(deserializer)? else {
        return Ok(None);
    };

+   // Basic validation for date format (YYYY-MM-DD)
+   if s.len() != 10 || s.chars().filter(|&c| c == '-').count() != 2 {
+       return Err(serde::de::Error::custom("Date must be in YYYY-MM-DD format"));
+   }

    NaiveDate::parse_from_str(&s, "%Y-%m-%d")
        .map(Some)
        .map_err(serde::de::Error::custom)
}

@coderabbitai coderabbitai bot left a comment

Caution

Inline review comments failed to post. This is likely due to GitHub's limits when posting large numbers of comments. If you are seeing this consistently it is likely a permissions issue. Please check "Moderation" -> "Code review limits" under your organization settings.

Actionable comments posted: 2

🔭 Outside diff range comments (1)
src/handlers/http/query.rs (1)

170-184: ⚠️ Potential issue

Duplicate fetch_schema call within the conditional block.

Lines 173 and 174 both call fetch_schema(table).await. This appears unintentional and may cause redundant requests and overhead. Consider removing the extra call:

 for table in tables {
-    if let Ok(new_schema) = fetch_schema(table).await {
-        if let Ok(new_schema) = fetch_schema(table).await {
-            // commit schema merges the schema internally and updates the schema in storage.
-            commit_schema_to_storage(table, new_schema.clone()).await?;
-            commit_schema(table, Arc::new(new_schema))?;
-        }
+    if let Ok(new_schema) = fetch_schema(table).await {
+        // commit schema merges the schema internally and updates the schema in storage.
+        commit_schema_to_storage(table, new_schema.clone()).await?;
+        commit_schema(table, Arc::new(new_schema))?;
     }
 }
🧹 Nitpick comments (6)
CONTRIBUTING.md (2)

5-5: Improve grammar for clarity.

Change "Thank you for considering to contribute to Parseable." to use the gerund form:

-Thank you for considering to contribute to Parseable.
+Thank you for considering contributing to Parseable.
🧰 Tools
🪛 LanguageTool

[grammar] ~5-~5: The verb ‘considering’ is used with the gerund form.
Context: ...contribute to Parseable. Thank you for considering to contribute to Parseable. The goal of this document...

(ADMIT_ENJOY_VB)


16-16: Avoid repetitively starting sentences with “You've.”

Reword this line to vary the sentence structure and improve readability. For example:

- - You've read the [Parseable documentation](https://www.parseable.com/docs).
+ - Please review the [Parseable documentation](https://www.parseable.com/docs).
🧰 Tools
🪛 LanguageTool

[style] ~16-~16: Three successive sentences begin with the same word. Consider rewording the sentence or use a thesaurus to find a synonym.
Context: ...th GitHub Pull Requests(PR) workflow. - You've read the [Parseable documentation](h...

(ENGLISH_WORD_REPEAT_BEGINNING_RULE)

src/cli.rs (1)

90-92: Add a brief comment about Kafka usage.

Introducing a dedicated Kafka field within LocalStoreArgs is sensible. Consider adding a short doc comment explaining the optional nature of Kafka config:

 #[cfg(feature = "kafka")]
 #[command(flatten)]
+/// Kafka settings for local store (enabled only with the "kafka" feature).
 pub kafka: KafkaConfig,
src/parseable/mod.rs (3)

1-19: Add test coverage for new functionalities.

This file introduces many new methods, yet there appear to be no direct tests verifying their behavior or error paths. Recommended to add unit tests (or integration tests) to validate both normal and edge-case scenarios for stream creation, updates, and storage validation.

Would you like me to propose or outline test cases for these methods?


58-60: Consider further modularization to reduce file size and complexity.

Currently, this file has grown significantly in length, making it harder to navigate and maintain. You might consider splitting some of these methods (e.g., creation, updating, validation) into separate modules or files to improve clarity.


69-107: Confirm concurrency approach for the global static PARSEABLE.

Using a global shared state in a multi-threaded environment increases the risk of unexpected race conditions or partial updates. Consider documenting thread-safety guarantees and referencing concurrency tests or patterns to ensure that global usage remains safe.
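One common std-only pattern for such a thread-safe global is lazy initialization behind a lock, with a single write-lock acquisition for the check-then-create step. This is a sketch under the assumption that the global wraps a map of streams; the names `STREAMS` and `get_or_create` are hypothetical, not the crate's actual API:

```rust
use std::collections::HashMap;
use std::sync::{OnceLock, RwLock};

// Hypothetical stand-in for a global like PARSEABLE: a lazily
// initialized map of stream names to metadata, guarded by an RwLock.
static STREAMS: OnceLock<RwLock<HashMap<String, String>>> = OnceLock::new();

fn streams() -> &'static RwLock<HashMap<String, String>> {
    STREAMS.get_or_init(|| RwLock::new(HashMap::new()))
}

// Take the write lock once and let `entry` re-check existence before
// inserting, avoiding the read-then-write race discussed above.
fn get_or_create(name: &str) -> String {
    let mut guard = streams().write().unwrap();
    guard
        .entry(name.to_owned())
        .or_insert_with(|| format!("metadata for {name}"))
        .clone()
}

fn main() {
    let first = get_or_create("app_logs");
    let second = get_or_create("app_logs");
    assert_eq!(first, second); // second call returns the existing entry
    println!("ok");
}
```

Because the existence check and the insert happen under the same write guard, two threads racing on the same name cannot both create the stream.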

🛑 Comments failed to post (2)
src/parseable/mod.rs (2)

372-407: 🛠️ Refactor suggestion

Mitigate potential duplication in create_stream_if_not_exists.

Similar to get_or_create_stream, this logic checks if a stream exists using a read operation (self.streams.contains) before possibly writing. Parallel callers might create duplicate streams. Consider a single locking strategy or a post-lock re-check to enforce uniqueness.


156-171: 🛠️ Refactor suggestion

Address race condition risk in get_or_create_stream.

When the code first checks for an existing stream via a read lock and then creates a new stream under a write lock, multiple threads calling this method at the same time could trigger duplicate creations. A common fix is to lock once in write mode and do a second existence check before creation.

Example refactor (simplified):

-pub fn get_or_create_stream(&self, stream_name: &str) -> StreamRef {
-    if let Ok(staging) = self.get_stream(stream_name) {
-        return staging;
-    }
-    self.streams.create(/* ... */)
+pub fn get_or_create_stream(&self, stream_name: &str) -> StreamRef {
+    let mut guard = self.streams.write().unwrap();
+    if let Some(stream) = guard.get(stream_name) {
+        return stream.clone();
+    }
+    guard.create(/* ... */)
}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

    }

    /// Get the handle to a stream in staging, create one if it doesn't exist
    pub fn get_or_create_stream(&self, stream_name: &str) -> StreamRef {
        let mut guard = self.streams.write().unwrap();
        if let Some(stream) = guard.get(stream_name) {
            return stream.clone();
        }
        guard.create(
            self.options.clone(),
            stream_name.to_owned(),
            LogStreamMetadata::default(),
            self.ingestor_metadata
                .as_ref()
                .map(|meta| meta.get_ingestor_id()),
        )
    }
