Skip to content

Commit 9f78eea

Browse files
authored
fix(server): correct VSR client session routing and lifecycle (#3407)
- Drop replicated sessions cluster-wide on disconnect. A lost connection now routes a synthetic `Logout` to the metadata owner, so the replicated session slot is released everywhere instead of lingering until peer eviction. - Mutate the SessionManager only after the register commits. Login/register no longer flips the connection to `Authenticated` before submit (with rollback on failure); it submits, awaits the commit (post-quorum), then transitions `Connected → Bound` in one step. A transient submit failure leaves the connection untouched and the SDK replays. - Split the`bootstrap.rs` into focused modules — `wire`, `pat`, `responses`, `auth`, `dispatch` — leaving `bootstrap.rs` with cluster boot/recovery/startup only. The request handlers, owner-forwarders (`submit_*_on_owner`), wire-response builders, and credential verification now live next to each other.
1 parent 5858410 commit 9f78eea

17 files changed

Lines changed: 2509 additions & 2295 deletions

File tree

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,6 @@ compio = { version = "=0.19.0", features = [
123123
"fs",
124124
] }
125125
compio-buf = "0.8.2"
126-
# Pin compio-driver >= 0.11.2 to fix musl compilation (compio-rs/compio#668)
127126
compio-driver = "0.12.1"
128127
compio-quic = "=0.8.0"
129128
compio-ws = "=0.4.0"

core/consensus/src/metadata_helpers.rs

Lines changed: 102 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,44 @@ use crate::client_table::{ClientTable, REGISTER_REQUEST_ID, RequestStatus};
2323
use crate::{Consensus, Pipeline, PipelineEntry, VsrConsensus};
2424
use iggy_binary_protocol::{EvictionHeader, EvictionReason, HEADER_SIZE};
2525
use message_bus::MessageBus;
26-
use server_common::Message;
26+
use server_common::iobuf::Frozen;
27+
use server_common::{MESSAGE_ALIGN, Message};
2728
use std::cell::RefCell;
2829

30+
/// What [`request_preflight`] decided, without touching the wire itself.
31+
///
32+
/// The preflight runs on shard 0 (the metadata owner), but the client's
33+
/// transport connection lives on its home shard. Sending a resend/eviction
34+
/// from here would route by the VSR consensus `client_id`, whose top bits are
35+
/// random and carry no home-shard routing -- so it (almost) never reaches the
36+
/// client. Returning the decision instead lets the home shard
37+
/// (`submit_request_in_process` -> `handle_client_request`) emit the frame by
38+
/// transport id, exactly like a fresh commit.
39+
pub enum PreflightOutcome {
40+
/// New (client, request): dispatch a fresh prepare through consensus.
41+
Dispatch,
42+
/// Duplicate retry: resend this cached committed reply (wire bytes).
43+
Replay(Frozen<MESSAGE_ALIGN>),
44+
/// Session gone (`NoSession`) or rotated past the retry (`SessionTooLow`):
45+
/// the client must be told with an eviction frame.
46+
Evict(EvictionReason),
47+
/// Absorbed with nothing to send: in-flight prepare, not-caught-up
48+
/// primary, stale/gap retry, or a client-bug newer session.
49+
Drop,
50+
}
51+
2952
/// Request preflight (metadata only): session validation, dedup, in-flight check.
3053
///
31-
/// # Returns
32-
/// `true` -> dispatch through consensus. `false` -> absorbed here
33-
/// (cached reply resent / duplicate dropped / eviction sent).
34-
#[allow(clippy::future_not_send)]
35-
pub async fn request_preflight<B, P>(
54+
/// Pure decision -- emits no frames (see [`PreflightOutcome`]). Callers turn
55+
/// the outcome into a reply: the home-shard path resends by transport id, the
56+
/// message-plane paths fall back to [`apply_preflight_consensus_plane`].
57+
pub fn request_preflight<B, P>(
3658
consensus: &VsrConsensus<B, P>,
3759
client_table: &RefCell<ClientTable>,
3860
client_id: u128,
3961
session: u64,
4062
request: u64,
41-
) -> bool
63+
) -> PreflightOutcome
4264
where
4365
B: MessageBus,
4466
P: Pipeline<Entry = PipelineEntry>,
@@ -55,7 +77,7 @@ where
5577
request,
5678
"request_preflight: in-flight prepare, drop"
5779
);
58-
return false;
80+
return PreflightOutcome::Drop;
5981
}
6082

6183
// Catch-up gate: stale ClientTable on a new primary could return `New`
@@ -73,77 +95,82 @@ where
7395
commit_max = consensus.commit_max(),
7496
"request_preflight: not caught up, drop"
7597
);
76-
return false;
98+
return PreflightOutcome::Drop;
7799
}
78100

79101
let status = client_table
80102
.borrow()
81103
.check_request(client_id, session, request);
82104
match status {
105+
// Frozen-backed cache -> refcount handoff to the home shard, no copy.
83106
RequestStatus::Duplicate(cached_reply) => {
84-
// TODO(vsr-resend-misroute): `client_id` here is the VSR
85-
// consensus id (the random u128 the SDK mints), but
86-
// `send_to_client` routes by the top 16 bits, which only carry
87-
// home-shard bits for *transport* ids. A VSR id's top bits are
88-
// random, so this resend routes to a garbage shard (~never the
89-
// owning one, even single-shard) -> no registry slot -> dropped.
90-
// The client then never receives the cached reply, retries the
91-
// same (client, request), hits Duplicate again, and livelocks
92-
// until the session times out. shard 0 cannot reconstruct the
93-
// VSR->transport mapping, so the fix is to return the Duplicate
94-
// outcome to the home shard and let it resend the cached body by
95-
// transport id (mirroring the fresh-commit path in
96-
// `submit_request_in_process` / `handle_client_request`). Must
97-
// land before VSR client traffic goes past tier-1; not reached by
98-
// the single-shard tcp happy path the tier-1 suite covers.
99-
// Best-effort resend (client may have disconnected). Frozen-backed
100-
// cache -> refcount handoff, no copy.
101-
let _ = consensus
102-
.message_bus()
103-
.send_to_client(client_id, cached_reply.into_wire_bytes())
104-
.await;
105-
false
106-
}
107-
RequestStatus::NoSession => {
108-
// Session evicted under capacity pressure. SAFETY: catch-up
109-
// gate makes this replica authoritative for session truth.
110-
// TODO(vsr-resend-misroute): `send_eviction_to_client` resends by
111-
// the VSR consensus `client_id` and misroutes exactly like the
112-
// Duplicate arm above -- the client never learns it was evicted.
113-
// Same fix: route the eviction through the home shard by transport
114-
// id. See the Duplicate arm for the full rationale.
115-
send_eviction_to_client(consensus, client_id, EvictionReason::NoSession).await;
116-
false
107+
PreflightOutcome::Replay(cached_reply.into_wire_bytes())
117108
}
109+
// Session evicted under capacity pressure. SAFETY: catch-up gate makes
110+
// this replica authoritative for session truth.
111+
RequestStatus::NoSession => PreflightOutcome::Evict(EvictionReason::NoSession),
118112
RequestStatus::SessionMismatch { expected, received } => {
119113
// expected > received: stale session (rotated post-eviction) -> terminal eviction.
120114
// expected < received: client bug; silent drop, log.
121115
// SAFETY: catch-up gate makes this replica authoritative.
122116
if expected > received {
123-
// TODO(vsr-resend-misroute): same VSR-id misroute as the
124-
// Duplicate / NoSession arms -- the eviction is sent by the
125-
// consensus client_id and never reaches the client. See the
126-
// Duplicate arm for the full rationale + fix.
127-
send_eviction_to_client(consensus, client_id, EvictionReason::SessionTooLow).await;
117+
PreflightOutcome::Evict(EvictionReason::SessionTooLow)
128118
} else {
129119
// Catch-up gate rules out network race; newer-than-issued
130-
// session = client bug. Error log,
131-
// no eviction (transient bug must not kill session), no
132-
// rate limit (per-event).
120+
// session = client bug. Error log, no eviction (transient bug
121+
// must not kill session), no rate limit (per-event).
133122
tracing::error!(
134123
client_id,
135124
expected,
136125
received,
137126
"request_preflight: ignoring newer session (client bug)"
138127
);
128+
PreflightOutcome::Drop
139129
}
140-
false
141130
}
142131
// Client bug; recovered by client retry. Silent drop.
143132
RequestStatus::Stale
144133
| RequestStatus::RequestGap { .. }
145-
| RequestStatus::AlreadyRegistered { .. } => false,
146-
RequestStatus::New => true,
134+
| RequestStatus::AlreadyRegistered { .. } => PreflightOutcome::Drop,
135+
RequestStatus::New => PreflightOutcome::Dispatch,
136+
}
137+
}
138+
139+
/// Message-plane fallback for [`request_preflight`]: best-effort resend by the
140+
/// consensus `client_id`.
141+
///
142+
/// The wire-path ingress (`on_request`) and the queued retry drain have no
143+
/// home-shard transport context to route through, so they apply the outcome
144+
/// here. Returns `true` iff the caller should dispatch a fresh prepare.
145+
///
146+
/// Delivery is best-effort: a VSR consensus id's top bits are random, so
147+
/// `send_to_client` may not reach the client. The in-process client path
148+
/// (`submit_request_in_process`) instead carries the outcome back to the home
149+
/// shard and resends by transport id.
150+
#[allow(clippy::future_not_send)]
151+
pub async fn apply_preflight_consensus_plane<B, P>(
152+
consensus: &VsrConsensus<B, P>,
153+
outcome: PreflightOutcome,
154+
client_id: u128,
155+
) -> bool
156+
where
157+
B: MessageBus,
158+
P: Pipeline<Entry = PipelineEntry>,
159+
{
160+
match outcome {
161+
PreflightOutcome::Dispatch => true,
162+
PreflightOutcome::Replay(reply) => {
163+
let _ = consensus
164+
.message_bus()
165+
.send_to_client(client_id, reply)
166+
.await;
167+
false
168+
}
169+
PreflightOutcome::Evict(reason) => {
170+
send_eviction_to_client(consensus, client_id, reason).await;
171+
false
172+
}
173+
PreflightOutcome::Drop => false,
147174
}
148175
}
149176

@@ -339,7 +366,6 @@ mod tests {
339366
use crate::{CLIENTS_TABLE_MAX, LocalPipeline};
340367
use iggy_binary_protocol::{Command2, Operation, ReplyHeader};
341368
use message_bus::SendError;
342-
use server_common::{MESSAGE_ALIGN, iobuf::Frozen};
343369

344370
/// Production-sized `ClientTable`.
345371
fn fresh_client_table() -> RefCell<ClientTable> {
@@ -464,12 +490,16 @@ mod tests {
464490

465491
let client_id: u128 = 0xCAFE;
466492

467-
let result = futures::executor::block_on(request_preflight(
493+
let result = futures::executor::block_on(apply_preflight_consensus_plane(
468494
&consensus,
469-
&client_table,
495+
request_preflight(
496+
&consensus,
497+
&client_table,
498+
client_id,
499+
10, // session
500+
1, // request
501+
),
470502
client_id,
471-
10, // session
472-
1, // request
473503
));
474504
assert!(!result, "NoSession short-circuits");
475505

@@ -503,12 +533,10 @@ mod tests {
503533
.commit_register(client_id, initial_reply, |_| false);
504534

505535
// Older retry (17 < 99): stale-session case.
506-
let result = futures::executor::block_on(request_preflight(
536+
let result = futures::executor::block_on(apply_preflight_consensus_plane(
507537
&consensus,
508-
&client_table,
538+
request_preflight(&consensus, &client_table, client_id, 17, 1),
509539
client_id,
510-
17,
511-
1,
512540
));
513541
assert!(!result, "SessionMismatch short-circuits");
514542

@@ -541,12 +569,10 @@ mod tests {
541569
.commit_register(client_id, initial_reply, |_| false);
542570

543571
// Client claims newer session (99 > 17), client bug.
544-
let result = futures::executor::block_on(request_preflight(
572+
let result = futures::executor::block_on(apply_preflight_consensus_plane(
545573
&consensus,
546-
&client_table,
574+
request_preflight(&consensus, &client_table, client_id, 99, 1),
547575
client_id,
548-
99,
549-
1,
550576
));
551577
assert!(!result, "SessionMismatch short-circuits");
552578

@@ -569,12 +595,16 @@ mod tests {
569595

570596
let client_id: u128 = 0xCAFE;
571597

572-
let result = futures::executor::block_on(request_preflight(
598+
let result = futures::executor::block_on(apply_preflight_consensus_plane(
573599
&consensus,
574-
&client_table,
600+
request_preflight(
601+
&consensus,
602+
&client_table,
603+
client_id,
604+
10, // session
605+
1, // request
606+
),
575607
client_id,
576-
10, // session
577-
1, // request
578608
));
579609
assert!(!result, "NoSession short-circuits");
580610

@@ -605,12 +635,10 @@ mod tests {
605635
.borrow_mut()
606636
.commit_reply(client_id, session, advanced);
607637

608-
let result = futures::executor::block_on(request_preflight(
638+
let result = futures::executor::block_on(apply_preflight_consensus_plane(
609639
&consensus,
610-
&client_table,
640+
request_preflight(&consensus, &client_table, client_id, session, 3), // stale
611641
client_id,
612-
session,
613-
3, // stale
614642
));
615643
assert!(!result);
616644

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/* Licensed to the Apache Software Foundation (ASF) under one
2+
* or more contributor license agreements. See the NOTICE file
3+
* distributed with this work for additional information
4+
* regarding copyright ownership. The ASF licenses this file
5+
* to you under the Apache License, Version 2.0 (the
6+
* "License"); you may not use this file except in compliance
7+
* with the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing,
12+
* software distributed under the License is distributed on an
13+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
* KIND, either express or implied. See the License for the
15+
* specific language governing permissions and limitations
16+
* under the License.
17+
*/
18+
19+
//! VSR connected-client reads: `get_me` (own connection, from the local
20+
//! `SessionManager`) and `get_clients` (cluster-wide, scatter-gathered
21+
//! across all shards). Run on a 3-node cluster so the gather genuinely
22+
//! crosses shards.
23+
//!
24+
//! TODO(remove-when-scenarios-vsr): these are a focused stopgap. The
25+
//! canonical coverage lives in `server::scenarios::authentication_scenario`
26+
//! (`get_clients`/`get_me`/`get_client`), but that whole suite is
27+
//! `#[cfg(not(feature = "vsr"))]` because it also does real
28+
//! `send_messages` / `poll_messages`, which are not wired under VSR yet
29+
//! (data plane / partition reconciliation). Once the data plane lands and
30+
//! the `server` scenario suite is un-gated under VSR, it subsumes these
31+
//! tests -- delete this file then.
32+
33+
#![cfg(feature = "vsr")]
34+
35+
use iggy::prelude::*;
36+
use integration::iggy_harness;
37+
38+
#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic])]
39+
async fn get_me_returns_own_connection(harness: &TestHarness) {
40+
let client = harness.new_client().await.unwrap();
41+
client
42+
.login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
43+
.await
44+
.unwrap();
45+
46+
let me = client.get_me().await.expect("get_me");
47+
// Sourced from the per-shard SessionManager: authenticated user id +
48+
// a real transport label + a non-empty peer address.
49+
assert_eq!(me.user_id, Some(0), "root is user id 0 in server-ng");
50+
assert!(!me.transport.is_empty(), "transport label must be set");
51+
assert!(!me.address.is_empty(), "peer address must be set");
52+
53+
client.logout_user().await.unwrap();
54+
}
55+
56+
#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic])]
57+
async fn get_clients_gathers_all_connections(harness: &TestHarness) {
58+
// Two extra clients alongside the harness client; on a 3-node cluster
59+
// their connections are spread across shards, so a complete list
60+
// exercises the cross-shard gather.
61+
let a = harness.new_client().await.unwrap();
62+
a.login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
63+
.await
64+
.unwrap();
65+
let b = harness.new_client().await.unwrap();
66+
b.login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
67+
.await
68+
.unwrap();
69+
70+
let clients = a.get_clients().await.expect("get_clients");
71+
// At least the two we just logged in (the gather may also include
72+
// other harness/bookkeeping connections).
73+
assert!(
74+
clients.len() >= 2,
75+
"expected >= 2 connected clients, got {}",
76+
clients.len()
77+
);
78+
// Records are real, not the old empty stub: transport + address set.
79+
for info in &clients {
80+
assert!(!info.transport.is_empty(), "transport label must be set");
81+
assert!(!info.address.is_empty(), "peer address must be set");
82+
}
83+
84+
a.logout_user().await.unwrap();
85+
b.logout_user().await.unwrap();
86+
}

core/integration/tests/sdk/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
* under the License.
1717
*/
1818

19+
#[cfg(feature = "vsr")]
20+
mod clients;
1921
mod hello_world;
2022
#[cfg(not(feature = "vsr"))]
2123
mod producer;

0 commit comments

Comments
 (0)