From 56af85d1724450f8d8944f2734acf6d6e5aecfb1 Mon Sep 17 00:00:00 2001 From: Dave Rigby Date: Thu, 3 Sep 2020 13:09:58 +0100 Subject: [PATCH] MB-41279: Fix data race on Stream::snap_start,snap_end,start seqnos The following data race was observed on Stream member variables: Running [0056/0099]: test full rollback on consumer...================== WARNING: ThreadSanitizer: data race (pid=43827) Write of size 8 at 0x7b540001ef60 by thread T23: #0 PassiveStream::reconnectStream(...) ../kv_engine/engines/ep/src/dcp/passive_stream.cc:240 (libep.so+0x00000012eed6) #1 DcpConsumer::doRollback(unsigned int, Vbid, unsigned long) ../kv_engine/engines/ep/src/dcp/consumer.cc:1107 (libep.so+0x0000001116d3) #2 RollbackTask::run() ../kv_engine/engines/ep/src/dcp/consumer.cc:938 (libep.so+0x000000111871) #3 GlobalTask::execute() ../kv_engine/engines/ep/src/globaltask.cc:73 (libep.so+0x0000002363cb) #4 CB3ExecutorThread::run() ../kv_engine/engines/ep/src/cb3_executorthread.cc:174 (libep.so+0x00000009f7a1) #5 launch_executor_thread ../kv_engine/engines/ep/src/cb3_executorthread.cc:34 (libep.so+0x00000009fded) #6 CouchbaseThread::run() ../platform/src/cb_pthreads.cc:58 (libplatform_so.so.0.1.0+0x000000010e1b) #7 platform_thread_wrap ../platform/src/cb_pthreads.cc:71 (libplatform_so.so.0.1.0+0x000000010e1b) #8 (libtsan.so.0+0x000000024feb) Previous read of size 8 at 0x7b540001ef60 by thread T22 (mutexes: write M639646524455782464): #0 void StatCollector::addStat<...>(...) ../kv_engine/include/statistics/collector.h:343 (libep.so+0x0000000e5e03) #1 void add_casted_stat<>(...) ../kv_engine/include/statistics/collector.h:404 (libep.so+0x0000000e5e03) #2 Stream::addStats(...) ../kv_engine/engines/ep/src/dcp/stream.cc:157 (libep.so+0x00000016b1b4) #3 PassiveStream::addStats(...) ../kv_engine/engines/ep/src/dcp/passive_stream.cc:1058 (libep.so+0x000000133a98) #4 DcpConsumer::addStats(...) 
../kv_engine/engines/ep/src/dcp/consumer.cc:1140 (libep.so+0x000000115415) #5 ConnStatBuilder::operator()(std::shared_ptr) ../kv_engine/engines/ep/src/ep_engine.cc:3784 (libep.so+0x0000001f3f9a) #6 void DcpConnMap::each(ConnStatBuilder&) ../kv_engine/engines/ep/src/dcp/dcpconnmap_impl.h:33 (libep.so+0x0000001f3f9a) #7 EventuallyPersistentEngine::doDcpStats(...) ../kv_engine/engines/ep/src/ep_engine.cc:3943 (libep.so+0x0000001d9cd0) #8 EventuallyPersistentEngine::getStats(...) ../kv_engine/engines/ep/src/ep_engine.cc:4675 (libep.so+0x0000001dd67b) #9 KVBucket::snapshotStats() ../kv_engine/engines/ep/src/kv_bucket.cc:1168 (libep.so+0x000000261e32) #10 StatSnap::run() ../kv_engine/engines/ep/src/tasks.cc:72 (libep.so+0x0000002b76a9) #11 GlobalTask::execute() ../kv_engine/engines/ep/src/globaltask.cc:73 (libep.so+0x0000002363cb) #12 CB3ExecutorThread::run() ../kv_engine/engines/ep/src/cb3_executorthread.cc:174 (libep.so+0x00000009f7a1) #13 launch_executor_thread ../kv_engine/engines/ep/src/cb3_executorthread.cc:34 (libep.so+0x00000009fded) #14 CouchbaseThread::run() ../platform/src/cb_pthreads.cc:58 (libplatform_so.so.0.1.0+0x000000010e1b) #15 platform_thread_wrap ../platform/src/cb_pthreads.cc:71 (libplatform_so.so.0.1.0+0x000000010e1b) #16 (libtsan.so.0+0x000000024feb) Fix by acquiring the stream mutex (streamMutex) before accessing these member variables, both where PassiveStream::reconnectStream() writes them and where Stream::addStats() reads them. 
Change-Id: Ie14eb02e8e98c0aa5c9432c6f385766a215eca8f Reviewed-on: http://review.couchbase.org/c/kv_engine/+/135531 Tested-by: Build Bot Reviewed-by: James Harrison Reviewed-by: Paolo Cocchi --- engines/ep/src/dcp/passive_stream.cc | 31 ++++++++++++++-------------- engines/ep/src/dcp/stream.cc | 9 +++----- 2 files changed, 19 insertions(+), 21 deletions(-) diff --git a/engines/ep/src/dcp/passive_stream.cc b/engines/ep/src/dcp/passive_stream.cc index 871cc3cc84..4f1f485751 100644 --- a/engines/ep/src/dcp/passive_stream.cc +++ b/engines/ep/src/dcp/passive_stream.cc @@ -237,27 +237,28 @@ void PassiveStream::reconnectStream(VBucketPtr& vb, info.range.setStart(info.start); } - snap_start_seqno_ = info.range.getStart(); - start_seqno_ = info.start; - snap_end_seqno_ = info.range.getEnd(); - auto stream_req_value = createStreamReqValue(); - log(spdlog::level::level_enum::info, - "({}) Attempting to reconnect stream with opaque {}, start seq " - "no {}, end seq no {}, snap start seqno {}, snap end seqno {}, and vb" - " manifest uid {}", - vb_, - new_opaque, - start_seqno, - end_seqno_, - snap_start_seqno_, - snap_end_seqno_, - stream_req_value.empty() ? "none" : stream_req_value); { LockHolder lh(streamMutex); + + snap_start_seqno_ = info.range.getStart(); + start_seqno_ = info.start; + snap_end_seqno_ = info.range.getEnd(); last_seqno.store(start_seqno); + log(spdlog::level::level_enum::info, + "({}) Attempting to reconnect stream with opaque {}, start seq " + "no {}, end seq no {}, snap start seqno {}, snap end seqno {}, and " + "vb manifest uid {}", + vb_, + new_opaque, + start_seqno, + end_seqno_, + snap_start_seqno_, + snap_end_seqno_, + stream_req_value.empty() ? 
"none" : stream_req_value); + pushToReadyQ(std::make_unique(vb_, new_opaque, flags_, diff --git a/engines/ep/src/dcp/stream.cc b/engines/ep/src/dcp/stream.cc index e5d117b79a..38c56f9292 100644 --- a/engines/ep/src/dcp/stream.cc +++ b/engines/ep/src/dcp/stream.cc @@ -123,6 +123,8 @@ uint64_t Stream::getReadyQueueMemory() { void Stream::addStats(const AddStatFn& add_stat, const void* c) { try { + LockHolder lh(streamMutex); + const int bsize = 1024; char buffer[bsize]; checked_snprintf( @@ -173,17 +175,12 @@ void Stream::addStats(const AddStatFn& add_stat, const void* c) { vb_.get()); add_casted_stat(buffer, itemsReady.load(), add_stat, c); - size_t readyQsize; - { - std::lock_guard lh(streamMutex); - readyQsize = readyQ.size(); - } checked_snprintf(buffer, bsize, "%s:stream_%d_readyQ_items", name_.c_str(), vb_.get()); - add_casted_stat(buffer, readyQsize, add_stat, c); + add_casted_stat(buffer, readyQ.size(), add_stat, c); } catch (std::exception& error) { EP_LOG_WARN("Stream::addStats: Failed to build stats: {}", error.what());