diff --git a/nativelink-store/src/redis_store.rs b/nativelink-store/src/redis_store.rs index 98ff80c42..590605429 100644 --- a/nativelink-store/src/redis_store.rs +++ b/nativelink-store/src/redis_store.rs @@ -1143,7 +1143,7 @@ impl RedisPatternSubscriber for MockPubSub { _channel_pattern: &str, ) -> impl Future + '_ + Send>>>> + Send { - async move { Ok(stream::empty().boxed()) } + async move { Ok(stream::pending().boxed()) } } } @@ -1162,7 +1162,7 @@ impl RedisSubscriptionManager { let mut local_subscriber_channel: Pin + Send>> = subscriber_channel .and_then(|channel| Some(UnboundedReceiverStream::new(channel).boxed())) - .unwrap_or_else(|| stream::empty::().boxed()); + .unwrap_or_else(|| stream::pending::().boxed()); Self { subscribed_keys, tx_for_test, diff --git a/nativelink-store/tests/redis_store_test.rs b/nativelink-store/tests/redis_store_test.rs index 13ee2395b..4d558b416 100644 --- a/nativelink-store/tests/redis_store_test.rs +++ b/nativelink-store/tests/redis_store_test.rs @@ -13,6 +13,7 @@ // limitations under the License. use core::ops::RangeBounds; +use core::time::Duration; use std::collections::HashMap; use bytes::{Bytes, BytesMut}; @@ -27,7 +28,7 @@ use nativelink_redis_tester::{ use nativelink_store::cas_utils::ZERO_BYTE_DIGESTS; use nativelink_store::redis_store::{ DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE, DEFAULT_MAX_COUNT_PER_CURSOR, LUA_VERSION_SET_SCRIPT, - RedisStore, + RedisStore, RedisSubscriptionManager, }; use nativelink_util::buf_channel::make_buf_channel_pair; use nativelink_util::common::DigestInfo; @@ -39,6 +40,7 @@ use nativelink_util::store_trait::{ use pretty_assertions::assert_eq; use redis::{RedisError, Value}; use redis_test::{MockCmd, MockRedisConnection}; +use tokio::time::sleep; use tracing::{Instrument, info, info_span}; const VALID_HASH1: &str = "3031323334353637383961626364656630303030303030303030303030303030"; @@ -1169,3 +1171,22 @@ fn test_search_by_index_resp3() -> Result<(), Error> { Ok(()) } + +#[nativelink_test] +async fn no_items_from_none_subscription_channel() -> Result<(), Error> { + let subscription_manager = + RedisSubscriptionManager::new(MockPubSub::new(), None, "test_pub_sub".into()); + + // To give the stream enough time to get polled + sleep(Duration::from_secs(1)).await; + + assert!(!logs_contain( + "Error receiving message in RedisSubscriptionManager from subscriber_channel" + )); + assert!(!logs_contain("ERROR")); + + // Because otherwise it gets dropped immediately, and we need it to live to do things + drop(subscription_manager); + + Ok(()) +}