Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions nativelink-store/src/redis_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1143,7 +1143,7 @@ impl RedisPatternSubscriber for MockPubSub {
_channel_pattern: &str,
) -> impl Future<Output = RedisResult<Pin<Box<dyn Stream<Item = Msg> + '_ + Send>>>> + Send
{
async move { Ok(stream::empty().boxed()) }
async move { Ok(stream::pending().boxed()) }
}
}

Expand All @@ -1162,7 +1162,7 @@ impl RedisSubscriptionManager {
let mut local_subscriber_channel: Pin<Box<dyn Stream<Item = PushInfo> + Send>> =
subscriber_channel
.and_then(|channel| Some(UnboundedReceiverStream::new(channel).boxed()))
.unwrap_or_else(|| stream::empty::<PushInfo>().boxed());
.unwrap_or_else(|| stream::pending::<PushInfo>().boxed());
Self {
subscribed_keys,
tx_for_test,
Expand Down
23 changes: 22 additions & 1 deletion nativelink-store/tests/redis_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use core::ops::RangeBounds;
use core::time::Duration;
use std::collections::HashMap;

use bytes::{Bytes, BytesMut};
Expand All @@ -27,7 +28,7 @@ use nativelink_redis_tester::{
use nativelink_store::cas_utils::ZERO_BYTE_DIGESTS;
use nativelink_store::redis_store::{
DEFAULT_MAX_CHUNK_UPLOADS_PER_UPDATE, DEFAULT_MAX_COUNT_PER_CURSOR, LUA_VERSION_SET_SCRIPT,
RedisStore,
RedisStore, RedisSubscriptionManager,
};
use nativelink_util::buf_channel::make_buf_channel_pair;
use nativelink_util::common::DigestInfo;
Expand All @@ -39,6 +40,7 @@ use nativelink_util::store_trait::{
use pretty_assertions::assert_eq;
use redis::{RedisError, Value};
use redis_test::{MockCmd, MockRedisConnection};
use tokio::time::sleep;
use tracing::{Instrument, info, info_span};

const VALID_HASH1: &str = "3031323334353637383961626364656630303030303030303030303030303030";
Expand Down Expand Up @@ -1169,3 +1171,22 @@ fn test_search_by_index_resp3() -> Result<(), Error> {

Ok(())
}

#[nativelink_test]
async fn no_items_from_none_subscription_channel() -> Result<(), Error> {
let subscription_manager =
RedisSubscriptionManager::new(MockPubSub::new(), None, "test_pub_sub".into());

// To give the stream enough time to get polled
sleep(Duration::from_secs(1)).await;

assert!(!logs_contain(
"Error receiving message in RedisSubscriptionManager from subscriber_channel"
));
assert!(!logs_contain("ERROR"));

// Because otherwise it gets dropped immediately, and we need it to live to do things
drop(subscription_manager);

Ok(())
}
Loading