Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/net/tcp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,9 @@ impl TcpStream {
/// Successive calls return the same data. This is accomplished by passing
/// `MSG_PEEK` as a flag to the underlying recv system call.
pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.peek(buf)
// Need to re-register if `peek` returns `WouldBlock`
// to ensure the socket will receive more events once it is ready again.
self.inner.do_io(|inner| inner.peek(buf))
}

/// Execute an I/O operation ensuring that the socket receives more events
Expand Down
126 changes: 126 additions & 0 deletions tests/tcp_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -876,3 +876,129 @@ fn send_oob_data<S: AsRawFd>(stream: &S, data: &[u8]) -> io::Result<usize> {
}
}
}

#[test]
fn peek_ok() {
let mut buf = [0; 2];
let (mut poll, mut events) = init_with_poll();

let listener = net::TcpListener::bind(any_local_address()).unwrap();
let sockaddr = listener.local_addr().unwrap();
let thread_handle = thread::spawn(move || listener.accept().unwrap());
let stream1 = net::TcpStream::connect(sockaddr).unwrap();
let (mut stream2, _) = thread_handle.join().unwrap();

stream1.set_nonblocking(true).unwrap();
let mut stream1 = TcpStream::from_std(stream1);

poll.registry()
.register(&mut stream1, ID1, Interest::READABLE)
.unwrap();

expect_no_events(&mut poll, &mut events);

assert_eq!(stream2.write(&[0]).unwrap(), 1);
// peek multiple times until we get a byte
peek_until_ok(&mut buf, &mut stream1, 1);
// a successful peek shouldn't remove readable interest
// so we should still get a readable event
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID1, Readiness::READABLE)],
);
}

fn peek_until_ok<const N: usize>(buf: &mut [u8; N], stream1: &mut TcpStream, expected: usize) {
loop {
let res = stream1.peek(buf);
match res {
Ok(x) => {
assert_eq!(x, expected);
break;
}
Err(err) if err.kind() == io::ErrorKind::WouldBlock => continue,
_ => panic!("Unexpected error: {:?}", res),
}
}
}

#[test]
fn peek_would_block() {
let mut buf = [0; 1];
let (mut poll, mut events) = init_with_poll();

let listener = net::TcpListener::bind(any_local_address()).unwrap();
let sockaddr = listener.local_addr().unwrap();
let thread_handle = thread::spawn(move || listener.accept().unwrap());
let stream1 = net::TcpStream::connect(sockaddr).unwrap();
let (mut stream2, _) = thread_handle.join().unwrap();

stream1.set_nonblocking(true).unwrap();
let mut stream1 = TcpStream::from_std(stream1);

poll.registry()
.register(&mut stream1, ID1, Interest::READABLE)
.unwrap();

expect_no_events(&mut poll, &mut events);

assert_eq!(stream2.write(&[0]).unwrap(), 1);
expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID1, Readiness::READABLE)],
);

assert_eq!(stream1.read(&mut buf).unwrap(), 1);
assert_would_block(stream1.peek(&mut buf));

assert_eq!(stream2.write(&[0, 1, 2, 3]).unwrap(), 4);

expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID1, Readiness::READABLE)],
);
}

#[test]
fn read_peek_would_block() {
let mut buf = [0; 1];
let (mut poll, mut events) = init_with_poll();

let listener = net::TcpListener::bind(any_local_address()).unwrap();
let sockaddr = listener.local_addr().unwrap();
let thread_handle = thread::spawn(move || listener.accept().unwrap());
let stream1 = net::TcpStream::connect(sockaddr).unwrap();
let (mut stream2, _) = thread_handle.join().unwrap();

stream1.set_nonblocking(true).unwrap();
let mut stream1 = TcpStream::from_std(stream1);

poll.registry()
.register(&mut stream1, ID1, Interest::READABLE)
.unwrap();

assert_would_block(stream1.read(&mut buf));

assert_eq!(stream2.write(&[0]).unwrap(), 1);

expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID1, Readiness::READABLE)],
);

assert_eq!(stream1.read(&mut buf).unwrap(), 1);

assert_would_block(stream1.peek(&mut buf));

assert_eq!(stream2.write(&[1]).unwrap(), 1);

expect_events(
&mut poll,
&mut events,
vec![ExpectEvent::new(ID1, Readiness::READABLE)],
);
}
2 changes: 2 additions & 0 deletions tests/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ impl From<Interest> for Readiness {
}
}

#[track_caller]
pub fn expect_events(poll: &mut Poll, events: &mut Events, mut expected: Vec<ExpectEvent>) {
// In a lot of calls we expect more then one event, but it could be that
// poll returns the first event only in a single call. To be a bit more
Expand Down Expand Up @@ -164,6 +165,7 @@ pub fn expect_events(poll: &mut Poll, events: &mut Events, mut expected: Vec<Exp
);
}

#[track_caller]
pub fn expect_no_events(poll: &mut Poll, events: &mut Events) {
poll.poll(events, Some(Duration::from_millis(50)))
.expect("unable to poll");
Expand Down
Loading