Preserve stream names in adapter storage keys#1419
abishekgiri wants to merge 5 commits into iii-hq:main from
Conversation
@abishekgiri is attempting to deploy a commit to the motia Team on Vercel. A member of the Team first needs to authorize it.
Updated the stream storage key handling shared by the built-in KV adapter and the Redis adapter so stream names with colons are encoded safely, group ids with colons still round-trip, and list-all metadata no longer truncates names. This also adds regression coverage for colon-heavy names and the reserved encoding prefix. |
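For illustration, the encoding scheme the description outlines can be sketched as below. This is a minimal stand-in, not the PR's actual code: the helper names and the `enc-` marker are hypothetical, and hex encoding stands in for the base64 scheme the real helpers use, so the example stays dependency-free.

```rust
// Hypothetical sketch of shared key helpers that keep colon-containing
// stream names and group ids round-trippable in "stream:{name}:{group}" keys.

const PREFIX: &str = "stream:";
const ENC_MARKER: &str = "enc-"; // illustrative reserved encoding prefix

fn encode_segment(name: &str) -> String {
    // Encode when the name would be ambiguous in a colon-delimited key, or
    // when it collides with the reserved marker itself.
    if name.contains(':') || name.starts_with(ENC_MARKER) {
        let hex: String = name.bytes().map(|b| format!("{b:02x}")).collect();
        format!("{ENC_MARKER}{hex}")
    } else {
        name.to_string()
    }
}

fn decode_segment(segment: &str) -> Option<String> {
    let hex = match segment.strip_prefix(ENC_MARKER) {
        None => return Some(segment.to_string()),
        Some(h) => h,
    };
    if hex.len() % 2 != 0 {
        return None;
    }
    let bytes: Option<Vec<u8>> = (0..hex.len())
        .step_by(2)
        .map(|i| u8::from_str_radix(&hex[i..i + 2], 16).ok())
        .collect();
    String::from_utf8(bytes?).ok()
}

fn stream_storage_key(stream_name: &str, group_id: &str) -> String {
    // The encoded name segment contains no colons, so the first colon after
    // the prefix is an unambiguous delimiter and the group id may itself
    // contain colons.
    format!("{PREFIX}{}:{}", encode_segment(stream_name), group_id)
}

fn parse_stream_storage_key(key: &str) -> Option<(String, String)> {
    let rest = key.strip_prefix(PREFIX)?;
    let (name_seg, group_id) = rest.split_once(':')?;
    Some((decode_segment(name_seg)?, group_id.to_string()))
}
```

Under this layout, `stream_storage_key("orders:eu", "g:1")` round-trips through `parse_stream_storage_key` with both colons preserved, and a name that happens to start with the reserved marker is itself encoded rather than misparsed.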
sergiofilhowz
left a comment
There's a formatting issue that blocked the pipeline, mind reviewing it? should be really simple
cargo fmt --all
https://github.com/iii-hq/iii/actions/runs/24058380271/job/70316261651?pr=1419
📝 Walkthrough
Replaces hardcoded stream key formatting/parsing with shared helpers that encode stream-name segments (base64 when needed) and updates the KV and Redis adapters to use these helpers; adds unit and integration tests validating preservation of stream names that contain colons.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Applied the formatting fix and pushed it to this branch. cargo fmt --all -- --check passes locally now, and the Build Engine check is re-running on the updated commit.
Actionable comments posted: 2
🧹 Nitpick comments (1)
engine/src/workers/stream/adapters/redis_adapter.rs (1)
423-470: Consider adding a Redis-specific integration test for stream names with colons.
The KV adapter has a dedicated test (list_all_stream_preserves_stream_names_with_colons), but the Redis adapter lacks an equivalent test. While the shared helpers are unit-tested in mod.rs, an integration test for Redis would provide confidence that the SCAN-based list_all_stream correctly handles encoded keys in a real Redis environment.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine/src/workers/stream/adapters/redis_adapter.rs` around lines 423 - 470, Add an integration test that exercises RedisAdapter::list_all_stream to ensure stream names containing colons are preserved: create a test (e.g., list_all_stream_preserves_stream_names_with_colons_redis) that boots a real Redis instance or uses the test Redis fixture, writes keys using the same storage key encoding (so parse_stream_storage_key can decode them) for a stream name containing colons and multiple group IDs, then call list_all_stream() and assert the returned StreamMetadata contains the original stream id (with colons) and the expected sorted groups; target the Redis adapter code path (list_all_stream in redis_adapter.rs) and use the same connection/publisher setup the adapter uses so SCAN is exercised end-to-end.
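The real test would run against a live Redis instance or the test fixture; as a rough sketch of the assertion shape only, the aggregation and sorting it would check can be modeled over an in-memory key list standing in for SCAN results. All names below are illustrative, and the simplified parser assumes a colon-free encoded name segment, as the shared helpers guarantee.

```rust
use std::collections::{BTreeMap, BTreeSet};

// Simplified stand-in parser for "stream:{name}:{group}" keys; real code
// would also decode an encoded name segment here.
fn parse_stream_storage_key(key: &str) -> Option<(String, String)> {
    let rest = key.strip_prefix("stream:")?;
    let (name_seg, group_id) = rest.split_once(':')?;
    Some((name_seg.to_string(), group_id.to_string()))
}

// In-memory stand-in for the keys a cursor-based SCAN would return.
fn list_all_stream(scanned_keys: &[&str]) -> BTreeMap<String, BTreeSet<String>> {
    let mut stream_map: BTreeMap<String, BTreeSet<String>> = BTreeMap::new();
    for key in scanned_keys {
        if let Some((name, group)) = parse_stream_storage_key(key) {
            // BTreeSet keeps groups deduplicated and sorted, which is what
            // the suggested assertions on StreamMetadata would verify.
            stream_map.entry(name).or_default().insert(group);
        }
    }
    stream_map
}
```

Group ids containing colons (e.g. "g:1") survive because only the first colon after the name segment delimits the key; the integration test would make the same assertions against the adapter's real list_all_stream output.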
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@engine/src/workers/stream/adapters/kv_store.rs`:
- Around line 149-154: In list_all_stream, the loop over all_keys currently
drops keys that fail parse_stream_storage_key silently; change the loop so that
when parse_stream_storage_key(&key) returns None and key.starts_with("stream:")
you emit a warning log including the offending key (e.g., via tracing::warn! or
the crate's logger) to surface unparseable keys for migration/debugging, while
preserving current behavior for non-"stream:" keys; leave
stream_map.entry(...).insert(...) logic unchanged for successfully parsed keys.
In `@engine/src/workers/stream/adapters/redis_adapter.rs`:
- Around line 443-447: The loop over keys silently skips any key where
parse_stream_storage_key(&key) returns None; add diagnostic logging for those
cases so unparseable keys aren't hidden—inside the for key in keys loop, if
parse_stream_storage_key(&key) yields None, emit a warning/debug log (e.g.,
tracing::warn! or the crate's logger) that includes the raw key value and
context (e.g., "unparseable stream storage key, skipping") so you can detect
legacy or malformed keys while still inserting parsed (stream_name, group_id)
into stream_map as currently done.
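A minimal sketch of the diagnostic both inline comments ask for, assuming a key list and parse helper shaped like those in list_all_stream; eprintln! stands in for tracing::warn! so the example runs without the tracing crate, and the parser below is a simplified stand-in.

```rust
use std::collections::BTreeMap;

// Simplified stand-in: "stream:{name}:{group}" with a colon-free name segment.
fn parse_stream_storage_key(key: &str) -> Option<(String, String)> {
    let rest = key.strip_prefix("stream:")?;
    let (name, group) = rest.split_once(':')?;
    Some((name.to_string(), group.to_string()))
}

fn collect_streams(all_keys: &[&str]) -> BTreeMap<String, Vec<String>> {
    let mut stream_map: BTreeMap<String, Vec<String>> = BTreeMap::new();
    for key in all_keys {
        match parse_stream_storage_key(key) {
            Some((name, group)) => stream_map.entry(name).or_default().push(group),
            // Surface unparseable stream keys instead of dropping them
            // silently, so legacy or malformed keys show up in logs.
            None if key.starts_with("stream:") => {
                eprintln!("unparseable stream storage key, skipping: {key}");
            }
            // Keys from other subsystems are expected here and stay silent.
            None => {}
        }
    }
    stream_map
}
```

The guard on the None arm preserves the current behavior for non-"stream:" keys while flagging only the keys that should have parsed.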
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 24e7ffc4-747f-47d7-84a6-1bf679b7d2ad
📒 Files selected for processing (3)
- engine/src/workers/stream/adapters/kv_store.rs
- engine/src/workers/stream/adapters/mod.rs
- engine/src/workers/stream/adapters/redis_adapter.rs
Addressed the remaining stream adapter review items in the latest push.
Validated with:
🧹 Nitpick comments (1)
engine/src/workers/stream/adapters/redis_adapter.rs (1)
409-420: Use SCAN instead of KEYS in list_groups to avoid Redis blocking.
Line 414 uses KEYS, which blocks Redis under larger keyspaces. list_all_stream already uses cursor-based scanning in the same file; list_groups should follow the same pattern for consistency.
♻️ Proposed refactor
 async fn list_groups(&self, stream_name: &str) -> anyhow::Result<Vec<String>> {
     let mut conn = self.publisher.lock().await;
     let prefix = stream_storage_prefix(stream_name);
     let pattern = format!("{prefix}*");
-
-    match conn.keys::<_, Vec<String>>(pattern).await {
-        Ok(keys) => Ok(keys
-            .into_iter()
-            .filter_map(|key| key.strip_prefix(&prefix).map(|s| s.to_string()))
-            .collect()),
-        Err(e) => Err(anyhow::anyhow!("Failed to list groups from Redis: {}", e)),
-    }
+    use std::collections::BTreeSet;
+    let mut cursor = 0u64;
+    let mut groups = BTreeSet::new();
+
+    loop {
+        let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
+            .arg(cursor)
+            .arg("MATCH")
+            .arg(&pattern)
+            .query_async(&mut *conn)
+            .await
+            .map_err(|e| anyhow::anyhow!("Failed to scan groups from Redis: {}", e))?;
+
+        for key in keys {
+            if let Some(group) = key.strip_prefix(&prefix) {
+                groups.insert(group.to_string());
+            }
+        }
+
+        cursor = next_cursor;
+        if cursor == 0 {
+            break;
+        }
+    }
+
+    Ok(groups.into_iter().collect())
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine/src/workers/stream/adapters/redis_adapter.rs` around lines 409 - 420, The list_groups function uses Redis KEYS (via conn.keys) which can block large keyspaces; replace it with a cursor-based SCAN loop like list_all_stream: acquire the same publisher lock, build the same prefix and pattern (stream_storage_prefix and pattern = format!("{prefix}*")), then iterate SCAN with MATCH pattern and a reasonable COUNT (e.g. 1000), accumulating returned keys until cursor == 0, map each key via strip_prefix(&prefix).map(|s| s.to_string()) and return the Vec<String>; propagate any Redis errors as anyhow::Error similarly to the existing Err branch.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 5f2517e1-2539-487a-8127-1bdab466c554
📒 Files selected for processing (2)
- engine/src/workers/stream/adapters/kv_store.rs
- engine/src/workers/stream/adapters/redis_adapter.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- engine/src/workers/stream/adapters/kv_store.rs
Summary
Testing
cargo test -p iii stream_storage_key_ -- --nocapture
cargo test -p iii list_all_stream_preserves_stream_names_with_colons -- --nocapture
Summary by CodeRabbit
Bug Fixes
Tests