-
Couldn't load subscription status.
- Fork 661
chore(runtime): Do not expose etcd lease ID #3915
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughThe pull request replaces lease-based identifiers with connection-based identifiers throughout the codebase. This involves removing the Lease abstraction, updating the Endpoint API from lease_id() to connection_id(), removing the discovery module, and refactoring lease lifecycle management in the etcd transport layer. Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes
Poem
Pre-merge checks❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
lib/runtime/src/transports/etcd.rs (2)
258-270: Apply conditional lease handling to all lease-using functions, not just lock().The suggested refactoring is correct: when
lease_idis 0 (which occurs whenconfig.attach_leaseis false), passingNoneinstead ofSome(options)is more semantically appropriate. However, the same issue affects three other functions that use identical patterns:
kv_create(line 117)kv_create_or_validate(line 147)kv_put(line 198)For consistency and correctness, apply this conditional pattern to all four functions that handle leases, not just
lock().
208-217: Conditionally attach lease only when > 0 across all affected functions (kv_create, kv_create_or_validate, kv_put, kv_put_with_options, lock).Lease ID 0 is NoLease in etcd semantics, representing absence of a lease and is not a valid lease identifier. The current code unconditionally passes
with_lease(0)whenself.primary_leaseis 0 (whenconfig.attach_leaseis disabled), which is semantically incorrect.Apply conditional checks to all five functions:
- Line 118 (kv_create)
- Line 148 (kv_create_or_validate)
- Line 199 (kv_put)
- Line 216 (kv_put_with_options)
- Line 265 (lock)
Each should check
if lease > 0before callingwith_lease(), otherwise omit the lease parameter entirely (passingNonefor options or not attaching the lease).
🧹 Nitpick comments (9)
lib/llm/src/mocker/engine.rs (1)
250-250: Remove commented-out debug code.This commented line appears to be leftover from debugging or testing and should be removed to keep the codebase clean.
Apply this diff:
let worker_id = comp.drt().connection_id(); - // let worker_id = 0; tracing::debug!("Worker_id set to: {worker_id}");lib/bindings/python/rust/lib.rs (1)
776-779: Expose a temporary backward‑compat alias for lease_id().To reduce downstream breakage, consider adding a deprecated alias that forwards to connection_id(). Example:
#[pymethods] impl Endpoint { // Opaque unique ID for this worker. May change over worker lifetime. fn connection_id(&self) -> u64 { self.inner.drt().connection_id() } + /// [DEPRECATED] Use connection_id() + fn lease_id(&self) -> u64 { + self.connection_id() + } }lib/runtime/src/component/endpoint.rs (2)
64-69: Naming mismatch: etcd_path_with_lease_id now takes a connection_id.Low risk but confusing. Consider a clearer alias to avoid “lease” wording drift.
For example, in lib/runtime/src/component.rs add:
pub fn etcd_path_with_instance_id(&self, instance_id: u64) -> String { self.etcd_path_with_lease_id(instance_id) }Then use the alias here.
198-221: Avoid attaching an etcd lease when connection_id is 0; fail fast or skip lease.If attach_lease=false ever yields connection_id==0 while etcd_client is Some, with_lease(0) may be invalid. Guard it:
- if let Some(etcd_client) = &etcd_client - && let Err(e) = etcd_client - .kv_create(&etcd_path, info, Some(connection_id)) + let lease_opt = if connection_id == 0 { None } else { Some(connection_id) }; + if let Some(etcd_client) = &etcd_client + && let Err(e) = etcd_client + .kv_create(&etcd_path, info, lease_opt) .await { tracing::error!( ... ); runtime_shutdown_token.cancel(); return Err(error!("Unable to register service for discovery. Check discovery service status")); }Optionally also add:
- let connection_id = endpoint.drt().connection_id(); + let connection_id = endpoint.drt().connection_id(); + debug_assert!(connection_id != 0, "connection_id should be non-zero when registering in etcd");lib/runtime/src/transports/etcd.rs (1)
491-500: TODO comment can be updated to reflect connection_id semantics.The comment still says “proper lease handling.” Consider clarifying that absence of a lease (connection_id==0) should write keys without a lease.
lib/runtime/src/transports/etcd/lease.rs (4)
9-11: Docs are stale and grammatical fix required.Mentions returning a Lease, but API returns a u64 lease_id; also “it's” → “its”. Update for accuracy.
Apply:
-/// Create a [`Lease`] with a given time-to-live (TTL) attached to the [`CancellationToken`] and -/// start it's keep-alive thread. +/// Create an etcd lease with the given TTL, attach it to the provided cancellation token, +/// spawn a keep-alive task, and return the lease id (u64). +/// +/// Note: this function spawns a background task that maintains the lease until the token is +/// cancelled or an unrecoverable error occurs.
81-82: Redundant binding before?.
let _ = client.revoke(...).await?;discards the value but still propagates errors. Just await the call.Apply:
- let _ = client.revoke(lease_id as i64).await?; + client.revoke(lease_id as i64).await?;
38-42: Fix incomplete doc comment.Stray “/// If” reads as a fragment. Tighten the section to clearly state behavior on error/cancellation.
Apply:
/// Task to keep leases alive. /// -/// If this task returns an error, the cancellation token will be invoked on the runtime. -/// If +/// On error, the parent cancellation token is triggered to stop the runtime. +/// On token cancellation, the lease is revoked and the task exits Ok(()).
66-66: Tracing fields: prefer explicit formatting for clarity.Use
%lease_idorlease_id = lease_idso logs consistently render the id.Apply, e.g.:
- tracing::trace!(lease_id, "keep alive response received: {:?}", resp); + tracing::trace!(lease_id = %lease_id, "keep alive response received: {:?}", resp); @@ - tracing::trace!(lease_id, "cancellation token triggered; revoking lease"); + tracing::trace!(lease_id = %lease_id, "cancellation token triggered; revoking lease"); @@ - tracing::trace!(lease_id, "sending keep alive"); + tracing::trace!(lease_id = %lease_id, "sending keep alive");Also applies to: 80-80, 86-86
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (18)
components/src/dynamo/sglang/publisher.py(1 hunks)components/src/dynamo/trtllm/main.py(1 hunks)components/src/dynamo/vllm/main.py(1 hunks)examples/multimodal/components/worker.py(1 hunks)lib/bindings/python/rust/lib.rs(1 hunks)lib/bindings/python/rust/llm/kv.rs(1 hunks)lib/bindings/python/src/dynamo/_core.pyi(1 hunks)lib/llm/src/kv_router.rs(1 hunks)lib/llm/src/kv_router/publisher.rs(1 hunks)lib/llm/src/mocker/engine.rs(1 hunks)lib/runtime/src/component.rs(0 hunks)lib/runtime/src/component/endpoint.rs(8 hunks)lib/runtime/src/discovery.rs(0 hunks)lib/runtime/src/distributed.rs(1 hunks)lib/runtime/src/lib.rs(0 hunks)lib/runtime/src/storage/key_value_store/etcd.rs(2 hunks)lib/runtime/src/transports/etcd.rs(3 hunks)lib/runtime/src/transports/etcd/lease.rs(5 hunks)
💤 Files with no reviewable changes (3)
- lib/runtime/src/component.rs
- lib/runtime/src/lib.rs
- lib/runtime/src/discovery.rs
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-06-05T01:04:24.775Z
Learnt from: PeaBrane
PR: ai-dynamo/dynamo#1392
File: launch/dynamo-run/src/subprocess/vllm_v1_inc.py:71-71
Timestamp: 2025-06-05T01:04:24.775Z
Learning: The `create_endpoint` method in `WorkerMetricsPublisher` has backward compatibility maintained through pyo3 signature annotation `#[pyo3(signature = (component, dp_rank = None))]`, making the `dp_rank` parameter optional with a default value of `None`.
Applied to files:
lib/llm/src/kv_router/publisher.rs
🧬 Code graph analysis (10)
lib/runtime/src/storage/key_value_store/etcd.rs (1)
lib/runtime/src/transports/etcd.rs (2)
new(61-104)new(476-519)
lib/bindings/python/src/dynamo/_core.pyi (3)
lib/bindings/python/rust/lib.rs (1)
connection_id(777-779)lib/runtime/src/distributed.rs (1)
connection_id(213-215)lib/runtime/src/storage/key_value_store/etcd.rs (1)
connection_id(54-56)
lib/runtime/src/component/endpoint.rs (8)
lib/bindings/python/rust/lib.rs (2)
connection_id(777-779)endpoint(678-684)lib/bindings/python/src/dynamo/_core.pyi (2)
connection_id(160-164)endpoint(117-121)lib/runtime/src/distributed.rs (1)
connection_id(213-215)lib/runtime/src/storage/key_value_store/etcd.rs (1)
connection_id(54-56)lib/runtime/src/storage/key_value_store.rs (3)
connection_id(114-114)connection_id(161-168)connection_id(210-212)lib/runtime/src/storage/key_value_store/mem.rs (1)
connection_id(107-109)lib/runtime/src/storage/key_value_store/nats.rs (1)
connection_id(52-54)lib/runtime/src/component.rs (6)
endpoint(270-278)etcd_path_with_lease_id(542-544)subject(577-579)etcd_path(253-256)etcd_path(532-539)etcd_path(682-684)
components/src/dynamo/trtllm/main.py (4)
lib/bindings/python/rust/lib.rs (2)
endpoint(678-684)connection_id(777-779)lib/bindings/python/src/dynamo/_core.pyi (2)
endpoint(117-121)connection_id(160-164)lib/runtime/src/distributed.rs (1)
connection_id(213-215)lib/runtime/src/storage/key_value_store/etcd.rs (1)
connection_id(54-56)
lib/bindings/python/rust/lib.rs (6)
lib/bindings/python/src/dynamo/_core.pyi (1)
connection_id(160-164)lib/runtime/src/distributed.rs (1)
connection_id(213-215)lib/runtime/src/storage/key_value_store/etcd.rs (1)
connection_id(54-56)lib/runtime/src/storage/key_value_store.rs (3)
connection_id(114-114)connection_id(161-168)connection_id(210-212)lib/runtime/src/storage/key_value_store/mem.rs (1)
connection_id(107-109)lib/runtime/src/storage/key_value_store/nats.rs (1)
connection_id(52-54)
lib/runtime/src/distributed.rs (6)
lib/bindings/python/rust/lib.rs (1)
connection_id(777-779)lib/bindings/python/src/dynamo/_core.pyi (1)
connection_id(160-164)lib/runtime/src/storage/key_value_store/etcd.rs (1)
connection_id(54-56)lib/runtime/src/storage/key_value_store.rs (3)
connection_id(114-114)connection_id(161-168)connection_id(210-212)lib/runtime/src/storage/key_value_store/mem.rs (1)
connection_id(107-109)lib/runtime/src/storage/key_value_store/nats.rs (1)
connection_id(52-54)
components/src/dynamo/vllm/main.py (4)
lib/bindings/python/rust/lib.rs (1)
connection_id(777-779)lib/bindings/python/src/dynamo/_core.pyi (1)
connection_id(160-164)lib/runtime/src/distributed.rs (1)
connection_id(213-215)lib/runtime/src/storage/key_value_store/etcd.rs (1)
connection_id(54-56)
components/src/dynamo/sglang/publisher.py (4)
lib/bindings/python/rust/lib.rs (1)
connection_id(777-779)lib/bindings/python/src/dynamo/_core.pyi (1)
connection_id(160-164)lib/runtime/src/distributed.rs (1)
connection_id(213-215)lib/runtime/src/storage/key_value_store/etcd.rs (1)
connection_id(54-56)
examples/multimodal/components/worker.py (4)
lib/bindings/python/rust/lib.rs (2)
endpoint(678-684)connection_id(777-779)lib/bindings/python/src/dynamo/_core.pyi (2)
endpoint(117-121)connection_id(160-164)lib/runtime/src/distributed.rs (1)
connection_id(213-215)lib/runtime/src/storage/key_value_store/etcd.rs (1)
connection_id(54-56)
lib/runtime/src/transports/etcd.rs (1)
lib/runtime/src/transports/etcd/lease.rs (1)
create_lease(11-36)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (11)
- GitHub Check: vllm (arm64)
- GitHub Check: trtllm (arm64)
- GitHub Check: operator (arm64)
- GitHub Check: sglang
- GitHub Check: clippy (.)
- GitHub Check: tests (lib/runtime/examples)
- GitHub Check: tests (launch/dynamo-run)
- GitHub Check: clippy (launch/dynamo-run)
- GitHub Check: tests (.)
- GitHub Check: tests (lib/bindings/python)
- GitHub Check: Build and Test - dynamo
🔇 Additional comments (15)
lib/llm/src/mocker/engine.rs (1)
249-249: LGTM! Correct migration to connection-based identifier.The change from lease-based to connection-based worker identification aligns with the PR objectives and correctly uses the new
connection_id()API on DistributedRuntime.lib/llm/src/kv_router/publisher.rs (1)
787-787: LGTM! Clean migration to connection-based identifier.The change from lease-based to connection-based worker ID is consistent with the PR's objectives and maintains the same functionality.
lib/llm/src/kv_router.rs (1)
227-227: LGTM! Simplified token acquisition.Removing the intermediate
primary_lease()call streamlines the code while maintaining the same functionality.components/src/dynamo/vllm/main.py (1)
138-138: LGTM! Consistent API migration.The change from
lease_id()toconnection_id()aligns with the new Endpoint API and maintains the same worker identification functionality.components/src/dynamo/trtllm/main.py (1)
424-424: LGTM! Consistent with API changes.The migration to
connection_id()matches the pattern applied across other components.examples/multimodal/components/worker.py (1)
166-166: LGTM! Consistent API update.The change aligns with the connection-based identifier migration applied throughout the codebase.
lib/runtime/src/storage/key_value_store/etcd.rs (1)
177-177: LGTM! Simplified lease ID access for etcd operations.The change from
primary_lease().id()tolease_id()removes an intermediate abstraction while maintaining the same functionality for associating etcd keys with leases.Also applies to: 246-246
lib/bindings/python/rust/llm/kv.rs (1)
994-1002: LGTM! Removal of lease validation aligns with API changes.The removed check for
primary_lease()existence is consistent with removing the Lease abstraction from the public API. Since leases still exist internally, any actual lease-related issues will be caught downstream in the KvRouter creation flow.lib/runtime/src/distributed.rs (1)
213-215: LGTM! Well-designed public API for connection identification.The
connection_id()method provides a clean abstraction by delegating to the underlying store, which is appropriate since the connection ID is tied to the storage backend (etcd lease, NATS client, or memory).components/src/dynamo/sglang/publisher.py (1)
126-131: Good switch to connection_id; align Python stub to sync.This aligns with the runtime’s new identifier. However, lib/bindings/python/src/dynamo/_core.pyi declares Endpoint.connection_id() as async, while the runtime method is synchronous. Please update the stub to a synchronous def to avoid type-checker confusion. Based on relevant snippets.
lib/runtime/src/component/endpoint.rs (1)
156-161: LGTM on cancellation semantics simplification.Using only runtime_shutdown_token is fine since lease keepalive now cancels the primary token on failure, propagating to this child.
lib/runtime/src/transports/etcd.rs (3)
141-165: Verify claim that conditional lease handling already exists elsewhere.The review states "Same conditional lease handling" but after exhaustive searching:
kv_create(line 116) uses unconditional:PutOptions::new().with_lease(id as i64)kv_put(line 192) uses unconditional:PutOptions::new().with_lease(id as i64)kv_put_with_options(line 208) uses unconditional:.with_lease(self.lease_id() as i64)- No method across
etcd.rs,key_value_store/etcd.rs, orlock.rsapplies the proposed conditional.then(||)patternThe suggested conditional pattern
(id > 0).then(|| PutOptions::new().with_lease(id as i64))appears to be a new refactoring, not an existing pattern elsewhere. If this change is desirable, the same logic should likely apply tokv_createandkv_putfor consistency. Verify intent: Is this a bug fix or aspirational refactoring that should span multiple methods?
192-206: Address the inconsistency in conditional lease handling across related methods.The suggested change for
kv_putis technically correct—theput()method acceptsOption<PutOptions>directly, so(id > 0).then(|| PutOptions::new().with_lease(id as i64))is valid and clearer than always creating PutOptions with a zero lease. However, three other methods use the identical current pattern:
kv_create(lines 116–118): unconditional PutOptionskv_create_or_validate(lines 147–148): unconditional PutOptionslock()(line 264): unconditional LockOptionsFor consistency, verify whether all similar methods should also apply conditional lease handling when
id > 0, or confirm thatkv_putalone requires this change. If updating all, prioritizekv_createandkv_create_or_validatefirst since they follow the same call pattern.
116-126: Apply lease guard consistently across all kv operations; current fix incomplete.The proposed fix correctly changes
put_optionsfromPutOptionstoOption<PutOptions>using(id > 0).then(...), matching theTxnOp::putAPI signature. However, the same pattern (unconditionally applyingwith_lease(id as i64)whereidcould be 0) appears in at least two other methods:
kv_create_or_validate(line 148)kv_put(line 199)Apply the lease guard consistently across all three methods for uniformity. Additionally, verify whether
self.lease_id()can legitimately return 0 in your initialization logic to ensure this guard is necessary.lib/runtime/src/transports/etcd/lease.rs (1)
11-16: Based on the verification through code inspection and web search onCancellationTokensemantics, the review comment is valid and addresses a real architectural issue. The standard pattern for using CancellationToken in multiple contexts is to clone the token and pass clones to different tasks. The currentcreate_leasefunction consumes the token, preventing the caller from using it elsewhere.However, examining the actual codebase:
- The function does already create a child token (line 20:
let child = token.child_token())- The spawned task uses this child token for cancellation monitoring
- But the original token is still moved into the task and called with
.cancel()on error (line 30)The design issue is real: if the keep-alive task encounters an error, it cancels the original parent token, which will cascade cancellation to all other operations sharing that token. The proposed refactoring improves this by:
- Taking a reference instead (non-breaking for callers already cloning)
- Creating a clone internally for the error-handling task
- Allowing the parent token to remain under the caller's control
The review comment is technically sound and addresses a valid design concern. The proposed fix is appropriate for the stated use case.
Token ownership: create_lease currently consumes the caller's CancellationToken.
The function takes owned
tokenand moves it into a spawned async task, preventing reuse by the caller. If the keep-alive task encounters an error, it callstoken.cancel(), which cascades to all operations using that token.Change the signature to take
&CancellationTokenand clone internally within the spawned task:pub async fn create_lease( mut lease_client: LeaseClient, ttl: u64, - token: CancellationToken, + token: &CancellationToken, ) -> anyhow::Result<u64> { let lease = lease_client.grant(ttl as i64, None).await?; let id = lease.id() as u64; let ttl = lease.ttl() as u64; let child = token.child_token(); + let parent = token.clone(); tokio::spawn(async move { match keep_alive(lease_client, id, ttl, child).await { Ok(_) => tracing::trace!("keep alive task exited successfully"), Err(e) => { tracing::error!(error = %e, "Unable to maintain lease. Check etcd server status"); - token.cancel(); + parent.cancel(); } } }); Ok(id) }This preserves the caller's token for broader shutdown orchestration while still allowing the keep-alive task to signal its own failure.
2f767d4 to
37a0a8e
Compare
Instead there is a `connection_id() -> u64` method on `DistributedRuntime`, which is always present. Remove the `Lease` object and the unused lease related methods. Also delete legacy unused `DiscoveryClient`. Signed-off-by: Graham King <[email protected]>
Signed-off-by: Graham King <[email protected]>
Signed-off-by: Graham King <[email protected]>
Signed-off-by: Graham King <[email protected]>
7b7902f to
0e0a591
Compare
Lost in rebase Signed-off-by: Graham King <[email protected]>
Instead there is a
connection_id() -> u64method onDistributedRuntime, which is always present. Remove theLeaseobject and the unused lease related methods.Also delete legacy unused
DiscoveryClient.Another step towards making etcd optional.
Summary by CodeRabbit
Release Notes
Breaking Changes
Endpoint.lease_id()method replaced withEndpoint.connection_id()returning an opaque, time-variable worker identifier. Existing code using lease-based identifiers requires updates.Improvements