Skip to content

Commit

Permalink
MB-41279: Fix data race on Stream::snap_start,snap_end,start seqnos
Browse files Browse the repository at this point in the history
The following data race was observed on Stream member variables:

     Running [0056/0099]: test full rollback on consumer...==================
     WARNING: ThreadSanitizer: data race (pid=43827)
       Write of size 8 at 0x7b540001ef60 by thread T23:
         #0 PassiveStream::reconnectStream(...) ../kv_engine/engines/ep/src/dcp/passive_stream.cc:240 (libep.so+0x00000012eed6)
         #1 DcpConsumer::doRollback(unsigned int, Vbid, unsigned long) ../kv_engine/engines/ep/src/dcp/consumer.cc:1107 (libep.so+0x0000001116d3)
         #2 RollbackTask::run() ../kv_engine/engines/ep/src/dcp/consumer.cc:938 (libep.so+0x000000111871)
         #3 GlobalTask::execute() ../kv_engine/engines/ep/src/globaltask.cc:73 (libep.so+0x0000002363cb)
         #4 CB3ExecutorThread::run() ../kv_engine/engines/ep/src/cb3_executorthread.cc:174 (libep.so+0x00000009f7a1)
         #5 launch_executor_thread ../kv_engine/engines/ep/src/cb3_executorthread.cc:34 (libep.so+0x00000009fded)
         #6 CouchbaseThread::run() ../platform/src/cb_pthreads.cc:58 (libplatform_so.so.0.1.0+0x000000010e1b)
         #7 platform_thread_wrap ../platform/src/cb_pthreads.cc:71 (libplatform_so.so.0.1.0+0x000000010e1b)
         #8 <null> <null> (libtsan.so.0+0x000000024feb)

       Previous read of size 8 at 0x7b540001ef60 by thread T22 (mutexes: write M639646524455782464):
         #0 void StatCollector::addStat<...>(...) ../kv_engine/include/statistics/collector.h:343 (libep.so+0x0000000e5e03)
         #1 void add_casted_stat<>(...) ../kv_engine/include/statistics/collector.h:404 (libep.so+0x0000000e5e03)
         #2 Stream::addStats(...) ../kv_engine/engines/ep/src/dcp/stream.cc:157 (libep.so+0x00000016b1b4)
         #3 PassiveStream::addStats(...) ../kv_engine/engines/ep/src/dcp/passive_stream.cc:1058 (libep.so+0x000000133a98)
         #4 DcpConsumer::addStats(...) ../kv_engine/engines/ep/src/dcp/consumer.cc:1140 (libep.so+0x000000115415)
         #5 ConnStatBuilder::operator()(std::shared_ptr<ConnHandler>) ../kv_engine/engines/ep/src/ep_engine.cc:3784 (libep.so+0x0000001f3f9a)
         #6 void DcpConnMap::each<ConnStatBuilder&>(ConnStatBuilder&) ../kv_engine/engines/ep/src/dcp/dcpconnmap_impl.h:33 (libep.so+0x0000001f3f9a)
         #7 EventuallyPersistentEngine::doDcpStats(...) ../kv_engine/engines/ep/src/ep_engine.cc:3943 (libep.so+0x0000001d9cd0)
         #8 EventuallyPersistentEngine::getStats(...) ../kv_engine/engines/ep/src/ep_engine.cc:4675 (libep.so+0x0000001dd67b)
         #9 KVBucket::snapshotStats() ../kv_engine/engines/ep/src/kv_bucket.cc:1168 (libep.so+0x000000261e32)
         #10 StatSnap::run() ../kv_engine/engines/ep/src/tasks.cc:72 (libep.so+0x0000002b76a9)
         #11 GlobalTask::execute() ../kv_engine/engines/ep/src/globaltask.cc:73 (libep.so+0x0000002363cb)
         #12 CB3ExecutorThread::run() ../kv_engine/engines/ep/src/cb3_executorthread.cc:174 (libep.so+0x00000009f7a1)
         #13 launch_executor_thread ../kv_engine/engines/ep/src/cb3_executorthread.cc:34 (libep.so+0x00000009fded)
         #14 CouchbaseThread::run() ../platform/src/cb_pthreads.cc:58 (libplatform_so.so.0.1.0+0x000000010e1b)
         #15 platform_thread_wrap ../platform/src/cb_pthreads.cc:71 (libplatform_so.so.0.1.0+0x000000010e1b)
    #16 <null> <null> (libtsan.so.0+0x000000024feb)

Fix by acquiring the stream mutex before accessing these.

Change-Id: Ie14eb02e8e98c0aa5c9432c6f385766a215eca8f
Reviewed-on: http://review.couchbase.org/c/kv_engine/+/135531
Tested-by: Build Bot <[email protected]>
Reviewed-by: James Harrison <[email protected]>
Reviewed-by: Paolo Cocchi <[email protected]>
  • Loading branch information
daverigby committed Sep 4, 2020
1 parent 56c501a commit 56af85d
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 21 deletions.
31 changes: 16 additions & 15 deletions engines/ep/src/dcp/passive_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -237,27 +237,28 @@ void PassiveStream::reconnectStream(VBucketPtr& vb,
info.range.setStart(info.start);
}

snap_start_seqno_ = info.range.getStart();
start_seqno_ = info.start;
snap_end_seqno_ = info.range.getEnd();

auto stream_req_value = createStreamReqValue();

log(spdlog::level::level_enum::info,
"({}) Attempting to reconnect stream with opaque {}, start seq "
"no {}, end seq no {}, snap start seqno {}, snap end seqno {}, and vb"
" manifest uid {}",
vb_,
new_opaque,
start_seqno,
end_seqno_,
snap_start_seqno_,
snap_end_seqno_,
stream_req_value.empty() ? "none" : stream_req_value);
{
LockHolder lh(streamMutex);

snap_start_seqno_ = info.range.getStart();
start_seqno_ = info.start;
snap_end_seqno_ = info.range.getEnd();
last_seqno.store(start_seqno);

log(spdlog::level::level_enum::info,
"({}) Attempting to reconnect stream with opaque {}, start seq "
"no {}, end seq no {}, snap start seqno {}, snap end seqno {}, and "
"vb manifest uid {}",
vb_,
new_opaque,
start_seqno,
end_seqno_,
snap_start_seqno_,
snap_end_seqno_,
stream_req_value.empty() ? "none" : stream_req_value);

pushToReadyQ(std::make_unique<StreamRequest>(vb_,
new_opaque,
flags_,
Expand Down
9 changes: 3 additions & 6 deletions engines/ep/src/dcp/stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ uint64_t Stream::getReadyQueueMemory() {

void Stream::addStats(const AddStatFn& add_stat, const void* c) {
try {
LockHolder lh(streamMutex);

const int bsize = 1024;
char buffer[bsize];
checked_snprintf(
Expand Down Expand Up @@ -173,17 +175,12 @@ void Stream::addStats(const AddStatFn& add_stat, const void* c) {
vb_.get());
add_casted_stat(buffer, itemsReady.load(), add_stat, c);

size_t readyQsize;
{
std::lock_guard<std::mutex> lh(streamMutex);
readyQsize = readyQ.size();
}
checked_snprintf(buffer,
bsize,
"%s:stream_%d_readyQ_items",
name_.c_str(),
vb_.get());
add_casted_stat(buffer, readyQsize, add_stat, c);
add_casted_stat(buffer, readyQ.size(), add_stat, c);
} catch (std::exception& error) {
EP_LOG_WARN("Stream::addStats: Failed to build stats: {}",
error.what());
Expand Down

0 comments on commit 56af85d

Please sign in to comment.