diff --git a/Cargo.toml b/Cargo.toml index 2a9c4e0e..a27d98f5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "nx" -version = "0.4.1" +version = "0.5.0" authors = ["XorTroll", "Pantsman0"] edition = "2024" include = [ @@ -8,10 +8,29 @@ include = [ "nx_derive" ] +[features] +default = [] +services = [] +smc = [] +gpu = ["services"] +vty = ["canvas", "dep:embedded-term", "dep:embedded-graphics-core"] +console = ["fonts"] +canvas = ["gpu", "dep:line_drawing"] +fonts = ["canvas", "dep:ab_glyph", "dep:font8x8"] +truetype = ["fonts"] +fs = ["services", "dep:embedded-io"] +input = ["services", "applet"] +la = ["services"] +rand = ["services", "dep:rand"] +socket = ["services", "dep:embedded-io"] +applet = ["services"] +mii = ["services"] + + [dependencies] paste = "1.0" logpacket = { git = "https://github.com/aarch64-switch-rs/logpacket", tag = "0.1.0"} -arrayvec = { version = "0.7.4", default-features = false } +arrayvec = { version = "0.7.6", default-features = false } static_assertions = "1.1.0" lock_api = { version = "0.4.12", features = ["nightly"] } atomic_enum = "0.3.0" @@ -20,10 +39,13 @@ nx-derive = { path = "nx-derive/" } bitfield-struct = "0.11.0" num-derive = "0.4.2" enum-iterator = "2.1.0" +enum_dispatch = "0.3.13" -[dependencies.libc] -version = "0.2" -default-features = false + +[dependencies.embedded-io] +version = "0.6.1" +optional = true +features = ["alloc"] [dependencies.embedded-term] version = "0.1.1" @@ -40,7 +62,7 @@ version = "1.0.1" [dependencies.ab_glyph] optional = true -version = "0.2" +version = "0.2.31" default-features = false features = ["variable-fonts", "libm"] @@ -56,7 +78,7 @@ default-features = false features = ["libm"] [dependencies.unwinding] -version = "0.2.6" +version = "0.2.8" default-features = false features = ["unwinder", "panic", "fde-custom", "personality"] @@ -67,24 +89,7 @@ features = [ "const_mut_refs", "alloc_ref", "use_spin" ] [dependencies.rand] optional = true -version = "0.9" +version = "0.9.2" default-features = false features = ["alloc"] -[features] -default = [] -services = [] -smc = [] -gpu = ["services"] -vty = ["canvas", "dep:embedded-term", "dep:embedded-graphics-core"] -console = ["fonts"] -canvas = ["gpu", "dep:line_drawing"] -fonts = ["canvas", "dep:ab_glyph", "dep:font8x8"] -truetype = ["fonts"] -fs = ["services"] -input = ["services", "applet"] -la = ["services"] -rand = ["services", "dep:rand"] -socket = ["services"] -applet = ["services"] -mii = ["services"] \ No newline at end of file diff --git a/src/fs.rs b/src/fs.rs index c64564d9..22feb5dc 100755 --- a/src/fs.rs +++ b/src/fs.rs @@ -15,6 +15,9 @@ use alloc::sync::Arc; use alloc::vec::Vec; use core::mem as cmem; use core::ops::DerefMut; +use embedded_io::ErrorType; +pub use embedded_io::SeekFrom; +pub use embedded_io::Write; pub mod rc; @@ -53,7 +56,7 @@ pub trait File: Sync { fn write(&mut self, offset: usize, buf: &[u8], option: FileWriteOption) -> Result<()>; /// Flushes the pending file writes. - fn flush(&mut self) -> Result<()>; + fn flush(&self) -> Result<()>; /// Sets the file size. /// @@ -283,7 +286,7 @@ impl File for ProxyFile { .write(option, offset, buf.len(), ipc_sf::Buffer::from_array(buf)) } - fn flush(&mut self) -> Result<()> { + fn flush(&self) -> Result<()> { self.file_obj.flush() } @@ -498,14 +501,6 @@ impl FileSystem for ProxyFileSystem { } } -/// Represents an offset kind/relativeness. -#[allow(missing_docs)] -pub enum SeekFrom { - Start(usize), - Current(isize), - End(isize), -} - /// Represents a wrapper type to simplify file access, tracking the currently seek-ed location in the file. pub struct FileAccessor { file: Box, @@ -550,11 +545,13 @@ impl FileAccessor { /// * `offset`: The offset to seek to. pub fn seek(&mut self, pos: SeekFrom) -> Result<()> { match pos { - SeekFrom::Start(offset) => self.offset = offset, - SeekFrom::Current(offset) => self.offset = self.offset.saturating_add_signed(offset), + SeekFrom::Start(offset) => self.offset = offset as _, + SeekFrom::Current(offset) => { + self.offset = self.offset.saturating_add_signed(offset as _) + } SeekFrom::End(offset) => { let size = self.get_size()?; - self.offset = size.saturating_add_signed(offset); + self.offset = size.saturating_add_signed(offset as _); } }; Ok(()) @@ -593,18 +590,23 @@ impl FileAccessor { Ok(t) } - // TODO (writes): some sort of "flush" flag to not always flush after writing? - /// Writes data from the given array /// /// # Arguments /// /// * `arr`: The input array - pub fn write_array(&mut self, arr: &[T]) -> Result<()> { + pub fn write_array(&mut self, arr: &[T]) -> Result<()> { let transmuted: &[u8] = unsafe { core::slice::from_raw_parts(arr.as_ptr() as _, core::mem::size_of_val(arr)) }; - self.file - .write(self.offset, transmuted, FileWriteOption::Flush())?; + self.file.write( + self.offset, + transmuted, + if FLUSH { + FileWriteOption::Flush() + } else { + FileWriteOption::None() + }, + )?; self.offset += transmuted.len(); Ok(()) } @@ -614,21 +616,64 @@ impl FileAccessor { /// # Arguments /// /// * `t`: The value to write - pub fn write_val(&mut self, t: &T) -> Result<()> { + pub fn write_val(&mut self, t: &T) -> Result<()> { let transmuted = unsafe { core::slice::from_raw_parts(t as *const T as *const u8, cmem::size_of::()) }; - self.file - .write(self.offset, transmuted, FileWriteOption::Flush())?; + self.file.write( + self.offset, + transmuted, + if FLUSH { + FileWriteOption::Flush() + } else { + FileWriteOption::None() + }, + )?; self.offset += transmuted.len(); Ok(()) } + + /// Flushes reads/writes to the underlying file. + /// + /// It is provided, but it is not currently required as all writes to [`FileAccessor`] send the `Flush` flag. + pub fn flush(&self) -> Result<()> { + self.file.flush() + } +} + +impl ErrorType for FileAccessor { + type Error = ResultCode; +} + +impl embedded_io::Read for FileAccessor { + fn read(&mut self, buf: &mut [u8]) -> Result { + if buf.len() == 0 { + return Ok(0); + } + + self.read_array(buf) + } +} + +impl embedded_io::Write for FileAccessor { + fn write(&mut self, buf: &[u8]) -> Result { + if buf.len() == 0 { + return Ok(0); + } + + self.write_array::(buf).map(|_| buf.len()) + } + + fn flush(&mut self) -> Result<()> { + FileAccessor::flush(&*self) + } } impl core::fmt::Write for FileAccessor { fn write_str(&mut self, s: &str) -> core::fmt::Result { - self.write_array(s.as_bytes()).map_err(|_| core::fmt::Error) + self.write_array::(s.as_bytes()) + .map_err(|_| core::fmt::Error) } } /// Represents a wrapper type to simplify directory access @@ -1034,8 +1079,8 @@ pub fn open_file(path: &str, option: FileOpenOption) -> Result { } }; - let offset: usize = match option.contains(FileOpenOption::Append()) { - true => file.get_size().unwrap_or(0), + let offset: u64 = match option.contains(FileOpenOption::Append()) { + true => file.get_size().unwrap_or(0) as _, false => 0, }; diff --git a/src/ipc.rs b/src/ipc.rs index 9c899719..a035d114 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -80,14 +80,14 @@ impl ObjectInfo { ipc_client_send_control_command!([*self; cmif::ControlRequestId::ConvertCurrentObjectToDomain] () => (domain_object_id: cmif::DomainObjectId)) } - pub fn query_pointer_buffer_size(&mut self) -> Result { + pub fn query_pointer_buffer_size(&self) -> Result { if self.uses_tipc_protocol() { return super::rc::ResultNotSupported::make_err(); } ipc_client_send_control_command!([*self; cmif::ControlRequestId::QueryPointerBufferSize] () => (pointer_buffer_size: u16)) } - pub fn clone_current_object(&mut self) -> Result { + pub fn clone_current_object(&self) -> Result { if self.uses_tipc_protocol() { return super::rc::ResultNotSupported::make_err(); } diff --git a/src/ipc/client.rs b/src/ipc/client.rs index 367eb1c7..01e4b39d 100644 --- a/src/ipc/client.rs +++ b/src/ipc/client.rs @@ -163,7 +163,7 @@ pub trait IClientObject { self.get_session().object_info } - fn set_info(&mut self, info: ObjectInfo) { + unsafe fn set_info(&mut self, info: ObjectInfo) { self.get_session_mut().set_info(info); } @@ -171,7 +171,7 @@ pub trait IClientObject { self.get_session_mut().convert_to_domain() } - fn query_own_pointer_buffer_size(&mut self) -> Result { + fn query_own_pointer_buffer_size(&self) -> Result { self.get_info().query_pointer_buffer_size() } @@ -179,11 +179,24 @@ pub trait IClientObject { self.get_session_mut().close() } - fn is_valid(&mut self) -> bool { + fn is_valid(&self) -> bool { self.get_info().is_valid() } - fn is_domain(&mut self) -> bool { + fn is_domain(&self) -> bool { self.get_info().is_domain() } + + fn clone(&self) -> Result + where + Self: Sized, + { + let handle = self.get_info().clone_current_object()?; + Ok(Self::new(sf::Session { + object_info: ObjectInfo { + handle: handle.handle, + ..self.get_info() + }, + })) + } } diff --git a/src/ipc/sf/bsd.rs b/src/ipc/sf/bsd.rs index 35d28689..c0364019 100644 --- a/src/ipc/sf/bsd.rs +++ b/src/ipc/sf/bsd.rs @@ -655,7 +655,7 @@ impl From> for Linger { #[derive(Copy, Clone, Debug, Request, Response)] #[repr(C)] -pub enum FnCtlCmd { +pub enum FcntlCmd { /// Duplicate file descriptor DupFd = 0, /// Get file descriptor flags (close on exec) @@ -918,7 +918,7 @@ pub trait Bsd { fn listen(&self, sockfd: i32, backlog: i32) -> BsdResult<()>; #[ipc_rid(20)] - fn fnctl(&self, sockfd: i32, cmd: FnCtlCmd, flags: i32) -> BsdResult<()>; + fn fcntl(&self, sockfd: i32, cmd: FcntlCmd, flags: i32) -> BsdResult<()>; #[ipc_rid(21)] fn set_sock_opt( diff --git a/src/ipc/sf/fsp.rs b/src/ipc/sf/fsp.rs index 358961ed..ce359c9c 100755 --- a/src/ipc/sf/fsp.rs +++ b/src/ipc/sf/fsp.rs @@ -128,7 +128,7 @@ pub trait File { buf: sf::InNonSecureMapAliasBuffer<'_, u8>, ); #[ipc_rid(2)] - fn flush(&mut self); + fn flush(&self); #[ipc_rid(3)] fn set_size(&mut self, size: usize); #[ipc_rid(4)] diff --git a/src/result.rs b/src/result.rs index 606a3640..e5fcd95a 100755 --- a/src/result.rs +++ b/src/result.rs @@ -180,3 +180,30 @@ pub trait ResultBase { result_define! { Success: 0, 0 } + +#[cfg(any(feature = "fs", feature = "socket"))] +impl embedded_io::Error for ResultCode { + fn kind(&self) -> embedded_io::ErrorKind { + use embedded_io::ErrorKind; + match (self.get_module(), self.get_description()) { + #[cfg(feature = "socket")] + (crate::socket::rc::RESULT_MODULE, errno) => match errno { + 1004 => ErrorKind::Interrupted, + 1005 => ErrorKind::WriteZero, + 1011 => ErrorKind::TimedOut, + 1032 => ErrorKind::BrokenPipe, + _ => ErrorKind::Other, + }, + #[cfg(feature = "fs")] + (crate::fs::rc::RESULT_SUBMODULE, errno) => match errno { + 4000..=4999 => ErrorKind::InvalidData, + 6003..=6199 => ErrorKind::InvalidInput, + 6602 | 6603 => ErrorKind::NotFound, + 6300..=6399 => ErrorKind::Unsupported, + 6400..=6449 => ErrorKind::PermissionDenied, + _ => ErrorKind::Other, + }, + _ => ErrorKind::Other, + } + } +} diff --git a/src/socket.rs b/src/socket.rs index 0af5ec53..2a502e59 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -1,11 +1,18 @@ //! Implementation of the Rust libstd TCP/UDP APIs, and re-exports of the raw bsd sockets API. use core::alloc::Layout; +use core::ops::Deref; +use core::ops::DerefMut; +use core::sync::atomic::AtomicU16; +use core::sync::atomic::AtomicUsize; +use core::sync::atomic::Ordering; use alloc::alloc::Allocator; use alloc::alloc::Global; use alloc::boxed::Box; +use alloc::vec::Vec; +use crate::ipc::client::IClientObject; use crate::ipc::sf; use crate::ipc::sf::CopyHandle; use crate::mem::alloc::Buffer; @@ -18,13 +25,90 @@ pub use crate::service::bsd::*; use crate::service::new_service_object; use crate::svc::Handle; use crate::svc::MemoryPermission; +use crate::sync::sys::futex::Futex; use crate::sync::{ReadGuard, RwLock, WriteGuard}; +#[repr(usize)] +pub enum Paralellism { + One = 1, + Two, + Three, + Four, + Five, + Six, + Seven, + Eight, + Nine, + Ten, + Eleven, + Twelve, + Thirteen, + Fourteen, + Fifteen, + Sixteen, +} +pub(crate) enum BsdServiceDispatcher { + U(UserBsdService), + A(AppletBsdService), + S(SystemBsdService), +} + +impl Deref for BsdServiceDispatcher { + type Target = dyn IBsdClient; + fn deref(&self) -> &Self::Target { + match self { + Self::A(service) => service as &dyn IBsdClient, + Self::U(service) => service as &dyn IBsdClient, + Self::S(service) => service as &dyn IBsdClient, + } + } +} + +impl DerefMut for BsdServiceDispatcher { + fn deref_mut(&mut self) -> &mut Self::Target { + match self { + Self::A(service) => service as &mut dyn IBsdClient, + Self::U(service) => service as &mut dyn IBsdClient, + Self::S(service) => service as &mut dyn IBsdClient, + } + } +} + +struct BsdServiceHandle<'p> { + parent: &'p BsdSocketService, + slot: usize, +} + +impl Deref for BsdServiceHandle<'_> { + type Target = dyn IBsdClient; + fn deref(&self) -> &Self::Target { + unsafe { self.parent.services.get_unchecked(self.slot) }.deref() + } +} + +impl Drop for BsdServiceHandle<'_> { + fn drop(&mut self) { + let inverse_mask: u16 = !(1 << self.slot); + // return the slot to the queue for use + self.parent + .checkout_slots + .fetch_and(inverse_mask, Ordering::Release); + + // if there are any waiters, notify one + if self.parent.waiters.load(Ordering::Acquire) > 0 { + self.parent.service_waiter.signal_one(); + } + } +} + /// Holder type for the intialized bsd service pub struct BsdSocketService { _tmem_buffer: Buffer, tmem_handle: Handle, - service: Box, + checkout_slots: AtomicU16, + waiters: AtomicUsize, + service_waiter: Futex, + services: Vec, _monitor_service: Box, _bsd_client_pid: u64, } @@ -37,14 +121,33 @@ impl BsdSocketService { config: BsdServiceConfig, kind: BsdSrvkind, transfer_mem_buffer: Option>, + parrallellism: Paralellism, ) -> Result { - let mut service = match kind { - BsdSrvkind::Applet => Box::new(new_service_object::()?) - as Box, - BsdSrvkind::System => Box::new(new_service_object::()?) - as Box, - BsdSrvkind::User => Box::new(new_service_object::()?) - as Box, + let mut services: Vec = match kind { + BsdSrvkind::Applet => { + let mut services = vec![new_service_object::()?]; + for _ in 1..(parrallellism as usize) { + let copy = services[0].clone()?; + services.push(copy); + } + services.into_iter().map(BsdServiceDispatcher::A).collect() + } + BsdSrvkind::System => { + let mut services = vec![new_service_object::()?]; + for _ in 1..(parrallellism as usize) { + let copy = services[0].clone()?; + services.push(copy); + } + services.into_iter().map(BsdServiceDispatcher::S).collect() + } + BsdSrvkind::User => { + let mut services = vec![new_service_object::()?]; + for _ in 1..(parrallellism as usize) { + let copy = services[0].clone()?; + services.push(copy); + } + services.into_iter().map(BsdServiceDispatcher::U).collect() + } }; let mut monitor_service = match kind { @@ -78,7 +181,7 @@ impl BsdSocketService { MemoryPermission::None(), )?; - let bsd_client_pid = service.register_client( + let bsd_client_pid = services[0].register_client( config, sf::ProcessId::new(), tmem_buffer.layout.size(), @@ -90,17 +193,51 @@ impl BsdSocketService { Ok(Self { _tmem_buffer: tmem_buffer, tmem_handle, - service, + checkout_slots: AtomicU16::new(0), + waiters: AtomicUsize::new(0), + service_waiter: Futex::new(), + services, _monitor_service: monitor_service, _bsd_client_pid: bsd_client_pid, }) } + + fn get_service(&self) -> BsdServiceHandle<'_> { + let slot_limit = self.services.len(); + loop { + if let Ok(value) = + self.checkout_slots + .fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| { + let slot = v.trailing_ones() as usize; + if slot < slot_limit { + // write a checkout bit into the checkout slot + Some(v | (1 << slot)) + } else { + // all valid slots are filled, bail on the update so we can wait on the futex. + None + } + }) + { + // since the atomic update succeeded, we can rerun the trailing_ones call on + // the returned original value to re-calculate the slot we took. + return BsdServiceHandle { + parent: self, + slot: value.trailing_ones() as usize, + }; + } else { + // wait for an active hold on the service to be released. + self.waiters.fetch_add(1, Ordering::Release); + self.service_waiter.wait(0, -1); + self.waiters.fetch_sub(1, Ordering::Release); + } + } + } } impl Drop for BsdSocketService { fn drop(&mut self) { self._monitor_service.close_session(); - self.service.close_session(); + self.services.iter_mut().for_each(drop); let _ = crate::svc::close_handle(self.tmem_handle); let _ = wait_for_permission(self._tmem_buffer.ptr as _, MemoryPermission::Write(), None); } @@ -113,6 +250,7 @@ pub fn initialize( kind: BsdSrvkind, config: BsdServiceConfig, tmem_buffer: Option>, + paralellism: Paralellism, ) -> Result<()> { let mut service_handle = BSD_SERVICE.write(); @@ -120,7 +258,12 @@ pub fn initialize( return Ok(()); } - *service_handle = Some(BsdSocketService::new(config, kind, tmem_buffer)?); + *service_handle = Some(BsdSocketService::new( + config, + kind, + tmem_buffer, + paralellism, + )?); Ok(()) } @@ -203,7 +346,7 @@ pub mod net { let socket_server_handle = BSD_SERVICE.read(); let socket_server = socket_server_handle.as_ref().unwrap(); - match socket_server.service.recv( + match socket_server.get_service().recv( self.as_raw_fd(), ReadFlags::None(), Buffer::from_mut_array(data), @@ -232,7 +375,7 @@ pub mod net { let socket_server_handle = BSD_SERVICE.read(); let socket_server = socket_server_handle.as_ref().unwrap(); - match socket_server.service.recv( + match socket_server.get_service().recv( self.as_raw_fd(), ReadFlags::Peek(), Buffer::from_mut_array(data), @@ -255,7 +398,7 @@ pub mod net { let socket_server_handle = BSD_SERVICE.read(); let socket_server = socket_server_handle.as_ref().unwrap(); - match socket_server.service.send( + match socket_server.get_service().send( self.as_raw_fd(), SendFlags::None(), Buffer::from_array(data), @@ -278,7 +421,7 @@ pub mod net { let socket_server_handle = BSD_SERVICE.read(); let socket_server = socket_server_handle.as_ref().unwrap(); - match socket_server.service.send( + match socket_server.get_service().send( self.as_raw_fd(), SendFlags::DontWait(), Buffer::from_array(data), @@ -301,7 +444,7 @@ pub mod net { let socket_server_handle = BSD_SERVICE.read(); let socket_server = socket_server_handle.as_ref().unwrap(); - match socket_server.service.recv(self.as_raw_fd(), ReadFlags::DontWait(), Buffer::from_mut_array(buffer))? { + match socket_server.get_service().recv(self.as_raw_fd(), ReadFlags::DontWait(), Buffer::from_mut_array(buffer))? { BsdResult::Ok(ret, ()) => { Ok(Some(ret as usize)) }, @@ -325,7 +468,7 @@ pub mod net { let mut out_ip: SocketAddrRepr = Default::default(); match socket_server - .service + .get_service() .get_socket_name(self.as_raw_fd(), Buffer::from_mut_var(&mut out_ip))? { BsdResult::Ok(_, written_sockaddr_size) => { @@ -349,7 +492,7 @@ pub mod net { let mut out_ip: SocketAddrRepr = Default::default(); match socket_server - .service + .get_service() .get_peer_name(self.as_raw_fd(), Buffer::from_mut_var(&mut out_ip))? { BsdResult::Ok(_, written_sockaddr_size) => { @@ -374,7 +517,7 @@ pub mod net { let socket_server_handle = BSD_SERVICE.read(); let socket_server = socket_server_handle.as_ref().unwrap(); - if let BsdResult::Err(errno) = socket_server.service.set_sock_opt( + if let BsdResult::Err(errno) = socket_server.get_service().set_sock_opt( self.as_raw_fd(), IpProto::IP as _, IpOptions::TimeToLive as _, @@ -395,7 +538,7 @@ pub mod net { let socket_server = socket_server_handle.as_ref().unwrap(); let mut ttl: u32 = 0; - if let BsdResult::Err(errno) = socket_server.service.get_sock_opt( + if let BsdResult::Err(errno) = socket_server.get_service().get_sock_opt( self.as_raw_fd(), IpProto::IP as _, IpOptions::TimeToLive as _, @@ -419,14 +562,14 @@ pub mod net { /// to be retried, an error with the value set to `EAGAIN` is /// returned. fn set_nonblocking(&self, nonblocking: bool) -> Result<()> { - const O_NONBLOCK: i32 = 0x4000; + const O_NONBLOCK: i32 = 0x800; let socket_server_handle = BSD_SERVICE.read(); let socket_server = socket_server_handle.as_ref().unwrap(); - let current_flags = match socket_server.service.fnctl( + let current_flags = match socket_server.get_service().fcntl( self.as_raw_fd(), - super::FnCtlCmd::GetFl, + super::FcntlCmd::GetFl, 0, )? { BsdResult::Ok(flags, ()) => flags, @@ -444,11 +587,11 @@ pub mod net { current_flags & !O_NONBLOCK }; - if let BsdResult::Err(errno) = - socket_server - .service - .fnctl(self.as_raw_fd(), super::FnCtlCmd::SetFl, flags)? - { + if let BsdResult::Err(errno) = socket_server.get_service().fcntl( + self.as_raw_fd(), + super::FcntlCmd::SetFl, + flags, + )? { return ResultCode::new_err(nx::result::pack_value( rc::RESULT_MODULE, 1000 + errno.cast_unsigned(), @@ -465,7 +608,7 @@ pub mod net { let socket_server = socket_server_handle.as_ref().unwrap(); let mut timeout: BsdDuration = Default::default(); - match socket_server.service.get_sock_opt( + match socket_server.get_service().get_sock_opt( self.as_raw_fd(), SOL_SOCKET, SocketOptions::ReceiveTimeout as _, @@ -501,7 +644,7 @@ pub mod net { let socket_server = socket_server_handle.as_ref().unwrap(); let timeout: BsdDuration = timeout.into(); - if let BsdResult::Err(errno) = socket_server.service.set_sock_opt( + if let BsdResult::Err(errno) = socket_server.get_service().set_sock_opt( self.as_raw_fd(), SOL_SOCKET, SocketOptions::ReceiveTimeout as _, @@ -524,7 +667,7 @@ pub mod net { let socket_server = socket_server_handle.as_ref().unwrap(); let mut timeout: BsdDuration = Default::default(); - match socket_server.service.get_sock_opt( + match socket_server.get_service().get_sock_opt( self.as_raw_fd(), SOL_SOCKET, SocketOptions::SendTimeout as _, @@ -560,7 +703,7 @@ pub mod net { let socket_server = socket_server_handle.as_ref().unwrap(); let timeout: BsdDuration = timeout.into(); - if let BsdResult::Err(errno) = socket_server.service.set_sock_opt( + if let BsdResult::Err(errno) = socket_server.get_service().set_sock_opt( self.as_raw_fd(), SOL_SOCKET, SocketOptions::SendTimeout as _, @@ -586,7 +729,7 @@ pub mod net { let socket_server = socket_server_handle.as_ref().unwrap(); let mut ret_errno: i32 = 0; - if let BsdResult::Err(errno) = socket_server.service.get_sock_opt( + if let BsdResult::Err(errno) = socket_server.get_service().get_sock_opt( self.as_raw_fd(), SOL_SOCKET, SocketOptions::Error as i32, @@ -639,7 +782,7 @@ pub mod net { .ok_or(rc::ResultNotInitialized::make())?; if let BsdResult::Err(errno) = socket_server - .service + .get_service() .poll(Buffer::from_mut_array(fds.as_mut_slice()), timeout)? { return ResultCode::new_err(nx::result::pack_value( @@ -668,7 +811,7 @@ pub mod net { .ok_or(rc::ResultNotInitialized::make())?; let ipaddr = SocketAddrRepr::from((ip, port)); - let listenfd = match socket_server.service.socket( + let listenfd = match socket_server.get_service().socket( super::SocketDomain::INet, super::SocketType::Stream, super::IpProto::IP, @@ -683,7 +826,7 @@ pub mod net { }; let yes = 1i32; - if let BsdResult::Err(errno) = socket_server.service.set_sock_opt( + if let BsdResult::Err(errno) = socket_server.get_service().set_sock_opt( listenfd, SOL_SOCKET, SocketOptions::ReuseAddr as i32, @@ -696,7 +839,7 @@ pub mod net { } if let BsdResult::Err(errno) = socket_server - .service + .get_service() .bind(listenfd, Buffer::from_var(&ipaddr))? { return ResultCode::new_err(nx::result::pack_value( @@ -705,7 +848,7 @@ pub mod net { )); }; - if let BsdResult::Err(errno) = socket_server.service.listen(listenfd, 5)? { + if let BsdResult::Err(errno) = socket_server.get_service().listen(listenfd, 5)? { return ResultCode::new_err(nx::result::pack_value( rc::RESULT_MODULE, 1000 + errno.cast_unsigned(), @@ -723,7 +866,7 @@ pub mod net { let mut out_ip: SocketAddrRepr = Default::default(); match socket_server - .service + .get_service() .accept(self.get_poll_fd(), Buffer::from_mut_var(&mut out_ip))? { BsdResult::Ok(new_sock, written_sockaddr_size) => { @@ -747,7 +890,7 @@ pub mod net { let mut out_ip: SocketAddrRepr = Default::default(); match socket_server - .service + .get_service() .get_socket_name(self.0, Buffer::from_mut_var(&mut out_ip))? { BsdResult::Ok(_, written_sockaddr_size) => { @@ -763,6 +906,56 @@ pub mod net { )), } } + + /// Moves this TCP listener into or out of nonblocking mode. + /// + /// This will result in the `accept` operation + /// becoming nonblocking, i.e., immediately returning from their calls. + /// If the IO operation is successful, `Ok` is returned and no further + /// action is required. If the IO operation could not be completed and needs + /// to be retried, an error with the value set to `EAGAIN` is + /// returned. + /// + /// The function returns a bool representing if the listener was already in non-blocking mode. + pub fn set_nonblocking(&self, nonblocking: bool) -> Result { + const O_NONBLOCK: i32 = 0x800; + + let socket_server_handle = BSD_SERVICE.read(); + let socket_server = socket_server_handle.as_ref().unwrap(); + + let current_flags = + match socket_server + .get_service() + .fcntl(self.0, super::FcntlCmd::GetFl, 0)? + { + BsdResult::Ok(flags, ()) => flags, + BsdResult::Err(errno) => { + return ResultCode::new_err(nx::result::pack_value( + rc::RESULT_MODULE, + 1000 + errno.cast_unsigned(), + )); + } + }; + + let flags = if nonblocking { + current_flags | O_NONBLOCK + } else { + current_flags & !O_NONBLOCK + }; + + if let BsdResult::Err(errno) = + socket_server + .get_service() + .fcntl(self.0, super::FcntlCmd::SetFl, flags)? + { + return ResultCode::new_err(nx::result::pack_value( + rc::RESULT_MODULE, + 1000 + errno.cast_unsigned(), + )); + } + + Ok(current_flags & O_NONBLOCK != 0) + } } impl traits::Pollable for TcpListener { @@ -776,7 +969,7 @@ pub mod net { let socket_server_handle = BSD_SERVICE.read(); let socket_server = socket_server_handle.as_ref().unwrap(); - socket_server.service.close(self.0); + let _ = socket_server.get_service().close(self.0); } } pub struct TcpStream(i32); @@ -789,7 +982,7 @@ pub mod net { .as_ref() .ok_or(rc::ResultNotInitialized::make())?; - let socket = match socket_server.service.socket( + let socket = match socket_server.get_service().socket( super::SocketDomain::INet, super::SocketType::Stream, super::IpProto::IP, @@ -804,7 +997,7 @@ pub mod net { }; if let BsdResult::Err(errno) = socket_server - .service + .get_service() .connect(socket, Buffer::from_var(&destination))? { return ResultCode::new_err(nx::result::pack_value( @@ -822,7 +1015,7 @@ pub mod net { let socket_server = socket_server_handle.as_ref().unwrap(); let mut linger: Linger = Default::default(); - if let BsdResult::Err(errno) = socket_server.service.get_sock_opt( + if let BsdResult::Err(errno) = socket_server.get_service().get_sock_opt( self.0, SOL_SOCKET, SocketOptions::Linger as i32, @@ -845,7 +1038,7 @@ pub mod net { let socket_server = socket_server_handle.as_ref().unwrap(); let mut delay: i32 = 0; - match socket_server.service.get_sock_opt( + match socket_server.get_service().get_sock_opt( self.0, IpProto::IP as _, TcpOptions::NoDelay as _, @@ -873,7 +1066,7 @@ pub mod net { let socket_server_handle = BSD_SERVICE.read(); let socket_server = socket_server_handle.as_ref().unwrap(); - if let BsdResult::Err(errno) = socket_server.service.set_sock_opt( + if let BsdResult::Err(errno) = socket_server.get_service().set_sock_opt( self.0, SOL_SOCKET, SocketOptions::Broadcast as _, @@ -907,7 +1100,7 @@ pub mod net { let socket_server_handle = BSD_SERVICE.read(); let socket_server = socket_server_handle.as_ref().unwrap(); - if let BsdResult::Err(errno) = socket_server.service.shutdown(self.0, mode)? { + if let BsdResult::Err(errno) = socket_server.get_service().shutdown(self.0, mode)? { return ResultCode::new_err(nx::result::pack_value( rc::RESULT_MODULE, 1000 + errno.cast_unsigned(), @@ -923,7 +1116,7 @@ pub mod net { let socket_server_handle = BSD_SERVICE.read(); let socket_server = socket_server_handle.as_ref().unwrap(); - socket_server.service.close(self.0); + let _ = socket_server.get_service().close(self.0); } } @@ -1027,7 +1220,7 @@ pub mod net { let socket_server = socket_server_handle .as_ref() .ok_or(rc::ResultNotInitialized::make())?; - let socket = match socket_server.service.socket( + let socket = match socket_server.get_service().socket( super::SocketDomain::INet, super::SocketType::DataGram, super::IpProto::UDP, @@ -1044,7 +1237,7 @@ pub mod net { let ipaddr: Ipv4Addr = addr.into(); let socketaddr = SocketAddrRepr::from((ipaddr, port.unwrap_or(0))); if let BsdResult::Err(errno) = socket_server - .service + .get_service() .bind(socket, Buffer::from_var(&socketaddr))? { return ResultCode::new_err(nx::result::pack_value( @@ -1062,7 +1255,7 @@ pub mod net { .as_ref() .ok_or(rc::ResultNotInitialized::make())?; - let socket = match socket_server.service.socket( + let socket = match socket_server.get_service().socket( super::SocketDomain::INet, super::SocketType::DataGram, super::IpProto::UDP, @@ -1077,7 +1270,7 @@ pub mod net { }; if let BsdResult::Err(errno) = socket_server - .service + .get_service() .connect(socket, Buffer::from_var(&destination))? { return ResultCode::new_err(nx::result::pack_value( @@ -1099,7 +1292,7 @@ pub mod net { let socket_server = socket_server_handle.as_ref().unwrap(); let mut out_addr: SocketAddrRepr = Default::default(); - match socket_server.service.recv_from( + match socket_server.get_service().recv_from( self.0, ReadFlags::None(), Buffer::from_mut_array(buffer), @@ -1131,7 +1324,7 @@ pub mod net { let socket_server = socket_server_handle.as_ref().unwrap(); let mut out_addr: SocketAddrRepr = Default::default(); - match socket_server.service.recv_from( + match socket_server.get_service().recv_from( self.0, ReadFlags::Peek(), Buffer::from_mut_array(buffer), @@ -1162,7 +1355,7 @@ pub mod net { let socket_server = socket_server_handle.as_ref().unwrap(); let mut out_addr: SocketAddrRepr = Default::default(); - match socket_server.service.recv_from(self.0, ReadFlags::DontWait(), Buffer::from_mut_array(buffer), Buffer::from_mut_var(&mut out_addr))? { + match socket_server.get_service().recv_from(self.0, ReadFlags::DontWait(), Buffer::from_mut_array(buffer), Buffer::from_mut_var(&mut out_addr))? { BsdResult::Ok(ret, ()) => { Ok(Some((ret as usize, Ipv4Addr::from_bits(u32::from_be_bytes(out_addr.addr)), u16::from_be(out_addr.port)))) }, @@ -1190,7 +1383,7 @@ pub mod net { let socket_server_handle = BSD_SERVICE.read(); let socket_server = socket_server_handle.as_ref().unwrap(); - match socket_server.service.send_to( + match socket_server.get_service().send_to( self.0, SendFlags::None(), Buffer::from_array(data), @@ -1212,7 +1405,7 @@ pub mod net { let socket_server = socket_server_handle.as_ref().unwrap(); let mut broadcast: i32 = 0; - match socket_server.service.get_sock_opt( + match socket_server.get_service().get_sock_opt( self.0, SOL_SOCKET, SocketOptions::Broadcast as _, @@ -1235,7 +1428,7 @@ pub mod net { let socket_server_handle = BSD_SERVICE.read(); let socket_server = socket_server_handle.as_ref().unwrap(); - if let BsdResult::Err(errno) = socket_server.service.set_sock_opt( + if let BsdResult::Err(errno) = socket_server.get_service().set_sock_opt( self.0, SOL_SOCKET, SocketOptions::Broadcast as _, @@ -1258,7 +1451,7 @@ pub mod net { let socket_server = socket_server_handle.as_ref().unwrap(); let mut mm_loop: u8 = 0; - match socket_server.service.get_sock_opt( + match socket_server.get_service().get_sock_opt( self.0, IpProto::IP as _, IpOptions::MulticastLoopback as _, @@ -1281,7 +1474,7 @@ pub mod net { let socket_server_handle = BSD_SERVICE.read(); let socket_server = socket_server_handle.as_ref().unwrap(); - if let BsdResult::Err(errno) = socket_server.service.set_sock_opt( + if let BsdResult::Err(errno) = socket_server.get_service().set_sock_opt( self.0, IpProto::IP as _, IpOptions::MulticastLoopback as _, @@ -1304,7 +1497,7 @@ pub mod net { let socket_server = socket_server_handle.as_ref().unwrap(); let mut ttl: u8 = 0; - match socket_server.service.get_sock_opt( + match socket_server.get_service().get_sock_opt( self.0, IpProto::IP as _, IpOptions::MulticastTimeToLive as _, @@ -1327,7 +1520,7 @@ pub mod net { let socket_server_handle = BSD_SERVICE.read(); let socket_server = socket_server_handle.as_ref().unwrap(); - if let BsdResult::Err(errno) = socket_server.service.set_sock_opt( + if let BsdResult::Err(errno) = socket_server.get_service().set_sock_opt( self.0, IpProto::IP as _, IpOptions::MulticastTimeToLive as _, @@ -1359,7 +1552,7 @@ pub mod net { multicast_addr, interface_addr, }; - if let BsdResult::Err(errno) = socket_server.service.set_sock_opt( + if let BsdResult::Err(errno) = socket_server.get_service().set_sock_opt( self.0, IpProto::IP as _, IpOptions::MulticastAddMembership as _, @@ -1388,7 +1581,7 @@ pub mod net { multicast_addr, interface_addr, }; - if let BsdResult::Err(errno) = socket_server.service.set_sock_opt( + if let BsdResult::Err(errno) = socket_server.get_service().set_sock_opt( self.0, IpProto::IP as _, IpOptions::MulticastDropMembership as _, @@ -1409,7 +1602,7 @@ pub mod net { let socket_server_handle = BSD_SERVICE.read(); let socket_server = socket_server_handle.as_ref().unwrap(); - socket_server.service.close(self.0); + let _ = socket_server.get_service().close(self.0); } } @@ -1431,7 +1624,6 @@ pub mod net { } } - /// Despite the impl requirements, the object is not mutated impl core::fmt::Write for TcpStream { fn write_str(&mut self, s: &str) -> core::fmt::Result { match self.send(s.as_bytes()) { @@ -1441,7 +1633,6 @@ pub mod net { } } - /// Despite the impl requirements, the object is not mutated impl core::fmt::Write for UdpSocket { fn write_str(&mut self, s: &str) -> core::fmt::Result { match self.send(s.as_bytes()) { @@ -1450,4 +1641,72 @@ pub mod net { } } } + pub mod io_impls { + use crate::{result::ResultCode, socket::net::traits::SocketCommon}; + + use super::{TcpStream, UdpSocket}; + + impl embedded_io::ErrorType for TcpStream { + type Error = ResultCode; + } + impl embedded_io::ErrorType for UdpSocket { + type Error = ResultCode; + } + + impl embedded_io::Read for TcpStream { + fn read(&mut self, buf: &mut [u8]) -> Result { + if buf.len() == 0 { + return Ok(0); + } + + self.recv(buf).map(|l| l as usize) + } + } + + impl embedded_io::Read for UdpSocket { + fn read(&mut self, buf: &mut [u8]) -> Result { + if buf.len() == 0 { + return Ok(0); + } + + self.recv(buf).map(|l| l as usize) + } + } + + impl embedded_io::Write for TcpStream { + fn write(&mut self, buf: &[u8]) -> core::result::Result { + if buf.len() == 0 { + return Ok(0); + } + + match self.send(buf) { + Ok(0) => Err(ResultCode::new(1005)), + Ok(l) => Ok(l as usize), + Err(e) => Err(e), + } + } + + fn flush(&mut self) -> core::result::Result<(), Self::Error> { + Ok(()) + } + } + + impl embedded_io::Write for UdpSocket { + fn write(&mut self, buf: &[u8]) -> core::result::Result { + if buf.len() == 0 { + return Ok(0); + } + + match self.send(buf) { + Ok(0) => Err(ResultCode::new(1005)), + Ok(l) => Ok(l as usize), + Err(e) => Err(e), + } + } + + fn flush(&mut self) -> core::result::Result<(), Self::Error> { + Ok(()) + } + } + } } diff --git a/src/thread.rs b/src/thread.rs index 29db5a8b..a185ee7d 100644 --- a/src/thread.rs +++ b/src/thread.rs @@ -594,8 +594,7 @@ impl Thread { /// # Safety /// If `name` is `ThreadName::Other(_)`, the contained string must be valid UTF-8. unsafe fn new_inner(name: ThreadName) -> Thread { - // We have to use `unsafe` here to construct the `Parker` in-place, - // which is required for the UNIX implementation. + // We have to use `unsafe` here to construct the `Inner` in-place. // // SAFETY: We pin the Arc immediately after creation, so its address never // changes. @@ -670,6 +669,18 @@ impl Thread { pub fn name(&self) -> ThreadName { self.inner.name } + + /// Atomically makes the handle’s token available if it is not already. + /// + /// Takes no effect if the thread was currently unparked, but this marks parked threads for scheduling. + pub fn unpark(&self) { + let _ = unsafe { + svc::set_thread_activity( + core::ptr::read(self.inner.thread_handle.get()), + svc::SchedulerState::Runnable, + ) + }; + } } impl fmt::Debug for Thread { @@ -771,8 +782,19 @@ impl Drop for Packet<'_, T> { // the scope function will panic. let unhandled_panic = matches!(self.result.get_mut(), Some(Err(_))); - // we don't support panic unwinding - if unhandled_panic { + // Drop the result without causing unwinding. + // This is only relevant for threads that aren't join()ed, as + // join() will take the `result` and set it to None, such that + // there is nothing left to drop here. + // If this panics, we should handle that, because we're outside the + // outermost `catch_unwind` of our thread. + // We just abort in that case, since there's nothing else we can do. + // (And even if we tried to handle it somehow, we'd also need to handle + // the case where the panic payload we get out of it also panics on + // drop, and so on. See issue #86027.) + if let Err(_) = unwinding::panic::catch_unwind(core::panic::AssertUnwindSafe(|| { + *self.result.get_mut() = None; + })) { abort::abort(AbortLevel::Panic(), crate::rc::ResultPanicked::make()); } @@ -1503,3 +1525,13 @@ pub fn r#yield(yield_type: YieldType) -> crate::result::Result<()> { pub fn exit() -> ! { svc::exit_thread() } + +/// Blocks unless or until the current thread’s token is made available. +/// +/// A call to `park` does not guarantee that the thread will remain parked forever, +/// and callers should be prepared for this possibility. +/// However, it is guaranteed that this function will not panic. +pub fn park() { + let current_thread = unsafe { current().as_mut().unwrap() }; + let _ = svc::set_thread_activity(current_thread.handle(), svc::SchedulerState::Paused); +} diff --git a/src/thread/scoped.rs b/src/thread/scoped.rs index b0abcba6..07bf73e9 100644 --- a/src/thread/scoped.rs +++ b/src/thread/scoped.rs @@ -141,7 +141,7 @@ where }; // Run `f`, the scoped function. - let result = f(&scope); + let result = unwinding::panic::catch_unwind(core::panic::AssertUnwindSafe(|| f(&scope))); // Wait until all the threads are finished. This should always happen first try as we can only drop a thread handle when the thread has finished. // TODO: we can possibly detect panics if the num_running_threads > 0 - as this should never happen in our model. @@ -149,7 +149,17 @@ where let _ = super::r#yield(super::YieldType::ToAnyThread); } - result + // Throw any panic from `f`, or the return value of `f` if no thread panicked. + match result { + Err(e) => { + unwinding::panic::begin_panic(e); + unreachable!() + } + Ok(_) if scope.data.a_thread_panicked.load(Ordering::Relaxed) => { + panic!("a scoped thread panicked") + } + Ok(result) => result, + } } impl<'scope> Scope<'scope, '_> {