diff --git a/engines/ep/src/dcp/passive_stream.cc b/engines/ep/src/dcp/passive_stream.cc index 871cc3cc84..4f1f485751 100644 --- a/engines/ep/src/dcp/passive_stream.cc +++ b/engines/ep/src/dcp/passive_stream.cc @@ -237,27 +237,28 @@ void PassiveStream::reconnectStream(VBucketPtr& vb, info.range.setStart(info.start); } - snap_start_seqno_ = info.range.getStart(); - start_seqno_ = info.start; - snap_end_seqno_ = info.range.getEnd(); - auto stream_req_value = createStreamReqValue(); - log(spdlog::level::level_enum::info, - "({}) Attempting to reconnect stream with opaque {}, start seq " - "no {}, end seq no {}, snap start seqno {}, snap end seqno {}, and vb" - " manifest uid {}", - vb_, - new_opaque, - start_seqno, - end_seqno_, - snap_start_seqno_, - snap_end_seqno_, - stream_req_value.empty() ? "none" : stream_req_value); { LockHolder lh(streamMutex); + + snap_start_seqno_ = info.range.getStart(); + start_seqno_ = info.start; + snap_end_seqno_ = info.range.getEnd(); last_seqno.store(start_seqno); + log(spdlog::level::level_enum::info, + "({}) Attempting to reconnect stream with opaque {}, start seq " + "no {}, end seq no {}, snap start seqno {}, snap end seqno {}, and " + "vb manifest uid {}", + vb_, + new_opaque, + start_seqno, + end_seqno_, + snap_start_seqno_, + snap_end_seqno_, + stream_req_value.empty() ? "none" : stream_req_value); + pushToReadyQ(std::make_unique(vb_, new_opaque, flags_, diff --git a/engines/ep/src/dcp/stream.cc b/engines/ep/src/dcp/stream.cc index e5d117b79a..38c56f9292 100644 --- a/engines/ep/src/dcp/stream.cc +++ b/engines/ep/src/dcp/stream.cc @@ -123,6 +123,8 @@ uint64_t Stream::getReadyQueueMemory() { void Stream::addStats(const AddStatFn& add_stat, const void* c) { try { + LockHolder lh(streamMutex); + const int bsize = 1024; char buffer[bsize]; checked_snprintf( @@ -173,17 +175,12 @@ void Stream::addStats(const AddStatFn& add_stat, const void* c) { vb_.get()); add_casted_stat(buffer, itemsReady.load(), add_stat, c); - size_t readyQsize; - { - std::lock_guard lh(streamMutex); - readyQsize = readyQ.size(); - } checked_snprintf(buffer, bsize, "%s:stream_%d_readyQ_items", name_.c_str(), vb_.get()); - add_casted_stat(buffer, readyQsize, add_stat, c); + add_casted_stat(buffer, readyQ.size(), add_stat, c); } catch (std::exception& error) { EP_LOG_WARN("Stream::addStats: Failed to build stats: {}", error.what());