diff --git a/async-nats/CHANGELOG.md b/async-nats/CHANGELOG.md index 2547bccb7..ee30690f3 100644 --- a/async-nats/CHANGELOG.md +++ b/async-nats/CHANGELOG.md @@ -1,3 +1,53 @@ +# 0.35.1 +## Overview + +This release fixes broken docs build caused by `fips`, which cannot be built in restricted docs.rs environment. + +## What's Changed +* Fix docs.rs build by @Jarema in https://github.com/nats-io/nats.rs/pull/1259 +* Preserve case of server error messages by @oscarwcl in https://github.com/nats-io/nats.rs/pull/1258 + + +**Full Changelog**: https://github.com/nats-io/nats.rs/compare/async-nats/v0.35.0...async-nats/v0.35.1 + +# 0.35.0 + +## Overview + +This release makes tls setup more flexible, leveraging rustls v0.23 and allowing to pick crypto backend: +* ring +* aws-lc-rs +* fips + +Some other highlights: +* force reconnect via `force_reconnect` method +* explicit create/update consumer API + +Thank you for all your contributions! + +## Added +* Add `ToServerAddrs` impl for array/vector of strings by @mmalek in https://github.com/nats-io/nats.rs/pull/1231 +* Add public constructor for Acker by @AbstractiveNord in https://github.com/nats-io/nats.rs/pull/1232 +* Add force reconnect by @Jarema in https://github.com/nats-io/nats.rs/pull/1240 +* Add features check by @Jarema in https://github.com/nats-io/nats.rs/pull/1247 +* Add stream placement by @Jarema in https://github.com/nats-io/nats.rs/pull/1250 +* Add consumer action by @Jarema in https://github.com/nats-io/nats.rs/pull/1254 +* Add support for aws-lc-rs (rustls v0.23.0) by @paolobarbolini in https://github.com/nats-io/nats.rs/pull/1222 + +## Fixed +* Use last header value for JetStream messages by @Jarema in https://github.com/nats-io/nats.rs/pull/1239 + +## Changed +* Wrap inbox prefix in an `Arc` by @thomastaylor312 in https://github.com/nats-io/nats.rs/pull/1236 +* Document feature flags by @Jarema in https://github.com/nats-io/nats.rs/pull/1246 +* Don't force flush if write buffer isn't empty by @paolobarbolini in https://github.com/nats-io/nats.rs/pull/1241 + +## New Contributors +* @mmalek made their first contribution in https://github.com/nats-io/nats.rs/pull/1231 + +**Full Changelog**: https://github.com/nats-io/nats.rs/compare/async-nats/v0.34.0...async-nats/v0.35.0 + + # 0.34.0 ## Overview diff --git a/async-nats/Cargo.toml b/async-nats/Cargo.toml index 551df784d..31035755f 100644 --- a/async-nats/Cargo.toml +++ b/async-nats/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "async-nats" authors = ["Tomasz Pietrek ", "Casper Beyer "] -version = "0.34.0" +version = "0.35.1" edition = "2021" rust = "1.67.0" description = "A async Rust NATS client" @@ -84,5 +84,6 @@ harness = false lto = true [package.metadata.docs.rs] -all-features = true +# We can't use `all-features` because the `fips` doesn't compile in restricted docs.rs environment. +features = ["server_2_10", "service", "experimental", "ring", "aws-lc-rs"] rustdoc-args = ["--cfg", "docsrs"] diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index d5a5fd9d5..3aa1ea1d1 100755 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -84,6 +84,10 @@ //! for _ in 0..10 { //! client.publish(subject, data.clone()).await?; //! } +//! +//! // Flush internal buffer before exiting to make sure all messages are sent +//! client.flush().await?; +//! //! # Ok(()) //! # } //! ``` @@ -1381,6 +1385,30 @@ pub enum ServerError { Other(String), } +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum ClientError { + Other(String), + MaxReconnects, +} +impl std::fmt::Display for ClientError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Other(error) => write!(f, "nats: {error}"), + Self::MaxReconnects => write!(f, "nats: max reconnects reached"), + } + } +} + +impl ServerError { + fn new(error: String) -> ServerError { + match error.to_lowercase().as_str() { + "authorization violation" => ServerError::AuthorizationViolation, + // error messages can contain case-sensitive values which should be preserved + _ => ServerError::Other(error), + } + } +} + impl std::fmt::Display for ServerError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { diff --git a/async-nats/src/options.rs b/async-nats/src/options.rs index b21f27ada..7eefa33d5 100644 --- a/async-nats/src/options.rs +++ b/async-nats/src/options.rs @@ -42,7 +42,6 @@ use tokio_rustls::rustls; /// # } /// ``` pub struct ConnectOptions { - // pub(crate) auth: AuthStyle, pub(crate) name: Option, pub(crate) no_echo: bool, pub(crate) max_reconnects: Option, diff --git a/async-nats/tests/jetstream_tests.rs b/async-nats/tests/jetstream_tests.rs index 0f56c5c30..2e007c95a 100755 --- a/async-nats/tests/jetstream_tests.rs +++ b/async-nats/tests/jetstream_tests.rs @@ -2378,10 +2378,18 @@ mod jetstream { let mut iter = consumer.fetch().max_messages(100).messages().await.unwrap(); client.flush().await.unwrap(); - // TODO: when rtt() is available, use it here. - tokio::time::sleep(Duration::from_millis(400)).await; - let info = consumer.info().await.unwrap(); - assert_eq!(info.num_ack_pending, 10); + tryhard::retry_fn(|| async { + let mut consumer = consumer.clone(); + let num_ack_pending = consumer.info().await?.num_ack_pending; + if num_ack_pending != 10 { + return Err(format!("expected {}, got {}", 10, num_ack_pending).into()); + } + Ok::<(), async_nats::Error>(()) + }) + .retries(10) + .exponential_backoff(Duration::from_millis(500)) + .await + .unwrap(); // standard ack if let Some(message) = iter.next().await { @@ -2389,19 +2397,36 @@ mod jetstream { } client.flush().await.unwrap(); - tokio::time::sleep(Duration::from_millis(100)).await; - let info = consumer.info().await.unwrap(); - assert_eq!(info.num_ack_pending, 9); + tryhard::retry_fn(|| async { + let mut consumer = consumer.clone(); + let num_ack_pending = consumer.info().await?.num_ack_pending; + if num_ack_pending != 9 { + return Err(format!("expected {}, got {}", 9, num_ack_pending).into()); + } + Ok::<(), async_nats::Error>(()) + }) + .retries(10) + .exponential_backoff(Duration::from_millis(500)) + .await + .unwrap(); // double ack if let Some(message) = iter.next().await { message.unwrap().double_ack().await.unwrap(); } - client.flush().await.unwrap(); - tokio::time::sleep(Duration::from_millis(500)).await; - let info = consumer.info().await.unwrap(); - assert_eq!(info.num_ack_pending, 8); + tryhard::retry_fn(|| async { + let mut consumer = consumer.clone(); + let num_ack_pending = consumer.info().await?.num_ack_pending; + if num_ack_pending != 8 { + return Err(format!("expected {}, got {}", 8, num_ack_pending).into()); + } + Ok::<(), async_nats::Error>(()) + }) + .retries(10) + .exponential_backoff(Duration::from_millis(500)) + .await + .unwrap(); // in progress if let Some(message) = iter.next().await {