diff --git a/async-nats/src/connector.rs b/async-nats/src/connector.rs index 109d03b48..e88afd096 100644 --- a/async-nats/src/connector.rs +++ b/async-nats/src/connector.rs @@ -411,19 +411,18 @@ impl Handler { #[cfg(feature = "zstd")] if m4ss_zstd { use std::pin::Pin; - use std::task::{Context, Poll, Waker}; + use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite, BufReader, ReadBuf}; #[derive(Debug)] struct Maybe { item: Option, - waker: Option, } impl Maybe { fn new(item: Option) -> Self { - Self { item, waker: None } + Self { item } } fn take_item(&mut self) -> Option { @@ -431,10 +430,8 @@ impl Handler { } fn set_item(&mut self, item: T) { + debug_assert!(self.item.is_none()); self.item = Some(item); - if let Some(waker) = self.waker.take() { - waker.wake(); - } } } @@ -446,10 +443,7 @@ impl Handler { ) -> Poll> { match &mut self.item { Some(item) => Pin::new(item).poll_read(cx, buf), - None => { - self.waker = Some(cx.waker().clone()); - Poll::Pending - } + None => unreachable!(), } } } @@ -462,10 +456,7 @@ impl Handler { ) -> Poll> { match &mut self.item { Some(item) => Pin::new(item).poll_write(cx, buf), - None => { - self.waker = Some(cx.waker().clone()); - Poll::Pending - } + None => unreachable!(), } } @@ -475,18 +466,18 @@ impl Handler { ) -> Poll> { match &mut self.item { Some(item) => Pin::new(item).poll_flush(cx), - None => { - self.waker = Some(cx.waker().clone()); - Poll::Pending - } + None => unreachable!(), } } fn poll_shutdown( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, ) -> Poll> { - Poll::Ready(Ok(())) + match &mut self.item { + Some(item) => Pin::new(item).poll_shutdown(cx), + None => unreachable!(), + } } }