Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main'
Browse files Browse the repository at this point in the history
  • Loading branch information
paolobarbolini committed Jun 3, 2024
2 parents 9f2a1e7 + f86dce9 commit f49133d
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 32 deletions.
1 change: 1 addition & 0 deletions .config/nats.dic
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,4 @@ update_consumer
update_consumer_on_stream
create_consumer_strict
create_consumer_strict_on_stream
leafnodes
2 changes: 1 addition & 1 deletion async-nats/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "async-nats"
authors = ["Tomasz Pietrek <[email protected]>", "Casper Beyer <[email protected]>"]
version = "0.35.1"
edition = "2021"
rust = "1.67.0"
rust = "1.74.0"
description = "A async Rust NATS client"
license = "Apache-2.0"
documentation = "https://docs.rs/async-nats"
Expand Down
9 changes: 8 additions & 1 deletion async-nats/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ impl Client {

/// Returns true if the server version is compatible with the version components.
///
/// This has to be used with caution, as it is not guaranteed that the server
/// that client is connected to is the same version that the one that is
/// a JetStream meta/stream/consumer leader, especially across leafnodes.
///
/// # Examples
///
/// ```no_run
Expand All @@ -160,7 +164,10 @@ impl Client {
pub fn is_server_compatible(&self, major: i64, minor: i64, patch: i64) -> bool {
let info = self.server_info();

let server_version_captures = VERSION_RE.captures(&info.version).unwrap();
let server_version_captures = match VERSION_RE.captures(&info.version) {
Some(captures) => captures,
None => return false,
};

let server_major = server_version_captures
.get(1)
Expand Down
35 changes: 10 additions & 25 deletions async-nats/src/jetstream/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1003,32 +1003,17 @@ impl Context {
let config = config.into_consumer_config();

let subject = {
if self.client.is_server_compatible(2, 9, 0) {
let filter = if config.filter_subject.is_empty() {
"".to_string()
} else {
format!(".{}", config.filter_subject)
};
config
.name
.as_ref()
.or(config.durable_name.as_ref())
.map(|name| format!("CONSUMER.CREATE.{}.{}{}", stream.as_ref(), name, filter))
.unwrap_or_else(|| format!("CONSUMER.CREATE.{}", stream.as_ref()))
} else if config.name.is_some() {
return Err(ConsumerError::with_source(
ConsumerErrorKind::Other,
"can't use consumer name with server < 2.9.0",
));
} else if let Some(ref durable_name) = config.durable_name {
format!(
"CONSUMER.DURABLE.CREATE.{}.{}",
stream.as_ref(),
durable_name
)
let filter = if config.filter_subject.is_empty() {
"".to_string()
} else {
format!("CONSUMER.CREATE.{}", stream.as_ref())
}
format!(".{}", config.filter_subject)
};
config
.name
.as_ref()
.or(config.durable_name.as_ref())
.map(|name| format!("CONSUMER.CREATE.{}.{}{}", stream.as_ref(), name, filter))
.unwrap_or_else(|| format!("CONSUMER.CREATE.{}", stream.as_ref()))
};

match self
Expand Down
14 changes: 12 additions & 2 deletions async-nats/tests/compatibility.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ mod compatibility {
.init();
let url = std::env::var("NATS_URL").unwrap_or_else(|_| "localhost:4222".to_string());
tracing::info!("staring client for object store tests at {}", url);
let client = async_nats::connect(url).await.unwrap();
let client = async_nats::ConnectOptions::new()
.max_reconnects(10)
.retry_on_initial_connect()
.connect(&url)
.await
.unwrap();

let tests = client
.subscribe("tests.object-store.>")
Expand Down Expand Up @@ -425,7 +430,12 @@ mod compatibility {
async fn service_core() {
let url = std::env::var("NATS_URL").unwrap_or_else(|_| "localhost:4222".to_string());
tracing::info!("staring client for service tests at {}", url);
let client = async_nats::connect(url).await.unwrap();
let client = async_nats::ConnectOptions::new()
.max_reconnects(10)
.retry_on_initial_connect()
.connect(&url)
.await
.unwrap();

let mut tests = client
.subscribe("tests.service.core.>")
Expand Down
4 changes: 2 additions & 2 deletions async-nats/tests/configs/docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
FROM rust:1.71.1
FROM rust:1.78
WORKDIR /usr/src/nats.rs/async-nats
ARG PROFILE=test
COPY . /usr/src/nats.rs
RUN cargo test --features compatibility_tests --no-run
ENV NATS_URL=localhost:4222
CMD cargo test --features compatibility_tests compatibility -- --nocapture
CMD cargo test --features compatibility_tests compatibility -- --nocapture
23 changes: 22 additions & 1 deletion async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2375,7 +2375,7 @@ mod jetstream {
.unwrap();
}

let mut iter = consumer.fetch().max_messages(100).messages().await.unwrap();
let mut iter = consumer.batch().max_messages(10).messages().await.unwrap();
client.flush().await.unwrap();

tryhard::retry_fn(|| async {
Expand Down Expand Up @@ -3633,4 +3633,25 @@ mod jetstream {

assert_eq!(err.kind(), ConsumerUpdateErrorKind::DoesNotExist);
}

#[tokio::test]
async fn test_version_on_initial_connect() {
let client = async_nats::ConnectOptions::new()
.retry_on_initial_connect()
.connect("nats://localhost:4222")
.await
.unwrap();
let jetstream = async_nats::jetstream::new(client.clone());

jetstream
.create_consumer_on_stream(
consumer::pull::Config {
durable_name: Some("name".to_string()),
..Default::default()
},
"events",
)
.await
.expect_err("should fail but not panic because of lack of server info");
}
}

0 comments on commit f49133d

Please sign in to comment.