Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main'
Browse files Browse the repository at this point in the history
  • Loading branch information
paolobarbolini committed Aug 5, 2024
2 parents 6b73d56 + e9919a0 commit ae4f224
Show file tree
Hide file tree
Showing 12 changed files with 184 additions and 16 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4 # v4 uses caching of Go out of the box
with:
go-version: '1.20'
go-version: '1.22'
- name: Install nats-server
run: go install github.com/nats-io/nats-server/v2@main

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ jobs:

- name: Install msrv Rust on ubuntu-latest
id: install-rust
uses: dtolnay/rust-toolchain@1.67.0
uses: dtolnay/rust-toolchain@1.70.0
- name: Cache the build artifacts
uses: Swatinem/rust-cache@v2
with:
Expand Down
2 changes: 1 addition & 1 deletion async-nats/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ tokio-rustls = { version = "0.26", default-features = false }
rustls-pemfile = "2"
nuid = "0.5"
serde_nanos = "0.1.3"
time = { version = "0.3.24", features = ["parsing", "formatting", "serde", "serde-well-known"] }
time = { version = "0.3.36", features = ["parsing", "formatting", "serde", "serde-well-known"] }
rustls-native-certs = "0.7"
tracing = "0.1"
thiserror = "1.0"
Expand Down
2 changes: 1 addition & 1 deletion async-nats/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub(crate) fn reconnect_delay_callback_default(attempts: usize) -> Duration {
if attempts <= 1 {
Duration::from_millis(0)
} else {
let exp: u32 = (attempts - 1).try_into().unwrap_or(std::u32::MAX);
let exp: u32 = (attempts - 1).try_into().unwrap_or(u32::MAX);
let max = Duration::from_secs(4);
cmp::min(Duration::from_millis(2_u64.saturating_pow(exp)), max)
}
Expand Down
48 changes: 47 additions & 1 deletion async-nats/src/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,12 @@ impl From<&str> for HeaderValue {
}
}

impl From<String> for HeaderValue {
fn from(inner: String) -> Self {
Self { inner }
}
}

impl HeaderValue {
pub fn new() -> Self {
HeaderValue::default()
Expand Down Expand Up @@ -382,6 +388,14 @@ impl IntoHeaderName for &str {
}
}

impl IntoHeaderName for String {
fn into_header_name(self) -> HeaderName {
HeaderName {
inner: HeaderRepr::Custom(self.into()),
}
}
}

impl IntoHeaderName for HeaderName {
fn into_header_name(self) -> HeaderName {
self
Expand All @@ -400,6 +414,12 @@ impl IntoHeaderValue for &str {
}
}

impl IntoHeaderValue for String {
fn into_header_value(self) -> HeaderValue {
HeaderValue { inner: self }
}
}

impl IntoHeaderValue for HeaderValue {
fn into_header_value(self) -> HeaderValue {
self
Expand Down Expand Up @@ -650,7 +670,7 @@ impl std::error::Error for ParseHeaderNameError {}

#[cfg(test)]
mod tests {
use super::{HeaderMap, HeaderName, HeaderValue};
use super::{HeaderMap, HeaderName, HeaderValue, IntoHeaderName, IntoHeaderValue};
use std::str::{from_utf8, FromStr};

#[test]
Expand Down Expand Up @@ -814,4 +834,30 @@ mod tests {
header
);
}

#[test]
fn header_name_from_string() {
let string = "NATS-Stream".to_string();
let name = string.into_header_name();

assert_eq!("NATS-Stream", name.as_str());
}

#[test]
fn header_value_from_string_with_trait() {
let string = "some value".to_string();

let value = string.into_header_value();

assert_eq!("some value", value.as_str());
}

#[test]
fn header_value_from_string() {
let string = "some value".to_string();

let value: HeaderValue = string.into();

assert_eq!("some value", value.as_str());
}
}
77 changes: 75 additions & 2 deletions async-nats/src/jetstream/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ use crate::jetstream::account::Account;
use crate::jetstream::publish::PublishAck;
use crate::jetstream::response::Response;
use crate::subject::ToSubject;
use crate::{header, Client, Command, HeaderMap, HeaderValue, Message, StatusCode};
use crate::{
header, is_valid_subject, Client, Command, HeaderMap, HeaderValue, Message, StatusCode,
};
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{Future, TryFutureExt};
use futures::{Future, StreamExt, TryFutureExt};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde_json::{self, json};
Expand Down Expand Up @@ -497,6 +499,52 @@ impl Context {
}
}

/// Looks up Stream that contains provided subject.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use futures::TryStreamExt;
/// let client = async_nats::connect("demo.nats.io:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
/// let stream_name = jetstream.stream_by_subject("foo.>");
/// # Ok(())
/// # }
/// ```
pub async fn stream_by_subject<T: Into<String>>(
&self,
subject: T,
) -> Result<String, GetStreamByNameError> {
let subject = subject.into();
if !is_valid_subject(subject.as_str()) {
return Err(GetStreamByNameError::new(
GetStreamByNameErrorKind::InvalidSubject,
));
}
let mut names = StreamNames {
context: self.clone(),
offset: 0,
page_request: None,
streams: Vec::new(),
subject: Some(subject),
done: false,
};
match names.next().await {
Some(name) => match name {
Ok(name) => Ok(name),
Err(err) => Err(GetStreamByNameError::with_source(
GetStreamByNameErrorKind::Request,
err,
)),
},
None => Err(GetStreamByNameError::new(
GetStreamByNameErrorKind::NotFound,
)),
}
}

/// Lists names of all streams for current context.
///
/// # Examples
Expand All @@ -520,6 +568,7 @@ impl Context {
offset: 0,
page_request: None,
streams: Vec::new(),
subject: None,
done: false,
}
}
Expand Down Expand Up @@ -1115,6 +1164,7 @@ impl Context {
description: config.description.clone(),
subjects: vec![chunk_subject, meta_subject],
max_age: config.max_age,
max_bytes: config.max_bytes,
storage: config.storage,
num_replicas: config.num_replicas,
discard: DiscardPolicy::New,
Expand Down Expand Up @@ -1291,6 +1341,7 @@ pub struct StreamNames {
context: Context,
offset: usize,
page_request: Option<PageRequest>,
subject: Option<String>,
streams: Vec<String>,
done: bool,
}
Expand Down Expand Up @@ -1333,12 +1384,14 @@ impl futures::Stream for StreamNames {
}
let context = self.context.clone();
let offset = self.offset;
let subject = self.subject.clone();
self.page_request = Some(Box::pin(async move {
match context
.request(
"STREAM.NAMES",
&json!({
"offset": offset,
"subject": subject
}),
)
.await?
Expand Down Expand Up @@ -1605,7 +1658,27 @@ impl Display for GetStreamErrorKind {
}
}

#[derive(Clone, Debug, PartialEq)]
pub enum GetStreamByNameErrorKind {
Request,
NotFound,
InvalidSubject,
JetStream(super::errors::Error),
}

impl Display for GetStreamByNameErrorKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Request => write!(f, "request error"),
Self::NotFound => write!(f, "stream not found"),
Self::InvalidSubject => write!(f, "invalid subject"),
Self::JetStream(err) => write!(f, "jetstream error: {}", err),
}
}
}

pub type GetStreamError = Error<GetStreamErrorKind>;
pub type GetStreamByNameError = Error<GetStreamByNameErrorKind>;

pub type UpdateStreamError = CreateStreamError;
pub type UpdateStreamErrorKind = CreateStreamErrorKind;
Expand Down
2 changes: 2 additions & 0 deletions async-nats/src/jetstream/object_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ pub struct Config {
/// Maximum age of any value in the bucket, expressed in nanoseconds
#[serde(default, with = "serde_nanos")]
pub max_age: Duration,
/// How large the storage bucket may become in total bytes.
pub max_bytes: i64,
/// The type of storage backend, `File` (default) and `Memory`
pub storage: StorageType,
/// How many replicas to keep for each value in a cluster, maximum 5.
Expand Down
31 changes: 26 additions & 5 deletions async-nats/src/jetstream/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1111,14 +1111,17 @@ fn default_consumer_limits_as_none<'de, D>(
where
D: Deserializer<'de>,
{
let consumer_limits = ConsumerLimits::deserialize(deserializer)?;
if consumer_limits == ConsumerLimits::default() {
Ok(None)
let consumer_limits = Option::<ConsumerLimits>::deserialize(deserializer)?;
if let Some(cl) = consumer_limits {
if cl == ConsumerLimits::default() {
Ok(None)
} else {
Ok(Some(cl))
}
} else {
Ok(Some(consumer_limits))
Ok(None)
}
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Default)]
pub struct ConsumerLimits {
/// Sets the maximum [crate::jetstream::consumer::Config::inactive_threshold] that can be set on the consumer.
Expand Down Expand Up @@ -2052,3 +2055,21 @@ impl From<super::context::RequestError> for ConsumerCreateStrictError {
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn consumer_limits_de() {
let config = Config {
..Default::default()
};

let roundtrip: Config = {
let ser = serde_json::to_string(&config).unwrap();
serde_json::from_str(&ser).unwrap()
};
assert_eq!(config, roundtrip);
}
}
6 changes: 4 additions & 2 deletions async-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1669,9 +1669,11 @@ impl<T: ToServerAddrs + ?Sized> ToServerAddrs for &T {
}

pub(crate) fn is_valid_subject<T: AsRef<str>>(subject: T) -> bool {
!subject.as_ref().contains([' ', '.', '\r', '\n'])
let subject_str = subject.as_ref();
!subject_str.starts_with('.')
&& !subject_str.ends_with('.')
&& subject_str.bytes().all(|c| !c.is_ascii_whitespace())
}

macro_rules! from_with_timeout {
($t:ty, $k:ty, $origin: ty, $origin_kind: ty) => {
impl From<$origin> for $t {
Expand Down
2 changes: 1 addition & 1 deletion async-nats/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ impl ConnectOptions {

/// Sets the capacity for `Subscribers`. Exceeding it will trigger `slow consumer` error
/// callback and drop messages.
/// Default is set to 1024 messages buffer.
/// Default is set to 65536 messages buffer.
///
/// # Examples
/// ```no_run
Expand Down
25 changes: 24 additions & 1 deletion async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ mod jetstream {
self, push, AckPolicy, DeliverPolicy, Info, OrderedPullConsumer, OrderedPushConsumer,
PullConsumer, PushConsumer, ReplayPolicy,
};
use async_nats::jetstream::context::{Publish, PublishErrorKind};
use async_nats::jetstream::context::{GetStreamByNameErrorKind, Publish, PublishErrorKind};
use async_nats::jetstream::response::Response;
use async_nats::jetstream::stream::{
self, ConsumerCreateStrictErrorKind, ConsumerUpdateErrorKind, DiscardPolicy, StorageType,
Expand Down Expand Up @@ -3646,4 +3646,27 @@ mod jetstream {
.await
.expect_err("should fail but not panic because of lack of server info");
}

#[tokio::test]
async fn test_stream_by_subject() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();

let jetstream = async_nats::jetstream::new(client);

let stream = jetstream
.create_stream(stream::Config {
name: "events".to_string(),
subjects: vec!["events.>".to_string()],
..Default::default()
})
.await
.unwrap();

let stream_name = jetstream.stream_by_subject("events.>").await.unwrap();
assert_eq!(stream_name, stream.cached_info().config.name);

let err = jetstream.stream_by_subject("foo").await.unwrap_err();
assert_eq!(err.kind(), GetStreamByNameErrorKind::NotFound);
}
}
1 change: 1 addition & 0 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ allow = [
"MIT",
"Apache-2.0",
"ISC",
"BSD-2-Clause",
"BSD-3-Clause",
"OpenSSL",
"Unicode-DFS-2016",
Expand Down

0 comments on commit ae4f224

Please sign in to comment.