feat: add incremental partition assign/unassign and batch seek to kafka consumer #249
Conversation
…set, and wrapper exposure Signed-off-by: Shanicky Chen <[email protected]>
…rts and type hints, clean up unused import Signed-off-by: Shanicky Chen <[email protected]>
Enhance simulated Kafka consumer with advanced partition management and offset seeking, improving feature parity and usability.
- Add incremental_assign and incremental_unassign methods to BaseConsumer
- Implement seek_partitions for atomic offset updates on assignments
- Expose new methods in Consumer API for both sync and async usage
- Introduce comprehensive tests for assignment, unassignment, and seeking
- Increase simulation fidelity for complex Kafka consumer workflows
Signed-off-by: Shanicky Chen <[email protected]>
Refactor consumer partition assignment and seek logic for correctness. This prevents partial state mutations on invalid seek requests, ensuring atomicity and internal consistency.
- Validate all partitions in seek before mutating state
- Only clear buffer and update offsets after successful validation
- Replace HashSet deduplication with iterator-based checks
- Add tests verifying buffer and offsets remain unchanged on seek error
- Improve code readability and robustness for edge cases
Signed-off-by: Shanicky Chen <[email protected]>
Refactor the closure used for checking topic-partition assignment to a single-expression inline style for improved readability and idiomatic Rust usage. No logic or behavior is changed.
- Replace multi-line closure block with inline closure in `.any()` call
- Improve code clarity and stylistic consistency
- Align with Rust code style and lint best practices
- Change is isolated to one function, with no API or logic impact
Signed-off-by: Shanicky Chen <[email protected]>
Pull Request Overview
This pull request adds incremental partition assignment/unassignment and batch seek functionality to the simulated Kafka consumer. The changes enable more granular partition management and multi-partition offset manipulation, improving simulation fidelity.
Key Changes
- Added `incremental_assign` and `incremental_unassign` methods for modifying partition assignments without replacing the entire assignment (usage sketched below)
- Implemented `seek_partitions` for atomically updating offsets across multiple partitions with validation
- Extended test coverage with comprehensive tests for new APIs and edge cases
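For orientation, here is a minimal usage sketch of the new calls. The method names (`incremental_assign`, `incremental_unassign`, `seek_partitions`) come from this PR; the `madsim_rdkafka` import paths, the `BaseConsumer` type, and the timeout argument to `seek_partitions` are assumptions modeled on the upstream rdkafka API, and the simulator may expose async variants instead of these sync-style calls:

```rust
use std::time::Duration;

use madsim_rdkafka::consumer::{BaseConsumer, Consumer}; // assumed paths, mirroring upstream rdkafka
use madsim_rdkafka::error::KafkaResult;
use madsim_rdkafka::{Offset, TopicPartitionList};

fn adjust_assignment(consumer: &BaseConsumer) -> KafkaResult<()> {
    // Add orders/3 without replacing the rest of the current assignment.
    let mut add = TopicPartitionList::new();
    add.add_partition("orders", 3);
    consumer.incremental_assign(&add)?;

    // Rewind two already-assigned partitions in one validated batch.
    let mut seek = TopicPartitionList::new();
    seek.add_partition_offset("orders", 0, Offset::Offset(42))?;
    seek.add_partition_offset("orders", 1, Offset::Offset(7))?;
    consumer.seek_partitions(seek, Duration::from_secs(5))?;

    // Drop orders/3 again, leaving the other partitions assigned.
    let mut remove = TopicPartitionList::new();
    remove.add_partition("orders", 3);
    consumer.incremental_unassign(&remove)?;

    Ok(())
}
```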
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| madsim-rdkafka/src/sim/topic_partition_list.rs | Added elements() method to retrieve all partition list elements |
| madsim-rdkafka/src/sim/consumer.rs | Implemented incremental assign/unassign and batch seek APIs with validation, plus comprehensive test suite |
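The `elements()` accessor mentioned in the first row of the table can be pictured roughly as below. The internal `list` vector matches what the diff that follows suggests, but the field types, derives, and owned-vs-borrowed return type are illustrative assumptions, not the PR's actual definitions:

```rust
// Simplified stand-ins for the sim's types; the real definitions carry more detail.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Offset {
    Beginning,
    End,
    Stored,
    Invalid,
    Offset(i64),
    OffsetTail(i64),
}

#[derive(Clone, Debug)]
pub struct TopicPartitionListElem {
    pub topic: String,
    pub partition: i32,
    pub offset: Offset,
}

#[derive(Default)]
pub struct TopicPartitionList {
    list: Vec<TopicPartitionListElem>,
}

impl TopicPartitionList {
    /// Return a snapshot of every element in the list, in insertion order.
    pub fn elements(&self) -> Vec<TopicPartitionListElem> {
        self.list.clone()
    }
}
```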
```rust
{
    let current_tpl = self.tpl.lock();
    for requested in &topic_partition_list.list {
        match requested.offset {
            Offset::Invalid => {
                return Err(KafkaError::Seek(format!(
                    "invalid offset for {}:{}",
                    requested.topic, requested.partition
                )));
            }
            Offset::Stored => {
                return Err(KafkaError::Seek(format!(
                    "stored offset is not supported for {}:{}",
                    requested.topic, requested.partition
                )));
            }
            Offset::OffsetTail(_) => {
                return Err(KafkaError::Seek(format!(
                    "offset tail is not supported for {}:{}",
                    requested.topic, requested.partition
                )));
            }
            _ => {}
        }

        current_tpl
            .list
            .iter()
            .find(|elem| {
                elem.topic == requested.topic && elem.partition == requested.partition
            })
            .ok_or_else(|| {
                KafkaError::Seek(format!(
                    "partition {}:{} is not currently assigned",
                    requested.topic, requested.partition
                ))
            })?;
    }
}

self.msgs.lock().clear();
let mut current_tpl = self.tpl.lock();

for requested in &topic_partition_list.list {
    let current = current_tpl
        .list
        .iter_mut()
        .find(|elem| elem.topic == requested.topic && elem.partition == requested.partition)
        .expect("partition must exist after validation");
    current.offset = requested.offset;
}
```
Copilot AI · Nov 12, 2025
Race condition between validation and offset update. The lock is released after validation (line 229) and reacquired at line 232. Between these operations, another thread could call incremental_unassign to remove a partition that was just validated, causing the expect at line 239 to panic.
Consider holding a single lock for the entire validation and update operation, or re-validate after reacquiring the lock. The buffer clear at line 231 should also happen atomically with the offset updates to maintain consistency.
Refactor BaseConsumer::assign to acquire the tpl mutex only once. This change improves thread safety and efficiency by performing validation, message clearing, and offset updates atomically.
- Acquire tpl lock once for all assignment operations
- Move message clearing and offset updates into the critical section
- Reduce risk of race conditions and inconsistent state
- Simplify code for better readability and maintainability
Signed-off-by: Shanicky Chen <[email protected]>
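A minimal sketch of the single-lock shape this commit describes. The `tpl` and `msgs` field names follow the diff above; the types are simplified stand-ins behind std `Mutex`es (the diff's bare `.lock()` calls suggest the sim actually uses a parking_lot-style mutex), and the error type is illustrative:

```rust
use std::sync::Mutex;

// Simplified stand-ins; the real BaseConsumer carries more state and uses
// the sim's own TopicPartitionList/Offset/KafkaError types.
struct Elem { topic: String, partition: i32, offset: i64 }
struct Tpl { list: Vec<Elem> }
#[derive(Debug)]
enum SeekError { NotAssigned(String) }

struct BaseConsumer {
    tpl: Mutex<Tpl>,          // current assignment
    msgs: Mutex<Vec<String>>, // buffered (pre-fetched) messages, simplified
}

impl BaseConsumer {
    fn seek_partitions(&self, requested: &Tpl) -> Result<(), SeekError> {
        // One guard covers validation, buffer clearing, and offset updates,
        // so another thread cannot unassign a partition in between.
        let mut tpl = self.tpl.lock().unwrap();

        // Phase 1: every requested partition must be currently assigned.
        for req in &requested.list {
            if !tpl
                .list
                .iter()
                .any(|e| e.topic == req.topic && e.partition == req.partition)
            {
                return Err(SeekError::NotAssigned(format!(
                    "{}:{}",
                    req.topic, req.partition
                )));
            }
        }

        // Phase 2: only after the whole batch validates, drop buffered
        // messages and move the offsets.
        self.msgs.lock().unwrap().clear();
        for req in &requested.list {
            let cur = tpl
                .list
                .iter_mut()
                .find(|e| e.topic == req.topic && e.partition == req.partition)
                .expect("validated above while holding the same lock");
            cur.offset = req.offset;
        }
        Ok(())
    }
}
```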
Pull Request Overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 7 comments.
Refactor partition management in BaseConsumer for performance and correctness. Replace linear scans with HashSet/HashMap-based lookups, and clarify message buffering semantics.
- Use HashSet to deduplicate assignments and unassignments efficiently
- Employ HashMap for constant-time partition offset updates in seeking
- Improve documentation on buffered message behavior for assign/unassign
- Ensure idempotence and robustness in incremental_* operations
- Reduce code complexity and potential for subtle bugs
Signed-off-by: Shanicky Chen <[email protected]>
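A rough illustration of the lookup-based approach: key partitions by `(topic, partition)` in a `HashSet`/`HashMap` rather than re-scanning the assignment vector for every request. The function shapes and field names below are simplified stand-ins, not the PR's code:

```rust
use std::collections::{HashMap, HashSet};

// Simplified element type; the sim's real struct carries more fields.
#[derive(Clone)]
struct Elem { topic: String, partition: i32, offset: i64 }

/// Remove the requested partitions from `assignment`, ignoring duplicates
/// and partitions that are not currently assigned (idempotent unassign).
fn incremental_unassign(assignment: &mut Vec<Elem>, requested: &[Elem]) {
    let to_remove: HashSet<(&str, i32)> = requested
        .iter()
        .map(|e| (e.topic.as_str(), e.partition))
        .collect();
    assignment.retain(|e| !to_remove.contains(&(e.topic.as_str(), e.partition)));
}

/// Apply a batch of offset updates with constant-time lookup per partition.
fn apply_seeks(assignment: &mut [Elem], requested: &[Elem]) {
    let new_offsets: HashMap<(&str, i32), i64> = requested
        .iter()
        .map(|e| ((e.topic.as_str(), e.partition), e.offset))
        .collect();
    for elem in assignment.iter_mut() {
        if let Some(&off) = new_offsets.get(&(elem.topic.as_str(), elem.partition)) {
            elem.offset = off;
        }
    }
}
```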
Pull Request Overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
Refactor partition offset update logic to validate all requested partitions before mutating internal state in BaseConsumer.
- Separate validation and mutation into two distinct phases
- Avoid clearing message queue if any partition assignment is invalid
- Use collected indices for efficient offset updates after validation
- Prevent partial state changes on error for safety and correctness
- Improve maintainability by consolidating redundant loops
Signed-off-by: Shanicky Chen <[email protected]>
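The two-phase, index-collecting pattern this commit describes, in miniature; the tuple-based data shapes are simplified stand-ins for the sim's real types:

```rust
// Validate every request first, remembering where each target lives;
// mutate (and clear any buffer) only if the whole batch is valid.
fn seek_all(
    assignment: &mut [(String, i32, i64)],
    requests: &[(String, i32, i64)],
) -> Result<(), String> {
    let mut hits = Vec::with_capacity(requests.len());
    for (topic, partition, offset) in requests {
        let idx = assignment
            .iter()
            .position(|(t, p, _)| t == topic && p == partition)
            .ok_or_else(|| format!("partition {topic}:{partition} is not currently assigned"))?;
        hits.push((idx, *offset));
    }
    // No error so far: safe to apply all updates without partial mutation.
    for (idx, offset) in hits {
        assignment[idx].2 = offset;
    }
    Ok(())
}
```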
Pull Request Overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated no new comments.
BugenZhao left a comment
LGTM
Is it ready to merge now?
Yes
Summary
This pull request introduces incremental partition assignment/unassignment and batch seek functionality to the simulated Kafka consumer in madsim-rdkafka, enhancing its fidelity to real Kafka client behavior. These changes enable more granular control over partition management and offset manipulation, supporting advanced simulation and client use cases.
Details
- `incremental_assign` and `incremental_unassign` APIs to allow adding or removing specific partitions without affecting the full assignment, closely mirroring librdkafka semantics.
- `seek_partitions` to atomically update offsets for multiple partitions in a batch, with strict validation to reject unsupported offset types (such as `Offset::Stored` and `OffsetTail`).
Architecture impact
The changes shift partition assignment from a bulk, all-or-nothing operation to a more flexible, incremental model. This enables workflows such as sticky rebalancing and dynamic partition allocation, and improves simulation realism for consumer group scenarios. Offset management is now more precise, with atomic batch operations and buffer invalidation ensuring correctness.
Technical highlights
Testing strategy
Unit and integration tests exercise all new APIs and cover error scenarios, ensuring correctness under concurrent and edge conditions. Manual verification of polling behavior after seek and assignment changes is recommended for confidence in buffer state management.
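One of the properties called out above, that a failed batch seek leaves buffered messages and offsets untouched, can be checked with a test along these lines. The sync-style calls, the `seek_partitions` timeout argument, and the `make_test_consumer` helper are assumptions, not code from this PR's test suite; the simulator may expose async variants instead:

```rust
use std::time::Duration;

use madsim_rdkafka::consumer::{BaseConsumer, Consumer}; // assumed paths
use madsim_rdkafka::{Offset, TopicPartitionList};

#[test]
fn seek_error_leaves_state_untouched() {
    // Hypothetical helper that builds a simulated consumer for the test.
    let consumer: BaseConsumer = make_test_consumer();

    // Assign a single partition at a known offset.
    let mut tpl = TopicPartitionList::new();
    tpl.add_partition_offset("topic", 0, Offset::Offset(10)).unwrap();
    consumer.assign(&tpl).unwrap();

    // A batch seek that mixes a valid partition with an unassigned one must fail...
    let mut seek = TopicPartitionList::new();
    seek.add_partition_offset("topic", 0, Offset::Offset(99)).unwrap();
    seek.add_partition_offset("topic", 1, Offset::Offset(5)).unwrap();
    assert!(consumer.seek_partitions(seek, Duration::from_secs(1)).is_err());

    // ...and must not have moved the valid partition's offset.
    let after = consumer.assignment().unwrap();
    let elem = after.find_partition("topic", 0).unwrap();
    assert_eq!(elem.offset(), Offset::Offset(10));
}
```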