Preserve stream names in adapter storage keys #1419

Open
abishekgiri wants to merge 5 commits into iii-hq:main from abishekgiri:stream-storage-key-handling
@abishekgiri abishekgiri commented Apr 7, 2026

Summary

  • add shared stream storage key helpers so stream names containing colons round-trip correctly
  • keep group ids with colons intact when listing groups or enumerating stream metadata
  • update both the built-in KV adapter and the Redis adapter to use the shared key handling
  • add regression tests for colon-containing stream names, colon-containing group ids, and names that start with the reserved encoding prefix
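To make the failure mode concrete, here is a minimal sketch of why an unencoded `stream:{name}:{group}` key cannot round-trip a stream name that contains a colon. The `naive_key` and `naive_parse` functions are hypothetical stand-ins for the old colon-splitting behavior, not the code this PR replaces.

```rust
/// Hypothetical legacy key builder: joins the parts with colons, no escaping.
fn naive_key(stream: &str, group: &str) -> String {
    format!("stream:{stream}:{group}")
}

/// Hypothetical legacy parser: splits on the first two colons only.
/// It has no way to tell where the stream name ends and the group id begins.
fn naive_parse(key: &str) -> Option<(&str, &str)> {
    let mut parts = key.splitn(3, ':');
    match (parts.next()?, parts.next()?, parts.next()?) {
        ("stream", stream, group) => Some((stream, group)),
        _ => None,
    }
}
```

With these definitions, `naive_parse(&naive_key("orders:eu", "g1"))` yields `("orders", "eu:g1")`: the stream name is truncated and the remainder leaks into the group id.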

Testing

  • cargo test -p iii stream_storage_key_ -- --nocapture
  • cargo test -p iii list_all_stream_preserves_stream_names_with_colons -- --nocapture

Summary by CodeRabbit

  • Bug Fixes

    • Fixed handling of stream names that include colons so streams and their groups are reliably listed and retrieved.
    • Improved preservation of stream and group identifiers during storage enumeration to prevent misclassification.
  • Tests

    • Added tests to verify correct handling of stream names and group IDs with special characters.

vercel bot commented Apr 7, 2026

@abishekgiri is attempting to deploy a commit to the motia Team on Vercel.

A member of the Team first needs to authorize it.

@abishekgiri
Contributor Author

Updated the stream storage key handling shared by the built-in KV adapter and the Redis adapter so stream names with colons are encoded safely, group ids with colons still round-trip, and list-all metadata no longer truncates names. This also adds regression coverage for colon-heavy names and the reserved encoding prefix.

@abishekgiri abishekgiri marked this pull request as ready for review April 7, 2026 00:57
@sergiofilhowz sergiofilhowz left a comment

There's a formatting issue that blocked the pipeline. Mind reviewing it? It should be really simple:

cargo fmt --all

https://github.com/iii-hq/iii/actions/runs/24058380271/job/70316261651?pr=1419

coderabbitai bot commented Apr 8, 2026

Walkthrough

Replaces hardcoded stream key formatting and parsing with shared helpers that encode stream-name segments (as base64 when needed), and updates the KV and Redis adapters to use these helpers. Adds unit and integration tests validating that stream names containing colons are preserved.

Changes

| Cohort | File(s) | Summary |
|---|---|---|
| Stream Storage Key Helpers | engine/src/workers/stream/adapters/mod.rs | Add stream_storage_key(), stream_storage_prefix(), and parse_stream_storage_key(), using URL-safe base64 (no padding) when stream names contain : or the reserved marker. Includes unit tests for encoding/decoding and group-id preservation. |
| KV Adapter | engine/src/workers/stream/adapters/kv_store.rs | Replace hardcoded stream:{name}:{group} key construction with stream_storage_key/stream_storage_prefix; use parse_stream_storage_key to extract stream/group from KV scan results; update filtering logic accordingly; add a test for colon-containing stream names. |
| Redis Adapter | engine/src/workers/stream/adapters/redis_adapter.rs | Switch Redis key construction to stream_storage_key/stream_storage_prefix; replace colon-splitting of SCAN results with parse_stream_storage_key; add an ignored async integration test ensuring stream names with : are preserved. |
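The encoding scheme summarized above can be sketched roughly as follows. This is an illustration under assumptions, not the PR's implementation: the marker value `b64~` is hypothetical (the thread does not show the actual reserved prefix), and the base64 routines are hand-rolled only to keep the sketch free of external crates.

```rust
const PREFIX: &str = "stream:";
// Hypothetical reserved marker; the real value is not shown in this thread.
const MARKER: &str = "b64~";
const ALPHABET: &[u8; 64] =
    b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";

/// URL-safe base64 without padding: k input bytes become k + 1 characters.
fn b64url_encode(input: &[u8]) -> String {
    let mut out = String::new();
    for chunk in input.chunks(3) {
        let mut n = 0u32;
        for (i, &b) in chunk.iter().enumerate() {
            n |= (b as u32) << (16 - 8 * i);
        }
        for i in 0..=chunk.len() {
            out.push(ALPHABET[((n >> (18 - 6 * i)) & 0x3f) as usize] as char);
        }
    }
    out
}

/// Inverse of `b64url_encode`; returns None on invalid characters or length.
fn b64url_decode(input: &str) -> Option<Vec<u8>> {
    let mut out = Vec::new();
    for chunk in input.as_bytes().chunks(4) {
        if chunk.len() == 1 {
            return None;
        }
        let mut n = 0u32;
        for (i, &c) in chunk.iter().enumerate() {
            let v = ALPHABET.iter().position(|&a| a == c)? as u32;
            n |= v << (18 - 6 * i);
        }
        for i in 0..chunk.len() - 1 {
            out.push((n >> (16 - 8 * i)) as u8);
        }
    }
    Some(out)
}

/// Names are stored verbatim unless they contain ':' or start with the marker.
fn encode_segment(name: &str) -> String {
    if name.contains(':') || name.starts_with(MARKER) {
        format!("{MARKER}{}", b64url_encode(name.as_bytes()))
    } else {
        name.to_string()
    }
}

fn stream_storage_prefix(name: &str) -> String {
    format!("{PREFIX}{}:", encode_segment(name))
}

fn stream_storage_key(name: &str, group: &str) -> String {
    format!("{}{group}", stream_storage_prefix(name))
}

/// The encoded segment never contains ':', so the first ':' after the prefix
/// separates the stream name from the group id, which may itself contain ':'.
fn parse_stream_storage_key(key: &str) -> Option<(String, String)> {
    let rest = key.strip_prefix(PREFIX)?;
    let (segment, group) = rest.split_once(':')?;
    let name = match segment.strip_prefix(MARKER) {
        Some(encoded) => String::from_utf8(b64url_decode(encoded)?).ok()?,
        None => segment.to_string(),
    };
    Some((name, group.to_string()))
}
```

Under these assumptions, `stream_storage_key("orders", "g1")` stays human-readable as `stream:orders:g1`, while `stream_storage_key("a:b", "g:1")` becomes `stream:b64~YTpi:g:1` and parses back to `("a:b", "g:1")`. Encoding names that merely start with the marker keeps the scheme unambiguous, which is presumably why the PR tests that case.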

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

🐇 I hopped through keys both near and far,

I wrapped colons in a b64 jar.
Now streams keep names as true as can be,
No more splits — just tidy keys from me! ✨

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 29.63% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title accurately summarizes the primary change: implementing shared stream storage key helpers to preserve stream names containing colons in adapter storage keys. It directly reflects the main objective and is concise and specific. |


@abishekgiri
Contributor Author

> There's a formatting issue that blocked the pipeline. Mind reviewing it? It should be really simple:
>
> cargo fmt --all
>
> https://github.com/iii-hq/iii/actions/runs/24058380271/job/70316261651?pr=1419

Applied the formatting fix and pushed it to this branch. cargo fmt --all -- --check passes locally now, and the Build Engine check is re-running on the updated commit.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (1)
engine/src/workers/stream/adapters/redis_adapter.rs (1)

423-470: Consider adding a Redis-specific integration test for stream names with colons.

The KV adapter has a dedicated test (list_all_stream_preserves_stream_names_with_colons), but the Redis adapter lacks an equivalent test. While the shared helpers are unit-tested in mod.rs, an integration test for Redis would provide confidence that the SCAN-based list_all_stream correctly handles encoded keys in a real Redis environment.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@engine/src/workers/stream/adapters/redis_adapter.rs` around lines 423 - 470,
Add an integration test that exercises RedisAdapter::list_all_stream to ensure
stream names containing colons are preserved: create a test (e.g.,
list_all_stream_preserves_stream_names_with_colons_redis) that boots a real
Redis instance or uses the test Redis fixture, writes keys using the same
storage key encoding (so parse_stream_storage_key can decode them) for a stream
name containing colons and multiple group IDs, then call list_all_stream() and
assert the returned StreamMetadata contains the original stream id (with colons)
and the expected sorted groups; target the Redis adapter code path
(list_all_stream in redis_adapter.rs) and use the same connection/publisher
setup the adapter uses so SCAN is exercised end-to-end.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@engine/src/workers/stream/adapters/kv_store.rs`:
- Around line 149-154: In list_all_stream, the loop over all_keys currently
drops keys that fail parse_stream_storage_key silently; change the loop so that
when parse_stream_storage_key(&key) returns None and key.starts_with("stream:")
you emit a warning log including the offending key (e.g., via tracing::warn! or
the crate's logger) to surface unparseable keys for migration/debugging, while
preserving current behavior for non-"stream:" keys; leave
stream_map.entry(...).insert(...) logic unchanged for successfully parsed keys.

In `@engine/src/workers/stream/adapters/redis_adapter.rs`:
- Around line 443-447: The loop over keys silently skips any key where
parse_stream_storage_key(&key) returns None; add diagnostic logging for those
cases so unparseable keys aren't hidden—inside the for key in keys loop, if
parse_stream_storage_key(&key) yields None, emit a warning/debug log (e.g.,
tracing::warn! or the crate's logger) that includes the raw key value and
context (e.g., "unparseable stream storage key, skipping") so you can detect
legacy or malformed keys while still inserting parsed (stream_name, group_id)
into stream_map as currently done.


📥 Commits

Reviewing files that changed from the base of the PR and between b31d5dd and 6bc83c0.

📒 Files selected for processing (3)
  • engine/src/workers/stream/adapters/kv_store.rs
  • engine/src/workers/stream/adapters/mod.rs
  • engine/src/workers/stream/adapters/redis_adapter.rs

@abishekgiri
Contributor Author

Addressed the remaining stream adapter review items in the latest push.

  • Added warning logs in both KV and Redis list_all_stream paths when a stream: key cannot be parsed, so legacy or malformed keys are surfaced instead of being silently skipped.
  • Added a Redis-specific regression test for colon-containing stream names to cover the SCAN-based code path. It is marked ignored because it requires a running Redis instance.
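The warning behavior described in the first item can be sketched as below. This is a simplified, std-only illustration: `group_stream_keys` and its `parse` parameter are hypothetical names, `eprintln!` stands in for whatever logger the adapters actually use, and the skipped keys are returned only to make the behavior easy to check.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Groups storage keys by stream name. Keys that carry the "stream:" prefix
/// but fail to parse are logged and collected instead of silently dropped;
/// keys without the prefix are ignored, as before.
fn group_stream_keys<P>(
    keys: &[&str],
    parse: P,
) -> (BTreeMap<String, BTreeSet<String>>, Vec<String>)
where
    P: Fn(&str) -> Option<(String, String)>,
{
    let mut stream_map: BTreeMap<String, BTreeSet<String>> = BTreeMap::new();
    let mut skipped = Vec::new();
    for &key in keys {
        match parse(key) {
            Some((stream, group)) => {
                stream_map.entry(stream).or_default().insert(group);
            }
            None if key.starts_with("stream:") => {
                // Stand-in for a structured warning such as tracing::warn!.
                eprintln!("warning: unparseable stream storage key, skipping: {key}");
                skipped.push(key.to_string());
            }
            None => {}
        }
    }
    (stream_map, skipped)
}
```

Passing the parser in as a closure keeps the sketch independent of the exact key encoding; in the adapters the shared parse_stream_storage_key helper would fill that role.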

Validated with:

  • cargo fmt --all -- --check
  • cargo test -p iii stream_storage_key_ -- --nocapture
  • cargo test -p iii list_all_stream_preserves_stream_names_with_colons -- --nocapture

@coderabbitai coderabbitai bot left a comment

🧹 Nitpick comments (1)
engine/src/workers/stream/adapters/redis_adapter.rs (1)

409-420: Use SCAN instead of KEYS in list_groups to avoid Redis blocking.

Line 414 uses KEYS, which blocks Redis under larger keyspaces. list_all_stream already uses cursor-based scanning in the same file; list_groups should follow the same pattern for consistency.

♻️ Proposed refactor
     async fn list_groups(&self, stream_name: &str) -> anyhow::Result<Vec<String>> {
         let mut conn = self.publisher.lock().await;
         let prefix = stream_storage_prefix(stream_name);
         let pattern = format!("{prefix}*");
-
-        match conn.keys::<_, Vec<String>>(pattern).await {
-            Ok(keys) => Ok(keys
-                .into_iter()
-                .filter_map(|key| key.strip_prefix(&prefix).map(|s| s.to_string()))
-                .collect()),
-            Err(e) => Err(anyhow::anyhow!("Failed to list groups from Redis: {}", e)),
-        }
+        use std::collections::BTreeSet;
+        let mut cursor = 0u64;
+        let mut groups = BTreeSet::new();
+
+        loop {
+            let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
+                .arg(cursor)
+                .arg("MATCH")
+                .arg(&pattern)
+                .query_async(&mut *conn)
+                .await
+                .map_err(|e| anyhow::anyhow!("Failed to scan groups from Redis: {}", e))?;
+
+            for key in keys {
+                if let Some(group) = key.strip_prefix(&prefix) {
+                    groups.insert(group.to_string());
+                }
+            }
+
+            cursor = next_cursor;
+            if cursor == 0 {
+                break;
+            }
+        }
+
+        Ok(groups.into_iter().collect())
     }
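The cursor loop in the proposed refactor can be exercised without a Redis server by abstracting one SCAN round trip behind a closure. In this sketch `collect_groups` and `scan_page` are hypothetical names; `scan_page(cursor)` stands in for a single `SCAN cursor MATCH pattern` call and returns the next cursor plus a page of keys.

```rust
use std::collections::BTreeSet;

/// Drives a SCAN-style cursor loop until the server reports cursor 0.
/// A set is used because SCAN may return the same key more than once
/// during a full iteration.
fn collect_groups<F>(prefix: &str, mut scan_page: F) -> Vec<String>
where
    F: FnMut(u64) -> (u64, Vec<String>),
{
    let mut groups = BTreeSet::new();
    let mut cursor = 0u64;
    loop {
        let (next_cursor, keys) = scan_page(cursor);
        for key in keys {
            // Everything after the stream prefix is the group id.
            if let Some(group) = key.strip_prefix(prefix) {
                groups.insert(group.to_string());
            }
        }
        cursor = next_cursor;
        if cursor == 0 {
            break;
        }
    }
    groups.into_iter().collect()
}
```

The loop must run at least once before checking for cursor 0, since 0 is both the starting cursor and the termination signal. Collecting into a `BTreeSet` also means the result comes back sorted and deduplicated, which the `KEYS`-based version did not guarantee.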
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@engine/src/workers/stream/adapters/redis_adapter.rs` around lines 409 - 420,
The list_groups function uses Redis KEYS (via conn.keys) which can block large
keyspaces; replace it with a cursor-based SCAN loop like list_all_stream:
acquire the same publisher lock, build the same prefix and pattern
(stream_storage_prefix and pattern = format!("{prefix}*")), then iterate SCAN
with MATCH pattern and a reasonable COUNT (e.g. 1000), accumulating returned
keys until cursor == 0, map each key via strip_prefix(&prefix).map(|s|
s.to_string()) and return the Vec<String>; propagate any Redis errors as
anyhow::Error similarly to the existing Err branch.

📥 Commits

Reviewing files that changed from the base of the PR and between 6bc83c0 and 0e471dc.

📒 Files selected for processing (2)
  • engine/src/workers/stream/adapters/kv_store.rs
  • engine/src/workers/stream/adapters/redis_adapter.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • engine/src/workers/stream/adapters/kv_store.rs
