diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc index 0103b5910f8..7f22cd1acbe 100644 --- a/src/server/redis_connection.cc +++ b/src/server/redis_connection.cc @@ -192,19 +192,21 @@ bool Connection::CanMigrate() const { } void Connection::SubscribeChannel(const std::string &channel) { + std::string ns_channel = ComposeNamespaceKey(ns_, channel, srv_->storage->IsSlotIdEncoded()); for (const auto &chan : subscribe_channels_) { - if (channel == chan) return; + if (ns_channel == chan) return; } - subscribe_channels_.emplace_back(channel); - owner_->srv->SubscribeChannel(channel, this); + subscribe_channels_.emplace_back(ns_channel); + owner_->srv->SubscribeChannel(ns_channel, this); } void Connection::UnsubscribeChannel(const std::string &channel) { + std::string ns_channel = ComposeNamespaceKey(ns_, channel, srv_->storage->IsSlotIdEncoded()); for (auto iter = subscribe_channels_.begin(); iter != subscribe_channels_.end(); iter++) { - if (*iter == channel) { + if (*iter == ns_channel) { subscribe_channels_.erase(iter); - owner_->srv->UnsubscribeChannel(channel, this); + owner_->srv->UnsubscribeChannel(ns_channel, this); return; } } @@ -230,18 +232,20 @@ void Connection::UnsubscribeAll(const UnsubscribeCallback &reply) { int Connection::SubscriptionsCount() { return static_cast(subscribe_channels_.size()); } void Connection::PSubscribeChannel(const std::string &pattern) { + std::string ns_pattern = ComposeNamespaceKey(ns_, pattern, srv_->storage->IsSlotIdEncoded()); for (const auto &p : subscribe_patterns_) { - if (pattern == p) return; + if (ns_pattern == p) return; } - subscribe_patterns_.emplace_back(pattern); - owner_->srv->PSubscribeChannel(pattern, this); + subscribe_patterns_.emplace_back(ns_pattern); + owner_->srv->PSubscribeChannel(ns_pattern, this); } void Connection::PUnsubscribeChannel(const std::string &pattern) { + std::string ns_pattern = ComposeNamespaceKey(ns_, pattern, srv_->storage->IsSlotIdEncoded()); for (auto iter = subscribe_patterns_.begin(); iter != subscribe_patterns_.end(); iter++) { - if (*iter == pattern) { + if (*iter == ns_pattern) { subscribe_patterns_.erase(iter); - owner_->srv->PUnsubscribeChannel(pattern, this); + owner_->srv->PUnsubscribeChannel(ns_pattern, this); return; } } @@ -267,19 +271,21 @@ void Connection::PUnsubscribeAll(const UnsubscribeCallback &reply) { int Connection::PSubscriptionsCount() { return static_cast(subscribe_patterns_.size()); } void Connection::SSubscribeChannel(const std::string &channel, uint16_t slot) { + std::string ns_channel = ComposeNamespaceKey(ns_, channel, srv_->storage->IsSlotIdEncoded()); for (const auto &chan : subscribe_shard_channels_) { - if (channel == chan) return; + if (ns_channel == chan) return; } - subscribe_shard_channels_.emplace_back(channel); - owner_->srv->SSubscribeChannel(channel, this, slot); + subscribe_shard_channels_.emplace_back(ns_channel); + owner_->srv->SSubscribeChannel(ns_channel, this, slot); } void Connection::SUnsubscribeChannel(const std::string &channel, uint16_t slot) { + std::string ns_channel = ComposeNamespaceKey(ns_, channel, srv_->storage->IsSlotIdEncoded()); for (auto iter = subscribe_shard_channels_.begin(); iter != subscribe_shard_channels_.end(); iter++) { - if (*iter == channel) { + if (*iter == ns_channel) { subscribe_shard_channels_.erase(iter); - owner_->srv->SUnsubscribeChannel(channel, this, slot); + owner_->srv->SUnsubscribeChannel(ns_channel, this, slot); return; } }