From e8d5e764226cfc9a72cd1a3280cf61b699eb2931 Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 21 Aug 2025 11:08:29 +0800 Subject: [PATCH 01/18] fix: windows peek reregister after success or would block Signed-off-by: discord9 --- src/io_source.rs | 8 +++++ src/net/tcp/stream.rs | 3 ++ src/sys/windows/mod.rs | 22 ++++++++++++++ tests/tcp_stream.rs | 69 ++++++++++++++++++++++++++++++++++++++++++ tests/util/mod.rs | 2 ++ 5 files changed, 104 insertions(+) diff --git a/src/io_source.rs b/src/io_source.rs index cd57653ab..2ad76d55e 100644 --- a/src/io_source.rs +++ b/src/io_source.rs @@ -69,6 +69,14 @@ impl IoSource { self.state.do_io(f, &self.inner) } + #[cfg(windows)] + pub fn do_io_and_reregister(&self, f: F) -> io::Result + where + F: FnOnce(&T) -> io::Result, + { + self.state.do_io_and_reregister(f, &self.inner) + } + /// Returns the I/O source, dropping the state. /// /// # Notes diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index f3db74fab..2d8d70421 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -212,6 +212,9 @@ impl TcpStream { /// Successive calls return the same data. This is accomplished by passing /// `MSG_PEEK` as a flag to the underlying recv system call. pub fn peek(&self, buf: &mut [u8]) -> io::Result { + #[cfg(windows)] + return self.inner.do_io_and_reregister(|inner| inner.peek(buf)); + #[cfg(not(windows))] self.inner.peek(buf) } diff --git a/src/sys/windows/mod.rs b/src/sys/windows/mod.rs index 89d74b1a2..02d14adb1 100644 --- a/src/sys/windows/mod.rs +++ b/src/sys/windows/mod.rs @@ -98,6 +98,28 @@ cfg_io_source! { result } + pub fn do_io_and_reregister(&self, f: F, io: &T) -> io::Result + where + F: FnOnce(&T) -> io::Result, + { + let result = f(io); + + let is_ok_or_would_block = match &result{ + Ok(_) => true, + Err(e) if e.kind() == io::ErrorKind::WouldBlock => true, + _ => false + }; + if is_ok_or_would_block{ + self.inner.as_ref().map_or(Ok(()), |state| { + state + .selector + .reregister(state.sock_state.clone(), state.token, state.interests) + })?; + } + + result + } + pub fn register( &mut self, registry: &Registry, diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs index 433de7d12..c4eccba15 100644 --- a/tests/tcp_stream.rs +++ b/tests/tcp_stream.rs @@ -876,3 +876,72 @@ fn send_oob_data(stream: &S, data: &[u8]) -> io::Result { } } } + +/// Test that `peek` works correctly with readiness events. +/// +/// `peek` should not remove the readable interest on Windows, +/// but since it is also a read operation, so previously `SockState::feed_event` will remove the readable interest after a successful peek. +#[test] +fn peek_readiness() { + let mut buf = [0; 2]; + let (mut poll, mut events) = init_with_poll(); + + let listener = net::TcpListener::bind(any_local_address()).unwrap(); + let sockaddr = listener.local_addr().unwrap(); + let thread_handle = thread::spawn(move || listener.accept().unwrap()); + let stream1 = net::TcpStream::connect(sockaddr).unwrap(); + let (mut stream2, _) = thread_handle.join().unwrap(); + + stream1.set_nonblocking(true).unwrap(); + let mut stream1 = TcpStream::from_std(stream1); + + poll.registry() + .register(&mut stream1, ID1, Interest::READABLE) + .unwrap(); + + expect_no_events(&mut poll, &mut events); + assert_would_block(stream1.peek(&mut buf)); + + assert_eq!(stream2.write(&[0]).unwrap(), 1); + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Readiness::READABLE)], + ); + assert_eq!(stream1.peek(&mut buf).unwrap(), 1); + // a successful peek shouldn't remove readable interest + // (which did on windows that due to peek is also read) + + assert_eq!(stream2.write(&[0]).unwrap(), 1); + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Readiness::READABLE)], + ); + assert_eq!(stream1.peek(&mut buf).unwrap(), 2); + assert_eq!(stream1.read(&mut buf).unwrap(), 2); + assert_would_block(stream1.peek(&mut buf)); + // a would block peek also should not remove readable interest on windows + // but it previously did as peek is also read + + assert_eq!(stream2.write(&[0, 1, 2, 3]).unwrap(), 4); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Readiness::READABLE)], + ); + assert_eq!(stream1.read(&mut buf).unwrap(), 2); + assert_eq!(stream1.peek(&mut buf).unwrap(), 2); + + assert_eq!(stream1.read(&mut buf).unwrap(), 2); + assert_would_block(stream1.peek(&mut buf)); + + drop(stream2); + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Readiness::READABLE)], + ); + assert_eq!(stream1.peek(&mut buf).unwrap(), 0); +} diff --git a/tests/util/mod.rs b/tests/util/mod.rs index 4f59605e9..579a10d00 100644 --- a/tests/util/mod.rs +++ b/tests/util/mod.rs @@ -134,6 +134,7 @@ impl From for Readiness { } } +#[track_caller] pub fn expect_events(poll: &mut Poll, events: &mut Events, mut expected: Vec) { // In a lot of calls we expect more then one event, but it could be that // poll returns the first event only in a single call. To be a bit more @@ -164,6 +165,7 @@ pub fn expect_events(poll: &mut Poll, events: &mut Events, mut expected: Vec Date: Thu, 21 Aug 2025 11:35:53 +0800 Subject: [PATCH 02/18] chore: fmt Signed-off-by: discord9 --- src/sys/windows/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sys/windows/mod.rs b/src/sys/windows/mod.rs index 02d14adb1..d0bf79fbb 100644 --- a/src/sys/windows/mod.rs +++ b/src/sys/windows/mod.rs @@ -109,6 +109,7 @@ cfg_io_source! { Err(e) if e.kind() == io::ErrorKind::WouldBlock => true, _ => false }; + if is_ok_or_would_block{ self.inner.as_ref().map_or(Ok(()), |state| { state From 64c26527ec43aed47d61750043c74b84e94822cf Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 21 Aug 2025 21:09:50 +0800 Subject: [PATCH 03/18] chore: some ci failure Signed-off-by: discord9 --- src/io_source.rs | 2 +- src/net/tcp/stream.rs | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/io_source.rs b/src/io_source.rs index 2ad76d55e..24b7b9ec4 100644 --- a/src/io_source.rs +++ b/src/io_source.rs @@ -69,7 +69,7 @@ impl IoSource { self.state.do_io(f, &self.inner) } - #[cfg(windows)] + #[cfg(all(windows, feature="net", feature="os-poll"))] pub fn do_io_and_reregister(&self, f: F) -> io::Result where F: FnOnce(&T) -> io::Result, diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 2d8d70421..001efb6b3 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -212,10 +212,14 @@ impl TcpStream { /// Successive calls return the same data. This is accomplished by passing /// `MSG_PEEK` as a flag to the underlying recv system call. pub fn peek(&self, buf: &mut [u8]) -> io::Result { - #[cfg(windows)] - return self.inner.do_io_and_reregister(|inner| inner.peek(buf)); - #[cfg(not(windows))] - self.inner.peek(buf) + #[cfg(all(windows, feature = "net", feature = "os-poll"))] + { + self.inner.do_io_and_reregister(|inner| inner.peek(buf)) + } + #[cfg(not(all(windows, feature = "net", feature = "os-poll")))] + { + self.inner.peek(buf) + } } /// Execute an I/O operation ensuring that the socket receives more events From a21e843cf2e662099b56175ac9c28379b3dadbcc Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 22 Aug 2025 15:47:01 +0800 Subject: [PATCH 04/18] unfix: just peek_ok/would block Signed-off-by: discord9 --- src/io_source.rs | 1 + src/net/tcp/stream.rs | 3 +- src/sys/windows/mod.rs | 3 +- tests/tcp_stream.rs | 79 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 84 insertions(+), 2 deletions(-) diff --git a/src/io_source.rs b/src/io_source.rs index 24b7b9ec4..7fc3edf94 100644 --- a/src/io_source.rs +++ b/src/io_source.rs @@ -69,6 +69,7 @@ impl IoSource { self.state.do_io(f, &self.inner) } + #[allow(unused)] #[cfg(all(windows, feature="net", feature="os-poll"))] pub fn do_io_and_reregister(&self, f: F) -> io::Result where diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 001efb6b3..c3251939b 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -214,7 +214,8 @@ impl TcpStream { pub fn peek(&self, buf: &mut [u8]) -> io::Result { #[cfg(all(windows, feature = "net", feature = "os-poll"))] { - self.inner.do_io_and_reregister(|inner| inner.peek(buf)) + // self.inner.do_io_and_reregister(|inner| inner.peek(buf)) + self.inner.peek(buf) } #[cfg(not(all(windows, feature = "net", feature = "os-poll")))] { diff --git a/src/sys/windows/mod.rs b/src/sys/windows/mod.rs index d0bf79fbb..94b3bc61d 100644 --- a/src/sys/windows/mod.rs +++ b/src/sys/windows/mod.rs @@ -98,6 +98,7 @@ cfg_io_source! { result } + #[allow(unused)] pub fn do_io_and_reregister(&self, f: F, io: &T) -> io::Result where F: FnOnce(&T) -> io::Result, @@ -105,7 +106,7 @@ cfg_io_source! { let result = f(io); let is_ok_or_would_block = match &result{ - Ok(_) => true, + Ok(_) => false, Err(e) if e.kind() == io::ErrorKind::WouldBlock => true, _ => false }; diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs index c4eccba15..b13d8c162 100644 --- a/tests/tcp_stream.rs +++ b/tests/tcp_stream.rs @@ -945,3 +945,82 @@ fn peek_readiness() { ); assert_eq!(stream1.peek(&mut buf).unwrap(), 0); } + +#[test] +fn peek_ok(){ + let mut buf = [0; 2]; + let (mut poll, mut events) = init_with_poll(); + + let listener = net::TcpListener::bind(any_local_address()).unwrap(); + let sockaddr = listener.local_addr().unwrap(); + let thread_handle = thread::spawn(move || listener.accept().unwrap()); + let stream1 = net::TcpStream::connect(sockaddr).unwrap(); + let (mut stream2, _) = thread_handle.join().unwrap(); + + stream1.set_nonblocking(true).unwrap(); + let mut stream1 = TcpStream::from_std(stream1); + + poll.registry() + .register(&mut stream1, ID1, Interest::READABLE) + .unwrap(); + + expect_no_events(&mut poll, &mut events); + + assert_eq!(stream2.write(&[0]).unwrap(), 1); + assert_eq!(stream1.peek(&mut buf).unwrap(), 1); + // a successful peek shouldn't remove readable interest + // so we should still get a readable event + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Readiness::READABLE)], + ); + + assert_eq!(stream2.write(&[0]).unwrap(), 1); + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Readiness::READABLE)], + ); +} + + +#[test] +fn peek_would_block(){ + let mut buf = [0; 2]; + let (mut poll, mut events) = init_with_poll(); + + let listener = net::TcpListener::bind(any_local_address()).unwrap(); + let sockaddr = listener.local_addr().unwrap(); + let thread_handle = thread::spawn(move || listener.accept().unwrap()); + let stream1 = net::TcpStream::connect(sockaddr).unwrap(); + let (mut stream2, _) = thread_handle.join().unwrap(); + + stream1.set_nonblocking(true).unwrap(); + let mut stream1 = TcpStream::from_std(stream1); + + poll.registry() + .register(&mut stream1, ID1, Interest::READABLE) + .unwrap(); + + expect_no_events(&mut poll, &mut events); + + assert_eq!(stream2.write(&[0]).unwrap(), 1); + // a would block peek also should not remove readable interest + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Readiness::READABLE)], + ); + + assert_eq!(stream1.read(&mut buf).unwrap(), 1); + assert_would_block(stream1.peek(&mut buf)); + + assert_eq!(stream2.write(&[0, 1, 2, 3]).unwrap(), 4); + // a would block peek also should not remove readable interest + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Readiness::READABLE)], + ); +} From 5f64c3f6cd2f0b907b0b8236b8196e5519e77019 Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 22 Aug 2025 15:54:18 +0800 Subject: [PATCH 05/18] fix: don't know why, confused Signed-off-by: discord9 --- src/net/tcp/stream.rs | 4 ++-- src/sys/windows/mod.rs | 2 +- tests/tcp_stream.rs | 8 +++++--- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index c3251939b..b1e5bb1bd 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -214,8 +214,8 @@ impl TcpStream { pub fn peek(&self, buf: &mut [u8]) -> io::Result { #[cfg(all(windows, feature = "net", feature = "os-poll"))] { - // self.inner.do_io_and_reregister(|inner| inner.peek(buf)) - self.inner.peek(buf) + self.inner.do_io_and_reregister(|inner| inner.peek(buf)) + // self.inner.peek(buf) } #[cfg(not(all(windows, feature = "net", feature = "os-poll")))] { diff --git a/src/sys/windows/mod.rs b/src/sys/windows/mod.rs index 94b3bc61d..43c0a8cc8 100644 --- a/src/sys/windows/mod.rs +++ b/src/sys/windows/mod.rs @@ -106,7 +106,7 @@ cfg_io_source! { let result = f(io); let is_ok_or_would_block = match &result{ - Ok(_) => false, + Ok(_) => true, Err(e) if e.kind() == io::ErrorKind::WouldBlock => true, _ => false }; diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs index b13d8c162..325848037 100644 --- a/tests/tcp_stream.rs +++ b/tests/tcp_stream.rs @@ -975,8 +975,9 @@ fn peek_ok(){ &mut events, vec![ExpectEvent::new(ID1, Readiness::READABLE)], ); - + assert_eq!(stream2.write(&[0]).unwrap(), 1); + // this panic with no event on windows if not re-register after successful peek expect_events( &mut poll, &mut events, @@ -1006,7 +1007,6 @@ fn peek_would_block(){ expect_no_events(&mut poll, &mut events); assert_eq!(stream2.write(&[0]).unwrap(), 1); - // a would block peek also should not remove readable interest expect_events( &mut poll, &mut events, @@ -1017,7 +1017,9 @@ fn peek_would_block(){ assert_would_block(stream1.peek(&mut buf)); assert_eq!(stream2.write(&[0, 1, 2, 3]).unwrap(), 4); - // a would block peek also should not remove readable interest + + // this panic with no event on windows if not re-register after would block peek + // I don't know why it need to re-register for that to work expect_events( &mut poll, &mut events, From fb48578ce6c562acf9b271b47da241c1e76e9888 Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 22 Aug 2025 16:15:44 +0800 Subject: [PATCH 06/18] unfix again to show problem in peek_would_block Signed-off-by: discord9 --- src/net/tcp/stream.rs | 4 ++-- src/sys/windows/mod.rs | 2 +- tests/tcp_stream.rs | 8 -------- 3 files changed, 3 insertions(+), 11 deletions(-) diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index b1e5bb1bd..a9cbae0d0 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -214,8 +214,8 @@ impl TcpStream { pub fn peek(&self, buf: &mut [u8]) -> io::Result { #[cfg(all(windows, feature = "net", feature = "os-poll"))] { - self.inner.do_io_and_reregister(|inner| inner.peek(buf)) - // self.inner.peek(buf) + // self.inner.do_io(|inner| inner.peek(buf)) + self.inner.peek(buf) } #[cfg(not(all(windows, feature = "net", feature = "os-poll")))] { diff --git a/src/sys/windows/mod.rs b/src/sys/windows/mod.rs index 43c0a8cc8..94b3bc61d 100644 --- a/src/sys/windows/mod.rs +++ b/src/sys/windows/mod.rs @@ -106,7 +106,7 @@ cfg_io_source! { let result = f(io); let is_ok_or_would_block = match &result{ - Ok(_) => true, + Ok(_) => false, Err(e) if e.kind() == io::ErrorKind::WouldBlock => true, _ => false }; diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs index 325848037..57469f942 100644 --- a/tests/tcp_stream.rs +++ b/tests/tcp_stream.rs @@ -975,14 +975,6 @@ fn peek_ok(){ &mut events, vec![ExpectEvent::new(ID1, Readiness::READABLE)], ); - - assert_eq!(stream2.write(&[0]).unwrap(), 1); - // this panic with no event on windows if not re-register after successful peek - expect_events( - &mut poll, - &mut events, - vec![ExpectEvent::new(ID1, Readiness::READABLE)], - ); } From 38313effabbdfd11270f6691f23e73401f02211d Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 22 Aug 2025 16:18:41 +0800 Subject: [PATCH 07/18] fix: do_io with peek Signed-off-by: discord9 --- src/io_source.rs | 9 --------- src/net/tcp/stream.rs | 4 ++-- src/sys/windows/mod.rs | 24 ------------------------ 3 files changed, 2 insertions(+), 35 deletions(-) diff --git a/src/io_source.rs b/src/io_source.rs index 7fc3edf94..cd57653ab 100644 --- a/src/io_source.rs +++ b/src/io_source.rs @@ -69,15 +69,6 @@ impl IoSource { self.state.do_io(f, &self.inner) } - #[allow(unused)] - #[cfg(all(windows, feature="net", feature="os-poll"))] - pub fn do_io_and_reregister(&self, f: F) -> io::Result - where - F: FnOnce(&T) -> io::Result, - { - self.state.do_io_and_reregister(f, &self.inner) - } - /// Returns the I/O source, dropping the state. /// /// # Notes diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index a9cbae0d0..906dba8ae 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -214,8 +214,8 @@ impl TcpStream { pub fn peek(&self, buf: &mut [u8]) -> io::Result { #[cfg(all(windows, feature = "net", feature = "os-poll"))] { - // self.inner.do_io(|inner| inner.peek(buf)) - self.inner.peek(buf) + self.inner.do_io(|inner| inner.peek(buf)) + // self.inner.peek(buf) } #[cfg(not(all(windows, feature = "net", feature = "os-poll")))] { diff --git a/src/sys/windows/mod.rs b/src/sys/windows/mod.rs index 94b3bc61d..89d74b1a2 100644 --- a/src/sys/windows/mod.rs +++ b/src/sys/windows/mod.rs @@ -98,30 +98,6 @@ cfg_io_source! { result } - #[allow(unused)] - pub fn do_io_and_reregister(&self, f: F, io: &T) -> io::Result - where - F: FnOnce(&T) -> io::Result, - { - let result = f(io); - - let is_ok_or_would_block = match &result{ - Ok(_) => false, - Err(e) if e.kind() == io::ErrorKind::WouldBlock => true, - _ => false - }; - - if is_ok_or_would_block{ - self.inner.as_ref().map_or(Ok(()), |state| { - state - .selector - .reregister(state.sock_state.clone(), state.token, state.interests) - })?; - } - - result - } - pub fn register( &mut self, registry: &Registry, From 0c5be6cc84ebedae650363e13c0922b019b13bcd Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 22 Aug 2025 16:28:43 +0800 Subject: [PATCH 08/18] chore: clean up Signed-off-by: discord9 --- src/net/tcp/stream.rs | 1 - tests/tcp_stream.rs | 71 +------------------------------------------ 2 files changed, 1 insertion(+), 71 deletions(-) diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 906dba8ae..24d777ade 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -215,7 +215,6 @@ impl TcpStream { #[cfg(all(windows, feature = "net", feature = "os-poll"))] { self.inner.do_io(|inner| inner.peek(buf)) - // self.inner.peek(buf) } #[cfg(not(all(windows, feature = "net", feature = "os-poll")))] { diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs index 57469f942..b62d25c4d 100644 --- a/tests/tcp_stream.rs +++ b/tests/tcp_stream.rs @@ -877,75 +877,6 @@ fn send_oob_data(stream: &S, data: &[u8]) -> io::Result { } } -/// Test that `peek` works correctly with readiness events. -/// -/// `peek` should not remove the readable interest on Windows, -/// but since it is also a read operation, so previously `SockState::feed_event` will remove the readable interest after a successful peek. -#[test] -fn peek_readiness() { - let mut buf = [0; 2]; - let (mut poll, mut events) = init_with_poll(); - - let listener = net::TcpListener::bind(any_local_address()).unwrap(); - let sockaddr = listener.local_addr().unwrap(); - let thread_handle = thread::spawn(move || listener.accept().unwrap()); - let stream1 = net::TcpStream::connect(sockaddr).unwrap(); - let (mut stream2, _) = thread_handle.join().unwrap(); - - stream1.set_nonblocking(true).unwrap(); - let mut stream1 = TcpStream::from_std(stream1); - - poll.registry() - .register(&mut stream1, ID1, Interest::READABLE) - .unwrap(); - - expect_no_events(&mut poll, &mut events); - assert_would_block(stream1.peek(&mut buf)); - - assert_eq!(stream2.write(&[0]).unwrap(), 1); - expect_events( - &mut poll, - &mut events, - vec![ExpectEvent::new(ID1, Readiness::READABLE)], - ); - assert_eq!(stream1.peek(&mut buf).unwrap(), 1); - // a successful peek shouldn't remove readable interest - // (which did on windows that due to peek is also read) - - assert_eq!(stream2.write(&[0]).unwrap(), 1); - expect_events( - &mut poll, - &mut events, - vec![ExpectEvent::new(ID1, Readiness::READABLE)], - ); - assert_eq!(stream1.peek(&mut buf).unwrap(), 2); - assert_eq!(stream1.read(&mut buf).unwrap(), 2); - assert_would_block(stream1.peek(&mut buf)); - // a would block peek also should not remove readable interest on windows - // but it previously did as peek is also read - - assert_eq!(stream2.write(&[0, 1, 2, 3]).unwrap(), 4); - - expect_events( - &mut poll, - &mut events, - vec![ExpectEvent::new(ID1, Readiness::READABLE)], - ); - assert_eq!(stream1.read(&mut buf).unwrap(), 2); - assert_eq!(stream1.peek(&mut buf).unwrap(), 2); - - assert_eq!(stream1.read(&mut buf).unwrap(), 2); - assert_would_block(stream1.peek(&mut buf)); - - drop(stream2); - expect_events( - &mut poll, - &mut events, - vec![ExpectEvent::new(ID1, Readiness::READABLE)], - ); - assert_eq!(stream1.peek(&mut buf).unwrap(), 0); -} - #[test] fn peek_ok(){ let mut buf = [0; 2]; @@ -1011,7 +942,7 @@ fn peek_would_block(){ assert_eq!(stream2.write(&[0, 1, 2, 3]).unwrap(), 4); // this panic with no event on windows if not re-register after would block peek - // I don't know why it need to re-register for that to work + // becuase mio simulate edge-triggered behavior in `SockState::feed_event` and need reregister expect_events( &mut poll, &mut events, From d02337f3fee1e96036987f35040b2ec012058e18 Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 22 Aug 2025 16:35:55 +0800 Subject: [PATCH 09/18] chore: clean up cfg windows Signed-off-by: discord9 --- src/net/tcp/stream.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 24d777ade..1a247b76f 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -212,11 +212,14 @@ impl TcpStream { /// Successive calls return the same data. This is accomplished by passing /// `MSG_PEEK` as a flag to the underlying recv system call. pub fn peek(&self, buf: &mut [u8]) -> io::Result { - #[cfg(all(windows, feature = "net", feature = "os-poll"))] + #[cfg(windows)] { + // on windows need to manually reregister the socket + // because `SockState::feed_event` simulate edge-triggered behavior + // and need to reregister the socket to receive more events self.inner.do_io(|inner| inner.peek(buf)) } - #[cfg(not(all(windows, feature = "net", feature = "os-poll")))] + #[cfg(not(windows))] { self.inner.peek(buf) } From 51e54fff4dd2bb3b681652796e23e2534f35176e Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 22 Aug 2025 16:44:24 +0800 Subject: [PATCH 10/18] test: use same buf=data.len to prevent linux special treat Signed-off-by: discord9 --- tests/tcp_stream.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs index b62d25c4d..e68d8e349 100644 --- a/tests/tcp_stream.rs +++ b/tests/tcp_stream.rs @@ -878,7 +878,7 @@ fn send_oob_data(stream: &S, data: &[u8]) -> io::Result { } #[test] -fn peek_ok(){ +fn peek_ok() { let mut buf = [0; 2]; let (mut poll, mut events) = init_with_poll(); @@ -890,7 +890,7 @@ fn peek_ok(){ stream1.set_nonblocking(true).unwrap(); let mut stream1 = TcpStream::from_std(stream1); - + poll.registry() .register(&mut stream1, ID1, Interest::READABLE) .unwrap(); @@ -908,10 +908,9 @@ fn peek_ok(){ ); } - #[test] -fn peek_would_block(){ - let mut buf = [0; 2]; +fn peek_would_block() { + let mut buf = [0; 1]; let (mut poll, mut events) = init_with_poll(); let listener = net::TcpListener::bind(any_local_address()).unwrap(); @@ -922,7 +921,7 @@ fn peek_would_block(){ stream1.set_nonblocking(true).unwrap(); let mut stream1 = TcpStream::from_std(stream1); - + poll.registry() .register(&mut stream1, ID1, Interest::READABLE) .unwrap(); @@ -935,10 +934,10 @@ fn peek_would_block(){ &mut events, vec![ExpectEvent::new(ID1, Readiness::READABLE)], ); - + assert_eq!(stream1.read(&mut buf).unwrap(), 1); assert_would_block(stream1.peek(&mut buf)); - + assert_eq!(stream2.write(&[0, 1, 2, 3]).unwrap(), 4); // this panic with no event on windows if not re-register after would block peek From cc4a904476b3f28fd325ddc0c84218c458c1d9b7 Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 22 Aug 2025 16:55:01 +0800 Subject: [PATCH 11/18] test: peek more after imme write Signed-off-by: discord9 --- tests/tcp_stream.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs index e68d8e349..075c57edc 100644 --- a/tests/tcp_stream.rs +++ b/tests/tcp_stream.rs @@ -898,7 +898,15 @@ fn peek_ok() { expect_no_events(&mut poll, &mut events); assert_eq!(stream2.write(&[0]).unwrap(), 1); - assert_eq!(stream1.peek(&mut buf).unwrap(), 1); + // peek multiple times until we get a byte + loop{ + let res = stream1.peek(&mut buf); + match res{ + Ok(1) => break, + Err(err) if err.kind() == io::ErrorKind::WouldBlock =>{continue} + _ => panic!("Unexpected error: {:?}", res), + } + } // a successful peek shouldn't remove readable interest // so we should still get a readable event expect_events( From 63728394ee9cebdd8dbb7d35635ff8441e0efaad Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 22 Aug 2025 17:02:22 +0800 Subject: [PATCH 12/18] skip test on unsupported force poll Signed-off-by: discord9 --- tests/tcp_stream.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs index 075c57edc..aa4e8b01a 100644 --- a/tests/tcp_stream.rs +++ b/tests/tcp_stream.rs @@ -916,6 +916,7 @@ fn peek_ok() { ); } +#[cfg(not(mio_unsupported_force_poll_poll))] #[test] fn peek_would_block() { let mut buf = [0; 1]; From bcf431f77fe0add22502d7aaaed0525e8838ce3f Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 22 Aug 2025 17:04:56 +0800 Subject: [PATCH 13/18] chore: fmt Signed-off-by: discord9 --- tests/tcp_stream.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs index aa4e8b01a..d90a96f58 100644 --- a/tests/tcp_stream.rs +++ b/tests/tcp_stream.rs @@ -899,11 +899,11 @@ fn peek_ok() { assert_eq!(stream2.write(&[0]).unwrap(), 1); // peek multiple times until we get a byte - loop{ + loop { let res = stream1.peek(&mut buf); - match res{ + match res { Ok(1) => break, - Err(err) if err.kind() == io::ErrorKind::WouldBlock =>{continue} + Err(err) if err.kind() == io::ErrorKind::WouldBlock => continue, _ => panic!("Unexpected error: {:?}", res), } } From 9d2a88720f9d1b2eb9fa375cda3ed6aceaae8e3b Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 22 Aug 2025 17:21:28 +0800 Subject: [PATCH 14/18] fix: test poll also do_io Signed-off-by: discord9 --- src/net/tcp/stream.rs | 4 ++-- tests/tcp_stream.rs | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 1a247b76f..0242542f6 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -212,14 +212,14 @@ impl TcpStream { /// Successive calls return the same data. This is accomplished by passing /// `MSG_PEEK` as a flag to the underlying recv system call. pub fn peek(&self, buf: &mut [u8]) -> io::Result { - #[cfg(windows)] + #[cfg(any(windows,mio_unsupported_force_poll_poll))] { // on windows need to manually reregister the socket // because `SockState::feed_event` simulate edge-triggered behavior // and need to reregister the socket to receive more events self.inner.do_io(|inner| inner.peek(buf)) } - #[cfg(not(windows))] + #[cfg(not(any(windows,mio_unsupported_force_poll_poll)))] { self.inner.peek(buf) } diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs index d90a96f58..b1702a798 100644 --- a/tests/tcp_stream.rs +++ b/tests/tcp_stream.rs @@ -916,7 +916,6 @@ fn peek_ok() { ); } -#[cfg(not(mio_unsupported_force_poll_poll))] #[test] fn peek_would_block() { let mut buf = [0; 1]; From 057baab56c5faa6200a45d4cb55ebe0caf6135e9 Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 22 Aug 2025 17:33:45 +0800 Subject: [PATCH 15/18] chore: fmt Signed-off-by: discord9 --- src/net/tcp/stream.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 0242542f6..6a7991ce2 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -212,14 +212,14 @@ impl TcpStream { /// Successive calls return the same data. This is accomplished by passing /// `MSG_PEEK` as a flag to the underlying recv system call. pub fn peek(&self, buf: &mut [u8]) -> io::Result { - #[cfg(any(windows,mio_unsupported_force_poll_poll))] + #[cfg(any(windows, mio_unsupported_force_poll_poll))] { // on windows need to manually reregister the socket // because `SockState::feed_event` simulate edge-triggered behavior // and need to reregister the socket to receive more events self.inner.do_io(|inner| inner.peek(buf)) } - #[cfg(not(any(windows,mio_unsupported_force_poll_poll)))] + #[cfg(not(any(windows, mio_unsupported_force_poll_poll)))] { self.inner.peek(buf) } From 783d9b5f04ba91a77a2ec6f0bb2c044ae7339125 Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 22 Aug 2025 17:46:45 +0800 Subject: [PATCH 16/18] refactor: do_io peek on all platform Signed-off-by: discord9 --- src/net/tcp/stream.rs | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 6a7991ce2..cedb3e405 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -212,17 +212,10 @@ impl TcpStream { /// Successive calls return the same data. This is accomplished by passing /// `MSG_PEEK` as a flag to the underlying recv system call. pub fn peek(&self, buf: &mut [u8]) -> io::Result { - #[cfg(any(windows, mio_unsupported_force_poll_poll))] - { - // on windows need to manually reregister the socket - // because `SockState::feed_event` simulate edge-triggered behavior - // and need to reregister the socket to receive more events - self.inner.do_io(|inner| inner.peek(buf)) - } - #[cfg(not(any(windows, mio_unsupported_force_poll_poll)))] - { - self.inner.peek(buf) - } + // need to re-register if `peek` returns `WouldBlock` + // to ensure the socket will receive more events once it is ready again. + // This is done by calling `self.inner.do_io`. + self.inner.do_io(|inner| inner.peek(buf)) } /// Execute an I/O operation ensuring that the socket receives more events From 3cad5311c74c215dd52588bdb7ee5a2e9a5c6f1d Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 22 Aug 2025 19:06:06 +0800 Subject: [PATCH 17/18] test: read/peek would block on tcp stream Signed-off-by: discord9 --- tests/tcp_stream.rs | 64 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 56 insertions(+), 8 deletions(-) diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs index b1702a798..122dcf53e 100644 --- a/tests/tcp_stream.rs +++ b/tests/tcp_stream.rs @@ -899,14 +899,7 @@ fn peek_ok() { assert_eq!(stream2.write(&[0]).unwrap(), 1); // peek multiple times until we get a byte - loop { - let res = stream1.peek(&mut buf); - match res { - Ok(1) => break, - Err(err) if err.kind() == io::ErrorKind::WouldBlock => continue, - _ => panic!("Unexpected error: {:?}", res), - } - } + peek_until_ok(&mut buf, &mut stream1, 1); // a successful peek shouldn't remove readable interest // so we should still get a readable event expect_events( @@ -916,6 +909,20 @@ fn peek_ok() { ); } +fn peek_until_ok(buf: &mut [u8; N], stream1: &mut TcpStream, expected: usize) { + loop { + let res = stream1.peek(buf); + match res { + Ok(x) => { + assert_eq!(x, expected); + break; + } + Err(err) if err.kind() == io::ErrorKind::WouldBlock => continue, + _ => panic!("Unexpected error: {:?}", res), + } + } +} + #[test] fn peek_would_block() { let mut buf = [0; 1]; @@ -956,3 +963,44 @@ fn peek_would_block() { vec![ExpectEvent::new(ID1, Readiness::READABLE)], ); } + +#[test] +fn read_peek_would_block() { + let mut buf = [0; 1]; + let (mut poll, mut events) = init_with_poll(); + + let listener = net::TcpListener::bind(any_local_address()).unwrap(); + let sockaddr = listener.local_addr().unwrap(); + let thread_handle = thread::spawn(move || listener.accept().unwrap()); + let stream1 = net::TcpStream::connect(sockaddr).unwrap(); + let (mut stream2, _) = thread_handle.join().unwrap(); + + stream1.set_nonblocking(true).unwrap(); + let mut stream1 = TcpStream::from_std(stream1); + + poll.registry() + .register(&mut stream1, ID1, Interest::READABLE) + .unwrap(); + + assert_would_block(stream1.read(&mut buf)); + + assert_eq!(stream2.write(&[0]).unwrap(), 1); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Readiness::READABLE)], + ); + + assert_eq!(stream1.read(&mut buf).unwrap(), 1); + + assert_would_block(stream1.peek(&mut buf)); + + assert_eq!(stream2.write(&[1]).unwrap(), 1); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(ID1, Readiness::READABLE)], + ); +} From 1f63f210406b72a2076980d57f2146767455ccc8 Mon Sep 17 00:00:00 2001 From: Thomas de Zeeuw Date: Thu, 28 Aug 2025 13:46:34 +0200 Subject: [PATCH 18/18] Remove two comments --- src/net/tcp/stream.rs | 3 +-- tests/tcp_stream.rs | 2 -- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index cedb3e405..0feb1656f 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -212,9 +212,8 @@ impl TcpStream { /// Successive calls return the same data. This is accomplished by passing /// `MSG_PEEK` as a flag to the underlying recv system call. pub fn peek(&self, buf: &mut [u8]) -> io::Result { - // need to re-register if `peek` returns `WouldBlock` + // Need to re-register if `peek` returns `WouldBlock` // to ensure the socket will receive more events once it is ready again. - // This is done by calling `self.inner.do_io`. self.inner.do_io(|inner| inner.peek(buf)) } diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs index 122dcf53e..052c66d32 100644 --- a/tests/tcp_stream.rs +++ b/tests/tcp_stream.rs @@ -955,8 +955,6 @@ fn peek_would_block() { assert_eq!(stream2.write(&[0, 1, 2, 3]).unwrap(), 4); - // this panic with no event on windows if not re-register after would block peek - // becuase mio simulate edge-triggered behavior in `SockState::feed_event` and need reregister expect_events( &mut poll, &mut events,