diff --git a/cli/src/commands.rs b/cli/src/commands.rs index b1211b883..bfe3a2d21 100644 --- a/cli/src/commands.rs +++ b/cli/src/commands.rs @@ -2177,6 +2177,12 @@ mod tests { executable_path: None, extensions: Vec::new(), cdp: None, + connection_mode: None, + relay_url: None, + profile_id: None, + profile_directory: None, + persist_tab_assignment: false, + agent_id: None, profile: None, state: None, proxy: None, diff --git a/cli/src/connection.rs b/cli/src/connection.rs index 96651407c..e6b4a2e2c 100644 --- a/cli/src/connection.rs +++ b/cli/src/connection.rs @@ -182,6 +182,12 @@ pub struct DaemonOptions<'a> { pub debug: bool, pub executable_path: Option<&'a str>, pub extensions: &'a [String], + pub connection_mode: Option<&'a str>, + pub relay_url: Option<&'a str>, + pub profile_id: Option<&'a str>, + pub profile_directory: Option<&'a str>, + pub persist_tab_assignment: bool, + pub agent_id: Option<&'a str>, pub args: Option<&'a str>, pub user_agent: Option<&'a str>, pub proxy: Option<&'a str>, @@ -219,6 +225,24 @@ fn apply_daemon_env(cmd: &mut Command, session: &str, opts: &DaemonOptions) { if !opts.extensions.is_empty() { cmd.env("AGENT_BROWSER_EXTENSIONS", opts.extensions.join(",")); } + if let Some(mode) = opts.connection_mode { + cmd.env("AGENT_BROWSER_CONNECTION_MODE", mode); + } + if let Some(relay_url) = opts.relay_url { + cmd.env("AGENT_BROWSER_RELAY_URL", relay_url); + } + if let Some(profile_id) = opts.profile_id { + cmd.env("AGENT_BROWSER_PROFILE_ID", profile_id); + } + if let Some(profile_directory) = opts.profile_directory { + cmd.env("AGENT_BROWSER_PROFILE_DIRECTORY", profile_directory); + } + if opts.persist_tab_assignment { + cmd.env("AGENT_BROWSER_PERSIST_TAB_ASSIGNMENT", "1"); + } + if let Some(agent_id) = opts.agent_id { + cmd.env("AGENT_BROWSER_AGENT_ID", agent_id); + } if let Some(a) = opts.args { cmd.env("AGENT_BROWSER_ARGS", a); } diff --git a/cli/src/flags.rs b/cli/src/flags.rs index 8eae1be1d..d3616b7dd 100644 --- a/cli/src/flags.rs +++ b/cli/src/flags.rs @@ -71,6 +71,12 @@ pub struct Config { pub ignore_https_errors: Option, pub allow_file_access: Option, pub cdp: Option, + pub connection_mode: Option, + pub relay_url: Option, + pub profile_id: Option, + pub profile_directory: Option, + pub persist_tab_assignment: Option, + pub agent_id: Option, pub auto_connect: Option, pub headers: Option, pub annotate: Option, @@ -116,6 +122,12 @@ impl Config { ignore_https_errors: other.ignore_https_errors.or(self.ignore_https_errors), allow_file_access: other.allow_file_access.or(self.allow_file_access), cdp: other.cdp.or(self.cdp), + connection_mode: other.connection_mode.or(self.connection_mode), + relay_url: other.relay_url.or(self.relay_url), + profile_id: other.profile_id.or(self.profile_id), + profile_directory: other.profile_directory.or(self.profile_directory), + persist_tab_assignment: other.persist_tab_assignment.or(self.persist_tab_assignment), + agent_id: other.agent_id.or(self.agent_id), auto_connect: other.auto_connect.or(self.auto_connect), headers: other.headers.or(self.headers), annotate: other.annotate.or(self.annotate), @@ -195,6 +207,10 @@ fn extract_config_path(args: &[String]) -> Option> { "--headers", "--executable-path", "--cdp", + "--connection-mode", + "--relay-url", + "--profile-id", + "--profile-directory", "--extension", "--profile", "--state", @@ -206,6 +222,7 @@ fn extract_config_path(args: &[String]) -> Option> { "--provider", "--device", "--session-name", + "--agent-id", "--color-scheme", "--download-path", "--max-output", @@ -271,6 +288,12 @@ pub struct Flags { pub headers: Option, pub executable_path: Option, pub cdp: Option, + pub connection_mode: Option, + pub relay_url: Option, + pub profile_id: Option, + pub profile_directory: Option, + pub persist_tab_assignment: bool, + pub agent_id: Option, pub extensions: Vec, pub profile: Option, pub state: Option, @@ -350,6 +373,21 @@ pub fn parse_flags(args: &[String]) -> Flags { .ok() .or(config.executable_path), cdp: config.cdp, + connection_mode: env::var("AGENT_BROWSER_CONNECTION_MODE") + .ok() + .or(config.connection_mode), + relay_url: env::var("AGENT_BROWSER_RELAY_URL") + .ok() + .or(config.relay_url), + profile_id: env::var("AGENT_BROWSER_PROFILE_ID") + .ok() + .or(config.profile_id), + profile_directory: env::var("AGENT_BROWSER_PROFILE_DIRECTORY") + .ok() + .or(config.profile_directory), + persist_tab_assignment: env_var_is_truthy("AGENT_BROWSER_PERSIST_TAB_ASSIGNMENT") + || config.persist_tab_assignment.unwrap_or(false), + agent_id: env::var("AGENT_BROWSER_AGENT_ID").ok().or(config.agent_id), extensions, profile: env::var("AGENT_BROWSER_PROFILE").ok().or(config.profile), state: env::var("AGENT_BROWSER_STATE").ok().or(config.state), @@ -503,6 +541,30 @@ pub fn parse_flags(args: &[String]) -> Flags { i += 1; } } + "--connection-mode" => { + if let Some(s) = args.get(i + 1) { + flags.connection_mode = Some(s.clone()); + i += 1; + } + } + "--relay-url" => { + if let Some(s) = args.get(i + 1) { + flags.relay_url = Some(s.clone()); + i += 1; + } + } + "--profile-id" => { + if let Some(s) = args.get(i + 1) { + flags.profile_id = Some(s.clone()); + i += 1; + } + } + "--profile-directory" => { + if let Some(s) = args.get(i + 1) { + flags.profile_directory = Some(s.clone()); + i += 1; + } + } "--profile" => { if let Some(s) = args.get(i + 1) { flags.profile = Some(s.clone()); @@ -579,6 +641,19 @@ pub fn parse_flags(args: &[String]) -> Flags { i += 1; } } + "--persist-tab-assignment" => { + let (val, consumed) = parse_bool_arg(args, i); + flags.persist_tab_assignment = val; + if consumed { + i += 1; + } + } + "--agent-id" => { + if let Some(s) = args.get(i + 1) { + flags.agent_id = Some(s.clone()); + i += 1; + } + } "--session-name" => { if let Some(s) = args.get(i + 1) { flags.session_name = Some(s.clone()); @@ -716,6 +791,7 @@ pub fn clean_args(args: &[String]) -> Vec { "--ignore-https-errors", "--allow-file-access", "--auto-connect", + "--persist-tab-assignment", "--annotate", "--content-boundaries", "--confirm-interactive", @@ -726,6 +802,10 @@ pub fn clean_args(args: &[String]) -> Vec { "--headers", "--executable-path", "--cdp", + "--connection-mode", + "--relay-url", + "--profile-id", + "--profile-directory", "--extension", "--profile", "--state", @@ -736,6 +816,7 @@ pub fn clean_args(args: &[String]) -> Vec { "-p", "--provider", "--device", + "--agent-id", "--session-name", "--color-scheme", "--download-path", diff --git a/cli/src/main.rs b/cli/src/main.rs index 448add021..ccae965c0 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -308,6 +308,12 @@ fn main() { debug: flags.debug, executable_path: flags.executable_path.as_deref(), extensions: &flags.extensions, + connection_mode: flags.connection_mode.as_deref(), + relay_url: flags.relay_url.as_deref(), + profile_id: flags.profile_id.as_deref(), + profile_directory: flags.profile_directory.as_deref(), + persist_tab_assignment: flags.persist_tab_assignment, + agent_id: flags.agent_id.as_deref(), args: flags.args.as_deref(), user_agent: flags.user_agent.as_deref(), proxy: flags.proxy.as_deref(), diff --git a/cli/src/native/actions.rs b/cli/src/native/actions.rs index bebffc207..23880b7f0 100644 --- a/cli/src/native/actions.rs +++ b/cli/src/native/actions.rs @@ -8,7 +8,7 @@ use time::{format_description::well_known::Rfc3339, OffsetDateTime}; use tokio::sync::{broadcast, oneshot, RwLock}; use super::auth; -use super::browser::{BrowserManager, WaitUntil}; +use super::browser::{BrowserManager, ConnectionMode, ExtensionRelayConnectOptions, WaitUntil}; use super::cdp::chrome::LaunchOptions; use super::cdp::client::CdpClient; use super::cdp::types::{ @@ -137,6 +137,15 @@ pub struct DaemonState { impl DaemonState { pub fn new() -> Self { + let session_name = env::var("AGENT_BROWSER_SESSION_NAME") + .ok() + .filter(|s| !s.is_empty()); + let session_id = + env::var("AGENT_BROWSER_SESSION").unwrap_or_else(|_| "default".to_string()); + Self::new_for_session(session_id, session_name) + } + + pub fn new_for_session(session_id: String, session_name: Option) -> Self { Self { browser: None, appium: None, @@ -149,8 +158,8 @@ impl DaemonState { .filter(|s| !s.is_empty()) .map(|s| DomainFilter::new(&s)), event_tracker: EventTracker::new(), - session_name: env::var("AGENT_BROWSER_SESSION_NAME").ok(), - session_id: env::var("AGENT_BROWSER_SESSION").unwrap_or_else(|_| "default".to_string()), + session_name, + session_id, tracing_state: TracingState::new(), recording_state: RecordingState::new(), event_rx: None, @@ -173,15 +182,26 @@ impl DaemonState { /// Create state with an optional stream client slot and server instance /// (for daemon startup with stream server). pub fn new_with_stream( + session_id: String, + session_name: Option, stream_client: Option>>>>, stream_server: Option>, ) -> Self { - let mut s = Self::new(); + let mut s = Self::new_for_session(session_id, session_name); s.stream_client = stream_client; s.stream_server = stream_server; s } + pub async fn flush_tab_assignments(&mut self) -> Result<(), String> { + if let Some(browser) = self.browser.as_mut() { + browser + .flush_tab_assignments_if_active("attached", None) + .await?; + } + Ok(()) + } + fn subscribe_to_browser_events(&mut self) { if let Some(ref browser) = self.browser { self.event_rx = Some(browser.client.subscribe()); @@ -579,7 +599,7 @@ pub async fn execute_command(cmd: &Value, state: &mut DaemonState) -> Value { for target_id in &destroyed_targets { if let Some(ref mut mgr) = state.browser { - mgr.remove_page_by_target_id(target_id); + let _ = mgr.remove_page_by_target_id(target_id).await; } } @@ -609,13 +629,17 @@ pub async fn execute_command(cmd: &Value, state: &mut DaemonState) -> Value { .await; } - mgr.add_page(super::browser::PageInfo { - target_id: te.target_info.target_id.clone(), - session_id: attach.session_id, - url: te.target_info.url.clone(), - title: te.target_info.title.clone(), - target_type: te.target_info.target_type.clone(), - }); + let _ = mgr + .add_page(super::browser::PageInfo { + target_id: te.target_info.target_id.clone(), + session_id: attach.session_id, + url: te.target_info.url.clone(), + title: te.target_info.title.clone(), + target_type: te.target_info.target_type.clone(), + browser_context_id: te.target_info.browser_context_id.clone(), + assigned_tab: super::browser::AssignedTabMetadata::default(), + }) + .await; } } } @@ -916,10 +940,46 @@ async fn connect_auto_with_fresh_tab() -> Result { async fn auto_launch(state: &mut DaemonState) -> Result<(), String> { let options = launch_options_from_env(); let engine = env::var("AGENT_BROWSER_ENGINE").ok(); + let env_connection_mode = env::var("AGENT_BROWSER_CONNECTION_MODE").ok(); + let connection_mode = ConnectionMode::resolve( + env_connection_mode.as_deref(), + env::var("AGENT_BROWSER_CDP").is_ok(), + env::var("AGENT_BROWSER_AUTO_CONNECT").is_ok(), + env::var("AGENT_BROWSER_RELAY_URL").is_ok(), + false, + )?; + + if matches!(connection_mode, ConnectionMode::ExtensionRelay) { + let relay_url = env::var("AGENT_BROWSER_RELAY_URL").map_err(|_| { + "AGENT_BROWSER_RELAY_URL is required for connectionMode=extension-relay".to_string() + })?; + let profile_id = env::var("AGENT_BROWSER_PROFILE_ID").ok(); + let profile_directory = env::var("AGENT_BROWSER_PROFILE_DIRECTORY").ok(); + let agent_id = env::var("AGENT_BROWSER_AGENT_ID").ok(); + let mgr = BrowserManager::connect_via_extension_relay(ExtensionRelayConnectOptions { + relay_url: &relay_url, + session_id: &state.session_id, + session_name: state.session_name.as_deref(), + profile_id: profile_id.as_deref(), + profile_directory: profile_directory.as_deref(), + persist_tab_assignment: env::var("AGENT_BROWSER_PERSIST_TAB_ASSIGNMENT") + .map(|v| v == "1" || v.eq_ignore_ascii_case("true")) + .unwrap_or(false), + agent_id: agent_id.as_deref(), + }) + .await?; + state.browser = Some(mgr); + configure_browser_tab_assignment_persistence(state).await?; + state.subscribe_to_browser_events(); + state.update_stream_client().await; + try_auto_restore_state(state).await; + return Ok(()); + } if let Ok(cdp) = env::var("AGENT_BROWSER_CDP") { let mgr = BrowserManager::connect_cdp(&cdp).await?; state.browser = Some(mgr); + configure_browser_tab_assignment_persistence(state).await?; state.subscribe_to_browser_events(); state.update_stream_client().await; try_auto_restore_state(state).await; @@ -928,6 +988,7 @@ async fn auto_launch(state: &mut DaemonState) -> Result<(), String> { if env::var("AGENT_BROWSER_AUTO_CONNECT").is_ok() { state.browser = Some(connect_auto_with_fresh_tab().await?); + configure_browser_tab_assignment_persistence(state).await?; state.subscribe_to_browser_events(); state.update_stream_client().await; try_auto_restore_state(state).await; @@ -936,12 +997,28 @@ async fn auto_launch(state: &mut DaemonState) -> Result<(), String> { let mgr = BrowserManager::launch(options, engine.as_deref()).await?; state.browser = Some(mgr); + configure_browser_tab_assignment_persistence(state).await?; state.subscribe_to_browser_events(); state.update_stream_client().await; try_auto_restore_state(state).await; Ok(()) } +async fn configure_browser_tab_assignment_persistence( + state: &mut DaemonState, +) -> Result<(), String> { + let Some(session_name) = state.session_name.clone() else { + return Ok(()); + }; + let Some(browser) = state.browser.as_mut() else { + return Ok(()); + }; + + browser + .configure_tab_assignment_persistence(state.session_id.clone(), session_name) + .await +} + fn launch_options_from_env() -> LaunchOptions { let headed = env::var("AGENT_BROWSER_HEADED") .map(|v| v == "1" || v == "true") @@ -1027,12 +1104,25 @@ async fn handle_launch(cmd: &Value, state: &mut DaemonState) -> Result Result Result Result Result { state.browser = Some(mgr); + configure_browser_tab_assignment_persistence(state).await?; state.subscribe_to_browser_events(); state.update_stream_client().await; return Ok(json!({ "launched": true, "provider": provider })); @@ -1196,6 +1329,7 @@ async fn handle_launch(cmd: &Value, state: &mut DaemonState) -> Result Result< url: nav_url.clone(), title: String::new(), target_type: "page".to_string(), - }); + browser_context_id: None, + assigned_tab: super::browser::AssignedTabMetadata::default(), + }) + .await?; // Navigate to URL if nav_url != "about:blank" { @@ -4476,7 +4613,10 @@ async fn handle_window_new(cmd: &Value, state: &mut DaemonState) -> Result, - has_cdp: bool, + connection_mode: ConnectionMode, profile: Option<&str>, storage_state: Option<&str>, allow_file_access: bool, @@ -27,13 +34,13 @@ pub fn validate_launch_options( ) -> Result<(), String> { let has_extensions = extensions.map(|e| !e.is_empty()).unwrap_or(false); - if has_extensions && has_cdp { + if has_extensions && matches!(connection_mode, ConnectionMode::Cdp) { return Err( "Cannot use extensions with cdp_url (extensions require local browser launch)" .to_string(), ); } - if profile.is_some() && has_cdp { + if profile.is_some() && matches!(connection_mode, ConnectionMode::Cdp) { return Err( "Cannot use profile with cdp_url (profile requires local browser launch)".to_string(), ); @@ -127,6 +134,27 @@ pub struct PageInfo { pub url: String, pub title: String, pub target_type: String, // "page" or "webview" + pub browser_context_id: Option, + pub assigned_tab: AssignedTabMetadata, +} + +#[derive(Debug, Clone, Default)] +pub struct AssignedTabMetadata { + pub target_id: Option, + pub last_known_url: Option, + pub last_known_title: Option, + pub fallback_index: Option, + pub context_ordinal: Option, +} + +#[derive(Debug, Clone, Default)] +struct TabAssignmentSeed { + tab_id: Option, + target_id: Option, + last_known_url: Option, + last_known_title: Option, + fallback_index: Option, + context_ordinal: Option, } #[derive(Debug, Clone, Copy)] @@ -151,6 +179,92 @@ pub enum BrowserProcess { Lightpanda(LightpandaProcess), } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ConnectionMode { + Local, + Cdp, + ExtensionRelay, +} + +impl ConnectionMode { + pub fn resolve( + explicit: Option<&str>, + has_cdp: bool, + auto_connect: bool, + has_relay: bool, + has_provider: bool, + ) -> Result { + if let Some(mode) = explicit { + let mode = mode.parse::()?; + if matches!(mode, Self::Local) && (has_cdp || auto_connect || has_relay || has_provider) + { + return Err( + "connectionMode=local cannot be combined with cdpUrl, cdpPort, autoConnect, relayUrl, or provider" + .to_string(), + ); + } + if matches!(mode, Self::Cdp) && has_relay { + return Err( + "connectionMode=cdp cannot be combined with relayUrl; use connectionMode=extension-relay" + .to_string(), + ); + } + if matches!(mode, Self::ExtensionRelay) && (has_cdp || auto_connect || has_provider) { + return Err( + "connectionMode=extension-relay cannot be combined with cdpUrl, cdpPort, autoConnect, or provider" + .to_string(), + ); + } + return Ok(mode); + } + + if has_relay { + return Ok(Self::ExtensionRelay); + } + if has_cdp || auto_connect || has_provider { + return Ok(Self::Cdp); + } + Ok(Self::Local) + } + + pub fn is_external(self) -> bool { + !matches!(self, Self::Local) + } +} + +impl FromStr for ConnectionMode { + type Err = String; + + fn from_str(value: &str) -> Result { + match value { + "local" => Ok(Self::Local), + "cdp" => Ok(Self::Cdp), + "extension-relay" => Ok(Self::ExtensionRelay), + other => Err(format!( + "Invalid connection mode '{}'. Expected one of: local, cdp, extension-relay", + other + )), + } + } +} + +#[derive(Debug, Clone, Default)] +pub struct RelayHandshakeMetadata { + pub tab_id: Option, + pub connection_id: Option, + pub target_id: Option, +} + +pub struct ExtensionRelayConnectOptions<'a> { + pub relay_url: &'a str, + pub session_id: &'a str, + pub session_name: Option<&'a str>, + pub profile_id: Option<&'a str>, + pub profile_directory: Option<&'a str>, + pub persist_tab_assignment: bool, + pub agent_id: Option<&'a str>, +} + impl BrowserProcess { pub fn kill(&mut self) { match self { @@ -170,10 +284,18 @@ impl BrowserProcess { pub struct BrowserManager { pub client: Arc, browser_process: Option, + connection_mode: ConnectionMode, ws_url: String, + relay_tab_id: Option, + relay_connection_id: Option, + relay_target_id: Option, pages: Vec, active_page_index: usize, default_timeout_ms: u64, + session_id: Option, + session_name: Option, + tab_assignment_store: Option, + is_shutting_down: bool, } const LIGHTPANDA_CDP_CONNECT_TIMEOUT: Duration = Duration::from_secs(5); @@ -188,7 +310,7 @@ impl BrowserManager { "chrome" => { validate_launch_options( options.extensions.as_deref(), - false, + ConnectionMode::Local, options.profile.as_deref(), options.storage_state.as_deref(), options.allow_file_access, @@ -238,10 +360,18 @@ impl BrowserManager { let mut manager = Self { client, browser_process: Some(process), + connection_mode: ConnectionMode::Local, ws_url, + relay_tab_id: None, + relay_connection_id: None, + relay_target_id: None, pages: Vec::new(), active_page_index: 0, default_timeout_ms: 25_000, + session_id: None, + session_name: None, + tab_assignment_store: None, + is_shutting_down: false, }; manager.discover_and_attach_targets().await?; manager @@ -298,25 +428,167 @@ impl BrowserManager { pub async fn connect_cdp(url: &str) -> Result { let ws_url = resolve_cdp_url(url).await?; + Self::attach_connected_browser( + ws_url, + ConnectionMode::Cdp, + RelayHandshakeMetadata::default(), + ) + .await + } + + pub async fn connect_auto() -> Result { + let ws_url = auto_connect_cdp().await?; + Self::attach_connected_browser( + ws_url, + ConnectionMode::Cdp, + RelayHandshakeMetadata::default(), + ) + .await + } + + pub async fn connect_via_extension_relay( + options: ExtensionRelayConnectOptions<'_>, + ) -> Result { + let (mut relay_ws, _) = tokio_tungstenite::connect_async(options.relay_url) + .await + .map_err(|e| format!("Extension relay WebSocket connect failed: {}", e))?; + + let handshake = json!({ + "type": "connect", + "connectionMode": "extension-relay", + "sessionId": options.session_id, + "sessionName": options.session_name, + "session_id": options.session_id, + "session_name": options.session_name, + "profileId": options.profile_id, + "profileDirectory": options.profile_directory, + "persistTabAssignment": options.persist_tab_assignment, + "agentId": options.agent_id, + }); + + relay_ws + .send(Message::Text(handshake.to_string())) + .await + .map_err(|e| format!("Failed to send extension relay handshake: {}", e))?; + + let response_text = loop { + let next = tokio::time::timeout(Duration::from_secs(10), relay_ws.next()) + .await + .map_err(|_| { + "Timed out after 10s waiting for extension relay handshake response".to_string() + })? + .ok_or_else(|| { + "Extension relay closed before returning a scoped CDP endpoint".to_string() + })?; + match next { + Ok(Message::Text(text)) => break text.to_string(), + Ok(Message::Binary(data)) => { + break String::from_utf8(data.to_vec()).map_err(|e| { + format!( + "Extension relay returned non-UTF8 binary handshake data: {}", + e + ) + })? + } + Ok(Message::Ping(_)) | Ok(Message::Pong(_)) => continue, + Ok(Message::Close(frame)) => { + let reason = frame + .map(|f| f.reason.to_string()) + .unwrap_or_else(|| "no close reason".to_string()); + return Err(format!( + "Extension relay closed before returning a scoped CDP endpoint: {}", + reason + )); + } + Ok(_) => continue, + Err(e) => return Err(format!("Extension relay receive failed: {}", e)), + } + }; + + let _ = relay_ws.close(None).await; + + let response: Value = serde_json::from_str(&response_text) + .map_err(|e| format!("Invalid extension relay handshake response: {}", e))?; + + if relay_response_bool(&response, &["success"]).is_some_and(|success| !success) + || relay_response_bool(&response, &["ok"]).is_some_and(|ok| !ok) + { + return Err(extract_relay_error(&response).unwrap_or_else(|| { + "Extension relay handshake failed without an error message".to_string() + })); + } + + let scoped_endpoint = extract_relay_string( + &response, + &[ + &["scopedCdpUrl"], + &["cdpUrl"], + &["wsUrl"], + &["endpoint"], + &["data", "scopedCdpUrl"], + &["data", "cdpUrl"], + &["data", "wsUrl"], + &["data", "endpoint"], + ], + ) + .ok_or_else(|| { + "Extension relay handshake response did not include a scoped CDP endpoint".to_string() + })?; + + let ws_url = resolve_cdp_url(&scoped_endpoint).await?; + let relay_metadata = RelayHandshakeMetadata { + tab_id: extract_relay_usize( + &response, + &[&["tabId"], &["data", "tabId"], &["assignment", "tabId"]], + ), + connection_id: extract_relay_string( + &response, + &[ + &["connectionId"], + &["data", "connectionId"], + &["assignment", "connectionId"], + ], + ), + target_id: extract_relay_string( + &response, + &[ + &["targetId"], + &["data", "targetId"], + &["assignment", "targetId"], + ], + ), + }; + + Self::attach_connected_browser(ws_url, ConnectionMode::ExtensionRelay, relay_metadata).await + } + + async fn attach_connected_browser( + ws_url: String, + connection_mode: ConnectionMode, + relay_metadata: RelayHandshakeMetadata, + ) -> Result { let client = Arc::new(CdpClient::connect(&ws_url).await?); let mut manager = Self { client, browser_process: None, + connection_mode, ws_url, + relay_tab_id: relay_metadata.tab_id, + relay_connection_id: relay_metadata.connection_id, + relay_target_id: relay_metadata.target_id, pages: Vec::new(), active_page_index: 0, default_timeout_ms: 10_000, + session_id: None, + session_name: None, + tab_assignment_store: None, + is_shutting_down: false, }; manager.discover_and_attach_targets().await?; Ok(manager) } - pub async fn connect_auto() -> Result { - let ws_url = auto_connect_cdp().await?; - Self::connect_cdp(&ws_url).await - } - async fn discover_and_attach_targets(&mut self) -> Result<(), String> { self.client .send_command_typed::<_, Value>( @@ -372,6 +644,8 @@ impl BrowserManager { url: "about:blank".to_string(), title: String::new(), target_type: "page".to_string(), + browser_context_id: None, + assigned_tab: AssignedTabMetadata::default(), }); self.active_page_index = 0; self.enable_domains(&attach_result.session_id).await?; @@ -395,6 +669,8 @@ impl BrowserManager { url: target.url.clone(), title: target.title.clone(), target_type: target.target_type.clone(), + browser_context_id: target.browser_context_id.clone(), + assigned_tab: AssignedTabMetadata::default(), }); } @@ -403,6 +679,42 @@ impl BrowserManager { self.enable_domains(&session_id).await?; } + self.sync_all_assignment_metadata(); + let restored = self.restore_persisted_tab_assignment().await?; + if self.tab_assignment_store.is_some() && !self.pages.is_empty() { + if !restored { + self.active_page_index = 0; + } + self.flush_tab_assignments_if_active("attached", None) + .await?; + } + + Ok(()) + } + + pub async fn configure_tab_assignment_persistence( + &mut self, + session_id: String, + session_name: String, + ) -> Result<(), String> { + self.session_id = Some(session_id.clone()); + self.session_name = Some(session_name.clone()); + self.tab_assignment_store = Some(tab_assignments::read_tab_assignments( + &session_id, + Some(&session_name), + )?); + self.sync_all_assignment_metadata(); + + let restored = self.restore_persisted_tab_assignment().await?; + if !restored && !self.pages.is_empty() { + self.active_page_index = self.active_page_index.min(self.pages.len() - 1); + } + + if !self.pages.is_empty() { + self.flush_tab_assignments_if_active("attached", None) + .await?; + } + Ok(()) } @@ -567,6 +879,12 @@ impl BrowserManager { } pub async fn close(&mut self) -> Result<(), String> { + if self.is_shutting_down { + return Ok(()); + } + self.is_shutting_down = true; + let runtime_fallback = self.capture_page_runtime(self.active_page_index).await.ok(); + if self.browser_process.is_some() { // Only send Browser.close when we launched the browser ourselves. // For external connections (--auto-connect, --cdp) we just disconnect @@ -585,6 +903,12 @@ impl BrowserManager { .await; } + self.pages.clear(); + self.active_page_index = 0; + self.flush_tab_assignments("detached", runtime_fallback.as_ref()) + .await?; + self.is_shutting_down = false; + Ok(()) } @@ -613,6 +937,22 @@ impl BrowserManager { &self.ws_url } + pub fn connection_mode(&self) -> ConnectionMode { + self.connection_mode + } + + pub fn relay_tab_id(&self) -> Option { + self.relay_tab_id + } + + pub fn relay_connection_id(&self) -> Option<&str> { + self.relay_connection_id.as_deref() + } + + pub fn relay_target_id(&self) -> Option<&str> { + self.relay_target_id.as_deref() + } + /// Returns the Chrome debug server address as "host:port". pub fn chrome_host_port(&self) -> &str { let stripped = self @@ -671,6 +1011,11 @@ impl BrowserManager { url: "about:blank".to_string(), title: String::new(), target_type: "page".to_string(), + browser_context_id: self + .pages + .get(self.active_page_index) + .and_then(|page| page.browser_context_id.clone()), + assigned_tab: AssignedTabMetadata::default(), }); self.active_page_index = 0; self.enable_domains(&attach_result.session_id).await?; @@ -745,8 +1090,16 @@ impl BrowserManager { url: target_url.to_string(), title: String::new(), target_type: "page".to_string(), + browser_context_id: self + .pages + .get(self.active_page_index) + .and_then(|page| page.browser_context_id.clone()), + assigned_tab: AssignedTabMetadata::default(), }); self.active_page_index = index; + self.sync_all_assignment_metadata(); + self.flush_tab_assignments_if_active("attached", None) + .await?; Ok(json!({ "index": index, "url": target_url })) } @@ -777,6 +1130,9 @@ impl BrowserManager { page.url = url.clone(); page.title = title.clone(); } + self.sync_all_assignment_metadata(); + self.flush_tab_assignments_if_active("attached", None) + .await?; Ok(json!({ "index": index, "url": url, "title": title })) } @@ -792,6 +1148,7 @@ impl BrowserManager { return Err("Cannot close the last tab".to_string()); } + let runtime_fallback = self.capture_page_runtime(target_index).await.ok(); let page = self.pages.remove(target_index); let _ = self .client @@ -810,6 +1167,9 @@ impl BrowserManager { let session_id = self.pages[self.active_page_index].session_id.clone(); self.enable_domains(&session_id).await?; + self.sync_all_assignment_metadata(); + self.flush_tab_assignments_if_active("attached", runtime_fallback.as_ref()) + .await?; Ok(json!({ "closed": target_index, "activeIndex": self.active_page_index })) } @@ -1047,17 +1407,57 @@ impl BrowserManager { .to_string()) } - pub fn add_page(&mut self, page: PageInfo) { + pub async fn add_page(&mut self, mut page: PageInfo) -> Result<(), String> { + page.assigned_tab = AssignedTabMetadata { + target_id: Some(page.target_id.clone()), + last_known_url: if page.url.is_empty() { + None + } else { + Some(page.url.clone()) + }, + last_known_title: if page.title.is_empty() { + None + } else { + Some(page.title.clone()) + }, + fallback_index: None, + context_ordinal: None, + }; let index = self.pages.len(); self.pages.push(page); self.active_page_index = index; + self.sync_all_assignment_metadata(); + self.flush_tab_assignments_if_active("attached", None).await } - pub fn remove_page_by_target_id(&mut self, target_id: &str) { + pub async fn remove_page_by_target_id(&mut self, target_id: &str) -> Result<(), String> { if let Some(pos) = self.pages.iter().position(|p| p.target_id == target_id) { + let runtime_fallback = self.capture_page_runtime(pos).await.ok(); + let removed_active = pos == self.active_page_index; self.pages.remove(pos); - self.update_active_page_if_needed(); + if self.pages.is_empty() { + self.active_page_index = 0; + } else if self.active_page_index > pos { + self.active_page_index -= 1; + } else { + self.update_active_page_if_needed(); + } + self.sync_all_assignment_metadata(); + + if removed_active && !self.pages.is_empty() { + let session_id = self.pages[self.active_page_index].session_id.clone(); + self.enable_domains(&session_id).await?; + } + + let status = if self.pages.is_empty() { + "detached" + } else { + "attached" + }; + self.flush_tab_assignments_if_active(status, runtime_fallback.as_ref()) + .await?; } + Ok(()) } pub fn has_target(&self, target_id: &str) -> bool { @@ -1214,10 +1614,18 @@ async fn initialize_lightpanda_manager( let mut manager = BrowserManager { client: Arc::new(client), browser_process: None, + connection_mode: ConnectionMode::Local, ws_url: ws_url.clone(), + relay_tab_id: None, + relay_connection_id: None, + relay_target_id: None, pages: Vec::new(), active_page_index: 0, default_timeout_ms: 25_000, + session_id: None, + session_name: None, + tab_assignment_store: None, + is_shutting_down: false, }; match discover_and_attach_lightpanda_targets(&mut manager, deadline).await { @@ -1279,6 +1687,289 @@ fn lightpanda_target_init_timeout(last_error: Option<&str>) -> String { message } +impl BrowserManager { + fn sync_all_assignment_metadata(&mut self) { + let mut context_ordinals = BTreeMap::::new(); + for page in &self.pages { + if let Some(context_id) = page.browser_context_id.as_ref() { + let next_ordinal = context_ordinals.len(); + context_ordinals + .entry(context_id.clone()) + .or_insert(next_ordinal); + } + } + + for index in 0..self.pages.len() { + if let Some(page) = self.pages.get_mut(index) { + page.assigned_tab.target_id = Some(page.target_id.clone()); + page.assigned_tab.fallback_index = Some(index); + page.assigned_tab.context_ordinal = page + .browser_context_id + .as_ref() + .and_then(|context_id| context_ordinals.get(context_id).copied()); + if !page.url.is_empty() { + page.assigned_tab.last_known_url = Some(page.url.clone()); + } + if !page.title.is_empty() { + page.assigned_tab.last_known_title = Some(page.title.clone()); + } + } + } + } + + async fn capture_page_runtime(&mut self, index: usize) -> Result { + if index >= self.pages.len() { + return Err("Tab index out of range".to_string()); + } + + self.sync_all_assignment_metadata(); + if index == self.active_page_index { + if let Ok(url) = self.get_url().await { + if let Some(page) = self.pages.get_mut(index) { + page.url = url.clone(); + page.assigned_tab.last_known_url = Some(url); + } + } + if let Ok(title) = self.get_title().await { + if let Some(page) = self.pages.get_mut(index) { + page.title = title.clone(); + if !title.is_empty() { + page.assigned_tab.last_known_title = Some(title); + } + } + } + } + + self.pages + .get(index) + .map(|page| page.assigned_tab.clone()) + .ok_or_else(|| "Tab index out of range".to_string()) + } + + async fn restore_persisted_tab_assignment(&mut self) -> Result { + let session_id = match self.session_id.as_deref() { + Some(session_id) if !session_id.is_empty() => session_id, + _ => return Ok(false), + }; + let assignment = { + let Some(store) = self.tab_assignment_store.as_ref() else { + return Ok(false); + }; + let Some(assignment) = store.assignments.get(session_id).cloned() else { + return Ok(false); + }; + assignment + }; + if self.pages.is_empty() { + return Ok(false); + } + + self.sync_all_assignment_metadata(); + + let by_target = assignment.target_id.as_ref().and_then(|target_id| { + self.pages + .iter() + .position(|page| page.target_id == *target_id) + }); + let by_url = assignment.last_known_url.as_ref().and_then(|url: &String| { + self.pages.iter().position(|page| { + page.assigned_tab.last_known_url.as_deref() == Some(url.as_str()) + || page.url == *url + }) + }); + let by_title = assignment + .last_known_title + .as_ref() + .and_then(|title: &String| { + self.pages.iter().position(|page| { + page.assigned_tab.last_known_title.as_deref() == Some(title.as_str()) + || page.title == *title + }) + }); + let by_index = assignment + .fallback_index + .filter(|index| *index < self.pages.len()); + + let matched = by_target.or(by_url).or(by_title).or(by_index); + if let Some(index) = matched { + self.active_page_index = index; + let session_id = self.pages[index].session_id.clone(); + self.enable_domains(&session_id).await?; + return Ok(true); + } + + Ok(false) + } + + fn build_tab_assignment( + &self, + session_id: &str, + status: TabAssignmentStatus, + now: &str, + previous_assignment: Option<&TabAssignment>, + seed: Option, + ) -> TabAssignment { + let relay_tab_id = self.relay_tab_id; + let relay_connection_id = self.relay_connection_id.clone(); + let relay_target_id = self.relay_target_id.clone(); + let seed = seed.unwrap_or_default(); + + TabAssignment { + agent_session_id: session_id.to_string(), + tab_id: seed.tab_id.or(relay_tab_id), + target_id: seed + .target_id + .or_else(|| previous_assignment.and_then(|a| a.target_id.clone())) + .or(relay_target_id), + window_id: previous_assignment.and_then(|a| a.window_id), + status, + lease_version: previous_assignment.and_then(|a| a.lease_version), + connection_id: previous_assignment + .and_then(|a| a.connection_id.clone()) + .or(relay_connection_id), + assigned_at: previous_assignment + .map(|a| a.assigned_at.clone()) + .unwrap_or_else(|| now.to_string()), + updated_at: now.to_string(), + last_known_url: seed + .last_known_url + .or_else(|| previous_assignment.and_then(|a| a.last_known_url.clone())), + last_known_title: seed + .last_known_title + .or_else(|| previous_assignment.and_then(|a| a.last_known_title.clone())), + fallback_index: seed + .fallback_index + .or_else(|| previous_assignment.and_then(|a| a.fallback_index)), + context_ordinal: seed + .context_ordinal + .or_else(|| previous_assignment.and_then(|a| a.context_ordinal)), + } + } + + pub async fn flush_tab_assignments( + &mut self, + status: &str, + runtime_fallback: Option<&AssignedTabMetadata>, + ) -> Result<(), String> { + let session_id = match self.session_id.clone() { + Some(session_id) if !session_id.is_empty() => session_id, + _ => return Ok(()), + }; + let session_name = match self.session_name.clone() { + Some(session_name) if !session_name.is_empty() => session_name, + _ => return Ok(()), + }; + + for _ in 0..3 { + let existing = tab_assignments::read_tab_assignments(&session_id, Some(&session_name))?; + let expected_revision = existing.revision; + let previous_assignment = existing.assignments.get(&session_id).cloned(); + let now = OffsetDateTime::now_utc() + .format(&Rfc3339) + .map_err(|e| format!("Failed to format timestamp: {}", e))?; + let assignment_status = TabAssignmentStatus::from_str(status)?; + + let active_runtime = + if !self.pages.is_empty() && self.active_page_index < self.pages.len() { + Some(self.capture_page_runtime(self.active_page_index).await?) + } else { + None + }; + + let seed = if let Some(runtime) = active_runtime.as_ref() { + let page = &self.pages[self.active_page_index]; + Some(TabAssignmentSeed { + tab_id: Some(self.active_page_index), + target_id: runtime + .target_id + .clone() + .or_else(|| Some(page.target_id.clone())), + last_known_url: runtime.last_known_url.clone(), + last_known_title: runtime.last_known_title.clone(), + fallback_index: runtime.fallback_index.or(Some(self.active_page_index)), + context_ordinal: runtime.context_ordinal, + }) + } else { + runtime_fallback.map(|runtime| TabAssignmentSeed { + tab_id: runtime.fallback_index, + target_id: runtime.target_id.clone(), + last_known_url: runtime.last_known_url.clone(), + last_known_title: runtime.last_known_title.clone(), + fallback_index: runtime.fallback_index, + context_ordinal: runtime.context_ordinal, + }) + }; + + let assignment = self.build_tab_assignment( + &session_id, + assignment_status, + &now, + previous_assignment.as_ref(), + seed, + ); + + let mut assignments = existing.assignments.clone(); + assignments.insert(session_id.clone(), assignment.clone()); + + let mut tabs = existing.tabs.clone().unwrap_or_default(); + if let Some(tab_id) = assignment.tab_id { + tabs.insert( + tab_id.to_string(), + TabInfo { + tab_id, + target_id: assignment.target_id.clone(), + owner_session_id: Some(session_id.clone()), + status: assignment_status, + last_known_url: assignment.last_known_url.clone(), + last_known_title: assignment.last_known_title.clone(), + updated_at: now.clone(), + }, + ); + } + + let next_data = TabAssignmentsFile { + version: 1, + revision: expected_revision, + session_name: session_name.clone(), + updated_at: now, + profile: existing.profile.clone(), + transport: existing.transport.clone(), + assignments, + tabs: if tabs.is_empty() { None } else { Some(tabs) }, + }; + + match tab_assignments::write_tab_assignments( + &session_id, + Some(&session_name), + &next_data, + ) { + Ok(written) => { + self.tab_assignment_store = Some(written); + return Ok(()); + } + Err(err) if err.contains("revision mismatch") => continue, + Err(err) => return Err(err), + } + } + + Err(format!( + "Failed to flush tab assignments for session '{}' after 3 CAS attempts", + session_id + )) + } + + pub async fn flush_tab_assignments_if_active( + &mut self, + status: &str, + runtime_fallback: Option<&AssignedTabMetadata>, + ) -> Result<(), String> { + if self.is_shutting_down { + return Ok(()); + } + self.flush_tab_assignments(status, runtime_fallback).await + } +} + async fn resolve_cdp_url(input: &str) -> Result { if input.starts_with("ws://") || input.starts_with("wss://") { return Ok(input.to_string()); @@ -1304,6 +1995,53 @@ async fn resolve_cdp_url(input: &str) -> Result { )) } +fn relay_response_bool(response: &Value, path: &[&str]) -> Option { + let mut current = response; + for key in path { + current = current.get(*key)?; + } + current.as_bool() +} + +fn extract_relay_string(response: &Value, paths: &[&[&str]]) -> Option { + paths.iter().find_map(|path| { + let mut current = response; + for key in *path { + current = current.get(*key)?; + } + current.as_str().map(ToString::to_string) + }) +} + +fn extract_relay_usize(response: &Value, paths: &[&[&str]]) -> Option { + paths.iter().find_map(|path| { + let mut current = response; + for key in *path { + current = current.get(*key)?; + } + current + .as_u64() + .and_then(|value| usize::try_from(value).ok()) + .or_else(|| { + current + .as_str() + .and_then(|value| value.parse::().ok()) + }) + }) +} + +fn extract_relay_error(response: &Value) -> Option { + extract_relay_string( + response, + &[ + &["error"], + &["message"], + &["data", "error"], + &["data", "message"], + ], + ) +} + #[cfg(test)] mod tests { use super::*; @@ -1312,19 +2050,30 @@ mod tests { #[test] fn test_validate_launch_options_extensions_and_cdp() { let ext = vec!["/path/to/ext".to_string()]; - assert!(validate_launch_options(Some(&ext), true, None, None, false, None,).is_err()); + assert!( + validate_launch_options(Some(&ext), ConnectionMode::Cdp, None, None, false, None,) + .is_err() + ); } #[test] fn test_validate_launch_options_profile_and_cdp() { - assert!(validate_launch_options(None, true, Some("/path"), None, false, None,).is_err()); + assert!(validate_launch_options( + None, + ConnectionMode::Cdp, + Some("/path"), + None, + false, + None, + ) + .is_err()); } #[test] fn test_validate_launch_options_storage_state_and_profile() { assert!(validate_launch_options( None, - false, + ConnectionMode::Local, Some("/profile"), Some("/state.json"), false, @@ -1336,23 +2085,48 @@ mod tests { #[test] fn test_validate_launch_options_storage_state_and_extensions() { let ext = vec!["/ext".to_string()]; - assert!( - validate_launch_options(Some(&ext), false, None, Some("/state.json"), false, None,) - .is_err() - ); + assert!(validate_launch_options( + Some(&ext), + ConnectionMode::Local, + None, + Some("/state.json"), + false, + None, + ) + .is_err()); } #[test] fn test_validate_launch_options_allow_file_access_firefox() { + assert!(validate_launch_options( + None, + ConnectionMode::Local, + None, + None, + true, + Some("/usr/bin/firefox"), + ) + .is_err()); + } + + #[test] + fn test_validate_launch_options_valid() { assert!( - validate_launch_options(None, false, None, None, true, Some("/usr/bin/firefox"),) - .is_err() + validate_launch_options(None, ConnectionMode::Local, None, None, false, None,).is_ok() ); } #[test] - fn test_validate_launch_options_valid() { - assert!(validate_launch_options(None, false, None, None, false, None,).is_ok()); + fn test_validate_launch_options_profile_with_extension_relay_is_allowed() { + assert!(validate_launch_options( + None, + ConnectionMode::ExtensionRelay, + Some("/profile"), + None, + false, + None, + ) + .is_ok()); } #[test] diff --git a/cli/src/native/daemon.rs b/cli/src/native/daemon.rs index faf616e0d..1f2122c11 100644 --- a/cli/src/native/daemon.rs +++ b/cli/src/native/daemon.rs @@ -17,6 +17,10 @@ use super::state; use super::stream::StreamServer; pub async fn run_daemon(session: &str) { + let session_id = session.to_string(); + let session_name = env::var("AGENT_BROWSER_SESSION_NAME") + .ok() + .filter(|value| !value.is_empty()); let socket_dir = get_daemon_socket_dir(); if !socket_dir.exists() { let _ = fs::create_dir_all(&socket_dir); @@ -72,6 +76,8 @@ pub async fn run_daemon(session: &str) { let result = run_socket_server( &socket_path, session, + &session_id, + session_name.as_deref(), stream_client, stream_server_instance, idle_timeout_ms, @@ -93,6 +99,8 @@ pub async fn run_daemon(session: &str) { async fn run_socket_server( socket_path: &PathBuf, _session: &str, + session_id: &str, + session_name: Option<&str>, stream_client: Option>>>>, stream_server: Option>, idle_timeout_ms: Option, @@ -102,9 +110,13 @@ async fn run_socket_server( let listener = UnixListener::bind(socket_path).map_err(|e| format!("Failed to bind socket: {}", e))?; - let state: std::sync::Arc> = std::sync::Arc::new( - tokio::sync::Mutex::new(DaemonState::new_with_stream(stream_client, stream_server)), - ); + let state: std::sync::Arc> = + std::sync::Arc::new(tokio::sync::Mutex::new(DaemonState::new_with_stream( + session_id.to_string(), + session_name.map(str::to_string), + stream_client, + stream_server, + ))); let (reset_tx, mut reset_rx) = mpsc::channel::<()>(64); let reset_tx = idle_timeout_ms.map(|_| Arc::new(reset_tx)); @@ -136,6 +148,7 @@ async fn run_socket_server( } }, if idle_timeout_ms.is_some() => { let mut s = state.lock().await; + let _ = s.flush_tab_assignments().await; if let Some(ref mut mgr) = s.browser { let _ = mgr.close().await; } @@ -146,6 +159,7 @@ async fn run_socket_server( } _ = shutdown_signal() => { let mut s = state.lock().await; + let _ = s.flush_tab_assignments().await; if let Some(ref mut mgr) = s.browser { let _ = mgr.close().await; } @@ -161,6 +175,8 @@ async fn run_socket_server( async fn run_socket_server( socket_path: &PathBuf, session: &str, + session_id: &str, + session_name: Option<&str>, stream_client: Option>>>>, stream_server: Option>, idle_timeout_ms: Option, @@ -176,9 +192,13 @@ async fn run_socket_server( let port_path = socket_dir.join(format!("{}.port", session)); let _ = fs::write(&port_path, port.to_string()); - let state: std::sync::Arc> = std::sync::Arc::new( - tokio::sync::Mutex::new(DaemonState::new_with_stream(stream_client, stream_server)), - ); + let state: std::sync::Arc> = + std::sync::Arc::new(tokio::sync::Mutex::new(DaemonState::new_with_stream( + session_id.to_string(), + session_name.map(str::to_string), + stream_client, + stream_server, + ))); let (reset_tx, mut reset_rx) = mpsc::channel::<()>(64); let reset_tx = idle_timeout_ms.map(|_| Arc::new(reset_tx)); @@ -210,6 +230,7 @@ async fn run_socket_server( } }, if idle_timeout_ms.is_some() => { let mut s = state.lock().await; + let _ = s.flush_tab_assignments().await; if let Some(ref mut mgr) = s.browser { let _ = mgr.close().await; } @@ -221,6 +242,7 @@ async fn run_socket_server( } _ = shutdown_signal() => { let mut s = state.lock().await; + let _ = s.flush_tab_assignments().await; if let Some(ref mut mgr) = s.browser { let _ = mgr.close().await; } @@ -290,6 +312,10 @@ async fn handle_connection( } if is_close { + { + let mut s = state.lock().await; + let _ = s.flush_tab_assignments().await; + } tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; process::exit(0); } diff --git a/cli/src/native/mod.rs b/cli/src/native/mod.rs index f86c816e8..6c7956813 100644 --- a/cli/src/native/mod.rs +++ b/cli/src/native/mod.rs @@ -37,6 +37,8 @@ pub mod storage; #[allow(dead_code)] pub mod stream; #[allow(dead_code)] +pub mod tab_assignments; +#[allow(dead_code)] pub mod tracing; #[allow(dead_code)] pub mod webdriver; diff --git a/cli/src/native/tab_assignments.rs b/cli/src/native/tab_assignments.rs new file mode 100644 index 000000000..dc518c761 --- /dev/null +++ b/cli/src/native/tab_assignments.rs @@ -0,0 +1,620 @@ +use aes_gcm::{aead::Aead, aead::KeyInit, Aes256Gcm}; +use base64::engine::general_purpose::STANDARD; +use base64::Engine as _; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::collections::HashMap; +use std::fs::{self, OpenOptions}; +use std::path::{Path, PathBuf}; +use time::{format_description::well_known::Rfc3339, OffsetDateTime}; +use uuid::Uuid; + +#[cfg(unix)] +use std::os::fd::AsRawFd; + +use super::state::get_sessions_dir; + +const TAB_ASSIGNMENTS_FILE_NAME: &str = "tab-assignments.json"; +const TAB_ASSIGNMENTS_SCHEMA_VERSION: u8 = 1; + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +pub enum TabAssignmentStatus { + #[serde(rename = "open")] + Open, + #[serde(rename = "assigned")] + Assigned, + #[serde(rename = "attached")] + Attached, + #[serde(rename = "detached")] + Detached, + #[serde(rename = "closed")] + Closed, + #[serde(rename = "orphaned")] + Orphaned, +} + +impl TabAssignmentStatus { + pub fn from_str(value: &str) -> Result { + match value { + "open" => Ok(Self::Open), + "assigned" => Ok(Self::Assigned), + "attached" => Ok(Self::Attached), + "detached" => Ok(Self::Detached), + "closed" => Ok(Self::Closed), + "orphaned" => Ok(Self::Orphaned), + other => Err(format!("Invalid tab assignment status '{}'", other)), + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct TabAssignment { + pub agent_session_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub tab_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub target_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub window_id: Option, + pub status: TabAssignmentStatus, + #[serde(skip_serializing_if = "Option::is_none")] + pub lease_version: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub connection_id: Option, + pub assigned_at: String, + pub updated_at: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub last_known_url: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub last_known_title: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub fallback_index: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub context_ordinal: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct TabInfo { + pub tab_id: usize, + #[serde(skip_serializing_if = "Option::is_none")] + pub target_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub owner_session_id: Option, + pub status: TabAssignmentStatus, + #[serde(skip_serializing_if = "Option::is_none")] + pub last_known_url: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub last_known_title: Option, + pub updated_at: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct TabAssignmentsProfile { + #[serde(skip_serializing_if = "Option::is_none")] + pub profile_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub user_data_dir: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub profile_directory: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct TabAssignmentsTransport { + #[serde(skip_serializing_if = "Option::is_none")] + pub kind: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub relay_url: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub extension_id: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct TabAssignmentsFile { + pub version: u8, + pub revision: u64, + pub session_name: String, + pub updated_at: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub profile: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub transport: Option, + #[serde(default)] + pub assignments: HashMap, + #[serde(skip_serializing_if = "Option::is_none")] + pub tabs: Option>, +} + +impl TabAssignmentsFile { + pub fn new(session_name: impl Into) -> Self { + Self { + version: TAB_ASSIGNMENTS_SCHEMA_VERSION, + revision: 0, + session_name: session_name.into(), + updated_at: current_timestamp(), + profile: None, + transport: None, + assignments: HashMap::new(), + tabs: None, + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct JsonEncryptedPayload { + version: u8, + encrypted: bool, + iv: String, + auth_tag: String, + data: String, +} + +pub fn current_timestamp() -> String { + OffsetDateTime::now_utc() + .format(&Rfc3339) + .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string()) +} + +pub fn read_tab_assignments( + session_id: &str, + session_name: Option<&str>, +) -> Result { + read_tab_assignments_in(&get_sessions_dir(), session_id, session_name) +} + +pub fn write_tab_assignments( + session_id: &str, + session_name: Option<&str>, + file: &TabAssignmentsFile, +) -> Result { + write_tab_assignments_in(&get_sessions_dir(), session_id, session_name, file) +} + +pub fn get_session_dir(session_id: &str, session_name: Option<&str>) -> Result { + get_session_dir_in(&get_sessions_dir(), session_id, session_name) +} + +pub fn get_tab_assignments_path( + session_id: &str, + session_name: Option<&str>, +) -> Result { + get_tab_assignments_path_in(&get_sessions_dir(), session_id, session_name) +} + +fn read_tab_assignments_in( + root: &Path, + session_id: &str, + session_name: Option<&str>, +) -> Result { + let resolved_session_name = resolve_session_name(session_id, session_name)?; + let path = get_tab_assignments_path_in(root, session_id, session_name)?; + if !path.exists() { + return Ok(TabAssignmentsFile::new(resolved_session_name)); + } + + let json = read_json_state_file(&path)?; + let mut file: TabAssignmentsFile = serde_json::from_str(&json).map_err(|e| { + format!( + "Failed to parse tab assignments from {}: {}", + path.display(), + e + ) + })?; + if file.version == 0 { + file.version = TAB_ASSIGNMENTS_SCHEMA_VERSION; + } + if file.session_name.is_empty() { + file.session_name = resolved_session_name; + } + Ok(file) +} + +fn write_tab_assignments_in( + root: &Path, + session_id: &str, + session_name: Option<&str>, + file: &TabAssignmentsFile, +) -> Result { + let path = get_tab_assignments_path_in(root, session_id, session_name)?; + let lock_path = path.with_extension("json.lock"); + let parent = path + .parent() + .ok_or_else(|| format!("Invalid tab assignments path: {}", path.display()))?; + fs::create_dir_all(parent).map_err(|e| { + format!( + "Failed to create tab assignments directory {}: {}", + parent.display(), + e + ) + })?; + + with_exclusive_lock(&lock_path, || { + let current = read_tab_assignments_in(root, session_id, session_name)?; + if current.revision != file.revision { + return Err(format!( + "Tab assignments revision mismatch at {}: expected {}, found {}", + path.display(), + file.revision, + current.revision + )); + } + + let mut next = file.clone(); + next.version = TAB_ASSIGNMENTS_SCHEMA_VERSION; + next.session_name = resolve_session_name(session_id, session_name)?; + if next.updated_at.is_empty() { + next.updated_at = current_timestamp(); + } + next.revision = current.revision + 1; + + let json = serde_json::to_string_pretty(&next) + .map_err(|e| format!("Failed to serialize tab assignments: {}", e))?; + let temp_path = get_temp_path(&path); + write_json_state_file(&temp_path, &json)?; + rename_atomic(&temp_path, &path)?; + Ok(next) + }) +} + +fn get_session_dir_in( + root: &Path, + session_id: &str, + session_name: Option<&str>, +) -> Result { + let session_component = resolve_session_name(session_id, session_name)?; + Ok(root.join(session_component)) +} + +fn get_tab_assignments_path_in( + root: &Path, + session_id: &str, + session_name: Option<&str>, +) -> Result { + Ok(get_session_dir_in(root, session_id, session_name)?.join(TAB_ASSIGNMENTS_FILE_NAME)) +} + +fn resolve_session_name(session_id: &str, session_name: Option<&str>) -> Result { + let candidate = session_name + .filter(|name| !name.is_empty()) + .unwrap_or(session_id); + validate_session_name(candidate)?; + Ok(candidate.to_string()) +} + +fn validate_session_name(name: &str) -> Result<(), String> { + let valid = !name.is_empty() + && name + .chars() + .all(|ch| ch.is_ascii_alphanumeric() || ch == '_' || ch == '-'); + if valid { + Ok(()) + } else { + Err(format!( + "Invalid session name '{}': expected only [a-zA-Z0-9_-]", + name + )) + } +} + +fn get_temp_path(path: &Path) -> PathBuf { + let filename = path + .file_name() + .and_then(|name| name.to_str()) + .unwrap_or(TAB_ASSIGNMENTS_FILE_NAME); + path.with_file_name(format!(".{}.tmp-{}", filename, Uuid::new_v4())) +} + +fn rename_atomic(from: &Path, to: &Path) -> Result<(), String> { + #[cfg(windows)] + { + if to.exists() { + fs::remove_file(to).map_err(|e| { + format!( + "Failed to replace existing tab assignments file {}: {}", + to.display(), + e + ) + })?; + } + } + + fs::rename(from, to).map_err(|e| { + format!( + "Failed to move tab assignments file from {} to {}: {}", + from.display(), + to.display(), + e + ) + }) +} + +fn read_json_state_file(path: &Path) -> Result { + let content = fs::read_to_string(path) + .map_err(|e| format!("Failed to read state file {}: {}", path.display(), e))?; + let parsed: Value = + serde_json::from_str(&content).map_err(|e| format!("Invalid JSON state file: {}", e))?; + + if let Ok(payload) = serde_json::from_value::(parsed.clone()) { + if payload.encrypted { + let key = std::env::var("AGENT_BROWSER_ENCRYPTION_KEY").map_err(|_| { + "Encrypted state file requires AGENT_BROWSER_ENCRYPTION_KEY".to_string() + })?; + return decrypt_json_payload(&payload, &key); + } + } + + Ok(content) +} + +fn write_json_state_file(path: &Path, content: &str) -> Result<(), String> { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent).map_err(|e| { + format!( + "Failed to create state directory {}: {}", + parent.display(), + e + ) + })?; + } + + let serialized = if let Ok(key) = std::env::var("AGENT_BROWSER_ENCRYPTION_KEY") { + let payload = encrypt_json_payload(content, &key)?; + serde_json::to_string_pretty(&payload) + .map_err(|e| format!("Failed to serialize encrypted payload: {}", e))? + } else { + content.to_string() + }; + + fs::write(path, serialized) + .map_err(|e| format!("Failed to write state file {}: {}", path.display(), e)) +} + +fn with_exclusive_lock(lock_path: &Path, action: F) -> Result +where + F: FnOnce() -> Result, +{ + if let Some(parent) = lock_path.parent() { + fs::create_dir_all(parent).map_err(|e| { + format!( + "Failed to create lock directory {}: {}", + parent.display(), + e + ) + })?; + } + + let file = OpenOptions::new() + .create(true) + .truncate(false) + .read(true) + .write(true) + .open(lock_path) + .map_err(|e| format!("Failed to open lock file {}: {}", lock_path.display(), e))?; + let _guard = FileLockGuard::acquire(&file, lock_path)?; + action() +} + +struct FileLockGuard<'a> { + #[cfg(unix)] + file: &'a std::fs::File, +} + +impl<'a> FileLockGuard<'a> { + fn acquire(file: &'a std::fs::File, path: &Path) -> Result { + #[cfg(unix)] + { + let rc = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX) }; + if rc != 0 { + return Err(format!( + "Failed to lock tab assignments file {}: {}", + path.display(), + std::io::Error::last_os_error() + )); + } + Ok(Self { file }) + } + + #[cfg(not(unix))] + { + let _ = (file, path); + Ok(Self {}) + } + } +} + +impl Drop for FileLockGuard<'_> { + fn drop(&mut self) { + #[cfg(unix)] + unsafe { + let _ = libc::flock(self.file.as_raw_fd(), libc::LOCK_UN); + } + } +} + +fn encrypt_json_payload(plaintext: &str, key_str: &str) -> Result { + let key_bytes = parse_hex_key(key_str)?; + let cipher = + Aes256Gcm::new_from_slice(&key_bytes).map_err(|e| format!("Invalid key: {}", e))?; + + let mut nonce = [0u8; 12]; + getrandom::getrandom(&mut nonce).map_err(|e| format!("Failed to generate nonce: {}", e))?; + let ciphertext = cipher + .encrypt(aes_gcm::Nonce::from_slice(&nonce), plaintext.as_bytes()) + .map_err(|e| format!("Encryption failed: {}", e))?; + + if ciphertext.len() < 16 { + return Err("Ciphertext too short".to_string()); + } + + let split_at = ciphertext.len() - 16; + let (data, auth_tag) = ciphertext.split_at(split_at); + Ok(JsonEncryptedPayload { + version: 1, + encrypted: true, + iv: STANDARD.encode(nonce), + auth_tag: STANDARD.encode(auth_tag), + data: STANDARD.encode(data), + }) +} + +fn decrypt_json_payload(payload: &JsonEncryptedPayload, key_str: &str) -> Result { + let key_bytes = parse_hex_key(key_str)?; + let cipher = + Aes256Gcm::new_from_slice(&key_bytes).map_err(|e| format!("Invalid key: {}", e))?; + + let nonce = STANDARD + .decode(&payload.iv) + .map_err(|e| format!("Invalid IV encoding: {}", e))?; + let auth_tag = STANDARD + .decode(&payload.auth_tag) + .map_err(|e| format!("Invalid authTag encoding: {}", e))?; + let data = STANDARD + .decode(&payload.data) + .map_err(|e| format!("Invalid encrypted data encoding: {}", e))?; + + if nonce.len() != 12 { + return Err("Invalid IV length".to_string()); + } + + let mut combined = data; + combined.extend_from_slice(&auth_tag); + let decrypted = cipher + .decrypt(aes_gcm::Nonce::from_slice(&nonce), combined.as_ref()) + .map_err(|e| format!("Decryption failed: {}", e))?; + String::from_utf8(decrypted).map_err(|e| format!("Decrypted state is not valid UTF-8: {}", e)) +} + +fn parse_hex_key(key_str: &str) -> Result, String> { + let trimmed = key_str.trim(); + if trimmed.len() != 64 || !trimmed.chars().all(|c| c.is_ascii_hexdigit()) { + return Err("AGENT_BROWSER_ENCRYPTION_KEY must be a 64-character hex string".to_string()); + } + + let mut bytes = Vec::with_capacity(32); + for chunk in trimmed.as_bytes().chunks(2) { + let pair = std::str::from_utf8(chunk).map_err(|e| format!("Invalid hex key: {}", e))?; + let byte = u8::from_str_radix(pair, 16).map_err(|e| format!("Invalid hex key: {}", e))?; + bytes.push(byte); + } + Ok(bytes) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn temp_root() -> PathBuf { + let root = + std::env::temp_dir().join(format!("agent-browser-tab-assignments-{}", Uuid::new_v4())); + fs::create_dir_all(&root).unwrap(); + root + } + + #[test] + fn test_read_missing_returns_default_file() { + let root = temp_root(); + let file = read_tab_assignments_in(&root, "default", Some("named")).unwrap(); + assert_eq!(file.version, TAB_ASSIGNMENTS_SCHEMA_VERSION); + assert_eq!(file.revision, 0); + assert_eq!(file.session_name, "named"); + assert!(file.assignments.is_empty()); + assert!(file.tabs.is_none()); + } + + #[test] + fn test_write_then_read_roundtrip() { + let root = temp_root(); + let mut file = read_tab_assignments_in(&root, "default", Some("named")).unwrap(); + file.updated_at = current_timestamp(); + file.assignments.insert( + "default".to_string(), + TabAssignment { + agent_session_id: "default".to_string(), + tab_id: Some(1), + target_id: Some("target-1".to_string()), + window_id: Some(1), + status: TabAssignmentStatus::Assigned, + lease_version: Some(1), + connection_id: Some("conn-1".to_string()), + assigned_at: current_timestamp(), + updated_at: current_timestamp(), + last_known_url: Some("https://example.com".to_string()), + last_known_title: Some("Example".to_string()), + fallback_index: Some(0), + context_ordinal: Some(0), + }, + ); + + let written = write_tab_assignments_in(&root, "default", Some("named"), &file).unwrap(); + assert_eq!(written.revision, 1); + + let loaded = read_tab_assignments_in(&root, "default", Some("named")).unwrap(); + assert_eq!(loaded.revision, 1); + assert_eq!(loaded.assignments.len(), 1); + assert_eq!( + loaded.assignments["default"].status, + TabAssignmentStatus::Assigned + ); + } + + #[test] + fn test_write_rejects_stale_revision() { + let root = temp_root(); + let file = read_tab_assignments_in(&root, "default", Some("named")).unwrap(); + let _written = write_tab_assignments_in(&root, "default", Some("named"), &file).unwrap(); + + let mut stale = file.clone(); + stale.assignments.insert( + "other".to_string(), + TabAssignment { + agent_session_id: "other".to_string(), + tab_id: Some(2), + target_id: Some("target-2".to_string()), + window_id: None, + status: TabAssignmentStatus::Detached, + lease_version: None, + connection_id: None, + assigned_at: current_timestamp(), + updated_at: current_timestamp(), + last_known_url: None, + last_known_title: None, + fallback_index: Some(1), + context_ordinal: Some(0), + }, + ); + + let err = write_tab_assignments_in(&root, "default", Some("named"), &stale).unwrap_err(); + assert!(err.contains("revision mismatch")); + } + + #[test] + fn test_invalid_session_name_is_rejected() { + let root = temp_root(); + let err = get_tab_assignments_path_in(&root, "default", Some("../evil")).unwrap_err(); + assert!(err.contains("Invalid session name")); + } + + #[test] + fn test_temp_file_uses_dotfile_pattern() { + let path = PathBuf::from("/tmp/tab-assignments.json"); + let temp_path = get_temp_path(&path); + let file_name = temp_path + .file_name() + .and_then(|name| name.to_str()) + .unwrap(); + assert!(file_name.starts_with(".tab-assignments.json.tmp-")); + } + + #[test] + fn test_status_roundtrip() { + let json = serde_json::to_string(&TabAssignmentStatus::Orphaned).unwrap(); + assert_eq!(json, "\"orphaned\""); + let parsed: TabAssignmentStatus = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed, TabAssignmentStatus::Orphaned); + } +}