diff --git a/.gitignore b/.gitignore index 9d4d03c7..6c1b2dfb 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,8 @@ Cargo.lock .tgops.toml .tgops +.idea +notes/ docs/book/ data/ gha-creds-*.json diff --git a/Cargo.toml b/Cargo.toml index 2617f4de..008190bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -89,14 +89,14 @@ reqwest = { version = "0.12.12", default-features = false, features = [ ] } # # webvh -didwebvh-rs = "=0.1.10" +didwebvh-rs = "0.1.10" # Python bindings pyo3 = { version = "0.26", features = ["serde"] } pythonize = "0.26" # serialize -serde = { version = "1.0.220", features = ["derive"] } +serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0" } serde_with = { version = "3.14", features = ["base64"] } bs58 = "0.5" diff --git a/docs/ADR-001-relationship-state-machine.md b/docs/ADR-001-relationship-state-machine.md new file mode 100644 index 00000000..16731c1a --- /dev/null +++ b/docs/ADR-001-relationship-state-machine.md @@ -0,0 +1,48 @@ +# ADR 001: Relationship State Machine + +## Status +Proposed + +## Context +The current TSP SDK implementation lacks a formal state machine for managing relationship lifecycles. This leads to several issues: +1. **Undefined States**: The `ReverseUnidirectional` status is defined but rarely used, leading to ambiguity when a node receives a relationship request. +2. **Concurrency Issues**: If two nodes request a relationship with each other simultaneously, both end up in a `Unidirectional` state, with no clear resolution path. +3. **No Timeouts**: There is no mechanism to handle lost messages or unresponsive peers during the handshake process. +4. **Idempotency**: Duplicate control messages are not handled consistently. + +## Decision +We will implement a formal `RelationshipMachine` to govern state transitions. + +### 1. State Machine Definition + +The state machine will transition based on `RelationshipEvent`s. 
+ +| Current State | Event | New State | Action/Notes | +| :--- | :--- | :--- | :--- | +| `Unrelated` | `SendRequest` | `Unidirectional` | Store `thread_id` | +| `Unrelated` | `ReceiveRequest` | `ReverseUnidirectional` | Store `thread_id` | +| `Unidirectional` | `ReceiveAccept` | `Bidirectional` | Verify `thread_id` matches. | +| `ReverseUnidirectional` | `SendAccept` | `Bidirectional` | Verify `thread_id` matches. | +| `Bidirectional` | `SendCancel` | `Unrelated` | | +| `Bidirectional` | `ReceiveCancel` | `Unrelated` | | +| `Unidirectional` | `SendRequest` | `Unidirectional` | Idempotent (retransmission) | +| `Unidirectional` | `ReceiveRequest` | *Conflict Resolution* | See Concurrency Handling | + +### 2. Concurrency Handling +When a node in the `Unidirectional` state (it has sent a request) receives a `RequestRelationship` from the target (meaning the target has also sent a request): +- **Compare `thread_id`s**: The request with the *lower* `thread_id` (compared lexicographically) wins. +- **If my `thread_id` < their `thread_id`**: My request wins. I do not act on their request (it is ignored or rejected), stay in `Unidirectional`, and expect them to accept my request. +- **If my `thread_id` > their `thread_id`**: Their request wins. I drop my pending request and transition to `ReverseUnidirectional`, from which I can accept their request (effectively following their flow). + +### 3. Timeout & Retry +- **Timeout**: A `request_timeout` field will be added to `VidContext`. If a relationship stays in the `Unidirectional` state beyond the timeout (e.g., 60s), it transitions back to `Unrelated`. +- **Retry**: Before timing out, the system may attempt retransmissions. + +### 4. Idempotency +- **Duplicate Request**: If a node in `ReverseUnidirectional` or `Bidirectional` receives the same `RequestRelationship` (same `thread_id`) again, it ignores it or resends its previous response. +- **Duplicate Accept**: If a node in `Bidirectional` receives an `AcceptRelationship` with the same `thread_id` again, it ignores it. + +## Consequences +- **Robustness**: Relationship establishment will remain reliable under network jitter and concurrent requests. 
+- **Complexity**: The `store.rs` logic will become more complex. +- **Breaking Changes**: Existing tests that manually manipulate state might fail and need updating to respect the state machine. diff --git a/examples/tests/cli_tests.rs b/examples/tests/cli_tests.rs index 79ac49bf..70d57fcf 100644 --- a/examples/tests/cli_tests.rs +++ b/examples/tests/cli_tests.rs @@ -113,7 +113,7 @@ fn test_send_command_unverified_receiver_default() { "receive", &marc_did, ]) - .timeout(Duration::from_secs(2)) + .timeout(Duration::from_secs(20)) .assert() .stderr(predicate::str::contains("received relationship request")) .stdout(predicate::str::contains("Oh hello Marc")) @@ -158,7 +158,7 @@ fn test_send_command_unverified_receiver_ask_flag() { "--ask", ]) .write_stdin(input) - .timeout(Duration::from_secs(2)) + .timeout(Duration::from_secs(20)) .assert() .stderr(predicate::str::contains( "Message cannot be sent without verifying the receiver's DID", @@ -182,7 +182,7 @@ fn test_send_command_unverified_receiver_ask_flag() { "--ask", ]) .write_stdin(input) - .timeout(Duration::from_secs(2)) + .timeout(Duration::from_secs(20)) .assert() .stdout(predicate::str::contains( "Do you want to verify receiver DID", @@ -199,7 +199,7 @@ fn test_send_command_unverified_receiver_ask_flag() { "--one", &marc_did, ]) - .timeout(Duration::from_secs(2)) + .timeout(Duration::from_secs(20)) .assert() .stderr(predicate::str::contains("received relationship request")) .success(); @@ -256,7 +256,7 @@ fn test_webvh_creation_key_rotation() { "receive", &bar_did, ]) - .timeout(Duration::from_secs(2)) + .timeout(Duration::from_secs(20)) .assert() .stderr(predicate::str::contains("received relationship request")) .stdout(predicate::str::contains("Oh hello Marc")) @@ -293,7 +293,7 @@ fn test_webvh_creation_key_rotation() { "receive", &bar_did, ]) - .timeout(Duration::from_secs(2)) + .timeout(Duration::from_secs(20)) .assert() .stdout(predicate::str::contains("Oh hello Marc")) .failure(); diff --git 
a/tsp_sdk/Cargo.toml b/tsp_sdk/Cargo.toml index 6bbbb9de..66d35c09 100644 --- a/tsp_sdk/Cargo.toml +++ b/tsp_sdk/Cargo.toml @@ -87,7 +87,7 @@ arbitrary = { workspace = true, optional = true } async-trait = "0.1.88" # webvh -didwebvh-rs = { workspace = true, optional = true } +didwebvh-rs = { optional = true, version = "0.1.10" } # not used directly, but we need to configure # the JS feature in this transitive dependency diff --git a/tsp_sdk/src/definitions/mod.rs b/tsp_sdk/src/definitions/mod.rs index 8cf62a87..1be80e29 100644 --- a/tsp_sdk/src/definitions/mod.rs +++ b/tsp_sdk/src/definitions/mod.rs @@ -44,7 +44,7 @@ pub struct MessageType { } #[cfg_attr(feature = "serialize", derive(Serialize, Deserialize))] -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub enum RelationshipStatus { _Controlled, Bidirectional { diff --git a/tsp_sdk/src/error.rs b/tsp_sdk/src/error.rs index 758e5073..58160e10 100644 --- a/tsp_sdk/src/error.rs +++ b/tsp_sdk/src/error.rs @@ -25,6 +25,8 @@ pub enum Error { InvalidRoute(String), #[error("Relationship Error: {0}")] Relationship(String), + #[error("Relationship State Error: {0}")] + State(#[from] crate::relationship_machine::StateError), #[error("Error: missing private vid {0}")] MissingPrivateVid(String), #[error("Error: missing vid {0}")] diff --git a/tsp_sdk/src/lib.rs b/tsp_sdk/src/lib.rs index d7378be5..406a8657 100644 --- a/tsp_sdk/src/lib.rs +++ b/tsp_sdk/src/lib.rs @@ -125,6 +125,9 @@ pub use secure_storage::AskarSecureStorage; pub use secure_storage::SecureStorage; pub use definitions::{Payload, PrivateVid, ReceivedTspMessage, RelationshipStatus, VerifiedVid}; +pub use relationship_machine::{RelationshipEvent, RelationshipMachine}; + +pub mod relationship_machine; pub use error::Error; pub use store::{Aliases, SecureStore}; pub use vid::{ExportVid, OwnedVid, Vid}; diff --git a/tsp_sdk/src/relationship_machine.rs b/tsp_sdk/src/relationship_machine.rs new file mode 100644 index 00000000..a6c66fbc --- /dev/null +++ 
b/tsp_sdk/src/relationship_machine.rs
@@ -0,0 +1,306 @@
+use crate::definitions::{Digest, RelationshipStatus};
+
+/// Events that trigger state transitions in the relationship lifecycle.
+#[derive(Debug, Clone, PartialEq)]
+pub enum RelationshipEvent {
+    /// Sending a relationship request to a peer.
+    SendRequest { thread_id: Digest },
+    /// Receiving a relationship request from a peer.
+    ReceiveRequest { thread_id: Digest },
+    /// Sending an acceptance to a relationship request.
+    SendAccept { thread_id: Digest },
+    /// Receiving an acceptance to a relationship request.
+    ReceiveAccept { thread_id: Digest },
+    /// Sending a cancellation of the relationship.
+    SendCancel,
+    /// Receiving a cancellation of the relationship.
+    ReceiveCancel,
+    /// A request has timed out.
+    Timeout,
+}
+
+/// Errors that can occur during state transitions.
+#[derive(Debug, thiserror::Error, PartialEq)]
+pub enum StateError {
+    /// The event is not permitted in the current state.
+    #[error("Invalid state transition from {from:?} with event {event:?}")]
+    InvalidTransition {
+        from: RelationshipStatus,
+        event: RelationshipEvent,
+    },
+    /// An accept referenced a different thread than the one stored in the state.
+    #[error("Thread ID mismatch: expected {expected:?}, got {got:?}")]
+    ThreadIdMismatch { expected: Digest, got: Digest },
+    /// A request arrived while our own request was still pending; the caller
+    /// has to decide which of the two requests wins.
+    #[error("Concurrency conflict: both parties requested relationship")]
+    ConcurrencyConflict,
+}
+
+/// The Relationship State Machine governing lifecycle transitions.
+///
+/// The machine holds no data: [`RelationshipMachine::transition`] maps the
+/// current status and an event to the next status without side effects.
+#[derive(Debug, Clone, Copy, Default)]
+pub struct RelationshipMachine;
+
+impl RelationshipMachine {
+    /// Compute the status that follows `current` once `event` has happened.
+    ///
+    /// # Errors
+    ///
+    /// * [`StateError::ThreadIdMismatch`] if an accept carries a `thread_id`
+    ///   other than the one stored in `current`.
+    /// * [`StateError::ConcurrencyConflict`] if a request is received while our
+    ///   own request is pending.
+    /// * [`StateError::InvalidTransition`] for every pair not listed below.
+    pub fn transition(
+        current: &RelationshipStatus,
+        event: RelationshipEvent,
+    ) -> Result<RelationshipStatus, StateError> {
+        match (current, event) {
+            // --- Unrelated Transitions ---
+            (RelationshipStatus::Unrelated, RelationshipEvent::SendRequest { thread_id }) => {
+                Ok(RelationshipStatus::Unidirectional { thread_id })
+            }
+            (RelationshipStatus::Unrelated, RelationshipEvent::ReceiveRequest { thread_id }) => {
+                Ok(RelationshipStatus::ReverseUnidirectional { thread_id })
+            }
+            // Idempotency: a cancel with nothing left to cancel is a no-op
+            (RelationshipStatus::Unrelated, RelationshipEvent::ReceiveCancel) => {
+                Ok(RelationshipStatus::Unrelated)
+            }
+
+            // --- Unidirectional Transitions (I requested) ---
+            (
+                RelationshipStatus::Unidirectional {
+                    thread_id: pending_id,
+                },
+                RelationshipEvent::ReceiveAccept { thread_id },
+            ) => Self::establish(pending_id, thread_id),
+            // Withdrawn by us, declined by the peer, or never answered
+            (
+                RelationshipStatus::Unidirectional { .. },
+                RelationshipEvent::SendCancel
+                | RelationshipEvent::ReceiveCancel
+                | RelationshipEvent::Timeout,
+            ) => Ok(RelationshipStatus::Unrelated),
+            // Idempotency: a retry keeps the thread, a fresh request replaces it
+            (
+                RelationshipStatus::Unidirectional { .. },
+                RelationshipEvent::SendRequest { thread_id },
+            ) => Ok(RelationshipStatus::Unidirectional { thread_id }),
+            // Concurrency: I requested, but they also requested
+            (
+                RelationshipStatus::Unidirectional { .. },
+                RelationshipEvent::ReceiveRequest { .. },
+            ) => Err(StateError::ConcurrencyConflict),
+
+            // --- ReverseUnidirectional Transitions (They requested) ---
+            (
+                RelationshipStatus::ReverseUnidirectional {
+                    thread_id: pending_id,
+                },
+                RelationshipEvent::SendAccept { thread_id },
+            ) => Self::establish(pending_id, thread_id),
+            // Declined by us or withdrawn by the peer
+            (
+                RelationshipStatus::ReverseUnidirectional { .. },
+                RelationshipEvent::SendCancel | RelationshipEvent::ReceiveCancel,
+            ) => Ok(RelationshipStatus::Unrelated),
+            // Idempotency: a repeated request keeps the thread, a restarted
+            // handshake replaces it
+            (
+                RelationshipStatus::ReverseUnidirectional { .. },
+                RelationshipEvent::ReceiveRequest { thread_id },
+            ) => Ok(RelationshipStatus::ReverseUnidirectional { thread_id }),
+
+            // --- Bidirectional Transitions ---
+            (
+                RelationshipStatus::Bidirectional { .. },
+                RelationshipEvent::SendCancel | RelationshipEvent::ReceiveCancel,
+            ) => Ok(RelationshipStatus::Unrelated),
+            // Idempotency: Receiving request again when already connected
+            (
+                RelationshipStatus::Bidirectional {
+                    thread_id: current_id,
+                    ..
+                },
+                RelationshipEvent::ReceiveRequest { thread_id },
+            ) => {
+                if current_id == &thread_id {
+                    // Ignore duplicate request, stay connected
+                    Ok(current.clone())
+                } else {
+                    // A different thread means the peer restarted the handshake
+                    // (e.g. it lost state); fall back to ReverseUnidirectional
+                    Ok(RelationshipStatus::ReverseUnidirectional { thread_id })
+                }
+            }
+            // Idempotency: Receiving accept again
+            (
+                RelationshipStatus::Bidirectional {
+                    thread_id: current_id,
+                    ..
+                },
+                RelationshipEvent::ReceiveAccept { thread_id },
+            ) => {
+                if current_id == &thread_id {
+                    Ok(current.clone())
+                } else {
+                    Err(StateError::ThreadIdMismatch {
+                        expected: *current_id,
+                        got: thread_id,
+                    })
+                }
+            }
+
+            // --- Invalid Transitions ---
+            (state, event) => Err(StateError::InvalidTransition {
+                from: state.clone(),
+                event,
+            }),
+        }
+    }
+
+    /// Complete a handshake: `Bidirectional` when `got` names the pending
+    /// thread, [`StateError::ThreadIdMismatch`] otherwise.
+    fn establish(expected: &Digest, got: Digest) -> Result<RelationshipStatus, StateError> {
+        if expected == &got {
+            Ok(RelationshipStatus::Bidirectional {
+                thread_id: got,
+                outstanding_nested_thread_ids: vec![],
+            })
+        } else {
+            Err(StateError::ThreadIdMismatch {
+                expected: *expected,
+                got,
+            })
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn mock_digest(val: u8) -> Digest {
+        [val; 32]
+    }
+
+    #[test]
+    fn test_normal_flow_initiator() {
+        let thread_id = mock_digest(1);
+        let mut state = RelationshipStatus::Unrelated;
+
+        // Send Request
+        state =
+            RelationshipMachine::transition(&state, RelationshipEvent::SendRequest { thread_id })
+                .unwrap();
+        assert!(matches!(state, RelationshipStatus::Unidirectional { .. }));
+
+        // Receive Accept
+        state =
+            RelationshipMachine::transition(&state, RelationshipEvent::ReceiveAccept { thread_id })
+                .unwrap();
+        assert!(matches!(state, RelationshipStatus::Bidirectional { .. }));
+    }
+
+    #[test]
+    fn test_normal_flow_receiver() {
+        let thread_id = mock_digest(2);
+        let mut state = RelationshipStatus::Unrelated;
+
+        // Receive Request
+        state = RelationshipMachine::transition(
+            &state,
+            RelationshipEvent::ReceiveRequest { thread_id },
+        )
+        .unwrap();
+        assert!(matches!(
+            state,
+            RelationshipStatus::ReverseUnidirectional { .. }
+        ));
+
+        // Send Accept
+        state =
+            RelationshipMachine::transition(&state, RelationshipEvent::SendAccept { thread_id })
+                .unwrap();
+        assert!(matches!(state, RelationshipStatus::Bidirectional { .. }));
+    }
+
+    #[test]
+    fn test_cancellation() {
+        let thread_id = mock_digest(1);
+        let state = RelationshipStatus::Bidirectional {
+            thread_id,
+            outstanding_nested_thread_ids: vec![],
+        };
+
+        let new_state =
+            RelationshipMachine::transition(&state, RelationshipEvent::SendCancel).unwrap();
+        assert!(matches!(new_state, RelationshipStatus::Unrelated));
+    }
+
+    #[test]
+    fn test_receive_cancel_ends_pending_handshake() {
+        let thread_id = mock_digest(1);
+        let pending = [
+            RelationshipStatus::Unidirectional { thread_id },
+            RelationshipStatus::ReverseUnidirectional { thread_id },
+            RelationshipStatus::Unrelated,
+        ];
+
+        for state in &pending {
+            let new_state =
+                RelationshipMachine::transition(state, RelationshipEvent::ReceiveCancel).unwrap();
+            assert_eq!(new_state, RelationshipStatus::Unrelated);
+        }
+    }
+
+    #[test]
+    fn test_timeout_resets_pending_request() {
+        let state = RelationshipStatus::Unidirectional {
+            thread_id: mock_digest(1),
+        };
+
+        let new_state =
+            RelationshipMachine::transition(&state, RelationshipEvent::Timeout).unwrap();
+        assert_eq!(new_state, RelationshipStatus::Unrelated);
+    }
+
+    #[test]
+    fn test_thread_id_mismatch() {
+        let thread_id_1 = mock_digest(1);
+        let thread_id_2 = mock_digest(2);
+        let state = RelationshipStatus::Unidirectional {
+            thread_id: thread_id_1,
+        };
+
+        let err = RelationshipMachine::transition(
+            &state,
+            RelationshipEvent::ReceiveAccept {
+                thread_id: thread_id_2,
+            },
+        )
+        .unwrap_err();
+
+        assert!(matches!(err, StateError::ThreadIdMismatch { .. }));
+    }
+
+    #[test]
+    fn test_concurrency_conflict() {
+        let thread_id = mock_digest(1);
+        let state = RelationshipStatus::Unidirectional { thread_id };
+
+        let err = RelationshipMachine::transition(
+            &state,
+            RelationshipEvent::ReceiveRequest {
+                thread_id: mock_digest(2),
+            },
+        )
+        .unwrap_err();
+
+        assert_eq!(err, StateError::ConcurrencyConflict);
+    }
+}
diff --git a/tsp_sdk/src/store.rs b/tsp_sdk/src/store.rs index 1e072155..8b56878a 100644 --- a/tsp_sdk/src/store.rs +++ b/tsp_sdk/src/store.rs @@ -7,6 +7,7 @@ use crate::{ VerifiedVid, }, error::Error, + relationship_machine::{RelationshipEvent, RelationshipMachine, StateError}, vid::{VidError, resolve::verify_vid_offline}, }; #[cfg(feature = "async")] @@ -16,9 +17,19 @@ use std::{ collections::HashMap, fmt::Display, sync::{Arc, RwLock}, + time::{Duration, Instant}, }; use url::Url; +/// Represents a pending relationship request, storing the event that triggered it +/// and the thread ID associated with the request. 
+#[derive(Clone, Debug)] +#[allow(dead_code)] +pub(crate) struct PendingRequest { + pub(crate) event: RelationshipEvent, + pub(crate) thread_id: Digest, +} + #[derive(Clone)] pub(crate) struct VidContext { vid: Arc, @@ -28,6 +39,8 @@ pub(crate) struct VidContext { parent_vid: Option, tunnel: Option>, metadata: Option, + request_timeout: Option, + pending_request: Option, } impl VidContext { @@ -48,7 +61,10 @@ impl VidContext { &mut self, relation_status: RelationshipStatus, ) -> RelationshipStatus { - std::mem::replace(&mut self.relation_status, relation_status) + let old = std::mem::replace(&mut self.relation_status, relation_status); + self.request_timeout = None; // Reset timeout on status change + self.pending_request = None; // Reset pending request on status change + old } /// Set the route for this VID. The route will be used to send routed messages to this VID @@ -151,6 +167,8 @@ impl SecureStore { parent_vid: vid.parent_vid, tunnel: vid.tunnel, metadata: vid.metadata, + request_timeout: None, + pending_request: None, }, ); @@ -197,6 +215,8 @@ impl SecureStore { vid: verified_vid, private: None, relation_status: RelationshipStatus::Unrelated, + request_timeout: None, + pending_request: None, relation_vid: None, parent_vid: None, tunnel: None, @@ -226,6 +246,8 @@ impl SecureStore { vid: vid.clone(), private: Some(vid), relation_status: RelationshipStatus::Unrelated, + request_timeout: None, + pending_request: None, relation_vid: None, parent_vid: None, tunnel: None, @@ -738,16 +760,80 @@ impl SecureStore { }) } Payload::RequestRelationship { route, thread_id } => { - Ok(ReceivedTspMessage::RequestRelationship { - sender, - receiver: intended_receiver, - route: route.map(|vec| vec.iter().map(|vid| vid.to_vec()).collect()), - thread_id, - nested_vid: None, - }) + // State Machine Check + let current_status = self.get_vid(&sender)?.relation_status.clone(); + let event = RelationshipEvent::ReceiveRequest { thread_id }; + + match 
RelationshipMachine::transition(¤t_status, event) { + Ok(new_status) => { + self.set_relation_and_status_for_vid( + &sender, + new_status, + receiver_pid.identifier(), + )?; + Ok(ReceivedTspMessage::RequestRelationship { + sender, + receiver: intended_receiver, + route: route + .map(|vec| vec.iter().map(|vid| vid.to_vec()).collect()), + thread_id, + nested_vid: None, + }) + } + Err(StateError::ConcurrencyConflict) => { + // Handle concurrency: compare thread_ids + // We need to know *our* thread_id from the current status + if let RelationshipStatus::Unidirectional { + thread_id: my_thread_id, + } = current_status + { + if my_thread_id < thread_id { + // My request wins, ignore theirs (or reject) + // For now, we just return an error or ignore. + // Returning error might be better for visibility. + Err(Error::Relationship( + "Concurrent request conflict: my request has priority" + .into(), + )) + } else { + // Their request wins, accept theirs + // Transition to ReverseUnidirectional + let new_status = + RelationshipStatus::ReverseUnidirectional { thread_id }; + self.set_relation_and_status_for_vid( + &sender, + new_status, + receiver_pid.identifier(), + )?; + + Ok(ReceivedTspMessage::RequestRelationship { + sender, + receiver: intended_receiver, + route: route.map(|vec| { + vec.iter().map(|vid| vid.to_vec()).collect() + }), + thread_id, + nested_vid: None, + }) + } + } else { + Err(Error::State(StateError::ConcurrencyConflict)) + } + } + Err(e) => Err(Error::State(e)), + } } Payload::AcceptRelationship { thread_id } => { - self.upgrade_relation(receiver_pid.identifier(), &sender, thread_id)?; + // State Machine Check + let current_status = self.get_vid(&sender)?.relation_status.clone(); + let event = RelationshipEvent::ReceiveAccept { thread_id }; + let new_status = RelationshipMachine::transition(¤t_status, event)?; + + self.set_relation_and_status_for_vid( + &sender, + new_status, + receiver_pid.identifier(), + )?; Ok(ReceivedTspMessage::AcceptRelationship { 
sender, @@ -756,31 +842,35 @@ impl SecureStore { }) } Payload::CancelRelationship { thread_id } => { - if let Some(context) = self.vids.write()?.get_mut(&sender) { - match context.relation_status { - RelationshipStatus::Bidirectional { - thread_id: digest, .. - } - | RelationshipStatus::Unidirectional { thread_id: digest } - | RelationshipStatus::ReverseUnidirectional { thread_id: digest } => - { - if thread_id != digest { - return Err(Error::Relationship( - "invalid attempt to end the relationship, wrong thread_id".into(), - )); - } - context.relation_status = RelationshipStatus::Unrelated; - context.relation_vid = None; - } - RelationshipStatus::_Controlled => { + // State Machine Check + let current_status = self.get_vid(&sender)?.relation_status.clone(); + + // Verify thread_id matches before transition + match ¤t_status { + RelationshipStatus::Bidirectional { + thread_id: digest, .. + } + | RelationshipStatus::Unidirectional { thread_id: digest } + | RelationshipStatus::ReverseUnidirectional { thread_id: digest } => { + if &thread_id != digest { return Err(Error::Relationship( - "you cannot cancel a relationship with yourself".into(), + "invalid attempt to end the relationship, wrong thread_id" + .into(), )); } - RelationshipStatus::Unrelated => {} } + _ => {} // Unrelated or Controlled, let state machine handle invalid transition } + let event = RelationshipEvent::ReceiveCancel; + let new_status = RelationshipMachine::transition(¤t_status, event)?; + + self.set_relation_and_status_for_vid( + &sender, + new_status, + receiver_pid.identifier(), + )?; + Ok(ReceivedTspMessage::CancelRelationship { sender, receiver: intended_receiver, @@ -933,16 +1023,29 @@ impl SecureStore { receiver: &str, route: Option<&[&str]>, ) -> Result<(Url, Vec), Error> { - let sender = self.get_private_vid(sender)?; - let receiver = self.get_verified_vid(receiver)?; + let sender_vid = self.get_private_vid(sender)?; + let receiver_vid = self.get_verified_vid(receiver)?; let path = route; 
let route = route.map(|collection| collection.iter().map(|vid| vid.as_ref()).collect()); let mut thread_id = Default::default(); + + // State Machine Check + let current_status = self.get_vid(receiver)?.relation_status.clone(); + // The transition itself is evaluated after sealing, because the event needs the thread_id. + + // `RelationshipEvent::SendRequest` carries the thread_id of the request, and + // that thread_id only exists once `seal_and_hash` has written it into + // `thread_id` below. The status is therefore captured here and the + // transition is checked right after sealing; an invalid transition + // returns early through `?`, before the route or the relationship state + // of the receiver is modified. + // NOTE(review): the status captured here may be stale when it is written back below; confirm callers serialize requests per VID. + let tsp_message = crate::crypto::seal_and_hash( - &*sender, - &*receiver, + &*sender_vid, + &*receiver_vid, None, Payload::RequestRelationship { route, @@ -951,18 +1054,28 @@ impl SecureStore { Some(&mut thread_id), )?; + // Now we have the real thread_id + let event = RelationshipEvent::SendRequest { thread_id }; + let new_status = RelationshipMachine::transition(&current_status, event)?; + let (transport, tsp_message) = if let Some(hop_list) = path { - self.set_route_for_vid(receiver.identifier(), hop_list)?; + self.set_route_for_vid(receiver_vid.identifier(), hop_list)?; self.resolve_route_and_send(hop_list, &tsp_message)? 
} else { - (receiver.endpoint().clone(), tsp_message) + (receiver_vid.endpoint().clone(), tsp_message) }; - self.set_relation_and_status_for_vid( - receiver.identifier(), - RelationshipStatus::Unidirectional { thread_id }, - sender.identifier(), - )?; + // Update state; a poisoned lock is reported as an error instead of panicking + let mut vids = self.vids.write()?; + if let Some(context) = vids.get_mut(receiver) { + context.relation_status = new_status; + context.request_timeout = Some(Instant::now() + Duration::from_secs(60)); + context.pending_request = Some(PendingRequest { + event: RelationshipEvent::SendRequest { thread_id }, + thread_id, + }); + context.relation_vid = Some(sender.to_string()); + } Ok((transport, tsp_message.to_owned())) } @@ -977,6 +1090,11 @@ impl SecureStore { thread_id: Digest, route: Option<&[&str]>, ) -> Result<(Url, Vec<u8>), Error> { + // State Machine Check + let current_status = self.get_vid(receiver)?.relation_status.clone(); + let event = RelationshipEvent::SendAccept { thread_id }; + let new_status = RelationshipMachine::transition(&current_status, event)?; + let (transport, tsp_message) = self.seal_message_payload( sender, receiver, @@ -991,14 +1109,7 @@ impl SecureStore { (transport.to_owned(), tsp_message) }; - self.set_relation_and_status_for_vid( - receiver, - RelationshipStatus::Bidirectional { - thread_id, - outstanding_nested_thread_ids: Default::default(), - }, - sender, - )?; + self.set_relation_and_status_for_vid(receiver, new_status, sender)?; Ok((transport, tsp_message)) } @@ -1192,42 +1303,6 @@ impl SecureStore { self.add_verified_vid(nested_vid, None) } - fn upgrade_relation( - &self, - my_vid: &str, - other_vid: &str, - thread_id: Digest, - ) -> Result<(), Error> { - let mut vids = self.vids.write()?; - let Some(context) = vids.get_mut(other_vid) else { - return Err(Error::Relationship(format!( - "unknown other vid {other_vid}" - ))); - }; - - let RelationshipStatus::Unidirectional { thread_id: digest } = context.relation_status - else { - return 
Err(Error::Relationship(format!( - "no unidirectional relationship with {other_vid}, cannot upgrade" - ))); - }; - - if thread_id != digest { - return Err(Error::Relationship( - "thread_id does not match digest".to_string(), - )); - } - - context.relation_vid = Some(my_vid.to_string()); - - context.relation_status = RelationshipStatus::Bidirectional { - thread_id: digest, - outstanding_nested_thread_ids: Default::default(), - }; - - Ok(()) - } - fn add_nested_thread_id(&self, vid: &str, thread_id: Digest) -> Result<(), Error> { let mut vids = self.vids.write()?; let Some(context) = vids.get_mut(vid) else { @@ -1247,6 +1322,38 @@ impl SecureStore { Ok(()) } + /// Reset every relationship whose request deadline has passed; returns the VIDs that timed out. + pub fn check_timeouts(&self) -> Result<Vec<String>, Error> { + let mut vids = self.vids.write()?; + let mut timed_out_vids: Vec<String> = Vec::new(); + let now = Instant::now(); + + for (vid, context) in vids.iter_mut() { + if let Some(timeout) = context.request_timeout { + if now > timeout { + // Deadline passed: feed Timeout to the state machine (no retry logic yet) + let event = RelationshipEvent::Timeout; + let outcome = RelationshipMachine::transition(&context.relation_status, event); + // The deadline is spent either way; keeping it would re-log the failure on every call + context.request_timeout = None; + context.pending_request = None; + match outcome { + Ok(new_status) => { + context.relation_status = new_status; + timed_out_vids.push(vid.clone()); + } + Err(e) => { + // Log error but continue + tracing::error!("Error handling timeout for {}: {}", vid, e); + } + } + } + } + } + + Ok(timed_out_vids) + } + fn add_nested_relation( &self, parent_vid: &str, @@ -1254,7 +1361,7 @@ impl SecureStore { thread_id: Digest, ) -> Result<(), Error> { let mut vids = self.vids.write()?; - let Some(context) = vids.get_mut(parent_vid) else { + let Some(parent_context) = vids.get_mut(parent_vid) else { return Err(Error::Relationship(format!( "unknown parent vid {parent_vid}" ))); @@ -1263,7 +1370,7 @@ impl 
SecureStore { let RelationshipStatus::Bidirectional { ref mut outstanding_nested_thread_ids, .. - } = context.relation_status + } = parent_context.relation_status else { return Err(Error::Relationship(format!( "no relationship set for parent vid {parent_vid}" @@ -1281,13 +1388,13 @@ impl SecureStore { }; outstanding_nested_thread_ids.remove(index); - let Some(context) = vids.get_mut(nested_vid) else { + let Some(nested_context) = vids.get_mut(nested_vid) else { return Err(Error::Relationship(format!( "unknown nested vid {nested_vid}" ))); }; - context.relation_status = RelationshipStatus::Bidirectional { + nested_context.relation_status = RelationshipStatus::Bidirectional { thread_id, outstanding_nested_thread_ids: Default::default(), };