
Commit 7731b02

chore: Use KeyValueStoreManager instead of etcd::Client (#3822)
Signed-off-by: Graham King <[email protected]>
1 parent 6f9be59 commit 7731b02

File tree

36 files changed (+296, -279 lines)

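The common thread across these hunks: call sites stop unwrapping drt.etcd_client() (an Option<etcd::Client>) and instead go through drt.store(), which hands back a KeyValueStoreManager regardless of whether the backing store is etcd or in-memory. Below is a minimal sketch of that access pattern using only the calls visible in the diffs that follow; the import paths, the dump_bucket helper, and the bucket name are illustrative assumptions, not part of the commit:

use dynamo_runtime::DistributedRuntime; // assumed re-export path; adjust to the real one

// Hypothetical helper: list everything under one bucket of the shared store.
async fn dump_bucket(drt: &DistributedRuntime, bucket_name: &str) -> anyhow::Result<()> {
    // Always available, unlike etcd_client(), which returned an Option.
    let store = drt.store();

    // get_bucket() yields None when nothing has been written under this name,
    // mirroring the `let Some(..) else { return Ok(vec![]) }` in watcher.rs below.
    let Some(bucket) = store.get_bucket(bucket_name).await? else {
        println!("bucket {bucket_name} has no entries");
        return Ok(());
    };

    // entries() is assumed to yield (key, byte-buffer) pairs, as it does in watcher.rs.
    for (key, bytes) in bucket.entries().await? {
        println!("{key}: {} bytes", bytes.len());
    }
    Ok(())
}

Callers that previously branched on whether etcd was configured (as the removed code in lib/llm/src/entrypoint/input/http.rs did) no longer need to; the manager hides the backend.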

lib/bindings/c/src/lib.rs

Lines changed: 1 addition & 1 deletion
@@ -154,7 +154,7 @@ fn dynamo_create_kv_publisher(
     {
         Ok(drt) => {
            let backend = drt.namespace(namespace)?.component(component)?;
-            KvEventPublisher::new(backend, worker_id, kv_block_size, None)
+            KvEventPublisher::new(backend, worker_id as u64, kv_block_size, None)
         }
         Err(e) => Err(e),
     }

lib/bindings/python/rust/lib.rs

Lines changed: 4 additions & 4 deletions
@@ -771,7 +771,7 @@ impl Endpoint {
         })
     }
 
-    fn lease_id(&self) -> i64 {
+    fn lease_id(&self) -> u64 {
         self.inner
             .drt()
             .primary_lease()
@@ -807,7 +807,7 @@ impl Namespace {
 impl Client {
     /// Get list of current instances.
     /// Replaces endpoint_ids.
-    fn instance_ids(&self) -> Vec<i64> {
+    fn instance_ids(&self) -> Vec<u64> {
        self.router.client.instance_ids()
    }
 
@@ -819,7 +819,7 @@ impl Client {
            inner
                .wait_for_instances()
                .await
-                .map(|v| v.into_iter().map(|cei| cei.id()).collect::<Vec<i64>>())
+                .map(|v| v.into_iter().map(|cei| cei.id()).collect::<Vec<u64>>())
                .map_err(to_pyerr)
        })
    }
@@ -920,7 +920,7 @@ impl Client {
        &self,
        py: Python<'p>,
        request: PyObject,
-        instance_id: i64,
+        instance_id: u64,
        annotated: Option<bool>,
        context: Option<context::Context>,
    ) -> PyResult<Bound<'p, PyAny>> {

lib/bindings/python/rust/llm/kv.rs

Lines changed: 1 addition & 1 deletion
@@ -322,7 +322,7 @@ pub(crate) struct OverlapScores {
 #[pymethods]
 impl OverlapScores {
     #[getter]
-    fn scores(&self) -> HashMap<(i64, u32), u32> {
+    fn scores(&self) -> HashMap<(u64, u32), u32> {
         // Return scores with full WorkerWithDpRank granularity as (worker_id, dp_rank) tuples
         self.inner
             .scores

lib/llm/src/block_manager/controller/client.rs

Lines changed: 4 additions & 1 deletion
@@ -59,7 +59,10 @@ impl ControlClient {
     }
 
     async fn execute<T: DeserializeOwned>(&self, message: ControlMessage) -> Result<T> {
-        let mut stream = self.client.direct(message.into(), self.instance_id).await?;
+        let mut stream = self
+            .client
+            .direct(message.into(), self.instance_id as u64)
+            .await?;
         let resp = stream
             .next()
             .await

lib/llm/src/discovery/watcher.rs

Lines changed: 16 additions & 19 deletions
@@ -567,19 +567,22 @@ impl ModelWatcher {
     }
 
     /// All the registered ModelDeploymentCard with the EndpointId they are attached to, one per instance
-    pub async fn all_cards(&self) -> anyhow::Result<Vec<(EndpointId, ModelDeploymentCard)>> {
-        let Some(etcd_client) = self.drt.etcd_client() else {
-            anyhow::bail!("all_cards: Missing etcd client");
+    async fn all_cards(&self) -> anyhow::Result<Vec<(EndpointId, ModelDeploymentCard)>> {
+        let store = self.drt.store();
+
+        //let kvs = etcd_client.kv_get_prefix(model_card::ROOT_PATH).await?;
+        let Some(card_bucket) = store.get_bucket(model_card::ROOT_PATH).await? else {
+            // no cards
+            return Ok(vec![]);
         };
-        let kvs = etcd_client.kv_get_prefix(model_card::ROOT_PATH).await?;
-        let mut results = Vec::with_capacity(kvs.len());
-        for kv in kvs {
-            let maybe_convert = serde_json::from_slice::<ModelDeploymentCard>(kv.value());
-            let r = match maybe_convert {
+        let entries = card_bucket.entries().await?;
+
+        let mut results = Vec::with_capacity(entries.len());
+        for (key, card_bytes) in entries {
+            let r = match serde_json::from_slice::<ModelDeploymentCard>(&card_bytes) {
                 Ok(card) => {
-                    let maybe_endpoint_id = kv.key_str().map_err(|err| err.into()).and_then(|k| {
-                        etcd_key_extract(k).map(|(endpoint_id, _instance_id)| endpoint_id)
-                    });
+                    let maybe_endpoint_id =
+                        etcd_key_extract(&key).map(|(endpoint_id, _instance_id)| endpoint_id);
                     let endpoint_id = match maybe_endpoint_id {
                         Ok(eid) => eid,
                         Err(err) => {
@@ -590,14 +593,8 @@ impl ModelWatcher {
                     (endpoint_id, card)
                 }
                 Err(err) => {
-                    match kv.value_str() {
-                        Ok(value) => {
-                            tracing::error!(%err, value, "Invalid JSON in model card");
-                        }
-                        Err(value_str_err) => {
-                            tracing::error!(original_error=%err, %value_str_err, "Invalid UTF-8 string in model card, expected JSON");
-                        }
-                    }
+                    let value = String::from_utf8_lossy(&card_bytes);
+                    tracing::error!(%err, %value, "Invalid JSON in model card");
                    continue;
                }
            };
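
One behavioral detail in the error arm above: the old code branched on whether the raw bytes were valid UTF-8 before deciding which message to log, while the new code converts lossily and always emits a single "Invalid JSON in model card" line. A small sketch of that collapsed path in isolation; the log_bad_card name is made up, and tracing and serde_json are the same crates the file already uses:

// String::from_utf8_lossy cannot fail, so invalid UTF-8 simply shows up as
// replacement characters inside the logged value.
fn log_bad_card(err: &serde_json::Error, card_bytes: &[u8]) {
    let value = String::from_utf8_lossy(card_bytes);
    tracing::error!(%err, %value, "Invalid JSON in model card");
}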

lib/llm/src/discovery/worker_monitor.rs

Lines changed: 3 additions & 3 deletions
@@ -52,7 +52,7 @@ impl WorkerLoadState {
 /// Worker monitor for tracking KV cache usage and busy states
 pub struct KvWorkerMonitor {
     client: Arc<Client>,
-    worker_load_states: Arc<RwLock<HashMap<i64, WorkerLoadState>>>,
+    worker_load_states: Arc<RwLock<HashMap<u64, WorkerLoadState>>>,
     busy_threshold: f64,
 }
 
@@ -67,7 +67,7 @@ impl KvWorkerMonitor {
     }
 
     /// Get the worker load states for external access
-    pub fn load_states(&self) -> Arc<RwLock<HashMap<i64, WorkerLoadState>>> {
+    pub fn load_states(&self) -> Arc<RwLock<HashMap<u64, WorkerLoadState>>> {
         self.worker_load_states.clone()
     }
 }
@@ -154,7 +154,7 @@ impl WorkerLoadMonitor for KvWorkerMonitor {
 
         // Recalculate all busy instances and update
        let states = worker_load_states.read().unwrap();
-        let busy_instances: Vec<i64> = states
+        let busy_instances: Vec<u64> = states
            .iter()
            .filter_map(|(&id, state)| {
                state.is_busy(busy_threshold).then_some(id)

lib/llm/src/entrypoint/input/http.rs

Lines changed: 3 additions & 13 deletions
@@ -64,10 +64,10 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul
     let http_service = match engine_config {
         EngineConfig::Dynamic(_) => {
             let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
-            let etcd_client = distributed_runtime.etcd_client();
             // This allows the /health endpoint to query etcd for active instances
-            http_service_builder = http_service_builder.with_etcd_client(etcd_client.clone());
+            http_service_builder = http_service_builder.store(distributed_runtime.store().clone());
             let http_service = http_service_builder.build()?;
+            let etcd_client = distributed_runtime.etcd_client();
             match etcd_client {
                 Some(ref etcd_client) => {
                     let router_config = engine_config.local_model().router_config();
@@ -241,17 +241,7 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul
        http_service.custom_backend_registry.as_ref(),
    ) {
        // Create DistributedRuntime for polling, matching the engine's mode
-        // Check if we have etcd_client to determine if we're in dynamic or static mode
-        let drt = if http_service.state().etcd_client().is_some() {
-            // Dynamic mode: use from_settings() which respects environment (includes etcd)
-            DistributedRuntime::from_settings(runtime.clone()).await?
-        } else {
-            // Static mode: no etcd
-            let dst_config =
-                dynamo_runtime::distributed::DistributedConfig::from_settings(true);
-            DistributedRuntime::new(runtime.clone(), dst_config).await?
-        };
-
+        let drt = DistributedRuntime::from_settings(runtime.clone()).await?;
        tracing::info!(
            namespace_component_endpoint=%namespace_component_endpoint,
            polling_interval_secs=polling_interval,

lib/llm/src/http/service/service_v2.rs

Lines changed: 9 additions & 40 deletions
@@ -20,10 +20,7 @@ use axum_server::tls_rustls::RustlsConfig;
 use derive_builder::Builder;
 use dynamo_runtime::logging::make_request_span;
 use dynamo_runtime::metrics::prometheus_names::name_prefix;
-use dynamo_runtime::storage::key_value_store::EtcdStore;
-use dynamo_runtime::storage::key_value_store::KeyValueStore;
-use dynamo_runtime::storage::key_value_store::MemoryStore;
-use dynamo_runtime::transports::etcd;
+use dynamo_runtime::storage::key_value_store::KeyValueStoreManager;
 use std::net::SocketAddr;
 use tokio::task::JoinHandle;
 use tokio_util::sync::CancellationToken;
@@ -33,8 +30,7 @@ use tower_http::trace::TraceLayer;
 pub struct State {
     metrics: Arc<Metrics>,
     manager: Arc<ModelManager>,
-    etcd_client: Option<etcd::Client>,
-    store: Arc<dyn KeyValueStore>,
+    store: KeyValueStoreManager,
     flags: StateFlags,
 }
 
@@ -75,12 +71,11 @@ impl StateFlags {
 }
 
 impl State {
-    pub fn new(manager: Arc<ModelManager>) -> Self {
+    pub fn new(manager: Arc<ModelManager>, store: KeyValueStoreManager) -> Self {
         Self {
             manager,
             metrics: Arc::new(Metrics::default()),
-            etcd_client: None,
-            store: Arc::new(MemoryStore::new()),
+            store,
             flags: StateFlags {
                 chat_endpoints_enabled: AtomicBool::new(false),
                 cmpl_endpoints_enabled: AtomicBool::new(false),
@@ -90,20 +85,6 @@ impl State {
         }
     }
 
-    pub fn new_with_etcd(manager: Arc<ModelManager>, etcd_client: etcd::Client) -> Self {
-        Self {
-            manager,
-            metrics: Arc::new(Metrics::default()),
-            store: Arc::new(EtcdStore::new(etcd_client.clone())),
-            etcd_client: Some(etcd_client),
-            flags: StateFlags {
-                chat_endpoints_enabled: AtomicBool::new(false),
-                cmpl_endpoints_enabled: AtomicBool::new(false),
-                embeddings_endpoints_enabled: AtomicBool::new(false),
-                responses_endpoints_enabled: AtomicBool::new(false),
-            },
-        }
-    }
     /// Get the Prometheus [`Metrics`] object which tracks request counts and inflight requests
     pub fn metrics_clone(&self) -> Arc<Metrics> {
         self.metrics.clone()
@@ -117,12 +98,8 @@ impl State {
         self.manager.clone()
     }
 
-    pub fn etcd_client(&self) -> Option<&etcd::Client> {
-        self.etcd_client.as_ref()
-    }
-
-    pub fn store(&self) -> Arc<dyn KeyValueStore> {
-        self.store.clone()
+    pub fn store(&self) -> &KeyValueStoreManager {
+        &self.store
     }
 
     // TODO
@@ -186,8 +163,8 @@ pub struct HttpServiceConfig {
     #[builder(default = "None")]
     request_template: Option<RequestTemplate>,
 
-    #[builder(default = "None")]
-    etcd_client: Option<etcd::Client>,
+    #[builder(default)]
+    store: KeyValueStoreManager,
 
     // DEPRECATED: To be removed after custom backends migrate to Dynamo backend.
     #[builder(default = "None")]
@@ -335,10 +312,7 @@ impl HttpServiceConfigBuilder {
         let config: HttpServiceConfig = self.build_internal()?;
 
         let model_manager = Arc::new(ModelManager::new());
-        let state = match config.etcd_client {
-            Some(etcd_client) => Arc::new(State::new_with_etcd(model_manager, etcd_client)),
-            None => Arc::new(State::new(model_manager)),
-        };
+        let state = Arc::new(State::new(model_manager, config.store));
         state
             .flags
             .set(&EndpointType::Chat, config.enable_chat_endpoints);
@@ -422,11 +396,6 @@ impl HttpServiceConfigBuilder {
         self
     }
 
-    pub fn with_etcd_client(mut self, etcd_client: Option<etcd::Client>) -> Self {
-        self.etcd_client = Some(etcd_client);
-        self
-    }
-
    // DEPRECATED: To be removed after custom backends migrate to Dynamo backend.
    pub fn with_custom_backend_config(
        mut self,
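
With new_with_etcd gone, every State owns a KeyValueStoreManager, and the #[builder(default)] on the store field means a config that never sets one falls back to KeyValueStoreManager::default() (which must therefore exist; what backend that default wraps is not shown in this diff). A sketch of the construction from a caller's perspective, written as if inside this module so ModelManager and State are in scope; the build_state helper name is an assumption:

use std::sync::Arc;
use dynamo_runtime::storage::key_value_store::KeyValueStoreManager;

// Hypothetical helper: use an explicit store when a DistributedRuntime is
// available (drt.store().clone()), otherwise fall back to the Default manager,
// mirroring the builder's #[builder(default)].
fn build_state(manager: Arc<ModelManager>, store: Option<KeyValueStoreManager>) -> Arc<State> {
    Arc::new(State::new(manager, store.unwrap_or_default()))
}

The builder now does the same thing internally: build() passes config.store straight to State::new instead of matching on an optional etcd client.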

lib/llm/src/kv_router/protocols.rs

Lines changed: 1 addition & 1 deletion
@@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};
 use uuid::Uuid;
 
 /// A worker identifier.
-pub type WorkerId = i64;
+pub type WorkerId = u64;
 
 /// A data parallel rank identifier.
 pub type DpRank = u32;
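
This type alias is behind the i64-to-u64 churn in the other files: anything still holding a signed id, such as the lease id in lib/bindings/c/src/lib.rs or instance_id in the block manager's controller client, now casts with `as u64` at the boundary. A runnable sketch of that conversion with made-up values:

// Mirrors the alias above.
type WorkerId = u64;

fn main() {
    // The diff uses a plain `as` cast, which reinterprets the bits of a
    // negative i64 rather than returning an error.
    let lease_id: i64 = 42;
    let worker_id: WorkerId = lease_id as u64;
    assert_eq!(worker_id, 42);

    // A checked alternative, if negative ids should be rejected instead:
    let checked: Option<WorkerId> = u64::try_from(lease_id).ok();
    assert_eq!(checked, Some(42));
}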

lib/llm/src/kv_router/publisher.rs

Lines changed: 3 additions & 3 deletions
@@ -97,7 +97,7 @@ pub struct KvEventPublisher {
 impl KvEventPublisher {
     pub fn new(
         component: Component,
-        worker_id: i64,
+        worker_id: u64,
         kv_block_size: u32,
         source_config: Option<KvEventSourceConfig>,
     ) -> Result<Self> {
@@ -174,7 +174,7 @@ impl Drop for KvEventPublisher {
 
 async fn start_event_processor<P: EventPublisher + Send + Sync + 'static>(
     publisher: P,
-    worker_id: i64,
+    worker_id: u64,
     cancellation_token: CancellationToken,
     mut rx: mpsc::UnboundedReceiver<KvCacheEvent>,
 ) {
@@ -801,7 +801,7 @@ impl WorkerMetricsPublisher {
     ///
     /// This task monitors metric changes (specifically kv_active_blocks and num_requests_waiting)
     /// and publishes stable metrics to NATS after they've been unchanged for 1ms.
-    fn start_nats_metrics_publishing(&self, namespace: Namespace, worker_id: i64) {
+    fn start_nats_metrics_publishing(&self, namespace: Namespace, worker_id: u64) {
        let nats_rx = self.rx.clone();
 
        tokio::spawn(async move {
