Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main'
Browse files Browse the repository at this point in the history
  • Loading branch information
paolobarbolini committed May 20, 2024
2 parents 55f2965 + ec6331c commit 9f2a1e7
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 14 deletions.
50 changes: 50 additions & 0 deletions async-nats/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,53 @@
# 0.35.1
## Overview

This release fixes broken docs build caused by `fips`, which cannot be built in restricted docs.rs environment.

## What's Changed
* Fix docs.rs build by @Jarema in https://github.com/nats-io/nats.rs/pull/1259
* Preserve case of server error messages by @oscarwcl in https://github.com/nats-io/nats.rs/pull/1258


**Full Changelog**: https://github.com/nats-io/nats.rs/compare/async-nats/v0.35.0...async-nats/v0.35.1

# 0.35.0

## Overview

This release makes tls setup more flexible, leveraging rustls v0.23 and allowing to pick crypto backend:
* ring
* aws-lc-rs
* fips

Some other highlights:
* force reconnect via `force_reconnect` method
* explicit create/update consumer API

Thank you for all your contributions!

## Added
* Add `ToServerAddrs` impl for array/vector of strings by @mmalek in https://github.com/nats-io/nats.rs/pull/1231
* Add public constructor for Acker by @AbstractiveNord in https://github.com/nats-io/nats.rs/pull/1232
* Add force reconnect by @Jarema in https://github.com/nats-io/nats.rs/pull/1240
* Add features check by @Jarema in https://github.com/nats-io/nats.rs/pull/1247
* Add stream placement by @Jarema in https://github.com/nats-io/nats.rs/pull/1250
* Add consumer action by @Jarema in https://github.com/nats-io/nats.rs/pull/1254
* Add support for aws-lc-rs (rustls v0.23.0) by @paolobarbolini in https://github.com/nats-io/nats.rs/pull/1222

## Fixed
* Use last header value for JetStream messages by @Jarema in https://github.com/nats-io/nats.rs/pull/1239

## Changed
* Wrap inbox prefix in an `Arc` by @thomastaylor312 in https://github.com/nats-io/nats.rs/pull/1236
* Document feature flags by @Jarema in https://github.com/nats-io/nats.rs/pull/1246
* Don't force flush if write buffer isn't empty by @paolobarbolini in https://github.com/nats-io/nats.rs/pull/1241

## New Contributors
* @mmalek made their first contribution in https://github.com/nats-io/nats.rs/pull/1231

**Full Changelog**: https://github.com/nats-io/nats.rs/compare/async-nats/v0.34.0...async-nats/v0.35.0


# 0.34.0

## Overview
Expand Down
5 changes: 3 additions & 2 deletions async-nats/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "async-nats"
authors = ["Tomasz Pietrek <[email protected]>", "Casper Beyer <[email protected]>"]
version = "0.34.0"
version = "0.35.1"
edition = "2021"
rust = "1.67.0"
description = "A async Rust NATS client"
Expand Down Expand Up @@ -84,5 +84,6 @@ harness = false
lto = true

[package.metadata.docs.rs]
all-features = true
# We can't use `all-features` because the `fips` doesn't compile in restricted docs.rs environment.
features = ["server_2_10", "service", "experimental", "ring", "aws-lc-rs"]
rustdoc-args = ["--cfg", "docsrs"]
28 changes: 28 additions & 0 deletions async-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@
//! for _ in 0..10 {
//! client.publish(subject, data.clone()).await?;
//! }
//!
//! // Flush internal buffer before exiting to make sure all messages are sent
//! client.flush().await?;
//!
//! # Ok(())
//! # }
//! ```
Expand Down Expand Up @@ -1381,6 +1385,30 @@ pub enum ServerError {
Other(String),
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ClientError {
Other(String),
MaxReconnects,
}
impl std::fmt::Display for ClientError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Other(error) => write!(f, "nats: {error}"),
Self::MaxReconnects => write!(f, "nats: max reconnects reached"),
}
}
}

impl ServerError {
fn new(error: String) -> ServerError {
match error.to_lowercase().as_str() {
"authorization violation" => ServerError::AuthorizationViolation,
// error messages can contain case-sensitive values which should be preserved
_ => ServerError::Other(error),
}
}
}

impl std::fmt::Display for ServerError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Expand Down
1 change: 0 additions & 1 deletion async-nats/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ use tokio_rustls::rustls;
/// # }
/// ```
pub struct ConnectOptions {
// pub(crate) auth: AuthStyle,
pub(crate) name: Option<String>,
pub(crate) no_echo: bool,
pub(crate) max_reconnects: Option<usize>,
Expand Down
47 changes: 36 additions & 11 deletions async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2378,30 +2378,55 @@ mod jetstream {
let mut iter = consumer.fetch().max_messages(100).messages().await.unwrap();
client.flush().await.unwrap();

// TODO: when rtt() is available, use it here.
tokio::time::sleep(Duration::from_millis(400)).await;
let info = consumer.info().await.unwrap();
assert_eq!(info.num_ack_pending, 10);
tryhard::retry_fn(|| async {
let mut consumer = consumer.clone();
let num_ack_pending = consumer.info().await?.num_ack_pending;
if num_ack_pending != 10 {
return Err(format!("expected {}, got {}", 10, num_ack_pending).into());
}
Ok::<(), async_nats::Error>(())
})
.retries(10)
.exponential_backoff(Duration::from_millis(500))
.await
.unwrap();

// standard ack
if let Some(message) = iter.next().await {
message.unwrap().ack().await.unwrap();
}
client.flush().await.unwrap();

tokio::time::sleep(Duration::from_millis(100)).await;
let info = consumer.info().await.unwrap();
assert_eq!(info.num_ack_pending, 9);
tryhard::retry_fn(|| async {
let mut consumer = consumer.clone();
let num_ack_pending = consumer.info().await?.num_ack_pending;
if num_ack_pending != 9 {
return Err(format!("expected {}, got {}", 9, num_ack_pending).into());
}
Ok::<(), async_nats::Error>(())
})
.retries(10)
.exponential_backoff(Duration::from_millis(500))
.await
.unwrap();

// double ack
if let Some(message) = iter.next().await {
message.unwrap().double_ack().await.unwrap();
}
client.flush().await.unwrap();

tokio::time::sleep(Duration::from_millis(500)).await;
let info = consumer.info().await.unwrap();
assert_eq!(info.num_ack_pending, 8);
tryhard::retry_fn(|| async {
let mut consumer = consumer.clone();
let num_ack_pending = consumer.info().await?.num_ack_pending;
if num_ack_pending != 8 {
return Err(format!("expected {}, got {}", 8, num_ack_pending).into());
}
Ok::<(), async_nats::Error>(())
})
.retries(10)
.exponential_backoff(Duration::from_millis(500))
.await
.unwrap();

// in progress
if let Some(message) = iter.next().await {
Expand Down

0 comments on commit 9f2a1e7

Please sign in to comment.