
refactor: memory/cpu efficient handling of stream configurations #1191

Draft · wants to merge 6 commits into base: main

Conversation


@de-sh de-sh commented Feb 16, 2025


Fixes #XXXX.

Description


This PR has:

  • been tested to ensure log ingestion and log query works.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • New Features

    • Now supports multiple custom partitions for improved flexibility in data ingestion and management.
  • Improved Error Handling

    • Enhanced header parsing and validation provide clearer, more precise feedback on input issues.
  • Refactor

    • Streamlined metadata migration and schema conversion processes for increased consistency and robustness.


coderabbitai bot commented Feb 16, 2025


Walkthrough

The changes update the handling of custom partitions across the codebase. Variable names, function parameters, and structure fields previously named in the singular (e.g. custom_partition) have been replaced with plural forms (e.g. custom_partitions) to support multiple values. In addition, error handling has been refined (for example, the addition of a HeaderParsing error variant) and certain type signatures have been adjusted. Minor modifications to test cases and header conversion logic ensure that the new multiple-partition model is consistently applied across HTTP handlers, metadata, migration, stream management, storage, and JSON utilities.
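The singular-to-plural shift described above can be sketched as follows. This is an illustrative reduction only: the struct and function names are assumed for the example and are not the PR's actual migration code.

```rust
// Before: at most one custom partition, expressed as an Option.
struct LogStreamMetadataOld {
    custom_partition: Option<String>,
}

// After: any number of custom partitions; "none" is the empty Vec.
struct LogStreamMetadata {
    custom_partitions: Vec<String>,
}

// Hypothetical migration: a legacy comma-separated value like "a,b"
// expands into multiple entries, and a missing value becomes an empty Vec.
fn migrate(old: LogStreamMetadataOld) -> LogStreamMetadata {
    let custom_partitions = old
        .custom_partition
        .map(|s| s.split(',').map(|p| p.to_string()).collect())
        .unwrap_or_default();
    LogStreamMetadata { custom_partitions }
}
```

Modeling "no partitions" as an empty Vec rather than `Option<Vec<_>>` is what lets the downstream signatures take a plain `&[String]` slice.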

Changes

  • src/handlers/http/ingest.rs: In the test case non_object_arr_is_err, the call to convert_array_to_object now uses &[] instead of None as a parameter.
  • src/handlers/http/logstream.rs: Updated the StreamInfo struct: changed custom_partition to custom_partitions; added a new HeaderParsing error variant in StreamError; replaced .into() with .try_into().unwrap() for header extraction.
  • src/handlers/http/modal/utils/ingest_utils.rs, src/handlers/http/modal/utils/logstream_utils.rs: In ingest_utils.rs, renamed variables and updated function signatures to use custom_partitions. In logstream_utils.rs, added type aliases, updated the conversion from HeaderMap to use TryFrom, and introduced parse_custom_partition and parse_time_partition_limit for enhanced header parsing with error handling.
  • src/metadata.rs, src/migration/mod.rs: In LogStreamMetadata, the custom_partition field is replaced with custom_partitions (a Vec<String>) and the constructor/migration logic is updated accordingly.
  • src/parseable/mod.rs, src/parseable/streams.rs: Function signatures are adjusted to replace custom_partition with custom_partitions (accepting slices or vectors). Getter/setter methods in Stream have been renamed and modified for the new multiple-partition format, and related stream creation/update methods now support multiple values.
  • src/static_schema.rs: The convert_static_schema_to_arrow_schema function now accepts an optional time partition and a slice of custom partitions, streamlining its internal validation logic.
  • src/storage/mod.rs, src/storage/object_storage.rs: Updated ObjectStoreFormat and StreamInfo: the single optional custom_partition is replaced with a Vec<String>; added custom serialization/deserialization functions. The trait method for updating partitions was renamed and its parameter type changed to a slice.
  • src/utils/json/flatten.rs, src/utils/json/mod.rs: Modified JSON utility functions to require custom_partitions as a slice instead of an optional string. Test cases have been updated accordingly, and new helper functions for deserializing/serializing custom partitions were added.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant HTTPHandler
    participant PutStreamHeaders
    participant HeaderParser

    Client->>HTTPHandler: Sends HTTP headers
    HTTPHandler->>PutStreamHeaders: Invokes TryFrom(HeaderMap)
    PutStreamHeaders->>HeaderParser: parse_custom_partition & parse_time_partition_limit
    alt Successful parsing
        HeaderParser-->>PutStreamHeaders: Validated partition values
        PutStreamHeaders-->>HTTPHandler: Constructed PutStreamHeaders object
    else Parsing error
        HeaderParser-->>PutStreamHeaders: Return HeaderParseError
        PutStreamHeaders-->>HTTPHandler: Error response (BAD_REQUEST)
    end
sequenceDiagram
    participant Client
    participant ParseableModule
    participant StreamModule

    Client->>ParseableModule: create_stream(..., custom_partitions: [values])
    ParseableModule->>StreamModule: set_custom_partitions([values])
    StreamModule->>StreamModule: Call prepare_parquet / convert_disk_files_to_parquet
    StreamModule-->>ParseableModule: Processed stream data
    ParseableModule-->>Client: Stream creation/update result

Suggested reviewers

  • nitisht
  • nikhilsinhaparseable

Poem

I’m a rabbit with a hop so spry,
Code changes bloom like carrots in the sky.
From single to many, partitions multiply,
Errors now parsed, no bugs left to spy.
With each refactored line, my heart does sing –
Celebrate these changes, enjoy the code’s new spring!
🥕🐇 Happy coding!



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (8)
src/handlers/http/modal/utils/logstream_utils.rs (3)

50-52: Use the type alias consistently for custom_partitions.

Lines 50–52 define pub custom_partitions: Vec<String> in PutStreamHeaders despite introducing a CustomPartition type alias earlier (line 35). To maintain consistency and clarity, consider using Vec<CustomPartition> here.

 pub struct PutStreamHeaders {
     pub time_partition: Option<FieldName>,
     pub time_partition_limit: Option<NonZeroU32>,
-    pub custom_partitions: Vec<String>,
+    pub custom_partitions: Vec<CustomPartition>,
     pub static_schema_flag: bool,
     pub update_stream_flag: bool,
     pub stream_type: StreamType,

102-114: Trim spaces when parsing custom partitions.

The parse_custom_partition function splits on commas but does not trim potential whitespace around each partition key, which may lead to unexpected strings like " partition" if headers contain extra spaces.

 pub fn parse_custom_partition(
     custom_partition: &str,
 ) -> Result<Vec<CustomPartition>, HeaderParseError> {
-    let custom_partition_list = custom_partition
-        .split(',')
-        .map(String::from)
-        .collect::<Vec<String>>();
+    let custom_partition_list = custom_partition
+        .split(',')
+        .map(|s| s.trim().to_string())
+        .collect::<Vec<CustomPartition>>();

     if custom_partition_list.len() > 3 {
         return Err(HeaderParseError::TooManyPartitions);
     }
     Ok(custom_partition_list)
 }

119-125: Clarify error message for invalid time partition limit.

Currently, the ZeroOrNegative variant is triggered for any parsing error, including non-numeric inputs (e.g., "abc"). Consider renaming the variant or separating these conditions to avoid confusion.
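One way to act on this suggestion is to split the failure modes into distinct variants. The sketch below is a self-contained illustration, not the PR's code: the enum name, the variant names, and the "30d" day-suffix convention are all assumptions for the example.

```rust
use std::num::NonZeroU32;

#[derive(Debug, PartialEq)]
enum TimePartitionLimitError {
    // Input was not a valid number at all (e.g. "abc").
    NotANumber,
    // Input parsed as a number but was zero.
    Zero,
}

fn parse_time_partition_limit(s: &str) -> Result<NonZeroU32, TimePartitionLimitError> {
    // Assumed format: a day count with a trailing 'd', e.g. "30d".
    let digits = s.strip_suffix('d').unwrap_or(s);
    let n: u32 = digits
        .parse()
        .map_err(|_| TimePartitionLimitError::NotANumber)?;
    NonZeroU32::new(n).ok_or(TimePartitionLimitError::Zero)
}
```

With separate variants, the HTTP layer can report "not a number" and "must be greater than zero" as distinct messages instead of one catch-all error.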

src/parseable/mod.rs (2)

326-326: Rename local variable to reflect multiple custom partitions.

The local variable custom_partition is receiving a vector (stream_metadata.custom_partitions). Rename it to custom_partitions for consistency with the new plural field name.

-        let custom_partition = stream_metadata.custom_partitions;
+        let custom_partitions = stream_metadata.custom_partitions;

460-464: Avoid using expect("Is Some") when verifying membership.

Lines 460–464 check time_partition.is_some() and then call time_partition.as_ref().expect("Is Some"). While logically safe, replacing expect with pattern matching or an early return improves readability and avoids panic-oriented code paths.

-if time_partition.is_some()
-    && !custom_partitions.is_empty()
-    && custom_partitions.contains(time_partition.as_ref().expect("Is Some"))
-{
+if let Some(tp) = time_partition {
+    if !custom_partitions.is_empty() && custom_partitions.contains(tp) {
+        return Err(CreateStreamError::Custom {
+            msg: format!("time partition {tp:?} cannot be set as custom partition"),
+            status: StatusCode::BAD_REQUEST,
+        }
+        .into());
+    }
 }
src/parseable/streams.rs (1)

558-564: Consider returning references or slices to avoid cloning.

Currently, both get_custom_partitions (line 558) and set_custom_partitions (line 644) clone or replace the entire vector of partitions. This is functional and likely not performance-critical for small vectors, but if the partitions list scales, consider returning references or using interior mutability constructs to optimize repeated lookups or partial updates.

Also applies to: 644-646
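The borrow-instead-of-clone suggestion could look like the following minimal sketch. The struct and method names mirror those mentioned in the review but are otherwise assumed; the real Stream type holds much more state.

```rust
struct Stream {
    custom_partitions: Vec<String>,
}

impl Stream {
    // Borrowed view: the read path hands out a slice, so no clone occurs.
    // Callers that genuinely need ownership can call `.to_vec()` themselves.
    fn custom_partitions(&self) -> &[String] {
        &self.custom_partitions
    }

    // Writers hand over ownership and replace the whole list at once.
    fn set_custom_partitions(&mut self, partitions: Vec<String>) {
        self.custom_partitions = partitions;
    }
}
```

Note that if Stream is shared behind a lock (as stream state typically is), returning a reference ties the borrow to the guard's lifetime, which is why the cloning version may still be the pragmatic choice.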

src/storage/object_storage.rs (1)

657-681: Consider using const for magic numbers in file suffix replacement.

The hardcoded value 3 in the file suffix replacement logic could be made more maintainable by using a named constant.

+ const BASE_SUFFIX_REPLACEMENTS: usize = 3;
- file_suffix = str::replacen(filename, ".", "/", 3);
+ file_suffix = str::replacen(filename, ".", "/", BASE_SUFFIX_REPLACEMENTS);

  if !custom_partitions.is_empty() {
-     file_suffix = str::replacen(filename, ".", "/", 3 + custom_partitions.len());
+     file_suffix = str::replacen(filename, ".", "/", BASE_SUFFIX_REPLACEMENTS + custom_partitions.len());
  }
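To make the effect of the replacen call concrete: the first N dots in a staged file name become path separators, yielding the object-store key suffix. The constant name comes from the suggestion above; the file-name shape used in the example is an assumption for illustration, not taken from the PR.

```rust
// Base case: date, hour, and minute segments account for three dots.
const BASE_SUFFIX_REPLACEMENTS: usize = 3;

// Each custom partition contributes one more `key=value.` segment,
// hence one more dot to convert into a path separator.
fn file_suffix(filename: &str, custom_partition_count: usize) -> String {
    filename.replacen('.', "/", BASE_SUFFIX_REPLACEMENTS + custom_partition_count)
}
```

Remaining dots (such as the one before the file extension) are deliberately left untouched because replacen stops after the given count.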
src/utils/json/mod.rs (1)

148-194: Well-implemented backward compatibility for custom partitions.

The new serialization/deserialization implementations handle the conversion between comma-separated strings and vector format, ensuring backward compatibility with existing data.

A few suggestions to enhance the implementation:

  1. Consider adding input validation in serialize_custom_partitions to ensure no partition contains commas
  2. Add documentation about the format requirements and limitations
 pub fn serialize_custom_partitions<S>(value: &[String], serializer: S) -> Result<S::Ok, S::Error>
 where
     S: serde::Serializer,
 {
     if value.is_empty() {
         // Skip serializing this field
         serializer.serialize_none()
     } else {
+        // Validate that no partition contains commas to prevent serialization issues
+        if value.iter().any(|s| s.contains(',')) {
+            return Err(S::Error::custom("Custom partitions cannot contain commas"));
+        }
         serializer.serialize_str(&value.join(","))
     }
 }
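For symmetry with the serializer shown above, the deserialization side expands the legacy comma-separated string back into a vector. The plain function below mirrors that logic as a dependency-free sketch; the PR's actual helper is a serde deserializer, and the function name and trimming behavior here are assumptions.

```rust
fn deserialize_custom_partitions(raw: Option<&str>) -> Vec<String> {
    match raw {
        // Older metadata without the field maps to "no custom partitions".
        None => Vec::new(),
        Some(s) => s
            .split(',')
            .map(|p| p.trim().to_string())
            // Guard against stray separators like "a,,b" or a bare "".
            .filter(|p| !p.is_empty())
            .collect(),
    }
}
```

Round-tripping through join(",") and this expansion is what keeps new binaries readable against metadata written by older versions.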
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 523ecc7 and cf09d87.

📒 Files selected for processing (13)
  • src/handlers/http/ingest.rs (3 hunks)
  • src/handlers/http/logstream.rs (7 hunks)
  • src/handlers/http/modal/utils/ingest_utils.rs (4 hunks)
  • src/handlers/http/modal/utils/logstream_utils.rs (3 hunks)
  • src/metadata.rs (3 hunks)
  • src/migration/mod.rs (2 hunks)
  • src/parseable/mod.rs (11 hunks)
  • src/parseable/streams.rs (11 hunks)
  • src/static_schema.rs (2 hunks)
  • src/storage/mod.rs (4 hunks)
  • src/storage/object_storage.rs (3 hunks)
  • src/utils/json/flatten.rs (7 hunks)
  • src/utils/json/mod.rs (7 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: coverage
🔇 Additional comments (24)
src/parseable/streams.rs (2)

307-307: LGTM for retrieving custom partitions separately.

Extracting custom_partitions in line 307 before parquet preparation is a clear and concise approach.


403-414: Good implementation of custom partition encoding and sorting.

Looping over each partition to apply DELTA_BYTE_ARRAY encoding and add sorting columns is logically correct and aligns with the multi-partition design.

src/static_schema.rs (4)

61-62: LGTM! Parameter types updated for better type safety.

The parameter types have been improved:

  • time_partition: &Option<String> better represents the optional nature of time partitions
  • custom_partitions: &[String] better represents multiple custom partitions

70-78: LGTM! Improved custom partition validation.

The validation of custom partitions has been streamlined:

  • Direct iteration through custom partitions is more efficient
  • Early return with descriptive error message improves error handling

81-84: LGTM! Enhanced time partition check.

The use of is_some_and improves readability and makes the code more idiomatic.


119-123: LGTM! Improved error handling for time partition.

The error message now includes the specific time partition field name, making debugging easier.

src/metadata.rs (2)

86-86: LGTM! Updated field type to support multiple custom partitions.

The field type has been changed from Option<String> to Vec<String> to support multiple custom partitions.


97-99: LGTM! Simplified parameter handling in new method.

The method signature and initialization have been updated to:

  • Accept Option<String> directly for time partition
  • Accept Vec<String> for custom partitions

Also applies to: 112-114

src/handlers/http/modal/utils/ingest_utils.rs (2)

74-74: LGTM! Improved handling of custom partitions.

The changes enhance clarity and consistency:

  • Variable name updated to reflect multiple partitions
  • Condition improved to check for non-empty custom partitions
  • Parameter updated to pass custom partitions slice

Also applies to: 77-77, 82-82


156-159: LGTM! Enhanced parameter type in get_custom_partition_values.

The parameter type has been updated to &[String] to better represent multiple custom partitions.

src/storage/mod.rs (2)

110-115: LGTM! Added custom serialization for multiple custom partitions.

The changes enhance the handling of custom partitions:

  • Field type updated to Vec<String>
  • Custom serialization and deserialization added for compatibility

229-229: LGTM! Updated default initialization.

The default initialization has been updated to use an empty vector for custom partitions.

src/handlers/http/ingest.rs (2)

511-519: LGTM! Parameter update aligns with custom partitions refactor.

The change from None to &[] correctly reflects the new multiple custom partitions support, maintaining test coverage while adapting to the API changes.


708-717: LGTM! Consistent parameter update for array tests.

The change from None to &[] is consistently applied here as well, ensuring test coverage for array handling with the new custom partitions implementation.

src/storage/object_storage.rs (1)

179-194: LGTM! Efficient handling of custom partitions.

The changes improve efficiency through:

  1. Early return for empty partitions
  2. Direct slice assignment instead of optional single value
  3. Clear plural naming reflecting multiple partitions support
src/utils/json/mod.rs (2)

28-30: LGTM! Clean import organization.

The imports are well-organized and specifically import the required custom partition utilities.


38-46: Function signature change improves type safety.

The change from Option<&String> to &[String] for custom_partitions parameter is a good improvement as it:

  1. Removes the need for Option since an empty slice can represent no partitions
  2. Explicitly shows that multiple partitions are supported
  3. Maintains memory efficiency by using a slice instead of owning the data
src/migration/mod.rs (2)

275-276: LGTM! Consistent field renaming.

The field renaming from custom_partition to custom_partitions in the struct destructuring is consistent with the broader changes.


310-311: LGTM! Consistent struct initialization.

The field usage in the LogStreamMetadata struct initialization is consistent with the field renaming.

src/utils/json/flatten.rs (2)

95-98: Function signature change improves validation capabilities.

The change from Option<&String> to &[String] for custom_partitions parameter allows validating multiple partitions while maintaining the same validation rules.


99-134: Validation logic handles multiple partitions efficiently.

The validation logic now iterates through each partition while maintaining all the necessary checks for:

  • Null values
  • Empty values
  • Objects
  • Arrays
  • Periods in values
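The checks listed above can be sketched as a single loop over the configured keys. To keep the example dependency-free, a toy Json enum stands in for serde_json::Value, and all type, variant, and function names are assumed rather than taken from the PR.

```rust
#[derive(Debug)]
enum Json {
    Null,
    Str(String),
    Num(f64),
    Array(Vec<Json>),
    Object(Vec<(String, Json)>),
}

#[derive(Debug, PartialEq)]
enum PartitionError {
    Missing(String),
    EmptyOrNull(String),
    NonScalar(String),
    ContainsPeriod(String),
}

fn validate_custom_partitions(
    obj: &[(String, Json)],
    custom_partitions: &[String],
) -> Result<(), PartitionError> {
    for key in custom_partitions {
        // Every configured partition key must be present in the event.
        let value = obj
            .iter()
            .find(|(k, _)| k == key)
            .map(|(_, v)| v)
            .ok_or_else(|| PartitionError::Missing(key.clone()))?;
        match value {
            Json::Null => return Err(PartitionError::EmptyOrNull(key.clone())),
            Json::Str(s) if s.is_empty() => return Err(PartitionError::EmptyOrNull(key.clone())),
            // Partition values must be scalar: no nested structures.
            Json::Array(_) | Json::Object(_) => return Err(PartitionError::NonScalar(key.clone())),
            // Periods would break the `key=value` path-segment encoding.
            Json::Str(s) if s.contains('.') => return Err(PartitionError::ContainsPeriod(key.clone())),
            _ => {}
        }
    }
    Ok(())
}
```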
src/handlers/http/logstream.rs (3)

366-367: LGTM! Consistent field usage.

The field usage in StreamInfo is consistent with the broader changes to support multiple partitions.


558-560: Improved error handling for header parsing.

The addition of the HeaderParsing error variant, with an appropriate error message, improves error handling and user feedback.


632-634: LGTM! Safer header parsing in tests.

The change from .into() to .try_into().unwrap() makes the potential failure points more explicit in the test code.
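The .into() to .try_into().unwrap() change reflects the move from From to TryFrom for header conversion. The toy sketch below shows the shape of such a fallible conversion; the header name, field set, and three-partition cap are assumptions for illustration (the real code converts from an HTTP HeaderMap).

```rust
use std::convert::TryFrom;

#[derive(Debug, PartialEq)]
struct PutStreamHeaders {
    custom_partitions: Vec<String>,
}

// Stand-in for an HTTP header map.
struct Headers(Vec<(String, String)>);

impl TryFrom<&Headers> for PutStreamHeaders {
    type Error = String;

    fn try_from(h: &Headers) -> Result<Self, Self::Error> {
        let raw = h
            .0
            .iter()
            .find(|(k, _)| k == "x-p-custom-partition")
            .map(|(_, v)| v.as_str())
            .unwrap_or("");
        let custom_partitions: Vec<String> = if raw.is_empty() {
            Vec::new()
        } else {
            raw.split(',').map(|p| p.to_string()).collect()
        };
        // Parsing can now fail, which is exactly why From no longer fits.
        if custom_partitions.len() > 3 {
            return Err("too many custom partitions".to_string());
        }
        Ok(PutStreamHeaders { custom_partitions })
    }
}
```

In tests, .try_into().unwrap() then makes any parse failure panic loudly at the call site instead of being silently impossible to express.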

@de-sh de-sh marked this pull request as draft February 16, 2025 12:07