From 0942c473ce56163fdd1fbc62762f8164e3afa7bf Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Mon, 20 May 2024 20:46:16 +0200 Subject: [PATCH 1/8] Switch from fetch to batch in test Signed-off-by: Tomasz Pietrek --- async-nats/tests/jetstream_tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/async-nats/tests/jetstream_tests.rs b/async-nats/tests/jetstream_tests.rs index 2e007c95a..f2821e016 100755 --- a/async-nats/tests/jetstream_tests.rs +++ b/async-nats/tests/jetstream_tests.rs @@ -2375,7 +2375,7 @@ mod jetstream { .unwrap(); } - let mut iter = consumer.fetch().max_messages(100).messages().await.unwrap(); + let mut iter = consumer.batch().max_messages(10).messages().await.unwrap(); client.flush().await.unwrap(); tryhard::retry_fn(|| async { From c5a8c792ec2448863b452c250fdaaf3deb3d1727 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Tue, 21 May 2024 07:59:52 +0200 Subject: [PATCH 2/8] Fix compatibility test Signed-off-by: Tomasz Pietrek --- async-nats/tests/compatibility.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/async-nats/tests/compatibility.rs b/async-nats/tests/compatibility.rs index 9d6a7612a..0a657651a 100644 --- a/async-nats/tests/compatibility.rs +++ b/async-nats/tests/compatibility.rs @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#[cfg(feature = "compatibility_tests")] +// #[cfg(feature = "compatibility_tests")] mod compatibility { use futures::{pin_mut, stream::Peekable, StreamExt}; use ring::digest::{self, SHA256}; @@ -41,7 +41,12 @@ mod compatibility { .init(); let url = std::env::var("NATS_URL").unwrap_or_else(|_| "localhost:4222".to_string()); tracing::info!("staring client for object store tests at {}", url); - let client = async_nats::connect(url).await.unwrap(); + let client = async_nats::ConnectOptions::new() + .max_reconnects(10) + .retry_on_initial_connect() + .connect(&url) + .await + .unwrap(); let tests = client .subscribe("tests.object-store.>") @@ -422,7 +427,12 @@ mod compatibility { async fn service_core() { let url = std::env::var("NATS_URL").unwrap_or_else(|_| "localhost:4222".to_string()); tracing::info!("staring client for service tests at {}", url); - let client = async_nats::connect(url).await.unwrap(); + let client = async_nats::ConnectOptions::new() + .max_reconnects(10) + .retry_on_initial_connect() + .connect(&url) + .await + .unwrap(); let mut tests = client .subscribe("tests.service.core.>") From 9c6831b34c904df9957e9ff3d08dc3a256654c55 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Tue, 21 May 2024 11:18:13 +0200 Subject: [PATCH 3/8] Bump docker version Signed-off-by: Tomasz Pietrek --- async-nats/tests/configs/docker/Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/async-nats/tests/configs/docker/Dockerfile b/async-nats/tests/configs/docker/Dockerfile index 3203f9cbd..766975d26 100644 --- a/async-nats/tests/configs/docker/Dockerfile +++ b/async-nats/tests/configs/docker/Dockerfile @@ -1,7 +1,7 @@ -FROM rust:1.71.1 +FROM rust:1.78 WORKDIR /usr/src/nats.rs/async-nats ARG PROFILE=test COPY . /usr/src/nats.rs RUN cargo test --features compatibility_tests --no-run ENV NATS_URL=localhost:4222 -CMD cargo test --features compatibility_tests compatibility -- --nocapture \ No newline at end of file +CMD cargo test --features compatibility_tests compatibility -- --nocapture From 1fbb459c942afdbf13612321dee6c7a5ad9d97e5 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Tue, 21 May 2024 11:54:55 +0200 Subject: [PATCH 4/8] Bump Rust version Signed-off-by: Tomasz Pietrek --- async-nats/Cargo.toml | 2 +- async-nats/tests/compatibility.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/async-nats/Cargo.toml b/async-nats/Cargo.toml index d61882ea9..d24573d32 100644 --- a/async-nats/Cargo.toml +++ b/async-nats/Cargo.toml @@ -3,7 +3,7 @@ name = "async-nats" authors = ["Tomasz Pietrek ", "Casper Beyer "] version = "0.35.1" edition = "2021" -rust = "1.67.0" +rust = "1.74.0" description = "A async Rust NATS client" license = "Apache-2.0" documentation = "https://docs.rs/async-nats" diff --git a/async-nats/tests/compatibility.rs b/async-nats/tests/compatibility.rs index 0a657651a..e433fc30c 100644 --- a/async-nats/tests/compatibility.rs +++ b/async-nats/tests/compatibility.rs @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -// #[cfg(feature = "compatibility_tests")] +#[cfg(feature = "compatibility_tests")] mod compatibility { use futures::{pin_mut, stream::Peekable, StreamExt}; use ring::digest::{self, SHA256}; From 4c81c96e495ed171c3d96c99d6acf1759388bf95 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Mon, 27 May 2024 14:25:07 +0200 Subject: [PATCH 5/8] Protect `is_server_compatible` against empty version Signed-off-by: Tomasz Pietrek --- async-nats/src/client.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/async-nats/src/client.rs b/async-nats/src/client.rs index d09618f0f..47ab987e8 100644 --- a/async-nats/src/client.rs +++ b/async-nats/src/client.rs @@ -115,6 +115,10 @@ impl Client { /// Returns true if the server version is compatible with the version components. /// + /// This has to be used with caution, as it is not guaranteed that the server + /// that client is connected to is the same version that the one that is + /// a JetStream meta/stream/consumer leader, especially across leafnodes. + /// /// # Examples /// /// ```no_run @@ -128,7 +132,10 @@ impl Client { pub fn is_server_compatible(&self, major: i64, minor: i64, patch: i64) -> bool { let info = self.server_info(); - let server_version_captures = VERSION_RE.captures(&info.version).unwrap(); + let server_version_captures = match VERSION_RE.captures(&info.version) { + Some(captures) => captures, + None => return false, + }; let server_major = server_version_captures .get(1) From 8c89006c0acc0766bcd12fadc53d998570a0341b Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Mon, 27 May 2024 14:25:42 +0200 Subject: [PATCH 6/8] Remove check for server compatibility from consumer create Checking server version is not a reliable way to know if given API call is supported for given context, as client might be connected to a leaf node, while the meta leader is a different version, part of different cluster. Signed-off-by: Tomasz Pietrek --- async-nats/src/jetstream/context.rs | 35 +++++++++-------------------- 1 file changed, 10 insertions(+), 25 deletions(-) diff --git a/async-nats/src/jetstream/context.rs b/async-nats/src/jetstream/context.rs index 2b3b2aed5..15721432a 100644 --- a/async-nats/src/jetstream/context.rs +++ b/async-nats/src/jetstream/context.rs @@ -1003,32 +1003,17 @@ impl Context { let config = config.into_consumer_config(); let subject = { - if self.client.is_server_compatible(2, 9, 0) { - let filter = if config.filter_subject.is_empty() { - "".to_string() - } else { - format!(".{}", config.filter_subject) - }; - config - .name - .as_ref() - .or(config.durable_name.as_ref()) - .map(|name| format!("CONSUMER.CREATE.{}.{}{}", stream.as_ref(), name, filter)) - .unwrap_or_else(|| format!("CONSUMER.CREATE.{}", stream.as_ref())) - } else if config.name.is_some() { - return Err(ConsumerError::with_source( - ConsumerErrorKind::Other, - "can't use consumer name with server < 2.9.0", - )); - } else if let Some(ref durable_name) = config.durable_name { - format!( - "CONSUMER.DURABLE.CREATE.{}.{}", - stream.as_ref(), - durable_name - ) + let filter = if config.filter_subject.is_empty() { + "".to_string() } else { - format!("CONSUMER.CREATE.{}", stream.as_ref()) - } + format!(".{}", config.filter_subject) + }; + config + .name + .as_ref() + .or(config.durable_name.as_ref()) + .map(|name| format!("CONSUMER.CREATE.{}.{}{}", stream.as_ref(), name, filter)) + .unwrap_or_else(|| format!("CONSUMER.CREATE.{}", stream.as_ref())) }; match self From afd776af34d53d0be49cca159cf77bd062242ff3 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Mon, 27 May 2024 14:34:47 +0200 Subject: [PATCH 7/8] Add test against panic When retry on initial connect was used, some JetStream consumer calls could panic in specific conditions because server version was not available. This test makes sure it will no longer happen. Signed-off-by: Tomasz Pietrek --- async-nats/tests/jetstream_tests.rs | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/async-nats/tests/jetstream_tests.rs b/async-nats/tests/jetstream_tests.rs index f2821e016..b1588852f 100755 --- a/async-nats/tests/jetstream_tests.rs +++ b/async-nats/tests/jetstream_tests.rs @@ -3633,4 +3633,25 @@ mod jetstream { assert_eq!(err.kind(), ConsumerUpdateErrorKind::DoesNotExist); } + + #[tokio::test] + async fn test_version_on_initial_connect() { + let client = async_nats::ConnectOptions::new() + .retry_on_initial_connect() + .connect("nats://localhost:4222") + .await + .unwrap(); + let jetstream = async_nats::jetstream::new(client.clone()); + + jetstream + .create_consumer_on_stream( + consumer::pull::Config { + durable_name: Some("name".to_string()), + ..Default::default() + }, + "events", + ) + .await + .expect_err("should fail but not panic because of lack of server info"); + } } From f86dce921fad45ece9f06e9ddbf72a6ef085125a Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Mon, 27 May 2024 14:59:28 +0200 Subject: [PATCH 8/8] Update dictionary Signed-off-by: Tomasz Pietrek --- .config/nats.dic | 1 + 1 file changed, 1 insertion(+) diff --git a/.config/nats.dic b/.config/nats.dic index 8cb2484a3..fa6496d6d 100644 --- a/.config/nats.dic +++ b/.config/nats.dic @@ -153,3 +153,4 @@ update_consumer update_consumer_on_stream create_consumer_strict create_consumer_strict_on_stream +leafnodes