diff --git a/zbus/Cargo.toml b/zbus/Cargo.toml index e5494e01a..c66caab48 100644 --- a/zbus/Cargo.toml +++ b/zbus/Cargo.toml @@ -31,10 +31,11 @@ async-io = [ "futures-util/io", ] tokio = ["dep:tokio"] -vsock = ["dep:vsock", "dep:async-io"] -tokio-vsock = ["dep:tokio-vsock", "tokio"] +vsock = ["dep:vsock", "dep:async-io", "dbus-addr/vsock"] +tokio-vsock = ["dep:tokio-vsock", "tokio", "dbus-addr/vsock"] [dependencies] +dbus-addr = "0.1" serde = { version = "1.0", features = ["derive"] } serde_repr = "0.1.9" zvariant = { path = "../zvariant", version = "4.0.0", default-features = false, features = [ diff --git a/zbus/src/address/mod.rs b/zbus/src/address/mod.rs deleted file mode 100644 index 27175a910..000000000 --- a/zbus/src/address/mod.rs +++ /dev/null @@ -1,483 +0,0 @@ -//! D-Bus address handling. -//! -//! Server addresses consist of a transport name followed by a colon, and then an optional, -//! comma-separated list of keys and values in the form key=value. -//! -//! See also: -//! -//! * [Server addresses] in the D-Bus specification. -//! -//! [Server addresses]: https://dbus.freedesktop.org/doc/dbus-specification.html#addresses - -pub mod transport; - -use crate::{Error, Guid, OwnedGuid, Result}; -#[cfg(all(unix, not(target_os = "macos")))] -use nix::unistd::Uid; -use std::{collections::HashMap, env, str::FromStr}; - -use std::fmt::{Display, Formatter}; - -use self::transport::Stream; -pub use self::transport::Transport; - -/// A bus address -#[derive(Clone, Debug, PartialEq, Eq)] -#[non_exhaustive] -pub struct Address { - guid: Option, - transport: Transport, -} - -impl Address { - /// Create a new `Address` from a `Transport`. - pub fn new(transport: Transport) -> Self { - Self { - transport, - guid: None, - } - } - - /// Set the GUID for this address. - pub fn set_guid(mut self, guid: G) -> Result - where - G: TryInto, - G::Error: Into, - { - self.guid = Some(guid.try_into().map_err(Into::into)?); - - Ok(self) - } - - /// The transport details for this address. - pub fn transport(&self) -> &Transport { - &self.transport - } - - #[cfg_attr(any(target_os = "macos", windows), async_recursion::async_recursion)] - pub(crate) async fn connect(self) -> Result { - self.transport.connect().await - } - - /// Get the address for session socket respecting the DBUS_SESSION_BUS_ADDRESS environment - /// variable. If we don't recognize the value (or it's not set) we fall back to - /// $XDG_RUNTIME_DIR/bus - pub fn session() -> Result { - match env::var("DBUS_SESSION_BUS_ADDRESS") { - Ok(val) => Self::from_str(&val), - _ => { - #[cfg(windows)] - { - #[cfg(feature = "windows-gdbus")] - return Self::from_str("autolaunch:"); - - #[cfg(not(feature = "windows-gdbus"))] - return Self::from_str("autolaunch:scope=*user"); - } - - #[cfg(all(unix, not(target_os = "macos")))] - { - let runtime_dir = env::var("XDG_RUNTIME_DIR") - .unwrap_or_else(|_| format!("/run/user/{}", Uid::effective())); - let path = format!("unix:path={runtime_dir}/bus"); - - Self::from_str(&path) - } - - #[cfg(target_os = "macos")] - return Self::from_str("launchd:env=DBUS_LAUNCHD_SESSION_BUS_SOCKET"); - } - } - } - - /// Get the address for system bus respecting the DBUS_SYSTEM_BUS_ADDRESS environment - /// variable. If we don't recognize the value (or it's not set) we fall back to - /// /var/run/dbus/system_bus_socket - pub fn system() -> Result { - match env::var("DBUS_SYSTEM_BUS_ADDRESS") { - Ok(val) => Self::from_str(&val), - _ => { - #[cfg(all(unix, not(target_os = "macos")))] - return Self::from_str("unix:path=/var/run/dbus/system_bus_socket"); - - #[cfg(windows)] - return Self::from_str("autolaunch:"); - - #[cfg(target_os = "macos")] - return Self::from_str("launchd:env=DBUS_LAUNCHD_SESSION_BUS_SOCKET"); - } - } - } - - /// The GUID for this address, if known. - pub fn guid(&self) -> Option<&Guid<'_>> { - self.guid.as_ref().map(|guid| guid.inner()) - } -} - -impl Display for Address { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - self.transport.fmt(f)?; - - if let Some(guid) = &self.guid { - write!(f, ",guid={}", guid)?; - } - - Ok(()) - } -} - -impl FromStr for Address { - type Err = Error; - - /// Parse the transport part of a D-Bus address into a `Transport`. - fn from_str(address: &str) -> Result { - let col = address - .find(':') - .ok_or_else(|| Error::Address("address has no colon".to_owned()))?; - let transport = &address[..col]; - let mut options = HashMap::new(); - - if address.len() > col + 1 { - for kv in address[col + 1..].split(',') { - let (k, v) = match kv.find('=') { - Some(eq) => (&kv[..eq], &kv[eq + 1..]), - None => { - return Err(Error::Address( - "missing = when parsing key/value".to_owned(), - )) - } - }; - if options.insert(k, v).is_some() { - return Err(Error::Address(format!( - "Key `{k}` specified multiple times" - ))); - } - } - } - - Ok(Self { - guid: options - .remove("guid") - .map(|s| Guid::from_str(s).map(|guid| OwnedGuid::from(guid).to_owned())) - .transpose()?, - transport: Transport::from_options(transport, options)?, - }) - } -} - -impl TryFrom<&str> for Address { - type Error = Error; - - fn try_from(value: &str) -> Result { - Self::from_str(value) - } -} - -impl From for Address { - fn from(transport: Transport) -> Self { - Self::new(transport) - } -} - -#[cfg(test)] -mod tests { - use super::{ - transport::{Tcp, TcpTransportFamily, Transport}, - Address, - }; - #[cfg(target_os = "macos")] - use crate::address::transport::Launchd; - #[cfg(windows)] - use crate::address::transport::{Autolaunch, AutolaunchScope}; - use crate::{ - address::transport::{Unix, UnixPath}, - Error, - }; - use std::str::FromStr; - use test_log::test; - - #[test] - fn parse_dbus_addresses() { - match Address::from_str("").unwrap_err() { - Error::Address(e) => assert_eq!(e, "address has no colon"), - _ => panic!(), - } - match Address::from_str("foo").unwrap_err() { - Error::Address(e) => assert_eq!(e, "address has no colon"), - _ => panic!(), - } - match Address::from_str("foo:opt").unwrap_err() { - Error::Address(e) => assert_eq!(e, "missing = when parsing key/value"), - _ => panic!(), - } - match Address::from_str("foo:opt=1,opt=2").unwrap_err() { - Error::Address(e) => assert_eq!(e, "Key `opt` specified multiple times"), - _ => panic!(), - } - match Address::from_str("tcp:host=localhost").unwrap_err() { - Error::Address(e) => assert_eq!(e, "tcp address is missing `port`"), - _ => panic!(), - } - match Address::from_str("tcp:host=localhost,port=32f").unwrap_err() { - Error::Address(e) => assert_eq!(e, "invalid tcp `port`"), - _ => panic!(), - } - match Address::from_str("tcp:host=localhost,port=123,family=ipv7").unwrap_err() { - Error::Address(e) => assert_eq!(e, "invalid tcp address `family`: ipv7"), - _ => panic!(), - } - match Address::from_str("unix:foo=blah").unwrap_err() { - Error::Address(e) => assert_eq!(e, "unix: address is invalid"), - _ => panic!(), - } - #[cfg(target_os = "linux")] - match Address::from_str("unix:path=/tmp,abstract=foo").unwrap_err() { - Error::Address(e) => { - assert_eq!(e, "unix: address is invalid") - } - _ => panic!(), - } - assert_eq!( - Address::from_str("unix:path=/tmp/dbus-foo").unwrap(), - Transport::Unix(Unix::new(UnixPath::File("/tmp/dbus-foo".into()))).into(), - ); - #[cfg(target_os = "linux")] - assert_eq!( - Address::from_str("unix:abstract=/tmp/dbus-foo").unwrap(), - Transport::Unix(Unix::new(UnixPath::Abstract("/tmp/dbus-foo".into()))).into(), - ); - let guid = crate::Guid::generate(); - assert_eq!( - Address::from_str(&format!("unix:path=/tmp/dbus-foo,guid={guid}")).unwrap(), - Address::from(Transport::Unix(Unix::new(UnixPath::File( - "/tmp/dbus-foo".into() - )))) - .set_guid(guid.clone()) - .unwrap(), - ); - assert_eq!( - Address::from_str("tcp:host=localhost,port=4142").unwrap(), - Transport::Tcp(Tcp::new("localhost", 4142)).into(), - ); - assert_eq!( - Address::from_str("tcp:host=localhost,port=4142,family=ipv4").unwrap(), - Transport::Tcp(Tcp::new("localhost", 4142).set_family(Some(TcpTransportFamily::Ipv4))) - .into(), - ); - assert_eq!( - Address::from_str("tcp:host=localhost,port=4142,family=ipv6").unwrap(), - Transport::Tcp(Tcp::new("localhost", 4142).set_family(Some(TcpTransportFamily::Ipv6))) - .into(), - ); - assert_eq!( - Address::from_str("tcp:host=localhost,port=4142,family=ipv6,noncefile=/a/file/path") - .unwrap(), - Transport::Tcp( - Tcp::new("localhost", 4142) - .set_family(Some(TcpTransportFamily::Ipv6)) - .set_nonce_file(Some(b"/a/file/path".to_vec())) - ) - .into(), - ); - assert_eq!( - Address::from_str( - "nonce-tcp:host=localhost,port=4142,family=ipv6,noncefile=/a/file/path%20to%20file%201234" - ) - .unwrap(), - Transport::Tcp( - Tcp::new("localhost", 4142) - .set_family(Some(TcpTransportFamily::Ipv6)) - .set_nonce_file(Some(b"/a/file/path to file 1234".to_vec())) - ).into() - ); - #[cfg(windows)] - assert_eq!( - Address::from_str("autolaunch:").unwrap(), - Transport::Autolaunch(Autolaunch::new()).into(), - ); - #[cfg(windows)] - assert_eq!( - Address::from_str("autolaunch:scope=*my_cool_scope*").unwrap(), - Transport::Autolaunch( - Autolaunch::new() - .set_scope(Some(AutolaunchScope::Other("*my_cool_scope*".to_string()))) - ) - .into(), - ); - #[cfg(target_os = "macos")] - assert_eq!( - Address::from_str("launchd:env=my_cool_env_key").unwrap(), - Transport::Launchd(Launchd::new("my_cool_env_key")).into(), - ); - - #[cfg(all(feature = "vsock", not(feature = "tokio")))] - assert_eq!( - Address::from_str(&format!("vsock:cid=98,port=2934,guid={guid}")).unwrap(), - Address::from(Transport::Vsock(super::transport::Vsock::new(98, 2934))) - .set_guid(guid) - .unwrap(), - ); - assert_eq!( - Address::from_str("unix:dir=/some/dir").unwrap(), - Transport::Unix(Unix::new(UnixPath::Dir("/some/dir".into()))).into(), - ); - assert_eq!( - Address::from_str("unix:tmpdir=/some/dir").unwrap(), - Transport::Unix(Unix::new(UnixPath::TmpDir("/some/dir".into()))).into(), - ); - } - - #[test] - fn stringify_dbus_addresses() { - assert_eq!( - Address::from(Transport::Unix(Unix::new(UnixPath::File( - "/tmp/dbus-foo".into() - )))) - .to_string(), - "unix:path=/tmp/dbus-foo", - ); - assert_eq!( - Address::from(Transport::Unix(Unix::new(UnixPath::Dir( - "/tmp/dbus-foo".into() - )))) - .to_string(), - "unix:dir=/tmp/dbus-foo", - ); - assert_eq!( - Address::from(Transport::Unix(Unix::new(UnixPath::TmpDir( - "/tmp/dbus-foo".into() - )))) - .to_string(), - "unix:tmpdir=/tmp/dbus-foo" - ); - // FIXME: figure out how to handle abstract on Windows - #[cfg(target_os = "linux")] - assert_eq!( - Address::from(Transport::Unix(Unix::new(UnixPath::Abstract( - "/tmp/dbus-foo".into() - )))) - .to_string(), - "unix:abstract=/tmp/dbus-foo" - ); - assert_eq!( - Address::from(Transport::Tcp(Tcp::new("localhost", 4142))).to_string(), - "tcp:host=localhost,port=4142" - ); - assert_eq!( - Address::from(Transport::Tcp( - Tcp::new("localhost", 4142).set_family(Some(TcpTransportFamily::Ipv4)) - )) - .to_string(), - "tcp:host=localhost,port=4142,family=ipv4" - ); - assert_eq!( - Address::from(Transport::Tcp( - Tcp::new("localhost", 4142).set_family(Some(TcpTransportFamily::Ipv6)) - )) - .to_string(), - "tcp:host=localhost,port=4142,family=ipv6" - ); - assert_eq!( - Address::from(Transport::Tcp(Tcp::new("localhost", 4142) - .set_family(Some(TcpTransportFamily::Ipv6)) - .set_nonce_file(Some(b"/a/file/path to file 1234".to_vec()) - ))) - .to_string(), - "nonce-tcp:noncefile=/a/file/path%20to%20file%201234,host=localhost,port=4142,family=ipv6" - ); - #[cfg(windows)] - assert_eq!( - Address::from(Transport::Autolaunch(Autolaunch::new())).to_string(), - "autolaunch:" - ); - #[cfg(windows)] - assert_eq!( - Address::from(Transport::Autolaunch(Autolaunch::new().set_scope(Some( - AutolaunchScope::Other("*my_cool_scope*".to_string()) - )))) - .to_string(), - "autolaunch:scope=*my_cool_scope*" - ); - #[cfg(target_os = "macos")] - assert_eq!( - Address::from(Transport::Launchd(Launchd::new("my_cool_key"))).to_string(), - "launchd:env=my_cool_key" - ); - - #[cfg(all(feature = "vsock", not(feature = "tokio")))] - { - let guid = crate::Guid::generate(); - assert_eq!( - Address::from(Transport::Vsock(super::transport::Vsock::new(98, 2934))) - .set_guid(guid.clone()) - .unwrap() - .to_string(), - format!("vsock:cid=98,port=2934,guid={guid}"), - ); - } - } - - #[test] - fn connect_tcp() { - let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); - let port = listener.local_addr().unwrap().port(); - let addr = Address::from_str(&format!("tcp:host=localhost,port={port}")).unwrap(); - crate::utils::block_on(async { addr.connect().await }).unwrap(); - } - - #[test] - fn connect_nonce_tcp() { - struct PercentEncoded<'a>(&'a [u8]); - - impl std::fmt::Display for PercentEncoded<'_> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - super::transport::encode_percents(f, self.0) - } - } - - use std::io::Write; - - const TEST_COOKIE: &[u8] = b"VERILY SECRETIVE"; - - let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); - let port = listener.local_addr().unwrap().port(); - - let mut cookie = tempfile::NamedTempFile::new().unwrap(); - cookie.as_file_mut().write_all(TEST_COOKIE).unwrap(); - - let encoded_path = format!( - "{}", - PercentEncoded(cookie.path().to_str().unwrap().as_ref()) - ); - - let addr = Address::from_str(&format!( - "nonce-tcp:host=localhost,port={port},noncefile={encoded_path}" - )) - .unwrap(); - - let (sender, receiver) = std::sync::mpsc::sync_channel(1); - - std::thread::spawn(move || { - use std::io::Read; - - let mut client = listener.incoming().next().unwrap().unwrap(); - - let mut buf = [0u8; 16]; - client.read_exact(&mut buf).unwrap(); - - sender.send(buf == TEST_COOKIE).unwrap(); - }); - - crate::utils::block_on(addr.connect()).unwrap(); - - let saw_cookie = receiver - .recv_timeout(std::time::Duration::from_millis(100)) - .expect("nonce file content hasn't been received by server thread in time"); - - assert!( - saw_cookie, - "nonce file content has been received, but was invalid" - ); - } -} diff --git a/zbus/src/address/transport/autolaunch.rs b/zbus/src/address/transport/autolaunch.rs deleted file mode 100644 index 1276391d0..000000000 --- a/zbus/src/address/transport/autolaunch.rs +++ /dev/null @@ -1,77 +0,0 @@ -use crate::{Error, Result}; -use std::collections::HashMap; - -/// Transport properties of an autolaunch D-Bus address. -#[derive(Clone, Debug, PartialEq, Eq)] -pub struct Autolaunch { - pub(super) scope: Option, -} - -impl std::fmt::Display for Autolaunch { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "autolaunch:")?; - if let Some(scope) = &self.scope { - write!(f, "scope={}", scope)?; - } - - Ok(()) - } -} - -impl Autolaunch { - /// Create a new autolaunch transport. - pub fn new() -> Self { - Self { scope: None } - } - - /// Set the `autolaunch:` address `scope` value. - pub fn set_scope(mut self, scope: Option) -> Self { - self.scope = scope; - - self - } - - /// The optional scope. - pub fn scope(&self) -> Option<&AutolaunchScope> { - self.scope.as_ref() - } - - pub(super) fn from_options(opts: HashMap<&str, &str>) -> Result { - opts.get("scope") - .map(|scope| -> Result<_> { - let decoded = super::decode_percents(scope)?; - match decoded.as_slice() { - b"install-path" => Ok(AutolaunchScope::InstallPath), - b"user" => Ok(AutolaunchScope::User), - _ => String::from_utf8(decoded) - .map(AutolaunchScope::Other) - .map_err(|_| { - Error::Address("autolaunch scope is not valid UTF-8".to_owned()) - }), - } - }) - .transpose() - .map(|scope| Self { scope }) - } -} - -#[derive(Clone, Debug, PartialEq, Eq)] -#[non_exhaustive] -pub enum AutolaunchScope { - /// Limit session bus to dbus installation path. - InstallPath, - /// Limit session bus to the recent user. - User, - /// other values - specify dedicated session bus like "release", "debug" or other. - Other(String), -} - -impl std::fmt::Display for AutolaunchScope { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::InstallPath => write!(f, "*install-path"), - Self::User => write!(f, "*user"), - Self::Other(o) => write!(f, "{o}"), - } - } -} diff --git a/zbus/src/address/transport/launchd.rs b/zbus/src/address/transport/launchd.rs deleted file mode 100644 index f87b1c4cb..000000000 --- a/zbus/src/address/transport/launchd.rs +++ /dev/null @@ -1,60 +0,0 @@ -use super::{Transport, Unix, UnixPath}; -use crate::{process::run, Result}; -use std::collections::HashMap; - -#[derive(Clone, Debug, PartialEq, Eq)] -#[non_exhaustive] -/// The transport properties of a launchd D-Bus address. -pub struct Launchd { - pub(super) env: String, -} - -impl Launchd { - /// Create a new launchd D-Bus address. - pub fn new(env: &str) -> Self { - Self { - env: env.to_string(), - } - } - - /// The path of the unix domain socket for the launchd created dbus-daemon. - pub fn env(&self) -> &str { - &self.env - } - - /// Determine the actual transport details behin a launchd address. - pub(super) async fn bus_address(&self) -> Result { - let output = run("launchctl", ["getenv", self.env()]) - .await - .expect("failed to wait on launchctl output"); - - if !output.status.success() { - return Err(crate::Error::Address(format!( - "launchctl terminated with code: {}", - output.status - ))); - } - - let addr = String::from_utf8(output.stdout).map_err(|e| { - crate::Error::Address(format!("Unable to parse launchctl output as UTF-8: {}", e)) - })?; - - Ok(Transport::Unix(Unix::new(UnixPath::File( - addr.trim().into(), - )))) - } - - pub(super) fn from_options(opts: HashMap<&str, &str>) -> Result { - opts.get("env") - .ok_or_else(|| crate::Error::Address("missing env key".into())) - .map(|env| Self { - env: env.to_string(), - }) - } -} - -impl std::fmt::Display for Launchd { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "launchd:env={}", self.env) - } -} diff --git a/zbus/src/address/transport/mod.rs b/zbus/src/address/transport/mod.rs deleted file mode 100644 index 6315364a6..000000000 --- a/zbus/src/address/transport/mod.rs +++ /dev/null @@ -1,349 +0,0 @@ -//! D-Bus transport Information module. -//! -//! This module provides the trasport information for D-Bus addresses. - -#[cfg(windows)] -use crate::win32::windows_autolaunch_bus_address; -use crate::{Error, Result}; -#[cfg(not(feature = "tokio"))] -use async_io::Async; -use std::collections::HashMap; -#[cfg(not(feature = "tokio"))] -use std::net::TcpStream; -#[cfg(unix)] -use std::os::unix::net::{SocketAddr, UnixStream}; -#[cfg(feature = "tokio")] -use tokio::net::TcpStream; -#[cfg(feature = "tokio-vsock")] -use tokio_vsock::VsockStream; -#[cfg(windows)] -use uds_windows::UnixStream; -#[cfg(all(feature = "vsock", not(feature = "tokio")))] -use vsock::VsockStream; - -use std::{ - fmt::{Display, Formatter}, - str::from_utf8_unchecked, -}; - -mod unix; -pub use unix::{Unix, UnixPath}; -mod tcp; -pub use tcp::{Tcp, TcpTransportFamily}; -#[cfg(windows)] -mod autolaunch; -#[cfg(windows)] -pub use autolaunch::{Autolaunch, AutolaunchScope}; -#[cfg(target_os = "macos")] -mod launchd; -#[cfg(target_os = "macos")] -pub use launchd::Launchd; -#[cfg(any( - all(feature = "vsock", not(feature = "tokio")), - feature = "tokio-vsock" -))] -#[path = "vsock.rs"] -// Gotta rename to avoid name conflict with the `vsock` crate. -mod vsock_transport; -#[cfg(target_os = "linux")] -use std::os::linux::net::SocketAddrExt; -#[cfg(any( - all(feature = "vsock", not(feature = "tokio")), - feature = "tokio-vsock" -))] -pub use vsock_transport::Vsock; - -/// The transport properties of a D-Bus address. -#[derive(Clone, Debug, PartialEq, Eq)] -#[non_exhaustive] -pub enum Transport { - /// A Unix Domain Socket address. - Unix(Unix), - /// TCP address details - Tcp(Tcp), - /// autolaunch D-Bus address. - #[cfg(windows)] - Autolaunch(Autolaunch), - /// launchd D-Bus address. - #[cfg(target_os = "macos")] - Launchd(Launchd), - #[cfg(any( - all(feature = "vsock", not(feature = "tokio")), - feature = "tokio-vsock" - ))] - /// VSOCK address - /// - /// This variant is only available when either `vsock` or `tokio-vsock` feature is enabled. The - /// type of `stream` is `vsock::VsockStream` with `vsock` feature and - /// `tokio_vsock::VsockStream` with `tokio-vsock` feature. - Vsock(Vsock), -} - -impl Transport { - #[cfg_attr(any(target_os = "macos", windows), async_recursion::async_recursion)] - pub(super) async fn connect(self) -> Result { - match self { - Transport::Unix(unix) => { - // This is a `path` in case of Windows until uds_windows provides the needed API: - // https://github.com/haraldh/rust_uds_windows/issues/14 - let addr = match unix.take_path() { - #[cfg(unix)] - UnixPath::File(path) => SocketAddr::from_pathname(path)?, - #[cfg(windows)] - UnixPath::File(path) => path, - #[cfg(target_os = "linux")] - UnixPath::Abstract(name) => SocketAddr::from_abstract_name(name)?, - UnixPath::Dir(_) | UnixPath::TmpDir(_) => { - // you can't connect to a unix:dir - return Err(Error::Unsupported); - } - }; - let stream = crate::Task::spawn_blocking( - move || -> Result<_> { - #[cfg(unix)] - let stream = UnixStream::connect_addr(&addr)?; - #[cfg(windows)] - let stream = UnixStream::connect(addr)?; - stream.set_nonblocking(true)?; - - Ok(stream) - }, - "unix stream connection", - ) - .await?; - #[cfg(not(feature = "tokio"))] - { - Async::new(stream) - .map(Stream::Unix) - .map_err(|e| Error::InputOutput(e.into())) - } - - #[cfg(feature = "tokio")] - { - #[cfg(unix)] - { - tokio::net::UnixStream::from_std(stream) - .map(Stream::Unix) - .map_err(|e| Error::InputOutput(e.into())) - } - - #[cfg(not(unix))] - { - let _ = path; - Err(Error::Unsupported) - } - } - } - #[cfg(all(feature = "vsock", not(feature = "tokio")))] - Transport::Vsock(addr) => { - let stream = VsockStream::connect_with_cid_port(addr.cid(), addr.port())?; - Async::new(stream).map(Stream::Vsock).map_err(Into::into) - } - - #[cfg(feature = "tokio-vsock")] - Transport::Vsock(addr) => VsockStream::connect(addr.cid(), addr.port()) - .await - .map(Stream::Vsock) - .map_err(Into::into), - - Transport::Tcp(mut addr) => match addr.take_nonce_file() { - Some(nonce_file) => { - #[allow(unused_mut)] - let mut stream = addr.connect().await?; - - #[cfg(unix)] - let nonce_file = { - use std::os::unix::ffi::OsStrExt; - std::ffi::OsStr::from_bytes(&nonce_file) - }; - - #[cfg(windows)] - let nonce_file = std::str::from_utf8(&nonce_file).map_err(|_| { - Error::Address("nonce file path is invalid UTF-8".to_owned()) - })?; - - #[cfg(not(feature = "tokio"))] - { - let nonce = std::fs::read(nonce_file)?; - let mut nonce = &nonce[..]; - - while !nonce.is_empty() { - let len = stream - .write_with(|mut s| std::io::Write::write(&mut s, nonce)) - .await?; - nonce = &nonce[len..]; - } - } - - #[cfg(feature = "tokio")] - { - let nonce = tokio::fs::read(nonce_file).await?; - tokio::io::AsyncWriteExt::write_all(&mut stream, &nonce).await?; - } - - Ok(Stream::Tcp(stream)) - } - None => addr.connect().await.map(Stream::Tcp), - }, - - #[cfg(windows)] - Transport::Autolaunch(Autolaunch { scope }) => match scope { - Some(_) => Err(Error::Address( - "Autolaunch scopes are currently unsupported".to_owned(), - )), - None => { - let addr = windows_autolaunch_bus_address()?; - addr.connect().await - } - }, - - #[cfg(target_os = "macos")] - Transport::Launchd(launchd) => { - let addr = launchd.bus_address().await?; - addr.connect().await - } - } - } - - // Helper for `FromStr` impl of `Address`. - pub(super) fn from_options(transport: &str, options: HashMap<&str, &str>) -> Result { - match transport { - #[cfg(any(unix, not(feature = "tokio")))] - "unix" => Unix::from_options(options).map(Self::Unix), - "tcp" => Tcp::from_options(options, false).map(Self::Tcp), - "nonce-tcp" => Tcp::from_options(options, true).map(Self::Tcp), - #[cfg(any( - all(feature = "vsock", not(feature = "tokio")), - feature = "tokio-vsock" - ))] - "vsock" => Vsock::from_options(options).map(Self::Vsock), - #[cfg(windows)] - "autolaunch" => Autolaunch::from_options(options).map(Self::Autolaunch), - #[cfg(target_os = "macos")] - "launchd" => Launchd::from_options(options).map(Self::Launchd), - - _ => Err(Error::Address(format!( - "unsupported transport '{transport}'" - ))), - } - } -} - -#[cfg(not(feature = "tokio"))] -#[derive(Debug)] -pub(crate) enum Stream { - Unix(Async), - Tcp(Async), - #[cfg(feature = "vsock")] - Vsock(Async), -} - -#[cfg(feature = "tokio")] -#[derive(Debug)] -pub(crate) enum Stream { - #[cfg(unix)] - Unix(tokio::net::UnixStream), - Tcp(TcpStream), - #[cfg(feature = "tokio-vsock")] - Vsock(VsockStream), -} - -fn decode_hex(c: char) -> Result { - match c { - '0'..='9' => Ok(c as u8 - b'0'), - 'a'..='f' => Ok(c as u8 - b'a' + 10), - 'A'..='F' => Ok(c as u8 - b'A' + 10), - - _ => Err(Error::Address( - "invalid hexadecimal character in percent-encoded sequence".to_owned(), - )), - } -} - -pub(crate) fn decode_percents(value: &str) -> Result> { - let mut iter = value.chars(); - let mut decoded = Vec::new(); - - while let Some(c) = iter.next() { - if matches!(c, '-' | '0'..='9' | 'A'..='Z' | 'a'..='z' | '_' | '/' | '.' | '\\' | '*') { - decoded.push(c as u8) - } else if c == '%' { - decoded.push( - decode_hex(iter.next().ok_or_else(|| { - Error::Address("incomplete percent-encoded sequence".to_owned()) - })?)? - << 4 - | decode_hex(iter.next().ok_or_else(|| { - Error::Address("incomplete percent-encoded sequence".to_owned()) - })?)?, - ); - } else { - return Err(Error::Address("Invalid character in address".to_owned())); - } - } - - Ok(decoded) -} - -pub(super) fn encode_percents(f: &mut Formatter<'_>, mut value: &[u8]) -> std::fmt::Result { - const LOOKUP: &str = "\ -%00%01%02%03%04%05%06%07%08%09%0a%0b%0c%0d%0e%0f\ -%10%11%12%13%14%15%16%17%18%19%1a%1b%1c%1d%1e%1f\ -%20%21%22%23%24%25%26%27%28%29%2a%2b%2c%2d%2e%2f\ -%30%31%32%33%34%35%36%37%38%39%3a%3b%3c%3d%3e%3f\ -%40%41%42%43%44%45%46%47%48%49%4a%4b%4c%4d%4e%4f\ -%50%51%52%53%54%55%56%57%58%59%5a%5b%5c%5d%5e%5f\ -%60%61%62%63%64%65%66%67%68%69%6a%6b%6c%6d%6e%6f\ -%70%71%72%73%74%75%76%77%78%79%7a%7b%7c%7d%7e%7f\ -%80%81%82%83%84%85%86%87%88%89%8a%8b%8c%8d%8e%8f\ -%90%91%92%93%94%95%96%97%98%99%9a%9b%9c%9d%9e%9f\ -%a0%a1%a2%a3%a4%a5%a6%a7%a8%a9%aa%ab%ac%ad%ae%af\ -%b0%b1%b2%b3%b4%b5%b6%b7%b8%b9%ba%bb%bc%bd%be%bf\ -%c0%c1%c2%c3%c4%c5%c6%c7%c8%c9%ca%cb%cc%cd%ce%cf\ -%d0%d1%d2%d3%d4%d5%d6%d7%d8%d9%da%db%dc%dd%de%df\ -%e0%e1%e2%e3%e4%e5%e6%e7%e8%e9%ea%eb%ec%ed%ee%ef\ -%f0%f1%f2%f3%f4%f5%f6%f7%f8%f9%fa%fb%fc%fd%fe%ff"; - - loop { - let pos = value.iter().position( - |c| !matches!(c, b'-' | b'0'..=b'9' | b'A'..=b'Z' | b'a'..=b'z' | b'_' | b'/' | b'.' | b'\\' | b'*'), - ); - - if let Some(pos) = pos { - // SAFETY: The above `position()` call made sure that only ASCII chars are in the string - // up to `pos` - f.write_str(unsafe { from_utf8_unchecked(&value[..pos]) })?; - - let c = value[pos]; - value = &value[pos + 1..]; - - let pos = c as usize * 3; - f.write_str(&LOOKUP[pos..pos + 3])?; - } else { - // SAFETY: The above `position()` call made sure that only ASCII chars are in the rest - // of the string - f.write_str(unsafe { from_utf8_unchecked(value) })?; - return Ok(()); - } - } -} - -impl Display for Transport { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - Self::Tcp(tcp) => write!(f, "{}", tcp)?, - Self::Unix(unix) => write!(f, "{}", unix)?, - #[cfg(any( - all(feature = "vsock", not(feature = "tokio")), - feature = "tokio-vsock" - ))] - Self::Vsock(vsock) => write!(f, "{}", vsock)?, - #[cfg(windows)] - Self::Autolaunch(autolaunch) => write!(f, "{}", autolaunch)?, - #[cfg(target_os = "macos")] - Self::Launchd(launchd) => write!(f, "{}", launchd)?, - } - - Ok(()) - } -} diff --git a/zbus/src/address/transport/tcp.rs b/zbus/src/address/transport/tcp.rs deleted file mode 100644 index 81288d9a4..000000000 --- a/zbus/src/address/transport/tcp.rs +++ /dev/null @@ -1,229 +0,0 @@ -use super::encode_percents; -use crate::{Error, Result}; -#[cfg(not(feature = "tokio"))] -use async_io::Async; -#[cfg(not(feature = "tokio"))] -use std::net::{SocketAddr, TcpStream, ToSocketAddrs}; -use std::{ - collections::HashMap, - fmt::{Display, Formatter}, - str::FromStr, -}; -#[cfg(feature = "tokio")] -use tokio::net::TcpStream; - -/// A TCP transport in a D-Bus address. -#[derive(Clone, Debug, PartialEq, Eq)] -pub struct Tcp { - pub(super) host: String, - pub(super) bind: Option, - pub(super) port: u16, - pub(super) family: Option, - pub(super) nonce_file: Option>, -} - -impl Tcp { - /// Create a new TCP transport with the given host and port. - pub fn new(host: &str, port: u16) -> Self { - Self { - host: host.to_owned(), - port, - bind: None, - family: None, - nonce_file: None, - } - } - - /// Set the `tcp:` address `bind` value. - pub fn set_bind(mut self, bind: Option) -> Self { - self.bind = bind; - - self - } - - /// Set the `tcp:` address `family` value. - pub fn set_family(mut self, family: Option) -> Self { - self.family = family; - - self - } - - /// Set the `tcp:` address `noncefile` value. - pub fn set_nonce_file(mut self, nonce_file: Option>) -> Self { - self.nonce_file = nonce_file; - - self - } - - /// Returns the `tcp:` address `host` value. - pub fn host(&self) -> &str { - &self.host - } - - /// Returns the `tcp:` address `bind` value. - pub fn bind(&self) -> Option<&str> { - self.bind.as_deref() - } - - /// Returns the `tcp:` address `port` value. - pub fn port(&self) -> u16 { - self.port - } - - /// Returns the `tcp:` address `family` value. - pub fn family(&self) -> Option { - self.family - } - - /// The nonce file path, if any. - pub fn nonce_file(&self) -> Option<&[u8]> { - self.nonce_file.as_deref() - } - - /// Take ownership of the nonce file path, if any. - pub fn take_nonce_file(&mut self) -> Option> { - self.nonce_file.take() - } - - pub(super) fn from_options( - opts: HashMap<&str, &str>, - nonce_tcp_required: bool, - ) -> Result { - let bind = None; - if opts.contains_key("bind") { - return Err(Error::Address("`bind` isn't yet supported".into())); - } - - let host = opts - .get("host") - .ok_or_else(|| Error::Address("tcp address is missing `host`".into()))? - .to_string(); - let port = opts - .get("port") - .ok_or_else(|| Error::Address("tcp address is missing `port`".into()))?; - let port = port - .parse::() - .map_err(|_| Error::Address("invalid tcp `port`".into()))?; - let family = opts - .get("family") - .map(|f| TcpTransportFamily::from_str(f)) - .transpose()?; - let nonce_file = opts - .get("noncefile") - .map(|f| super::decode_percents(f)) - .transpose()?; - if nonce_tcp_required && nonce_file.is_none() { - return Err(Error::Address( - "nonce-tcp address is missing `noncefile`".into(), - )); - } - - Ok(Self { - host, - bind, - port, - family, - nonce_file, - }) - } - - #[cfg(not(feature = "tokio"))] - pub(super) async fn connect(self) -> Result> { - let addrs = crate::Task::spawn_blocking( - move || -> Result> { - let addrs = (self.host(), self.port()).to_socket_addrs()?.filter(|a| { - if let Some(family) = self.family() { - if family == TcpTransportFamily::Ipv4 { - a.is_ipv4() - } else { - a.is_ipv6() - } - } else { - true - } - }); - Ok(addrs.collect()) - }, - "connect tcp", - ) - .await - .map_err(|e| Error::Address(format!("Failed to receive TCP addresses: {e}")))?; - - // we could attempt connections in parallel? - let mut last_err = Error::Address("Failed to connect".into()); - for addr in addrs { - match Async::::connect(addr).await { - Ok(stream) => return Ok(stream), - Err(e) => last_err = e.into(), - } - } - - Err(last_err) - } - - #[cfg(feature = "tokio")] - pub(super) async fn connect(self) -> Result { - TcpStream::connect((self.host(), self.port())) - .await - .map_err(|e| Error::InputOutput(e.into())) - } -} - -impl Display for Tcp { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self.nonce_file() { - Some(nonce_file) => { - f.write_str("nonce-tcp:noncefile=")?; - encode_percents(f, nonce_file)?; - f.write_str(",")?; - } - None => f.write_str("tcp:")?, - } - f.write_str("host=")?; - - encode_percents(f, self.host().as_bytes())?; - - write!(f, ",port={}", self.port())?; - - if let Some(bind) = self.bind() { - f.write_str(",bind=")?; - encode_percents(f, bind.as_bytes())?; - } - - if let Some(family) = self.family() { - write!(f, ",family={family}")?; - } - - Ok(()) - } -} - -/// A `tcp:` address family. -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -pub enum TcpTransportFamily { - Ipv4, - Ipv6, -} - -impl FromStr for TcpTransportFamily { - type Err = Error; - - fn from_str(family: &str) -> Result { - match family { - "ipv4" => Ok(Self::Ipv4), - "ipv6" => Ok(Self::Ipv6), - _ => Err(Error::Address(format!( - "invalid tcp address `family`: {family}" - ))), - } - } -} - -impl Display for TcpTransportFamily { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - Self::Ipv4 => write!(f, "ipv4"), - Self::Ipv6 => write!(f, "ipv6"), - } - } -} diff --git a/zbus/src/address/transport/unix.rs b/zbus/src/address/transport/unix.rs deleted file mode 100644 index 9e9a8c9be..000000000 --- a/zbus/src/address/transport/unix.rs +++ /dev/null @@ -1,128 +0,0 @@ -use std::{ - collections::HashMap, - ffi::{OsStr, OsString}, - fmt::{Display, Formatter}, -}; - -#[cfg(unix)] -use super::encode_percents; - -/// A Unix domain socket transport in a D-Bus address. -#[derive(Clone, Debug, PartialEq, Eq)] -pub struct Unix { - path: UnixPath, -} - -impl Unix { - /// Create a new Unix transport with the given path. - pub fn new(path: UnixPath) -> Self { - Self { path } - } - - /// The path. - pub fn path(&self) -> &UnixPath { - &self.path - } - - /// Take the path, consuming `self`. - pub fn take_path(self) -> UnixPath { - self.path - } - - #[cfg(any(unix, not(feature = "tokio")))] - pub(super) fn from_options(opts: HashMap<&str, &str>) -> crate::Result { - let path = opts.get("path"); - let abs = opts.get("abstract"); - let dir = opts.get("dir"); - let tmpdir = opts.get("tmpdir"); - let path = match (path, abs, dir, tmpdir) { - (Some(p), None, None, None) => UnixPath::File(OsString::from(p)), - #[cfg(target_os = "linux")] - (None, Some(p), None, None) => UnixPath::Abstract(p.as_bytes().to_owned()), - #[cfg(not(target_os = "linux"))] - (None, Some(_), None, None) => { - return Err(crate::Error::Address( - "abstract sockets currently Linux-only".to_owned(), - )); - } - (None, None, Some(p), None) => UnixPath::Dir(OsString::from(p)), - (None, None, None, Some(p)) => UnixPath::TmpDir(OsString::from(p)), - _ => { - return Err(crate::Error::Address("unix: address is invalid".to_owned())); - } - }; - - Ok(Self::new(path)) - } -} - -impl Display for Unix { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "unix:{}", self.path) - } -} - -/// A Unix domain socket path in a D-Bus address. -#[derive(Clone, Debug, PartialEq, Eq)] -#[non_exhaustive] -pub enum UnixPath { - /// A path to a unix domain socket on the filesystem. - File(OsString), - /// A abstract unix domain socket name. - #[cfg(target_os = "linux")] - Abstract(Vec), - /// A listenable address using the specified path, in which a socket file with a random file - /// name starting with 'dbus-' will be created by the server. See [UNIX domain socket address] - /// reference documentation. - /// - /// This address is mostly relevant to server (typically bus broker) implementations. - /// - /// [UNIX domain socket address]: https://dbus.freedesktop.org/doc/dbus-specification.html#transports-unix-domain-sockets-addresses - Dir(OsString), - /// The same as UnixDir, except that on platforms with abstract sockets, the server may attempt - /// to create an abstract socket whose name starts with this directory instead of a path-based - /// socket. - /// - /// This address is mostly relevant to server (typically bus broker) implementations. - TmpDir(OsString), -} - -impl Display for UnixPath { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - fn fmt_unix_path(f: &mut Formatter<'_>, path: &OsStr) -> std::fmt::Result { - #[cfg(unix)] - { - use std::os::unix::ffi::OsStrExt; - - encode_percents(f, path.as_bytes())?; - } - - #[cfg(windows)] - write!(f, "{}", path.to_str().ok_or(std::fmt::Error)?)?; - - Ok(()) - } - - match self { - UnixPath::File(path) => { - f.write_str("path=")?; - fmt_unix_path(f, path)?; - } - #[cfg(target_os = "linux")] - UnixPath::Abstract(name) => { - f.write_str("abstract=")?; - encode_percents(f, name)?; - } - UnixPath::Dir(path) => { - f.write_str("dir=")?; - fmt_unix_path(f, path)?; - } - UnixPath::TmpDir(path) => { - f.write_str("tmpdir=")?; - fmt_unix_path(f, path)?; - } - } - - Ok(()) - } -} diff --git a/zbus/src/address/transport/vsock.rs b/zbus/src/address/transport/vsock.rs deleted file mode 100644 index 4b590192b..000000000 --- a/zbus/src/address/transport/vsock.rs +++ /dev/null @@ -1,49 +0,0 @@ -use crate::{Error, Result}; -use std::collections::HashMap; - -/// A `tcp:` D-Bus address. -#[derive(Clone, Debug, PartialEq, Eq)] -pub struct Vsock { - pub(super) cid: u32, - pub(super) port: u32, -} - -impl Vsock { - /// Create a new VSOCK address. - pub fn new(cid: u32, port: u32) -> Self { - Self { cid, port } - } - - /// The Client ID. - pub fn cid(&self) -> u32 { - self.cid - } - - /// The port. - pub fn port(&self) -> u32 { - self.port - } - - pub(super) fn from_options(opts: HashMap<&str, &str>) -> Result { - let cid = opts - .get("cid") - .ok_or_else(|| Error::Address("VSOCK address is missing cid=".into()))?; - let cid = cid - .parse::() - .map_err(|e| Error::Address(format!("Failed to parse VSOCK cid `{}`: {}", cid, e)))?; - let port = opts - .get("port") - .ok_or_else(|| Error::Address("VSOCK address is missing port=".into()))?; - let port = port - .parse::() - .map_err(|e| Error::Address(format!("Failed to parse VSOCK port `{}`: {}", port, e)))?; - - Ok(Self { cid, port }) - } -} - -impl std::fmt::Display for Vsock { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "vsock:cid={},port={}", self.cid, self.port) - } -} diff --git a/zbus/src/blocking/connection/builder.rs b/zbus/src/blocking/connection/builder.rs index 3e2ce9d6e..7e1329e41 100644 --- a/zbus/src/blocking/connection/builder.rs +++ b/zbus/src/blocking/connection/builder.rs @@ -10,10 +10,11 @@ use tokio::net::UnixStream; #[cfg(windows)] use uds_windows::UnixStream; +use dbus_addr::ToDBusAddrs; + use zvariant::{ObjectPath, Str}; use crate::{ - address::Address, blocking::Connection, names::{UniqueName, WellKnownName}, object_server::Interface, @@ -42,10 +43,9 @@ impl<'a> Builder<'a> { /// Create a builder for connection that will use the given [D-Bus bus address]. /// /// [D-Bus bus address]: https://dbus.freedesktop.org/doc/dbus-specification.html#addresses - pub fn address(address: A) -> Result + pub fn address<'t, A>(address: &'t A) -> Result where - A: TryInto
, - A::Error: Into, + A: ToDBusAddrs<'t> + ?Sized, { crate::connection::Builder::address(address).map(Self) } diff --git a/zbus/src/connection/builder.rs b/zbus/src/connection/builder.rs index ca3e8db51..74bdce27d 100644 --- a/zbus/src/connection/builder.rs +++ b/zbus/src/connection/builder.rs @@ -21,17 +21,19 @@ use uds_windows::UnixStream; #[cfg(all(feature = "vsock", not(feature = "tokio")))] use vsock::VsockStream; +use dbus_addr::{DBusAddr, ToDBusAddrs}; + use zvariant::{ObjectPath, Str}; use crate::{ - address::{self, Address}, async_lock::RwLock, names::{InterfaceName, UniqueName, WellKnownName}, object_server::Interface, - Connection, Error, Executor, Guid, Result, + Connection, Error, Executor, Guid, OwnedGuid, Result, }; use super::{ + connect::connect_addr, handshake::{AuthMechanism, Authenticated}, socket::{BoxedSplit, ReadHalf, Socket, Split, WriteHalf}, }; @@ -47,7 +49,7 @@ enum Target { feature = "tokio-vsock" ))] VsockStream(VsockStream), - Address(Address), + Address(Vec>), Socket(Split, Box>), } @@ -79,12 +81,12 @@ assert_impl_all!(Builder<'_>: Send, Sync, Unpin); impl<'a> Builder<'a> { /// Create a builder for the session/user message bus connection. pub fn session() -> Result { - Ok(Self::new(Target::Address(Address::session()?))) + Self::address(&dbus_addr::session()?) } /// Create a builder for the system-wide message bus connection. pub fn system() -> Result { - Ok(Self::new(Target::Address(Address::system()?))) + Self::address(&dbus_addr::system()?) } /// Create a builder for connection that will use the given [D-Bus bus address]. @@ -118,14 +120,17 @@ impl<'a> Builder<'a> { /// current session using `ibus address` command. /// /// [D-Bus bus address]: https://dbus.freedesktop.org/doc/dbus-specification.html#addresses - pub fn address(address: A) -> Result + pub fn address<'t, A>(address: &'t A) -> Result where - A: TryInto
, - A::Error: Into, + A: ToDBusAddrs<'t> + ?Sized, { - Ok(Self::new(Target::Address( - address.try_into().map_err(Into::into)?, - ))) + let addr = address + .to_dbus_addrs() + .filter_map(std::result::Result::ok) + .map(|a| a.to_owned()) + .collect(); + + Ok(Builder::new(Target::Address(addr))) } /// Create a builder for connection that will use the given unix stream. @@ -161,7 +166,7 @@ impl<'a> Builder<'a> { /// Create a builder for connection that will use the given socket. pub fn socket(socket: S) -> Self { - Self::new(Target::Socket(Split::new_boxed(socket))) + Self::new(Target::Socket(socket.into())) } /// Specify the mechanisms to use during authentication. @@ -343,17 +348,11 @@ impl<'a> Builder<'a> { } async fn build_(mut self, executor: Executor<'static>) -> Result { - let mut stream = self.stream_for_target().await?; + let (mut stream, server_guid) = self.target_connect().await?; let mut auth = match self.guid { None => { - let guid = match self.target { - Some(Target::Address(ref addr)) => { - addr.guid().map(|guid| guid.to_owned().into()) - } - _ => None, - }; // SASL Handshake - Authenticated::client(stream, guid, self.auth_mechanisms).await? + Authenticated::client(stream, server_guid, self.auth_mechanisms).await? } Some(guid) => { if !self.p2p { @@ -456,36 +455,31 @@ impl<'a> Builder<'a> { } } - async fn stream_for_target(&mut self) -> Result { - // SAFETY: `self.target` is always `Some` from the beginning and this methos is only called + async fn target_connect(&mut self) -> Result<(BoxedSplit, Option)> { + // SAFETY: `self.target` is always `Some` from the beginning and this method is only called // once. - Ok(match self.target.take().unwrap() { + let split = match self.target.take().unwrap() { #[cfg(not(feature = "tokio"))] - Target::UnixStream(stream) => Split::new_boxed(Async::new(stream)?), + Target::UnixStream(stream) => Async::new(stream)?.into(), #[cfg(all(unix, feature = "tokio"))] - Target::UnixStream(stream) => Split::new_boxed(stream), + Target::UnixStream(stream) => stream.into(), #[cfg(all(not(unix), feature = "tokio"))] Target::UnixStream(_) => return Err(Error::Unsupported), #[cfg(not(feature = "tokio"))] - Target::TcpStream(stream) => Split::new_boxed(Async::new(stream)?), + Target::TcpStream(stream) => Async::new(stream)?.into(), #[cfg(feature = "tokio")] - Target::TcpStream(stream) => Split::new_boxed(stream), + Target::TcpStream(stream) => stream.into(), #[cfg(all(feature = "vsock", not(feature = "tokio")))] - Target::VsockStream(stream) => Split::new_boxed(Async::new(stream)?), + Target::VsockStream(stream) => Async::new(stream)?.into(), #[cfg(feature = "tokio-vsock")] - Target::VsockStream(stream) => Split::new_boxed(stream), - Target::Address(address) => match address.connect().await? { - #[cfg(any(unix, not(feature = "tokio")))] - address::transport::Stream::Unix(stream) => Split::new_boxed(stream), - address::transport::Stream::Tcp(stream) => Split::new_boxed(stream), - #[cfg(any( - all(feature = "vsock", not(feature = "tokio")), - feature = "tokio-vsock" - ))] - address::transport::Stream::Vsock(stream) => Split::new_boxed(stream), - }, + Target::VsockStream(stream) => stream.into(), + Target::Address(address) => { + return connect_addr(&address).await; + } Target::Socket(stream) => stream, - }) + }; + + Ok((split, None)) } } diff --git a/zbus/src/connection/connect.rs b/zbus/src/connection/connect.rs new file mode 100644 index 000000000..6591590b8 --- /dev/null +++ b/zbus/src/connection/connect.rs @@ -0,0 +1,60 @@ +use dbus_addr::{transport::Transport, DBusAddr}; +use std::{future::Future, pin::Pin}; +use tracing::debug; + +use crate::{Error, Guid, OwnedGuid, Result}; + +use super::socket::{self, BoxedSplit}; + +mod macos; +mod win32; + +type ConnectResult = Result<(BoxedSplit, Option)>; + +fn connect(addr: &DBusAddr<'_>) -> Pin>> { + let addr = addr.to_owned(); + Box::pin(async move { + let guid = match addr.guid() { + Some(g) => Some(Guid::try_from(g.as_ref())?.into()), + _ => None, + }; + let split = match addr.transport()? { + Transport::Tcp(t) => socket::tcp::connect(&t).await?.into(), + Transport::NonceTcp(t) => socket::tcp::connect_nonce(&t).await?.into(), + #[cfg(any(unix, not(feature = "tokio")))] + Transport::Unix(u) => socket::unix::connect(&u).await?.into(), + #[cfg(any( + all(feature = "vsock", not(feature = "tokio")), + feature = "tokio-vsock" + ))] + Transport::Vsock(v) => socket::vsock::connect(&v).await?.into(), + #[cfg(target_os = "macos")] + Transport::Launchd(l) => macos::connect(&l).await?.into(), + #[cfg(target_os = "windows")] + Transport::Autolaunch(l) => { + return win32::connect(&l).await; + } + _ => { + return Err(Error::Address(format!("Unhandled address: {}", addr))); + } + }; + Ok((split, guid)) + }) +} + +pub(crate) async fn connect_addr( + address: &[DBusAddr<'_>], +) -> Result<(BoxedSplit, Option)> { + for addr in address { + match connect(addr).await { + Ok(res) => { + return Ok(res); + } + Err(e) => { + debug!("Failed to connect to: {}", e); + continue; + } + } + } + Err(Error::Address("No connectable address".into())) +} diff --git a/zbus/src/connection/connect/macos.rs b/zbus/src/connection/connect/macos.rs new file mode 100644 index 000000000..4bbbfe80f --- /dev/null +++ b/zbus/src/connection/connect/macos.rs @@ -0,0 +1,50 @@ +#![cfg(target_os = "macos")] + +use dbus_addr::{transport::Transport, DBusAddr}; + +use super::socket; +use crate::{process::run, Error, Result}; + +async fn launchd_bus_address(env_key: &str) -> Result> { + let output = run("launchctl", ["getenv", env_key]) + .await + .expect("failed to wait on launchctl output"); + + if !output.status.success() { + return Err(Error::Address(format!( + "launchctl terminated with code: {}", + output.status + ))); + } + + let addr = String::from_utf8(output.stdout) + .map_err(|e| Error::Address(format!("Unable to parse launchctl output as UTF-8: {}", e)))?; + + Ok(format!("unix:path={}", addr.trim()).try_into()?) +} + +pub(crate) async fn connect(l: &dbus_addr::transport::Launchd<'_>) -> Result { + let addr = launchd_bus_address(l.env()).await?; + + match addr.transport()? { + Transport::Unix(t) => socket::unix::connect(&t).await, + _ => Err(Error::Address(format!("Address is unsupported: {}", addr))), + } +} + +#[cfg(test)] +mod tests { + use dbus_addr::{transport::Transport, DBusAddr}; + + #[test] + fn connect_launchd_session_bus() { + let addr: DBusAddr<'_> = "launchd:env=DBUS_LAUNCHD_SESSION_BUS_SOCKET" + .try_into() + .unwrap(); + let launchd = match addr.transport().unwrap() { + Transport::Launchd(l) => l, + _ => unreachable!(), + }; + crate::utils::block_on(super::connect(&launchd)).unwrap(); + } +} diff --git a/zbus/src/connection/connect/win32.rs b/zbus/src/connection/connect/win32.rs new file mode 100644 index 000000000..86bc200e9 --- /dev/null +++ b/zbus/src/connection/connect/win32.rs @@ -0,0 +1,40 @@ +#![cfg(target_os = "windows")] + +use dbus_addr::{transport::Transport, DBusAddr}; + +use super::BoxedSplit; +use crate::{win32::windows_autolaunch_bus_address, Error, OwnedGuid, Result}; + +pub(crate) async fn connect( + l: &dbus_addr::transport::Autolaunch<'_>, +) -> Result<(BoxedSplit, Option)> { + if l.scope().is_some() { + return Err(Error::Address( + "autolaunch with scope isn't supported yet".into(), + )); + } + + let addr: DBusAddr<'_> = windows_autolaunch_bus_address()?.try_into()?; + + if let Transport::Autolaunch(_) = addr.transport()? { + return Err(Error::Address("Recursive autolaunch: address".into())); + } + + super::connect(&addr).await +} + +#[cfg(test)] +mod tests { + #[cfg(feature = "windows-gdbus")] + #[test] + fn connect_gdbus_session_bus() { + use dbus_addr::{transport::Transport, DBusAddr}; + + let addr: DBusAddr<'_> = "autolaunch:".try_into().unwrap(); + let autolaunch = match addr.transport().unwrap() { + Transport::Autolaunch(l) => l, + _ => unreachable!(), + }; + crate::utils::block_on(super::connect(&autolaunch)).unwrap(); + } +} diff --git a/zbus/src/connection/handshake.rs b/zbus/src/connection/handshake.rs index b3b33a3fa..0d512cb8c 100644 --- a/zbus/src/connection/handshake.rs +++ b/zbus/src/connection/handshake.rs @@ -1045,7 +1045,7 @@ mod tests { use super::*; - use crate::{connection::socket::Split, Guid, Socket}; + use crate::{Guid, Socket}; fn create_async_socket_pair() -> (impl AsyncWrite + Socket, impl AsyncWrite + Socket) { // Tokio needs us to call the sync function from async context. :shrug: @@ -1071,9 +1071,9 @@ mod tests { let (p0, p1) = create_async_socket_pair(); let guid = OwnedGuid::from(Guid::generate()); - let client = ClientHandshake::new(Split::new_boxed(p0), None, Some(guid.clone())); + let client = ClientHandshake::new(p0.into(), None, Some(guid.clone())); let server = ServerHandshake::new( - Split::new_boxed(p1), + p1.into(), guid, Some(Uid::effective().into()), None, @@ -1097,7 +1097,7 @@ mod tests { fn pipelined_handshake() { let (mut p0, p1) = create_async_socket_pair(); let server = ServerHandshake::new( - Split::new_boxed(p1), + p1.into(), Guid::generate().into(), Some(Uid::effective().into()), None, @@ -1126,7 +1126,7 @@ mod tests { fn separate_external_data() { let (mut p0, p1) = create_async_socket_pair(); let server = ServerHandshake::new( - Split::new_boxed(p1), + p1.into(), Guid::generate().into(), Some(Uid::effective().into()), None, @@ -1153,7 +1153,7 @@ mod tests { fn missing_external_data() { let (mut p0, p1) = create_async_socket_pair(); let server = ServerHandshake::new( - Split::new_boxed(p1), + p1.into(), Guid::generate().into(), Some(Uid::effective().into()), None, @@ -1171,7 +1171,7 @@ mod tests { fn anonymous_handshake() { let (mut p0, p1) = create_async_socket_pair(); let server = ServerHandshake::new( - Split::new_boxed(p1), + p1.into(), Guid::generate().into(), Some(Uid::effective().into()), Some(vec![AuthMechanism::Anonymous].into()), @@ -1189,7 +1189,7 @@ mod tests { fn separate_anonymous_data() { let (mut p0, p1) = create_async_socket_pair(); let server = ServerHandshake::new( - Split::new_boxed(p1), + p1.into(), Guid::generate().into(), Some(Uid::effective().into()), Some(vec![AuthMechanism::Anonymous].into()), diff --git a/zbus/src/connection/mod.rs b/zbus/src/connection/mod.rs index 9e134c938..1a757ec81 100644 --- a/zbus/src/connection/mod.rs +++ b/zbus/src/connection/mod.rs @@ -44,6 +44,8 @@ use socket_reader::SocketReader; pub(crate) mod handshake; use handshake::Authenticated; +mod connect; + const DEFAULT_MAX_QUEUED: usize = 64; const DEFAULT_MAX_METHOD_RETURN_QUEUED: usize = 8; @@ -1570,28 +1572,6 @@ mod tests { ) } - #[cfg(all(windows, feature = "windows-gdbus"))] - #[test] - fn connect_gdbus_session_bus() { - let addr = crate::win32::windows_autolaunch_bus_address() - .expect("Unable to get GDBus session bus address"); - - crate::block_on(async { addr.connect().await }).expect("Unable to connect to session bus"); - } - - #[cfg(target_os = "macos")] - #[test] - fn connect_launchd_session_bus() { - use crate::address::{transport::Launchd, Address, Transport}; - crate::block_on(async { - let addr = Address::from(Transport::Launchd(Launchd::new( - "DBUS_LAUNCHD_SESSION_BUS_SOCKET", - ))); - addr.connect().await - }) - .expect("Unable to connect to session bus"); - } - #[test] #[timeout(15000)] fn disconnect_on_drop() { diff --git a/zbus/src/connection/socket/mod.rs b/zbus/src/connection/socket/mod.rs index 3e96ecac2..18cb07414 100644 --- a/zbus/src/connection/socket/mod.rs +++ b/zbus/src/connection/socket/mod.rs @@ -1,9 +1,9 @@ mod split; pub use split::{BoxedSplit, Split}; -mod tcp; -mod unix; -mod vsock; +pub(crate) mod tcp; +pub(crate) mod unix; +pub(crate) mod vsock; #[cfg(not(feature = "tokio"))] use async_io::Async; diff --git a/zbus/src/connection/socket/split.rs b/zbus/src/connection/socket/split.rs index dc26082c1..403b97568 100644 --- a/zbus/src/connection/socket/split.rs +++ b/zbus/src/connection/socket/split.rs @@ -8,16 +8,6 @@ pub struct Split { } impl Split { - /// Create a new boxed `Split` from `socket`. - pub fn new_boxed>(socket: S) -> BoxedSplit { - let split = socket.split(); - - Split { - read: Box::new(split.read), - write: Box::new(split.write), - } - } - /// Reference to the read half. pub fn read(&self) -> &R { &self.read @@ -46,3 +36,14 @@ impl Split { /// A boxed `Split`. pub type BoxedSplit = Split, Box>; + +impl From for BoxedSplit { + fn from(socket: S) -> Self { + let split = socket.split(); + + Split { + read: Box::new(split.read), + write: Box::new(split.write), + } + } +} diff --git a/zbus/src/connection/socket/tcp.rs b/zbus/src/connection/socket/tcp.rs index 0e4dd4fc2..6a6b1bf76 100644 --- a/zbus/src/connection/socket/tcp.rs +++ b/zbus/src/connection/socket/tcp.rs @@ -1,13 +1,17 @@ #[cfg(not(feature = "tokio"))] -use crate::fdo::ConnectionCredentials; -#[cfg(not(feature = "tokio"))] use async_io::Async; +use dbus_addr::transport::TcpFamily; use std::io; #[cfg(unix)] use std::os::fd::BorrowedFd; #[cfg(not(feature = "tokio"))] use std::{net::TcpStream, sync::Arc}; +#[allow(unused_imports)] +use crate::fdo::ConnectionCredentials; + +use crate::{Error, Result}; + use super::{ReadHalf, RecvmsgResult, WriteHalf}; #[cfg(feature = "tokio")] use super::{Socket, Split}; @@ -102,6 +106,19 @@ impl Socket for tokio::net::TcpStream { } } +#[cfg(feature = "tokio")] +#[cfg(windows)] +fn win32_credentials_from_addr(addr: &std::net::SocketAddr) -> io::Result { + use crate::win32::{socket_addr_get_pid, ProcessToken}; + + let pid = socket_addr_get_pid(addr)? as _; + let sid = ProcessToken::open(if pid != 0 { Some(pid as _) } else { None }) + .and_then(|process_token| process_token.sid())?; + Ok(ConnectionCredentials::default() + .set_process_id(pid) + .set_windows_sid(sid)) +} + #[cfg(feature = "tokio")] #[async_trait::async_trait] impl ReadHalf for tokio::net::tcp::OwnedReadHalf { @@ -119,21 +136,8 @@ impl ReadHalf for tokio::net::tcp::OwnedReadHalf { } #[cfg(windows)] - fn peer_sid(&self) -> Option { - use crate::win32::{socket_addr_get_pid, ProcessToken}; - - let peer_addr = match self.peer_addr() { - Ok(addr) => addr, - Err(_) => return None, - }; - - if let Ok(pid) = socket_addr_get_pid(&peer_addr) { - if let Ok(process_token) = ProcessToken::open(if pid != 0 { Some(pid) } else { None }) { - return process_token.sid().ok(); - } - } - - None + async fn peer_credentials(&mut self) -> io::Result { + win32_credentials_from_addr(&self.peer_addr()?) } } @@ -161,4 +165,187 @@ impl WriteHalf for tokio::net::tcp::OwnedWriteHalf { async fn close(&mut self) -> io::Result<()> { tokio::io::AsyncWriteExt::shutdown(self).await } + + #[cfg(windows)] + async fn peer_credentials(&mut self) -> io::Result { + win32_credentials_from_addr(&self.peer_addr()?) + } +} + +#[cfg(not(feature = "tokio"))] +type Stream = Async; +#[cfg(feature = "tokio")] +type Stream = tokio::net::TcpStream; + +async fn connect_with(host: &str, port: u16, family: Option) -> Result { + #[cfg(not(feature = "tokio"))] + { + use std::net::ToSocketAddrs; + + let host = host.to_string(); + let addrs = crate::Task::spawn_blocking( + move || -> Result> { + let addrs = (host, port).to_socket_addrs()?.filter(|a| { + if let Some(family) = family { + if family == TcpFamily::IPv4 { + a.is_ipv4() + } else { + a.is_ipv6() + } + } else { + true + } + }); + Ok(addrs.collect()) + }, + "connect tcp", + ) + .await + .map_err(|e| Error::Address(format!("Failed to receive TCP addresses: {e}")))?; + + // we could attempt connections in parallel? + let mut last_err = Error::Address("Failed to connect".into()); + for addr in addrs { + match Stream::connect(addr).await { + Ok(stream) => return Ok(stream), + Err(e) => last_err = e.into(), + } + } + + Err(last_err) + } + + #[cfg(feature = "tokio")] + { + // FIXME: doesn't handle family + let _ = family; + Stream::connect((host, port)) + .await + .map_err(|e| Error::InputOutput(e.into())) + } +} + +pub(crate) async fn connect(addr: &dbus_addr::transport::Tcp<'_>) -> Result { + let Some(host) = addr.host() else { + return Err(Error::Address("No host in address".into())); + }; + let Some(port) = addr.port() else { + return Err(Error::Address("No port in address".into())); + }; + + connect_with(host, port, addr.family()).await +} + +pub(crate) async fn connect_nonce(addr: &dbus_addr::transport::NonceTcp<'_>) -> Result { + let Some(host) = addr.host() else { + return Err(Error::Address("No host in address".into())); + }; + let Some(port) = addr.port() else { + return Err(Error::Address("No port in address".into())); + }; + let Some(noncefile) = addr.noncefile() else { + return Err(Error::Address("No noncefile in address".into())); + }; + + #[allow(unused_mut)] + let mut stream = connect_with(host, port, addr.family()).await?; + + #[cfg(not(feature = "tokio"))] + { + use std::io::prelude::*; + + let nonce = std::fs::read(noncefile)?; + let mut nonce = &nonce[..]; + + while !nonce.is_empty() { + let len = stream.write_with(|mut s| s.write(nonce)).await?; + nonce = &nonce[len..]; + } + } + + #[cfg(feature = "tokio")] + { + let nonce = tokio::fs::read(noncefile).await?; + tokio::io::AsyncWriteExt::write_all(&mut stream, &nonce).await?; + } + + Ok(stream) +} + +#[cfg(test)] +mod tests { + use dbus_addr::{encode_percents, transport::Transport, DBusAddr}; + + #[test] + fn connect() { + let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); + let port = listener.local_addr().unwrap().port(); + let addr: DBusAddr<'_> = format!("tcp:host=localhost,port={port}") + .try_into() + .unwrap(); + let tcp = match addr.transport().unwrap() { + Transport::Tcp(tcp) => tcp, + _ => unreachable!(), + }; + crate::utils::block_on(super::connect(&tcp)).unwrap(); + } + + #[test] + fn connect_nonce_tcp() { + struct PercentEncoded<'a>(&'a [u8]); + + impl std::fmt::Display for PercentEncoded<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + encode_percents(f, self.0) + } + } + + use std::io::Write; + + const TEST_COOKIE: &[u8] = b"VERILY SECRETIVE"; + + let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); + let port = listener.local_addr().unwrap().port(); + + let mut cookie = tempfile::NamedTempFile::new().unwrap(); + cookie.as_file_mut().write_all(TEST_COOKIE).unwrap(); + + let encoded_path = format!( + "{}", + PercentEncoded(cookie.path().to_str().unwrap().as_ref()) + ); + + let addr: DBusAddr<'_> = + format!("nonce-tcp:host=localhost,port={port},noncefile={encoded_path}") + .try_into() + .unwrap(); + let tcp = match addr.transport().unwrap() { + Transport::NonceTcp(tcp) => tcp, + _ => unreachable!(), + }; + + let (sender, receiver) = std::sync::mpsc::sync_channel(1); + + std::thread::spawn(move || { + use std::io::Read; + + let mut client = listener.incoming().next().unwrap().unwrap(); + + let mut buf = [0u8; 16]; + client.read_exact(&mut buf).unwrap(); + + sender.send(buf == TEST_COOKIE).unwrap(); + }); + + crate::utils::block_on(super::connect_nonce(&tcp)).unwrap(); + + let saw_cookie = receiver + .recv_timeout(std::time::Duration::from_millis(100)) + .expect("nonce file content hasn't been received by server thread in time"); + + assert!( + saw_cookie, + "nonce file content has been received, but was invalid" + ); + } } diff --git a/zbus/src/connection/socket/unix.rs b/zbus/src/connection/socket/unix.rs index a7c8d5d9c..956b801bf 100644 --- a/zbus/src/connection/socket/unix.rs +++ b/zbus/src/connection/socket/unix.rs @@ -3,7 +3,7 @@ use async_io::Async; use std::io; #[cfg(unix)] use std::os::unix::io::{AsRawFd, BorrowedFd, FromRawFd, RawFd}; -#[cfg(all(unix, not(feature = "tokio")))] +#[cfg(unix)] use std::os::unix::net::UnixStream; #[cfg(not(feature = "tokio"))] use std::sync::Arc; @@ -26,9 +26,13 @@ use nix::{ use super::{ReadHalf, RecvmsgResult, WriteHalf}; #[cfg(feature = "tokio")] use super::{Socket, Split}; + +#[allow(unused_imports)] use crate::fdo::ConnectionCredentials; #[cfg(unix)] use crate::utils::FDS_MAX; +#[cfg(any(unix, not(feature = "tokio")))] +use crate::{Error, Result}; #[cfg(all(unix, not(feature = "tokio")))] #[async_trait::async_trait] @@ -384,3 +388,64 @@ fn send_zero_byte_blocking(fd: RawFd) -> io::Result { ) .map_err(|e| e.into()) } + +#[cfg(not(feature = "tokio"))] +pub(crate) type Stream = Async; +#[cfg(all(unix, feature = "tokio"))] +pub(crate) type Stream = tokio::net::UnixStream; + +#[cfg(any(unix, not(feature = "tokio")))] +pub(crate) async fn connect(addr: &dbus_addr::transport::Unix<'_>) -> Result { + use dbus_addr::transport::UnixAddrKind; + #[cfg(target_os = "linux")] + use std::os::linux::net::SocketAddrExt; + #[cfg(unix)] + use std::os::unix::net::SocketAddr; + + let kind = addr.kind(); + + // This is a `path` in case of Windows until uds_windows provides the needed API: + // https://github.com/haraldh/rust_uds_windows/issues/14 + let addr = match kind { + #[cfg(unix)] + UnixAddrKind::Path(p) => SocketAddr::from_pathname(std::path::Path::new(p))?, + #[cfg(windows)] + UnixAddrKind::Path(p) => p.clone().into_owned(), + #[cfg(target_os = "linux")] + UnixAddrKind::Abstract(name) => SocketAddr::from_abstract_name(name)?, + _ => return Err(Error::Address("Address is not connectable".into())), + }; + + let stream = crate::Task::spawn_blocking( + move || -> Result<_> { + #[cfg(unix)] + let stream = UnixStream::connect_addr(&addr)?; + #[cfg(windows)] + let stream = UnixStream::connect(addr)?; + stream.set_nonblocking(true)?; + + Ok(stream) + }, + "unix stream connection", + ) + .await?; + + #[cfg(not(feature = "tokio"))] + { + Async::new(stream).map_err(|e| Error::InputOutput(e.into())) + } + + #[cfg(feature = "tokio")] + { + #[cfg(unix)] + { + tokio::net::UnixStream::from_std(stream).map_err(|e| Error::InputOutput(e.into())) + } + + #[cfg(not(unix))] + { + let _ = stream; + Err(Error::Unsupported) + } + } +} diff --git a/zbus/src/connection/socket/vsock.rs b/zbus/src/connection/socket/vsock.rs index ec26e487b..bfb94cb0a 100644 --- a/zbus/src/connection/socket/vsock.rs +++ b/zbus/src/connection/socket/vsock.rs @@ -1,3 +1,13 @@ +#[cfg(all(feature = "vsock", not(feature = "tokio")))] +#[cfg(not(feature = "tokio"))] +use async_io::Async; + +#[cfg(any( + all(feature = "vsock", not(feature = "tokio")), + feature = "tokio-vsock" +))] +use crate::{Error, Result}; + #[cfg(feature = "tokio-vsock")] use super::{Socket, Split}; @@ -104,3 +114,37 @@ impl super::WriteHalf for tokio_vsock::WriteHalf { tokio::io::AsyncWriteExt::shutdown(self).await } } + +#[cfg(all(feature = "vsock", not(feature = "tokio")))] +type Stream = Async; +#[cfg(feature = "tokio-vsock")] +type Stream = tokio_vsock::VsockStream; + +#[cfg(any( + all(feature = "vsock", not(feature = "tokio")), + feature = "tokio-vsock" +))] +pub(crate) async fn connect(addr: &dbus_addr::transport::Vsock<'_>) -> Result { + let Some(cid) = addr.cid() else { + return Err(Error::Address("No cid in address".into())); + }; + let Some(port) = addr.port() else { + return Err(Error::Address("No port in address".into())); + }; + + #[cfg(all(feature = "vsock", not(feature = "tokio")))] + { + let stream = crate::Task::spawn_blocking( + move || vsock::VsockStream::connect_with_cid_port(cid, port), + "connect vsock", + ) + .await + .map_err(|e| Error::Address(format!("Failed to connect: {e}")))?; + Ok(Async::new(stream).map_err(|e| Error::InputOutput(e.into()))?) + } + + #[cfg(feature = "tokio-vsock")] + Stream::connect(cid, port) + .await + .map_err(|e| Error::InputOutput(e.into())) +} diff --git a/zbus/src/error.rs b/zbus/src/error.rs index dfee91039..141f173ef 100644 --- a/zbus/src/error.rs +++ b/zbus/src/error.rs @@ -59,6 +59,8 @@ pub enum Error { MissingParameter(&'static str), /// Serial number in the message header is 0 (which is invalid). InvalidSerial, + /// DBus address error + DBusAddr(dbus_addr::Error), } assert_impl_all!(Error: Send, Sync, Unpin); @@ -85,6 +87,7 @@ impl PartialEq for Error { (Self::NameTaken, Self::NameTaken) => true, (Error::InputOutput(_), Self::InputOutput(_)) => false, (Self::Failure(s1), Self::Failure(s2)) => s1 == s2, + (Self::DBusAddr(s), Self::DBusAddr(o)) => s == o, (_, _) => false, } } @@ -113,6 +116,7 @@ impl error::Error for Error { Error::Failure(_) => None, Error::MissingParameter(_) => None, Error::InvalidSerial => None, + Error::DBusAddr(e) => Some(e), } } } @@ -147,6 +151,7 @@ impl fmt::Display for Error { write!(f, "Parameter `{}` was not specified but it is required", p) } Error::InvalidSerial => write!(f, "Serial number in the message header is 0"), + Error::DBusAddr(e) => write!(f, "{e}"), } } } @@ -176,6 +181,7 @@ impl Clone for Error { Error::Failure(e) => Error::Failure(e.clone()), Error::MissingParameter(p) => Error::MissingParameter(p), Error::InvalidSerial => Error::InvalidSerial, + Error::DBusAddr(e) => Error::DBusAddr(e.clone()), } } } @@ -186,6 +192,12 @@ impl From for Error { } } +impl From for Error { + fn from(e: dbus_addr::Error) -> Self { + Error::DBusAddr(e) + } +} + #[cfg(unix)] impl From for Error { fn from(val: nix::Error) -> Self { diff --git a/zbus/src/lib.rs b/zbus/src/lib.rs index e444bfd60..f53e4ab81 100644 --- a/zbus/src/lib.rs +++ b/zbus/src/lib.rs @@ -41,9 +41,6 @@ pub use dbus_error::*; mod error; pub use error::*; -pub mod address; -pub use address::Address; - mod guid; pub use guid::*; diff --git a/zbus/src/win32.rs b/zbus/src/win32.rs index 106dc5450..2cc322aa9 100644 --- a/zbus/src/win32.rs +++ b/zbus/src/win32.rs @@ -28,7 +28,6 @@ use windows_sys::Win32::{ }, }; -use crate::Address; #[cfg(not(feature = "tokio"))] use uds_windows::UnixStream; @@ -314,7 +313,7 @@ fn read_shm(name: &str) -> Result, crate::Error> { Ok(data.to_bytes().to_owned()) } -pub fn windows_autolaunch_bus_address() -> Result { +pub fn windows_autolaunch_bus_address() -> Result { let mutex = Mutex::new("DBusAutolaunchMutex")?; let _guard = mutex.lock(); @@ -322,7 +321,7 @@ pub fn windows_autolaunch_bus_address() -> Result { let addr = String::from_utf8(addr) .map_err(|e| crate::Error::Address(format!("Unable to parse address as UTF-8: {}", e)))?; - addr.parse() + Ok(addr) } #[cfg(test)]